Hudi自带工具DeltaStreamer的实时入湖最佳实践
摘要:本文介绍如何使用Hudi自带入湖工具DeltaStreamer进行数据的实时入湖。
本文分享自华为云社区《华为FusionInsight MRS实战 - Hudi实时入湖之DeltaStreamer工具最佳实践》,作者: 晋红轻 。
背景
传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用sqoop定时作业批量导入。随着数据分析对实时性要求不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
然而实时同步从一开始就面临如下几个挑战:
- 小文件问题。不论是spark的microbatch模式,还是flink的逐条处理模式,每次写入HDFS时都是几MB甚至几十KB的文件。长时间下来产生的大量小文件,会对HDFS namenode产生巨大的压力。
- 对update操作的支持。HDFS系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。
- 事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序commit操作时一次性写入HDFS,当程序rollback时,已写入或部分写入的数据能随之删除。
Hudi就是针对以上问题的解决方案之一。使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启–enable-hive-sync 即可同步数据到hive表。
Hudi DeltaStreamer写入工具介绍
DeltaStreamer工具使用参考 https://hudi.apache.org/cn/docs/writing_data.html
HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。
- 从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件
- 支持json、avro或自定义记录类型的传入数据
- 管理检查点,回滚和恢复
- 利用DFS或Confluent schema注册表的Avro模式。
- 支持自定义转换操作
场景说明
- 生产库数据通过CDC工具(debezium)实时录入到MRS集群中Kafka的指定topic里。
- 通过Hudi提供的DeltaStreamer工具,读取Kafka指定topic里的数据并解析处理。
- 同时使用DeltaStreamer工具将处理后的数据写入到MRS集群的hive里。
样例数据简介
生产库MySQL原始数据:
CDC工具debezium简介
对接步骤具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/
完成对接后,针对MySQL生产库分别做增、改、删除操作对应的kafka消息
增加操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);
对应kafka消息体:
更改操作: UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’ WHERE uid=11;
对应kafka消息体:
删除操作: delete from hudi.hudisource3 where uid=11;
对应kafka消息体:
调试步骤
华为MRS Hudi样例工程获取
根据实际MRS版本登录github获取样例代码: https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0
打开工程SparkOnHudiJavaExample
样例代码修改及介绍
1.debeziumJsonParser
说明:对debezium的消息体进行解析,获取到op字段。
源码如下:
package com.huawei.bigdata.hudi.examples; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; public class debeziumJsonParser { public static String getOP(String message){ JSONObject json_obj = JSON.parseObject(message); String op = json_obj.getJSONObject("payload").get("op").toString(); return op; } }
2.MyJsonKafkaSource
说明:DeltaStreamer默认使用org.apache.hudi.utilities.sources.JsonKafkaSource消费kafka指定topic的数据,如果消费阶段涉及数据的解析操作,则需要重写MyJsonKafkaSource进行处理。
以下是源码,增加注释
package com.huawei.bigdata.hudi.examples; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.parser.Feature; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.JsonSource; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.OffsetRange; import java.util.Map; /** * Read json kafka data. */ public class MyJsonKafkaSource extends JsonSource { private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class); private final KafkaOffsetGen offsetGen; private final HoodieDeltaStreamerMetrics metrics; public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(properties, sparkContext, sparkSession, schemaProvider); HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder(); this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build()); properties.put("key.deserializer", StringDeserializer.class); properties.put("value.deserializer", StringDeserializer.class); offsetGen = new KafkaOffsetGen(properties); } @Override protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) { OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); } JavaRDD<String> newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); } private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) { return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{ //过滤空行和脏数据 String msg = (String)x.value(); if (msg == null) { return false; } try{ String op = debeziumJsonParser.getOP(msg); }catch (Exception e){ return false; } return true; }).map((x) -> { //将debezium接进来的数据解析写进map,在返回map的tostring, 这样结构改动最小 String msg = (String)x.value(); String op = debeziumJsonParser.getOP(msg); JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField); Boolean is_delete = false; String out_str = ""; Object out_obj = new Object(); if(op.equals("c")){ out_obj = json_obj.getJSONObject("payload").get("after"); } else if(op.equals("u")){ out_obj = json_obj.getJSONObject("payload").get("after"); } else { is_delete = true; out_obj = json_obj.getJSONObject("payload").get("before"); } Map out_map = (Map)out_obj; out_map.put("_hoodie_is_deleted",is_delete); out_map.put("op",op); return out_map.toString(); }); } }
3.TransformerExample
说明: 入湖hudi表或者hive表时候需要指定的字段
以下是源码,增加注释
package com.huawei.bigdata.hudi.examples; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.utilities.transform.Transformer; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.io.Serializable; import java.util.ArrayList; import java.util.List; /** * 功能描述 * 对获取的数据进行format */ public class TransformerExample implements Transformer, Serializable { /** * format data * * @param JavaSparkContext jsc * @param SparkSession sparkSession * @param Dataset<Row> rowDataset * @param TypedProperties properties * @return Dataset<Row> */ @Override public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) { JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD(); List<Row> rowList = new ArrayList<>(); for (Row row : rowJavaRdd.collect()) { Row one_row = buildRow(row); rowList.add(one_row); } JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList); List<StructField> fields = new ArrayList<>(); builFields(fields); StructType schema = DataTypes.createStructType(fields); Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema); return dataFrame; } private void builFields(List<StructField> fields) { fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true)); fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("age", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true)); fields.add(DataTypes.createStructField("op", DataTypes.StringType, true)); } private Row buildRow(Row row) { Integer uid = row.getInt(0); String uname = row.getString(1); String age = row.getString(2); String sex = row.getString(3); String mostlike = row.getString(4); String lastview = row.getString(5); String totalcost = row.getString(6); Boolean _hoodie_is_deleted = row.getBoolean(7); String op = row.getString(8); Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op); return returnRow; } }
4.DataSchemaProviderExample
说明: 分别指定MyJsonKafkaSource返回的数据格式为source schema,TransformerExample写入的数据格式为target schema
以下是源码
package com.huawei.bigdata.hudi.examples; import org.apache.avro.Schema; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.spark.api.java.JavaSparkContext; /** * 功能描述 * 提供sorce和target的schema */ public class DataSchemaProviderExample extends SchemaProvider { public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); } /** * source schema * * @return Schema */ @Override public Schema getSourceSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}"); return avroSchema; } /** * target schema * * @return Schema */ @Override public Schema getTargetSchema() { Schema avroSchema = new Schema.Parser().parse( "{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}"); return avroSchema; } }
将工程打包(hudi-security-examples-0.7.0.jar)以及json解析包(fastjson-1.2.4.jar)上传至MRS客户端
DeltaStreamer启动命令
登录客户端执行一下命令获取环境变量以及认证
source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env
DeltaStreamer启动命令如下:
spark-submit --master yarn-client \ --jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \ --driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \ --target-base-path /tmp/huditest/delta_demo2 \ --table-type COPY_ON_WRITE \ --target-table delta_demo2 \ --source-ordering-field uid \ --source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \ --schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \ --transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \ --enable-hive-sync --continuous
kafka.properties配置
// hudi配置 hoodie.datasource.write.recordkey.field=uid hoodie.datasource.write.partitionpath.field= hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator hoodie.datasource.write.hive_style_partitioning=true hoodie.delete.shuffle.parallelism=10 hoodie.upsert.shuffle.parallelism=10 hoodie.bulkinsert.shuffle.parallelism=10 hoodie.insert.shuffle.parallelism=10 hoodie.finalize.write.parallelism=10 hoodie.cleaner.parallelism=10 hoodie.datasource.write.precombine.field=uid hoodie.base.path = /tmp/huditest/delta_demo2 hoodie.timeline.layout.version = 1 // hive config hoodie.datasource.hive_sync.table=delta_demo2 hoodie.datasource.hive_sync.partition_fields= hoodie.datasource.hive_sync.assume_date_partitioning=false hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor hoodie.datasource.hive_sync.use_jdbc=false // Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudisource // checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/ // Kafka props bootstrap.servers=172.16.9.117:21005 auto.offset.reset=earliest group.id=a5 offset.rang.limit=10000
注意:kafka服务端配置 allow.everyone.if.no.acl.found 为true
使用Spark查询
spark-shell --master yarn val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*") roViewDF.createOrReplaceTempView("hudi_ro_table") spark.sql("select * from hudi_ro_table").show()
Mysql增加操作对应spark中hudi表查询结果:
Mysql更新操作对应spark中hudi表查询结果:
删除操作:
使用Hive查询
beeline select * from delta_demo2;
Mysql增加操作对应hive表中查询结果:
Mysql更新操作对应hive表中查询结果:
Mysql删除操作对应hive表中查询结果:
原文地址:https://www.cnblogs.com/huaweiyun/p/15089471.html
- mybatis 使用tips - 使用多个参数
- 从高的角度看自动化测试
- Django中请求的生命周期
- 程序猿python学习AIphaZero,TensorFlow强化学习AI游戏,100行代码运行看看!
- awk中NF的使用
- tar.gz 解压
- Python&机器学习之项目实践
- JAVA CDI 学习(5) - 如何向RESTFul Service中注入EJB实例
- mysql5.7 column cannot be null
- 区块链大热 价值近20万的Matrix.io被启用
- 比特币科普之什么是区块高度?
- 如何正确并快速理解MapReduce
- mysqldump的简单使用
- mac:在当前文件夹打开terminal终端
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- android 使用okhttp可能引发OOM的一个点
- 到底有几个进程在运行
- Android中butterknife的使用与自动化查找组件插件详解
- Android生成条形码和二维码功能
- gettext msgfmt安装及po/mo文件编译转换
- Android实现底部带刻度的进度条样式
- Java 实现 Kafka Producer
- Android短信验证服务分享
- android 震动和提示音的实现代码
- 一个简单的Android圆弧刷新动画
- AccessibilityService实现微信发红包功能
- 处理一次k8s、calico无法分配podIP的心路历程
- Android自定义控件实现时钟效果
- Android倒计时控件 Splash界面5秒自动跳转
- Android仿抖音上下滑动布局