kafka偏移量不更新、重复消费问题
最近负责的短信项目业务量增长较多,使用kafka异步发送短信的过程遇到了一些问题,在业务和线上环境上具体的表现就是:
对于发送短信这种 实时性 要求较高的项目,五分钟以前的请求未得到处理的话,基本就已经晚了。不管短信内容是数分钟有效的验证码,还是运维的监控告警,五分钟的延迟都会带来很多问题。
因此, 如果可以容忍消息丢失 ,这时候可以用以下方法止血:
但是,
消费者的消费积压一般都是因为消费者自身有问题
,可以参考如下文章,本次线上问题和下面文章描述的情况也基本一致:(链接已删除)
下面记录一下项目的问题排查和复现、解决过程。
问题排查和复现
1、消费积压和重复消费出现
kafka消费积压情况:(这时候消费者已经不行了,一直在消费消息,但是位移无法提交,因此在重启生产环境的服务)
2、问题排查
2.1、消费者消费缓慢的原因
通过检查消费者日志,发现在消费一条消息时,消费者需要2次更新MongoDB数据库,而每次更新都会花费一秒钟以上。于是检查对应的数据库,发现单个集合数据量已经达到了170万条,且没有添加任何索引,这不慢才怪了。
于是将集合做了归档(rename),随即新建了带有索引的新集合,更新MongoDB的时间缩短到了0.01s,线上问题初步解决。
2.2、kafka offset无法更新的原因
问万能的chatGpt和阅读上面的文章,基本确认是消费者单次拉取消息过多,在默认时间内未完成处理,触发集群rebanlance导致的,如下文:
消费超时会发生什么?
Kafka Handle Error, Client Will Seek Soon: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
报错信息非常良心,简单解释下:
集群以为消费线程挂了,触发了rebanlance(这一批已经给别的消费者线程消费了)。当前消费者线程业务逻辑执行完了再去同步游标报错了,没有提交成功,这就导致了两个消费者线程把同一批消息消息了两遍。
kafka消费者超时解决方案_kafka消费超时_7im0thyZhang的博客-CSDN博客
如果超过了 [max.poll.interval.ms] 所设置的时间,就会被消费组所在的 coordinator 剔除掉,从而导致重平衡,Kafka 重平衡过程中是不能消费的,会导致消费组处于类似 stop the world 的状态下,重平衡过程中也不能提交位移,这会导致消息重复消费从而使得消费组的消费速度下降,导致消息堆积。
kafka消息堆积原因解析
kafka消费堆积
鸭梨山大哎的博客-CSDN博客
2.3、问题复现
1、在消费者程序中加入线程休眠模拟处理时长(5s);
2、配置max-poll-records参数为500(默认值),使消费者可以单次拉取较多的消息;
3、通过本地工程短时间循环请求接口600次;
4、检查消费者日志和消费积压情况;
消费积压情况:消费者的kafka offset在更新几个之后,逐渐卡在3035,不再更新,如下图
解决重复消费问题的解决办法验证:
1、配置max-poll-records参数为10,使消费者可以单次最多仅可拉取10条消息
2、在不改变消费者消费能力(速度)的情况下,检查消费情况
3、发现消费积压逐渐减少,且减少的步长就是配置的max-poll-records的值(10)(如下图),说明只要
单次拉取的消息数 * 处理时间
<
kafka的超时阈值[max.poll.interval.ms]
(默认五分钟) 即可保证消费结果可成功提交;
首先,要提升消费者消费能力,涉及到数据库的就添加索引和缓存,涉及到网络连接的考虑添加连接池等;
其次,kafka的相关配置一定要根据自己的需要及时修改,避免一次拉取过多消息无法处理。反复触发rebanlance的话,位移就一点都不会更新了。也就是卡死了。
最后,可以根据需求为kafka配置多线程消费,要注意为topic配置多个分区,注意上游消息生产者不要使用完全一样的key(否则消息无法发送到多个分区),然后再为消费者配置多线程的
KafkaListenerContainerFactory
,注意代码做好线程安全修改即可。