SparkStreaming读Kafka数据写Kudu

时间:2022-06-02
本文章向大家介绍SparkStreaming读Kafka数据写Kudu,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面的文章Fayson介绍过《SparkStreaming读HBase写HDFS》及《SparkingStreaming读Kafka写Kudu》,本篇文章Fayson主要介绍使用Scala语言开发一个SparkStreaming应用读取Kafka数据并写入Kudu。本文的数据流图如下:

  • 内容概述

1.环境准备

2.编写SparkSteaming代码读取Kafka数据并写入Kudu

3.流程测试

4.总结

  • 测试环境

1.CM和CDH版本为5.12.1

2.采用root用户操作

  • 前置条件

1.集群已安装Kafka

2.集群已安装Kudu且正常运行

3.集群未启用Kerberos

2.环境准备


1.编写向Kafka生成数据的ReadUserInfoFIleToKafka.java代码,具体内容可以在Fayson的GitHub上查看

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadUserInfoFIleToKafka.java
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

(可左右滑动)

2.使用mvn命令将编写好的代码编译打包封装成脚本

mvn clean package

使用mvn命令将工程依赖包导出到lib目录

mvn dependency:copy-dependencies -DoutputDirectory=/Users/fayson/Desktop/lib

(可左右滑动)

编写run.sh脚本

#!/bin/bash

#########################################
# 创建Topic
# kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_kudu_topic
#
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadUserInfoFIleToKafka $read_file

(可左右滑动)

准备测试数据ods_user_600.txt

封装好的脚本目录结构如下:

将打包好的jar包拷贝至lib目录下。

3. 创建用于测试的Kafka Topic

kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_kudu_topic

(可左右滑动)

4. 通过CM配置SparkKudu依赖包kudu-spark_2.10-1.4.0-cdh5.12.1.jar,将依赖包部署至CDH集群所有节点的/opt/cloudera/parcels/CDH/jars目录,然后通过CM配置Spark GateWay的spark-env.sh配置

export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/spark-streaming-kafka_2.10-1.6.0-cdh5.12.1.jar
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/jars/kudu-spark_2.10-1.4.0-cdh5.12.1.jar

(可左右滑动)

还需要依赖spark-streaming-kafka_2.10-1.6.0-cdh5.12.1.jar包,保存并重新部署客户端配置。

3.编写SparkStreaming写Kudu示例


1.使用Maven创建Scala工程,工程依赖pom文件

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-assembly_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark_2.10</artifactId>
    <version>1.4.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.4.0-cdh5.12.1</version>
</dependency>

(可左右滑动)

2.在resource目录下创建0285.properties文件,内容如下

kafka.brokers=cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.topics=kafka_sparkstreaming_kudu_topic
kudumaster.list=cdh01.fayson.com:7051,cdh02.fayson.com:7051,cdh03.fayson.com:7051

(可左右滑动)

3. 新建Kafka2Spark2Kudu.scala文件,示例代码如下:

package com.cloudera.streaming
import java.io.{File, FileInputStream}
import scala.collection.JavaConversions._
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.StringUtils
import org.apache.kudu.client.CreateTableOptions
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.types._
import org.apache.log4j.{Level, Logger}
import scala.util.parsing.json.JSON
/**
  * package: com.cloudera.streaming
  * describe: SparkStreaming 应用实时读取Kafka数据,解析后存入Kudu
  * 使用spark-submit的方式提交作业
    spark-submit --class com.cloudera.streaming.Kafka2Spark2Kudu 
    --master yarn-client --num-executors 3 --driver-memory 1g 
    --driver-cores 1 --executor-memory 1g --executor-cores 1 
    spark-demo-1.0-SNAPSHOT.jar
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/5/28
  * creat_time: 上午10:09
  * 公众号:Hadoop实操
  */
object Kafka2Spark2Kudu {
  Logger.getLogger("org").setLevel(Level.ERROR) //设置日志级别
  var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0285.properties"
  /**
    * 建表Schema定义
    */
  val userInfoSchema = StructType(
      //         col name   type     nullable?
      StructField("id", StringType , false) ::
      StructField("name" , StringType, true ) ::
      StructField("sex" , StringType, true ) ::
      StructField("city" , StringType, true ) ::
      StructField("occupation" , StringType, true ) ::
      StructField("tel" , StringType, true ) ::
      StructField("fixPhoneNum" , StringType, true ) ::
      StructField("bankName" , StringType, true ) ::
      StructField("address" , StringType, true ) ::
      StructField("marriage" , StringType, true ) ::
      StructField("childNum", StringType , true ) :: Nil
  )
  /**
    * 定义一个UserInfo对象
    */
  case class UserInfo (
    id: String,
    name: String,
    sex: String,
    city: String,
    occupation: String,
    tel: String,
    fixPhoneNum: String,
    bankName: String,
    address: String,
    marriage: String,
    childNum: String
  )
  def main(args: Array[String]): Unit = {
    //加载配置文件
    val properties = new Properties()
    val file = new File(confPath)
    if(!file.exists()) {
      System.out.println(Kafka2Spark2Kudu.getClass.getClassLoader.getResource("0285.properties"))
      val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0285.properties")
      properties.load(in);
    } else {
      properties.load(new FileInputStream(confPath))
    }
    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    val kuduMaster = properties.getProperty("kudumaster.list")
    println("kafka.brokers:" + brokers)
    println("kafka.topics:" + topics)
    println("kudu.master:" + kuduMaster)
    if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(kuduMaster)) {
      println("未配置Kafka和KuduMaster信息")
      System.exit(0)
    }
    val topicsSet = topics.split(",").toSet
    val sparkConf = new SparkConf().setAppName("Kafka2Spark2Kudu")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5)) //设置Spark时间窗口,每5s处理一次
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    //引入隐式
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val kuduContext = new KuduContext(kuduMaster, sc)
    //判断表是否存在
    if(!kuduContext.tableExists("user_info")) {
      println("create Kudu Table :{user_info}")
      val createTableOptions = new CreateTableOptions()
      createTableOptions.addHashPartitions(List("id"), 8).setNumReplicas(3)
      kuduContext.createTable("user_info", userInfoSchema, Seq("id"), createTableOptions)
    }
    dStream.foreachRDD(rdd => {
      //将rdd数据重新封装为Rdd[UserInfo]
      val newrdd = rdd.map(line => {
        val jsonObj =  JSON.parseFull(line._2)
        val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
        new UserInfo(
          map.get("id").get.asInstanceOf[String],
          map.get("name").get.asInstanceOf[String],
          map.get("sex").get.asInstanceOf[String],
          map.get("city").get.asInstanceOf[String],
          map.get("occupation").get.asInstanceOf[String],
          map.get("mobile_phone_num").get.asInstanceOf[String],
          map.get("fix_phone_num").get.asInstanceOf[String],
          map.get("bank_name").get.asInstanceOf[String],
          map.get("address").get.asInstanceOf[String],
          map.get("marriage").get.asInstanceOf[String],
          map.get("child_num").get.asInstanceOf[String]
        )
      })
      //将RDD转换为DataFrame
      val userinfoDF = sqlContext.createDataFrame(newrdd)
      kuduContext.upsertRows(userinfoDF, "user_info")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

(可左右滑动)

3. 使用mvn命令将编写好的SparkStreaming代码打包,注意由于工程中有scala代码在编译是命令中需要加scala:compile

mvn clean scala:compile package

(可左右滑动)

4.流程测试


1. 将编译好的SparkStreaming应用Jar包上传至有Spark Gateway节点的服务器上

conf/0285.properties内容如下:

2. 使用spark-submit命令提交SparkStreaming作业

spark-submit --class com.cloudera.streaming.Kafka2Spark2Kudu 
  --master yarn-client --num-executors 3 --driver-memory 1g 
  --driver-cores 1 --executor-memory 1g --executor-cores 1 
  spark-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看SparkStreaming作业是否正常运行

Yarn的8088界面查看

3. Spark作业启动成功后自动创建Kudu的user_info表

进入表明显页面,找到Impala建表语句

CREATE EXTERNAL TABLE `user_info` STORED AS KUDU
TBLPROPERTIES(
  'kudu.table_name' = 'user_info',
  'kudu.master_addresses' = 'cdh01.fayson.com:7051,cdh02.fayson.com:7051,cdh03.fayson.com:7051');

(可左右滑动)

在Hue中执行建表

表中无数据

4. 运行脚本向Kafka生产数据

[root@cdh01 0283-kafka-shell]# cd /root/0283-kafka-shell
[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt

(可左右滑动)

5. 通过Hue查看Kudu的user_info表数据

Kafka的数据已成功的录入到Kudu的user_info表中

统计写入的数据量为600,与写入Kafka的数据一致

5.总结


1.由于Spark中默认没有Spark-Streaming-Kafka的依赖包,需要将相应的依赖包添加到/opt/cloudera/parcels/CDH/jars目录下,然后在spark-env.sh中配置相应的依赖包路径,否则会报类找不到的异常。

2.访问Kudu使用的是kudu-spark_2.10-1.4.0-cdh5.12.1.jar,需要将该依赖包添加到/opt/cloudera/parcels/CDH/jars目录下,并配置spark-env.sh的环境变量

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/sparkdemo/src/main/scala/com/cloudera/streaming/Kafka2Spark2Kudu.scala

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操