
Commit f38f2f9

fix: speed up batch queue processing by removing stalls and fixing retry race
1 parent 921285c

6 files changed (+132, -16 lines)
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Fix slow batch queue processing by removing spurious cooloff on concurrency blocks, making the global rate limiter non-blocking, and fixing a race condition where retry attempt counts were not atomically updated during message re-queue.
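The rate-limiter part of this fix is not visible in the diffs below. As a rough illustration of what "non-blocking" means here, a minimal token-bucket sketch — a hypothetical helper, not the redis-worker limiter; the numeric defaults only mirror the BATCH_RATE_LIMIT_* settings in env.server.ts:

// Hypothetical in-memory token bucket, refilled lazily from elapsed time.
class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();

  constructor(
    private readonly max = 1200, // cf. BATCH_RATE_LIMIT_MAX
    private readonly refillRate = 100, // cf. BATCH_RATE_LIMIT_REFILL_RATE
    private readonly refillIntervalMs = 10_000 // cf. BATCH_RATE_LIMIT_REFILL_INTERVAL
  ) {
    this.tokens = max;
  }

  // Non-blocking check: returns false immediately instead of waiting for a token,
  // so the consumer loop can move on to other work this tick.
  tryConsume(): boolean {
    const intervals = Math.floor((Date.now() - this.lastRefill) / this.refillIntervalMs);
    if (intervals > 0) {
      this.tokens = Math.min(this.max, this.tokens + intervals * this.refillRate);
      this.lastRefill += intervals * this.refillIntervalMs;
    }
    if (this.tokens > 0) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

const limiter = new TokenBucket();
if (!limiter.tryConsume()) {
  // Over the limit: skip this dequeue attempt rather than stalling the loop.
}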

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion

@@ -549,7 +549,7 @@ const EnvironmentSchema = z
   BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
   BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
   BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
-  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(1),
+  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(5),

   REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
   REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
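A minimal standalone sketch of how the coerced default behaves (plain zod example, not the app's actual schema):

import { z } from "zod";

const EnvSchema = z.object({
  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(5),
});

// With the variable unset, the default applies; when set, the env string is coerced to a number.
console.log(EnvSchema.parse({}).BATCH_CONCURRENCY_LIMIT_DEFAULT); // 5
console.log(EnvSchema.parse({ BATCH_CONCURRENCY_LIMIT_DEFAULT: "10" }).BATCH_CONCURRENCY_LIMIT_DEFAULT); // 10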

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 1 addition & 3 deletions

@@ -150,9 +150,7 @@ export class BatchQueue {
       visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
       startConsumers: false, // We control when to start
       cooloff: {
-        enabled: true,
-        threshold: 5,
-        periodMs: 5_000,
+        enabled: false,
       },
       // Worker queue configuration - FairQueue routes all messages to our single worker queue
       workerQueue: {
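For context, a hedged sketch of the cooloff options implied by this diff; the field names come from the removed lines, but the type itself is an assumption rather than the package's exported definition:

type CooloffOptions = {
  enabled: boolean;
  threshold?: number; // consecutive blocked/empty claims before a queue enters cooloff
  periodMs?: number; // how long a cooled-off queue is skipped by consumers
};

// Disabled for the batch queue: a queue that momentarily hits a limit is retried on the
// next consumer tick instead of sitting idle for periodMs.
const batchQueueCooloff: CooloffOptions = { enabled: false };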

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 9 additions & 7 deletions

@@ -925,8 +925,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     if (this.concurrencyManager) {
       const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor);
       if (availableCapacity === 0) {
-        // Queue at max concurrency, back off to avoid repeated attempts
-        this.#incrementCooloff(queueId);
+        // Queue at max concurrency - don't increment cooloff here.
+        // The outer loop already handles this case (concurrency blocked)
+        // and explicitly avoids cooloff for it. Cooloff here causes
+        // spurious 5s stalls when capacity races between the tenant
+        // pre-check and this per-queue check.
         return 0;
       }
       maxClaimCount = Math.min(maxClaimCount, availableCapacity);
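A rough sketch of the claim-budget shape this hunk describes; getAvailableCapacity mirrors the diff, while the wrapper function and its parameters are stand-ins, not the package's actual internals:

type ConcurrencyManager = {
  getAvailableCapacity(descriptor: unknown): Promise<number>;
};

async function claimBudget(
  concurrency: ConcurrencyManager,
  descriptor: unknown,
  requested: number
): Promise<number> {
  const available = await concurrency.getAvailableCapacity(descriptor);
  if (available === 0) {
    // Concurrency-blocked is a normal, short-lived condition that the outer consumer
    // loop already accounts for; marking the queue as "cooled off" at this point is
    // what caused the spurious multi-second stalls this commit removes.
    return 0;
  }
  return Math.min(requested, available);
}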
@@ -1228,19 +1231,18 @@
         attempt: storedMessage.attempt + 1,
       };

-      // Release with delay (and ensure queue is in master queue)
+      // Release with delay, passing the updated message data so the Lua script
+      // atomically writes the incremented attempt count when re-queuing.
       await this.visibilityManager.release(
         storedMessage.id,
         queueId,
         queueKey,
         queueItemsKey,
         masterQueueKey,
-        Date.now() + nextDelay
+        Date.now() + nextDelay,
+        JSON.stringify(updatedMessage)
       );

-      // Update message in items hash with new attempt count
-      await this.redis.hset(queueItemsKey, storedMessage.id, JSON.stringify(updatedMessage));
-
       // Release concurrency
       if (this.concurrencyManager) {
         await this.concurrencyManager.release(descriptor, storedMessage.id);
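A hedged sketch of the fixed retry path: with the old two-step flow, another consumer could claim the message between release() and the follow-up hset, so the incremented attempt count could be lost or clobber newer state. The release() signature below follows the diff; the wrapper function and types are illustrative only:

type StoredMessage = { id: string; attempt: number };

async function requeueForRetry(
  visibilityManager: {
    release(
      messageId: string,
      queueId: string,
      queueKey: string,
      queueItemsKey: string,
      masterQueueKey: string,
      score?: number,
      updatedData?: string
    ): Promise<void>;
  },
  msg: StoredMessage,
  keys: { queueId: string; queueKey: string; queueItemsKey: string; masterQueueKey: string },
  nextDelayMs: number
) {
  const updated = { ...msg, attempt: msg.attempt + 1 };
  // Single call: the Lua script behind release() re-queues the message and writes the
  // incremented attempt count in one atomic step, so no other consumer can observe
  // (or overwrite) a half-updated message between two separate Redis commands.
  await visibilityManager.release(
    msg.id,
    keys.queueId,
    keys.queueKey,
    keys.queueItemsKey,
    keys.masterQueueKey,
    Date.now() + nextDelayMs,
    JSON.stringify(updated)
  );
}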

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts

Lines changed: 100 additions & 0 deletions

@@ -1182,4 +1182,104 @@ describe("FairQueue", () => {
       }
     );
   });
+
+  describe("concurrency block should not trigger cooloff", () => {
+    redisTest(
+      "should not enter cooloff when queue hits concurrency limit",
+      { timeout: 15000 },
+      async ({ redisOptions }) => {
+        const processed: string[] = [];
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const scheduler = new DRRScheduler({
+          redis: redisOptions,
+          keys,
+          quantum: 10,
+          maxDeficit: 100,
+        });
+
+        const queue = new TestFairQueueHelper(redisOptions, keys, {
+          scheduler,
+          payloadSchema: TestPayloadSchema,
+          shardCount: 1,
+          consumerCount: 1,
+          consumerIntervalMs: 20,
+          visibilityTimeoutMs: 5000,
+          cooloff: {
+            periodMs: 5000, // Long cooloff - if triggered, messages would stall
+            threshold: 1, // Enter cooloff after just 1 increment
+          },
+          concurrencyGroups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async () => 1, // Only 1 concurrent per tenant
+              defaultLimit: 1,
+            },
+          ],
+          startConsumers: false,
+        });
+
+        // Hold first message to keep concurrency slot occupied
+        let releaseFirst: (() => void) | undefined;
+        const firstBlocking = new Promise<void>((resolve) => {
+          releaseFirst = resolve;
+        });
+        let firstStarted = false;
+
+        queue.onMessage(async (ctx) => {
+          if (ctx.message.payload.value === "msg-0") {
+            firstStarted = true;
+            // Block this message to saturate concurrency
+            await firstBlocking;
+          }
+          processed.push(ctx.message.payload.value);
+          await ctx.complete();
+        });
+
+        // Enqueue 3 messages to same tenant
+        for (let i = 0; i < 3; i++) {
+          await queue.enqueue({
+            queueId: "tenant:t1:queue:q1",
+            tenantId: "t1",
+            payload: { value: `msg-${i}` },
+          });
+        }
+
+        queue.start();
+
+        // Wait for first message to start processing (blocking the concurrency slot)
+        await vi.waitFor(
+          () => {
+            expect(firstStarted).toBe(true);
+          },
+          { timeout: 5000 }
+        );
+
+        // Release the first message so others can proceed
+        releaseFirst!();
+
+        // All 3 messages should process within a reasonable time.
+        // If cooloff was incorrectly triggered, this would take 5+ seconds.
+        const startTime = Date.now();
+        await vi.waitFor(
+          () => {
+            expect(processed).toHaveLength(3);
+          },
+          { timeout: 5000 }
+        );
+        const elapsed = Date.now() - startTime;
+
+        // Should complete well under the 5s cooloff period
+        expect(elapsed).toBeLessThan(3000);
+
+        // Cooloff states should be empty (no spurious cooloffs)
+        const cacheSizes = queue.fairQueue.getCacheSizes();
+        expect(cacheSizes.cooloffStatesSize).toBe(0);
+
+        await queue.close();
+      }
+    );
+  });
+
 });

packages/redis-worker/src/fair-queue/visibility.ts

Lines changed: 16 additions & 5 deletions

@@ -284,7 +284,8 @@ export class VisibilityManager {
     queueKey: string,
     queueItemsKey: string,
     masterQueueKey: string,
-    score?: number
+    score?: number,
+    updatedData?: string
   ): Promise<void> {
     const shardId = this.#getShardForQueue(queueId);
     const inflightKey = this.keys.inflightKey(shardId);

@@ -293,7 +294,7 @@
     const messageScore = score ?? Date.now();

     // Use Lua script to atomically:
-    // 1. Get message data from in-flight
+    // 1. Get message data from in-flight (or use updatedData if provided)
     // 2. Remove from in-flight
     // 3. Add back to queue
     // 4. Update master queue to ensure queue is picked up

@@ -306,7 +307,8 @@
       member,
       messageId,
       messageScore.toString(),
-      queueId
+      queueId,
+      updatedData ?? ""
     );

     this.logger.debug("Message released", {

@@ -434,7 +436,8 @@
       member,
       messageId,
       score.toString(),
-      queueId
+      queueId,
+      ""
     );

     // Track reclaimed message for concurrency release

@@ -680,6 +683,7 @@ local member = ARGV[1]
 local messageId = ARGV[2]
 local score = tonumber(ARGV[3])
 local queueId = ARGV[4]
+local updatedData = ARGV[5]

 -- Get message data from in-flight
 local payload = redis.call('HGET', inflightDataKey, messageId)

@@ -688,6 +692,12 @@ if not payload then
   return 0
 end

+-- Use updatedData if provided (e.g. incremented attempt count for retries),
+-- otherwise use the original in-flight data
+if updatedData and updatedData ~= "" then
+  payload = updatedData
+end
+
 -- Remove from in-flight
 redis.call('ZREM', inflightKey, member)
 redis.call('HDEL', inflightDataKey, messageId)

@@ -816,7 +826,8 @@ declare module "@internal/redis" {
       member: string,
       messageId: string,
       score: string,
-      queueId: string
+      queueId: string,
+      updatedData: string
     ): Promise<number>;

     releaseMessageBatch(
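The last hunk augments the package's Redis client types via declare module. As a standalone illustration of the same pattern — registering a Lua script as a custom command and threading the optional updated payload through ARGV — here is a minimal ioredis defineCommand sketch; the command name and the trimmed Lua body are illustrative, not the package's actual script:

import Redis from "ioredis";

const redis = new Redis();

// Registers a custom command backed by a Lua script; the first numberOfKeys
// arguments become KEYS, the rest become ARGV.
redis.defineCommand("releaseMessageSketch", {
  numberOfKeys: 2,
  lua: `
    local itemsKey = KEYS[1]
    local queueKey = KEYS[2]
    local messageId = ARGV[1]
    local score = tonumber(ARGV[2])
    local updatedData = ARGV[3]

    -- Only overwrite the stored payload when the caller supplied updated data
    -- (e.g. an incremented attempt count); empty string means "keep what is there".
    if updatedData and updatedData ~= "" then
      redis.call('HSET', itemsKey, messageId, updatedData)
    end
    redis.call('ZADD', queueKey, score, messageId)
    return 1
  `,
});

// The real package types this method via module augmentation; here we cast for brevity.
await (redis as any).releaseMessageSketch(
  "queue:items",
  "queue:scores",
  "msg-1",
  Date.now().toString(),
  JSON.stringify({ id: "msg-1", attempt: 2 })
);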
