Spring Boot with Apache Kafka 完整的发布订阅例子
时间:2022-05-03
本文章向大家介绍 Spring Boot with Apache Kafka 完整的发布订阅例子,主要内容包括本文节选自电子书《Netkiller Java 手札》地址 http://www.netkiller.cn/、5.21.7. 完整的发布订阅实例、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
本文节选自电子书《Netkiller Java 手札》地址 http://www.netkiller.cn/
5.21.7. 完整的发布订阅实例
上面的例子仅仅是做了一个热身,现在我们将实现一个完整的例子。
例 5.5. Spring boot with Apache kafka.
SpringApplication
package cn.netkiller;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
//import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
//import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot application entry point.
 *
 * <p>Note: {@code @SpringBootApplication} already implies
 * {@code @EnableAutoConfiguration} and {@code @ComponentScan}, so the
 * previously duplicated annotations (and the commented-out repository
 * annotations) have been removed.</p>
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
Consumer configuration
package cn.netkiller.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import cn.netkiller.kafka.consumer.Consumer;
/**
 * Consumer-side Kafka configuration: exposes the raw consumer properties,
 * a {@link ConsumerFactory}, the listener container factory used by
 * {@code @KafkaListener}, and the {@link Consumer} message-handler bean.
 */
@Configuration
@EnableKafka
public class ConsumerConfiguration {

    /**
     * Raw Kafka consumer properties.
     *
     * @return the property map handed to {@link DefaultKafkaConsumerFactory}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        // BUG FIX: consumerFactory() is typed <String, String>, so the record
        // key must be deserialized as a String. The original used
        // IntegerDeserializer, which would throw a ClassCastException the
        // moment a key is accessed as a String.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
        return props;
    }

    /** @return consumer factory backed by {@link #consumerConfigs()} */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Container factory that Spring Kafka uses to create listener containers
     * for every {@code @KafkaListener}-annotated method.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /** @return the message-receiving bean whose listener methods are scanned */
    @Bean
    public Consumer receiver() {
        return new Consumer();
    }
}
Producer configuration
package cn.netkiller.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import cn.netkiller.kafka.producer.Producer;
/**
 * Producer-side Kafka configuration: exposes the raw producer properties,
 * a {@link ProducerFactory}, the {@link KafkaTemplate} used for sending,
 * and the {@link Producer} helper bean.
 */
@Configuration
public class ProducerConfiguration {

    /**
     * Raw Kafka producer properties.
     *
     * @return the property map handed to {@link DefaultKafkaProducerFactory}
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        // BUG FIX: kafkaTemplate() is typed <String, String>, so the record
        // key must be serialized as a String. The original used
        // IntegerSerializer, which would fail for any send that supplies a
        // String key.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // maximum time send() may block, after which it throws a TimeoutException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        return props;
    }

    /** @return producer factory backed by {@link #producerConfigs()} */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** @return template providing high-level (async) send operations */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /** @return the message-sending helper bean */
    @Bean
    public Producer sender() {
        return new Producer();
    }
}
Consumer
package cn.netkiller.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
/**
 * Kafka message consumer. A single-count latch lets callers (tests, the
 * demo controller) block until the first message has been received.
 */
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    // Counted down exactly once, on the first received record.
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Invoked by Spring Kafka for every record arriving on "helloworld.t".
     *
     * @param message the record value
     */
    @KafkaListener(topics = "helloworld.t")
    public void receiveMessage(String message) {
        logger.info("received message='{}'", message);
        latch.countDown();
    }

    /** @return latch that reaches zero once a message has arrived */
    public CountDownLatch getLatch() {
        return latch;
    }
}
Producer
package cn.netkiller.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Kafka message producer. Sends are asynchronous; the outcome is reported
 * through a callback registered on the returned future.
 */
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes {@code message} to {@code topic} asynchronously and logs the
     * outcome (record offset on success, the exception on failure).
     *
     * @param topic   destination topic
     * @param message record value to send
     */
    public void sendMessage(String topic, String message) {
        // KafkaTemplate.send() is asynchronous and returns a future.
        ListenableFuture<SendResult<String, String>> sendFuture = kafkaTemplate.send(topic, message);

        // Register a callback so the result is reported without blocking.
        sendFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("unable to send message='{}'", message, ex);
            }
        });

        // To block the calling thread instead, invoke the future's get() method.
    }
}
Controller
package cn.netkiller.web;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;
/**
 * Demo endpoints for exercising the Kafka producer/consumer pair.
 */
@Controller
@RequestMapping("/test")
public class KafkaTestController {

    // BUG FIX: the logger was created with IndexController.class, so log
    // entries were attributed to the wrong class (and the symbol may not even
    // resolve). It must use this controller's own class.
    private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    /** Simple liveness check. @return the literal "PONG" */
    @RequestMapping("/ping")
    @ResponseBody
    public String ping() {
        String message = "PONG";
        return message;
    }

    /**
     * Publishes a test message, then waits (up to 10 s) for the consumer's
     * latch so the round trip can be observed in the logs.
     *
     * @return "OK" once the send has been issued
     * @throws Exception if interrupted while awaiting the latch
     */
    @RequestMapping("/kafka/send")
    @ResponseBody
    public String testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        // Parameterized logging instead of string concatenation.
        logger.info("{}", receiver.getLatch().getCount());
        return "OK";
    }
}
SpringBootTest
package cn.netkiller;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;
/**
 * Integration test: publishes a record and verifies the consumer receives
 * it within the timeout. Requires a reachable Kafka broker.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    /**
     * Round trip: send one message, wait up to 10 seconds for the consumer's
     * latch, then assert it was counted down (i.e. the message arrived).
     */
    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(receiver.getLatch().getCount()).isEqualTo(0);
    }
}
- 使用redis构建可靠分布式锁
- 提高服务器程序性能的一些方法
- A+B for Matrices 及 C++ transform的用法
- socket读写返回值的处理
- 记录服务上线一年来的点点滴滴
- 实现两个N*N矩阵的乘法,矩阵由一维数组表示
- C++实现线程安全的单例模式
- 实现两个N*N矩阵的乘法,矩阵由一维数组表示
- 一步一步实现读写锁
- 二维数组的查找
- 从I/O复用谈epoll为什么高效
- C++ STL算法系列2---find ,find_first_of , find_if , adjacent_find的使用
- 2017企业安全技术热词有哪些?
- Raft协议实战之Redis Sentinel的选举Leader源码解析
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法