88 redisConfigMock ,
99 redisConfigMockFns ,
1010} from '@sim/testing'
11- import { sleep } from '@sim/utils/helpers'
1211import { afterEach , beforeEach , describe , expect , it , vi } from 'vitest'
1312
1413type MockProc = EventEmitter & {
@@ -83,6 +82,54 @@ function createReadyProcWithDelay(delayMs: number): MockProc {
8382 return proc
8483}
8584
85+ type ControllableReadyProc = {
86+ spawn : SpawnFactory
87+ dispatched : Promise < void >
88+ release : ( result ?: unknown ) => void
89+ }
90+
91+ /**
92+ * A ready worker whose execution is held open until {@link ControllableReadyProc.release}
93+ * is called. {@link ControllableReadyProc.dispatched} resolves the moment the worker
94+ * receives its `execute` message — i.e. once the scheduler has counted the execution as
95+ * active — giving tests a deterministic, event-driven signal that the concurrency slot is
96+ * occupied without relying on wall-clock delays. The proc is built inside {@link spawn} so
97+ * its `ready` event is emitted only after the module attaches its listeners.
98+ */
99+ function createControllableReadyProc ( ) : ControllableReadyProc {
100+ let markDispatched : ( ) => void = ( ) => { }
101+ const dispatched = new Promise < void > ( ( resolve ) => {
102+ markDispatched = resolve
103+ } )
104+ let proc : MockProc | undefined
105+ let lastExecutionId = 0
106+
107+ const spawn : SpawnFactory = ( ) => {
108+ const current = createBaseProc ( )
109+ current . send = ( message : unknown ) => {
110+ const msg = message as { type ?: string ; executionId ?: number }
111+ if ( msg . type === 'execute' ) {
112+ lastExecutionId = msg . executionId ?? 0
113+ markDispatched ( )
114+ }
115+ return true
116+ }
117+ setImmediate ( ( ) => current . emit ( 'message' , { type : 'ready' } ) )
118+ proc = current
119+ return current
120+ }
121+
122+ const release = ( result : unknown = 'released' ) => {
123+ proc ?. emit ( 'message' , {
124+ type : 'result' ,
125+ executionId : lastExecutionId ,
126+ result : { result, stdout : '' } ,
127+ } )
128+ }
129+
130+ return { spawn, dispatched, release }
131+ }
132+
86133function createReadyFetchProxyProc ( fetchMessage : { url : string ; optionsJson ?: string } ) : MockProc {
87134 const proc = createBaseProc ( )
88135 let currentExecutionId = 0
@@ -256,15 +303,31 @@ describe('isolated-vm scheduler', () => {
256303 } )
257304
258305 it ( 'rejects new requests when the queue is full' , async ( ) => {
306+ const holder = createControllableReadyProc ( )
259307 const { executeInIsolatedVM } = await loadExecutionModule ( {
260308 envOverrides : {
309+ IVM_MAX_CONCURRENT : '1' ,
261310 IVM_MAX_QUEUE_SIZE : '1' ,
262- IVM_QUEUE_TIMEOUT_MS : '200 ' ,
311+ IVM_QUEUE_TIMEOUT_MS : '50 ' ,
263312 } ,
264- spawns : [ createStartupFailureProc , createStartupFailureProc , createStartupFailureProc ] ,
313+ spawns : [ holder . spawn ] ,
265314 } )
266315
267- const firstPromise = executeInIsolatedVM ( {
316+ // Occupy the single global concurrency slot with an active execution. Awaiting
317+ // `dispatched` is event-driven, so the queue state below is fully deterministic.
318+ const holderPromise = executeInIsolatedVM ( {
319+ code : 'return "holder"' ,
320+ params : { } ,
321+ envVars : { } ,
322+ contextVariables : { } ,
323+ timeoutMs : 1000 ,
324+ requestId : 'req-holder' ,
325+ ownerKey : 'user:holder' ,
326+ } )
327+ await holder . dispatched
328+
329+ // With the slot held, this request lands in the queue (size 1, now full)...
330+ const queuedPromise = executeInIsolatedVM ( {
268331 code : 'return 1' ,
269332 params : { } ,
270333 envVars : { } ,
@@ -274,9 +337,8 @@ describe('isolated-vm scheduler', () => {
274337 ownerKey : 'user:a' ,
275338 } )
276339
277- await sleep ( 1 )
278-
279- const second = await executeInIsolatedVM ( {
340+ // ...and this one overflows it and is rejected immediately.
341+ const rejected = await executeInIsolatedVM ( {
280342 code : 'return 2' ,
281343 params : { } ,
282344 envVars : { } ,
@@ -286,22 +348,41 @@ describe('isolated-vm scheduler', () => {
286348 ownerKey : 'user:b' ,
287349 } )
288350
289- expect ( second . error ?. message ) . toContain ( 'at capacity' )
351+ expect ( rejected . error ?. message ) . toContain ( 'at capacity' )
352+
353+ const queued = await queuedPromise
354+ expect ( queued . error ?. message ) . toContain ( 'timed out waiting' )
290355
291- const first = await firstPromise
292- expect ( first . error ?. message ) . toContain ( 'timed out waiting' )
356+ holder . release ( )
357+ await holderPromise
293358 } )
294359
295360 it ( 'enforces per-owner queued limit' , async ( ) => {
361+ const holder = createControllableReadyProc ( )
296362 const { executeInIsolatedVM } = await loadExecutionModule ( {
297363 envOverrides : {
364+ IVM_MAX_CONCURRENT : '1' ,
298365 IVM_MAX_QUEUED_PER_OWNER : '1' ,
299- IVM_QUEUE_TIMEOUT_MS : '200 ' ,
366+ IVM_QUEUE_TIMEOUT_MS : '50 ' ,
300367 } ,
301- spawns : [ createStartupFailureProc , createStartupFailureProc , createStartupFailureProc ] ,
368+ spawns : [ holder . spawn ] ,
369+ } )
370+
371+ // Hold the single global slot with one of the owner's executions so the next
372+ // requests deterministically queue instead of dispatching.
373+ const holderPromise = executeInIsolatedVM ( {
374+ code : 'return "holder"' ,
375+ params : { } ,
376+ envVars : { } ,
377+ contextVariables : { } ,
378+ timeoutMs : 1000 ,
379+ requestId : 'req-holder' ,
380+ ownerKey : 'user:hog' ,
302381 } )
382+ await holder . dispatched
303383
304- const firstPromise = executeInIsolatedVM ( {
384+ // First queued request for the owner fills their per-owner queue allowance...
385+ const queuedPromise = executeInIsolatedVM ( {
305386 code : 'return 1' ,
306387 params : { } ,
307388 envVars : { } ,
@@ -311,9 +392,8 @@ describe('isolated-vm scheduler', () => {
311392 ownerKey : 'user:hog' ,
312393 } )
313394
314- await sleep ( 1 )
315-
316- const second = await executeInIsolatedVM ( {
395+ // ...so the second is rejected for exceeding the per-owner queued limit.
396+ const rejected = await executeInIsolatedVM ( {
317397 code : 'return 2' ,
318398 params : { } ,
319399 envVars : { } ,
@@ -323,10 +403,13 @@ describe('isolated-vm scheduler', () => {
323403 ownerKey : 'user:hog' ,
324404 } )
325405
326- expect ( second . error ?. message ) . toContain ( 'Too many concurrent' )
406+ expect ( rejected . error ?. message ) . toContain ( 'Too many concurrent' )
407+
408+ const queued = await queuedPromise
409+ expect ( queued . error ?. message ) . toContain ( 'timed out waiting' )
327410
328- const first = await firstPromise
329- expect ( first . error ?. message ) . toContain ( 'timed out waiting' )
411+ holder . release ( )
412+ await holderPromise
330413 } )
331414
332415 it ( 'enforces distributed owner in-flight lease limit when Redis is configured' , async ( ) => {
0 commit comments