目录
MapReduceShuffle 机制
Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。

Partition分区
1、问题引出
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)。
2、默认Partitioner分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
3、自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 控制分区代码逻辑
……
return partition;
}
}
(2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
4、分区总结
- (1)如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
- (2)如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
- (3)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
- (4)分区号必须从零开始,逐一累加。
5、案例分析
例如:假设自定义分区数为5,则
- (1)job.setNumReduceTasks(1); —— 会正常运行,只不过会产生一个输出文件
- (2)job.setNumReduceTasks(2); —— 会报错
- (3)job.setNumReduceTasks(6); —— 大于5,程序会正常运行,会产生空文件
Partition 分区案例实操
1)需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据
phone_data .txt
点击下载测试数据(2)期望输出数据
手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到一个文件中。
2)需求分析

3)在之前案例的基础上,增加一个分区类
package com.cosyblogs.mr.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
4)在驱动函数中增加自定义数据分区设置和ReduceTask设置
package com.cosyblogs.mr.partitioner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowsumDriver {
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"e:/output1","e:/output2"};
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);
// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 8 指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
WritableComparable 排序
排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数 据按 照key进 行排 序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
排序分类
(1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
自定义排序 WritableComparable 原理分析
bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。
@Override
public int compare(WritableComparable a, WritableComparable b) {
FlowBean abean = (FlowBean) a;
FlowBean bbean = (FlowBean) b;
return abean.getSumFlow().compareTo(bbean.getSumFlow());
}
WritableComparable 排序案例实操(全排序)
1)需求
根据前面案例序列化案例产生的结果再次对总流量进行倒序排序。

2)需求分析

3)代码实现
(1)FlowBean 对象在在需求 1 基础上增加了比较功能
package com.cosyblogs.mr.writablecomparable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 流量对象(实现Hadoop的序列化)
*
*/
public class FlowBean implements WritableComparable<FlowBean> {
private Integer upFlow;
private Integer downFlow;
private Integer sumFlow;
public Integer getUpFlow() {
return upFlow;
}
public void setUpFlow(Integer upFlow) {
this.upFlow = upFlow;
}
public Integer getDownFlow() {
return downFlow;
}
public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}
public Integer getSumFlow() {
return sumFlow;
}
public void setSumFlow(Integer sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" +sumFlow;
}
/**
* 序列化方法
* @param out
* @throws IOException
*/
public void write(DataOutput out) throws IOException {
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(sumFlow);
}
/**
* 反序列化方法
* @param in
* @throws IOException
*/
public void readFields(DataInput in) throws IOException {
upFlow = in.readInt();
downFlow = in.readInt();
sumFlow = in.readInt();
}
/**
* 计算上下行流量的总和
*/
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
/**
* 自定义排序规则
* 需求:根总流量倒序
* @param o
* @return
*/
public int compareTo(FlowBean o) {
return -this.getSumFlow().compareTo(o.getSumFlow());
}
}
(2)编写 Mapper 类
package com.cosyblogs.mr.writablecomparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private Text outv = new Text();
private FlowBean outk = new FlowBean();
/**
* 核心业务逻辑处理
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取当前行数据
String line = value.toString();
// 切割数据 1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
String[] phoneDatas = line.split("\t");
// 获取输出数据的key(手机号)
outv.set(phoneDatas[1]);
// 获取输出数据的value
outk.setUpFlow(Integer.parseInt(phoneDatas[phoneDatas.length-3]));
outk.setDownFlow(Integer.parseInt(phoneDatas[phoneDatas.length-2]));
outk.setSumFlow();
// 将数据输出
context.write(outk, outv);
}
}
(3)编写 Reducer 类
package com.cosyblogs.mr.writablecomparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
private FlowBean outv = new FlowBean();
/**
* 核心业务逻辑处理
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 遍历输出
for (Text value : values) {
context.write(value,key);
}
}
}
(4)编写 Driver 类
package com.atguigu.mr.writablecomparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"E:/input","E:/output"};
// 1 获取配置信息,或者 job 对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的 jar 包所在的本地路径
job.setJarByClass(FlowCountSortDriver.class);
// 3 指定本业务 job 要使用的 mapper/Reducer 业务类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
// 4 指定 mapper 输出数据的 kv 类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5 指定最终输出的数据的 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 指定 job 的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包, 提交给 yarn 去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
WritableComparable 排序案例实操(区内排序)
1)需求
要求每个省份手机号输出的文件中按照总流量内部排序。
2)需求分析
基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

3)案例实操
(1)增加自定义分区类
package com.cosyblogs.mr.writablecomparable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* 自定义一个分区器对象,需要继承Hadoop的提供的Partitioner对象
*/
public class PhonePartitioner extends Partitioner<FlowBean,Text> {
/**
* 定义当前kv所属分区的规则
* @param text
* @param flowBean
* @param numPartitions
* @return
* 136 --> 0
* 137 --> 1
* 138 --> 2
* 139 --> 3
* 其他 --> 4
*/
public int getPartition(FlowBean flowBean,Text text,int numPartitions) {
int phonePartitions;
// 获取手机号
String phoneNum = text.toString();
if(phoneNum.startsWith("136")){
phonePartitions = 0;
}else if(phoneNum.startsWith("137")){
phonePartitions = 1;
}else if(phoneNum.startsWith("138")){
phonePartitions = 2;
}else if(phoneNum.startsWith("139")){
phonePartitions = 3;
}else {
phonePartitions =4;
}
return phonePartitions;
}
}
(2)在驱动类中添加分区类
// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);
// 设置 Reducetask 个数
job.setNumReduceTasks(5);
Combiner合并
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置 Combiner 是在每一个MapTask所在的节点运行。
(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
Reducer是接收全局所有Mapper的输出结果
(6)自定义 Combiner 实现步骤
- (a)自定义一个 Combiner 继承 Reducer,重写 Reduce 方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 汇总操作
int count = 0;
for(IntWritable v :values){
count += v.get();
}
// 2 写出
context.write(key, new IntWritable(count));
}
}
(b)在 Job 驱动类中设置
job.setCombinerClass(WordcountCombiner.class);
Combiner 合并案例实操
1)需求
统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用 Combiner 功能。
(1)数据输入
hello.txt
(2)期望输出数据
期望:Combine 输入数据多,输出时经过合并,输出数据降低。
2)需求分析
需求:对每一个MapTask的输出局部汇总(Combiner)
package com.cosyblogs.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 1 汇总
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
// 2 写出
context.write(key, v);
}
}
(2)在 WordcountDriver 驱动类中指定 Combiner
// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordcountCombiner.class);
4)案例实操 – 方案二
(1)将 WordcountReducer 作为 Combiner 在 WordcountDriver 驱动类中指定
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordcountReducer.class);
运行程序,如下图所示
