while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
long lastOffset = partitionRecord.offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
重平衡 (rebalance)
rebalance的作用
rebalance是一组协议,其定义了一个consumer group中的所有consumer如何达成一致均匀的订阅topic中的所有分区,
好比一共有一百块砖头,5个工人,那么这5个工人得达成协议怎么搬,不能两个或者五个人同时搬一块砖,这样会造成不必要的资源浪费,
例如一个名为A的topic,其有100个分区,现在有一个group来订阅A topic,该group中有5个consumer,默认情况下,kafka会为每个consumer分配不同的20个分区进行订阅消费,该分配过程就叫rebalance,
当consumer成功执行rebalance后, 组订阅的topic的每个分区只会分配给组内的一个consumer实例。
group coordinator(组协调者)
kafka内置了一个组协调协议 (group coordinator protocol),对于每个消费者组,kafka集群中的某个broker会被选举为其组协调者 (group coordinator),
coordinator负责对组的状态进行管理,其主要职责就是当组内有新成员来时,对该组进行 rebalance 操作,即协调topic分区订阅重新分配。
rebalance的触发条件
组rebalance的触发条件有下面3个:
组成员变更
例如新 consumer 加入组,已有consumer离开组,或者已有consumer崩溃。
组订阅topic数变更
基于正则表达式的订阅,当有符合正则表达式的新topic被创建时。
组订阅topic分区数变更
被订阅的topic的分区数发生更改,例如使用命令行脚本增加了topic的分区数。
一般应用比较常见的触发条件是第一种,即 consumer 崩溃,这里的崩溃并不一定指的是consumer进程挂掉,或者consumer进程所在的机器宕机,
而是指的是,当consumer无法在指定时间内完成消息的处理,coordinator将认为consumer已经崩溃,从而触发新一轮的rebalance,
在实际的业务处理中,一定要避免在poll主线程中执行较重的逻辑处理,这会导致处理时间过长而被coordinator认为崩溃执行rebalance,
频繁的rebalance会极大的降低consumer的吞吐量,在生产环境中需要结合业务配置好consumer的几个参数:
request.timeout.ms
、max.poll.records
、max.poll.interval.ms
,避免不必要的rebalance出现。
rebalance分区分配策略
分区分配策略决定topic的分区使用何种方式分配给consumer订阅。
consumer默认有三种分区分配策略:
range
range策略基于范围的思想,将单个topic的分区按照顺序排列,然后将这些分区划分成固定大小的分区段,并依次分配给每个consumer。
假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。
range策略的分配过程大概:首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;
消费者排完序将会是C1, C2;
然后将partitions的个数除以消费者的总数来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
几个例子:
情况1: 有10个分区,2个消费者, 10 / 2 = 5,那么消费者 C1和消费者C2 将会消费同样多的分区,所以最后分区分配的结果:
C1 将消费 0, 1, 2, 3, 4 分区
C2 将消费 5, 6, 7, 8, 9 分区
情况2: 有11个分区,那么最后分区分配的结果:
C1 将消费 0, 1, 2, 3, 4, 5 分区
C2 将消费 6, 7, 8, 9, 10分区
情况3: 有2个主题(T1和T2),分别有11个分区(0,1,2,…10),那么最后分区分配的结果:
C1 将消费 T1主题的 0, 1, 2, 3, 4, 5 分区以及 T2主题的 0, 1, 2, 3, 4, 5分区,总12个分区
C2 将消费 T1主题的 6, 7, 8, 9, 10 分区以及 T2主题的 6, 7, 8, 9, 10分区,总10个分区
可以看出,C1 消费者比C2 消费者多消费了2个分区,这是Range策略的一个弊端。
round-robin
round-robin策略将所有topic的分区顺序摆开,然后轮询式的分配给每个consumer。
假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。
那么最终分配给C1的是:0,2,4,6,8
分配给C2的是:1,3,5,7,9
sticky
sticky策略即黏性策略,rebalance会最大限度的按照之前的分配方案分配给各个consumer。
假设我们有两个名为 T1 、T2 的主题,每个主题各3个分区,然后我们有三个消费者(C1,C2,C3)来消费这6个分区里面的数据。
现分配如下
C1: T1-0、T2-0
C2: T1-1、T2-1
C3: T1-2、T2-2
突然 C2 崩溃,那么rebalance后,重新分配为:
C1: T1-0、T1-1、T2-0
C3:T1-2、T2-1、T2-2
即在保持分配均匀的情况下,将原本属于某consumer的分区还分配给该consumer。
kafka consumer默认的分配策略是range,如果group下的所有consumer订阅的主题都是一样的,那么使用round-robin策略分配的会更均匀,
通过 partition.assignment.strategy
对consumer进行分配策略的设置,除了kafka自带的分配策略,用户也可以自定义分配器(assignor)。
rebalance generation
一个consumer group可以执行多次rebalance,generation 的引入是为了保护consumer group的offset无效提交,
generation 表示rebalance的分代,起初是0,当进行一次rebalance后,该值就会增加,假设上一届的consumer成员由于某些原因延迟提交了offset,
由于其提交offset时,携带的是旧的 generation 信息,因此该提交会被consumer group拒绝,很多时候 consumer 抛出 ILLEGAL_GENERATION 异常就是这个原因。
rebalance 协议
rebalance 本质是一组协议,group 与 coordinator 共同使用这组协议完成 group 的 coordinator,协议有如下几个:
JoinGroup:consumer请求加入组
SyncGroup:group leader 将分配方案同步更新到所有组内成员中
Heartbeat:consumer定期向coordinator汇报心跳表明自己存活
LeaveGroup:consumer主动通知coordinator自己即将离组
DescribeGroup:查看组的所有信息(成员信息、协议信息、分配方案以及订阅信息),该类型主要供管理员使用
在rebalance过程中,coordinator主要处理consumer发过来的JoinGroup和SyncGroup请求,当consumer主动离组时发送LeaveGroup请求给coordinator。
在rebalance成功之后,组内所有consumer定期向coordinator发送Heartbeat请求,每个consumer根据Heartbeat请求的响应中是否包含REBALANCE_IN_PROCESS判断是否开启新一轮的rebalance。
rebalance 流程
consumer group在执行 rebalance 之前必须确定 coordinator 所在的broker,并创建与该 broker 相互通信的 socket 连接,
确定 coordinator 的算法与确定 offset 被提交到 __consumer_offsets 目标分区的算法相同,如下:
计算 Math.abs(groupID.hashCode) % offsets.topic.num.partitions 参数值(默认50),假设得出结果 10
寻找 __consumer_offsets 分区 10 的 leader 副本所在的broker,该broker即为这个 group 的 coordinator
成功连接 coordinator 后,即可进行 rebalance 操作,rebalance 主要分为两步:
这一步组内所有 consumer 向 coordinator 发送 JoinGroup 请求,当收集全 JoinGroup 请求后,coordinator 从中选择一个 consumer 担任 group 的 leader,并将所有的成员信息以及它们的订阅信息发送给leader,
group 的 leader 与 coordinator 并非同一种概念,leader是某个consumer实例,coordinator是kafka集群中的一个broker,分配方案由leader给出,而非coordinator,
之所以将分配方案交给consumer leader执行,是因为这样做有更好的灵活性,在这种机制下,用户可以自行实现类似Hadoop 机架感知(rack-aware)分配方案,同一机架上的分区分配给相同机架上的consumer,可减少网络传输的开销;同时,当consumer的分区策略发生改变后,重启consumer即可,无需broker进行介入。
同步更新分配方案
这一步 group leader 开始制定分配方案,即根据分配策略决定group中的consumer分别负责topic中的哪些分区, 一旦分配完成,leader会将分配方案分装进 SyncGroup 请求并发送给 coordinator,
组内所有的consumer都会发送 SyncGroup 请求,但只有leader发送的SyncGroup请求中包含了分配方案,coordinator收到分配方案后将属于各自consumer的分配方案作为SyncGroup请求的response返还给各自的consumer。
rebalance监听器
consumer默认将位移提交到 __consumer_offsets 中,其实 kafka 也支持用户将位移提交到外部存储中,例如数据库,
如果要实现这个功能,用户必须使用 rebalance 监听器,使用 rebalance 监听器的前提是用户使用 consumer group,如果使用的是独立consumer或者直接手动分配分区,那么 rebalance 监听器将不会生效,
rebalance监听器主要是一个接口回调类 ConsumerRebalanceListener
,有两个方法需要实现 onPartitionsRevoked
、onPartitionsAssigned
,前者在开启新一轮的rebalance前调用,后者在rebalance完成后调用。
rebalance监听器最常见的用法是手动提交位移到第三方存储库,以及在rebalance前后执行一些审计操作,demo如下:
大致思路:
使用 joinStart 保存本次rebalance的开始时间、totalRebalanceTimeMs统计所有rebalance的时长总和
在 onPartitionsRevoked 记录本次rebalance的开始时间
在 onPartitionsRevoked 将每个分区的offset用自定义方法 saveOffsetInExternalStore 保存到外部存储中
在 onPartitionsAssigned 将每个分区的offset从外部存储中读取出来,并使用seek设置consumer从该位置读取
累加 totalRebalanceTimeMs 总时长
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
final AtomicLong joinStart = new AtomicLong(0L);
final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
joinStart.set(System.currentTimeMillis());
for (TopicPartition partition : partitions) {
saveOffsetInExternalStore(consumer.position(partition));
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
consumer.seek(partition, readOffsetFromExternalStore(partition));
totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - joinStart.get());
如果启用了启动提交位移,用户可以不在 ConsumerRebalanceListener
监听器中手动提交唯一,consumer每次rebalance时,会检查用户是否启用了自动提交位移,如果是,它会自动帮用户提交,因此无需显式提交。
consumer要求rebalance在很短的时间内完成,因此在rebalance中不要放执行时间很长的逻辑,特别是一些阻塞方法。
解码序列化
解码序列化与Producer发送者发送时的序列化是互逆操作,即将对方序列化后的字节数组再恢复成原样子。
默认解序列化器
与Producer的序列化呼应,常用的deserializer如下:
StringDeserializer:序列化String类型
ByteBufferDeserializer:序列化ByteBuffer类型
BytesDeserializer:序列化Kafka自定义的Bytes类
DoubleDeserializer:序列化Double类型
IntegerDeserializer:序列化Integer类型
LongDeserializer:序列化Long类型
如果用户有更复杂的解序列化需求,可自行定义 deserializer 。
在构造Consumer对象时,指定相应的序列化值即可使用序列化:
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
自定义解序列化器
先定义一个实现 Deserializer 接口的类:
public class UserDeserializer implements Deserializer<User> {
private ObjectMapper objectMapper;
@Override
public void configure(Map configs, boolean isKey) {
this.objectMapper = new ObjectMapper();
@Override
public User deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, User.class);
} catch (IOException e) {
throw new RuntimeException(e);
@Override
public void close() {
this.objectMapper = null;
指定consumer的value解码序列化为刚刚创建的类:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.kafka.producer.UserDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-user-topic"));
try {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, User>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, User> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
} finally {
consumer.close();
与之前的Producer程序配合使用,得到消费结果:
多线程消费
与KafkaProducer不同,KafkaConsumer是非线程安全的,因此在实践过程中,推荐KafkaProducer单实例供多线程使用,
对于KafkaConsumer非线程安全有两种实践方式推荐:
1. 每个线程单独建立自己的KafkaConsumer
既然KafkaConsumer实例是非线程安全的,那么每个线程创建时,都各自创建一个仅自己使用的KafkaConsumer就可以避免问题了。
样例设计,先定义三个类:
ConsumerRunnable 类:消费线程类,执行真正的消费任务
ConsumerGroup 类:消费线程管理类,创建多个线程类执行消费任务
ConsumerMain 类:测试主方法类
public class ConsumerMain {
static class ConsumerRunnable implements Runnable {
private final KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
} finally {
consumer.close();
static class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
consumers = new ArrayList<>(consumerNum);
for (int i = 0; i < consumerNum; i++) {
consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
public void execute() {
for (ConsumerRunnable consumer : consumers) {
new Thread(consumer).start();
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup1";
String topic = "test-topic";
int consumerNum = 3;
ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
consumerGroup.execute();
2. 单KafkaConsumer实例+多worker线程
大量的线程和对应的KafkaConsumer创建占用的资源相对也会较多,因此可以选择消息的获取与消息的处理逻辑进行解耦,在全局维护一个或若干个消费者实例进行消息获取,然后将消息的处理逻辑放入单独的工作者线程中进行就好。
样例设计,先定义三个类:
ConsumerThreadHandler:consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合,consumer位移提交也在这里进行。
ConsumerWorker:本质是一个Runnable,执行真正的业务逻辑处理,并上报位移信息给ConsumerThreadHandler。
Main类:测试主方法类。
public class Main {
static class ConsumerWorker<K, V> implements Runnable {
private final ConsumerRecords<K, V> records;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = records;
this.offsets = offsets;
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
for (ConsumerRecord<K, V> record : partitionRecords) {
System.out.println(record.value());
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
synchronized (offsets) {
if (!offsets.containsKey(partition)) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
} else {
long currentOffset = offsets.get(partition).offset();
if (currentOffset <= lastOffset + 1) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
static class ConsumerThreadHandler<K, V> {
private final KafkaConsumer<K, V> consumer;
private ExecutorService executorService;
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
offsets.clear();
public void consumer(int threadNumber) {
executorService = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
try {
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
if (!records.isEmpty()) {
executorService.submit(new ConsumerWorker<>(records, offsets));
commitOffsets();
} finally {
commitOffsets();
consumer.close();
private void commitOffsets() {
Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
synchronized (offsets) {
if (offsets.isEmpty()) {
return;
unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
offsets.clear();
consumer.commitSync(unmodfiedMap);
public void close() {
consumer.wakeup();
executorService.shutdown();
public static void main(String[] args) {
String brokerList = "localhost:9092";
String groupId = "testGroup1";
String topic = "test-topic";
ConsumerThreadHandler<String, String> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
new Thread(() -> handler.consumer(Runtime.getRuntime().availableProcessors())).start();
try {
Thread.sleep(20000L);
} catch (InterruptedException e) {
e.printStackTrace();
handler.close();
3. 两种方法对比
两种方式各有利弊,用户可根据实际业务场景进行相应的选择:
| 优点 | 缺点 |
---|
方法1(每个线程维护专属KafkaConsumer) | 实现简单;无线程间交互开销,速度较快;方便位移管理;易维护分区间的消息消费顺序 | Socket连接开销大;consumer数量受限于topic分区数,扩展性差;因为socket连接多,发送的请求也会多,所以broker端负载相对较高;rebalance可能性增大 |
方法2(全局consumer + 多worker线程) | 消息获取与处理解耦;可独立拓展consumer数量和worker数量,伸缩性较好 | 需要实现负载;分区间的消息消费顺序难以维护;处理链路变长,位移管理困难;worker线程异常可能导致消费数据丢失 |
独立consumer
consumer group 会自动帮用户执行分区分配和rebalance,对于需要多个consumer共同消费某topic的场景,使用group是最合适的,
如果用户需要严格控制某个consumer固定消费某些分区,场景如下:
由进程自己维护分区状态
进程自身保证高可用(可自行重启恢复错误,例如YARN、Mesos等容器调度框架),无需kafka完成错误检测和恢复
在这种情况下,consumer group则不适用,需要应用独立消费者(standalone consumer),standalone consumer 之间彼此独立工作,任意一个consumer崩溃不会影响其他的consumer。
独立消费者Demo
使用 consumer group 进行消息的消费时,我们使用 KafkaConsumer.subscribe
直接订阅topic,独立消费者使用 KafkaConsumer.assign
方法进行消费,
如果发生多次 KafkaConsumer.assign
调用,只有最后一次会生效,之前的会被覆盖,同时 assign 和 subscribe 不可以在同一个 consumer 中混用。
assign
方法接收一个分区列表,直接赋予 consumer 访问这些分区的权力,代码如下:
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
List<PartitionInfo> partitions = consumer.partitionsFor("my-user-topic");
List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partition : partitions) {
topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
try {
if (!topicPartitions.isEmpty()) {
consumer.assign(topicPartitions);
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, User>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, User> partitionRecord : partitionRecords) {
System.out.println(partitionRecord.value());
} finally {
consumer.close();
新/旧consumer对比
旧版本Consumer使用scala语言编写,新版本使用Java
旧版本Consumer依赖Zookeeper提交位移,新版本直接使用borker提供的topic,ZK本质只是一个协调服务组件,并不适合高并发的读写操作
旧版本的Consumer读取消息需要为每个分区都新建一个线程,新版本不需要
旧版本区分low-level 和 high-level 两个版本,前者没有consumer group的概念,而后者支持,新版本可通过不同方法的使用来实现是否支持group
总之旧版本已经不推荐使用,请在生产环境中使用新版本sdk。