背景是拿备份数据启kafka服务,之后项目就连不上kafka
输出大致是这样的:
[ Thread-27] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=111] Group coordinator xxxxx:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[ Thread-27] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=111] Discovered group coordinator xxxxx:9092 (id: 2147483647 rack: null)
[ Thread-27] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=111] Group coordinator xxxxx:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
原因有很多:
1. kafka 相应的hostname 和ip 对应本地没有配置,telnet检查下
可能因为异常重启,造成__consumer_offsets的清理一直没有进行,积累了大量历史数据,kafka一直加载__consumer_offsets,导致__consumer_offsets无法正常工作,从而提示Coordinator unavailable 。
解决方案:删除kafka_log/__consumer_offsets的历史文件,再启动Kafka进程即可。
位移的概念
每个 con
sum
er
实例都会为它
消费
的分区维护属于自己的位置信息来
记录
当前
消费
了多少条消息 。在
Kafka
中,这叫位移 Offset。
消费
位移
记录
了 Con
sum
er
要
消费
的下一条消息的位移。
con
sum
er
group
使用一个长整型保存 offset。同时
Kafka
con
sum
er
还引入了检查点机制( checkpointing)定期对 offset 进行持久化,从而简化了应答机制的实现 。
Kafka
con
sum
er
在内部使用一个 map 来保存其订阅 topic 所
报错信息:
Dis
cove
red
coordi
nat
or DESKTOP-NRTTBDM:9092 (id: 2147483647 rack: null) for
group
itstyle.
windows 上运行的
kafka
拿到的host是机器名而不是IP地址
所以会导致报错
DESKTOP-NRTTBDM 是
Kafka
实例所在服务器的主机名
而9092 是
kafka
的端口即
Kafka
的连接地址。
直接修改本机 hosts 文件
windows 系统 hosts 文件位
Kafka
启动成功且运行程序无报错,外网无法连接
Kafka
的
消费
者或生产者
sparkStreaming
消费
kafka
中的数据,得不到数据以及无报错信息,找错误如下
首先检查一下,
Kafka
的
消费
者和
Kafka
生成者的Topic是否对应错误,以及其他错误
开启
kafka
使用下面指令,看
kafka
是否有错误
/opt/module/
kafka
/bin/
kafka
-s
er
v
er
-start.sh /opt/module/
kafka
/config/s
er
v
er
.prop
er
ties
注意程序控制台是否出
在前面我们讲过,
Kafka
Produc
er
是线程安全的,同时其内部还有一个Send
er
,开了一个后台线程,不断从队列中取消息进行发送。
而con
sum
er
,是一个纯粹的单线程程序,后面所讲的所有机制,包括
coordi
nat
or,rebalance, heartbeat等,都是在这个单线程的poll函数里面完成的。也因此,在con
sum
er
的代码内部,没有锁的出现。
//客户端线程
while (true) {
Con
sum
er
Records<String, String> ...
错误信息:
Group
coordi
nat
or promote.localdomain:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt
red
is
cove
ry
2018-10-29 14:56:52.496 INFO 33656 --- [ntain
er
#0-0-C-1] o.a.k.c.c.inte...
Kafka
之Con
sum
er
消费
者Con
sum
er
概述
消费
者
消费
者组位移(offset)位移提交
消费
者重组SpringBoot集成
Kafka
导入依赖参数自定义配置
消费
消息消息测试Con
sum
er
相关消息轮询位移管理con
sum
er
位移管理自动提交和手动提交
Con
sum
er
概述
con
sum
er
是读取
kafka
集群某些topic消息的应用程序。
消费
者组
消费
者用一个
消费
者组名来标记自己,topic的每条消息都只会被发送到每个订阅它的
消费
者组的一个
消费
者实例上。
我们知道
kafka
同时支持基于队列和
在IDE上创建maven项目,pom文件添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.
kafka
/
kafka
-clients -->
<dependency>
<
group
Id>org.apache.
kafka
</
group
Id>
<artifactId>
kafka
-clients</artifactId>
<v
er
sion
要么
消费
很缓慢
Group
coordi
nat
or ip:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt
red
is
cove
ry
重启节点也没用
单节点情况下
coordi
nat
or 压力太大了
__con
sum
er
_offsets这个元主题的历史消息量分配不均衡
有分区kb、M级别正常,有几个分区直接几十G,如果
消费
的主题在这些压力大的分区,就会
消费
阻塞
因为这个元主
kafka
Template包装生产者工厂,生产者工厂包含具体的send发送send
er
Props参数,往topic里发,
Concurrent
Kafka
Listen
er
Contain
er
Factory监听器包装
消费
者工厂,
消费
者工厂包含具体的con
sum
er
消费
con
sum
er
Props参数,从topic里
消费
,该topic要和生产者的一致。
Collecting package metadata (current_repodata.json): failed
UnavailableInvalidChannel: The channel is not accessible or is invalid.
channel name: simple
channel url: https://pypi.tuna.tsinghua.edu.cn/simple
er
ror code: 404
You will need to adjus
Kafka
的
消费
者端通常会维护一些状态,以便处理消息和保证消息处理的顺序性。这些状态主要涉及到以下几个方面:
1. **分区偏移量(Partition Offset)**:这是
消费
者用来跟踪已经
消费
或
失败
的消息位置。每个分区都有一个偏移量,表示
消费
者读取到的最新消息的位置。这有助于
消费
者从上次
消费
的地方继续
消费
,而不是从头开始。
2. **消息确认(Message Commit)**:
消费
者在处理完一条消息后,通常会向
Kafka
发送一个确认,告诉
Kafka
它已经成功
消费
了该消息。这一步是幂等的,即同一条消息被确认多次,结果都是一样的。
3. **
消费
者组(Con
sum
er
Group
)状态**:在
Kafka
中,一个
消费
者通常是属于某个
消费
组的。每个组内部有分工,比如使用FIFO(先进先出)或者轮询的方式分配消息。每个
消费
者会保存自己的
消费
进度信息,以便其他成员在
消费
者离开或重启后能接续
消费
。
4. **位点(Commit Timestamp)**:在某些情况下,
Kafka
还可能
记录
消息的提交时间戳,这可以帮助在分布式系统中处理消息持久化的问题,如果发生故障,可以从正确的提交时间点恢复。
5. **故障恢复策略(Rebalance Strategy)**:
Kafka
允许
消费
者组在增删
消费
者时重新均衡任务。
消费
者组的状态会被保存,使得在
消费
者改变后可以无缝地重新开始
消费
。
相关问题:
1.
Kafka
如何保证消息的唯一
消费
?
2. 如何设置和管理
消费
者的偏移量?
3.
Kafka
的幂等确认机制是如何工作的?