springboot集成kafka消费者序列化问题报错*is not an instance of org.apache.kafka.common.serialization.Deserializer
最新推荐文章于 2023-07-27 17:07:57 发布
最新推荐文章于 2023-07-27 17:07:57 发布
前言:之前博客里面提到本公司为物联网项目。项目中使用mqtt+kafka进行与设备端的通讯,之前的协议格式为json格式,现在改成字节数组byte[]格式进行通信。
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
-
分析原因:
之前json格式传输时候是用的是String类型的,反序列化的时候指定的是StringDeserializer.class,现在传输为byte[]所以需要改成ByteArrayDeserializer.class
-
解决办法:
之前json格式传输时候是用的是String类型的,所以我们现在需要改成byte[]反序列化数据类型。
kafka实现序列化/反序列化可以简单的总结为两步,第一步继承序列化Serializer或者反序列化Deserializer接口。第二步实现接口方法,将指定类型序列化成byte[]或者将byte[]反序列化成指定数据类型。
public Map<String, Object> consumerByteConfigs() {
Map<String, Object> propsMap = new HashMap<String, Object>(8);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//此处是上面提到的设置value值的类型
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
return propsMap;
其中ByteArrayDeserializer implements Deserializer<byte[]>,ByteArrayDeserializer 已经是实现了Deserializer。接下来我们只需要将kafka配置中的"value.serializer"值的类型为 ByteArrayDeserializer.class。
kafka在发送和接受消息的时候,都是以byte[]字节型数组发送或者接受的。但是我们平常使用的时候,不但可以使用byte[],还可以使用int、short、long、float、double、String等数据类型,这是因为在我们使用这些数据类型的时候,kafka根据我们指定的序列化和反序列化方式转成byte[]类型之后再进行发送或者接受的。
通常我们在使用kakfa发送或者接受消息的时候都需要指定消息的key和value序列化方式,如设置value.serializer为org.apache.kafka.common.serialization.StringSerializer,设置value的序列化方式为字符串,即我们可以发送string类型的消息。目前kafka原生支持的序列化和反序列化方式如下两表所示:
序列化方式 | 对应java数据类型 |
---|
org.apache.kafka.common.serialization.ByteArraySerializer | byte[] |
org.apache.kafka.common.serialization.ByteBufferSerializer | ByteBuffer |
org.apache.kafka.common.serialization.IntegerSerializer | Interger |
org.apache.kafka.common.serialization.ShortSerializer | Short |
org.apache.kafka.common.serialization.LongSerializer | Long |
org.apache.kafka.common.serialization.DoubleSerializer | Double |
序列化方式 | 对应java数据类型 |
---|
org.apache.kafka.common.serialization.ByteArrayDeserializer | byte[] |
org.apache.kafka.common.serialization.ByteBufferDeserializer | ByteBuffer |
org.apache.kafka.common.serialization.IntegerDeserializer | Interger |
org.apache.kafka.common.serialization.ShortDeserializer | Short |
org.apache.kafka.common.serialization.LongDeserializer | Long |
org.apache.kafka.common.serialization.DoubleDeserializer | Double |
class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.运行kafka程序报错
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.S...
MEAP Edition, Manning Early Access Program, Kafka Streams in Action, Version 4
PART 1: GETTING STARTED WITH KAFKA STREAMS
PART 2: KAFKA STREAMS DEVELOPMENT
PART 3: ADMINISTERING KAFKA STREAMS
PART 4: ADVANCED CONCEPTS WITH KAFKA STREAMS
spring集成kafka运行时报错:Failed to construct kafka producer] with root cause
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Serializer
反正我这样写报了异常出来,org.apache.kafka.common.serialization.StringDeserializer is not an instance
下面的两行代码我运行会报错
properties.setProperty("key.serializer", StringDeserializer.class.getName());
properties.setProp...
class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.
查了好多资料,说什么类加载机制什么的,但改了依然无效,经查阅Spark官方文档和kafka官方的文档后,得以解决,不得不说,官方文档还是厉害环境,pom文件如下:
IntelliJ IDEA 2022.1.2 (Ultimate Edition)
kafka 官方文档
SparkStreaming 官方文档
报错写法(后来也是可以用的)......
今天springboot 项目配置kafka消费者,出现了一个错误
com.fasterxml.jackson.databind.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.ja