最码农 最码农
  • 首页
  • 动态广场
  • 精选栏目
  • 闲言碎语
  • 左邻右里
  • 笔记屋
  • 注册
  • 登录
首页 › 大数据 › MapReduce框架原理-Shuffle 机制

MapReduce框架原理-Shuffle 机制

Cosy
10月前大数据阅读 382

Shuffle 机制

Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。

MapReduce框架原理-Shuffle 机制-最码农

Partition分区

1、问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)。

2、默认Partitioner分区


public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

3、自定义Partitioner步骤

(1)自定义类继承Partitioner,重写getPartition()方法


public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
    // 控制分区代码逻辑
    ……
    return partition;
    }
}

(2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

4、分区总结

  • (1)如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
  • (2)如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
  • (3)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
  • (4)分区号必须从零开始,逐一累加。

5、案例分析

例如:假设自定义分区数为5,则

  • (1)job.setNumReduceTasks(1); —— 会正常运行,只不过会产生一个输出文件
  • (2)job.setNumReduceTasks(2); —— 会报错
  • (3)job.setNumReduceTasks(6); —— 大于5,程序会正常运行,会产生空文件

Partition 分区案例实操

1)需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(1)输入数据

phone_data .txt

点击下载测试数据

(2)期望输出数据

手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到一个文件中。

2)需求分析

MapReduce框架原理-Shuffle 机制-最码农

3)在之前案例的基础上,增加一个分区类


package com.cosyblogs.mr.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
	  public int getPartition(Text key, FlowBean value, int numPartitions) {
    // 1 获取电话号码的前三位
		    String preNum = key.toString().substring(0, 3);
		    int partition = 4;
		    // 2 判断是哪个省
		    if ("136".equals(preNum)) {
			      partition = 0;
		    }else if ("137".equals(preNum)) {
			      partition = 1;
		    }else if ("138".equals(preNum)) {
			      partition = 2;
		    }else if ("139".equals(preNum)) {
			      partition = 3;
		    }
		    return partition;
    }
}

4)在驱动函数中增加自定义数据分区设置和ReduceTask设置


package com.cosyblogs.mr.partitioner;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

		// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
		args = new String[]{"e:/output1","e:/output2"};

		// 1 获取配置信息,或者job对象实例
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 指定本程序的jar包所在的本地路径
		job.setJarByClass(FlowsumDriver.class);

		// 3 指定本业务job要使用的mapper/Reducer业务类
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 4 指定mapper输出数据的kv类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 5 指定最终输出的数据的kv类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 8 指定自定义数据分区
		job.setPartitionerClass(ProvincePartitioner.class);

		// 9 同时指定相应数量的reduce task
		job.setNumReduceTasks(5);
		
		// 6 指定job的输入原始文件所在目录
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

WritableComparable 排序

排序概述

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数 据按 照key进 行排 序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

(1)部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

(2)全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3)辅助排序:(GroupingComparator分组)

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

(4)二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序 WritableComparable 原理分析

bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。


@Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean abean = (FlowBean) a;
        FlowBean bbean = (FlowBean) b;
        return abean.getSumFlow().compareTo(bbean.getSumFlow());
    }

WritableComparable 排序案例实操(全排序)

1)需求

根据前面案例序列化案例产生的结果再次对总流量进行倒序排序。

MapReduce框架原理-Shuffle 机制-最码农
点击下载测试数据

2)需求分析

MapReduce框架原理-Shuffle 机制-最码农

3)代码实现

(1)FlowBean 对象在在需求 1 基础上增加了比较功能


package com.cosyblogs.mr.writablecomparable;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 流量对象(实现Hadoop的序列化)
 *
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private Integer upFlow;
    private Integer downFlow;
    private Integer sumFlow;

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Integer sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow  + "\t" + downFlow + "\t" +sumFlow;
    }

    /**
     * 序列化方法
     * @param out
     * @throws IOException
     */
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(sumFlow);
    }

    /**
     * 反序列化方法
     * @param in
     * @throws IOException
     */
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readInt();
        downFlow = in.readInt();
        sumFlow = in.readInt();
    }

    /**
     * 计算上下行流量的总和
     */
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    /**
     * 自定义排序规则
     * 需求:根总流量倒序
     * @param o
     * @return
     */
    public int compareTo(FlowBean o) {
        return -this.getSumFlow().compareTo(o.getSumFlow());
    }
}

(2)编写 Mapper 类


package com.cosyblogs.mr.writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private Text outv = new Text();

    private FlowBean  outk = new FlowBean();

    /**
     * 核心业务逻辑处理
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 获取当前行数据
        String line = value.toString();
        // 切割数据  1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
        String[] phoneDatas = line.split("\t");
        // 获取输出数据的key(手机号)
        outv.set(phoneDatas[1]);
        // 获取输出数据的value
        outk.setUpFlow(Integer.parseInt(phoneDatas[phoneDatas.length-3]));
        outk.setDownFlow(Integer.parseInt(phoneDatas[phoneDatas.length-2]));
        outk.setSumFlow();

        // 将数据输出
        context.write(outk, outv);

    }
}

(3)编写 Reducer 类


package com.cosyblogs.mr.writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    private FlowBean outv = new FlowBean();


    /**
     * 核心业务逻辑处理
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 遍历输出
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

(4)编写 Driver 类


package com.atguigu.mr.writablecomparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]{"E:/input","E:/output"};
        // 1 获取配置信息,或者 job 对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 指定本程序的 jar 包所在的本地路径
        job.setJarByClass(FlowCountSortDriver.class);
        // 3 指定本业务 job 要使用的 mapper/Reducer 业务类
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);
        // 4 指定 mapper 输出数据的 kv 类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5 指定最终输出的数据的 kv 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6 指定 job 的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包, 提交给 yarn 去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

WritableComparable 排序案例实操(区内排序)

1)需求

要求每个省份手机号输出的文件中按照总流量内部排序。

2)需求分析

基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

MapReduce框架原理-Shuffle 机制-最码农

3)案例实操

(1)增加自定义分区类


package com.cosyblogs.mr.writablecomparable;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 自定义一个分区器对象,需要继承Hadoop的提供的Partitioner对象
 */
public class PhonePartitioner extends Partitioner<FlowBean,Text> {
    /**
     * 定义当前kv所属分区的规则
     * @param text
     * @param flowBean
     * @param numPartitions
     * @return
     *  136 --> 0
     *  137 --> 1
     *  138 --> 2
     *  139 --> 3
     *  其他 --> 4
     */
    public int getPartition(FlowBean flowBean,Text text,int numPartitions) {

        int phonePartitions;
        // 获取手机号
        String phoneNum = text.toString();
        if(phoneNum.startsWith("136")){
            phonePartitions = 0;
        }else if(phoneNum.startsWith("137")){
            phonePartitions = 1;
        }else if(phoneNum.startsWith("138")){
            phonePartitions = 2;
        }else if(phoneNum.startsWith("139")){
            phonePartitions = 3;
        }else {
            phonePartitions =4;
        }
        return phonePartitions;
    }
}

(2)在驱动类中添加分区类


// 加载自定义分区类
job.setPartitionerClass(ProvincePartitioner.class);

// 设置 Reducetask 个数
job.setNumReduceTasks(5);

Combiner合并

(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。

(2)Combiner组件的父类就是Reducer。

(3)Combiner和Reducer的区别在于运行的位置 Combiner 是在每一个MapTask所在的节点运行。

(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4

Reducer
(3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2

Reducer是接收全局所有Mapper的输出结果

(6)自定义 Combiner 实现步骤

  • (a)自定义一个 Combiner 继承 Reducer,重写 Reduce 方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
        // 1 汇总操作
        int count = 0;
        for(IntWritable v :values){
            count += v.get();
        }
        // 2 写出
        context.write(key, new IntWritable(count));
    }
}

(b)在 Job 驱动类中设置


job.setCombinerClass(WordcountCombiner.class);

Combiner 合并案例实操

1)需求

统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用 Combiner 功能。

(1)数据输入

hello.txt

(2)期望输出数据

期望:Combine 输入数据多,输出时经过合并,输出数据降低。

2)需求分析

需求:对每一个MapTask的输出局部汇总(Combiner)


package com.cosyblogs.mr.combiner;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
	
	IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		// 1 汇总
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		v.set(sum);
		// 2 写出
		context.write(key, v);
	}
}

(2)在 WordcountDriver 驱动类中指定 Combiner


// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordcountCombiner.class);

4)案例实操 – 方案二

(1)将 WordcountReducer 作为 Combiner 在 WordcountDriver 驱动类中指定


// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordcountReducer.class);

运行程序,如下图所示

MapReduce框架原理-Shuffle 机制-最码农
MapReduce 教程
赞赏 赞(0) 收藏(0)
MapReduce框架原理-MapReduce工作流程
上一篇
MapReduce框架原理-MapTask工作机制
下一篇
再想想
暂无评论
随 机 推 荐
基于Layui的兽音译者加密页面
Hadoop 小文件优化方法
Hadoop2.x 和 Hadoop3.x 的新特性
MapReduce框架原理-InputFormat数据输入
Hadoop概论(一)—— 开篇词
Hadoop HDFS(二)
Hadoop HA 高可用
Hadoop 数据压缩
  • 0
  • 0
介绍

我们致力于打造一个原创的计算机相关技术的博客网站,旨在为访客提供一个优质的计算机技术教程交流平台。网站开辟了很多于计算机相关的栏目,并且收集了不少实用资源,同时也鼓励欢迎访客一起分享、交流、学习。

灵魂推荐
Veer图库 数码荔枝
栏目标题
首页 动态广场 精选栏目 闲言碎语 左邻右里 笔记屋
Copyright © 2021-2023 最码农. 苏ICP备20033168号
  • 首页
  • 动态广场
  • 精选栏目
  • 闲言碎语
  • 左邻右里
  • 笔记屋
# 教程 # # Hadoop # # HDFS # # 人工智能 # # 算法 #
Cosy
即使世界毁灭,也总有回光返照的那一刻
90
文章
3
评论
425
喜欢