Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}finally{
consumer.close();
很简单,1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);
2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);
3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")
使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重写这个接口来实现 分区变更时的逻辑。如果设置了enable.auto.commit = true 就不用理会这个逻辑。
4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);
5、处理消息(打印了offset key value 这里写处理逻辑)。
6、关闭KafkaConsumer(可以传一个timeout值 等待秒数 默认是30)。
Properties详解:
bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非自己配置了ip)
deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。
默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。
也可以自定义:定义serializer格式 创建自定义deserializer类实现Deserializer 接口 重写逻辑
除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer
还有session.timeout.ms "coordinator检测失败的时间"
是检测consumer挂掉的时间 为了可以及时的rebalance 默认是10秒 可以设置更小的值避免消息延迟。
max.poll.interval.ms "consumer处理逻辑最大时间"
处理逻辑比较复杂的时候 可以设置这个值 避免造成不必要的 rebalance ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。
auto.offset.reset "无位移或者位移越界时kafka的应对策略"
所以如果启动了一个group从头消费 成功提交位移后 重启后还是接着消费 这个参数无效
所以3个值的解释是:
earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费
latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、
我们这是说的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))
enable.auto.commit 是否自动提交位移
true 自动提交 false需要用户手动提交 有只处理一次需要的 最近设置为false自己控制。
fetch.max.bytes consumer单次获取最大字节数
max.poll.records 单次poll返回的最大消息数
默认500条 如果消费很轻量 可以适当提高这个值 增加消费速度。
hearbeat.interval.ms consumer其他组员感知rabalance的时间
该值必须小于 session.timeout.ms 如果检测到 consumer挂掉 也就根本无法感知rabalance了
connections.max.idle.ms 定期关闭连接的时间
默认是9分钟 可以设置为-1 永不关闭
poll方法详解:
(旧版本:多分区多线程 新版本:一个线程管理多个socket连接)
但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,
另一个是后台心跳线程。
根据上边的各种配置,poll方法会找到offset,当获取了足够多的可用数据,或者等待时间超过了指定的超时时间,就会返回。
java consumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。
poll的超时参数,已经说过1000的话是超时设定,如果没有很多数据,也就等一秒,就返回了,比如定时5秒的将消息写入,就可以将超时参数设置为5000,达到效率最大化。
如果没有定时任务呢,那就设置为 Long.MAX_VALUE 未获取足够多的数据就无限等待。这里要捕获一下WakeupException。
consumer offset详解:
consumer需要定期向kafka提交自己的offset信息。已经学过 新版本将他提交到了一个topic中 __consumer_offsets。
offset有一个更大的作用是实现交付语义:
最多一次 at most once 可能丢失 不会重复
最少一次 at least once 可能重复 不会丢失
精确一次 exactly once 不丢失 不重复 就一次
若consumer在消费之前提交位移 就实现了at most once
若是消费后提交 就实现了 at least once 默认是这个。
consumer的多个位置信息:
上次提交的位置 当前位置 水位 日志最新位移
0 1 。。 5 。。 10 。。 15
上次提交位置:consumer最近一次提交的offset值;
当前位置:consumer上次poll 到了这个位置 但是还没提交;
水位:这是分区日志的管理 consumer无法读取水位以上的消息;
最新位移: 也是分区日志的管理 最大的位移值 一定不会比水位小。
新版本的consumer会在broker选一个broker作为consumergroup的coordinator,用于实现组成员管理,消费分配方案,提交位移。如果consumer崩溃,他负责的分区就分配给其他consumer,如果没有做好位移提交就可能重复消费。
多次提交的情况,kafka只关注最新一次的提交。
默认consumer自动提交位移 提交间隔为5秒 可以通过 auto.commit.interval.ms 设置这个间隔。
自动提交可以减少开发,但是可能重复消费,所以需要精准消费时还是要手动提交。设置手动提交 enable.auto.commit = false,然后调用 consumer.commitSync() 或者 consumer.commitAync() Sync为同步方式,阻塞 Aync为异步方式,不会阻塞。这两个方法可以传参,指定为哪个分区提交,这样更合理一些。
(旧版本的自动提交设置是 auto.commit.enable 默认间隔为60秒)
rebalance详解:
rebalance是consumer group如何分配topic的所有分区。
正常情况,比如有10个分区,5个consumer 那么consumer group将为每个consumer 平均分配两个分区。
每个分区只会分给一个consumer实例。有consumer出现问题,会重新执行这个过程,这个过程就是rebalance。
(旧版本通过zookeeper管理rebalance,新版本会选取某个broker为group coordinator来管理)
rebalance的触发条件:
1、有新的consumer加入,或者有consumer离开或者挂掉。
2、group订阅的topic发生变更,比如正则订阅。
3、group订阅的分区数发生变化。
第一个经常出现,不一定是挂掉,也可能是处理太慢,为了避免频繁rebalance,要调整好request.timeout.ms max.poll.records和ma.poll.interval.
rebalance分区策略:
partition.assignment.strategy 设置 自定义分区策略-创建分区器 assignor
range策略(默认),将分区划分为分区段,一次分配给每个consumer。
round-robin策略,轮询分配。
sticky策略(0.11.0.0出现,更优秀),range策略在订阅多个topic时会不均匀。
sticky有两个原则,当两者发生冲突时,第一个目标优先于第二个目标。
分区的分配要尽可能的均匀;
分区的分配尽可能的与上次分配的保持相同。
rebalance generation分代机制保证rabalance时重复提交的问题,延迟的offset提交时旧的generation信息会报异常ILLEGAL_GENERATION
rebalance过程:
1、确定coordinator所在的broker,建立socket连接。
确定算法: Math.abs(groupID.hashCode) % offsets.topic.num.partition 参数值(默认50)
寻找__consumer_offset分区50的leader副本所在的broker,该broker即为这个group的coordinator
2、加入组
所有consumer会向coordinator发送JoinGroup请求,收到所有请求后选一个consumer做leader(这个leader是consumer coordinator是broker),coordinator把成员和订阅信息发给coordinator。
3、同步分配方案
leader制定分配方案,通过SyncGroup请求发给coordinator,每个consumer也会发请求返回方案。
kafka也支持offset不提交到__consumer_offset,可以自定义,这时候就需要实现一个监听器ConsumerRebalanceListener,在这里重新处理Rebalance的逻辑。
多线程示例代码:
这里要根据自身需求开发,我这里只举一个简单的例子,就是几个分区就启动几个consumer,一一对应。
Main:
public static void main(String[] args) {
String bootstrapServers = "kafka01:9092,kafka02:9092";
String groupId = "test";
String topic = "testtopic";
int consumerNum = 3;
ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
cg.execute();
import java.util.ArrayList;
import java.util.List;
public class ConsumerGroup {
private List<ConsumerRunnable> consumers;
public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){
consumers = new ArrayList<>(consumerNum);
for(int i=0;i < consumerNum;i++){
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
consumers.add(ConsumerRunnable);
public void execute(){
for(ConsumerRunnable consumerRunnable:consumers){
new Thread(consumerRunnable).start();
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerRunnable implements Runnable{
private final KafkaConsumer<String,String> consumer;
public ConsumerRunnable(String bootstrapServers,String groupId,String topic){
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
standalone consumer
有一些需求,需要指定一个消费者消费某一个分区。彼此之间不干扰,一个standalone consumer崩溃不会影响其他。
类似旧版本的低阶消费者。
示例代码如下:consumer.assign方法订阅分区
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> allpartitions = consumer.partitionsFor("testtopic");
if(allpartitions!=null && !allpartitions.isEmpty()){
for(PartitionInfo partitionInfo:allpartitions){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
以上为kafka消费者的学习,不同的具体细节还需要通过官网文档仔细学习。
大数据流动 专注于大数据实时计算,数据治理,数据可视化等技术分享与实践。
请在后台回复关键字下载相关资料。相关学习交流群已经成立,欢迎加入~