Skip to content

Commit 45c335f

Browse files
committed
wip of the streaming batch trigger stuff
1 parent 4654592 commit 45c335f

File tree

20 files changed

+2519
-646
lines changed

20 files changed

+2519
-646
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,10 @@ const EnvironmentSchema = z
537537
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
538538
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
539539

540+
// 2-phase batch API settings
541+
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
542+
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(512_000), // 512KB max per NDJSON line
543+
540544
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
541545
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
542546
REALTIME_STREAM_TTL: z.coerce

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
import { prisma } from "~/db.server";
88
import { env } from "~/env.server";
99
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
10-
import { RunEngineBatchTriggerServiceV2 } from "~/runEngine/services/batchTriggerV2.server";
1110
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
1211
import { logger } from "~/services/logger.server";
1312
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
@@ -21,7 +20,6 @@ import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
2120
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
2221
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
2322
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
24-
import { engine } from "~/v3/runEngine.server";
2523

2624
const { action, loader } = createActionApiRoute(
2725
{
@@ -112,68 +110,8 @@ const { action, loader } = createActionApiRoute(
112110
? { traceparent, tracestate }
113111
: { external: { traceparent, tracestate } };
114112

115-
// Use the new V2 service with DRR-based BatchQueue if enabled, otherwise fall back to V1
116-
const useBatchQueueV2 = engine.isBatchQueueEnabled();
117-
118-
logger.debug("Batch trigger service selection", {
119-
useBatchQueueV2,
120-
batchProcessingStrategy,
121-
});
122-
123-
if (useBatchQueueV2) {
124-
// V2: Uses Redis-based BatchQueue with DRR fair scheduling
125-
const serviceV2 = new RunEngineBatchTriggerServiceV2();
126-
127-
serviceV2.onBatchTaskRunCreated.attachOnce(async (batch) => {
128-
await saveRequestIdempotency(requestIdempotencyKey, "batch-trigger", batch.id);
129-
});
130-
131-
try {
132-
const batch = await serviceV2.call(authentication.environment, body, {
133-
triggerVersion: triggerVersion ?? undefined,
134-
traceContext,
135-
spanParentAsLink: spanParentAsLink === 1,
136-
oneTimeUseToken,
137-
realtimeStreamsVersion: determineRealtimeStreamsVersion(
138-
realtimeStreamsVersion ?? undefined
139-
),
140-
idempotencyKey: requestIdempotencyKey ?? undefined,
141-
});
142-
143-
const $responseHeaders = await responseHeaders(
144-
batch,
145-
authentication.environment,
146-
triggerClient
147-
);
148-
149-
return json(batch, {
150-
status: 202,
151-
headers: $responseHeaders,
152-
});
153-
} catch (error) {
154-
logger.error("Batch trigger error (V2)", {
155-
error: {
156-
message: (error as Error).message,
157-
stack: (error as Error).stack,
158-
},
159-
});
160-
161-
if (error instanceof ServiceValidationError) {
162-
return json({ error: error.message }, { status: 422 });
163-
} else if (error instanceof OutOfEntitlementError) {
164-
return json({ error: error.message }, { status: 422 });
165-
} else if (error instanceof Error) {
166-
return json(
167-
{ error: error.message },
168-
{ status: 500, headers: { "x-should-retry": "false" } }
169-
);
170-
}
171-
172-
return json({ error: "Something went wrong" }, { status: 500 });
173-
}
174-
}
175-
176-
// V1: Legacy batch trigger service
113+
// Note: SDK v4.1+ uses the 2-phase batch API (POST /api/v3/batches + streaming items)
114+
// This endpoint is for backwards compatibility with older SDK versions
177115
const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined);
178116

179117
service.onBatchTaskRunCreated.attachOnce(async (batch) => {
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import { json, type ActionFunctionArgs, type LoaderFunctionArgs } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { env } from "~/env.server";
4+
import {
5+
StreamBatchItemsService,
6+
createNdjsonParserStream,
7+
streamToAsyncIterable,
8+
} from "~/runEngine/services/streamBatchItems.server";
9+
import { authenticateApiRequestWithFailure } from "~/services/apiAuth.server";
10+
import { logger } from "~/services/logger.server";
11+
import { ServiceValidationError } from "~/v3/services/baseService.server";
12+
import { engine } from "~/v3/runEngine.server";
13+
14+
const ParamsSchema = z.object({
15+
batchId: z.string(),
16+
});
17+
18+
/**
19+
* Phase 2 of 2-phase batch API: Stream batch items.
20+
*
21+
* POST /api/v3/batches/:batchId/items
22+
*
23+
* Accepts an NDJSON stream of batch items and enqueues them to the BatchQueue.
24+
* Each line in the body should be a valid BatchItemNDJSON object.
25+
*
26+
* The stream is processed with backpressure - items are enqueued as they arrive.
27+
* The batch is sealed when the stream completes successfully.
28+
*/
29+
export async function action({ request, params }: ActionFunctionArgs) {
30+
// Validate params
31+
const paramsResult = ParamsSchema.safeParse(params);
32+
if (!paramsResult.success) {
33+
return json({ error: "Invalid batch ID" }, { status: 400 });
34+
}
35+
36+
const { batchId } = paramsResult.data;
37+
38+
// Validate content type
39+
const contentType = request.headers.get("content-type") || "";
40+
if (
41+
!contentType.includes("application/x-ndjson") &&
42+
!contentType.includes("application/ndjson")
43+
) {
44+
return json(
45+
{
46+
error: "Content-Type must be application/x-ndjson or application/ndjson",
47+
},
48+
{ status: 415 }
49+
);
50+
}
51+
52+
// Authenticate the request
53+
const authResult = await authenticateApiRequestWithFailure(request, {
54+
allowPublicKey: true,
55+
});
56+
57+
if (!authResult.ok) {
58+
return json({ error: authResult.error }, { status: 401 });
59+
}
60+
61+
// Verify BatchQueue is enabled
62+
if (!engine.isBatchQueueEnabled()) {
63+
return json(
64+
{
65+
error: "Streaming batch API is not available. BatchQueue is not enabled.",
66+
},
67+
{ status: 503 }
68+
);
69+
}
70+
71+
// Get the request body stream
72+
const body = request.body;
73+
if (!body) {
74+
return json({ error: "Request body is required" }, { status: 400 });
75+
}
76+
77+
logger.debug("Stream batch items request", {
78+
batchId,
79+
contentType,
80+
envId: authResult.environment.id,
81+
});
82+
83+
try {
84+
// Create NDJSON parser transform stream
85+
const parser = createNdjsonParserStream(env.STREAMING_BATCH_ITEM_MAXIMUM_SIZE);
86+
87+
// Pipe the request body through the parser
88+
const parsedStream = body.pipeThrough(parser);
89+
90+
// Convert to async iterable for the service
91+
const itemsIterator = streamToAsyncIterable(parsedStream);
92+
93+
// Process the stream
94+
const service = new StreamBatchItemsService();
95+
const result = await service.call(authResult.environment, batchId, itemsIterator, {
96+
maxItemBytes: env.STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
97+
});
98+
99+
return json(result, { status: 200 });
100+
} catch (error) {
101+
logger.error("Stream batch items error", {
102+
batchId,
103+
error: {
104+
message: (error as Error).message,
105+
stack: (error as Error).stack,
106+
},
107+
});
108+
109+
if (error instanceof ServiceValidationError) {
110+
return json({ error: error.message }, { status: 422 });
111+
} else if (error instanceof Error) {
112+
// Check for stream parsing errors
113+
if (
114+
error.message.includes("Invalid JSON") ||
115+
error.message.includes("exceeds maximum size")
116+
) {
117+
return json({ error: error.message }, { status: 400 });
118+
}
119+
120+
return json(
121+
{ error: error.message },
122+
{ status: 500, headers: { "x-should-retry": "false" } }
123+
);
124+
}
125+
126+
return json({ error: "Something went wrong" }, { status: 500 });
127+
}
128+
}
129+
130+
export async function loader({ request }: LoaderFunctionArgs) {
131+
// Return 405 for GET requests - only POST is allowed
132+
return json(
133+
{
134+
error: "Method not allowed. Use POST to stream batch items.",
135+
},
136+
{ status: 405 }
137+
);
138+
}

0 commit comments

Comments
 (0)