RocketMQ消息存储文件的加载与恢复机制源码分析

作者:林师傅 时间:2021-12-29 20:23:19 

前言

前面文章我们介绍了Broker是如何将消息全量存储到CommitLog文件中,并异步生成dispatchRequest任务更新ConsumeQueue,IndexFile的过程以及ConsumeQueue和IndexFile的文件结构。由于是异步转发消息,就可能出现消息成功存储到CommitLog文件,转发请求任务执行失败,Broker宕机了,此时CommitLog和Index消息并未处理完,导致CommitLog与ConsumeQueue和IndexFile文件中的数据不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么这部分消息Consumer将永远无法消费到了,那么Broker是如何保证数据一致性的呢?

StoreCheckPoint介绍

StoreCheckPoint的作用是记录CommitLog,ConsumeQueue和IndexFile的刷盘点,当Broker异常结束时会根据StoreCheckPoint的数据恢复,StoreCheckPoint属性如下

public class StoreCheckpoint {
   // commitLog最后一条信息的刷盘时间戳
   private volatile long physicMsgTimestamp = 0;
   // consumeQueue最后一个存储单元刷盘时间戳
   private volatile long logicsMsgTimestamp = 0;
   // 最近一个已经写完IndexFile的最后一条记录刷盘时间戳
   private volatile long indexMsgTimestamp = 0;
}

StoreCheckPoint文件的存储位置是${user.home}/store/checkpoint,文件的固定长度为4K,但StoreCheckPoint只占用了前24个字节,存储格式如下图所示

RocketMQ消息存储文件的加载与恢复机制源码分析

StoreCheckPoint时间戳更新时机

physicMsgTimestamp

FlushRealTimeService刷盘时更新

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
 // ...
 // 更新CommitLog刷盘时间戳
 if (storeTimestamp > 0) {        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
 }
}

GroupCommitService刷盘时更新

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
 // ...
 // 更新CommitLog刷盘时间戳
 if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
 }
}

logicsMsgTimestamp

ConsumeQueue保存消息存储单元时更新

// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
 // ...
 // 如果consumeQueue保存成功,则更新ConsumeQueue存储点信息
 if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
 }
}

ConsumeQueue刷盘时更新并触发StoreCheckPoint刷盘

// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {
 // ...
 // 更新ConsumeQueue存储时间戳,并刷盘
 if (0 == flushConsumeQueueLeastPages) {
   if (logicsMsgTimestamp > 0) {
 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
   }
   // 更新存储点
   DefaultMessageStore.this.getStoreCheckpoint().flush();
 }
}

indexMsgTimestamp

// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile
public IndexFile getAndCreateLastIndexFile() {
 // 获取最新IndexFile,如果IndexFile已经满了,需要创建一个新的IndexFile
 if (indexFile == null) {
         indexFile =
             new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                 lastUpdateIndexTimestamp);
// 如果创建新的IndexFile成功,原IndexFile刷盘
     if (indexFile != null) {
         final IndexFile flushThisFile = prevIndexFile;
         Thread flushThread = new Thread(new Runnable() {
             @Override
             public void run() {
               // indexFile刷盘
                 IndexService.this.flush(flushThisFile);
             }
         }, "FlushIndexFileThread");
         flushThread.setDaemon(true);
         flushThread.start();
     }
 }
 return indexFile;
}
// org.apache.rocketmq.store.index.IndexService#flush
public void flush(final IndexFile f) {
   if (null == f)
       return;
   long indexMsgTimestamp = 0;
   if (f.isWriteFull()) {
       indexMsgTimestamp = f.getEndTimestamp();
   }
   f.flush();
   if (indexMsgTimestamp > 0) {
       // 更新checkPoint的indexMsgTimestamp并触发刷盘
       this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
       this.defaultMessageStore.getStoreCheckpoint().flush();
   }
}
  • 保存消息Index,获取最新的IndexFile如果满了,则会创建一个新的IndexFile,并且更新IndexMsgTimestamp并触发StoreCheckPoint刷盘

StoreCheckPoint刷盘源码

StoreCheckPoint刷盘源码如下所示,就是将CommitLog,ConsumeQueue和IndexFile刷盘时间戳持久化到硬盘上,由上面源码可知它的刷盘触发时机

  • ConsumeQueue刷盘时触发

  • 创建新IndexFile文件时触发

StoreCheckPoint刷盘源码如下

// org.apache.rocketmq.store.StoreCheckpoint#flush
public void flush() {
   this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
   this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
   this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
   this.mappedByteBuffer.force();
}

消息加载源码分析

在BrokerController启动时会调用DefaultMessageStore#load加载存储文件加载和恢复过程主要分为下面几步

  • 判断Broker上次是否正常退出。这个判断逻辑是根据${user.home}/store/abort是否存在。如果文件存在,说明上次是异常退出,如果文件不存在,则说明是正常退出。

  • 加载CommitLog

  • 加载ConsumeQueue

  • 加载StoreCheckPoint

  • 加载IndexFile

  • 恢复ConsumeQueue与IndexFile

  • 加载延迟队列服务

// org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {
   boolean result = true;
   try {
     // 1. Broker上次是否正常退出
       boolean lastExitOK = !this.isTempFileExist();
       log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
       // 2. 加载commitLog
       result = result && this.commitLog.load();
// 3. 加载consumeQueue
       result = result && this.loadConsumeQueue();
       if (result) {
           // 4. 加载StoreCheckPoint
           this.storeCheckpoint =
               new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
           // 5. 加载IndexFile
           this.indexService.load(lastExitOK);
           // 6. 恢复ConsumeQueue与IndexFile
           this.recover(lastExitOK);
// 7. 延迟队列服务加载
           if (null != scheduleMessageService) {
               result =  this.scheduleMessageService.load();
           }
       }
   }
   return result;
}

CommitLog加载

前面文章介绍过,CommitLog文件的存储目录是${user.home}/store/commitlog/,并且CommitLog文件的底层是MappedFile,由MappedFileQueue管理。

RocketMQ消息存储文件的加载与恢复机制源码分析

CommitLog文件的加载其实调用的是MappedFileQueue#load方法,代码如下所示,load()中首先加载CommitLog文件目录下的所有文件,并调用doLoad()方法加载CommitLog。

// org.apache.rocketmq.store.MappedFileQueue#load
public boolean load() {
   File dir = new File(this.storePath/*${user.home}/store/commitlog/*/);
   File[] ls = dir.listFiles();
   if (ls != null) {
       return doLoad(Arrays.asList(ls));
   }
   return true;
}

MappedFile的加载过程如下所示,核心逻辑主要分为下面三步

  • 按照文件名称将文件排序,排序好的文件就会按照消息保存的先后顺序存放在列表中

  • 校验文件大小与mappedFile是否一致,如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改

  • 创建mappedFile,并且设置wrotePosition,flushedPosition,committedPosition为mappedFileSize

public boolean doLoad(List<File> files) {
   // 按照文件名称排序
   files.sort(Comparator.comparing(File::getName));
   for (File file : files) {
     // 如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改
       if (file.length() != this.mappedFileSize) {
           return false;
       }
       try {
         // 创建MappedFile
           MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
           mappedFile.setWrotePosition(this.mappedFileSize);
           mappedFile.setFlushedPosition(this.mappedFileSize);
           mappedFile.setCommittedPosition(this.mappedFileSize);
           this.mappedFiles.add(mappedFile);
       }
   }
   return true;
}

看到这里肯定会有疑问,加载后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都为mappedFileSize,如果最后一个MappedFile没有使用完,Broker启动后还会从最后一个MappedFile开始写么?我们可以在后面消息文件恢复源码分析找到答案。

ConsumeQueue加载

从前面文章我们知道,ConsumeQueue文件底层其实也是MappedFile,因此ConsumeQueue文件的加载与CommitLog加载差别不大。ConsumeQueue加载逻辑为

  • 获取ConsumeQueue目录下存储的所有Topic目录,遍历Topic目录

  • 遍历每个Topic目录下的所有queueId目录,逐个加载ququeId中的所有MappedFile

// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue
private boolean loadConsumeQueue() {
 // 获取consumeQueue目录
 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */));
 // topic文件夹数组
 File[] fileTopicList = dirLogic.listFiles();
 if (fileTopicList != null) {
     // 遍历topic
     for (File fileTopic : fileTopicList) {
         // 获取topic名称
         String topic = fileTopic.getName();
         // 获取queueId文件夹数组
         File[] fileQueueIdList = fileTopic.listFiles();
         // 遍历queueId
         if (fileQueueIdList != null) {
             for (File fileQueueId : fileQueueIdList) {
                 int queueId;
                 // 文件夹名称就是queueId
                 queueId = Integer.parseInt(fileQueueId.getName());
                 // 构建consumeQueue
                 ConsumeQueue logic = new ConsumeQueue(/* ... */);
                 this.putConsumeQueue(topic, queueId, logic);
                 // ConsumeQueue加载
                 if (!logic.load()) {
                     return false;
                 }
             }
         }
     }
 }
 return true;
}

IndexFile加载

IndexFile文件加载过程调用的是IndexService#load,首先获取${user.home}/store/index目录下的所有文件,遍历所有文件,如果IndexFile最后存储时间大于StoreCheckPoint中indexMsgTimestamp,则会先删除IndexFile

// org.apache.rocketmq.store.index.IndexService#load
public boolean load(final boolean lastExitOK) {
   // indexFile文件目录
   File dir = new File(this.storePath);
   // indexFile文件列表
   File[] files = dir.listFiles();
   if (files != null) {
       // 文件排序
       Arrays.sort(files);
       for (File file : files) {
           try {
               IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
               f.load();
               if (!lastExitOK) {
                   // 文件最后存储时间戳大于刷盘点,则摧毁indexFile,重建
                   if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存储点时间*/
                       .getIndexMsgTimestamp()) {
                       f.destroy(0);
                       continue;
                   }
               }
               this.indexFileList.add(f);
           }
       }
   }
   return true;
}

ConsumeQueue与IndexFile恢复

如果是正常退出,数据都已经正常刷盘,前面我们说到CommitLog在加载时的wrotePosition,flushedPosition,committedPosition都设置为mappedFileSize,

因此即使是正常退出,也会调用CommitLog#recoverNormally找到最后一条消息的位置,更新这三个属性。

// org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
   // consumeQueue中最大物理偏移量
   long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
   if (lastExitOK) {
       // 正常退出文件恢复
       this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
   } else {
       // 异常退出文件恢复
       this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
   }
   // 恢复topicQueueTable
   this.recoverTopicQueueTable();
}

正常恢复的源码如下,由于Broker是正常关闭,因此CommitLog,ConsumeQueue与IndexFile都已经正确刷盘,并且三者的消息是一致的。正常恢复的主要目的是找到找到最后一条消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盘点(flushWhere)和提交点(committedWhere),

  • 从最后3个mappedFile开始恢复,如果mappedFile总数不足3个,则从第0个mappedFile开始恢复

  • 逐个遍历mappedFile,找到每个MappedFile的最后一条消息的偏移量,并将其更新到CommitLog中MappedFileQueue的刷盘点和提交点中

  • 清除ConsumeQueue冗余数据

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
       // 确认消息是否完整,默认是true
       boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
       final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
       if (!mappedFiles.isEmpty()) {
           // 默认从最后3个mappedFile开始恢复
           int index = mappedFiles.size() - 3;
           // 如果commitLog不足三个,则从第一个文件开始恢复
           if (index < 0)
               index = 0;
           MappedFile mappedFile = mappedFiles.get(index);
           ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
           // 最后一个MappedFile的文件起始偏移量
           long processOffset = mappedFile.getFileFromOffset();
           // mappedFileOffset偏移量
           long mappedFileOffset = 0;
           // 遍历CommitLog文件
           while (true) {
               // 校验消息完整性
               DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
               // 获取消息size
               int size = dispatchRequest.getMsgSize();
               // 返回结果为true并且消息size>0,说明消息是完整的
               if (dispatchRequest.isSuccess() && size > 0) {
                   mappedFileOffset += size;
               }
           }
           // 最大物理偏移量
           processOffset += mappedFileOffset;
           // 更新flushedWhere和committedPosition指针
           this.mappedFileQueue.setFlushedWhere(processOffset);
           this.mappedFileQueue.setCommittedWhere(processOffset);
           this.mappedFileQueue.truncateDirtyFiles(processOffset);
           // 清除ConsumeQueue冗余数据
           if (maxPhyOffsetOfConsumeQueue >= processOffset) {
               this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/);
           }
       }
   }

异常恢复源码如下,由于上次Broker没有正常关闭,因此由可能存在CommitLog、ConsumeQueue与IndexFile不一致的情况,因此在异常恢复时可能需要恢复ConsumeQueue和IndexFile,异常恢复核心逻辑主要包括

  • 倒序查CommitLog的mappedFile文件,找到第一条消息存储的时间戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,该mappedFile至少有一部分消息是被正常转发,正常存储,正常刷盘的

  • 从该mappedFile开始逐条转发消息,重新恢复ConsumeQueue和IndexFile

  • 当遍历到最后一条消息,将其偏移量更新到CommitLog中MappedFileQueue的刷盘点和提交点中

  • 清除ConsumeQueue冗余数据

// org.apache.rocketmq.store.CommitLog#recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
   // 是否CRC校验
   boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
   final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
   if (!mappedFiles.isEmpty()) {
       // 最后一个mappedFile的index
       int index = mappedFiles.size() - 1;
       MappedFile mappedFile = null;
       // 倒序遍历mappedFile数组,
       for (; index >= 0; index--) {
           mappedFile = mappedFiles.get(index);
           // 1. 如果第一条消息的时间戳小于存储点时间戳
           if (this.isMappedFileMatchedRecover(mappedFile)) {
               break;
           }
       }
       long processOffset = mappedFile.getFileFromOffset();
       long mappedFileOffset = 0;
       while (true) {
           DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
           int size = dispatchRequest.getMsgSize();
           if (dispatchRequest.isSuccess()) {
               if (size > 0) {
                   mappedFileOffset += size;
                   // 2. 转发消息
                   if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重复,默认是false*/) {
                       if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                           this.defaultMessageStore.doDispatch(dispatchRequest);
                       }
                   } else {
                       this.defaultMessageStore.doDispatch(dispatchRequest);
                   }
               }
       }
// 3. 更新MappedFileQueue中的刷盘位置和提交位置
       processOffset += mappedFileOffset;
       this.mappedFileQueue.setFlushedWhere(processOffset);
       this.mappedFileQueue.setCommittedWhere(processOffset);
       this.mappedFileQueue.truncateDirtyFiles(processOffset);
       // 清除ConsumeQueue中的冗余数据
       if (maxPhyOffsetOfConsumeQueue >= processOffset) {
           this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
       }
   }
}

来源:https://juejin.cn/post/7229324815191818298

标签:RocketMQ,消息存储,文件加载,恢复
0
投稿

猜你喜欢

  • java8 stream 如何打印数据元素

    2022-08-20 18:40:02
  • 浅谈java中守护线程与用户线程

    2023-11-26 20:46:41
  • spring 整合mybatis后用不上session缓存的原因分析

    2021-12-09 10:11:03
  • C#多线程之线程同步WaitHandle

    2022-08-10 10:16:12
  • Java日常练习题,每天进步一点点(63)

    2021-10-22 12:35:44
  • 详谈Java中的事件监听机制

    2022-08-05 00:29:30
  • JAVA面试题 简谈你对synchronized关键字的理解

    2022-09-17 17:06:05
  • android自动生成dimens适配文件的图文教程详解(无需Java工具类)

    2023-07-17 12:12:30
  • java 命名空间 命名规则第1/2页

    2021-06-06 14:20:19
  • springBoot+dubbo+zookeeper实现分布式开发应用的项目实践

    2021-11-22 02:03:25
  • 设计模式之责任链模式_动力节点Java学院整理

    2022-03-08 11:24:14
  • 简单了解4种分布式session解决方案

    2023-08-09 11:45:49
  • java堆排序概念原理介绍

    2021-08-30 12:31:58
  • J2SE基础之命令行中编写第一个 Hello World

    2023-10-05 09:43:55
  • seata的部署和集成详细介绍

    2023-06-05 08:33:24
  • 深入理解Java责任链模式实现灵活的请求处理流程

    2022-06-06 23:33:53
  • C#并行编程之信号量

    2023-10-08 19:26:54
  • Spring Security 实现用户名密码登录流程源码详解

    2023-05-31 11:13:59
  • Vue3源码解读effectScope API及实现原理

    2023-12-11 19:28:49
  • Java枚举类型enum的详解及使用

    2023-08-02 14:23:57
  • asp之家 软件编程 m.aspxhome.com