添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

在业务的运用中。

对于消息重复,这个影响不是很严重,无论是生产者重复推送数据,还是消费者重复拉取数据,只要在消费端落库时,手动做去重就可以了。

对于消息丢失:

  • consumer端丢失消息的情形比较简单: 如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失 。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,可以采用手动提交offset:(1)enable.auto.commit=false 关闭自动提交位移、(2)在消息被完整处理之后再手动提交位移
  • 生产者丢失消息是最复杂的情形了。生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get() 方法获取调用结果,但是这样也让它变为了同步操作,但是一般不推荐这么做!可以采用为其添加回调函数的形式。这个回调函数会在 Producer 收到 ack 时调用,此处就和acks参数配置[1、0、-1]密切相关了,详细代码可参考: Kafka API(代码编写)
    • 如果消息发送失败的话,我们检查失败的原因之后重新发送即可!另外这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次,你3次一下子就重试完了。
消息 重复 场景 Producer的send()方法可能会出现异常,配合生产者参数retries>0,生产者会在出现可恢复异常的时候进行重试。 若出现不可恢复异常的时候,配合send()的异步发送方式,则可能在回调函数中进行 消息 重发。上述均可能导致 消息 重复 Kafka 的幂等性就是为了避免出现生产者重试的时候出现 重复 写入 消息 的情况。 开启幂等性功能配置(该配置默认为false)如下: prop.put(Pro 当设置成false时,由于是手动提交的,可以处理一条提交一条,也可以处理一批,提交一批,由于consumer在 消费 数据 时是按一个batch来的,当pull了30条 数据 时,如果我们处理一条,提交一个offset,这样会严重影响 消费 的能力,那就需要我们来按一批来处理,或者设置一个累加器,处理一条加1,如果在处理 数据 时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次 消费 的时候就可以从提交的offset处进行再次 消费 Kafka 是一个分布式 消息 队列系统,具有高可靠性、高性能和高扩展性等特点。在 数据 传输过程中, Kafka 采用了多种措施来 保证 数据 的可靠性,包括 数据 复制、 数据 持久化、 数据 备份等。本文将从各个阶段深入分析 Kafka 如何 保证 数据 丢失 、不 重复 ,并提供代码实例来验证过程。 由于生产者端设置了发送 消息 的ack为1,并且生产者把 消息 发送到集群并且leader已经拿到 消息 ,正好在返回ack的时候产生了网络波动,生产者拿不到broker返回的ack所以触发了重试机制,又一次给broker发送了这条 消息 。那么此时 消费 者就会从broker中poll到两条相同的 消息 。因为我们把ack设置成1或者-1/all,这样生产者生产的 消息 发送到broker中,会等待leader或者至少leader和一个副本同步到 消息 才会返回ack,如果生产者同步 消息 失败,会进行重试。二、防止 消息 重复 消费 。 [ Kafka ] Kafka 如何 保证 消息 丢失 、不 重复 Kafka 基本架构 Kafka 如何 保证 消息 丢失 、不 重复 Kafka 消息 丢失 重复 可能会发生在哪里? Kafka 如何 保证 `生产者端`的 消息 丢失 、不 重复 ?生产者端` 丢失 数据 `的情况分析 Kafka 基本架构 生产者Producer :生产信息; 消费 者Consumer :订阅主题、 消费 信息; 代理Broker : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个卡夫卡集群 Kafka Cluster; 主题topic:可以理解 使用带回调的发送 消息 的方法。 如果 消息 没有发送成功,那么Producer会按照配置的重试规则进行重试,如果重试次数用光后,还是 消息 发送失败,那么 kafka 会将异常信息通过回调的形式带给我们,这时,我们可以将没有发送成功的 消息 进行持久化,做后续的补偿处理。 (1) kafka 有个offset的概念,当每个 消息 被写进去后,都有一个offset,代表他的序号,然后consumer 消费 数据 之后,隔一段时间,会把自己 消费 过的 消息 的offset提交一下,代表我已经 消费 过了。下次我要是重启,就会继续从上次 消费 到的offset来继续 消费 。但是当我们直接kill进程了,再重启。这会导致consumer有些 消息 处理了,但是没来得及提交offset。等重启之后,少数 消息 就会再次 消费 一次。