国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
Map-Reduce入門1
Hadoop學(xué)習(xí)總結(jié)之三:Map-Reduce入門
文章分類:Java編程
 
1、Map-Reduce的邏輯過程
假設(shè)我們需要處理一批有關(guān)天氣的數(shù)據(jù),其格式如下:
按照ASCII碼存儲(chǔ),每行一條記錄 每一行字符從0開始計(jì)數(shù),第15個(gè)到第18個(gè)字符為年 第25個(gè)到第29個(gè)字符為溫度,其中第25位是符號(hào)+/-
0067011990999991950051507+0000+
0043011990999991950051512+0022+
0043011990999991950051518-0011+
0043012650999991949032412+0111+
0043012650999991949032418+0078+
0067011990999991937051507+0001+
0043011990999991937051512-0002+
0043011990999991945051518+0001+
0043012650999991945032412+0002+
0043012650999991945032418+0078+
現(xiàn)在需要統(tǒng)計(jì)出每年的最高溫度。
Map-Reduce主要包括兩個(gè)步驟:Map和Reduce
每一步都有key-value對(duì)作為輸入和輸出:
map階段的key-value對(duì)的格式是由輸入的格式所決定的,如果是默認(rèn)的TextInputFormat,則每行作為一個(gè)記錄進(jìn)程處理,其中key為此行的開頭相對(duì)于文件的起始位置,value就是此行的字符文本 map階段的輸出的key-value對(duì)的格式必須同reduce階段的輸入key-value對(duì)的格式相對(duì)應(yīng)
對(duì)于上面的例子,在map過程,輸入的key-value對(duì)如下:
(0, 0067011990999991950051507+0000+)
(33, 0043011990999991950051512+0022+)
(66, 0043011990999991950051518-0011+)
(99, 0043012650999991949032412+0111+)
(132, 0043012650999991949032418+0078+)
(165, 0067011990999991937051507+0001+)
(198, 0043011990999991937051512-0002+)
(231, 0043011990999991945051518+0001+)
(264, 0043012650999991945032412+0002+)
(297, 0043012650999991945032418+0078+)
在map過程中,通過對(duì)每一行字符串的解析,得到年-溫度的key-value對(duì)作為輸出:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(1937, 1)
(1937, -2)
(1945, 1)
(1945, 2)
(1945, 78)
在reduce過程,將map過程中的輸出,按照相同的key將value放到同一個(gè)列表中作為reduce的輸入
(1950, [0, 22, –11])
(1949, [111, 78])
(1937, [1, -2])
(1945, [1, 2, 78])
在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:
(1950, 22)
(1949, 111)
(1937, 1)
(1945, 78)
其邏輯過程可用如下圖表示:
2、編寫Map-Reduce程序
編寫Map-Reduce程序,一般需要實(shí)現(xiàn)兩個(gè)函數(shù):mapper中的map函數(shù)和reducer中的reduce函數(shù)。
一般遵循以下格式:
map: (K1, V1)  ->  list(K2, V2)
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
}
reduce: (K2, list(V))  ->  list(K3, V3)
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter)
throws IOException;
}
對(duì)于上面的例子,則實(shí)現(xiàn)的mapper如下:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(25) == '+') {
airTemperature = Integer.parseInt(line.substring(26, 30));
} else {
airTemperature = Integer.parseInt(line.substring(25, 30));
}
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
實(shí)現(xiàn)的reducer如下:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));
}
}
欲運(yùn)行上面實(shí)現(xiàn)的Mapper和Reduce,則需要生成一個(gè)Map-Reduce得任務(wù)(Job),其基本包括以下三部分:
輸入的數(shù)據(jù),也即需要處理的數(shù)據(jù) Map-Reduce程序,也即上面實(shí)現(xiàn)的Mapper和Reducer 此任務(wù)的配置項(xiàng)JobConf
欲配置JobConf,需要大致了解Hadoop運(yùn)行job的基本原理:
Hadoop將Job分成task進(jìn)行處理,共兩種task:map task和reduce task Hadoop有兩類的節(jié)點(diǎn)控制job的運(yùn)行:JobTracker和TaskTracker JobTracker協(xié)調(diào)整個(gè)job的運(yùn)行,將task分配到不同的TaskTracker上 TaskTracker負(fù)責(zé)運(yùn)行task,并將結(jié)果返回給JobTracker
Hadoop將輸入數(shù)據(jù)分成固定大小的塊,我們稱之input split Hadoop為每一個(gè)input split創(chuàng)建一個(gè)task,在此task中依次處理此split中的一個(gè)個(gè)記錄(record) Hadoop會(huì)盡量讓輸入數(shù)據(jù)塊所在的DataNode和task所執(zhí)行的DataNode(每個(gè)DataNode上都有一個(gè)TaskTracker)為同一個(gè),可以提高運(yùn)行效率,所以input split的大小也一般是HDFS的block的大小。 Reduce task的輸入一般為Map Task的輸出,Reduce Task的輸出為整個(gè)job的輸出,保存在HDFS上。 在reduce中,相同key的所有的記錄一定會(huì)到同一個(gè)TaskTracker上面運(yùn)行,然而不同的key可以在不同的TaskTracker上面運(yùn)行,我們稱之為partition partition的規(guī)則為:(K2, V2) –> Integer, 也即根據(jù)K2,生成一個(gè)partition的id,具有相同id的K2則進(jìn)入同一個(gè)partition,被同一個(gè)TaskTracker上被同一個(gè)Reducer進(jìn)行處理。
public interface Partitioner<K2, V2> extends JobConfigurable {
int getPartition(K2 key, V2 value, int numPartitions);
}
下圖大概描述了Map-Reduce的Job運(yùn)行的基本原理:
下面我們討論JobConf,其有很多的項(xiàng)可以進(jìn)行配置:
setInputFormat:設(shè)置map的輸入格式,默認(rèn)為TextInputFormat,key為L(zhǎng)ongWritable, value為Text setNumMapTasks:設(shè)置map任務(wù)的個(gè)數(shù),此設(shè)置通常不起作用,map任務(wù)的個(gè)數(shù)取決于輸入的數(shù)據(jù)所能分成的input split的個(gè)數(shù) setMapperClass:設(shè)置Mapper,默認(rèn)為IdentityMapper setMapRunnerClass:設(shè)置MapRunner, map task是由MapRunner運(yùn)行的,默認(rèn)為MapRunnable,其功能為讀取input split的一個(gè)個(gè)record,依次調(diào)用Mapper的map函數(shù) setMapOutputKeyClass和setMapOutputValueClass:設(shè)置Mapper的輸出的key-value對(duì)的格式 setOutputKeyClass和setOutputValueClass:設(shè)置Reducer的輸出的key-value對(duì)的格式 setPartitionerClass和setNumReduceTasks:設(shè)置Partitioner,默認(rèn)為HashPartitioner,其根據(jù)key的hash值來決定進(jìn)入哪個(gè)partition,每個(gè)partition被一個(gè)reduce task處理,所以partition的個(gè)數(shù)等于reduce task的個(gè)數(shù) setReducerClass:設(shè)置Reducer,默認(rèn)為IdentityReducer setOutputFormat:設(shè)置任務(wù)的輸出格式,默認(rèn)為TextOutputFormat FileInputFormat.addInputPath:設(shè)置輸入文件的路徑,可以使一個(gè)文件,一個(gè)路徑,一個(gè)通配符??梢员徽{(diào)用多次添加多個(gè)路徑 FileOutputFormat.setOutputPath:設(shè)置輸出文件的路徑,在job運(yùn)行前此路徑不應(yīng)該存在
當(dāng)然不用所有的都設(shè)置,由上面的例子,可以編寫Map-Reduce程序如下:
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max temperature");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
3、Map-Reduce數(shù)據(jù)流(data flow)
Map-Reduce的處理過程主要涉及以下四個(gè)部分:
客戶端Client:用于提交Map-reduce任務(wù)job JobTracker:協(xié)調(diào)整個(gè)job的運(yùn)行,其為一個(gè)Java進(jìn)程,其main class為JobTracker TaskTracker:運(yùn)行此job的task,處理input split,其為一個(gè)Java進(jìn)程,其main class為TaskTracker HDFS:hadoop分布式文件系統(tǒng),用于在各個(gè)進(jìn)程間共享Job相關(guān)的文件
3.1、任務(wù)提交
JobClient.runJob()創(chuàng)建一個(gè)新的JobClient實(shí)例,調(diào)用其submitJob()函數(shù)。
向JobTracker請(qǐng)求一個(gè)新的job ID 檢測(cè)此job的output配置 計(jì)算此job的input splits 將Job運(yùn)行所需的資源拷貝到JobTracker的文件系統(tǒng)中的文件夾中,包括job jar文件,job.xml配置文件,input splits 通知JobTracker此Job已經(jīng)可以運(yùn)行了
提交任務(wù)后,runJob每隔一秒鐘輪詢一次job的進(jìn)度,將進(jìn)度返回到命令行,直到任務(wù)運(yùn)行完畢。
3.2、任務(wù)初始化
當(dāng)JobTracker收到submitJob調(diào)用的時(shí)候,將此任務(wù)放到一個(gè)隊(duì)列中,job調(diào)度器將從隊(duì)列中獲取任務(wù)并初始化任務(wù)。
初始化首先創(chuàng)建一個(gè)對(duì)象來封裝job運(yùn)行的tasks, status以及progress。
在創(chuàng)建task之前,job調(diào)度器首先從共享文件系統(tǒng)中獲得JobClient計(jì)算出的input splits。
其為每個(gè)input split創(chuàng)建一個(gè)map task。
每個(gè)task被分配一個(gè)ID。
3.3、任務(wù)分配
TaskTracker周期性的向JobTracker發(fā)送heartbeat。
在heartbeat中,TaskTracker告知JobTracker其已經(jīng)準(zhǔn)備運(yùn)行一個(gè)新的task,JobTracker將分配給其一個(gè)task。
在JobTracker為TaskTracker選擇一個(gè)task之前,JobTracker必須首先按照優(yōu)先級(jí)選擇一個(gè)Job,在最高優(yōu)先級(jí)的Job中選擇一個(gè)task。
TaskTracker有固定數(shù)量的位置來運(yùn)行map task或者reduce task。
默認(rèn)的調(diào)度器對(duì)待map task優(yōu)先于reduce task
當(dāng)選擇reduce task的時(shí)候,JobTracker并不在多個(gè)task之間進(jìn)行選擇,而是直接取下一個(gè),因?yàn)閞educe task沒有數(shù)據(jù)本地化的概念。
3.4、任務(wù)執(zhí)行
TaskTracker被分配了一個(gè)task,下面便要運(yùn)行此task。
首先,TaskTracker將此job的jar從共享文件系統(tǒng)中拷貝到TaskTracker的文件系統(tǒng)中。
TaskTracker從distributed cache中將job運(yùn)行所需要的文件拷貝到本地磁盤。
其次,其為每個(gè)task創(chuàng)建一個(gè)本地的工作目錄,將jar解壓縮到文件目錄中。
其三,其創(chuàng)建一個(gè)TaskRunner來運(yùn)行task。
TaskRunner創(chuàng)建一個(gè)新的JVM來運(yùn)行task。
被創(chuàng)建的child JVM和TaskTracker通信來報(bào)告運(yùn)行進(jìn)度。
3.4.1、Map的過程
MapRunnable從input split中讀取一個(gè)個(gè)的record,然后依次調(diào)用Mapper的map函數(shù),將結(jié)果輸出。
map的輸出并不是直接寫入硬盤,而是將其寫入緩存memory buffer。
當(dāng)buffer中數(shù)據(jù)的到達(dá)一定的大小,一個(gè)背景線程將數(shù)據(jù)開始寫入硬盤。
在寫入硬盤之前,內(nèi)存中的數(shù)據(jù)通過partitioner分成多個(gè)partition。
在同一個(gè)partition中,背景線程會(huì)將數(shù)據(jù)按照key在內(nèi)存中排序。
每次從內(nèi)存向硬盤flush數(shù)據(jù),都生成一個(gè)新的spill文件。
當(dāng)此task結(jié)束之前,所有的spill文件被合并為一個(gè)整的被partition的而且排好序的文件。
reducer可以通過http協(xié)議請(qǐng)求map的輸出文件,tracker.http.threads可以設(shè)置http服務(wù)線程數(shù)。
3.4.2、Reduce的過程
當(dāng)map task結(jié)束后,其通知TaskTracker,TaskTracker通知JobTracker。
對(duì)于一個(gè)job,JobTracker知道TaskTracer和map輸出的對(duì)應(yīng)關(guān)系。
reducer中一個(gè)線程周期性的向JobTracker請(qǐng)求map輸出的位置,直到其取得了所有的map輸出。
reduce task需要其對(duì)應(yīng)的partition的所有的map輸出。
reduce task中的copy過程即當(dāng)每個(gè)map task結(jié)束的時(shí)候就開始拷貝輸出,因?yàn)椴煌膍ap task完成時(shí)間不同。
reduce task中有多個(gè)copy線程,可以并行拷貝map輸出。
當(dāng)很多map輸出拷貝到reduce task后,一個(gè)背景線程將其合并為一個(gè)大的排好序的文件。
當(dāng)所有的map輸出都拷貝到reduce task后,進(jìn)入sort過程,將所有的map輸出合并為大的排好序的文件。
最后進(jìn)入reduce過程,調(diào)用reducer的reduce函數(shù),處理排好序的輸出的每個(gè)key,最后的結(jié)果寫入HDFS。
3.5、任務(wù)結(jié)束
當(dāng)JobTracker獲得最后一個(gè)task的運(yùn)行成功的報(bào)告后,將job得狀態(tài)改為成功。
當(dāng)JobClient從JobTracker輪詢的時(shí)候,發(fā)現(xiàn)此job已經(jīng)成功結(jié)束,則向用戶打印消息,從runJob函數(shù)中返回。
本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Hadoop | In Programming We Trust
hadoop中mapreduce部分執(zhí)行流程
HBase/Hadoop學(xué)習(xí)筆記
mapreduce源碼分析作業(yè)分配過程
Hadoop之大數(shù)據(jù)平臺(tái)基礎(chǔ)(2)
Hadoop優(yōu)化
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服