新增的Java MapReduce API
Hadoop的版本0.20.0包含有一個(gè)新的 Java MapReduce API,有時(shí)也稱為"上下文對(duì)象"(context object),旨在使API在今后更容易擴(kuò)展。新的API 在類型上不兼容先前的API,所以,需要重寫以前的應(yīng)用程序才能使新的API發(fā)揮作用。
新增的API 和舊的API 之間,有下面幾個(gè)明顯的區(qū)別。
新的API 傾向于使用虛類,而不是接口,因?yàn)檫@更容易擴(kuò)展。例如,可以無(wú)需修改類的實(shí)現(xiàn)而在虛類中添加一個(gè)方法(即用默認(rèn)的實(shí)現(xiàn))。在新的API 中, mapper 和reducer現(xiàn)在都是虛類。
新的API 放在org.apache.hadoop.mapreduce 包(和子包)中。之前版本的API 依舊放在org.apache.hadoop.mapred中。
新的API充分使用上下文對(duì)象,使用戶代碼能與MapReduce系統(tǒng)通信。例如,MapContext 基本具備了JobConf、OutputCollector和Reporter的功能。
新的API 同時(shí)支持"推"(push)和"拉"(pull)式的迭代。這兩類API,均可以將鍵/值對(duì)記錄推給mapper,但除此之外,新的API 也允許把記錄從map()方法中拉出。對(duì)reducer來(lái)說(shuō)是一樣的。"拉"式處理數(shù)據(jù)的好處是可以實(shí)現(xiàn)數(shù)據(jù)的批量處理,而非逐條記錄地處理。
新增的API實(shí)現(xiàn)了配置的統(tǒng)一。舊API 通過(guò)一個(gè)特殊的JobConf 對(duì)象配置作業(yè),該對(duì)象是Hadoop配置對(duì)象的一個(gè)擴(kuò)展 (用于配置守護(hù)進(jìn)程,詳情請(qǐng)參見(jiàn)第130頁(yè)的"API配置"小節(jié))。在新的API 中,我們丟棄這種區(qū)分,所有作業(yè)的配置均通過(guò)Configuration 來(lái)完成。
新API中作業(yè)控制由Job類實(shí)現(xiàn),而非JobClient類,新API中刪除了JobClient類。
輸出文件的命名方式稍有不同。map的輸出文件名為part-m-nnnnn,而reduce的輸出為part-r-nnnnn(其中nnnnn表示分塊序號(hào),為整數(shù),且從0開(kāi)始算)。
例2-6 顯示了使用新API 重寫的MaxTemperature應(yīng)用。不同之處已加粗顯示。
將舊API寫的Mapper和Reducer類轉(zhuǎn)換為新API時(shí),記住將map()和reduce()的簽名轉(zhuǎn)換為新形式。如果只是將類的繼承修改為對(duì)新的Mapper和Reducer類的繼承,編譯的時(shí)候也不會(huì)報(bào)錯(cuò)或顯示警告信息,因?yàn)樾碌腗apper和Reducer類同樣也提供了等價(jià)的map()和reduce()函數(shù)。但是,自己寫的mapper或reducer代碼是不會(huì)被調(diào)用的,這會(huì)導(dǎo)致難以診斷的錯(cuò)誤。
例2-6. 用新上下文對(duì)象MapReduce API重寫的MaxTemperature應(yīng)用
- public class NewMaxTemperature {
-
- static class NewMaxTemperatureMapper
- extends Mapper<LongWritable, Text, Text, IntWritable> {
-
- private static final int MISSING = 9999;
- public void map(LongWritable key, Text value,Context context)
- throws IOException, InterruptedException {
-
- String line = value.toString();
- String year = line.substring(15, 19);
- int airTemperature;
-
- if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
- airTemperature = Integer.parseInt(line.substring(88, 92));
- } else {
- airTemperature = Integer.parseInt(line.substring(87, 92));
- }
-
- String quality = line.substring(92, 93);
- if (airTemperature != MISSING && quality.matches("[01459]")) {
- context.write(new Text(year), new IntWritable(airTemperature));
- }
- }
- }
-
- static class NewMaxTemperatureReducer
- extends Reducer<Text, IntWritable, Text, IntWritable> {
-
- public void reduce(Text key, Iterable<IntWritable> values, Context context)
- throws IOException, InterruptedException {
-
- int maxValue = Integer.MIN_VALUE;
- for (IntWritable value : values) {
- maxValue = Math.max(maxValue, value.get());
- }
- context.write(key, new IntWritable(maxValue));
- }
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- System.err.println("Usage: NewMaxTemperature
- <input path> <output path>");
- System.exit(-1);
- }
-
- Job job = new Job();
- job.setJarByClass(NewMaxTemperature.class);
-
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
- job.setMapperClass(NewMaxTemperatureMapper.class);
- job.setReducerClass(NewMaxTemperatureReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
原來(lái)MapReduce代碼可在《Hadoop權(quán)威指南》內(nèi)找到,大家可進(jìn)行對(duì)比。
又一例子:Hadoop in Action中第四章:
- package com;
-
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
-
-
- public class tt extends Configured implements Tool {
-
- public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
- public void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- //split的作用是將該字符串里面的變量賦值給citation這個(gè)字符串?dāng)?shù)組當(dāng)中。
- String[] citation = value.toString().split(",");
- //使用新的API取代了collect相關(guān)的API,將map中的key和value進(jìn)行了互換。
- context.write(new Text(citation[1]), new Text(citation[0]));
- }
- }
-
- public static class Reduce extends Reducer<Text, Text, Text, Text> { //前兩個(gè)參數(shù)設(shè)置是輸入?yún)?shù),后兩個(gè)參數(shù)是輸出參數(shù)。
-
- public void reduce(Text key, Iterable<Text> values, Context context)
- throws IOException, InterruptedException {
- String csv ="";
-
- //Text類型是類似于String類型的文本格式,但是在處理編碼上還是和String有差別,與內(nèi)存序列化有關(guān),是hadoop經(jīng)過(guò)封裝之后的新類。
- for (Text val:values) {
- if (csv.length() > 0) csv += ",";
- csv += val.toString();
- }
-
- context.write(key, new Text(csv));
- }
- }
-
- public int run(String[] args) throws Exception { //由hadoop本身調(diào)用該程序
- Configuration conf = getConf();
- Job job = new Job(conf, "tt"); //利用job取代了jobclient
- job.setJarByClass(tt.class);
-
- Path in = new Path(args[0]);
- Path out = new Path(args[1]);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, out);
-
- job.setMapperClass(MapClass.class);
- job.setReducerClass(Reduce.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class); //此處如果不進(jìn)行設(shè)置,系統(tǒng)會(huì)拋出異常,還要記住新舊API不能混用
-
- System.exit(job.waitForCompletion(true)?0:1);
- return 0;
- }
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new tt(), args); //調(diào)用新的類的方法免除配置的相關(guān)瑣碎的細(xì)節(jié)
- System.exit(res);
- }
- }
本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)
點(diǎn)擊舉報(bào)。