比较简单,直接上代码:
这是MapReduce功能代码:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyInverseIndex {
public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUTPUT_PATH);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
conf.set("hadoop.job.user","hadoop");
conf.set("mapred.job.tracker", "10.103.240.160:9001");
final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setJarByClass(MyInverseIndex.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);//设置个数为1
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
/**
* 只适用于文档中只出现一行 (可以一次读取整个文档)
*
* @author hadoop
*
*/
public static class MyMapper extends Mapper<NullWritable, BytesWritable, Text, Text> {
Map<String, Integer> map = new HashMap();
@Override
public void map(NullWritable key, BytesWritable value, Context context)
throws IOException, InterruptedException {
String line = new String(value.getBytes(), "utf-8").trim();
String[] words = line.split(" ");
for (String s : words) {
if (map.containsKey(s)) {
map.put(s, map.get(s) + 1);
} else {
map.put(s, 1);
}
}
Set<String> keys = map.keySet();
for (Iterator it = keys.iterator(); it.hasNext();) {
String s = (String) it.next();
context.write(new Text(s),
new Text(((FileSplit) context.getInputSplit()).getPath().getName().toString() + ":" + map.get(s)));
}
map.clear();
}
}
public static class MyReduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuffer fileList = new StringBuffer();
for(Text value : values){
fileList.append(value.toString()+";");
}
result.set(fileList.toString());
context.write(key, result);
}
}
}
这是重写的InputFormat类:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* 为了阻止文件分片,并将该整个文件作为一条记录处理
*
* @author Xiaoye
*
*/
public class WholeFileInputFormat extends
FileInputFormat<NullWritable, BytesWritable> {
/**
* 重写该方法将阻止将一个文件分片
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
/**
* 获取RecordReader,该类用来将分片分割成记录,从而生成key和value值给map处理。
*
*/
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
这是RecordReader实现类:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 继承RecordReader
* 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。
* 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片
* 的所有内容
* @author Xiaoye
*/
public class WholeFileRecordReader extends
RecordReader<NullWritable, BytesWritable> {
// 用来盛放传递过来的分片
private FileSplit fileSplit;
private Configuration conf;
//将作为key-value中的value值返回
private BytesWritable value = new BytesWritable();
// 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成
private boolean processed = false;
/**
* 设置RecordReader的分片和配置对象。
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
/**
* 核心算法
* 用来产生key-value值
* 将生成的value值存入value对象中
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
/*
* 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容
* 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中
*/
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
//do nothing
}
}
其中有需要注意的 fileSplit中存放的是分片信息,并没有分片内容。
在BytesWritable 转为byte[]再转为String类型时需要调用trim()方法,不然会有空格影响key值的group。
分享到:
相关推荐
HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将充满文本文件的目录转换为 SequenceFiles,以文件名作为键,内容作为值。 将默认的 SequenceFileInputFormat 与 Hive 一起使用时,Hive 仅...
1.MapTask调用Inputformat方法创建一个RecordReader RecordReader以此调用nextkeyvalue getcurrentkey getcurrentvalue方法 获取传递给Mapper类,每读取一行数据就会调用一次map方法,然后将 通过逻辑处理后的输出到...
MapReduce自定义InputFormat和RecordReader实现 MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外...
一个完整的Hadoop单词频率统计MapReduce程序。包括:pom.xml,打包jar程序,自定义的FileInputFormat,自定义的RecordReader(key为行号,value为每行文本,可修改),MapReduce程序。可以作为自己业务MapReduce程序...
Hadoop 伙计们, 在这里,您将找到一些... #pdf文件夹:它包含如何在hadoop中编写我们自己的Custome FileInPutFormat,RecordReader类以处理PDF文件。 #Java到HDFS的连接在此软件包中,您将找到如何从Java连接到HD
HadoopMapReduce作业有着独一无二的代码架构,这种代码架构拥有特定的模板和结构。这样的架构会给测试驱动开发和单元测试带来一些麻烦。这篇文章是运用MRUnit,Mockito和PowerMock的真实范例。我会介绍1.使用MRUnit...
HVPI 是一个开源的 Hadoop 视频处理接口 HVPI,用于扩展 Hadoop 以支持视频分析应用程序。 现在,它已partly opened 。 4月底将全面开放,因为届时我们将得到IEEE云委员会的结果。 ##Video R/W Interface 我们在 ...
MapReduce recordreader 用于将 mailmain 压缩存档和存储解析为 Hive