Spark UDF小结

时间:2022-07-23
本文章向大家介绍Spark UDF小结,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前言

Spark UDF 增加了对 DS 数据结构的操作灵活性,但是使用不当会抵消Spark底层优化。

Spark UDF物理解释

文章1中提到 Spark UDF/UDAF/UDTF对数据的处理物理解释如下:

UDF =》一个输入一个输出。相当于map

UDAF =》多个输入一个输出。相当于reduce

UDTF =》一个输入多个输出。相当于flatMap

其中一个输入这种概念不好理解,而Spark3.0.0官方文档2说明了是对数据行进行操作,与数据列无关:

Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result. In addition, Hive also supports UDTFs (User Defined Tabular Functions) that act on one row as input and return multiple rows as output.

Spark UDF使用场景(排坑)

Spark UDF/UDAF/UDTF 可实现复杂的业务逻辑。但是,在Spark DS中,如列裁剪、谓词下推等底层自动优化无法穿透到UDF中,这就要求进入UDF内的数据尽可能有效。

以下的例子是由于误使用UDF导致的性能下降:

实现功能

筛选出搜索过特定词条的用户,并分析这些用户使用的app

数据schema

userDs的shema

DataFrame[appInputList: array<struct<inputList:array<struct<fwordSeg:array<string>,fwords:string,timestamp:bigint>>,packageName:string>>, citycode: int, date: int, useid: string]

代码实现(bad example)

filterRowQueryUdf 中匹配输入的query并裁剪出满足条件用户的app。本以为在UDF中做了裁剪,会减少数据量级。然后,忽略掉了输入的数据量较大,造成了性能瓶颈。

userDs.groupBy("userid").agg
 Dataset<Row> userFilterDs = userDs.groupBy("userid")
    .agg(collect_list(struct("date", "appInputList")).alias("date_appInputLists"))
    .selectExpr("userid", "filterRowQueryUdf(date_appInputLists) as date_package")
    .filter(col("date_package").isNotNull())

代码实现(优化后)

手动进行列裁剪,仅传入需要的字段到UDF。对于3TB的输入数据,计算耗时从30min降至7min.

Dataset<Row> userFilterDs = userDs
        .selectExpr("userid", "explode(appInputList) as appInputList")
        .selectExpr("userid", "explode(appInputList.inputList) as inputList")
        .selectExpr("userid", "inputList.fwords as fwords")
        .groupBy("userid")
        .agg(collect_list("fwords").alias("fwords"))
        //.filter("filterKeyWordUdf(fwords)")
        .filter("filterQueryWordsOneUdf(fwords)")
        .select("userid")
Dataset<Row> userAppDs = userDs
        .selectExpr("userid", "date", "explode(appInputList) as appInputList")
        .selectExpr("userid", "date", "appInputList.packageName as packageName")
        .join(userFilterDs, "userid")

小结

了解Spark DS自动进行的优化,让处理逻辑顺应自动优化的方向,小码农也会有春天。

参考文献

1 SparkSql中UDF、UDAF、UDTF https://www.cnblogs.com/wuxiaolong4/p/11924172.html

2 Integration with Hive UDFs/UDAFs/UDTFs https://spark.apache.org/docs/3.0.0/sql-ref-functions-udf-hive.html