Commit d275304

fix: speed up batch queue processing by removing stalls and fixing retry race
1 parent 921285c commit d275304

File tree

5 files changed: +217 -21 lines changed

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Fix slow batch queue processing by removing spurious cooloff on concurrency blocks, making the global rate limiter non-blocking, and fixing a race condition where retry attempt counts were not atomically updated during message re-queue.
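
For context on the rate-limiter part of this fix: claims are driven by per-consumer loops, so anything that sleeps inside the claim path idles an entire consumer. The sketch below is a simplified, hypothetical model of such a loop (not the actual FairQueue consumer; ClaimFn and runConsumer are illustrative names), showing why "claim returned 0, try again next tick" recovers within roughly one consumerIntervalMs, whereas sleeping until resetAt can stall a consumer for up to a second per denial.

// Hypothetical consumer loop, simplified for illustration only (not the real
// FairQueue consumer). Key property: a claim that returns 0 costs one short
// tick, while sleeping until resetAt would block this consumer entirely.
type ClaimFn = () => Promise<number>; // resolves to the number of messages claimed

async function runConsumer(claim: ClaimFn, intervalMs: number, signal: AbortSignal) {
  while (!signal.aborted) {
    const claimed = await claim();
    if (claimed === 0) {
      // Nothing claimable right now (empty queue, concurrency blocked, or
      // rate limited) - wait one short tick and try again.
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
    // If something was claimed, loop immediately to keep draining the queue.
  }
}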

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion

@@ -549,7 +549,7 @@ const EnvironmentSchema = z
   BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
   BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
   BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
-  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(1),
+  BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(5),
 
   REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
   REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
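
As a rough worked example of what the neighbouring rate-limit defaults imply (assuming the refill rate means tokens added per refill interval): 100 tokens every 10s sustains about 10 batch operations per second, with bursts of up to the bucket max of 1200. The sketch below is a hypothetical in-memory token bucket driven by values like these; it is not the limiter implementation these settings actually configure, and only the shape of the limit() result ({ allowed, resetAt? }) mirrors the GlobalRateLimiter interface exercised elsewhere in this commit.

// Hypothetical token-bucket sketch wired from settings like the ones above.
// Assumption: refillRate tokens are added every refillIntervalMs, capped at max.
type RateLimitResult = { allowed: boolean; resetAt?: number };

function createTokenBucketLimiter(opts: {
  refillRate: number; // e.g. BATCH_RATE_LIMIT_REFILL_RATE = 100
  max: number; // e.g. BATCH_RATE_LIMIT_MAX = 1200
  refillIntervalMs: number; // e.g. BATCH_RATE_LIMIT_REFILL_INTERVAL "10s" -> 10_000
}) {
  let tokens = opts.max;
  let lastRefill = Date.now();

  return {
    async limit(): Promise<RateLimitResult> {
      const now = Date.now();
      // Top up the bucket for every whole interval that has elapsed.
      const intervals = Math.floor((now - lastRefill) / opts.refillIntervalMs);
      if (intervals > 0) {
        tokens = Math.min(opts.max, tokens + intervals * opts.refillRate);
        lastRefill += intervals * opts.refillIntervalMs;
      }
      if (tokens > 0) {
        tokens -= 1;
        return { allowed: true };
      }
      // Denied: report when the next refill lands so callers could back off.
      return { allowed: false, resetAt: lastRefill + opts.refillIntervalMs };
    },
  };
}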

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 15 additions & 14 deletions

@@ -925,22 +925,24 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     if (this.concurrencyManager) {
       const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor);
       if (availableCapacity === 0) {
-        // Queue at max concurrency, back off to avoid repeated attempts
-        this.#incrementCooloff(queueId);
+        // Queue at max concurrency - don't increment cooloff here.
+        // The outer loop already handles this case (concurrency blocked)
+        // and explicitly avoids cooloff for it. Cooloff here causes
+        // spurious 5s stalls when capacity races between the tenant
+        // pre-check and this per-queue check.
         return 0;
       }
       maxClaimCount = Math.min(maxClaimCount, availableCapacity);
     }
 
-    // Check global rate limit - wait if rate limited
+    // Check global rate limit - return immediately if rate limited
     if (this.globalRateLimiter) {
       const result = await this.globalRateLimiter.limit();
-      if (!result.allowed && result.resetAt) {
-        const waitMs = Math.max(0, result.resetAt - Date.now());
-        if (waitMs > 0) {
-          this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId });
-          await new Promise((resolve) => setTimeout(resolve, waitMs));
-        }
+      if (!result.allowed) {
+        // Don't sleep - return 0 and let the consumer loop retry on the
+        // next tick. Sleeping blocks this consumer entirely (up to 1s),
+        // and with multiple consumers all sleeping, the batch queue stalls.
+        return 0;
       }
     }
 
@@ -1228,19 +1230,18 @@
       attempt: storedMessage.attempt + 1,
     };
 
-    // Release with delay (and ensure queue is in master queue)
+    // Release with delay, passing the updated message data so the Lua script
+    // atomically writes the incremented attempt count when re-queuing.
     await this.visibilityManager.release(
       storedMessage.id,
       queueId,
       queueKey,
       queueItemsKey,
       masterQueueKey,
-      Date.now() + nextDelay
+      Date.now() + nextDelay,
+      JSON.stringify(updatedMessage)
     );
 
-    // Update message in items hash with new attempt count
-    await this.redis.hset(queueItemsKey, storedMessage.id, JSON.stringify(updatedMessage));
-
     // Release concurrency
     if (this.concurrencyManager) {
       await this.concurrencyManager.release(descriptor, storedMessage.id);
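
The deleted hset above is where the retry race lived: the message was re-queued first and the incremented attempt count was written in a separate second step, so another consumer could claim the re-queued message in between and read the stale count. Below is a minimal, self-contained simulation of that window using plain in-memory structures in place of Redis (all names are illustrative, not the repo's internals); the Lua-script change in visibility.ts further down closes the window by writing the updated payload in the same atomic step as the re-queue.

// Simulated old two-step re-queue: re-queue the id, then update the stored
// payload. The gap between the two steps is the race window.
type Stored = { id: string; attempt: number };

const readyQueue: string[] = []; // ids that can be claimed
const itemsHash = new Map<string, Stored>(); // id -> stored message data

const claim = (): Stored | undefined => {
  const id = readyQueue.shift();
  return id ? itemsHash.get(id) : undefined;
};

// A message that has just failed its first attempt.
itemsHash.set("m1", { id: "m1", attempt: 1 });

// Old retry path, step 1: the id goes back on the queue (payload still says attempt 1).
readyQueue.push("m1");

// Race window: another consumer claims before step 2 runs.
const claimed = claim();
console.log(claimed?.attempt); // 1 - the failed attempt is invisible to this consumer

// Old retry path, step 2: the incremented attempt count is written only now.
itemsHash.set("m1", { id: "m1", attempt: 2 });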

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts

Lines changed: 180 additions & 1 deletion

@@ -9,7 +9,12 @@ import {
   NoRetry,
   WorkerQueueManager,
 } from "../index.js";
-import type { FairQueueKeyProducer, FairQueueOptions, StoredMessage } from "../types.js";
+import type {
+  FairQueueKeyProducer,
+  FairQueueOptions,
+  GlobalRateLimiter,
+  StoredMessage,
+} from "../types.js";
 import type { RedisOptions } from "@internal/redis";
 
 // Define a common payload schema for tests
@@ -1182,4 +1187,178 @@ describe("FairQueue", () => {
       }
     );
   });
+
+  describe("concurrency block should not trigger cooloff", () => {
+    redisTest(
+      "should not enter cooloff when queue hits concurrency limit",
+      { timeout: 15000 },
+      async ({ redisOptions }) => {
+        const processed: string[] = [];
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const scheduler = new DRRScheduler({
+          redis: redisOptions,
+          keys,
+          quantum: 10,
+          maxDeficit: 100,
+        });
+
+        const queue = new TestFairQueueHelper(redisOptions, keys, {
+          scheduler,
+          payloadSchema: TestPayloadSchema,
+          shardCount: 1,
+          consumerCount: 1,
+          consumerIntervalMs: 20,
+          visibilityTimeoutMs: 5000,
+          cooloff: {
+            periodMs: 5000, // Long cooloff - if triggered, messages would stall
+            threshold: 1, // Enter cooloff after just 1 increment
+          },
+          concurrencyGroups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async () => 1, // Only 1 concurrent per tenant
+              defaultLimit: 1,
+            },
+          ],
+          startConsumers: false,
+        });
+
+        // Hold first message to keep concurrency slot occupied
+        let releaseFirst: (() => void) | undefined;
+        const firstBlocking = new Promise<void>((resolve) => {
+          releaseFirst = resolve;
+        });
+        let firstStarted = false;
+
+        queue.onMessage(async (ctx) => {
+          if (ctx.message.payload.value === "msg-0") {
+            firstStarted = true;
+            // Block this message to saturate concurrency
+            await firstBlocking;
+          }
+          processed.push(ctx.message.payload.value);
+          await ctx.complete();
+        });
+
+        // Enqueue 3 messages to same tenant
+        for (let i = 0; i < 3; i++) {
+          await queue.enqueue({
+            queueId: "tenant:t1:queue:q1",
+            tenantId: "t1",
+            payload: { value: `msg-${i}` },
+          });
+        }
+
+        queue.start();
+
+        // Wait for first message to start processing (blocking the concurrency slot)
+        await vi.waitFor(
+          () => {
+            expect(firstStarted).toBe(true);
+          },
+          { timeout: 5000 }
+        );
+
+        // Release the first message so others can proceed
+        releaseFirst!();
+
+        // All 3 messages should process within a reasonable time.
+        // If cooloff was incorrectly triggered, this would take 5+ seconds.
+        const startTime = Date.now();
+        await vi.waitFor(
+          () => {
+            expect(processed).toHaveLength(3);
+          },
+          { timeout: 5000 }
+        );
+        const elapsed = Date.now() - startTime;
+
+        // Should complete well under the 5s cooloff period
+        expect(elapsed).toBeLessThan(3000);
+
+        // Cooloff states should be empty (no spurious cooloffs)
+        const cacheSizes = queue.fairQueue.getCacheSizes();
+        expect(cacheSizes.cooloffStatesSize).toBe(0);
+
+        await queue.close();
+      }
+    );
+  });
+
+  describe("global rate limiter should be non-blocking", () => {
+    redisTest(
+      "should not block consumer when rate limit is hit",
+      { timeout: 15000 },
+      async ({ redisOptions }) => {
+        const processed: string[] = [];
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const scheduler = new DRRScheduler({
+          redis: redisOptions,
+          keys,
+          quantum: 10,
+          maxDeficit: 100,
+        });
+
+        // Track how many times limit() was called and control when it allows
+        let limitCallCount = 0;
+        let allowAfter = 3; // Block first 3 calls, then allow
+
+        const mockRateLimiter: GlobalRateLimiter = {
+          limit: async () => {
+            limitCallCount++;
+            if (limitCallCount <= allowAfter) {
+              return { allowed: false, resetAt: Date.now() + 1000 };
+            }
+            return { allowed: true };
+          },
+        };
+
+        const queue = new TestFairQueueHelper(redisOptions, keys, {
+          scheduler,
+          payloadSchema: TestPayloadSchema,
+          shardCount: 1,
+          consumerCount: 1,
+          consumerIntervalMs: 20,
+          visibilityTimeoutMs: 5000,
+          globalRateLimiter: mockRateLimiter,
+          startConsumers: false,
+        });
+
+        queue.onMessage(async (ctx) => {
+          processed.push(ctx.message.payload.value);
+          await ctx.complete();
+        });
+
+        await queue.enqueue({
+          queueId: "tenant:t1:queue:q1",
+          tenantId: "t1",
+          payload: { value: "msg-1" },
+        });
+
+        const startTime = Date.now();
+        queue.start();
+
+        // Message should be processed quickly despite rate limiter denials.
+        // Old behavior: each denial would sleep ~1s, so 3 denials = ~3s.
+        // New behavior: denials return immediately, retry on next loop tick.
+        await vi.waitFor(
+          () => {
+            expect(processed).toHaveLength(1);
+          },
+          { timeout: 5000 }
+        );
+
+        const elapsed = Date.now() - startTime;
+
+        // Should complete well under 1s (old behavior would take ~3s)
+        expect(elapsed).toBeLessThan(1000);
+        expect(processed).toContain("msg-1");
+
+        await queue.close();
+      }
+    );
+  });
 });

packages/redis-worker/src/fair-queue/visibility.ts

Lines changed: 16 additions & 5 deletions

@@ -284,7 +284,8 @@ export class VisibilityManager {
     queueKey: string,
     queueItemsKey: string,
     masterQueueKey: string,
-    score?: number
+    score?: number,
+    updatedData?: string
   ): Promise<void> {
     const shardId = this.#getShardForQueue(queueId);
     const inflightKey = this.keys.inflightKey(shardId);
@@ -293,7 +294,7 @@
     const messageScore = score ?? Date.now();
 
     // Use Lua script to atomically:
-    // 1. Get message data from in-flight
+    // 1. Get message data from in-flight (or use updatedData if provided)
     // 2. Remove from in-flight
     // 3. Add back to queue
     // 4. Update master queue to ensure queue is picked up
@@ -306,7 +307,8 @@
       member,
      messageId,
       messageScore.toString(),
-      queueId
+      queueId,
+      updatedData ?? ""
     );
 
     this.logger.debug("Message released", {
@@ -434,7 +436,8 @@
      member,
       messageId,
       score.toString(),
-      queueId
+      queueId,
+      ""
     );
 
     // Track reclaimed message for concurrency release
@@ -680,6 +683,7 @@ local member = ARGV[1]
 local messageId = ARGV[2]
 local score = tonumber(ARGV[3])
 local queueId = ARGV[4]
+local updatedData = ARGV[5]
 
 -- Get message data from in-flight
 local payload = redis.call('HGET', inflightDataKey, messageId)
@@ -688,6 +692,12 @@ if not payload then
   return 0
 end
 
+-- Use updatedData if provided (e.g. incremented attempt count for retries),
+-- otherwise use the original in-flight data
+if updatedData and updatedData ~= "" then
+  payload = updatedData
+end
+
 -- Remove from in-flight
 redis.call('ZREM', inflightKey, member)
 redis.call('HDEL', inflightDataKey, messageId)
@@ -816,7 +826,8 @@ declare module "@internal/redis" {
       member: string,
       messageId: string,
       score: string,
-      queueId: string
+      queueId: string,
+      updatedData: string
     ): Promise<number>;
 
     releaseMessageBatch(
