rocketmq指定消费者消费,rocketmq消费成功确认

  rocketmq指定消费者消费,rocketmq消费成功确认

  00-1010一、简介1.1 Rocket MQ 1.2工作流程简介2、消费者启动流程2.1实例化消费者2.2设置NameServer并订阅主题流程2.2.1添加tag2.2.2向Broker发送heart beat 2 . 2 . 3向FilterServer上传filter类2.3注册回调实现类2.4消费者启动III。拉/推模式消耗3.1拉模式-DefaultMQPullConsumer3.2推模式-DefaultMQPushConsumer3.3总结四。顺序消息4.1实现MQ顺序消息发送存在问题4.2实现MQ顺序消息的关键点5。消息确认机制5.1消息消费的失败处理5.2消息重播带来的问题VI。摘要

  

目录

 

  

一、简介

RocketMQ是阿里巴巴开源的分布式消息中间件。它支持顺序消息,定时消息,自定义过滤器,负载平衡,拉/推消息和其他功能。RocketMQ主要由生产者、代理、消费者和名称服务器四部分组成,其中生产者负责生产消息,消费者负责消费消息,代理负责存储消息。名称服务器充当名称路由服务,整体架构图如下:

 

  * *生产者:* *负责生产消息。通常,业务系统产生消息,这些消息可以以集群模式部署。RocketMQ提供了多种发送方式,包括同步发送、异步发送、顺序发送和单向发送。同步和异步方法都需要Broker返回确认信息,但单向发送不需要。* *消费者:* *负责消费消息。一般来说,后台系统负责异步消费,可以采用集群模式部署。消息消费者将从代理服务器提取消息,并将其提供给应用程序。提供拉/推两种消费模式。* *代理服务器:* *负责存储和转发消息。RocketMQ系统负责接收和存储生产者发送的消息,准备消费者的拉取请求,存储与消息相关的元数据,包括消费者群体、消费进度偏移、主题和队列消息等。* *名称服务器:* *名称服务,充当路由消息的提供者。或者生产者和消费者可以通过名称服务找到每个话题对应的经纪人IP列表。名称服务器的多个实例是集群的,彼此独立,没有信息交换。本文基于最新版本的Apache RocketMQ,主要讲述RocketMQ的消费者机制,分析其启动过程、拉/推机制、消息ack机制,以及定时消息和顺序消息的区别。

  

1.1 RocketMQ 简介

(1)启动NameServer。

 

  服务器起来后监听端口,等待代理、生产者、消费者的连接,相当于一个路由控制中心。

  (2)启动Broker。

  与所有名称服务器保持长时间连接,并定期发送心跳包。心跳包包含当前代理信息(IP端口等。)和所有存储的主题信息。注册成功后,名称服务器集群中的主题和代理之间存在映射关系。

  (3)创建Topic。

  创建主题时,您需要指定主题应该存储在哪个代理上,或者您可以在发送消息时自动创建主题。

  (4)Producer发送消息。

  启动时,首先与其中一个名称服务器集群建立一个长连接,并从名称服务器获取当前发送的主题存在于哪些代理中。轮询从队列列表中选择一个队列,然后与队列所在的代理建立长连接,向代理发送消息。

  (5)Consumer消费消息。

  与其中一个名称服务器建立长连接,获取当前订阅主题存在于哪些代理上,然后直接与代理建立连接通道,开始消费消息。

  00-1010官方给出的消费者实现代码如下:

  类consumer { public static void main(string[]args)抛出中断的异常,mqclientexception {//instantiate consumer defaultmqpushconsumer=new defaultmqpushconsumer( test consumer );//设置NameServer的地址,consumer . setnamesraddr( localhost 33609876 );//订阅一个主题,标签过滤需要消费的消息消费者。

  subscribe("Test", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); }}下面让我们来分析消费者在启动中每一阶段中做了什么吧,let’s go.

  

 

  

2.1 实例化消费者

第一步主要是实例化消费者,这里采取默认的Push消费者模式,构造器中参数为对应的消费者分组,指定同一分组可以消费同一类型的消息,如果没有指定,将会采取默认的分组模式,这里实例化了一个DefaultMQPushConsumerImpl对象,它是后面消费功能的主要实现类。

 

  

// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");

主要通过DefaultMQPushConsumer实例化DefaultMQPushConsumerImpl,它是主要的消费功能实现类。

 

  

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); }

 

  

2.2 设置NameServer和订阅topic过程

// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("Test", "*");

 

  

2.2.1 添加tag

设置NameServer地址后,这个地址为你名字服务集群的地址,类似于zookeeper集群地址,样例给出的是单机本地地址,搭建集群后,可以设置为集群地址,接下来我们需要订阅一个主题topic下的消息,设置对应的topic,可以进行分类,通过设置不同的tag来实现,但目前只支持""进行连接,如:"tag1 tag2 tag3"。归根在于构造订阅数据时,源码通过""进行了字符串的分割,如下所示:

 

  

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString subString.equals(SubscriptionData.SUB_ALL) subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData;}

 

  

2.2.2 发送心跳至Broker

前面构造好订阅主题和分类后,将其放入了一个ConcurrentMap中,并调用sendHeartbeatToAllBrokerWithLock()方法,进行心跳检测和上传过滤器类至broker集群(生产者启动过程也会进行此步骤)。如下所示:

 

  

public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); }}

首先会对broker集群进行心跳检测,在此过程中会施加锁,它会执行sendHeartbeatToAllBroker方法,构建心跳数据heartbeatData,然后遍历消费和生产者table,将消费者和生产者信息加入到heartbeatData中,当都存在消费者和生产者的情况下,会遍历brokerAddrTable,往每个broker 地址发送心跳,相当于往对应地址发送一次http请求,用于探测当前broker是否存活。

 

  

this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);

 

  

2.2.3上传过滤器类至FilterServer

之后会执行uploadFilterClassSource()方法,只有push模式才会有此过程,在此模式下,它会循环遍历订阅数据SubscriptionData,如果此订阅数据使用了类模式过滤,会调uploadFilterClassToAllFilterServer()方法:上传用户自定义的过滤消息实现类至过滤器服务器。

 

  

private void uploadFilterClassSource() { Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> next = it.next(); MQConsumerInner consumer = next.getValue(); if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) { Set<SubscriptionData> subscriptions = consumer.subscriptions(); for (SubscriptionData sub : subscriptions) { if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) { final String consumerGroup = consumer.groupName(); final String className = sub.getSubString(); final String topic = sub.getTopic(); final String filterClassSource = sub.getFilterClassSource(); try { this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource); } catch (Exception e) { log.error("uploadFilterClassToAllFilterServer Exception", e); } } } } }}

过滤器类的作用:消费端可以上传一个Class类文件到 FilterServer,Consumer从FilterServer拉取消息时,FilterServer会把请求转发给Broker,FilterServer收取到Broker消息后,根据上传的过滤类中的逻辑做过滤操作,过滤完成后再把消息给到Consumer,用户可以自定义过滤消息的实现类。

 

  

 

  

2.3 注册回调实现类

接下来就是代码中的注册回调实现类了,当然,如果你是pull模式的话就不需要实现它了,push模式需要定义,两者区别后面会讲到,它主要用于从broker实时获取消息,这里有两种消费上下文类型,用于不同的消费类型。

 

  **ConsumeConcurrentlyContext:**延时类消息上下文,用于延时消息,即定时消息,默认不延迟,可以设置延迟等级,每个等级对应固定时间刻度,RocketMQ中不能自定义延迟时间,延迟等级从1开始,对应的时间间隔如下所示:

  

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

 

  

**ConsumeOrderlyContext :**顺序类消息上下文,控制发送消息的顺序,生产者设置分片路由规则后,相同key只落到指定queue上,消费过程中会对顺序消息所在的queue加锁,保证消息的有序性。

 

  

 

  

2.4 消费者启动

我们先来看下消费者启动的过程,如下所示:

 

  

 

  **(1)this.checkConfig():**首先是检测消费配置项,包括消费分组group、消息模型(集群、广播)、订阅数据、消息监听器等是否存在,如果不存在的话,会抛出异常。

  **(2)copySubscription():**构建主题订阅信息SubscriptionData并加入到RebalanceImpl负载均衡方法的订阅信息中。

  **(3)getAndCreateMQClientInstance():**初始化MQ客户端实例。

  **(4)offsetStore.load():**根据不同消息模式创建消费进度offsetStore并加载:BROADCASTING-广播模式,同一个消费group中的consumer都消费一次,CLUSTERING-集群模式,默认方式,只被消费一次。

  

switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break;}

可以通过setMessageModel方式设置不同模式;广播模式下同消费组的消费者相互独立,消费进度在本地单独进行存储;集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,消费进度是共享在整个消费组中的。

 

  **(5)consumeMessageService.start():**根据不同消息监听类型实例化并启动。这里有延时消息和顺序消息。

  这里主要讲下顺序消息,RocketMQ也帮我们实现了,在启动时,如果是集群模式并是顺序类型,它会启动定时任务,定时向broker发送批量锁,锁住当前顺序消费发往的消息队列,顺序消息因为生产者生产消息时指定了分片策略和消息上下文,只会发往一个消费队列。

  定时任务发送批量锁,锁住当前顺序消息队列。

  

public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }

发送锁住队列的消息至broker,broker端返回锁住成功的队列集合lockOKMQSet,顺序消息具体实现可查看后面第四节。

 

  **(6)mQClientFactory.registerConsumer():**MQClientInstance注册消费者,并启动MQClientInstance,没有注册成功会结束消费服务。

  **(7)mQClientFactory.start():**最后会启动如下服务:远程客户端、定时任务、pull消息服务、负载均衡服务、push消息服务,然后将状态改为运行中。

  

switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break;}

全部启动完毕后,整个消费者也就启动好了,接下来就可以对生产者发送过来的消息进行消费了,那么是如何进行消息消费的呢?不同的消息模式有何区别呢?

 

  

 

  

三、pull/push 模式消费

 

  

 

  

3.1 pull模式-DefaultMQPullConsumer

**pull拉取式消费:**应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用程序控制,可以指定消费的位移,【伪代码】如下所示:

 

  

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");// 设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 启动消费者实例consumer.start();//获取主题下所有的消息队列,这里根据主题从nameserver获取的Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");for (MessageQueue queue : mqs) { //获取当前队列的消费位移,指定消费进度offset,fromstore:从broker中获取还是本地获取,true-broker long offset = consumer.fetchConsumeOffset(queue, true); PullResult pullResult = null; while (offset < pullResult.getMaxOffset()) { //第二个参数为tag,获取指定topic下的tag //第三个参数表示从哪个位移下开始消费消息 //第四个参数表示一次最大拉取多少个消息 try { pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32); } catch (Exception e) { e.printStackTrace(); System.out.println("pull拉取消息失败"); } //代码省略,记录消息位移 offset = pullResult.getNextBeginOffset(); //代码省略,这里为消费消息 }}

可以看到我们是主动拉取topic对应下的消息队列,然后遍历它们,获取当前消费进度并进行消费。

 

  

 

  

3.2 push模式-DefaultMQPushConsumer

该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高,现在一般推荐使用该方式,具体示例可以观看第一章开头的官方demo。

 

  它也是通过实现pull方式来实现的,首先,前面2.4消费者启动之后,最后会启动拉取消息服务pullMessageService和负载均衡rebalanceService服务,它们启动后会一直有线程进行消费。

  

case CREATE_JUST: //...... // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); //....... this.serviceState = ServiceState.RUNNING; break; case RUNNING:

这里面调用doRebalance()方法,进行负载均衡,默认每20s做一次,会轮询所有订阅该实例的topic。

 

  

public class RebalanceService extends ServiceThread { //初始化,省略.... @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); //做负载均衡 this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return RebalanceService.class.getSimpleName(); }}

然后根据每个topic,以及它是否顺序消息模式来做rebalance。

 

  具体做法就是先对Topic下的消息消费队列、消费者Id进行排序,然后用消息队列的平均分配算法,计算出待拉取的消息队列,将分配到的消息队列集合与processQueueTable做一个过滤比对,新队列不包含或已过期,则进行移除 。

  

public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { /根据 /每个topic,以及它是否顺序消息模式来做rebalance this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }

rebalanceByTopic中广播和集群模式都会执行updateProcessQueueTableInRebalance()方法,最后会分发请求dispatchPullRequest,通过executePullRequestImmediately()方法将pull请求放入pull请求队列pullRequestQueue中,注意,pull模式下分发请求方法dispatchPullRequest()实际实现是一个空方法,这里两者很大不同,push模式实现如下

 

  

@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }

然后再PullMessageService中,因为前面consumer启动成功了,PullMessageService线程会实时去取pullRequestQueue中的pull请求。

 

  

@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }

取出来的pull请求又会经由DefaultMQPushConsumerImpl的消息监听类,调用pullMessage()方法。

 

  

private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }

pullMessage()中pullKernelImpl()有一个Pullback方法用于执行消息的回调,它会通过submitConsumeRequest()这个方法来处理消息,总而言之就是通过线程回调的方式让push模式下的监听器能够感知到。

 

  

//Pull回调PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: //省略...消费位移更新 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispathToConsume);

这个方法对应的不同消费模式有着不同实现,但都是会构建一个消费请求ConsumeRequest,里面有一个run()方法,构建完毕后,会把它放入到listener监听器中。

 

  

//监听消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

还记得前面我们样例给出的注册监听器回调处理方法吗?

 

  我们可以点击上面的consumeMessage方法,查看它在源码中的实现位置,发现它就回到了我们前面的2.3注册回调实现类里面了,整个流程是不是通顺了呢?这个监听器中就会收到push的消息,拉取出来进行业务消费逻辑,下面是我们自己定义的消息回调处理方法。

  

// 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });

 

  

3.3 小结

push模式相比较于pull模式不同的是,做负载均衡时,pullRequest请求会放入pullRequestQueue,然后PullMessageService线程会实时去取出这个请求,将消息存入ProcessQueue,通过线程回调的方式让push模式下的监听器能够感知到,这样消息从分发请求到接收都是实时的,而pull模式是消费端主动去拉取指定消息的,需要指定消费进度。

 

  对于我们开发者来说,选取哪种模式实现我们的业务逻辑比较合适呢?别急,先让我们总结下他们的特点:

  共同点:

  

两者底层实际一样,push模式也是基于pull模式来实现的。

 

  pull模式需要我们通过程序主动通过consumer向broker拉消息,而消息的push模式则只需要我们提供一个listener监听,实时获取消息。

  

优点:

 

  

push模式采用长轮询阻塞的方式获取消息,实时性非常高;

 

  push模式rocketMQ处理了获取消息的细节,使用起来比较简单方便;

  pull模式可以指定消费进度,想消费多少就消费多少,灵活性大。

  

缺点:

 

  

push模式当消费者能力远远低于生产者能力的时候,会产生一定的消费者消息堆积;

 

  pull模式实时性很低,频率不好设置;

  拉取消息的间隔不好设置,太短则产生很多无效Pull请求的RPC开销,影响MQ整体的网络性能,太长则实时性差。

  

适用场景:

 

  

对于服务端生产消息数据比较大时,而消费端处理比较复杂,消费能力相对较低时,这种情况就适用pull模式;

 

  对于数据实时性要求高的场景,就比较适用与push模式。

  

现在的你是否明确业务中该使用哪种模式了呢?

 

  

 

  

四、顺序消息

 

  

4.1 实现MQ顺序消息发送存在问题

(1)一般消息发送会采取轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,broker之间是无感知的,这种情况发送和消费是不能保证顺序。

 

  (2)异步方式发送消息时,发送的时候不是按着一条一条顺序发送的,保证不了消息到达Broker的时间也是按照发送的顺序来的。

  消息发送到存储,最后到消费要经历这么多步骤,我们该如何在业务中使用顺序消息呢?让咱们来一步步拆解下吧。

  

 

  

4.2 实现MQ顺序消息关键点

既然分散到多个broker上无法追踪顺序,那么可以控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。在发送时设置分片路由规则,让相同key的消息只落到指定queue上,然后消费过程中对顺序消息所在的queue加锁,保证消息的有序性,让这个queue上的消息就按照FIFO顺序来进行消费。因此我们满足以下三个条件是否就可以呢?

 

  **1)消息顺序发送:**多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性。

  

//采用的同步发送方式,在一个线程内顺序发送,异步发送方式为:producer.send(msg, new SendCallback() {...})SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}

**2)消息顺序存储:**MQ 的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。对应到mq中,需要使用MessageQueueSelector来选择要发送的queue。即可以对业务编号设置路由规则,像根据队列数量对业务字段hash取余,将消息发送到一个queue中。

 

  

//使用"%"操作,使得订单id取余后相同的数据路由到同一个queue中,也可以自定义路由规则long index = id % mqs.size(); return mqs.get((int) index);

3)消息顺序消费:要保证消息顺序消费,同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。这里RocketMQ已经为我们实现好了

 

  

List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } //....省略 } }

消费者重新负载,并且分配完消费队列后,需要向mq服务器发起消息拉取请求,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance()中,针对顺序消息的消息拉取,mq做了以上判断,即消费客户端先向broker端发起对messageQueue的加锁请求,只有加锁成功时才创建pullRequest进行消息拉取,这里的pullRequest就是前面pull和push模式消息体,而updateProcessQueueTableInRebalance这个方法也是在前面消费者启动过程中有讲到过哦。

 

  具体加锁逻辑如下:

  

public boolean lock(final MessageQueue mq) { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); requestBody.getMqSet().add(mq); try { Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); if (processQueue != null) { processQueue.setLocked(true);&nbs      

	  
	  
	  
	  
	  
	  
        

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: