Loading External Resources in Spark UDFs

Date: 2022-07-24

Preface

Because a Spark UDF's input parameters must be data columns (Column), before a UDF can do things like Redis lookups or whitelist/blacklist filtering, it needs to load external resources (e.g. configuration parameters, whitelists) and initialize instances from them. If everything involved is serializable, initializing on the Driver and broadcasting is enough. But Redis connections, tries (dictionary trees), and the like contain objects that cannot be serialized and therefore cannot be shipped from the Driver to the Executors. The overall idea is therefore: initialize the serializable resources on the Driver, and on each Executor use those resources to build the non-serializable objects, so that the full object is constructed in a distributed way.

Combined with a singleton, the construction then happens only once per Executor. The crux is initializing static variables and other non-serializable members on the Executor. Three approaches follow:

  1. Load external resources in the UDF's call method
  2. Lazily initialize the UDF's static member variables
  3. Replace the UDF with mapPartitions

This article uses building a trie (dictionary tree) as the running example; for Redis connections, see reference [1].

Preparation

This part introduces the AtKwdBo and WordTrieEntity classes: AtKwdBo receives the word packages used to build the tries; WordTrieEntity builds the tries and performs string matching.

Serialization issues

Reference [3] summarizes common serialization pitfalls, as follows (a small demo of rules 3 and 4 appears after the list):

  1. Deserialization fails when the serialVersionUID of the serialized class does not match.
  2. If a subclass implements Serializable but its superclass does not, the superclass's fields are not serialized and read back as null after deserialization. Note: if the superclass implements Serializable and the subclass does not, the subclass still serializes normally.
  3. Fields marked transient are not serialized.
  4. Static fields are not serialized: they belong to the class rather than to any method or instance, so serialization skips them.
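
To make rules 3 and 4 concrete, here is a minimal, self-contained sketch (the class and field names are made up for illustration) showing that transient and static fields do not survive a serialize/deserialize round trip:

import java.io.*;

public class SerializationDemo {
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        String kept = "kept";                   // serialized normally
        transient String dropped = "dropped";   // rule 3: transient fields are skipped
        static String shared = "driver-side";   // rule 4: class-level state is skipped
    }

    public static void main(String[] args) throws Exception {
        // serialize to a byte array (stands in for shipping Driver -> Executor)
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new Holder());

        // simulate a fresh JVM on the receiving side, where the static was never set
        Holder.shared = null;

        Holder copy = (Holder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.kept);     // kept
        System.out.println(copy.dropped);  // null: transient
        System.out.println(Holder.shared); // null: static state did not travel
    }
}

This is exactly the failure mode behind the NullPointerException discussed below: a static trie initialized on the Driver arrives on the Executor as null.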

The AtKwdBo class

keywords holds the keywords and stopwords holds the negative (stop) words. If any of a user's query terms hits any stopword, that query term is filtered out; if a user query hits any keyword, the current word package is matched. A user is selected only if the user's queries match every word package.

AtKwdBo.java

package com.sogo.getimei.entity;

import java.io.Serializable;
import java.util.*;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class AtKwdBo implements Serializable {
    private Set<String> keywords;
    private Set<String> stopwords;
    /**
     * just for test
     * @return
     */
    public static List<AtKwdBo> generateKeyWord() {
        // Keyword
        List<AtKwdBo> atKwdBos = new ArrayList<>();
        AtKwdBo atKwdBo = new AtKwdBo();
        Set<String> keywords = new HashSet<>();
        keywords.add("小米手机");
        keywords.add("雷军");
        keywords.add("小米10周年");
        atKwdBo.setKeywords(keywords);
        Set<String> stopwords = new HashSet<>();
        stopwords.add("华为手机");
        atKwdBo.setStopwords(stopwords);
        atKwdBos.add(atKwdBo);

        return atKwdBos;
    }
}

The trie (dictionary tree)

The trie (an Aho-Corasick automaton) requires the following Maven dependency:

<!-- Aho-Corasick automaton -->
<!-- https://mvnrepository.com/artifact/org.ahocorasick/ahocorasick -->
<dependency>
    <groupId>org.ahocorasick</groupId>
    <artifactId>ahocorasick</artifactId>
    <version>0.4.0</version>
</dependency>

Building the trie

private static Trie buildTrie(Set<String> stringSet) {
    return Trie.builder().addKeywords(stringSet).build();
}
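
As a quick illustration of the ahocorasick API relied on above (Trie.builder(), addKeywords, and containsMatch, all of which also appear in the code below), a standalone match test might look like this:

import org.ahocorasick.trie.Trie;

import java.util.HashSet;
import java.util.Set;

public class TrieDemo {
    public static void main(String[] args) {
        Set<String> keywords = new HashSet<>();
        keywords.add("小米手机");
        keywords.add("雷军");

        // build the automaton once; matching is then a cheap scan
        Trie trie = Trie.builder().addKeywords(keywords).build();

        System.out.println(trie.containsMatch("想买小米手机"));  // true: a keyword occurs as a substring
        System.out.println(trie.containsMatch("华为手机"));      // false: no keyword occurs
    }
}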

On top of buildTrie, a "keyword trie" and a "stopword trie" are built per word package:

Note: this mainly implements the AND/OR/NOT logic between and within word packages; the details can be skipped.

WordTrieEntity.java

package com.sogo.getimei.entity;

import lombok.Getter;
import lombok.Setter;
import org.ahocorasick.trie.Trie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/26
 * @Time: 23:08
 * @des:
 */
@Setter
@Getter
public class WordTrieEntity implements Serializable {
    // LOGGER
    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieEntity.class);
    // not serialized (transient)
    private transient Trie keywordsTrie;
    // not serialized (transient)
    private transient Trie stopwordsTrie;

    public WordTrieEntity(Trie keywordsTrie, Trie stopwordsTrie) {
        this.keywordsTrie = keywordsTrie;
        this.stopwordsTrie = stopwordsTrie;
    }

    public static List<WordTrieEntity> generateKeywordTrieList(List<AtKwdBo> atKwdBos) {
        // get key word
        List<WordTrieEntity> keywordsTrieList = new ArrayList<>();
        for (AtKwdBo atKwdBo : atKwdBos) {
            Trie keywordsTrie = buildTrie(atKwdBo.getKeywords());
            Trie stopwordsTrie = buildTrie(atKwdBo.getStopwords());
            keywordsTrieList.add(new WordTrieEntity(keywordsTrie, stopwordsTrie));
        }
        System.out.println("I am initialized in WordTrieEntity");
        return keywordsTrieList;
    }

    private static Trie buildTrie(Set<String> stringSet) {
        return Trie.builder().addKeywords(stringSet).build();
    }

    public static Boolean contains(Seq<String> stringSeq, List<WordTrieEntity> wordTrieList) {
        for (WordTrieEntity wordTrie : wordTrieList) {
            // word packages are combined with AND
            if (Boolean.FALSE.equals(contains(wordTrie, stringSeq))) {
                return false;
            }
        }
        return true;
    }

    private static Boolean contains(WordTrieEntity wordTrie, Seq<String> stringSeq) {
        // a single hit is enough
        for (int i = 0; i < stringSeq.size(); i++) {
            // within a word package, terms are OR'ed
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.apply(i)))) {
                return true;
            }
        }
        // nothing matched: return false
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, List<String> stringSeq) {
        // a single hit is enough
        for (int i = 0; i < stringSeq.size(); i++) {
            // within a word package, terms are OR'ed
            if (Boolean.TRUE.equals(contains(wordTrie, stringSeq.get(i)))) {
                return true;
            }
        }
        // nothing matched: return false
        return false;
    }

    private static Boolean contains(WordTrieEntity wordTrie, String query) {
        // stopwords: a hit filters this query out
        if (null != wordTrie.getStopwordsTrie() && wordTrie.getStopwordsTrie().containsMatch(query)) {
            return false;
        }
        // match keywords
        if (null == wordTrie.getKeywordsTrie()) {
            LOGGER.error("keyword is null");
        }
        return null != wordTrie.getKeywordsTrie() && wordTrie.getKeywordsTrie().containsMatch(query);
    }
}

Loading external resources in the UDF's call method

A Spark UDF is instantiated when it is registered; after that, only its call method is (automatically) invoked. Because the trie contains objects that cannot be serialized, the trie is declared static. But a static member initialized on the Driver is not shipped to the Executors, so using it there throws a NullPointerException (another symptom: the job runs fine in local mode but fails in yarn mode). We therefore initialize it inside the call method, since that runs on the Executors. To keep the trie from being initialized more than once, we emulate a singleton:

UDF code

FilterQueryByAcAutoUdf.java

The member wordTrieList is a List in which each element corresponds to one word package, containing both keywords and stopwords.

package com.sogo.getimei.udf;

import com.sogo.getimei.entity.AtKwdBo;
import com.sogo.getimei.entity.WordTrieEntity;
import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.api.java.UDF1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;

import java.io.Serializable;
import java.util.List;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/26
 * @Time: 14:24
 * @des:
 */
@Getter
@Setter
public class FilterQueryByAcAutoUdf implements UDF1<Seq<String>, Boolean>, Serializable{
    // log
    private final Logger LOGGER = LoggerFactory.getLogger(FilterQueryByAcAutoUdf.class);
    // static because: 1. the trie holds non-serializable objects; 2. it emulates a singleton and only needs to be initialized once
    private static List<WordTrieEntity> wordTrieList;
    // if AtKwdBo held non-serializable objects, this approach would not work
    private List<AtKwdBo> atKwdBos;

    public FilterQueryByAcAutoUdf(List<AtKwdBo> atKwdBos) {
        this.atKwdBos = atKwdBos;
//        // initializing here would run on the Driver; the result is not shipped to the Executors, so execution there would hit a NullPointerException
//        wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
    }

    // emulate a singleton: lazy init with double-checked locking, so each Executor initializes the list only once
    @Override
    public Boolean call(Seq<String> stringSeq) throws Exception {
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            synchronized (FilterQueryByAcAutoUdf.class) {
                if (null == wordTrieList || wordTrieList.isEmpty()) {
                    // if AtKwdBo held non-serializable objects, this is where they could be built
                    wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
                    LOGGER.error("[DEBUG] Test how many times it will be initial; wordTrieList is null or empty");
                }
            }
        }
        return WordTrieEntity.contains(stringSeq, wordTrieList);
    }
}

Calling code

spark.udf().register("filterQueryWordsUdf",
        new FilterQueryByAcAutoUdf(AtKwdBo.generateKeyWord()),
        DataTypes.BooleanType);

Dataset<Row> acDs = waplxDs.filter("filterQueryWordsUdf(fwords)")
        .selectExpr("imei", "explode(fwords) as fwords");

Testing

Input data

The schema of waplxDs:

root
 |-- imei: string (nullable = true)
 |-- fwords: array (nullable = true)
 |    |-- element: string (containsNull = false)

Output

Users who searched "小米手机" are matched:

+----------------------------------------+------------------------------+
|imei                                    |fwords                        |
+----------------------------------------+------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx5座套                   |
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx3导航                   |
|26E014B8B77C0A442EC31E59505A1CED4D446779|太靠左右边那么大              |
|26E014B8B77C0A442EC31E59505A1CED4D446779|电动车禁了                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|说实在的电动车禁了可以减少车祸|
|26E014B8B77C0A442EC31E59505A1CED4D446779|小米手机                      |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装高德            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装carlife         |
+----------------------------------------+------------------------------+

The log lines appear only once:

I am initialized in WordTrieEntity
 ERROR com.sogo.getimei.udf.FilterQueryByAcAutoUdf - [DEBUG] Test how many times it will be initial; wordTrieList is null or empty

The output matches expectations; both the local test and the cluster test pass. The Executor logs confirm that wordTrieList is initialized only once per Executor.

Lazy initialization of the UDF's static member variables

FilterQueryByAcAutoUdf0 contains only static members and static methods and is never instantiated on the Driver, so the assignment wordTrieList = WordTrieEntity.generateKeywordTrieList(AtKwdBo.generateKeyWord()); is not executed there; it runs only when FilterQueryByAcAutoUDF.call is first invoked [2]. This guarantees the trie is built on every Executor, with no NullPointerException. The approach suits fixed word packages: once the job is running, the trie built from them never changes.

Reference [2] explains the trigger for static member initialization: reading a static field of a class is one of the "active uses" that initialize the class.
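
A tiny standalone demo of this behavior (hypothetical class names), showing that the static initializer runs only on the first active use of the class:

public class LazyInitDemo {

    static class Heavy {
        // runs at class-initialization time, i.e. on the first active use
        static String resource = build();

        static String build() {
            System.out.println("building resource...");
            return "trie";
        }
    }

    public static void main(String[] args) {
        System.out.println("before first use");
        System.out.println(Heavy.resource); // first read triggers "building resource..."
        System.out.println(Heavy.resource); // already initialized; no second build
    }
}

On Spark this means the static trie is built once per Executor JVM, the first time call reads the field.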

UDF code

FilterQueryByAcAutoUdf0.java

package com.sogo.getimei.udf;

import com.sogo.getimei.entity.AtKwdBo;
import com.sogo.getimei.entity.WordTrieEntity;
import org.apache.spark.sql.api.java.UDF1;
import scala.Serializable;
import scala.collection.Seq;

import java.util.List;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/27
 * @Time: 16:43
 * @des:
 */
public class FilterQueryByAcAutoUdf0 implements Serializable {
    // lazy initialization: runs only on first use, which happens on the Executors
    private static List<WordTrieEntity> wordTrieList = WordTrieEntity.generateKeywordTrieList(AtKwdBo.generateKeyWord());

    private FilterQueryByAcAutoUdf0() {
        System.out.println("I am initialized in FilterQueryByAcAutoUdf0");
    }

    public static UDF1<Seq<String>, Boolean> FilterQueryByAcAutoUDF = new UDF1<Seq<String>, Boolean>() {
        @Override
        public Boolean call(Seq<String> stringSeq) throws Exception {
            // referencing wordTrieList here triggers its initialization, exactly once
            return WordTrieEntity.contains(stringSeq, wordTrieList);
        }
    };
}

Calling code

spark.udf().register("filterQueryWordsUdf",
        FilterQueryByAcAutoUdf0.FilterQueryByAcAutoUDF,
        DataTypes.BooleanType);

Dataset<Row> acDs = waplxDs.filter("filterQueryWordsUdf(fwords)")
        .selectExpr("imei", "explode(fwords) as fwords");

Testing

With the same waplxDs as above, the output matches expectations:

+----------------------------------------+------------------------------+
|imei                                    |fwords                        |
+----------------------------------------+------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx5座套                   |
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx3导航                   |
|26E014B8B77C0A442EC31E59505A1CED4D446779|太靠左右边那么大              |
|26E014B8B77C0A442EC31E59505A1CED4D446779|电动车禁了                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|说实在的电动车禁了可以减少车祸|
|26E014B8B77C0A442EC31E59505A1CED4D446779|小米手机                      |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装高德            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装carlife         |
+----------------------------------------+------------------------------+

Replacing the UDF with mapPartitions

If writing a Spark UDF feels cumbersome, the Dataset mapPartitions operator can be used instead. Even with mapPartitions, we still cannot initialize non-serializable member variables on the Driver. Combining broadcast with a singleton keeps both copying and initialization to a minimum.

Building the trie (emulating a singleton)

WordTrieInitEntity.java

package com.sogo.getimei.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/27
 * @Time: 10:44
 * @des:
 */
public class WordTrieInitEntity implements Serializable {
    // logger
    private final Logger logger = LoggerFactory.getLogger(WordTrieInitEntity.class);
    // key word and stop word
    private static List<WordTrieEntity> wordTrieList;
    // resource to build wordTrieList
    private List<AtKwdBo> atKwdBos;

    public WordTrieInitEntity(List<AtKwdBo> atKwdBos) {
        // initialize the (serializable) resource data on the Driver
        this.atKwdBos = atKwdBos;
    }

    /**
     * Build the tries on the Executor side.
     * @return the trie list
     */
    public List<WordTrieEntity> getWordTrieList() {
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            synchronized (WordTrieInitEntity.class) {
                if (null == wordTrieList || wordTrieList.isEmpty()) {
                    wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
                }
            }
        }
        return wordTrieList;
    }
}

Calling code

// instantiate the holder; this initializes the non-static (serializable) members
WordTrieInitEntity wordTrieInitEntity = new WordTrieInitEntity(AtKwdBo.generateKeyWord());
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
// broadcast: what actually gets shipped is the word-package data used to build the tries
Broadcast<WordTrieInitEntity> wordTriesBroadcast = javaSparkContext.broadcast(wordTrieInitEntity);
// use mapPartitions in place of a UDF
Dataset<Row> acDs = waplxDs.mapPartitions(new MapPartitionsFunction<Row, String>() {
    @Override
    public Iterator<String> call(Iterator<Row> iterator) throws Exception {
        // build the tries here (on the Executor)
        List<WordTrieEntity> wordTries = wordTriesBroadcast.value().getWordTrieList();
        logger.error("[DEBUG] wordTries:[{}]", wordTries);
        List<String> res = new ArrayList<>();
        if (null == wordTries || wordTries.isEmpty()) {
            logger.error("word tries is null or empty");
            return res.iterator();
        }
        // the whole Executor shares one copy of the tries
        while (iterator.hasNext()) {
            Row row = iterator.next();
            Seq<String> fwords = row.getAs("fwords");
            if (WordTrieEntity.contains(fwords, wordTries)) {
                res.add(row.getAs("imei"));
            } else {
                res.add(null);
            }
        }
        return res.iterator();
    }
}, Encoders.STRING())
        .filter(col("value").isNotNull())
        .selectExpr("value as imei");

Testing

waplxDs is the same as above; the output contains "26E014B8B77C0A442EC31E59505A1CED4D446779", as expected:

+----------------------------------------+
|imei                                    |
+----------------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|
|3902BD5C873086B7D22CECFF73916E644D6A5533|
|3A8FC47D656B554EE8772285A6793FEF4F445134|
|76AFE3AC3337952787FAF1C1C1F188014D6A4531|
|0CDDB52E6CDD2BC085DAD34B1F59EAFD4F445578|
|0DD25A855BD894B2EC547D8842AF594C4E6A6B34|
|B3141189E2CC830F5E7D6FF2395B3C664D445534|
|31CB2E300E8DBF85A52523AD0EF59BD94E446779|
|E8D927387934B0829B74AB22EBC4202E4E6A4533|
|F32DBA15DEE9BCD619B2FB4A1D8665344F444D35|
+----------------------------------------+

Replacing the UDF with mapPartitions (a dedicated MapPartitionsFunction)

new-ing the MapPartitionsFunction inline in the main logic hurts readability, so the word-package matching is moved into a dedicated MapPartitionsFunction implementation:

The MapPartitionsFunction implementation

WordTrieMapPartitionImpl.java

package com.sogo.getimei.entity;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.mutable.Seq;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan
 * @Date: 2020/8/29
 * @Time: 11:54
 * @des:
 */
public class WordTrieMapPartitionImpl implements MapPartitionsFunction<Row, User2QueriesEntity> {
    // logger
    private static final Logger LOGGER = LoggerFactory.getLogger(WordTrieMapPartitionImpl.class);
    // key word and stop word
    private static List<WordTrieEntity> wordTrieList;
    // resource to build wordTrieList
    private List<AtKwdBo> atKwdBos;
    // constructor
    public WordTrieMapPartitionImpl(List<AtKwdBo> atKwdBos) {
        this.atKwdBos = atKwdBos;
    }

    @Override
    public Iterator<User2QueriesEntity> call(Iterator<Row> iterator) throws Exception {
        // build the tries on the Executor; the singleton guarantees a single initialization
        WordTrieMapPartitionImpl.wordTrieList = WordTrieMapPartitionImpl.getWordTrieList(atKwdBos);
        List<User2QueriesEntity> res = new ArrayList<>();
        while (iterator.hasNext()) {
            Row inputRow = iterator.next();
            Seq<String> fwords = inputRow.getAs("fwords");
            if (Boolean.TRUE.equals(WordTrieEntity.contains(fwords, wordTrieList))) {
                // JavaConverters for Spark 2.3.1:
                //List<String> fwordlist = JavaConverters.seqAsJavaListConverter(fwords).asJava();
                // JavaConverters for Spark 3.0.0:
                //List<String> fwordlist = JavaConverters.seqAsJavaList(fwords);
                List<String> fwordlist = new ArrayList<>();
                for (int i = 0; i < fwords.length(); i ++) {
                    fwordlist.add(fwords.apply(i));
                }
                res.add(new User2QueriesEntity(inputRow.getAs("imei"), fwordlist));
            } else {
                // the encoder expects a User2QueriesEntity, so return an empty instance rather than null
                res.add(new User2QueriesEntity());
            }
        }
        return res.iterator();
    }

    /**
     * Build the tries on the Executor side.
     * @return the trie list
     */
    private static List<WordTrieEntity> getWordTrieList(List<AtKwdBo> atKwdBos) {
        if (null == wordTrieList || wordTrieList.isEmpty()) {
            synchronized (WordTrieMapPartitionImpl.class) {
                if (null == wordTrieList || wordTrieList.isEmpty()) {
                    wordTrieList = WordTrieEntity.generateKeywordTrieList(atKwdBos);
                }
            }
        }
        return wordTrieList;
    }

}

User2QueriesEntity.java

package com.sogo.getimei.entity;

import lombok.Getter;
import lombok.Setter;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

import java.io.Serializable;
import java.util.List;

@Setter
@Getter
public class User2QueriesEntity implements Serializable {
    private String imei;
    private List<String> fwords;

    public User2QueriesEntity() {}

    public User2QueriesEntity(String imei, List<String> fwords) {
        this.imei = imei;
        this.fwords = fwords;
    }

    public static Encoder<User2QueriesEntity> getEncoder() {
        return Encoders.bean(User2QueriesEntity.class);
    }
}

Calling code

JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<List<AtKwdBo>> AtKwdBoListBc = javaSparkContext.broadcast(AtKwdBo.generateKeyWord());
Dataset<Row> acDs = waplxDs
        .mapPartitions(new WordTrieMapPartitionImpl(AtKwdBoListBc.value()), User2QueriesEntity.getEncoder())
        .filter("imei is not null")
        .selectExpr("imei", "explode(fwords) as fwords")
        .cache();
acDs.show(20, 0);
acDs.printSchema();

Testing

waplxDs is the same as above; the output matches expectations, and the tries are built only once per Executor:

+----------------------------------------+------------------------------+
|imei                                    |fwords                        |
+----------------------------------------+------------------------------+
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx5座套                   |
|26E014B8B77C0A442EC31E59505A1CED4D446779|荣威rx3导航                   |
|26E014B8B77C0A442EC31E59505A1CED4D446779|太靠左右边那么大              |
|26E014B8B77C0A442EC31E59505A1CED4D446779|电动车禁了                    |
|26E014B8B77C0A442EC31E59505A1CED4D446779|说实在的电动车禁了可以减少车祸|
|26E014B8B77C0A442EC31E59505A1CED4D446779|小米手机                      |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装高德            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装百度            |
|26E014B8B77C0A442EC31E59505A1CED4D446779|自己给汽车安装carlife         |
+----------------------------------------+------------------------------+
only showing top 10 rows

root
 |-- imei: string (nullable = true)
 |-- fwords: string (nullable = true)

Summary

When a Spark Dataset job needs objects that cannot be serialized, they must be initialized on the Executors that use them: objects (or members) marked static or transient and initialized on the Driver are not shipped to the Executors. In other words, initialize them inside the operators or methods that run on the Executors. To guarantee each Executor initializes them only once, use a singleton, a broadcast variable, or lazy loading of static members; the combined pattern is sketched below.
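
As a recap, here is a generic sketch of the pattern this article keeps applying (the class and field names are illustrative, not taken from the article's codebase): serializable configuration travels with the object, while the non-serializable resource is static and built lazily, once per Executor JVM, with double-checked locking. Note the volatile modifier, which the double-checked-locking idiom needs for safe publication; the versions above omit it.

import java.io.Serializable;

// stand-in for a non-serializable resource (a Trie, a Redis client, ...)
class Resource {
    Resource(String config) { /* expensive, non-serializable build */ }
    boolean query(String s) { return s.contains("keyword"); }
}

public class ExecutorSideHolder implements Serializable {
    // serializable: shipped from the Driver (directly or via broadcast)
    private final String config;
    // static: never serialized, so it lives (and is built) per Executor JVM
    private static volatile Resource resource;

    public ExecutorSideHolder(String config) {
        this.config = config; // Driver side: keep only serializable state
    }

    // called from code that runs on the Executors (UDF.call / mapPartitions)
    public Resource get() {
        if (resource == null) {
            synchronized (ExecutorSideHolder.class) {
                if (resource == null) {
                    resource = new Resource(config); // built lazily, once per Executor
                }
            }
        }
        return resource;
    }
}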

References

[1] Spark中redis连接池的几种使用方法: http://mufool.com/2017/07/04/spark-redis/

[2] java机制:类的加载详解: https://blog.csdn.net/mawei7510/article/details/83412304

[3] 生成dataset的几种方式: https://www.cnblogs.com/lyy-blog/p/9814662.html