Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ const EnvironmentSchema = z
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
BATCH_QUEUE_WORKER_ENABLED: BoolEnv.default(true),
// Number of master queue shards for horizontal scaling
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
// Maximum queues to fetch from master queue per iteration
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ function createRunEngine() {
: undefined,
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
consumerEnabled: env.BATCH_QUEUE_WORKER_ENABLED,
// Default processing concurrency when no specific limit is set
// This is overridden per-batch based on the plan type at batch creation
defaultConcurrency: env.BATCH_CONCURRENCY_LIMIT_DEFAULT,
Expand Down
70 changes: 35 additions & 35 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ export class RunEngine {
id: payload.waitpointId,
output: payload.error
? {
value: payload.error,
isError: true,
}
value: payload.error,
isError: true,
}
: undefined,
});
},
Expand Down Expand Up @@ -329,8 +329,8 @@ export class RunEngine {
});

// Initialize BatchQueue for DRR-based batch processing (if configured)
// Only start consumers if worker is not disabled (same as main worker)
const startConsumers = !options.worker.disabled;
// Only start consumers if consumerDisabled is not set or is false
const startBatchQueueConsumers = options.batchQueue?.consumerEnabled ?? true;

this.batchQueue = new BatchQueue({
redis: {
Expand All @@ -348,7 +348,7 @@ export class RunEngine {
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
globalRateLimiter: options.batchQueue?.globalRateLimiter,
startConsumers,
startConsumers: startBatchQueueConsumers,
tracer: options.tracer,
meter: options.meter,
});
Expand All @@ -357,7 +357,7 @@ export class RunEngine {
consumerCount: options.batchQueue?.consumerCount ?? 2,
drrQuantum: options.batchQueue?.drr?.quantum ?? 5,
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
consumersEnabled: startConsumers,
consumersEnabled: startBatchQueueConsumers,
});

this.runAttemptSystem = new RunAttemptSystem({
Expand Down Expand Up @@ -464,18 +464,18 @@ export class RunEngine {
debounce:
debounce.mode === "trailing"
? {
...debounce,
updateData: {
payload,
payloadType,
metadata,
metadataType,
tags,
maxAttempts,
maxDurationInSeconds,
machine,
},
}
...debounce,
updateData: {
payload,
payloadType,
metadata,
metadataType,
tags,
maxAttempts,
maxDurationInSeconds,
machine,
},
}
: debounce,
tx: prisma,
});
Expand Down Expand Up @@ -574,8 +574,8 @@ export class RunEngine {
tags.length === 0
? undefined
: {
connect: tags,
},
connect: tags,
},
runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name),
oneTimeUseToken,
parentTaskRunId,
Expand All @@ -598,10 +598,10 @@ export class RunEngine {
realtimeStreamsVersion,
debounce: debounce
? {
key: debounce.key,
delay: debounce.delay,
createdAt: new Date(),
}
key: debounce.key,
delay: debounce.delay,
createdAt: new Date(),
}
: undefined,
executionSnapshots: {
create: {
Expand Down Expand Up @@ -1750,17 +1750,17 @@ export class RunEngine {
const error =
latestSnapshot.environmentType === "DEVELOPMENT"
? ({
type: "INTERNAL_ERROR",
code: taskStalledErrorCode,
message: errorMessage,
} satisfies TaskRunInternalError)
type: "INTERNAL_ERROR",
code: taskStalledErrorCode,
message: errorMessage,
} satisfies TaskRunInternalError)
: this.options.treatProductionExecutionStallsAsOOM
? ({
? ({
type: "INTERNAL_ERROR",
code: "TASK_PROCESS_OOM_KILLED",
message: "Run was terminated due to running out of memory",
} satisfies TaskRunInternalError)
: ({
: ({
type: "INTERNAL_ERROR",
code: taskStalledErrorCode,
message: errorMessage,
Expand All @@ -1775,10 +1775,10 @@ export class RunEngine {
error,
retry: shouldRetry
? {
//250ms in the future
timestamp: Date.now() + retryDelay,
delay: retryDelay,
}
//250ms in the future
timestamp: Date.now() + retryDelay,
delay: retryDelay,
}
: undefined,
},
forceRequeue: true,
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export type RunEngineOptions = {
shardCount?: number;
/** Worker queue blocking timeout in seconds (enables two-stage processing) */
workerQueueBlockingTimeoutSeconds?: number;
consumerEnabled?: boolean;
consumerCount?: number;
consumerIntervalMs?: number;
/** Default processing concurrency per environment when no specific limit is set */
Expand Down