免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費(fèi)電子書(shū)等14項(xiàng)超值服

開(kāi)通VIP
Hadoop MapReduce新舊API區(qū)別

新增的Java MapReduce API

Hadoop的版本0.20.0包含有一個(gè)新的 Java MapReduce API,有時(shí)也稱為"上下文對(duì)象"(context object),旨在使API在今后更容易擴(kuò)展。新的API 在類型上不兼容先前的API,所以,需要重寫以前的應(yīng)用程序才能使新的API發(fā)揮作用。

新增的API 和舊的API 之間,有下面幾個(gè)明顯的區(qū)別。

新的API 傾向于使用虛類,而不是接口,因?yàn)檫@更容易擴(kuò)展。例如,可以無(wú)需修改類的實(shí)現(xiàn)而在虛類中添加一個(gè)方法(即用默認(rèn)的實(shí)現(xiàn))。在新的API 中, mapper 和reducer現(xiàn)在都是虛類。

新的API 放在org.apache.hadoop.mapreduce 包(和子包)中。之前版本的API 依舊放在org.apache.hadoop.mapred中。

新的API充分使用上下文對(duì)象,使用戶代碼能與MapReduce系統(tǒng)通信。例如,MapContext 基本具備了JobConf、OutputCollector和Reporter的功能。

新的API 同時(shí)支持"推"(push)和"拉"(pull)式的迭代。這兩類API,均可以將鍵/值對(duì)記錄推給mapper,但除此之外,新的API 也允許把記錄從map()方法中拉出。對(duì)reducer來(lái)說(shuō)是一樣的。"拉"式處理數(shù)據(jù)的好處是可以實(shí)現(xiàn)數(shù)據(jù)的批量處理,而非逐條記錄地處理。

新增的API實(shí)現(xiàn)了配置的統(tǒng)一。舊API 通過(guò)一個(gè)特殊的JobConf 對(duì)象配置作業(yè),該對(duì)象是Hadoop配置對(duì)象的一個(gè)擴(kuò)展 (用于配置守護(hù)進(jìn)程,詳情請(qǐng)參見(jiàn)第130頁(yè)的"API配置"小節(jié))。在新的API 中,我們丟棄這種區(qū)分,所有作業(yè)的配置均通過(guò)Configuration 來(lái)完成。

新API中作業(yè)控制由Job類實(shí)現(xiàn),而非JobClient類,新API中刪除了JobClient類。

輸出文件的命名方式稍有不同。map的輸出文件名為part-m-nnnnn,而reduce的輸出為part-r-nnnnn(其中nnnnn表示分塊序號(hào),為整數(shù),且從0開(kāi)始算)。

例2-6 顯示了使用新API 重寫的MaxTemperature應(yīng)用。不同之處已加粗顯示。

將舊API寫的Mapper和Reducer類轉(zhuǎn)換為新API時(shí),記住將map()和reduce()的簽名轉(zhuǎn)換為新形式。如果只是將類的繼承修改為對(duì)新的Mapper和Reducer類的繼承,編譯的時(shí)候也不會(huì)報(bào)錯(cuò)或顯示警告信息,因?yàn)樾碌腗apper和Reducer類同樣也提供了等價(jià)的map()和reduce()函數(shù)。但是,自己寫的mapper或reducer代碼是不會(huì)被調(diào)用的,這會(huì)導(dǎo)致難以診斷的錯(cuò)誤。

例2-6. 用新上下文對(duì)象MapReduce API重寫的MaxTemperature應(yīng)用

  1. public class NewMaxTemperature {    
  2.    
  3.     static class NewMaxTemperatureMapper    
  4.         extends Mapper<LongWritable, Text, Text, IntWritable> {    
  5.    
  6.             private static final int MISSING = 9999;    
  7.             public void map(LongWritable key, Text value,Context context)    
  8.             throws IOException, InterruptedException {                 
  9.        
  10.                 String line = value.toString();           
  11.                 String year = line.substring(1519);           
  12.                 int airTemperature;           
  13.                   
  14.             if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs            
  15.                     airTemperature = Integer.parseInt(line.substring(8892));           
  16.                 } else {             
  17.                     airTemperature = Integer.parseInt(line.substring(8792));           
  18.             }           
  19.                   
  20.             String quality = line.substring(9293);    
  21.                 if (airTemperature != MISSING && quality.matches("[01459]")) {    
  22.                     context.write(new Text(year), new IntWritable(airTemperature));           
  23.                 }         
  24.         }       
  25.     }       
  26.      
  27.     static class NewMaxTemperatureReducer    
  28.             extends Reducer<Text, IntWritable, Text, IntWritable> {           
  29.           
  30.         public void reduce(Text key, Iterable<IntWritable> values, Context context)             
  31.                 throws IOException, InterruptedException {                 
  32.    
  33.                 int maxValue = Integer.MIN_VALUE;           
  34.                 for (IntWritable value : values) {             
  35.                 maxValue = Math.max(maxValue, value.get());           
  36.                 }           
  37.                 context.write(key, new IntWritable(maxValue));         
  38.             }       
  39.     }    
  40.    
  41.     public static void main(String[] args) throws Exception {         
  42.         if (args.length != 2) {           
  43.             System.err.println("Usage: NewMaxTemperature  
  44.             <input path> <output path>");       
  45.             System.exit(-1);    
  46.         }             
  47.   
  48.         Job job = new Job();         
  49.         job.setJarByClass(NewMaxTemperature.class);    
  50.   
  51.         FileInputFormat.addInputPath(job, new Path(args[0]));    
  52.         FileOutputFormat.setOutputPath(job, new Path(args[1]));             
  53.   
  54.         job.setMapperClass(NewMaxTemperatureMapper.class);         
  55.         job.setReducerClass(NewMaxTemperatureReducer.class);    
  56.         job.setOutputKeyClass(Text.class);    
  57.         job.setOutputValueClass(IntWritable.class);             
  58.   
  59.         System.exit(job.waitForCompletion(true) ? 0 : 1);       
  60.     }    
  61.     }    

   原來(lái)MapReduce代碼可在《Hadoop權(quán)威指南》內(nèi)找到,大家可進(jìn)行對(duì)比。


  又一例子:Hadoop in Action中第四章:

  

  1. package com;  
  2.   
  3. import java.io.IOException;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.conf.Configured;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.Reducer;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  16. import org.apache.hadoop.util.Tool;                   
  17. import org.apache.hadoop.util.ToolRunner;  
  18.   
  19.   
  20. public class tt extends Configured implements Tool {  
  21.       
  22.     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {  
  23.         public void map(LongWritable key, Text value, Context context)  
  24.             throws IOException, InterruptedException {  
  25.             //split的作用是將該字符串里面的變量賦值給citation這個(gè)字符串?dāng)?shù)組當(dāng)中。  
  26.             String[] citation = value.toString().split(",");  
  27.             //使用新的API取代了collect相關(guān)的API,將map中的key和value進(jìn)行了互換。  
  28.             context.write(new Text(citation[1]), new Text(citation[0]));    
  29.         }  
  30.     }  
  31.       
  32.     public static class Reduce extends Reducer<Text, Text, Text, Text> {  //前兩個(gè)參數(shù)設(shè)置是輸入?yún)?shù),后兩個(gè)參數(shù)是輸出參數(shù)。  
  33.           
  34.         public void reduce(Text key, Iterable<Text> values, Context context)  
  35.             throws IOException, InterruptedException {  
  36.             String csv ="";  
  37.               
  38.             //Text類型是類似于String類型的文本格式,但是在處理編碼上還是和String有差別,與內(nèi)存序列化有關(guān),是hadoop經(jīng)過(guò)封裝之后的新類。  
  39.             for (Text val:values) {  
  40.                 if (csv.length() > 0) csv += ",";  
  41.                 csv += val.toString();  
  42.             }  
  43.           
  44.             context.write(key, new Text(csv));  
  45.         }  
  46.     }  
  47.       
  48.     public int run(String[] args) throws Exception {  //由hadoop本身調(diào)用該程序  
  49.         Configuration conf = getConf();  
  50.         Job job = new Job(conf, "tt"); //利用job取代了jobclient  
  51.         job.setJarByClass(tt.class);  
  52.           
  53.         Path in = new Path(args[0]);  
  54.         Path out = new Path(args[1]);  
  55.         FileInputFormat.setInputPaths(job, in);  
  56.         FileOutputFormat.setOutputPath(job, out);  
  57.           
  58.         job.setMapperClass(MapClass.class);  
  59.         job.setReducerClass(Reduce.class);  
  60.         job.setInputFormatClass(TextInputFormat.class);  
  61.         job.setOutputFormatClass(TextOutputFormat.class);  
  62.         job.setOutputKeyClass(Text.class);  
  63.         job.setOutputValueClass(Text.class);  //此處如果不進(jìn)行設(shè)置,系統(tǒng)會(huì)拋出異常,還要記住新舊API不能混用  
  64.           
  65.         System.exit(job.waitForCompletion(true)?0:1);  
  66.         return 0;  
  67.     }  
  68.       
  69.     public static void main(String[] args) throws Exception {  
  70.         int res = ToolRunner.run(new Configuration(), new tt(), args);    //調(diào)用新的類的方法免除配置的相關(guān)瑣碎的細(xì)節(jié)  
  71.         System.exit(res);  
  72.     }  
  73. }  

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開(kāi)APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
多個(gè)mapreduce工作相互依賴處理方法完整實(shí)例(JobControl)
Hadoop學(xué)習(xí)之路(6)MapReduce自定義分區(qū)實(shí)現(xiàn)
MapReduce簡(jiǎn)單實(shí)例:wordcount--大數(shù)據(jù)紀(jì)錄片第五記
11-Hadoop Yarn資源調(diào)用框架、排序
Hadoop Map/Reduce編程模型實(shí)現(xiàn)海量數(shù)據(jù)處理—數(shù)字求和-Hadoop學(xué)習(xí) -...
【小白視角】大數(shù)據(jù)基礎(chǔ)實(shí)踐(五) MapReduce編程基礎(chǔ)操作
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服