java 矩陣乘法的mapreduce程序實現
map函數:對於矩陣M中的每個元素m(ij),產生一系列的key-value對<(i,k),(M,j,m(ij))>,
其中k=1,2.....直到矩陣N的總列數;對於矩陣N中的每個元素n(jk),產生一系列的key-value對<(i,k),(N,j,n(jk))>,其中i=1,2.......直到矩陣M的總行數。
map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package com.cb.matrix;

// NOTE(review): the mockito and com.sun...bcel imports below are not referenced by this class;
// they are kept only so this edit does not change the file's dependency list. Consider removing.
import static org.mockito.Matchers.intThat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;

import com.sun.org.apache.bcel.internal.generic.NEW;

/**
 * Map phase of a matrix multiplication P = M x N.
 *
 * <p>Each input line is parsed as {@code "<a>,<b>\t<value>"} with integer fields. Whether a line
 * belongs to M or to N is decided from the name of the input file: a name containing {@code "M"}
 * is treated as matrix M, otherwise a name containing {@code "N"} is treated as matrix N, and
 * records from any other file are ignored.
 *
 * <ul>
 *   <li>For an element m(i,j) of M, emits {@code <"i,k", "M,j,m">} for k = 1..columnN.</li>
 *   <li>For an element n(j,k) of N, emits {@code <"i,k", "N,j,n">} for i = 1..rowM.</li>
 * </ul>
 */
public class MatrixMapper extends Mapper<Object, Text, Text, Text> {

    /** Reused output key, formatted as "i,k". */
    private final Text mapKey = new Text();

    /** Reused output value, formatted as "M,j,value" or "N,j,value". */
    private final Text mapValue = new Text();

    /** Value of the "columnN" configuration entry; upper bound of k for M records. */
    private int columnN;

    /** Value of the "rowM" configuration entry; upper bound of i for N records. */
    private int rowM;

    /**
     * Reads the "columnN" and "rowM" entries from the job configuration before any call to
     * {@code map}. These entries are expected to be set by the job driver (not visible here).
     *
     * @throws NumberFormatException if either entry is missing or not an integer
     */
    @Override
    protected void setup(Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        columnN = Integer.parseInt(config.get("columnN"));
        rowM = Integer.parseInt(config.get("rowM"));
    }

    /**
     * Replicates one matrix element to every output cell (i,k) that needs it.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the form {@code "<a>,<b>\t<value>"}
     * @param context task context used to find the source file name and to emit pairs
     */
    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // A new-API (org.apache.hadoop.mapreduce) job hands out splits of type
        // org.apache.hadoop.mapreduce.lib.input.FileSplit. Casting to the old-API
        // org.apache.hadoop.mapred.FileSplit fails with ClassCastException at runtime, so the
        // new-API class is referenced by its fully-qualified name here.
        org.apache.hadoop.mapreduce.lib.input.FileSplit fileSplit =
                (org.apache.hadoop.mapreduce.lib.input.FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();

        // The M check takes precedence, exactly as in the original if / else-if chain.
        boolean fromM = fileName.contains("M");
        if (!fromM && !fileName.contains("N")) {
            return; // neither matrix: nothing is parsed or emitted
        }

        String[] tuple = value.toString().split(",");
        int first = Integer.parseInt(tuple[0]);
        String[] tuples = tuple[1].split("\t");
        int second = Integer.parseInt(tuples[0]);
        int element = Integer.parseInt(tuples[1]);

        if (fromM) {
            // first = i, second = j: m(i,j) contributes to every column k of row i of the product.
            mapValue.set("M" + "," + second + "," + element);
            for (int k = 1; k < columnN + 1; k++) {
                mapKey.set(first + "," + k);
                context.write(mapKey, mapValue);
            }
        } else {
            // first = j, second = k: n(j,k) contributes to every row i of column k of the product.
            mapValue.set("N" + "," + first + "," + element);
            for (int i = 1; i < rowM + 1; i++) {
                mapKey.set(i + "," + second);
                context.write(mapKey, mapValue);
            }
        }
    }
}
reduce函數:對於每個鍵(i,k)相關聯的值(M,j,m(ij))及(N,j,n(jk)),根據相同的j值將m(ij)和n(jk)分別存入不同的數組中,然後將兩者的第j個元素抽取出來分別相乘,最後相加,即可得到p(ik)的值。
reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.cb.matrix;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce phase of a matrix multiplication P = M x N.
 *
 * <p>For one key {@code "i,k"} the incoming values have the form {@code "M,j,value"} or
 * {@code "N,j,value"} (any tag other than {@code "M"} is treated as N). The values are
 * scattered by j into two arrays and the sum over j of M[j] * N[j] is written once as the
 * value for that key.
 */
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {

    /** Value of the "columnM" configuration entry; the largest j index that is summed. */
    private int columnM;

    /**
     * Reads the "columnM" entry from the job configuration before any call to {@code reduce}.
     *
     * @throws NumberFormatException if the entry is missing or not an integer
     */
    @Override
    protected void setup(Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        columnM = Integer.parseInt(conf.get("columnM"));
    }

    /**
     * Computes one element of the product and emits it exactly once.
     *
     * @param arg0 output cell key, passed through unchanged
     * @param arg1 tagged contributions {@code "M,j,value"} / {@code "N,j,value"} for this cell
     * @param arg2 task context used to emit the result
     */
    @Override
    protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
            throws IOException, InterruptedException {
        // Index 0 is unused: j is 1-based. Entries never received stay 0 and add nothing.
        int[] M = new int[columnM + 1];
        int[] N = new int[columnM + 1];

        // First collect every contribution for this cell.
        for (Text val : arg1) {
            String[] tuple = val.toString().split(",");
            int j = Integer.parseInt(tuple[1]);
            int element = Integer.parseInt(tuple[2]);
            if (tuple[0].equals("M")) {
                M[j] = element;
            } else {
                N[j] = element;
            }
        }

        // Only after all values have been seen is the dot product complete. Summing and
        // writing inside the loop above would emit one partial result per incoming value.
        int sum = 0;
        for (int j = 1; j < columnM + 1; j++) {
            sum += M[j] * N[j];
        }
        arg2.write(arg0, new Text(Integer.toString(sum)));
    }
}
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
原文鏈接:https://my.oschina.net/u/3264690/blog/909239