From 7ffdcd7d29e7c9675ad0e8d59cdf699398d6669f Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 20:16:15 -0700 Subject: [PATCH 1/2] Revert "fix(agent): close proxy ingest upload per batch so events stream live (#3043)" This reverts commit 06f1d07a346de0bd51f03283c4507a668a945773. --- .../src/server/event-stream-sender.test.ts | 123 +++++++++++++++--- .../agent/src/server/event-stream-sender.ts | 17 +-- 2 files changed, 107 insertions(+), 33 deletions(-) diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index f351fa0cc7..37dfd15755 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -208,49 +208,132 @@ describe("TaskRunEventStreamSender", () => { expect(lastCall[0]).not.toContain("/api/projects/"); }); - it("closes the active ingest upload after each drained batch on the proxy path", async () => { + it("uses a short stream window for agent-proxy ingest so buffered uploads commit live events", async () => { + vi.useFakeTimers(); + try { + const requestBodies: string[] = []; + let activeStreamClosed = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return responseForBody(await readRequestBody(init)); + } + + const body = await readRequestBody(init); + activeStreamClosed = true; + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ + eventIngestBaseUrl: "http://agent-proxy:8003/", + }); + + sender.enqueue({ + type: "notification", + notification: { method: "first" }, + }); + await vi.advanceTimersByTimeAsync(0); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(activeStreamClosed).toBe(false); + + await vi.advanceTimersByTimeAsync(1_000); + + expect(activeStreamClosed).toBe(true); + expect(eventSequences(requestBodies[0])).toEqual([1]); + + await sender.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; - let contentUploads = 0; + let activeStreamClosed = false; const fetchMock = vi.fn( async (_url: string | URL | Request, init?: RequestInit) => { if (!init?.body || typeof init.body === "string") { - return responseForBody(await readRequestBody(init)); + const body = await readRequestBody(init); + requestBodies.push(body); + return responseForBody(body); } - // Resolves only once the sender closes the upload body. const body = await readRequestBody(init); - contentUploads += 1; + activeStreamClosed = true; requestBodies.push(body); return responseForBody(body); }, ); vi.stubGlobal("fetch", fetchMock); - const sender = createSender({ - flushDelayMs: 0, - eventIngestBaseUrl: "http://agent-proxy:8003/", - }); + const sender = createSender({ flushDelayMs: 0 }); - // A buffering ingress only forwards the request body when the upload - // closes, so each drained batch must ride its own promptly-closed upload - // rather than one long-lived request held open until stop. sender.enqueue({ type: "notification", notification: { method: "first" } }); - await waitFor(() => contentUploads === 1); - expect(eventSequences(requestBodies[0])).toEqual([1]); - expect(completionSequences(requestBodies[0])).toEqual([]); + await waitFor(() => fetchMock.mock.calls.length === 2); + expect(activeStreamClosed).toBe(false); sender.enqueue({ type: "notification", notification: { method: "second" }, }); - await waitFor(() => contentUploads === 2); - expect(eventSequences(requestBodies[1])).toEqual([2]); - expect(completionSequences(requestBodies[1])).toEqual([]); + await new Promise((resolve) => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(activeStreamClosed).toBe(false); + + await sender.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(activeStreamClosed).toBe(true); + expect(parseLines(requestBodies[1])).toEqual([ + { + seq: 1, + event: { type: "notification", notification: { method: "first" } }, + }, + { + seq: 2, + event: { type: "notification", notification: { method: "second" } }, + }, + { type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 2 }, + ]); + }); + + it("closes an idle active ingest request after the stream window elapses", async () => { + const requestBodies: string[] = []; + let activeStreamClosed = false; + const fetchMock = vi.fn( + async (_url: string | URL | Request, init?: RequestInit) => { + if (!init?.body || typeof init.body === "string") { + return responseForBody(await readRequestBody(init)); + } + + const body = await readRequestBody(init); + activeStreamClosed = true; + requestBodies.push(body); + return responseForBody(body); + }, + ); + vi.stubGlobal("fetch", fetchMock); + + const sender = createSender({ flushDelayMs: 0, streamWindowMs: 5 }); + + sender.enqueue({ type: "notification", notification: { method: "first" } }); + await waitFor(() => fetchMock.mock.calls.length === 2); + expect(activeStreamClosed).toBe(false); + + await waitFor(() => activeStreamClosed, 200); + expect(eventSequences(requestBodies[0])).toEqual([1]); + expect(completionSequences(requestBodies[0])).toEqual([]); await sender.stop(); - const finalBody = requestBodies.at(-1) ?? ""; - expect(completionSequences(finalBody)).toEqual([2]); + expect(eventSequences(requestBodies[1])).toEqual([]); + expect(completionSequences(requestBodies[1])).toEqual([1]); }); it("aborts a stuck ingest response after closing the request body", async () => { diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 437cb5e98c..83985de026 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -56,6 +56,7 @@ const DEFAULT_RETRY_DELAY_MS = 1_000; const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_STOP_TIMEOUT_MS = 30_000; const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000; +const DEFAULT_PROXY_STREAM_WINDOW_MS = 1_000; const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete"; export class TaskRunEventStreamSender { @@ -69,7 +70,6 @@ export class TaskRunEventStreamSender { private readonly requestTimeoutMs: number; private readonly stopTimeoutMs: number; private readonly streamWindowMs: number; - private readonly usingProxy: boolean; private readonly createStreamingUpload: StreamingUploadFactory; private readonly encoder = new TextEncoder(); private sequence = 0; @@ -112,8 +112,9 @@ export class TaskRunEventStreamSender { this.requestTimeoutMs = config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; - this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; - this.usingProxy = usingProxy; + this.streamWindowMs = + config.streamWindowMs ?? + (usingProxy ? DEFAULT_PROXY_STREAM_WINDOW_MS : DEFAULT_STREAM_WINDOW_MS); this.createStreamingUpload = config.createStreamingUpload ?? createNodeStreamingUpload; } @@ -210,16 +211,6 @@ export class TaskRunEventStreamSender { try { await flushPromise; - // On the proxy path, deliver the drained batch now instead of holding one - // long-lived upload. The ingress in front of the proxy buffers the ingest - // request body and only forwards it once the request closes, so a - // long-lived upload strands events; closing per batch forwards each within - // a round-trip. Gated to the proxy write leg so the Django path keeps its - // existing long-lived upload. The stop path leaves closing to drainForStop - // so the completion line rides the final upload. - if (!this.stopped && this.usingProxy) { - await this.closeActiveStream(); - } return this.bufferedEvents.length < previousBufferLength; } catch (error) { this.config.logger.warn( From efe4ff7497f1671abbf7ec1ede52624311af5dac Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 30 Jun 2026 20:16:18 -0700 Subject: [PATCH 2/2] Revert "fix(agent): Flush proxy ingest uploads live (#3042)" This reverts commit b38b7f6c69b4da3d4072284848016e5778b494f7. --- .../src/server/event-stream-sender.test.ts | 43 ------------------- .../agent/src/server/event-stream-sender.ts | 5 +-- 2 files changed, 1 insertion(+), 47 deletions(-) diff --git a/packages/agent/src/server/event-stream-sender.test.ts b/packages/agent/src/server/event-stream-sender.test.ts index 37dfd15755..c57825d318 100644 --- a/packages/agent/src/server/event-stream-sender.test.ts +++ b/packages/agent/src/server/event-stream-sender.test.ts @@ -208,49 +208,6 @@ describe("TaskRunEventStreamSender", () => { expect(lastCall[0]).not.toContain("/api/projects/"); }); - it("uses a short stream window for agent-proxy ingest so buffered uploads commit live events", async () => { - vi.useFakeTimers(); - try { - const requestBodies: string[] = []; - let activeStreamClosed = false; - const fetchMock = vi.fn( - async (_url: string | URL | Request, init?: RequestInit) => { - if (!init?.body || typeof init.body === "string") { - return responseForBody(await readRequestBody(init)); - } - - const body = await readRequestBody(init); - activeStreamClosed = true; - requestBodies.push(body); - return responseForBody(body); - }, - ); - vi.stubGlobal("fetch", fetchMock); - - const sender = createSender({ - eventIngestBaseUrl: "http://agent-proxy:8003/", - }); - - sender.enqueue({ - type: "notification", - notification: { method: "first" }, - }); - await vi.advanceTimersByTimeAsync(0); - - expect(fetchMock).toHaveBeenCalledTimes(2); - expect(activeStreamClosed).toBe(false); - - await vi.advanceTimersByTimeAsync(1_000); - - expect(activeStreamClosed).toBe(true); - expect(eventSequences(requestBodies[0])).toEqual([1]); - - await sender.stop(); - } finally { - vi.useRealTimers(); - } - }); - it("keeps the active ingest request open across scheduled flushes", async () => { const requestBodies: string[] = []; let activeStreamClosed = false; diff --git a/packages/agent/src/server/event-stream-sender.ts b/packages/agent/src/server/event-stream-sender.ts index 83985de026..5e254ded40 100644 --- a/packages/agent/src/server/event-stream-sender.ts +++ b/packages/agent/src/server/event-stream-sender.ts @@ -56,7 +56,6 @@ const DEFAULT_RETRY_DELAY_MS = 1_000; const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; const DEFAULT_STOP_TIMEOUT_MS = 30_000; const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000; -const DEFAULT_PROXY_STREAM_WINDOW_MS = 1_000; const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete"; export class TaskRunEventStreamSender { @@ -112,9 +111,7 @@ export class TaskRunEventStreamSender { this.requestTimeoutMs = config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS; this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS; - this.streamWindowMs = - config.streamWindowMs ?? - (usingProxy ? DEFAULT_PROXY_STREAM_WINDOW_MS : DEFAULT_STREAM_WINDOW_MS); + this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS; this.createStreamingUpload = config.createStreamingUpload ?? createNodeStreamingUpload; }