`
小野bupt
  • 浏览: 13922 次
  • 性别: Icon_minigender_1
文章分类
社区版块
存档分类
最新评论

重写RecordReader和InputFormat实现单个文件不分片,整个分片作为一条记录处理。(倒排索引)

 
阅读更多

比较简单,直接上代码:

这是MapReduce功能代码:

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyInverseIndex {
	public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
	public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUTPUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		conf.set("hadoop.job.user","hadoop");
		conf.set("mapred.job.tracker", "10.103.240.160:9001");
		
		final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setJarByClass(MyInverseIndex.class);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(1);//设置个数为1
		FileOutputFormat.setOutputPath(job, outPath);
		job.waitForCompletion(true);
		
	}

	/**
	 * 只适用于文档中只出现一行 (可以一次读取整个文档)
	 * 
	 * @author hadoop
	 * 
	 */
	public static class MyMapper extends Mapper<NullWritable, BytesWritable, Text, Text> {
		Map<String, Integer> map = new HashMap();

		@Override
		public  void map(NullWritable key, BytesWritable value, Context context)
				throws IOException, InterruptedException {
			String line = new String(value.getBytes(), "utf-8").trim();
			String[] words = line.split(" ");
			for (String s : words) {
				if (map.containsKey(s)) {
					map.put(s, map.get(s) + 1);
				} else {
					map.put(s, 1);
				}
			}
			Set<String> keys = map.keySet();
			for (Iterator it = keys.iterator(); it.hasNext();) {
				String s = (String) it.next();
				context.write(new Text(s),
						new Text(((FileSplit) context.getInputSplit()).getPath().getName().toString() + ":" + map.get(s)));
			}
			map.clear();
		}
	}

	public static class MyReduce extends Reducer<Text, Text, Text, Text> {		
		private Text result = new Text();
		// 实现reduce函数
		public void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
			StringBuffer fileList = new StringBuffer();
			for(Text value : values){
				fileList.append(value.toString()+";");
			}
			result.set(fileList.toString());
			context.write(key, result);
		}
	}
		
}
这是重写的InputFormat类:

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * 为了阻止文件分片,并将该整个文件作为一条记录处理
 * 
 * @author Xiaoye
 * 
 */
public class WholeFileInputFormat extends
		FileInputFormat<NullWritable, BytesWritable> {

	/**
	 * 重写该方法将阻止将一个文件分片
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * 获取RecordReader,该类用来将分片分割成记录,从而生成key和value值给map处理。
	 * 
	 */
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		WholeFileRecordReader reader = new WholeFileRecordReader();
		reader.initialize(split, context);
		return reader;
	}
}

这是RecordReader实现类:


package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 继承RecordReader
 * 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。
 * 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片
 * 的所有内容
 * @author Xiaoye
 */
public class WholeFileRecordReader extends
		RecordReader<NullWritable, BytesWritable> {
	// 用来盛放传递过来的分片
	private FileSplit fileSplit;
	private Configuration conf;
	//将作为key-value中的value值返回
	private BytesWritable value = new BytesWritable();
	// 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成
	private boolean processed = false;

	/**
	 * 设置RecordReader的分片和配置对象。
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		this.fileSplit = (FileSplit) split;
		this.conf = context.getConfiguration();
	}

	/**
	 * 核心算法
	 * 用来产生key-value值
	 * 将生成的value值存入value对象中
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!processed) {
			/*
			 * 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容
			 * 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中
			 */
			byte[] contents = new byte[(int) fileSplit.getLength()];
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(conf);
			FSDataInputStream in = null;
			try {
				in = fs.open(file);
				IOUtils.readFully(in, contents, 0, contents.length);
				value.set(contents, 0, contents.length);
			} finally {
				IOUtils.closeStream(in);
			}
			processed = true;
			return true;
		}
		return false;
	}

	@Override
	public NullWritable getCurrentKey() throws IOException,
			InterruptedException {
		return NullWritable.get();
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return processed ? 1.0f : 0.0f;
	}

	@Override
	public void close() throws IOException {
		//do nothing
	}

}

其中有需要注意的 fileSplit中存放的是分片信息,并没有分片内容。

在BytesWritable 转为byte[]再转为String类型时需要调用trim()方法,不然会有空格影响key值的group。

分享到:
评论

相关推荐

    SequenceFileKeyValueInputFormat:自定义 Hadoop InputFormat

    HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将充满文本文件的目录转换为 SequenceFiles,以文件名作为键,内容作为值。 将默认的 SequenceFileInputFormat 与 Hive 一起使用时,Hive 仅...

    大数据常用组件流程

    1.MapTask调用Inputformat方法创建一个RecordReader RecordReader以此调用nextkeyvalue getcurrentkey getcurrentvalue方法 获取传递给Mapper类,每读取一行数据就会调用一次map方法,然后将 通过逻辑处理后的输出到...

    mapreduce_training:用于教学目的的MapReduce应用程序集

    MapReduce自定义InputFormat和RecordReader实现 MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外...

    MyMapreduceDemo.zip

    一个完整的Hadoop单词频率统计MapReduce程序。包括:pom.xml,打包jar程序,自定义的FileInputFormat,自定义的RecordReader(key为行号,value为每行文本,可修改),MapReduce程序。可以作为自己业务MapReduce程序...

    Hadoop:Hadoop编程

    Hadoop 伙计们, 在这里,您将找到一些... #pdf文件夹:它包含如何在hadoop中编写我们自己的Custome FileInPutFormat,RecordReader类以处理PDF文件。 #Java到HDFS的连接在此软件包中,您将找到如何从Java连接到HD

    HadoopMapReduce作业的单元测试

    HadoopMapReduce作业有着独一无二的代码架构,这种代码架构拥有特定的模板和结构。这样的架构会给测试驱动开发和单元测试带来一些麻烦。这篇文章是运用MRUnit,Mockito和PowerMock的真实范例。我会介绍1.使用MRUnit...

    hvpi:扩展 Hadoop 以支持视频分析应用程序

    HVPI 是一个开源的 Hadoop 视频处理接口 HVPI,用于扩展 Hadoop 以支持视频分析应用程序。 现在,它已partly opened 。 4月底将全面开放,因为届时我们将得到IEEE云委员会的结果。 ##Video R/W Interface 我们在 ...

    mailman-MapReduce:我正在整理的面向 Hadoop 的演示

    MapReduce recordreader 用于将 mailmain 压缩存档和存储解析为 Hive

Global site tag (gtag.js) - Google Analytics