* 生成數(shù)據(jù) SparkSQLDataManually.java */
package com.tom.spark.SparkApps.sql;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;

/**
 * Generates simulated forum user-log data. Each output line is tab-separated:
 *
 *   date       yyyy-MM-dd (yesterday's date)
 *   timestamp  epoch millis at generation time
 *   userID     random user id, or the literal "null" for unregistered users
 *   pageID     random page id
 *   channel    one of the forum channel names below
 *   action     "register" or "view"
 */
public class SparkSQLDataManually {

    static String yesterday = yesterday();
    static String[] channelNames = new String[] {
        "Spark", "Scala", "Kafka", "Flink", "Hadoop",
        "Storm", "Hive", "Impala", "HBase", "ML"
    };
    static String[] actionNames = new String[] { "register", "view" };

    /**
     * Entry point. args[0] = number of records to generate (optional,
     * default 5000); args[1] = output directory (optional, default ".").
     */
    public static void main(String[] args) {
        long numberItems = 5000;
        String path = ".";
        if (args.length > 0) {
            // Was Integer.valueOf: parse as long since numberItems is declared long.
            numberItems = Long.parseLong(args[0]);
            // Guard the second argument separately — the original accessed args[1]
            // unconditionally and threw ArrayIndexOutOfBounds for a single argument.
            if (args.length > 1) {
                path = args[1];
            }
            System.out.println(path);
        }
        System.out.println("User log number is : " + numberItems);
        userlogs(numberItems, path);
    }

    /**
     * Writes {@code numberItems} random log lines to {@code path}/userlog.log.
     *
     * @param numberItems number of log records to generate
     * @param path        directory the log file is written into
     */
    private static void userlogs(long numberItems, String path) {
        Random random = new Random();
        // StringBuilder: no synchronization needed in this single-threaded loop.
        StringBuilder userLogBuffer = new StringBuilder();
        // 1-in-8 chance of an unregistered (null userID) visitor.
        int[] unregisteredUsers = new int[] {1, 2, 3, 4, 5, 6, 7, 8};
        for (int i = 0; i < numberItems; i++) {
            long timestamp = new Date().getTime();
            Long userID;
            if (unregisteredUsers[random.nextInt(unregisteredUsers.length)] == 1) {
                userID = null; // appended as the literal string "null"
            } else {
                userID = (long) random.nextInt((int) numberItems);
            }
            long pageID = random.nextInt((int) numberItems);
            String channel = channelNames[random.nextInt(channelNames.length)];
            String action = actionNames[random.nextInt(actionNames.length)];
            userLogBuffer.append(yesterday).append("\t")
                         .append(timestamp).append("\t")
                         .append(userID).append("\t")
                         .append(pageID).append("\t")
                         .append(channel).append("\t")
                         .append(action).append("\n");
        }
        // new File(parent, child) is portable; the original hard-coded the
        // Windows separator ("\\userlog.log") and broke on Unix.
        File outFile = new File(path, "userlog.log");
        // try-with-resources closes the writer on every path; the original's
        // finally { pw.close(); } threw NPE when the constructor itself failed.
        // Charset is pinned to UTF-8 instead of the platform default.
        try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(
                new FileOutputStream(outFile), StandardCharsets.UTF_8))) {
            System.out.println(outFile.getPath());
            pw.write(userLogBuffer.toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * @return yesterday's date formatted as yyyy-MM-dd
     */
    private static String yesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1);
        return date.format(cal.getTime());
    }
}
/** Computes PV, UV, hot channels, bounce rate and the new-user registration ratio. */
package com.tom.spark.SparkApps.sql;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Daily analytics over the {@code userlogs} Hive table populated by
 * {@link SparkSQLDataManually}.
 *
 * Table in hive database creation:
 * sqlContext.sql("create table userlogs(date string, timestamp bigint, userID bigint, pageID bigint, channel string, action string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
 *
 * NOTE(review): all action filters were changed from 'View'/'Register' to
 * 'view'/'register' — the generator writes lowercase action values and Hive
 * string equality is case-sensitive, so the original queries matched no rows.
 */
public class SparkSQLUserlogsOps {

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkSQLUserlogsOps").setMaster("spark://Master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        HiveContext hiveContext = new HiveContext(sc);
        String yesterday = getYesterday();
        pvStat(hiveContext, yesterday);                     // page views per channel
        uvStat(hiveContext, yesterday);                     // unique visitors per page
        hotChannel(hiveContext, yesterday);                 // hottest pages
        jumpOutStat(hiveContext, yesterday);                // bounce rate
        newUserRegisterPercentStat(hiveContext, yesterday); // new-user registration ratio
    }

    /**
     * Prints the ratio of registrations to first-time (userID IS NULL) views.
     */
    private static void newUserRegisterPercentStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        // Unregistered visitors are logged with a NULL userID by the generator.
        String newUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' and userID is NULL ";
        // Fixed: the original concatenated "userlogs" + "where" without a space,
        // producing invalid SQL ("from userlogswhere ...").
        String registerUserSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'register' and date='" + yesterday + "' ";
        Object newUser = hiveContext.sql(newUserSQL).collect()[0].get(0);
        Object registerUser = hiveContext.sql(registerUserSQL).collect()[0].get(0);
        double total = Double.valueOf(newUser.toString());
        double register = Double.valueOf(registerUser.toString());
        // NOTE(review): prints Infinity/NaN when no NULL-user views exist; the
        // original behaved the same way — confirm whether a guard is wanted.
        System.out.println("模擬新用戶注冊比例:" + register / total);
    }

    /**
     * Prints the bounce rate: users with exactly one page view divided by the
     * total number of page views for the day.
     */
    private static void jumpOutStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String totalPvSQL = "select count(*) "
                + "from userlogs "
                + "where action = 'view' and date='" + yesterday + "' ";
        // Count users whose entire daily activity is a single view.
        String pv2OneSQL = "select count(*) "
                + "from "
                + "(select count(*) totalNumber from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by userID "
                + "having totalNumber = 1) subquery ";
        Object totalPv = hiveContext.sql(totalPvSQL).collect()[0].get(0);
        Object pv2One = hiveContext.sql(pv2OneSQL).collect()[0].get(0);
        double total = Double.valueOf(totalPv.toString());
        double pv21 = Double.valueOf(pv2One.toString());
        System.out.println("跳出率為" + pv21 / total);
    }

    /**
     * Shows unique visitors (distinct userIDs) per page for the day, highest first.
     */
    private static void uvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, pageID, uv "
                + "from "
                + "(select date, pageID, count(distinct(userID)) uv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by uv desc ";
        hiveContext.sql(sqlText).show();
    }

    /**
     * Shows page views per channel for the day, highest first.
     *
     * Fixed: the original hotChannel grouped by pageID while pvStat grouped by
     * channel — the two query bodies were swapped relative to the method names
     * (熱門板塊 = hot channel).
     */
    private static void hotChannel(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, channel, channelpv "
                + "from "
                + "(select date, channel, count(*) channelpv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, channel) subquery "
                + "order by channelpv desc ";
        hiveContext.sql(sqlText).show();
        // 把執(zhí)行結(jié)果放到數(shù)據(jù)庫或Hive中
    }

    /**
     * Shows page views per page for the day, highest first.
     */
    private static void pvStat(HiveContext hiveContext, String yesterday) {
        hiveContext.sql("use hive");
        String sqlText = "select date, pageID, pv "
                + "from "
                + "(select date, pageID, count(1) pv from userlogs "
                + "where action = 'view' and date='" + yesterday + "' "
                + "group by date, pageID) subquery "
                + "order by pv desc ";
        hiveContext.sql(sqlText).show();
        // Example of the expanded query:
        // select date, pageID, pv from (select date, pageID, count(1) pv
        // from userlogs where action = 'view' and
        // date='2017-03-10' group by date, pageID) subquery order by pv desc limit 10
    }

    /**
     * @return the analysis date formatted as yyyy-MM-dd.
     *     NOTE(review): despite the name this subtracts 2 days, while the data
     *     generator stamps records with -1 day — confirm which offset is
     *     intended; as written the queries target a different date than the
     *     generated data.
     */
    private static String getYesterday() {
        SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd");
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -2);
        return date.format(cal.getTime());
    }
}