Hive迁移Saprk SQL的坑和改进办法

时间:2022-05-07
本文章向大家介绍Hive迁移Saprk SQL的坑和改进办法,主要内容包括360-Spark集群概况、360-Spark应用、SparkSQL替换Hive、SparkSQL部署方案、Hive迁移SparkSQL – 坑 & 改进、SQL兼容 (SQL二义性问题)、transformation bugs (行尾部空列导致的数组越界)、transformation bugs (Script的标准错误缓冲区打满导致transform流程卡住)、输入小文件合并的改进 (增加支持自定义inputFormat类)、输出小文件合并的改进 (增加自动合并结果文件)、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

Qcon 全球软件开发者大会2016北京站 演讲主题:Spark在360的大规模实践与经验分享 李远策

360-Spark集群概况

360-Spark集群概况

360-Spark应用

MLLib • 算法:LDA、LR、FP-Growth、ALS、KMeans、随机森林等。 • 业务:新闻主题分类、新闻推荐、APP推荐、恶意代码识别、恶意域名检测等。 GraphX • 算法:PageRank、Louvain、LPA、连通子图等。 • 业务:搜索PageValue、网站安全监测等。 SparkSQL • 采用HiveContext替换公司90%以上的Hive作业,每天例行1.5W+作业。 • 每个Hive SQL平均3轮MR作业,平均性能提升2~5倍。

SparkSQL替换Hive

Hive迁移到SparkSQL的“正确打开方式”: 1、编译Spark加上-Phive -Phive-thriftserver参数 2、部署Spark(Yarn)集群 3、配置SparkSQL共用Hive的元数据库 4、用spark-hive(spark-sql)工具替换原有的hive命令 5、-e/–f 或者thriftserver提交作业。

SparkSQL部署方案

SparkSQL部署方案

Hive迁移SparkSQL – 坑 & 改进

SQL兼容 (Insert overwrite [local] directory的支持)

例如:insert overwrite directory ‘/tmp/testdir’ select *from T1; Hive中支持,SparkSQL暂时不支持。

因为SparkSQL-HiveContext的SQL解析调用了Hive的ParseDriver. parse完成,所以语法解析上不存在问题。

解决方案: 1、解析AST中的TOK_DIR和TOK_LOCAL_DIR将其转化成新定义的逻辑计划WriteToDirectory 2、将逻辑计划WriteToDirectory转换成新定义的物理计划WriteToDirectory。 3、在物理计划WriteToDirectory执行方法中复用InsertIntoHiveTable中的saveAsHiveFile逻辑将结果写到HDFS中。 4、如果是local directory则将结果再拉回到本地

SQL兼容 (SQL二义性问题)

例如:

    select C.id from (
           select A.id from testb as A
           join
          (select id from testb ) B
    on A.id=B.id) C;

C.id is A.id or B.id ?

transformation bugs (行尾部空列导致的数组越界)

例如: 001tABCt002t [001, ABC, 002] 003tEFGtt [003, EFG]

    new GenericInternalRow(
        prevLine.split(ioschema.outputRowFormatMap(
                              “TOK_TABLEROWFORMATFIELD”))
                     .map(CatalystTypeConverters.convertToCatalyst))

transformation bugs (Script的标准错误缓冲区打满导致transform流程卡住)

transformation

输入小文件合并的改进 (增加支持自定义inputFormat类)

默认采用建表时指定的InpurFormat,如果是默认的TextInputFormat,当小文件比较多是可能会导致RDD的partition数太多,导致性能下降。

解决办法: 通过参数允许用户指定InputFormat,在TableReader中反射生成对应的InputFormat对象并传入到HadoopRDD的构造函数中。

使用方法:

set spark.sql.hive.inputformat=org.apache.hadoop.mapred.lib.CombineTextInputFormat;

输出小文件合并的改进 (增加自动合并结果文件)

当spark.sql.shuffle.partitions设置的比较大且结果数据集比较小时,会产生大量的小文件(文件数等同spark.sql.shuffle.partitions)。 解决办法: 在最后的执行计划中加入一个repartition transformation。通过参数控制最终的partitions数且不影响shuffle partition的数量。 使用方法: set spark.sql.result.partitions=10;

• 支持yarn-cluster模式,减小client的负载默认的yarn-client模式下Scheduler会运行在client上,加重client机器的负载。 解决办法: 让sparkSQL工具支持yarn-cluster模式。 1)在Yarn集群上部署SparkSQL依赖的hive metastore jar包。 2)开通Yarn nodemanager节点访问Hive metastore数据库的权限。 3)解决“”“转义问题。如 spark-hive –e “select * fromuser where name = ”张三””;在yarn-cluster模式中会触发两次command执行从而导致“”被转义两次。

邮箱: liyuance@gmail.com liyuance@360.cn 急招大数据运维和运维开发人员,谢谢