发布时间:2021-04-28作者:laosun阅读(3215)
JavaDemo案例演示RocketMQ 宕机故障下的高可用
今天演示RocketMQ 宕机故障下的高可用
使用demo案例之前的文章已经上传至gitee,自己去clone一份。地址:https://gitee.com/rdweiif/rocketmq-spring-boot-starter-demo
RocketMQ的环境是双主双从模式,具体请看文章:RocketMQ 配置双主模式 和 双主双从模式
我们先演示双主双从环境下到底能否实现高可用。
我们先检查下环境:
双主双从的配置这里就不展示了,请移步查看文章:RocketMQ 配置双主模式 和 双主双从模式
从图中得知,双主双从模式是正常的,我们打开RocketMQ-SpringBoot项目。首先增加namesrv地址,因为之前的测试都是单台单主模式下进行的测试。
我们修改完配置后,模拟创建生产者进行消息发送,如下所示「代码片段」:
/** * 集群测试(双主双从) * 增加两个namesrv地址 * 需要修改配置文件:rocketmq.name-server=10.211.55.11:9876;10.211.55.12:9876 * * @return java.lang.Object * @throws * @Date 2021/4/27 8:35 下午 */ @RequestMapping("/cluster") public Object cluster() { SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TopicCluster, "集群消息"); log.info("《集群生产者》发送结果:{}", sendResult); return sendResult; }
再创建一个消费者进行消费。
package com.sunjs.rocketmq.consumer.cluster; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import com.sunjs.rocketmq.common.MqConfig; import com.sunjs.rocketmq.consumer._BaseConsumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 测试消费端接收String字符串 * * @Date 2021/4/25 2:48 下午 */ @Slf4j @Service @RocketMQMessageListener( topic = MqConfig.Topic.TopicCluster, //topic consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TopicCluster //分组规则,Group-"topic命名" ) public class ClusterConsumer extends _BaseConsumer<String> implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { log.info("《集群消费者》接收数据:{}", message); if (!isVerify(message, String.class)) { return; } // 执行MQ 消息公共处理 todo(); } /** * 具体业务处理 * * @Author: sun * @Date: 2021/4/25 3:29 下午 */ @Override protected void handle() { // TOOD 业务处理 } }
都是很普通的生产者和消费者。主要是打印生产和消费的详细日志,我们来分析broker切换。
现在我们启动SpringBoot项目,请求URL进行测试,此时我的控制台打印情况如下:
2021-04-28 20:45:21.060 INFO 79324 --- [nio-8080-exec-1] c.s.r.p.controller.ProducerController : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F00000135DC18B4AAC28F7FDAE20000, offsetMsgId=0AD3370B00002B67000000000016FDE8, messageQueue=MessageQueue [topic=TopicCluster, brokerName=broker-a, queueId=3], queueOffset=0] 2021-04-28 20:45:56.013 INFO 79324 --- [MessageThread_1] c.s.r.consumer.cluster.ClusterConsumer : 《集群消费者》接收数据:MessageExt [brokerName=broker-a, queueId=3, storeSize=306, queueOffset=0, sysFlag=0, bornTimestamp=1619613921038, bornHost=/10.211.55.2:51921, storeTimestamp=1619613921055, storeHost=/10.211.55.11:11111, msgId=0AD3370B00002B67000000000016FDE8, commitLogOffset=1506792, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619613956013, id=75b500d8-e90e-818e-011f-ed9b1129c215, UNIQ_KEY=7F00000135DC18B4AAC28F7FDAE20000, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619613920953}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]
我们搭建的是双主双从,有两主,也就是说现在这里却只有一台11机器上的broker-a,这是为什么呢?
其实我们可以分析一下:
因为路由信息在broker上生成,当发送者发送消息的个数小于某个阀值时,RocketMQ根据轮询规则,消息会落在同一个broker上,该broker上生成路由规则上报给NameServer后,再同步给发送者,这样在发送者的内存中就只有一个broker的队列信息了,NameServer也就只有一个broker的路由信息。这里可能大部分人不太明白,没关系,后期我们会详细介绍。
言归正传:
我们通过控制台看到消息打到了11机器上,现在我们将11机器停机,或者停止掉broker。
# 该命令将会停止11机器上的所有broker > sh bin/mqshutdown broker
停止后,此时我们看到控制台输出一堆的日志信息,如下所示:
2021-04-28 20:47:33.822 INFO 79324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true 2021-04-28 20:47:33.929 INFO 79324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true 2021-04-28 20:47:34.168 INFO 79324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true 2021-04-28 20:47:34.275 INFO 79324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
这时候我们再生产一条消息看看会发生什么现象
2021-04-28 20:49:44.259 ERROR 79324 --- [nio-8080-exec-7] o.a.r.spring.core.RocketMQTemplate : syncSend failed. destination:TopicCluster, message:GenericMessage [payload=RocketMQ双主双从测试, headers={id=cf63a0b2-6a5d-d7f4-5224-742ca1ea5707, timestamp=1619614184256}] 2021-04-28 20:49:44.261 ERROR 79324 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: No route info of this topic: TopicCluster See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicCluster See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicCluster See http://rocketmq.apache.org/docs/faq/ for further details. at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:685) ~[rocketmq-client-4.8.0.jar:4.8.0] at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0] at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0] at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0] at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
控制台抛出了一大堆的异常,找不到主体TopicCluster。
总结:RocketMQ本身并不支持高可用,不能自动控制节点切换,一旦出了问题,需要人为介入。那么怎么人为介入呢,就是为12机器上的 broker 手动创建主题Topic。
我们可以使用rocketmq-console来测试添加一下,界面上给broker-b新增TopicCluster主题(mqadmin指令也可以操作)。
创建完成后,我们再生产一条信息进行测试一下,结果如下所示,消息正常生产和消息了:
2021-04-28 20:56:02.738 INFO 79324 --- [nio-8080-exec-3] c.s.r.p.controller.ProducerController : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F00000135DC18B4AAC28F89A5AE0176, offsetMsgId=0AD3370C000056CE0000000000035237, messageQueue=MessageQueue [topic=TopicCluster, brokerName=broker-b, queueId=6], queueOffset=0] 2021-04-28 20:56:02.741 INFO 79324 --- [essageThread_13] c.s.r.consumer.cluster.ClusterConsumer : 《集群消费者》接收数据:MessageExt [brokerName=broker-b, queueId=6, storeSize=306, queueOffset=0, sysFlag=0, bornTimestamp=1619614562734, bornHost=/10.211.55.2:52513, storeTimestamp=1619614562733, storeHost=/10.211.55.12:22222, msgId=0AD3370C000056CE0000000000035237, commitLogOffset=217655, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619614562741, id=3773aa8a-3ec7-7790-d241-a3e42f6e6ef7, UNIQ_KEY=7F00000135DC18B4AAC28F89A5AE0176, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619614562734}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]
既然RocketMQ本身不支持高可用,那使用什么来支持真正意义上的高可用呢,比如:master服务宕机,slave服务转换成master。
我们可以使用DLedger,DLedger 这里就不做介绍了,后期会单独介绍这个。今天我们主要是演示RocketMQ容灾高可用。
那么DLedger如何配置呢,如果RocketMQ安装版本是按照我之前的文章来操作的,4.8版本是支持的,并且下载的是编译好的binary zip包。那么官方也给出了配置案例
我们进入rocketmq conf目录下观察一下。/usr/local/rocketmq-4.8.0/conf/
官方附带的dledger配置案例,有三个配置文件,分别为:broker-n0、broker-n1、broker-n2。
可以打开这三个文件挨个看一下,对比一下,这里就不详解了,直接上手测试吧。
现在我们无需修改任何东西,现在要做的就是停止掉11机器和12机器上的所有broker和namesrv服务。
11机器:
# 先启动11机器上的namesrv服务 > nohup sh bin/mqnamesrv & # 启动dledger服务 > nohup sh bin/dledger/fast-try.sh start &
都启动好后,我们打开rocketmq-console控制台看一下集群菜单。
我们可以看到启动后,有一主两从。生产环境搭建也建议使用3台机器。 现在我们看下那三个配置文件,看看哪个配置文件监听的端口是30931这个端口,配置项:listenPort。
我这里演示的环境经过查询broker-n2.conf这个文件监听的端口是30931,那我现在kill掉这个服务。
# 查看broker-n2的线程,然后挨个kill掉 > ps -ef|grep broker-n2 > kill -9 21736 21755 21767
停止后,然后我们再看一下rocketma-console控制台
我们发现30911从slave自动转换到了master,从而实现了真正意义上的高可用。
现在我们使用java demo 案例测试一下。使用 sh bin/mqshutdown broker 先停止掉所有的broker,再重启一次,让三台broker都启动起来。
我们先修改java demo的配置文件,将namesrvAddr配置项去掉12这台机器,因为我们现在使用的是一台机器启动了三个broker。
发送消息后控制台输出如下:
2021-04-28 21:33:53.720 INFO 80521 --- [nio-8080-exec-1] c.s.r.p.controller.ProducerController : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013A8918B4AAC28FAC4B690000, offsetMsgId=0AD3370B000078BF0000000000000030, messageQueue=MessageQueue [topic=TopicCluster, brokerName=RaftNode00, queueId=3], queueOffset=0] 2021-04-28 21:34:30.231 INFO 80521 --- [MessageThread_1] c.s.r.consumer.cluster.ClusterConsumer : 《集群消费者》接收数据:MessageExt [brokerName=RaftNode00, queueId=3, storeSize=303, queueOffset=0, sysFlag=0, bornTimestamp=1619616833399, bornHost=/10.211.55.2:50006, storeTimestamp=1619616833471, storeHost=/10.211.55.11:30911, msgId=0AD3370B000078BF0000000000000030, commitLogOffset=48, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619616870231, id=266f6a06-5363-e267-885b-404e4851ff8c, UNIQ_KEY=7F0000013A8918B4AAC28FAC4B690000, CLUSTER=RaftCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619616833251}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]
消息发往了30911这台master broker。现在我们使用上边的方式只停止掉这个master服务,找到线程 kill 掉。
然后30921从slave转换成了master,我们再发送消息试试看。
2021-04-28 21:37:23.031 INFO 80521 --- [nio-8080-exec-4] c.s.r.p.controller.ProducerController : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013A8918B4AAC28FAF7E42000C, offsetMsgId=0AD3370B000078C90000000000000AC8, messageQueue=MessageQueue [topic=TopicCluster, brokerName=RaftNode00, queueId=0], queueOffset=0] 2021-04-28 21:37:23.032 INFO 80521 --- [MessageThread_3] c.s.r.consumer.cluster.ClusterConsumer : 《集群消费者》接收数据:MessageExt [brokerName=RaftNode00, queueId=0, storeSize=303, queueOffset=0, sysFlag=0, bornTimestamp=1619617043010, bornHost=/10.211.55.2:50453, storeTimestamp=1619617043014, storeHost=/10.211.55.11:30921, msgId=0AD3370B000078C90000000000000AC8, commitLogOffset=2760, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619617043032, id=0bf4f7e5-b8ad-030e-6561-93aeb53dbd3a, UNIQ_KEY=7F0000013A8918B4AAC28FAF7E42000C, CLUSTER=RaftCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619617043010}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]
消息又全部打到了30921这个broker上。
我们经过一系列的测试,使用Dledger达到了高可用架构。
我这里附带一份三台机器的测试配置方式吧。
环境:三台虚拟机,ip为:110.211.55.11、10.211.55.12、10.211.55.13
在三台机器的conf目录下新增配置文件,我这里就粘贴一份,根据机器不同进行修改吧。
三台机器上的 listenPort 配置项 依次为:10001、10002、10003
三台机器上的 dLegerSelfId 配置项 依次为:n0、n1、n2
# dledger_broker.conf #集群名称 brokerClusterName = RaftCluster #broker集群名称 brokerName=RaftNode00 #监听端口 listenPort=10001 #namesrv地址列表 namesrvAddr=10.211.55.11:9876;10.211.55.12:9876;10.211.55.13:9876 #主题不存在时是否自动创建主题 autoCreateTopicEnable=true #订阅组不存在时是否自动创建订阅组 autoCreateSubscriptionGroup=true #数据存储根路径 storePathRootDir=/usr/local/rocketmq-4.8.0/store #commitlog数据存储根路径 #storePathCommitLog=/tmp/rmqstore/node02/commitlog #是否启用DLeger集群模式 enableDLegerCommitLog=true #与brokerName保持一致就好 dLegerGroup=RaftNode00 #dLeger集群下的节点配置 dLegerPeers=n0-10.211.55.11:11111;n1-10.211.55.12:22222;n2-10.211.55.13:33333 ## must be unique #当前节点在dLeger集群下的标识 dLegerSelfId=n0 #服务端处理消息发送线程池数量 sendMessageThreadPoolNums=16
然后分别启动服务
# 挨个启动namesrv服务 nohup sh bin/mqnamesrv & # 挨个启动broker服务 nohup sh bin/mqbroker -c conf/dledger_broker.conf &
版权属于: 技术客
原文地址: https://www.sunjs.com/article/detail/2810b920ea954961ac151302a4fed1f1.html
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。