RocketMQ Java 客户端搭建及简单消息生产和消费案例

RocketMQ是阿里开源的一款分布式消息中间件,主要分为以下几个部分 1、生产者Producer,消费者(Consumer),NameServer,Broker 2、生产者主要用来发送消息,,消费者用来接收消息 3、nameServer就像一个邮局,Broker相当于快递小哥,nameServer用来管理Broker。 4、发送消息有三种方式,分别是同步,异步,单向,异步发送会返回一个回调函数,同步则没有;单向发送,发送消息后不会返回发送结果 5、每个生产者都会有一个Topic和tags,我把它理解为用来区分消息的标识

这里我们采用2m-2s-sync模式搭建起来一个RocketMQ集群环境,详细的环境搭建工作参考上一篇教程:RokcetMQ Linux 2m-2s-sync集群搭建

  • 导入MQ客户端依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

1、消息发送

  • 消息发送一般步骤分析
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
1)发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

package top.easyblog.base;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * RocketMQ发送同步消息
 */
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        //创建profucer实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //设置nameserver的IP地址,多个nameserver以;隔开
        producer.setNamesrvAddr("192.168.253.128:9876;192.168.253.129:9876");
        producer.setVipChannelEnabled(false);
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10_0000; i++) {
            /**
             * 参数1:消息的Topic
             * 参数2:消息的Tag
             * 参数3:消息内容,是一个byte数组
             */
            Message message = new Message("sync-msg", "tag1", String.format("hello-rocketmq-%d",i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息到broker
            SendResult sendRes = producer.send(message);
            System.out.println(String.format("%d条消息发送结果:%s", i+1,sendRes));

        }
        //关闭Producer
        producer.shutdown();
    }

}

执行结果:

2)发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

与发送同步消息步骤基本上一致,只需在第五步消息发送环节,使用提供的的public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException方法来发送异步消息。在发送时创建一个SendCallback接口的匿名内部实现类,实现消息发送成功的回调方法onSuccess(SendResult sendResult),以及消息发送失败出现异常时的onException(Throwable throwable)方法。

package top.easyblog.base;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * RockerMQ发送异步消息
 */
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //执行nameserver ip地址和端口号
        producer.setNamesrvAddr("192.168.253.128:9876;192.168.253.129:9876");
        //启动producer
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 10_0000; i++) {
            Message msg = new Message("Async-test-msg", "tag1", String.format("hello-rocketmq-%d", i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //使用SendCallback回调异步处理结果
            final int finalI = i;
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d %s\n", finalI, sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s\n", finalI, e);
                    e.printStackTrace();
                }
            });

            //由于我的测试环境是在我电脑VM上搭建,机器性能比较差,因此这里在每次发送消息后停顿500ms,不然总是报错,
            //当然生产环境绝对不应该这么使用
            Thread.sleep(500);
        }
        //如果不在发送消息,关闭producer实例
        producer.shutdown();
    }

}

执行结果:

3)单向发送消息

单向消息一般用于不是特别关心发送结果的场景,例如发送日志。

发送单向消息,只是在基本消息发送案例的基础上使用public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException 方法对消息进行发送,消息发送后没有返回结果

public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	//创建profucer实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //设置nameserver的IP地址,多个nameserver以;隔开
        producer.setNamesrvAddr("192.168.253.128:9876;192.168.253.129:9876");
        producer.setVipChannelEnabled(false);
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 10_0000; i++) {
             /**
             * 创建消息,并指定Topic,Tag和消息体
             * 参数1:消息的Topic
             * 参数2:消息的Tag
             * 参数3:消息内容,是一个byte数组
             */
        	Message msg = new Message("TopicTest","TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
        	);
        	// 发送单向消息,没有任何返回结果
        	producer.sendOneway(msg);

    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

消费消息

  • 消息消费一般步骤分析
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer

消费者DefaultMQPushConsumer提供了public void setMessageModel(MessageModel messageModel)方法来设置消费的模式。

MessageModel.BROADCASTING   广播模式
MessageModel.CLUSTERING  负载均衡模式(默认)
1)负载均衡模式

什么是负载均衡模式?如下图

生产者生产了A,,B,C,D四个消息,消费者采用负载均衡方式消费消息,含义就是多个消费者共同消费队列消息,每个消费者处理的消息不同。就如上图所示,消费者customer1消费了A,B消息,消费者customer2消费了C,D消息。

负载均衡模式是RocketMQ默认的消费模式,下面这段代码是一个显示的指定为负载均衡消费模式的消费者代码

package top.easyblog.base;


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 负载均衡的消费者模式
 */
public class ClusteringCustomer {

    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.253.128:9876;192.168.253.129:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("sync-msg","tag1");

        //指定消费模式 : 负载均衡模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for(MessageExt msg:msgs){
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
      
    }

}

执行结果:

同步发送者的代码参考上面 1)发送同步消息 代码执行完毕,可以看到两个消费者分别消费了发送者发送的10个消息

2)广播模式

什么又是广播模式?如下图

生产者同样生产了A,,B,C,D四个消息,消费者采用广播的方式消费消息,他的含义就是每个消费者消费的消息都是相同的,就如上图所示,消费者customer1消费了A,B,C,D消息,消费者customer2消费也了A,B,C,D消息。

package top.easyblog.base;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 负载均衡的广播模式
 */
public class BroadcastingCustomer {

    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.253.128:9876;192.168.253.129:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("sync-msg","tag1");

        //指定消费模式 : 广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for(MessageExt msg:msgs){
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();

    }

}

执行结果:

代码执行完毕,可以看到两个消费者消费的内容是完全相同的

参考

留言区

还能输入500个字符