我们在用矩阵处理真实数据的时候,一般都是非常稀疏矩阵,为了节省存储空间,通常只会存储非0的数据。
下面我们来做一个稀疏矩阵:
新建2个稀疏矩阵数据文件sm1.csv, sm2.csv
只存储非0的数据,3列存储,第一列“原矩阵行”,第二列“原矩阵列”,第三列“原矩阵值”。
sm1.csv
1,1,1
1,4,3
2,1,2
2,2,5
2,4,4
3,4,1
4,1,4
4,2,7
4,3,1
4,4,2
sm2.csv
1,1,5
2,2,2
4,1,3
4,2,1
代码:
package org.edu.bupt.xiaoye.sparsemartrix;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SparseMartrixMultiply {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
private String flag; // m1 or m2
private int rowNumA = 4; // 矩阵A的行数,因为要在对B的矩阵处理中要用
private int colNumA = 4;// 矩阵A的列数
private int rolNumB = 4;
private int colNumB = 2;// 矩阵B的列数
private static final Text k = new Text();
private static final Text v = new Text();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getName();// 判断读的数据集
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] records = value.toString().split(",");
int x = Integer.parseInt(records[0]);
int y = Integer.parseInt(records[1]);
int num = Integer.parseInt(records[2]);
if (flag.equals("m1")) {
String[] vs = value.toString().split(",");
for (int j = 0; j < colNumB; j++) {
k.set(x + "," + (j + 1));
v.set("A" + ":" + y + "," + num);
context.write(k, v);
}
} else if (flag.equals("m2")) {
for (int j = 0; j < rowNumA; j++) {
k.set((j + 1) + "," + y);
v.set("B:" + x + "," + num);
context.write(k, v);
}
}
}
}
public static class MyReducer extends
Reducer<Text, Text, Text, IntWritable> {
private static IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
/*
* 这里和一般矩阵不同
* 一般矩阵中,当进行第二次reduce方法调用的时候,会对数组a和b的所有元素都重新赋值
* 而在稀疏矩阵中,不会对数组的所有元素重新赋值,从而会发生上次调用reduce方法残留的数组元素值对这一次reduce产生影响。
*/
int[] a = new int[4];
int[] b = new int[4];
for (Text value : values) {
String[] vs = value.toString().split(":");
if (vs[0].equals("A")) {
String[] ints = vs[1].toString().split(",");
a[Integer.parseInt(ints[0]) - 1] = Integer
.parseInt(ints[1]);
} else {
String[] ints = vs[1].toString().split(",");
b[Integer.parseInt(ints[0]) - 1] = Integer
.parseInt(ints[1]);
}
}
// 用矩阵A的行乘矩阵B的列
int sum = 0;
for (int i = 0; i < 4; i++) {
sum += a[i] * b[i];
}
v.set(sum);
context.write(key, v);
}
}
public static void run(Map<String, String> path) throws Exception {
String input = path.get("input");
String output = path.get("output");
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(input), conf);
final Path outPath = new Path(output);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
conf.set("hadoop.job.user", "hadoop");
// conf.set("mapred.job.tracker", "10.103.240.160:9001");
final Job job = new Job(conf);
FileInputFormat.setInputPaths(job, input);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);// 设置个数为1
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
}
驱动类:
package org.edu.bupt.xiaoye.sparsemartrix;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
public class MainRun {
public static final String HDFS = "hdfs://10.103.240.160:9000";
public static final Pattern DELIMITER = Pattern.compile("[\t,]");
public static void main(String[] args) {
martrixMultiply();
}
public static void martrixMultiply() {
Map<String, String> path = new HashMap<String, String>();
path.put("input", HDFS + "/usr/hadoop/SparseMatrix");// HDFS的目录
path.put("output", HDFS + "/usr/hadoop/SparseMatrix/output");
try {
SparseMartrixMultiply.run(path);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(0);
}
}
这里注意需要注意的地方:
在reducer中定义数组a和b的时候,不要定义成MyMapper类成员。我就是因为这里定义成了成员变量导致出了错误调了好久。
/*
* 这里和一般矩阵不同
* 一般矩阵中,当进行第二次reduce方法调用的时候,会对数组a和b的所有元素都重新赋值
* 而在稀疏矩阵中,不会对数组的所有元素重新赋值,从而会发生上次调用reduce方法残留的数组元素值对这一次reduce产生影响。
*/
分享到:
相关推荐
最近在研究hadoop与mapReduce,网上教程只有个wordcount程序示范,太简单,故写了个相对复杂点的涉及到多个文件之间同时运算的矩阵乘法的代码用于实验与测试,上传供大家学习与参考。 调用方法: 执行:hadoop jar ...
用Hadoop实现的大矩阵乘法,包括代码设计思路以及可以执行的源代码。已在hadoop-1.0.3平台测试通过,对初学者是很好的材料。
2.矩阵相乘实验(matrix) (1)写matrix代码并把代码生成jar包 (2)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (3):查看运行结果 3.网页排名实验(pagerank) (1):写pagerank代码并把代码...
文档较详尽的讲述了MR的简介,MR初学分析示例(有代码)、MR特性,MR的执行过程(有代码),MR单元测试介绍(有代码)、HA的架构和配置、同时也向大众推荐了两本书。其中部分有较为详尽的链接以供参考。
1、资源内容:基于Hadoop MapReduce的矩阵乘法 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功,功能ok的情况下才上传的。 3、适用对象...
赠送jar包:hadoop-mapreduce-client-core-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-core-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-core-2.5.1-sources.jar; 赠送Maven依赖信息文件:...
Hadoop集群配置及MapReduce开发手册
java操作hadoop之mapreduce计算整数的最大值和最小值实战源码,附带全部所需jar包,欢迎下载一起学习。
有详细的Hadoop集群配置说明,另外还讲解了mapreduce开发和Hadoop性能优化。
Hadoop介绍,HDFS和MapReduce工作原理
windows下配置cygwin、hadoop等并运行mapreduce及mapreduce程序讲解
并通过Eclipse进行MapReduce程序的开发,步骤详细完整,在相关过程中配有完整代码和解释,全程无误,只需复制粘贴即可,小白新手按步骤一步一步来也能搭建Hadoop集群成功并进行MapReduce程序开发!!!喜欢请关注...
hadoop-mapreduce-examples-2.7.1.jar
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
Hadoop+HDFS和MapReduce架构浅析
详细介绍Hadoop家族中的MapReduce原理
在hadoop平台上,用mapreduce编程实现大数据的词频统计