77import { prisma } from "~/db.server" ;
88import { env } from "~/env.server" ;
99import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server" ;
10+ import { RunEngineBatchTriggerServiceV2 } from "~/runEngine/services/batchTriggerV2.server" ;
1011import { AuthenticatedEnvironment , getOneTimeUseToken } from "~/services/apiAuth.server" ;
1112import { logger } from "~/services/logger.server" ;
1213import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server" ;
@@ -20,6 +21,7 @@ import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
2021import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger" ;
2122import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server" ;
2223import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server" ;
24+ import { engine } from "~/v3/runEngine.server" ;
2325
2426const { action, loader } = createActionApiRoute (
2527 {
@@ -110,6 +112,68 @@ const { action, loader } = createActionApiRoute(
110112 ? { traceparent, tracestate }
111113 : { external : { traceparent, tracestate } } ;
112114
115+ // Use the new V2 service with DRR-based BatchQueue if enabled, otherwise fall back to V1
116+ const useBatchQueueV2 = engine . isBatchQueueEnabled ( ) ;
117+
118+ logger . debug ( "Batch trigger service selection" , {
119+ useBatchQueueV2,
120+ batchProcessingStrategy,
121+ } ) ;
122+
123+ if ( useBatchQueueV2 ) {
124+ // V2: Uses Redis-based BatchQueue with DRR fair scheduling
125+ const serviceV2 = new RunEngineBatchTriggerServiceV2 ( ) ;
126+
127+ serviceV2 . onBatchTaskRunCreated . attachOnce ( async ( batch ) => {
128+ await saveRequestIdempotency ( requestIdempotencyKey , "batch-trigger" , batch . id ) ;
129+ } ) ;
130+
131+ try {
132+ const batch = await serviceV2 . call ( authentication . environment , body , {
133+ triggerVersion : triggerVersion ?? undefined ,
134+ traceContext,
135+ spanParentAsLink : spanParentAsLink === 1 ,
136+ oneTimeUseToken,
137+ realtimeStreamsVersion : determineRealtimeStreamsVersion (
138+ realtimeStreamsVersion ?? undefined
139+ ) ,
140+ idempotencyKey : requestIdempotencyKey ?? undefined ,
141+ } ) ;
142+
143+ const $responseHeaders = await responseHeaders (
144+ batch ,
145+ authentication . environment ,
146+ triggerClient
147+ ) ;
148+
149+ return json ( batch , {
150+ status : 202 ,
151+ headers : $responseHeaders ,
152+ } ) ;
153+ } catch ( error ) {
154+ logger . error ( "Batch trigger error (V2)" , {
155+ error : {
156+ message : ( error as Error ) . message ,
157+ stack : ( error as Error ) . stack ,
158+ } ,
159+ } ) ;
160+
161+ if ( error instanceof ServiceValidationError ) {
162+ return json ( { error : error . message } , { status : 422 } ) ;
163+ } else if ( error instanceof OutOfEntitlementError ) {
164+ return json ( { error : error . message } , { status : 422 } ) ;
165+ } else if ( error instanceof Error ) {
166+ return json (
167+ { error : error . message } ,
168+ { status : 500 , headers : { "x-should-retry" : "false" } }
169+ ) ;
170+ }
171+
172+ return json ( { error : "Something went wrong" } , { status : 500 } ) ;
173+ }
174+ }
175+
176+ // V1: Legacy batch trigger service
113177 const service = new RunEngineBatchTriggerService ( batchProcessingStrategy ?? undefined ) ;
114178
115179 service . onBatchTaskRunCreated . attachOnce ( async ( batch ) => {
0 commit comments