说明:本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener
首先,application.properties中配置用逗号隔开的多个topic。
方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)
运行程序,console打印的效果如下:
因为只开了一条消费者线程,所以所有的topic和分区都分配给这条线程。
如果你想开多条消费者线程去消费这些topic,添加@KafkaListener注解的参数
concurrency
的值为自己想要的消费者个数即可(注意,消费者数要小于等于你开的所有topic的分区数总和)
运行程序,console打印的效果如下:
总结一下大家问的最多的一个问题:
如何在程序运行的过程中,改变topic,消费者能够消费修改后的topic?
ans:
经过尝试,使用@KafkaListener注解实现不了此需求,在程序启动的时候,程序就会根据@KafkaListener的注解信息初始化好消费者去消费指定好的topic。如果在程序运行的过程中,修改topic,不会让此消费者修改消费者的配置再重新订阅topic的。
不过我们可以有个折中的办法,就是利用@KafkaListener的topicPattern参数来进行topic匹配。
具体如何操作的可以看下这位老哥的blog:
https://blog.csdn.net/songzehao/article/details/103091486
终极方法
:
-
不使用@KafkaListener,使用kafka原生客户端依赖,手动初始化消费者,开启消费者线程。
-
在消费者线程中,每次循环都从配置、数据库或者其他配置源获取最新的topic信息,与之前的topic比较,如果发生变化,重新订阅topic或者初始化消费者。
-
加入kafka客户端依赖(本次测试服务端kafka版本:2.12-2.4.0)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
-
代码
@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {
* 消费者
private static KafkaConsumer<String, String> consumer;
* topic
private List<String> topicList;
public static String getNewTopic() {
try {
return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
} catch (IOException e) {
e.printStackTrace();
return null;
* 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
* @param topicList
* @return
public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
//配置信息
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "192.168.9.185:9092");
//必须指定消费者组
props.put("group.id", "haha");
//设置数据key和value的序列化处理类
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
//创建消息者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅topic的消息
consumer.subscribe(topicList);
return consumer;
* 开启消费者线程
* 异常请自己根据需求自己处理
@Override
public void afterPropertiesSet() {
// 初始化topic
topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
consumer = getInitConsumer(topicList);
// 开启一个消费者线程
new Thread(() -> {
while (true) {
// 模拟从配置源中获取最新的topic(字符串,逗号隔开)
final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
// 如果topic发生变化
if (!topicList.equals(newTopic)) {
log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
// method one:重新订阅topic:
topicList = newTopic;
consumer.subscribe(newTopic);
// method two:关闭原来的消费者,重新初始化一个消费者
//consumer.close();
//topicList = newTopic;
//consumer = getInitConsumer(newTopic);
continue;
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("key:" + record.key() + "" + ",value:" + record.value());
}).start();
说一下第72行代码:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
上面这行代码表示:在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回。
在修改topic后,必须等到此次poll拉取的消息处理完,while(true)循环的时候检测topic发生变化,才能重新订阅topic.
poll()方法一次拉取得消息数默认为:500,如下图,kafka客户端源码中设置的。
如果想自定义此配置,可在初始化消费者时加入
-
运行结果(测试的topic中都无数据)
注意:KafkaConsumer是线程不安全的,不要用一个KafkaConsumer实例开启多个消费者,要开启多个消费者,需要new 多个KafkaConsumer实例。
有什么问题欢迎提出来讨论和交流 q:281867465
Kafka
Listener
有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现
kafka
消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。
通常我们会把消费者监听的主题,消费者组名称,消费者组
中
消费者数量等常用信息做成自定义配置(而不是在代码
中
写死),如下所示:
下面的消费者监听器监听了两个
topic
:
topic
-a,
topic
-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。
三、
指定
Topic
分区
接到领导的一个需求,希望封装一下
kafka
的消费者,可以从配置读取
topic
进行消费;一开始首先想到的是用
java
kafka
的高阶api手工根据
topic
创建消费者,一个
topic
创建一个消费者,依赖zookeeper完成
kafka
内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。
调研了一下,手工实现类似sp...
使用@
Kafka
Listener
注解时,可以一个注解
指定
消费
多个
topic
topic
的
参数
需要是常量,我们可以用以下两种方法从配置
参数
中
获取
1:在配置文件
中
获取
//配置在yml文件或者properties文件
中
topic
s: "admin,login,client"
@
Kafka
Listener
(
topic
s = "#{'${
topic
s}'.split(',')}",concurrency = "#{'${
topic
s}'.split(',').length}")
2:在配置类
中
获取
本文是
SpringBoot
+
Kafka
的实战讲解,如果对
kafka
的架构原理还不了解的读者,建议先看一下《大白话
kafka
架构原理》、《秒懂
kafka
HA(高可用)》两篇文章。
一、生产者实践
普通生产者
带回调的生产者
自定义分区器
kafka
事务提交
二、消费者实践
指定
topic
、partition、offset消费
@
Kafka
Listener
(
topic
s = "#{'${
kafka
Topic
Name}'.split(',')}")
public void listeninstances(ConsumerRecord<?, ?> record){
logger.info("----------------- record =" + record);
Optional<?>
kafka
Message = Optional.ofNullable(record