2013-04-17 00:24:56| 分類:
Hadoop文檔 | 標(biāo)簽:
mapreduce實(shí)例 |字號(hào)
例子來自hadoop_the_definitive_guide這本書
MapReduce是一種用于數(shù)據(jù)處理的編程模式。這種模式通常是比較簡(jiǎn)單的,但是編程過程中通常不會(huì)被簡(jiǎn)單的表達(dá)和方便的應(yīng)用。Hadoop能夠支持各種語言的Mapreduce的變成實(shí)現(xiàn)。在本章,我們將列舉出JAVA,Ruby,Python和C++這些語言的變成實(shí)現(xiàn)。這里的MapReduce編程,能夠在靈活的使用和配置足夠的機(jī)器,去處理那些海量數(shù)據(jù)的分析。MapReduce本身就是為處理海量數(shù)據(jù)而生,那么讓我們開始學(xué)習(xí)吧。
天氣數(shù)據(jù)
我們使用天氣的數(shù)據(jù)作為我們的示例,通常氣象站幾乎在每個(gè)小時(shí),很多地點(diǎn)都在手機(jī)我們的氣溫信息,并采用日志的方式記錄下來,所以用MapReduce來分析這些數(shù)據(jù)是在合適不過了。
數(shù)據(jù)格式
我們使用的數(shù)據(jù)是從北美國家氣象中心獲?。∟CDC,
http://www.ncdc.noaa.gov/),數(shù)據(jù)是按照行存儲(chǔ)的ASCII格式的數(shù)據(jù),每行一條記錄。這種可選的變長(zhǎng)格式可以囊括很多氣象元素的信息。簡(jiǎn)單的點(diǎn),我們把分析點(diǎn)聚焦在最基本的元素上,比如說溫度,也就是說那些經(jīng)常被提及和被關(guān)注的。
Example 2-1 列出了一條記錄的域?qū)傩裕╢ields)格式,一行被分割成很多行,以便于我們添加注釋加以解釋。
Example 2-1. Format of a National Climate Data Center record
0057
332130 # USAF 氣象站ID
99999 # WBAN 氣象站ID
19500101 # 記錄日期
0300 # 記錄時(shí)間
4
+51317 # 維度 (degrees x 1000)
+028783 # 經(jīng)度 (degrees x 1000)
FM-12
+0171 # 海拔 (米)
99999
V020
320 # 風(fēng)向 (degrees)
1 # quality code
N
0072
1
00450 # 采集高度(meters)
1 # quality code
C
N
010000 # 可見度 (meters)
1 # quality code
N9
-0128 # 空氣溫度 (degrees Celsius x 10)
1 # quality code
-0139 # 最低氣溫 (degrees Celsius x 10)
1 # quality code
10268 # 大氣壓力 (hectopascals x 10)
1 # quality code
數(shù)據(jù)文件按照日期和氣象站的地點(diǎn)組織,目錄從1901到2001年按照年來分目錄,其中每個(gè)氣象站的數(shù)據(jù)按照gzip壓縮方式打包到一個(gè)文件中,下面這個(gè)例子列出累1990年的那個(gè)目錄信息。
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
目前為止,已經(jīng)有成千上萬個(gè)氣象站,全部的數(shù)據(jù)將由一些相對(duì)來說比較小的文件組成,相對(duì)來說處理小文件比較容易。所以這就是為什么我們按照年份拆分成小文件。
使用UNIX 工具來分析這些數(shù)據(jù)
我們想找到每年有最高溫度的那條記錄。我們先不使用Hadoop,實(shí)現(xiàn)一個(gè),之后用來檢測(cè)Hadoop的計(jì)算結(jié)果。
當(dāng)然了這種分析使用AWK時(shí)在合適不過了。Example 2-2是一個(gè)簡(jiǎn)單的實(shí)現(xiàn)腳本。
Example 2-2. A program for finding the maximum recorded temperature by year from NCDC weather
records
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
Done
這個(gè)腳本遍歷所有的按照年分割的壓縮文件,先打印出年份,然后對(duì)每個(gè)文件使用awk,匹配出滿足的filed數(shù)據(jù)(溫度),通過加0的方式,將溫度轉(zhuǎn)換成整形,下面是驗(yàn)證溫度是否合法(9999的溫度表示丟失),之后就是按照數(shù)據(jù)比較,找出最大值了。下面是執(zhí)行結(jié)果。
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
這里的溫度的值被放大了10倍,所以在這里 1901年的最高氣溫是31.7度,這個(gè)腳本在高性能的計(jì)算機(jī)上足足跑了42分鐘,為了加速計(jì)算,我們就需要并行的執(zhí)行程序,理論上不同的年份我們都可以在那些閑置的機(jī)器上起一個(gè)單獨(dú)的進(jìn)程來處理。然而卻存在下面幾個(gè)問題。
首先,如果數(shù)據(jù)分割不等,就有可能造成不同的年份的數(shù)據(jù)大小不同,有些進(jìn)程處理的要比其他進(jìn)程處理的快一些。最終的執(zhí)行時(shí)間取決于運(yùn)行時(shí)間最長(zhǎng)的那個(gè)。
第二,數(shù)據(jù)的聚合也是一個(gè)問題,有些數(shù)據(jù)出完后,你可能需要進(jìn)一步的處理,使用更多的規(guī)則,再次進(jìn)行并行計(jì)算。
第三,你仍然被計(jì)算機(jī)的計(jì)算能力所限制著,一臺(tái)機(jī)器的處理能力有限,但使用多臺(tái)那么你的協(xié)調(diào)成本也會(huì)上來,如何控制總體的進(jìn)度,你如何處理失敗的進(jìn)程等等都是一個(gè)問題。
所以總結(jié)起來這樣處理起來還是很復(fù)雜的,那么下面介紹使用Hadoop來使這一切變得更簡(jiǎn)單吧。
使用Hadoop分析數(shù)據(jù)
為了使用Hadoop在并行處理上的優(yōu)勢(shì),我們要提供MapReduce工作方式的描述,在經(jīng)過一些本地的,小規(guī)模測(cè)試后,我們開始在我們的集群上應(yīng)用它,
Map 與 Reduce
MapReduce將進(jìn)程分為兩部分,分為map和reduce,每部分都采用key-value對(duì)的方式進(jìn)行輸入和輸出,這些都是又程序員來進(jìn)行選擇。程序員需要分別為map和reduce提供實(shí)現(xiàn)函數(shù)。
在這里我們的map的輸入就是NCDC的原始?xì)庀髷?shù)據(jù)。我們把數(shù)據(jù)格式化成文本格式,每一行代表一個(gè)數(shù)據(jù)記錄.key就是當(dāng)前行相對(duì)文件起始的偏移,不過在這里我們用不到它(這個(gè)key),所以忽略它。
我們的map函數(shù)比較簡(jiǎn)單,就是從那些數(shù)據(jù)中抽取出我們關(guān)心的數(shù)據(jù),年份和溫度。這樣的話map函數(shù)就是一個(gè)數(shù)據(jù)預(yù)處理的過程,他組裝好數(shù)據(jù)然后交給reduce函數(shù)去繼續(xù)進(jìn)一步查找每年中的最高氣溫。當(dāng)然map函數(shù)也是一個(gè)很好丟棄垃圾數(shù)據(jù)的地方:這里我們丟棄那些無效的或不正確的溫度。
為了我們能夠更好的理解map的工作,可以從下面這些樣本數(shù)據(jù)中來理解。
(一些無用的數(shù)據(jù)我們用省略號(hào)忽略了)
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
這樣的話,每行的數(shù)據(jù)就被解析成key-value對(duì):
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
keys在這里是每行相對(duì)于文件起始位置的偏移量,這里我們忽略不用。Map函數(shù)僅僅從中解析出年份和溫度(數(shù)據(jù)中加粗的部分),然后將他們輸出(溫度被轉(zhuǎn)換成整形。)
(1950, 0)
(1950, 22)
(1950, ?11)
(1949, 111)
(1949, 78)
從map返回的output,在被送往reduce函數(shù)之前會(huì)被mapReduce函數(shù)進(jìn)行處理。把key-value對(duì)進(jìn)行排序和分組。所以接下來在reduce函數(shù)里看到的將是如下格式的輸入:
(1949, [111, 78])
(1950, [0, 22, ?11])
每年的溫度數(shù)據(jù)在后面都可以通過后面的列表讀取,所有的reduce函數(shù)需要做的就是遍歷他然后找出最大的數(shù)據(jù)即可,最后結(jié)果如下。
(1949, 111)
(1950, 22)
最后輸出這種結(jié)果:每年的最高氣溫。
整個(gè)的數(shù)據(jù)流被表示成像Figure 2-1這種形式。有點(diǎn)像Unix的管道,用來表示MapReduce數(shù)據(jù)流。MapReduce數(shù)據(jù)流我們將在后面的章節(jié)繼續(xù)介紹。
Figure 2-1. MapReduce logical data flow
Java MapReduce
在了解了MapReduce的工作方式后,下一步我們用編碼實(shí)現(xiàn)它。我們需要做三件事:實(shí)現(xiàn)map,reduce函數(shù),以及一些執(zhí)行job的代碼。Map方法是通過實(shí)現(xiàn)一個(gè)聲明了map方法的Mapper接口實(shí)現(xiàn)的,Example2-3給出累示例:
Example 2-3. Mapper for maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}
Mapper接口是一種通用類型,其中的map方法有四個(gè)參數(shù),用來指明輸入key,輸入value,輸出key和輸出value。在之前的例子中,輸入key是一個(gè)長(zhǎng)整形的偏移量,輸入值是一行的文本,輸出key是一年,輸出值是溫度(一個(gè)整形)。但這里并沒有使用JAVA的內(nèi)置數(shù)據(jù)類型,Hadoop提供了他直接的基本數(shù)據(jù)類型用于網(wǎng)絡(luò)傳輸進(jìn)行數(shù)據(jù)序列化。這些都是在org.apache.hadoop.io package這個(gè)包里。這里我們使用LongWritable來體態(tài) JavaLong,Text替代JAVA String,IntWritable替代JAVA Integer。
map方法傳遞key和value,我們把文本的值轉(zhuǎn)換成Java String(value.toString()),然后使用subString()解析出我們需要的數(shù)據(jù)。
Map方法同樣也提供了一個(gè)OutputCollector實(shí)例,用于輸出。這樣的話,我們將年份定義成Text對(duì)象(我們只是把他作為key),溫度使用IntWritable類型。我們只需要在獲取溫度值后,記錄到output記錄里就可以了。
reduce方法的定義與之類似,見Example 2-4.
Example 2-4. Reducer for maximum temperature example
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class MaxTemperatureReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
依然,reduce的函數(shù)也是由四種參數(shù)類型組成輸入和輸出類型,reduce函數(shù)的輸入類型必須與map里的輸出類型相對(duì)應(yīng):Text和IntWriteable類型。在這個(gè)例子中,輸出output是text和IntWritable,這是我們通過遍歷數(shù)據(jù)比較大小找出的一年中的最高氣溫。
第三部分是我們編寫MapReduce的運(yùn)行腳本(參見Example2-5)
Example 2-5. Application to find the maximum temperature in the weather dataset
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
一個(gè)JobConf對(duì)象是用于配置一個(gè)job的。他描述了應(yīng)該怎樣執(zhí)行一個(gè)job.當(dāng)我們?cè)贖adoop集群上執(zhí)行job的時(shí)候,我們通常會(huì)將代碼打包成JAR文件(用于Hadoop的集群分布式處理),我們可以再JobConf的構(gòu)造函數(shù)里指定一個(gè)類的名稱,這樣hadoop就會(huì)在本地找出包含此類的相關(guān)JAR文件。
在創(chuàng)建完JobConf對(duì)象后,我們需要指定輸入和輸出路徑。我們通過FileInputFormat中的靜態(tài)方法addInputPath方法來指定一個(gè)輸入路徑,可以是一個(gè)單獨(dú)的文件,也可以是一個(gè)目錄(這種情況下,他會(huì)將這個(gè)目錄下的所有文件都作為輸入),或者一個(gè)文件匹配正則。同樣的話addInputPath可以被多次調(diào)用,用以添加多條路徑。
輸出路徑(只能有一個(gè))可以通過FileOutputFormat的靜態(tài)方法setOutputPath來指定。這個(gè)目錄用來存放reduce函數(shù)的執(zhí)行結(jié)果,在執(zhí)行job前這個(gè)目錄不能存在,否則Hadoop將不能執(zhí)行job.這樣是為了防止數(shù)據(jù)丟失(偶然的,其他的,不可預(yù)知的,進(jìn)程對(duì)這個(gè)目錄的重寫)
接下來我們就是通過 setMapperClass()和setReducerClass()來指定map和reduce.
setOutputKeyClass()和setOutputValueClass()用于控制map和reduce函數(shù),就像我們的例子里一樣,他們經(jīng)常是相同的,當(dāng)然你也可以通過setMapOutputKeyClass() 和setMapOutputValueClass().來分別指定
輸入類型用于控制輸入的數(shù)據(jù)格式,這里我們使用默認(rèn)的TextInputFormat
在我們對(duì)map和reduce設(shè)置完成后,我們就可以調(diào)用JobClicen的靜態(tài)方法runJob()去執(zhí)行job了,等待他完成后他們會(huì)向進(jìn)程的控制終端輸出進(jìn)程信息。
一個(gè)執(zhí)行的實(shí)例
寫完這個(gè)MapReduce后,我們先在一個(gè)小量的數(shù)據(jù)集上去驗(yàn)證我們的結(jié)果。先按照附錄A的說明搭建我們的獨(dú)立環(huán)境吧(單臺(tái)機(jī)器),這種模式hadoop使用本地文件系統(tǒng)并使用本地job執(zhí)行。讓我們測(cè)試結(jié)果。
% export HADOOP_CLASSPATH=build/classes
% hadoop MaxTemperature input/ncdc/sample.txt output
09/04/07 12:34:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=Job
Tracker, sessionId=
09/04/07 12:34:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the
arguments. Applications should implement Tool for the same.
09/04/07 12:34:35 WARN mapred.JobClient: No job jar file set. User classes may not
be found. See JobConf(Class) or JobConf#setJar(String).
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1
09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001
09/04/07 12:34:35 INFO mapred.FileInputFormat: Total input paths to process : 1
09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1
09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100
09/04/07 12:34:35 INFO mapred.MapTask: data buffer = 79691776/99614720
09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680
09/04/07 12:34:35 INFO mapred.MapTask: Starting flush of map output
09/04/07 12:34:36 INFO mapred.MapTask: Finished spill 0
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is
done. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner: file:/Users/tom/workspace/htdg/input/n
cdc/sample.txt:0+529
09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.Merger: Merging 1 sorted segments
09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments
left of total size: 57 bytes
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done
. And is in the process of commiting
09/04/07 12:34:36 INFO mapred.LocalJobRunner:
09/04/07 12:34:36 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is
allowed to commit now
09/04/07 12:34:36 INFO mapred.FileOutputCommitter: Saved output of task
'attempt_local_0001_r_000000_0' to file:/Users/tom/workspace/htdg/output
09/04/07 12:34:36 INFO mapred.LocalJobRunner: reduce > reduce
09/04/07 12:34:36 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
09/04/07 12:34:36 INFO mapred.JobClient: map 100% reduce 100%
09/04/07 12:34:36 INFO mapred.JobClient: Job complete: job_local_0001
09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13
09/04/07 12:34:36 INFO mapred.JobClient: FileSystemCounters
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_READ=27571
09/04/07 12:34:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=53907
09/04/07 12:34:36 INFO mapred.JobClient: Map-Reduce Framework
09/04/07 12:34:36 INFO mapred.JobClient: Reduce input groups=2
09/04/07 12:34:36 INFO mapred.JobClient: Combine output records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map input records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce shuffle bytes=0
09/04/07 12:34:36 INFO mapred.JobClient: Reduce output records=2
09/04/07 12:34:36 INFO mapred.JobClient: Spilled Records=10
09/04/07 12:34:36 INFO mapred.JobClient: Map output bytes=45
09/04/07 12:34:36 INFO mapred.JobClient: Map input bytes=529
09/04/07 12:34:36 INFO mapred.JobClient: Combine input records=0
09/04/07 12:34:36 INFO mapred.JobClient: Map output records=5
09/04/07 12:34:36 INFO mapred.JobClient: Reduce input records=5
hadoop命令行的第一個(gè)參數(shù)指定了一個(gè)類名稱,他通過JVM來執(zhí)行這個(gè)class ,這樣在加載hadoop庫(因?yàn)樗麄円蕾囘@些庫),指定路徑,獲取配置信息的時(shí)候比直接使用JAVA方便。
我們可以通過定義HADOOP_CLASSPATH環(huán)境變量去添加應(yīng)用類的路徑classPath.
注意:當(dāng)我們?cè)诒镜兀▎螕簦┠J较聢?zhí)行Hadoop的時(shí)候,假設(shè)你已經(jīng)設(shè)置了HADOOP_CLASSPATH.命令必須從這個(gè)已經(jīng)安裝的目錄執(zhí)行。
Job會(huì)輸出一些對(duì)我們有用的信息(警告時(shí)告訴我們JAR文件沒有找到,在集群模式下我們不會(huì)看到),比如說我們能夠看到hadoop為每個(gè)job(作業(yè))都分配了像job_local_0001這種ID。這里執(zhí)行了一個(gè)map任務(wù)和一個(gè)reduce任務(wù)(輸出了這種帶ID的信息attempt_local_0001_m_000000_0)。任務(wù)ID對(duì)于我們調(diào)試mapReduce作業(yè)是非常有幫助的。
最后那個(gè)段落輸出了 Counters 這種信息,用來統(tǒng)計(jì)每個(gè)作業(yè)的執(zhí)行,經(jīng)常用于檢測(cè)處理的數(shù)據(jù)總數(shù)是否復(fù)合預(yù)期。比如說,我們可以最終通過這個(gè)系統(tǒng)的總記錄數(shù):map輸入5個(gè),并輸出5個(gè)。
輸出結(jié)果被輸送到指定的output目錄,每個(gè)reduce生成一個(gè)文件。當(dāng)前任務(wù)只有一個(gè)reduce,所以我們找到如下的文件名part_00000:
cat output/part-00000
1949 111
1950 22
這個(gè)結(jié)果與之前我們的驗(yàn)證時(shí)一樣的。1949年11.1度,1950年2.2度(1950年又這么冷么?垃圾氣象站的檢測(cè)結(jié)果)