欢迎您访问EasyBlog 本站旨在为大家提供IT技术相关的教程和资讯,以及常用开发工具免费下载!
  • 联系我:15709160159联系我
  • 微信公众号微信公众号
您现在的位置是: 首页  >  服务端  >  RocketMQ教程
  • SpringBoot整合RocketMQ实现消息发送和接收

    SpringBoot整合RocketMQ实现消息发送和接收

    三步走:导包-->写配置-->编码及测试1、引入依赖在SpringBoot工程中直接引入对应的starter就好了rocketmq-spring-boot-starter用法简介当开发中需要快速集成RocketMQ时可以考虑使用rocketmq-spring-boot-starter搭建RocketMQ的集成环境,但该框架并不完全具备RocketMQ所有的配置简化,如需批量消费消息便需要自定义一个DefaultMQPushConsumerbean去消费了。开发中常用的rocketmq-spring-boot-starter相关类:RocketMQListener接口:消费者都需实现该接口的消费方法onMessage(msg)。RocketMQPushConsumerLifecycleListener接口:当@RocketMQMessageListener中的配置不足以满足我们的需求时,可以实现该接口直接更改消费者类DefaultMQPushConsumer配置@RocketMQMessageListener:被该注解标注并实现了接口RocketMQListener的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些常用配置(大部分按默认即可),一般只需更改consumerGroup(消费组)与topic。RocketMQMessageListener中的属性配置是可以从配置文件或配置中心获取的RocketMQLocalTransactionListener接口:可以发送事务型消息,它里面有两个方法executeLocalTransaction和checkLocalTransaction,用于实现执行本地事务和事务回查的两个方法。前者就是我们需要在事务型消息可以被消费之前需要在本地执行的事物操作,只有本地事务提交后发送到MQ中的事物消息才对Consumer可见,否则如果本地事务执行失败,那么消息队列中的消息也会回滚;如果超过一定时间还本地事务还没有提交,就会调用checkLocalTransaction执行本地事务回查。@RocketMQTransactionListener:配合RocketMQLocalTransactionListener接口一起使用,一般只需要修改txProducerGroup。2、配置文件3、消息生产者RocketMQProducerService.java执行上边sendKillSuccessEmailMsg()方法后,消息工程北发送到消息队列中并等待消费者来消费,如下图所示:接下来我们来编写消费者端来消费指定的消息。在非SpringBoot项目中一般我们编写消费者都是在获取consumer实例后,配置namesrvAddr、订阅有关topic后就需要将consumer注册到一个Listener上,当有消息了就处理。在SpringBoot中做法类似,不过在SpringBoot中组专门提供了一个注解RocketMQMessageListener用它来指定消费者来监听指定topic类型的消息,并且我们还需要实现RocketMQListener<T>接口的onMessage()方法,在此方法中接收并处理消息。下面我写一段代码示范一下,仅供参考!执行结果如下图所示:后台日志打印:收到邮件:完结撒花!!!

    LoveIT 2020-12-11
    RocketMQ
  • RocketMQ 消息存储

    RocketMQ 消息存储

    分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。消息生成者发送消息MQ收到消息,将消息进行持久化,在存储中新增一条记录返回ACK给生产者MQpush消息给对应的消费者,然后等待消费者返回ACK如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤MQ删除消息1、存储介质关系型数据库DBApache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障文件系统目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。2、消息存储结构RocketMQ主要存储文件包含消息文件(commitLog)、消息消费队列文件(ConsumeQueue)、Hash索引文件(IndexFile)、检测点文件(checkpoint)、abort(关闭异常文件)。RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件。CommitLog:消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题的消息,保证消息存储是完全的顺序写,单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;ConsumerQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节taghashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;IndexFile:文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故Rocketmq的索引文件其底层实现为hash索引。RocketMQ不会永久存储消息文件、消息消费队列文件,而是启用文件过期机制,并在磁盘空间不足或默认在凌晨四点删除过期文件,文件默认保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。3、RocketMQ消息存储和发送的性能保证1)消息存储—顺序写保证消息存储的高效性磁盘如果使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。2)消息发送—零拷贝技术保证消息发送的高效性Linux操作系统在运行过程中分为用户态和内核态,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:1)read;读取本地文件内容;2)write;将读取的内容通过网络发送出去。这两个看似简单的操作,实际进行了4次数据复制,分别是:从磁盘复制数据到内核态内存;从内核态内存复制到用户态内存;然后从用户态内存复制到网络驱动的内核态内存;最后是从网络驱动的内核态内存复制到网卡中进行传输。通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了4、刷盘机制RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。1)同步刷盘(SYNC_FLUSH)返回成功状态时,消息已经被写入磁盘。消息写入内存pagecache后,立即通知刷盘线程,刷盘完成后,返回消息写成功的状态。同步刷盘与异步刷盘的唯一区别是异步刷盘写完pagecache直接返回,而同步刷盘需要等待刷盘完成才返回,同步刷盘流程如下:写入pagecache后,线程等待,通知刷盘线程刷盘。刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。前端等待线程吐用户返回成功。2)异步刷盘(ASYNC_FLUSH)在返回写成功状态时,消息可能只是被写入了内存的PageCache,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。3)配置同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的一个。参考【1】Engineer-Bruce_Yang.什么是用户态?什么是内核态?如何区分?.CSDN

    LoveIT 2020-12-07
    RocketMQ
  • RocketMQ 顺序消息、延时消息、批量消息、过滤消息和事物消息之发送和消费案例

    RocketMQ 顺序消息、延时消息、批量消息、过滤消息和事物消息之发送和消费案例

    1、顺序消息消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。如图:而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费,如图:但是如果控制发送的消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则RocketMQ就可以严格保证FIFO特性,即保证消息的顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。顺序消息生产顺序消息消费执行结果MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。如果启动2个消费者呢?那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。如果启动3个消费者呢?那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:2、延时消息RocketMQ提供一种延时消息的解决方案,就是在特定的时间到了,消息才会被投递出去供consumer消费。比如电商里,提交了一个订单就可以发送一个延时消息,30分钟后去检查这个订单的状态,如果还是未付款就取消订单释放库存。总体来是简单的场景是满足了,但是需要注意的是延时的时间是需要按照默认配置的延时级别去配置的,而不是随意设置消息的延时时间。现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18默认的延迟级别这个配置下标从1开始比如级别2是延时5秒、级别5是延时1分钟。默认配置在不满足需求的情况下,可以在broker配置文件加入messageDelayLevel参数覆盖默认的延时级别配置。具体的细节可以参考org/apache/rocketmq/store/config/MessageStoreConfig.java发送延时消息和普通的消息不同之处在于Producer在发送消息的时候需要设置message.setDelayTimeLevel();延迟级别方法。其他参数和消费端的写法并与不同之处。启动消息消费者执行结果:从上图执行结果可以看到,当我们把压延时设置为4级,即延时30s时,消费者确实是只会在30s后消费到这些消息3、批量消息批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。发送批量消息如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:如果消息的总长度可能大于4MB时,这时候最好把消息进行分割发送大于4MB的批量消息执行结果:4、过滤消息在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:生产者消费者上面代码中消费者将接收包含tag1、tag2、tag3的消息。在生产者中每个tag下都有20条消息,所以这里消费者可以消费到60条消息,执行结果如下:虽然使用tag表达式比较简单,很容易使用,但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。RocketMQ支持的SQL92语法RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。数值比较,比如:>,>=,<,<=,BETWEEN,=;字符比较,比如:=,<>,IN;ISNULL或者ISNOTNULL;逻辑符号AND,OR,NOT;常量支持类型为:数值,比如:123,3.1415;字符,比如:'abc',必须用单引号包裹起来;NULL,特殊的常量布尔值,TRUE或FALSE只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:消息生产者发送消息时,你能通过putUserProperty来设置消息的属性消息消费者用MessageSelector.bySql()来使用sql筛选消息执行结果:customer只接受userId介于100到200并且userId>900的这些消息,下图是执行结果特别注意!这一步如果执行发生报错:org.apache.rocketmq.client.exception.MQClientException:CODE:1DESC:ThebrokerdoesnotsupportconsumertofiltermessagebySQL92,这是因为没有开启SQL92过滤功能,只需在broker的配置文件中添加enablePropertyFilter=true配置即可5、事务消息流程分析上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。**1)事务消息发送及提交**(1)发送消息(half消息)。(2)服务端响应消息写入结果。(3)根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。(4)根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)2)事务补偿(1)对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”(2)Producer收到回查消息,检查回查消息对应的本地事务的状态(3)根据本地事务状态,重新Commit或者Rollback其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。3)事务消息状态事务消息共有三种状态,提交状态、回滚状态、中间状态:TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。TransactionStatus.Unknown:中间状态,它代表需要检查消息队列来确定状态。发送事务消息1)创建事务性生产者使用TransactionMQProducer类创建生产者,并指定唯一的ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请就是提交、回滚和中间状态这三种其中的一种。2)实现事务的监听接口当发送半消息成功时,我们使用executeLocalTransaction方法来执行本地事务。它返回提交、回滚和中间状态这三种事务状态之一。checkLocalTranscation方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。事物消息的使用限制事务消息不支持延时消息和批量消息。为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N=transactionCheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数。事务性消息可能不止一次被检查或消费。提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。参考【1】黑马程序员.RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件.bilibili【2】JaskeyLam.rocketmq怎么保证队列完全顺序消费?.知乎【3】木极子.RocketMQ错误:ThebrokerdoesnotsupportconsumertofiltermessagebySQL92.CSDN

    LoveIT 2020-12-02
    RocketMQ
  • RocketMQ Java 客户端搭建及简单消息生产和消费案例

    RocketMQ Java 客户端搭建及简单消息生产和消费案例

    RocketMQ是阿里开源的一款分布式消息中间件,主要分为以下几个部分1、生产者Producer,消费者(Consumer),NameServer,Broker2、生产者主要用来发送消息,,消费者用来接收消息3、nameServer就像一个邮局,Broker相当于快递小哥,nameServer用来管理Broker。4、发送消息有三种方式,分别是同步,异步,单向,异步发送会返回一个回调函数,同步则没有;单向发送,发送消息后不会返回发送结果5、每个生产者都会有一个Topic和tags,我把它理解为用来区分消息的标识这里我们采用2m-2s-sync模式搭建起来一个RocketMQ集群环境,详细的环境搭建工作参考上一篇教程:RokcetMQLinux2m-2s-sync集群搭建导入MQ客户端依赖1、消息发送消息发送一般步骤分析1)发送同步消息这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。执行结果:2)发送异步消息异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。与发送同步消息步骤基本上一致,只需在第五步消息发送环节,使用提供的的publicvoidsend(Messagemsg,SendCallbacksendCallback)throwsMQClientException,RemotingException,InterruptedException方法来发送异步消息。在发送时创建一个SendCallback接口的匿名内部实现类,实现消息发送成功的回调方法onSuccess(SendResultsendResult),以及消息发送失败出现异常时的onException(Throwablethrowable)方法。执行结果:3)单向发送消息单向消息一般用于不是特别关心发送结果的场景,例如发送日志。发送单向消息,只是在基本消息发送案例的基础上使用publicvoidsendOneway(Messagemsg)throwsMQClientException,RemotingException,InterruptedException方法对消息进行发送,消息发送后没有返回结果消费消息消息消费一般步骤分析消费者DefaultMQPushConsumer提供了publicvoidsetMessageModel(MessageModelmessageModel)方法来设置消费的模式。1)负载均衡模式什么是负载均衡模式?如下图生产者生产了A,,B,C,D四个消息,消费者采用负载均衡方式消费消息,含义就是多个消费者共同消费队列消息,每个消费者处理的消息不同。就如上图所示,消费者customer1消费了A,B消息,消费者customer2消费了C,D消息。负载均衡模式是RocketMQ默认的消费模式,下面这段代码是一个显示的指定为负载均衡消费模式的消费者代码执行结果:同步发送者的代码参考上面1)发送同步消息代码执行完毕,可以看到两个消费者分别消费了发送者发送的10个消息2)广播模式什么又是广播模式?如下图生产者同样生产了A,,B,C,D四个消息,消费者采用广播的方式消费消息,他的含义就是每个消费者消费的消息都是相同的,就如上图所示,消费者customer1消费了A,B,C,D消息,消费者customer2消费也了A,B,C,D消息。执行结果:代码执行完毕,可以看到两个消费者消费的内容是完全相同的参考【1】黑马程序员.RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件.bilibili【2】故事与酒~~.rocketmq简单的消息发送接收案例.CSDN【3】许仙许仙!.RocketMQ发送消息的基本案例.CSDN

    LoveIT 2020-12-01
    RocketMQ
  • RokcetMQ Linux 2m-2s-sync集群搭建

    RokcetMQ Linux 2m-2s-sync集群搭建

    本章节将示范三台RokcetMQ服务端集群搭建步骤。所需准备工作,创建两台虚拟机环境并安装好java开发工具包JDK,可以使用VM或者vagrant+virtualbox搭建centos/ubuntu环境,本案例基于宿主机Windows7系统同时使用VM搭建的centos7环境,如果直接使用云服务器或者物理机同理。1、集群各角色介绍Producer:消息的发送者;举例:发信者Consumer:消息接收者;举例:收信者Broker:暂存和传输消息;举例:邮局NameServer:管理Broker;举例:各个邮局的管理机构Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息MessageQueue:相当于是Topic的分区;用于并行发送和接收消息这里我们要做的就是搭建一个双主双从同步的RocketMQ集群,包括两个NameServer,两个BrokerMaster,两个BorkerSalve。2、集群搭建方式集群特点NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。集群模式NameServer集群比较简单,只需要在多个物理机上开启多个NameServer既可以了。Broker集群有多种配置方式:1)单Master模式这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。2)多Master模式一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。3)多Master多Slave模式(异步)每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;缺点:Master宕机,磁盘损坏情况下会丢失少量消息。4)多Master多Slave模式(同步)每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。Master和Slave的配置文件都在conf目录下,其中2m-2s-sync就是搭建双主双从同步的配置文件在配置文件中Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分3、双主双从集群搭建总体架构消息高可用采用2m-2s(同步双写)方式集群工作流程启动NameServer,NameServer起来后监听端口9876,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。服务器环境序号IP角色架构模式1192.168.253.128nameserver、brokerserverMaster1、Slave22192.168.253.129nameserver、brokerserverMaster2、Slave1Host添加信息配置如下:配置完成后,重启网卡防火墙配置宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876、10911、11011。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:nameserver默认使用9876端口master默认使用10911端口slave默认使用11011端口执行以下命令:创建消息持久化存储路径BorkerMaster目录设置:BorkerSalve目录设置:broker配置文件特别注意:尤其是在学习阶段的小伙伴,千万不要在同一台主机上把MasterBroker的持久化路径和SalveBroker的路径设置成一样的了,否则会启动失败1)master1服务器:192.168.235.128修改配置如下:2)slave2服务器:192.168.235.128修改配置如下:3)master2服务器:192.168.235.129修改配置如下:4)slave1服务器:192.168.235.129修改配置如下:修改启动脚本文件【非必须】1)runbroker.sh需要根据内存大小进行适当的对JVM参数进行调整:2)runserver.sh服务启动1)启动NameServe集群分别在192.168.235.128和192.168.235.129启动NameServer2)启动Broker集群在192.168.25.128上启动master1和slave2master1:slave2:启动成功之后输入jps命令,如果可以看到一个NamesrvStartup进程,两个BrokerStartup进程,就说明启动成功了在192.168.235.129上启动master2和slave2master2slave1同样地,启动成功之后输入jps命令,如果可以看到一个NamesrvStartup进程,两个BrokerStartup进程,就说明启动成功了至此,一个简单的RocketMQ2m-2s集群就搭建起来了,接下来我们来部署一个在使用RocketMQ开发的过程中非常常用的服务监控平台4、RocketMQ监控平台部署Apache版的RocketMQ管理界面部署工具可以从github上下载源码,地址:https://github.com/apache/rocketmq-externals,部署流程如下:下载并编译打包注意:打包前在rocketmq-console中配置namesrv集群地址:这是一个SPringBoot项目,打包之后会得到一个可执行的jar包,我们把jar上传到任意一条Linux服务器上,使用如下命令执行jar包,让监控平台运行起来:启动成功之后我们可以访问浏览器:192.168.253.128:8080进入控制台界面了,如下图:集群状态:参考【1】sean-zou.阿里RocketMQQuickStart.CSDN【2】黑马程序员.RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件.bilibili【3】yangxinhu_coder.java.lang.RuntimeException:Lockfailed,MQalreadystarted.CSDN【4】关惜寒.rocketMQ安装部署详细解析.CSDN

    LoveIT 2020-11-30
    RocketMQ
  • RocketMQ Linux 安装配置

    RocketMQ Linux 安装配置

    Linux安装1、准备工作下载RocketMQ截止教程发布日期,RocketMQ最新版本:4.7.1下载地址可以直接点击上面下载地址下载到本地然后上传到Liunx上,或者也可以直接使用wget命令在Linux上下载环境要求Linux64位系统JDK1.8(64位)源码安装需要安装Maven3.2.x2、安装RocketMQ安装步骤本教程以二进制包方式安装解压安装包进入安装目录目录介绍进入到安装目录我们就可以看到如下文件目录bin:启动脚本,包括shell脚本和CMD脚本conf:实例配置文件,包括broker配置文件、logback配置文件等lib:依赖jar包,包括Netty、commons-lang、FastJSON等3、启动RocketMQ启动NameServer进入到rocketmqden安装目录下的bin目录,执行以下命令:启动成功之后NameServer就默认监听端口:9876启动Broker:消息中转角色,负责存储消息,转发消息有关问题:在启动Broker中的时候,如果咱们的机器内存比较小的话,RocketMQ默认的虚拟机内存较大,启动Broker会因为内存不足失败,如下图所示:这时需要我们需要编辑如下两个配置文件,修改JVM内存大小参考设置:runserver.shrunbroker.sh4、测试RocketMQ接下来,我们测试一下我们安装的RocketMQ能否正常收发信息,我们需要首先开启两个虚拟机窗口,其中一个是发消息的,一个是收消息的:在发送/接收消息之前,我们需要告诉客户端nameservers的位置。RocketMQ提供了多种方法来实现这一目标。为简单起见,我们使用环境变量NAMESRV_ADDR发送消息看到在刷刷的打印信息,就说明发送消息没有问题接收消息执行完上边的命令,接收者就开始接收刚才发送端发出的信息,如下图所示:关闭RocketMQ参考【1】sean-zou.阿里RocketMQQuickStart.CSDN【2】黑马程序员.RocketMQ系统精讲,经受历年双十一狂欢节考验的分布式消息中间件.bilibili

    LoveIT 2020-11-29
    RocketMQ
  • RocketMQ教程

    RocketMQ教程

    RocektMQ是阿里巴巴在2012年开源的一个纯java、分布式、队列模型的第三代消息中间件,不仅在传统高频交易链路有着低延迟的出色表现,在实时计算等大数据领域也有着不错的吞吐。2016年11月11号,双十一大促见证了RocketMQ低延迟存储架构的成功试水,99.996%的延迟落在了10ms以内,极个别由于GC引发的停顿在50ms以内,其高性能、低延时和高可靠的特性承载了近年来双十一17万笔/秒的交易峰值,在整个生产链路上都有着稳定和出色的表现。其在同年捐赠给Apache后正式进入孵化期。并于2017年9月RocketMQ正式从Apache社区正式毕业,成为Apache顶级项目。谁适合阅读本教程?本教程是为专业的程序开发人员,通过本教程你可以一步一步了解RocketMQ的应用。为什么要用MQ?消息队列是一种“先进先出”的数据结构其应用场景主要包含以下3个方面应用解耦系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。流量削峰应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。处于经济考量目的:业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰数据分发通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可1.2MQ的优点和缺点优点:解耦、削峰、数据分发缺点包含以下几点:系统可用性降低系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。如何保证MQ的高可用?系统复杂度提高MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?一致性问题A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?各种MQ产品的比较常见的MQ产品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ等参考【1】关惜寒.rocketMQ安装部署详细解析.CSDN【2】Apache.Rocket官网

    LoveIT 2020-11-29
    RocketMQ