Skip to content

Commit dc42cdf

Browse files
d-csclaude
andcommitted
test(run-engine): repro for stale worker queue entry after sweeper ack
Adds a failing test in concurrencySweeper.test.ts that demonstrates an inconsistency between the worker queue list and the message body store: - Fast-path enqueue RPUSHes the messageKey value onto the worker queue list and SADDs the run into currentConcurrency. - The sweeper marks the run as completed (via test callback) and processMarkedRun acks with removeFromWorkerQueue: false, which DELs the message body but skips the LREM on the worker queue list. - A subsequent blocking dequeue BLPOPs the stale messageKey, the GET returns nil, and the dequeue path emits "Failed to dequeue message from worker queue" with workerQueueLength: 0. The test asserts that the dequeue path does not log this error after the sweeper has acked the run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 386b4f6 commit dc42cdf

1 file changed

Lines changed: 96 additions & 0 deletions

File tree

internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,100 @@ describe("RunQueue Concurrency Sweeper", () => {
175175
}
176176
}
177177
);
178+
179+
// When the sweeper acks a run whose messageKey value is still sitting on the worker
180+
// queue list (e.g. fast-path enqueued, never BLPOP'd), it deletes the message body but
181+
// leaves a stale tombstone on the list. The next BLPOP returns that tombstone, GET
182+
// returns nil, and the dequeue path logs "Failed to dequeue message from worker queue".
183+
redisTest(
184+
"should not produce 'Failed to dequeue message from worker queue' after sweeper acks a fast-path-enqueued run",
185+
async ({ redisContainer }) => {
186+
let enableConcurrencySweeper = false;
187+
const logger = new Logger("RunQueue", "debug");
188+
const errorSpy = vi.spyOn(logger, "error");
189+
190+
const queue = new RunQueue({
191+
...testOptions,
192+
logger,
193+
logLevel: "debug",
194+
queueSelectionStrategy: new FairQueueSelectionStrategy({
195+
redis: {
196+
keyPrefix: "runqueue:test:",
197+
host: redisContainer.getHost(),
198+
port: redisContainer.getPort(),
199+
},
200+
keys: testOptions.keys,
201+
}),
202+
workerOptions: {
203+
pollIntervalMs: 100,
204+
immediatePollIntervalMs: 100,
205+
},
206+
redis: {
207+
keyPrefix: "runqueue:test:",
208+
host: redisContainer.getHost(),
209+
port: redisContainer.getPort(),
210+
},
211+
concurrencySweeper: {
212+
scanSchedule: "* * * * * *",
213+
scanJitterInMs: 5,
214+
processMarkedSchedule: "* * * * * *",
215+
processMarkedJitterInMs: 5,
216+
callback: async (runIds) => {
217+
if (!enableConcurrencySweeper) {
218+
return [];
219+
}
220+
return [{ id: messageDev.runId, orgId: "o1234" }];
221+
},
222+
},
223+
});
224+
225+
try {
226+
await queue.updateEnvConcurrencyLimits(authenticatedEnvDev);
227+
228+
// Fast-path enqueue: SET messageKey, RPUSH messageKeyValue onto worker queue list,
229+
// SADD runId into currentConcurrency. The message is on the list waiting to be popped.
230+
await queue.enqueueMessage({
231+
env: authenticatedEnvDev,
232+
message: messageDev,
233+
workerQueue: authenticatedEnvDev.id,
234+
enableFastPath: true,
235+
});
236+
237+
// Pre-conditions: list has the entry, run is "in-flight" per operational concurrency,
238+
// body exists. Fast-path bumps the operational currentConcurrency (SADD) but not
239+
// currentDequeued — the displayed concurrency is bumped only when a worker BLPOPs.
240+
expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toHaveLength(1);
241+
expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(1);
242+
expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeDefined();
243+
244+
// Sweeper now considers the run completed (test callback returns it), so
245+
// processMarkedRun acks with removeFromWorkerQueue: false.
246+
enableConcurrencySweeper = true;
247+
await setTimeout(5_000);
248+
249+
// Sweeper has run: operational concurrency released, message body deleted.
250+
expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(0);
251+
expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeUndefined();
252+
253+
// Trigger a blocking dequeue with a short timeout — production uses blockingPop:true,
254+
// which is the only path that emits this error log.
255+
const dequeued = await queue.dequeueMessageFromWorkerQueue(
256+
"test_consumer",
257+
authenticatedEnvDev.id,
258+
{ blockingPop: true, blockingPopTimeoutSeconds: 2 }
259+
);
260+
expect(dequeued).toBeUndefined();
261+
262+
// BUG: the dequeue path logs the exact Sentry-visible error. Match the message and
263+
// the structured fields one-to-one with what TRIGGER-CLOUD-VC reports.
264+
const failedDequeueErrors = errorSpy.mock.calls.filter(
265+
([msg]) => msg === "Failed to dequeue message from worker queue"
266+
);
267+
expect(failedDequeueErrors).toHaveLength(0);
268+
} finally {
269+
errorSpy.mockRestore();
270+
await queue.quit();
271+
}
272+
}
273+
);
178274
});

0 commit comments

Comments
 (0)