本篇内容介绍了“RocketMQ broker文件清理源码分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1. broker 清
本篇内容介绍了“RocketMQ broker文件清理源码分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ中哪些文件是一直在往里面写入东西的,最容易想到的就是commitlog
了,因为在一个broker 进程中,所有的普通消息,事务消息,系统消息啥的都往这个commitlog
中写,随着时间的越来越长,然后commitlog
就会越积攒越多,肯定会有磁盘放不下的那一天,而且我们消息消费完成后,那些被消费完成后的消息其实作用就很小了,可能会有这么一个场景,比如说我线上出现了某个问题,我想看下关于这个问题的消息有没有被消费到,可能你会用到这个消息,但是这种问题一般就是比较紧急的,最近实效的,之前那些消息其实作用就基本没有了,所以就需要清理掉之前的消息。其实不光commitlog
需要清理,还需要清理一下ConsumeQueue
与indexFile
, 因为你commitlog
里面的消息都被清理了,ConsumeQueue
与indexFile
再保存着之前的一些数据,就是纯粹浪费空间了。
所以说 broker
文件清理主要是清理commitlog , ConsumeQueue , indexFile
。
我们介绍下RocketMQ
文件清理的机制,RocketMQ
默认是清理72小时之前的消息
,然后它有几个触发条件, 默认是凌晨4点触发清
理, 除非你你这个磁盘空间占用到75%
以上了。在清理commitlog
的时候,并不是一条消息一条消息的清理,拿到所有的MappedFile
(抛去现在还在用着的,也就是最后一个) ,然后比对每个MappedFile
的最后一条消息的时间,如果是72小时之前
的就把MappedFile
对应的文件删除了,销毁对应MappedFile
,这种情况的话只要你MappedFile
最后一条消息还在存活实效内的话
,它就不会清理你这个MappedFile
,就算你这个MappedFile
靠前的消息过期了。但是有一种情况它不管你消息超没超过72小时,直接就是删,那就是磁盘空间不足的时候,也就是占了85%
以上了,就会立即清理。
清理完成commitlog
之后,就会拿到commitlog
中最小的offset
,然后去ConsumeQueue
与indexFile
中把小于offset
的记录删除掉。清理ConsumeQueue
的时候也是遍历MappedFile
,然后它的最后一条消息(unit)小于commitlog
中最小的offset
的话,就说明这个MappedFile
都小于offset
,因为他们是顺序追加写的,这个MappedFile 就会清理掉,如果你MappedFile
最后一个unit不是小于offset
的话,这个MappedFile
就不删了。
我们来看下源码是怎样实现的: 在broker 存储器DefaultMessageStore
启动(start)的时候,会添加几个任务调度,其中有一个就是文件清理的:
private void addScheduleTask() { // todo 清理过期文件 每隔10s this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // todo DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); ...}
默认是10s
执行一次,可以看到它调用了DefaultMessageStore
的cleanFilesPeriodically
方法:
private void cleanFilesPeriodically() { // todo 清除CommitLog文件 this.cleanCommitLogService.run(); // todo 清除ConsumeQueue文件 this.cleanConsumeQueueService.run();}
我们先来看下关于commitlog
的清理工作:
public void run() { try { // todo 删除过期文件 this.deleteExpiredFiles(); this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); }}
我们看下deleteExpiredFiles
方法的实现:
private void deleteExpiredFiles() { int deleteCount = 0; // 文件保留时间,如果超过了该时间,则认为是过期文件,可以被删除 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); // 删除物理文件的间隔时间,在一次清除过程中,可能需要被删除的文件不止一个,该值指定两次删除文件的间隔时间 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); // 在清除过期文件时,如 //果该文件被其他线程占用(引用次数大于0,比如读取消息),此时会 //阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间 //戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能 //保留文件的最大时间,在此时间内,同样可以被拒绝删除,超过该时 //间后,会将引用次数设置为负数,文件将被强制删除 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); // 指定删除文件的时间点,RocketMQ通过deleteWhen设置每天在 //固定时间执行一次删除过期文件操作,默认凌晨4点 boolean timeup = this.isTimeToDelete(); // todo 检查磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作 boolean spacefull = this.isSpaceToDelete(); // 预留手工触发机制,可以通过调用excuteDeleteFilesManualy //方法手工触发删除过期文件的操作,目前RocketMQ暂未封装手工触发 //文件删除的命令 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; // todo 文件的销毁和删除 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } }}
开始几个参数,一个是文件保留实效默认是72小时
,你可以使用fileReservedTime
来配置,一个是删除文件的间隔100ms
,再就是强行销毁MappedFile
的120s
(这个为啥要强行销毁,因为它还害怕还有地方用着这个MappedFile,它有个专门的引用计数器,比如说我还有地方要读它的消息,这个时候计数器就是+1的)。
接着就是判断到没到删除的那个时间,它默认是凌晨4点才能删除
:
private boolean isTimeToDelete() { // 什么时候删除,默认是凌晨4点 -> 04 String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); // 判断是不是到点了 就是判断的当前小时 是不是等于 默认的删除时间 if (UtilAll.isItTimeToDo(when)) { DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); return true; } return false;}
再接着就是看看空间是不是充足,看看磁盘空间使用占比是什么样子的:
private boolean isSpaceToDelete() { // 表示CommitLog文件、ConsumeQueue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; // 表示是否需要立即执行清除过期文件的操作 cleanImmediately = false; { // 当前CommitLog目录所在的磁盘分区的磁盘使用率,通过File#getTotalSpace方法获取文件所在磁盘分区的总容量, //通过File#getFreeSpace方法获取文件所在磁盘分区的剩余容量 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); // diskSpaceWarningLevelRatio:默认0.90。如果磁盘分区使用率超过该阈值,将设置磁盘为不可写,此时会拒绝写入新消息 // 如果当前磁盘分区使用率大于diskSpaceWarningLevelRatio,应该立即启动过期文件删除操作 if (physicRatio > diskSpaceWarningLevelRatio) { // 设置 磁盘不可写 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } cleanImmediately = true; //diskSpaceCleanForciblyRatio:默认0.85 如果磁盘分区使用超过该阈值,建议立即执行过期文件删除,但不会拒绝写入新消息 // 如果当前磁盘分区使用率大于diskSpaceCleanForciblyRatio,建议立即执行过期文件清除 } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { // 设置 磁盘可以写入 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } // 如果当前磁盘使用率小于diskMaxUsedSpaceRatio,则返回false,表示磁盘使用率正常, // 否则返回true,需要执行删除过期文件 if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } } ... return false;}
这里其实不光是判断 commitlog
的存储区域,后面还有段判断ConsumeQueue
的存储区域的,然后与这块逻辑一样,就没有放上。这里就是获取默认的最大使用占比 就是75%
,接着就是看看commitlog
存储的那地方使用了多少了,如果是使用90%
了,就设置runningFlag
说磁盘满了,立即清理设置成true
,这个参数设置成true之后,就不会管你消息有没有超过72小时,如果你使用了85%
以上了,也是设置立即清理,如果超过75%
返回true。好了,磁盘占用空间这块我们就看完了。
接着看上面deleteExpiredFiles
方法实现,还有一个手动清除
的,这块我没有找到哪里有用到的,如果后续找到,会补充上, 判断 到了清理的点 或者是磁盘空间满了 或者是手动删除了,满足一个条件就ok了,如果是立即清除是个true,它这里这个cleanAtOnce
变量就是true了,因为前面那个强制清理是默认开启
的。
接着计算了一下fileReservedTime
就是将小时转成了毫秒,为了后面好比对,最后就是调用commitlo
g的deleteExpiredFile
方法清理了:
public int deleteExpiredFile( final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { // todo return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);}
可以看到commitlog
对象调用mappedFileQueue
的deleteExpiredFileByTime
方法来处理的,这个mappedFileQueue
就是管理了一堆MappedFile
:
public int deleteExpiredFileByTime(final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { // 拿到mappedFile的引用 Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return 0; int mfsLength = mfs.length - 1; int deleteCount = 0; List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; // 计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认72小时) long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; // 如果当前时间大于文件的最大存活时间 或 需要强制删除文件(当磁盘使用超过设定的阈值)时 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { // todo 执行destroy方法 if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; // 一批 最多删除10 个 if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } // 删除间隔 if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } } // todo 统一执行File#delete方法将文件从物理磁盘中删除 deleteExpiredFile(files); return deleteCount;}
这里首先是拿到所有MappedFile
的引用,然后就是遍历了,可以看到它这个length是-1的,也就是最后一个MappedFile
是遍历不到的,这个是肯定的,因为最后一个MappedFile
肯定是在用着的,如果你来个强制清理,一下清理了,就没法提供服务了。
遍历的时候,拿到对应MappedFile
里面最后一条消息,看看它的写入时间是不是已经过了这个过期时间了,或者直接强制删除,就会执行MappedFile
的销毁方法,而且带着销毁时间:
public boolean destroy(final long intervalForcibly) { // todo this.shutdown(intervalForcibly); // 清理结束 if (this.isCleanupOver()) { try { // 关闭文件通道, this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); long beginTime = System.currentTimeMillis(); // 删除物理文件 boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + this.getFlushedPosition() + ", " + UtilAll.computeElapsedTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } return false;}public void shutdown(final long intervalForcibly) { // 关闭MappedFile if (this.available) { this.available = false; // 初次关闭的时间戳 this.firstShutdownTimestamp = System.currentTimeMillis(); // todo 尝试释放资源 this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); } }}
这里就不详细说了,其实就是shutdown
,然后过了120s后强制把引用清了
,之后就是关闭channel
,删除对应文件。
接着往下说,就是销毁成功了,会记录删除数量,判断删了多少了,一批是最多删10个的,这块应该是怕影响性能的,你一直删的的话,这东西很消耗磁盘性能,容易影响其他写入,读取功能,如果你销毁失败,直接就停了。最后就是将删除的这些MappedFile
从MappedFileQueue
中删除掉。再回到commitlog clean service
的run
方法:
public void run() { try { // todo 删除过期文件 this.deleteExpiredFiles(); // todo this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); }}
我们deleteExpiredFiles
方法已经介绍完了,然后再来看看第二个方法是干嘛的,这个其实就是判断第一个MappedFile
还可不可用了,如果不可用的话,就删了,这块有可能是上面 deleteExpiredFiles
方法MappedFile
销毁失败,然后设置了不可用,但是没有清理掉,所以这块再来善后下:
private void redeleteHangedFile() { // redeleteHangedFileInterval间隔 默认1000*120 int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); // 当前时间戳 long currentTimestamp = System.currentTimeMillis(); if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { this.lastRedeleteTimestamp = currentTimestamp; // 获取强制销毁Mapped文件间隔 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); // todo 重新删除第一个MappedFile if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { } }}public boolean retryDeleteFirstFile(final long intervalForcibly) { // 获取到 第一个mappedFile MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { // 不可用 if (!mappedFile.isAvailable()) { log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName()); // 销毁 boolean result = mappedFile.destroy(intervalForcibly); if (result) { log.info("the mappedFile re delete OK, " + mappedFile.getFileName()); List<MappedFile> tmpFiles = new ArrayList<MappedFile>(); tmpFiles.add(mappedFile); this.deleteExpiredFile(tmpFiles); } else { log.warn("the mappedFile re delete failed, " + mappedFile.getFileName()); } return result; } } return false;}
这块就是看第一个MappedFile
还可不可用,不可用的话,就销毁掉。好了commitlog
文件清理源码就解析完成了。接下来看下这个ConsumeQueue与indexFile
的清理。
private void cleanFilesPeriodically() { // todo 清除CommitLog文件 this.cleanCommitLogService.run(); // todo 清除ConsumeQueue文件 this.cleanConsumeQueueService.run();}
DefaultMessageStore.CleanConsumeQueueService#run:
public void run() { try { // 删除 过期的file this.deleteExpiredFiles(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); }}
接下来DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:
private void deleteExpiredFiles() { // 删除间隔 100 int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); // 获取 commitLog 的最小offset long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); if (minOffset > this.lastPhysicalMinOffset) { // 上次 清理 到哪了 this.lastPhysicalMinOffset = minOffset; ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; // 遍历删除 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { // 进行删除 int deleteCount = logic.deleteExpiredFile(minOffset); // 间隔 if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { Thread.sleep(deleteLogicsFilesInterval); } catch (InterruptedException ignored) { } } } } // todo 删除 过期的 indexFile DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); }}
首先是获取删除间隔,然后拿到commitlog
中最小的那个offset
,接着就是判断上次清理位置与最小offset
比较,如果offset
大于它上次清理的位置的话,就说明 它得把最小offset
之前的清理掉。先是记录最后一次清理的offset
是最小offset
, 接着就是遍历所有的ConsumeQueue
,调用每个ConsumeQueue
的 deleteExpiredFile
方法来清理,我们来看下这个方法:
public int deleteExpiredFile(long offset) { // 进行销毁 然后得到销毁个数 int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE); // 纠正最小偏移量 this.correctMinOffset(offset); return cnt;}
CQ_STORE_UNIT_SIZE
这个就是每个unit
占20个字节,见。
public int deleteExpiredFileByOffset(long offset, int unitSize) { Object[] mfs = this.copyMappedFiles(0); List<MappedFile> files = new ArrayList<MappedFile>(); int deleteCount = 0; if (null != mfs) { int mfsLength = mfs.length - 1; for (int i = 0; i < mfsLength; i++) { boolean destroy; MappedFile mappedFile = (MappedFile) mfs[i]; // 最后一个单元位置到这个MappedFile结束,其实就是获取最后一个单元 SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize); if (result != null) { // 获取最大的offset long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); result.release(); // 判断是否销毁 如果小于offset 就要销毁 destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " + maxOffsetInLogicQueue + ", delete it"); } } else if (!mappedFile.isAvailable()) { // Handle hanged file. log.warn("Found a hanged consume queue file, attempting to delete it."); destroy = true; } else { log.warn("this being not executed forever."); break; } // 进行销毁 if (destroy && mappedFile.destroy(1000 * 60)) { files.add(mappedFile); deleteCount++; } else { break; } } } // 删除引用 deleteExpiredFile(files); return deleteCount;}
它的删除跟commitlog
的差不多,只不过commitlog 是根据时间来判断的,它是根据commitlog 的offset 来判断的,判断要不要删除这个MappedFile,如果这个MappedFile最后一个unit 存储的offset 小于 commitlog 最小的offset 的话就要销毁了。接着就是销毁,超时时间是1分钟,最后是删除引用。
最后我们来看下 indexFile
的清理工作: DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:
private void deleteExpiredFiles() { // 删除间隔 100 int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); // 获取 commitLog 的最小offset long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); if (minOffset > this.lastPhysicalMinOffset) { ... // todo 删除 过期的 indexFile DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); }}
public void deleteExpiredFile(long offset) { Object[] files = null; try { // 获取读锁 this.readWriteLock.readLock().lock(); if (this.indexFileList.isEmpty()) { return; } // 获取第一个indexFile 的一个offset long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset(); if (endPhyOffset < offset) { files = this.indexFileList.toArray(); } } catch (Exception e) { log.error("destroy exception", e); } finally { this.readWriteLock.readLock().unlock(); } if (files != null) { // 找到需要删除的indexFile List<IndexFile> fileList = new ArrayList<IndexFile>(); for (int i = 0; i < (files.length - 1); i++) { IndexFile f = (IndexFile) files[i]; if (f.getEndPhyOffset() < offset) { fileList.add(f); } else { break; } } // 删除 this.deleteExpiredFile(fileList); }}
可以看到,先是拿第一个indexFile
看看有没有小于commitlog
最小offset
的情况发生,这里也是拿的indexFile
最后一个offset
做的对比,因为这块也是按照offset
大小 前后顺序处理的,最后一个的offest
肯定是这个indexFile
中最大的了,如果第一个indexFile
满足了的话,就会拿到所有引用,然后遍历找出符合条件的indexFile
, 调用deleteExpiredFile
方法遍历销毁:
private void deleteExpiredFile(List<IndexFile> files) { if (!files.isEmpty()) { try { this.readWriteLock.writeLock().lock(); for (IndexFile file : files) { // 销毁 boolean destroyed = file.destroy(3000); // 从index 集合中移除 destroyed = destroyed && this.indexFileList.remove(file); if (!destroyed) { log.error("deleteExpiredFile remove failed."); break; } } } catch (Exception e) { log.error("deleteExpiredFile has exception.", e); } finally { this.readWriteLock.writeLock().unlock(); } }}
这里就是遍历销毁,然后移除对这个indexFile
管理。
“RocketMQ broker文件清理源码分析”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!
--结束END--
本文标题: RocketMQ broker文件清理源码分析
本文链接: https://lsjlt.com/news/355274.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0