How to write to multiple files with Hadoop MultipleOutputs
1. Output to multiple files or folders:
No extra changes are needed in the driver; just add the following code to the Mapper or Reducer class:
private MultipleOutputs<Text, IntWritable> mos;

@Override
public void setup(Context context) throws IOException, InterruptedException {
    mos = new MultipleOutputs<Text, IntWritable>(context);
}

@Override
public void cleanup(Context context) throws IOException, InterruptedException {
    mos.close();
}
You can then call mos.write(Key key, Value value, String baseOutputPath) in the Mapper or Reducer instead of context.write(key, value).
The default part-m-00* or part-r-00* files are still created, but they are empty (0 bytes). Note also that when MultipleOutputs is used in the map phase, only the default map output (part-m-00*, i.e. what goes through context.write) is passed on to the reducers; the named-output files are not.
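If the empty default files are a nuisance, one common remedy (a sketch, not part of the original article) is to register the real output format lazily in the driver, so the default part files are only created when something is actually written to them:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// In the driver: defer creation of the default part-r-00* files until the
// first record is written to them, so a job that emits everything through
// MultipleOutputs leaves no empty files behind.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);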
Note: the third argument of multipleOutputs.write(key, value, baseOutputPath) determines where the output lands, relative to the job's output directory.
If baseOutputPath contains no file separator "/", the output file is named baseOutputPath-r-nnnnn (name-r-nnnnn);
if it does contain "/", e.g. baseOutputPath = "029070-99999/1901/part", the output file becomes 029070-99999/1901/part-r-nnnnn.
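For example (a hypothetical snippet, assuming mos is the MultipleOutputs instance initialized in setup() and the name "station" is just an illustration):

// No "/" in baseOutputPath: writes to <output dir>/station-r-00000
mos.write(key, value, "station");

// baseOutputPath contains "/": writes to <output dir>/029070-99999/1901/part-r-00000
mos.write(key, value, "029070-99999/1901/part");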
2. Example: requirements
Given the test data below, write the records to the output directory grouped by category (the first comma-separated field):
1512,iphone5s,4英寸,指紋識別,A7處理器,64位,M7協(xié)處理器,低功耗
1512,iphone5,4英寸,A6處理器,IOS7
1512,iphone4s,3.5英寸,A5處理器,雙核,經(jīng)典
50019780,ipad,9.7英寸,retina屏幕,豐富的應(yīng)用
50019780,yoga,聯(lián)想,待機18小時,外形獨特
50019780,nexus 7,華碩&google,7英寸
50019780,ipad mini 2,retina顯示屏,蘋果,7.9英寸
1101,macbook air,蘋果超薄,OS X mavericks
1101,macbook pro,蘋果,OS X lion
1101,thinkpad yoga,聯(lián)想,windows 8,超級本
3. The Mapper:
package cn.edu.bjut.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiOutPutMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the whole record, keyed by the category ID in the first field.
        String line = value.toString().trim();
        if (!line.isEmpty()) {
            String[] arr = line.split(",");
            context.write(new IntWritable(Integer.parseInt(arr[0])), value);
        }
    }
}
4. The Reducer:
package cn.edu.bjut.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiOutPutReducer extends Reducer<IntWritable, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs = null;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text text : values) {
            // Named output "KeySpilt": baseOutputPath ends with "/", so each
            // category gets its own subdirectory under the job output directory.
            multipleOutputs.write("KeySpilt", NullWritable.get(), text, key.toString() + "/");
            // Named output "AllPart": no baseOutputPath, so every record goes to
            // files named AllPart-r-nnnnn in the output directory itself.
            multipleOutputs.write("AllPart", NullWritable.get(), text);
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        if (null != multipleOutputs) {
            multipleOutputs.close();
            multipleOutputs = null;
        }
    }
}
5. The driver:
package cn.edu.bjut.multioutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MainJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "aaa");
        job.setJarByClass(MainJob.class);

        job.setMapperClass(MultiOutPutMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MultiOutPutReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Register the named outputs the reducer writes to.
        MultipleOutputs.addNamedOutput(job, "KeySpilt",
                TextOutputFormat.class, NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "AllPart",
                TextOutputFormat.class, NullWritable.class, Text.class);

        // Delete the output directory if it already exists, so reruns do not fail.
        Path outPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
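Running the job (a sketch; the jar name and paths are placeholders) should, assuming a single reducer, leave a layout like this in the output directory:

$ hadoop jar multioutput.jar cn.edu.bjut.multioutput.MainJob /input /output
$ hadoop fs -ls -R /output
/output/_SUCCESS
/output/1101/-r-00000        records for category 1101 ("KeySpilt" with baseOutputPath "1101/")
/output/1512/-r-00000        records for category 1512
/output/50019780/-r-00000    records for category 50019780
/output/AllPart-r-00000      every record (the "AllPart" named output)
/output/part-r-00000         default output file, empty (0 bytes)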
Original article: http://blog.csdn.net/zhyooo123/article/details/77866458