springcloud rabbitmq消息确认,rabbitmq发送消息确认

  springcloud rabbitmq消息确认,rabbitmq发送消息确认

  如何解决写爬虫IP受阻的问题?立即使用。

  最近部门号召大家多组织技术分享会,说是为了搞活公司的技术氛围,但我知道这个T M只是为了刷KPI。然而,从另一方面来说,这确实是一件好事。与其开那些无聊的斗嘴会,多做技术交流对个人成长还是很有帮助的。

  于是我主动报名分享,咳咳咳~,真的不是为了那个KPI,只是为了跟大家学习!

  这次分享springboot rabbitmq如何实现消息确认机制,以及实际开发中踩坑的一点经验。其实整体内容比较简单,有时候事情就是这么神奇,越简单的东西越容易出错。

  可以看出,使用RabbitMQ后,我们的业务链接明显更长了。虽然实现了系统间的解耦,但是可能导致消息丢失的场景也增加了。例如:

  消息生成器-rabbitmq服务器(消息发送失败)

  rabbitmq服务器本身出现故障,导致消息丢失。

  消息使用者-rabbitmq服务(无法使用消息)

  所以,如果能使用中间件,尽量不要使用。如果为了用而用,只会增加你的烦恼。开启消息确认机制后,虽然在很大程度上保证了消息的准确传递,但由于频繁的确认交互,rabbitmq的整体效率变低,吞吐量下降严重。对于不是很重要的消息,真的不建议你使用消息确认机制。

  我们先实现springboot rabbitmq的消息确认机制,然后具体分析一下遇到的问题。

  

一、准备环境

  

1、引入 rabbitmq 依赖包

  依赖性

  groupIdorg.springframework.boot/groupId

  在Artifact Spring-Boot-Starter-AMQP/Artifact ID/Dependency

2、修改 application.properties 配置

  的配置中,需要开启发送方和消费方的消息确认。

  spring . rabbit MQ . host=127 . 0 . 0 . 1 spring . rabbit MQ . port=5672 spring . rabbit MQ . username=guest

  spring.rabbitmq.password=guest

  #发送者打开确认机制。

  spring . rabbit MQ . publisher-confirmations=true #发送方开启退货确认机制

  spring . rabbit MQ . publisher-returns=true # # # # # # # # # # # # # # # # # # #

  #在用户端设置手动确认

  spring . rabbit MQ . listener . simple . acknowledge-mode=manual

  #你支持重试吗?

  spring . rabbit MQ . listener . simple . retry . enabled=true

3、定义 Exchange 和 Queue

  定义交换机confirmTestExchange和队列confirm_test_queue,并将队列绑定到交换机。

  @ configuration public class queue config {

  @Bean(name=confirmTestQueue )

  公共队列confirmTestQueue() {

  返回新队列( confirm_test_queue ,true,false,false);

  }

  @ Bean(name= confirmTestExchange )

  public fanout exchange confirmTestExchange(){

  返回新的fanout exchange( confirmTestExchange );

  }

  @Bean公共绑定confirmTestFanoutExchangeAndQueue(

  @ Qualifier( confirmTestExchange )fanout exchange confirmTestExchange,

  @ Qualifier( confirmTestQueue )Queue confirmTestQueue){

  返回binding builder . bind(confirmTestQueue)。to(confirmTestExchange);

  }}

  

二、消息发送确认

   Send Message Confirmation:用于确认生产者生产者将消息发送给代理,代理上的交易所将其传递给队列时,消息是否传递成功。

  从生产者到rabbitmq代理的消息具有confirmCallback确认模式。

  从exchange到队列的邮件传递失败具有returnCallback模式。

  我们可以利用这两次回调来确保100%交付。

  

1、 ConfirmCallback确认模式

   rabbit MQ代理收到消息后,将立即触发confirmCallback。

  @Slf4j

  @Componentpublic类ConfirmCallbackService实现RabbitTemplate。确认回调{

  @ Override public void confirm(correlation data correlation data,boolean ack,String cause) {

  如果(!确认){

  Log.error(消息发送异常!);

  }否则{

  log.info(发送者爸爸已经收到确认,correlationData={},ack={},cause={} ,correlationData.getId(),ack,cause);

  }

  }}实现接口确认回拨,重写其确认()方法,方法内有三个参数相关数据,确认,原因。

  相关数据:对象内部只有一个编号属性,用来表示当前消息的唯一性确认:消息投递到经纪人的状态,真的表示成功原因:表示投递失败的原因。但消息被经纪人接收到只能表示已经到达(法属)马提尼克岛(马提尼克岛的简写)服务器,并不能保证消息一定会被投递到目标长队里。所以接下来需要用到returnCallback

  

2、 ReturnCallback 退回模式

   如果消息未能投递到目标长队里将触发回调返回回调,一旦向长队投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

  @Slf4j

  @Componentpublic类ReturnCallbackService实现兔子模板.返回回调{

  @ Override public void返回的消息(Message Message,int replyCode,String replyText,String exchange,String routingKey) {

  日志。info(返回消息===replyCode={ },replyText={},exchange={},routingKey={} ,回复代码,replyText,exchange,routing key);

  }}实现接口返回回调,重写returnedMessage()方法,方法有五个参数消息(消息体)、replyCode(响应code)、replyText(响应内容)、交换(交换机)、路由键(队列)。

  下边是具体的消息发送,在兔子模板中设置确认和返回回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个相关数据对象,添加一个编号为10000000000。

  @ Autowired私兔模板兔模板;

  @ auto wired private ConfirmCallbackService ConfirmCallbackService;

  @ auto wired private ReturnCallbackService ReturnCallbackService;

  公共void sendMessage(字符串交换、字符串路由密钥、对象消息){

  /**

  * 确保消息发送失败后可以重新返回到队列中

  * 注意:yml需要配置发布者-返回:真

  */

  兔子模板。设置强制(真);

  /**

  * 消费者确认收到消息后,手动确认字符(确认字符)回执回调处理

  */

  兔子模板。setconfirmcallback(confirmCallbackService);

  /**

  * 消息投递到队列失败回调处理

  */

  兔子模板。setreturncallback(returnCallbackService);

  /**

  * 发送消息

  */

  兔子模板。convertandsend(交换,路由密钥,消息,

  消息- {

  message.getMessageProperties().setDeliveryMode(MessageDeliveryMode .持久);

  返回消息;

  },

  新的相关数据(uuid。随机uuid().toString()));

  }

三、消息接收确认

   消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(确认)的过程。使用@RabbitHandler注解标注的方法要增加频道(信道)、消息两个参数。

  @Slf4j

  @组件

  @ rabbit listener(queues= confirm _ test _ queue )公共类接收消息1 {

  @ rabbit handler public void process handler(字符串msg,通道Channel,消息Message)抛出IOException {

  尝试{

  log.info(小富收到消息:{} ,msg);

  //TODO具体业务

  渠道。基本确认(消息。getmessageproperties().getDeliveryTag(),false);

  } catch(异常e) {

  如果(消息。getmessageproperties().get rede delivered()){

  log.error(消息已重复处理失败,拒绝再次接收.);

  渠道。基本项目(消息。getmessageproperties().getDeliveryTag(),false);//拒绝消息

  }否则{

  log.error(消息即将再次返回队列处理.);

  渠道。基本nack(消息。getmessageproperties().getDeliveryTag(),false,true);

  }

  }

  }}消费消息有三种回执方式。我们来分析一下每种方法的意义。

  

1、basicAck

  基本确认:表示确认成功。使用这种回执方式后,消息会被rabbitmq broker删除。

  Voidbasick (Long deliveryTag,Boolean Multiple) deliveryTag:表示消息传递序列号,每次消耗或重新传递消息时,Delivery Tag都会增加。在手动消息确认模式下,我们可以执行ack、nack、reject等操作。在具有指定deliveryTag的邮件上。

  多个:是否批量确认;如果该值为true,所有小于当前消息deliveryTag的消息将被一次性确认。

  举个栗子:假设我先发三条消息,deliveryTag分别是5,6,7,但是都没有确认。当我发送第四条消息时,当deliveryTag为8,multiple设置为true时,5、6、7、8的所有消息都会被确认。

  

2、basicNack

  基本确认:表示失败确认。这种方法一般在消费消息业务异常时使用,消息可以重新交付到队列中。

  Void基本NACK(长递送标记、布尔型多重、布尔型请求)递送标记:指明消息递送序列号。

  多个:是否批量确认。

  Requeue:值为true的消息将被重新排队。

  

3、basicReject

   basicReject: Reject消息,它与basicNack的不同之处在于无法执行批处理操作。其他用法也差不多。

  Void基本拒绝(长递送标记,布尔队列)递送标记:表示消息递送序列号。

  Requeue:值为true的消息将被重新排队。

  

四、测试

  发送消息,测试消息确认机制是否有效。从执行结果来看,发送方发送消息后成功回调,消费方成功消费消息。

  使用数据包捕获工具Wireshark观察rabbitmq amqp协议交互的变化,也有更多的ack过程。

  

五、踩坑日志

  

1、不消息确认

  这是一个非常不熟练的坑,但却是一个非常容易出错的地方。

  开启消息确认机制,消费消息时不要忘记channel.basicAck,否则消息会一直存在,导致重复消费。

  

2、消息无限投递

  我第一次接触消息确认机制的时候,消费端的代码是这样写的。想法很简单:在处理业务逻辑后确认消息,在int a=1/0异常后将消息放回队列。

  @ rabbit handler public void process handler(String msg,Channel channel,Message message)抛出IOException {

  尝试{

  Log.info(消费者2收到:{} ,msg);

  int a=1/0;

  channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);

  } catch(异常e) {

  channel . basic nack(message . getmessageproperties()。getDeliveryTag(),false,true);

  }

  }但是有一个问题。99.9%的业务代码一旦出现bug,不会自动修复。消息将被无限期地传递到队列,使用者将无限期地执行它,从而导致无限循环。

  本地CPU瞬间被占满。你可以想象当服务在生产环境中崩溃时,我有多惊慌。

  而rabbitmq管理层只有一条未经证实的消息。

  经过测试和分析,发现当消息被重新传递到消息队列时,消息不会返回到队列的末尾,而仍然会在队列的头部。

  消费者将立即消费该消息,业务流程将抛出异常,该消息将重新加入团队,等等。消息队列处理被阻止,正常消息无法运行。

  当时我们的解决方案是先回复消息,然后消息队列会删除消息。同时,我们再次将消息发送到消息队列,将异常消息放在消息队列的末尾,既保证了消息不会丢失,又保证了业务的正常进行。

  channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);//将消息重新发送到队列的末尾。getreceivedxchange(),

  message.getMessageProperties()。getReceivedRoutingKey(),MessageProperties。PERSISTENT_TEXT_PLAIN,

  JSON . tojsonbytes(msg));但是这种方法并没有解决根本问题,仍然会时不时的报错。稍后,优化消息重试的次数。达到重试限制后,消息会被人工确认,被队列删除,持久化在MySQL中并推送到闹钟,由调度任务进行人工处理和补偿。

  

3、重复消费

  如何保证MQ的消耗是幂等的?这个需要根据具体业务来确定。可以用MySQL或redis持久化消息,然后验证消息中的唯一性属性。以上是springboot rabbitmq如何用消息确认的细节。更多请关注我们的其他相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: