添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

這邊會使用spring-boot-starter-artemis套件來實作,artemis是activemq新的實作版本。

在我們的Spring App上使用activemq時,還需要另外啟動activemq broker,我們app傳出去的訊息其實是會傳到這個broker上的mq,然後app可能透過receive的方法或者listener的設定去拿到broker mq上的訊息。這個broker其實也可以是Spring幫我們embeded在memory中的,但是這樣就變成只支援在當前的app上了,無法讓其他app去使用。

JmsTemplate會是我們從Spring App不管傳訊息或收訊息會用到的核心類別。

// Send raw messages
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
// Send messages converted from objects
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message,MessagePostProcessor postProcessor) throws JmsException;

有send()和convertAndSend()兩類方法:

  • send()
  • 接受MessageCreator所return的Message物件當作要傳出去的訊息,也可放入Destination物件指定要傳到哪裡,如果不放Destination就代表要傳到default的destination(可在application.properties設定)

  • convertAndSend()
  • 直接接受要傳出的物件,方法內會幫我們轉成Message物件傳出去。不過由於是由方法內部幫我們轉成Message的,若我們希望設定一些customize的參數在Message反而就不像send()方法的MessageCreator可以自定義,這時候就要再傳入MessagePostProcessor(吃入Message返回Message)來針對要傳出的Message預先處理後,再傳出去。

    receive

    Message receive() throws JmsException;
    Message receive(Destination destination) throws JmsException;
    Message receive(String destinationName) throws JmsException;
    Object receiveAndConvert() throws JmsException;
    Object receiveAndConvert(Destination destination) throws JmsException;
    Object receiveAndConvert(String destinationName) throws JmsException;
    

    當我們call receive方法時,Spring App就會主動去向broker拉該destination的訊息下來。但除了這種主動拉的方式外,其實還可以在Spring App設定Listener來主動監聽broker該destination是否有訊息在mq中了,若有就會自動拉進來。

    listener

    package demo.kitchen.messaging.jms.listener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Profile;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    import demo.SwordOrder;
    import demo.kitchen.KitchenUI;
    @Profile("jms-listener")
    @Component
    public class OrderListener {
      private KitchenUI ui;
      @Autowired
      public OrderListener(KitchenUI ui) {
        this.ui = ui;
      @JmsListener(destination = "demo.order.queue")
      public void receiveOrder(SwordOrder order) {
        ui.displayOrder(order);
    

    以上範例代表會主動監聽demo.order.queue這個destination有沒有訊息。

    這段嘗試使用RabbitMQ(RabbitTemplate)來實作messaging的功能,說實在跟ActiveMQ(JmsTemplate)概念根本完全一樣,只是他多了exchange的一層來去分派訊息給不同的mq,在JmsTemplate中只會指定Destination,而在RabbitTemplate要指定exchange和routing key,若不指定就會使用預設的值。

    引入以下dependency即可在Spring App使用RabbitMQ:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    參考以下RabbitTemplate的send方法:

    // Send raw messages
    void send(Message message) throws AmqpException;
    void send(String routingKey, Message message) throws AmqpException;
    void send(String exchange, String routingKey, Message message) throws AmqpException;
    // Send messages converted from objects
    void convertAndSend(Object message) throws AmqpException;
    void convertAndSend(String routingKey, Object message) throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
    // Send messages converted from objects with post-processing
    void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
    void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    

    send的範例code:

    package demo.messaging;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import demo.Order;
    @Service
    public class RabbitOrderMessagingService implements OrderMessagingService {
      private RabbitTemplate rabbit;
      @Autowired
      public RabbitOrderMessagingService(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
      public void sendOrder(TacoOrder order) {
        MessageConverter converter = rabbit.getMessageConverter();
        MessageProperties props = new MessageProperties();
        Message message = converter.toMessage(order, props);
        rabbit.send("demo.order", message);
    

    以下為receive方法:

    // Receive messages
    Message receive() throws AmqpException;
    Message receive(String queueName) throws AmqpException;
    Message receive(long timeoutMillis) throws AmqpException;
    Message receive(String queueName, long timeoutMillis) throws AmqpException;
    // Receive objects converted from messages
    Object receiveAndConvert() throws AmqpException;
    Object receiveAndConvert(String queueName) throws AmqpException;
    Object receiveAndConvert(long timeoutMillis) throws AmqpException;
    Object receiveAndConvert(String queueName, long timeoutMillis)
                                                        throws AmqpException;
    // Receive type-safe objects converted from messages
    <T> T receiveAndConvert(ParameterizedTypeReference<T> type)
                                                          throws AmqpException;
    <T> T receiveAndConvert(
        String queueName, ParameterizedTypeReference<T> type)
                                                          throws AmqpException;
    <T> T receiveAndConvert(
        long timeoutMillis, ParameterizedTypeReference<T> type)
                                                          throws AmqpException;
    <T> T receiveAndConvert(String queueName, long timeoutMillis,
        ParameterizedTypeReference<T> type)
                                                          throws AmqpException;
    

    receive範例code:

    package demo.kitchen.messaging.rabbit;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    @Component
    public class RabbitOrderReceiver {
      private RabbitTemplate rabbit;
      private MessageConverter converter;
      @Autowired
      public RabbitOrderReceiver(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
        this.converter = rabbit.getMessageConverter();
      public SwordOrder receiveOrder() {
        Message message = rabbit.receive("demo.order");
        return message != null
               ? (SwordOrder) converter.fromMessage(message)
               : null;
    

    當然也可以設定listener:

    package demo.kitchen.messaging.rabbit.listener;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import demo.SwordOrder;
    import tacos.kitchen.KitchenUI;
    @Component
    public class OrderListener {
      private KitchenUI ui;
      @Autowired
      public OrderListener(KitchenUI ui) {
        this.ui = ui;
      @RabbitListener(queues = "demo.order.queue")
      public void receiveOrder(SwordOrder order) {
        ui.displayOrder(order);