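This section uses the spring-boot-starter-artemis starter. Artemis is the newer, next-generation implementation of ActiveMQ.
To use ActiveMQ from a Spring app, an ActiveMQ broker must also be running. Messages the app sends go to a queue on that broker, and an app then retrieves them from the broker either by calling a receive method or by configuring a listener. Spring can also embed the broker in memory for us, but an embedded broker is only reachable from the current app, so other apps cannot use it.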
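As a rough sketch of the external-broker setup, the following application.properties fragment points the app at a standalone broker. The address and credentials are placeholders, and the `spring.artemis.broker-url` property assumes a reasonably recent Spring Boot version (older versions used separate host and port properties).

```properties
# Connect to an external (standalone) Artemis broker instead of an embedded one.
spring.artemis.mode=native
# Placeholder address; 61616 is the default Artemis acceptor port.
spring.artemis.broker-url=tcp://localhost:61616
# Placeholder credentials; replace with the broker's actual user.
spring.artemis.user=demo
spring.artemis.password=secret
```

Setting `spring.artemis.mode=embedded` instead (with the embedded Artemis server dependency on the classpath) should start the in-memory broker described above.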
JmsTemplate is the core class a Spring app uses both to send and to receive messages.
// Send raw messages
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
// Send messages converted from objects
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
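There are two families of methods, send() and convertAndSend():
send()
Takes the Message returned by a MessageCreator as the message to send. A Destination object (or a destination name) can be passed to specify where the message goes; if no destination is given, the message goes to the default destination, which can be set in application.properties.
convertAndSend()
Takes the object to send directly and converts it into a Message internally before sending. Because the conversion happens inside the method, there is no MessageCreator in which to customize the Message. To set custom headers or properties, pass a MessagePostProcessor (which takes a Message and returns a Message) to adjust the outgoing Message before it is sent.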
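The default destination mentioned above is configured through Spring Boot's JMS template properties. A minimal fragment, assuming the queue name `demo.order.queue` that appears in the listener example in these notes:

```properties
# Destination used by send(...), convertAndSend(...) and receive() when none is passed explicitly.
spring.jms.template.default-destination=demo.order.queue
```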
receive
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
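Calling a receive method makes the Spring app actively pull a message from that destination on the broker. Besides this pull model, the app can also register a listener on the destination; when a message arrives on the queue, it is delivered to the listener automatically.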
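The difference between pulling with receive and having messages pushed to a listener can be sketched with a toy in-memory queue in plain Java. Nothing here is JMS or Spring API: `PullVsPushSketch`, `send`, `receive`, and `listen` are hypothetical names, and a `BlockingQueue` stands in for the broker's queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy stand-in for a broker queue; illustrative only, not JMS code.
class PullVsPushSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String message) {
        queue.add(message);
    }

    // Pull model: the caller asks for a message and waits up to timeoutMillis.
    // Returns null if nothing arrived in time.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Push model: a background thread waits on the queue and hands each
    // message to the handler as soon as it arrives.
    Thread listen(Consumer<String> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop listening
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        PullVsPushSketch broker = new PullVsPushSketch();

        broker.send("order-1");
        System.out.println("pulled: " + broker.receive(100));

        Thread listener = broker.listen(m -> System.out.println("pushed: " + m));
        broker.send("order-2");
        Thread.sleep(200); // give the listener thread a moment before exiting
        listener.interrupt();
    }
}
```

In the pull case the application code decides when to ask for a message; in the push case the application only supplies a handler and a background thread decides when it runs, which is roughly the role a listener container plays for an annotated listener method.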
listener
package demo.kitchen.messaging.jms.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import demo.SwordOrder;
import demo.kitchen.KitchenUI;

// Only active when the "jms-listener" profile is enabled.
@Profile("jms-listener")
@Component
public class OrderListener {

    private final KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }

    // Invoked for each message that arrives on demo.order.queue.
    @JmsListener(destination = "demo.order.queue")
    public void receiveOrder(SwordOrder order) {
        ui.displayOrder(order);
    }
}
The example above listens on the demo.order.queue destination and handles each message that arrives there.
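This part implements messaging with RabbitMQ (RabbitTemplate) instead. Conceptually it is very close to ActiveMQ (JmsTemplate); the main difference is an extra exchange layer that routes messages to different queues. With JmsTemplate you specify only a Destination, whereas with RabbitTemplate you specify an exchange and a routing key; if either is omitted, a default value is used.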
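To make the exchange layer concrete, here is a toy in-memory model of a direct exchange in plain Java. It is not RabbitMQ or Spring AMQP code: the class `DirectExchangeSketch` and its `bind` and `publish` methods are invented for illustration, and it assumes direct-exchange semantics, where a message goes to every queue whose binding key exactly matches the routing key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; illustrative only.
class DirectExchangeSketch {

    // Binding key -> queues bound to the exchange under that key.
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Bind a queue to this exchange under the given key.
    void bind(String bindingKey, Queue<String> queue) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copy the message into every queue whose binding key equals the routing key.
    // Returns how many queues received it; 0 means the message was unroutable.
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, List.of());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> kitchen = new ArrayDeque<>();
        Queue<String> audit = new ArrayDeque<>();
        exchange.bind("demo.order", kitchen);
        exchange.bind("demo.order", audit);

        System.out.println(exchange.publish("demo.order", "sword order #1")); // 2
        System.out.println(exchange.publish("demo.refund", "no binding"));    // 0
        System.out.println(kitchen.peek()); // sword order #1
    }
}
```

With a JMS destination the sender names the queue itself; in this model the sender only names a routing key, and the bindings decide which queues receive a copy.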
Add the following dependency to use RabbitMQ in a Spring app:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
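Once the starter is on the classpath, the connection and the template defaults can be set in application.properties. The fragment below is a sketch under assumptions: the connection values are the usual local-development placeholders, and the exchange name `demo.orders` is hypothetical.

```properties
# Placeholder connection settings for a local RabbitMQ broker (5672 is the default AMQP port).
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Defaults RabbitTemplate falls back to when a send call omits the exchange or routing key.
# "demo.orders" is a made-up exchange name for illustration.
spring.rabbitmq.template.exchange=demo.orders
spring.rabbitmq.template.routing-key=demo.order
```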
The send methods of RabbitTemplate are as follows:
// Send raw messages
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
// Send messages converted from objects
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
Example code for sending:
package demo.messaging;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import demo.SwordOrder;

@Service
public class RabbitOrderMessagingService implements OrderMessagingService {

    private final RabbitTemplate rabbit;

    @Autowired
    public RabbitOrderMessagingService(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }

    public void sendOrder(SwordOrder order) {
        // Convert the order into an AMQP Message with the template's configured converter.
        MessageConverter converter = rabbit.getMessageConverter();
        MessageProperties props = new MessageProperties();
        Message message = converter.toMessage(order, props);
        // No exchange is given, so the template's default exchange is used with routing key "demo.order".
        rabbit.send("demo.order", message);
    }
}
The receive methods are as follows:
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
Example code for receiving:
package demo.kitchen.messaging.rabbit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import demo.SwordOrder;

@Component
public class RabbitOrderReceiver {

    private final RabbitTemplate rabbit;
    private final MessageConverter converter;

    @Autowired
    public RabbitOrderReceiver(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
        this.converter = rabbit.getMessageConverter();
    }

    public SwordOrder receiveOrder() {
        // Pull one message from the "demo.order" queue; null means none was available.
        Message message = rabbit.receive("demo.order");
        return message != null
                ? (SwordOrder) converter.fromMessage(message)
                : null;
    }
}
A listener can of course be configured as well:
package demo.kitchen.messaging.rabbit.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import demo.SwordOrder;
import demo.kitchen.KitchenUI;

@Component
public class OrderListener {

    private final KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }

    // Invoked for each message that arrives on the demo.order.queue queue.
    @RabbitListener(queues = "demo.order.queue")
    public void receiveOrder(SwordOrder order) {
        ui.displayOrder(order);
    }
}