1、 概述
傳統(tǒng)的MapReduce框架(見我的博文:傳統(tǒng)MapReduce框架)把一個(gè)作業(yè)的執(zhí)行過程分為兩個(gè)階段:map和reduce,在map階段,每個(gè)map task讀取一個(gè)block,并調(diào)用map()函數(shù)進(jìn)行處理,然后將結(jié)果寫到本地磁盤(注意,不是HDFS)上;在reduce階段,每個(gè)reduce task遠(yuǎn)程的從map task所在節(jié)點(diǎn)上讀取數(shù)據(jù),調(diào)用reduce()函數(shù)進(jìn)行數(shù)據(jù)處理,并將最終結(jié)果寫到HDFS。從以上過程可以看出,map階段和reduce階段的結(jié)果均要寫磁盤,這雖然會(huì)降低系統(tǒng)性能,但可以提高可靠性。正是由于這個(gè)原因,傳統(tǒng)的MapReduce不能顯式地支持迭代編程,如果用戶硬要在傳統(tǒng)MapReduce上運(yùn)行迭代式作業(yè),性能將非常低。為此,不少改進(jìn)型的MapReduce出現(xiàn)了,它們能很好地支持迭代式開發(fā)。本文組織結(jié)構(gòu)如下:下一節(jié)將介紹幾個(gè)常見的迭代式作業(yè)并分析它們的特點(diǎn);第3節(jié)介紹迭代式MapReduce框架需要解決的幾個(gè)難題;第4節(jié)介紹現(xiàn)在比較有名的迭代式MapReduce框架;第5節(jié)介紹迭代式MapReduce框架仍未解決的問題;最后一節(jié)給出了一些迭代式MapReduce框架的資料。
2、 迭代式作業(yè)
在數(shù)據(jù)挖掘,信息檢索等領(lǐng)域,有很多算法需要多次迭代,本節(jié)介紹兩個(gè)常見的作業(yè),一個(gè)是PageRank,另一個(gè)是SSSP(Single Source Shortest Path)。PageRank是一個(gè)非常有名的網(wǎng)頁重要性衡量因素,它是一個(gè)多次迭代的過程,如下圖所示,每次迭代,PageRank由兩個(gè)作業(yè)MR1和MR2完成,這樣迭代多次,直到相鄰的兩次迭代中PR之差小于某一個(gè)閾值。
單源最短路徑問題實(shí)際上也是多次迭代的過程,主要思想是:設(shè)G=(V,E)是一個(gè)帶權(quán)有向圖,R是G的鄰接矩陣。整個(gè)算法始終把圖中頂點(diǎn)集合V分成兩組,第一組為已求出最短路徑的頂點(diǎn)集合(用S表示,初始時(shí)S中只有一個(gè)源點(diǎn),在每次迭代中求得一條最短路徑 , 并將該路徑的另一頂點(diǎn)加入到集合S中,直到全部頂點(diǎn)都加入到S中,算法就結(jié)束了),第二組為其余未確定最短路徑的頂點(diǎn)集合(用U表示)。在每次迭代中,從U中選擇一個(gè)當(dāng)前路徑最短的頂點(diǎn),轉(zhuǎn)存到S中,直到U為空。
更多迭代式作業(yè)以及在Hadoop上的實(shí)現(xiàn)方法,請(qǐng)參見Apache開源項(xiàng)目Mahout 以及它的論壇。
3、 技術(shù)難點(diǎn)
從PageRank和SSSP的整個(gè)計(jì)算過程可以看出:
(1) 輸入數(shù)據(jù)都動(dòng)態(tài)數(shù)據(jù)和靜態(tài)數(shù)據(jù)兩部分組成。對(duì)于PageRank, L屬于靜態(tài)數(shù)據(jù),而R屬于動(dòng)態(tài)數(shù)據(jù);對(duì)于SSSP,R屬于靜態(tài)數(shù)據(jù),S和U屬于動(dòng)態(tài)數(shù)據(jù)。傳輸動(dòng)態(tài)數(shù)據(jù)是不可避免的,而靜態(tài)數(shù)據(jù)可以采用某種策略避免重復(fù)傳輸。怎樣避免傳輸靜態(tài)數(shù)據(jù)?
(2) 每次迭代,如果所有task重復(fù)重新創(chuàng)建,代價(jià)將非常高。怎樣重用task以提高效率(task pool)?
(3) 每次迭代,數(shù)據(jù)怎么樣存儲(chǔ)?如果總是寫磁盤,代價(jià)將非常高。
(4) 何時(shí)迭代終止,怎樣改變編程模型,允許用戶指定合適終止迭代.
4、 迭代式MapReduce框架
現(xiàn)在出現(xiàn)了不少迭代式MapReduce框架,比較有名的是Twister和Haloop(Ha,loop)。下面分別給予介紹。
Twister是由一個(gè)印度人開發(fā)的,其架構(gòu)如下:
在Twister中,大文件不會(huì)自動(dòng)被切割成一個(gè)一個(gè)block,因而用戶需提前把文件分成一個(gè)一個(gè)小文件,以供每個(gè)task處理。在map階段,經(jīng)過map()處理完的結(jié)果被放在分布式內(nèi)存中,然后通過一個(gè)broker network(NaradaBroking系統(tǒng))將數(shù)據(jù)push給各個(gè)reduce task(Twister假設(shè)內(nèi)存足夠大,中間數(shù)據(jù)可以全部放在內(nèi)存中);在reduce階段,所有reduce task產(chǎn)生的結(jié)果通過一個(gè)combine操作進(jìn)行歸并,此時(shí),用戶可以進(jìn)行條件判定, 確定迭代是否結(jié)束。combine后的數(shù)據(jù)直接被送給map task,開始新一輪的迭代。為了提高容錯(cuò)性,Twister每隔一段時(shí)間會(huì)將map task和reduce task產(chǎn)生的結(jié)果寫到磁盤上,這樣,一旦某個(gè)task失敗,它可以從最近的備份中獲取輸入,重新計(jì)算。
為了避免每次迭代重新創(chuàng)建task,Twister維護(hù)了一個(gè)task pool,每次需要task時(shí)直接從pool中取。在Twister中,所有消息和數(shù)據(jù)都是通過broker network傳遞的,該broker network是一個(gè)獨(dú)立的模塊,目前支持NaradaBroking和ActiveMQ。
總體上說,Twister還是一個(gè)研究性項(xiàng)目,它的一些設(shè)計(jì)策略決定了它不太可能在實(shí)際中應(yīng)用,如數(shù)據(jù)全部放在分布式內(nèi)存中;沒有分布式文件系統(tǒng),只提供了一個(gè)tool進(jìn)行文件存取和訪問;計(jì)算模型抽象程度不夠,支持的應(yīng)用類型不夠多。
Haloop是在Hadoop基礎(chǔ)上擴(kuò)展而來的,其架構(gòu)如下:
Haloop進(jìn)行的改進(jìn)有:
(1) 提供了一套新的編程接口,以方便用戶進(jìn)行迭代式程序開發(fā)。
(2) master node(jobtracker)包含一個(gè)循環(huán)控制模塊,它不斷的啟動(dòng)map-reduce計(jì)算知道滿足迭代終止條件。
(3) 設(shè)計(jì)了新的Task scheduler,以便更好的利用data locality特性。
(4) 數(shù)據(jù)在各個(gè)task tracker會(huì)被緩存(cache)和建索引(index)。
下面介紹技術(shù)創(chuàng)新點(diǎn):
(1) Hadoop 將所有迭代式任務(wù)抽象為:
SetFixedPointThreshold:設(shè)置兩次迭代的終止條件,即距離差是否達(dá)到某一個(gè)閾值
setMaxNumOfIterations:設(shè)置迭代次數(shù)
setIterationInput:設(shè)置變化的輸入數(shù)據(jù)
AddInvariantTable:設(shè)置不變的輸入數(shù)據(jù)
(2) Loop-aware 任務(wù)調(diào)度。Haloop在第一次迭代時(shí)會(huì)將不變的輸入數(shù)據(jù)保存到該計(jì)算節(jié)點(diǎn)上,以后每次調(diào)度task,盡量放在固定的那些節(jié)點(diǎn)上(locality)。這樣,每次迭代,不變的數(shù)據(jù)就不必重復(fù)傳輸了。
(3) Cache和Index。Map task的輸入與輸出,Reduce task的輸出都會(huì)被建索引和緩存,以加快數(shù)據(jù)處理速度。其中,緩存是指數(shù)據(jù)被寫到本次磁盤,以供下一輪循環(huán)迭代時(shí)直接使用。
總體上說,Haloop比Twister抽象度更高,支持更多的計(jì)算,同時(shí),由于是在Hadoop基礎(chǔ)上修改上,因而繼承了Hadoop的很多優(yōu)點(diǎn)。
5、 總結(jié)
目前在迭代式MapReduce研究方面,還處于發(fā)展階段。Twister和Haloop的模型抽象度不夠高,支持的計(jì)算有限。
6、 參考資料
(1) Twister主頁:http://www.iterativemapreduce.org/
(2) Twister論文:Twister: A Runtime for Iterative MapReduce
(http://www.iterativemapreduce.org/hpdc-camera-ready-submission.pdf)
(3) Haloop主頁:http://code.google.com/p/haloop/
(4) Haloop論文:HaLoop: Efficient Iterative Data Processing on Large Clusters(http://www.ics.uci.edu/~yingyib/papers/HaLoop_camera_ready.pdf)
聯(lián)系客服