Audience and Pre-Requisites
Serial vs. Parallel Programming
The Basics
What is MapReduce?
MapReduce Execution Overview
MapReduce Examples
References
--------------------------------------------------------------------------------
Audience and Pre-Requisites
This tutorial covers the basics of parallel programming and the MapReduce programming model. The pre-requisites are significant programming experience with a language such as C++ or Java, and familiarity with data structures and algorithms.
Serial vs. Parallel Programming
In the early days of computing, programs were serial, that is, a program consisted of a sequence of instructions, where each instruction executed one after the other. It ran from start to finish on a single processor.
Parallel programming developed as a means of improving performance and efficiency. In a parallel program, the processing is broken up into parts, each of which can be executed concurrently. The instructions from each part run simultaneously on different CPUs. These CPUs can exist on a single machine, or they can be CPUs in a set of computers connected via a network.
Not only are parallel programs faster, they can also be used to solve problems on large datasets using non-local resources. When you have a set of computers connected on a network, you have a vast pool of CPUs, and you often have the ability to read and write very large files (assuming a distributed file system is also in place).
The Basics
The first step in building a parallel program is identifying sets of tasks that can run concurrently and/or partitions of data that can be processed concurrently. Sometimes this is just not possible. Consider a Fibonacci function:
F(k+2) = F(k) + F(k+1)
A function that computes this, based on the form above, cannot be "parallelized" because each computed value depends on the previously computed values.
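To see the dependency concretely, here is a minimal Python sketch (Python is used for the sketches below): each iteration needs the values produced by the previous iterations, so the loop cannot be split across processors.

import itertools

def fib(n):
    # Return the first n Fibonacci numbers, computed serially.
    values = [0, 1]
    for _ in range(n - 2):
        # Each new value depends on the two just computed,
        # so no two iterations can run in parallel.
        values.append(values[-1] + values[-2])
    return values[:n]

print(fib(10))  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]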
A common situation is having a large amount of consistent data which must be processed. If the data can be decomposed into equal-size partitions, we can devise a parallel solution. Consider a huge array which can be broken up into sub-arrays.
If the same processing is required for each array element, with no dependencies in the computations, and no communication required between tasks, we have an ideal parallel computing opportunity. Here is a common implementation technique called master/worker.
The MASTER:
initializes the array and splits it up according to the number of available WORKERS
sends each WORKER its subarray
receives the results from each WORKER

The WORKER:
receives the subarray from the MASTER
performs processing on the subarray
returns results to MASTER
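For illustration, here is a minimal Python sketch of this master/worker scheme using the standard multiprocessing module; the squaring operation, the array contents, and the worker count are placeholder assumptions, not part of the pattern itself.

from multiprocessing import Pool

def work(subarray):
    # WORKER: process its subarray (here, squaring each element as a stand-in task).
    return [x * x for x in subarray]

if __name__ == "__main__":
    num_workers = 4                      # assumed worker count
    data = list(range(1000))             # MASTER initializes the array
    chunk = len(data) // num_workers     # assumes len(data) divides evenly
    subarrays = [data[i * chunk:(i + 1) * chunk] for i in range(num_workers)]

    with Pool(num_workers) as pool:      # MASTER sends each WORKER its subarray
        results = pool.map(work, subarrays)

    combined = [x for part in results for x in part]  # MASTER receives the results
    print(combined[:5])                  # [0, 1, 4, 9, 16]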
This model implements static load balancing which is commonly used if all tasks are performing the same amount of work on identical machines. In general, load balancing refers to techniques which try to spread tasks among the processors in a parallel system to avoid some processors being idle while others have tasks queueing up for execution.
A static load balancer allocates processes to processors at run time while taking no account of current network load. Dynamic algorithms are more flexible, though more computationally expensive, and give some consideration to the network load before allocating the new process to a processor.
As an example of the MASTER/WORKER technique, consider one of the methods for approximating pi. The first step is to inscribe a circle inside a square:
[Figure 1: a circle inscribed in a square]
The area of the square is As = (2r)^2 = 4r^2. The area of the circle is Ac = pi * r^2. So:

pi = Ac / r^2
As = 4r^2
r^2 = As / 4
pi = 4 * Ac / As
The reason we do all this algebraic manipulation is that we can parallelize this method. The computation can be decomposed into the following four steps:
1. Randomly generate points in the square
2. Count the number of generated points that are both in the circle and in the square
3. r = the number of points in the circle divided by the number of points in the square
4. PI = 4 * r
And here is how we parallelize it:

NUMPOINTS = 100000; // some large number - the bigger, the closer the approximation
p = number of WORKERS;
numPerWorker = NUMPOINTS / p;
countCircle = 0; // one of these for each WORKER

// each WORKER does the following:
for (i = 0; i < numPerWorker; i++) {
    generate 2 random numbers that lie inside the square;
    xcoord = first random number;
    ycoord = second random number;
    if (xcoord, ycoord) lies inside the circle
        countCircle++;
}

// the MASTER:
receives from the WORKERS their countCircle values;
computes PI from these values: PI = 4.0 * (sum of all countCircle values) / NUMPOINTS;
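A runnable Python version of this pseudocode might look like the following sketch; it samples the unit square against a quarter circle, which yields the same ratio, and the worker count is an arbitrary choice.

import random
from multiprocessing import Pool

NUMPOINTS = 100000   # the bigger, the closer the approximation

def count_circle(num_points):
    # WORKER: count how many random points in the unit square fall inside the circle.
    count = 0
    for _ in range(num_points):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1.0:   # inside the quarter circle of radius 1
            count += 1
    return count

if __name__ == "__main__":
    p = 4                          # number of WORKERS (arbitrary choice)
    num_per_worker = NUMPOINTS // p
    with Pool(p) as pool:
        counts = pool.map(count_circle, [num_per_worker] * p)
    # MASTER: sum the workers' counts; PI = 4 * (points in circle) / (total points)
    print(4.0 * sum(counts) / (num_per_worker * p))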
What is MapReduce?
Now that we have seen some basic examples of parallel programming, we can look at the MapReduce programming model. This model derives from the map and reduce combinators from a functional language like Lisp.
In Lisp, a map takes as input a function and a sequence of values. It then applies the function to each value in the sequence. A reduce combines all the elements of a sequence using a binary operation. For example, it can use "+" to add up all the elements in the sequence.
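The same two combinators are available in Python, for example:

from functools import reduce
import operator

values = [1, 2, 3, 4, 5]
squares = list(map(lambda v: v * v, values))  # map: apply the function to each value
total = reduce(operator.add, squares)         # reduce: combine with a binary operation
print(squares, total)                         # [1, 4, 9, 16, 25] 55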
MapReduce is inspired by these concepts. It was developed within Google as a mechanism for processing large amounts of raw data, for example, crawled documents or web request logs. This data is so large that it must be distributed across thousands of machines in order to be processed in a reasonable time. This distribution implies parallel computing, since the same computations are performed on each CPU but with a different dataset. MapReduce is an abstraction that allows Google engineers to perform simple computations while hiding the details of parallelization, data distribution, load balancing and fault tolerance.
Map, written by a user of the MapReduce library, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the reduce function.
The reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. [1]
Consider the problem of counting the number of occurrences of each word in a large collection of documents:
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result)); [1]
In the example above, the map function emits each word with a count of one ("1" in this example). The reduce function sums all the counts emitted for the same word.
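Here is a self-contained Python sketch of the same word count; the run_mapreduce helper is a toy stand-in for the library's grouping step, not the real implementation.

from collections import defaultdict

def map_fn(doc_name, contents):
    # Emit an intermediate (word, 1) pair for each word in the document.
    for word in contents.split():
        yield word, 1

def reduce_fn(word, counts):
    # Sum all the counts emitted for this word.
    return word, sum(counts)

def run_mapreduce(inputs, mapper, reducer):
    # Toy stand-in for the library: apply the mapper, group values by key, reduce.
    groups = defaultdict(list)
    for key, value in inputs:
        for ikey, ivalue in mapper(key, value):
            groups[ikey].append(ivalue)
    return [reducer(k, vs) for k, vs in sorted(groups.items())]

docs = [("doc1", "the quick brown fox"), ("doc2", "the lazy dog")]
print(run_mapreduce(docs, map_fn, reduce_fn))
# [('brown', 1), ('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)]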
MapReduce Execution Overview
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits or shards. The input shards can be processed in parallel on different machines.
Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
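Sketched in Python, such a partitioning function is a one-liner; this uses the built-in hash for illustration only, since Python salts string hashes per process and a real system needs a stable hash.

R = 5   # number of reduce partitions, chosen by the user

def partition(key):
    # Assign an intermediate key to one of the R reduce partitions.
    # Note: Python salts string hashes per process; use a stable hash in practice.
    return hash(key) % R

print(partition("apple"), partition("banana"))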
The following numbered list shows the overall flow of a MapReduce operation. When the user program calls the MapReduce function, this sequence of actions occurs:
1. The MapReduce library in the user program first shards the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece. It then starts up many copies of the program on a cluster of machines.

2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

3. A worker who is assigned a map task reads the contents of the corresponding input shard. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. If the amount of intermediate data is too large to fit in memory, an external sort is used.

6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.

8. After successful completion, the output of the MapReduce execution is available in the R output files. [1]
To detect failure, the master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.
Completed map tasks are re-executed when failure occurs because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.
MapReduce Examples
Here are a few simple examples of interesting programs that can be easily expressed as MapReduce computations.
Distributed Grep: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
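A self-contained Python sketch of distributed grep; the pattern, the sample lines, and the use of line numbers as keys are illustrative assumptions.

import re

PATTERN = re.compile(r"error")  # illustrative pattern

def grep_map(line_no, line):
    # Map: emit the line only if it matches the pattern.
    if PATTERN.search(line):
        yield line_no, line

def grep_reduce(key, values):
    # Reduce: identity function, copies the intermediate data to the output.
    return key, values[0]

# Minimal driver standing in for the MapReduce library:
lines = [(1, "all good"), (2, "disk error"), (3, "error again")]
intermediate = [pair for n, text in lines for pair in grep_map(n, text)]
print([grep_reduce(k, [v]) for k, v in intermediate])
# [(2, 'disk error'), (3, 'error again')]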
Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair.
Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair <target, list(source)>.
Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.
Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions. [1]
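And a self-contained sketch of the inverted index; the document IDs and contents are made up for illustration.

from collections import defaultdict

def index_map(doc_id, contents):
    # Emit a (word, document ID) pair for each distinct word in the document.
    for word in set(contents.split()):
        yield word, doc_id

def index_reduce(word, doc_ids):
    # Sort the document IDs for this word.
    return word, sorted(doc_ids)

# Minimal driver simulating the library's grouping step:
docs = [("d1", "cats chase mice"), ("d2", "dogs chase cats")]
groups = defaultdict(list)
for doc_id, contents in docs:
    for word, d in index_map(doc_id, contents):
        groups[word].append(d)
index = dict(index_reduce(w, ids) for w, ids in groups.items())
print(index["chase"])   # ['d1', 'd2']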
References
[1] Dean, Jeff and Ghemawat, Sanjay. MapReduce: Simplified Data Processing on Large Clusters. http://labs.google.com/papers/mapreduce-osdi04.pdf
[2] Lämmel, Ralf. Google's MapReduce Programming Model Revisited. http://www.cs.vu.nl/~ralf/MapReduce/paper.pdf
[3] Open Source MapReduce: http://lucene.apache.org/hadoop/