在业务的运用中。
对于消息重复,这个影响不是很严重,无论是生产者重复推送数据,还是消费者重复拉取数据,只要在消费端落库时,手动做去重就可以了。
对于消息丢失:
-
consumer端丢失消息的情形比较简单:
如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失
。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,可以采用手动提交offset:(1)enable.auto.commit=false 关闭自动提交位移、(2)在消息被完整处理之后再手动提交位移
-
生产者丢失消息是最复杂的情形了。生产者(Producer) 使用
send
方法发送消息实际上是异步的操作,我们可以通过
get()
方法获取调用结果,但是这样也让它变为了同步操作,但是一般不推荐这么做!可以采用为其添加回调函数的形式。这个回调函数会在 Producer 收到 ack 时调用,此处就和acks参数配置[1、0、-1]密切相关了,详细代码可参考:
Kafka API(代码编写)
-
如果消息发送失败的话,我们检查失败的原因之后重新发送即可!另外这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次,你3次一下子就重试完了。
消息
重复
场景
Producer的send()方法可能会出现异常,配合生产者参数retries>0,生产者会在出现可恢复异常的时候进行重试。
若出现不可恢复异常的时候,配合send()的异步发送方式,则可能在回调函数中进行
消息
重发。上述均可能导致
消息
重复
。
Kafka
的幂等性就是为了避免出现生产者重试的时候出现
重复
写入
消息
的情况。
开启幂等性功能配置(该配置默认为false)如下:
prop.put(Pro
当设置成false时,由于是手动提交的,可以处理一条提交一条,也可以处理一批,提交一批,由于consumer在
消费
数据
时是按一个batch来的,当pull了30条
数据
时,如果我们处理一条,提交一个offset,这样会严重影响
消费
的能力,那就需要我们来按一批来处理,或者设置一个累加器,处理一条加1,如果在处理
数据
时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次
消费
的时候就可以从提交的offset处进行再次
消费
。
Kafka
是一个分布式
消息
队列系统,具有高可靠性、高性能和高扩展性等特点。在
数据
传输过程中,
Kafka
采用了多种措施来
保证
数据
的可靠性,包括
数据
复制、
数据
持久化、
数据
备份等。本文将从各个阶段深入分析
Kafka
如何
保证
数据
不
丢失
、不
重复
,并提供代码实例来验证过程。
由于生产者端设置了发送
消息
的ack为1,并且生产者把
消息
发送到集群并且leader已经拿到
消息
,正好在返回ack的时候产生了网络波动,生产者拿不到broker返回的ack所以触发了重试机制,又一次给broker发送了这条
消息
。那么此时
消费
者就会从broker中poll到两条相同的
消息
。因为我们把ack设置成1或者-1/all,这样生产者生产的
消息
发送到broker中,会等待leader或者至少leader和一个副本同步到
消息
才会返回ack,如果生产者同步
消息
失败,会进行重试。二、防止
消息
重复
消费
。
[
Kafka
]
Kafka
如何
保证
消息
不
丢失
、不
重复
Kafka
基本架构
Kafka
如何
保证
消息
不
丢失
、不
重复
Kafka
消息
的
丢失
和
重复
可能会发生在哪里?
Kafka
如何
保证
`生产者端`的
消息
不
丢失
、不
重复
?生产者端`
丢失
数据
`的情况分析
Kafka
基本架构
生产者Producer :生产信息;
消费
者Consumer :订阅主题、
消费
信息;
代理Broker : 可以看作是一个独立的
Kafka
实例。多个
Kafka
Broker 组成一个卡夫卡集群
Kafka
Cluster;
主题topic:可以理解
使用带回调的发送
消息
的方法。
如果
消息
没有发送成功,那么Producer会按照配置的重试规则进行重试,如果重试次数用光后,还是
消息
发送失败,那么
kafka
会将异常信息通过回调的形式带给我们,这时,我们可以将没有发送成功的
消息
进行持久化,做后续的补偿处理。
(1)
kafka
有个offset的概念,当每个
消息
被写进去后,都有一个offset,代表他的序号,然后consumer
消费
该
数据
之后,隔一段时间,会把自己
消费
过的
消息
的offset提交一下,代表我已经
消费
过了。下次我要是重启,就会继续从上次
消费
到的offset来继续
消费
。但是当我们直接kill进程了,再重启。这会导致consumer有些
消息
处理了,但是没来得及提交offset。等重启之后,少数
消息
就会再次
消费
一次。