目录
MapReduceOutputFormat 接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
1.文本输出TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
2.SequenceFileOutputFormat
将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
3.自定义OutputFormat
根据用户需求,自定义实现输出。
自定义 OutputFormat
1.使用场景
为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。
2.自定义OutputFormat步骤
- (1)自定义一个类继承FileOutputFormat。
- (2)改写RecordWriter,具体改写输出数据的方法write()。
自定义OutputFormat案例实操
1)需求
过滤输入的log日志:包含atguigu的行输出到e:/cosyblogs.log,不包含atguigu的行输出到e:/other.log。
(1)输入数据
log.txt
点击下载测试数据
(2)期望输出数据
cosyblogs.log、other.log
2)需求分析

3)案例实操
(1)编写 LogMapper 类
package com.cosyblogs.mr.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Forwards each input line unchanged as the output key.
     *
     * @param key     byte offset of the line within the input split (unused)
     * @param value   one line of the log file
     * @param context Hadoop context used to emit (line, null) pairs
     * @throws IOException          if emitting the record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The line itself is the key; no value payload is needed downstream.
        context.write(value, NullWritable.get());
    }
}
(2)编写 LogReducer 类
package com.cosyblogs.mr.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Writes the key once per grouped occurrence, so duplicate log lines
     * in the input are preserved in the output instead of being collapsed.
     *
     * @param key     a distinct log line
     * @param values  one NullWritable per original occurrence of the line
     * @param context Hadoop context used to emit (line, null) pairs
     * @throws IOException          if emitting a record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit once per element; the value itself carries no information.
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
        }
    }
}
(3)自定义一个 OutputFormat 类
package com.cosyblogs.mr.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 自定义的OutputFormat需要继承Hadoop提供的OutputFormat
*/
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
/**
* 返回RecordWriter
* @param job
* @return
* @throws IOException
* @throws InterruptedException
*/
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
LogRecordWriter lrw = new LogRecordWriter(job);
return lrw;
}
}
(4)编写 LogRecordWriter 类
package com.cosyblogs.mr.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 自定义LogRecordWriter 需要继承Hadoop提供的RecordWriter
*/
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
// 定义输出路径
private String atguiguPath = "F:\\logs\\cosyblogs.txt";
private String otherPath = "F:\\logs\\other.txt";
private FileSystem fs;
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
/**
* 初始化工作
* @param job
*/
public LogRecordWriter(TaskAttemptContext job) throws IOException {
// 获取Hadoop的文件系统对象
fs = FileSystem.get(job.getConfiguration());
// 获取输出流 atguiguOut
atguiguOut = fs.create(new Path(atguiguPath));
// 获取输出流otherOut
otherOut = fs.create(new Path(otherPath));
}
/**
* 实现数据写出的逻辑
* @param key
* @param value
* @throws IOException
* @throws InterruptedException
*/
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 获取当前输入数据
String logData = key.toString();
if(logData.contains("atguigu")){
atguiguOut.writeBytes(logData + "\n");
}else {
otherOut.writeBytes(logData + "\n");
}
}
/**
* 关闭资源
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
(5)编写 LogDriver 类
package com.cosyblogs.mr.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {

    /**
     * Configures and submits the log-splitting job, then exits with the
     * job's status so failures are visible to the caller/shell.
     *
     * @param args unused; input and output paths are hard-coded
     * @throws IOException            on job setup or file system errors
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Use the custom OutputFormat. setOutputPath is still required:
        // FileOutputFormat writes the _SUCCESS marker file there.
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\in\\log"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\logs\\log"));

        // Propagate job success/failure via the process exit code instead
        // of silently discarding the result of waitForCompletion.
        boolean completed = job.waitForCompletion(true);
        System.exit(completed ? 0 : 1);
    }
}