ndJsonStream's receive loop re-scans and re-allocates the entire buffered content on every incoming chunk, so a single large message costs O(message_size² / chunk_size) in string work and blocks the event loop for tens to hundreds of milliseconds.
Description
The receive side of ndJsonStream accumulates undelivered bytes in a string and, on every chunk, splits the whole accumulated content again:
|
content += textDecoder.decode(value, { stream: true }); |
|
const lines = content.split("\n"); |
|
content = lines.pop() || ""; |
content += textDecoder.decode(value, { stream: true });
const lines = content.split("\n");
content = lines.pop() || "";
While a message is still incomplete (no \n seen yet), each iteration performs:
content += ... — copies the whole buffer so far, and
content.split("\n") — scans the whole buffer and allocates a copy of it as the single array element, which lines.pop() then assigns back to content.
For a message of size M arriving in chunks of size c, that is ~M/c iterations touching ~M/2 bytes each: O(M²/c) total. A 5 MB message delivered in 64 KiB chunks (typical stdio pipe reads) does ~80 iterations over an average of ~2.5 MB ≈ ~200 MB of string scanning/copying — all on the event loop of whatever process hosts the connection. Large messages are routine for agents: session/update notifications carrying big tool results, embedded resource contents, base64 payloads, etc.
Expected Behavior: receiving an M-byte message costs O(M) regardless of how many chunks it arrives in; per-chunk work is proportional to the chunk, not to everything buffered so far.
Actual Behavior: per-chunk work is proportional to the total buffered size, so cost grows quadratically with message size and the event loop stalls while a large message is being received.
Benchmark
One NDJSON-framed message fed through stream.readable in 64 KiB chunks, timed until parsed (median of 7 runs, Node v24.12.0, Apple Silicon, sdk 0.14.1):
| Message size |
SDK ndJsonStream |
incremental byte scan |
speedup |
| 1 MB (17 chunks) |
3.6 ms |
1.2 ms |
2.9x |
| 5 MB (81 chunks) |
40.0 ms |
6.0 ms |
6.6x |
| 10 MB (161 chunks) |
152.1 ms |
12.4 ms |
12.3x |
The SDK column grows ~quadratically (5→10 MB: 2x size, ~3.8x time); the incremental column grows linearly.
benchmark script (node bench.mjs)
// npm i @agentclientprotocol/sdk@0.14.1
import { ndJsonStream as sdkNdJsonStream } from "@agentclientprotocol/sdk";
// Incremental replacement: scan only the new chunk for 0x0A, keep the
// incomplete tail as byte segments, decode once per complete line.
function fixedNdJsonStream(output, input) {
const textDecoder = new TextDecoder();
const readable = new ReadableStream({
async start(controller) {
const pending = [];
const reader = input.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (!value) continue;
let start = 0;
let newline = value.indexOf(0x0a, start);
while (newline !== -1) {
const segment = value.subarray(start, newline);
let lineBytes = segment;
if (pending.length > 0) {
const total =
pending.reduce((n, p) => n + p.byteLength, 0) +
segment.byteLength;
lineBytes = new Uint8Array(total);
let offset = 0;
for (const part of pending) {
lineBytes.set(part, offset);
offset += part.byteLength;
}
lineBytes.set(segment, offset);
pending.length = 0;
}
const trimmed = textDecoder.decode(lineBytes).trim();
if (trimmed) {
try {
controller.enqueue(JSON.parse(trimmed));
} catch (err) {
console.error("Failed to parse JSON message:", err);
}
}
start = newline + 1;
newline = value.indexOf(0x0a, start);
}
if (start < value.length) pending.push(value.subarray(start));
}
} finally {
reader.releaseLock();
controller.close();
}
},
});
return { readable, writable: new WritableStream({ write() {} }) };
}
const CHUNK_SIZE = 64 * 1024;
function buildChunks(targetBytes) {
const message = {
jsonrpc: "2.0",
method: "session/update",
params: { data: "a".repeat(targetBytes) },
};
const bytes = new TextEncoder().encode(JSON.stringify(message) + "\n");
const chunks = [];
for (let i = 0; i < bytes.length; i += CHUNK_SIZE) {
chunks.push(bytes.subarray(i, Math.min(i + CHUNK_SIZE, bytes.length)));
}
return chunks;
}
function chunkStream(chunks) {
let i = 0;
return new ReadableStream({
pull(controller) {
if (i < chunks.length) controller.enqueue(chunks[i++]);
else controller.close();
},
});
}
async function benchOnce(makeStream, chunks) {
const t0 = performance.now();
const stream = makeStream(
new WritableStream({ write() {} }),
chunkStream(chunks),
);
const reader = stream.readable.getReader();
const { value } = await reader.read();
const elapsed = performance.now() - t0;
if (!value) throw new Error("message did not round-trip");
await reader.read();
return elapsed;
}
function median(xs) {
const s = [...xs].sort((a, b) => a - b);
return s[Math.floor(s.length / 2)];
}
for (const mb of [1, 5, 10]) {
const chunks = buildChunks(mb * 1024 * 1024);
for (const [name, impl] of [
["sdk", sdkNdJsonStream],
["incremental", fixedNdJsonStream],
]) {
await benchOnce(impl, chunks); // warmup
const times = [];
for (let i = 0; i < 7; i++) times.push(await benchOnce(impl, chunks));
console.log(`${mb} MB ${name}: ${median(times).toFixed(1)} ms`);
}
}
Suggested fix
Only scan bytes that arrived in the current chunk: either track a "scanned up to" offset into the accumulated buffer, or search the raw chunk for 0x0A with indexOf and keep incomplete tails as byte segments, decoding once per complete line. qwen-code now ships a drop-in replacement using the byte-level approach (it also avoids per-message getWriter()/releaseLock() churn on the send side): QwenLM/qwen-code#6263, implementation in packages/acp-bridge/src/ndJsonStream.ts — happy to adapt it into a PR here if you're open to it.
ACP Version
@agentclientprotocol/sdk 0.14.1 (receive loop unchanged on current main, permalink above)
ndJsonStream's receive loop re-scans and re-allocates the entire buffered content on every incoming chunk, so a single large message costs O(message_size² / chunk_size) in string work and blocks the event loop for tens to hundreds of milliseconds.Description
The receive side of
ndJsonStreamaccumulates undelivered bytes in a string and, on every chunk, splits the whole accumulated content again:typescript-sdk/src/stream.ts
Lines 59 to 61 in 87e2df3
While a message is still incomplete (no
\nseen yet), each iteration performs:content += ...— copies the whole buffer so far, andcontent.split("\n")— scans the whole buffer and allocates a copy of it as the single array element, whichlines.pop()then assigns back tocontent.For a message of size M arriving in chunks of size c, that is ~M/c iterations touching ~M/2 bytes each: O(M²/c) total. A 5 MB message delivered in 64 KiB chunks (typical stdio pipe reads) does ~80 iterations over an average of ~2.5 MB ≈ ~200 MB of string scanning/copying — all on the event loop of whatever process hosts the connection. Large messages are routine for agents:
session/updatenotifications carrying big tool results, embedded resource contents, base64 payloads, etc.Expected Behavior: receiving an M-byte message costs O(M) regardless of how many chunks it arrives in; per-chunk work is proportional to the chunk, not to everything buffered so far.
Actual Behavior: per-chunk work is proportional to the total buffered size, so cost grows quadratically with message size and the event loop stalls while a large message is being received.
Benchmark
One NDJSON-framed message fed through
stream.readablein 64 KiB chunks, timed until parsed (median of 7 runs, Node v24.12.0, Apple Silicon, sdk 0.14.1):ndJsonStreamThe SDK column grows ~quadratically (5→10 MB: 2x size, ~3.8x time); the incremental column grows linearly.
benchmark script (node bench.mjs)
Suggested fix
Only scan bytes that arrived in the current chunk: either track a "scanned up to" offset into the accumulated buffer, or search the raw chunk for
0x0AwithindexOfand keep incomplete tails as byte segments, decoding once per complete line. qwen-code now ships a drop-in replacement using the byte-level approach (it also avoids per-messagegetWriter()/releaseLock()churn on the send side): QwenLM/qwen-code#6263, implementation inpackages/acp-bridge/src/ndJsonStream.ts— happy to adapt it into a PR here if you're open to it.ACP Version
@agentclientprotocol/sdk 0.14.1 (receive loop unchanged on current
main, permalink above)