Spark UDF1 返回复杂结构

时间:2022-07-22
本文章向大家介绍Spark UDF1 返回复杂结构,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Spark UDF1 返回复杂结构

由java开发UDF1需指定返回值的DataType,spark-2.3.1暂不支持Array、Map这些复杂结构。因此,需要自定义DataType,满足定制化需求。以下以自定义Map结构的DataType为例进行说明。

自定义UDF1

UDF mapFilterUdf 返回Map结构

BoolFilterUdf.java

package com.sogo.getimei.udf;

import org.apache.spark.sql.api.java.UDF1;

import java.util.HashMap;
import java.util.Map;

/**
 * @Created by IntelliJ IDEA.
 * @author: MikeLiu
 * @Date: 2020/8/3
 * @Time: 22:40
 * @des:
 */
public class BoolFilterUdf {

    /**
     * 这里主要保存不变的映射数据,比如黑名单(目前使用这种方式向UDF传入字典等非DF的列)
     */
    public static Map<String, String> filterMap;

    /**
     * 返回值为Boolean类型
     */
    public static UDF1<String, Boolean> boolFilterUdf = new UDF1<String, Boolean>() {
        @Override
        public Boolean call(String s) throws Exception {
            return filterMap.containsKey(s);
        }
    };

    /**
     * 返回值为Map类型
     */
    public static UDF1<String, Map<String, String>> mapFilterUdf = new UDF1<String, Map<String, String>>() {
        @Override
        public Map<String, String> call(String s) throws Exception {
            return filterMap.containsKey(s) ? BoolFilterUdf.filterMap : null;
        }
    };

    /**
     * 构造函数,初始化配置字典 
     */
    public BoolFilterUdf() {
        BoolFilterUdf.filterMap = new HashMap<>();
    }

    /**
     * 构造函数,初始化配置字典 
     */
    public BoolFilterUdf(Map<String, String> filterMap) {
        BoolFilterUdf.filterMap = filterMap;
    }

}

注册

返回基础数据结构

基础数据结构类型:BooleanType, IntegerType, ShortType,LongType, FloatType, DoubleType, ByteType, StringType, DateType, NullType, TimestampType

import com.sogo.getimei.udf.BoolFilterUdf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// 初始化SparkSession
SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
// 注册临时UDF
spark.udf().register("boolFilterUdf", BoolFilterUdf.boolFilterUdf, DataTypes.BooleanType);

返回复杂数据结构

以返回Map<String, String>结构为例说明。

文章1指出可以通过fromJson方法来构建复杂的结构,但不能用于java;文章2给出了scale代码的json格式,返回的数据结构更复杂。基于此,本文从简单到组合,给出可执行的java实现。

构建MapType的json

String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType).json();

mapTypeJson的内容为:

{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true}

构建DataType

DataType mapDataType = DataType.fromJson(mapTypeJson);

注册

spark.udf().register("mapFilterUdf", BoolFilterUdf.mapFilterUdf, mapDataType);

使用

// df的schema结构: userid string
df.selectExpr("mapFilterUdf(userid) as id_map")
        .filter("id_map is not null")
        .show(10, 0);

扩展推广: 构造struct类的DataType

目标struct的形式:

struct<name:string,scores:map<string,float>,friends:array<string>>

上面已完成了Map类型的DataTypede构造,同样可以构造出Array类型的DataType如下:

DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());

然后借助StructField和StructType完成struct类的DataType的构建

List<StructField> structFieldList = new ArrayList<>();
// 添加name字段, 类型为String DataType
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// 添加scores字段,类型为Map DataType
structFieldList.add(DataTypes.createStructField("scores", mapStringFloatDataType, true));
// 添加friends字段, 类型为 Array DataType
structFieldList.add(DataTypes.createStructField("friends", arrayStringDataType, true));
String studyTypeJsonStr = DataTypes.createStructType(structFieldList).json();
// 最后组装成struct DataType
DataType studyDataType = DataType.fromJson(studyTypeJsonStr);

得到的json string的结果与文章2描述的一致,之后可以直接通过json格式配置DataType

{
    "type":"struct",
    "fields":[
        {
            "name":"name",
            "type":"string",
            "nullable":true,
            "metadata":{

            }
        },
        {
            "name":"scores",
            "type":{
                "type":"map",
                "keyType":"string",
                "valueType":"float",
                "valueContainsNull":true
            },
            "nullable":true,
            "metadata":{

            }
        },
        {
            "name":"friends",
            "type":{
                "type":"array",
                "elementType":"string",
                "containsNull":true
            },
            "nullable":true,
            "metadata":{

            }
        }
    ]
}

哎,没想到报错了。错误中的value就是StudyEntity类的toString()方法返回的结果。文章3可遇到了这个问题,可惜没有解答,怎么办呢?

The value ({"friends":["liu11","liu12","liu13"],"name":"liu1","scores":{"Chn":99.0,"Math":98.0,"Eng":97.0}}) of the type (com.s
ogo.getimei.entity.StudyEntity) cannot be converted to struct<name:string,scores:map<string,float>,friends:array<string>>

失望之余,测试了下map<String, List<String>>结构,发现可以:

DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, arrayStringDataType).json();
DataType mapStringListDataType = DataType.fromJson(mapTypeJson);

本着继续探索的精神,发现使用Row类型替换Entity能解决问题。如下:</br>

StudyEntity.java (含UDF)

package com.sogo.getimei.entity;

import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;

import java.io.Serializable;
import java.util.*;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/4
 * @Time: 14:09
 * @des:
 */
@Setter
@Getter
public class StudyEntity implements Serializable {
    private String name;
    private Map<String, Float> scores;
    private List<String> friends;

    /**
     * UDF: use Row class to return struct type
     */
    public static UDF1<String, Row> structFilterUdf = new UDF1<String, Row>() {
        @Override
        public Row call(String s) throws Exception {
            StudyEntity parse = StudyEntity.parse(s);
            return RowFactory.create(parse.getName(), parse.getScores(), parse.getFriends());
        }
    };

    /**
     * 这种方法运行时报错:toString()的内容无法转换成struct
     */
    public static UDF1<String, StudyEntity> parseStudyEntityUdf = new UDF1<String, StudyEntity>() {
        @Override
        public StudyEntity call(String s) throws Exception {
            return StudyEntity.parse(s);
        }
    };
    public static StudyEntity parse(String s) {
        // liu t Chinese:98,Math:99,English:97 t zhangsan,lisi,zhaoli
        if (StringUtils.isEmpty(s)) {
            return null;
        }
        StudyEntity studyEntity = new StudyEntity();
        String[] fields = s.split("t", -1);
        if (fields.length < 3) {
            return null;
        }
        // name
        studyEntity.setName(fields[0]);
        // scores
        Map<String, Float> scores = new HashMap<>();
        for (String score : fields[1].split(",", -1)) {
            String[] courseAndScore = score.split(":", -1);
            scores.put(courseAndScore[0], Float.valueOf(courseAndScore[1]));
        }
        studyEntity.setScores(scores);
        // friends
        String[] friendStrs = fields[2].split(",", -1);
        List<String> friends = new ArrayList<>();
        Collections.addAll(friends, friendStrs);
        studyEntity.setFriends(friends);
        return studyEntity;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

}

逻辑处理代码

DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, DataTypes.FloatType).json();
DataType mapStringFloatDataType = DataType.fromJson(mapTypeJson);

List<StructField> structFieldList = new ArrayList<>();
// 添加name字段, 类型为String DataType
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// 添加scores字段,类型为Map DataType
structFieldList.add(DataTypes.createStructField("scores", mapStringFloatDataType, true));
// 添加friends字段, 类型为 Array DataType
structFieldList.add(DataTypes.createStructField("friends", arrayStringDataType, true));
String studyTypeJsonStr = DataTypes.createStructType(structFieldList).json();
// 最后组装成struct DataType
DataType studyDataType = DataType.fromJson(studyTypeJsonStr);

// UDF 注册
spark.udf().register("structFilterUdf", StudyEntity.structFilterUdf, studyDataType);
// 数据处理
Dataset<Row> studyDs = spark.read().text("./data/input/study_test_data.txt")
        .withColumn("split_col", split(col("value"), "t"))
        .selectExpr("structFilterUdf(value) as study")
        .selectExpr("study.name as name", "study.scores as scores", "study.friends as friends");
studyDs.show(20, 0);
studyDs.printSchema();

测试数据 study_test_data.txt:以't'分割,1=name, 2=scores, 3=friends

liu1	Chn:99,Math:98,Eng:97	liu11,liu12,liu13
liu2	Chn:89,Math:88,Eng:87	liu21,liu22,liu23
liu3	Chn:79,Math:78,Eng:77	liu31,liu32,liu33
liu4	Chn:69,Math:68,Eng:67	liu41,liu42,liu43

输出结果

+----+----------------------------------------+---------------------+
|name|scores                                  |friends               |
+----+----------------------------------------+---------------------+
|liu1|[Chn -> 99.0, Math -> 98.0, Eng -> 97.0]|[liu11, liu12, liu13]|
|liu2|[Chn -> 89.0, Math -> 88.0, Eng -> 87.0]|[liu21, liu22, liu23]|
|liu3|[Chn -> 79.0, Math -> 78.0, Eng -> 77.0]|[liu31, liu32, liu33]|
|liu4|[Chn -> 69.0, Math -> 68.0, Eng -> 67.0]|[liu41, liu42, liu43]|
+----+----------------------------------------+---------------------+

root
 |-- name: string (nullable = true)
 |-- scores: map (nullable = true)
 |    |-- key: string
 |    |-- value: float (valueContainsNull = true)
 |-- friends: array (nullable = true)
 |    |-- element: string (containsNull = true)

再探究:struct 中嵌套 struct

继续深究 struct 中嵌套 struct 的问题,也即文章5中遇到的问题。实现发现,若直接返回Entity(或者struct等非基础数据类型时)都会报错。因此,可以通过将它们转换成Row类型解决。以下以解决文章5中的返回PersonEntity为例说明。</br>

public class PersonEntity {
    private String name;
    private Integer age;
    private List<AddressEntity> address;
}
public class AddressEntity {
    private String street;
    private String city;
}

嵌套Entity实现

AddressEntity.java

package com.sogo.getimei.entity;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/6
 * @Time: 16:08
 * @des:
 */
@Setter
@Getter
public class AddressEntity implements Serializable {
    private String street;
    private String city;

    public AddressEntity() {}

    public AddressEntity(String street, String city) {
        this.street = street;
        this.city = city;
    }

    /**
    * 构造 AddressEntity的 DataType
    */
    public static DataType dataType() {
        List<StructField> structFieldList = new ArrayList<>(2);
        structFieldList.add(DataTypes.createStructField("street", DataTypes.StringType, true));
        structFieldList.add(DataTypes.createStructField("city", DataTypes.StringType, true));
        String jsonStr = DataTypes.createStructType(structFieldList).json();
        // 组装成struct DataType
        return DataType.fromJson(jsonStr);
    }
}

PersonEntity.java </br>

在 personParseUdf 中,先将List<AddressEntity> 转换成了 List<Row>,再将PersonEntity转换成Row(包含List<Row>)。

package com.sogo.getimei.entity;

import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/6
 * @Time: 16:05
 * @des:
 */
@Setter
@Getter
public class PersonEntity implements Serializable {
    private String name;
    private Integer age;
    private List<AddressEntity> address;

    public static DataType dataType() {
        List<StructField> structFieldList = new ArrayList<>(3);
        // name
        structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        // age
        structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        // address
        DataType arrayDataType = DataType.fromJson(DataTypes.createArrayType(AddressEntity.dataType()).json());
        structFieldList.add(DataTypes.createStructField("address", arrayDataType, true));
        // final struct
        String jsonStr = DataTypes.createStructType(structFieldList).json();
        return DataType.fromJson(jsonStr);
    }

    public static UDF1<String, Row> personParseUdf = new UDF1<String, Row>() {
        @Override
        public Row call(String s) throws Exception {
            PersonEntity personEntity = PersonEntity.parse(s);
            List<Row> rowList = new ArrayList<>();
            for (AddressEntity addressEntity : personEntity.getAddress()) {
                rowList.add(RowFactory.create(addressEntity.getStreet(), addressEntity.getCity()));
            }
            return RowFactory.create(personEntity.getName(), personEntity.getAge(), rowList);
        }
    };

    public static PersonEntity parse(String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        String[] fields = str.split("t", -1);
        PersonEntity personEntity = new PersonEntity();
        personEntity.setName(fields[0]);
        personEntity.setAge(Integer.valueOf(fields[1]));
        List<AddressEntity> address = new ArrayList<>();
        String[] fieldsAddress = fields[2].split(",", -1);
        for (String s : fieldsAddress) {
            String[] add = s.split(":", -1);
            address.add(new AddressEntity(add[0], add[1]));
        }
        personEntity.setAddress(address);
        return personEntity;
    }
}

测试

测试代码

// UDF 注册
spark.udf().register("personParseUdf", PersonEntity.personParseUdf, PersonEntity.dataType());
// 数据处理
Dataset<Row> studyDs = spark.read().text("./data/input/person_test_data.txt")
        .selectExpr("personParseUdf(value) as person")
        .selectExpr("person.name as name", "person.age as age", "person.address as address");
studyDs.show(20, 0);
studyDs.printSchema();

测试数据

liu1	90	Chn:99,Math:98,Eng:97
liu2	80	Chn:89,Math:88,Eng:87
liu3	70	Chn:79,Math:78,Eng:77
liu4	60	Chn:69,Math:68,Eng:67

测试结果

+----+---+----------------------------------+
|name|age|address                           |
+----+---+----------------------------------+
|liu1|90 |[[Chn, 99], [Math, 98], [Eng, 97]]|
|liu2|80 |[[Chn, 89], [Math, 88], [Eng, 87]]|
|liu3|70 |[[Chn, 79], [Math, 78], [Eng, 77]]|
|liu4|60 |[[Chn, 69], [Math, 68], [Eng, 67]]|
+----+---+----------------------------------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- city: string (nullable = true)

小结

Spark UDF1 返回基础数结构时,直接使用DataTypes中已定义的;返回Map、Array结构时,先使用createArrayType、createMapType创建对应的json string,再使用DataType.fromJson(...)创建DataType;返回struct或者struct的嵌套结构时,需要将RowFactory.create(...)将struct转换成Row。

参考文献

1 如何使用Spark UDF返回复杂类型 https://mlog.club/article/1574696

2 使用 json定义spark sql schema 代码例子 http://spark.coolplayer.net/?p=3674

3 Failed to execute user defined function in Apache Spark using Scala https://stackoverflow.com/questions/44570303/failed-to-execute-user-defined-function-in-apache-spark-using-scala

4 How to create a Row from a List or Array in Spark using java https://stackoverflow.com/questions/39696403/how-to-create-a-row-from-a-list-or-array-in-spark-using-java

5 Spark - Transforming Complex Data Types https://stackoverflow.com/questions/58232599/spark-transforming-complex-data-types