JavaDemo案例演示RocketMQ DLedger宕机故障下的高可用

发布时间:2021-04-28作者:laosun阅读(3026)

JavaDemo案例演示RocketMQ

JavaDemo案例演示RocketMQ 宕机故障下的高可用

    今天演示RocketMQ 宕机故障下的高可用

    使用demo案例之前的文章已经上传至gitee,自己去clone一份。地址:https://gitee.com/rdweiif/rocketmq-spring-boot-starter-demo 

    RocketMQ的环境是双主双从模式,具体请看文章:RocketMQ 配置双主模式 和 双主双从模式


    我们先演示双主双从环境下到底能否实现高可用。


    我们先检查下环境:


    image.png


    双主双从的配置这里就不展示了,请移步查看文章:RocketMQ 配置双主模式 和 双主双从模式 


    从图中得知,双主双从模式是正常的,我们打开RocketMQ-SpringBoot项目。首先增加namesrv地址,因为之前的测试都是单台单主模式下进行的测试。


    image.png


    我们修改完配置后,模拟创建生产者进行消息发送,如下所示「代码片段」:

    /**
     * 集群测试(双主双从)
     * 增加两个namesrv地址
     * 需要修改配置文件:rocketmq.name-server=10.211.55.11:9876;10.211.55.12:9876
     *
     * @return java.lang.Object
     * @throws
     * @Date 2021/4/27 8:35 下午
     */
    @RequestMapping("/cluster")
    public Object cluster() {
        SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TopicCluster, "集群消息");
        log.info("《集群生产者》发送结果:{}", sendResult);
        return sendResult;
    }


    再创建一个消费者进行消费。

    package com.sunjs.rocketmq.consumer.cluster;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import com.sunjs.rocketmq.common.MqConfig;
    import com.sunjs.rocketmq.consumer._BaseConsumer;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 测试消费端接收String字符串
     *
     * @Date 2021/4/25 2:48 下午
     */
    @Slf4j
    @Service
    @RocketMQMessageListener(
            topic = MqConfig.Topic.TopicCluster, //topic
            consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TopicCluster //分组规则,Group-"topic命名"
    )
    public class ClusterConsumer extends _BaseConsumer<String> implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            log.info("《集群消费者》接收数据:{}", message);
            if (!isVerify(message, String.class)) {
                return;
            }
            // 执行MQ 消息公共处理
            todo();
        }
    
        /**
         * 具体业务处理
         *
         * @Author: sun
         * @Date: 2021/4/25 3:29 下午
         */
        @Override
        protected void handle() {
            // TOOD 业务处理
        }
    }


    都是很普通的生产者和消费者。主要是打印生产和消费的详细日志,我们来分析broker切换。

    现在我们启动SpringBoot项目,请求URL进行测试,此时我的控制台打印情况如下:


    2021-04-28 20:45:21.060  INFO 79324 --- [nio-8080-exec-1] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F00000135DC18B4AAC28F7FDAE20000, offsetMsgId=0AD3370B00002B67000000000016FDE8, messageQueue=MessageQueue [topic=TopicCluster, brokerName=broker-a, queueId=3], queueOffset=0]
    2021-04-28 20:45:56.013  INFO 79324 --- [MessageThread_1] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=broker-a, queueId=3, storeSize=306, queueOffset=0, sysFlag=0, bornTimestamp=1619613921038, bornHost=/10.211.55.2:51921, storeTimestamp=1619613921055, storeHost=/10.211.55.11:11111, msgId=0AD3370B00002B67000000000016FDE8, commitLogOffset=1506792, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619613956013, id=75b500d8-e90e-818e-011f-ed9b1129c215, UNIQ_KEY=7F00000135DC18B4AAC28F7FDAE20000, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619613920953}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]


    image.png


    我们搭建的是双主双从,有两主,也就是说现在这里却只有一台11机器上的broker-a,这是为什么呢? 

    其实我们可以分析一下:

    因为路由信息在broker上生成,当发送者发送消息的个数小于某个阀值时,RocketMQ根据轮询规则,消息会落在同一个broker上,该broker上生成路由规则上报给NameServer后,再同步给发送者,这样在发送者的内存中就只有一个broker的队列信息了,NameServer也就只有一个broker的路由信息。这里可能大部分人不太明白,没关系,后期我们会详细介绍。

    言归正传:

    我们通过控制台看到消息打到了11机器上,现在我们将11机器停机,或者停止掉broker。

    # 该命令将会停止11机器上的所有broker
    > sh bin/mqshutdown broker

    停止后,此时我们看到控制台输出一堆的日志信息,如下所示:

    2021-04-28 20:47:33.822  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
    2021-04-28 20:47:33.929  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
    2021-04-28 20:47:34.168  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
    2021-04-28 20:47:34.275  INFO 79324 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true

    这时候我们再生产一条消息看看会发生什么现象

    2021-04-28 20:49:44.259 ERROR 79324 --- [nio-8080-exec-7] o.a.r.spring.core.RocketMQTemplate       : syncSend failed. destination:TopicCluster, message:GenericMessage [payload=RocketMQ双主双从测试, headers={id=cf63a0b2-6a5d-d7f4-5224-742ca1ea5707, timestamp=1619614184256}] 
    2021-04-28 20:49:44.261 ERROR 79324 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: No route info of this topic: TopicCluster
    See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicCluster
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause
    
    org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicCluster
    See http://rocketmq.apache.org/docs/faq/ for further details.
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:685) ~[rocketmq-client-4.8.0.jar:4.8.0]
    	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0]
    	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0]
    	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]
    	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]

    控制台抛出了一大堆的异常,找不到主体TopicCluster。


    总结:RocketMQ本身并不支持高可用,不能自动控制节点切换,一旦出了问题,需要人为介入。那么怎么人为介入呢,就是为12机器上的 broker 手动创建主题Topic。

    我们可以使用rocketmq-console来测试添加一下,界面上给broker-b新增TopicCluster主题(mqadmin指令也可以操作)。


    image.png


    创建完成后,我们再生产一条信息进行测试一下,结果如下所示,消息正常生产和消息了:

    2021-04-28 20:56:02.738  INFO 79324 --- [nio-8080-exec-3] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F00000135DC18B4AAC28F89A5AE0176, offsetMsgId=0AD3370C000056CE0000000000035237, messageQueue=MessageQueue [topic=TopicCluster, brokerName=broker-b, queueId=6], queueOffset=0]
    2021-04-28 20:56:02.741  INFO 79324 --- [essageThread_13] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=broker-b, queueId=6, storeSize=306, queueOffset=0, sysFlag=0, bornTimestamp=1619614562734, bornHost=/10.211.55.2:52513, storeTimestamp=1619614562733, storeHost=/10.211.55.12:22222, msgId=0AD3370C000056CE0000000000035237, commitLogOffset=217655, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619614562741, id=3773aa8a-3ec7-7790-d241-a3e42f6e6ef7, UNIQ_KEY=7F00000135DC18B4AAC28F89A5AE0176, CLUSTER=DefaultCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619614562734}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]


    既然RocketMQ本身不支持高可用,那使用什么来支持真正意义上的高可用呢,比如:master服务宕机,slave服务转换成master。


    我们可以使用DLedger,DLedger 这里就不做介绍了,后期会单独介绍这个。今天我们主要是演示RocketMQ容灾高可用。

    那么DLedger如何配置呢,如果RocketMQ安装版本是按照我之前的文章来操作的,4.8版本是支持的,并且下载的是编译好的binary zip包。那么官方也给出了配置案例

    image.png

    我们进入rocketmq conf目录下观察一下。/usr/local/rocketmq-4.8.0/conf/


    image.png

    image.png


    官方附带的dledger配置案例,有三个配置文件,分别为:broker-n0、broker-n1、broker-n2。

    可以打开这三个文件挨个看一下,对比一下,这里就不详解了,直接上手测试吧。


    现在我们无需修改任何东西,现在要做的就是停止掉11机器和12机器上的所有broker和namesrv服务。

    11机器:

    # 先启动11机器上的namesrv服务
    > nohup sh bin/mqnamesrv &
    # 启动dledger服务
    > nohup sh bin/dledger/fast-try.sh start &


    都启动好后,我们打开rocketmq-console控制台看一下集群菜单。


    image.png

    我们可以看到启动后,有一主两从。生产环境搭建也建议使用3台机器。 现在我们看下那三个配置文件,看看哪个配置文件监听的端口是30931这个端口,配置项:listenPort。

    我这里演示的环境经过查询broker-n2.conf这个文件监听的端口是30931,那我现在kill掉这个服务。

    # 查看broker-n2的线程,然后挨个kill掉
    > ps -ef|grep broker-n2
    > kill -9 21736 21755 21767

    停止后,然后我们再看一下rocketma-console控制台


    image.png


    我们发现30911从slave自动转换到了master,从而实现了真正意义上的高可用。


    现在我们使用java demo 案例测试一下。使用 sh bin/mqshutdown broker 先停止掉所有的broker,再重启一次,让三台broker都启动起来。

    image.png


    我们先修改java demo的配置文件,将namesrvAddr配置项去掉12这台机器,因为我们现在使用的是一台机器启动了三个broker。

    发送消息后控制台输出如下:

    2021-04-28 21:33:53.720  INFO 80521 --- [nio-8080-exec-1] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013A8918B4AAC28FAC4B690000, offsetMsgId=0AD3370B000078BF0000000000000030, messageQueue=MessageQueue [topic=TopicCluster, brokerName=RaftNode00, queueId=3], queueOffset=0]
    2021-04-28 21:34:30.231  INFO 80521 --- [MessageThread_1] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=RaftNode00, queueId=3, storeSize=303, queueOffset=0, sysFlag=0, bornTimestamp=1619616833399, bornHost=/10.211.55.2:50006, storeTimestamp=1619616833471, storeHost=/10.211.55.11:30911, msgId=0AD3370B000078BF0000000000000030, commitLogOffset=48, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619616870231, id=266f6a06-5363-e267-885b-404e4851ff8c, UNIQ_KEY=7F0000013A8918B4AAC28FAC4B690000, CLUSTER=RaftCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619616833251}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]


    消息发往了30911这台master broker。现在我们使用上边的方式只停止掉这个master服务,找到线程 kill 掉。


    image.png

    然后30921从slave转换成了master,我们再发送消息试试看。

    2021-04-28 21:37:23.031  INFO 80521 --- [nio-8080-exec-4] c.s.r.p.controller.ProducerController    : 《集群生产者》发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013A8918B4AAC28FAF7E42000C, offsetMsgId=0AD3370B000078C90000000000000AC8, messageQueue=MessageQueue [topic=TopicCluster, brokerName=RaftNode00, queueId=0], queueOffset=0]
    2021-04-28 21:37:23.032  INFO 80521 --- [MessageThread_3] c.s.r.consumer.cluster.ClusterConsumer   : 《集群消费者》接收数据:MessageExt [brokerName=RaftNode00, queueId=0, storeSize=303, queueOffset=0, sysFlag=0, bornTimestamp=1619617043010, bornHost=/10.211.55.2:50453, storeTimestamp=1619617043014, storeHost=/10.211.55.11:30921, msgId=0AD3370B000078C90000000000000AC8, commitLogOffset=2760, bodyCRC=1376009287, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicCluster', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1619617043032, id=0bf4f7e5-b8ad-030e-6561-93aeb53dbd3a, UNIQ_KEY=7F0000013A8918B4AAC28FAF7E42000C, CLUSTER=RaftCluster, WAIT=false, contentType=text/plain;charset=UTF-8, timestamp=1619617043010}, body=[82, 111, 99, 107, 101, 116, 77, 81, -27, -113, -116, -28, -72, -69, -27, -113, -116, -28, -69, -114, -26, -75, -117, -24, -81, -107], transactionId='null'}]

    消息又全部打到了30921这个broker上。

    我们经过一系列的测试,使用Dledger达到了高可用架构。


    我这里附带一份三台机器的测试配置方式吧。

    环境:三台虚拟机,ip为:110.211.55.11、10.211.55.12、10.211.55.13

    在三台机器的conf目录下新增配置文件,我这里就粘贴一份,根据机器不同进行修改吧。

    三台机器上的 listenPort 配置项 依次为:10001、10002、10003

    三台机器上的 dLegerSelfId 配置项 依次为:n0、n1、n2

    # dledger_broker.conf
    
    #集群名称
    brokerClusterName = RaftCluster
    
    #broker集群名称
    brokerName=RaftNode00
    
    #监听端口
    listenPort=10001
    
    #namesrv地址列表
    namesrvAddr=10.211.55.11:9876;10.211.55.12:9876;10.211.55.13:9876
    
    #主题不存在时是否自动创建主题
    autoCreateTopicEnable=true
    
    #订阅组不存在时是否自动创建订阅组
    autoCreateSubscriptionGroup=true
    
    #数据存储根路径
    storePathRootDir=/usr/local/rocketmq-4.8.0/store
    
    #commitlog数据存储根路径
    #storePathCommitLog=/tmp/rmqstore/node02/commitlog
    
    #是否启用DLeger集群模式
    enableDLegerCommitLog=true
    
    #与brokerName保持一致就好
    dLegerGroup=RaftNode00
    
    #dLeger集群下的节点配置
    dLegerPeers=n0-10.211.55.11:11111;n1-10.211.55.12:22222;n2-10.211.55.13:33333
    
    ## must be unique
    #当前节点在dLeger集群下的标识
    dLegerSelfId=n0
    
    #服务端处理消息发送线程池数量
    sendMessageThreadPoolNums=16


    然后分别启动服务

    # 挨个启动namesrv服务
    nohup sh bin/mqnamesrv &
    
    # 挨个启动broker服务
    nohup sh bin/mqbroker -c conf/dledger_broker.conf &


0 +1

版权声明

 Java  源码  rocketmq

 请文明留言

1 条评论