一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

云服務器|WEB服務器|FTP服務器|郵件服務器|虛擬主機|服務器安全|DNS服務器|服務器知識|Nginx|IIS|Tomcat|

服務器之家 - 服務器技術 - 服務器知識 - Hadoop streaming詳細介紹

Hadoop streaming詳細介紹

2022-01-04 17:02Hadoop streaming 服務器知識

這篇文章主要介紹了Hadoop streaming詳細介紹的相關資料,需要的朋友可以參考下

Hadoop streaming

Hadoop為MapReduce提供了不同的API,可以方便我們使用不同的編程語言來使用MapReduce框架,而不是只局限于Java。這里要介紹的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作為我們mapreduce程序和MapReduce框架之間的接口。所以你可以用任何語言來編寫MapReduce程序,只要該語言可以往standard input/output上進行讀寫。

streamming是天然適用于文字處理的(text processing),當然,也僅適用純文本的處理,對于需要對象和序列化的場景,hadoop streaming無能為力。它力圖使我們能夠快捷的通過各種腳本語言,快速的處理大量的文本文件。以下是steaming的一些特點:

  1. Map函數的輸入是通過stand input一行一行的接收數據的。(不像Java API,通過InputFormat類做預處理,使得Map函數的輸入是有Key和value的)
  2. Map函數的output則必須限定為key-value pair,key和value之間用\t分開。(MapReduce框架在處理intermediate的Map輸出時,必須做sort和partition,即shuffle)
  3. Reduce函數的input是Map函數的output也是key-value pair,key和value之間用\t分開。

常用的Streaming編程語言:

  1. bash shell
  2. ruby
  3. python

Ruby

下面是一個Ruby編寫的MapReduce程序的示例:

map

max_temperature_map.rb:

?
1
2
3
4
5
6
7
ruby
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
  • 從標準輸入讀入一行data。
  • 處理數據之后,生成一個鍵值對,用\t分隔,輸出到標準輸出

reduce

max_temperature_reduce.rb:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
ruby
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key
  1. 從標準輸入讀入一行數據
  2. 數據是用\t分隔的鍵值對
  3. 數據是被MapReduce根據key排序之后順序一行一行讀入
  4. reduce函數對數據進行處理,并輸出,輸出仍是用\t分隔的鍵值對

運行

?
1
2
3
4
5
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
  1. hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar指明了使用hadoop streaming
  2. hadoop-*-streaming.jar會將input里的文件,一行一行的輸出到標準輸出。
  3. 用-mapper指定Map函數。類似于通過管道將數據傳給rb文件: data|ch02/src/main/ruby/max_temperature_map.rb
  4. -reducer指定Reduce函數。

Python

Map

?
1
2
3
4
5
6
7
8
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)

Reduce

?
1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)

運行

?
1
2
3
4
5
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.py\
-reducer ch02/src/main/ruby/max_temperature_reduce.py

Bash shell

Map

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
gunzip -c $file >> $target.all
echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz

運行

?
1
2
3
4
5
6
7
8
9
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh
  1. 這里的-D mapred.reduce.tasks=0將reduce task觀掉,因此也不需要設置-reducer
  2. 只使用Mapper,可以通過MapReduce幫助我們并行的完成一些平時只能串行的shell腳本
  3. 注意這里的-file,在集群模式下,需要并行運行時,需要-file把文件傳輸到其他節點

Combiner

在streaming模式下,仍然可以運行Combiner,兩種方法:

  1. 通過Java編寫一個combiner的函數,并使用-combiner option
  2. 以命令行的管道模式完成combiner的任務

這里具體解釋第二種方法:

?
1
2
3
4
5
6
7
8
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb

注意看-mapper這一行,通關管道的方式,把mapper的臨時輸出文件(intermediate file,Map完成后的臨時文件)作為輸入,送到sort進行排序,然后送到reduce腳本,來完成類似于combiner的工作。這時候的輸出才真正的作為shuffle的輸入,被分組并在網絡上發送到Reduce

感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 法国老妇性xx在线播放 | 成人观看免费大片在线观看 | 国产不卡视频一区二区在线观看 | 视频一区二区在线 | 亚洲人尿尿| 婚前试爱免费看 | 奇米网在线 | 99久久久久久久 | 男人在线网址 | 国产精品麻豆99久久 | 国产区香蕉精品系列在线观看不卡 | 亚洲日本中文字幕天天更新 | 草草视频人人爽 | 国产精品怡红院在线观看 | oneday高清在线观看 | 亚洲成人影院在线观看 | 成人国产精品视频 | 三级无删减高清在线影院 | 99爱免费视频 | 99久久精品免费看国产一区二区 | 色婷婷久久综合中文久久一本` | 亚洲网站在线 | 手机在线免费观看日本推理片 | 成人免费视频大全 | free性日本| 国产精品青青在线观看香蕉 | 日韩欧美亚洲每日更新网 | 我的绝色岳每雯雯 | 为什么丈夫插我我却喜欢被打着插 | 日韩欧美在线观看综合网另类 | 微拍秒拍99福利精品小视频 | 日韩欧美高清 | 男人把大ji巴放进男人免费视频 | 白丝超短裙被输出娇喘不停小说 | 亚洲高清视频在线 | 日本高清在线看免费观看 | 午夜久久免影院欧洲 | 精品国产午夜久久久久九九 | 欧美日韩精品一区二区三区高清视频 | 久久精品国产免费播高清无卡 | 免费看国产一级特黄aa大片 |