Kafka and Spring Cloud Stream
Installation
Download Kafka: http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
The three most important Kafka configuration properties are broker.id, log.dirs, and zookeeper.connect.
They are set on the broker side in config/server.properties.
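A sketch of how these three properties might look in config/server.properties (the id, path, and address below are example values, not taken from the original setup):

```properties
broker.id=0                      # unique id of this broker within the cluster
log.dirs=/tmp/kafka-logs         # directory where partition logs are stored
zookeeper.connect=localhost:2181 # ZooKeeper connection string
```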
The following must be configured so that clients can reach the broker:
advertised.listeners=PLAINTEXT://192.168.3.201:9092 # the address and port advertised to clients
Start Kafka:
bin/kafka-server-start.sh config/server.properties
Check that it started:
netstat -tunlp | egrep " (2181|9092)"
or: lsof -i:9092
Test producing and consuming messages
Create a topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Producer:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
Consumer:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
To access Kafka from outside the host, port 9092 must be opened in the firewall:
firewall-cmd --list-ports # list all open ports
firewall-cmd --add-port=9092/tcp # open the port for this session only
firewall-cmd --add-port=9092/tcp --permanent # open it permanently
firewall-cmd --reload # reload the firewall rules
Using the plain Kafka API
Add the dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Write the producer
package com.example.springkafka.api;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @Date: 2018/11/6 20:25
 * @Description: Producer
 */
public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "message"; // topic to send to
        Integer partition = 0; // target partition
        long timeMillis = System.currentTimeMillis(); // record timestamp in milliseconds
        String key = "key-message";
        String value = "value-message";
        // Build the ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, partition, timeMillis, key, value);
        // Send the message, then close the producer
        kafkaProducer.send(producerRecord);
        kafkaProducer.close();
    }
}
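The record above names a partition explicitly. When the partition argument is null, Kafka's default partitioner derives one from the key instead. A simplified pure-Java sketch of that idea (the real partitioner hashes the serialized key bytes with murmur2, not String.hashCode(), which is used here only for illustration):

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default key-based partitioning.
    public static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the hash is non-negative, then take
        // the remainder to land in the range [0, numPartitions).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(partitionFor("key-message", 3));
    }
}
```

Because the mapping is deterministic, all records with the same key end up in the same partition, which is what gives Kafka per-key ordering.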
Write the consumer
package com.example.springkafka.api;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
 * @Date: 2018/11/6 20:25
 * @Description: Consumer
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
        properties.setProperty("group.id", "group-1");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        // Create the Kafka consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // Subscribe to the topic
        kafkaConsumer.subscribe(Arrays.asList("message"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("========offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
Spring Kafka
Dependency:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Producer and consumer configuration
# producer configuration
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 192.168.3.221:9092
    consumer: # consumer configuration
      group-id: gerry-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  topic: gerry
Producer code
package com.example.springcloudkafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @Date: 2018/11/6 21:03
* @Description:
*/
@RestController
public class KafkaProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping("message/send") // this mapping only accepts POST
    public boolean sendMessage(@RequestParam String message) {
        kafkaTemplate.send(topic, message);
        return true;
    }
}
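Note that kafkaTemplate.send() is asynchronous: returning true above only means the message was handed off, not that the broker accepted it. Spring Kafka exposes this through the future that send() returns, to which a callback can be attached. A pure-JDK sketch of that pattern (the send method below is a hypothetical stand-in, not the Spring Kafka API):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSendSketch {
    // Hypothetical stand-in for an asynchronous send: it completes a
    // future with an acknowledgement instead of blocking the caller.
    static CompletableFuture<String> send(String topic, String message) {
        return CompletableFuture.supplyAsync(() -> "ack:" + topic + ":" + message);
    }

    public static void main(String[] args) {
        // Attach a callback rather than assuming success immediately.
        send("gerry", "hello")
                .thenAccept(ack -> System.out.println("delivered: " + ack))
                .join(); // block only so the demo can print before exiting
    }
}
```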
Consumer code
package com.example.springcloudkafka.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Date: 2018/11/6 21:20
* @Description:
*/
@Component
public class KafkaConsumerListener {
    @KafkaListener(topics = {"${kafka.topic}"})
    public void getMessage(String message) {
        System.out.println("Kafka consumer listener received message: " + message);
    }
}
Spring Cloud Stream
The official model defines three interfaces:
Source => the sending side (Producer / Publisher)
Sink => the receiving side (Consumer / Subscriber)
Processor => a Sink from the upstream's point of view and a Source from the downstream's
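The Processor idea above can be sketched in pure Java (this is a conceptual model only, not the Spring Cloud Stream API; blocking queues stand in for message channels):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class ProcessorSketch {
    // A Processor drains its input channel (acting as a Sink for the
    // upstream) and publishes the transformed message to its output
    // channel (acting as a Source for the downstream).
    static void processOnce(BlockingQueue<String> input,
                            BlockingQueue<String> output,
                            Function<String, String> transform) throws InterruptedException {
        output.put(transform.apply(input.take()));
    }

    // Convenience wrapper: push one message through a fresh processor.
    static String runThrough(String message) {
        try {
            BlockingQueue<String> input = new LinkedBlockingQueue<>();
            BlockingQueue<String> output = new LinkedBlockingQueue<>();
            input.put(message);
            processOnce(input, output, String::toUpperCase);
            return output.take();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runThrough("hello")); // prints HELLO
    }
}
```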
Spring Cloud Stream Binder: Kafka
Add the dependency:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Configuration:
# binder and binding configuration
spring:
  kafka:
    bootstrap-servers: 192.168.3.221:9092
  cloud:
    stream:
      bindings:
        output:
          destination: ${kafka.topic}
        input:
          destination: ${kafka.topic}
kafka:
  topic: cloud-stream
Producer:
package com.example.springcloudstreamkafkademo.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Source.class)
public class MessageProducerBean {
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    /**
     * Send a message.
     * @param message
     */
    public void send(String message) {
        // Send through the bound message channel; the injected channel
        // and source.output() refer to the same channel.
        // messageChannel.send(MessageBuilder.withPayload(message).build());
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}
Consumer
package com.example.springcloudstreamkafkademo.consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@EnableBinding(value = {Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)
    private SubscribableChannel subscribableChannel;

    // 1. Subscribe directly once subscribableChannel has been injected
    @PostConstruct
    public void init() {
        subscribableChannel.subscribe(message -> {
            System.out.println(message.getPayload());
        });
    }

    // 2. @ServiceActivator on the input channel
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void message(String message) {
        System.out.println("@ServiceActivator:" + message);
    }

    // 3. @StreamListener
    @StreamListener(Sink.INPUT)
    public void onMessage(String message) {
        System.out.println("@StreamListener:" + message);
    }
}
Spring Cloud Stream Binder: RabbitMQ
Add the dependency
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Configuration
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: ${rabbit.queue}
        input:
          destination: ${rabbit.queue}
  rabbitmq:
    host: 192.168.3.221
    port: 5672
    username: rabbit
    password: rabbit
rabbit:
  queue: cloud-stream-queue
The code is the same as for Kafka.
Full source: https://gitee.com/lm970585581/cloud-config/tree/master/Spring%20Cloud%20Stream%20