添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

使用@KafkaListener注解时,可以一个注解指定消费多个topic
topic的参数需要是常量,我们可以用以下两种方法从配置参数中获取

1:在配置文件中获取
//配置在yml文件或者properties文件中
topics: "admin,login,client"
@KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
2:在配置类中获取
//创建一个类
@Component
public class TopicHandler {
    String [] KAFKA_TOPICS = {"admin","login","client"};
    //调用此方法获取topic参数,参数从哪来自己在方法里实现
    public String[] getTopics(){
        return KAFKA_TOPICS;
@KafkaListener(topics = "#{topicHandler.getTopics()}",concurrency = "#{topicHandler.getTopics().length}")

具体使用方式如下

//@KafkaListener(topics = "#{topicHandler.getTopics()}",concurrency = "#{topicHandler.getTopics().length}")
@KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
public void logs(List<ConsumerRecord<?, ?>> records){
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.topic());//获取topic信息
        System.out.println(record);//获取record对象信息,包含除值外的信息
        System.out.println(record.value().toString());//实际需要的值数据

打印值如下

admin
ConsumerRecord(topic = admin, partition = 0, leaderEpoch = 2, offset = 136201, CreateTime = 1649400069032, serialized key size = -1, serialized value size = 1805, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"account":"admin","name":"管理员","sip":"168.168.168.131","method":"POST","operation":"角色分页查询","code":"002143901","@timestamp":"2022-04-08T06:40:43Z","@mtime":1649400043559})
{"account":"admin","name":"管理员",,"sip":"168.168.168.131","method":"POST","operation":"角色分页查询","code":"002143901","@timestamp":"2022-04-08T06:40:43Z","@mtime":1649400043559}
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependenc
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency..
				
       接到领导的一个需求,希望封装一下kafka消费者,可以从配置读取topic进行消费;一开始首先想到的是用java kafka的高阶api手工根据topic创建消费者,一个topic创建一个消费者,依赖zookeeper完成kafka内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。        调研了一下,手工实现类似sp...
springboot+kafka中@KafkaListener如何动态指定多个topic 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的独特消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个topic。 运行程序,console打印的效果如下: 因为只开了一条消费者线程,所...
KafkaListener有若干的配置属性,这些配置属性使用或者是结合使用,可以方便快捷的帮助我们实现kafka消费者数据监听的需求。这里的属性比较多,先大概了解一下,后续我会介绍。 通常我们会把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示: 下面的消费者监听器监听了两个topictopic-a,topic-b(使用SpEL表达式逗号分割为字符串数组),该消费者组命名为group-demo,包含5个消费者线程并行消费。 三、指定Topic分区
一个消费者组可以消费多个topic,以前写过一篇一个消费消费一个topic的,这次的是一个消费者组通过直连方式消费多个topic,做了小测试,结果是正确的,通过查看zookeeper的客户端,zookeeper记录了偏移量 package day04 消费多个topic import kafka.common.TopicAndPartition import kafka.mess...
@KafkaListener是Spring Kafka提供的注解,用于标识一个方法作为消息监听器。你可以使用以下方式来使用@KafkaListener注解: 1. 在方法上直接标注@KafkaListener注解,并指定要监听的topic,例如: @KafkaListener(topics = "topic1") public void listen(ConsumerRecord<Integer, String> msg) { // 处理接收到的消息 2. 你还可以使用@KafkaListeners注解,它是@KafkaListener的容器注解,可以重复标注。这在处理多个topic时非常有用。例如: @KafkaListeners({@KafkaListener(topics = "topic1"), @KafkaListener(topics = "topic2")}) public void listen(ConsumerRecord<Integer, String> msg) { // 处理接收到的消息 3. 在Spring容器启动时,KafkaListenerAnnotationBeanPostProcessor会解析@KafkaListener注解,并为带有@KafkaListener注解的方法创建消息监听器。这个解析过程会在postProcessAfterInitialization方法中进行。<span class="em">1</span><span class="em">2</span><span class="em">3</span> #### 引用[.reference_title] - *1* *2* *3* [@KafkaListener 使用方式](https://blog.csdn.net/kwame211/article/details/107386782)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v93^chatsearchT3_2"}}] [.reference_item style="max-width: 100%"] [ .reference_list ]
"status": 415, "error": "Unsupported Media Type", "trace": "org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'application/msword' not supported\r\n\tat org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodArgumentResolver. [/code]