Returning Complex Structures from Spark UDF1
A UDF1 developed in Java must declare its return value's DataType, and spark-2.3.1 offers no ready-made way to declare complex structures such as Array or Map. A custom DataType therefore has to be constructed to meet such needs. The following walks through a custom Map-structured DataType as an example.
Defining the UDF1
The UDF mapFilterUdf returns a Map structure.
BoolFilterUdf.java
package com.sogo.getimei.udf;
import org.apache.spark.sql.api.java.UDF1;
import java.util.HashMap;
import java.util.Map;
/**
* @Created by IntelliJ IDEA.
* @author: MikeLiu
* @Date: 2020/8/3
* @Time: 22:40
* @des:
*/
public class BoolFilterUdf {
/**
* Holds immutable lookup data such as a blacklist (currently the way non-DataFrame data, e.g. dictionaries, is passed into the UDF)
*/
public static Map<String, String> filterMap;
/**
* UDF returning a Boolean
*/
public static UDF1<String, Boolean> boolFilterUdf = new UDF1<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return filterMap.containsKey(s);
}
};
/**
* UDF returning a Map
*/
public static UDF1<String, Map<String, String>> mapFilterUdf = new UDF1<String, Map<String, String>>() {
@Override
public Map<String, String> call(String s) throws Exception {
return filterMap.containsKey(s) ? BoolFilterUdf.filterMap : null;
}
};
/**
* Constructor: initialize an empty filter dictionary
*/
public BoolFilterUdf() {
BoolFilterUdf.filterMap = new HashMap<>();
}
/**
* Constructor: initialize the filter dictionary from the given map
*/
public BoolFilterUdf(Map<String, String> filterMap) {
BoolFilterUdf.filterMap = filterMap;
}
}
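The static filterMap field above is how non-column data (a dictionary, a blacklist) reaches the UDF. The same lookup semantics can be exercised without Spark; the following is a minimal stdlib-only sketch (class and method names are illustrative, mirroring boolFilterUdf and mapFilterUdf):

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only sketch of the static-dictionary lookup the UDFs perform.
public class FilterSketch {
    // Shared dictionary, populated once before the "UDF" is invoked
    static Map<String, String> filterMap = new HashMap<>();

    // Mirrors boolFilterUdf.call(s)
    static boolean boolFilter(String s) {
        return filterMap.containsKey(s);
    }

    // Mirrors mapFilterUdf.call(s)
    static Map<String, String> mapFilter(String s) {
        return filterMap.containsKey(s) ? filterMap : null;
    }

    public static void main(String[] args) {
        filterMap.put("user-1", "blocked");
        System.out.println(boolFilter("user-1")); // true
        System.out.println(mapFilter("user-2"));  // null
    }
}
```

In a real job the static field must be populated before the UDF runs (the original class does this in its constructor during setup); a static map like this is only safe as a read-only dictionary.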
Registration
Returning basic data types
Basic data types: BooleanType, IntegerType, ShortType, LongType, FloatType, DoubleType, ByteType, StringType, DateType, NullType, TimestampType
import com.sogo.getimei.udf.BoolFilterUdf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
// Initialize the SparkSession
SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
// Register a temporary UDF
spark.udf().register("boolFilterUdf", BoolFilterUdf.boolFilterUdf, DataTypes.BooleanType);
Returning complex data types
The following uses a returned Map<String, String> structure as the example.
Article 1 points out that complex structures can be built via the fromJson method, but cannot be applied to Java as given; Article 2 shows the JSON format from Scala code for an even more complex return structure. Building on both, this post goes from the simple case to the composite case with runnable Java implementations.
Build the MapType JSON
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType).json();
The content of mapTypeJson is:
{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true}
Build the DataType
DataType mapDataType = DataType.fromJson(mapTypeJson);
Register
spark.udf().register("mapFilterUdf", BoolFilterUdf.mapFilterUdf, mapDataType);
Usage
// df schema: userid string
df.selectExpr("mapFilterUdf(userid) as id_map")
.filter("id_map is not null")
.show(10, 0);
Extension: building a struct DataType
Target struct form:
struct<name:string,scores:map<string,float>,friends:array<string>>
A Map-type DataType was constructed above; an Array-type DataType can be constructed in exactly the same way, as can the map<string,float> DataType that the struct below needs:
DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
DataType mapStringFloatDataType = DataType.fromJson(DataTypes.createMapType(DataTypes.StringType, DataTypes.FloatType).json());
Then use StructField and StructType to assemble the struct DataType:
List<StructField> structFieldList = new ArrayList<>();
// Add the name field, type String DataType
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// Add the scores field, type Map DataType
structFieldList.add(DataTypes.createStructField("scores", mapStringFloatDataType, true));
// Add the friends field, type Array DataType
structFieldList.add(DataTypes.createStructField("friends", arrayStringDataType, true));
String studyTypeJsonStr = DataTypes.createStructType(structFieldList).json();
// Finally assemble the struct DataType
DataType studyDataType = DataType.fromJson(studyTypeJsonStr);
The resulting JSON string matches what Article 2 describes, so from here on the DataType can also be configured directly from JSON:
{
"type":"struct",
"fields":[
{
"name":"name",
"type":"string",
"nullable":true,
"metadata":{
}
},
{
"name":"scores",
"type":{
"type":"map",
"keyType":"string",
"valueType":"float",
"valueContainsNull":true
},
"nullable":true,
"metadata":{
}
},
{
"name":"friends",
"type":{
"type":"array",
"elementType":"string",
"containsNull":true
},
"nullable":true,
"metadata":{
}
}
]
}
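Since DataType.fromJson accepts exactly this JSON, the nested type descriptions can be assembled as plain strings, without the DataTypes builders. A stdlib-only sketch composing the same JSON shapes by hand (helper names are made up; the shapes follow the output above):

```java
// Stdlib-only sketch: compose Spark type JSON by hand.
// Callers pass type JSON fragments, e.g. "\"string\"" for a primitive type name.
public class TypeJsonSketch {
    static String arrayType(String elementTypeJson) {
        return "{\"type\":\"array\",\"elementType\":" + elementTypeJson
                + ",\"containsNull\":true}";
    }

    static String mapType(String keyTypeJson, String valueTypeJson) {
        return "{\"type\":\"map\",\"keyType\":" + keyTypeJson
                + ",\"valueType\":" + valueTypeJson + ",\"valueContainsNull\":true}";
    }

    public static void main(String[] args) {
        // map<string,float>, exactly the scores field of the struct above
        System.out.println(mapType("\"string\"", "\"float\""));
        // array<string>, the friends field
        System.out.println(arrayType("\"string\""));
    }
}
```

The output of the first call is the same JSON object shown for the scores field above, so a string built this way can be fed straight into DataType.fromJson when Spark is available.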
Unexpectedly, this fails at runtime. The value in the error message is exactly what StudyEntity's toString() method returns. Article 3 ran into the same problem, unfortunately without an answer. What now?
The value ({"friends":["liu11","liu12","liu13"],"name":"liu1","scores":{"Chn":99.0,"Math":98.0,"Eng":97.0}}) of the type (com.sogo.getimei.entity.StudyEntity) cannot be converted to struct<name:string,scores:map<string,float>,friends:array<string>>
Disappointed, I then tested a map<String, List<String>> structure and found that it does work:
DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, arrayStringDataType).json();
DataType mapStringListDataType = DataType.fromJson(mapTypeJson);
Continuing to explore, I found that replacing the Entity with the Row type solves the problem. As follows:
StudyEntity.java (with UDFs)
package com.sogo.getimei.entity;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import java.io.Serializable;
import java.util.*;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/4
* @Time: 14:09
* @des:
*/
@Setter
@Getter
public class StudyEntity implements Serializable {
private String name;
private Map<String, Float> scores;
private List<String> friends;
/**
* UDF: use Row class to return struct type
*/
public static UDF1<String, Row> structFilterUdf = new UDF1<String, Row>() {
@Override
public Row call(String s) throws Exception {
StudyEntity parse = StudyEntity.parse(s);
return RowFactory.create(parse.getName(), parse.getScores(), parse.getFriends());
}
};
/**
* This approach fails at runtime: the toString() content cannot be converted to a struct
*/
public static UDF1<String, StudyEntity> parseStudyEntityUdf = new UDF1<String, StudyEntity>() {
@Override
public StudyEntity call(String s) throws Exception {
return StudyEntity.parse(s);
}
};
public static StudyEntity parse(String s) {
// Example input (tab-separated): liu\tChinese:98,Math:99,English:97\tzhangsan,lisi,zhaoli
if (StringUtils.isEmpty(s)) {
return null;
}
StudyEntity studyEntity = new StudyEntity();
String[] fields = s.split("\t", -1);
if (fields.length < 3) {
return null;
}
// name
studyEntity.setName(fields[0]);
// scores
Map<String, Float> scores = new HashMap<>();
for (String score : fields[1].split(",", -1)) {
String[] courseAndScore = score.split(":", -1);
scores.put(courseAndScore[0], Float.valueOf(courseAndScore[1]));
}
studyEntity.setScores(scores);
// friends
String[] friendStrs = fields[2].split(",", -1);
List<String> friends = new ArrayList<>();
Collections.addAll(friends, friendStrs);
studyEntity.setFriends(friends);
return studyEntity;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
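The parsing inside StudyEntity.parse can be exercised without Spark, fastjson, or Lombok. A stdlib-only sketch of the same tab-separated parsing (class name is illustrative):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch of StudyEntity.parse: name \t course:score,... \t friend,...
public class ParseSketch {
    // Parse "Chn:99,Math:98" into an ordered course -> score map
    static Map<String, Float> parseScores(String field) {
        Map<String, Float> scores = new LinkedHashMap<>();
        for (String score : field.split(",", -1)) {
            String[] kv = score.split(":", -1);
            scores.put(kv[0], Float.valueOf(kv[1]));
        }
        return scores;
    }

    public static void main(String[] args) {
        String line = "liu1\tChn:99,Math:98,Eng:97\tliu11,liu12,liu13";
        String[] fields = line.split("\t", -1);
        String name = fields[0];
        Map<String, Float> scores = parseScores(fields[1]);
        List<String> friends = Arrays.asList(fields[2].split(",", -1));
        System.out.println(name + " " + scores + " " + friends);
        // liu1 {Chn=99.0, Math=98.0, Eng=97.0} [liu11, liu12, liu13]
    }
}
```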
Driver code
DataType arrayStringDataType = DataType.fromJson(DataTypes.createArrayType(DataTypes.StringType).json());
String mapTypeJson = DataTypes.createMapType(DataTypes.StringType, DataTypes.FloatType).json();
DataType mapStringFloatDataType = DataType.fromJson(mapTypeJson);
List<StructField> structFieldList = new ArrayList<>();
// Add the name field, type String DataType
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// Add the scores field, type Map DataType
structFieldList.add(DataTypes.createStructField("scores", mapStringFloatDataType, true));
// Add the friends field, type Array DataType
structFieldList.add(DataTypes.createStructField("friends", arrayStringDataType, true));
String studyTypeJsonStr = DataTypes.createStructType(structFieldList).json();
// Finally assemble the struct DataType
DataType studyDataType = DataType.fromJson(studyTypeJsonStr);
// Register the UDF
spark.udf().register("structFilterUdf", StudyEntity.structFilterUdf, studyDataType);
// Process the data
Dataset<Row> studyDs = spark.read().text("./data/input/study_test_data.txt")
.withColumn("split_col", split(col("value"), "\t"))
.selectExpr("structFilterUdf(value) as study")
.selectExpr("study.name as name", "study.scores as scores", "study.friends as friends");
studyDs.show(20, 0);
studyDs.printSchema();
Test data study_test_data.txt, tab ('\t') separated: field 1 = name, field 2 = scores, field 3 = friends
liu1 Chn:99,Math:98,Eng:97 liu11,liu12,liu13
liu2 Chn:89,Math:88,Eng:87 liu21,liu22,liu23
liu3 Chn:79,Math:78,Eng:77 liu31,liu32,liu33
liu4 Chn:69,Math:68,Eng:67 liu41,liu42,liu43
Output
+----+----------------------------------------+---------------------+
|name|scores |friends |
+----+----------------------------------------+---------------------+
|liu1|[Chn -> 99.0, Math -> 98.0, Eng -> 97.0]|[liu11, liu12, liu13]|
|liu2|[Chn -> 89.0, Math -> 88.0, Eng -> 87.0]|[liu21, liu22, liu23]|
|liu3|[Chn -> 79.0, Math -> 78.0, Eng -> 77.0]|[liu31, liu32, liu33]|
|liu4|[Chn -> 69.0, Math -> 68.0, Eng -> 67.0]|[liu41, liu42, liu43]|
+----+----------------------------------------+---------------------+
root
|-- name: string (nullable = true)
|-- scores: map (nullable = true)
| |-- key: string
| |-- value: float (valueContainsNull = true)
|-- friends: array (nullable = true)
| |-- element: string (containsNull = true)
Going further: a struct nested inside a struct
Digging into struct-inside-struct nesting, i.e. the problem encountered in Article 5: experiments show that directly returning an Entity (or any other non-basic type such as a struct) fails with the same error, and converting the objects into Rows solves it. The following uses Article 5's PersonEntity as the example.
public class PersonEntity {
private String name;
private Integer age;
private List<AddressEntity> address;
}
public class AddressEntity {
private String street;
private String city;
}
Nested Entity implementation
AddressEntity.java
package com.sogo.getimei.entity;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/6
* @Time: 16:08
* @des:
*/
@Setter
@Getter
public class AddressEntity implements Serializable {
private String street;
private String city;
public AddressEntity() {}
public AddressEntity(String street, String city) {
this.street = street;
this.city = city;
}
/**
* Build the DataType for AddressEntity
*/
public static DataType dataType() {
List<StructField> structFieldList = new ArrayList<>(2);
structFieldList.add(DataTypes.createStructField("street", DataTypes.StringType, true));
structFieldList.add(DataTypes.createStructField("city", DataTypes.StringType, true));
String jsonStr = DataTypes.createStructType(structFieldList).json();
// Assemble into a struct DataType
return DataType.fromJson(jsonStr);
}
}
PersonEntity.java
In personParseUdf, the List<AddressEntity> is first converted into a List<Row>, and then the PersonEntity itself is converted into a Row containing that List<Row>.
package com.sogo.getimei.entity;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan
* @Date: 2020/8/6
* @Time: 16:05
* @des:
*/
@Setter
@Getter
public class PersonEntity implements Serializable {
private String name;
private Integer age;
private List<AddressEntity> address;
public static DataType dataType() {
List<StructField> structFieldList = new ArrayList<>(3);
// name
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
// age
structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
// address
DataType arrayDataType = DataType.fromJson(DataTypes.createArrayType(AddressEntity.dataType()).json());
structFieldList.add(DataTypes.createStructField("address", arrayDataType, true));
// final struct
String jsonStr = DataTypes.createStructType(structFieldList).json();
return DataType.fromJson(jsonStr);
}
public static UDF1<String, Row> personParseUdf = new UDF1<String, Row>() {
@Override
public Row call(String s) throws Exception {
PersonEntity personEntity = PersonEntity.parse(s);
List<Row> rowList = new ArrayList<>();
for (AddressEntity addressEntity : personEntity.getAddress()) {
rowList.add(RowFactory.create(addressEntity.getStreet(), addressEntity.getCity()));
}
return RowFactory.create(personEntity.getName(), personEntity.getAge(), rowList);
}
};
public static PersonEntity parse(String str) {
if (StringUtils.isEmpty(str)) {
return null;
}
String[] fields = str.split("\t", -1);
PersonEntity personEntity = new PersonEntity();
personEntity.setName(fields[0]);
personEntity.setAge(Integer.valueOf(fields[1]));
List<AddressEntity> address = new ArrayList<>();
String[] fieldsAddress = fields[2].split(",", -1);
for (String s : fieldsAddress) {
String[] add = s.split(":", -1);
address.add(new AddressEntity(add[0], add[1]));
}
personEntity.setAddress(address);
return personEntity;
}
}
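Why does Row succeed where the Entity fails? Spark matches a struct's fields positionally, and in spark-2.3.1 a plain Java bean returned from a UDF carries no field order Spark can use. Conceptually, RowFactory.create just packs values into an ordered tuple, and nesting means tuples inside tuples. A stdlib analogue of the packing done in personParseUdf (names are illustrative, with Object[] standing in for Row):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stdlib analogue: a Row as an ordered Object[] tuple; nesting = tuples in lists.
public class RowPackingSketch {
    // Mirrors RowFactory.create(street, city): positions 0=street, 1=city
    static Object[] addressRow(String street, String city) {
        return new Object[]{street, city};
    }

    // Mirrors RowFactory.create(name, age, addressRows): 0=name, 1=age, 2=address
    static Object[] personRow(String name, Integer age, List<Object[]> address) {
        return new Object[]{name, age, address};
    }

    public static void main(String[] args) {
        List<Object[]> addr = new ArrayList<>();
        addr.add(addressRow("Chn", "99"));
        Object[] person = personRow("liu1", 90, addr);
        System.out.println(person[0] + ", " + person[1] + ", "
                + Arrays.toString((Object[]) ((List<?>) person[2]).get(0)));
        // liu1, 90, [Chn, 99]
    }
}
```

Because the declared struct DataType and the packed Row agree position by position, Spark can convert the Row without needing to understand the Java classes involved.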
Test
Test code
// Register the UDF
spark.udf().register("personParseUdf", PersonEntity.personParseUdf, PersonEntity.dataType());
// Process the data
Dataset<Row> studyDs = spark.read().text("./data/input/person_test_data.txt")
.selectExpr("personParseUdf(value) as person")
.selectExpr("person.name as name", "person.age as age", "person.address as address");
studyDs.show(20, 0);
studyDs.printSchema();
Test data (person_test_data.txt, tab-separated)
liu1 90 Chn:99,Math:98,Eng:97
liu2 80 Chn:89,Math:88,Eng:87
liu3 70 Chn:79,Math:78,Eng:77
liu4 60 Chn:69,Math:68,Eng:67
Test result
+----+---+----------------------------------+
|name|age|address |
+----+---+----------------------------------+
|liu1|90 |[[Chn, 99], [Math, 98], [Eng, 97]]|
|liu2|80 |[[Chn, 89], [Math, 88], [Eng, 87]]|
|liu3|70 |[[Chn, 79], [Math, 78], [Eng, 77]]|
|liu4|60 |[[Chn, 69], [Math, 68], [Eng, 67]]|
+----+---+----------------------------------+
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- address: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- street: string (nullable = true)
| | |-- city: string (nullable = true)
Summary
When a Spark UDF1 returns a basic data type, use the types already defined in DataTypes directly. When it returns a Map or Array structure, first build the corresponding JSON string with createMapType/createArrayType, then create the DataType with DataType.fromJson(...). When it returns a struct, or a structure with nested structs, convert the struct into a Row via RowFactory.create(...).
References
1. How to return complex types from a Spark UDF: https://mlog.club/article/1574696
2. Code examples for defining a Spark SQL schema with JSON: http://spark.coolplayer.net/?p=3674