Spark operation examples

Date: 2020-04-11

Launch commands

./bin/spark-shell
./bin/spark-shell --master yarn-client    # launch on YARN in client mode
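
In newer Spark versions the yarn-client master string is deprecated; the same client-mode shell on YARN can be started by giving the master and deploy mode separately:

./bin/spark-shell --master yarn --deploy-mode client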

Example 1

val df=sql("select * from default.orders")

df.select("user_id").distinct.count()

//selectExpr accepts Hive SQL expressions
df.selectExpr("max(cast(user_id as int))").show()

df.groupBy("order_dow").count().show()
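
The same aggregations can also be written with the typed DataFrame API instead of a SQL expression string. A minimal sketch, assuming the same df (the orders table) loaded above; the ordering by count is an added illustration, not from the original post:

import org.apache.spark.sql.functions._

df.agg(max(col("user_id").cast("int"))).show()                    // equivalent of the selectExpr above
df.groupBy("order_dow").count().orderBy(desc("count")).show()     // busiest day of week first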

Using cache and persist to keep data in memory

val priors = spark.sql("select * from default.order_products_prior")
val df2 = df.join(priors,"order_id").cache
val df1 = df.groupBy("order_dow").count().cache()
df2.unpersist()  // release the cached data from memory
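
cache stores a DataFrame at the default storage level (MEMORY_AND_DISK for DataFrames), while persist lets you pick the level explicitly. A minimal sketch, assuming the same df and priors as above; df3 is just an illustrative name:

import org.apache.spark.storage.StorageLevel

val df3 = df.join(priors, "order_id").persist(StorageLevel.MEMORY_ONLY)
df3.count()      // the first action materializes the cached data
df3.unpersist()  // release it when no longer needed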

Example 2

import org.apache.spark.sql.SparkSession

object TestFunc {
  def main(args: Array[String]): Unit = {
//    Instantiate a SparkSession; the spark-shell client instantiates one automatically:
//    Spark session available as 'spark'.
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select * from badou.orders")
    val priors = spark.sql("select * from badou.order_products_prior")

    """
      |4.每个用户根据order_hour_of_day这列的值对order_dow进行排序
      |1  2 08
      |1  3 07
      |
      |1 [(2,08),(3,07)]
      |
      |=> 1 [(3,07),(2,08)] 一个用户最喜爱购买商品的top3
      |rdd: (user_id,(order_number,order_hour_of_day))   
    """.stripMargin

    import spark.implicits._
    val orderNumberSort = df.select("user_id","order_number","order_hour_of_day")
      .rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString)))    // convert the DataFrame to an RDD
      .groupByKey()
      .mapValues(_.toArray.sortWith(_._2<_._2).slice(0,2))          // sort each user's orders by hour and keep the first two
      .toDF("user_id","order_sort_by_hour")

//    UDF: add two string columns as integers
    import org.apache.spark.sql.functions._
    val plusUDF = udf((col1:String,col2:String)=>col1.toInt+col2.toInt)
    df.withColumn("plus",plusUDF(col("order_number"),col("order_dow"))).show()
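
//    A hedged variation, not from the original post: the same function can be
//    registered under a name (here "plusStr", an illustrative name) and used in SQL.
    spark.udf.register("plusStr", (a: String, b: String) => a.toInt + b.toInt)
    spark.sql("select order_id, plusStr(order_number, order_dow) as plus from badou.orders").show()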


  }
}
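
The per-user sort in Example 2 can also be expressed without groupByKey by using a window function to keep each user's earliest orders by hour of day. A minimal sketch, assuming the spark session and df from the object above; orderTopPerUser and the cutoff of 2 rows are illustrative choices matching slice(0,2):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val byHour = Window.partitionBy("user_id").orderBy(col("order_hour_of_day"))
val orderTopPerUser = df
  .select("user_id", "order_number", "order_hour_of_day")
  .withColumn("rn", row_number().over(byHour))
  .filter(col("rn") <= 2)   // keep the two earliest hours per user
orderTopPerUser.show()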

Original post: https://www.cnblogs.com/xumaomao/p/12681326.html