最码农 最码农
  • 首页
  • 动态广场
  • 精选栏目
  • 闲言碎语
  • 左邻右里
  • 笔记屋
  • 注册
  • 登录
首页 › 大数据 › MapReduce框架原理-OutputFormat 数据输出

MapReduce框架原理-OutputFormat 数据输出

Cosy
10月前大数据阅读 363

OutputFormat 接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1.文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。

2.SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

3.自定义OutputFormat
根据用户需求,自定义实现输出。

自定义 OutputFormat

1.使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

2.自定义OutputFormat步骤

  • (1)自定义一个类继承FileOutputFormat。
  • (2)改写RecordWriter,具体改写输出数据的方法write()。

自定义OutputFormat案例实操

1)需求

过滤输入的log日志,包含atguigu的网站输出到e:/cosyblogs.log,不包含cosyblogs的网站输出到e:/other.log。

(1)输入数据

log.txt

点击下载测试数据

(2)期望输出数据

cosyblogs.log、other.log

2)需求分析

MapReduce框架原理-OutputFormat 数据输出-最码农

3)案例实操

(1)编写 LogMapper 类


package com.cosyblogs.mr.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * 核心处理方法
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 直接写出
        context.write(value, NullWritable.get());
    }
}

(2)编写 LogReducer 类


package com.cosyblogs.mr.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 遍历直接写出
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

(3)自定义一个 OutputFormat 类


package com.cosyblogs.mr.outputformat;


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 自定义的OutputFormat需要继承Hadoop提供的OutputFormat
 */
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /**
     * 返回RecordWriter
     * @param job
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        LogRecordWriter lrw = new LogRecordWriter(job);
        return lrw;
    }
}

(4)编写 LogRecordWriter 类


package com.cosyblogs.mr.outputformat;


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * 自定义LogRecordWriter 需要继承Hadoop提供的RecordWriter
 */
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    // 定义输出路径
    private  String atguiguPath = "F:\\logs\\cosyblogs.txt";
    private String otherPath = "F:\\logs\\other.txt";
    private FileSystem fs;
    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    /**
     * 初始化工作
     * @param job
     */
    public LogRecordWriter(TaskAttemptContext job) throws IOException {
        // 获取Hadoop的文件系统对象
        fs = FileSystem.get(job.getConfiguration());
        // 获取输出流 atguiguOut
        atguiguOut = fs.create(new Path(atguiguPath));
        // 获取输出流otherOut
        otherOut = fs.create(new Path(otherPath));
    }


    /**
     * 实现数据写出的逻辑
     * @param key
     * @param value
     * @throws IOException
     * @throws InterruptedException
     */
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // 获取当前输入数据
        String logData = key.toString();
        if(logData.contains("atguigu")){
            atguiguOut.writeBytes(logData + "\n");
        }else {
            otherOut.writeBytes(logData + "\n");
        }
    }

    /**
     * 关闭资源
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

(5)编写 LogDriver 类


package com.cosyblogs.mr.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 指定自定义的OutputFormat类
        job.setOutputFormatClass(LogOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("F:\\in\\log"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\logs\\log"));
        job.waitForCompletion(true);
    }
}

MapReduce 教程
赞赏 赞(0) 收藏(0)
MapReduce框架原理-ReduceTask工作机制
上一篇
MapReduce框架原理-Join 多种应用
下一篇
再想想
暂无评论
随 机 推 荐
基于Layui的兽音译者加密页面
Hadoop 小文件优化方法
Hadoop2.x 和 Hadoop3.x 的新特性
MapReduce框架原理-InputFormat数据输入
Hadoop概论(一)—— 开篇词
Hadoop HDFS(二)
Hadoop HA 高可用
Hadoop 数据压缩
  • 0
  • 0
介绍

我们致力于打造一个原创的计算机相关技术的博客网站,旨在为访客提供一个优质的计算机技术教程交流平台。网站开辟了很多于计算机相关的栏目,并且收集了不少实用资源,同时也鼓励欢迎访客一起分享、交流、学习。

灵魂推荐
Veer图库 数码荔枝
栏目标题
首页 动态广场 精选栏目 闲言碎语 左邻右里 笔记屋
Copyright © 2021-2023 最码农. 苏ICP备20033168号
  • 首页
  • 动态广场
  • 精选栏目
  • 闲言碎语
  • 左邻右里
  • 笔记屋
# 教程 # # Hadoop # # HDFS # # 人工智能 # # 算法 #
Cosy
即使世界毁灭,也总有回光返照的那一刻
90
文章
3
评论
425
喜欢