SparkSQL与Hive metastore Parquet转换
Spark SQL为了更好的性能,在读写Hive metastore parquet格式的表时,会默认使用自己的Parquet SerDe,而不是采用Hive的SerDe进行序列化和反序列化。该行为可以通过配置参数spark.sql.hive.convertMetastoreParquet进行控制,默认true。
这里从表schema的处理角度而言,就必须注意Hive和Parquet兼容性,主要有两个区别:
1.Hive是大小写敏感的,但Parquet相反
2.Hive会将所有列视为nullable,但是nullability在parquet里有独特的意义
由于上面的原因,在将Hive metastore parquet转化为Spark SQL parquet时,需要兼容处理一下Hive和Parquet的schema,即需要对二者的结构进行一致化。主要处理规则是:
1.有相同名字的字段必须要有相同的数据类型,忽略nullability。兼容处理的字段应该保持Parquet侧的数据类型,这样就可以处理到nullability类型了(空值问题)
2.兼容处理的schema应只包含在Hive元数据里的schema信息,主要体现在以下两个方面:
(1)只出现在Parquet schema的字段会被忽略
(2)只出现在Hive元数据里的字段将会被视为nullable,并处理到兼容后的schema中
关于schema(或者说元数据metastore),Spark SQL在处理Parquet表时,同样为了更好的性能,会缓存Parquet的元数据信息。此时,如果我们直接通过Hive或者其他工具对该Parquet表进行修改导致了元数据的变化,那么Spark SQL缓存的元数据并不能同步更新,此时需要手动刷新Spark SQL缓存的元数据,来确保元数据的一致性,方式如下:
// 第一种方式应用的比较多
1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")
2. sparkSession.catalog.refreshByPath(s"${path}")
最后说一下最近后台小伙伴在生产中遇到的一个问题,大家如果在业务处理中遇到类似的问题,提供一个思路。
在说问题之前首先了解一个参数spark.sql.parquet.writeLegacyFormat(默认false)的作用:
设置为true时,数据会以Spark1.4和更早的版本的格式写入。比如decimal类型的值会被以Apache Parquet的fixed-length byte array格式写出,该格式是其他系统例如Hive、Impala等使用的。
设置为false时,会使用parquet的新版格式。例如,decimals会以int-based格式写出。如果Spark SQL要以Parquet输出并且结果会被不支持新格式的其他系统使用的话,需要设置为true。
比如,对于decimal数据类型的兼容处理,不设置true时,经常会报类似如下的错误:
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://hadoop/data/test_decimal/dt=20200515000000/part-00000-9820eba2-8a40-446d-8c28-37027a1b1f2d-c000.snappy.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
...
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
...
此时我们需要将spark.sql.parquet.writeLegacyFormat设置为true来解决上述的异常问题。
但如果同时设置spark.sql.hive.convertMetastoreParquet为false时,要注意一些数据类型以及精度的处理,比如对于decimal类型的处理。通过一个例子复原一下当时的场景:
1.创建Hive外部表testdb.test_decimal,其中字段fee_rate为decimal(10,10)
CREATE EXTERNAL TABLE `testdb`.`test_decimal`(`no` STRING ,
`fee_rate` DECIMAL(10,10))
PARTITIONED BY (`dt` STRING )
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES ( 'serialization.format' = '1' )
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop/data/test_decimal'
TBLPROPERTIES ( 'transient_lastDdlTime' = '1589160440' ) ;
2.将testdb.item中的数据处理后保存到testdb.test_decimal中
// 这里为了展示方便,直接查询testdb.item中的数据
// 注意: 字段fee_rate的类型为decimal(10,6)
select no, fee_rate from testdb.item where dt=20190528;
// testdb.item中数据示例如下
+-------------------+----------------+
| no| fee_rate|
+-------------------+----------------+
| 1| 0.000000|
| 2| 0.000000|
| 3| 0.000000|
+-------------------+----------------+
3.将testdb.item中的数据保存到testdb.test_decimal中
// tmp是上述查询testdb.item获得的临时表
// 以parquet格式保存到test_decimal的20200529分区中
save overwrite tmp as parquet.`/data/test_decimal/dt=20200529`;
msck repair TABLE testdb.item;
上述1-3都能成功执行,数据也能保存到testdb.test_decimal中,但是当查询testdb.test_decimal中的数据时,比如执行sql:
select * from testdb.test_decimal where dt = 20200529;
会报如下空指针的异常:
Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443)
at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
...
究其原因是因为按照上述两个参数的配置,testdb.item中fee_rate字段类型为decimal(10,6),数据为0.000000,经过一系列处理0.000000最终会被处理为0,看下边最终导致空指针异常的部分,就会一目了然。
public static BigDecimal enforcePrecisionScale(BigDecimal bd, int maxPrecision, int maxScale) {
if (bd == null) {
return null;
} else {
bd = trim(bd);
if (bd.scale() > maxScale) {
bd = bd.setScale(maxScale, RoundingMode.HALF_UP);
}
// testdb.test_decimal中fee_rate的类型decimal(10,10),即precision为10,scale也为10
// 对应这里即maxPrecision和maxScale分别为10,则maxIntDigits为0
int maxIntDigits = maxPrecision - maxScale;
// bd对应0。对于0而言,precision为1,scale为0
// 处理之后 intDigits为1
int intDigits = bd.precision() - bd.scale();
return intDigits > maxIntDigits ? null : bd;
}
}
解决办法也很简单,就是将testdb.test_decimal中的fee_rate数据类型和依赖的表testdb.item中的fee_rate保持完全一致,即也为decimal(10,6)。
这个现象在实际应用环境中经常遇到,通用的解决办法就是将要保存的表中的数据类型与依赖的表(物理表或者临时表)的字段类型保持完全一致。
- 浅谈国外航空发动机大数据应用
- asp.net mvc SelectList 的selected 失效及解决方案
- 类极客公园火箭发射“返回顶部”jQuery效果(WordPress代码教程)
- 利用ganymed-ssh2远程执行其它Linux机器上的shell命令
- 浅析软件开发的3个层次
- MSBUILD 命令行编译的时候请注意msbuild文件名称或路经中空格导致出错
- Python可以用来做什么?
- 使用View Model从表现层分离领域模型
- 代码实现WordPress点击进入随机一篇文章的方法
- UE4新手编程之创建空白关卡和添加碰撞体
- Office Web Apps
- hadoop: hive 1.2.0 在mac机上的安装与配置
- ASP.NET 5运行时升级到Beta5
- WordPress免插件仅代码实现“返回顶部、返回底部、评论”效果(样式二)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 教你如何修改Linux远程登录欢迎提示信息
- 详解linux 定时任务 crontabs 安装及使用方法
- 解决Centos7安装nginx后提示“Welcome to nginx on Fedora!”,conf.d目录下无default.conf文件
- 详解Linux中PostgreSQL和PostGIS的安装和使用
- 检测ip和port是否可连接的方法
- Linux关机时执行指定脚本功能实现
- 适用于稀疏的嵌入、独热编码数据的损失函数回顾和PyTorch实现
- CentOS7下实现终端输入中文设置详解
- CentOS 7.2搭建VNC远程桌面服务的方法
- Ubuntu挂载3T硬盘或大于2T磁盘的方法
- linux chroot命令详解
- Linux中如何查询运行文件的全路径的方法
- 基于可变自动编码器(VAE)的生成建模,理解可变自动编码器背后的原理
- Centos7上网及添加静态IP方法介绍
- 解决yum安装报错Protected multilib versions的问题