发布时间:2021-04-25作者:laosun阅读(3131)
内含 同步消息、异步消息、顺序消息、单向消息、事务消息、广播模式、发送tag消息、延迟消息、消息内含自定义key值,消息体为java实体对象等等测试案例
今天博主整理了一下SpringBoot集成 rocketmq-spring-boot-starter demo案例,能想起来的发送方式都集成进去了。
下面进入正题,我们使用Idea创建个SpringBoot项目
创建完成后,pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.5</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.sunjs.rocketmq</groupId> <artifactId>RocketMQ-SpringBoot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RocketMQ-SpringBoot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 我们新增几个jar包,后边案例会用到 --> <!--新增rocketmq-stringboot插件包,目前最新版本是:2.2.0 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency> <!-- lombok 插件 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <!-- 阿里巴巴 fastJson 包 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.70</version> </dependency> <!-- hutool工具包 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>4.6.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
建议先弄个Controller类,看下服务启动,是否一切都正常。这里博主就不演示了
在SpringBoot启动类增加包扫描
package com.sunjs.rocketmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; @SpringBootApplication // 增加包扫描 @ComponentScan(basePackages = {"com.sunjs.rocketmq"}) public class RocketMqSpringBootApplication { public static void main(String[] args) { SpringApplication.run(RocketMqSpringBootApplication.class, args); } }
application.properties 配置文件增加rocketmq的配置信息,如下所示:
以下内容测试阶段,可以只修改rocketmq.name-server配置信息,修改成你自己的ip
# 服务端口 server.port=8080 ## RocketMQ 配置文件 rocketmq.name-server=10.211.55.11:9876 # Access Key,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 # rocketmq.producer.access-key= # Secret Key # rocketmq.producer.secret-key= rocketmq.producer.group=default-producer # 发送消息超时时间,单位:毫秒。默认为3000 rocketmq.producer.send-message-timeout=3000 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B rocketmq.producer.compress-message-body-threshold=4096 # 消息体的最大允许大小。默认为4 * 1024 * 1024B rocketmq.producer.max-message-size=4194304 # 同步发送消息时,失败重试次数。默认为2次 rocketmq.producer.retry-times-when-send-failed=2 # 异步发送消息时,失败重试次数。默认为2次 rocketmq.producer.retry-times-when-send-async-failed=2 # 发送消息给broker时,如果发送失败,是否重试另外一台broker。默认为false rocketmq.producer.retry-next-server=false # 是否开启消息轨迹功能。默认为true开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 rocketmq.producer.enable-msg-trace=true # 自定义消息轨迹的Topic。默认为RMQ_SYS_TRACE_TOPIC rocketmq.producer.customized-trace-topic=PMQ_SYS_TRACE_TOPIC
消费者抽象类_BaseConsumer<T>,方便封装。
package com.sunjs.rocketmq.consumer; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONValidator; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.message.MessageExt; /** * 消费者抽象父类 * * @Date 2021/4/25 2:51 下午 */ @Slf4j public abstract class _BaseConsumer<T> { protected T body; /** * 校验 RocketMQ Message * * @Author: sun * @Date: 2021/4/23 11:16 上午 */ protected boolean isVerify(MessageExt message, Class<T> clazz) { // log.info("MQ消费者 - 接收到的数据:{}", message); String body = new String(message.getBody()); if (StringUtils.isBlank(body)) { log.info("MQ消息体为空"); return false; } // log.info("MQ消费者 - 接收到的消息内容:{}", body); JSONValidator a = JSONValidator.from(body); if (a.validate()) { this.body = JSON.parseObject(body, clazz); } else { this.body = (T) body; } return true; } /** * MQ 消息执行公共处理部分 * * @Author: sun * @Date: 2021/4/25 3:28 下午 */ protected void todo() { // 加锁 or 其他处理 handle(); } /** * 抽象钩子算法 * * @Author: sun * @Date: 2021/4/25 3:27 下午 */ protected abstract void handle(); }
创建个简单的消费者接收:
StringConsumer
package com.sunjs.rocketmq.consumer.basic; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import com.sunjs.rocketmq.common.MqConfig; import com.sunjs.rocketmq.consumer._BaseConsumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 测试消费端接收String字符串 * * @Date 2021/4/25 2:48 下午 */ @Slf4j @Service @RocketMQMessageListener( topic = MqConfig.Topic.TOPICSTRING, //topic consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICSTRING, //分组规则,Group-"topic命名" //下边可以去掉,都使用的是默认值 consumeMode = ConsumeMode.CONCURRENTLY, //默认值,并发接收模式 messageModel = MessageModel.CLUSTERING, //默认值,集群模式 selectorType = SelectorType.TAG, // 默认值,标签 selectorExpression = "*" //默认值,匹配该topic下所有tag ) public class StringConsumer extends _BaseConsumer<String> implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { log.info("消费者接收:Topic:{}", message.getTopic()); if (!isVerify(message, String.class)) { return; } // 执行MQ 消息公共处理 todo(); } /** * 具体业务处理 * * @Author: sun * @Date: 2021/4/25 3:29 下午 */ @Override protected void handle() { // TOOD 业务处理 log.info("业务处理:消息内容:{}", body); } }
创建个生产者进行消息发送:
package com.sunjs.rocketmq.producer.controller; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.RandomUtil; import com.sunjs.rocketmq.common.MqConfig; import com.sunjs.rocketmq.model.Users; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; /** * RocketMQ 消息生产者测试Controller * * @Date 2021/4/25 3:55 下午 */ @Slf4j @RestController @RequestMapping("/rocketmq/producer") public class ProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; /** * basic:MQ 发送字符串消息 * * @param msg : 发送内容 * @return java.lang.Object * @throws * @Date 2021/4/25 3:58 下午 */ @RequestMapping("/sendString") public Object sendString(@RequestParam(value = "msg", required = true, defaultValue = "你好,RocketMQ") String msg) { // 同步发送 SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICSTRING, msg); log.info("MQ发送结果:{}", sendResult); return sendResult; } /** * basic:发送User对象 * * @return java.lang.Object * @throws * @Date 2021/4/25 3:59 下午 */ @RequestMapping("/testSendUserModel") public Object testSendUserModel() { // 构造Users对象 Users users = Users.builder().id(1).name("sunjs").registerTime(DateUtil.date()).build(); // 同步发送 SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICUSERS, users); log.info("MQ发送结果:{}", sendResult); return sendResult; } /** * consumemodel:测试有序消息队列 * * @return java.lang.Object * @throws * @Date 2021/4/25 5:07 下午 */ @RequestMapping("/testOrderly") public Object testOrderly() { // 循环构造对象 for (int i = 1; i <= 30; i++) { // 将循环下标当成用户id Users users = Users.builder().id(i).name(RandomUtil.randomNumbers(4)).build(); // 同步发送 SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConfig.Topic.TOPICUSERSORDERLY, users, "hashkey"); // 屏蔽日志输出,方便监控消费者日志输出 // log.info("MQ发送结果:{}", sendResult); } return "有序批量发送完成"; } /** * consumemodel:测试无序消息队列 * * @return java.lang.Object * @throws * @Date 2021/4/25 5:10 下午 */ @RequestMapping("/testConcurrently") public Object testConcurrently() { // 循环构造对象 for (int i = 1; i <= 30; i++) { // 将循环下标当成用户id Users users = Users.builder().id(i).build(); // 同步发送 SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICUSERSCONCURRENTLY, users); // 屏蔽日志输出,方便监控消费者日志输出 // log.info("MQ发送结果:{}", sendResult); } return "无序批量发送完成"; } /** * RocketMQ 发送带有tag的消息 * * @return java.lang.Object * @throws * @Date 2021/4/25 5:19 下午 */ @RequestMapping("/tag") public Object tag() { // 发送tag1消息 SendResult sendResult1 = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICTAG + ":tag1", "这是一条字符串消息,给tag1的"); log.info("tag1发送结果:{}", sendResult1); // 发送tag2消息 SendResult sendResult2 = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICTAG + ":tag2", "这是一条字符串消息,给tag2的"); log.info("tag2发送结果:{}", sendResult2); return "发送完成"; } /** * broadcasting 测试广播发送消息 * 所有订阅该Topic的消费者都会收到通知 * * @return java.lang.Object * @throws * @Date 2021/4/25 6:23 下午 */ @RequestMapping("/sendBroadcasting") public Object sendBroadcasting() { SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICBROADCASTING, "<广播消息>"); return sendResult; } /** * 其他消息语法 * * @return java.lang.Object * @throws * @Date 2021/4/25 5:46 下午 */ @RequestMapping("/other") public Object other() { // 发送单向消息 // 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功 // 此方式发送消息的过程耗时非常短,一般在微秒级别,无返回值 // 不建议使用 // rocketMQTemplate.sendOneWay(MqConfig.Topic.TOPICOTHER, "这是一条单向消息XXXXXXXX"); // 延迟消息 // 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h (不允许自定义) // 如果我们要延迟30秒,则设置为4即可 // 设置延迟10秒,解释:2021-04-25 18:06:30 发送成功,那么消费是在 2021-04-25 18:06:40 // 第三个参数3000表示:发送超时时间,默认好像是3秒 // Message<String> message = MessageBuilder.withPayload("这是延迟消息内容").build(); // SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICOTHER, message, 3000, 3); // log.info("延迟消息发送结果:{}", sendResult); // 异步消息 // rocketMQTemplate.asyncSend(MqConfig.Topic.TOPICOTHER, "这是异步消息", new SendCallback() { // @Override // public void onSuccess(SendResult sendResult) { // log.info("异步消息发送成功:{}", sendResult); // } // @Override // public void onException(Throwable e) { // log.info("异步消息发送失败:{}", e.getMessage()); // } // }); // 发送带有自定义key值的消息(在RocketMQ-console管理后台中可以根据这个key值进行查询) // 不过使用messageId也是可以的,建议每次发送完成后和消费者接收后 都打印出mq消息的必要信息,方便查询信息 // Message<String> message = MessageBuilder.withPayload("带有key值的消息").setHeader(MessageConst.PROPERTY_KEYS, "123456789").build(); // SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICOTHER, message); // log.info("带有key值消息发送结果:{}", sendResult); return "发送完成"; } /** * 测试发送事务消息 * * @param type : rollback:表示业务处理出现异常,直接回滚MQ消息; * submitMqFail:表示业务处理事务刚提交处理完成,在发送确认MQ消息之前系统挂了; * rewardCouponFail:表示发券过程中挂了,事务还未提交; * 其他字符:业务处理完成,通知MQ成功 * @return java.lang.Object * @throws * @Date 2021/4/25 6:43 下午 */ @RequestMapping("/sendTransaction") public Object sendTransaction(String type) { // 所谓事务消息应该理解成 MQ的事务,而不是开发人员所理解的事务,给你数据库进行回滚之类的。这里的事务是保证消息能正常发送和消费,是这个事务 // 发送事务MQ,应该先发送MQ消息,再进行业务处理。 // 发送半消息 // 创建个用户对象,带到事务处理中 Users users = Users.builder().build(); users.setId(1); users.setName("RocketMQ"); users.setRegisterTime(DateUtil.date()); // 使用map作为消息类型 Map<String, Object> paramMap = new HashMap<>(); paramMap.put("type", type); paramMap.put("other", "这是其他信息"); // 模拟场景:比如在用户注册、或者下单成功后,我们需要给用户发送一张券,并且发送短信通知用户 // 关键点1:发券和通知都是异步处理 // 关键点2:发券和通知都必须要成功。发券不成功的话,短信一定不要发送。发券成功的话,短信也一定要发送。(不考虑短信发送失败,这个是消费者落地后的操作) // 流程1:注册或者下单成功后,立即发送MQ消息,这个MQ消息就是告诉RocketMQ,我是一条事务消息,待确认消息,该消息RocketMQ不会去消费,而是处于等待过程中。 // 流程2:流程1发送的这条MQ消息的本地事务(executeLocalTransaction),我们在这里进行发券操作,这个业务处理尽量在一个事务中处理,处理完成后,进行事务提交、或者回滚等操作 // 如果流程2返回的是提交操作,那么流程1所发的待确认消息将会通知消费者进行消费,反之如果是回滚操作,则该消息将会被删除。 // 如果流程2处理过程中,服务器挂了,或者其他任何不可控因素,导致流程2最后没有提交或者回滚操作。我们这里使用unknown状态来模拟服务器挂掉 // 那么RocketMQ 将会调用checkLocalTransaction方法进行回查,回查频率为:1分钟回查一次,默认回查15次 Message<Map<String, Object>> message = MessageBuilder.withPayload(paramMap).build(); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(MqConfig.Topic.topic_transaction_reward_coupon, message, users); log.info("事务消息发送结果:{}", transactionSendResult); return transactionSendResult; } }
MqConfig 常量配置类:
package com.sunjs.rocketmq.common; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ /** * MQ 常量配置 * * @Date 2021/4/25 3:34 下午 */ public class MqConfig { /** * RocketMQ Topic 前缀 */ @Deprecated public static final String TOPIC_PREFFIX = "Topic-"; /** * RocketMQ Group 前缀 */ public static final String GROUP_PREFFIX = "Group-"; /** * Topic 常量配置 * * @Author: sun * @Date: 2021/4/25 3:36 下午 */ public static class Topic { /** Topic 发送 String 字符串 **/ public static final String TOPICSTRING = "TopicString"; /** Topic 发送 Users Object **/ public static final String TOPICUSERS = "TopicUsers"; /** Topic 发送 Users 顺序单队列 **/ public static final String TOPICUSERSORDERLY = "TopicUsersOrderLy"; /** Topic 发送 Users 无序多线程接收 **/ public static final String TOPICUSERSCONCURRENTLY = "TopicUsersConcurrently"; /** Topic 发送 含 tag 标签的消息 **/ public static final String TOPICTAG = "TopicTag"; /** 广播消息 **/ public static final String TOPICBROADCASTING = "TopicBroadcasting"; /** 其他消息语法接收 **/ public static final String TOPICOTHER = "TopicOther"; /** 模拟创建订单事务消息 **/ public static final String topic_transaction_reward_coupon = "topic_transaction_createorder"; } }
测试发送消息,控制台输出如下:
2021-04-25 21:24:10.752 INFO 47569 --- [nio-8080-exec-2] c.s.r.p.controller.ProducerController : MQ发送结果:SendResult [sendStatus=SEND_OK, msgId=7F000001B9D118B4AAC28030537E0007, offsetMsgId=0AD3370B00002A9F0000000000148565, messageQueue=MessageQueue [topic=TopicString, brokerName=broker-a, queueId=1], queueOffset=2] 2021-04-25 21:24:10.755 INFO 47569 --- [MessageThread_2] c.s.r.consumer.basic.StringConsumer : 消费者接收:Topic:TopicString 2021-04-25 21:24:10.756 INFO 47569 --- [MessageThread_2] c.s.r.consumer.basic.StringConsumer : 业务处理:消息内容:你好,RocketMQ
事务消息监听类:
ExtTransactionListener
package com.sunjs.rocketmq.listener; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * 自定义事务监听类 * 自 rocketmq-spring-boot-starter 2.0还是2.1之后,该监听类不允许创建多个,只允许有一个 * * @Date 2021/4/25 6:36 下午 */ @Slf4j @RocketMQTransactionListener public class ExtTransactionListener implements RocketMQLocalTransactionListener { private static Set<String> transactionList = new HashSet<>(); /** * COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息; * ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息; * UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,而最终决定这个消息命运的,是checkLocalTransaction这个方法。 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) { log.info("本地事务开始执行。Message对象数据:{},Object对象数据:{}", message, obj); log.info("开始执行发券操作"); MessageHeaders headers = message.getHeaders(); // 获取事务ID String transactionId = headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID, String.class); // 消息主题 String topic = headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC, String.class); // 消息key String keys = headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.KEYS, String.class); // 消息体内容 String body = new String((byte[]) message.getPayload()); try { Map<String, Object> bodyMap = JSON.parseObject(body, Map.class); String type = bodyMap.get("type").toString(); // TODO 调用发券业务逻辑,发券业务逻辑理论上应该是事务方法 @Transactional,若失败则进行发券回滚,此处拦截也进行拦截 // TODO 如果发券成功,在同一事务中建议将transaction_id保存一下,可以保存到数据库 // 此处我们创建个表来存储事务消息,方便checkLocalTransaction回查 // 表结构类似如下: // CREATE TABLE `d_test_weiif`.`无标题` ( // `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id', // `transaction_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'mq消息事务id(消息id)', // `topic` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'mq消息主题', // `keys` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'mq消息key', // `body` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '消息体内容,json格式', // `add_time` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '添加时间', // `add_user_id` bigint(20) NULL DEFAULT NULL COMMENT '添加人', // PRIMARY KEY (`id`) USING BTREE // ) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'RocketMQ 事务消息日志表' ROW_FORMAT = Compact; //其实只需要transaction_id即可,如果回查时能根据transaction_id回查到,则证明该消息正常,执行commit操作 // 这里我们使用数据结构Set代替一下数据库进行存储 if ("rollback".equals(type)) { //模拟执行发券失败 Integer.valueOf("a"); return RocketMQLocalTransactionState.COMMIT; } else if ("submitMqFail".equals(type)) { // 模拟服务器挂掉,挂掉时间为事务已经执行完成,但是MQ没有commit,这种情况下,RocketMQ会发起回查,调用 checkLocalTransaction() transactionList.add(transactionId); // 表示事务提交成功,但是MQ消息没有发出去 log.info("发券成功,提交MQ确认消息失败"); return RocketMQLocalTransactionState.UNKNOWN; } else if ("rewardCouponFail".equals(type)) { // 模拟服务器挂掉,挂掉时间为发券过程中,事务还未提交,但是MQ没有commit,这种情况下,RocketMQ会发起回查,调用 checkLocalTransaction() log.info("发券中,哦~~~ 服务器挂掉了"); return RocketMQLocalTransactionState.UNKNOWN; } else { // 发券成功,MQ消息确认成功 log.info("发券成功,已经通知MQ进行短信发送"); return RocketMQLocalTransactionState.COMMIT; } } catch (Exception e) { // 出现异常,MQ消息进行回滚 log.info("发券出现异常,事务回滚,MQ消息回滚,异常信息:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } } /** * 回查轮询次数说明:1分钟回查一次,默认回查15次 * * @Author: sun * @Date: 2021/4/23 6:00 下午 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { MessageHeaders headers = message.getHeaders(); // 获取事务ID String transactionId = (String) headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID); log.info("RocketMQ发起回查,回查事务消息ID:{}", transactionId); log.info("RocketMQ发起回查,回查事务消息数据:{}", message); try { // TODO 查询表中是否存在该事务ID,如果存在,则表示发券逻辑正常执行,如果不存在,则视情况程序员返回值而定, 是回滚还是继续回查 if (transactionList.contains(transactionId)) { // 查询到,返回给RocketMQ消息确认,此时RocketMQ消费者会进行消息消费操作 log.info("回查确认发券成功,准备通知MQ进行短信发送"); return RocketMQLocalTransactionState.COMMIT; } else { // 未查询到,表示消息不存在,发券逻辑失败等情况,则进行回滚 log.info("回查发现发券失败,MQ消息事务回滚"); return RocketMQLocalTransactionState.ROLLBACK; // 如果此处设置成 RocketMQLocalTransactionState.UNKNOWN,则RocketMQ还会继续回查 } } catch (Exception e) { // 查询期间出现异常,则继续下次回查 e.printStackTrace(); log.error("RocketMQ发起回查,回查事务消息ID:{},回查出现异常,异常信息:{}", transactionId, e.getMessage()); return RocketMQLocalTransactionState.UNKNOWN; } } }
测试发送对象消息:
UsersConsumer
package com.sunjs.rocketmq.consumer.basic; /** * .::::. * .::::::::. * ::::::::::: 佛主保佑、永无Bug * ..:::::::::::' * '::::::::::::' * .:::::::::: * '::::::::::::::.. * ..::::::::::::. * ``:::::::::::::::: * ::::``:::::::::' .:::. * ::::' ':::::' .::::::::. * .::::' :::: .:::::::'::::. * .:::' ::::: .:::::::::' ':::::. * .::' :::::.:::::::::' ':::::. * .::' ::::::::::::::' ``::::. * ...::: ::::::::::::' ``::. * ```` ':. ':::::::::' ::::.. * '.:::::' ':'````.. */ import cn.hutool.core.date.DateUtil; import com.sunjs.rocketmq.common.MqConfig; import com.sunjs.rocketmq.consumer._BaseConsumer; import com.sunjs.rocketmq.model.Users; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 测试消费端接收对象 * * @Date 2021/4/25 2:48 下午 */ @Slf4j @Service @RocketMQMessageListener( topic = MqConfig.Topic.TOPICUSERS, //topic consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICUSERS, //分组规则,Group-"topic命名" //下边可以去掉,都使用的是默认值 consumeMode = ConsumeMode.CONCURRENTLY, //默认值,并发接收模式 messageModel = MessageModel.CLUSTERING, //默认值,集群模式 selectorType = SelectorType.TAG, // 默认值,标签 selectorExpression = "*" // 默认值,匹配该topic下所有tag ) public class UsersConsumer extends _BaseConsumer<Users> implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { log.info("消费者接收:Topic:{}", message.getTopic()); if (!isVerify(message, Users.class)) { return; } // 执行MQ 消息公共处理 todo(); } /** * 具体业务处理 * * @Author: sun * @Date: 2021/4/25 3:29 下午 */ @Override protected void handle() { // TOOD 业务处理 log.info("业务处理:用户ID:{}", body.getId()); log.info("业务处理:用户姓名:{}", body.getName()); log.info("业务处理:用户注册时间:{}", DateUtil.format(body.getRegisterTime(), "yyyy-MM-dd HH:mm:ss")); } }
这里就不一一演示了,代码太多,文章超长。
具体请看 git 代码
下一篇: RocketMQ配置console控制台,配置登录账户密码
版权属于: 技术客
原文地址: https://www.sunjs.com/article/detail/82b49d19d2034c969f485b05ea530715.html
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
关键字: Java 源码 springboot rocketmq