RocketMQ 顺序消息、延时消息、批量消息、过滤消息和事物消息之发送和消费案例

1、顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。如图:

而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费,如图:

但是如果控制发送的消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则RocketMQ就可以严格保证FIFO特性,即保证消息的顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

顺序消息生产
package top.easyblog.ordermessage;


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import top.easyblog.OrderStep;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * 顺序生产消息
 */
public class OrderProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.223.128:9876;192.168.223.129:9876");
        producer.start();
        // 订单列表
        List<OrderStep> orderList = OrderStep.buildOrders();

        for(int i=0;i<10;i++){
            String body=orderList.get(i).toString();
            final Message msg = new Message("User-Order", "order", body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息,这里需要根据订单id选择发送的queue,让同一个订单的消息进入到同一个queue
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                    //根据订单id选择发送queue
                    Long id = (Long) arg;  //根据订单id选择发送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        //关闭Producer
        producer.shutdown();
    }

}
顺序消息消费
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {

   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new 
           DefaultMQPushConsumer("please_rename_unique_group_name_3");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       /**
        * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        * 如果非第一次启动,那么按照上次消费的位置继续消费
        */
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("TopicTest", "TagA || TagC || TagD");

       consumer.registerMessageListener(new MessageListenerOrderly() {

           Random random = new Random();

           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }

               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });

       consumer.start();

       System.out.println("Consumer Started.");
   }
}

执行结果

MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。

如果启动2个消费者呢?那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。

如果启动3个消费者呢?那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:

2、延时消息

RocketMQ提供一种延时消息的解决方案,就是在特定的时间到了,消息才会被投递出去供consumer消费。比如电商里,提交了一个订单就可以发送一个延时消息,30分钟后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

总体来是简单的场景是满足了,但是需要注意的是延时的时间是需要按照默认配置的延时级别去配置的,而不是随意设置消息的延时时间。现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

默认的延迟级别

这个配置下标从1开始 比如级别2是延时5秒、级别5是延时1分钟。默认配置在不满足需求的情况下,可以在broker配置文件加入messageDelayLevel参数覆盖默认的延时级别配置。具体的细节可以参考org/apache/rocketmq/store/config/MessageStoreConfig.java

发送延时消息

和普通的消息不同之处在于Producer在发送消息的时候 需要设置message.setDelayTimeLevel();延迟级别方法。其他参数和消费端的写法并与不同之处。

package top.easyblog.delay;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import top.easyblog.MQConfig;

/**
 * 发送延时消息
 */
public class DelayProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //从配置文件重读取nameserver的ip地址并设置给producer
        producer.setNamesrvAddr(MQConfig.getNameServerAddr());
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("DelayTopic", "tag1", ("hello rocketmq" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 设置延时等级4,这个消息将在30s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
            //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            msg.setDelayTimeLevel(4);
            //发送消息
            SendResult sendRes = producer.send(msg);
            System.out.println(String.format("%d条消息发送结果:%s", i+1,sendRes));
        }
        producer.shutdown();
    }

}
启动消息消费者
package top.easyblog.delay;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import top.easyblog.MQConfig;

import java.util.List;

/**
 * 消费延时消息
 */
public class DelayCustomer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //从配置文件重读取nameserver的ip地址并设置给consumer
        consumer.setNamesrvAddr(MQConfig.getNameServerAddr());
        consumer.subscribe("DelayTopic","tag1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt message:msgs){
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }

}

执行结果:

从上图执行结果可以看到,当我们把压延时设置为4级,即延时30s时,消费者确实是只会在30s后消费到这些消息

3、批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

发送批量消息

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

package top.easyblog.batch;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import top.easyblog.MQConfig;

import java.util.ArrayList;
import java.util.List;

/**
 * 批量发送消息
 */
public class BatchProducer {

    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr(MQConfig.getNameServerAddr());

        final String topic="batch-topic";
        List<Message> messages= new ArrayList<>();
        //构造批量消息
        for(int i=0;i<10;i++){
            messages.add(new Message(topic,"tagA","OrderID00"+i++,("Hello RockerMQ"+i).getBytes()));
        }
        try {
            producer.start();
            //一次性发送批量消息,但是大小不要超过4MB,如果超过4MB最好将消息分批发送
            SendResult sendResult = producer.send(messages);
            System.out.println("消息发送结果:"+sendResult);
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

}

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   //限制每次发送的大小 
   private final int SIZE_LIMIT = 1024 * 10;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
   	@Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}

发送大于4MB的批量消息

package top.easyblog.batch;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import top.easyblog.MQConfig;

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * 分批批量发送消息
 */
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr(MQConfig.getNameServerAddr());

        final String topic = "batch-topic";
        List<Message> messages = new ArrayList<>();
        //一个64MB的文本文件,里面全部是一些UUID,这些UUID以#分隔
        File testFile = new File("D://file.txt");
        FileInputStream fis = new FileInputStream(testFile);
        byte[] bytes = new byte[1024 * 1024];
        StringBuilder sb = new StringBuilder();
        while (fis.read(bytes) != -1) {
            sb.append(new String(bytes));
        }
        String[] uuids = sb.toString().split("#");
        //构造批量消息
        for (int i = 0; i < uuids.length; i++) {
            messages.add(new Message(topic, "tagA",  uuids[i].getBytes()));
        }
        producer.start();
        //切分消息
        ListSplitter splitter = new ListSplitter(messages);
        //统计发送次数
        int i = 0;
        while (splitter.hasNext()) {
            List<Message> messageSegment = splitter.next();
            //一次性发送批量消息,但是大小不要超过4MB,如果超过4MB最好将消息分批发送
            SendResult sendResult = producer.send(messageSegment);
            System.out.println(String.format("第%d次消息发送结果:%s", i++, sendResult));
        }
        producer.shutdown();
    }
}

执行结果:

4、过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

生产者

package top.easyblog.filter;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import top.easyblog.MQConfig;

import java.util.UUID;

/**
 * 生产同一Topic不同TAG的消息
 */
public class FilterProducer {

    public static void main(String[] args) throws Exception {
        //创建producer实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //设置nameserver的IP地址,多个nameserver以;隔开
        producer.setNamesrvAddr(MQConfig.getNameServerAddr());
        producer.setVipChannelEnabled(false);
        // 启动Producer实例
        producer.start();

        for (int i = 0; i < 100; i++) {
            /**
             * 发送100条消息,每条消息的topic都一样,但是tag不一样
             * 参数1:消息的Topic
             * 参数2:消息的Tag
             * 参数3:消息内容,是一个byte数组
             */
            Message message = new Message("filter-by-tag-msg", "tag" + i % 5, String.format(UUID.randomUUID().toString()+"-%d", i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息到broker
            SendResult sendRes = producer.send(message);
            System.out.println(String.format("%d条消息发送结果:%s", i + 1, sendRes));

        }
        //关闭Producer
        producer.shutdown();
    }

}

消费者

package top.easyblog.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import top.easyblog.MQConfig;

import java.util.List;

/**
 * 通过tag表达式消费指定tag的消息
 */
public class FilterByTagCustomer {

    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr(MQConfig.getNameServerAddr());
        //3.订阅主题Topic和Tag
        // 第二个参数subExpression可以是一个逻辑表达式,下面这个表示接受tag1,tag2,tag3的消息
        consumer.subscribe("filter-by-tag-msg", MessageSelector.byTag("tag1 || tag2 || tag3"));

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            int index=0;
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (int i=0;i<msgs.size();i++) {
                    System.out.println(String.format("第【%d】条消息:%s",index++,new String(msgs.get(i).getBody())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();

    }
}

上面代码中消费者将接收包含tag1、tag2、tag3的消息。在生产者中每个tag下都有20条消息,所以这里消费者可以消费到60条消息,执行结果如下:

虽然使用tag表达式比较简单,很容易使用,但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。

RocketMQ支持的SQL92语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)
消息生产者

发送消息时,你能通过putUserProperty来设置消息的属性

package top.easyblog.filter;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import top.easyblog.MQConfig;

import java.util.UUID;

/**
 * 
 */
public class FilterSQLProducer {

    public static void main(String[] args) throws Exception {
        //创建producer实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //设置nameserver的IP地址,多个nameserver以;隔开
        producer.setNamesrvAddr(MQConfig.getNameServerAddr());
        producer.setVipChannelEnabled(false);
        // 启动Producer实例
        producer.start();

        for (int i = 0; i < 1000; i++) {
            /**
             * 发送100条消息,每条消息的topic都一样,但是tag不一样
             * 参数1:消息的Topic
             * 参数2:消息的Tag
             * 参数3:消息内容,是一个byte数组
             */
            Message message = new Message("filter-by-sql-msg", "tag" + i % 5, String.format(UUID.randomUUID().toString()+"-%d", i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //使用SQL过滤,需要在这里设置一个属性用于筛选消息
            //这里设置一个userId,模拟用户id
            message.putUserProperty("userId",String.valueOf(i));
            //发送消息到broker
            SendResult sendRes = producer.send(message);
            System.out.println(String.format("%d条消息发送结果:%s", i + 1, sendRes));

        }
        //关闭Producer
        producer.shutdown();
    }

}
消息消费者

MessageSelector.bySql()来使用sql筛选消息

package top.easyblog.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import top.easyblog.MQConfig;

import java.util.List;

/**
 * 
 */
public class FilterBySQLCustomer {

    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr(MQConfig.getNameServerAddr());
        //3.订阅主题Topic和Tag
        //这里第二个参数使用MQ支持的sql语法,通过一个属性userId来筛选接受的消息
        consumer.subscribe("filter-by-sql-msg", MessageSelector.bySql("userId BETWEEN 100 and 200"));

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            int index=0;
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (int i=0;i<msgs.size();i++) {
                   System.out.println(String.format("userId=%s 的消息:%s",message.getProperty("userId"),new String(message.getBody())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();

    }

}

执行结果:

customer只接受userId介于100到200并且userId>900的这些消息,下图是执行结果

特别注意!

这一步如果执行发生报错:org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92,这是因为没有开启SQL92过滤功能,只需在broker的配置文件中添加enablePropertyFilter=true配置即可

5、事务消息

流程分析

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

** 1)事务消息发送及提交**

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2)事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3)事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
发送事务消息

1) 创建事务性生产者

使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请就是提交、回滚和中间状态这三种其中的一种。

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //创建消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("group6");
        producer.setNamesrvAddr(MQConfig.getNameServerAddr());
        //生产者这是监听器
        producer.setTransactionListener(transactionListener);
        //启动消息生产者
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
               Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i, UUID.randomUUID().toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
      
    }
}

2)实现事务的监听接口

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回提交、回滚和中间状态这三种事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务");
        if (StringUtils.equals("TagA", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }

    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
事物消息的使用限制
  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

参考

留言区

还能输入500个字符