Getting Started with Kafka

Date: 2019-06-15

Kafka

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all of a website's user activity-stream data. (Baidu Baike)

1. Startup

1.1 Download Kafka

Download kafka_2.12-2.2.0.tgz, then extract it:

tar -xzf kafka_2.12-2.2.0.tgz -C /usr/local/

cd /usr/local/kafka_2.12-2.2.0

Kafka directory layout (screenshot omitted in the original).

1.2 Standalone

Start the ZooKeeper service:

Create the data directory and edit the config file config/zookeeper.properties:

mkdir -p /var/zookeeper

# data directory
dataDir=/var/zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties&

Start the Kafka service:

Create the log directory and edit the config file config/server.properties:

mkdir -p /var/kafka/logs

# listener address
listeners=PLAINTEXT://192.168.229.13:9092

# log directory
log.dirs=/var/kafka/logs

bin/kafka-server-start.sh config/server.properties&
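The trailing & only backgrounds the process in the current shell. The start scripts also accept a -daemon flag if you prefer detached processes; a minimal alternative, using the same config paths as above:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties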

Create and list topics:

bin/kafka-topics.sh --create --bootstrap-server 192.168.229.13:9092 --replication-factor 1 --partitions 1 --topic test

 bin/kafka-topics.sh --list --bootstrap-server 192.168.229.13:9092

Start a console producer and send messages:

bin/kafka-console-producer.sh --broker-list 192.168.229.13:9092 --topic test

Start a console consumer and receive messages:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.229.13:9092 --topic test --from-beginning

1.3 Cluster

1.3.1 Configuration

Each broker gets its own id, log directory, and listener address:

No.   Broker id       Log directory                 Listener address
1     broker.id=0     log.dirs=/var/kafka/logs      listeners=PLAINTEXT://192.168.229.13:9092
2     broker.id=1     log.dirs=/var/kafka/logs-1    listeners=PLAINTEXT://192.168.229.13:9093
3     broker.id=2     log.dirs=/var/kafka/logs-2    listeners=PLAINTEXT://192.168.229.13:9094

mkdir -p /var/kafka/logs-1

mkdir -p /var/kafka/logs-2

Copy the config file and edit each copy according to the table above:

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties 
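For example, server-1.properties ends up with the following values changed (server-2.properties analogously, per the table above):

broker.id=1
log.dirs=/var/kafka/logs-1
listeners=PLAINTEXT://192.168.229.13:9093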

1.3.2 Startup

Start ZooKeeper and the three brokers from the command line:

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties&

bin/kafka-server-start.sh config/server-1.properties&

bin/kafka-server-start.sh config/server-2.properties&

Check the running services (screenshot omitted in the original):

Create and describe a topic (3 replicas, 1 partition):

bin/kafka-topics.sh --create --bootstrap-server 192.168.229.13:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic 

bin/kafka-topics.sh --describe --bootstrap-server 192.168.229.13:9092 --topic my-replicated-topic
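On a healthy cluster the describe output (shown as a screenshot in the original) looks roughly like this, assuming broker 0 was elected leader:

Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:segment.bytes=1073741824
    Topic: my-replicated-topic  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2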

Notes:

① Leader: 0 corresponds to broker.id=0 in server.properties.

② Replicas: the ids of the brokers that hold a copy of the partition and can serve as leader (each corresponds to a broker.id in the config files).

Start a console producer and send messages:

bin/kafka-console-producer.sh --broker-list 192.168.229.13:9092 --topic my-replicated-topic

Start a console consumer and receive messages:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.229.13:9092 --topic my-replicated-topic --from-beginning

1.3.3 Fault-tolerance test

Kill one of the brokers (here PID 3468, the broker with broker.id=1):

kill -9 3468

Producer output (screenshot omitted)

Consumer output (screenshot omitted)

Topic information after the kill (screenshot omitted)

Restart the broker with broker.id=1:

bin/kafka-server-start.sh config/server-1.properties&
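Once the broker is back up, re-running the describe command from above should show it rejoining the Isr list:

bin/kafka-topics.sh --describe --bootstrap-server 192.168.229.13:9092 --topic my-replicated-topic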

2. Connecting to Kafka from Java

All of the test classes below extend this Spring Boot test base class:

package ha;

import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BaseTest {}
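The examples assume a Spring Boot project with the Kafka client libraries on the classpath; roughly the following Maven dependencies (version 2.2.0 chosen here only to match the broker version used above):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>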

2.1 Kafka producer

package ha.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka producer
 * @author wangym
 */
public class KafkaProducerTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    // Send messages
    @Test
    public void test1() {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 1000000; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("Total time: " + time + "ms");
    }

    // Send messages inside a transaction
    @Test
    public void test2() {
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
        producer.initTransactions(); // initialize the transactional producer
        long start = System.currentTimeMillis();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 1000000; i++) {
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction(); // commit the transaction
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the
            // producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("Total time: " + time + "ms");
    }
}
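test1 above fires and forgets: send() returns a Future, and delivery errors only surface when you inspect it or pass a callback. A hypothetical extra test method that could be added to KafkaProducerTest (same props and imports as above), sketched here only to illustrate the callback form of send():

    // Hypothetical variant of test1 that reports per-record delivery results via a callback
    @Test
    public void test1WithCallback() {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(); // delivery failed
                    } else {
                        System.out.println("stored at offset " + metadata.offset()
                                + " in partition " + metadata.partition());
                    }
                });
        producer.flush(); // give the callback a chance to fire before closing
        producer.close();
    }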

2.2 Kafka consumer

package ha.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka consumer
 * @author wangym
 */
public class KafkaConsumerTest extends BaseTest {

    Properties props = new Properties();
    KafkaConsumer<String, String> consumer = null;

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Consume messages, auto commit
     */
    @SuppressWarnings("resource")
    @Test
    public void test1() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    /**
     * Consume messages, manual commit in batches
     */
    @Test
    public void test2() {
        props.setProperty("enable.auto.commit", "false");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                // insertIntoDb(buffer);
                consumer.commitSync(); // manual commit
                buffer.clear();
            }
        }
    }

    /**
     * Iterate over partitions, manual commit per partition
     */
    @Test
    public void test3() {
        try {
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("my-topic")); // missing in the original; poll() requires a subscription
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ":" + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }
}
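The while (true) loops above never exit, so these consumers only stop when the test JVM is killed. The usual way to shut a consumer down cleanly is to call wakeup() from another thread (for example a shutdown hook) and catch the resulting WakeupException. A rough sketch of how test1 could be rewritten, reusing the imports and props above (not part of the original post):

        // Hypothetical clean-shutdown variant of test1
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup)); // interrupts the blocking poll()
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected on shutdown
        } finally {
            consumer.close();
        }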

2.3 Kafka Streams

Stream producer:

package ha.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka Streams producer
 * @author wangym
 */
public class KafkaStreamsProducerTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    /**
     * Streams test - producer
     * Writes test data into the input topic
     */
    @Test
    public void streamProducerTest() {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 1000000; i++) {
            producer.send(new ProducerRecord<String, String>("my-stream-input-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("Total time: " + time + "ms");
    }
}

Stream processing:

package ha.kafka;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka Streams test
 * @author wangym
 */
public class KafkaStreamsTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    }

    /**
     * Streams test: copy records from the input topic to the output topic,
     * printing each value along the way
     */
    @SuppressWarnings("resource")
    @Test
    public void test1() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("my-stream-input-topic")
                .mapValues(value -> {
                    System.out.println(value);
                    return value;
                })
                .to("my-stream-output-topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        while (true) {
            // keep the test alive so the topology keeps running (see the shutdown sketch below)
        }
    }
}
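The empty while (true) loop keeps the test alive but busy-spins a CPU core. A more common pattern (used by the official Streams demos) is to block on a CountDownLatch and close the topology from a shutdown hook. A rough sketch of how the end of test1 could look, assuming java.util.concurrent.CountDownLatch is imported (not part of the original post):

        // Hypothetical replacement for the busy loop at the end of test1
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();     // stop the topology
            latch.countDown();   // release the waiting test thread
        }));
        streams.start();
        try {
            latch.await(); // block until the shutdown hook runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }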

Stream consumer:

package ha.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Before;
import org.junit.Test;

import ha.BaseTest;

/**
 * Kafka Streams consumer
 * @author wangym
 */
public class KafkaStreamsConsumerTest extends BaseTest {

    Properties props = new Properties();

    @Before
    public void initKafka() {
        props.put("bootstrap.servers", "192.168.229.13:9092,192.168.229.13:9093,192.168.229.13:9094");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Streams test - consumer (auto commit)
     * Reads the processed records from the output topic
     */
    @SuppressWarnings("resource")
    @Test
    public void streamConsumerTest() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-stream-output-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

Original post: https://www.cnblogs.com/wangymd/p/11028555.html