背景是拿备份数据启kafka服务,之后项目就连不上kafka
  
  
   输出大致是这样的:
  
  
   [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=111] Group coordinator xxxxx:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
   
   [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=111] Discovered group coordinator xxxxx:9092 (id: 2147483647 rack: null)
   
   [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=111] Group coordinator xxxxx:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
  
  
   原因有很多:
  
  
   1. kafka 相应的hostname 和ip 对应本地没有配置,telnet检查下
  
  
   可能因为异常重启,造成__consumer_offsets的清理一直没有进行,积累了大量历史数据,kafka一直加载__consumer_offsets,导致__consumer_offsets无法正常工作,从而提示Coordinator unavailable 。
  
  
   解决方案:删除kafka_log/__consumer_offsets的历史文件,再启动Kafka进程即可。
  
  
   位移的概念
每个 con
   
    sum
   
   
    er
   
   实例都会为它
   
    消费
   
   的分区维护属于自己的位置信息来
   
    记录
   
   当前
   
    消费
   
   了多少条消息 。在
   
    Kafka
   
   中,这叫位移 Offset。
   
    消费
   
   位移
   
    记录
   
   了 Con
   
    sum
   
   
    er
   
   要
   
    消费
   
   的下一条消息的位移。
con
   
    sum
   
   
    er
   
   
    group
   
   使用一个长整型保存 offset。同时
   
    Kafka
   
   con
   
    sum
   
   
    er
   
   还引入了检查点机制( checkpointing)定期对 offset 进行持久化,从而简化了应答机制的实现 。
   
    Kafka
   
   con
   
    sum
   
   
    er
   
   在内部使用一个 map 来保存其订阅 topic 所
  
  
   报错信息:
   
    Dis
   
   
    cove
   
   
    red
   
   
    coordi
   
   
    nat
   
   or DESKTOP-NRTTBDM:9092 (id: 2147483647 rack: null) for
   
    group
   
   itstyle.
windows 上运行的
   
    kafka
   
   拿到的host是机器名而不是IP地址
所以会导致报错
DESKTOP-NRTTBDM 是
   
    Kafka
   
   实例所在服务器的主机名
而9092 是
   
    kafka
   
   的端口即
   
    Kafka
   
   的连接地址。
直接修改本机 hosts 文件
windows 系统 hosts 文件位
  
  
   
    Kafka
   
   启动成功且运行程序无报错,外网无法连接
   
    Kafka
   
   的
   
    消费
   
   者或生产者
sparkStreaming
   
    消费
   
   
    kafka
   
   中的数据,得不到数据以及无报错信息,找错误如下
首先检查一下,
   
    Kafka
   
   的
   
    消费
   
   者和
   
    Kafka
   
   生成者的Topic是否对应错误,以及其他错误
开启
   
    kafka
   
   使用下面指令,看
   
    kafka
   
   是否有错误
/opt/module/
   
    kafka
   
   /bin/
   
    kafka
   
   -s
   
    er
   
   v
   
    er
   
   -start.sh  /opt/module/
   
    kafka
   
   /config/s
   
    er
   
   v
   
    er
   
   .prop
   
    er
   
   ties
注意程序控制台是否出
  
  
   在前面我们讲过,
   
    Kafka
   
   Produc
   
    er
   
   是线程安全的,同时其内部还有一个Send
   
    er
   
   ,开了一个后台线程,不断从队列中取消息进行发送。
而con
   
    sum
   
   
    er
   
   ,是一个纯粹的单线程程序,后面所讲的所有机制,包括
   
    coordi
   
   
    nat
   
   or,rebalance, heartbeat等,都是在这个单线程的poll函数里面完成的。也因此,在con
   
    sum
   
   
    er
   
   的代码内部,没有锁的出现。
//客户端线程
while (true) {
    Con
   
    sum
   
   
    er
   
   Records<String, String> ...
  
  
   错误信息:
   
    Group
   
   
    coordi
   
   
    nat
   
   or promote.localdomain:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt
   
    red
   
   is
   
    cove
   
   ry
2018-10-29 14:56:52.496  INFO 33656 --- [ntain
   
    er
   
   #0-0-C-1] o.a.k.c.c.inte...
  
  
   
    Kafka
   
   之Con
   
    sum
   
   
    er
   
   
    消费
   
   者Con
   
    sum
   
   
    er
   
   概述
   
    消费
   
   者
   
    消费
   
   者组位移(offset)位移提交
   
    消费
   
   者重组SpringBoot集成
   
    Kafka
   
   导入依赖参数自定义配置
   
    消费
   
   消息消息测试Con
   
    sum
   
   
    er
   
   相关消息轮询位移管理con
   
    sum
   
   
    er
   
   位移管理自动提交和手动提交
Con
   
    sum
   
   
    er
   
   概述
con
   
    sum
   
   
    er
   
   是读取
   
    kafka
   
   集群某些topic消息的应用程序。
   
    消费
   
   者组
   
    消费
   
   者用一个
   
    消费
   
   者组名来标记自己,topic的每条消息都只会被发送到每个订阅它的
   
    消费
   
   者组的一个
   
    消费
   
   者实例上。
我们知道
   
    kafka
   
   同时支持基于队列和
  
  在IDE上创建maven项目,pom文件添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.
  
   kafka
  
  /
  
   kafka
  
  -clients -->
<dependency>
    <
  
   group
  
  Id>org.apache.
  
   kafka
  
  </
  
   group
  
  Id>
    <artifactId>
  
   kafka
  
  -clients</artifactId>
    <v
  
   er
  
  sion
要么
  
   消费
  
  很缓慢
  
   Group
  
  
   coordi
  
  
   nat
  
  or ip:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt
  
   red
  
  is
  
   cove
  
  ry
重启节点也没用
单节点情况下
  
   coordi
  
  
   nat
  
  or 压力太大了
__con
  
   sum
  
  
   er
  
  _offsets这个元主题的历史消息量分配不均衡
有分区kb、M级别正常,有几个分区直接几十G,如果
  
   消费
  
  的主题在这些压力大的分区,就会
  
   消费
  
  阻塞
因为这个元主
  
   kafka
  
  Template包装生产者工厂,生产者工厂包含具体的send发送send
  
   er
  
  Props参数,往topic里发,
Concurrent
  
   Kafka
  
  Listen
  
   er
  
  Contain
  
   er
  
  Factory监听器包装
  
   消费
  
  者工厂,
  
   消费
  
  者工厂包含具体的con
  
   sum
  
  
   er
  
  
   消费
  
  con
  
   sum
  
  
   er
  
  Props参数,从topic里
  
   消费
  
  ,该topic要和生产者的一致。
Collecting package metadata (current_repodata.json): failed
UnavailableInvalidChannel: The channel is not accessible or is invalid.
  channel name: simple
  channel url: https://pypi.tuna.tsinghua.edu.cn/simple
  
   er
  
  ror code: 404
You will need to adjus
  
   
    Kafka
   
   的
   
    消费
   
   者端通常会维护一些状态,以便处理消息和保证消息处理的顺序性。这些状态主要涉及到以下几个方面:
1. **分区偏移量(Partition Offset)**:这是
   
    消费
   
   者用来跟踪已经
   
    消费
   
   或
   
    失败
   
   的消息位置。每个分区都有一个偏移量,表示
   
    消费
   
   者读取到的最新消息的位置。这有助于
   
    消费
   
   者从上次
   
    消费
   
   的地方继续
   
    消费
   
   ,而不是从头开始。
2. **消息确认(Message Commit)**:
   
    消费
   
   者在处理完一条消息后,通常会向
   
    Kafka
   
   发送一个确认,告诉
   
    Kafka
   
   它已经成功
   
    消费
   
   了该消息。这一步是幂等的,即同一条消息被确认多次,结果都是一样的。
3. **
   
    消费
   
   者组(Con
   
    sum
   
   
    er
   
   
    Group
   
   )状态**:在
   
    Kafka
   
   中,一个
   
    消费
   
   者通常是属于某个
   
    消费
   
   组的。每个组内部有分工,比如使用FIFO(先进先出)或者轮询的方式分配消息。每个
   
    消费
   
   者会保存自己的
   
    消费
   
   进度信息,以便其他成员在
   
    消费
   
   者离开或重启后能接续
   
    消费
   
   。
4. **位点(Commit Timestamp)**:在某些情况下,
   
    Kafka
   
   还可能
   
    记录
   
   消息的提交时间戳,这可以帮助在分布式系统中处理消息持久化的问题,如果发生故障,可以从正确的提交时间点恢复。
5. **故障恢复策略(Rebalance Strategy)**:
   
    Kafka
   
   允许
   
    消费
   
   者组在增删
   
    消费
   
   者时重新均衡任务。
   
    消费
   
   者组的状态会被保存,使得在
   
    消费
   
   者改变后可以无缝地重新开始
   
    消费
   
   。
相关问题:
1.
   
    Kafka
   
   如何保证消息的唯一
   
    消费
   
   ?
2. 如何设置和管理
   
    消费
   
   者的偏移量?
3.
   
    Kafka
   
   的幂等确认机制是如何工作的?