免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
從hdfs批量導(dǎo)出數(shù)據(jù)到hbase表中

將hdfs中的日志數(shù)據(jù)導(dǎo)入到hbase中。

打成jar包在服務(wù)器使用

hadoop jar xxx.jar 包名.類名           

運(yùn)行

需要將hbase類庫加到hadoop的classpath中,在hadoop-env.sh文件中添加hbase類庫

export HADOOP_CLASSPATH=/usr/local/hbase/lib/* 即可

  1. package hbase.test;  
  2.   
  3. import java.io.IOException;  
  4. import java.text.SimpleDateFormat;  
  5. import java.util.Date;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.hbase.HBaseConfiguration;  
  9. import org.apache.hadoop.hbase.client.Put;  
  10. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  11. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  
  12. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  13. import org.apache.hadoop.io.LongWritable;  
  14. import org.apache.hadoop.io.NullWritable;  
  15. import org.apache.hadoop.io.Text;  
  16. import org.apache.hadoop.mapreduce.Job;  
  17. import org.apache.hadoop.mapreduce.Mapper;  
  18. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  20.   
  21.   
  22. public class HbaseImport {  
  23.       
  24.     //讀取hdfs中的數(shù)據(jù)源,解析并產(chǎn)生rowkey  
  25.     static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{  
  26.         SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");  
  27.         @Override  
  28.         protected void map(LongWritable key, Text value,  
  29.                 Mapper<LongWritable, Text, LongWritable, Text>.Context context)  
  30.                 throws IOException, InterruptedException {  
  31.             String line = value.toString();  
  32.             String[] split = line.split("\t");//拆分每一行日志為數(shù)組  
  33.             //將第一列的時(shí)間戳轉(zhuǎn)換格式  
  34.             String dateStr = format.format(new Date(Long.parseLong(split[0])));  
  35.             //將電話號(hào)碼和日期拼在一起作為導(dǎo)入到hbase的rowkey  
  36.             String rowKey = split[1] + ":" + dateStr;  
  37.             //將rowkey和原來的每一行內(nèi)容作為新產(chǎn)生行內(nèi)容  
  38.             Text v2 = new Text();  
  39.             v2.set(rowKey + "\t" + line);  
  40.             //還將原來的key作為和新產(chǎn)生的行輸出到reduce,也就是在原來每一行日志前面加上了自己生成的rowkey,其他沒變  
  41.             context.write(key, v2);  
  42.         }  
  43.     }  
  44.       
  45.     //將map端傳過來的數(shù)據(jù)導(dǎo)入hbase  
  46.     static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{  
  47.         public static final String COLUMN_FAMILY = "cf";  
  48.         public static final String COLUMN_NAME_RAW = "raw";  
  49.         public static final String COLUMN_NAME_REPORTTIME = "reportTime";  
  50.         public static final String COLUMN_NAME_MSISDN = "msisdn";  
  51.         public static final String COLUMN_NAME_APMAC = "apmac";  
  52.         public static final String COLUMN_NAME_ACMAC = "acmac";  
  53.         public static final String COLUMN_NAME_HOST = "host";  
  54.         public static final String COLUMN_NAME_SITETYPE = "siteType";  
  55.         public static final String COLUMN_NAME_UPPACKNUM = "upPackNum";  
  56.         public static final String COLUMN_NAME_DOWNPACKNUM = "downPackNum";  
  57.         public static final String COLUMN_NAME_UPPAYLOAD = "upPayLoad";  
  58.         public static final String COLUMN_NAME_DOWNPAYLOAD = "downPayLoad";  
  59.         public static final String COLUMN_NAME_HTTPSTATUS = "httpStatus";  
  60.         @Override  
  61.         protected void reduce(LongWritable k2, Iterable<Text> v2s,  
  62.                 TableReducer<LongWritable, Text, NullWritable>.Context context)  
  63.                 throws IOException, InterruptedException {  
  64.             for (Text v2 : v2s) {  
  65.                 String[] split = v2.toString().split("\t");  
  66.                 String rowKey = split[0];//此時(shí)數(shù)組第一位元素是rowkey  
  67.                   
  68.                 Put put = new Put(rowKey.getBytes());  
  69.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_RAW.getBytes(), v2.toString().getBytes());  
  70.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_REPORTTIME.getBytes(), split[1].getBytes());  
  71.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_MSISDN.getBytes(), split[2].getBytes());  
  72.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_APMAC.getBytes(), split[3].getBytes());  
  73.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_ACMAC.getBytes(), split[4].getBytes());  
  74.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HOST.getBytes(), split[5].getBytes());  
  75.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_SITETYPE.getBytes(), split[6].getBytes());  
  76.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPACKNUM.getBytes(), split[7].getBytes());  
  77.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPACKNUM.getBytes(), split[8].getBytes());  
  78.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPAYLOAD.getBytes(), split[9].getBytes());  
  79.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPAYLOAD.getBytes(), split[10].getBytes());  
  80.                 put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HTTPSTATUS.getBytes(), split[11].getBytes());  
  81.                   
  82.                 context.write(NullWritable.get(), put);  
  83.             }  
  84.         }  
  85.     }  
  86.       
  87.     public static void main(String[] args) throws Exception {  
  88.         Configuration conf = HBaseConfiguration.create();  
  89.         conf.set("hbase.rootdir", "hdfs://hadoop4:9000/hbase");  
  90.         conf.set("hbase.zookeeper.quorum", "hadoop4");  
  91.         conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");//輸出到hbase的表名  
  92.         conf.set("dfs.socket.tomeout", "180000");  
  93.           
  94.         Job job = new Job(conf,HbaseImport.class.getSimpleName());  
  95.         //當(dāng)打成jar包時(shí),必須有以下兩行代碼  
  96.         TableMapReduceUtil.addDependencyJars(job);  
  97.         job.setJarByClass(HbaseImport.class);  
  98.           
  99.         job.setInputFormatClass(TextInputFormat.class);  
  100.         job.setOutputFormatClass(TableOutputFormat.class);  
  101.           
  102.         job.setMapperClass(BatchImportMapper.class);  
  103.         job.setMapOutputKeyClass(LongWritable.class);  
  104.         job.setMapOutputValueClass(Text.class);  
  105.           
  106.         job.setReducerClass(BatchImportReducer.class);  
  107.           
  108.         FileInputFormat.setInputPaths(job, "hdfs://hadoop4:9000/data/wlan");  
  109.           
  110.         job.waitForCompletion(true);  
  111.     }  
  112. }  


本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
kafka+storm+hbase架構(gòu)設(shè)計(jì)
MapReduce編程之通過MapReduce讀取數(shù)據(jù),往Hbase中寫數(shù)據(jù)
Java API 與HBase交互實(shí)例
HiveHbase集成實(shí)踐
hbase的coprocessor使用
【大數(shù)據(jù)計(jì)算】(二) HBase 的安裝和基礎(chǔ)編程
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服