Storm读取Kafka数据是如何实现的
时间:2022-05-06
本文章向大家介绍Storm读取Kafka数据是如何实现的,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
问题导读 1.本文基于什么版本? 2.Storm读取Kafka数据是如何实现的? 3.实现一个Kafka Spout有哪两种方式? Strom整合Kafka版本信息
Storm与Kafka的版本信息:
- Storm:apache-storm-0.9.2-incubating
- Kafka:kafka_2.9.2-0.8.1.1.tgz
Strom从Kafka中读取数据本质
实现Storm读取Kafka中的数据,参考官网介绍, 本部分主要参考自storm-kafka的README。
Strom从Kafka中读取数据,本质:实现一个Storm中的Spout,来读取Kafka中的数据;这个Spout,可以称为Kafka Spout。实现一个Kafka Spout有两条路:
- core storm spout;
- Trident spout;
无论用哪种方式实现Kafka Spout,都分为两步走:
- 实现BrokerHost接口:用于记录Kafka broker host与partition之间的映射关系;具体两种实现方式:
- ZkHosts类:从zookeeper中动态的获取kafka broker与partition之间的映射关系;初始化时,需要配置zookeeper的ip:port;默认,每60s从zookeeper中请求一次映射关系;
- StaticHosts类:当broker–partition之间的映射关系是静态时,常使用此方法;
- 继承KafkaConfig类:用于存储Kafka相关的参数;将上面实例的BrokerHost对象,作为参数传入KafkaConfig,例,Kafka的一个构造方法为KafkaConfig(BrokerHosts hosts, String topic);当前其实现方式有两个:
- SpoutConfig:Core KafkaSpout只接受此配置方式;
- TridentKafkaConfig:TridentKafkaEmitter只接受此配置方式;
KafkaConfig类中涉及到的配置参数默认值如下:
[Bash shell] 纯文本查看 复制代码
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
上面的MultiScheme类型的参数shceme,其负责:将Kafka中取出的byte[]转换为storm所需的tuple,这是一个扩展点,默认是原文输出。两种实现:SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme可将读取的byte[]转换为String。
notes(ningg):几个疑问,列在下面了
- ZkHosts类的一个构造方法ZkHosts(String brokerZkStr, String brokerZkPath),其中brokerZkPath的含义,原始给出的说法是:”rokerZkPath is the root directory under which all the topics and partition information is stored. by Default this is /brokers which is what default kafka implementation uses.”
- SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id),其中,zkRoot是一个root目录,用于存储consumer的offset;那这个zkRoot对应的目录物理上在哪台机器?
配置实例Core Kafka Spout
本质是设置一个读取Kafka中数据的Kafka Spout,然后,将从替换原始local mode下,topology中的Spout即可。下面是一个已经验证过的实例
[Bash shell] 纯文本查看 复制代码
TopologyBuilder builder = new TopologyBuilder();
BrokerHosts hosts = new ZkHosts("121.7.2.12:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "ningg", "/" + "ningg", UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set Spout.
builder.setSpout("word", kafkaSpout, 3);
builder.setBolt("result", new ExclamationBolt(), 3).shuffleGrouping("word");
Config conf = new Config();
conf.setDebug(true);
// submit topology in local mode
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Trident Kafka Spout(todo) todo 下面的样例并还没验证:
[Bash shell] 纯文本查看 复制代码
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
- Golang中巧用defer进行错误处理
- Unity应用架构设计(10)——绕不开的协程和多线程(Part 2)
- 厚土Go学习笔记 | 07. 基本类型
- Unity应用架构设计(11)——一个网络层的构建
- Go语言·Web调优详解
- Unity应用架构设计(12)——AOP思想的实践
- 厚土Go学习笔记 | 26. 函数闭包
- 学会代码执行函数,让老哥带你勇闯天涯!
- ASP.NET Core知多少(6):VS Code联调Angular + .NetCore
- 线程安全知多少
- Parallel线程安全问题
- 厚土Go学习笔记 | 25. 函数值 函数是函数也是值
- GoStub框架使用指南
- 厚土Go学习笔记 | 24. map字典测试用例(文字出现次数统计)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- redis集群搭建
- redis慢查询日志,php安装redis扩展,redis存储session,redis主从配置
- jQuery介绍与常见选择器的使用
- redis常用操作,redis操作键值,redis安全设置
- redis介绍,redis安装,redis持久化,redis数据类型
- AJAX的post请求与上传文件
- memcached的一些简单使用
- nosql介绍,memrcached介绍,安装memcached,查看memcachedq状态
- 如何在IDEA2017创建Maven的Web工程
- JSP上传文件与导出Excel表
- 关于CentOS中tomcat的8005端口启动不起来的解决办法
- 安装ansible以及简单使用
- 设计模式之职责链
- 转录组分析 | 使用SAMtools将SAM文件转换为BAM文件、排序、建立索引
- Matlab系列之那些数学函数(讨论功能已加入)