1.啟動spark集群,就是執(zhí)行sbin/start-all.sh,啟動master和多個worker節(jié)點,master主要作為集群的管理和監(jiān)控,worker節(jié)點主要擔(dān)任運行各個application的任務(wù)。master節(jié)點需要讓worker節(jié)點匯報自身狀況,比如cpu,內(nèi)存多大,這個過程都是通過心跳機制來完成的
2.master收到worker的匯報信息之后,會給予worker信息
3.driver提交任務(wù)給spark集群[driver和master之間的通信是通過akkaactor來做的,也就是說master是akkaactor異步通信模型中的一個actor模型,driver也是一樣,driver異步向mater發(fā)送注冊信息(registerapplication)異步注冊信息]
4.master節(jié)點對application預(yù)估,7個g的內(nèi)存完成任務(wù),對任務(wù)進行分配,每一個worker節(jié)點上都分配3.5g的內(nèi)存去執(zhí)行任務(wù),在master就對各個worker上的任務(wù)進行整體的監(jiān)控調(diào)度
5.worker節(jié)點領(lǐng)到任務(wù),開始執(zhí)行,在worker節(jié)點上啟動相應(yīng)的executor進程來執(zhí)行,每個executor中都有一個線程池的概念,里面存有多個task線程
6.executor會從線程池中取出task去計算rddpatition中的數(shù)據(jù),transformation操作,action操作
7.worker節(jié)點向driver節(jié)點匯報計算狀態(tài)
通過本地并行化集合創(chuàng)建rdd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public class javalocalsumapp{ public static void main(string[] args){ sparkconf conf = new sparkconf().setappname( "javalocalsumapp" ); javasparkcontext sc = new javasparkcontext(conf); list<integer> list = arrays.aslist( 1 , 3 , 4 , 5 , 6 , 7 , 8 ); //通過本地并行化集合創(chuàng)建rdd javardd <integer> listrdd = sc.parallelize(list); //求和 integer sum = listrdd.reduce( new function2<integer,integer,integer,integer>(){ @override public integer call(integer v1,integer v2) throws exception{ return v1+v2; } } ); system.out.println(sum) } } //java 中的函數(shù)式編程,需要將編譯器設(shè)置成1.8 listrdd.reduce((v1,v2)=> v1+v2) |
sparktransformation和action操作
rdd:彈性分布式數(shù)據(jù)集,是一種集合,支持多種來源,有容錯機制,可以被緩存,支持并行操作,一個rdd代表一個分區(qū)里的數(shù)據(jù)集
rdd有兩種操作算子:
transformation(轉(zhuǎn)化):transformation屬于延遲計算,當(dāng)一個rdd轉(zhuǎn)換成另一個rdd時并沒有立即進行轉(zhuǎn)換,緊緊是記住了數(shù)據(jù)集的邏輯操作
action(執(zhí)行):觸發(fā)spark作業(yè)的運行,真正觸發(fā)轉(zhuǎn)換算子的計算
spark算子的作用
該圖描述的是spark在運行轉(zhuǎn)換中通過算子對rdd進行轉(zhuǎn)換,算子是rdd中定義的函數(shù),可以對rdd中的數(shù)據(jù)進行轉(zhuǎn)換和操作。
輸入:在spark程序運行中,數(shù)據(jù)從外部數(shù)據(jù)空間(如分布式存儲:textfile讀取hdfs等,parallelize方法輸入scala集合或數(shù)據(jù))輸入spark ,數(shù)據(jù)進入spark運行時數(shù)據(jù)空間,轉(zhuǎn)化為spark中的數(shù)據(jù)塊,通過blockmanager進行管理
運行:在spark數(shù)據(jù)輸入形成rdd后便可以通過變換算子,如filter等。對數(shù)據(jù)進行操作并將rdd轉(zhuǎn)換為新的rdd,通過action算子,觸發(fā)spark提交作業(yè),如果數(shù)據(jù)需要復(fù)用,可以通過cache算子,將數(shù)據(jù)緩存到內(nèi)存
輸出:程序運行結(jié)束數(shù)據(jù)會輸出spark運行時空間,存儲到分布式存儲中(如saveastextfile輸出到hdfs),或scala數(shù)據(jù)或集合中(collect輸出到scala集合,count返回scala int 型數(shù)據(jù))
transformation 和 actions操作概況
transformation
map(func):返回一個新的分布式數(shù)據(jù)集,由每個原元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
filter(func) :返回一個新的數(shù)據(jù)集,由經(jīng)過func函數(shù)
flatmap(func):類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數(shù)的返回值是一個seq,而不是單一元素)
sample(withreplacement, frac, seed): 根據(jù)給定的隨機種子seed,隨機抽樣出數(shù)量為frac的數(shù)據(jù)
union(otherdataset): 返回一個新的數(shù)據(jù)集,由原數(shù)據(jù)集和參數(shù)聯(lián)合而成
roupbykey([numtasks]): 在一個由(k,v)對組成的數(shù)據(jù)集上調(diào)用,返回一個(k,seq[v])對的數(shù)據(jù)集。注意:默認(rèn)情況下,使用8個并行任務(wù)進行分組,你可以傳入numtask可選參數(shù),根據(jù)數(shù)據(jù)量設(shè)置不同數(shù)目的task
reducebykey(func, [numtasks]): 在一個(k,v)對的數(shù)據(jù)集上使用,返回一個(k,v)對的數(shù)據(jù)集,key相同的值,都被使用指定的reduce函數(shù)聚合到一起。和groupbykey類似,任務(wù)的個數(shù)是可以通過第二個可選參數(shù)來配置的。
join(otherdataset, [numtasks]): 在類型為(k,v)和(k,w)類型的數(shù)據(jù)集上調(diào)用,返回一個(k,(v,w))對,每個key中的所有元素都在一起的數(shù)據(jù)集
groupwith(otherdataset, [numtasks]): 在類型為(k,v)和(k,w)類型的數(shù)據(jù)集上調(diào)用,返回一個數(shù)據(jù)集,組成元素為(k, seq[v], seq[w]) tuples。這個操作在其它框架,稱為cogroup
cartesian(otherdataset): 笛卡爾積。但在數(shù)據(jù)集t和u上調(diào)用時,返回一個(t,u)對的數(shù)據(jù)集,所有元素交互進行笛卡爾積。
actions操作
reduce(func): 通過函數(shù)func聚集數(shù)據(jù)集中的所有元素。func函數(shù)接受2個參數(shù),返回一個值。這個函數(shù)必須是關(guān)聯(lián)性的,確保可以被正確的并發(fā)執(zhí)行
collect(): 在driver的程序中,以數(shù)組的形式,返回數(shù)據(jù)集的所有元素。這通常會在使用filter或者其它操作后,返回一個足夠小的數(shù)據(jù)子集再使用,直接將整個rdd集collect返回,很可能會讓driver程序oom
count(): 返回數(shù)據(jù)集的元素個數(shù)
take(n): 返回一個數(shù)組,由數(shù)據(jù)集的前n個元素組成。注意,這個操作目前并非在多個節(jié)點上,并行執(zhí)行,而是driver程序所在機器,單機計算所有的元素(gateway的內(nèi)存壓力會增大,需要謹(jǐn)慎使用)
first(): 返回數(shù)據(jù)集的第一個元素(類似于take(1))
saveastextfile(path): 將數(shù)據(jù)集的元素,以textfile的形式,保存到本地文件系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。spark將會調(diào)用每個元素的tostring方法,并將它轉(zhuǎn)換為文件中的一行文本
saveassequencefile(path): 將數(shù)據(jù)集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統(tǒng),hdfs或者任何其它hadoop支持的文件系統(tǒng)。rdd的元素必須由key-value對組成,并都實現(xiàn)了hadoop的writable接口,或隱式可以轉(zhuǎn)換為writable(spark包括了基本類型的轉(zhuǎn)換,例如int,double,string等等)
foreach(func): 在數(shù)據(jù)集的每一個元素上,運行函數(shù)func。這通常用于更新一個累加器變量,或者和外部存儲系統(tǒng)做交互
wordcount執(zhí)行過程
總結(jié)
以上就是本文關(guān)于spark 調(diào)度架構(gòu)原理詳解的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
原文鏈接:http://blog.csdn.net/qq_16103331/article/details/53363858