Author :岑文初
Email: wenchu.cenwc@alibaba-inc.com
blog: http://blog.csdn.net/cenwenchu79/
Hadoop基本流程以及簡(jiǎn)單應(yīng)用的開發(fā)
在SIP項(xiàng)目設(shè)計(jì)的過程中,對(duì)于它龐大的日志在早先就考慮使用任務(wù)分解的多線程處理模式來分析統(tǒng)計(jì),在前面有一篇Blog中提到了那部分的設(shè)計(jì),但是由于統(tǒng)計(jì)的內(nèi)容暫時(shí)還是十分簡(jiǎn)單,所以就采用Memcache作為計(jì)數(shù)器結(jié)合Mysql完成了訪問控制以及統(tǒng)計(jì)的工作。但未來,對(duì)于海量日志分析的工作,還是需要有所準(zhǔn)備?,F(xiàn)在最火的技術(shù)詞匯莫過于“云計(jì)算”,在Open API日益盛行的今天,互聯(lián)網(wǎng)應(yīng)用的數(shù)據(jù)將會(huì)越來越有價(jià)值,如何去分析這些數(shù)據(jù),挖掘其內(nèi)在價(jià)值,就需要分布式計(jì)算來支撐起海量數(shù)據(jù)的分析工作。
回過頭來看,早先那種多線程,多任務(wù)分解的日志分析設(shè)計(jì),其實(shí)是分布式計(jì)算的一個(gè)單機(jī)版縮略,如何將這種單機(jī)的工作分拆,變成集群工作協(xié)同,其實(shí)就是分布式計(jì)算框架設(shè)計(jì)所涉及的。在去年參加BEA的大會(huì)時(shí)候,BEA和VMWare合作采用虛擬機(jī)來構(gòu)建集群,無非就是希望使得計(jì)算機(jī)硬件能夠類似于應(yīng)用程序中的資源池中的資源,使用者無需關(guān)心資源的分配情況,最大化了硬件資源的使用價(jià)值。分布式計(jì)算也是如此,具體的計(jì)算任務(wù)交由哪一臺(tái)機(jī)器執(zhí)行,執(zhí)行后由誰來匯總,這都由分布式框架的Master來抉擇,而使用者只需簡(jiǎn)單的將待分析內(nèi)容的提供給分布式計(jì)算系統(tǒng)作為輸入,就可以得到分布式計(jì)算后的結(jié)果。 Hadoop是Apache開源組織的一個(gè)分布式計(jì)算開源框架,在很多大型網(wǎng)站上都已經(jīng)得到了應(yīng)用,亞馬遜,Facebook,Yahoo等等。對(duì)于我來說,最近的一個(gè)使用點(diǎn)就是服務(wù)集成平臺(tái)的日志分析,服務(wù)集成平臺(tái)的日志量將會(huì)很大,這也正好符合了分布式計(jì)算的適用場(chǎng)景(日志分析,索引建立就是兩大應(yīng)用場(chǎng)景)。
當(dāng)前沒有正式確定使用,所以也是自己業(yè)余摸索,后續(xù)所寫的相關(guān)內(nèi)容,都是一個(gè)新手的學(xué)習(xí)過程,難免會(huì)有一些錯(cuò)誤,只是希望記錄下來可以分享給更多志同道合的朋友。
搞什么東西之前,第一步是要知道What,然后是Why,最后才是How,但很多開發(fā)的朋友在做了多年項(xiàng)目以后,都習(xí)慣是先How,然后What,最后才是Why,這樣只會(huì)變得浮躁,同時(shí)往往會(huì)將技術(shù)誤用不適合的場(chǎng)景。
Hadoop框架中最核心設(shè)計(jì)就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的,簡(jiǎn)單的一句話解釋MapReduce就是任務(wù)的分解與結(jié)果的匯總。HDFS是Hadoop分布式文件系統(tǒng)的縮寫,為分布式計(jì)算存儲(chǔ)提供了底層支持。
MapReduce從它名字上來看就大致可以看出個(gè)緣由,兩個(gè)動(dòng)詞Map,Reduce,Map(展開)就是將一個(gè)任務(wù)分解成為多個(gè)任務(wù),Reduce就是將分解后多任務(wù)處理的結(jié)果匯總起來,得出最后的分析結(jié)果。這不是什么新思想,其實(shí)在前面提到了多線程,多任務(wù)的設(shè)計(jì)就可以找到這種思想的影子。不論是現(xiàn)實(shí)社會(huì),還是在程序設(shè)計(jì)中,一項(xiàng)工作往往可以被拆分成為多個(gè)任務(wù),任務(wù)之間的關(guān)系可以分為兩種:一種是不相關(guān)的任務(wù),可以并行執(zhí)行;另一種是任務(wù)之間有相互的依賴,先后順序不能夠顛倒,這類任務(wù)是無法并行處理的?;氐竭^去,大學(xué)老師上課時(shí)讓大家去分析關(guān)鍵路徑,無非就是找最省時(shí)的任務(wù)分解執(zhí)行方式。在分布式系統(tǒng)中,機(jī)器集群就可以看作硬件資源池,將并行的任務(wù)拆分交由每一個(gè)空閑機(jī)器資源去處理,能夠極大地提高計(jì)算效率,同時(shí)這種資源無關(guān)性,對(duì)于計(jì)算集群的擴(kuò)展無疑提供了最好的設(shè)計(jì)保證。(其實(shí)我一直認(rèn)為Hadoop的卡通圖標(biāo)不應(yīng)該是一個(gè)小象,應(yīng)該是螞蟻,分布式計(jì)算就好比螞蟻吃大象,廉價(jià)的機(jī)器群可以匹敵任何高性能的計(jì)算機(jī),縱向擴(kuò)展的曲線始終敵不過橫向擴(kuò)展的斜線)。任務(wù)分解處理以后,那就需要將處理以后的結(jié)果在匯總起來,這就是Reduce要做的工作。
上圖就是MapReduce大致的結(jié)構(gòu)圖,在Map前還可能會(huì)對(duì)輸入的數(shù)據(jù)有split的過程,保證任務(wù)并行效率,在Map之后還會(huì)有shuffle的過程,對(duì)于提高Reduce的效率以及減小數(shù)據(jù)傳輸?shù)膲毫τ泻艽蟮膸椭:竺鏁?huì)具體提及這些部分的細(xì)節(jié)。
HDFS是分布式計(jì)算的存儲(chǔ)基石,Hadoop的分布式文件系統(tǒng)和其他分布式文件系統(tǒng)有很多類似的特質(zhì)。
分布式文件系統(tǒng)基本的幾個(gè)特點(diǎn):
1. 對(duì)于整個(gè)集群有單一的命名空間。
2. 數(shù)據(jù)一致性。適合一次寫入多次讀取的模型,客戶端在文件沒有被成功創(chuàng)建之前是無法看到文件存在。
3. 文件會(huì)被分割成多個(gè)文件塊,每個(gè)文件塊被分配存儲(chǔ)到數(shù)據(jù)節(jié)點(diǎn)上,而且根據(jù)配置會(huì)有復(fù)制文件塊來保證數(shù)據(jù)的安全性。
圖 2 HDFS
上圖中展現(xiàn)了整個(gè)HDFS三個(gè)重要角色:NameNode,DataNode,Client。
NameNode可以看作是分布式文件系統(tǒng)中的管理者,主要負(fù)責(zé)管理文件系統(tǒng)的命名空間,集群配置信息,存儲(chǔ)塊的復(fù)制。NameNode會(huì)存儲(chǔ)文件系統(tǒng)的Meta-data在內(nèi)存中,這些信息主要包括了文件信息,每一個(gè)文件對(duì)應(yīng)的文件塊的信息,每一個(gè)文件塊在DataNode的信息。
DataNode是文件存儲(chǔ)的基本單元。它存儲(chǔ)Block在本地文件系統(tǒng)中,保存了Block的Meta-data,同時(shí)周期性的發(fā)送所有存在的block的報(bào)告給NameNode。
Client就是需要獲取分布式文件系統(tǒng)文件的應(yīng)用程序。
這里通過三個(gè)操作來說明他們之間的交互關(guān)系。
文件寫入:
1. Client向NameNode發(fā)起文件寫入的請(qǐng)求。
2. NameNode根據(jù)文件大小和文件塊配置情況,返回給Client它所管理部分DataNode的信息。
3. Client將文件劃分為多個(gè)Block,根據(jù)DataNode的地址信息,按順序?qū)懭氲矫恳粋€(gè)DataNode塊中。
文件讀?。?/font>
1. Client向NameNode發(fā)起文件讀取的請(qǐng)求。
2. NameNode返回文件存儲(chǔ)的DataNode的信息。
3. Client讀取文件信息。
文件Block復(fù)制:
1. NameNode發(fā)現(xiàn)部分文件的block不符合最小復(fù)制數(shù)或者部分DataNode失效。
2. 通知DataNode相互復(fù)制Block。
3. DataNode開始直接相互復(fù)制。
最后在說一下HDFS的幾個(gè)設(shè)計(jì)特點(diǎn):(對(duì)于框架設(shè)計(jì)值得借鑒)
1. Block的放置
默認(rèn)不配置,一個(gè)Block會(huì)有三份備份。一份放在NameNode指定的DataNode,另一份放在與指定DataNode非同一Rack上的DataNode,最后一份放在與指定DataNode同一Rack上的DataNode上。備份無非就是為了數(shù)據(jù)安全,考慮同一Rack的失敗情況以及不同Rack之間數(shù)據(jù)拷貝性能問題就采用這種配置方式。
2. 心跳檢測(cè)DataNode的健康狀況,如果發(fā)現(xiàn)問題就采取數(shù)據(jù)備份的方式來保證數(shù)據(jù)的安全性。
3. 數(shù)據(jù)復(fù)制。(DataNode失敗的時(shí)候,需要平衡DataNode的存儲(chǔ)利用率的時(shí)候,需要平衡DataNode數(shù)據(jù)交互壓力的時(shí)候)
這里先說一下,使用HDFS的balancer命令,可以配置一個(gè)Threshold來平衡每一個(gè)DataNode磁盤利用率。例如設(shè)置了Threshold為10%,那么執(zhí)行balancer命令的時(shí)候,首先統(tǒng)計(jì)所有DataNode的磁盤利用率的均值,然后判斷如果某一個(gè)DataNode的磁盤利用率超過這個(gè)均值Threshold以上,那么將會(huì)把這個(gè)DataNode的block轉(zhuǎn)移到磁盤利用率低的DataNode,這對(duì)于新節(jié)點(diǎn)的加入來說十分有用。
4. 數(shù)據(jù)交驗(yàn)。采用CRC32作數(shù)據(jù)交驗(yàn)。在文件Block寫入的時(shí)候除了寫入數(shù)據(jù)還會(huì)寫入交驗(yàn)信息,在讀取的時(shí)候需要交驗(yàn)后再讀入。
5. NameNode是單點(diǎn)。如果失敗的話,任務(wù)處理信息將會(huì)紀(jì)錄在本地文件系統(tǒng)和遠(yuǎn)端的文件系統(tǒng)中。
6. 數(shù)據(jù)管道性的寫入。
當(dāng)客戶端要寫入文件到DataNode上,首先客戶端讀取一個(gè)Block然后寫到第一個(gè)DataNode上,然后由第一個(gè)DataNode傳遞到備份的DataNode上,一直到所有需要寫入這個(gè)Block的NataNode都成功寫入,客戶端才會(huì)繼續(xù)開始寫下一個(gè)Block。
7. 安全模式。
在分布式文件系統(tǒng)啟動(dòng)的時(shí)候,開始的時(shí)候會(huì)有安全模式,當(dāng)分布式文件系統(tǒng)處于安全模式的情況下,文件系統(tǒng)中的內(nèi)容不允許修改也不允許刪除,直到安全模式結(jié)束。安全模式主要是為了系統(tǒng)啟動(dòng)的時(shí)候檢查各個(gè)DataNode上數(shù)據(jù)塊的有效性,同時(shí)根據(jù)策略必要的復(fù)制或者刪除部分?jǐn)?shù)據(jù)塊。運(yùn)行期通過命令也可以進(jìn)入安全模式。在實(shí)踐過程中,系統(tǒng)啟動(dòng)的時(shí)候去修改和刪除文件也會(huì)有安全模式不允許修改的出錯(cuò)提示,只需要等待一會(huì)兒即可。
綜合MapReduce和HDFS來看Hadoop的結(jié)構(gòu):
在Hadoop的系統(tǒng)中,會(huì)有一臺(tái)Master,主要負(fù)責(zé)NameNode的工作以及JobTracker的工作。JobTracker是的主要職責(zé)就是啟動(dòng),跟蹤,調(diào)度各個(gè)Slave的任務(wù)執(zhí)行。還會(huì)有多臺(tái)Slave,每一臺(tái)Slave通常具有DataNode的功能以及TaskTracker的工作。TaskTracker根據(jù)應(yīng)用要求來結(jié)合本地?cái)?shù)據(jù)執(zhí)行Map任務(wù)以及Reduce任務(wù)。
說到這里,就要提到分布式計(jì)算的最重要的一個(gè)設(shè)計(jì)點(diǎn):Moving Computation is Cheaper than Moving Data。就是在分布式處理中,移動(dòng)數(shù)據(jù)的代價(jià)總是高于轉(zhuǎn)移計(jì)算的代價(jià)。簡(jiǎn)單來說就是分而治之的工作,需要將數(shù)據(jù)也分而存儲(chǔ),本地任務(wù)處理本地?cái)?shù)據(jù)然后歸總,這樣才會(huì)保證分布式計(jì)算的高效性。
說完了What,簡(jiǎn)單的說一下Why。官方網(wǎng)站已經(jīng)給了很多的說明,這里就大致說一下其優(yōu)點(diǎn)及使用的場(chǎng)景(沒有不好的工具,只用不適用的工具,因此選擇好場(chǎng)景才能夠真正發(fā)揮分布式計(jì)算的作用)
1. 可擴(kuò)展。不論是存儲(chǔ)的可擴(kuò)展還是計(jì)算的可擴(kuò)展都是Hadoop的設(shè)計(jì)根本。
2. 經(jīng)濟(jì)??蚣芸梢赃\(yùn)行在任何普通的PC上。
3. 可靠。分布式文件系統(tǒng)的備份恢復(fù)機(jī)制以及MapReduce的任務(wù)監(jiān)控保證了分布式處理的可靠性。
4. 高效。分布式文件系統(tǒng)的高效數(shù)據(jù)交互實(shí)現(xiàn)以及MapReduce結(jié)合Local Data處理的模式,為高效處理海量的信息作了基礎(chǔ)準(zhǔn)備。
使用場(chǎng)景:個(gè)人覺得最適合的就是海量數(shù)據(jù)的分析,其實(shí)Google最早提出MapReduce也就是為了海量數(shù)據(jù)分析。同時(shí)HDFS最早是為了搜索引擎實(shí)現(xiàn)而開發(fā)的,后來才被用于分布式計(jì)算框架中。
海量數(shù)據(jù)被分割于多個(gè)節(jié)點(diǎn),然后由每一個(gè)節(jié)點(diǎn)并行計(jì)算,將得出結(jié)果歸并到輸出。同時(shí)第一階段的輸出又可以作為下一階段計(jì)算的輸入,因此可以想象到一個(gè)樹狀結(jié)構(gòu)的分布式計(jì)算圖,在不同階段都有不同產(chǎn)出,同時(shí)并行和串行結(jié)合的計(jì)算也可以很好的在分布式集群的資源下得以高效的處理。
其實(shí)參看Hadoop官方文檔已經(jīng)能夠很容易配置分布式框架運(yùn)行環(huán)境了,不過這里既然寫了就再多寫一點(diǎn),同時(shí)有一些細(xì)節(jié)需要注意的也說一下,其實(shí)也就是這些細(xì)節(jié)會(huì)讓人摸索半天。
Hadoop可以單機(jī)跑,也可以配置集群跑,單機(jī)跑就不需要多說了,只需要按照Demo的運(yùn)行說明直接執(zhí)行命令即可。這里主要重點(diǎn)說一下集群配置運(yùn)行的過程。
7臺(tái)普通的機(jī)器,操作系統(tǒng)都是linux。內(nèi)存和CPU就不說了,反正Hadoop一大特點(diǎn)就是機(jī)器在多不在精。JDK必須是1.5以上的,這個(gè)切記。7臺(tái)機(jī)器的機(jī)器名務(wù)必不同,后續(xù)會(huì)談到機(jī)器名對(duì)于MapReduce有很大的影響。
正如上面我描述的,對(duì)于Hadoop的集群來說,可以分成兩大類角色,Master和Slave,前者主要配置NameNode和JobTracker的角色,負(fù)責(zé)總管分布式數(shù)據(jù)和分解任務(wù)的執(zhí)行,后者配置DataNode和TaskTracker的角色,負(fù)責(zé)分布式數(shù)據(jù)存儲(chǔ)以及任務(wù)的執(zhí)行。本來打算一臺(tái)機(jī)器是否可以配置成為Master同時(shí)也是Slave,不過發(fā)現(xiàn)在NameNode初始化的過程中以及TaskTracker執(zhí)行過程中機(jī)器名配置好像有沖突(NameNode和TaskTracker對(duì)于Hosts的配置有些沖突,究竟是把機(jī)器名對(duì)應(yīng)IP放在配置前面還是把Localhost對(duì)應(yīng)IP放在前面有點(diǎn)問題,不過可能也是我自己的問題吧,這個(gè)大家可以根據(jù)實(shí)施情況給我反饋)。最后反正決定一臺(tái)Master,六臺(tái)Slave,后續(xù)復(fù)雜的應(yīng)用開發(fā)和測(cè)試結(jié)果的比對(duì)會(huì)增加機(jī)器配置。
1. 在所有的機(jī)器上都建立相同的目錄,也可以就建立相同的用戶,以該用戶的home路徑來做hadoop的安裝路徑。例如我在所有的機(jī)器上都建立了/home/wenchu。
2. 下載Hadoop,先解壓到Master上。這里我是下載的
3. 解壓后進(jìn)入conf目錄,主要需要修改以下文件:hadoop-env.sh,hadoop-site.xml,masters,slaves。
Hadoop的基礎(chǔ)配置文件是hadoop-default.xml,看Hadoop的代碼可以知道,默認(rèn)建立一個(gè)Job的時(shí)候會(huì)建立Job的Config,Config首先讀入hadoop-default.xml的配置,然后再讀入hadoop-site.xml的配置(這個(gè)文件初始的時(shí)候配置為空),hadoop-site.xml中主要配置你需要覆蓋的hadoop-default.xml的系統(tǒng)級(jí)配置,以及你需要在你的MapReduce過程中使用的自定義配置(具體的一些使用例如final等參考文檔)。
以下是一個(gè)簡(jiǎn)單的hadoop-site.xml的配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>//你的namenode的配置,機(jī)器名加端口
<value>hdfs://10.2.224.46:54310/</value>
</property>
<property>
<name>mapred.job.tracker</name>//你的JobTracker的配置,機(jī)器名加端口
<value>hdfs://10.2.224.46:54311/</value>
</property>
<property>
<name>dfs.replication</name>//數(shù)據(jù)需要備份的數(shù)量,默認(rèn)是三
<value>1</value>
</property>
<property>
<name>hadoop.tmp.dir</name>//Hadoop的默認(rèn)臨時(shí)路徑,這個(gè)最好配置,然后在新增節(jié)點(diǎn)或者其他情況下莫名其妙的DataNode啟動(dòng)不了,就刪除此文件中的tmp目錄即可。不過如果刪除了NameNode機(jī)器的此目錄,那么就需要重新執(zhí)行NameNode格式化的命令了。
<value>/home/wenchu/hadoop/tmp/</value>
</property>
<property>
<name>mapred.child.java.opts</name>//java虛擬機(jī)的一些參數(shù)可以參照配置
<value>-Xmx
</property>
<property>
<name>dfs.block.size</name>//block的大小,單位字節(jié),后面會(huì)提到用處,必須是512的倍數(shù),因?yàn)椴捎?/span>crc作文件完整性交驗(yàn),默認(rèn)配置512是checksum的最小單元。
<value>5120000</value>
<description>The default block size for new files.</description>
</property>
</configuration>
hadoop-env.sh文件只需要修改一個(gè)參數(shù):
# The java implementation to use. Required.
export JAVA_HOME=/usr/ali/jdk
配置你的Java路徑,記住一定要1.5版本以上,免得莫名其妙出現(xiàn)問題。
Masters中配置Masters的ip或者機(jī)器名,如果是機(jī)器名那么需要在/etc/hosts中有所設(shè)置。
Slaves中配置的是Slaves的ip或者機(jī)器名,同樣如果是機(jī)器名需要在/etc/hosts中有所設(shè)置。
范例如下:我這里配置的都是ip.
Masters:
10.2.224.46
Slaves:
10.2.226.40
10.2.226.39
10.2.226.38
10.2.226.37
10.2.226.41
10.2.224.36
4. 建立Master到每一臺(tái)Slave的ssh受信證書。由于Master將會(huì)通過SSH啟動(dòng)所有的Slave的Hadoop,所以需要建立單向或者雙向證書保證命令執(zhí)行時(shí)不需要再輸入密碼。Master和所有的Slave機(jī)器上執(zhí)行:ssh-keygen -t rsa。執(zhí)行此命令的時(shí)候,看到提示只需要回車。然后就會(huì)在/root/.ssh/下面產(chǎn)生id_rsa.pub的證書文件,通過scp將Master機(jī)器上的這個(gè)文件拷貝到Slave上(記得修改名稱),例如:scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa.pub,然后執(zhí)行cat /root/.ssh/46_rsa.pub >>/root/.ssh/authorized_keys,建立authorized_keys文件即可,可以打開這個(gè)文件看看,也就是rsa的公鑰作為key,user@IP作為value。此時(shí)可以試驗(yàn)一下,從master ssh到slave已經(jīng)不需要密碼了。由slave反向建立也是同樣,為什么要反向呢,其實(shí)如果一直都是Master啟動(dòng)和關(guān)閉的話那么沒有必要建立反向,只是如果想在Slave也可以關(guān)閉Hadoop就需要建立反向。
5. 將Master上的Hadoop通過scp拷貝到每一個(gè)Slave相同的目錄下,根據(jù)每一個(gè)Slave的Java_HOME的不同修改其hadoop-env.sh。
6. 修改Master上/etc/profile:
新增以下內(nèi)容:具體的內(nèi)容根據(jù)你的安裝路徑修改,這步只是為了方便使用
export HADOOP_HOME=/home/wenchu/hadoop-
export PATH=$PATH:$HADOOP_HOME/bin
修改完執(zhí)行 source /etc/profile來使得其生效。
7. 在Master上執(zhí)行Hadoop namenode –format,這是第一需要做的初始化,可以看作格式化吧,以后除了在上面我提到過刪除了Master上的hadoop.tmp.dir目錄,否則是不需要再次執(zhí)行的。
8. 然后執(zhí)行Master上的start-all.sh,這個(gè)命令可以直接執(zhí)行,因?yàn)樵?/font>6已經(jīng)添加到了path路徑了,這個(gè)命令是啟動(dòng)hdfs和mapreduce兩部分,當(dāng)然你也可以分開單獨(dú)啟動(dòng)hdfs和mapreduce,分別是bin目錄下的start-dfs.sh和start-mapred.sh。
9. 檢查Master的logs目錄看看Namenode日志以及JobTracker日志是否正常啟動(dòng)。
10. 檢查Slave的logs目錄看看Datanode日志以及TaskTracker日志是否正常。
11. 如果需要關(guān)閉,那么就直接執(zhí)行stop-all.sh即可。
以上步驟就可以啟動(dòng)Hadoop的分布式環(huán)境,然后在Master的機(jī)器進(jìn)入Master的安裝目錄,執(zhí)行hadoop jar hadoop-
hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。其中/home/wenchu/test-in是本地路徑,test-in是將會(huì)建立在HDFS中的路徑,執(zhí)行完畢以后可以通過hadoop dfs –ls可以看到test-in目錄已經(jīng)存在,同時(shí)可以通過hadoop dfs –ls test-in看來里面的內(nèi)容。輸出路徑要求是在HDFS中不存在的,當(dāng)執(zhí)行完那個(gè)demo以后,就可以通過hadoop dfs –ls 輸出路徑看到其中的內(nèi)容,具體文件的內(nèi)容可以通過hadoop dfs –cat 文件名稱來查看。
注意事項(xiàng):這部分是我在使用過程中花了一些時(shí)間走的彎路
1. Master和Slave上的幾個(gè)conf配置文件不需要全部同步,如果確定都是通過Master去啟動(dòng)和關(guān)閉,那么Slave機(jī)器上的配置不需要去維護(hù)。但如果希望在任意一臺(tái)機(jī)器都可以啟動(dòng)和關(guān)閉Hadoop,那么就需要全部保持一致了。
2. Master和Slave機(jī)器上的/etc/hosts中必須把集群中機(jī)器都配置上去,就算在各個(gè)配置文件中使用的是ip。這個(gè)吃過不少苦頭,原來以為如果配成ip就不需要去配置host,結(jié)果發(fā)現(xiàn)在執(zhí)行Reduce的時(shí)候總是卡住,在拷貝的時(shí)候就無法繼續(xù)下去,不斷重試。另外如果集群中如果有兩臺(tái)機(jī)器的機(jī)器名如果重復(fù)也會(huì)出現(xiàn)問題。
3. 如果在新增了節(jié)點(diǎn)或者刪除節(jié)點(diǎn)的時(shí)候出現(xiàn)了問題,首先就去刪除Slave的hadoop.tmp.dir,然后重新啟動(dòng)試試看,如果還是不行那就干脆把Master的hadoop.tmp.dir刪除(意味著dfs上的數(shù)據(jù)也會(huì)丟失),如果刪除了Master的hadoop.tmp.dir那么就需要重新namenode –format了。
4. Map任務(wù)個(gè)數(shù)以及Reduce任務(wù)個(gè)數(shù)配置。前面分布式文件系統(tǒng)設(shè)計(jì)提到一個(gè)文件被放入到分布式文件系統(tǒng)中,會(huì)被分割成多個(gè)block放置到每一個(gè)的DataNode上,默認(rèn)dfs.block.size應(yīng)該是
總的來說出了問題或者啟動(dòng)的時(shí)候最好去看看日志,這樣心里有底。
這部分內(nèi)容其實(shí)可以通過命令的Help以及介紹了解,我主要側(cè)重于介紹一下我用的比較多的幾個(gè)命令。
Hadoop dfs 這個(gè)命令后面加參數(shù)就是對(duì)于HDFS的操作,和linux操作系統(tǒng)的命令很類似,例如:
Hadoop dfs –ls 就是查看/usr/root目錄下的內(nèi)容,默認(rèn)如果不填路徑這就是當(dāng)前用戶路徑
Hadoop dfs –rmr xxx就是刪除目錄,還有很多命令看看就很容易上手
Hadoop dfsadmin –report 這個(gè)命令可以全局的查看DataNode的情況。
Hadoop job 后面增加參數(shù)是對(duì)于當(dāng)前運(yùn)行的Job的操作,例如list,kill等
Hadoop balancer就是前面提到的均衡磁盤負(fù)載的命令。
其他就不詳細(xì)介紹了。
1. 分布式環(huán)境中客戶端創(chuàng)建任務(wù)并提交。
2. InputFormat做Map前的預(yù)處理,主要負(fù)責(zé)以下工作:
a) 驗(yàn)證輸入的格式是否符合JobConfig的輸入定義,這個(gè)在實(shí)現(xiàn)Map和構(gòu)建Conf的時(shí)候就會(huì)知道,不定義可以是Writable的任意子類。
b) 將input的文件split為邏輯上的輸入InputSplit,其實(shí)這就是在上面提到的在分布式文件系統(tǒng)中blocksize是有大小限制的,因此大文件會(huì)被劃分為多個(gè)block。
c) 通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據(jù)文件中的信息來切分還需要RecordReader來實(shí)現(xiàn),例如最簡(jiǎn)單的默認(rèn)方式就是回車換行的切分)
3. RecordReader處理后的結(jié)果作為Map的輸入,Map執(zhí)行定義的Map邏輯,輸出處理后的key,value對(duì)到臨時(shí)中間文件。
4. Combiner可選擇配置,主要作用是在每一個(gè)Map執(zhí)行完分析以后,在本地優(yōu)先作Reduce的工作,減少在Reduce過程中的數(shù)據(jù)傳輸量。
5. Partitioner可選擇配置,主要作用是在多個(gè)Reduce的情況下,指定Map的結(jié)果由某一個(gè)Reduce處理,每一個(gè)Reduce都會(huì)有單獨(dú)的輸出文件。(后面的代碼實(shí)例中有介紹使用場(chǎng)景)
6. Reduce執(zhí)行具體的業(yè)務(wù)邏輯,并且將處理結(jié)果輸出給OutputFormat。
7. OutputFormat的職責(zé)是,驗(yàn)證輸出目錄是否已經(jīng)存在,同時(shí)驗(yàn)證輸出結(jié)果類型是否如Config中配置,最后輸出Reduce匯總后的結(jié)果。
業(yè)務(wù)場(chǎng)景描述:
可設(shè)定輸入和輸出路徑(操作系統(tǒng)的路徑非HDFS路徑),根據(jù)訪問日志分析某一個(gè)應(yīng)用訪問某一個(gè)API的總次數(shù)和總流量,統(tǒng)計(jì)后分別輸出到兩個(gè)文件中。
僅僅為了測(cè)試,因此沒有去細(xì)分很多類,將所有的類都?xì)w并于一個(gè)類便于說明問題。
LogAnalysiser就是主類,主要負(fù)責(zé)創(chuàng)建,提交任務(wù),并且輸出部分信息。內(nèi)部的幾個(gè)子類用途可以參看流程中提到的角色職責(zé)。具體的看看幾個(gè)類和方法的代碼片斷:
LogAnalysiser::MapClass
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, LongWritable>
{
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();//沒有配置RecordReader,所以默認(rèn)采用line的實(shí)現(xiàn),key就是行號(hào),value就是行內(nèi)容
if (line == null || line.equals(""))
return;
String[] words = line.split(",");
if (words == null || words.length < 8)
return;
String appid = words[1];
String apiName = words[2];
LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
Text record = new Text();
record.set(new StringBuffer("flow::").append(appid)
.append("::").append(apiName).toString());
reporter.progress();
output.collect(record, recbytes);//輸出流量的統(tǒng)計(jì)結(jié)果,通過flow::作為前綴來標(biāo)示。
record.clear();
record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
output.collect(record, new LongWritable(1));//輸出次數(shù)的統(tǒng)計(jì)結(jié)果,通過count::作為前綴來標(biāo)示
}
}
LogAnalysiser:: PartitionerClass
public static class PartitionerClass implements Partitioner<Text, LongWritable>
{
public int getPartition(Text key, LongWritable value, int numPartitions)
{
if (numPartitions >= 2)//Reduce 個(gè)數(shù),判斷流量還是次數(shù)的統(tǒng)計(jì)分配到不同的Reduce
if (key.toString().startsWith("flow::"))
return 0;
else
return 1;
else
return 0;
}
public void configure(JobConf job){}
}
LogAnalysiser:: CombinerClass
參看ReduceClass,通常兩者可以使用一個(gè),不過這里有些不同的處理就分成了兩個(gè)。在ReduceClass中藍(lán)色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
public static class ReduceClass extends MapReduceBase
implements Reducer<Text, LongWritable,Text, LongWritable>
{
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
{
Text newkey = new Text();
newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
LongWritable result = new LongWritable();
long tmp = 0;
int counter = 0;
while(values.hasNext())//累加同一個(gè)key的統(tǒng)計(jì)結(jié)果
{
tmp = tmp + values.next().get();
counter = counter +1;//擔(dān)心處理太久,JobTracker長時(shí)間沒有收到報(bào)告會(huì)認(rèn)為TaskTracker已經(jīng)失效,因此定時(shí)報(bào)告一下
if (counter == 1000)
{
counter = 0;
reporter.progress();
}
}
result.set(tmp);
output.collect(newkey, result);//輸出最后的匯總結(jié)果
}
}
LogAnalysiser
public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();
if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn‘t dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}
JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構(gòu)建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統(tǒng)的文件拷貝到HDFS中
conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會(huì)檢查
conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會(huì)檢查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//強(qiáng)制需要有兩個(gè)Reduce來分別處理流量和次數(shù)的統(tǒng)計(jì)
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//刪除輸入和輸出的臨時(shí)文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}
以上的代碼就完成了所有的邏輯性代碼,然后還需要一個(gè)注冊(cè)驅(qū)動(dòng)類來注冊(cè)業(yè)務(wù)Class為一個(gè)可標(biāo)示的命令,讓hadoop jar可以執(zhí)行。
public class ExampleDriver {
public static void main(String argv[]){
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
pgd.driver(argv);
}
catch(Throwable e){
e.printStackTrace();
}
}
}
將代碼打成jar,并且設(shè)置jar的mainClass為ExampleDriver這個(gè)類。
在分布式環(huán)境啟動(dòng)以后執(zhí)行如下語句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
在/home/wenchu/test-in中是需要分析的日志文件,執(zhí)行后就會(huì)看見整個(gè)執(zhí)行過程,包括了Map,Reduce的進(jìn)度。執(zhí)行完畢會(huì)在/home/wenchu/test-out下看到輸出的內(nèi)容。有兩個(gè)文件:part-00000和part-00001分別記錄了統(tǒng)計(jì)后的結(jié)果。
如果需要看執(zhí)行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce的創(chuàng)建情況以及執(zhí)行情況。
在運(yùn)行期也可以通過瀏覽器來查看Map,Reduce的情況:
http://MasterIP:50030/jobtracker.jsp
首先這里使用上面的范例作為測(cè)試,也沒有做太多的優(yōu)化配置,這個(gè)測(cè)試結(jié)果只是為了看看集群的效果,以及一些參數(shù)配置的影響。
文件復(fù)制數(shù)為1,blocksize
Slave數(shù) | 處理記錄數(shù)(萬條) | 執(zhí)行時(shí)間(秒) |
2 | 95 | 38 |
2 | 950 | 337 |
4 | 95 | 24 |
4 | 950 | 178 |
6 | 95 | 21 |
6 | 950 | 114 |
Blocksize
Slave數(shù) | 處理記錄數(shù)(萬條) | 執(zhí)行時(shí)間(秒) |
2(文件復(fù)制數(shù)為1) | 950 | 337 |
2(文件復(fù)制數(shù)為3) | 950 | 339 |
6(文件復(fù)制數(shù)為1) | 950 | 114 |
6(文件復(fù)制數(shù)為3) | 950 | 117 |
文件復(fù)制數(shù)為1
Slave數(shù) | 處理記錄數(shù)(萬條) | 執(zhí)行時(shí)間(秒) |
6(blocksize | 95 | 21 |
6(blocksize | 95 | 26 |
4(blocksize | 950 | 178 |
4(blocksize | 950 | 54 |
6(blocksize | 950 | 114 |
6(blocksize | 950 | 44 |
6(blocksize | 950 | 74 |
測(cè)試的數(shù)據(jù)結(jié)果很穩(wěn)定,基本測(cè)幾次同樣條件下都是一樣。
測(cè)試結(jié)果可以看出一下幾點(diǎn):
1. 機(jī)器數(shù)對(duì)于性能還是有幫助的(等于沒說^_^)。
2. 文件復(fù)制數(shù)的增加只對(duì)安全性有幫助,但是對(duì)于性能沒有太多幫助。而且現(xiàn)在采取的是將操作系統(tǒng)文件拷貝到HDFS中,所以備份多了,準(zhǔn)備的時(shí)間很長。
3. blocksize對(duì)于性能影響很大,首先如果將block劃分的太小,那么將會(huì)增加job的數(shù)量,同時(shí)也增加了協(xié)作的代價(jià),降低了性能,但是配置的太大也會(huì)讓job不能最大化并行處理。所以這個(gè)值的配置需要根據(jù)數(shù)據(jù)處理的量來考慮。
4. 最后就是除了這個(gè)表里面列出來的結(jié)果,應(yīng)該去仔細(xì)看輸出目錄中的_logs/history中的xxx_analysisjob這個(gè)文件,里面記錄了全部的執(zhí)行過程以及讀寫情況。這個(gè)可以更加清楚地了解哪里可能會(huì)更加耗時(shí)。
“云計(jì)算”熱的燙手,就和SAAS,Web2,SNS等等一樣,往往都是在搞概念,只有真正踏踏實(shí)實(shí)的那些大型的互聯(lián)網(wǎng)公司,才會(huì)投入人力物力去研究符合自己的分布式計(jì)算。其實(shí)當(dāng)你的數(shù)據(jù)量沒有那么大的時(shí)候,這種分布式計(jì)算也就僅僅只是一個(gè)玩具而已,真正只有解決問題的過程中,它深層次的問題才會(huì)被挖掘出來。
這篇文章僅僅是為了給對(duì)分布式計(jì)算有興趣的朋友拋個(gè)磚,要想真的掘到金子,那么就踏踏實(shí)實(shí)的去用,去想,去分析。后續(xù)自己也會(huì)更進(jìn)一步的去研究框架中的實(shí)現(xiàn)機(jī)制,在解決自己?jiǎn)栴}的同時(shí),也能夠貢獻(xiàn)一些什么。
前幾日看到有人跪求成為架構(gòu)師的方式,看了有些可悲,有些可笑,其實(shí)有多少架構(gòu)師知道什么叫做架構(gòu),架構(gòu)師的職責(zé)是什么,與其追求這么一個(gè)名號(hào),還不如踏踏實(shí)實(shí)的作塊石頭沉到水底,積累和沉淀的過程就是一種成長。
聯(lián)系客服