Below is the stream program, Pipe.java:

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

/**
 * kafka-stream - pipe,
 */
public class Pipe {
    // topic names,
    public static final String TOPIC_INPUT = "streams-plaintext-input";
    public static final String TOPIC_OUTPUT = "streams-pipe-output";

    public static void pipe() {
        // set up properties,
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // app id,
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka server,
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / deserialization class, for key,
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / deserialization class, for value,

        // create stream - source,
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream = builder.stream(TOPIC_INPUT); // create stream, with specified input topic,
        sourceStream.to(TOPIC_OUTPUT); // set output topic of stream,

        // print stream info,
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler,
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                // triggered by ctrl+c,
                streams.close(); // close stream,
                latch.countDown(); // trigger latch, so that jvm will terminate,
            }
        });

        try {
            streams.start(); // start kafka streams,
            latch.await(); // keep jvm running,
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    public static void main(String[] args) {
        pipe();
    }
}

Here are the steps I use to start ZooKeeper, Kafka, and the program:

* create a Maven project, if you don't have one yet,
* add dependency - via maven,
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>
* add exec maven plugin,
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.6.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
* [stream program - pipe]
* create a class Pipe.java,
    * create a method pipe(),
    * call pipe() within main(),
* cd $KAFKA_HOME
* [start server]
* start zookeeper,
    command:
        bin/zookeeper-server-start.sh config/zookeeper.properties
* start kafka server,
    command:
        bin/kafka-server-start.sh config/server.properties
* [create topic]
* create topic,
    command:
        # topic - input,
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
        # topic - output,
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-pipe-output --config cleanup.policy=compact
* list topic,
    command:
        bin/kafka-topics.sh --list --zookeeper localhost:2181
* [start streaming application]
* start Pipe program,
    command:
        mvn exec:java -Dexec.mainClass=eric.kafka.stream.Pipe
* [start producer & consumer]
* start a producer, to create input,
    command:
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
* start a consumer, to read output,
    command:
        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-pipe-output --from-beginning
* [input a line]
* in producer console (this is where the error occurs, see below),
    input line:
        Hello, how are you?
* in consumer console,
    will see output same as input,

Error output

After starting the Pipe program, I connect to it via the producer and input a line, then get the following error:

[2018-03-06 04:31:23.281] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 351]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] partition assignment took 24 ms.
    current active tasks: [0_0]
    current standby tasks: []
    previous active tasks: []
[2018-03-06 04:31:23.349] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 346]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[2018-03-06 04:31:23.350] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.KafkaStreams.info() - 346]: stream-client [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3]State transition from REBALANCING to RUNNING
[2018-03-06 04:31:50.063] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 3 on topic-partition streams-pipe-output-0, retrying (9 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.165] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 4 on topic-partition streams-pipe-output-0, retrying (8 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.267] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 5 on topic-partition streams-pipe-output-0, retrying (7 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.369] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 6 on topic-partition streams-pipe-output-0, retrying (6 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.471] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 7 on topic-partition streams-pipe-output-0, retrying (5 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.573] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 8 on topic-partition streams-pipe-output-0, retrying (4 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.675] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 9 on topic-partition streams-pipe-output-0, retrying (3 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.777] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 10 on topic-partition streams-pipe-output-0, retrying (2 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.879] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 11 on topic-partition streams-pipe-output-0, retrying (1 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:50.981] WARN  [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.clients.producer.internals.Sender.warn() - 251]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Got error produce response with correlation id 12 on topic-partition streams-pipe-output-0, retrying (0 attempts left). Error: CORRUPT_MESSAGE
[2018-03-06 04:31:51.085] ERROR [kafka-producer-network-thread | streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] [org.apache.kafka.streams.processor.internals.RecordCollectorImpl.error() - 301]: task [0_0] Error sending record (key null value hello timestamp 1520281908878) to topic streams-pipe-output due to {}; No more records will be sent and no more offsets will be recorded for this task.
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
[2018-03-06 04:31:53.265] ERROR [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.AssignedTasks.error() - 301]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] Failed to commit stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value hello timestamp 1520281908878) to topic streams-pipe-output due to org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt..
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
[2018-03-06 04:31:53.266] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 346]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[2018-03-06 04:31:53.266] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 336]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] Shutting down
[2018-03-06 04:31:53.268] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.clients.producer.KafkaProducer.info() - 341]: [Producer clientId=streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2018-03-06 04:31:53.273] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 346]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[2018-03-06 04:31:53.273] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.KafkaStreams.info() - 346]: stream-client [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3]State transition from RUNNING to ERROR
[2018-03-06 04:31:53.273] WARN  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.KafkaStreams.warn() - 236]: stream-client [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3]All stream threads have died. The instance will be in error state and should be closed.
[2018-03-06 04:31:53.273] INFO  [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.info() - 336]: stream-thread [streams-pipe-455df74a-b0ca-4612-8df0-c582a6f779b3-StreamThread-1] Shutdown complete
[WARNING] 
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value hello timestamp 1520281908878) to topic streams-pipe-output due to org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt..
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.

Above is the console output. I also checked $KAFKA_HOME/logs/ but didn't find any error lines.

Software versions

OS is Linux Mint MATE 18 (64-bit).

Local Scala version is scala-2.12.2.

Local Kafka version is kafka_2.12-1.0.0.

Maven dependencies are:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.2</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>1.0.0</version>
</dependency>

I have run the demo from the following link locally, and it works well:
http://kafka.apache.org/10/documentation/streams/quickstart

So the setup seems to be OK.

Is it due to the versions of the Maven dependencies? I saw that the kafka-streams 1.0.0 Maven artifact in turn depends on kafka_2.11 1.0.0, not 2.12.
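(To see what kafka-streams actually pulls in, one can print Maven's resolved dependency tree; the includes filter below is optional and only narrows the output:)

mvn dependency:tree -Dincludes=org.apache.kafka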

Any help?

Is this reproducible? The error indicates that the broker does not accept the write -- it doesn't seem to be a client-side issue. Does the log reveal more information? – Matthias J. Sax Mar 5, 2018 at 0:55

Unclear without the logs. I would try to investigate there with DEBUG level enabled. If you can't find anything by yourself, ask for help at the user mailing list: kafka.apache.org/contact – Matthias J. Sax Mar 5, 2018 at 17:34

@MatthiasJ.Sax Question updated, with all the error output that I could find, and more information about the test; not sure if that is useful. – Eric Mar 5, 2018 at 20:55

I have no idea atm why the message might get corrupted on write. Kafka Streams code is purely Java based, thus the Scala version should not matter. Does the broker log anything? – Matthias J. Sax Mar 6, 2018 at 0:57

The problem in this case is following the quickstart tutorial a little too closely. It has you create the output topic like this:

bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact

If you simply change streams-wordcount-output to streams-pipe-output, the example will break because of cleanup.policy=compact. The compact setting enables log compaction, which requires every record to have a key. The simple pipe demo only writes a value (the key is null), so errors like yours show up.

Create the topic like this and it will work:

bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-pipe-output

You'll also need to run the consumer like this (changing Long to String):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-pipe-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
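
With print.key=true and String deserializers for both key and value, the console consumer prints each record as key, a tab, then value. Since the pipe topology never sets a key, for the input above the output should look something like:

null	Hello, how are you?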

This will also help you see the null key.
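
As an aside, if you did want to keep cleanup.policy=compact on the output topic, every record would need a non-null key before it is written. A minimal sketch of that variant, replacing the topology lines in pipe() above (selectKey is part of the public KStream API; deriving the key from the value itself is just an arbitrary choice for illustration):

// assign a non-null key to every record, so that the output topic
// could safely use cleanup.policy=compact,
KStream<String, String> sourceStream = builder.stream(TOPIC_INPUT);
sourceStream
    .selectKey((key, value) -> value) // derive the key from the value (illustrative choice),
    .to(TOPIC_OUTPUT);                // records written here now carry a non-null key,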

Interesting. Side note: in the word-count example, enabling compaction is actually correct, as the result is a KTable (note that for KTables, it's ensured that there are no null keys). – Matthias J. Sax May 4, 2018 at 17:23

Great answer! There really needs to be a more informative error from the broker for this. I churned on it for a while. – Paul Whalen Jul 16, 2018 at 16:09
