PV UV TOPN

时间:2020-04-25
本文章向大家介绍PV UV TOPN,主要包括PV UV TOPN使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
 1 package com.day07
 2 
 3 import org.apache.spark.rdd.RDD
 4 import org.apache.spark.{SparkConf, SparkContext}
 5 
 6 object PV {
 7   def main(args: Array[String]): Unit = {
 8     //todo:创建sparkconf,设置appName
 9     //todo:setMaster("local[2]")在本地模拟spark运行 这里的数字表示 使用2个线程
10     val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
11     //todo:创建SparkContext
12     val sc: SparkContext = new SparkContext(sparkConf)
13     //todo:读取数据
14     val file: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day07\\src\\main\\resources\\access.log")
15     //todo:将一行数据作为输入,输出("pv",1)
16     val pvAndOne: RDD[(String, Int)] = file.map(x => ("pv", 1))
17     //todo:聚合输出
18     val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_ + _)
19     totalPV.foreach(println)
20     sc.stop()
21   }
22 
23 }
 1 package com.day07
 2 
 3 import org.apache.spark.rdd.RDD
 4 import org.apache.spark.{SparkConf, SparkContext}
 5 
 6 object UV {
 7   def main(args: Array[String]): Unit = {
 8     //todo:构建SparkConf和 SparkContext
 9     val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")
10     val sc: SparkContext = new SparkContext(sparkConf)
11     //todo:读取数据
12     val file: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day07\\src\\main\\resources\\access.log")
13     //todo:对每一行分隔,获取IP地址
14     val ips: RDD[(String)] = file.map(_.split(" ")).map(x => x(0))
15     //todo:对ip地址进行去重,最后输出格式 ("UV",1)
16     val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x => ("UV", 1))
17     //todo:聚合输出
18     val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_ + _)
19     totalUV.foreach(println)
20     //todo:数据结果保存
21     totalUV.saveAsTextFile("C:\\Users\\Dell\\Desktop\\在线配置\\out")
22     sc.stop()
23   }
24 }
 1 package com.day07
 2 
 3 import org.apache.spark.rdd.RDD
 4 import org.apache.spark.{SparkConf, SparkContext}
 5 
 6 object TopN {
 7   def main(args: Array[String]): Unit = {
 8     val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")
 9     val sc: SparkContext = new SparkContext(sparkConf)
10     sc.setLogLevel("WARN")
11     //读取数据
12     val file: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day07\\src\\main\\resources\\access.log")
13     //将一行数据作为输入,输出(来源URL,1)
14     val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length > 10).map(x => (x(10), 1))
15     //聚合 排序-->降序
16     val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, false)
17     //通过take取topN,这里是取前5名
18     val finalResult: Array[(String, Int)] = result.take(5)
19     println(finalResult.toBuffer)
20 
21     sc.stop()
22 
23   }
24 
25 }

原文地址:https://www.cnblogs.com/xjqi/p/12774558.html