Kafka 快速起步
主要内容: 1. kafka 安装、启动 2. 消息的 生产、消费 3. 配置启动集群 4. 集群下的容错测试 5. 从文件中导入数据,并导出到文件
单机示例
安装
tar -xzf kafka_2.10-0.10.1.1.tgz
cd kafka_2.10-0.10.1.1
启动
> bin/zookeeper-server-start.sh
config/zookeeper.properties
> bin/kafka-server-start.sh
config/server.properties
创建topic
打开一个新的终端窗口
bin/kafka-topics.sh --create
--zookeeper localhost:2181
--replication-factor 1
--partitions 1
--topic test
发送消息
打开一个新的终端窗口
bin/kafka-console-producer.sh
--broker-list localhost:9092
--topic test
进入输入模式,随意输入信息,例如:
hello world
hi
获取消息
打开一个新的终端窗口
bin/kafka-console-consumer.sh
--bootstrap-server localhost:9092
--topic test
--from-beginning
便会显示出刚才发送的两条消息:
hello world
hi
这时可以打开发送消息的终端窗口,输入新的信息,再返回来就可以看到自动接收到了新消息
配置集群
新建两个启动配置文件
> cp config/server.properties
config/server-1.properties
> cp config/server.properties
config/server-2.properties
修改 config/server-1.properties 的以下几项配置:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=logs/kafka-logs-1
修改 config/server-2.properties 的以下几项配置:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=logs/kafka-logs-2
启动
> bin/kafka-server-start.sh
config/server-1.properties &
> bin/kafka-server-start.sh
config/server-2.properties &
创建一个topic,设置3个复制
bin/kafka-topics.sh --create
--zookeeper localhost:2181
--replication-factor 3
--partitions 1
--topic my-replicated-topic
发送消息
bin/kafka-console-producer.sh
--broker-list localhost:9092
--topic my-replicated-topic
输入消息:
my test message 1
my test message 2
获取消息
bin/kafka-console-consumer.sh
--bootstrap-server localhost:9092
--from-beginning
--topic my-replicated-topic
可以正常取得消息
容错测试
# 取得server1的进程号
ps aux | grep server-1.properties
# 杀掉进程
kill -9 43116
读取消息
bin/kafka-console-consumer.sh
--bootstrap-server localhost:9092
--from-beginning
--topic my-replicated-topic
返回信息:
my test message 1
my test message 2
仍然可以正常取得消息
Kafka Connect
Kafka 中的 connecter 可以与外部系统进行连接,例如文件系统、数据库
下面实验一个简单文件系统交互,从一个文件中导入数据,然后导出到另一个文件中
创建一个测试文件,用于导入数据使用
echo -e "foonbar" > test.txt
启动 connect,执行数据的导入导出
bin/connect-standalone.sh
config/connect-standalone.properties
config/connect-file-source.properties
config/connect-file-sink.properties
命令执行后,会输出一系列的日志信息,等待执行完毕
查看导出结果
cat test.sink.txt
返回结果:
foo
bar
成功导出了 test.txt 中的数据
过程分析
执行第2步的命令后,为什么是去读test.txt
?为什么写入了test.sink.txt
?中间的过程是什么样的?
原因是在于两个配置文件
config/connect-file-source.properties (导入配置)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
file
指定了是从test.txt
中导入数据
topic
指定了把数据发送到connect-test
这个topic
connect-file-sink.properties(导出配置)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
file
指定了把数据导出到test.txt
中导入数据
topic
指定从connect-test
这个topic中读取数据
查看一下connect-test
这个topic
bin/kafka-console-consumer.sh
--bootstrap-server localhost:9092
--topic connect-test
--from-beginning
结果为:
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
现在向test.txt中添加一条新数据:
echo "Another line" >> test.txt
再次执行 cat test.sink.txt
就会看到刚刚添加的数据:
foo
bar
Another line
- java小技术之生成二维码
- java实现发送邮件服务器,SMTP协议发送邮件
- HttpURLConnection实现两个服务端的对接
- java获取properties配置文件值
- 安全退出app,activoty栈管理
- JavaBean转Map方法
- JsBridge实现JavaScript和Java的互相调用
- JAVA-FTP批量大文件传输
- 独家 | 一文读懂TensorFlow(附代码、学习资料)
- 解决openssh漏洞,升级openssh版本
- 解决NTPD漏洞,升级Ntpd版本
- 独家 | 手把手教TensorFlow(附代码)
- HBase Region自动切分细节
- eclipse搭建ssh后台
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android 天气APP(二十)增加欢迎页及白屏黑屏处理、展示世界国家/地区的城市数据
- Android 天气APP(二十二)改动些许UI、增加更多空气质量数据和生活建议数据展示
- Android 自定义View 之 RectF用法详解
- Android 天气APP(二十五)地图天气(下)嵌套滑动布局渲染天气数据
- Android 天气APP(二十六)增加自动更新(检查版本、通知栏下载、自动安装)
- Android 天气APP(二十七)增加地图天气的逐小时天气、太阳和月亮数据
- Android 天气APP(二十八)地图搜索定位
- DevEco Studio项目构建讲解、编写页面、布局介绍、页面跳转
- Android 天气APP(二十九)壁纸设置、图片查看、图片保存
- Chrome 私人珍藏-stylus插件实现个性化百度界面定制
- Python 基础篇-简单的异常捕获
- Python 技巧篇-让我的程序暂停一下
- Python+selenium 技术篇-浏览器后台运行
- Python 基础篇-python3安装pyHook和pywin32库
- 漫画:如何螺旋遍历二维数组?(修订版)