@KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
public void logs(List<ConsumerRecord<?, ?>> records){
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.topic());
System.out.println(record);
System.out.println(record.value().toString());
打印值如下
admin
ConsumerRecord(topic = admin, partition = 0, leaderEpoch = 2, offset = 136201, CreateTime = 1649400069032, serialized key size = -1, serialized value size = 1805, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"account":"admin","name":"管理员","sip":"168.168.168.131","method":"POST","operation":"角色分页查询","code":"002143901","@timestamp":"2022-04-08T06:40:43Z","@mtime":1649400043559})
{"account":"admin","name":"管理员",,"sip":"168.168.168.131","method":"POST","operation":"角色分页查询","code":"002143901","@timestamp":"2022-04-08T06:40:43Z","@mtime":1649400043559}
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependenc
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency..
接到领导的一个需求,希望封装一下kafka的消费者,可以从配置读取topic进行消费;一开始首先想到的是用java kafka的高阶api手工根据topic创建消费者,一个topic创建一个消费者,依赖zookeeper完成kafka内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。
调研了一下,手工实现类似sp...
springboot+kafka中@KafkaListener如何动态指定多个topic
本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的独特消费注解@KafkaListener
首先,application.properties中配置用逗号隔开的多个topic。
运行程序,console打印的效果如下:
因为只开了一条消费者线程,所...
KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。
通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:
下面的消费者监听器监听了两个topic:topic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。
三、指定Topic分区
一个消费者组可以消费多个topic,以前写过一篇一个消费者消费一个topic的,这次的是一个消费者组通过直连方式消费多个topic,做了小测试,结果是正确的,通过查看zookeeper的客户端,zookeeper记录了偏移量
package day04
消费多个topic
import kafka.common.TopicAndPartition
import kafka.mess...
@KafkaListener是Spring Kafka提供的注解,用于标识一个方法作为消息监听器。你可以使用以下方式来使用@KafkaListener注解:
1. 在方法上直接标注@KafkaListener注解,并指定要监听的topic,例如:
@KafkaListener(topics = "topic1")
public void listen(ConsumerRecord<Integer, String> msg) {
// 处理接收到的消息
2. 你还可以使用@KafkaListeners注解,它是@KafkaListener的容器注解,可以重复标注。这在处理多个topic时非常有用。例如:
@KafkaListeners({@KafkaListener(topics = "topic1"), @KafkaListener(topics = "topic2")})
public void listen(ConsumerRecord<Integer, String> msg) {
// 处理接收到的消息
3. 在Spring容器启动时,KafkaListenerAnnotationBeanPostProcessor会解析@KafkaListener注解,并为带有@KafkaListener注解的方法创建消息监听器。这个解析过程会在postProcessAfterInitialization方法中进行。<span class="em">1</span><span class="em">2</span><span class="em">3</span>
#### 引用[.reference_title]
- *1* *2* *3* [@KafkaListener 使用方式](https://blog.csdn.net/kwame211/article/details/107386782)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 100%"]
[ .reference_list ]
"status": 415,
"error": "Unsupported Media Type",
"trace": "org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'application/msword' not supported\r\n\tat org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodArgumentResolver.
[/code]