SpringBoot 集成 rocketmq-spring-boot-starter demo案例

发布时间:2021-04-25作者:laosun阅读(3577)

SpringBoot

内含 同步消息、异步消息、顺序消息、单向消息、事务消息、广播模式、发送tag消息、延迟消息、消息内含自定义key值,消息体为java实体对象等等测试案例

    今天博主整理了一下SpringBoot集成 rocketmq-spring-boot-starter demo案例,能想起来的发送方式都集成进去了。

    下面进入正题,我们使用Idea创建个SpringBoot项目

    1.png

    2.png

    创建完成后,pom.xml 文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.5</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.sunjs.rocketmq</groupId>
        <artifactId>RocketMQ-SpringBoot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>RocketMQ-SpringBoot</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            
            <!-- 我们新增几个jar包,后边案例会用到 -->
            <!--新增rocketmq-stringboot插件包,目前最新版本是:2.2.0 -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.0</version>
            </dependency>
            <!-- lombok 插件 -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
                <scope>provided</scope>
            </dependency>
            <!-- 阿里巴巴 fastJson 包 -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.70</version>
            </dependency>
            <!-- hutool工具包 -->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>4.6.6</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>


    建议先弄个Controller类,看下服务启动,是否一切都正常。这里博主就不演示了


    在SpringBoot启动类增加包扫描

    package com.sunjs.rocketmq;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;
    
    @SpringBootApplication
    // 增加包扫描
    @ComponentScan(basePackages = {"com.sunjs.rocketmq"})
    public class RocketMqSpringBootApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RocketMqSpringBootApplication.class, args);
        }
    
    }


    application.properties 配置文件增加rocketmq的配置信息,如下所示:

    以下内容测试阶段,可以只修改rocketmq.name-server配置信息,修改成你自己的ip

    # 服务端口
    server.port=8080
    
    ## RocketMQ 配置文件
    rocketmq.name-server=10.211.55.11:9876
    # Access Key,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    # rocketmq.producer.access-key=
    # Secret Key
    # rocketmq.producer.secret-key=
    rocketmq.producer.group=default-producer
    # 发送消息超时时间,单位:毫秒。默认为3000
    rocketmq.producer.send-message-timeout=3000
    # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    rocketmq.producer.compress-message-body-threshold=4096
    # 消息体的最大允许大小。默认为4 * 1024 * 1024B
    rocketmq.producer.max-message-size=4194304
    # 同步发送消息时,失败重试次数。默认为2次
    rocketmq.producer.retry-times-when-send-failed=2
    # 异步发送消息时,失败重试次数。默认为2次
    rocketmq.producer.retry-times-when-send-async-failed=2
    # 发送消息给broker时,如果发送失败,是否重试另外一台broker。默认为false
    rocketmq.producer.retry-next-server=false
    # 是否开启消息轨迹功能。默认为true开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    rocketmq.producer.enable-msg-trace=true
    # 自定义消息轨迹的Topic。默认为RMQ_SYS_TRACE_TOPIC
    rocketmq.producer.customized-trace-topic=PMQ_SYS_TRACE_TOPIC



    消费者抽象类_BaseConsumer<T>,方便封装。

    package com.sunjs.rocketmq.consumer;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONValidator;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * 消费者抽象父类
     *
     * @Date 2021/4/25 2:51 下午
     */
    @Slf4j
    public abstract class _BaseConsumer<T> {
    
        protected T body;
    
        /**
         * 校验 RocketMQ Message
         *
         * @Author: sun
         * @Date: 2021/4/23 11:16 上午
         */
        protected boolean isVerify(MessageExt message, Class<T> clazz) {
            // log.info("MQ消费者 - 接收到的数据:{}", message);
            String body = new String(message.getBody());
            if (StringUtils.isBlank(body)) {
                log.info("MQ消息体为空");
                return false;
            }
            // log.info("MQ消费者 - 接收到的消息内容:{}", body);
    
            JSONValidator a = JSONValidator.from(body);
            if (a.validate()) {
                this.body = JSON.parseObject(body, clazz);
            } else {
                this.body = (T) body;
            }
            return true;
        }
    
        /**
         * MQ 消息执行公共处理部分
         *
         * @Author: sun
         * @Date: 2021/4/25 3:28 下午
         */
        protected void todo() {
            // 加锁 or 其他处理
            handle();
        }
    
        /**
         * 抽象钩子算法
         *
         * @Author: sun
         * @Date: 2021/4/25 3:27 下午
         */
        protected abstract void handle();
    
    }


    创建个简单的消费者接收:

    StringConsumer

    package com.sunjs.rocketmq.consumer.basic;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import com.sunjs.rocketmq.common.MqConfig;
    import com.sunjs.rocketmq.consumer._BaseConsumer;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 测试消费端接收String字符串
     *
     * @Date 2021/4/25 2:48 下午
     */
    @Slf4j
    @Service
    @RocketMQMessageListener(
            topic = MqConfig.Topic.TOPICSTRING, //topic
            consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICSTRING, //分组规则,Group-"topic命名"
            //下边可以去掉,都使用的是默认值
            consumeMode = ConsumeMode.CONCURRENTLY, //默认值,并发接收模式
            messageModel = MessageModel.CLUSTERING, //默认值,集群模式
            selectorType = SelectorType.TAG, // 默认值,标签
            selectorExpression = "*" //默认值,匹配该topic下所有tag
    )
    public class StringConsumer extends _BaseConsumer<String> implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            log.info("消费者接收:Topic:{}", message.getTopic());
            if (!isVerify(message, String.class)) {
                return;
            }
            // 执行MQ 消息公共处理
            todo();
        }
    
        /**
         * 具体业务处理
         *
         * @Author: sun
         * @Date: 2021/4/25 3:29 下午
         */
        @Override
        protected void handle() {
            // TOOD 业务处理
            log.info("业务处理:消息内容:{}", body);
        }
    }


    创建个生产者进行消息发送:

    package com.sunjs.rocketmq.producer.controller;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import cn.hutool.core.date.DateUtil;
    import cn.hutool.core.util.RandomUtil;
    import com.sunjs.rocketmq.common.MqConfig;
    import com.sunjs.rocketmq.model.Users;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RocketMQ 消息生产者测试Controller
     *
     * @Date 2021/4/25 3:55 下午
     */
    @Slf4j
    @RestController
    @RequestMapping("/rocketmq/producer")
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * basic:MQ 发送字符串消息
         *
         * @param msg : 发送内容
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 3:58 下午
         */
        @RequestMapping("/sendString")
        public Object sendString(@RequestParam(value = "msg", required = true, defaultValue = "你好,RocketMQ") String msg) {
            // 同步发送
            SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICSTRING, msg);
            log.info("MQ发送结果:{}", sendResult);
            return sendResult;
        }
    
        /**
         * basic:发送User对象
         *
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 3:59 下午
         */
        @RequestMapping("/testSendUserModel")
        public Object testSendUserModel() {
            // 构造Users对象
            Users users = Users.builder().id(1).name("sunjs").registerTime(DateUtil.date()).build();
            // 同步发送
            SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICUSERS, users);
            log.info("MQ发送结果:{}", sendResult);
            return sendResult;
        }
    
        /**
         * consumemodel:测试有序消息队列
         *
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 5:07 下午
         */
        @RequestMapping("/testOrderly")
        public Object testOrderly() {
            // 循环构造对象
            for (int i = 1; i <= 30; i++) {
                // 将循环下标当成用户id
                Users users = Users.builder().id(i).name(RandomUtil.randomNumbers(4)).build();
                // 同步发送
                SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConfig.Topic.TOPICUSERSORDERLY, users, "hashkey");
                // 屏蔽日志输出,方便监控消费者日志输出
                // log.info("MQ发送结果:{}", sendResult);
            }
            return "有序批量发送完成";
        }
    
        /**
         * consumemodel:测试无序消息队列
         *
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 5:10 下午
         */
        @RequestMapping("/testConcurrently")
        public Object testConcurrently() {
            // 循环构造对象
            for (int i = 1; i <= 30; i++) {
                // 将循环下标当成用户id
                Users users = Users.builder().id(i).build();
                // 同步发送
                SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICUSERSCONCURRENTLY, users);
                // 屏蔽日志输出,方便监控消费者日志输出
                // log.info("MQ发送结果:{}", sendResult);
            }
            return "无序批量发送完成";
        }
    
        /**
         * RocketMQ 发送带有tag的消息
         *
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 5:19 下午
         */
        @RequestMapping("/tag")
        public Object tag() {
            // 发送tag1消息
            SendResult sendResult1 = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICTAG + ":tag1", "这是一条字符串消息,给tag1的");
            log.info("tag1发送结果:{}", sendResult1);
            // 发送tag2消息
            SendResult sendResult2 = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICTAG + ":tag2", "这是一条字符串消息,给tag2的");
            log.info("tag2发送结果:{}", sendResult2);
            return "发送完成";
        }
    
        /**
         * broadcasting 测试广播发送消息
         * 所有订阅该Topic的消费者都会收到通知
         *
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 6:23 下午
         */
        @RequestMapping("/sendBroadcasting")
        public Object sendBroadcasting() {
            SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICBROADCASTING, "<广播消息>");
            return sendResult;
        }
    
        /**
         * 其他消息语法
         *
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 5:46 下午
         */
        @RequestMapping("/other")
        public Object other() {
            // 发送单向消息
            // 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
            // 此方式发送消息的过程耗时非常短,一般在微秒级别,无返回值
            // 不建议使用
            // rocketMQTemplate.sendOneWay(MqConfig.Topic.TOPICOTHER, "这是一条单向消息XXXXXXXX");
    
    
            // 延迟消息
            // 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h (不允许自定义)
            // 如果我们要延迟30秒,则设置为4即可
            // 设置延迟10秒,解释:2021-04-25 18:06:30 发送成功,那么消费是在 2021-04-25 18:06:40
            // 第三个参数3000表示:发送超时时间,默认好像是3秒
            // Message<String> message = MessageBuilder.withPayload("这是延迟消息内容").build();
            // SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICOTHER, message, 3000, 3);
            // log.info("延迟消息发送结果:{}", sendResult);
    
    
            // 异步消息
            // rocketMQTemplate.asyncSend(MqConfig.Topic.TOPICOTHER, "这是异步消息", new SendCallback() {
            //     @Override
            //     public void onSuccess(SendResult sendResult) {
            //         log.info("异步消息发送成功:{}", sendResult);
            //     }
            //     @Override
            //     public void onException(Throwable e) {
            //         log.info("异步消息发送失败:{}", e.getMessage());
            //     }
            // });
    
    
            // 发送带有自定义key值的消息(在RocketMQ-console管理后台中可以根据这个key值进行查询)
            // 不过使用messageId也是可以的,建议每次发送完成后和消费者接收后 都打印出mq消息的必要信息,方便查询信息
            // Message<String> message = MessageBuilder.withPayload("带有key值的消息").setHeader(MessageConst.PROPERTY_KEYS, "123456789").build();
            // SendResult sendResult = rocketMQTemplate.syncSend(MqConfig.Topic.TOPICOTHER, message);
            // log.info("带有key值消息发送结果:{}", sendResult);
    
            return "发送完成";
        }
    
        /**
         * 测试发送事务消息
         *
         * @param type : rollback:表示业务处理出现异常,直接回滚MQ消息;
         *             submitMqFail:表示业务处理事务刚提交处理完成,在发送确认MQ消息之前系统挂了;
         *             rewardCouponFail:表示发券过程中挂了,事务还未提交;
         *             其他字符:业务处理完成,通知MQ成功
         * @return java.lang.Object
         * @throws
         * @Date 2021/4/25 6:43 下午
         */
        @RequestMapping("/sendTransaction")
        public Object sendTransaction(String type) {
            // 所谓事务消息应该理解成 MQ的事务,而不是开发人员所理解的事务,给你数据库进行回滚之类的。这里的事务是保证消息能正常发送和消费,是这个事务
            // 发送事务MQ,应该先发送MQ消息,再进行业务处理。
            // 发送半消息
    
            // 创建个用户对象,带到事务处理中
            Users users = Users.builder().build();
            users.setId(1);
            users.setName("RocketMQ");
            users.setRegisterTime(DateUtil.date());
    
            // 使用map作为消息类型
            Map<String, Object> paramMap = new HashMap<>();
            paramMap.put("type", type);
            paramMap.put("other", "这是其他信息");
    
            // 模拟场景:比如在用户注册、或者下单成功后,我们需要给用户发送一张券,并且发送短信通知用户
            // 关键点1:发券和通知都是异步处理
            // 关键点2:发券和通知都必须要成功。发券不成功的话,短信一定不要发送。发券成功的话,短信也一定要发送。(不考虑短信发送失败,这个是消费者落地后的操作)
            // 流程1:注册或者下单成功后,立即发送MQ消息,这个MQ消息就是告诉RocketMQ,我是一条事务消息,待确认消息,该消息RocketMQ不会去消费,而是处于等待过程中。
            // 流程2:流程1发送的这条MQ消息的本地事务(executeLocalTransaction),我们在这里进行发券操作,这个业务处理尽量在一个事务中处理,处理完成后,进行事务提交、或者回滚等操作
            // 如果流程2返回的是提交操作,那么流程1所发的待确认消息将会通知消费者进行消费,反之如果是回滚操作,则该消息将会被删除。
            // 如果流程2处理过程中,服务器挂了,或者其他任何不可控因素,导致流程2最后没有提交或者回滚操作。我们这里使用unknown状态来模拟服务器挂掉
            // 那么RocketMQ 将会调用checkLocalTransaction方法进行回查,回查频率为:1分钟回查一次,默认回查15次
    
            Message<Map<String, Object>> message = MessageBuilder.withPayload(paramMap).build();
            TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(MqConfig.Topic.topic_transaction_reward_coupon, message, users);
            log.info("事务消息发送结果:{}", transactionSendResult);
            return transactionSendResult;
        }
    }


    MqConfig 常量配置类:

    package com.sunjs.rocketmq.common;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    /**
     * MQ 常量配置
     *
     * @Date 2021/4/25 3:34 下午
     */
    public class MqConfig {
    
        /**
         * RocketMQ Topic 前缀
         */
        @Deprecated
        public static final String TOPIC_PREFFIX = "Topic-";
        /**
         * RocketMQ Group 前缀
         */
        public static final String GROUP_PREFFIX = "Group-";
    
        /**
         * Topic 常量配置
         *
         * @Author: sun
         * @Date: 2021/4/25 3:36 下午
         */
        public static class Topic {
            /** Topic 发送 String 字符串 **/
            public static final String TOPICSTRING = "TopicString";
            /** Topic 发送 Users  Object **/
            public static final String TOPICUSERS = "TopicUsers";
    
            /** Topic 发送 Users  顺序单队列 **/
            public static final String TOPICUSERSORDERLY = "TopicUsersOrderLy";
            /** Topic 发送 Users  无序多线程接收 **/
            public static final String TOPICUSERSCONCURRENTLY = "TopicUsersConcurrently";
    
            /** Topic 发送 含 tag 标签的消息 **/
            public static final String TOPICTAG = "TopicTag";
    
            /** 广播消息 **/
            public static final String TOPICBROADCASTING = "TopicBroadcasting";
    
            /** 其他消息语法接收 **/
            public static final String TOPICOTHER = "TopicOther";
    
            /** 模拟创建订单事务消息 **/
            public static final String topic_transaction_reward_coupon = "topic_transaction_createorder";
        }
    
    }


    测试发送消息,控制台输出如下:

    2021-04-25 21:24:10.752  INFO 47569 --- [nio-8080-exec-2] c.s.r.p.controller.ProducerController    : MQ发送结果:SendResult [sendStatus=SEND_OK, msgId=7F000001B9D118B4AAC28030537E0007, offsetMsgId=0AD3370B00002A9F0000000000148565, messageQueue=MessageQueue [topic=TopicString, brokerName=broker-a, queueId=1], queueOffset=2]
    2021-04-25 21:24:10.755  INFO 47569 --- [MessageThread_2] c.s.r.consumer.basic.StringConsumer      : 消费者接收:Topic:TopicString
    2021-04-25 21:24:10.756  INFO 47569 --- [MessageThread_2] c.s.r.consumer.basic.StringConsumer      : 业务处理:消息内容:你好,RocketMQ


    事务消息监听类:

    ExtTransactionListener

    package com.sunjs.rocketmq.listener;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * 自定义事务监听类
     * 自 rocketmq-spring-boot-starter 2.0还是2.1之后,该监听类不允许创建多个,只允许有一个
     *
     * @Date 2021/4/25 6:36 下午
     */
    @Slf4j
    @RocketMQTransactionListener
    public class ExtTransactionListener implements RocketMQLocalTransactionListener {
    
        private static Set<String> transactionList = new HashSet<>();
    
        /**
         * COMMIT_MESSAGE:提交消息,这个消息由prepared状态进入到commited状态,消费者可以消费这个消息;
         * ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
         * UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,而最终决定这个消息命运的,是checkLocalTransaction这个方法。
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) {
            log.info("本地事务开始执行。Message对象数据:{},Object对象数据:{}", message, obj);
            log.info("开始执行发券操作");
            MessageHeaders headers = message.getHeaders();
            // 获取事务ID
            String transactionId = headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID, String.class);
            // 消息主题
            String topic = headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC, String.class);
            // 消息key
            String keys = headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.KEYS, String.class);
            // 消息体内容
            String body = new String((byte[]) message.getPayload());
    
            try {
    
                Map<String, Object> bodyMap = JSON.parseObject(body, Map.class);
                String type = bodyMap.get("type").toString();
    
                // TODO 调用发券业务逻辑,发券业务逻辑理论上应该是事务方法 @Transactional,若失败则进行发券回滚,此处拦截也进行拦截
                // TODO 如果发券成功,在同一事务中建议将transaction_id保存一下,可以保存到数据库
                // 此处我们创建个表来存储事务消息,方便checkLocalTransaction回查
                // 表结构类似如下:
                // CREATE TABLE `d_test_weiif`.`无标题`  (
                //   `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
                //   `transaction_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'mq消息事务id(消息id)',
                //   `topic` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'mq消息主题',
                //   `keys` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'mq消息key',
                //   `body` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '消息体内容,json格式',
                //   `add_time` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '添加时间',
                //   `add_user_id` bigint(20) NULL DEFAULT NULL COMMENT '添加人',
                //    PRIMARY KEY (`id`) USING BTREE
                // ) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'RocketMQ 事务消息日志表' ROW_FORMAT = Compact;
    
                //其实只需要transaction_id即可,如果回查时能根据transaction_id回查到,则证明该消息正常,执行commit操作
    
                // 这里我们使用数据结构Set代替一下数据库进行存储
    
                if ("rollback".equals(type)) {
                    //模拟执行发券失败
                    Integer.valueOf("a");
                    return RocketMQLocalTransactionState.COMMIT;
                } else if ("submitMqFail".equals(type)) {
                    // 模拟服务器挂掉,挂掉时间为事务已经执行完成,但是MQ没有commit,这种情况下,RocketMQ会发起回查,调用 checkLocalTransaction()
                    transactionList.add(transactionId);
                    // 表示事务提交成功,但是MQ消息没有发出去
                    log.info("发券成功,提交MQ确认消息失败");
                    return RocketMQLocalTransactionState.UNKNOWN;
                }  else if ("rewardCouponFail".equals(type)) {
                    // 模拟服务器挂掉,挂掉时间为发券过程中,事务还未提交,但是MQ没有commit,这种情况下,RocketMQ会发起回查,调用 checkLocalTransaction()
                    log.info("发券中,哦~~~ 服务器挂掉了");
                    return RocketMQLocalTransactionState.UNKNOWN;
                } else {
                    // 发券成功,MQ消息确认成功
                    log.info("发券成功,已经通知MQ进行短信发送");
                    return RocketMQLocalTransactionState.COMMIT;
                }
            } catch (Exception e) {
                // 出现异常,MQ消息进行回滚
                log.info("发券出现异常,事务回滚,MQ消息回滚,异常信息:{}", e.getMessage());
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        /**
         * 回查轮询次数说明:1分钟回查一次,默认回查15次
         *
         * @Author: sun
         * @Date: 2021/4/23 6:00 下午
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            MessageHeaders headers = message.getHeaders();
            // 获取事务ID
            String transactionId = (String) headers.get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
            log.info("RocketMQ发起回查,回查事务消息ID:{}", transactionId);
            log.info("RocketMQ发起回查,回查事务消息数据:{}", message);
            try {
                // TODO 查询表中是否存在该事务ID,如果存在,则表示发券逻辑正常执行,如果不存在,则视情况程序员返回值而定, 是回滚还是继续回查
                if (transactionList.contains(transactionId)) {
                    // 查询到,返回给RocketMQ消息确认,此时RocketMQ消费者会进行消息消费操作
                    log.info("回查确认发券成功,准备通知MQ进行短信发送");
                    return RocketMQLocalTransactionState.COMMIT;
                } else {
                    // 未查询到,表示消息不存在,发券逻辑失败等情况,则进行回滚
                    log.info("回查发现发券失败,MQ消息事务回滚");
                    return RocketMQLocalTransactionState.ROLLBACK;
                    // 如果此处设置成 RocketMQLocalTransactionState.UNKNOWN,则RocketMQ还会继续回查
                }
            } catch (Exception e) {
                // 查询期间出现异常,则继续下次回查
                e.printStackTrace();
                log.error("RocketMQ发起回查,回查事务消息ID:{},回查出现异常,异常信息:{}", transactionId, e.getMessage());
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }
    }


    测试发送对象消息:

    UsersConsumer

    package com.sunjs.rocketmq.consumer.basic;
    /**
     *                     .::::.
     *                   .::::::::.
     *                  :::::::::::    佛主保佑、永无Bug
     *              ..:::::::::::'
     *            '::::::::::::'
     *              .::::::::::
     *         '::::::::::::::..
     *              ..::::::::::::.
     *            ``::::::::::::::::
     *             ::::``:::::::::'        .:::.
     *            ::::'   ':::::'       .::::::::.
     *          .::::'      ::::     .:::::::'::::.
     *         .:::'       :::::  .:::::::::' ':::::.
     *        .::'        :::::.:::::::::'      ':::::.
     *       .::'         ::::::::::::::'         ``::::.
     *   ...:::           ::::::::::::'              ``::.
     *  ```` ':.          ':::::::::'                  ::::..
     *                     '.:::::'                    ':'````..
     */
    
    import cn.hutool.core.date.DateUtil;
    import com.sunjs.rocketmq.common.MqConfig;
    import com.sunjs.rocketmq.consumer._BaseConsumer;
    import com.sunjs.rocketmq.model.Users;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 测试消费端接收对象
     *
     * @Date 2021/4/25 2:48 下午
     */
    @Slf4j
    @Service
    @RocketMQMessageListener(
            topic = MqConfig.Topic.TOPICUSERS, //topic
            consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICUSERS, //分组规则,Group-"topic命名"
            //下边可以去掉,都使用的是默认值
            consumeMode = ConsumeMode.CONCURRENTLY, //默认值,并发接收模式
            messageModel = MessageModel.CLUSTERING, //默认值,集群模式
            selectorType = SelectorType.TAG, // 默认值,标签
            selectorExpression = "*" // 默认值,匹配该topic下所有tag
    )
    public class UsersConsumer extends _BaseConsumer<Users> implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt message) {
            log.info("消费者接收:Topic:{}", message.getTopic());
            if (!isVerify(message, Users.class)) {
                return;
            }
            // 执行MQ 消息公共处理
            todo();
        }
    
        /**
         * 具体业务处理
         *
         * @Author: sun
         * @Date: 2021/4/25 3:29 下午
         */
        @Override
        protected void handle() {
            // TOOD 业务处理
            log.info("业务处理:用户ID:{}", body.getId());
            log.info("业务处理:用户姓名:{}", body.getName());
            log.info("业务处理:用户注册时间:{}", DateUtil.format(body.getRegisterTime(), "yyyy-MM-dd HH:mm:ss"));
        }
    }


    这里就不一一演示了,代码太多,文章超长。

    具体请看 git 代码


    gitee仓库地址



    下一篇: RocketMQ配置console控制台,配置登录账户密码


1 +1

版权声明

 Java  源码  springboot  rocketmq

 请文明留言

0 条评论