Spark之离线统计热点城市信息
时间:2022-07-23
本文章向大家介绍Spark之离线统计热点城市信息,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
一、需求分析
通过网站或者电信运营商采集的日志信息,以及全国各个城市IP段信息,来判断用户的IP段,统计用户集中的地点信息。通过分析得到的数据,进行业务部门进行热点城市图绘制的数据支持。
二、数据准备
1. 城市IP段信息
// IP起始段 | IP结束段 | IP开始数字 | IP结束数字| | 经纬度信息
1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537
1.86.96.0|1.86.111.255|22437888|22441983|亚洲|中国|陕西|铜川||电信|610200|China|CN|108.963122|34.90892
1.86.112.0|1.86.127.255|22441984|22446079|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.86.192.0|1.86.223.255|22462464|22470655|亚洲|中国|陕西|安康||电信|610900|China|CN|109.029273|32.6903
1.87.152.0|1.87.175.255|22517760|22523903|亚洲|中国|陕西|西安||电信|610100|China|CN|108.948024|34.263161
1.87.176.0|1.87.191.255|22523904|22527999|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.88.0.0|1.91.255.255|22544384|22806527|亚洲|中国|北京|北京||歌华有线|110100|China|CN|116.405285|39.904989
1.92.0.0|1.93.255.255|22806528|22937599|亚洲|中国|北京|北京||鹏博士|110100|China|CN|116.405285|39.904989
······
2. 采集的IP相关日志信息
// 我们主要关心IP信息,如115.120.36.118,对应的通常是基站数据
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
20090121000132864647000|123.197.64.247|cul.sohu.com|/20071227/n254338813_22.shtml|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://cul.sohu.com/20071227/n254338813_22.shtml|ArticleTab=visit:1; IPLOC=unknown; SUV=0901080709152121; vjuids=832dd37a1.11ebbc5d590.0.b20f858f14e918; club_chat_ircnick=JaabvxC4aaacQ; spanel=%7B%22u%22%3A%22%22%7D; vjlast=1232467312,1232467312,30
20090121000133296729000|222.55.57.176|down.chinaz.com|/|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; iCafeMedia; TencentTraveler 4.0)||cnzz_a33219=0; vw33219=%3A18167791%3A; sin33219=http%3A//www.itxls.com/wz/wyfx/it.html; rtime=0; ltime=1232464387281; cnzz_eid=6264952-1232464379-http%3A//www.itxls.com/wz/wyfx/it.html
20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|
20090121000133446262000|115.120.12.157|v.ifeng.com|/live/|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)|http://www.ifeng.com/|userid=1232466610953_4339; location=186; sclocationid=10002; vjuids=22644b162.11ef4bc1624.0.63ad06717b426; vjlast=1232466614,1232467297,13
20090121000133456256000|115.120.7.240|cqbbs.soufun.com|/3110502342~-1~2118/23004348_23004348.htm|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)||new_historysignlist=%u534E%u6DA6%u4E8C%u5341%u56DB%u57CE%7Chttp%3A//cqbbs.soufun.com/board/3110502342/%7C%7C%u9A8F%u9038%u7B2C%u4E00%u6C5F%u5CB8%7Chttp%3A//cqbbs.soufun.com/board/3110169184/%7C%7C%u793E%u533A%u4E4B%u661F%7Chttp%3A//cqbbs.soufun.com/board/sqzx/%7C%7C; SoufunSessionID=2y5xyr45kslc0zbdooqnoo55; viewUser=1; vjuids=-870e9088.11ee89aba57.0.be9c3d988def8; vjlast=1232263101,1232380806,11; new_viewtype=1; articlecolor=#000000; usersms_pop_type=1; articlecount=186; __utma=101868291.755195653.1232450942.1232450942.1232450942.1; __utmz=101868291.1232450942.1.1.utmccn=(referral)
······
三、代码实现
1.主程序
/**
* @Author bigdatalearnshare
*/
object IpLocation {
def main(args: Array[String]) {
val sparkSession = SparkSession
.builder()
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.appName("test")
.master("local[*]")
.getOrCreate()
val ipInfos = sparkSession.read.textFile("/bigdatalearnshare/location/ip.txt").rdd
.map(_.split("\|"))
.map(info => (info(2), info(3), s"${info(6)}-${info(7)}-${info(9)}-${info(13)}-${info(14)}"))
.collect()
val ipBroadcast = sparkSession.sparkContext.broadcast(ipInfos)
// format-http存储采集的IP日志信息
val locationIp = sparkSession.read.textFile("/bigdatalearnshare/location/format-http").rdd
.map(_.split("\|"))
.map(_ (1))
.mapPartitions { iter =>
val ipInfoVal = ipBroadcast.value
iter.map { ip =>
val ipNum = ip2Num(ip)
val index = binarySearch(ipInfoVal, ipNum)
(ipInfoVal(index)._3, ip)
}
}
locationIp.map(x => (x._1, 1)).reduceByKey(_ + _)
.foreachPartition(saveData2MySQL(_))
sparkSession.stop()
}
}
2.首先我们要将IP地址转换为long型数字,目的是方便确定用户日志信息中的IP所处的IP段
/** 将ip转换成Long数字. 如1.86.64.0 => 22429696 */
def ip2Num(ip: String): Long = {
var ipNum = 0L
ip.split("[.]")
.foreach { i =>
ipNum = i.toLong | ipNum << 8L
}
ipNum
}
3.日志的IP对应的IP段广播信息进行比较,采用二分查找法确定
/** ip段中, ip是有序的, 所以采用二分查找法 */
def binarySearch(ipInfos: Array[(String, String, String)], ip: Long): Int = {
var low = 0
var high = ipInfos.length - 1
while (low <= high) {
val middle = (low + high) / 2
if ((ip >= ipInfos(middle)._1.toLong) && (ip <= ipInfos(middle)._2.toLong))
return middle
if (ip < ipInfos(middle)._1.toLong) high = middle - 1 else low = middle + 1
}
-1
}
4.将统计结果落库到mysql
def saveData2MySQL(iterator: Iterator[(String, Int)], batchSize: Int = 1000): Unit = {
var conn: Connection = null
var pst: PreparedStatement = null
val sql = "INSERT INTO location_info (location, ip_count) VALUES (?, ?)"
try {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdatalearnshare", "root", "root")
// 生产中为了安全起见, 应该首先通过Connection的API获取metadata, 判断事务的支持情况, 失败情况下进行回滚. 这里为了方便, 不在做相应判断
conn.setAutoCommit(false)
pst = conn.prepareStatement(sql)
var rowCount = 0
iterator.foreach { case (location, count) =>
pst.setString(1, location)
pst.setInt(2, count)
pst.addBatch()
rowCount += 1
if (rowCount % batchSize == 0) {
pst.executeBatch()
conn.commit()
rowCount = 0
}
}
if (rowCount > 0) {
pst.executeBatch()
conn.commit()
}
} catch {
case e: Exception => println(e)
} finally {
if (pst != null) pst.close()
if (conn != null) conn.close()
}
}
四、统计结果示例
(河北-石家庄-卫视创捷-114.502461-38.045474,383)
(云南-昆明-鹏博士-102.712251-25.040609,126)
(重庆-重庆-铁通-106.56347-29.52311,3)
(重庆-重庆-铁通-107.39007-29.70292,47)
(重庆-重庆-铁通-106.51107-29.50197,91)
(重庆-重庆-铁通-106.504962-29.533155,400)
(北京-北京-鹏博士-116.405285-39.904989,1535)
(重庆-重庆-铁通-106.57434-29.60658,177)
......
- Go 语言实现的网络连接池:Pool
- go语言操作redis连接池的方法
- WebVR如此近 - three.js的WebVR示例程序解析
- 【Dev Club分享】基于RxJava的一种MVP实现
- Android 动态链接库加载原理及 HotFix 方案介绍
- 如果裸写一个goroutine pool
- 【Dev Club 分享】微信 iOS SQLite 源码优化实践
- 移动客户端中高效使用 SQLite
- 【Dev Club 分享】微信热补丁 Tinker 的实践演进之路
- Android 进程保活招式大全
- 【Dev Club 分享】H5 视频直播那些事
- Android Patch 方案与持续交付
- Linux系统下MongoDB的简单安装与基本操作
- Go语言同步(Synchronization)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- php中isset与empty函数的困惑与用法分析
- 布隆过滤器(bloom filter)及php和redis实现布隆过滤器的方法
- PHP使用反向Ajax技术实现在线客服系统详解
- PHP设计模式之适配器模式(Adapter)原理与用法详解
- laravel5.5框架的上传图片功能实例分析【仅传到服务器端】
- Laravel5.1框架自带权限控制系统 ACL用法分析
- php使用filter_var函数判断邮箱,url,ip格式示例
- Python json格式化打印实现过程解析
- PHP使用DOM对XML解析处理操作示例
- OpenCV Python实现图像指定区域裁剪
- Laravel框架实现多个视图共享相同数据的方法详解
- python如何快速生成时间戳
- 从python读取sql的实例方法
- PHP+iframe模拟Ajax上传文件功能示例
- Centos7 Yum安装PHP7.2流程教程详解