Spark Core项目实战 | Top10 热门品类

时间:2022-07-24
本文章向大家介绍Spark Core项目实战 | Top10 热门品类,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

目录

  • 一.数据准备
  • 二.Top10热门品类
    • 简介
    • 思路
  • 具体实现
    • 提前准备
    • 建立项目APP
    • 建立项目bean
    • 建立项目ACC
    • 完整项目代码

一.数据准备

本实战项目的数据是采集自电商的用户行为数据. 主要包含用户的 4 种行为: 搜索, 点击, 下单和支付. 数据格式如下, 不同的字段使用下划线分割开_:

数据说明:

  1. 数据采用_分割字段
  2. 每一行表示用户的一个行为, 所以每一行只能是四种行为中的一种.
  3. 如果搜索关键字是 null, 表示这次不是搜索
  4. 如果点击的品类 id 和产品 id 是 -1 表示这次不是点击
  5. 下单行为来说一次可以下单多个产品, 所以品类 id 和产品 id 都是多个, id 之间使用逗号,分割. 如果本次不是下单行为, 则他们相关数据用null来表示
  6. 支付行为和下单行为类似.

二.Top10热门品类

简介

品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.

思路

思路 1 分别统计每个品类 点击的次数下单的次数支付的次数. 缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常的耗时 思路 2 最好的办法应该是遍历一次能够计算出来上述的 3 个指标. 使用累加器可以达成我们的需求.

  1. 遍历全部日志数据, 根据品类 id 和操作类型分别累加. 需要用到累加器
  2. 定义累加器
  3. 当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
  4. 遍历完成之后就得到每个每个品类 id 和操作类型的数量.
  5. 按照点击下单支付的顺序来排序
  6. 取出 Top10

具体实现

添加依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 打包插件, 否则 scala 类不会编译并打包进去 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

建立项目APP

测试能否读到数据(写一步,测试一步,养成良好的习惯)

import org.apache.spark.{SparkConf, SparkContext}

object ProjectApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("project")
        val sc = new SparkContext(conf)

        //从数据把文件读出
        val sourceRDD = sc.textFile("D:\idea\spark-knight\spark-core-project\input\user_visit_action.txt")

        //测试一下能否读取数据
        sourceRDD.collect().foreach(println)

        //关闭项目(sc)
        sc.stop()
      }
}

能够读到数据

建立项目bean

封装用户行为的bean类

/**
 * 用户访问动作表
 *
 * @param date               用户点击行为的日期
 * @param user_id            用户的ID
 * @param session_id         Session的ID
 * @param page_id            某个页面的ID
 * @param action_time        动作的时间点
 * @param search_keyword     用户搜索的关键词
 * @param click_category_id  某一个商品品类的ID
 * @param click_product_id   某一个商品的ID
 * @param order_category_ids 一次订单中所有品类的ID集合
 * @param order_product_ids  一次订单中所有商品的ID集合
 * @param pay_category_ids   一次支付中所有品类的ID集合
 * @param pay_product_ids    一次支付中所有商品的ID集合
 * @param city_id            城市 id
 */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long)
case class CategoryCountInfo(categoryId: String,
                             clickCount: Long,
                             orderCount: Long,
                             payCount: Long)

建立项目ACC

定义累加器

import bean.UserVisitAction
import org.apache.spark.util.AccumulatorV2

//in:UserVisitAction  out:Map[(品类,"click")->count] (品类,"order") ->    (品类,"pay"),->count
class CategoryAcc extends AccumulatorV2[UserVisitAction,Map[(String,String),Long]]{

  private var map = Map[(String, String), Long]()

  // 判断累加器是否为"零"
  override def isZero: Boolean = map.isEmpty

  // 复制累加器
  override def copy(): AccumulatorV2[UserVisitAction, Map[(String, String), Long]] = {
    val acc = new CategoryAcc
    acc.map = map
    acc
  }

  override def reset(): Unit = Map[(String, String), Long]()  // 不可变集合需要赋值个新的空集合

  //分区器累加
  override def add(v: UserVisitAction): Unit = {
    //分别计算3个指标
    // 对不同的行为做不同的处理  if语句 或 模式匹配
    v match {
      //点击行为
      case action if action.click_category_id != -1 =>
        val key = (action.click_category_id.toString, "click")
        // 这里其实是等价于 map = map + (.....)  不可变集合是给map赋值新的集合
        map += key -> (map.getOrElse(key, 0L) + 1L)


      // 下单行为  切出来的是字符串 "null", 不是空的null
      case action if action.order_category_ids != "null" =>
        // 切出来这次下单的多个品类
        val cIds: Array[String] = action.order_category_ids.split(",")
        cIds.foreach(cid => {
          val key: (String, String) = (cid, "order")
          map += key -> (map.getOrElse(key, 0L) + 1L)
        })

      // 支付行为
      case action if action.pay_category_ids != "null" =>
        val cIds: Array[String] = action.pay_category_ids.split(",")
        cIds.foreach(cid => {
          val key: (String, String) = (cid, "pay")
          map += key -> (map.getOrElse(key, 0L) + 1L)
        })
      // 其他非正常情况, 做任何处理
      case _ =>
    }
  }

  override def merge(other: AccumulatorV2[UserVisitAction, Map[(String, String), Long]]): Unit = {
    // 1.把other中的map合并到map中
    // 合并map
   /*other match {
      case o: CategoryAcc =>
        o.map.foreach {
          case (key, count) =>
            map += key -> (map.getOrElse(key, 0L) + count)
        }
      case _ =>
        throw new UnsupportedOperationException
    }*/
    // 2.对other的map进行折叠, 把结果都折叠到map中
    // 如果是可变map, 则所有的变化都是在原集合中发生变化, 最后的值可以不用再一次添加
    // 如果是不变map, 则计算的结果, 必须重新赋值给原的map变量
    map = other match {
      case o: CategoryAcc =>
        o.map.foldLeft(map) {
          // case出来的任何东西都不能改, 只能读
          case (map, (cidAction, count)) =>
            // 对不可变来说所以这是错的. !!!!!!!!!!!
            // map += cidAction -> (map.getOrElse(cidAction, 0L) + count)
            // 相当于 map = map + (....)
            //直接返回新的集合就可以了
            map + (cidAction -> (map.getOrElse(cidAction, 0L) + count))
        }
      case _ =>
        throw new UnsupportedOperationException
    }
    }


  override def value: Map[(String, String), Long] = map
}

完整项目代码

1.App补全

import bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("project")
    val sc = new SparkContext(conf)

    //从数据把文件读出
    val sourceRDD = sc.textFile("D:\idea\spark-knight\spark-core-project\input\user_visit_action.txt")

    //把数据封装号(封装到样例类中)
    val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
      val fields = line.split("_")
      UserVisitAction(
        fields(0),
        fields(1).toLong,
        fields(2),
        fields(3).toLong,
        fields(4),
        fields(5),
        fields(6).toLong,
        fields(7).toLong,
        fields(8),
        fields(9),
        fields(10),
        fields(11),
        fields(12).toLong)
    })

   //需求1:
  CategoryTopApp.calcCategoryTop10(sc, userVisitActionRDD)

    //关闭项目(sc)
    sc.stop()
  }
}

2.计算Top10 热门品类的具体代码(在APP中创建)

import acc.CategoryAcc
import bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD


object CategoryTopApp {
    def calcCategoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]) = {

      // 使用累加器完成3个指标的累加:   点击 下单量 支付量
      val acc = new CategoryAcc
      sc.register(acc)
      userVisitActionRDD.foreach(action => acc.add(action))
      // 1. 把一个品类的三个指标封装到一个map中
      val cidActionCountGrouped: Map[String, Map[(String, String), Long]] = acc.value.groupBy(_._1._1)
      // 2. 把结果封装到样例类中
      val categoryCountInfoArray: Array[CategoryCountInfo] = cidActionCountGrouped.map {
        case (cid, map) =>
          CategoryCountInfo(cid,
            map.getOrElse((cid, "click"), 0L),
            map.getOrElse((cid, "order"), 0L),
            map.getOrElse((cid, "pay"), 0L)
          )
      }.toArray
      // 3. 对数据进行排序取top10
      val result: Array[CategoryCountInfo] = categoryCountInfoArray
        .sortBy(info => (-info.clickCount, -info.orderCount, -info.payCount))
        .take(10)

      result.foreach(println)

    }
}

3.运行结果

版权声明:

本文为《暴走大数据》整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶