国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
分布式計(jì)算開源框架Hadoop入門實(shí)踐

 分布式計(jì)算開源框架Hadoop入門實(shí)踐收藏

 | 舊一篇: 休假

Author :岑文初

Email: wenchu.cenwc@alibaba-inc.com

msn: cenwenchu_79@hotmail.com

blog: http://blog.csdn.net/cenwenchu79/

 

... 2

What is Hadoop. 2

Why is hadoop. 6

How to Use Hadoop & Tips 7

環(huán)境:... 7

部署考慮:... 7

實(shí)施步驟:... 7

Hadoop Command. 10

Hadoop基本流程以及簡(jiǎn)單應(yīng)用的開發(fā)... 11

基本流程:... 11

代碼范例:... 13

Hadoop集群測(cè)試... 18

隨想... 19

 


       SIP項(xiàng)目設(shè)計(jì)的過程中,對(duì)于它龐大的日志在早先就考慮使用任務(wù)分解的多線程處理模式來分析統(tǒng)計(jì),在前面有一篇Blog中提到了那部分的設(shè)計(jì),但是由于統(tǒng)計(jì)的內(nèi)容暫時(shí)還是十分簡(jiǎn)單,所以就采用Memcache作為計(jì)數(shù)器結(jié)合Mysql完成了訪問控制以及統(tǒng)計(jì)的工作。但未來,對(duì)于海量日志分析的工作,還是需要有所準(zhǔn)備?,F(xiàn)在最火的技術(shù)詞匯莫過于“云計(jì)算”,在Open API日益盛行的今天,互聯(lián)網(wǎng)應(yīng)用的數(shù)據(jù)將會(huì)越來越有價(jià)值,如何去分析這些數(shù)據(jù),挖掘其內(nèi)在價(jià)值,就需要分布式計(jì)算來支撐起海量數(shù)據(jù)的分析工作。

       回過頭來看,早先那種多線程,多任務(wù)分解的日志分析設(shè)計(jì),其實(shí)是分布式計(jì)算的一個(gè)單機(jī)版縮略,如何將這種單機(jī)的工作分拆,變成集群工作協(xié)同,其實(shí)就是分布式計(jì)算框架設(shè)計(jì)所涉及的。在去年參加BEA的大會(huì)時(shí)候,BEAVMWare合作采用虛擬機(jī)來構(gòu)建集群,無非就是希望使得計(jì)算機(jī)硬件能夠類似于應(yīng)用程序中的資源池中的資源,使用者無需關(guān)心資源的分配情況,最大化了硬件資源的使用價(jià)值。分布式計(jì)算也是如此,具體的計(jì)算任務(wù)交由哪一臺(tái)機(jī)器執(zhí)行,執(zhí)行后由誰來匯總,這都由分布式框架的Master來抉擇,而使用者只需簡(jiǎn)單的將待分析內(nèi)容的提供給分布式計(jì)算系統(tǒng)作為輸入,就可以得到分布式計(jì)算后的結(jié)果。    HadoopApache開源組織的一個(gè)分布式計(jì)算開源框架,在很多大型網(wǎng)站上都已經(jīng)得到了應(yīng)用,亞馬遜,Facebook,Yahoo等等。對(duì)于我來說,最近的一個(gè)使用點(diǎn)就是服務(wù)集成平臺(tái)的日志分析,服務(wù)集成平臺(tái)的日志量將會(huì)很大,這也正好符合了分布式計(jì)算的適用場(chǎng)景(日志分析,索引建立就是兩大應(yīng)用場(chǎng)景)。

       當(dāng)前沒有正式確定使用,所以也是自己業(yè)余摸索,后續(xù)所寫的相關(guān)內(nèi)容,都是一個(gè)新手的學(xué)習(xí)過程,難免會(huì)有一些錯(cuò)誤,只是希望記錄下來可以分享給更多志同道合的朋友。

 

What is Hadoop

       搞什么東西之前,第一步是要知道What,然后是Why,最后才是How,但很多開發(fā)的朋友在做了多年項(xiàng)目以后,都習(xí)慣是先How,然后What,最后才是Why,這樣只會(huì)變得浮躁,同時(shí)往往會(huì)將技術(shù)誤用不適合的場(chǎng)景。

       Hadoop框架中最核心設(shè)計(jì)就是:MapReduceHDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的,簡(jiǎn)單的一句話解釋MapReduce就是任務(wù)的分解與結(jié)果的匯總。HDFSHadoop分布式文件系統(tǒng)的縮寫,為分布式計(jì)算存儲(chǔ)提供了底層支持。

       MapReduce從它名字上來看就大致可以看出個(gè)緣由,兩個(gè)動(dòng)詞Map,Reduce,Map(展開)就是將一個(gè)任務(wù)分解成為多個(gè)任務(wù),Reduce就是將分解后多任務(wù)處理的結(jié)果匯總起來,得出最后的分析結(jié)果。這不是什么新思想,其實(shí)在前面提到了多線程,多任務(wù)的設(shè)計(jì)就可以找到這種思想的影子。不論是現(xiàn)實(shí)社會(huì),還是在程序設(shè)計(jì)中,一項(xiàng)工作往往可以被拆分成為多個(gè)任務(wù),任務(wù)之間的關(guān)系可以分為兩種:一種是不相關(guān)的任務(wù),可以并行執(zhí)行;另一種是任務(wù)之間有相互的依賴,先后順序不能夠顛倒,這類任務(wù)是無法并行處理的?;氐竭^去,大學(xué)老師上課時(shí)讓大家去分析關(guān)鍵路徑,無非就是找最省時(shí)的任務(wù)分解執(zhí)行方式。在分布式系統(tǒng)中,機(jī)器集群就可以看作硬件資源池,將并行的任務(wù)拆分交由每一個(gè)空閑機(jī)器資源去處理,能夠極大地提高計(jì)算效率,同時(shí)這種資源無關(guān)性,對(duì)于計(jì)算集群的擴(kuò)展無疑提供了最好的設(shè)計(jì)保證。(其實(shí)我一直認(rèn)為Hadoop的卡通圖標(biāo)不應(yīng)該是一個(gè)小象,應(yīng)該是螞蟻,分布式計(jì)算就好比螞蟻吃大象,廉價(jià)的機(jī)器群可以匹敵任何高性能的計(jì)算機(jī),縱向擴(kuò)展的曲線始終敵不過橫向擴(kuò)展的斜線)。任務(wù)分解處理以后,那就需要將處理以后的結(jié)果在匯總起來,這就是Reduce要做的工作。

 

1 MapReduce

       上圖就是MapReduce大致的結(jié)構(gòu)圖,在Map前還可能會(huì)對(duì)輸入的數(shù)據(jù)有split的過程,保證任務(wù)并行效率,在Map之后還會(huì)有shuffle的過程,對(duì)于提高Reduce的效率以及減小數(shù)據(jù)傳輸?shù)膲毫τ泻艽蟮膸椭:竺鏁?huì)具體提及這些部分的細(xì)節(jié)。

 

       HDFS是分布式計(jì)算的存儲(chǔ)基石,Hadoop的分布式文件系統(tǒng)和其他分布式文件系統(tǒng)有很多類似的特質(zhì)。

       分布式文件系統(tǒng)基本的幾個(gè)特點(diǎn):

1.       對(duì)于整個(gè)集群有單一的命名空間。

2.       數(shù)據(jù)一致性。適合一次寫入多次讀取的模型,客戶端在文件沒有被成功創(chuàng)建之前是無法看到文件存在。

3.       文件會(huì)被分割成多個(gè)文件塊,每個(gè)文件塊被分配存儲(chǔ)到數(shù)據(jù)節(jié)點(diǎn)上,而且根據(jù)配置會(huì)有復(fù)制文件塊來保證數(shù)據(jù)的安全性。

 

2 HDFS

       上圖中展現(xiàn)了整個(gè)HDFS三個(gè)重要角色:NameNode,DataNode,Client。

NameNode可以看作是分布式文件系統(tǒng)中的管理者,主要負(fù)責(zé)管理文件系統(tǒng)的命名空間,集群配置信息,存儲(chǔ)塊的復(fù)制。NameNode會(huì)存儲(chǔ)文件系統(tǒng)的Meta-data在內(nèi)存中,這些信息主要包括了文件信息,每一個(gè)文件對(duì)應(yīng)的文件塊的信息,每一個(gè)文件塊在DataNode的信息。

DataNode是文件存儲(chǔ)的基本單元。它存儲(chǔ)Block在本地文件系統(tǒng)中,保存了BlockMeta-data,同時(shí)周期性的發(fā)送所有存在的block的報(bào)告給NameNode

Client就是需要獲取分布式文件系統(tǒng)文件的應(yīng)用程序。

這里通過三個(gè)操作來說明他們之間的交互關(guān)系。

 

文件寫入:

1.       ClientNameNode發(fā)起文件寫入的請(qǐng)求。

2.       NameNode根據(jù)文件大小和文件塊配置情況,返回給Client它所管理部分DataNode的信息。

3.       Client將文件劃分為多個(gè)Block,根據(jù)DataNode的地址信息,按順序?qū)懭氲矫恳粋€(gè)DataNode塊中。

 

文件讀?。?/font>

1.       ClientNameNode發(fā)起文件讀取的請(qǐng)求。

2.       NameNode返回文件存儲(chǔ)的DataNode的信息。

3.       Client讀取文件信息。

 

文件Block復(fù)制:

1.       NameNode發(fā)現(xiàn)部分文件的block不符合最小復(fù)制數(shù)或者部分DataNode失效。

2.       通知DataNode相互復(fù)制Block。

3.       DataNode開始直接相互復(fù)制。

 

最后在說一下HDFS的幾個(gè)設(shè)計(jì)特點(diǎn):(對(duì)于框架設(shè)計(jì)值得借鑒)

1.  Block的放置

默認(rèn)不配置,一個(gè)Block會(huì)有三份備份。一份放在NameNode指定的DataNode,另一份放在與指定DataNode非同一Rack上的DataNode,最后一份放在與指定DataNode同一Rack上的DataNode上。備份無非就是為了數(shù)據(jù)安全,考慮同一Rack的失敗情況以及不同Rack之間數(shù)據(jù)拷貝性能問題就采用這種配置方式。

 

2.  心跳檢測(cè)DataNode的健康狀況,如果發(fā)現(xiàn)問題就采取數(shù)據(jù)備份的方式來保證數(shù)據(jù)的安全性。

 

3.  數(shù)據(jù)復(fù)制。(DataNode失敗的時(shí)候,需要平衡DataNode的存儲(chǔ)利用率的時(shí)候,需要平衡DataNode數(shù)據(jù)交互壓力的時(shí)候)

這里先說一下,使用HDFSbalancer命令,可以配置一個(gè)Threshold來平衡每一個(gè)DataNode磁盤利用率。例如設(shè)置了Threshold10%,那么執(zhí)行balancer命令的時(shí)候,首先統(tǒng)計(jì)所有DataNode的磁盤利用率的均值,然后判斷如果某一個(gè)DataNode的磁盤利用率超過這個(gè)均值Threshold以上,那么將會(huì)把這個(gè)DataNodeblock轉(zhuǎn)移到磁盤利用率低的DataNode,這對(duì)于新節(jié)點(diǎn)的加入來說十分有用。

 

4.  數(shù)據(jù)交驗(yàn)。采用CRC32作數(shù)據(jù)交驗(yàn)。在文件Block寫入的時(shí)候除了寫入數(shù)據(jù)還會(huì)寫入交驗(yàn)信息,在讀取的時(shí)候需要交驗(yàn)后再讀入。

5.  NameNode是單點(diǎn)。如果失敗的話,任務(wù)處理信息將會(huì)紀(jì)錄在本地文件系統(tǒng)和遠(yuǎn)端的文件系統(tǒng)中。

6.  數(shù)據(jù)管道性的寫入。

當(dāng)客戶端要寫入文件到DataNode上,首先客戶端讀取一個(gè)Block然后寫到第一個(gè)DataNode上,然后由第一個(gè)DataNode傳遞到備份的DataNode上,一直到所有需要寫入這個(gè)BlockNataNode都成功寫入,客戶端才會(huì)繼續(xù)開始寫下一個(gè)Block

7.  安全模式。

在分布式文件系統(tǒng)啟動(dòng)的時(shí)候,開始的時(shí)候會(huì)有安全模式,當(dāng)分布式文件系統(tǒng)處于安全模式的情況下,文件系統(tǒng)中的內(nèi)容不允許修改也不允許刪除,直到安全模式結(jié)束。安全模式主要是為了系統(tǒng)啟動(dòng)的時(shí)候檢查各個(gè)DataNode上數(shù)據(jù)塊的有效性,同時(shí)根據(jù)策略必要的復(fù)制或者刪除部分?jǐn)?shù)據(jù)塊。運(yùn)行期通過命令也可以進(jìn)入安全模式。在實(shí)踐過程中,系統(tǒng)啟動(dòng)的時(shí)候去修改和刪除文件也會(huì)有安全模式不允許修改的出錯(cuò)提示,只需要等待一會(huì)兒即可。

 

       綜合MapReduceHDFS來看Hadoop的結(jié)構(gòu):

 

 

3 Hadoop

       Hadoop的系統(tǒng)中,會(huì)有一臺(tái)Master,主要負(fù)責(zé)NameNode的工作以及JobTracker的工作。JobTracker是的主要職責(zé)就是啟動(dòng),跟蹤,調(diào)度各個(gè)Slave的任務(wù)執(zhí)行。還會(huì)有多臺(tái)Slave,每一臺(tái)Slave通常具有DataNode的功能以及TaskTracker的工作。TaskTracker根據(jù)應(yīng)用要求來結(jié)合本地?cái)?shù)據(jù)執(zhí)行Map任務(wù)以及Reduce任務(wù)。

       說到這里,就要提到分布式計(jì)算的最重要的一個(gè)設(shè)計(jì)點(diǎn):Moving Computation is Cheaper than Moving Data。就是在分布式處理中,移動(dòng)數(shù)據(jù)的代價(jià)總是高于轉(zhuǎn)移計(jì)算的代價(jià)。簡(jiǎn)單來說就是分而治之的工作,需要將數(shù)據(jù)也分而存儲(chǔ),本地任務(wù)處理本地?cái)?shù)據(jù)然后歸總,這樣才會(huì)保證分布式計(jì)算的高效性。


Why is hadoop

       說完了What,簡(jiǎn)單的說一下Why。官方網(wǎng)站已經(jīng)給了很多的說明,這里就大致說一下其優(yōu)點(diǎn)及使用的場(chǎng)景(沒有不好的工具,只用不適用的工具,因此選擇好場(chǎng)景才能夠真正發(fā)揮分布式計(jì)算的作用)

1.  可擴(kuò)展。不論是存儲(chǔ)的可擴(kuò)展還是計(jì)算的可擴(kuò)展都是Hadoop的設(shè)計(jì)根本。

2.  經(jīng)濟(jì)??蚣芸梢赃\(yùn)行在任何普通的PC上。

3.  可靠。分布式文件系統(tǒng)的備份恢復(fù)機(jī)制以及MapReduce的任務(wù)監(jiān)控保證了分布式處理的可靠性。

4.  高效。分布式文件系統(tǒng)的高效數(shù)據(jù)交互實(shí)現(xiàn)以及MapReduce結(jié)合Local Data處理的模式,為高效處理海量的信息作了基礎(chǔ)準(zhǔn)備。

 

使用場(chǎng)景:個(gè)人覺得最適合的就是海量數(shù)據(jù)的分析,其實(shí)Google最早提出MapReduce也就是為了海量數(shù)據(jù)分析。同時(shí)HDFS最早是為了搜索引擎實(shí)現(xiàn)而開發(fā)的,后來才被用于分布式計(jì)算框架中。

海量數(shù)據(jù)被分割于多個(gè)節(jié)點(diǎn),然后由每一個(gè)節(jié)點(diǎn)并行計(jì)算,將得出結(jié)果歸并到輸出。同時(shí)第一階段的輸出又可以作為下一階段計(jì)算的輸入,因此可以想象到一個(gè)樹狀結(jié)構(gòu)的分布式計(jì)算圖,在不同階段都有不同產(chǎn)出,同時(shí)并行和串行結(jié)合的計(jì)算也可以很好的在分布式集群的資源下得以高效的處理。

 


How to Use Hadoop & Tips

其實(shí)參看Hadoop官方文檔已經(jīng)能夠很容易配置分布式框架運(yùn)行環(huán)境了,不過這里既然寫了就再多寫一點(diǎn),同時(shí)有一些細(xì)節(jié)需要注意的也說一下,其實(shí)也就是這些細(xì)節(jié)會(huì)讓人摸索半天。

Hadoop可以單機(jī)跑,也可以配置集群跑,單機(jī)跑就不需要多說了,只需要按照Demo的運(yùn)行說明直接執(zhí)行命令即可。這里主要重點(diǎn)說一下集群配置運(yùn)行的過程。

 

環(huán)境:

7臺(tái)普通的機(jī)器,操作系統(tǒng)都是linux。內(nèi)存和CPU就不說了,反正Hadoop一大特點(diǎn)就是機(jī)器在多不在精。JDK必須是1.5以上的,這個(gè)切記。7臺(tái)機(jī)器的機(jī)器名務(wù)必不同,后續(xù)會(huì)談到機(jī)器名對(duì)于MapReduce有很大的影響。

 

部署考慮:

正如上面我描述的,對(duì)于Hadoop的集群來說,可以分成兩大類角色,MasterSlave,前者主要配置NameNodeJobTracker的角色,負(fù)責(zé)總管分布式數(shù)據(jù)和分解任務(wù)的執(zhí)行,后者配置DataNodeTaskTracker的角色,負(fù)責(zé)分布式數(shù)據(jù)存儲(chǔ)以及任務(wù)的執(zhí)行。本來打算一臺(tái)機(jī)器是否可以配置成為Master同時(shí)也是Slave,不過發(fā)現(xiàn)在NameNode初始化的過程中以及TaskTracker執(zhí)行過程中機(jī)器名配置好像有沖突(NameNodeTaskTracker對(duì)于Hosts的配置有些沖突,究竟是把機(jī)器名對(duì)應(yīng)IP放在配置前面還是把Localhost對(duì)應(yīng)IP放在前面有點(diǎn)問題,不過可能也是我自己的問題吧,這個(gè)大家可以根據(jù)實(shí)施情況給我反饋)。最后反正決定一臺(tái)Master,六臺(tái)Slave,后續(xù)復(fù)雜的應(yīng)用開發(fā)和測(cè)試結(jié)果的比對(duì)會(huì)增加機(jī)器配置。

 

實(shí)施步驟:

1.  在所有的機(jī)器上都建立相同的目錄,也可以就建立相同的用戶,以該用戶的home路徑來做hadoop的安裝路徑。例如我在所有的機(jī)器上都建立了/home/wenchu

2.  下載Hadoop,先解壓到Master上。這里我是下載的0.17.1的版本。此時(shí)Hadoop的安裝路徑就是/home/wenchu/hadoop-0.17.1

3.  解壓后進(jìn)入conf目錄,主要需要修改以下文件:hadoop-env.shhadoop-site.xml,masters,slaves。

Hadoop的基礎(chǔ)配置文件是hadoop-default.xml,看Hadoop的代碼可以知道,默認(rèn)建立一個(gè)Job的時(shí)候會(huì)建立JobConfig,Config首先讀入hadoop-default.xml的配置,然后再讀入hadoop-site.xml的配置(這個(gè)文件初始的時(shí)候配置為空),hadoop-site.xml中主要配置你需要覆蓋的hadoop-default.xml的系統(tǒng)級(jí)配置,以及你需要在你的MapReduce過程中使用的自定義配置(具體的一些使用例如final等參考文檔)。

 

以下是一個(gè)簡(jiǎn)單的hadoop-site.xml的配置:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

   <name>fs.default.name</name>//你的namenode的配置,機(jī)器名加端口

   <value>hdfs://10.2.224.46:54310/</value>

</property>

<property>

   <name>mapred.job.tracker</name>//你的JobTracker的配置,機(jī)器名加端口

   <value>hdfs://10.2.224.46:54311/</value>

</property>

<property>

   <name>dfs.replication</name>//數(shù)據(jù)需要備份的數(shù)量,默認(rèn)是三

   <value>1</value>

</property>

<property>

    <name>hadoop.tmp.dir</name>//Hadoop的默認(rèn)臨時(shí)路徑,這個(gè)最好配置,然后在新增節(jié)點(diǎn)或者其他情況下莫名其妙的DataNode啟動(dòng)不了,就刪除此文件中的tmp目錄即可。不過如果刪除了NameNode機(jī)器的此目錄,那么就需要重新執(zhí)行NameNode格式化的命令了。

    <value>/home/wenchu/hadoop/tmp/</value>

</property>

<property>

   <name>mapred.child.java.opts</name>//java虛擬機(jī)的一些參數(shù)可以參照配置

   <value>-Xmx512m</value>

</property>

<property>

  <name>dfs.block.size</name>//block的大小,單位字節(jié),后面會(huì)提到用處,必須是512的倍數(shù),因?yàn)椴捎?/span>crc作文件完整性交驗(yàn),默認(rèn)配置512checksum的最小單元。

  <value>5120000</value>

  <description>The default block size for new files.</description>

</property>

</configuration>

 

hadoop-env.sh文件只需要修改一個(gè)參數(shù):

# The java implementation to use.  Required.

export JAVA_HOME=/usr/ali/jdk1.5.0_10

配置你的Java路徑,記住一定要1.5版本以上,免得莫名其妙出現(xiàn)問題。

 

Masters中配置Mastersip或者機(jī)器名,如果是機(jī)器名那么需要在/etc/hosts中有所設(shè)置。

Slaves中配置的是Slavesip或者機(jī)器名,同樣如果是機(jī)器名需要在/etc/hosts中有所設(shè)置。

范例如下:我這里配置的都是ip.

Masters:

10.2.224.46

 

Slaves:

10.2.226.40

10.2.226.39

10.2.226.38

10.2.226.37

10.2.226.41

10.2.224.36

 

4.  建立Master到每一臺(tái)Slavessh受信證書。由于Master將會(huì)通過SSH啟動(dòng)所有的SlaveHadoop,所以需要建立單向或者雙向證書保證命令執(zhí)行時(shí)不需要再輸入密碼。Master和所有的Slave機(jī)器上執(zhí)行:ssh-keygen -t rsa。執(zhí)行此命令的時(shí)候,看到提示只需要回車。然后就會(huì)在/root/.ssh/下面產(chǎn)生id_rsa.pub的證書文件,通過scpMaster機(jī)器上的這個(gè)文件拷貝到Slave上(記得修改名稱),例如:scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa.pub,然后執(zhí)行cat /root/.ssh/46_rsa.pub >>/root/.ssh/authorized_keys,建立authorized_keys文件即可,可以打開這個(gè)文件看看,也就是rsa的公鑰作為key,user@IP作為value。此時(shí)可以試驗(yàn)一下,從master sshslave已經(jīng)不需要密碼了。由slave反向建立也是同樣,為什么要反向呢,其實(shí)如果一直都是Master啟動(dòng)和關(guān)閉的話那么沒有必要建立反向,只是如果想在Slave也可以關(guān)閉Hadoop就需要建立反向。

5.  Master上的Hadoop通過scp拷貝到每一個(gè)Slave相同的目錄下,根據(jù)每一個(gè)SlaveJava_HOME的不同修改其hadoop-env.sh

6.  修改Master/etc/profile

新增以下內(nèi)容:具體的內(nèi)容根據(jù)你的安裝路徑修改,這步只是為了方便使用

export HADOOP_HOME=/home/wenchu/hadoop-0.17.1

export PATH=$PATH:$HADOOP_HOME/bin

 

修改完執(zhí)行 source /etc/profile來使得其生效。

7.  Master上執(zhí)行Hadoop namenode –format,這是第一需要做的初始化,可以看作格式化吧,以后除了在上面我提到過刪除了Master上的hadoop.tmp.dir目錄,否則是不需要再次執(zhí)行的。

8.  然后執(zhí)行Master上的start-all.sh,這個(gè)命令可以直接執(zhí)行,因?yàn)樵?/font>6已經(jīng)添加到了path路徑了,這個(gè)命令是啟動(dòng)hdfsmapreduce兩部分,當(dāng)然你也可以分開單獨(dú)啟動(dòng)hdfsmapreduce,分別是bin目錄下的start-dfs.shstart-mapred.sh。

9.  檢查Masterlogs目錄看看Namenode日志以及JobTracker日志是否正常啟動(dòng)。

10.              檢查Slavelogs目錄看看Datanode日志以及TaskTracker日志是否正常。

11.              如果需要關(guān)閉,那么就直接執(zhí)行stop-all.sh即可。

 

以上步驟就可以啟動(dòng)Hadoop的分布式環(huán)境,然后在Master的機(jī)器進(jìn)入Master的安裝目錄,執(zhí)行hadoop jar hadoop-0.17.1-examples.jar wordcount 輸入路徑 輸出路徑,就可以看到字?jǐn)?shù)統(tǒng)計(jì)的效果了。此處的輸入路徑和輸出路徑都指的是HDFS中的路徑,因此你可以首先通過拷貝本地文件系統(tǒng)中的目錄到HDFS中的方式來建立HDFS中的輸入路徑:

hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。其中/home/wenchu/test-in是本地路徑,test-in是將會(huì)建立在HDFS中的路徑,執(zhí)行完畢以后可以通過hadoop dfs –ls可以看到test-in目錄已經(jīng)存在,同時(shí)可以通過hadoop dfs –ls test-in看來里面的內(nèi)容。輸出路徑要求是在HDFS中不存在的,當(dāng)執(zhí)行完那個(gè)demo以后,就可以通過hadoop dfs –ls 輸出路徑看到其中的內(nèi)容,具體文件的內(nèi)容可以通過hadoop dfs –cat 文件名稱來查看。

 

注意事項(xiàng):這部分是我在使用過程中花了一些時(shí)間走的彎路

1.  MasterSlave上的幾個(gè)conf配置文件不需要全部同步,如果確定都是通過Master去啟動(dòng)和關(guān)閉,那么Slave機(jī)器上的配置不需要去維護(hù)。但如果希望在任意一臺(tái)機(jī)器都可以啟動(dòng)和關(guān)閉Hadoop,那么就需要全部保持一致了。

2.  MasterSlave機(jī)器上的/etc/hosts中必須把集群中機(jī)器都配置上去,就算在各個(gè)配置文件中使用的是ip。這個(gè)吃過不少苦頭,原來以為如果配成ip就不需要去配置host,結(jié)果發(fā)現(xiàn)在執(zhí)行Reduce的時(shí)候總是卡住,在拷貝的時(shí)候就無法繼續(xù)下去,不斷重試。另外如果集群中如果有兩臺(tái)機(jī)器的機(jī)器名如果重復(fù)也會(huì)出現(xiàn)問題。

3.  如果在新增了節(jié)點(diǎn)或者刪除節(jié)點(diǎn)的時(shí)候出現(xiàn)了問題,首先就去刪除Slavehadoop.tmp.dir,然后重新啟動(dòng)試試看,如果還是不行那就干脆把Masterhadoop.tmp.dir刪除(意味著dfs上的數(shù)據(jù)也會(huì)丟失),如果刪除了Masterhadoop.tmp.dir那么就需要重新namenode –format了。

4.  Map任務(wù)個(gè)數(shù)以及Reduce任務(wù)個(gè)數(shù)配置。前面分布式文件系統(tǒng)設(shè)計(jì)提到一個(gè)文件被放入到分布式文件系統(tǒng)中,會(huì)被分割成多個(gè)block放置到每一個(gè)的DataNode上,默認(rèn)dfs.block.size應(yīng)該是64M,也就是說如果你放置到HDFS上的數(shù)據(jù)小于64,那么將只有一個(gè)Block,此時(shí)會(huì)被放置到某一個(gè)DataNode中,這個(gè)可以通過使用命令:hadoop dfsadmin –report就可以看到各個(gè)節(jié)點(diǎn)存儲(chǔ)的情況。也可以直接去某一個(gè)DataNode查看目錄:hadoop.tmp.dir/dfs/data/current就可以看到那些block了。Block的數(shù)量將會(huì)直接影響到Map的個(gè)數(shù)。當(dāng)然可以通過配置來設(shè)定MapReduce的任務(wù)個(gè)數(shù)。Map的個(gè)數(shù)通常默認(rèn)和HDFS需要處理的blocks相同。也可以通過配置Map的數(shù)量或者配置minimum split size來設(shè)定,實(shí)際的個(gè)數(shù)為:max(min(block_size,data/#maps),min_split_size)。Reduce可以通過這個(gè)公式計(jì)算:0.95*num_nodes*mapred.tasktracker.tasks.maximum。

 

總的來說出了問題或者啟動(dòng)的時(shí)候最好去看看日志,這樣心里有底。

 

Hadoop Command

       這部分內(nèi)容其實(shí)可以通過命令的Help以及介紹了解,我主要側(cè)重于介紹一下我用的比較多的幾個(gè)命令。

Hadoop dfs 這個(gè)命令后面加參數(shù)就是對(duì)于HDFS的操作,和linux操作系統(tǒng)的命令很類似,例如:

Hadoop dfs –ls 就是查看/usr/root目錄下的內(nèi)容,默認(rèn)如果不填路徑這就是當(dāng)前用戶路徑

Hadoop dfs –rmr xxx就是刪除目錄,還有很多命令看看就很容易上手

 

Hadoop dfsadmin –report 這個(gè)命令可以全局的查看DataNode的情況。

Hadoop job 后面增加參數(shù)是對(duì)于當(dāng)前運(yùn)行的Job的操作,例如list,kill

 

Hadoop balancer就是前面提到的均衡磁盤負(fù)載的命令。

 

其他就不詳細(xì)介紹了。

 

Hadoop基本流程以及簡(jiǎn)單應(yīng)用的開發(fā)

基本流程:

 

 

 

 

 

 

 

 

一個(gè)圖片太大了,只好分割成為兩部分。根據(jù)流程圖來說一下具體的一個(gè)任務(wù)執(zhí)行的情況。

 

1.  分布式環(huán)境中客戶端創(chuàng)建任務(wù)并提交。

2.  InputFormatMap前的預(yù)處理,主要負(fù)責(zé)以下工作:

a)         驗(yàn)證輸入的格式是否符合JobConfig的輸入定義,這個(gè)在實(shí)現(xiàn)Map和構(gòu)建Conf的時(shí)候就會(huì)知道,不定義可以是Writable的任意子類。

b)        input的文件split為邏輯上的輸入InputSplit,其實(shí)這就是在上面提到的在分布式文件系統(tǒng)中blocksize是有大小限制的,因此大文件會(huì)被劃分為多個(gè)block。

c)         通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據(jù)文件中的信息來切分還需要RecordReader來實(shí)現(xiàn),例如最簡(jiǎn)單的默認(rèn)方式就是回車換行的切分)

 

3.  RecordReader處理后的結(jié)果作為Map的輸入,Map執(zhí)行定義的Map邏輯,輸出處理后的key,value對(duì)到臨時(shí)中間文件。

4.  Combiner可選擇配置,主要作用是在每一個(gè)Map執(zhí)行完分析以后,在本地優(yōu)先作Reduce的工作,減少在Reduce過程中的數(shù)據(jù)傳輸量。

5.  Partitioner可選擇配置,主要作用是在多個(gè)Reduce的情況下,指定Map的結(jié)果由某一個(gè)Reduce處理,每一個(gè)Reduce都會(huì)有單獨(dú)的輸出文件。(后面的代碼實(shí)例中有介紹使用場(chǎng)景)

6.  Reduce執(zhí)行具體的業(yè)務(wù)邏輯,并且將處理結(jié)果輸出給OutputFormat。

7.  OutputFormat的職責(zé)是,驗(yàn)證輸出目錄是否已經(jīng)存在,同時(shí)驗(yàn)證輸出結(jié)果類型是否如Config中配置,最后輸出Reduce匯總后的結(jié)果。

 

 

 

代碼范例:

 

業(yè)務(wù)場(chǎng)景描述:

           可設(shè)定輸入和輸出路徑(操作系統(tǒng)的路徑非HDFS路徑),根據(jù)訪問日志分析某一個(gè)應(yīng)用訪問某一個(gè)API的總次數(shù)和總流量,統(tǒng)計(jì)后分別輸出到兩個(gè)文件中。

 

僅僅為了測(cè)試,因此沒有去細(xì)分很多類,將所有的類都?xì)w并于一個(gè)類便于說明問題。

 

 

 

 

 

4 測(cè)試代碼類圖

       LogAnalysiser就是主類,主要負(fù)責(zé)創(chuàng)建,提交任務(wù),并且輸出部分信息。內(nèi)部的幾個(gè)子類用途可以參看流程中提到的角色職責(zé)。具體的看看幾個(gè)類和方法的代碼片斷:

 

LogAnalysiser::MapClass

         public static class MapClass extends MapReduceBase

       implements Mapper<LongWritable, Text, Text, LongWritable>

         {

                   public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)

                                     throws IOException

                   {       

                            String line = value.toString();//沒有配置RecordReader,所以默認(rèn)采用line的實(shí)現(xiàn),key就是行號(hào),value就是行內(nèi)容

                            if (line == null || line.equals(""))

                                     return;

                            String[] words = line.split(",");

                            if (words == null || words.length < 8)

                                     return;

                            String appid = words[1];

                            String apiName = words[2];

                            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));

                            Text record = new Text();

                            record.set(new StringBuffer("flow::").append(appid)

                                                                 .append("::").append(apiName).toString());

                            reporter.progress();

                            output.collect(record, recbytes);//輸出流量的統(tǒng)計(jì)結(jié)果,通過flow::作為前綴來標(biāo)示。

                            record.clear();

                            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());

                            output.collect(record, new LongWritable(1));//輸出次數(shù)的統(tǒng)計(jì)結(jié)果,通過count::作為前綴來標(biāo)示

                   }       

         }

 

LogAnalysiser:: PartitionerClass

    public static class PartitionerClass implements Partitioner<Text, LongWritable>

      {

           public int getPartition(Text key, LongWritable value, int numPartitions)

           {

                 if (numPartitions >= 2)//Reduce 個(gè)數(shù),判斷流量還是次數(shù)的統(tǒng)計(jì)分配到不同的Reduce

                      if (key.toString().startsWith("flow::"))

                            return 0;

                      else

                            return 1;

                 else

                      return 0;

           }

           public void configure(JobConf job){}  

}

LogAnalysiser:: CombinerClass

參看ReduceClass,通常兩者可以使用一個(gè),不過這里有些不同的處理就分成了兩個(gè)。在ReduceClass中藍(lán)色的行表示在CombinerClass中不存在。

 

LogAnalysiser:: ReduceClass

    public static class ReduceClass extends MapReduceBase

           implements Reducer<Text, LongWritable,Text, LongWritable>

      {

           public void reduce(Text key, Iterator<LongWritable> values,

                      OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException

           {

                 Text newkey = new Text();

                 newkey.set(key.toString().substring(key.toString().indexOf("::")+2));

                 LongWritable result = new LongWritable();

                 long tmp = 0;

                 int counter = 0;

                 while(values.hasNext())//累加同一個(gè)key的統(tǒng)計(jì)結(jié)果

                 {

                      tmp = tmp + values.next().get();

                     

                      counter = counter +1;//擔(dān)心處理太久,JobTracker長時(shí)間沒有收到報(bào)告會(huì)認(rèn)為TaskTracker已經(jīng)失效,因此定時(shí)報(bào)告一下

                      if (counter == 1000)

                      {

                            counter = 0;

                            reporter.progress();

                      }

                 }

                 result.set(tmp);

                 output.collect(newkey, result);//輸出最后的匯總結(jié)果

           }    

      }

 

LogAnalysiser

      public static void main(String[] args)

      {

           try

           {

                 run(args);

           } catch (Exception e)

           {

                 e.printStackTrace();

           }

      }

    public static void run(String[] args) throws Exception

      {

           if (args == null || args.length <2)

           {

                 System.out.println("need inputpath and outputpath");

                 return;

           }

           String inputpath = args[0];

           String outputpath = args[1];

           String shortin = args[0];

           String shortout = args[1];

           if (shortin.indexOf(File.separator) >= 0)

                 shortin = shortin.substring(shortin.lastIndexOf(File.separator));

           if (shortout.indexOf(File.separator) >= 0)

                 shortout = shortout.substring(shortout.lastIndexOf(File.separator));

           SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");

           shortout = new StringBuffer(shortout).append("-")

                 .append(formater.format(new Date())).toString();

          

          

           if (!shortin.startsWith("/"))

                 shortin = "/" + shortin;

           if (!shortout.startsWith("/"))

                 shortout = "/" + shortout;

           shortin = "/user/root" + shortin;

           shortout = "/user/root" + shortout;              

           File inputdir = new File(inputpath);

           File outputdir = new File(outputpath);

           if (!inputdir.exists() || !inputdir.isDirectory())

           {

                 System.out.println("inputpath not exist or isn‘t dir!");

                 return;

           }

           if (!outputdir.exists())

           {

                 new File(outputpath).mkdirs();

           }

          

           JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構(gòu)建Config

           FileSystem fileSys = FileSystem.get(conf);

           fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統(tǒng)的文件拷貝到HDFS

 

           conf.setJobName("analysisjob");

           conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會(huì)檢查

           conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會(huì)檢查

           conf.setMapperClass(MapClass.class);

           conf.setCombinerClass(CombinerClass.class);

           conf.setReducerClass(ReduceClass.class);

           conf.setPartitionerClass(PartitionerClass.class);

           conf.set("mapred.reduce.tasks", "2");//強(qiáng)制需要有兩個(gè)Reduce來分別處理流量和次數(shù)的統(tǒng)計(jì)

           FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑

           FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑

          

           Date startTime = new Date();

            System.out.println("Job started: " + startTime);

            JobClient.runJob(conf);

            Date end_time = new Date();

            System.out.println("Job ended: " + end_time);

            System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");

            //刪除輸入和輸出的臨時(shí)文件

           fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));

           fileSys.delete(new Path(shortin),true);

           fileSys.delete(new Path(shortout),true);

      }

以上的代碼就完成了所有的邏輯性代碼,然后還需要一個(gè)注冊(cè)驅(qū)動(dòng)類來注冊(cè)業(yè)務(wù)Class為一個(gè)可標(biāo)示的命令,讓hadoop jar可以執(zhí)行。

public class ExampleDriver {

  public static void main(String argv[]){

    ProgramDriver pgd = new ProgramDriver();

    try {

      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");

      pgd.driver(argv);

    }

    catch(Throwable e){

      e.printStackTrace();

    }

  }

}

將代碼打成jar,并且設(shè)置jarmainClassExampleDriver這個(gè)類。

 

在分布式環(huán)境啟動(dòng)以后執(zhí)行如下語句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out

 

/home/wenchu/test-in中是需要分析的日志文件,執(zhí)行后就會(huì)看見整個(gè)執(zhí)行過程,包括了Map,Reduce的進(jìn)度。執(zhí)行完畢會(huì)在/home/wenchu/test-out下看到輸出的內(nèi)容。有兩個(gè)文件:part-00000part-00001分別記錄了統(tǒng)計(jì)后的結(jié)果。

       如果需要看執(zhí)行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce的創(chuàng)建情況以及執(zhí)行情況。

 

在運(yùn)行期也可以通過瀏覽器來查看Map,Reduce的情況:

http://MasterIP:50030/jobtracker.jsp

 

 


Hadoop集群測(cè)試

       首先這里使用上面的范例作為測(cè)試,也沒有做太多的優(yōu)化配置,這個(gè)測(cè)試結(jié)果只是為了看看集群的效果,以及一些參數(shù)配置的影響。

 

文件復(fù)制數(shù)為1,blocksize 5M

Slave數(shù)

處理記錄數(shù)(萬條)

執(zhí)行時(shí)間(秒)

2

95

38

2

950

337

4

95

24

4

950

178

6

95

21

6

950

114

 

Blocksize 5M

Slave數(shù)

處理記錄數(shù)(萬條)

執(zhí)行時(shí)間(秒)

2(文件復(fù)制數(shù)為1

950

337

2(文件復(fù)制數(shù)為3

950

339

6(文件復(fù)制數(shù)為1

950

114

6(文件復(fù)制數(shù)為3

950

117

 

文件復(fù)制數(shù)為1

 

Slave數(shù)

處理記錄數(shù)(萬條)

執(zhí)行時(shí)間(秒)

6(blocksize 5M)

95

21

6(blocksize 77M)

95

26

4(blocksize 5M)

950

178

4(blocksize 50M)

950

54

6(blocksize 5M)

950

114

6(blocksize 50M)

950

44

6(blocksize 77M)

950

74

 

測(cè)試的數(shù)據(jù)結(jié)果很穩(wěn)定,基本測(cè)幾次同樣條件下都是一樣。

測(cè)試結(jié)果可以看出一下幾點(diǎn):

1.       機(jī)器數(shù)對(duì)于性能還是有幫助的(等于沒說^_^)。

2.       文件復(fù)制數(shù)的增加只對(duì)安全性有幫助,但是對(duì)于性能沒有太多幫助。而且現(xiàn)在采取的是將操作系統(tǒng)文件拷貝到HDFS中,所以備份多了,準(zhǔn)備的時(shí)間很長。

3.       blocksize對(duì)于性能影響很大,首先如果將block劃分的太小,那么將會(huì)增加job的數(shù)量,同時(shí)也增加了協(xié)作的代價(jià),降低了性能,但是配置的太大也會(huì)讓job不能最大化并行處理。所以這個(gè)值的配置需要根據(jù)數(shù)據(jù)處理的量來考慮。

4.       最后就是除了這個(gè)表里面列出來的結(jié)果,應(yīng)該去仔細(xì)看輸出目錄中的_logs/history中的xxx_analysisjob這個(gè)文件,里面記錄了全部的執(zhí)行過程以及讀寫情況。這個(gè)可以更加清楚地了解哪里可能會(huì)更加耗時(shí)。

 

 

隨想

       “云計(jì)算”熱的燙手,就和SAASWeb2,SNS等等一樣,往往都是在搞概念,只有真正踏踏實(shí)實(shí)的那些大型的互聯(lián)網(wǎng)公司,才會(huì)投入人力物力去研究符合自己的分布式計(jì)算。其實(shí)當(dāng)你的數(shù)據(jù)量沒有那么大的時(shí)候,這種分布式計(jì)算也就僅僅只是一個(gè)玩具而已,真正只有解決問題的過程中,它深層次的問題才會(huì)被挖掘出來。

       這篇文章僅僅是為了給對(duì)分布式計(jì)算有興趣的朋友拋個(gè)磚,要想真的掘到金子,那么就踏踏實(shí)實(shí)的去用,去想,去分析。后續(xù)自己也會(huì)更進(jìn)一步的去研究框架中的實(shí)現(xiàn)機(jī)制,在解決自己?jiǎn)栴}的同時(shí),也能夠貢獻(xiàn)一些什么。

       前幾日看到有人跪求成為架構(gòu)師的方式,看了有些可悲,有些可笑,其實(shí)有多少架構(gòu)師知道什么叫做架構(gòu),架構(gòu)師的職責(zé)是什么,與其追求這么一個(gè)名號(hào),還不如踏踏實(shí)實(shí)的作塊石頭沉到水底,積累和沉淀的過程就是一種成長。

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
生活服務(wù)
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服