Pull request overview
This PR implements a wound-wait deadlock prevention policy for transactions, replacing the previous wait-die approach. It also introduces transaction kill messaging, refactors lock manager internals, adds runInTransaction to TxManager, and restructures test hierarchies for lock management tests.
Changes:
- Replaces `DeadlockPreventionPolicyImpl` with specific policy classes (`WoundWaitDeadlockPreventionPolicy`, `NoWaitDeadlockPreventionPolicy`, `TimeoutDeadlockPreventionPolicy`, `ReversedWaitDieDeadlockPreventionPolicy`) and refactors `HeapLockManager` to use an `allowWait` callback with a "sealable" tx map
- Adds `TxKillMessage` for cross-node transaction kill signaling and integrates it into `TxManagerImpl` as the wound-wait fail action
- Moves `runInTransaction` from `IgniteTransactions` default methods into `TxManager` and refactors the retry logic in `RunInTransactionInternalImpl`
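For readers comparing the two policies named in the overview, the following is a minimal sketch of the textbook wait-die and wound-wait rules, assuming each transaction carries a unique start timestamp where a smaller value means an older transaction. The class, enum, and method names (`DeadlockPolicySketch`, `Action`, `waitDie`, `woundWait`) are hypothetical and are not taken from the PR.

```java
/** Textbook deadlock prevention rules; a sketch, not the PR's API. */
final class DeadlockPolicySketch {
    /** Outcome of a lock conflict between a requester and the current holder. */
    enum Action { WAIT, ABORT_REQUESTER, ABORT_HOLDER }

    /** Wait-die: an older requester waits, a younger requester aborts ("dies"). */
    static Action waitDie(long requesterTs, long holderTs) {
        return requesterTs < holderTs ? Action.WAIT : Action.ABORT_REQUESTER;
    }

    /** Wound-wait: an older requester aborts ("wounds") the holder, a younger requester waits. */
    static Action woundWait(long requesterTs, long holderTs) {
        return requesterTs < holderTs ? Action.ABORT_HOLDER : Action.WAIT;
    }

    public static void main(String[] args) {
        // Assumption: timestamp 1 is the older transaction, timestamp 2 the younger one.
        System.out.println("wait-die, old requests from young:   " + waitDie(1, 2));
        System.out.println("wait-die, young requests from old:   " + waitDie(2, 1));
        System.out.println("wound-wait, old requests from young: " + woundWait(1, 2));
        System.out.println("wound-wait, young requests from old: " + woundWait(2, 1));
    }
}
```

In both schemes the older transaction is never the one aborted, which is what rules out cycles of waiters. Under wound-wait the abort targets the lock holder rather than the requester, which is presumably why the PR needs a kill message that can reach the holder's coordinator.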
Reviewed changes
Copilot reviewed 54 out of 54 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| `DeadlockPreventionPolicy.java` | Adds allowWait, failAction, reverse methods; removes usePriority |
| `WoundWaitDeadlockPreventionPolicy.java` | New wound-wait policy implementation |
| `WaitDieDeadlockPreventionPolicy.java` | Adds allowWait and reverse implementations |
| `NoWaitDeadlockPreventionPolicy.java` | New no-wait policy |
| `TimeoutDeadlockPreventionPolicy.java` | New timeout-only policy |
| `ReversedWaitDieDeadlockPreventionPolicy.java` | New reversed wait-die policy |
| `DeadlockPreventionPolicyImpl.java` | Deleted; replaced by specific policy classes |
| `HeapLockManager.java` | Major refactor: sealable tx map, tryAcquireInternal, findConflicts, callback-based conflict resolution |
| `TxKillMessage.java` | New network message for tx kill requests |
| `TxMessageGroup.java` | Registers new TX_KILL_MESSAGE type |
| `TxMessageSender.java` | Adds kill() method |
| `TxManagerImpl.java` | Switches to wound-wait policy, adds kill message handler, adds runInTransaction |
| `ReadWriteTransactionImpl.java` | Makes killed volatile, moves assignment, exposes enlistFailedException |
| `InternalTransaction.java` | Adds enlistFailedException default method |
| `PublicApiThreadingTransaction.java` | Delegates enlistFailedException |
| `IgniteTransactions.java` | Makes runInTransaction/runInTransactionAsync abstract |
| `IgniteTransactionsImpl.java` | Implements runInTransaction/runInTransactionAsync |
| `ClientTransactions.java` | Stub implementations throwing IllegalArgumentException |
| `RestartProofIgniteTransactions.java` | Delegates new methods |
| `PublicApiThreadingIgniteTransactions.java` | Delegates new methods |
| `RunInTransactionInternalImpl.java` | Refactored retry logic, made public |
| `TransactionIds.java` | Adds retryCnt field to tx ID encoding |
| `InternalTxOptions.java` | Adds retryId option |
| `Lock.java` | Adds equals/hashCode |
| `LockKey.java` | Improves toString for ByteBuffer keys |
| `LockManager.java` | Adds policy() method |
| `TransactionKilledException.java` | Adds simplified constructor |
| `InternalTableImpl.java` | Delegates to tx.enlistFailedException() |
| `PartitionReplicaListener.java` | Replaces TxCleanupReadyState with PartitionInflights |
| `PartitionInflights.java` | New inflight tracking for partition replicas |
| `TraceableFuture.java` | Debug utility (should be removed) |
| `ThreadAssertingMvPartitionStorage.java` | Disables thread assertion (should be reverted) |
| `TpccBenchmarkNodeRunner.java` | Developer-local benchmark runner |
| `ItDataConsistencyTest.java` | Test adjustments for new policy |
| `AbstractLockingTest.java` | Refactored base test class |
| `AbstractLockManagerTest.java` | Deleted; tests moved to HeapLockManagerTest |
| `HeapLockManagerTest.java` | Now extends AbstractLockingTest, contains moved tests |
| Various test files | Updated to use new policy classes and matchers |
| `LockWaiterMatcher.java`, `LockFutureMatcher.java`, `LockConflictMatcher.java` | New Hamcrest matchers for lock test assertions |

        }

        protected Path workDir() throws Exception {
            return new File("c:/work/tpcc").toPath();

Comment on lines +26 to +27

     * Wound-wait prevention policy. TODO desc.
     */

        /**
         * Sends a message to kill a transaction to it's coordinator.
         *

Comment on lines +132 to +152

        public static class CleanupContext {
            volatile CompletableFuture<Void> finishFut;
            AtomicLong inflights = new AtomicLong(0); // TODO atomic updater
            volatile boolean hasWrites = false;

            // void addInflight() {
            // inflights.incrementAndGet();
            // }
            //
            // void removeInflight(UUID txId) {
            // //assert inflights > 0 : format("No inflights, cannot remove any [txId={}, ctx={}]", txId, this);
            //
            // inflights.decrementAndGet();
            // }
        }

        @TestOnly
        public ConcurrentHashMap<UUID, CleanupContext> map() {
            return txCtxMap;
        }
    }
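The `// TODO atomic updater` note in the snippet above presumably refers to replacing the per-context `AtomicLong` with a plain `volatile long` field driven by a shared `AtomicLongFieldUpdater`, which avoids allocating one extra object per transaction context. A minimal sketch of that pattern follows; the class name `InflightCounterSketch` and its methods are hypothetical and only loosely mirror the commented-out `addInflight`/`removeInflight` pair.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/** Inflight counter backed by a field updater; a sketch, not the PR's code. */
final class InflightCounterSketch {
    // One updater shared by all instances; it addresses the field by name via reflection.
    private static final AtomicLongFieldUpdater<InflightCounterSketch> INFLIGHTS =
            AtomicLongFieldUpdater.newUpdater(InflightCounterSketch.class, "inflights");

    // The updater requires a volatile, non-static long field.
    private volatile long inflights;

    /** Registers one more inflight operation and returns the new count. */
    long addInflight() {
        return INFLIGHTS.incrementAndGet(this);
    }

    /** Removes one inflight operation; fails instead of letting the count go negative. */
    long removeInflight() {
        long prev = INFLIGHTS.getAndUpdate(this, v -> v > 0 ? v - 1 : v);
        if (prev == 0) {
            throw new IllegalStateException("No inflights, cannot remove any");
        }
        return prev - 1;
    }

    long inflights() {
        return inflights;
    }
}
```

The explicit check in `removeInflight` stands in for the commented-out assertion; whether an exception or an assertion is preferable there is a design decision for the PR author.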

                boolean commit,
                @Nullable HybridTimestamp commitTimestamp
        ) {
            //LOG.info("DBG: send cleanup " + txId);

Comment on lines +31 to +63

        private final UUID waiterId;
        private CompletableFuture<Lock> item;

        private LockWaiterMatcher(UUID txId) {
            this.waiterId = txId;
        }

        @Override
        protected boolean matchesSafely(CompletableFuture<Lock> item) {
            try {
                this.item = item;
                item.get(50, TimeUnit.MILLISECONDS);
                return false; // Timeout exception is expected.
            } catch (TimeoutException e) {
                return true;
            } catch (InterruptedException | ExecutionException | CancellationException e) {
                throw new AssertionError(e);
            }
        }

        @Override
        protected void describeMismatchSafely(CompletableFuture<Lock> item, Description mismatchDescription) {
            mismatchDescription.appendText("lock future is completed ").appendValue(item);
        }

        @Override
        public void describeTo(Description description) {
            description.appendText("lock future which should wait for ").appendValue(waiterId);
        }

        public static LockWaiterMatcher waitsFor(UUID... txIds) {
            return new LockWaiterMatcher(txIds[0]);
        }

Comment on lines +276 to +337

    @@ -342,14 +331,8 @@ default <T> CompletableFuture<T> runInTransactionAsync(Function<Transaction, Com
          * @param <T> Closure result type.
          * @return The result.
          */
    -    default <T> CompletableFuture<T> runInTransactionAsync(
    +    <T> CompletableFuture<T> runInTransactionAsync(
                 Function<Transaction, CompletableFuture<T>> clo,
                 @Nullable TransactionOptions options
    -    ) {
    -        // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the timeout of
    -        // possible retries.
    -        long startTimestamp = System.currentTimeMillis();
    -        long initialTimeout = options == null ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : options.timeoutMillis();
    -        return runInTransactionAsyncInternal(this, clo, options, startTimestamp, initialTimeout, null);
    -    }
    +    );
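The comment in the snippet above describes a local start timestamp that serves only for counting the timeout of possible retries. As an illustration of that idea, here is a simplified synchronous sketch in which every retry draws from one shared time budget. All names (`RetryBudgetSketch`, `remainingMillis`, `runWithRetries`) are hypothetical; this is not the logic of `RunInTransactionInternalImpl`, and it retries on any `RuntimeException` without backoff, which a real implementation would likely restrict to retriable errors.

```java
import java.util.function.LongFunction;

/** Retries that share a single timeout budget; a sketch under stated assumptions. */
final class RetryBudgetSketch {
    /** Remaining budget in milliseconds, never negative. */
    static long remainingMillis(long startMillis, long initialTimeoutMillis, long nowMillis) {
        return Math.max(0, initialTimeoutMillis - (nowMillis - startMillis));
    }

    /**
     * Calls the attempt until it succeeds or the shared budget is spent.
     * Each attempt receives the timeout that is still left for it.
     */
    static <T> T runWithRetries(LongFunction<T> attempt, long initialTimeoutMillis) {
        // Local clock reading, unrelated to any transaction begin timestamp.
        long start = System.currentTimeMillis();
        long remaining = initialTimeoutMillis;
        RuntimeException last = null;
        do {
            try {
                return attempt.apply(remaining);
            } catch (RuntimeException e) {
                last = e; // Remember the failure and retry if budget remains.
            }
            remaining = remainingMillis(start, initialTimeoutMillis, System.currentTimeMillis());
        } while (remaining > 0);
        throw last;
    }
}
```

Passing the shrinking `remaining` value to each attempt keeps the total running time bounded by the caller's original timeout, regardless of how many retries occur.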

         @Override
         public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
    -        assertThreadAllowsToRead();
    +        //assertThreadAllowsToRead();

Comment on lines +1038 to +1066

            var deadlockPreventionPolicy = new WoundWaitDeadlockPreventionPolicy() {
                @Override
                public long waitTimeout() {
                    return DEFAULT_LOCK_TIMEOUT;
                }

                @Override
                public void failAction(UUID owner) {
                    // TODO resolve tx with ABORT and delete locks
                    TxStateMeta state = txStateVolatileStorage.state(owner);
                    if (state == null || state.txCoordinatorId() == null) {
                        return; // tx state is invalid. locks should be cleaned up by tx recovery process.
                    }

                    InternalClusterNode coordinator = topologyService.getById(state.txCoordinatorId());
                    if (coordinator == null) {
                        return; // tx is abandoned. locks should be cleaned up by tx recovery process.
                    }

                    txMessageSender.kill(coordinator, owner);
                }
            };

            // var deadlockPreventionPolicy = new WaitDieDeadlockPreventionPolicy() {
            // @Override
            // public long waitTimeout() {
            // return DEFAULT_LOCK_TIMEOUT;
            // }
            // };

Comment on lines +1 to +20

    package org.apache.ignite.internal.table.distributed.replicator;

    import java.io.StringWriter;
    import java.util.concurrent.CompletableFuture;

    public class TraceableFuture<T> extends CompletableFuture<T> {
        private StringWriter log = new StringWriter();

        public synchronized void log(String msg) {
            log.append("<" + msg + ">");
        }

        public String message() {
            String str;
            synchronized (this) {
                str = log.toString();
            }
            return str;
        }
    }