最近幾天一直在看Hadoop相關的書籍,目前稍微有點感覺,自己就仿照著WordCount程序自己編寫了一個統計關聯商品。
需求描述:
根據超市的銷售清單,計算商品之間的關聯程度(即統計同時買A商品和B商品的次數)。
數據格式:
超市銷售清單簡化為如下格式:一行表示一個清單,每個商品采用 "," 分割,如下圖所示:
需求分析:
采用hadoop中的mapreduce對該需求進行計算。
map函數主要拆分出關聯的商品,輸出結果為 key為商品A,value為商品B,對于第一條三條結果拆分結果如下圖所示:
這里為了統計出和A、B兩件商品想關聯的商品,所以商品A、B之間的關系輸出兩條結果即 A-B、B-A。
reduce函數分別對和商品A相關的商品進行分組統計,即分別求value中的各個商品出現的次數,輸出結果為key為商品A|商品B,value為該組合出現的次數。針對上面提到的5條記錄,對map輸出中key值為R的做下分析:
通過map函數的處理,得到如下圖所示的記錄:
reduce中對map輸出的value值進行分組計數,得到的結果如下圖所示
將商品A B作為key,組合個數作為value輸出,輸出結果如下圖所示:
對于需求的實現過程的分析到目前就結束了,下面就看下具體的代碼實現
代碼實現:
關于代碼就不做詳細的介紹,具體參照代碼之中的注釋吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
package com; import java.io.IOException; import java.util.HashMap; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Test extends Configured implements Tool{ /** * map類,實現數據的預處理 * 輸出結果key為商品A value為關聯商品B * @author lulei */ public static class MapT extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); if (!(line == null || "" .equals(line))) { //分割商品 String []vs = line.split( "," ); //兩兩組合,構成一條記錄 for ( int i = 0 ; i < (vs.length - 1 ); i++) { if ( "" .equals(vs[i])) { //排除空記錄 continue ; } for ( int j = i+ 1 ; j < vs.length; j++) { if ( "" .equals(vs[j])) { continue ; } //輸出結果 context.write( new Text(vs[i]), new Text(vs[j])); context.write( new Text(vs[j]), new Text(vs[i])); } } } } } /** * reduce類,實現數據的計數 * 輸出結果key 為商品A|B value為該關聯次數 * @author lulei */ public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> { private int count; /** * 初始化 */ public void setup(Context context) { //從參數中獲取最小記錄個數 String countStr = context.getConfiguration().get( "count" ); try { this .count = Integer.parseInt(countStr); } catch (Exception e) { this .count = 0 ; } } public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ String keyStr = key.toString(); HashMap<String, Integer> hashMap = new HashMap<String, Integer>(); //利用hash統計B商品的次數 for (Text value : values) { String valueStr = value.toString(); if (hashMap.containsKey(valueStr)) { hashMap.put(valueStr, hashMap.get(valueStr) + 1 ); } else { hashMap.put(valueStr, 1 ); } } //將結果輸出 for (Entry<String, Integer> entry : hashMap.entrySet()) { if (entry.getValue() >= this .count) { //只輸出次數不小于最小值的 context.write( new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue())); } } } } @Override public int run(String[] arg0) throws Exception { // TODO Auto-generated method stub Configuration conf = getConf(); conf.set( "count" , arg0[ 2 ]); Job job = new Job(conf); job.setJobName( "jobtest" ); job.setOutputFormatClass(TextOutputFormat. class ); job.setOutputKeyClass(Text. class ); job.setOutputValueClass(Text. class ); job.setMapperClass(MapT. class ); job.setReducerClass(ReduceT. class ); FileInputFormat.addInputPath(job, new Path(arg0[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(arg0[ 1 ])); job.waitForCompletion( true ); return job.isSuccessful() ? 0 : 1 ; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub if (args.length != 3 ) { System.exit(- 1 ); } try { int res = ToolRunner.run( new Configuration(), new Test(), args); System.exit(res); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
上傳運行:
將程序打包成jar文件,上傳到機群之中。將測試數據也上傳到HDFS分布式文件系統中。
命令運行截圖如下圖所示:
運行結束后查看相應的HDFS文件系統,如下圖所示:
到此一個完整的mapreduce程序就完成了,關于hadoop的學習,自己還將繼續~感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!