Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 60 additions & 20 deletions packages/agent/src/server/event-stream-sender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,49 +208,89 @@ describe("TaskRunEventStreamSender", () => {
expect(lastCall[0]).not.toContain("/api/projects/");
});

it("closes the active ingest upload after each drained batch on the proxy path", async () => {
it("keeps the active ingest request open across scheduled flushes", async () => {
const requestBodies: string[] = [];
let contentUploads = 0;
let activeStreamClosed = false;
const fetchMock = vi.fn(
async (_url: string | URL | Request, init?: RequestInit) => {
if (!init?.body || typeof init.body === "string") {
return responseForBody(await readRequestBody(init));
const body = await readRequestBody(init);
requestBodies.push(body);
return responseForBody(body);
}

// Resolves only once the sender closes the upload body.
const body = await readRequestBody(init);
contentUploads += 1;
activeStreamClosed = true;
requestBodies.push(body);
return responseForBody(body);
},
);
vi.stubGlobal("fetch", fetchMock);

const sender = createSender({
flushDelayMs: 0,
eventIngestBaseUrl: "http://agent-proxy:8003/",
});
const sender = createSender({ flushDelayMs: 0 });

// A buffering ingress only forwards the request body when the upload
// closes, so each drained batch must ride its own promptly-closed upload
// rather than one long-lived request held open until stop.
sender.enqueue({ type: "notification", notification: { method: "first" } });
await waitFor(() => contentUploads === 1);
expect(eventSequences(requestBodies[0])).toEqual([1]);
expect(completionSequences(requestBodies[0])).toEqual([]);
await waitFor(() => fetchMock.mock.calls.length === 2);
expect(activeStreamClosed).toBe(false);

sender.enqueue({
type: "notification",
notification: { method: "second" },
});
await waitFor(() => contentUploads === 2);
expect(eventSequences(requestBodies[1])).toEqual([2]);
expect(completionSequences(requestBodies[1])).toEqual([]);
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));

expect(fetchMock).toHaveBeenCalledTimes(2);
expect(activeStreamClosed).toBe(false);

await sender.stop();

const finalBody = requestBodies.at(-1) ?? "";
expect(completionSequences(finalBody)).toEqual([2]);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(activeStreamClosed).toBe(true);
expect(parseLines(requestBodies[1])).toEqual([
{
seq: 1,
event: { type: "notification", notification: { method: "first" } },
},
{
seq: 2,
event: { type: "notification", notification: { method: "second" } },
},
{ type: STREAM_COMPLETE_CONTROL_TYPE, final_seq: 2 },
]);
});

it("closes an idle active ingest request after the stream window elapses", async () => {
const requestBodies: string[] = [];
let activeStreamClosed = false;
const fetchMock = vi.fn(
async (_url: string | URL | Request, init?: RequestInit) => {
if (!init?.body || typeof init.body === "string") {
return responseForBody(await readRequestBody(init));
}

const body = await readRequestBody(init);
activeStreamClosed = true;
requestBodies.push(body);
return responseForBody(body);
},
);
vi.stubGlobal("fetch", fetchMock);

const sender = createSender({ flushDelayMs: 0, streamWindowMs: 5 });

sender.enqueue({ type: "notification", notification: { method: "first" } });
await waitFor(() => fetchMock.mock.calls.length === 2);
expect(activeStreamClosed).toBe(false);

await waitFor(() => activeStreamClosed, 200);
expect(eventSequences(requestBodies[0])).toEqual([1]);
expect(completionSequences(requestBodies[0])).toEqual([]);

await sender.stop();

expect(eventSequences(requestBodies[1])).toEqual([]);
expect(completionSequences(requestBodies[1])).toEqual([1]);
});

it("aborts a stuck ingest response after closing the request body", async () => {
Expand Down
12 changes: 0 additions & 12 deletions packages/agent/src/server/event-stream-sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ export class TaskRunEventStreamSender {
private readonly requestTimeoutMs: number;
private readonly stopTimeoutMs: number;
private readonly streamWindowMs: number;
private readonly usingProxy: boolean;
private readonly createStreamingUpload: StreamingUploadFactory;
private readonly encoder = new TextEncoder();
private sequence = 0;
Expand Down Expand Up @@ -113,7 +112,6 @@ export class TaskRunEventStreamSender {
config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS;
this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS;
this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS;
this.usingProxy = usingProxy;
this.createStreamingUpload =
config.createStreamingUpload ?? createNodeStreamingUpload;
}
Expand Down Expand Up @@ -210,16 +208,6 @@ export class TaskRunEventStreamSender {

try {
await flushPromise;
// On the proxy path, deliver the drained batch now instead of holding one
// long-lived upload. The ingress in front of the proxy buffers the ingest
// request body and only forwards it once the request closes, so a
// long-lived upload strands events; closing per batch forwards each within
// a round-trip. Gated to the proxy write leg so the Django path keeps its
// existing long-lived upload. The stop path leaves closing to drainForStop
// so the completion line rides the final upload.
if (!this.stopped && this.usingProxy) {
await this.closeActiveStream();
}
return this.bufferedEvents.length < previousBufferLength;
} catch (error) {
this.config.logger.warn(
Expand Down
Loading