Starting with version 2.3, all the JSON-aware components are configured by default with a JacksonUtils.enhancedObjectMapper()
instance, which comes with the MapperFeature.DEFAULT_VIEW_INCLUSION
and DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
features disabled.
Also such an instance is supplied with well-known modules for custom data types, such a Java time and Kotlin support.
See JacksonUtils.enhancedObjectMapper()
JavaDocs for more information.
This method also registers a org.springframework.kafka.support.JacksonMimeTypeModule
for org.springframework.util.MimeType
objects serialization into the plain string for inter-platform compatibility over the network.
A JacksonMimeTypeModule
can be registered as a bean in the application context and it will be auto-configured into the Spring Boot ObjectMapper
instance.
Also starting with version 2.3, the JsonDeserializer
provides TypeReference
-based constructors for better handling of target generic container types.
Starting with version 2.1, you can convey type information in record Headers
, allowing the handling of multiple types.
In addition, you can configure the serializer and deserializer by using the following Kafka properties.
They have no effect if you have provided Serializer
and Deserializer
instances for KafkaConsumer
and KafkaProducer
, respectively.
JsonSerializer.ADD_TYPE_INFO_HEADERS
(default true
): You can set it to false
to disable this feature on the JsonSerializer
(sets the addTypeInfo
property).
JsonSerializer.TYPE_MAPPINGS
(default empty
): See Mapping Types.
JsonDeserializer.USE_TYPE_INFO_HEADERS
(default true
): You can set it to false
to ignore headers set by the serializer.
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(default true
): You can set it to false
to retain headers set by the serializer.
JsonDeserializer.KEY_DEFAULT_TYPE
: Fallback type for deserialization of keys if no header information is present.
JsonDeserializer.VALUE_DEFAULT_TYPE
: Fallback type for deserialization of values if no header information is present.
JsonDeserializer.TRUSTED_PACKAGES
(default java.util
, java.lang
): Comma-delimited list of package patterns allowed for deserialization.
*
means deserializing all.
JsonDeserializer.TYPE_MAPPINGS
(default empty
): See Mapping Types.
JsonDeserializer.KEY_TYPE_METHOD
(default empty
): See Using Methods to Determine Types.
JsonDeserializer.VALUE_TYPE_METHOD
(default empty
): See Using Methods to Determine Types.
Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer.
You can revert to the previous behavior by setting the removeTypeHeaders
property to false
, either directly on the deserializer or with the configuration property described earlier.
See also Customizing the JsonSerializer and JsonDeserializer.
Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in Programmatic Construction, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using set*()
methods or using the fluent API).
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.
Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list.
Previously, you had to customize the type mapper within the serializer and deserializer.
Mappings consist of a comma-delimited list of token:className
pairs.
On outbound, the payload’s class name is mapped to the corresponding token.
On inbound, the token in the type header is mapped to the corresponding class name.
The following example creates a set of mappings:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
If you use Spring Boot, you can provide these properties in the application.properties
(or yaml) file.
The following example shows how to do so:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
You can perform only simple configuration with properties.
For more advanced configuration (such as using a custom ObjectMapper
in the serializer and deserializer), you should use the producer and consumer factory constructors that accept a pre-built serializer and deserializer.
The following Spring Boot example overrides the default factories:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
Map<String, Object> properties = new HashMap<>();
// properties.put(..., ...)
// ...
return new DefaultKafkaConsumerFactory<>(properties,
new StringDeserializer(), customValueDeserializer);
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
new StringSerializer(), customValueSerializer);
When using Spring Boot and overriding the ConsumerFactory
and ProducerFactory
as shown above, wild card generic types need to be used with the bean method return type.
If concrete generic types are provided instead, then Spring Boot will ignore these beans and still use the default ones.
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent
argument (which is true
by default).
The following example shows how to do so:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type.
If present, this will override any of the other techniques discussed above.
This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data, or other headers.
Set these properties to the method name - a fully qualified class name followed by the method name, separated by a period .
.
The method must be declared as public static
, have one of three signatures (String topic, byte[] data, Headers headers)
, (byte[] data, Headers headers)
or (byte[] data)
and return a Jackson JavaType
.
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
else {
return thing2Type;
For more sophisticated data inspection consider using JsonPath
or similar but, the simpler the test to determine the type, the more efficient the process will be.
The following is an example of creating the deserializer programmatically (when providing the consumer factory with the deserializer in the constructor):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
To provide type mapping programmatically, similar to Using Methods to Determine Types, use the typeFunction
property.
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
Alternatively, as long as you don’t use the fluent API to configure properties, or set them using set*()
methods, the factories will configure the serializer/deserializer using the configuration properties; see Configuration Properties.
Version 2.3 introduced the DelegatingSerializer
and DelegatingDeserializer
, which allow producing and consuming records with different key and/or value types.
Producers must set a header DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
to a selector value that is used to select which serializer to use for the value and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
for the key; if a match is not found, an IllegalStateException
is thrown.
For incoming records, the deserializer uses the same headers to select the deserializer to use; if a match is not found or the header is not present, the raw byte[]
is returned.
You can configure the map of selector to Serializer
/ Deserializer
via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
.
For the serializer, the producer property can be a Map<String, Object>
where the key is the selector and the value is a Serializer
instance, a serializer Class
or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
For the deserializer, the consumer property can be a Map<String, Object>
where the key is the selector and the value is a Deserializer
instance, a deserializer Class
or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
To configure using properties, use the following syntax:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
Producers would then set the DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
header to thing1
or thing2
.
This technique supports sending different types to the same topic (or different topics).
Starting with version 2.5.1, it is not necessary to set the selector header, if the type (key or value) is one of the standard types supported by Serdes
(Long
, Integer
, etc).
Instead, the serializer will set the header to the class name of the type.
It is not necessary to configure serializers or deserializers for these types, they will be created (once) dynamically.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, useful when a delegate serializer can serialize sub classes.
In this case, if there are amiguous matches, an ordered Map
, such as a LinkedHashMap
should be provided.
Starting with version 2.8, the DelegatingByTopicSerializer
and DelegatingByTopicDeserializer
allow selection of a serializer/deserializer based on the topic name.
Regex Pattern
s are used to lookup the instance to use.
The map can be configured using a constructor, or via properties (a comma delimited list of pattern:serializer
).
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
Use KEY_SERIALIZATION_TOPIC_CONFIG
when using this for keys.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
You can specify a default serializer/deserializer to use when there is no pattern match using DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
and DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
.
An additional property DelegatingByTopicSerialization.CASE_SENSITIVE
(default true
), when set to false
makes the topic lookup case insensitive.
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
Starting with version 3.1.2
, a RecoveryCallback
can be set on the RetryingDeserializer
optionally.
Refer to the spring-retry project for configuration of the RetryTemplate
with a retry policy, back off policy, etc.
Although the Serializer
and Deserializer
API is quite simple and flexible from the low-level Kafka Consumer
and Producer
perspective, you might need more flexibility at the Spring Messaging level, when using either @KafkaListener
or Spring Integration’s Apache Kafka Support.
To let you easily convert to and from org.springframework.messaging.Message
, Spring for Apache Kafka provides a MessageConverter
abstraction with the MessagingMessageConverter
implementation and its JsonMessageConverter
(and subclasses) customization.
You can inject the MessageConverter
into a KafkaTemplate
instance directly and by using AbstractKafkaListenerContainerFactory
bean definition for the @KafkaListener.containerFactory()
property.
The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
When using Spring Boot, simply define the converter as a @Bean
and Spring Boot auto configuration will wire it into the auto-configured template and container factory.
When you use a @KafkaListener
, the parameter type is provided to the message converter to assist with the conversion.
This type inference can be achieved only when the @KafkaListener
annotation is declared at the method level.
With a class-level @KafkaListener
, the payload type is used to select which @KafkaHandler
method to invoke, so it must already have been converted before the method can be chosen.
On the consumer side, you can configure a JsonMessageConverter
; it can handle ConsumerRecord
values of type byte[]
, Bytes
and String
so should be used in conjunction with a ByteArrayDeserializer
, BytesDeserializer
or StringDeserializer
.
(byte[]
and Bytes
are more efficient because they avoid an unnecessary byte[]
to String
conversion).
You can also configure the specific subclass of JsonMessageConverter
corresponding to the deserializer, if you so wish.
On the producer side, when you use Spring Integration or the KafkaTemplate.send(Message<?> message)
method (see Using KafkaTemplate
), you must configure a message converter that is compatible with the configured Kafka Serializer
.
Again, using byte[]
or Bytes
is more efficient because they avoid a String
to byte[]
conversion.
For convenience, starting with version 2.3, the framework also provides a StringOrBytesSerializer
which can serialize all three value types so it can be used with any of the message converters.
The KafkaMessageConverter.fromMessage()
method is called for outbound conversion to a ProducerRecord
with the message payload in the ProducerRecord.value()
property.
The KafkaMessageConverter.toMessage()
method is called for inbound conversion from ConsumerRecord
with the payload being the ConsumerRecord.value()
property.
The SmartMessageConverter.toMessage()
method is called to create a new outbound Message<?>
from the Message
passed to fromMessage()
(usually by KafkaTemplate.send(Message<?> msg)
).
Similarly, in the KafkaMessageConverter.toMessage()
method, after the converter has created a new Message<?>
from the ConsumerRecord
, the SmartMessageConverter.fromMessage()
method is called and then the final inbound message is created with the newly converted payload.
In either case, if the SmartMessageConverter
returns null
, the original message is used.
When the default converter is used in the KafkaTemplate
and listener container factory, you configure the SmartMessageConverter
by calling setMessagingConverter()
on the template and via the contentTypeConverter
property on @KafkaListener
methods.
Examples:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type.
This allows very selective, and low-coupled bindings to data, including the lookup of values from multiple places inside the JSON document.
For example the following interface can be defined as message payload type:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
Accessor methods will be used to lookup the property name as field in the received JSON document by default.
The @JsonPath
expression allows customization of the value lookup, and even to define multiple JSON Path expressions, to look up values from multiple places until an expression returns an actual value.
To enable this feature, use a ProjectingMessageConverter
configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces).
You must also add spring-data:spring-data-commons
and com.jayway.jsonpath:json-path
to the classpath.
When used as the parameter to a @KafkaListener
method, the interface type is automatically passed to the converter as normal.
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll()
returns.
To solve this problem, the ErrorHandlingDeserializer
has been introduced.
This deserializer delegates to a real deserializer (key or value).
If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer
returns a null
value and a DeserializationException
in a header that contains the cause and the raw bytes.
When you use a record-level MessageListener
, if the ConsumerRecord
contains a DeserializationException
header for either the key or value, the container’s ErrorHandler
is called with the failed ConsumerRecord
.
The record is not passed to the listener.
Alternatively, you can configure the ErrorHandlingDeserializer
to create a custom value by providing a failedDeserializationFunction
, which is a Function<FailedDeserializationInfo, T>
.
This function is invoked to create an instance of T
, which is passed to the listener in the usual fashion.
An object of type FailedDeserializationInfo
, which contains all the contextual information is provided to the function.
You can find the DeserializationException
(as a serialized Java object) in headers.
See the Javadoc for the ErrorHandlingDeserializer
for more information.
You can use the DefaultKafkaConsumerFactory
constructor that takes key and value Deserializer
objects and wire in appropriate ErrorHandlingDeserializer
instances that you have configured with the proper delegates.
Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer
) to instantiate the delegates.
The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
.
The property value can be a class or class name.
The following example shows how to set these properties:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
The following example uses a failedDeserializationFunction
.
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
The preceding example uses the following configuration:
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
If the consumer is configured with an ErrorHandlingDeserializer
, it is important to configure the KafkaTemplate
and its producer with a serializer that can handle normal objects as well as raw byte[]
values, which result from deserialization exceptions.
The generic value type of the template should be Object
.
One technique is to use the DelegatingByTypeSerializer
; an example follows:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
When using an ErrorHandlingDeserializer
with a batch listener, you must check for the deserialization exceptions in message headers.
When used with a DefaultBatchErrorHandler
, you can use that header to determine which record the exception failed on and communicate to the error handler via a BatchListenerFailedException
.
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
process(thing);
SerializationUtils.byteArrayToDeserializationException()
can be used to convert the header to a DeserializationException
.
When consuming List<ConsumerRecord<?, ?>
, SerializationUtils.getExceptionFromHeader()
is used instead:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
process(rec.value());
If you are also using a DeadLetterPublishingRecoverer
, the record published for a DeserializationException
will have a record.value()
of type byte[]
; this should not be serialized.
Consider using a DelegatingByTypeSerializer
configured to use a ByteArraySerializer
for byte[]
and the normal serializer (Json, Avro, etc) for all other types.
Starting with version 3.1, you can add a Validator
to the ErrorHandlingDeserializer
.
If the delegate Deserializer
successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
This allows the original raw data to be passed to the error handler.
When creating the deserializer yourself, simply call setValidator
; if you configure the serializer using properties, set the consumer configuration property ErrorHandlingDeserializer.VALIDATOR_CLASS
to the class or fully qualified class name for your Validator
.
When using Spring Boot, this property name is spring.kafka.consumer.properties.spring.deserializer.validator.class
.
You can also use a JsonMessageConverter
within a BatchMessagingMessageConverter
to convert batch messages when you use a batch listener container factory.
See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.
By default, the type for the conversion is inferred from the listener argument.
If you configure the JsonMessageConverter
with a DefaultJackson2TypeMapper
that has its TypePrecedence
set to TYPE_ID
(instead of the default INFERRED
), the converter uses the type information in headers (if present) instead.
This allows, for example, listener methods to be declared with interfaces instead of concrete classes.
Also, the type converter supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible).
This is also useful when you use class-level @KafkaListener
instances where the payload must have already been converted to determine which method to invoke.
The following example creates beans that use this method:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
Note that you can still access the batch headers.
If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type.
The following example shows how to do so:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
Starting with version 2.1.1, the org.springframework.core.convert.ConversionService
used by the default org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:
Starting with version 2.4.2 you are able to add your own HandlerMethodArgumentResolver
and resolve custom method parameters.
All you need is to implement KafkaListenerConfigurer
and use method setCustomMethodArgumentResolvers()
from class KafkaListenerEndpointRegistrar
.
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory
to the KafkaListenerEndpointRegistrar
bean.
If you do this, and your application needs to handle tombstone records, with a null
value()
(e.g. from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver
to the factory; it must be the last resolver because it supports all types and can match arguments without a @Payload
annotation.
If you are using a DefaultMessageHandlerMethodFactory
, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard PayloadMethodArgumentResolver
, which has no knowledge of KafkaNull
payloads.
See also Null Payloads and Log Compaction of Tombstone
Records.
Apache®, Apache Tomcat®, Apache Kafka®, Apache Cassandra™, and Apache Geode™ are trademarks or registered trademarks of the Apache Software Foundation in the United States and/or other countries. Java™, Java™ SE, Java™ EE, and OpenJDK™ are trademarks of Oracle and/or its affiliates. Kubernetes® is a registered trademark of the Linux Foundation in the United States and other countries. Linux® is the registered trademark of Linus Torvalds in the United States and other countries. Windows® and Microsoft® Azure are registered trademarks of Microsoft Corporation. “AWS” and “Amazon Web Services” are trademarks or registered trademarks of Amazon.com Inc. or its affiliates. All other trademarks and copyrights are property of their respective owners and are only mentioned for informative purposes. Other names may be trademarks of their respective owners.