Import log data from HDFS into HBase.
Package the program as a jar and run it on the server with:
hadoop jar xxx.jar <package-name>.<class-name>
The HBase libraries must be on Hadoop's classpath; add them in hadoop-env.sh:
export HADOOP_CLASSPATH=/usr/local/hbase/lib/*
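As a minimal sketch, the full run sequence might look like the following; the install paths and the jar file name are assumptions, while the table name wlan_log and column family cf match the code below (the table must exist before the job runs):

# append the HBase libraries to Hadoop's classpath (path to hadoop-env.sh is an assumption)
echo 'export HADOOP_CLASSPATH=/usr/local/hbase/lib/*' >> /usr/local/hadoop/etc/hadoop/hadoop-env.sh

# create the target table with the column family used by the reducer
echo "create 'wlan_log', 'cf'" | hbase shell

# submit the job (the jar name is hypothetical; the class is the one defined below)
hadoop jar hbase-import.jar hbase.test.HbaseImport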
package hbase.test;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HbaseImport {

    // Read the source data from HDFS, parse each line and generate the rowkey
    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split("\t"); // split each log line into fields
            // reformat the timestamp (epoch milliseconds) in the first column
            String dateStr = format.format(new Date(Long.parseLong(split[0])));
            // concatenate the phone number and the date to form the HBase rowkey
            String rowKey = split[1] + ":" + dateStr;
            // prepend the rowkey to the original line to build the new value
            Text v2 = new Text();
            v2.set(rowKey + "\t" + line);
            // emit the original key with the new value: each log line now carries
            // its generated rowkey in front, everything else is unchanged
            context.write(key, v2);
        }
    }

    // Write the records received from the map side into HBase
    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
        public static final String COLUMN_FAMILY = "cf";
        public static final String COLUMN_NAME_RAW = "raw";
        public static final String COLUMN_NAME_REPORTTIME = "reportTime";
        public static final String COLUMN_NAME_MSISDN = "msisdn";
        public static final String COLUMN_NAME_APMAC = "apmac";
        public static final String COLUMN_NAME_ACMAC = "acmac";
        public static final String COLUMN_NAME_HOST = "host";
        public static final String COLUMN_NAME_SITETYPE = "siteType";
        public static final String COLUMN_NAME_UPPACKNUM = "upPackNum";
        public static final String COLUMN_NAME_DOWNPACKNUM = "downPackNum";
        public static final String COLUMN_NAME_UPPAYLOAD = "upPayLoad";
        public static final String COLUMN_NAME_DOWNPAYLOAD = "downPayLoad";
        public static final String COLUMN_NAME_HTTPSTATUS = "httpStatus";

        @Override
        protected void reduce(LongWritable k2, Iterable<Text> v2s,
                TableReducer<LongWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (Text v2 : v2s) {
                String[] split = v2.toString().split("\t");
                String rowKey = split[0]; // the first element is now the rowkey

                Put put = new Put(rowKey.getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_RAW.getBytes(), v2.toString().getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_REPORTTIME.getBytes(), split[1].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_MSISDN.getBytes(), split[2].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_APMAC.getBytes(), split[3].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_ACMAC.getBytes(), split[4].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HOST.getBytes(), split[5].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_SITETYPE.getBytes(), split[6].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPACKNUM.getBytes(), split[7].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPACKNUM.getBytes(), split[8].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_UPPAYLOAD.getBytes(), split[9].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_DOWNPAYLOAD.getBytes(), split[10].getBytes());
                put.add(COLUMN_FAMILY.getBytes(), COLUMN_NAME_HTTPSTATUS.getBytes(), split[11].getBytes());

                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop4:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop4");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log"); // target HBase table
        conf.set("dfs.socket.timeout", "180000");

        Job job = new Job(conf, HbaseImport.class.getSimpleName());
        // the following two lines are required when the job is submitted from a jar
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HbaseImport.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        job.setMapperClass(BatchImportMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(BatchImportReducer.class);

        FileInputFormat.setInputPaths(job, "hdfs://hadoop4:9000/data/wlan");

        job.waitForCompletion(true);
    }
}
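Note that the reducer reads up to split[11] of the rowkey-prefixed line, so every input line under /data/wlan must contain at least eleven tab-separated fields (epoch-millisecond timestamp, phone number, AP MAC, and so on); shorter lines will fail the job with an ArrayIndexOutOfBoundsException. After the job completes, the import can be spot-checked from the HBase shell, for example:

scan 'wlan_log', {LIMIT => 5}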