diff --git a/.server-changes/fix-batch-waitpoint-lock-contention.md b/.server-changes/fix-batch-waitpoint-lock-contention.md new file mode 100644 index 0000000000..6b545eb794 --- /dev/null +++ b/.server-changes/fix-batch-waitpoint-lock-contention.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Reduce lock contention when processing large `batchTriggerAndWait` batches. Previously, each batch item acquired a Redis lock on the parent run to insert a `TaskRunWaitpoint` row, causing `LockAcquisitionTimeoutError` with high concurrency (880 errors/24h in prod). Since `blockRunWithCreatedBatch` already transitions the parent to `EXECUTING_WITH_WAITPOINTS` before items are processed, the per-item lock is unnecessary. The new `blockRunWithWaitpointLockless` method performs only the idempotent CTE insert without acquiring the lock. diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index c031113411..6cf80bd46c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -728,17 +728,32 @@ export class RunEngine { //triggerAndWait or batchTriggerAndWait if (resumeParentOnCompletion && parentTaskRunId && taskRun.associatedWaitpoint) { - //this will block the parent run from continuing until this waitpoint is completed (and removed) - await this.waitpointSystem.blockRunWithWaitpoint({ - runId: parentTaskRunId, - waitpoints: taskRun.associatedWaitpoint.id, - projectId: taskRun.associatedWaitpoint.projectId, - organizationId: environment.organization.id, - batch, - workerId, - runnerId, - tx: prisma, - }); + if (batch) { + // Batch path: lockless insert. The parent is already EXECUTING_WITH_WAITPOINTS + // from blockRunWithCreatedBatch, so we only need to insert the TaskRunWaitpoint + // row without acquiring the parent run lock. This avoids lock contention when + // processing large batches with high concurrency. + await this.waitpointSystem.blockRunWithWaitpointLockless({ + runId: parentTaskRunId, + waitpoints: taskRun.associatedWaitpoint.id, + projectId: taskRun.associatedWaitpoint.projectId, + batch, + tx: prisma, + }); + } else { + // Single triggerAndWait: acquire the parent run lock to safely transition + // the snapshot and insert the waitpoint + await this.waitpointSystem.blockRunWithWaitpoint({ + runId: parentTaskRunId, + waitpoints: taskRun.associatedWaitpoint.id, + projectId: taskRun.associatedWaitpoint.projectId, + organizationId: environment.organization.id, + batch, + workerId, + runnerId, + tx: prisma, + }); + } } if (taskRun.delayUntil) { diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index f2ad85cc95..d650c1c6d0 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -525,6 +525,85 @@ export class WaitpointSystem { }); } + /** + * Lockless version of blockRunWithWaitpoint for batch item processing. + * + * When processing batchTriggerAndWait items, blockRunWithCreatedBatch has already + * transitioned the parent run to EXECUTING_WITH_WAITPOINTS before any items are + * processed. Per-item calls to blockRunWithWaitpoint would all compete for the same + * parent run lock just to insert a TaskRunWaitpoint row — causing lock contention + * and LockAcquisitionTimeoutError with large batches. + * + * This method performs only the CTE insert (which is idempotent via ON CONFLICT DO + * NOTHING) and timeout scheduling, without acquiring the parent run lock. + */ + async blockRunWithWaitpointLockless({ + runId, + waitpoints, + projectId, + timeout, + spanIdToComplete, + batch, + tx, + }: { + runId: string; + waitpoints: string | string[]; + projectId: string; + timeout?: Date; + spanIdToComplete?: string; + batch: { id: string; index?: number }; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.$.prisma; + const $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints; + + // Insert the blocking connections and the historical run connections. + // No lock needed: ON CONFLICT DO NOTHING makes concurrent inserts safe, + // and the parent snapshot is already EXECUTING_WITH_WAITPOINTS from + // blockRunWithCreatedBatch. + await prisma.$queryRaw` + WITH inserted AS ( + INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex") + SELECT + gen_random_uuid(), + ${runId}, + w.id, + ${projectId}, + NOW(), + NOW(), + ${spanIdToComplete ?? null}, + ${batch.id}, + ${batch.index ?? null} + FROM "Waitpoint" w + WHERE w.id IN (${Prisma.join($waitpoints)}) + ON CONFLICT DO NOTHING + RETURNING "waitpointId" + ), + connected_runs AS ( + INSERT INTO "_WaitpointRunConnections" ("A", "B") + SELECT ${runId}, w.id + FROM "Waitpoint" w + WHERE w.id IN (${Prisma.join($waitpoints)}) + ON CONFLICT DO NOTHING + ) + SELECT COUNT(*) FROM inserted`; + + // Schedule timeout jobs if needed + if (timeout) { + for (const waitpoint of $waitpoints) { + await this.$.worker.enqueue({ + id: `finishWaitpoint.${waitpoint}`, + job: "finishWaitpoint", + payload: { + waitpointId: waitpoint, + error: JSON.stringify(timeoutError(timeout)), + }, + availableAt: timeout, + }); + } + } + } + /** * Blocks a run with a waitpoint and immediately completes the waitpoint. * diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index 04b43fd19a..53b59ddced 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -6,7 +6,7 @@ import { trace } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; describe("RunLocker", () => { - redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => { + redisTest("Test acquiring a lock works", { timeout: 60_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); const logger = new Logger("RunLockTest", "debug"); const runLock = new RunLocker({