《Apache Kafka系列》Kafka如何自定义序列化器?
大家好,我是 @明人只说暗话 。
本文介绍Kafka中的序列化器,以及如何自定义序列化器。
创作不易,禁止白嫖哦!
点赞、评论、关注,选择一个吧!
使用过kafka的朋友都应该知道,我们在创建KafkaProducer对象时,必须要指定key和value的序列化器,否则报如下错误:
org.apache.kafka.common.config.ConfigException: Invalid value null for configuration key.serializer: must be non-null.
org.apache.kafka.common.config.ConfigException: Invalid value null for configuration value.serializer: must be non-null.
kafka为我们提供了很多的系列化器和反序列化器,可以满足大部分业务场景,但是不足以满足所有的业务场景,因此,在这种情况下,我们就需要自定义系列化器了。
序列化器
序列化器的作用就是生产者在发送消息到kafka broker的时候,需要将我们的消息内容(可能是字符串类型,可能是数字类型,可能是我们自定义的Java对象)序列化成字节数组 。
如上图所示,org.apache.kafka.common.serialization.Serializer接口是kafka中序列化器的顶级接口,所有的序列化器都必须实现该接口。
kafka默认为我们提供了六种基本类型的包装类(Byte、Double、Float、Long、Short、Integer)的序列化器,还提供了String、List、UUID等类型的序列化器。
Serializer接口源码如下:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
@Override
default void close() {
// intentionally left blank
}
如上所示,Serializer接口主要有三个方法:
configure()方法用来配置当前类。
serialize()方法用来执行序列化操作。
close()方法用来关闭当前序列化器。
如果要实现Serializer接口,以上三个方法中,只有serialize()方法是必须要重写的,其他方法根据需要解决是否重写即可。
反序列化器
有序列化器,自然就有反序列化器。
显而易见,反序列化器的作用就是将序列化器序列化后的字节数组再反序列化成原来的类型。
如上图所示,org.apache.kafka.common.serialization.Deserializer接口是kafka中反序列化器的顶级接口,所有的反序列化器都必须实现该接口。
上面说到kafka为我们提供了很多序列化器,同样还为我们提供了对应的反序列化器。
Deserializer接口的代码如下:
public interface Deserializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
T deserialize(String topic, byte[] data);
default T deserialize(String topic, Headers headers, byte[] data) {
return deserialize(topic, data);
@Override
default void close() {
// intentionally left blank
}
如上所示,Deserializer接口主要有三个方法:
configure()方法用来配置当前类。
deserialize()方法用来执行反序列化操作。
close()方法用来关闭当前序列化器。
如果要实现Deserializer接口,以上三个方法中,只有deserialize()方法是必须要重写的,其他方法根据需要解决是否重写即可。
自定义序列化器和反序列化器
如果kafka自带的序列化器不能满足我们的实际需求,那么,我们可以通过实现Serializer接口来自定义序列化器,实现Deserializer接口来自定义反序列化器。
假设,我们向kafka中发送的是User对象,其属性如下所示。
package com.panda.kafka.entity;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDate;
@Data
@Builder
public class User {
private Long userId;
private String userName;
private Integer age;
private LocalDate birthday;
自定义UserSerializer
package com.panda.kafka.serialization;
import com.alibaba.fastjson2.JSON;
import com.panda.kafka.entity.User;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 不做任何配置
@Override
public byte[] serialize(String topic, User data) {
if (Objects.isNull(data)) {
return null;
String userStr = JSON.toJSONString(data);
return userStr.getBytes(StandardCharsets.UTF_8);
@Override
public void close() {
// 不需要关闭任何东西
如上所示,serialize方法的主要逻辑就是将User对象序列化成字节数组。
自定义UserDserializer
package com.panda.kafka.serialization;
import com.alibaba.fastjson2.JSON;
import com.panda.kafka.entity.User;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 不做任何配置
@Override
public User deserialize(String topic, byte[] data) {
if (Objects.isNull(data)) {
return null;
String userStr = new String(data, StandardCharsets.UTF_8);
return JSON.parseObject(userStr, User.class);
@Override
public void close() {
// 不需要关闭任何东西
如上所示,deserialize方法的主要逻辑是将byte数组转换为User对象。
配置自定义序列化器
生产者需要配置自定义的序列化器,如下:
spring:
kafka:
producer:
value-serializer: com.panda.kafka.serialization.UserSerializer
配置自定义反序列化器
消费者需要配置自定义的反序列化器,如下:
spring:
kafka:
consumer:
value-deserializer: com.panda.kafka.serialization.UserDeserializer
代码示例:自定义序列化器
application.yml
server:
port: 8080
servlet:
context-path: /
spring:
kafka:
bootstrap-servers: 106.114.211.115:9092,147.100.100.151:9092,147.100.113.168:9092
producer:
# 序列化器
value-serializer: com.panda.kafka.serialization.UserSerializer
# 序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
# 开启事务时,必须设置为all
acks: all
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 生产者内存缓冲区的大小
buffer-memory: 1024000
retries: 3
properties:
request:
timeout:
ms: 180000
enable:
idempotence: true
max:
request:
size: 1048576
block:
ms: 6000
linger:
ms: 1
compression-type: none
consumer:
# 消费者组
group-id: consumer-test-01
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: latest
# 反序列化器
value-deserializer: com.panda.kafka.serialization.UserDeserializer
# 反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要
properties:
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
max:
poll:
interval:
ms: 600000
# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
session:
timeout:
ms: 10000
spring:
json:
trusted:
packages: "*"
# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
max-poll-records: 500
auto-commit-interval: 100
listener:
# 在监听器容器中运行的线程数,一般设置为 机器数 * 分区数
concurrency: 9
# 自动提交关闭,需要设置手动消息确认
ack-mode: MANUAL_IMMEDIATE
# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
missing-topics-fatal: false
# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
poll-timeout: 600000
配置项目端口,以及kafka服务器地址,生产者和消费者相关信息。
KafkaProducerApplication.java
package com.panda.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
@SpringBootApplication
@EnableKafka
public class KafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
入口类。
配置EnableKafka注解,开启kafka端点监听功能。
KafkaProducerUtil.java
package com.panda.kafka.config;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class KafkaProducerUtil {
@Resource
private KafkaPropertiesConfig propertiesConfig;
public Map<String, Object> producerConfigs() {
// 创建Kafka的配置信息
Map<String, Object> properties = new HashMap<>(16);
// 指定broker,会自动从broker中获取元数据信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, propertiesConfig.getBootstrapServers());
// 执行ack应答机制
properties.put(ProducerConfig.ACKS_CONFIG, propertiesConfig.getAcks());
// 指定重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, propertiesConfig.getRetries());
// 指定批处理的字节数
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, propertiesConfig.getBatchSize());
// 等待时间,如果批次未满 间隔一定时间发出去
properties.put(ProducerConfig.LINGER_MS_CONFIG, propertiesConfig.getLingerMs());
// RecordAccumulator缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, propertiesConfig.getBufferMemory());
// 指定key序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, propertiesConfig.getKeySerializer());
// 指定value序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, propertiesConfig.getValueSerializer());
// 开启幂等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, propertiesConfig.getIdempotence());
// 生产端能够发送的最大消息大小,默认值为1048576字节(1M)
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, propertiesConfig.getMaxRequestSize());
// os page cache 中压缩格式
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, propertiesConfig.getCompressionType());
// 控制客户端等待请求响应的最长时间。
// 如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则请求失败。
// 此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, propertiesConfig.getRequestTimeoutMs());
// 指定控制KafkaProducer.send()方法和KafkaProducer.partitionsFor()方法的阻塞时间,
// 这些方法可以由于缓冲区已满或元数据不可用而被阻塞,用户提供的序列化器或分区器中的阻塞将不计入此超时时间。
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, propertiesConfig.getMaxBlockMs());
return properties;
public KafkaProducer<String, Object> createKafkaProducer() {
return new KafkaProducer<>(producerConfigs());
kafka配置信息。
配置KafkaProducer对象。
KafkaPropertiesConfig.java
package com.panda.kafka.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class KafkaPropertiesConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Long bufferMemory;
@Value("${spring.kafka.producer.retries}")
private Integer retries;
@Value("${spring.kafka.producer.properties.max.request.size}")
private Integer maxRequestSize;
@Value("${spring.kafka.producer.compression-type}")
private String compressionType;
@Value("${spring.kafka.producer.properties.linger.ms}")
private Long lingerMs;
@Value("${spring.kafka.producer.properties.max.block.ms}")
private Long maxBlockMs;
@Value("${spring.kafka.producer.properties.enable.idempotence}")
private Boolean idempotence;
@Value("${spring.kafka.producer.properties.request.timeout.ms}")
private Integer requestTimeoutMs;
从配置文件中将kafka消费者配置信息引入Java代码中,用于配置KafkaProducer对象。
KafkaConstants.java
package com.panda.kafka.constants;
public class KafkaConstants {
public static interface KafkaTopic {
String TOPIC_TEST_001 = "topic-test-001";
String TOPIC_TEST_002 = "topic-test-002";
常量类。
配置kafka的主题名称。
User.java
package com.panda.kafka.entity;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDate;
@Data
@Builder
public class User {
private Long userId;
private String userName;
private Integer age;
private LocalDate birthday;
用户信息实体类,用于测试自定义序列化器和反序列化器。
ConsumerListener.java
package com.panda.kafka.listener;
import com.panda.kafka.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerListener {
@KafkaListener(topics = {"topic-test-002"}, groupId = "consumer-test-01")
public void kafkaListener(User user) {
log.info("消费者接收到消息:{}", user);
消费者代码。
通过KafkaListener注解标注消费者方法。
topics 属性指定要监听的主题,groupId 配置该消费者所属的群组。
UserSerializer.java
package com.panda.kafka.serialization;
import com.alibaba.fastjson2.JSON;
import com.panda.kafka.entity.User;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 不做任何配置
@Override
public byte[] serialize(String topic, User data) {
if (Objects.isNull(data)) {
return null;
String userStr = JSON.toJSONString(data);
return userStr.getBytes(StandardCharsets.UTF_8);
@Override
public void close() {
// 不需要关闭任何东西
自定义序列化器,用于将User对象序列化成字节数组。
UserSerializer.java
package com.panda.kafka.serialization;
import com.alibaba.fastjson2.JSON;
import com.panda.kafka.entity.User;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 不做任何配置
@Override
public User deserialize(String topic, byte[] data) {
if (Objects.isNull(data)) {
return null;
String userStr = new String(data, StandardCharsets.UTF_8);
return JSON.parseObject(userStr, User.class);
@Override
public void close() {
// 不需要关闭任何东西
自定义反序列化器,用于将字节数组反序列化成User对象。
KafkaProducerSendUtil.java
package com.panda.kafka.util;
import com.panda.kafka.config.KafkaProducerUtil;
import com.panda.kafka.entity.User;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.KafkaException;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class KafkaProducerSendUtil {
@Resource
private KafkaProducerUtil kafkaProducerUtil;
public RecordMetadata sendSync(String topic, String message) {
try {
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);
KafkaProducer<String, Object> producer = kafkaProducerUtil.createKafkaProducer();
return producer.send(producerRecord).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error("发送同步消息失败", e);
throw new KafkaException("发送同步消息失败", e);
public void sendAsync(String topic, String message) {
try {
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);
KafkaProducer<String, Object> producer = kafkaProducerUtil.createKafkaProducer();
producer.send(producerRecord, (metadata, exception) -> {
if (Objects.nonNull(exception)) {
log.error("发送异步消息出现异常: {}", exception.getMessage(), exception);
} else {
log.info("发送异步消息成功,元数据信息:{}", metadata.toString());
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error("发送异步消息失败", e);
throw new KafkaException("发送异步消息失败", e);
public void sendUserAsync(String topic, User user) {
try {
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, user);
KafkaProducer<String, Object> producer = kafkaProducerUtil.createKafkaProducer();
producer.send(producerRecord, (metadata, exception) -> {
if (Objects.nonNull(exception)) {
log.error("发送异步消息出现异常: {}", exception.getMessage(), exception);
} else {
log.info("发送异步消息成功,元数据信息:{}", metadata.toString());
} catch (Exception e) {
Thread.currentThread().interrupt();
log.error("发送异步消息失败", e);
throw new KafkaException("发送异步消息失败", e);
生产者发送消息的工具类。
sendUserAsync方法用于异步发送User对象消息。
TestController.java
package com.panda.kafka.controller;
import com.panda.kafka.constants.KafkaConstants;
import com.panda.kafka.entity.User;
import com.panda.kafka.util.KafkaProducerSendUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("test")
@Slf4j
public class TestController {
@Resource
private KafkaProducerSendUtil kafkaProducerSendUtil;
@RequestMapping("sendSync")
public String sendSync(String message) {
RecordMetadata recordMetadata = kafkaProducerSendUtil.sendSync(KafkaConstants.KafkaTopic.TOPIC_TEST_001, message);
log.info("同步发送消息返回结果:{}", recordMetadata.toString());
return recordMetadata.topic();
@RequestMapping("sendAsync")
public void sendAsync(String message) {
kafkaProducerSendUtil.sendAsync(KafkaConstants.KafkaTopic.TOPIC_TEST_001, message);
@PostMapping("sendUserAsync")
public void sendUserAsync(@RequestBody User user) {
kafkaProducerSendUtil.sendUserAsync(KafkaConstants.KafkaTopic.TOPIC_TEST_002, user);
测试Controller类。
sendUserAsync方法用于发送User对象的消息。
测试
如上所示,调用/test/sendUserAsync方法,参数如下:
{
"userId": 1,
"userName": "userName_6ut0c",
"age": 1,