Analyzing RocketMQ Master-Slave Replication Through the Source Code


RocketMQ is an open-source, pure-Java, distributed, queue-model message middleware featuring high performance, high reliability, and low latency. This article digs into the principles of RocketMQ master-slave replication through source code analysis.

  
A RocketMQ Broker's master-slave replication consists of two parts: replication of CommitLog messages and replication of Broker metadata.

CommitLog message replication happens as messages are written: once a message has been written to the Broker master, a dedicated thread ships it to the slave, and both synchronous and asynchronous replication are supported.

Broker metadata, on the other hand, is pulled by the slave Broker from the master every 10s in a dedicated thread; the slave then updates its own configuration and persists it to the corresponding configuration files.

One important characteristic of RocketMQ master-slave synchronization: it does not include master-slave failover. When the master goes down, a slave does not take over message writes, but it can still serve message reads.
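The replication mode and the master/slave roles are chosen purely by broker configuration. Below is a minimal broker.conf sketch for one master with one slave; the cluster name, broker name, and name-server address are placeholder values, and SYNC_MASTER can be replaced with ASYNC_MASTER for asynchronous replication:

# master (placeholder names/addresses, for illustration only)
brokerClusterName=DefaultCluster
brokerName=broker-a
# brokerId 0 means master; use ASYNC_MASTER instead of SYNC_MASTER for asynchronous replication
brokerId=0
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876

# slave (same brokerName as the master, brokerId greater than 0)
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876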

  
2.1 Overview

The CommitLog master-slave replication flow is as follows:

1. The Producer sends a message to the Broker master; the Broker stores the message and calls handleHA to start master-slave synchronization;

2. For synchronous replication, see Section 2.6;

3. For asynchronous replication, the flow is:

   1. The Broker master starts and listens on the HA port;

   2. The Broker slave starts, actively connects to the master, and establishes a TCP connection via Java NIO;

   3. The Broker slave pulls messages from the master at a 5s interval; on the first pull it reads the maximum offset of its local CommitLog files and requests data from that offset;

   4. The Broker master parses the request and returns data to the slave;

   5. After receiving a batch of messages, the slave writes them into its local CommitLog files, reports its pull progress to the master, and updates the next offset to pull.

  

 

Let's first walk through the overall flow of asynchronous replication and come back to synchronous replication at the end. The entry point of asynchronous replication is HAService.start():

  

public void start() throws Exception {
    // Broker master: begin accepting and handling slave connections
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    // start the synchronous-replication (group transfer) thread
    this.groupTransferService.start();
    // Broker slave: start the HA client
    this.haClient.start();
}

  

 

Each of the steps above is explained in detail below.

2.2 HAService Master Startup

  

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

  

 

beginAccept creates the ServerSocketChannel and Selector, enables TCP reuseAddress, binds the listen port, switches the channel to non-blocking mode, and registers for OP_ACCEPT (connection) events. Note that this is implemented with plain Java NIO rather than Netty.

The run loop started by acceptSocketService.start() looks like this:

  

while (!this.isStopped()) {
    try {
        // wait up to 1s for ready events
        this.selector.select(1000);
        Set<SelectionKey> selected = this.selector.selectedKeys();
        if (selected != null) {
            for (SelectionKey k : selected) {
                // handle OP_ACCEPT and create an HAConnection for each new slave connection
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    if (sc != null) {
                        HAConnection conn = new HAConnection(HAService.this, sc);
                        // starts the readSocketService and writeSocketService threads
                        conn.start();
                        HAService.this.addConnection(conn);
                    }
                }
            }
            selected.clear();
        }
    } catch (Exception e) {
        log.error(this.getServiceName() + " service has exception.", e);
    }
}

  

 

The selector polls for ready connection events once per second. When an OP_ACCEPT event fires, ServerSocketChannel.accept() creates the SocketChannel used to exchange data with the slave. An HAConnection object is then created for each connection; HAConnection is responsible for the master-slave data transfer logic. Its start method is:

  

public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}

  

 

2.3 HAClient Startup

  

while (!this.isStopped()) {
    try {
        // connect to the Broker master (plain Java NIO)
        if (this.connectMaster()) {

            // the heartbeat doubles as the slave's offset report
            if (this.isTimeToReportOffset()) {
                // report the slave's max offset
                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                if (!result) {
                    this.closeMaster();
                }
            }

            this.selector.select(1000);

            // handle readable events, i.e. the message data returned by the master
            boolean ok = this.processReadEvent();
            if (!ok) {
                this.closeMaster();
            }

            if (!reportSlaveMaxOffsetPlus()) {
                continue;
            }

            long interval =
                HAService.this.getDefaultMessageStore().getSystemClock().now()
                    - this.lastWriteTimestamp;
            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                .getHaHousekeepingInterval()) {
                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("HAClient, master not response some time, so close connection");
            }
        } else {
            this.waitForRunning(1000 * 5);
        }
    } catch (Exception e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.waitForRunning(1000 * 5);
    }
}

  

 

2.3.1 Establishing the Master-Slave Connection

If socketChannel is null, the client attempts to connect to the master; if the master address is not set, connectMaster returns false.

  

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // register for read events to receive the data the master sends back
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        // the slave's current max physical offset, reported on the first pull
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

  

 

Establishing the connection

The Broker slave connects to the Broker master via NIO (RemotingUtil.connect); the code is:

  

SocketChannel sc = null;

 

  sc = SocketChannel.open();

  sc.configureBlocking(true);

  sc.socket().setSoLinger(false, -1);

  sc.socket().setTcpNoDelay(true);

  sc.socket().setReceiveBufferSize(1024 * 64);

  sc.socket().setSendBufferSize(1024 * 64);

  sc.socket().connect(remote, timeoutMillis);

  sc.configureBlocking(false);

  

 

How the slave obtains its current offset:

  

// DefaultMessageStore
public long getMaxPhyOffset() {
    return this.commitLog.getMaxOffset();
}

// CommitLog
public long getMaxOffset() {
    return this.mappedFileQueue.getMaxOffset();
}

// MappedFileQueue
public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

  

 

So the slave's current offset is ultimately obtained from the read position of the last MappedFile.

2.3.2 Deciding When to Report the Offset

  

private boolean isTimeToReportOffset() {
    // time elapsed since the last write to the master
    long interval =
        HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
        .getHaSendHeartbeatInterval();
    return needHeart;
}

  
 

 

The rule is: when (current time - last write time) > haSendHeartbeatInterval, the slave sends a heartbeat and reports its offset. haSendHeartbeatInterval defaults to 5s and is configurable.

2.3.3 Reporting the Offset

  

private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    // write at most three times, as long as reportOffset still has remaining bytes
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    return !this.reportOffset.hasRemaining();
}

  

 

The report itself is just an 8-byte offset written to the socket channel via NIO.

2.4 Broker Master Handling Requests

When the master-slave connection was established, the master created an HAConnection object, which holds two important thread service classes:

  

// write side: sends CommitLog data to the slave
private WriteSocketService writeSocketService;
// read side: reads the offset reported by the slave; the master then reads the CommitLog from that offset
private ReadSocketService readSocketService;

  

 

2.4.1 ReadSocketService: Receiving Offset Reports

The readSocketService.run method:

  

while (!this.isStopped()) {
    try {
        this.selector.select(1000);
        // handle read events (the offsets reported by the slave)
        boolean ok = this.processReadEvent();
        if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
        }

        long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
            break;
        }
    } catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}

  

 

The core logic of processReadEvent:

  

int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
    readSizeZeroTimes = 0;
    this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
    if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
        // the offset requested/acknowledged by the slave (the last complete 8-byte value)
        long readOffset = this.byteBufferRead.getLong(pos - 8);
        this.processPostion = pos;

        HAConnection.this.slaveAckOffset = readOffset;
        if (HAConnection.this.slaveRequestOffset < 0) {
            HAConnection.this.slaveRequestOffset = readOffset;
            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
        }

        // For synchronous replication: update push2SlaveMaxOffset with the acked offset
        // and wake up the master's GroupTransferService
        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
    }
}

  

 

As you can see, processReadEvent is straightforward: it parses the offset out of the ByteBuffer and sets HAConnection.this.slaveAckOffset (and slaveRequestOffset on the first report).

2.4.2 WriteSocketService: Pushing Data to the Slave

The Broker master reads the CommitLog in HAConnection.WriteSocketService; the main logic of its run method is:

  

this.selector.select(1000);

// nextTransferFromWhere: the offset from which the next transfer to the slave starts
if (-1 == this.nextTransferFromWhere) {
    if (0 == HAConnection.this.slaveRequestOffset) {
        // the slave reported offset 0: start from the beginning of the master's last CommitLog file
        long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
        masterOffset = masterOffset
            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                .getMapedFileSizeCommitLog());
        if (masterOffset < 0) {
            masterOffset = 0;
        }
        this.nextTransferFromWhere = masterOffset;
    } else {
        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
    }

    log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
        + "], and slave request " + HAConnection.this.slaveRequestOffset);
}

// read CommitLog data starting from nextTransferFromWhere
SelectMappedBufferResult selectResult =
    HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
    int size = selectResult.getSize();
    // cap a single transfer at haTransferBatchSize
    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
    }

    selectResult.getByteBuffer().limit(size);
    this.selectMappedBufferResult = selectResult;
} else {
    // no new CommitLog data: wait up to 100ms, until woken up by a new write
    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}

  

 

This involves two main steps: reading the CommitLog data and sending it to the slave.

2.4.2.1 Reading CommitLog Data

  

// CommitLog
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}

// MappedFile
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }
    return null;
}

  

 

Ultimately the data is read from the MappedFile at the requested offset.

2.4.2.2 Sending CommitLog Data

Each transfer consists of a header and a body, and is written to the socket via NIO. The main code:

  

// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();

// Write Body (once the header has been fully written)
if (!this.byteBufferHeader.hasRemaining()) {
    while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write body error < 0");
        }
    }
}

  

 

After the master has sent the CommitLog data, the Broker slave's read event fires; the slave reads the data and writes it into its own CommitLog.

2.5 HAClient processReadEvent

When the connection was established, the slave registered for read events in order to receive the CommitLog data returned by the Broker master. The corresponding method is HAClient.processReadEvent:

  

int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    readSizeZeroTimes = 0;
    // parse the received frames and append them to the local CommitLog
    boolean result = this.dispatchReadRequest();
    if (!result) {
        log.error("HAClient, dispatchReadRequest error");
        return false;
    }
}

  

 

The core of dispatchReadRequest:

  

// read the body data returned by the master
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
this.byteBufferRead.get(bodyData);

// append the data to the slave's local CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += msgHeaderSize + bodySize;

  
It calls defaultMessageStore.appendToCommitLog to write the data into the MappedFile:

  

public boolean appendToCommitLog(long startOffset, byte[] data) {
    // write the data into the CommitLog, just like a normally stored message
    boolean result = this.commitLog.appendData(startOffset, data);
    // wake up reputMessageService to build the ConsumeQueue and index
    this.reputMessageService.wakeup();
    return result;
}

  

 

Finally, the slave reports its new offset (again read from the MappedFile) back to the Broker master.
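To make the wire format from sections 2.4.2.2 and 2.5 concrete, here is a minimal, self-contained sketch (not RocketMQ code; the class and method names HaFrameParserSketch, TransferFrame, and parseFrames are illustrative) of how the transfer frames could be parsed on the slave side. Each frame is a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the header (12 bytes) + body frame layout described above.
public class HaFrameParserSketch {

    static final int HEADER_SIZE = 8 + 4; // 8-byte physical offset + 4-byte body size

    static class TransferFrame {
        final long masterPhyOffset;
        final byte[] body;
        TransferFrame(long masterPhyOffset, byte[] body) {
            this.masterPhyOffset = masterPhyOffset;
            this.body = body;
        }
    }

    // Parses as many complete frames as the buffer currently contains;
    // an incomplete trailing frame is left in the buffer for the next read.
    static List<TransferFrame> parseFrames(ByteBuffer in) {
        List<TransferFrame> frames = new ArrayList<>();
        while (in.remaining() >= HEADER_SIZE) {
            in.mark();
            long phyOffset = in.getLong();
            int bodySize = in.getInt();
            if (in.remaining() < bodySize) {
                in.reset(); // wait for more data from the master
                break;
            }
            byte[] body = new byte[bodySize];
            in.get(body);
            frames.add(new TransferFrame(phyOffset, body));
        }
        return frames;
    }

    public static void main(String[] args) {
        // one fake frame: physical offset 0, 3-byte body
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(0L).putInt(3).put(new byte[] {1, 2, 3});
        buf.flip();
        System.out.println(parseFrames(buf).size()); // prints 1
    }
}

In the real dispatchReadRequest shown above, each complete frame's body is passed to appendToCommitLog at the offset carried in the header.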

2.6 Synchronous Replication

The sections above covered the Broker's asynchronous replication; now let's look at how synchronous replication is implemented. The overall flow is roughly as follows:

1. The producer sends a message to the broker; the broker stores it by writing it to the CommitLog;

2. The master's message-writing thread wakes up the WriteSocketService thread, which queries the CommitLog data and sends it to the slave. When WriteSocketService finds no new CommitLog data it waits up to 100ms, so each new CommitLog write wakes it up to query and push the data;

3. The broker master builds a GroupCommitRequest and waits synchronously for replication to complete;

4. The slave receives the new CommitLog data, writes it to its own CommitLog, and reports its new offset back to the master;

5. The master updates push2SlaveMaxOffset and checks whether push2SlaveMaxOffset is greater than or equal to the offset required by the replication request; if so, replication is considered complete, CommitLog.handleHA returns success, and the message write is reported as successful.

  
The code entry point is CommitLog.handleHA:

  

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // only when this broker is a master configured for synchronous replication
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        // get the HAService
        HAService service = this.defaultMessageStore.getHaService();
        // MessageConst.PROPERTY_WAIT_STORE_MSG_OK on the message; by default the producer waits for replication
        if (messageExt.isWaitStoreMsgOK()) {
            // Check whether the slave is OK. The rule is (master offset - push2SlaveMaxOffset) < 1024 * 1024 * 256,
            // i.e. if the slave lags too far behind the master it is treated as unavailable.
            // Here result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                // Build a GroupCommitRequest with nextOffset = result.getWroteOffset() + result.getWroteBytes(),
                // i.e. the offset the slave has to reach.
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // putRequest calls this.groupTransferService.putRequest(request), adding the request to the
                // requestsWrite list (HAService holds the GroupTransferService reference).
                service.putRequest(request);
                // Wake up WriteSocketService so it reads the new CommitLog data and sends it to the slave.
                // When WriteSocketService finds no CommitLog data it waits in
                // HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100),
                // so a new CommitLog write wakes it up to query and push the data.
                service.getWaitNotifyObject().wakeupAll();

                // Wait for replication to finish; the completion condition is
                // HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset()
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

  

 

2.6.1 GroupTransferService Startup

When HAService starts, it also starts the GroupTransferService thread:

  

public void run() {
    while (!this.isStopped()) {
        this.waitForRunning(10);
        this.doWaitTransfer();
    }
}

private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                // req.getNextOffset() = result.getWroteOffset() + result.getWroteBytes()
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                // loop 5 times, waiting at most 5s, because the slave heartbeat interval defaults to 5s
                for (int i = 0; !transferOK && i < 5; i++) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }

                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }

                // replication finished (or timed out): wake up the thread waiting in handleHA
                req.wakeupCustomer(transferOK);
            }

            this.requestsRead.clear();
        }
    }
}

  

 

  wakeupCustomer:

  

public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}

  

 

2.6.2 Waking Up WriteSocketService

service.getWaitNotifyObject().wakeupAll();

This wakes up WriteSocketService, which queries the CommitLog data and sends it to the slave. When WriteSocketService finds no CommitLog data, it waits up to 100ms in HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); so whenever new data is written to the CommitLog, WriteSocketService is woken up, queries the CommitLog, and pushes the data to the slave.
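The wakeupAll/allWaitForRunning pair is a plain monitor-based wait/notify handoff. Below is a minimal standalone sketch of that pattern (NotifyObjectSketch is an illustrative name, not RocketMQ's WaitNotifyObject, which additionally tracks per-thread wait state):

// Sketch of the wait/notify pattern between the CommitLog writer and WriteSocketService.
public class NotifyObjectSketch {
    private volatile boolean hasNotified = false;

    // called by the pushing thread when there is no new CommitLog data
    public synchronized void allWaitForRunning(long timeoutMillis) {
        if (this.hasNotified) {        // a wakeup arrived before we started waiting
            this.hasNotified = false;
            return;
        }
        try {
            this.wait(timeoutMillis);  // wait for new data or the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.hasNotified = false;
        }
    }

    // called after a new message has been written to the CommitLog
    public synchronized void wakeupAll() {
        this.hasNotified = true;
        this.notifyAll();
    }
}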

2.6.3 Waiting Synchronously Until Replication Completes

  

boolean flushOK =
    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
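The waitForFlush/wakeupCustomer pair is essentially a one-shot CountDownLatch handshake between the message-writing thread and GroupTransferService. A minimal sketch under that assumption (SyncRequestSketch is a hypothetical class, not the actual GroupCommitRequest source):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// The writer thread blocks in waitForFlush(timeout), while GroupTransferService
// calls wakeupCustomer(ok) once the slave has caught up (or the wait timed out).
public class SyncRequestSketch {
    private final long nextOffset;                 // offset the slave must reach
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    public SyncRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    // called by GroupTransferService when replication finishes or times out
    public void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        this.latch.countDown();
    }

    // called by the message-writing thread; returns false on timeout
    public boolean waitForFlush(long timeoutMillis) throws InterruptedException {
        if (!this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        return this.flushOK;
    }
}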

  
3. Metadata Replication

Broker metadata replication covers topicConfig, consumerOffset, delayOffset, and subscriptionGroup.

The slave broker replicates this metadata from the master every 10s in a dedicated scheduled task; the code entry point is BrokerController.start → SlaveSynchronize.syncAll:

  

slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            // sync metadata from the master every 10s
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
        }
    }
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);

  
3.1 syncTopicConfig

  

// fetch the TopicConfig from the master; on the master side this is served by AdminBrokerProcessor.getAllTopicConfig
TopicConfigSerializeWrapper topicWrapper =
    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
    .equals(topicWrapper.getDataVersion())) {

    this.brokerController.getTopicConfigManager().getDataVersion()
        .assignNewOne(topicWrapper.getDataVersion());
    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
    this.brokerController.getTopicConfigManager().getTopicConfigTable()
        .putAll(topicWrapper.getTopicConfigTable());
    // persist the topic config to topics.json
    this.brokerController.getTopicConfigManager().persist();

    log.info("Update slave topic config from master, {}", masterAddrBak);
}

  

 

3.2 syncConsumerOffset

  

// fetch the consumer offsets from the master Broker
ConsumerOffsetSerializeWrapper offsetWrapper =
    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
// merge them into the slave's offsetTable
this.brokerController.getConsumerOffsetManager().getOffsetTable()
    .putAll(offsetWrapper.getOffsetTable());
// and persist them to the slave's consumerOffset.json
this.brokerController.getConsumerOffsetManager().persist();

  

 

3.3 syncDelayOffset

  

String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
String fileName = StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
    .getMessageStoreConfig().getStorePathRootDir());
MixAll.string2File(delayOffset, fileName);

  

 

3.4 syncSubscriptionGroupConfig

  

SubscriptionGroupWrapper subscriptionWrapper =
    this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak);
SubscriptionGroupManager subscriptionGroupManager =
    this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();

  

 

4. Takeaways

With the walkthrough above, we now have a solid picture of RocketMQ's master-slave replication. A few of its design ideas are worth borrowing:

Separate the management of metadata and business data when designing a system;

For master-slave replication, the basic pattern is: the slave requests the master with an offset, the master looks up data from that offset and returns it, and the slave applies it. MySQL and Redis replication follow essentially the same idea;

Replication can be asynchronous or synchronous; which one to use is decided by configuration according to the actual business scenario;

Keep the replication threads separate from the message-writing thread and the main thread.

