By default, the inferred type information will override the inbound __TypeId__ and related headers created
by the sending system.
This allows the receiving system to automatically convert to a different domain object.
This applies only if the parameter type is concrete (not abstract or an interface) or it is from the java.util
package.
In all other cases, the __TypeId__ and related headers will be used.
There are cases where you might wish to override the default behavior and always use the __TypeId__ information.
For example, let’s say you have a @RabbitListener that takes a Foo argument but the message contains a Bar which
is a subclass of Foo (which is concrete).
The inferred type would be incorrect.
To handle this situation, set the TypePrecedence property on the Jackson2JsonMessageConverter to TYPE_ID instead
of the default INFERRED .
The property is actually on the converter’s DefaultJackson2JavaTypeMapper but a setter is provided on the converter
for convenience.
If you inject a custom type mapper, you should set the property on the mapper instead.
When converting from the Message , an incoming MessageProperties.getContentType() must be JSON-compliant (the logic contentType.contains("json") is used).
Otherwise, a WARN log message Could not convert incoming message with content-type [...] , is emitted and message.getBody() is returned as is - as a byte[] .
So, to meet the Jackson2JsonMessageConverter requirements on the consumer side, the producer must add the contentType message property, e.g. as application/json , text/x-json or simply use the Jackson2JsonMessageConverter , which will set the header automatically.
@RabbitListener
public void foo(Foo foo) {...}
@RabbitListener
public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void foo(Foo foo, o.s.amqp.core.Message message) {...}
@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void foo(Foo foo, String bar) {...}
@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<?> message) {...}
In the first four cases above the converter will attempt to convert to the Foo type.
The fifth example is invalid because we can’t determine which argument should receive the message payload.
With the sixth example, the Jackson defaults will apply due to the generic type being a WildcardType .
You can, however, create a custom converter and use the targetMethod message property to decide which type to convert
the JSON to.
This type inference can only be achieved when the @RabbitListener annotation is declared at the method level.
With class-level @RabbitListener , the converted type is used to select which @RabbitHandler method to invoke.
For this reason, the infrastructure provides the targetObject message property which can be used by a custom
converter to determine the type.
| Important |
---|
Starting with version 1.6.11, the Jackson2JsonMessageConverter and, therefore, DefaultJackson2JavaTypeMapper (DefaultClassMapper ) provide the trustedPackages option to overcome Serialization Gadgets vulnerability.
By default, for backward compatiblity the Jackson2JsonMessageConverter trusts all packages - use * for the option.
Yet another option is the MarshallingMessageConverter .
It delegates to the Spring OXM library’s implementations of the Marshaller and Unmarshaller strategy interfaces.
You can read more about that library here.
In terms of configuration, it’s most common to provide the constructor argument only since most implementations of Marshaller will also implement Unmarshaller .
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
This class was introduced in version 1.4.2 and allows delegation to a specific MessageConverter based on the content type property in the MessageProperties .
By default, it will delegate to a SimpleMessageConverter if there is no contentType property, or a value that matches none of the configured converters.
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</property>
</bean>
| Important |
---|
There is a possible vulnerability when deserializing java objects from untrusted sources.
If you accept messages from untrusted sources with a content-type application/x-java-serialized-object , you should
consider configuring which packages/classes are allowed to be deserialized.
This applies to both the SimpleMessageConverter and SerializerMessageConverter when it is configured to use a
DefaultDeserializer - either implicitly, or via configuration.
By default, the white list is empty, meaning all classes will be deserialized.
You can set a list of patterns, such as foo.* , foo.bar.Baz or *.MySafeClass .
The patterns will be checked in order until a match is found.
If there is no match, a SecurityException is thrown.
Set the patterns using the whiteListPatterns property on these converters.
The MessagePropertiesConverter strategy interface is used to convert between the Rabbit Client
BasicProperties and Spring AMQP MessageProperties . The default implementation
(DefaultMessagePropertiesConverter ) is usually sufficient for most purposes but you can implement your own if needed.
The default properties converter will convert BasicProperties elements of type LongString to String s
when the size is not greater than 1024 bytes. Larger LongString s are not converted (see below).
This limit can be overridden with a constructor argument.
Starting with version 1.6, headers longer than the long string limit (default 1024) are now left as
LongString s by default by the DefaultMessagePropertiesConverter .
You can access the contents via the getBytes[] , toString() , or getStream() methods.
Previously, the DefaultMessagePropertiesConverter "converted" such headers to a DataInputStream (actually it just
referenced the LongString 's DataInputStream ).
On output, this header was not converted (except to a String, e.g.
[email protected] by calling
toString() on the stream).
Large incoming LongString headers are now correctly "converted" on output too (by default).
A new constructor is provided to allow you to configure the converter to work as before:
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
public DefaultMessagePropertiesConverter( int longStringLimit, boolean convertLongLongStrings) { ... }
Also starting with version 1.6, a new property correlationIdString has been added to MessageProperties .
Previously, when converting to/from BasicProperties used by the RabbitMQ client, an unnecessary byte[] <-> String
conversion was performed because MessageProperties.correlationId is a byte[] but BasicProperties uses a
String . (Ultimately, the RabbitMQ client uses UTF-8 to convert the String to bytes to put in the protocol message).
To provide maximum backwards compatibility, a new property correlationIdPolicy has been added to the
DefaultMessagePropertiesConverter .
This takes an DefaultMessagePropertiesConverter.CorrelationIdPolicy enum argument.
By default it is set to BYTES which replicates the previous behavior.
For inbound messages:
STRING - just the correlationIdString property is mapped
BYTES - just the correlationId property is mapped
BOTH - both properties are mapped
For outbound messages:
STRING - just the correlationIdString property is mapped
BYTES - just the correlationId property is mapped
BOTH - Both properties will be considered, with the String property taking precedence
Also starting with version 1.6, the inbound deliveryMode property is no longer mapped to MessageProperties.deliveryMode , it is mapped to MessageProperties.receivedDeliveryMode instead.
Also, the inbound userId property is no longer mapped to MessageProperties.userId , it is mapped to MessageProperties.receivedUserId instead.
These changes are to avoid unexpected propagation of these properties if the same MessageProperties object is used for an outbound message.
A number of extension points exist where you can perform some processing on a message, either before it is sent to RabbitMQ, or immediately after it is received.
As can be seen in Section 3.1.7, “Message Converters”, one such extension point is in the AmqpTemplate convertAndReceive operations, where you can provide a MessagePostProcessor .
For example, after your POJO has been converted, the MessagePostProcessor enables you to set custom headers or properties on the Message .
Starting with version 1.4.2, additional extension points have been added to the RabbitTemplate - setBeforePublishPostProcessors() and setAfterReceivePostProcessors() .
The first enables a post processor to run immediately before sending to RabbitMQ. When using batching (see the section called “Batching”), this is invoked after the batch is assembled and before the batch is sent. The second is invoked immediately after a message is received.
These extension points are used for such features as compression and, for this purpose, several MessagePostProcessor s are provided:
GZipPostProcessor
ZipPostProcessor
for compressing messages before sending, and
GUnzipPostProcessor
UnzipPostProcessor
for decompressing received messages.
Similarly, the SimpleMessageListenerContainer also has a setAfterReceivePostProcessors() method, allowing the decompression to be performed after messages are received by the container.
The AmqpTemplate also provides a variety of sendAndReceive methods that accept the same argument options that you have seen above for the one-way send operations (exchange, routingKey, and Message).
Those methods are quite useful for request/reply scenarios since they handle the configuration of the necessary "reply-to" property before sending and can listen for the reply message on an exclusive Queue that is created internally for that purpose.
Similar request/reply methods are also available where the MessageConverter is applied to both the request and reply.
Those methods are named convertSendAndReceive .
See the Javadoc of AmqpTemplate for more detail.
Starting with version 1.5.0, each of the sendAndReceive method variants has an overloaded version that takes CorrelationData .
Together with a properly configured connection factory, this enables the receipt of publisher confirms for the send side of the operation.
See the section called “Publisher Confirms and Returns” for more information.
By default, the send and receive methods will timeout after 5 seconds and return null.
This can be modified by setting the replyTimeout property.
Starting with version 1.5, if you set the mandatory property to true (or the mandatory-expression evaluates to
true for a particular message), if the message cannot be delivered to a queue an AmqpMessageReturnedException will
be thrown.
This exception has returnedMessage , replyCode , replyText properties, as well as the exchange and
routingKey used for the send.
This feature uses publisher returns and is enabled by setting publisherReturns to true on the
CachingConnectionFactory (see the section called “Publisher Confirms and Returns”).
Also, you must not have registered your own ReturnCallback with the RabbitTemplate .
| Important |
---|
Starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request).
Starting with Spring AMQP version 1.4.1 Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues.
When no replyQueue is provided (or it is set with the name amq.rabbitmq.reply-to ), the RabbitTemplate will automatically detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue.
When using Direct reply-to, a reply-listener is not required and should not be configured.
Reply listeners are still supported with named queues (other than amq.rabbitmq.reply-to ), allowing control of reply concurrency etc.
Starting with version 1.6 if, for some reason, you wish to use a temporary, exclusive, auto-delete queue for each
reply, set the useTemporaryReplyQueues property to true .
This property is ignored if you you set a replyAddress .
The decision whether or not to use direct reply-to can be changed to use different criteria by subclassing
RabbitTemplate and overriding useDirectReplyTo() .
The method is called once only; when the first request is sent.
When using a fixed reply queue (other than amq.rabbitmq.reply-to ), it is necessary to provide correlation data so that replies can be correlated to requests.
See RabbitMQ Remote Procedure Call (RPC).
By default, the standard correlationId property will be used to hold the correlation data.
However, if you wish to use a custom property to hold correlation data, you can set the correlation-key attribute on the <rabbit-template/>.
Explicitly setting the attribute to correlationId is the same as omitting the attribute.
Of course, the client and server must use the same header for correlation data.
Spring AMQP version 1.1 used a custom property spring_reply_correlation for this data.
If you wish to revert to this behavior with the current version, perhaps to maintain compatibility with another application using 1.1, you must set the attribute to spring_reply_correlation .
When using RabbitMQ versions prior to 3.4.0, a new temporary queue is used for each reply.
However, a single reply queue can be configured on the template, which can be more efficient,
and also allows you to set arguments on that queue.
In this case, however, you must also provide a <reply-listener/> sub element.
This element provides a listener container for the reply queue, with the template being the listener.
All of the Section 3.1.15, “Message Listener Container Configuration” attributes allowed on a <listener-container/> are allowed on the element, except for
connection-factory and message-converter, which are inherited from the template’s configuration.
| Important |
---|
If you run multiple instances of your application or use multiple RabbitTemplate s, you MUST use a
unique reply queue for each - RabbitMQ has no capability to select messages from a queue so, if they all use the same
queue, each instance would compete for replies and not necessarily receive their own.
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
While the container and template share a connection factory, they do not share a channel and therefore requests and replies are not performed within the same transaction (if transactional).
Prior to version 1.5.0, the reply-address attribute was not available, replies were always routed using the
default exchange and the reply-queue name as the routing key.
This is still the default but you can now specify the new reply-address attribute.
The reply-address can contain an address with the form <exchange>/<routingKey> and the reply will be routed to the
specified exchange and routed to a queue bound with the routing key.
The reply-address has precedence over reply-queue . The <reply-listener> must be configured as a separate
<listener-container> component, when only reply-address is in use, anyway reply-address and reply-queue
(or queues attribute on the <listener-container> ) must refer to the same queue logically.
With this configuration, a SimpleListenerContainer is used to receive the replies; with the RabbitTemplate being the MessageListener .
When defining a template with the <rabbit:template/> namespace element, as shown above, the parser defines the container and wires in the template as the listener.
When the template does not use a fixed replyQueue (or is using Direct reply-to - see the section called “RabbitMQ Direct reply-to”) a listener container is not needed. Direct reply-to is the preferred mechanism when using RabbitMQ 3.4.0 or later.
If you define your RabbitTemplate as a <bean/> , or using an @Configuration class to define it as an @Bean , or when creating the template programmatically, you will need to define and wire up the reply listener container yourself.
If you fail to do this, the template will never receive the replies and will eventually time out and return null as the reply to a call to a sendAndReceive method.
Starting with version 1.5, the RabbitTemplate will detect if it has been
configured as a MessageListener to receive replies.
If not, attempts to send and receive messages with a reply address
will fail with an IllegalStateException (because the replies will never be received).
Further, if a simple replyAddress (queue name) is used, the reply listener container will verify that it is listening
to a queue with the same name.
This check cannot be performed if the reply address is an exchange and routing key and a debug log message will be
written.
| Important |
---|
When wiring the reply listener and template yourself, it is important to ensure that the template’s replyQueue and the container’s queues (or queueNames ) properties refer to the same queue.
The template inserts the reply queue into the outbound message replyTo property.
The following are examples of how to manually wire up the beans.
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyQueue(replyQueue());
rabbitTemplate.setReplyTimeout(60000);
return rabbitTemplate;
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
A complete example of a RabbitTemplate wired with a fixed reply queue, together with a "remote" listener container that handles the request and returns the reply is shown in this test case.
| Important |
---|
When the reply times out (replyTimeout ), the sendAndReceive() methods return null.
Prior to version 1.3.6, late replies for timed out messages were simply logged.
Now, if a late reply is received, it is rejected (the template throws an AmqpRejectAndDontRequeueException ).
If the reply queue is configured to send rejected messages to a dead letter exchange, the reply can be retrieved for later analysis.
Simply bind a queue to the configured dead letter exchange with a routing key equal to the reply queue’s name.
Refer to the RabbitMQ Dead Letter Documentation for more information about configuring dead lettering.
You can also take a look at the FixedReplyQueueDeadLetterTests test case for an example.
Version 1.6 introduced the AsyncRabbitTemplate .
This has similar sendAndReceive (and convertSendAndReceive ) methods to those on the AmqpTemplate
but instead of blocking, they return a ListenableFuture .
The sendAndReceive methods return a RabbitMessageFuture ; the convertSendAndReceive methods return a
RabbitConverterFuture .
You can either synchronously retrieve the result later, by invoking get() on the future, or you can register a
callback which will be called asynchronously with the result.
@Autowired
private AsyncRabbitTemplate template;
public void doSomeWorkAndGetResultLater() {
ListenableFuture<String> future = this.template.convertSendAndReceive("foo");
String reply = null;
try {
reply = future.get();
catch (ExecutionException e) {
public void doSomeWorkAndGetResultAsync() {
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
@Override
public void onFailure(Throwable ex) {
If mandatory is set, and the message can’t be delivered, the future will throw an ExecutionException with a cause of
AmqpMessageReturnedException which encapsulates the returned message and information about the return.
If enableConfirms is set, the future will have a property confirm which is itself a ListenableFuture<Boolean>
with true indicating a successful publish.
If the confirm future is false, the RabbitFuture will have a further property nackCause - the reason for the
failure, if available.
| Important |
---|
The publisher confirm is discarded if it is received after the reply - since the reply implies a successful
publish.
Set the receiveTimeout property on the template to time out replies (it defaults to 30000 - 30 seconds).
If a timeout occurs, the future will be completed with an AmqpReplyTimeoutException .
The template implements SmartLifecycle ; stopping the template while there are pending replies will cause the
pending Future s to be canceled.
The Spring Framework has a general remoting capability, allowing Remote Procedure Calls (RPC) using various transports.
Spring-AMQP supports a similar mechanism with a AmqpProxyFactoryBean on the client and a AmqpInvokerServiceExporter on the server.
This provides RPC over AMQP.
On the client side, a RabbitTemplate is used as described above; on the server side, the invoker (configured as a MessageListener ) receives the message, invokes the configured service, and returns the reply using the inbound message’s replyTo information.
The client factory bean can be injected into any bean (using its serviceInterface ); the client can then invoke methods on the proxy, resulting in remote execution over AMQP.
With the default MessageConverter s, the method parameters and returned value must be instances of Serializable .
On the server side, the AmqpInvokerServiceExporter has both AmqpTemplate and MessageConverter properties.
Currently, the template’s MessageConverter is not used.
If you need to supply a custom message converter, then you should provide it using the messageConverter property.
On the client side, a custom message converter can be added to the AmqpTemplate which is provided to the AmqpProxyFactoryBean using its amqpTemplate property.
Sample client and server configurations are shown below.
<bean id="client"
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<property name="amqpTemplate" ref="template" />
<property name="serviceInterface" value="foo.ServiceInterface" />
</bean>
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"
routing-key="remoting.binding" exchange="remoting.exchange" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="listener"
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface" />
<property name="service" ref="service" />
<property name="amqpTemplate" ref="template" />
</bean>
<bean id="service" class="foo.ServiceImpl" />
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
| Important |
---|
The AmqpInvokerServiceExporter can only process properly formed messages, such as those sent from the AmqpProxyFactoryBean .
If it receives a message that it cannot interpret, a serialized RuntimeException will be sent as a reply.
If the message has no replyToAddress property, the message will be rejected and permanently lost if no Dead Letter Exchange has been configured.
By default, if the request message cannot be delivered, the calling thread will eventually timeout and a
RemoteProxyFailureException will be thrown.
The timeout is 5 seconds by default, and can be modified by setting the replyTimeout property on the
RabbitTemplate .
Starting with version 1.5, setting the mandatory property to true, and enabling returns on the connection
factory (see the section called “Publisher Confirms and Returns”), the calling thread will throw an AmqpMessageReturnedException .
See the section called “Reply Timeout” for more information.
The AMQP specification describes how the protocol can be used to configure Queues, Exchanges and Bindings on the broker.
These operations which are portable from the 0.8 specification and higher are present in the AmqpAdmin interface in the org.springframework.amqp.core package.
The RabbitMQ implementation of that class is RabbitAdmin located in the org.springframework.amqp.rabbit.core package.
The AmqpAdmin interface is based on using the Spring AMQP domain abstractions and is shown below:
public interface AmqpAdmin {
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
The getQueueProperties() method returns some limited information about the queue (message count and consumer count).
The keys for the properties returned are available as constants in the RabbitTemplate (QUEUE_NAME ,
QUEUE_MESSAGE_COUNT , QUEUE_CONSUMER_COUNT ).
The RabbitMQ REST API provides much more information in the QueueInfo object.
The no-arg declareQueue() method defines a queue on the broker with a name that is automatically generated.
The additional properties of this auto-generated queue are exclusive=true , autoDelete=true , and durable=false .
The declareQueue(Queue queue) method takes a Queue object and returns the name of the declared queue.
If the provided Queue 's name property is an empty String, the broker declares the queue with a generated name and
that name is returned to the caller.
The Queue object itself is not changed.
This functionality can only be used programmatically by invoking the RabbitAdmin directly.
It is not supported for auto-declaration by the admin by defining a queue declaratively in the application context.
This is in contrast to an AnonymousQueue where the framework generates a unique (UUID ) name and sets durable to
false and exclusive , autoDelete to true .
A <rabbit:queue/> with an empty, or missing, name attribute will always create an AnonymousQueue .
See the section called “AnonymousQueue” to understand why AnonymousQueue is preferred over broker-generated queue names, as well as
how to control the format of the name.
Declarative queues must have fixed names because they might be referenced elsewhere in the context, for example, in a
listener:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
See the section called “Automatic Declaration of Exchanges, Queues and Bindings”.
The RabbitMQ implementation of this interface is RabbitAdmin which when configured using Spring XML would look like this:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
When the CachingConnectionFactory cache mode is CHANNEL (the default), the RabbitAdmin implementation does automatic lazy declaration of Queues , Exchanges and Bindings declared in the same ApplicationContext .
These components will be declared as s0on as a Connection is opened to the broker.
There are some namespace features that make this very convenient, e.g.
in the Stocks sample application we have:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
In the example above we are using anonymous Queues (actually internally just Queues with names generated by the framework, not by the broker) and refer to them by ID.
We can also declare Queues with explicit names, which also serve as identifiers for their bean definitions in the context.
<rabbit:queue name="stocks.trade.queue"/>
You can provide both an id and a name attribute.
This allows you to refer to the queue (for example in a binding) by an id that is independent of the queue name.
It also allows standard Spring features such as property placeholders, and SpEL expressions for the queue name; these features are not available when using the name as the bean identifier.
Queues can be configured with additional arguments, for example, x-message-ttl or x-ha-policy.
Using the namespace support, they are provided in the form of a Map of argument name/argument value pairs, using the <rabbit:queue-arguments> element.
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
By default, the arguments are assumed to be strings.
For arguments of other types, the type needs to be provided.
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
When providing arguments of mixed types, the type is provided for each entry element:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
With Spring Framework 3.2 and later, this can be declared a little more succinctly:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
| Important |
---|
The RabbitMQ broker will not allow declaration of a queue with mismatched arguments.
For example, if a queue already exists with no time to live argument, and you attempt to declare it with, say, key="x-message-ttl" value="100" , an exception will be thrown.
By default, the RabbitAdmin will immediately stop processing all declarations when any exception occurs; this could cause downstream issues - such as a listener container failing to initialize because another queue (defined after the one in error) is not declared.
This behavior can be modified by setting the ignore-declaration-exceptions attribute to true on the RabbitAdmin .
This option instructs the RabbitAdmin to log the exception, and continue declaring other elements.
When configuring the RabbitAdmin using java, this property is ignoreDeclarationExceptions .
This is a global setting which applies to all elements, queues, exchanges and bindings have a similar property which
applies to just those elements.
Prior to version 1.6, this property only took effect if an IOException occurred on the channel - such as when there
is a mismatch between current and desired properties.
Now, this property takes effect on any exception, including TimeoutException etc.
In addition, any declaration exceptions result in the publishing of a DeclarationExceptionEvent , which is an
ApplicationEvent that can be consumed by any ApplicationListener in the context.
The event contains a reference to the admin, the element that was being declared, and the Throwable .
Starting with version 1.3 the HeadersExchange can be configured to match on multiple headers; you can also specify whether any or all headers must match:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
Starting with version 1.6 Exchanges can be configured with an internal flag (defaults to false ) and such an
Exchange will be properly configured on the Broker via a RabbitAdmin (if one is present in the application context).
If the internal flag is true for an exchange, RabbitMQ will not allow clients to use the exchange.
This is useful for a dead letter exchange, or exchange-to-exchange binding, where you don’t wish the exchange to be used
directly by publishers.
To see how to use Java to configure the AMQP infrastructure, look at the Stock sample application,
where there is the @Configuration class AbstractStockRabbitConfiguration which in turn has
RabbitClientConfiguration and RabbitServerConfiguration subclasses.
The code for AbstractStockRabbitConfiguration is shown below
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
@Bean
public MessageConverter jsonMessageConverter() {
return new JsonMessageConverter();
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
In the Stock application, the server is configured using the following @Configuration class:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
This is the end of the whole inheritance chain of @Configuration classes.
The end result is the the TopicExchange and Queue will be declared to the broker upon application startup.
There is no binding of the TopicExchange to a queue in the server configuration, as that is done in the client application.
The stock request queue however is automatically bound to the AMQP default exchange - this behavior is defined by the specification.
The client @Configuration class is a little more interesting and is shown below.
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
The client is declaring another queue via the declareQueue() method on the AmqpAdmin , and it binds that queue to the market data exchange with a routing pattern that is externalized in a properties file.
Version 1.6 introduces a convenient fluent API for configuring Queue and Exchange objects when using Java configuration:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
See the javadocs for org.springframework.amqp.core.QueueBuilder and org.springframework.amqp.core.ExchangeBuilder for more information.
Starting with version 1.5, it is now possible to declare multiple entities with one @Bean , by returing a
collection.
Only collections where the first element is a Declarable are considered, and only Declarable elements from such
collections are processed.
@Configuration
public static class Config {
@Bean
public ConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
@Bean
public List<Exchange> es() {
return Arrays.<Exchange>asList(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true)
@Bean
public List<Queue> qs() {
return Arrays.asList(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true)
@Bean
public List<Binding> bs() {
return Arrays.asList(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)
@Bean
public List<Declarable> ds() {
return Arrays.<Declarable>asList(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)
By default, all queues, exchanges, and bindings are declared by all RabbitAdmin instances (that have auto-startup="true" ) in the application context.
Starting with the 1.2 release, it is possible to conditionally declare these elements.
This is particularly useful when an application connects to multiple brokers and needs to specify with which broker(s) a particular element should be declared.
The classes representing these elements implement Declarable which has two methods: shouldDeclare() and getDeclaringAdmins() .
The RabbitAdmin uses these methods to determine whether a particular instance should actually process the declarations on its Connection .
The properties are available as attributes in the namespace, as shown in the following examples.
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:queue id="declaredByBothAdminsImplicitly" />
<rabbit:queue id="declaredByBothAdmins" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAny" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
The auto-declare attribute is true by default and if the declared-by is not supplied (or is empty) then all RabbitAdmin s will declare the object (as long as the admin’s auto-startup attribute is true; the default).
Similarly, you can use Java-based @Configuration to achieve the same effect.
In this example, the components will be declared by admin1 but not admin2 :
@Bean
public RabbitAdmin admin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
@Bean
public RabbitAdmin admin2() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin());
return queue;
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin());
return exchange;
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin());
return binding;
In general, when needing a uniquely-named, exclusive, auto-delete queue, it is recommended that the AnonymousQueue is
used instead of broker-defined queue names (using "" as a Queue name will cause the broker to generate the queue
name).
This is because:
The queues are actually declared when the connection to the broker is established; this is long after the beans are
created and wired together; beans using the queue need to know its name.
In fact, the broker might not even be running when the app is started.
If the connection to the broker is lost for some reason, the admin will re-declare the AnonymousQueue with the same
name.
If we used broker-declared queues, the queue name would change.
Starting with version 1.5.3, you can control the format of the queue name used by AnonymousQueue s.
By default, the queue name is the String representation of a UUID ; for example:
07afcfe9-fe77-4983-8645-0061ec61a47a .
You can now provide an AnonymousQueue.NamingStrategy implementation in a constructor argument:
@Bean
public Queue anon1() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy());
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("foo-"));
The first will generate a queue name prefixed by spring.gen- followed by a base64 representation of the UUID , for
example: spring.gen-MRBv9sqISkuCiPfOYfpo4g .
The second will generate a queue name prefixed by foo- followed by a base64 representation of the UUID .
The base64 encoding uses the "URL and Filename Safe Alphabet" from RFC 4648; trailing padding characters (= ) are
removed.
You can provide your own naming strategy, whereby you can include other information (e.g. application, client host) in
the queue name.
Starting with version 1.6, the naming strategy can be specified when using XML configuration;
the naming-strategy attribute is present on the <rabbit:queue> element
for a bean reference that implements AnonymousQueue.NamingStrategy .
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="springNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
The first creates names with a String representation of a UUID.
The second creates names like spring.gen-MRBv9sqISkuCiPfOYfpo4g .
The third creates names like custom.gen-MRBv9sqISkuCiPfOYfpo4g .
Of course, you can provide your own naming strategy bean.
Version 1.6 introduces support for the
Delayed Message Exchange Plugin
The plugin is currently marked as experimental but has been available for over a year (at the time of writing).
If changes to the plugin make it necessary, we will add support for such changes as soon as practical.
For that reason, this support in Spring AMQP should be considered experimental, too.
This functionality was tested with RabbitMQ 3.6.0 and version 0.0.1 of the plugin.
To use a RabbitAdmin to declare an exchange as delayed, simply set the delayed property on the exchange bean to
true.
The RabbitAdmin will use the exchange type (Direct , Fanout etc) to set the x-delayed-type argument and
declare the exchange with type x-delayed-message .
The delayed property (default false ) is also available when configuring exchange beans using XML.
<rabbit:topic-exchange name="topic" delayed="true" />
To send a delayed message, it’s simply a matter of setting the x-delay header, via the MessageProperties :
MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(15000);
return message;
To check if a message was delayed, use the getReceivedDelay() method on the MessageProperties .
It is a separate property to avoid unintended propagation to an output message generated from an input message.
|
|
|
|
|
|
|
|
|
|