From dee6e1e694ead82d2b08320dcead3280fa777f93 Mon Sep 17 00:00:00 2001 From: CorrectRoadH <29306285+CorrectRoadH@users.noreply.github.com> Date: Mon, 29 Jun 2026 11:47:31 +0800 Subject: [PATCH 1/5] fix(model_runner): request token usage on streaming completions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streaming chat completions from OpenAI-compatible providers omit the `usage` block unless the request sets `stream_options.include_usage`. Because the streaming call never set it, `Tape._extract_usage` found no usage on any chunk, so every streamed run recorded zero tokens and zero cost in the tape. Pass `stream_options={"include_usage": True}` when streaming, gated on the provider base class so only OpenAI-compatible providers (which accept the field) receive it — anthropic/gemini reject it and are left untouched. Non-streaming completions already carry usage in the response body and are unaffected. Adds unit tests covering the gate: streaming OpenAI-compatible providers get the field; non-streaming and non-OpenAI providers do not. --- src/bub/builtin/model_runner.py | 18 +++++++++++++- tests/test_model_runner_usage.py | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 tests/test_model_runner_usage.py diff --git a/src/bub/builtin/model_runner.py b/src/bub/builtin/model_runner.py index 0684f8fc..2664b964 100644 --- a/src/bub/builtin/model_runner.py +++ b/src/bub/builtin/model_runner.py @@ -10,6 +10,7 @@ from typing import Any, Literal, cast from any_llm import AnyLLM +from any_llm.providers.openai.base import BaseOpenAIProvider from any_llm.types.completion import ( ChatCompletion, ChatCompletionChunk, @@ -36,6 +37,19 @@ CompletionResult = ChatCompletion | ParsedChatCompletion[Any] | AsyncIterator[ChatCompletionChunk] +def _stream_usage_options(llm: AnyLLM, *, stream: bool) -> dict[str, Any] | None: + """Make streaming completions report token usage. + + OpenAI-style streaming responses omit the `usage` block unless the request + sets `stream_options.include_usage`; without it every streamed run records + zero tokens (and zero cost). Only OpenAI-compatible providers accept the + field, so gate on the provider base class — anthropic/gemini reject it. + """ + if stream and isinstance(llm, BaseOpenAIProvider): + return {"include_usage": True} + return None + + class ModelRunner: def __init__(self, settings: AgentSettings) -> None: self.settings = settings @@ -61,12 +75,14 @@ async def completion_response( completion_error: Exception | None = None for index, (candidate, llm) in enumerate(clients): try: + streaming = llm.SUPPORTS_COMPLETION_STREAMING return await llm.acompletion( model=candidate.model_id, messages=completion_messages, tools=tool_payloads, max_tokens=self.settings.max_tokens, - stream=llm.SUPPORTS_COMPLETION_STREAMING, + stream=streaming, + stream_options=_stream_usage_options(llm, stream=streaming), ) except Exception as exc: if completion_error is None: diff --git a/tests/test_model_runner_usage.py b/tests/test_model_runner_usage.py new file mode 100644 index 00000000..8afcb47d --- /dev/null +++ b/tests/test_model_runner_usage.py @@ -0,0 +1,41 @@ +"""Regression: streaming completions must request token usage. + +OpenAI-style streaming responses omit the `usage` block unless the request sets +`stream_options.include_usage`. Without it every streamed run records zero +tokens (and zero cost) in the tape. The field is only valid for OpenAI-compatible +providers, so it must be gated on the provider base class. +""" + +from __future__ import annotations + +import pytest +from any_llm import AnyLLM + +from bub.builtin.model_runner import _stream_usage_options + + +def _provider(name: str) -> AnyLLM: + return AnyLLM.create(name, api_key="test-key") + + +def test_openai_streaming_requests_usage() -> None: + assert _stream_usage_options(_provider("openai"), stream=True) == {"include_usage": True} + + +def test_openai_compatible_provider_streaming_requests_usage() -> None: + # openrouter (and other OpenAI-compatible providers) subclass BaseOpenAIProvider. + assert _stream_usage_options(_provider("openrouter"), stream=True) == {"include_usage": True} + + +def test_non_streaming_does_not_set_options() -> None: + # Non-streaming completions already carry usage in the response body. + assert _stream_usage_options(_provider("openai"), stream=False) is None + + +def test_non_openai_provider_is_not_offered_the_field() -> None: + # anthropic is not a BaseOpenAIProvider and rejects stream_options. + assert _stream_usage_options(_provider("anthropic"), stream=True) is None + + +if __name__ == "__main__": # pragma: no cover + raise SystemExit(pytest.main([__file__, "-q"])) From e5560d88ea7f63eef5af7edec7492da4c86c4db9 Mon Sep 17 00:00:00 2001 From: CorrectRoadH <29306285+CorrectRoadH@users.noreply.github.com> Date: Mon, 29 Jun 2026 11:50:19 +0800 Subject: [PATCH 2/5] test: trim usage-gate tests to one parametrized case Collapse four near-duplicate tests into a single parametrized test that guards the non-obvious intent (non-OpenAI providers must not receive stream_options) and the primary path, dropping the redundant cases. --- tests/test_model_runner_usage.py | 47 ++++++++++++-------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/tests/test_model_runner_usage.py b/tests/test_model_runner_usage.py index 8afcb47d..17503998 100644 --- a/tests/test_model_runner_usage.py +++ b/tests/test_model_runner_usage.py @@ -1,9 +1,12 @@ -"""Regression: streaming completions must request token usage. +"""Regression: streaming completions must request token usage — but only from +providers that accept the field. OpenAI-style streaming responses omit the `usage` block unless the request sets -`stream_options.include_usage`. Without it every streamed run records zero -tokens (and zero cost) in the tape. The field is only valid for OpenAI-compatible -providers, so it must be gated on the provider base class. +`stream_options.include_usage`; without it every streamed run records zero +tokens. The field is only valid for OpenAI-compatible providers, so it must be +gated on the provider base class — passing it to e.g. anthropic would break the +request. This test guards that gate (the non-obvious part); the trivial +"streaming openai gets the field" path is covered implicitly. """ from __future__ import annotations @@ -14,28 +17,14 @@ from bub.builtin.model_runner import _stream_usage_options -def _provider(name: str) -> AnyLLM: - return AnyLLM.create(name, api_key="test-key") - - -def test_openai_streaming_requests_usage() -> None: - assert _stream_usage_options(_provider("openai"), stream=True) == {"include_usage": True} - - -def test_openai_compatible_provider_streaming_requests_usage() -> None: - # openrouter (and other OpenAI-compatible providers) subclass BaseOpenAIProvider. - assert _stream_usage_options(_provider("openrouter"), stream=True) == {"include_usage": True} - - -def test_non_streaming_does_not_set_options() -> None: - # Non-streaming completions already carry usage in the response body. - assert _stream_usage_options(_provider("openai"), stream=False) is None - - -def test_non_openai_provider_is_not_offered_the_field() -> None: - # anthropic is not a BaseOpenAIProvider and rejects stream_options. - assert _stream_usage_options(_provider("anthropic"), stream=True) is None - - -if __name__ == "__main__": # pragma: no cover - raise SystemExit(pytest.main([__file__, "-q"])) +@pytest.mark.parametrize( + ("provider", "stream", "expected"), + [ + ("openai", True, {"include_usage": True}), # primary path: usage must be requested + ("openai", False, None), # non-streaming already carries usage in the body + ("anthropic", True, None), # not OpenAI-compatible: must not receive the field + ], +) +def test_stream_usage_options_gate(provider: str, stream: bool, expected: dict | None) -> None: + llm = AnyLLM.create(provider, api_key="test-key") + assert _stream_usage_options(llm, stream=stream) == expected From cddc95fc700eee5b31810f5c2429ead2b8e89a7a Mon Sep 17 00:00:00 2001 From: CorrectRoadH <29306285+CorrectRoadH@users.noreply.github.com> Date: Mon, 29 Jun 2026 11:50:35 +0800 Subject: [PATCH 3/5] test: drop usage-gate test The gate is a two-line pure helper; a dedicated test mostly mirrors its implementation rather than guarding real behavior. --- tests/test_model_runner_usage.py | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 tests/test_model_runner_usage.py diff --git a/tests/test_model_runner_usage.py b/tests/test_model_runner_usage.py deleted file mode 100644 index 17503998..00000000 --- a/tests/test_model_runner_usage.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Regression: streaming completions must request token usage — but only from -providers that accept the field. - -OpenAI-style streaming responses omit the `usage` block unless the request sets -`stream_options.include_usage`; without it every streamed run records zero -tokens. The field is only valid for OpenAI-compatible providers, so it must be -gated on the provider base class — passing it to e.g. anthropic would break the -request. This test guards that gate (the non-obvious part); the trivial -"streaming openai gets the field" path is covered implicitly. -""" - -from __future__ import annotations - -import pytest -from any_llm import AnyLLM - -from bub.builtin.model_runner import _stream_usage_options - - -@pytest.mark.parametrize( - ("provider", "stream", "expected"), - [ - ("openai", True, {"include_usage": True}), # primary path: usage must be requested - ("openai", False, None), # non-streaming already carries usage in the body - ("anthropic", True, None), # not OpenAI-compatible: must not receive the field - ], -) -def test_stream_usage_options_gate(provider: str, stream: bool, expected: dict | None) -> None: - llm = AnyLLM.create(provider, api_key="test-key") - assert _stream_usage_options(llm, stream=stream) == expected From 3bb26e964766135ee3c30b742f2d5f11424efdc6 Mon Sep 17 00:00:00 2001 From: CorrectRoadH <29306285+CorrectRoadH@users.noreply.github.com> Date: Tue, 30 Jun 2026 13:20:22 +0800 Subject: [PATCH 4/5] test: cover streaming usage tape recording --- tests/test_builtin_model_runner.py | 100 +++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 tests/test_builtin_model_runner.py diff --git a/tests/test_builtin_model_runner.py b/tests/test_builtin_model_runner.py new file mode 100644 index 00000000..03a9a8c7 --- /dev/null +++ b/tests/test_builtin_model_runner.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Iterator +from pathlib import Path +from typing import Any + +import pytest +from any_llm.constants import LLMProvider +from any_llm.providers.openai.base import BaseOpenAIProvider +from any_llm.types.completion import ChatCompletionChunk + +from bub.builtin.model_runner import ModelRunner +from bub.builtin.settings import AgentSettings, ModelCandidate +from bub.builtin.tape import Tape +from bub.tape import AsyncTapeStoreAdapter, InMemoryTapeStore, TapeContext + + +class _FakeStreamingOpenAIProvider(BaseOpenAIProvider): + SUPPORTS_COMPLETION_STREAMING = True + + def __init__(self) -> None: + self.completion_kwargs: dict[str, Any] | None = None + + async def acompletion(self, **kwargs: Any) -> AsyncIterator[ChatCompletionChunk]: + self.completion_kwargs = kwargs + include_usage = kwargs.get("stream_options") == {"include_usage": True} + + async def stream() -> AsyncIterator[ChatCompletionChunk]: + yield ChatCompletionChunk.model_validate( + { + "id": "chatcmpl_test", + "object": "chat.completion.chunk", + "created": 0, + "model": "gpt-test", + "choices": [ + { + "index": 0, + "finish_reason": None, + "delta": {"role": "assistant", "content": "done"}, + } + ], + } + ) + final_chunk: dict[str, Any] = { + "id": "chatcmpl_test", + "object": "chat.completion.chunk", + "created": 0, + "model": "gpt-test", + "choices": [], + } + if include_usage: + final_chunk["usage"] = {"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5} + yield ChatCompletionChunk.model_validate(final_chunk) + + return stream() + + +class _FakeOpenAIModelRunner(ModelRunner): + def __init__(self, settings: AgentSettings, llm: _FakeStreamingOpenAIProvider) -> None: + super().__init__(settings) + self._llm = llm + + def iter_llm_clients(self, model: str) -> Iterator[tuple[ModelCandidate, _FakeStreamingOpenAIProvider]]: + yield ModelCandidate(provider=LLMProvider.OPENAI, model_id=model, name=f"openai:{model}"), self._llm + + +@pytest.mark.asyncio +async def test_streaming_openai_usage_is_requested_and_recorded_in_tape(tmp_path: Path) -> None: + store = InMemoryTapeStore() + tape = Tape(tmp_path, AsyncTapeStoreAdapter(store), TapeContext()).scoped("test-tape") + llm = _FakeStreamingOpenAIProvider() + runner = _FakeOpenAIModelRunner( + AgentSettings.model_construct(model="openai:gpt-test", max_tokens=100, model_timeout_seconds=None), + llm, + ) + + await tape.ensure_bootstrap_anchor() + events = [ + event + async for event in runner.run(tape=tape, model="gpt-test", tools=[], system_prompt=None, prompt="hello") + ] + + assert llm.completion_kwargs is not None + assert llm.completion_kwargs["stream"] is True + assert llm.completion_kwargs["stream_options"] == {"include_usage": True} + assert [(event.kind, event.data) for event in events] == [ + ("text", {"delta": "done"}), + ("final", {"ok": True, "text": "done"}), + ] + run_events = [ + entry + for entry in store.read("test-tape") or [] + if entry.kind == "event" and entry.payload.get("name") == "run" + ] + assert len(run_events) == 1 + assert run_events[0].payload["data"]["usage"] == { + "completion_tokens": 2, + "prompt_tokens": 3, + "total_tokens": 5, + } From 6bf1953ed296e8aef9ea1f5970ace95d0763a1bd Mon Sep 17 00:00:00 2001 From: CorrectRoadH <29306285+CorrectRoadH@users.noreply.github.com> Date: Tue, 30 Jun 2026 13:38:49 +0800 Subject: [PATCH 5/5] test: format model runner test --- tests/test_builtin_model_runner.py | 35 +++++++++++++----------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/tests/test_builtin_model_runner.py b/tests/test_builtin_model_runner.py index 03a9a8c7..7daebe71 100644 --- a/tests/test_builtin_model_runner.py +++ b/tests/test_builtin_model_runner.py @@ -26,21 +26,19 @@ async def acompletion(self, **kwargs: Any) -> AsyncIterator[ChatCompletionChunk] include_usage = kwargs.get("stream_options") == {"include_usage": True} async def stream() -> AsyncIterator[ChatCompletionChunk]: - yield ChatCompletionChunk.model_validate( - { - "id": "chatcmpl_test", - "object": "chat.completion.chunk", - "created": 0, - "model": "gpt-test", - "choices": [ - { - "index": 0, - "finish_reason": None, - "delta": {"role": "assistant", "content": "done"}, - } - ], - } - ) + yield ChatCompletionChunk.model_validate({ + "id": "chatcmpl_test", + "object": "chat.completion.chunk", + "created": 0, + "model": "gpt-test", + "choices": [ + { + "index": 0, + "finish_reason": None, + "delta": {"role": "assistant", "content": "done"}, + } + ], + }) final_chunk: dict[str, Any] = { "id": "chatcmpl_test", "object": "chat.completion.chunk", @@ -76,8 +74,7 @@ async def test_streaming_openai_usage_is_requested_and_recorded_in_tape(tmp_path await tape.ensure_bootstrap_anchor() events = [ - event - async for event in runner.run(tape=tape, model="gpt-test", tools=[], system_prompt=None, prompt="hello") + event async for event in runner.run(tape=tape, model="gpt-test", tools=[], system_prompt=None, prompt="hello") ] assert llm.completion_kwargs is not None @@ -88,9 +85,7 @@ async def test_streaming_openai_usage_is_requested_and_recorded_in_tape(tmp_path ("final", {"ok": True, "text": "done"}), ] run_events = [ - entry - for entry in store.read("test-tape") or [] - if entry.kind == "event" and entry.payload.get("name") == "run" + entry for entry in store.read("test-tape") or [] if entry.kind == "event" and entry.payload.get("name") == "run" ] assert len(run_events) == 1 assert run_events[0].payload["data"]["usage"] == {