RocketMQ 下载安装,修改运行内存,配置单主

发布时间:2021-04-25作者:laosun阅读(351)

RocketMQ

RocketMQ 下载安装,RocketMQ配置单主模式,RocketMQ配置双主双从模式,RocketMQ配置双主模式,RocketMQ配置ACL权限,RocketMQ配置RocketMQ-Console,RocketMQ配置console管理后台的登录账户密码,RocketMQ在ACL权限下Console管理后台如何配置,修改RocketMQ的Namrsrv启动内存,修改RocketMQ的Broker启动内存

    讲解目录

    1:RocketMQ 下载安装启动测试,调整 namesrv 和 broker 启动内存

    2:SpringBoot 集成 RocketMQ,

    3:RocketMQ配置双主模式 和 双主双从模式,

    4:RocketMQ配置ACL权限,

    5:RocketMQ配置RocketMQ-Console,

    6:RocketMQ配置console管理后台的登录账户密码,

    7:RocketMQ在ACL权限下Console管理后台如何配置,


    本文不介绍各项服务是什么意思?是怎么互相通信的?只讲解如何安装配置。其他讲解文章待后续补充,先会用,先了解,再熟悉。


    RocketMQ 下载安装

    1、打开apache官网,Apache官网 ,下拉到底部,找到RocketMQ ,点击进去到主页,或者直接访问 RocketMQ 主页 

    可以看到最新版本为:release-notes-4.8.0 ,点击进去进行下载,如下图所示:

    image.png

    演示系统:linux CentOS系统「本地虚拟机」


    下载:

    如果想看源码的同学们,可以去github上clone一份 rocketmq - github源码地址

    这里我们只演示编译后的下载安装方式

    我们先进行下载,使用指令:

    > cd /usr/local
    > wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

    如下所示:

    image.png

    下载完成后,使用unzip进行解压,修改文件名称(博主嫌弃它太长)

    # 解压
    > unzip rocketmq-all-4.8.0-bin-release.zip
    # 修改名称
    > mv rocketmq-all-4.8.0-bin-release rocketmq-4.8.0
    # 删除刚下载的压缩文件
    > rm -rf rocketmq-all-4.8.0-bin-release.zip

    都准备完毕了,我们使用指令查看一下目录文件详情:

    image.png


    下边我们的操作根目录都在 /usr/local/rocketmq-4.8.0 目录下。


    修改 namesrv 和 broker 启动内存:

    # 查看 namesrv 启动内存给分配了多少。
    > vim bin/runserver.sh
    # 编辑打开后,我们按shift+G,跳转到最后,我们可以看到如下图所示:

    image.png

    仅一个namesrv就给了4G内存,太大了,namesrv没必要给这么大,我觉得给个 512m 就足够了。

    我们进行调整修改成如下所示:

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    然后保存退出。


    再来看一下broker的内存分配情况:

    > vim bin/runbroker.sh
    # 编辑打开后,我们按shift+G,跳转到最后,我们可以看到如下图所示:

    image.png

    broker 分配内存为8G,我们调整成1G。

    我们进行调整修改成如下所示:

    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

    然后保存退出。


    启动单主模式:

    namesrv启动:

    # 建议新手先使用第一种启动语法,可以看下控制台输出信息,成功后,再使用ctrl+c退出,使用第二种语法,后台启动
    
    # 1、启动 namesrv 语法,退出控制台就退出了
    > sh bin/mqnamesrv   
    # 2、后台启动 namesrv 语法
    > nohup sh bin/mqnamesrv &

    第 1 种语法:如下所示,表示启动成功:image.png

    第 2 种语法:image.png


    broker启动:

    # 建议新手先使用第一种启动语法,可以看下控制台输出信息,成功后,再使用ctrl+c退出,使用第二种语法,后台启动
    
    # 1、启动 broker 语法,退出控制台就退出了
    > sh bin/mqbroker -c conf/broker.conf   
    # 2、后台启动 broker 语法
    > nohup sh bin/mqbroker -c conf/broker.conf &
      
    # 或者增加namesrv地址(如果broker启动后,没有和namesrv链接成功),例如:
    > nohup sh bin/mqbroker -c conf/broker.conf -n 10.211.55.11:9876 &

    broker的启动我们先演示一张未启动成功的图片,未修改内存之前的启动:提示内存不足,相信不少人第一次下载启动的时候会遇到这种问题,因为官方默认的内存分配有点大。

    image.png

    正确的启动图展示:

    image.png


    RocketMQ namesrv和broker 停止语法

    # namesrv 停止语法
    > sh bin/mqshutdown namesrv
    # 输出如下内容,表示停止成功
    The mqnamesrv(16200) is running...
    Send shutdown request to mqnamesrv(16200) OK
    
    # ========华丽短分割线========
    
    # broker 停止语法
    > sh bin/mqshutdown broker
    # 输出如下内存,表示停止成功
    The mqbroker(16447) is running...
    Send shutdown request to mqbroker(16447) OK
    
    # ========华丽短分割线========
    
    # 查看broker启动后是否和namesrv链接成功
    > sh bin/mqadmin clusterList -n 10.211.55.11:9876
    #输出如下内容则表示成功,如果没有,则重启broker,带上-n ip:端口 :
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    #Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
    DefaultCluster    broker-a                0     10.211.55.11:10911     V4_8_0                   0.00(0,0ms)         0.00(0,0ms)          0 449820.93 0.1126


    重复一遍:我们使用后台启动方式将namesrv和broker都启动起来后,就可以正常使用了。

    # 后台启动namesrv
    > nohup sh bin/mqnamesrv &
    # 后台启动broker
    > nohup sh bin/mqbroker -c conf/broker.conf &
    
    #查看下进程是否正常,如下所示:
    > ps -ef|grep mq

    1.png


    测试消息发送和接收

    ### 运行示例程序,发送消息:
    # 我们设置一下namesrv_addr地址
    > export NAMESRV_ADDR=localhost:9876
    # 设置成功后,我们执行生产者进行消息生产
    > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 
    # 接下来我们会看到很多生产信息
    SendResult [sendStatus=SEND OK , msgid= 
    ......

    image.png

    我们再开一个窗口,也定位该目录。.

    ### 运行示例程序,接收消息:
    # 我们设置一下namesrv_addr地址
    > export NAMESRV_ADDR=localhost:9876
    # 设置成功后,我们执行消费者进行消息消息
    > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 
    # 接下来我们会看到很多消费信息
    ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broke
    ......

    以上流程都测试完毕了,如果没有问题,那么你所安装的RocketMQ已经具备生产消费的基本功能了,你也可以编写程序进行测试。

    下边我们给出两个Java类,来测试生产消息和消费消息。


    Producer 生产者:

    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import com.alibaba.fastjson.JSONObject;
    import com.sunjs.rocketmq.model.Users;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.Date;
    import java.util.UUID;
    
    /**
     * @Date 2021-04-20 15:40
     */
    public class Producer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("default_producter_group");
            producer.setNamesrvAddr("10.211.55.11:9876");
            producer.start();
    
            String body = "你好 RocketMQ";
            Message message = new Message("TopicTest", null, body.getBytes());
            message.setKeys(UUID.randomUUID().toString());
            SendResult sendResult = producer.send(message);
            System.out.printf("发送结果:%s%n", sendResult);
    
            producer.shutdown();
        }
    
    }
    控制台:
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000018D2618B4AAC27E7E1CEF0000, offsetMsgId=0AD3370B00002A9F00000000000F1275, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=817]


    Consumer 消费者:

    /**
                         .::::.
                       .::::::::.
                      :::::::::::    佛主保佑、永无Bug
                  ..:::::::::::'
                '::::::::::::'
                  .::::::::::
             '::::::::::::::..
                  ..::::::::::::.
                ``::::::::::::::::
                 ::::``:::::::::'        .:::.
                ::::'   ':::::'       .::::::::.
              .::::'      ::::     .:::::::'::::.
             .:::'       :::::  .:::::::::' ':::::.
            .::'        :::::.:::::::::'      ':::::.
           .::'         ::::::::::::::'         ``::::.
       ...:::           ::::::::::::'              ``::.
      ```` ':.          ':::::::::'                  ::::..
                         '.:::::'                    ':'````..
                
     */
    
    
    import com.alibaba.fastjson.JSON;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @Date 2021-04-20 15:47
     */
    public class SyncConsumer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");
            consumer.setNamesrvAddr("10.211.55.11:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicTest", "*");//订阅topic和tag设置
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    String body = new String(msgs.get(0).getBody());
                    System.out.printf("消息内容:%s%n", body);
                    System.out.printf("消息完整数据包:%s%n", msgs);
    
                    System.out.println(Thread.currentThread().getName()+" = "+msgs);
                    System.out.println(Thread.currentThread().getName()+" = "+body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    
    }
    控制台:
    消息内容:你好 RocketMQ
    消息完整数据包:[MessageExt [brokerName=broker-a, queueId=1, storeSize=232, queueOffset=817, sysFlag=0, bornTimestamp=1619328594160, bornHost=/10.211.55.2:60811, storeTimestamp=1619328594172, storeHost=/10.211.55.11:10911, msgId=0AD3370B00002A9F00000000000F1275, commitLogOffset=987765, bodyCRC=2021450339, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=818, KEYS=77002c48-4c90-4695-9b10-057bc9979b62, CONSUME_START_TIME=1619328594187, UNIQ_KEY=7F0000018D2618B4AAC27E7E1CEF0000, CLUSTER=DefaultCluster, WAIT=true}, body=[-28, -67, -96, -27, -91, -67, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]]
    ConsumeMessageThread_1 = [MessageExt [brokerName=broker-a, queueId=1, storeSize=232, queueOffset=817, sysFlag=0, bornTimestamp=1619328594160, bornHost=/10.211.55.2:60811, storeTimestamp=1619328594172, storeHost=/10.211.55.11:10911, msgId=0AD3370B00002A9F00000000000F1275, commitLogOffset=987765, bodyCRC=2021450339, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=818, KEYS=77002c48-4c90-4695-9b10-057bc9979b62, CONSUME_START_TIME=1619328594187, UNIQ_KEY=7F0000018D2618B4AAC27E7E1CEF0000, CLUSTER=DefaultCluster, WAIT=true}, body=[-28, -67, -96, -27, -91, -67, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]]
    ConsumeMessageThread_1 = 你好 RocketMQ


    因文章内容字数有限,特将文章进行拆分。

    下一篇: SpringBoot 集成 rocketmq-spring-boot-starter demo案例


0 +1

版权声明

 Java  扩展  rocketmq

 请文明留言

0 条评论