kafka消费者报错:Class org.apache.kafka.common.serialization.StringDeserializer could not be found.
2019-04-03 09:27:44
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.StringDeserializer could not be found.
是因为classloader不是同一个所以加载不到
在之前加上
Thread.currentThread().setContextClassLoader(null);
就可以了,看下面
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
kafka client use
Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader())
to get the Class object, and the create the instance, the key point is the classLoader, which is specified by the last param, the implementation of method
Utils.getContextOrKafkaClassLoader()
is
public static ClassLoader getContextOrKafkaClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null)
return getKafkaClassLoader();
return cl;
so, by default, the Class object of org.apache.kafka.common.serialization.StringSerializer
is load by the applicationClassLoader, if your target class is not loaded by the applicationClassLoader, this problem will happend !
to solve the problem, simply set the ContextClassLoader of current thread to null before new KafkaProducer instance like this
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
在使用 canal 和kafka处理数据同步时canal日志提示如下异常:
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept
原因是消息过大无法处理
第一步:修改 canal/conf/canal.properties配置
kafka报错:java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
反正我这样写报了异常出来,org.apache.kafka.common.serialization.StringDeserializer is not an instance
下面的两行代码我运行会报错
properties.setProperty("key.serializer", StringDeserializer.class.getName());
properties.setProp...
class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.运行kafka程序报错
org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.record.CompressionType$SnappyConstructors
at org.apache.kafka.common.record.CompressionType$3.wrapForOutput(Com
加上Class.forName便能解决,这好像和kafka类加载机制有关
//设置key反序列化器
properties.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
//设置值反序列化器
properties.put("value.deserializer", Class.forName("
重要的生产者参数
在 KafkaProducer 中,除了第3节提及的3个默认的客户端参数,大部分的参数都有合理的默认值,一般不需要修改它们。不过了解这些参数可以让我们更合理地使用生产者客户端,其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进行性能调优与故障排查。下面挑选一些重要的参数进行讲解。
1. acks
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客...
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
pr...
以下是显示如何使用org.apache.kafka.common.serialization.StringSerializer的投票最高的示例。 这些示例摘自开源项目。
请自行查找KafkaProducer或者properties的代码位置书写
Example 1
public void run(Configuration configuration, Environment environmen...
查了好多资料,说什么类加载机制什么的,但改了依然无效,经查阅Spark官方文档和kafka官方的文档后,得以解决,不得不说,官方文档还是厉害环境,pom文件如下:
IntelliJ IDEA 2022.1.2 (Ultimate Edition)
kafka 官方文档
SparkStreaming 官方文档
报错写法(后来也是可以用的)......
KafkaProducer API
让我们了解本节中最重要的一组Kafka生产者API。KafkaProducer API的中心部分是KafkaProducer类。KafkaProducer类提供了一个选项,用于将其构造函数中的K...