一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - MapTask階段shuffle源碼分析

MapTask階段shuffle源碼分析

2021-07-01 14:0743193797 Java教程

今天小編就為大家分享一篇關于MapTask階段shuffle源碼分析,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧

1. 收集階段

mapper中,調用context.write(key,value)實際是調用代理newoutputcollectorwirte方法

?
1
2
3
4
public void write(keyout key, valueout value
          ) throws ioexception, interruptedexception {
  output.write(key, value);
 }

實際調用的是mapoutputbuffercollect(),在進行收集前,調用partitioner來計算每個key-value的分區號

?
1
2
3
4
5
@override
  public void write(k key, v value) throws ioexception, interruptedexception {
   collector.collect(key, value,
            partitioner.getpartition(key, value, partitions));
  }

2. newoutputcollector對象的創建

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@suppresswarnings("unchecked")
  newoutputcollector(org.apache.hadoop.mapreduce.jobcontext jobcontext,
            jobconf job,
            taskumbilicalprotocol umbilical,
            taskreporter reporter
            ) throws ioexception, classnotfoundexception {
  // 創建實際用來收集key-value的緩存區對象
   collector = createsortingcollector(job, reporter);
  // 獲取總的分區個數
   partitions = jobcontext.getnumreducetasks();
   if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.partitioner<k,v>)
     reflectionutils.newinstance(jobcontext.getpartitionerclass(), job);
   } else {
    // 默認情況,直接創建一個匿名內部類,所有的key-value都分配到0號分區
    partitioner = new org.apache.hadoop.mapreduce.partitioner<k,v>() {
     @override
     public int getpartition(k key, v value, int numpartitions) {
      return partitions - 1;
     }
    };
   }
  }

3. 創建環形緩沖區對象

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@suppresswarnings("unchecked")
 private <key, value> mapoutputcollector<key, value>
     createsortingcollector(jobconf job, taskreporter reporter)
  throws ioexception, classnotfoundexception {
  mapoutputcollector.context context =
   new mapoutputcollector.context(this, job, reporter);
  // 從當前job的配置中,獲取mapreduce.job.map.output.collector.class,如果沒有設置,使用mapoutputbuffer.class
  class<?>[] collectorclasses = job.getclasses(
   jobcontext.map_output_collector_class_attr, mapoutputbuffer.class);
  int remainingcollectors = collectorclasses.length;
  exception lastexception = null;
  for (class clazz : collectorclasses) {
   try {
    if (!mapoutputcollector.class.isassignablefrom(clazz)) {
     throw new ioexception("invalid output collector class: " + clazz.getname() +
      " (does not implement mapoutputcollector)");
    }
    class<? extends mapoutputcollector> subclazz =
     clazz.assubclass(mapoutputcollector.class);
    log.debug("trying map output collector class: " + subclazz.getname());
   // 創建緩沖區對象
    mapoutputcollector<key, value> collector =
     reflectionutils.newinstance(subclazz, job);
   // 創建完緩沖區對象后,執行初始化
    collector.init(context);
    log.info("map output collector class = " + collector.getclass().getname());
    return collector;
   } catch (exception e) {
    string msg = "unable to initialize mapoutputcollector " + clazz.getname();
    if (--remainingcollectors > 0) {
     msg += " (" + remainingcollectors + " more collector(s) to try)";
    }
    lastexception = e;
    log.warn(msg, e);
   }
  }
  throw new ioexception("initialization of all the collectors failed. " +
   "error in last collector was :" + lastexception.getmessage(), lastexception);
 }

3. mapoutputbuffer的初始化   環形緩沖區對象

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@suppresswarnings("unchecked")
  public void init(mapoutputcollector.context context
          ) throws ioexception, classnotfoundexception {
   job = context.getjobconf();
   reporter = context.getreporter();
   maptask = context.getmaptask();
   mapoutputfile = maptask.getmapoutputfile();
   sortphase = maptask.getsortphase();
   spilledrecordscounter = reporter.getcounter(taskcounter.spilled_records);
   // 獲取分區總個數,取決于reducetask的數量
   partitions = job.getnumreducetasks();
   rfs = ((localfilesystem)filesystem.getlocal(job)).getraw();
   //sanity checks
   // 從當前配置中,獲取mapreduce.map.sort.spill.percent,如果沒有設置,就是0.8
   final float spillper =
    job.getfloat(jobcontext.map_sort_spill_percent, (float)0.8);
   // 獲取mapreduce.task.io.sort.mb,如果沒設置,就是100mb
   final int sortmb = job.getint(jobcontext.io_sort_mb, 100);
   indexcachememorylimit = job.getint(jobcontext.index_cache_memory_limit,
                     index_cache_memory_limit_default);
   if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new ioexception("invalid \"" + jobcontext.map_sort_spill_percent +
      "\": " + spillper);
   }
   if ((sortmb & 0x7ff) != sortmb) {
    throw new ioexception(
      "invalid \"" + jobcontext.io_sort_mb + "\": " + sortmb);
   }
// 在溢寫前,對key-value排序,采用的排序器,使用快速排序,只排索引
   sorter = reflectionutils.newinstance(job.getclass("map.sort.class",
      quicksort.class, indexedsorter.class), job);
   // buffers and accounting
   int maxmemusage = sortmb << 20;
   maxmemusage -= maxmemusage % metasize;
   // 存放key-value
   kvbuffer = new byte[maxmemusage];
   bufvoid = kvbuffer.length;
  // 存儲key-value的屬性信息,分區號,索引等
   kvmeta = bytebuffer.wrap(kvbuffer)
     .order(byteorder.nativeorder())
     .asintbuffer();
   setequator(0);
   bufstart = bufend = bufindex = equator;
   kvstart = kvend = kvindex;
   maxrec = kvmeta.capacity() / nmeta;
   softlimit = (int)(kvbuffer.length * spillper);
   bufferremaining = softlimit;
   log.info(jobcontext.io_sort_mb + ": " + sortmb);
   log.info("soft limit at " + softlimit);
   log.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
   log.info("kvstart = " + kvstart + "; length = " + maxrec);
   // k/v serialization
    // 獲取快速排序的key的比較器,排序只按照key進行排序!
   comparator = job.getoutputkeycomparator();
  // 獲取key-value的序列化器
   keyclass = (class<k>)job.getmapoutputkeyclass();
   valclass = (class<v>)job.getmapoutputvalueclass();
   serializationfactory = new serializationfactory(job);
   keyserializer = serializationfactory.getserializer(keyclass);
   keyserializer.open(bb);
   valserializer = serializationfactory.getserializer(valclass);
   valserializer.open(bb);
   // output counters
   mapoutputbytecounter = reporter.getcounter(taskcounter.map_output_bytes);
   mapoutputrecordcounter =
    reporter.getcounter(taskcounter.map_output_records);
   fileoutputbytecounter = reporter
     .getcounter(taskcounter.map_output_materialized_bytes);
   // 溢寫到磁盤,可以使用一個壓縮格式! 獲取指定的壓縮編解碼器
   // compression
   if (job.getcompressmapoutput()) {
    class<? extends compressioncodec> codecclass =
     job.getmapoutputcompressorclass(defaultcodec.class);
    codec = reflectionutils.newinstance(codecclass, job);
   } else {
    codec = null;
   }
   // 獲取combiner組件
   // combiner
   final counters.counter combineinputcounter =
    reporter.getcounter(taskcounter.combine_input_records);
   combinerrunner = combinerrunner.create(job, gettaskid(),
                       combineinputcounter,
                       reporter, null);
   if (combinerrunner != null) {
    final counters.counter combineoutputcounter =
     reporter.getcounter(taskcounter.combine_output_records);
    combinecollector= new combineoutputcollector<k,v>(combineoutputcounter, reporter, job);
   } else {
    combinecollector = null;
   }
   spillinprogress = false;
   minspillsforcombine = job.getint(jobcontext.map_combine_min_spills, 3);
   // 設置溢寫線程在后臺運行,溢寫是在后臺運行另外一個溢寫線程!和收集是兩個線程!
   spillthread.setdaemon(true);
   spillthread.setname("spillthread");
   spilllock.lock();
   try {
   // 啟動線程
    spillthread.start();
    while (!spillthreadrunning) {
     spilldone.await();
    }
   } catch (interruptedexception e) {
    throw new ioexception("spill thread failed to initialize", e);
   } finally {
    spilllock.unlock();
   }
   if (sortspillexception != null) {
    throw new ioexception("spill thread failed to initialize",
      sortspillexception);
   }
  }

4. paritionner的獲取

從配置中讀取mapreduce.job.partitioner.class,如果沒有指定,采用hashpartitioner.class

如果reducetask > 1, 還沒有設置分區組件,使用hashpartitioner

?
1
2
3
4
5
6
@suppresswarnings("unchecked")
 public class<? extends partitioner<?,?>> getpartitionerclass()
   throws classnotfoundexception {
  return (class<? extends partitioner<?,?>>)
   conf.getclass(partitioner_class_attr, hashpartitioner.class);
 }
?
1
2
3
4
5
6
7
public class hashpartitioner<k, v> extends partitioner<k, v> {
 /** use {@link object#hashcode()} to partition. **/
 public int getpartition(k key, v value,
             int numreducetasks) {
  return (key.hashcode() & integer.max_value) % numreducetasks;
 }
}

分區號的限制:0 <= 分區號 < 總的分區數(reducetask的個數)

?
1
2
3
4
if (partition < 0 || partition >= partitions) {
    throw new ioexception("illegal partition for " + key + " (" +
      partition + ")");
   }

5.maptask shuffle的流程

              ①在map()調用context.write()

              ②調用mapoutputbuffer的collect()

  •                             調用分區組件partitionner計算當前這組key-value的分區號

              ③將當前key-value收集到mapoutputbuffer中

  •                             如果超過溢寫的閥值,在后臺啟動溢寫線程,來進行溢寫!

              ④溢寫前,先根據分區號,將相同分區號的key-value,采用快速排序算法,進行排序!

  •                             排序并不在內存中移動key-value,而是記錄排序后key-value的有序索引!

              ⑤ 開始溢寫,按照排序后有序的索引,將文件寫入到一個臨時的溢寫文件中

  •                             如果沒有定義combiner,直接溢寫!
  •                             如果定義了combiner,使用combinerrunner.conbine()對key-value處理后再次溢寫!

              ⑥多次溢寫后,每次溢寫都會產生一個臨時文件

              ⑦最后,執行一次flush(),將剩余的key-value進行溢寫

              ⑧mergeparts: 將多次溢寫的結果,保存為一個總的文件!

  •                      在合并為一個總的文件前,會執行歸并排序,保證合并后的文件,各個分區也是有序的!
  •                      如果定義了conbiner,conbiner會再次運行(前提是溢寫的文件個數大于3)!
  •                      否則,就直接溢寫!

              ⑨最終保證生成一個最終的文件,這個文件根據總區號,分為若干部分,每個部分的key-value都已經排好序,等待reducetask來拷貝相應分區的數據

6. combiner

combiner其實就是reducer類型:

?
1
2
class<? extends reducer<k,v,k,v>> cls =
    (class<? extends reducer<k,v,k,v>>) job.getcombinerclass();

combiner的運行時機:

maptask:

  •               ①每次溢寫前,如果指定了combiner,會運行
  •               ②將多個溢寫片段,進行合并為一個最終的文件時,也會運行combiner,前提是片段數>=3

reducetask:

              ③reducetask在運行時,需要啟動shuffle進程拷貝maptask產生的數據!

  •                      數據在copy后,進入shuffle工作的內存,在內存中進行merge和sort!
  •                      數據過多,內部不夠,將部分數據溢寫在磁盤!
  •                      如果有溢寫的過程,那么combiner會再次運行!

①一定會運行,②,③需要條件!

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對服務器之家的支持。如果你想了解更多相關內容請查看下面相關鏈接

原文鏈接:https://blog.csdn.net/qq_43193797/article/details/86097451

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 欧美精选欧美极品 | 国产欧美国产精品第一区 | 日本在线视频免费观看 | 国产真实偷乱视频在线观看 | 免费看一级毛片 | 摸逼小说| 国产愉拍精品视频手机 | 青青青视频蜜桃一区二区 | 日本护士xxxx爽爽爽 | 欧美成人免费草草影院视频 | fuqer日本老师 | 日韩视频免费看 | 第一次破女视频国产一级 | 亚洲AV无码一区二区三区乱子伦 | 国内永久第一免费福利视频 | 国产精品国产三级在线专区 | 人与善xuanwen在线400 | 青草草产国视频 | 亚洲国产精品一区二区久久 | 日本老师xxxxx18 | 午夜十八岁禁 | zoofilivideo杂交3d| 小草观看免费高清视频 | 成人精品亚洲人成在线 | 8x8x华人免费 | 无码天堂亚洲国产AV久久 | 国产视频一区二区 | 国产视频一区二 | 性生大片免费看 | 国产成人一区二区三区 | 成人午夜毛片 | 俺去俺来也www色官网免费的 | 国产日韩欧美一区 | 欧美一区二区三区在线观看不卡 | 亚洲狠狠综合久久 | 国产精品一区牛牛影视 | 久久免费看少妇高潮A片JA | 日韩亚洲欧美一区二区三区 | 高清视频在线播放ww | 青草国产福利视频免费观看 | 亚洲欧美日韩高清 |