Spark Core项目实战 | 页面单跳转化率统计

时间:2022-07-24
本文章向大家介绍Spark Core项目实战 | 页面单跳转化率统计,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

目录

  • 一.页面单跳转化率统计
    • 需求简介
    • 思路分析
    • 二.具体实现
    • 具体业务实现
    • 完整项目代码

一.页面单跳转化率统计

需求简介

  • 计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率
  • 比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率.
  • 产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
  • 数据分析师,可以此数据做更深一步的计算和分析。
  • 企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。
  • 在该模块中,需要根据查询对象中设置的 Session 过滤条件,先将对应得 Session 过滤出来,然后根据查询对象中设置的页面路径,计算页面单跳转化率,比如查询的页面路径为:3、5、7、8,那么就要计算 3-5、5-7、7-8 的页面单跳转化率。

需要注意的一点是,页面的访问时有先后的,要做好排序。

思路分析

  1. 读取到规定的页面
  2. 过滤出来规定页面的日志记录, 并统计出来每个页面的访问次数 countByKey 是行动算子 reduceByKey 是转换算子
  3. 明确哪些页面需要计算跳转次数 1-2, 2-3, 3-4 …
  4. 按照 session 统计所有页面的跳转次数, 并且需要按照时间升序来排序
  5. 按照 session 分组, 然后并对每组内的 UserVisitAction 进行排序
  6. 转换访问流水
  7. 过滤出来和统计目标一致的跳转
  8. 统计跳转次数
  9. 计算跳转率

二.具体实现

具体业务实现

import java.text.DecimalFormat

import bean.UserVisitAction
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PageConversion {
  def statPageConversionRate(sc:SparkContext,
                             UserVisitActionRDD: RDD[UserVisitAction],
                             pageString:String): Unit ={

    //1.做出来目标跳转流
    val pages = pageString.split(",")
    val prePages = pages.take(pages.length-1)
    val postPages = pages.takeRight(pages.length-1)
    //结果为List(1->2, 2->3, 3->4, 4->5, 5->6, 6->7)
    val targetPageFlows = prePages.zip(postPages).map {
      case (pre, post) => s"$pre->$post"
    }
    //1.1把targetpages做广播变量,优化性能
    val targetPageFlowBC = sc.broadcast(targetPageFlows)
    //2.计算分母,计算需要页面的点击量 Map(5 -> 3563, 1 -> 3640, 6 -> 3593, 2 -> 3559, 3 -> 3672, 4 -> 3602)
    val pageAndCount = UserVisitActionRDD.filter(action => {
      prePages.contains(action.page_id.toString)
    })
      .map(action => (action.page_id, 1))
      .countByKey()

    //3.计算分子
    //3.1 按照sessionId分组,不能先对需要的页面做过滤,否则会应用调整的逻辑
    val sessionGrouped: RDD[(String, Iterable[UserVisitAction])] = UserVisitActionRDD.groupBy(_.session_id)
    var pageFlowsRDD = sessionGrouped.flatMap {
      case (sid, actionit) =>
        //把每个session的行为做一个时间排序
        val actions: List[UserVisitAction] = actionit.toList.sortBy(_.action_time)
        val preActions = actions.take(actions.length - 1)
        val postActions = actions.takeRight(actions.length - 1)

        preActions.zip(postActions).map {
          case (preAction, postAction) => s"${preAction.page_id}->${postAction.page_id}"
        }.filter(flow => targetPageFlowBC.value.contains(flow)) //使用广播变量
    }

    //3.2聚合
    val pageFlowAndCount: collection.Map[String, Long] = pageFlowsRDD.map((_, 1)).countByKey()

    val f = new DecimalFormat(".00%")
    //4.计算跳转率
    val result: collection.Map[String, Any] = pageFlowAndCount.map {
          //pageAndCount分母
          //1->2  count/1的点击量
          case (flow, count) =>
            val rate = count.toDouble / pageAndCount(flow.split("->")(0).toLong)
            (flow,f.format(rate).toString)
        }
    println(result)

  }
/*
1,2,4,5,4,7  计算他们的跳转率
1.想办法做出来跳转流
      1->2,  2->3   3->4
2.计算跳转率
     1->2跳转率
     分子
         1->2跳转流的个数
            如何计算?
                1.保证是同一个session才能计算,其实就是按照session进行分组

                2.按照时间排序

                3.RDD["1->2","1->2"."2->3"]  map() reduceByKey
                  RDD[UserVisitAction]  map
                  RDD[1,2,3,4,5,6,7]
                  如果做跳转流
                  rdd1= RDD[1,2,3,4,5,6]
                  rdd2= RDD[2,3,4,5,6,7]
                  rdd3 = rdd1.zip(zip).map(...)
                  过滤出来目标跳转流,然后再聚合

     分母
        页面:1.的点击数
 */

完整项目代码

import bean.UserVisitAction
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("project")
    val sc = new SparkContext(conf)

    //从数据把文件读出
    val sourceRDD = sc.textFile("D:\idea\spark-knight1\input\user_visit_action.txt")

    //把数据封装号(封装到样例类中)
    val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
      val fields = line.split("_")
      UserVisitAction(
        fields(0),
        fields(1).toLong,
        fields(2),
        fields(3).toLong,
        fields(4),
        fields(5),
        fields(6).toLong,
        fields(7).toLong,
        fields(8),
        fields(9),
        fields(10),
        fields(11),
        fields(12).toLong)
    })

    //需求3
  PageConversion.statPageConversionRate(sc,userVisitActionRDD,"1,2,3,4,5,6,7")
    //关闭项目(sc)
    sc.stop()
  }

}

Scala中DecimalFomat的使用如下:

object Test1 {
  def main(args: Array[String]): Unit = {
    val pi = 3.1415927
    //取一位整数
    println(new DecimalFormat("0").format(pi))  //3
    //取一位整数和两位小数
    println(new DecimalFormat("0.00").format(pi))  //3.14
    //取两位整数和三位小数,整数不足部分以0填补
    println(new DecimalFormat("00.000").format(pi))  //03.142
    //取所有整数部分
    println(new DecimalFormat("#").format(pi))  //3
    //以百分比方式计数,并取两位小数
    println(new DecimalFormat("#.##%").format(pi))  //314.16%
    //不取小数
    println(new DecimalFormat("#%").format(pi))   //314%
    val speed = 299792458
    //显示为科学计数法,并取五位小数
    println(new DecimalFormat("#.#####E0").format(speed))  //2.99792E8
    //显示为两位整数的科学计数法,并取四位小数
    println(new DecimalFormat("#.####E0").format(speed))   //2.9979E8
    //每三位以逗号进行分隔
    println(new DecimalFormat(",###").format(speed))   //299,792,458
    //将格式嵌入文本
    println(new DecimalFormat("速度#").format(speed))  //速度#
  }
}

版权声明:

本文为《暴走大数据》整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶