IGNITE-24963 #7799

Open
ascherbakoff wants to merge 27 commits into apache:main from gridgain:ignite-24963-3

Conversation

@ascherbakoff
Contributor

No description provided.

Copilot AI review requested due to automatic review settings March 17, 2026 07:03
Copilot AI left a comment

Pull request overview

This PR implements a wound-wait deadlock prevention policy for transactions, replacing the previous wait-die approach. It also introduces transaction kill messaging, refactors lock manager internals, adds runInTransaction to TxManager, and restructures test hierarchies for lock management tests.

Changes:

  • Replaces DeadlockPreventionPolicyImpl with specific policy classes (WoundWaitDeadlockPreventionPolicy, NoWaitDeadlockPreventionPolicy, TimeoutDeadlockPreventionPolicy, ReversedWaitDieDeadlockPreventionPolicy) and refactors HeapLockManager to use allowWait callback with a "sealable" tx map
  • Adds TxKillMessage for cross-node transaction kill signaling and integrates it into TxManagerImpl as the wound-wait fail action
  • Moves runInTransaction from IgniteTransactions default methods into TxManager and refactors the retry logic in RunInTransactionInternalImpl
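
To make the policy change concrete, here is a minimal sketch of the wound-wait conflict rule, assuming priority is a plain `long` where a smaller value means an older transaction. The class `WoundWaitSketch`, the `Decision` enum, and the `resolve` helper are hypothetical names for illustration only; this is not the PR's `WoundWaitDeadlockPreventionPolicy`, and the `Consumer<UUID>` merely stands in for the `failAction` callback that the PR wires to a kill message.

```java
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical illustration of the wound-wait rule; not code from this PR. */
final class WoundWaitSketch {
    /** Outcome of a lock conflict from the requester's point of view. */
    enum Decision { WAIT, WOUND_OWNER }

    /**
     * Wound-wait: an older requester (smaller timestamp) wounds the younger owner,
     * while a younger requester simply waits for the older owner.
     * Under wait-die, by contrast, the older requester waits and the younger one aborts itself.
     */
    static Decision onConflict(long requesterTs, long ownerTs) {
        return requesterTs < ownerTs ? Decision.WOUND_OWNER : Decision.WAIT;
    }

    /**
     * Applies the rule and invokes the fail action for a wounded owner.
     * Assumption: the fail action only requests the abort; the requester still
     * has to wait until the owner's locks are actually released.
     */
    static Decision resolve(long requesterTs, long ownerTs, UUID ownerId, Consumer<UUID> failAction) {
        Decision decision = onConflict(requesterTs, ownerTs);
        if (decision == Decision.WOUND_OWNER) {
            failAction.accept(ownerId);
        }
        return decision;
    }
}
```

Because only younger transactions ever wait on older ones, a wait cycle cannot form under this rule, which is the property the policy relies on to prevent deadlocks.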

Reviewed changes

Copilot reviewed 54 out of 54 changed files in this pull request and generated 14 comments.

| File | Description |
| --- | --- |
| DeadlockPreventionPolicy.java | Adds allowWait, failAction, reverse methods; removes usePriority |
| WoundWaitDeadlockPreventionPolicy.java | New wound-wait policy implementation |
| WaitDieDeadlockPreventionPolicy.java | Adds allowWait and reverse implementations |
| NoWaitDeadlockPreventionPolicy.java | New no-wait policy |
| TimeoutDeadlockPreventionPolicy.java | New timeout-only policy |
| ReversedWaitDieDeadlockPreventionPolicy.java | New reversed wait-die policy |
| DeadlockPreventionPolicyImpl.java | Deleted; replaced by specific policy classes |
| HeapLockManager.java | Major refactor: sealable tx map, tryAcquireInternal, findConflicts, callback-based conflict resolution |
| TxKillMessage.java | New network message for tx kill requests |
| TxMessageGroup.java | Registers new TX_KILL_MESSAGE type |
| TxMessageSender.java | Adds kill() method |
| TxManagerImpl.java | Switches to wound-wait policy, adds kill message handler, adds runInTransaction |
| ReadWriteTransactionImpl.java | Makes killed volatile, moves assignment, exposes enlistFailedException |
| InternalTransaction.java | Adds enlistFailedException default method |
| PublicApiThreadingTransaction.java | Delegates enlistFailedException |
| IgniteTransactions.java | Makes runInTransaction/runInTransactionAsync abstract |
| IgniteTransactionsImpl.java | Implements runInTransaction/runInTransactionAsync |
| ClientTransactions.java | Stub implementations throwing IllegalArgumentException |
| RestartProofIgniteTransactions.java | Delegates new methods |
| PublicApiThreadingIgniteTransactions.java | Delegates new methods |
| RunInTransactionInternalImpl.java | Refactored retry logic, made public |
| TransactionIds.java | Adds retryCnt field to tx ID encoding |
| InternalTxOptions.java | Adds retryId option |
| Lock.java | Adds equals/hashCode |
| LockKey.java | Improves toString for ByteBuffer keys |
| LockManager.java | Adds policy() method |
| TransactionKilledException.java | Adds simplified constructor |
| InternalTableImpl.java | Delegates to tx.enlistFailedException() |
| PartitionReplicaListener.java | Replaces TxCleanupReadyState with PartitionInflights |
| PartitionInflights.java | New inflight tracking for partition replicas |
| TraceableFuture.java | Debug utility (should be removed) |
| ThreadAssertingMvPartitionStorage.java | Disables thread assertion (should be reverted) |
| TpccBenchmarkNodeRunner.java | Developer-local benchmark runner |
| ItDataConsistencyTest.java | Test adjustments for new policy |
| AbstractLockingTest.java | Refactored base test class |
| AbstractLockManagerTest.java | Deleted; tests moved to HeapLockManagerTest |
| HeapLockManagerTest.java | Now extends AbstractLockingTest, contains moved tests |
| Various test files | Updated to use new policy classes and matchers |
| LockWaiterMatcher.java, LockFutureMatcher.java, LockConflictMatcher.java | New Hamcrest matchers for lock test assertions |

}

protected Path workDir() throws Exception {
return new File("c:/work/tpcc").toPath();
Comment on lines +26 to +27
* Wound-wait prevention policy. TODO desc.
*/

/**
* Sends a message to kill a transaction to its coordinator.
*
Comment on lines +132 to +152
public static class CleanupContext {
volatile CompletableFuture<Void> finishFut;
AtomicLong inflights = new AtomicLong(0); // TODO atomic updater
volatile boolean hasWrites = false;

// void addInflight() {
// inflights.incrementAndGet();
// }
//
// void removeInflight(UUID txId) {
// //assert inflights > 0 : format("No inflights, cannot remove any [txId={}, ctx={}]", txId, this);
//
// inflights.decrementAndGet();
// }
}

@TestOnly
public ConcurrentHashMap<UUID, CleanupContext> map() {
return txCtxMap;
}
}
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
//LOG.info("DBG: send cleanup " + txId);
Comment on lines +31 to +63
private final UUID waiterId;
private CompletableFuture<Lock> item;

private LockWaiterMatcher(UUID txId) {
this.waiterId = txId;
}

@Override
protected boolean matchesSafely(CompletableFuture<Lock> item) {
try {
this.item = item;
item.get(50, TimeUnit.MILLISECONDS);
return false; // Timeout exception is expected.
} catch (TimeoutException e) {
return true;
} catch (InterruptedException | ExecutionException | CancellationException e) {
throw new AssertionError(e);
}
}

@Override
protected void describeMismatchSafely(CompletableFuture<Lock> item, Description mismatchDescription) {
mismatchDescription.appendText("lock future is completed ").appendValue(item);
}

@Override
public void describeTo(Description description) {
description.appendText("lock future which should wait for ").appendValue(waiterId);
}

public static LockWaiterMatcher waitsFor(UUID... txIds) {
return new LockWaiterMatcher(txIds[0]);
}
Comment on lines +276 to +337
@@ -342,14 +331,8 @@ default <T> CompletableFuture<T> runInTransactionAsync(Function<Transaction, Com
  * @param <T> Closure result type.
  * @return The result.
  */
-default <T> CompletableFuture<T> runInTransactionAsync(
+<T> CompletableFuture<T> runInTransactionAsync(
 Function<Transaction, CompletableFuture<T>> clo,
 @Nullable TransactionOptions options
-) {
-// This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the timeout of
-// possible retries.
-long startTimestamp = System.currentTimeMillis();
-long initialTimeout = options == null ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) : options.timeoutMillis();
-return runInTransactionAsyncInternal(this, clo, options, startTimestamp, initialTimeout, null);
-}
+);
 @Override
 public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws StorageException {
-assertThreadAllowsToRead();
+//assertThreadAllowsToRead();
Comment on lines +1038 to +1066
var deadlockPreventionPolicy = new WoundWaitDeadlockPreventionPolicy() {
@Override
public long waitTimeout() {
return DEFAULT_LOCK_TIMEOUT;
}

@Override
public void failAction(UUID owner) {
// TODO resolve tx with ABORT and delete locks
TxStateMeta state = txStateVolatileStorage.state(owner);
if (state == null || state.txCoordinatorId() == null) {
return; // tx state is invalid. locks should be cleaned up by tx recovery process.
}

InternalClusterNode coordinator = topologyService.getById(state.txCoordinatorId());
if (coordinator == null) {
return; // tx is abandoned. locks should be cleaned up by tx recovery process.
}

txMessageSender.kill(coordinator, owner);
}
};

// var deadlockPreventionPolicy = new WaitDieDeadlockPreventionPolicy() {
// @Override
// public long waitTimeout() {
// return DEFAULT_LOCK_TIMEOUT;
// }
// };
Comment on lines +1 to +20
package org.apache.ignite.internal.table.distributed.replicator;

import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;

public class TraceableFuture<T> extends CompletableFuture<T> {
private StringWriter log = new StringWriter();

public synchronized void log(String msg) {
log.append("<" + msg + ">");
}

public String message() {
String str;
synchronized (this) {
str = log.toString();
}
return str;
}
}
2 participants