springboot整合rocketmq教程,

  springboot整合rocketmq教程,

  00-1010简介1。为什么要二次包装1.1二次包装的不同观点1.2包装的退出点1.3设计图案的应用2。二次包装的核心要点2.1。二次封装的主要讨论点2 . 1 . 2 Send/2.2 RocketMQTemplate封装消耗的几个消息实体2.2.1封装基本实体类2.2.2 RocketMQTemplate3.2.3增强的RocketMQTemplate 2.3 RocketMQListener封装2.4广播消息应用场景2.3代码封装完成测试

  00-1010预掌握:跳趾的基本使用,RocketMQ和跳趾的综合使用

  主要使用参考第二节核心使用参考第一篇难度:四颗星

  代码不难,关键是封装的思想需要体验。

  不同观点欢迎大家在评论区一起讨论学习。没有对错之分。每个系统都有不同的业务特点,适合系统的才是最好的~

  源码地址: https://gitee.com/tianxincoder/practice-rocketmq-enterprises的文章将只解释核心代码,其他基本集成配置和多环境自动隔离参考源代码。

  00-1010为了不产生歧义,本文中提到的二次封装是基于原有的使用模式,而不是源代码级别的二次封装。

  换句话说,如果所有的源代码都需要封装,那就说明公司的业务规模已经到了一定的程度,不需要再讨论二次封装这种事情了。封装已经是共识。

  首先,要明确的是:不进行二次封装完全不影响RocketMQ的使用,可以选择二次封装和不选择二次封装二次包装可以提供更多的功能和更简单的用法。如果一个包装比原来的用途更复杂,它就失去了二次包装的意义Q1:二次包装有必要吗?是的,一点也不影响正常使用Q2:有必要进行二次包装吗?不同的人有不同的看法。如果觉得没有必要那么这篇文章可以跳过~ORM框架中一个典型的二次封装框架是MyBatisPlus(简称MP),是对MyBatis原生用法的补充。有没有可能不用MP直接用MyBatis?绝对的。那为什么要用二次包装后的MP呢?场景:大部分数据库操作无非就是CRUD,所以最常用的有(根据名字就能知道这个方法是做什么的,不用再解释了):UpdateById、batchUpdate、deleteById、saveOrUpdate、batchInsert.对于以上五种操作,只改变了表字段和表名,其余语法相同。不封装:你可以直接用MyBatis完全实现以上方法的功能,但是每个表需要自己写一次方法。假设有100个表,将有495个重复的功能代码(如下所述)。而且所有的代码都是冗余的封装后:打包器提供了上述五个方法的公共实现,那么所有需要使用上述功能的服务自然只要继承打包后的类就拥有了上述五个功能,那么代码的冗余就从100个表*5个方法==500,去掉5个包,节省了495个代码冗余。所以二次包装就是要更方便,简洁,更适合系统,量身定做即可。就像上面五种方法,完全重复的东西为什么要浪费开发时间去做这些多余的事情?

  00-1010先从生活中的一份蛋炒饭说起吧。

  原架就像提供原材料:厨具、鸡蛋、大米等食材和菜谱。

  通常有两种使用框架的方法:第一种:根据食谱烹饪(调用原生方法),洗菜,烹饪,洗碗和清洁。管你呢,照顾第二种:找人学习这道菜的各种做法(负责二次包装的人)(包装大部分业务场景),做成点菜服务。谁想吃哪种蛋炒饭可以直接点(叫打包法)。回答:各有千秋(瞎说,哈哈~)

  第一种方法:原生优点:它可以有多种灵活的称呼。比如大家都用RocketMQTemplate native send发消息,想发什么样的消息都可以,比较随意。

  ong>缺点:代码大量冗余,从构建参数对象、发送消息、消费消息、异常处理、日志记录、异常重试啥啥的都是自己搞,每个消费队列就会出现上面所有的步骤。比如现在有一个订单处理中心A接收来自各种类型订单,此时如B、C两个原始订单来源想让A处理订单,那么B和C都需要按照A的要求进行调用,代码会冗余第二种:点餐式服务优点:封装了大部分统一的方法调用,比如 发送消息、异常处理、日志记录等等都是重复的,封装后点餐的人不需要再关系这一部分要怎么处理,只需要告诉点餐服务要不要进行异常、要不异常重试等等,那么此时对于点餐人来说只需要 付钱(调用服务)吃饭(消费消息),除此之外啥也不用管,全部由点餐服务提供者完成所有上述两个步骤外的其它操作缺点:无法满足所有点餐人的要求,有的人喜欢味道重一点,有的喜欢味道淡一点。但是这个缺点完全可以处理,比如点餐服务提供了自定义厨房(返回原始发送对象),此时调用者可以按照第一种方式进行使用选哪种?个人而言:业务系统复杂的优先选择第二种简单业务的选择第一种(尽量采用封装,后续维护方便)。对于一个复杂的系统,本身业务级的代码就已经很多了,结果还要每个人处理全部一样的东西,消费者越多代码冗余越多。如果一个系统只是为了使用MQ来进行业务分离,消费者也不多,那么可以选择最快的方式,但是最终会选择第二种,如果业务随着时间增长越复杂,越晚改成第二种花费的代价越大!第一种就好比此时我们要直接操作内存,原生操作就好比C++或C,可以直接操作内存,但是同时用完后还要自己写各种异常处理和释放内存;代码封装就好比Java,我们只需要告诉Java我们要使用内存,然后用完就不用管企业中,业务功能产出是一级优先级,在此之上才能有更高级的东西。技术服务于业务,而不是业务服务于技术!比如现在30个人的系统,我们要使用缓存加速访问,那么我们是选择 内部缓存(直接用集合或者map存起来)还是用Redis?内部缓存和Redis能不能达到目的?能哪个更方便更快?内部缓存!内部对象就很快实现如果业务发展迟早会转为Redis这种专业的缓存中间件,就好比业务发展前第一种,业务发展后选择第二种,但是对于大部分业务系统来说功能增加是很快的,特别是产品同事上一分钟提需求下一分钟就要上线这种(开个玩笑~),所以我们在引用一个技术需不需要进行二次封装时需要技术负责人对业务增长有一个预判。建议是都进行封装一下

  

1.2 封装的抽离点

对于二次封装,其中最主要的就是找出该框架在日常使用中所出现的大部分涉及到的操作,然后找出变化操作和不变化操作RocketMQ日常使用主要场景为例:发送消息阶段:准备需要发送的消息、发送消息、记录原始消息日志、发送失败处理、可靠性处理消费消息阶段:记录接收消息日志、业务处理、业务日志记录、异常处理、异常重试、异常通知、死信处理提取变化点和不变化点(可以抽取为公共处理的场景)发送消息阶段:变化点:准备需要发送的消息不变化点:发送消息、记录原始消息日志、发送失败处理、可靠性处理消费消息阶段变化点:业务处理、业务日志记录不变化点:记录接收消息日志、异常处理、异常重试、异常通知、死信处理从上可以看到,对于RocketMQ的使用,大部分场景都是可以抽离成一个公共的方法处理,只有业务级的需要自己处理,所以如果我们把不变化场景抽取后,每个同事只需要写自己业务相关部分即可抽取后的复杂度:对于新加一个消费者,只需要处理业务相关三个场景(准备需要发送的消息、业务处理、业务日志记录),剩下的九个场景,只需要封装一次就可以。需要现在就几十个消费者,可以想想一些减少了多少代码冗余

  

1.3 设计模式的应用

要封装出一个好的抽象层,【设计模式】建议好好体会和学习一下设计模式对于用不到的人来说比较虚幻,对于用的到的人来说,这个真牛X

  

二、二次封装核心要点

  

2.1 二次封装核心点

  

2.1.1 封装主要讨论点

对于RocketMQ或者说对于整个MQ体系来说(不管是RabbitMQ、RocketMQ、Kafka)等封装的核心主要有两个:发送消息、消费消息者两个场景对于RocketMQ我们主要讨论三个地方:RocketMQTemplate封装、RocketMQListener封装和广播消息的封装广播消息是分布式系统中同时让所有节点都干一件事情的一个好的方式,如果用不到忽略广播消息即可

  

2.1.2 发送/消费的几种消息实体

RocketMQ发送消息对于不同的使用来说,大部分选择下面的几种发送消息类型A、发送Json对象,比如Fastjson的JSONObjectB、直接发送转Json后的String对象C、根据业务封装对应实体类D、直接使用原生MessageExt接收怎么选择?怎么选择才是最优?上面哪一种都可以达到目的,如果要统一封装就必须要有一个标准怎么选择只需要回答这个问题:在不看消息发送者的情况下,消费者怎么知道发送者发送的消息含义?比如现在有一个订单消息,如果我们不看消息发送者,怎么知道发送者给消费者发送哪些字段A、B、D可以吗?一定不可以!JSON对象和String对象,如果我们不看消息发送者不可能知道到底发送了啥,这点我相信没有可以讨论的地方,因为类型决定了这个操作不可能C可以吗?可以!此时不需要看消息发送者,只需要看消费者的实体类点进去,有哪些业务字段一清二楚可能有杠要抬了,有看实体类的功夫,我看消息发送者都看完了灵魂拷问1:如果消息发送者和消费者不在一个系统怎么看?邪魅一笑,不同业务线可能没代码权限吧?分布式系统完全独立可能吧?灵魂拷问2:如果现在需要一个功能,如果某些必须要的字段消息发送者如果没有给的话需要校验,普通String和JSONObject怎么实现?换成实体类呢?基于上述讨论点,封装建议基于实体类来,实体类不管是排查问题、新人熟悉系统代码、信息校验等String和JSONObject无法像实体类一样轻松胜任

  

2.2 RocketMQTemplate封装

  

2.2.1 封装基础实体类

基础消息实体类封装了除了业务消息外所有其他公共字段,主要看下面代码中的字段和注释基础抽象消息实体,包含基础的消息、根据自己的业务消息设置更多的字段其中也可以包含所有消费者可能用得到的方法等,比如做些数据的加解密
package com.codecoord.rocketmq.domain;import lombok.Data;import java.time.LocalDateTime;import java.util.UUID;/** * 基础消息实体,包含基础的消息 * 根据自己的业务消息设置更多的字段 * * @author tianxincoord@163.com * @since 2022/6/16 */@Datapublic abstract class BaseMqMessage { /** * 业务键,用于RocketMQ控制台查看消费情况 */ protected String key; /** * 发送消息来源,用于排查问题 */ protected String source = ""; /** * 发送时间 */ protected LocalDateTime sendTime = LocalDateTime.now(); /** * 跟踪id,用于slf4j等日志记录跟踪id,方便查询业务链 */ protected String traceId = UUID.randomUUID().toString(); /** * 重试次数,用于判断重试次数,超过重试次数发送异常警告 */ protected Integer retryTimes = 0;}
有了此基础抽象实体类,那么剩下的所有业务消息实体只需要继承此基类,然后在自己业务类中包含自己需要的字段即可,因为这些公共字段不管是向上转型还是向下转型,子类和父类都可以看得到

  

2.2.2 RocketMQTemplate

RocketMQTemplate发送消息的代码如果不封装,我们发送消息需要这样String destination = topic + ":" + tag;template.syncSend(destination, message);每个人发送消息都要自己处理这个冒号,直接传入topic和tag不香吗?按照抽离变化点中的变化点,只有消息是变化的,除此之外的其他规则交给封装类RocketMQTemplate主要封装发送消息的日志、异常的处理、消息key设置、等等其他配置封装代码类如下,下面包含了主要发送方式,更多自己添加即可这里就是消息发送的点餐机器,同时也提供了封装方法也提供原始RocketMQTemplate供使用此处只是提供一种方式,生产中按照项目组商量决定
package com.codecoord.rocketmq.template;import com.alibaba.fastjson.JSONObject;import com.codecoord.rocketmq.constant.RocketMqSysConstant;import com.codecoord.rocketmq.domain.BaseMqMessage;import com.codecoord.rocketmq.util.JsonUtil;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * RocketMQ模板类 * * @author tianxincoord@163.com * @since 2022/4/15 */@Componentpublic class RocketMqTemplate { private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class); @Resource(name = "rocketMQTemplate") private RocketMQTemplate template; /** * 获取模板,如果封装的方法不够提供原生的使用方式 */ public RocketMQTemplate getTemplate() { return template; } /** * 构建目的地 */ public String buildDestination(String topic, String tag) { return topic + RocketMqSysConstant.DELIMITER + tag; } /** * 发送同步消息 */ public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(topic + RocketMqSysConstant.DELIMITER + tag, message); } public <T extends BaseMqMessage> SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 LOGGER.info("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult)); return sendResult; } /** * 发送延迟消息 */ public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) { return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel); } public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) { Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult)); return sendResult; }}
这个类是最基础的原始封装类,相当于餐馆提供的点餐服务。上面提供无业务特性的发送,比如想要发送日志消息或者动态发送消息目的场景

  

3.2.3 增强RocketMQTemplate

以订单处理中心来说,变化点仅仅只是单号等业务数据不一样,发往订单处理中心的消息不管是topic还是tag等等其实完全都一样,那么此时可以根据业务来增加封装增强原始功能需要注意下面两个点所有父类能出现的地方,子类都能出现:也就是子类拥有功能 >= 父类 ,比如Java的List,只要入参是List的地方,传ArrayList和LinkedList完全可以增强功能不能改变原始功能的行为:比如父类有一个方法say是说话,结果子类覆写了say改成了行为是吃饭,然后当调用者调用say的时候得到了一个完全预期外的结果就以订单中心消息发送为例,封装OrderMessageTemplate继承自RocketMqTemplate,此时前者就拥有了封装父类的所有基础方法,拥有了所有父类的功能。然后可以在前者增加自身业务特性的发送方法,比如发送订单处理消息
package com.codecoord.rocketmq.template;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import org.apache.rocketmq.client.producer.SendResult;import org.springframework.stereotype.Component;import javax.annotation.Resource;import javax.validation.constraints.NotNull;import java.time.LocalDate;import java.time.LocalDateTime;/** * 订单类发送消息模板工具类 * * @author tianxincode@163.com * @since 2022/6/16 */@Componentpublic class OrderMessageTemplate extends RocketMqTemplate { /// 如果不采用继承也可以直接注入使用 /* @Resource private RocketMqTemplate rocketMqTemplate; */ /** * 入参只需要传入是哪个订单号和业务体消息即可,其他操作根据需要处理 * 这样对于调用者而言,可以更加简化调用 */ public SendResult sendOrderPaid(@NotNull String orderId, String body) { RocketMqEntityMessage message = new RocketMqEntityMessage(); message.setKey(orderId); message.setSource("订单支付"); message.setMessage(body); // 这两个字段只是为了测试 message.setBirthday(LocalDate.now()); message.setTradeTime(LocalDateTime.now()); return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message); }}
此时对于调用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息发送到订单处理中心封装后的好处,假如现在有10个订单来源,现在需要调整消息发送格式,如果不进行封装那么10个来源发送的地方都需要改;如果进行了二次封装,只需要改sendOrderPaid方法即可,而且还不会出错,此时优势就体现出来了

  

2.3 RocketMQListener封装

RocketMQListener是消费消息的核心,同时也涉及到更多的操作,比如:基础日志记录、异常处理、消息重试、警告通知等等等按照抽离变化点,RocketMQListener只应该处理与自身业务相关的,除此之外的其它应该交给父类,子类只需要告诉父类要不要异常处理、要不要重试等等,点餐式服务封装消息消费的抽象类注意泛型限定为标准基础消息类,这样能到消费者的一定有统一的标准类BaseMqMessage下面简单封装示例
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqSysConstant;import com.codecoord.rocketmq.domain.BaseMqMessage;import com.codecoord.rocketmq.template.RocketMqTemplate;import com.codecoord.rocketmq.util.JsonUtil;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.slf4j.MDC;import javax.annotation.Resource;import java.time.Instant;import java.util.Objects;/** * 抽象消息监听器,封装了所有公共处理业务,如 * 1、基础日志记录 * 2、异常处理 * 3、消息重试 * 4、警告通知 * 5、.... * * @author tianxincoord@163.com * @since 2022/4/17 */public abstract class BaseMqMessageListener<T extends BaseMqMessage> { /** * 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化 */ protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private RocketMqTemplate rocketMqTemplate; /** * 消息者名称 * * @return 消费者名称 */ protected abstract String consumerName(); /** * 消息处理 * * @param message 待处理消息 * @throws Exception 消费异常 */ protected abstract void handleMessage(T message) throws Exception; /** * 超过重试次数消息,需要启用isRetry * * @param message 待处理消息 */ protected abstract void overMaxRetryTimesMessage(T message); /** * 是否过滤消息,例如某些 * * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ protected boolean isFilter(T message) { return false; } /** * 是否异常时重复发送 * * @return true: 消息重试,false:不重试 */ protected abstract boolean isRetry(); /** * 消费异常时是否抛出异常 * * @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack) */ protected abstract boolean isThrowException(); /** * 最大重试次数 * * @return 最大重试次数,默认10次 */ protected int maxRetryTimes() { return 10; } /** * isRetry开启时,重新入队延迟时间 * * @return -1:立即入队重试 */ protected int retryDelayLevel() { return -1; } /** * 由父类来完成基础的日志和调配,下面的只是提供一个思路 */ public void dispatchMessage(T message) { MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId()); // 基础日志记录被父类处理了 logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message)); if (isFilter(message)) { logger.info("消息不满足消费条件,已过滤"); return; } // 超过最大重试次数时调用子类方法处理 if (message.getRetryTimes() > maxRetryTimes()) { overMaxRetryTimesMessage(message); return; } try { long start = Instant.now().toEpochMilli(); handleMessage(message); long end = Instant.now().toEpochMilli(); logger.info("消息消费成功,耗时[{}ms]", (end - start)); } catch (Exception e) { logger.error("消息消费异常", e); // 是捕获异常还是抛出,由子类决定 if (isThrowException()) { throw new RuntimeException(e); } if (isRetry()) { // 获取子类RocketMQMessageListener注解拿到topic和tag RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(annotation)) { message.setSource(message.getSource() + "消息重试"); message.setRetryTimes(message.getRetryTimes() + 1); SendResult sendResult; try { // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息) // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息 sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel()); } catch (Exception ex) { throw new RuntimeException(ex); } // 发送失败的处理就是不进行ACK,由RocketMQ重试 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { throw new RuntimeException("重试消息发送失败"); } } } } }}
封装消费最终类注意:收到的消息是先委派给父类,父类进行调度管理
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * 实体类消费监听器,在实现RocketMQListener中间还加了一层BaseMqMessageListener来处理基础业务消息 * * @author tianxincoord@163.com * @since 2022/5/12 */@Slf4j@Component@RocketMQMessageListener( topic = RocketMqBizConstant.SOURCE_TOPIC, consumerGroup = RocketMqBizConstant.SOURCE_GROUP, selectorExpression = RocketMqBizConstant.SOURCE_TAG, // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小 consumeThreadMax = 5)public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage> implements RocketMQListener<RocketMqEntityMessage> { /** * 此处只是说明封装的思想,更多还是要根据业务操作决定 * 内功心法有了,无论什么招式都可以发挥最大威力 */ @Override protected String consumerName() { return "RocketMQ二次封装消息消费者"; } @Override public void onMessage(RocketMqEntityMessage message) { // 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型 super.dispatchMessage(message); } @Override protected void handleMessage(RocketMqEntityMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 System.out.println("业务消息处理"); } @Override protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 } @Override protected boolean isRetry() { return false; } @Override protected int maxRetryTimes() { // 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用 return 5; } @Override protected boolean isThrowException() { // 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常 return false; }}
封装后对于子类来说,只需要告诉父类要不要做就拥有了最开始说的所有功能,简化了使用,此时子类消费者只需要专注于自己的业务核心处理就可以了

  

2.4 广播消息的应用场景

应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息也就是需要所有服务节点都需要同事做某一件事情的时候,此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的,所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释,否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * 广播消息 * 应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息 * 也就是需要所有服务节点都需要同事做某一件事情的时候 * 此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理 * * 特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876 * 这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的 * 所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释 * 否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题 * * @author tianxincoord@163.com * @since 2022/5/12 */@Slf4j@Component@RocketMQMessageListener( topic = RocketMqBizConstant.SOURCE_TOPIC, consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP, selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG // messageModel = MessageModel.BROADCASTING)public class RocketBroadcastingListener implements RocketMQListener<MessageExt> { /** * MessageExt:内置的消息实体,生产中根据需要自己封装实体 */ @Override public void onMessage(MessageExt message) { log.info("收到广播消息【{}】", new String(message.getBody())); }}

  

2.3 代码封装完结测试

封装测试大家可以直接参考RocketMqController即可

  

package com.codecoord.rocketmq.controller;import com.alibaba.fastjson.JSONObject;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import com.codecoord.rocketmq.template.OrderMessageTemplate;import com.codecoord.rocketmq.template.RocketMqTemplate;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendResult;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.time.LocalDate;import java.time.LocalDateTime;import java.util.UUID;/** * 消息发送 * * @author tianxin01@huice.com * @since 2022/6/16 */@RestController@RequestMapping("/rocketmq")@Slf4jpublic class RocketMqController { /** * 注意此处注入的是封装的RocketMqTemplate */ @Resource private RocketMqTemplate rocketMqTemplate; /** * 注入对应业务的模板类 */ @Resource private OrderMessageTemplate orderMessageTemplate; /** * 通过实体类发送消息,发送注意事项请参考实体类 * 说明:也可以在RocketMqTemplate按照业务封装发送方法,这样只需要调用方法指定基础业务消息接口 */ @RequestMapping("/entity/message") public Object sendMessage() { RocketMqEntityMessage message = new RocketMqEntityMessage(); // 设置业务key message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询we年 message.setSource("封装测试"); // 业务消息内容 message.setMessage("当前消息发送时间为:" + LocalDateTime.now()); // Java时间字段需要单独处理,否则会序列化失败 message.setBirthday(LocalDate.now()); message.setTradeTime(LocalDateTime.now()); return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message); } /** * 此时对于调用者而且,无需创建任何类 * 如果某天需要调整消息发送来源,如果不封装,所有原来产生message的地方全部改 * 如果封装了,只需要改sendOrderPaid就可以切换 */ @RequestMapping("/order/paid") public Object sendOrderPaidMessage() { return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客户下单了...,快快备货"); } /** * 直接将对象进行传输,也可以自己进行json转化后传输 */ @RequestMapping("/messageExt/message") public SendResult convertAndSend() { // 生产中不推荐使用jsonObject传递,不看发送者无法知道传递的消息包含什么信息 JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "messageExt"); String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG); // 如果要走内部方法发送则必须要按照标准来,否则就使用原生的消息发送 return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject); }}
到此这篇关于RocketMQ整合SpringBoot实现生产级二次封装的文章就介绍到这了,更多相关RocketMQ SpringBoot封装内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: