Kafka及Spring Cloud Stream

时间:2022-07-24
本文章向大家介绍Kafka及Spring Cloud Stream,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

安装

下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz

kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect

在kafka server端 config/server.properties中设置

必须要配置:

advertised.listeners=PLAINTEXT://192.168.3.201:9092 # 公布访问地址和端口

启动kafka

bin/kafka-server-start.sh ../config/server.properties

检测是否启动

netstat -tunlp | egrep " (2181|9092)"

或 lsof -i:9092

测试发送信息和消费消息

创建主题

./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 - topic test

生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费者

./kafkaconsole-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

如果想在外部使用kafka必须 9092 端口加入到防火墙列表

firewall-cmd --list-ports 查询所有放行端口 firewall-cmd --add-port=9092/tcp # 临时端口放行 firewall-cmd --add-port=9092/tcp --permanent # 永久放行 firewall-cmd --reload # 重新载入放行列表

简单API的应用

引入依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

编写生成者

package com.example.springkafka.api;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @Date: 2018/11/6 20:25
 * @Description: 生产者
 */
public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","192.168.3.221:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "message"; // 主题
        Integer partition = 0; // 指定分区
        long timeMillis = System.currentTimeMillis(); // 毫秒值 15分钟
        String key = "key-message"; // key
        String value = "value-message"; // value
        // 创建ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, partition, timeMillis, key, value);
        // 生产消息
        kafkaProducer.send(producerRecord);
        kafkaProducer.close();
    }
}

编写消费者

package com.example.springkafka.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @Date: 2018/11/6 20:25
 * @Description: 消费者
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
        properties.setProperty("group.id", "group-1");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        // 创建kafka的消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 订阅kafka主题
        kafkaConsumer.subscribe(Arrays.asList("message"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("========offset = %d, key = %s, value = %sn", record.offset(), record.key(), record.value());
        }
    }
}

spring kafka

依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

生成者与消费者配置

# 生成者配置
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 192.168.3.221:9092
    consumer: # 消费者
      group-id: gerry-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  topic: gerry

生成者代码

package com.example.springcloudkafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Date: 2018/11/6 21:03
 * @Description:
 */
@RestController
public class KafkaProducerController {
    public final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping("message/send") // 这种方式只支持post
    public boolean sendMessage(@RequestParam String message) {
        kafkaTemplate.send(topic,message);

        return true;
    }
}

消费者代码

package com.example.springcloudkafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Date: 2018/11/6 21:20
 * @Description:
 */
@Component
public class KafkaConsumerListener {

    @KafkaListener(topics={"${kafka.topic}"})
    public void getMessage(String message) {
        System.out.println("kafka 消费者监听,接收到消息:" + message);
    }
}

Spring Cloud Stream

官方定义三个接口 Source=> 发送者 Producer、Publisher Sink=> 接收器 Consumer、 Subscriber Processor: 上流而言Sink、下流而言Souce

Spring Cloud Stream Binder: Kafka

引入依赖:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

配置:

# 生成者配置
spring:
  kafka:
    bootstrap-servers: 192.168.3.221:9092
  cloud:
    stream:
      bindings:
        output:
          destination: ${kafka.topic}
        input:
          destination: ${kafka.topic}
kafka:
  topic: cloud-stream

生产者:

package com.example.springcloudstreamkafkademo.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
public class MessageProducerBean {
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    /**
     * 发送信息
     * @param message
     */
    public void send(String message) {
        // 通过消息管道发送消息
        // messageChannel.send(MessageBuilder.withPayload(message).build());
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}

消费者

package com.example.springcloudstreamkafkademo.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@EnableBinding(value={Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)
    private SubscribableChannel subscribableChannel;

    //1、 当subscribableChannel注入完成后完成回调
    @PostConstruct
    public void init() {
        subscribableChannel.subscribe(message->{
            System.out.println(message.getPayload());
        });
    }
    // 2、@ServiceActivator
    @ServiceActivator(inputChannel=Sink.INPUT)
    public void message(String message) {
        System.out.println("@ServiceActivator:"+message);
    }
    //3、@StreamListener
    @StreamListener(Sink.INPUT)
    public void onMessage(String message) {
        System.out.println("@StreamListener:"+message);
    }
}

Spring Cloud Stream Binder: RabbitMQ

引入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

配置

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: ${rabbit.queue}
        input:
          destination: ${rabbit.queue}
  rabbitmq:
    host: 192.168.3.221
    port: 5672
    username: rabbit
    password: rabbit
rabbit:
  queue: cloud-stream-queue

代码同kafka

完整代码详见:https://gitee.com/lm970585581/cloud-config/tree/master/Spring%20Cloud%20Stream%20