Flink in 5 Minutes - Custom Data Sink

Date: 2022-07-24

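Building on the previous post, where MySQL served as the source, this post sinks data into MySQL.

The basic plan is to produce data to Kafka first and then sink the data in Kafka to MySQL. With this pipeline running, records are continuously produced to Kafka and continuously inserted into MySQL.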

Code versions

Flink: 1.10.0, Scala: 2.12.6

The official Flink 1.10.0 documentation lists the components that can be used as sinks; readers can look them up there.

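1. Prepare the pom dependencies (only the MySQL driver and the Kafka connector are shown; the code below also imports Gson, fastjson and the Flink streaming API, which must be on the classpath as well):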

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

2. Create the MySQL table:

CREATE TABLE `person` (
  id int(10) unsigned NOT NULL AUTO_INCREMENT,
  name varchar(260) NOT NULL DEFAULT '' COMMENT 'name',
  age int(11) unsigned NOT NULL DEFAULT '0' COMMENT 'age',
  sex tinyint(2) unsigned NOT NULL DEFAULT '2' COMMENT '0: female, 1: male',
  email text COMMENT 'email',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8 COMMENT='person definition';

3. Prepare the Person bean

package com.tech.bean

import scala.beans.BeanProperty

class Person() {
  @BeanProperty var id:Int = 0
  @BeanProperty var name:String = _
  @BeanProperty var age:Int = 0
  @BeanProperty var sex:Int = 2
  @BeanProperty var email:String = _
}
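To show why the bean uses @BeanProperty, here is a minimal sketch. The annotation makes the Scala compiler emit Java-style getX/setX methods in addition to the Scala accessors; these are the methods the producer and the sink call (setName, getName, ...) and the ones bean-oriented Java libraries typically look for. PersonDemo and BeanPropertyDemo are illustrative names that do not appear in the original code.

```scala
import scala.beans.BeanProperty

// Illustrative copy of the bean, trimmed to two fields.
class PersonDemo {
  @BeanProperty var name: String = _
  @BeanProperty var sex: Int = 2
}

object BeanPropertyDemo {
  // Names of the methods declared on the bean, as Java reflection sees them.
  def accessorNames: Set[String] =
    classOf[PersonDemo].getDeclaredMethods.map(_.getName).toSet

  def main(args: Array[String]): Unit = {
    // Print only the Java-style accessors generated by @BeanProperty.
    accessorNames
      .filter(n => n.startsWith("get") || n.startsWith("set"))
      .toSeq.sorted
      .foreach(println)
  }
}
```

Without the annotation only the Scala-style accessors (name, name_=) would exist, so calls such as person.setName(...) would not compile.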

4. Utility class - produce data to Kafka

package com.tech.util

import java.util.Properties

import com.google.gson.Gson
import com.tech.bean.Person
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
  * Create the topic:
  * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person
  *
  * Consume the data:
  * kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person --from-beginning
  */
object ProduceToKafkaUtil {
  final val broker_list: String = "localhost:9092"
  final val topic = "person"

  def produceMessageToKafka(): Unit = {
    val writeProps = new Properties()
    writeProps.setProperty("bootstrap.servers", broker_list)
    writeProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    writeProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](writeProps)
    // Reuse one Gson instance instead of creating two per message
    val gson = new Gson()

    try {
      for (i <- 1 to 10000) {
        val person: Person = new Person()
        person.setId(i)
        person.setName("Johngo" + i)
        person.setAge(10 + i)
        person.setSex(i % 2)
        person.setEmail("Johngo" + i + "@flink.com")
        val json = gson.toJson(person)
        // No key and no explicit partition: the producer picks the partition
        val record = new ProducerRecord[String, String](topic, json)
        producer.send(record)
        println("SendMessageToKafka: " + json)
        // Send one message every 3 seconds
        Thread.sleep(3000)
      }
      producer.flush()
    } finally {
      // Release the producer's resources even if the loop fails
      producer.close()
    }
  }

  def main(args: Array[String]): Unit = {
    this.produceMessageToKafka()
  }
}

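The following Kafka command consumes the topic to check that the data was written to Kafka:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic person --from-beginning

Start the program and watch the data arrive in the consumer terminal.

This shows that data can now be produced to Kafka.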

5. SinkToMySQL - custom sink to MySQL

Extend RichSinkFunction to develop the custom sink.

File name: RichSinkFunctionToMySQL.scala

package com.tech.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.tech.bean.Person
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class RichSinkFunctionToMySQL extends RichSinkFunction[Person] {
  var ps: PreparedStatement = null
  var conn: Connection = null

  // Open the JDBC connection; a failure propagates so the job fails
  // with the real cause instead of a later NullPointerException
  def getConnection(): Connection = {
    val DB_URL: String = "jdbc:mysql://localhost:3306/flinkData?useUnicode=true&characterEncoding=UTF-8"
    val USER: String = "root"
    val PASS: String = "admin123"

    // Driver class name used by mysql-connector-java 8.x
    Class.forName("com.mysql.cj.jdbc.Driver")
    DriverManager.getConnection(DB_URL, USER, PASS)
  }

  /**
    * open() initializes the connection to MySQL and prepares the insert statement
    * @param parameters
    */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = getConnection()
    // Columns and placeholders match the person table and the setters in invoke()
    val sql: String = "insert into person(id, name, age, sex, email) values(?, ?, ?, ?, ?)"
    ps = conn.prepareStatement(sql)
  }

  /**
    * Binds the record's fields and runs the insert.
    * invoke() is called once for every record.
    *
    * @param value
    */
  override def invoke(value: Person): Unit = {
    ps.setInt(1, value.getId())
    ps.setString(2, value.getName())
    ps.setInt(3, value.getAge())
    ps.setInt(4, value.getSex())
    ps.setString(5, value.getEmail())
    ps.executeUpdate()
  }

  // Close the statement first, then the connection that owns it
  override def close(): Unit = {
    super.close()
    if (ps != null) {
      ps.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
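The statement prepared in open() and the setters called in invoke() have to agree on the number and order of parameters; otherwise JDBC rejects the out-of-range parameter index at runtime. One way to keep them aligned, sketched here as an assumption rather than as part of the original sink, is to derive the SQL from a single column list. InsertSql, build and placeholderCount are hypothetical helper names.

```scala
object InsertSql {
  // Column order must match the order of the ps.setXxx(...) calls in invoke().
  val columns: Seq[String] = Seq("id", "name", "age", "sex", "email")

  // Builds "insert into <table>(c1, c2, ...) values(?, ?, ...)" with one '?' per column.
  def build(table: String, cols: Seq[String]): String = {
    val placeholders = cols.map(_ => "?").mkString(", ")
    s"insert into $table(${cols.mkString(", ")}) values($placeholders)"
  }

  // Counts the '?' placeholders in a statement.
  def placeholderCount(sql: String): Int = sql.count(_ == '?')

  def main(args: Array[String]): Unit = {
    val sql = build("person", columns)
    // Fail early if the statement and the column list ever drift apart.
    require(placeholderCount(sql) == columns.size)
    println(sql)
  }
}
```

With the five columns of the person table this yields a statement with five placeholders, matching the five setters in invoke().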

6. Wire it up in the Flink program

File name: FromKafkaToMySQL.scala

package com.tech.flink

import java.util.Properties

import com.alibaba.fastjson.JSON
import com.tech.bean.Person
import com.tech.sink.RichSinkFunctionToMySQL
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object FromKafkaToMySQL {

  def main(args: Array[String]): Unit = {
    // Local environment with the web UI enabled, convenient for testing
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())

    val brokerList: String = "localhost:9092"
    val topic = "person"

    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList)
    props.setProperty("group.id", "person")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // Read each Kafka message as a plain string
    val consumer = new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), props)
    val stream = env.addSource(consumer).map(
      line => {
        println("line: " + line)
        // Turn the JSON string back into a Person
        JSON.parseObject(line, classOf[Person])
      })

    // Hand every Person to the custom MySQL sink
    stream.addSink(new RichSinkFunctionToMySQL())

    env.execute("FromKafkaToMySQL")
  }
}
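The map step above assumes every Kafka message is valid JSON; an exception thrown while parsing would fail the task. A minimal sketch of a more tolerant parsing step follows, under the assumption that malformed messages may simply be dropped. SafeParse and parseOrDrop are hypothetical names, and integer parsing stands in for JSON.parseObject so the example runs without extra libraries.

```scala
import scala.util.Try

object SafeParse {
  // Runs `parse` on a line; a thrown exception or a null result becomes None.
  def parseOrDrop[A](line: String)(parse: String => A): Option[A] =
    Try(parse(line)).toOption.flatMap(Option(_))

  def main(args: Array[String]): Unit = {
    val lines = Seq("1", "oops", "3")
    // Integer parsing stands in for JSON parsing here.
    val parsed = lines.flatMap(l => parseOrDrop(l)(_.toInt))
    println(parsed) // List(1, 3)
  }
}
```

In the job, the parse function would be the JSON call, and the map step would become a flatMap-style step that emits only the successfully parsed records; how that is wired depends on whether the Java or the Scala DataStream API is used.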

The records are printed to the console.

Now look at the data in MySQL.

The data now shows up in MySQL as well, which completes the SinkToMySQL approach.
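To check the result without a MySQL client, the row count can also be read over JDBC. This is a minimal sketch that reuses the URL and credentials from the sink; VerifySink and countRows are illustrative names, and the connection settings are assumptions to adjust for your own setup.

```scala
import java.sql.{Connection, DriverManager}

object VerifySink {
  val countSql: String = "select count(*) from person"

  // Returns the number of rows currently in the person table.
  def countRows(conn: Connection): Long = {
    val st = conn.createStatement()
    try {
      val rs = st.executeQuery(countSql)
      try {
        if (rs.next()) rs.getLong(1) else 0L
      } finally rs.close()
    } finally st.close()
  }

  def main(args: Array[String]): Unit = {
    // URL and credentials copied from the sink; change them to match your database.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/flinkData?useUnicode=true&characterEncoding=UTF-8",
      "root", "admin123")
    try println("rows in person: " + countRows(conn))
    finally conn.close()
  }
}
```

Running it a few times while the producer and the Flink job are active should show the count growing.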

Author: Johngo