diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index f351fa0cc7..c57825d318 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -208,49 +208,89 @@ describe("TaskRunEventStreamSender", () => { expect(lastCall[0]).not.toContain("/api/projects/"); }); - it("closes the active ingest upload after each drained batch on the proxy path", async () => { + it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; - let contentUploads = 0; + let activeStreamClosed = false; const fetchMock = vi.fn( async (_url: string | URL | Request, init?: RequestInit) => { if (!init?.body || typeof init.body === "string") { - return responseForBody(await readRequestBody(init)); + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); } - // Resolves only once the sender closes the upload body. const body = await readRequestBody(init); - contentUploads += 1; + activeStreamClosed = true; requestBodies.push(body); return responseForBody(body); }, ); vi.stubGlobal("fetch", fetchMock); - const sender = createSender({ - flushDelayMs: 0, - eventIngestBaseUrl: "http://agent-proxy:8003/", - }); + const sender = createSender({ flushDelayMs: 0 }); - // A buffering ingress only forwards the request body when the upload - // closes, so each drained batch must ride its own promptly-closed upload - // rather than one long-lived request held open until stop. sender.enqueue({ type: "notification", notification: { method: "first" } }); - await waitFor(() => contentUploads === 1); - expect(eventSequences(requestBodies[0])).toEqual([1]); - expect(completionSequences(requestBodies[0])).toEqual([]); + await waitFor(() => fetchMock.mock.calls.length === 2); + expect(activeStreamClosed).toBe(false); sender.enqueue({ type: "notification", notification: { method: "second" }, }); - await waitFor(() => contentUploads === 2); - expect(eventSequences(requestBodies[1])).toEqual([2]); - expect(completionSequences(requestBodies[1])).toEqual([]); + await new Promise((resolve) => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(activeStreamClosed).toBe(false); await sender.stop(); - const finalBody = requestBodies.at(-1) ?? ""; - expect(completionSequences(finalBody)).toEqual([2]); + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(activeStreamClosed).toBe(true); + expect(parseLines(requestBodies[1])).toEqual([ + { + seq: 1, + event: { type: "notification", notification: { method: "first" } }, + }, + { + seq: 2, + event: { type: "notification", notification: { method: "second" } }, + }, + { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 2 }, + ]); + }); + + it("closes an idle active ingest request after the stream window elapses", async () => { + const requestBodies: string[] = []; + let activeStreamClosed = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return responseForBody(await readRequestBody(init)); + } + + const body = await readRequestBody(init); + activeStreamClosed = true; + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ flushDelayMs: 0, streamWindowMs: 5 }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await waitFor(() => fetchMock.mock.calls.length === 2); + expect(activeStreamClosed).toBe(false); + + await waitFor(() => activeStreamClosed, 200); + expect(eventSequences(requestBodies[0])).toEqual([1]); + expect(completionSequences(requestBodies[0])).toEqual([]); + + await sender.stop(); + + expect(eventSequences(requestBodies[1])).toEqual([]); + expect(completionSequences(requestBodies[1])).toEqual([1]); }); it("aborts a stuck ingest response after closing the request body", async () => { diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 437cb5e98c..5e254ded40 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -69,7 +69,6 @@ export class TaskRunEventStreamSender { private readonly requestTimeoutMs: number; private readonly stopTimeoutMs: number; private readonly streamWindowMs: number; - private readonly usingProxy: boolean; private readonly createStreamingUpload: StreamingUploadFactory; private readonly encoder = new TextEncoder(); private sequence = 0; @@ -113,7 +112,6 @@ export class TaskRunEventStreamSender { config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; - this.usingProxy = usingProxy; this.createStreamingUpload = config.createStreamingUpload ?? createNodeStreamingUpload; } @@ -210,16 +208,6 @@ export class TaskRunEventStreamSender { try { await flushPromise; - // On the proxy path, deliver the drained batch now instead of holding one - // long-lived upload. The ingress in front of the proxy buffers the ingest - // request body and only forwards it once the request closes, so a - // long-lived upload strands events; closing per batch forwards each within - // a round-trip. Gated to the proxy write leg so the Django path keeps its - // existing long-lived upload. The stop path leaves closing to drainForStop - // so the completion line rides the final upload. - if (!this.stopped && this.usingProxy) { - await this.closeActiveStream(); - } return this.bufferedEvents.length < previousBufferLength; } catch (error) { this.config.logger.warn(