flink报错ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
代码如下:
public class Sink_KafkaSink_1{
public static void main(String[] args) throws Exception {
final ParameterTool params =
ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
String host = params.get("host");
int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddr);
FlinkKafkaProducer producer = new FlinkKafkaProducer("flinktest",//topic
new SimpleStringSchema(), //消息序列化
properties
//写入 Kafka 时附加记录的事件时间戳
producer.setWriteTimestampToKafka(true);
text.addSink(producer);
env.execute("[kafkaSink with custom source]");
class CustomsourceFuncation implements SourceFunction<String> {
//private long count = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(isRunning){
//图书的排行榜
List<String> books = new ArrayList<>();
books.add("msg1");
books.add("msg2");
books.add("msg3");
books.add("msg4");
books.add("msg5");
int i = new Random().nextInt(5);
ctx.collect(books.get(i));
//每2秒产生一条数据
Thread.sleep(2000);
//取消一个cancel的时候会调用的方法
@Override
public void cancel() {
isRunning = false;
本地测试无异常,maven打包后提交yarn集群运行,application Mode模式,jobmanager循环一直报错如下:
2021-01-22 07:54:31,929 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No
checkpoint found during restore.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id
ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @
slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @
slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
... 23 more
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 1 tasks should be restarted to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RUNNING to RESTARTING.
2021-01-22 07:54:33,973 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.
flink 1.12.1版本,试着用per-job mode 部署是ok的,在flinktest 这个topic能正常消费到数据
跟flink的类加载方式有关,即flink-conf.yml中的classloader.resolve-order
参数,要将默认的
child-first
改成parent-first
,修改后就ok了
方式一的方法是治标不治本,其实报这个错的原因,是个依赖的问题。
就是因为flink-connector-kafka.jar
依赖冲突了。
如果集群flink/lib
下已经有了flink-connector-kafka.jar
,那就要自己任务中的pom里面就要将kafka的connector依赖provider一下。
flink tableEnv.toAppendStream报错 org.apache.flink.api.common.typeinfo.TypeInformation does not match
flink中将Table转为DataStream时报如下异常: 异常 Arity [6] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@7e345bac] does not match the number[1] ...
kafka 异常:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host
文章目录 1. 问题出现的场景 2. UnknownTopicOrPartitionException 3. 问题原因分析 3.1 既然会自动创建 topic,为什么还会报UnknownTopicOrPartitionException? 4. UnknownTopicOrPartitionExce
报错:org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for mySecondTopic-2:
大家好,我是邵奈一,一个不务正业的程序猿、正儿八经的斜杠青年。1、世人称我为:被代码耽误的诗人、没天赋的书法家、五音不全的歌手、专业跑龙套演员、不合格的运动员…
Flink系列:org.apache.flink.api.common.InvalidProgramException: This type XXX cannot be used as key
目录背景原因背景使用Flink keyBy函数时,参数为POJO对象报错org.apache.flink.api.common.InvalidProgramException: This type XXX cannot be used as key原因POJO需要满足:class必须是public的 必须有public无参构造函数...
kafka基础:解决org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxx topic
https://my.oschina.net/kyle1970/blog/2396318/print kafka 0.9.x以后的版本,有一个配置属性叫advertised.listeners,在server.properties中,该属性默认是注释掉的,解释如下: #Hostname and po