发布时间:2021-04-25作者:laosun阅读(15097)
RocketMQ 下载安装,RocketMQ配置单主模式,RocketMQ配置双主双从模式,RocketMQ配置双主模式,RocketMQ配置ACL权限,RocketMQ配置RocketMQ-Console,RocketMQ配置console管理后台的登录账户密码,RocketMQ在ACL权限下Console管理后台如何配置,修改RocketMQ的Namrsrv启动内存,修改RocketMQ的Broker启动内存
讲解目录
1:RocketMQ 下载安装启动测试,调整 namesrv 和 broker 启动内存等等,
2:SpringBoot 集成 RocketMQ,
3:RocketMQ配置双主模式 和 双主双从模式,
4:RocketMQ配置ACL权限,
5:RocketMQ配置RocketMQ-Console,
6:RocketMQ配置console管理后台的登录账户密码,
7:RocketMQ在ACL权限下Console管理后台如何配置,
本文不介绍各项服务是什么意思?是怎么互相通信的?只讲解如何安装配置。其他讲解文章待后续补充,先会用,先了解,再熟悉。
RocketMQ 下载安装
1、打开apache官网,Apache官网 ,下拉到底部,找到RocketMQ ,点击进去到主页,或者直接访问 RocketMQ 主页
可以看到最新版本为:release-notes-4.8.0 ,点击进去进行下载,如下图所示:
演示系统:linux CentOS系统「本地虚拟机」
下载:
如果想看源码的同学们,可以去github上clone一份 rocketmq - github源码地址
这里我们只演示编译后的下载安装方式
我们先进行下载,使用指令:
> cd /usr/local > wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
如下所示:
下载完成后,使用unzip进行解压,修改文件名称(博主嫌弃它太长)
# 解压 > unzip rocketmq-all-4.8.0-bin-release.zip # 修改名称 > mv rocketmq-all-4.8.0-bin-release rocketmq-4.8.0 # 删除刚下载的压缩文件 > rm -rf rocketmq-all-4.8.0-bin-release.zip
都准备完毕了,我们使用指令查看一下目录文件详情:
下边我们的操作根目录都在 /usr/local/rocketmq-4.8.0 目录下。
修改 namesrv 和 broker 启动内存:
# 查看 namesrv 启动内存给分配了多少。 > vim bin/runserver.sh # 编辑打开后,我们按shift+G,跳转到最后,我们可以看到如下图所示:
仅一个namesrv就给了4G内存,太大了,namesrv没必要给这么大,我觉得给个 512m 就足够了。
我们进行调整修改成如下所示:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
然后保存退出。
再来看一下broker的内存分配情况:
> vim bin/runbroker.sh # 编辑打开后,我们按shift+G,跳转到最后,我们可以看到如下图所示:
broker 分配内存为8G,我们调整成1G。
我们进行调整修改成如下所示:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
然后保存退出。
启动单主模式:
namesrv启动:
# 建议新手先使用第一种启动语法,可以看下控制台输出信息,成功后,再使用ctrl+c退出,使用第二种语法,后台启动 # 1、启动 namesrv 语法,退出控制台就退出了 > sh bin/mqnamesrv # 2、后台启动 namesrv 语法 > nohup sh bin/mqnamesrv &
第 1 种语法:如下所示,表示启动成功:
第 2 种语法:
broker启动:
# 建议新手先使用第一种启动语法,可以看下控制台输出信息,成功后,再使用ctrl+c退出,使用第二种语法,后台启动 # 1、启动 broker 语法,退出控制台就退出了 > sh bin/mqbroker -c conf/broker.conf # 2、后台启动 broker 语法 > nohup sh bin/mqbroker -c conf/broker.conf & # 或者增加namesrv地址(如果broker启动后,没有和namesrv链接成功),例如: > nohup sh bin/mqbroker -c conf/broker.conf -n 10.211.55.11:9876 &
broker的启动我们先演示一张未启动成功的图片,未修改内存之前的启动:提示内存不足,相信不少人第一次下载启动的时候会遇到这种问题,因为官方默认的内存分配有点大。
正确的启动图展示:
RocketMQ namesrv和broker 停止语法
# namesrv 停止语法 > sh bin/mqshutdown namesrv # 输出如下内容,表示停止成功 The mqnamesrv(16200) is running... Send shutdown request to mqnamesrv(16200) OK # ========华丽短分割线======== # broker 停止语法 > sh bin/mqshutdown broker # 输出如下内存,表示停止成功 The mqbroker(16447) is running... Send shutdown request to mqbroker(16447) OK # ========华丽短分割线======== # 查看broker启动后是否和namesrv链接成功 > sh bin/mqadmin clusterList -n 10.211.55.11:9876 #输出如下内容则表示成功,如果没有,则重启broker,带上-n ip:端口 : RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE DefaultCluster broker-a 0 10.211.55.11:10911 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 449820.93 0.1126
重复一遍:我们使用后台启动方式将namesrv和broker都启动起来后,就可以正常使用了。
# 后台启动namesrv > nohup sh bin/mqnamesrv & # 后台启动broker > nohup sh bin/mqbroker -c conf/broker.conf & #查看下进程是否正常,如下所示: > ps -ef|grep mq
测试消息发送和接收
### 运行示例程序,发送消息: # 我们设置一下namesrv_addr地址 > export NAMESRV_ADDR=localhost:9876 # 设置成功后,我们执行生产者进行消息生产 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer # 接下来我们会看到很多生产信息 SendResult [sendStatus=SEND OK , msgid= ......
我们再开一个窗口,也定位该目录。.
### 运行示例程序,接收消息: # 我们设置一下namesrv_addr地址 > export NAMESRV_ADDR=localhost:9876 # 设置成功后,我们执行消费者进行消息消息 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer # 接下来我们会看到很多消费信息 ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broke ......
以上流程都测试完毕了,如果没有问题,那么你所安装的RocketMQ已经具备生产消费的基本功能了,你也可以编写程序进行测试。
下边我们给出两个Java类,来测试生产消息和消费消息。
Producer 生产者:
/** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import com.alibaba.fastjson.JSONObject; import com.sunjs.rocketmq.model.Users; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import java.util.Date; import java.util.UUID; /** * @Date 2021-04-20 15:40 */ public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("default_producter_group"); producer.setNamesrvAddr("10.211.55.11:9876"); producer.start(); String body = "你好 RocketMQ"; Message message = new Message("TopicTest", null, body.getBytes()); message.setKeys(UUID.randomUUID().toString()); SendResult sendResult = producer.send(message); System.out.printf("发送结果:%s%n", sendResult); producer.shutdown(); } }
控制台: RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. 发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000018D2618B4AAC27E7E1CEF0000, offsetMsgId=0AD3370B00002A9F00000000000F1275, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=817]
Consumer 消费者:
/** .::::. .::::::::. ::::::::::: 佛主保佑、永无Bug ..:::::::::::' '::::::::::::' .:::::::::: '::::::::::::::.. ..::::::::::::. ``:::::::::::::::: ::::``:::::::::' .:::. ::::' ':::::' .::::::::. .::::' :::: .:::::::'::::. .:::' ::::: .:::::::::' ':::::. .::' :::::.:::::::::' ':::::. .::' ::::::::::::::' ``::::. ...::: ::::::::::::' ``::. ```` ':. ':::::::::' ::::.. '.:::::' ':'````.. */ import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @Date 2021-04-20 15:47 */ public class SyncConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group"); consumer.setNamesrvAddr("10.211.55.11:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*");//订阅topic和tag设置 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { String body = new String(msgs.get(0).getBody()); System.out.printf("消息内容:%s%n", body); System.out.printf("消息完整数据包:%s%n", msgs); System.out.println(Thread.currentThread().getName()+" = "+msgs); System.out.println(Thread.currentThread().getName()+" = "+body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
控制台: 消息内容:你好 RocketMQ 消息完整数据包:[MessageExt [brokerName=broker-a, queueId=1, storeSize=232, queueOffset=817, sysFlag=0, bornTimestamp=1619328594160, bornHost=/10.211.55.2:60811, storeTimestamp=1619328594172, storeHost=/10.211.55.11:10911, msgId=0AD3370B00002A9F00000000000F1275, commitLogOffset=987765, bodyCRC=2021450339, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=818, KEYS=77002c48-4c90-4695-9b10-057bc9979b62, CONSUME_START_TIME=1619328594187, UNIQ_KEY=7F0000018D2618B4AAC27E7E1CEF0000, CLUSTER=DefaultCluster, WAIT=true}, body=[-28, -67, -96, -27, -91, -67, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]] ConsumeMessageThread_1 = [MessageExt [brokerName=broker-a, queueId=1, storeSize=232, queueOffset=817, sysFlag=0, bornTimestamp=1619328594160, bornHost=/10.211.55.2:60811, storeTimestamp=1619328594172, storeHost=/10.211.55.11:10911, msgId=0AD3370B00002A9F00000000000F1275, commitLogOffset=987765, bodyCRC=2021450339, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=818, KEYS=77002c48-4c90-4695-9b10-057bc9979b62, CONSUME_START_TIME=1619328594187, UNIQ_KEY=7F0000018D2618B4AAC27E7E1CEF0000, CLUSTER=DefaultCluster, WAIT=true}, body=[-28, -67, -96, -27, -91, -67, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]] ConsumeMessageThread_1 = 你好 RocketMQ
因文章内容字数有限,特将文章进行拆分。
下一篇: SpringBoot 集成 rocketmq-spring-boot-starter demo案例
版权属于: 技术客
原文地址: https://www.sunjs.com/article/detail/6d8877b9d86745fdb8b520691f96b3c7.html
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。