fix: add idle timeout and context cancellation to model stream reads#3210
Conversation
Root cause: the SSE stream-reading loop in handleStream had no mechanism to detect a stalled or half-open upstream connection. If the model gateway established a TCP connection and sent HTTP 200 + headers but then stopped forwarding SSE events (e.g. a wedged upstream to Anthropic, a TCP half-open), the goroutine blocked permanently in bufio.Scanner.Scan(). Because there was no read deadline and no idle-timeout watchdog, SIGTERM only worked if Go's HTTP transport closed the connection on its own — which required the transport's readLoop goroutine to run and detect ctx.Done(), a race that could be lost under high scheduler contention or in certain proxy configurations. Fix (pkg/runtime/streaming.go): - Move stream.Recv() into a dedicated reader goroutine so the main loop can select between data, context cancellation, and an idle timer. - Add a 5-minute defaultStreamIdleTimeout (var, so tests can shorten it). - On idle timeout: log a WARN, call cancelStream(errStreamIdle) to trigger Go's HTTP transport to close the underlying TCP connection, and return a clear sentinel error (errStreamIdle, not wrapped in any context error so the retry/fallback machinery classifies it as a non-retryable fatal error and surfaces it to the user). - On ctx.Done(): return ctx.Err() immediately (covers SIGTERM and all other cancellation sources). - A 'done' channel (closed on handleStream return) signals the reader goroutine to stop, preventing goroutine leaks on any exit path. Fix (pkg/runtime/fallback.go): - Create a per-attempt context.WithCancelCause child (streamCtx, streamCancel) before CreateChatCompletionStream so that when cancelStream is called by handleStream's idle-timeout arm, the cancellation propagates to the HTTP request context. This is the mechanism that causes readLoop in net/http's transport to call pc.close(), which closes the TCP socket and unblocks the reader goroutine that is stuck in body.Read(). Tests (pkg/runtime/streaming_test.go): - TestHandleStream_IdleTimeout: stalledStream never returns a chunk; idle timeout fires in 50 ms; asserts errStreamIdle and that cancelStream was called. - TestHandleStream_ContextCancellation: stalledStream blocks; context is cancelled after 20 ms; asserts context.Canceled and prompt return. Closes #3209
docker-agent
left a comment
There was a problem hiding this comment.
Assessment: 🟡 NEEDS ATTENTION
| // done is closed when handleStream exits (for any reason) so the reader | ||
| // goroutine below can detect it and stop trying to send on recvCh. | ||
| done := make(chan struct{}) | ||
| defer close(done) |
There was a problem hiding this comment.
[MEDIUM] Goroutine may persist briefly after handleStream returns due to undocumented Close()→Recv() contract
The reader goroutine relies on stream.Close() to unblock any in-progress stream.Recv() call. This works because the defer order is LIFO:
defer close(done) // registered first → runs LAST
defer stream.Close() // registered second → runs FIRSTSo stream.Close() fires before close(done). After stream.Close() unblocks stream.Recv(), the goroutine wakes up, enters the select, and may successfully send on recvCh (buffered, capacity 1) and loop back to call stream.Recv() again — before close(done) has fired. On a well-behaved stream implementation (where a closed stream's Recv() returns immediately), this is a benign extra iteration and the goroutine exits quickly.
However, if a chat.MessageStream implementation's Recv() can block even after Close() has been called (e.g., it returns from the first Recv() but then blocks on the second call after Close()), the goroutine will be stuck until ctx is cancelled. The done channel and ctx.Done() select arms only help once the goroutine has returned from stream.Recv(). There is no timeout or guarantee that stream.Recv() post-Close() will return promptly.
Suggested mitigation: Document the chat.MessageStream interface contract that Recv() must return promptly (immediately or with an error) after Close() is called. Alternatively, swap the defer order so close(done) fires before stream.Close(), giving the goroutine the best chance to notice done and exit cleanly before the stream is closed:
defer stream.Close() // register first → runs LAST
defer close(done) // register second → runs FIRST (goroutine sees done closed)With this order the goroutine sees done closed while still blocked in stream.Recv() (via the select after Recv returns), though it still needs stream.Close() to unblock the blocking Recv().
Problem
A
docker-agent runsession hung permanently when a streaming LLM request to the model gateway stalled. The outbound TCP connection stayed ESTABLISHED but no data was flowing. Debug logs froze mid-request, CPU dropped to 0%, and SIGTERM was ignored — only SIGKILL worked.Root cause (confirmed by code analysis):
The SSE stream-reading loop in
handleStreamblocked indefinitely instream.Recv()→bufio.Scanner.Scan()→body.Read()→syscall.Read. There was no idle/read deadline and no explicit context-cancellation check between chunks, so:net/httptransport'sreadLoopgoroutine happened to race ahead and callpc.close()before thebody.Readblocked. Under scheduler contention or proxy configurations this race could be lost, making graceful shutdown unreliable.Fix
pkg/runtime/streaming.gostream.Recv()is moved into a dedicated reader goroutine. The main loop selects between the result channel,ctx.Done(), and a 5-minute idle timer. The timer resets on every received chunk. If no chunk arrives within the window the stream is declared stalled.cancelStream(errStreamIdle)is called. This cancels the HTTP-request child context, which propagates to Go'snet/httptransport'sreadLoop;pc.close()closes the TCP socket and unblocks the goroutine that is stuck inbody.Read().ctx.Done()select arm returnsctx.Err()immediately, making SIGTERM / Ctrl-C terminate the stream without racing against the transport.donechannel (closed viadefer) signals the reader goroutine to exit on any return path, preventing leaks.pkg/runtime/fallback.gocontext.WithCancelCausechild context (streamCtx) for each stream attempt.streamCtxtoCreateChatCompletionStreamandhandleStream.streamCancelfunction is thecancelStreamhook described above.Tests
Two new tests in
pkg/runtime/streaming_test.go:TestHandleStream_IdleTimeouterrStreamIdleis returned;cancelStreamis calledTestHandleStream_ContextCancellationcontext.Canceledis returnedValidation
task build✅task test✅ (pre-existingTestLoadExamples/{dmr,unload_on_switch}failures confirmed unrelated)task lint✅ (0 new golangci-lint issues; 2 pre-existing custom-linter warnings in unrelated files)Closes #3209