ndJsonStream receive path does O(n²) work for messages spanning many chunks #206

Description

@doudouOUC

ndJsonStream's receive loop re-scans and re-allocates the entire buffered content on every incoming chunk, so a single large message costs O(message_size² / chunk_size) in string work and blocks the event loop for tens to hundreds of milliseconds.

The receive side of ndJsonStream accumulates undelivered bytes in a string and, on every chunk, splits the whole accumulated content again:

content += textDecoder.decode(value, { stream: true });
const lines = content.split("\n");
content = lines.pop() || "";

While a message is still incomplete (no \n seen yet), each iteration performs:

  • content += ... — copies the whole buffer so far, and
  • content.split("\n") — scans the whole buffer and allocates a copy of it as the single array element, which lines.pop() then assigns back to content.

For a message of size M arriving in chunks of size c, that is ~M/c iterations touching ~M/2 bytes each: O(M²/c) total. A 5 MB message delivered in 64 KiB chunks (typical stdio pipe reads) does ~80 iterations over an average of ~2.5 MB ≈ ~200 MB of string scanning/copying — all on the event loop of whatever process hosts the connection. Large messages are routine for agents: session/update notifications carrying big tool results, embedded resource contents, base64 payloads, etc.
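The arithmetic above can be reproduced with a small cost model. This is only a sketch: bytesTouched is a hypothetical helper, not SDK code, and it counts a single pass over the buffer per chunk. The real loop makes more than one pass (concatenation, split, and the copy back into content), so the re-scan figure should be read as a lower bound.

```javascript
// Cost model only: counts bytes touched, does not process real data.
// `bytesTouched` is a hypothetical helper written for this illustration.
function bytesTouched(messageBytes, chunkBytes) {
  let buffered = 0;    // bytes accumulated so far for the incomplete message
  let rescan = 0;      // current loop: each chunk triggers a pass over everything buffered
  let incremental = 0; // incremental scan: each chunk is scanned exactly once
  for (let offset = 0; offset < messageBytes; offset += chunkBytes) {
    const chunk = Math.min(chunkBytes, messageBytes - offset);
    buffered += chunk;
    rescan += buffered;
    incremental += chunk;
  }
  return { rescan, incremental };
}

const MiB = 1024 * 1024;
const { rescan, incremental } = bytesTouched(5 * MiB, 64 * 1024);
console.log((rescan / MiB).toFixed(1), (incremental / MiB).toFixed(1));
```

For a 5 MiB message in 64 KiB chunks this prints 202.5 for the re-scanning loop and 5.0 for the incremental one (both in MiB), which matches the ~200 MB estimate.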

Expected Behavior: receiving an M-byte message costs O(M) regardless of how many chunks it arrives in; per-chunk work is proportional to the chunk, not to everything buffered so far.

Actual Behavior: per-chunk work is proportional to the total buffered size, so cost grows quadratically with message size and the event loop stalls while a large message is being received.

Benchmark

One NDJSON-framed message fed through stream.readable in 64 KiB chunks, timed until parsed (median of 7 runs, Node v24.12.0, Apple Silicon, sdk 0.14.1):

| Message size | SDK ndJsonStream | Incremental byte scan | Speedup |
| --- | --- | --- | --- |
| 1 MB (17 chunks) | 3.6 ms | 1.2 ms | 2.9x |
| 5 MB (81 chunks) | 40.0 ms | 6.0 ms | 6.6x |
| 10 MB (161 chunks) | 152.1 ms | 12.4 ms | 12.3x |

The SDK column grows ~quadratically (5→10 MB: 2x size, ~3.8x time); the incremental column grows linearly.

Benchmark script (run with node bench.mjs):

// npm i @agentclientprotocol/sdk@0.14.1
import { ndJsonStream as sdkNdJsonStream } from "@agentclientprotocol/sdk";

// Incremental replacement: scan only the new chunk for 0x0A, keep the
// incomplete tail as byte segments, decode once per complete line.
function fixedNdJsonStream(output, input) {
  const textDecoder = new TextDecoder();
  const readable = new ReadableStream({
    async start(controller) {
      const pending = [];
      const reader = input.getReader();
      try {
        while (true) {
          const { value, done } = await reader.read();
          if (done) break;
          if (!value) continue;
          let start = 0;
          let newline = value.indexOf(0x0a, start);
          while (newline !== -1) {
            const segment = value.subarray(start, newline);
            let lineBytes = segment;
            if (pending.length > 0) {
              const total =
                pending.reduce((n, p) => n + p.byteLength, 0) +
                segment.byteLength;
              lineBytes = new Uint8Array(total);
              let offset = 0;
              for (const part of pending) {
                lineBytes.set(part, offset);
                offset += part.byteLength;
              }
              lineBytes.set(segment, offset);
              pending.length = 0;
            }
            const trimmed = textDecoder.decode(lineBytes).trim();
            if (trimmed) {
              try {
                controller.enqueue(JSON.parse(trimmed));
              } catch (err) {
                console.error("Failed to parse JSON message:", err);
              }
            }
            start = newline + 1;
            newline = value.indexOf(0x0a, start);
          }
          if (start < value.length) pending.push(value.subarray(start));
        }
      } finally {
        reader.releaseLock();
        controller.close();
      }
    },
  });
  return { readable, writable: new WritableStream({ write() {} }) };
}

const CHUNK_SIZE = 64 * 1024;

function buildChunks(targetBytes) {
  const message = {
    jsonrpc: "2.0",
    method: "session/update",
    params: { data: "a".repeat(targetBytes) },
  };
  const bytes = new TextEncoder().encode(JSON.stringify(message) + "\n");
  const chunks = [];
  for (let i = 0; i < bytes.length; i += CHUNK_SIZE) {
    chunks.push(bytes.subarray(i, Math.min(i + CHUNK_SIZE, bytes.length)));
  }
  return chunks;
}

function chunkStream(chunks) {
  let i = 0;
  return new ReadableStream({
    pull(controller) {
      if (i < chunks.length) controller.enqueue(chunks[i++]);
      else controller.close();
    },
  });
}

async function benchOnce(makeStream, chunks) {
  const t0 = performance.now();
  const stream = makeStream(
    new WritableStream({ write() {} }),
    chunkStream(chunks),
  );
  const reader = stream.readable.getReader();
  const { value } = await reader.read();
  const elapsed = performance.now() - t0;
  if (!value) throw new Error("message did not round-trip");
  await reader.read();
  return elapsed;
}

function median(xs) {
  const s = [...xs].sort((a, b) => a - b);
  return s[Math.floor(s.length / 2)];
}

for (const mb of [1, 5, 10]) {
  const chunks = buildChunks(mb * 1024 * 1024);
  for (const [name, impl] of [
    ["sdk", sdkNdJsonStream],
    ["incremental", fixedNdJsonStream],
  ]) {
    await benchOnce(impl, chunks); // warmup
    const times = [];
    for (let i = 0; i < 7; i++) times.push(await benchOnce(impl, chunks));
    console.log(`${mb} MB ${name}: ${median(times).toFixed(1)} ms`);
  }
}

Suggested fix

Only scan bytes that arrived in the current chunk: either track a "scanned up to" offset into the accumulated buffer, or search the raw chunk for 0x0A with indexOf and keep incomplete tails as byte segments, decoding once per complete line. qwen-code now ships a drop-in replacement using the byte-level approach (it also avoids per-message getWriter()/releaseLock() churn on the send side): QwenLM/qwen-code#6263, implementation in packages/acp-bridge/src/ndJsonStream.ts — happy to adapt it into a PR here if you're open to it.
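For comparison with the byte-level version in the benchmark script, here is a minimal sketch of a string-level variant of the same idea, under a few assumptions: LineSplitter is a hypothetical name, its input is text already decoded with { stream: true }, and the incomplete tail is kept as an array of pieces that is joined once per complete line rather than tracked by an offset into one growing string. Each decoded piece is searched for "\n" exactly once.

```javascript
// Hypothetical helper (not part of the SDK): splits decoded text into lines
// while scanning each incoming piece exactly once.
class LineSplitter {
  constructor() {
    // Decoded pieces of the current, still-incomplete line.
    this.pending = [];
  }

  // Feed one decoded chunk; returns the lines that this chunk completed.
  push(text) {
    const lines = [];
    let start = 0;
    let newline = text.indexOf("\n", start);
    while (newline !== -1) {
      // Close the current line: earlier pieces plus the part before "\n".
      this.pending.push(text.slice(start, newline));
      lines.push(this.pending.join(""));
      this.pending.length = 0;
      start = newline + 1;
      newline = text.indexOf("\n", start);
    }
    // Keep whatever follows the last "\n" as the start of the next line.
    if (start < text.length) this.pending.push(text.slice(start));
    return lines;
  }
}
```

A caller would trim and JSON.parse each returned line, as fixedNdJsonStream does in the benchmark script. Per-chunk work here is proportional to the chunk, and the join cost is paid once per message.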

ACP Version

@agentclientprotocol/sdk 0.14.1 (receive loop unchanged on current main, permalink above)
