SpringBoot整合RocketMQ实现消息发送和接收

三步走:导包-->写配置-->编码及测试

1、引入依赖

在SpringBoot工程中直接引入对应的starter就好了

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>

<properties>
    <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>

<dependencies>
    <!--RocketMQ-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
</dependencies>

rocketmq-spring-boot-starter 用法简介

当开发中需要快速集成RocketMQ时可以考虑使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成环境,但该框架并不完全具备RocketMQ所有的配置简化,如需批量消费消息便需要自定义一个DefaultMQPushConsumer bean去消费了。

开发中常用的rocketmq-spring-boot-starter相关类:

  • RocketMQListener接口:消费者都需实现该接口的消费方法onMessage(msg)。

  • RocketMQPushConsumerLifecycleListener接口:当@RocketMQMessageListener中的配置不足以满足我们的需求时,可以实现该接口直接更改消费者类DefaultMQPushConsumer配置

  • @RocketMQMessageListener:被该注解标注并实现了接口RocketMQListener的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些常用配置(大部分按默认即可),一般只需更改consumerGroup(消费组)与topic。RocketMQMessageListener中的属性配置是可以从配置文件或配置中心获取的

  • RocketMQLocalTransactionListener接口:可以发送事务型消息,它里面有两个方法executeLocalTransactioncheckLocalTransaction,用于实现执行本地事务和事务回查的两个方法。前者就是我们需要在事务型消息可以被消费之前需要在本地执行的事物操作,只有本地事务提交后发送到MQ中的事物消息才对Consumer可见,否则如果本地事务执行失败,那么消息队列中的消息也会回滚;如果超过一定时间还本地事务还没有提交,就会调用checkLocalTransaction执行本地事务回查。

  • @RocketMQTransactionListener:配合RocketMQLocalTransactionListener接口一起使用,一般只需要修改txProducerGroup。

2、配置文件

# application.properties
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=group1
mq:
  email:
    topic: kill-success-email-topic
    consumer:
      group: kill-success-group
    producer:
      group: kill-success-group

3、消息生产者

RocketMQProducerService.java

@Slf4j
@Service
public class RocketMQProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private ItemKillSuccessMapper itemKillSuccessMapper;
    @Value(value = "${mq.email.topic}")
    private String emailTopic;

    /**
     * 秒杀成功异步发送邮件通知消息给用户
     */
    public SendResult sendKillSuccessEmailMsg(String orderNo) {
        log.info("秒杀成功[订单ID:{}] 异步发送邮件通知消息-准备发送消息:{}", orderNo, DateTime.now().toDateTime());
        if (StringUtils.isEmpty(orderNo)) {
            return null;
        }
        DefaultMQProducer producer = rocketMQTemplate.getProducer();
        MailDto mailDto = new MailDto();
        mailDto.setContent("恭喜你抢购成功!请你点击链接验证相关信息,并请在1小时内完成订单的支付!");
        mailDto.setSubject("抢购成功");
        mailDto.setSendTo(new String[]{"huangxin981230@163.com"});
        Message emailMsg = new Message(emailTopic, orderNo, JSON.toJSON(mailDto).toString().getBytes(StandardCharsets.UTF_8));
        try {
            //发送重试3次
            int reTry = 3;
            SendResult sendResult = null;
            while (sendResult == null && reTry > 0) {
                sendResult = producer.send(emailMsg);
                reTry--;
            }
            if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
                log.info("秒杀成功[订单ID:{}] 异步发送邮件通知消息-消息发送成功:{}", orderNo, DateTime.now().toDateTime());
            } else {
                //3次都失败了抛出异常
                throw new BusinessException(AppResponseCode.MQ_SEND_MESSAGE_FAIL);
            }
            return sendResult;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("秒杀成功[订单ID:{}] 异步发送邮件通知消息-发生异常,消息为:{}", orderNo, e.getMessage());
            return null;
        }

    }

    /**
     * 秒杀成功后生成抢购订单-发送信息入消息队列,等待着一定时间失效超时未支付的订单
     * @return
     */
    public SendResult sendKillSuccessOrderExpireMsg(String orderNo){
       return null;
    }

}

执行上边sendKillSuccessEmailMsg()方法后,消息工程北发送到消息队列中并等待消费者来消费,如下图所示:

接下来我们来编写消费者端来消费指定的消息。在非SpringBoot项目中一般我们编写消费者都是在获取consumer实例后,配置namesrvAddr、订阅有关topic后就需要将consumer注册到一个Listener上,当有消息了就处理。在SpringBoot中做法类似,不过在SpringBoot中组专门提供了一个注解RocketMQMessageListener用它来指定消费者来监听指定topic类型的消息,并且我们还需要实现RocketMQListener<T>接口的onMessage()方法,在此方法中接收并处理消息。下面我写一段代码示范一下,仅供参考!

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.group}", topic = "${mq.email.topic}", selectorType = SelectorType.TAG, selectorExpression = "*")
public class EmailMQListener implements RocketMQListener<MessageExt> {

    @Autowired
    private MailService mailService;


    /**
     * 在这里处理消息,通过MQ异步发送消息给用户
     *
     * @param msg
     */
    @Override
    public void onMessage(MessageExt msg) {
        MailDto mailDto = JSON.parseObject(new String(msg.getBody()), MailDto.class);
        log.info("接收到异步发送邮件的消息:{},开始处理", mailDto.toString());
        mailService.sendSimpleEmail(mailDto);
        log.info("异步发送邮件成功!");
    }
}

执行结果如下图所示: 后台日志打印:

收到邮件:

完结撒花!!!

留言区

还能输入500个字符