
Flink error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

The code is as follows:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.StringJoiner;

public class Sink_KafkaSink_1 {
    public static void main(String[] args) throws Exception {
        final ParameterTool params =
                ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
        String host = params.get("host");
        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
        produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
    }

    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddr);

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "flinktest",              // topic
                new SimpleStringSchema(), // message serialization schema
                properties);
        // attach each record's event timestamp when writing to Kafka
        producer.setWriteTimestampToKafka(true);

        text.addSink(producer);
        env.execute("[kafkaSink with custom source]");
    }
}

class CustomsourceFuncation implements SourceFunction<String> {
    //private long count = 1L;
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // a "book ranking" list used as test messages
            List<String> books = new ArrayList<>();
            books.add("msg1");
            books.add("msg2");
            books.add("msg3");
            books.add("msg4");
            books.add("msg5");
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            // emit one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    // called when the job is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}

The job runs fine in local testing, but after packaging it with Maven and submitting it to the YARN cluster in application mode, the JobManager keeps looping through the following error:

2021-01-22 07:54:31,929 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No
checkpoint found during restore.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id
ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @
slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @
slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
        ... 23 more
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 1 tasks should be restarted to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RUNNING to RESTARTING.
2021-01-22 07:54:33,973 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.

This is Flink 1.12.1. Deploying the same jar in per-job mode works fine, and messages can be consumed normally from the flinktest topic.
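For context, the two deployment modes referred to here are submitted roughly like this in Flink 1.12 (the jar name is only a placeholder):

./bin/flink run-application -t yarn-application ./Sink_KafkaSink_1.jar   # application mode: fails as above
./bin/flink run -t yarn-per-job ./Sink_KafkaSink_1.jar                   # per-job mode: works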

Workaround 1: the error is tied to Flink's class loading order, i.e. the classloader.resolve-order setting in flink-conf.yaml. Change the default child-first to parent-first and the error goes away; see the sketch below.
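For reference, a minimal sketch of the change in flink-conf.yaml (only this key changes, everything else stays as-is):

classloader.resolve-order: parent-first

With parent-first resolution, the Kafka classes are loaded from the cluster's lib directory rather than from a second copy in the user jar, so ByteArraySerializer and the Serializer interface no longer come from two different classloaders.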

Workaround 1 treats the symptom rather than the cause, though. The real reason for this error is a dependency issue: flink-connector-kafka.jar is duplicated and conflicts.

Workaround 2 (the actual fix): if flink-connector-kafka.jar already sits under the cluster's flink/lib, the Kafka connector dependency in your job's pom must be marked as provided, as sketched below.
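A minimal sketch of the pom.xml entry, assuming the Scala 2.11 build of the Flink 1.12.1 connector (adjust the suffix and version to your setup):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
    <!-- provided: compile against the connector but keep it out of the fat jar,
         because the cluster already ships it in flink/lib -->
    <scope>provided</scope>
</dependency>

With provided scope the connector (and its transitive kafka-clients) stays out of the user jar, so only the copy in flink/lib is on the classpath and the classloader conflict disappears.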
