`
小野bupt
  • 浏览: 14047 次
  • 性别: Icon_minigender_1
文章分类
社区版块
存档分类
最新评论

MapReduce远程提交后输出结果变为空

 
阅读更多

今天在做hadoop小实验(类似于倒排索引),

在本地运行时(本地job “Running job: job_local_0001”)得到的数据格式为:

Hello	file3.txt:1;
MapReduce	file3.txt:2;file1.txt:1;file2.txt:1;
bye	file3.txt:1;
is	file1.txt:1;file2.txt:2;
powerful	file2.txt:1;
simple	file2.txt:1;file1.txt:1;


而提交到集群上运行(“Running job: job_201405091426_0019”)时,得到的输出结果为空。

输入文件内容为:

file1.txt
	MapReduce is simple
file2.txt
	MapReduce is powerful is simple
file3.txt
	Hello MapReduce bye MapReduce
这三个文件。


搞了半天不知道什么问题。记录下来 以后解决。

程序源码如下:

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Small inverted-index MapReduce job.
 *
 * <p>For every word found in the input files, the job emits one output line of
 * the form {@code word<TAB>file:count;file:count;...}. Input and output
 * locations are fixed by {@link #INPUT_PATH} and {@link #OUTPUT_PATH}.
 */
public class MyInverseIndex {
	public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
	public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";

	/**
	 * Configures the job, submits it and waits for it to finish.
	 *
	 * <p>Any existing output directory is deleted first. If the job reports
	 * failure, the JVM exits with status 1.
	 *
	 * @param args unused
	 * @throws Exception if the file system cannot be reached or job submission fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUTPUT_PATH);
		// The output directory is removed up front so a previous run does not block this one.
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		conf.set("hadoop.job.user", "hadoop");
		conf.set("mapred.job.tracker", "10.103.240.160:9001");
		final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
		// Identify the jar that contains MyMapper/MyReduce so it can be shipped with the
		// job; a local run works without this, a remote submission generally does not.
		job.setJarByClass(MyInverseIndex.class);
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, outPath);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(1); // use a single reduce task
		// Surface job failure through the exit status instead of silently returning.
		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}

	/**
	 * Counts the words of one input record and emits {@code (word, "fileName:count")}.
	 *
	 * <p>Counting is done per record (per call to {@code map}), so this mapper is
	 * only suitable for documents that consist of a single line, i.e. where the
	 * whole document is read in one record.
	 *
	 * @author hadoop
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		/** Per-record word counts; reused across calls and cleared at the start of each one. */
		final Map<String, Integer> map = new HashMap<String, Integer>();

		/**
		 * Splits the record on single spaces, counts each non-empty word and writes
		 * one {@code (word, "fileName:count")} pair per distinct word.
		 *
		 * @param key     input key supplied by the framework (not used)
		 * @param value   the record text
		 * @param context used to read the current input split and to emit output
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			map.clear();
			for (String word : value.toString().split(" ")) {
				// Consecutive spaces or an empty record yield "" tokens; they are not words.
				if (word.isEmpty()) {
					continue;
				}
				Integer seen = map.get(word);
				map.put(word, seen == null ? 1 : seen + 1);
			}

			// The file name is the same for every word of this record, so resolve it once.
			String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
			for (Map.Entry<String, Integer> entry : map.entrySet()) {
				context.write(new Text(entry.getKey()),
						new Text(fileName + ":" + entry.getValue()));
			}
		}

	}

	/**
	 * Joins all {@code "fileName:count"} values of a word into one
	 * semicolon-terminated list.
	 */
	public static class MyReduce extends Reducer<Text, Text, Text, Text> {

		/**
		 * Concatenates every value for {@code key}, appending {@code ";"} after each,
		 * and writes the result under the same key.
		 *
		 * @param key     the word
		 * @param values  the {@code "fileName:count"} entries emitted for the word
		 * @param context used to emit the combined entry
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException
		{
			StringBuilder files = new StringBuilder();
			for (Text fileName : values) {
				files.append(fileName.toString()).append(';');
			}
			context.write(key, new Text(files.toString()));
		}
	}

}

今天在检查的时候,重写了一遍,把MyReduce改成这样就好了,奇怪。

public static class MyReduce extends Reducer<Text, Text, Text, Text> {		
		private Text result = new Text();
		// 实现reduce函数
		public void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
			StringBuffer fileList = new StringBuffer();
			for(Text value : values){
				fileList.append(value.toString()+";");
			}
			result.set(fileList.toString());
			context.write(key, result);
		}
	}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics