Offline Statistics of Hot Cities with Spark

Date: 2022-07-23

I. Requirements Analysis

Using access logs collected from websites or telecom carriers, together with nationwide city IP-range data, we determine which IP range each user's IP falls into and count where users are concentrated. The aggregated results give the business team the data it needs to draw hot-city maps.

II. Data Preparation

1. City IP-range data

// start IP | end IP | start IP as number | end IP as number | continent | country | province | city | ... | ISP | area code | ... | longitude | latitude
1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537
1.86.96.0|1.86.111.255|22437888|22441983|亚洲|中国|陕西|铜川||电信|610200|China|CN|108.963122|34.90892
1.86.112.0|1.86.127.255|22441984|22446079|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.86.192.0|1.86.223.255|22462464|22470655|亚洲|中国|陕西|安康||电信|610900|China|CN|109.029273|32.6903
1.87.152.0|1.87.175.255|22517760|22523903|亚洲|中国|陕西|西安||电信|610100|China|CN|108.948024|34.263161
1.87.176.0|1.87.191.255|22523904|22527999|亚洲|中国|陕西|咸阳||电信|610400|China|CN|108.705117|34.333439
1.88.0.0|1.91.255.255|22544384|22806527|亚洲|中国|北京|北京||歌华有线|110100|China|CN|116.405285|39.904989
1.92.0.0|1.93.255.255|22806528|22937599|亚洲|中国|北京|北京||鹏博士|110100|China|CN|116.405285|39.904989
······
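
Each record is pipe-delimited. The job below only needs the numeric range boundaries plus the province, city, ISP, and coordinates. Here is a minimal sketch of pulling those fields out of a single line (the val names are illustrative, not from the original data):

val line = "1.86.64.0|1.86.95.255|22429696|22437887|亚洲|中国|陕西|延安||电信|610600|China|CN|109.49081|36.596537"
val fields = line.split("\\|")

val startNum = fields(2).toLong   // 22429696
val endNum   = fields(3).toLong   // 22437887
val location = s"${fields(6)}-${fields(7)}-${fields(9)}-${fields(13)}-${fields(14)}"
// location == "陕西-延安-电信-109.49081-36.596537"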

2. Collected IP access logs

// We mainly care about the IP field, e.g. 115.120.36.118, which usually corresponds to base-station data
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
20090121000132864647000|123.197.64.247|cul.sohu.com|/20071227/n254338813_22.shtml|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://cul.sohu.com/20071227/n254338813_22.shtml|ArticleTab=visit:1; IPLOC=unknown; SUV=0901080709152121; vjuids=832dd37a1.11ebbc5d590.0.b20f858f14e918; club_chat_ircnick=JaabvxC4aaacQ; spanel=%7B%22u%22%3A%22%22%7D; vjlast=1232467312,1232467312,30
20090121000133296729000|222.55.57.176|down.chinaz.com|/|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; iCafeMedia; TencentTraveler 4.0)||cnzz_a33219=0; vw33219=%3A18167791%3A; sin33219=http%3A//www.itxls.com/wz/wyfx/it.html; rtime=0; ltime=1232464387281; cnzz_eid=6264952-1232464379-http%3A//www.itxls.com/wz/wyfx/it.html
20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|
20090121000133446262000|115.120.12.157|v.ifeng.com|/live/|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)|http://www.ifeng.com/|userid=1232466610953_4339; location=186; sclocationid=10002; vjuids=22644b162.11ef4bc1624.0.63ad06717b426; vjlast=1232466614,1232467297,13
20090121000133456256000|115.120.7.240|cqbbs.soufun.com|/3110502342~-1~2118/23004348_23004348.htm|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; CIBA)||new_historysignlist=%u534E%u6DA6%u4E8C%u5341%u56DB%u57CE%7Chttp%3A//cqbbs.soufun.com/board/3110502342/%7C%7C%u9A8F%u9038%u7B2C%u4E00%u6C5F%u5CB8%7Chttp%3A//cqbbs.soufun.com/board/3110169184/%7C%7C%u793E%u533A%u4E4B%u661F%7Chttp%3A//cqbbs.soufun.com/board/sqzx/%7C%7C; SoufunSessionID=2y5xyr45kslc0zbdooqnoo55; viewUser=1; vjuids=-870e9088.11ee89aba57.0.be9c3d988def8; vjlast=1232263101,1232380806,11; new_viewtype=1; articlecolor=#000000; usersms_pop_type=1; articlecount=186; __utma=101868291.755195653.1232450942.1232450942.1232450942.1; __utmz=101868291.1232450942.1.1.utmccn=(referral)
······
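
The log lines are pipe-delimited as well; the job only extracts the IP, which is the second field (index 1 after splitting). A minimal sketch (the log line is truncated for readability):

val logLine = "20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|..."
val ip = logLine.split("\\|")(1)   // "115.120.36.118"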

III. Code Implementation

1. Main program

import org.apache.spark.sql.SparkSession

/**
  * @Author bigdatalearnshare
  */
object IpLocation {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    // Load the city IP-range data and keep (start number, end number, "province-city-ISP-lng-lat")
    val ipInfos = sparkSession.read.textFile("/bigdatalearnshare/location/ip.txt").rdd
      .map(_.split("\\|"))
      .map(info => (info(2), info(3), s"${info(6)}-${info(7)}-${info(9)}-${info(13)}-${info(14)}"))
      .collect()

    // Broadcast the small, sorted IP-range table to every executor
    val ipBroadcast = sparkSession.sparkContext.broadcast(ipInfos)

    // format-http stores the collected IP access logs
    val locationIp = sparkSession.read.textFile("/bigdatalearnshare/location/format-http").rdd
      .map(_.split("\\|"))
      .map(_(1))
      .mapPartitions { iter =>
        val ipInfoVal = ipBroadcast.value
        iter.flatMap { ip =>
          val ipNum = ip2Num(ip)
          val index = binarySearch(ipInfoVal, ipNum)

          // binarySearch returns -1 when the IP falls outside every known range; skip those records
          if (index >= 0) Some((ipInfoVal(index)._3, ip)) else None
        }
      }

    locationIp.map(x => (x._1, 1)).reduceByKey(_ + _)
      .foreachPartition(saveData2MySQL(_))

    sparkSession.stop()
  }
}
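
Two design points worth noting: the IP-range table is small enough to collect() to the driver and broadcast, so every partition can resolve IPs with a local binary search instead of shuffling the logs for a join; and mapPartitions reads the broadcast value once per partition rather than once per record.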

2. First, convert the IP address into a Long so that we can determine which IP range a log IP falls into.

/** Convert an IP string to a Long, e.g. 1.86.64.0 => 22429696 */
def ip2Num(ip: String): Long = {
    var ipNum = 0L

    ip.split("[.]")
      .foreach { i =>
        ipNum = i.toLong | ipNum << 8L
      }
      
    ipNum
}
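
A quick sanity check against the first record of the IP-range file (assuming ip2Num is in scope):

assert(ip2Num("1.86.64.0") == 22429696L)
assert(ip2Num("1.86.95.255") == 22437887L)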

3. Compare each log IP against the broadcast IP-range data; since the ranges are sorted by IP, a binary search locates the matching range.

/** The IP ranges are sorted, so a binary search applies */
def binarySearch(ipInfos: Array[(String, String, String)], ip: Long): Int = {
    var low = 0
    var high = ipInfos.length - 1

    while (low <= high) {
      val middle = (low + high) / 2

      if ((ip >= ipInfos(middle)._1.toLong) && (ip <= ipInfos(middle)._2.toLong))
        return middle

      if (ip < ipInfos(middle)._1.toLong) high = middle - 1 else low = middle + 1
    }

    // Not found: the IP is outside every known range
    -1
}

4. Write the aggregated results to MySQL

import java.sql.{Connection, DriverManager, PreparedStatement}

def saveData2MySQL(iterator: Iterator[(String, Int)], batchSize: Int = 1000): Unit = {
    var conn: Connection = null
    var pst: PreparedStatement = null
    val sql = "INSERT INTO location_info (location, ip_count) VALUES (?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdatalearnshare", "root", "root")

      // In production, you should first check transaction support via the Connection metadata and roll back on failure; for brevity, those checks are omitted here
      conn.setAutoCommit(false)
      pst = conn.prepareStatement(sql)

      var rowCount = 0
      iterator.foreach { case (location, count) =>
        pst.setString(1, location)
        pst.setInt(2, count)

        pst.addBatch()
        rowCount += 1

        if (rowCount % batchSize == 0) {
          pst.executeBatch()
          conn.commit()
          rowCount = 0
        }
      }

      if (rowCount > 0) {
        pst.executeBatch()
        conn.commit()
      }

    } catch {
      case e: Exception => println(e)
    } finally {
      if (pst != null) pst.close()
      if (conn != null) conn.close()
    }
}
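
The code above assumes a location_info table already exists in the bigdatalearnshare database. A minimal sketch of creating it over the same JDBC connection (the id column and the column types are assumptions, not from the original article):

import java.sql.DriverManager

object CreateLocationTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdatalearnshare", "root", "root")
    val stmt = conn.createStatement()
    // location holds "province-city-ISP-lng-lat"; ip_count is the aggregated count per location
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS location_info (
        |  id INT AUTO_INCREMENT PRIMARY KEY,
        |  location VARCHAR(128),
        |  ip_count INT
        |) DEFAULT CHARSET = utf8mb4""".stripMargin)
    stmt.close()
    conn.close()
  }
}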

IV. Sample Results

(河北-石家庄-卫视创捷-114.502461-38.045474,383)
(云南-昆明-鹏博士-102.712251-25.040609,126)
(重庆-重庆-铁通-106.56347-29.52311,3)
(重庆-重庆-铁通-107.39007-29.70292,47)
(重庆-重庆-铁通-106.51107-29.50197,91)
(重庆-重庆-铁通-106.504962-29.533155,400)
(北京-北京-鹏博士-116.405285-39.904989,1535)
(重庆-重庆-铁通-106.57434-29.60658,177)
......