Flink写出数据到HBase的Sink实现
时间:2022-07-23
本文章向大家介绍Flink写出数据到HBase的Sink实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
文章目录
- 一、MyHbaseSink
- 1、继承RichSinkFunction<输入的数据类型>类
- 2、实现open方法,创建连接对象
- 3、实现invoke方法,批次写入数据到Hbase
- 4、实现close方法,关闭连接
- 二、HBaseUtil工具类
- 1、继承RichSinkFunction<输入的数据类型>类
- 2、实现open方法,创建连接对象
- 3、实现invoke方法,批次写入数据到Hbase
- 4、实现close方法,关闭连接
一、MyHbaseSink
1、继承RichSinkFunction<输入的数据类型>类
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {
private transient Integer maxSize = 1000;
private transient Long delayTime = 5000L;
public MyHbaseSink() {
}
public MyHbaseSink(Integer maxSize, Long delayTime) {
this.maxSize = maxSize;
this.delayTime = delayTime;
}
private transient Connection connection;
private transient Long lastInvokeTime;
private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
// 创建连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取全局配置文件,并转为ParameterTool
ParameterTool params =
(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//创建一个Hbase的连接
connection = HBaseUtil.getConnection(
params.getRequired("hbase.zookeeper.quorum"),
params.getInt("hbase.zookeeper.property.clientPort", 2181)
);
// 获取系统当前时间
lastInvokeTime = System.currentTimeMillis();
}
3、实现invoke方法,批次写入数据到Hbase
@Override
public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
String rk = value.f0;
//创建put对象,并赋rk值
Put put = new Put(rk.getBytes());
// 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25
put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
puts.add(put);// 添加put对象到list集合
//使用ProcessingTime
long currentTime = System.currentTimeMillis();
//开始批次提交数据
if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
//获取一个Hbase表
Table table = connection.getTable(TableName.valueOf("database:table"));
table.put(puts);//批次提交
puts.clear();
lastInvokeTime = currentTime;
table.close();
}
}
4、实现close方法,关闭连接
@Override
public void close() throws Exception {
connection.close();
}
二、HBaseUtil工具类
- Hbase的工具类,用来创建Hbase的Connection
public class HBaseUtil {
/**
* @param zkQuorum zookeeper地址,多个要用逗号分隔
* @param port zookeeper端口号
* @return connection
*/
public static Connection getConnection(String zkQuorum, int port) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkQuorum);
conf.set("hbase.zookeeper.property.clientPort", port + "");
Connection connection = ConnectionFactory.createConnection(conf);
return connection;
}
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 手写一个JDK1.7的简版HashMap
- MySQL存储过程创建与使用
- 一些有意思的JavaScript代码片段
- Flutter 完成全平台制霸:实现 Windows 应用支持
- python包:urllib——使用urllib下载无限制链接图片
- 初探 TensorFlow.js
- 如何使用 Apache Directory Studio 连接 JumpCloud
- 0812-5.16.2-如何获取CDSW上提交Spark作业的真实用户
- GLMM:广义线性混合模型(遗传参数评估)
- 特征锦囊:今天一起搞懂机器学习里的L1与L2正则化
- 【一天一大 lee】二叉搜索树的最近公共祖先 (难度:简单) - Day2020092
- Spring多数据源事务如何玩? | Spring系列46篇
- 使用Mfuzz包做时间序列分析
- 网络安全 | 瑞哥带你全方位解读防火墙技术!
- 【SpringBoot DB 系列】Jooq 之新增记录使用姿势