1. The collection phase

In the Mapper, calling context.write(key, value) actually invokes the write method of the proxy object NewOutputCollector:
```java
public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}
```
What is actually invoked is the collect() method of MapOutputBuffer. Before collecting, the Partitioner is called to compute the partition number for each key-value pair:
```java
@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
```
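For orientation, here is a minimal WordCount-style Mapper sketch (not from the original article; the class name is hypothetical) showing where these context.write() calls originate:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Every context.write() below goes through NewOutputCollector.write()
// and ends up in MapOutputBuffer.collect().
public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      word.set(token);
      context.write(word, ONE);  // enters the collection phase
    }
  }
}
```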
2. Creating the NewOutputCollector object
```java
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Create the buffer object that actually collects the key-value pairs
  collector = createSortingCollector(job, reporter);
  // The total number of partitions
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // Default case: an anonymous inner class that assigns every
    // key-value pair to partition 0
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
```
3. Creating the ring buffer object
```java
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);

  // Read mapreduce.job.map.output.collector.class from the job's
  // configuration; if it is not set, use MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: "
          + clazz.getName() + " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Create the buffer object
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // After creating the buffer object, initialize it
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was :" + lastException.getMessage(),
    lastException);
}
```
4. Initializing MapOutputBuffer, the ring buffer object
```java
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  // Total number of partitions, determined by the number of ReduceTasks
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // Read mapreduce.map.sort.spill.percent from the configuration;
  // if it is not set, the default is 0.8
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // Read mapreduce.task.io.sort.mb; if it is not set, the default is 100 MB
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // The sorter used to sort key-value pairs before a spill:
  // QuickSort by default, and only the indices are sorted
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  // Holds the raw key-value bytes
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // Holds per-record metadata: partition number, indices, etc.
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // The key comparator used by QuickSort; sorting is done by key only!
  comparator = job.getOutputKeyComparator();
  // Serializers for the keys and values
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // Spills to disk may use a compression format! Fetch the configured codec
  // compression
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // Fetch the combiner component
  // combiner
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector =
      new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  // The spill thread runs in the background as a daemon: spilling happens
  // on a separate thread, so collection and spilling are two threads!
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    // Start the spill thread
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
```
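The knobs that init() reads can be set from the driver. A hedged sketch of the common tuning points (the class name and values are illustrative; the property names are the ones read above plus the standard map-output compression keys):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class ShuffleTuning {
  public static Configuration tunedConf() {
    Configuration conf = new Configuration();
    // Ring buffer size, mapreduce.task.io.sort.mb (default 100 MB)
    conf.setInt("mapreduce.task.io.sort.mb", 200);
    // Spill threshold, mapreduce.map.sort.spill.percent (default 0.8)
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);
    // Compress spilled map output (picked up by the compression branch above)
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
                  SnappyCodec.class, CompressionCodec.class);
    return conf;
  }
}
```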
5. Obtaining the Partitioner

The partitioner class is read from the configuration key mapreduce.job.partitioner.class; if it is not specified, HashPartitioner.class is used. In other words, when reducetask > 1 and no partitioner component has been set, HashPartitioner is the one that runs:
```java
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
    throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
```
```java
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
```
Constraint on the partition number: 0 <= partition number < total number of partitions (the number of ReduceTasks):
```java
if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}
```
6. The MapTask shuffle flow

① In map(), context.write() is called.
② This invokes collect() of MapOutputBuffer.
- The partitioner component is called to compute the partition number of the current key-value pair.
③ The key-value pair is collected into MapOutputBuffer.
- Once the spill threshold is exceeded, the spill thread running in the background performs a spill!
④ Before the spill, the key-value pairs are sorted by partition number using quicksort, grouping pairs with the same partition number together!
- Sorting does not move the key-value records in memory; it only records the sorted order of their indices (see the toy sketch after this list)!
⑤ The spill begins: following the sorted indices, the records are written to a temporary spill file.
- If no combiner is defined, the data is spilled as-is!
- If a combiner is defined, CombinerRunner.combine() processes the key-value pairs before they are spilled!
⑥ Over multiple spills, each spill produces one temporary file.
⑦ At the end, a flush() is performed to spill the remaining key-value pairs.
⑧ mergeParts: the results of the individual spills are saved into one overall file!
- Before merging into one overall file, a merge sort is performed, so that each partition of the merged file is also sorted!
- If a combiner is defined, it runs once more (provided there are at least 3 spill files)!
- Otherwise, the spill files are merged directly!
⑨ The end result is a single final file, divided by partition number into sections whose key-value pairs are already sorted, waiting for the ReduceTasks to copy the data of their respective partitions.
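Step ④'s "sort the indices, not the records" idea can be illustrated outside Hadoop with a toy sketch (plain Java, assuming string records compared lexicographically; this is not Hadoop's IndexedSorter, just the same principle):

```java
import java.util.Arrays;

public class IndexSortDemo {
  public static void main(String[] args) {
    // The "buffer": records stay exactly where they were collected
    String[] records = {"banana", "apple", "cherry"};
    // The "metadata": one index per record; only these entries move
    Integer[] index = {0, 1, 2};
    Arrays.sort(index, (a, b) -> records[a].compareTo(records[b]));
    // "Spill" in index order; records[] itself was never rearranged
    for (int i : index) {
      System.out.println(records[i]);  // apple, banana, cherry
    }
  }
}
```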
7. The Combiner

The combiner is really just a Reducer type:
```java
Class<? extends Reducer<K,V,K,V>> cls =
  (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
```
When the combiner runs:

MapTask:
- ① Before every spill: if a combiner is specified, it runs.
- ② When the multiple spill segments are merged into one final file, the combiner runs as well, provided the number of segments is >= 3.

ReduceTask:
③ While running, a ReduceTask starts the shuffle process to copy the data produced by the MapTasks!
- After being copied, the data enters the memory used by shuffle, where it is merged and sorted!
- If there is too much data and memory is insufficient, part of it is spilled to disk!
- If such a spill occurs, the combiner runs once more!

① always runs; ② and ③ are conditional! (A driver fragment showing how a combiner is registered follows below.)
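A hedged driver sketch registering a combiner, using the classic WordCount pattern (IntSumReducer is Hadoop's stock summing reducer; WordCountMapper is the hypothetical mapper from section 1):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);  // hypothetical, see section 1
    // The combiner is just a Reducer, as job.getCombinerClass() above implies.
    // It must be safe to run zero, one, or many times, so it should perform
    // associative, commutative aggregation such as summing.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```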
Original article: https://blog.csdn.net/qq_43193797/article/details/86097451