Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
* kafka-stream - pipe,
public class Pipe {
// topic names,
public static final String TOPIC_INPUT = "streams-plaintext-input";
public static final String TOPIC_OUTPUT = "streams-pipe-output";
public static void pipe() {
// set up properties,
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // app id,
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka server,
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / deserialization class, for key,
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / deserialization class, for value,
// create stream - source,
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> sourceStream = builder.stream(TOPIC_INPUT); // create stream, with specified input topic,
sourceStream.to(TOPIC_OUTPUT); // set output topic of stream,
// print stream info,
final Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler,
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() { // trigger by ctrl+c,
streams.close(); // close stream,
latch.countDown(); // trigger latch, so that jvm will terminate,
try {
streams.start(); // start kafka,
latch.await(); // keep jvm running,
} catch (Throwable e) {
System.exit(1);
System.exit(0);
public static void main(String[] args) {
pipe();
Here are the steps that I start zookeeper & kafka & the program:
* create a maven project, if not yet,
* add dependency - via maven,
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
* add exec maven plugin,
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
</plugin>
* [stream program - pipe]
* create a class Pipe.java,
* create a method test(),
* call test() within main(),
* cd $KAFKA_HOME
* [start server]
* start zookeeper,
command:
bin/zookeeper-server-start.sh config/zookeeper.properties
* start kafka server,
command:
bin/kafka-server-start.sh config/server.properties
* [create topic]
* create topic,
command:
# topic - input,
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
# topic - output,
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-pipe-output --config cleanup.policy=compact
* list topic,
command:
bin/kafka-topics.sh --list --zookeeper localhost:2181
* [start streaming application]
* start Pipe program,
command:
mvn exec:java -Dexec.mainClass=eric.kafka.stream.Pipe
* [start producer & consumer]
* start a producer, to create input,
command:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
* start a consumer, to read output,
command:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-pipe-output --from-beginning
* [input a line]
* in producer console, TODO ... get error here ...
input line:
Hello, how are you?
* in consumer console,
will see output same as input,
Error output
After starting the Pipe program, I connect it via producer, and input a line, then get following error:
[2018-03-06 04:31:23.281] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 351]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] partition assignment took 24 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []
[2018-03-06 04:31:23.349] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 346]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[2018-03-06 04:31:23.350] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.KafkaStreams.info() - 346]: stream-client [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3]State transition from REBALANCING to RUNNING
[2018-03-06 04:31:50.063] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 3 on topic-partition streams-pipe-output-0, retrying (9 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.165] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 4 on topic-partition streams-pipe-output-0, retrying (8 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.267] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 5 on topic-partition streams-pipe-output-0, retrying (7 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.369] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 6 on topic-partition streams-pipe-output-0, retrying (6 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.471] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 7 on topic-partition streams-pipe-output-0, retrying (5 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.573] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 8 on topic-partition streams-pipe-output-0, retrying (4 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.675] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 9 on topic-partition streams-pipe-output-0, retrying (3 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.777] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 10 on topic-partition streams-pipe-output-0, retrying (2 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.879] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 11 on topic-partition streams-pipe-output-0, retrying (1 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.981] WARN [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 12 on topic-partition streams-pipe-output-0, retrying (0 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:51.085] ERROR [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.streams.processor.internals.RecordCollectorImpl.error() - 301]: task [0_0] Error sending record (key null value hello timestamp 1520281908878) to topic streams-pipe-output due to {}; No more records will be sent and no more offsets will be recorded for this task.
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
[2018-03-06 04:31:53.265] ERROR [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.AssignedTasks.error() - 301]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] Failed to commit stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value hello timestamp 1520281908878) to topic streams-pipe-output due to org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt..
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
[2018-03-06 04:31:53.266] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 346]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[2018-03-06 04:31:53.266] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 336]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] Shutting down
[2018-03-06 04:31:53.268] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.clients.producer.KafkaProducer.info() - 341]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2018-03-06 04:31:53.273] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 346]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[2018-03-06 04:31:53.273] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.KafkaStreams.info() - 346]: stream-client [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3]State transition from RUNNING to ERROR
[2018-03-06 04:31:53.273] WARN [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.KafkaStreams.warn() - 236]: stream-client [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3]All stream threads have died. The instance will be in error state and should be closed.
[2018-03-06 04:31:53.273] INFO [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 336]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] Shutdown complete
[WARNING]
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value hello timestamp 1520281908878) to topic streams-pipe-output due to org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt..
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
Above is the console output, I also checked $KAFKA_HOME/logs/
didn't found any error lines.
Software versions
OS is Linux mint mate 18 (64 bit)
.
Local Scala version is scala-2.12.2
.
Local Kafka version is kafka_2.12-1.0.0
.
Maven dependencies are:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
I have run the demo
from following line on my local, and it works well:
http://kafka.apache.org/10/documentation/streams/quickstart
So, seems the setup is ok.
Is it due to the versions of maven dependeny ? Since I saw maven dependency kafka-streams 1.0.0
actually in turn has a dependency for kafka_2.11 1.0.0
, but not 2.12
.
Any help?
–
–
–
–
The problem in this case is following the quickstart tutorial a little too closely. It has you create the output stream like this:
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
If you simply change streams-wordcount-output
to streams-pipe-output
, the example will break because of cleanup.policy=compact
. The compact setting enables log compaction, which requires a key and a value. The simple pipe demo only has a value, so errors like yours show up.
Create the topic like this and it will work:
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-pipe-output
You'll also need to run the consumer like this (changing Long
to String
):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-pipe-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
This will also help you see the null
key.
–
–
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.