5分钟Flink - 自定义Data Sink
时间:2022-07-24
本文章向大家介绍5分钟Flink - 自定义Data Sink,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
文章内容
继承上一篇Source源是MySQL的思路,本文想要想要将数据Sink到MySQL
那咱们本文的基本思路是,先把数据生产至Kafka,然后将Kafka中的数据Sink到MySQL,这么一条流下来,不断的往Kafka生产数据,不断的往MySQL插入数据
代码版本
Flink : 1.10.0 Scala : 2.12.6
下面图中是Flink1.10.0版本官网给出的可以sink的组件,大家可以自寻查看
1. 准备pom依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.12</artifactId>
<version>1.10.0</version>
</dependency>
2. 创建MySQL表:
CREATE TABLE `person` (
id int(10) unsigned NOT NULL AUTO_INCREMENT,
name varchar(260) NOT NULL DEFAULT '' COMMENT '姓名',
age int(11) unsigned NOT NULL DEFAULT '0' COMMENT '年龄',
sex tinyint(2) unsigned NOT NULL DEFAULT '2' COMMENT '0:女, 1男',
email text COMMENT '邮箱',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8 COMMENT='人员定义';
3. 准备Person bean
package com.tech.bean
import scala.beans.BeanProperty
class Person() {
@BeanProperty var id:Int = 0
@BeanProperty var name:String = _
@BeanProperty var age:Int = 0
@BeanProperty var sex:Int = 2
@BeanProperty var email:String = _
}
4. 工具类 - 向Kafka生产数据
package com.tech.util
import java.util.Properties
import com.google.gson.Gson
import com.tech.bean.Person
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
/**
* 创建 topic:
* kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person
*
* 消费数据:
* kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person --from-beginning
*/
object ProduceToKafkaUtil {
final val broker_list: String = "localhost:9092"
final val topic = "person"
def produceMessageToKafka(): Unit = {
val writeProps = new Properties()
writeProps.setProperty("bootstrap.servers", broker_list)
writeProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
writeProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](writeProps)
for (i <- 1 to 10000) {
val person: Person = new Person()
person.setId(i)
person.setName("Johngo" + i)
person.setAge(10 + i)
person.setSex(i%2)
person.setEmail("Johngo" + i + "@flink.com")
val record = new ProducerRecord[String, String](topic, null, null, new Gson().toJson(person))
producer.send(record)
println("SendMessageToKafka: " + new Gson().toJson(person))
Thread.sleep(3000)
}
producer.flush()
}
def main(args: Array[String]): Unit = {
this.produceMessageToKafka()
}
}
可以通过下面kafka语句消费进行测试是否写入了kafka
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person --from-beginning
将程序启动,终端消费数据看看
说明可以往Kafka生产数据了
5. SinkToMySQL - 自定义Sink到MySQL
继承RichSinkFunction,进行自定义Sink的开发
文件名:RichSinkFunctionToMySQL.scala
package com.tech.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.tech.bean.Person
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
class RichSinkFunctionToMySQL extends RichSinkFunction[Person]{
var isRUNNING: Boolean = true
var ps: PreparedStatement = null
var conn: Connection = null
// 建立连接
def getConnection():Connection = {
var conn: Connection = null
val DB_URL: String = "jdbc:mysql://localhost:3306/flinkData?useUnicode=true&characterEncoding=UTF-8"
val USER: String = "root"
val PASS: String = "admin123"
try{
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection(DB_URL, USER, PASS)
} catch {
case _: Throwable => println("due to the connect error then exit!")
}
conn
}
/**
* open()初始化建立和 MySQL 的连接
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = getConnection()
val sql: String = "insert into Person(id, name, password, age) values(?, ?, ?, ?);"
ps = this.conn.prepareStatement(sql)
}
/**
* 组装数据,进行数据的插入操作
* 对每条数据的插入都要调用invoke()方法
*
* @param value
*/
override def invoke(value: Person): Unit = {
ps.setInt(1, value.getId())
ps.setString(2, value.getName())
ps.setInt(3, value.getAge())
ps.setInt(4, value.getSex())
ps.setString(5, value.getEmail())
ps.executeUpdate()
}
override def close(): Unit = {
if (conn != null) {
conn.close()
}
if(ps != null) {
ps.close()
}
}
}
6. Flink程序调用起来
文件名:FromKafkaToMySQL.scala
package com.tech.flink
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import com.google.gson.Gson
import com.tech.bean.Person
import com.tech.sink.RichSinkFunctionToMySQL
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
object FromKafkaToMySQL {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
val brokerList: String = "localhost:9092"
val topic = "person"
val gs: Gson = new Gson
val props = new Properties()
props.setProperty("bootstrap.servers", brokerList)
props.setProperty("group.id", "person")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), props)
val stream = env.addSource(consumer).map(
line => {
print("line" + line + "n")
JSON.parseObject(line, classOf[Person])
})
stream.addSink(new RichSinkFunctionToMySQL())
env.execute("FromKafkaToMySQL")
}
}
控制台能打印出来了
再看MySQL中的数据
现在可以看到MySQL中的数据也出现了,至此也就完成了SinkToMySQL的方案
作者:Johngo
- 【Scikit-Learn 中文文档】分解成分中的信号(矩阵分解问题) - 无监督学习 - 用户指南 | ApacheCN
- 区块链技术在非能源领域的应用场景
- Python读书笔记8
- How to debug .NET Core RC2 app with Visual Studio Code on Windows?
- 难道.NET Core到R2连中文编码都不支持吗?
- .NET Core RC2发布在即,我们试着用记事本编写一个ASP.NET Core RC2 MVC程序
- matplotlib的基本用法(四)——设置legend图例
- TensorFlow深度学习笔记 文本与序列的深度模型
- 究竟哪里安全?加拿大VS中国治安大数据起底!意料之中还是之外?
- muduo网络库学习之EventLoop(四):EventLoopThread 类、EventLoopThreadPool 类
- 从小程序游戏开放可以看出,微信已经开始转移小程序战场了
- 17年AI在游戏中完胜人类,AlphaGo的下一个目标是什么?DeepMind有一个45年超越人类计划
- PHP常用的正则表达式
- ASP.NET Core管道深度剖析(3):管道是如何处理HTTP请求的?
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- CentOS环境下安装Redis3.0及phpredis扩展测试示例
- 使用Apache commons-cli包进行命令行参数解析的示例代码
- 详解如何在Ubuntu 16.04上增加Swap分区
- Mac本地文件上传到CentOS云服务器方法
- linux中把.c的文件编译成.so文件
- Ubuntu16.04 中 locate文件查找命令
- Ubuntu 16.04与Apache虚拟主机配置的步骤详解
- Linux删除目录下的文件的10种方法小结
- 利用Linux防火墙隔离本地欺骗地址的方法详解
- 视图在SQL中的作用是什么,它是怎样工作的?
- Linux命令行上如何使用日历详解
- 在Linux下修改和重置root密码的方法(超简单)
- 在Centos上搭建Maven中央仓库的方法
- 详解Ubuntu16.04启动器图标异常解决方法
- Linux(ubuntu)下实现增加/删除文件权限