0604-6.1.0-如何使用StreamSets实时采集指定数据目录文件并写入库Kudu
温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
Fayson的github: https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1
文档编写目的
Fayson在前面写过多篇StreamSets的文章,本篇文章主要介绍通过StreamSets实时的方式读取本地的数据文件,通过解析处理将文件中的内容写入到Kudu中。在进行本篇文章学习前你还需要了解:
《如何在CDH中安装和使用StreamSets》
- 内容概述
1.测试环境准备
2.准备测试数据
3.配置StreamSets
4.流程测试及数据验证
- 测试环境
1.RedHat7.4
2.CM和CDH版本为6.1.0
3.Kudu 1.8.0
2
测试环境准备
1.通过Hue使用Impala创建一个Kudu表,创建脚本如下:
CREATE TABLE user_info_kudu (
id STRING COMPRESSION snappy,
name STRING COMPRESSION snappy,
sex STRING COMPRESSION snappy,
city STRING COMPRESSION snappy,
occupation STRING COMPRESSION snappy,
mobile_phone_num STRING COMPRESSION snappy,
fix_phone_num STRING COMPRESSION snappy,
bank_name STRING COMPRESSION snappy,
address STRING COMPRESSION snappy,
marriage STRING COMPRESSION snappy,
child_num INT COMPRESSION snappy,
PRIMARY KEY (id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='master,hadoop13'
);
在创建Kudu表的时候增加了kudu.master的配置参数,如果Impala中未集成kudu则需要增加该参数,集成方式如下:
2.准备测试数据文件
[root@hadoop13 data]# cat user_infoaa.txt
411025200708151236,濮敬才,1,竹山县,生产工作、运输工作和部分体力劳动者,13702734056,15103111241,广州银行48,台东东二路21号-20-7,0,2
653000199408254560,人思巧,0,怀化,商业工作人员,15305590235,15306212544,广州银行17,台东东二路21号-20-7,0,0
500000195305076075,詹致,1,商丘,企事业单位的负责人,13507721161,15105419035,广州银行81,台东东二路21号-20-2,0,4
130522198207211990,和东,1,阳泉,商业工作人员,13205104083,13105301541,广州银行6,台东东二路21号-2-9,0,0
准备了两个数据文件共100条测试数据,数据的id是唯一的。
3.在StreamSets服务所在节点上创建一个/data1/tmp的数据目录,用于配置StreamSets的采集目录
3
创建Pipline
1.登录StreamSets,创建一个directory2kudu的Pipline
2.在Pipline流程中添加Directory作为源并配置基础信息
3.配置Kafka相关信息,如Broker、ZK及Topic
配置采集的数据目录及文件读取方式
配置数据格式化方式,由于数据文件是以“,”分割因此选择CSV方式
Root Field Type选择为List,为会每行数据转化成List<Map<String, String>>格式的数据。
4.配置数据解析模块,这里选择使用“JavaScript Evaluator”
在JavaScript配置项选择处理数据的方式为Batch by Batch
配置数据解析代码,在Script配置项增加如下代码片段
for(var i = 0; i < records.length; i++) {
try {
var info = records[i];
var newRecord = sdcFunctions.createRecord(true);
var userInfoMap = sdcFunctions.createMap(true);
userInfoMap.id = info.value[0]['value'];
userInfoMap.name = info.value[1]['value'];
userInfoMap.sex = info.value[2]['value'];
userInfoMap.city = info.value[3]['value'];
userInfoMap.occupation = info.value[4]['value'];
userInfoMap.tel = info.value[5]['value'];
userInfoMap.fixPhoneNum = info.value[5]['value'];
userInfoMap.bankName = info.value[7]['value'];
userInfoMap.address = info.value[8]['value'];
userInfoMap.marriage = info.value[9]['value'];
userInfoMap.childNum = info.value[10]['value'];
newRecord.value = userInfoMap;
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
5.添加Kudu模块及配置基本信息
6.配置Kudu的Master、Table、Operation等
Kudu Masters:可以配置多个,多个地址以“,”分割
Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀
Field to Column Mapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。
Default Opertation:设置操作类型如:insert、upsert、delete
4
流程测试验证
1.启动的directory2kudu,启动成功如下图显示
2.向/data1/tmp目录下拷贝一个准备好的数据文件
可以看到Pipline监控数据的变化,采集到50条数据
user_info_kudu表数据显示有50条记录
3.再次向/data1/tmp目录拷贝一个数据文件
可以看到Pipline监控数据的变化,采集到100条数据
user_info_kudu表数据显示有100条记录
入库的数据总条数
5
总结
1.通过StreamSets可以方便的监听指定的数据目录进行数据采集,可以在Directory模块上配置文件的过滤规则、采集频率以及数据的格式化方式。
2.StreamSets的Directory模块会将数据文件的数据以行为单位解析传输,通过List或着Map的方式封装
3.通过Process提供的JavaScript Evaluator模块来进行数据解析转换为能Kudu接收大数据格式。
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP实现字符串大小写转函数的功能实例
- 在django中实现choices字段获取对应字段值
- python语言中有算法吗
- jQuery ajax+PHP实现的级联下拉列表框功能示例
- PHP+mysql防止SQL注入的方法小结
- 浅谈tensorflow 中的图片读取和裁剪方式
- Python实现SMTP邮件发送
- 浅谈多卡服务器下隐藏部分 GPU 和 TensorFlow 的显存使用设置
- 通过PHP设置BugFree获取邮箱通知
- Django封装交互接口代码
- 使用K.function()调试keras操作
- tensorflow图像裁剪进行数据增强操作
- ThinkPHP3.2.3框架Memcache缓存使用方法实例总结
- Python+PyQt5+MySQL实现天气管理系统
- 浅谈Python协程