Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 9 additions & 31 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,6 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = this.getConfirmOffset();

if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset();
if (mappedFileOffset > 0) {
log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset);
lastValidMsgPhyOffset = dispatchFromPhyOffset;
byteBuffer.position((int) mappedFileOffset);
}
}
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
Expand Down Expand Up @@ -744,7 +735,7 @@ public long getLastFileFromOffset() {
/**
* @throws RocksDBException only in rocksdb mode
*/
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
public void recoverAbnormally(long dispatchFromPhyOffset) throws RocksDBException {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
Expand Down Expand Up @@ -779,18 +770,17 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
long lastValidMsgPhyOffset;
long lastConfirmValidMsgPhyOffset;

if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
mappedFileOffset = maxPhyOffsetOfConsumeQueue - mappedFile.getFileFromOffset();
if (defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset();
// Protective measures, falling back to non-accelerated mode, which is extremely unlikely to occur
if (mappedFileOffset < 0) {
mappedFileOffset = 0;
lastValidMsgPhyOffset = processOffset;
lastConfirmValidMsgPhyOffset = processOffset;
} else {
log.info("recover using acceleration, recovery offset is {}", maxPhyOffsetOfConsumeQueue);
lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset);
lastValidMsgPhyOffset = dispatchFromPhyOffset;
lastConfirmValidMsgPhyOffset = dispatchFromPhyOffset;
byteBuffer.position((int) mappedFileOffset);
}
} else {
Expand Down Expand Up @@ -933,27 +923,15 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
return false;
}

if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
return false;
}
log.info("CommitLog isMmapFileMatchedRecover find satisfied MmapFile for index, " +
"MmapFile storeTimestamp={}, MmapFile phyOffset={}, indexMsgTimestamp={}, recoverNormally={}",
storeTimestamp, phyOffset, this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp(), recoverNormally);
}

return isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
}

private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp,
boolean recoverNormally) throws RocksDBException {
boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
}
if (null != this.defaultMessageStore.getIndexRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isIndexRocksDBEnable()) {
result = result && this.defaultMessageStore.getIndexRocksDBStore().isMappedFileMatchedRecover(phyOffset);
// Check all registered CommitLogDispatchStore instances
for (CommitLogDispatchStore store : defaultMessageStore.getCommitLogDispatchStores()) {
result = result && store.isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;

import org.rocksdb.RocksDBException;

/**
* Interface for stores that require commitlog dispatch and recovery. Each store implementing this interface should
* register itself in the commitlog when loading. This abstraction allows the commitlog recovery process to
* automatically consider all registered stores without needing to modify the recovery logic when adding a new store.
*/
public interface CommitLogDispatchStore {

/**
* Get the dispatch offset in the store. Messages whose phyOffset larger than this offset need to be dispatched. The
* dispatch offset is only used during recovery.
*
* @param recoverNormally true if broker exited normally last time (normal recovery), false for abnormal recovery
* @return the dispatch phyOffset, or null if the store is not enabled or has no valid offset
* @throws RocksDBException if there is an error accessing RocksDB storage
*/
Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBException;

/**
* Used to determine whether to start doDispatch from this commitLog mappedFile.
*
* @param phyOffset the offset of the first message in this commitlog mappedFile
* @param storeTimestamp the timestamp of the first message in this commitlog mappedFile
* @param recoverNormally whether this is a normal recovery
* @return whether to start recovering from this MappedFile
* @throws RocksDBException if there is an error accessing RocksDB storage
*/
boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp,
boolean recoverNormally) throws RocksDBException;
}

Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
this.messageStore.getStoreCheckpoint().setTmpLogicsPhysicalOffset(request.getCommitLogOffset());
if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
multiDispatchLmqQueue(request, maxRetries);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public class DefaultMessageStore implements MessageStore {

private final LinkedList<CommitLogDispatcher> dispatcherList = new LinkedList<>();

/**
* List of stores that require commitlog dispatch and recovery. Each store registers itself when loading.
*/
private final List<CommitLogDispatchStore> commitLogDispatchStores = new ArrayList<>();

private final RandomAccessFile lockFile;

private FileLock lock;
Expand Down Expand Up @@ -333,6 +338,11 @@ public boolean load() {
// load Consume Queue
result = result && this.consumeQueueStore.load();
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_CONSUME_QUEUE_OK, result);
// Register consume queue store for commitlog dispatch
// AbstractConsumeQueueStore implements CommitLogDispatchStore, so we can register it directly
if (this.consumeQueueStore != null) {
registerCommitLogDispatchStore(this.consumeQueueStore);
}

if (messageStoreConfig.isEnableCompaction()) {
result = result && this.compactionService.load(lastExitOK);
Expand All @@ -342,7 +352,15 @@ public boolean load() {
if (result) {
loadCheckPoint();
result = this.indexService.load(lastExitOK);
registerCommitLogDispatchStore(this.indexService);
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_INDEX_OK, result);
// Register IndexRocksDBStore and TransMessageRocksDBStore for commit-log dispatch
if (messageStoreConfig.isIndexRocksDBEnable()) {
registerCommitLogDispatchStore(this.indexRocksDBStore);
}
if (messageStoreConfig.isTransRocksDBEnable() && transMessageRocksDBStore != null) {
registerCommitLogDispatchStore(this.transMessageRocksDBStore);
}
this.recover(lastExitOK);
LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
}
Expand Down Expand Up @@ -377,7 +395,16 @@ private void recover(final boolean lastExitOK) throws RocksDBException {
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RECOVER_CONSUME_QUEUE_OK);

// recover commitlog
long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset();
// Calculate the minimum dispatch offset from all registered stores
Long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset(lastExitOK);

for (CommitLogDispatchStore store : commitLogDispatchStores) {
Long storeOffset = store.getDispatchFromPhyOffset(lastExitOK);
if (storeOffset != null && storeOffset > 0) {
dispatchFromPhyOffset = Math.min(dispatchFromPhyOffset, storeOffset);
}
}

if (lastExitOK) {
this.commitLog.recoverNormally(dispatchFromPhyOffset);
} else {
Expand Down Expand Up @@ -1102,6 +1129,31 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc
@Override
public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
this.transMessageRocksDBStore = transMessageRocksDBStore;
// Register TransMessageRocksDBStore for commitlog dispatch if enabled
if (transMessageRocksDBStore != null && messageStoreConfig.isTransRocksDBEnable()) {
registerCommitLogDispatchStore(this.transMessageRocksDBStore);
}
}

/**
* Register a store that requires commitlog dispatch and recovery. Each store should register itself when loading.
*
* @param store the store to register
*/
public void registerCommitLogDispatchStore(CommitLogDispatchStore store) {
if (store != null) {
commitLogDispatchStores.add(store);
LOGGER.info("Registered CommitLogDispatchStore: {}", store.getClass().getSimpleName());
}
}

/**
* Get all registered CommitLogDispatchStore instances.
*
* @return list of registered stores
*/
public List<CommitLogDispatchStore> getCommitLogDispatchStores() {
return commitLogDispatchStores;
}

@Override
Expand Down Expand Up @@ -1400,7 +1452,8 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon
}

@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end, String indexType, String lastKey) {
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end, String indexType,
String lastKey) {
QueryMessageResult queryMessageResult = new QueryMessageResult();
long lastQueryMsgTime = end;
for (int i = 0; i < 3; i++) {
Expand Down Expand Up @@ -1510,10 +1563,9 @@ public long now() {
}

/**
* Lazy clean queue offset table.
* If offset table is cleaned, and old messages are dispatching after the old consume queue is cleaned,
* consume queue will be created with old offset, then later message with new offset table can not be
* dispatched to consume queue.
* Lazy clean queue offset table. If offset table is cleaned, and old messages are dispatching after the old consume
* queue is cleaned, consume queue will be created with old offset, then later message with new offset table can not
* be dispatched to consume queue.
*/
@Override
public int deleteTopics(final Set<String> deleteTopics) {
Expand Down Expand Up @@ -1677,6 +1729,7 @@ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long consu
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}

@Override
public long dispatchBehindMilliseconds() {
return this.reputMessageService.behindMs();
Expand Down Expand Up @@ -1818,8 +1871,8 @@ public boolean checkInDiskByCommitOffset(long offsetPy) {
}

/**
* The ratio val is estimated by the experiment and experience
* so that the result is not high accurate for different business
* The ratio val is estimated by the experiment and experience so that the result is not high accurate for different
* business
*
* @return
*/
Expand Down
21 changes: 21 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class StoreCheckpoint {
private volatile long tmpLogicsMsgTimestamp = 0;
private volatile long physicMsgTimestamp = 0;
private volatile long logicsMsgTimestamp = 0;
private volatile long tmpLogicsPhysicalOffset = 0;
private volatile long logicsPhysicalOffset = 0;
private volatile long indexMsgTimestamp = 0;
private volatile long masterFlushedOffset = 0;
private volatile long confirmPhyOffset = 0;
Expand All @@ -56,6 +58,7 @@ public StoreCheckpoint(final String scpPath) throws IOException {
this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
this.masterFlushedOffset = this.mappedByteBuffer.getLong(24);
this.confirmPhyOffset = this.mappedByteBuffer.getLong(32);
this.logicsPhysicalOffset = this.mappedByteBuffer.getLong(40);

log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
Expand All @@ -65,6 +68,7 @@ public StoreCheckpoint(final String scpPath) throws IOException {
+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
log.info("store checkpoint file masterFlushedOffset " + this.masterFlushedOffset);
log.info("store checkpoint file confirmPhyOffset " + this.confirmPhyOffset);
log.info("store checkpoint file logicsPhysicalOffset " + this.logicsPhysicalOffset);
} else {
log.info("store checkpoint file not exists, " + scpPath);
}
Expand All @@ -91,6 +95,7 @@ public void flush() {
this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
this.mappedByteBuffer.putLong(24, this.masterFlushedOffset);
this.mappedByteBuffer.putLong(32, this.confirmPhyOffset);
this.mappedByteBuffer.putLong(40, this.logicsPhysicalOffset);
this.mappedByteBuffer.force();
} catch (Throwable e) {
log.error("Failed to flush", e);
Expand Down Expand Up @@ -121,6 +126,22 @@ public void setTmpLogicsMsgTimestamp(long tmpLogicsMsgTimestamp) {
this.tmpLogicsMsgTimestamp = tmpLogicsMsgTimestamp;
}

public long getTmpLogicsPhysicalOffset() {
return tmpLogicsPhysicalOffset;
}

public void setTmpLogicsPhysicalOffset(long tmpLogicsPhysicalOffset) {
this.tmpLogicsPhysicalOffset = tmpLogicsPhysicalOffset;
}

public long getLogicsPhysicalOffset() {
return logicsPhysicalOffset;
}

public void setLogicsPhysicalOffset(long logicsPhysicalOffset) {
this.logicsPhysicalOffset = logicsPhysicalOffset;
}

public long getConfirmPhyOffset() {
return confirmPhyOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,8 @@ public class MessageStoreConfig {
private long rocksdbWalFileRollingThreshold = SizeUnit.GB;

/**
* Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH
* and the storeType was defaultRocksDB. This switch is not recommended for normal use cases (include master-slave
* or controller mode).
* Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH.
* This switch is not recommended for normal use cases (include master-slave or controller mode).
*/
private boolean enableAcceleratedRecovery = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException
}

@Override
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
dledgerRecoverAbnormally(maxPhyOffsetOfConsumeQueue);
public void recoverAbnormally(long dispatchFromPhyOffset) throws RocksDBException {
dledgerRecoverAbnormally(dispatchFromPhyOffset);
}

@Override
Expand Down
Loading
Loading