props.put("enable.auto.commit","true");//自动提交
props.put("auto.commit.interval.ms","5000");//offset间隔
props.put("max.poll.records",11);//单次拉取消息最大值
@KafkaListener(topics = {"test2"},groupId="group22")
public void consumeMsg(ConsumerRecord<?, ?> record) throws Exception{
System.out.println(new Date().toString()+":group22 "+record.toString());
Thread.sleep(30000);
在消费过程中消费者单次会拉取11条消息,每条消息耗时30s,11条消息耗时 5分钟30秒,由于max.poll.interval.ms 默认值5分钟,所以理论上消费者无法在5分钟内消费完,consumer会离开组,导致rebalance。
实际运行日志如下:
可以看到在消费完第11条消息后,因为消费时间超出max.poll.interval.ms 默认值5分钟,这时consumer已经离开消费组了,开始rebalance,因此提交offset失败。之后重新rebalance,消费者再次分配partition后,再次poll拉取消息依然从之前消费过的消息处开始消费,这样就造成重复消费。而且若不解决消费单次消费时间过长的问题,这部分消息可能会一直重复消费。
对于上述重复消费的场景,若不进行相应的处理,那么有可能造成一些线上问题。为了避免因重复消费导致的问题,以下提供了两种解决重复消费的思路:
1)提高消费能力:
提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题。
2)引入单独去重机制:
例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消费消息时先通过前置表去重后再进行消息的处理。
此外,在一些消费场景中,我们可以将消费的接口幂等处理,例如数据库的查 询操作天然具有幂等性,这时候可不用考虑重复消费的问题。对于例如新增数据的操作,可通过设置唯一键等方式以达到单次与多次操作对系统的影响相同,从而使接口具有幂等性。
https://zhuanlan.zhihu.com/p/112745985
补充:对于自动提交,处理消息失败后如何补偿?
上面介绍了自动提交的时间间隔,以及每次拉取消息的数量参数。就拿上面的例子来做一个说明:
props.put("enable.auto.commit","true");//自动提交
props.put("auto.commit.interval.ms","30000");//offset间隔
props.put("max.poll.records",20);//单次拉取消息最大值
每次poll会拉取20条消息,每个消息消费1s,在第一次poll之后,下一次poll因为没有达到auto.commit.interval.ms=30s,所以不会提交offset。第二次poll时,已经经过40s,因此这次poll会提交之前两次消费的消息,offset增加40。也就是说只有在经过auto.commit.interval.ms间隔后,并且在下一次调用poll时才会提交所有 已消费消息的offset。
在第二次poll中,假设处理的消息有的失败了,由于自动提交机制,offset已经增加了40,所以无法再次消费失败的消息了。通常,我们会在代码中将消费失败的消息,再次发送到kafka中,或者记录到日志中,作为补偿。
Kafka消费者以消费者组(Consumer Group)的形式消费一个topic,发布到topic中的每个记录将传递到每个订阅消费者者组中的一个消费者实例。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。生产环境中消费者在消费消息的时候若不考虑消费者的相关特性可能会出现重复消费的问题。在讨论重复消费之前,首先来看一下kafka中跟消费者有关的几个重要配置参数。enable.auto.commit:默认值true,表示消费者会周期性自动提交消.
Kafka消费者以消费者组(Consumer Group)的形式消费一个topic,发布到topic中的每个记录将传递到每个订阅消费者者组中的一个消费者实例。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。生产环境中消费者在消费消息的时候若不考虑消费者的相关特性可能会出现重复消费的问题。
在讨论重复消费之前,首先来看一下kafka中跟消费者有关的几个重要...
@KafkaListener(id = "layer_test_consumer", topics = {"${kafka.consumer.topic.layerTestConfig}"},
groupId = "${kafka.consumer.group-id.layerTestConfig}", containerFactory = "batchContainerFactory", errorHandler = "consumerAwareListenerErrorHan..
最近有朋友在问我关于消费组rebalance的问题的时候有提到过引起rebalance的原因,其中一条就是partition leader切换会引起消费组的rebalance,一般来说大家经常提的原因有以下三个
1、成员数量发生变化,有成员加入组或者退组
2、订阅的topic发生变化
3、订阅的topicPartition发生变化
我也是第一次见人说partition leader切换会引起消费者Rebalance,于是从这个角度来分析一下是不是真的会发生......
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency..
问题描述:
在消费者处理数据慢的时候(消费能力低),消费者会重复消费某个局部数据。在消费能力不变的情况下,陷入死循环。只有在消费能力增强后,才会跳出这个重复消费的死循环。
原理解析:
上图就是完整的kafka消费的过程,在consumer里面配置了一个超时时间。如果步骤2处理消息超时,那么consumer进行第3步会失败。这时候再次进入步骤1拉取重复的数据,如此往复。
系统采用Spring Cloud Stream框架集成Kafka来实现异步消息。
客户端消费某个topic消息出错时,会连续输出这个消息内容3次,同时没有提交offset。当有大量消息出错时,topic出现消息积压。
首先,我们知道实现异步消息的系统架构一般包含3个部分:生产者Producer、消息中间件Broker、消费者Consumer。其中,
消息中间件Broker,是...
kafkatool和启动都可以看生产和消费者的配置信息
同一个partition内的消息只能被同一个组中的一个consumer消费!!!! 当消费者数量多于partition的数量时,多余的消费者空闲 !!例如partition=4,则可在同一组中被最多4个consumer均衡消费。
一种是 RangeAssignor 分配策略(范围分区),另一种是 RoundRobinAssignor分配策略(轮询分区)。默认采用 Range 范围分区。 自定义分区策略
不同的消费组之间的消费者互不影响,但是同一个消费组
1. Consumer Group ID
Kafka 允许将多个消费者组成一个消费者组,每个消费者组都有一个唯一的 Consumer Group ID。同一个消费者组中的每个消费者都会消费相同的消息流,并且每个分区只能被该组中的一个消费者消费,因此可以通过 Consumer Group ID 来保证相同的消息只被同一个消费者组中的一个消费者处理。
2. Offset
在 Kafka 中,每个分区的每条消息都有一个唯一的 offset,消费者会保存自己消费的最后一个 offset,下次消费时从该 offset 开始消费。因此,消费者可以通过保存自己消费的 offset 来保证消息不被重复消费。
3. 事务
在 Kafka 0.11 版本以后,引入了事务功能,可以使得消费者在处理消息时保证事务的原子性和一致性。消费者可以通过事务来保证消息不被重复消费。
综上,通过 Consumer Group ID、Offset 和事务等方式,可以有效地避免 Kafka 中消息的重复消费问题。
jsondiffpatch介绍
qq_39113035:
java对list集合进行分页
旺来啦!:
asterisk、pbx、sip等基本概念
上海交大坑神: