diff --git a/langfuse/openai.py b/langfuse/openai.py
index 16d293e73..02bd7d678 100644
--- a/langfuse/openai.py
+++ b/langfuse/openai.py
@@ -830,6 +830,178 @@ def _is_streaming_response(response: Any) -> bool:
     )
 
 
+_openai_stream_iter_hook_installed = False
+
+
+def _install_openai_stream_iteration_hooks() -> None:
+    global _openai_stream_iter_hook_installed
+
+    if not _is_openai_v1():
+        return
+
+    if not _openai_stream_iter_hook_installed:
+        original_iter = openai.Stream.__iter__
+
+        def traced_iter(self: Any) -> Any:
+            try:
+                yield from original_iter(self)
+            finally:
+                finalize_once = getattr(self, "_langfuse_finalize_once", None)
+                if finalize_once is not None:
+                    finalize_once()
+
+        setattr(openai.Stream, "__iter__", traced_iter)
+        _openai_stream_iter_hook_installed = True
+
+
+def _finalize_stream_response(
+    *,
+    resource: OpenAiDefinition,
+    items: list[Any],
+    generation: LangfuseGeneration,
+    completion_start_time: Optional[datetime],
+) -> None:
+    try:
+        model, completion, usage, metadata = (
+            _extract_streamed_response_api_response(items)
+            if resource.object == "Responses" or resource.object == "AsyncResponses"
+            else _extract_streamed_openai_response(resource, items)
+        )
+
+        _create_langfuse_update(
+            completion,
+            generation,
+            completion_start_time,
+            model=model,
+            usage=usage,
+            metadata=metadata,
+        )
+    except Exception:
+        pass
+    finally:
+        generation.end()
+
+
+def _instrument_openai_stream(
+    *,
+    resource: OpenAiDefinition,
+    response: Any,
+    generation: LangfuseGeneration,
+) -> Any:
+    if not hasattr(response, "_iterator"):
+        return LangfuseResponseGeneratorSync(
+            resource=resource,
+            response=response,
+            generation=generation,
+        )
+
+    items: list[Any] = []
+    raw_iterator = response._iterator
+    completion_start_time: Optional[datetime] = None
+    is_finalized = False
+    close = response.close
+
+    def finalize_once() -> None:
+        nonlocal is_finalized
+        if is_finalized:
+            return
+
+        is_finalized = True
+        _finalize_stream_response(
+            resource=resource,
+            items=items,
+            generation=generation,
+            completion_start_time=completion_start_time,
+        )
+
+    response._langfuse_finalize_once = finalize_once  # type: ignore[attr-defined]
+
+    def traced_iterator() -> Any:
+        nonlocal completion_start_time
+        try:
+            for item in raw_iterator:
+                items.append(item)
+
+                if completion_start_time is None:
+                    completion_start_time = _get_timestamp()
+
+                yield item
+        finally:
+            finalize_once()
+
+    def traced_close() -> Any:
+        try:
+            return close()
+        finally:
+            finalize_once()
+
+    response._iterator = traced_iterator()
+    response.close = traced_close
+
+    return response
+
+
+def _instrument_openai_async_stream(
+    *,
+    resource: OpenAiDefinition,
+    response: Any,
+    generation: LangfuseGeneration,
+) -> Any:
+    if not hasattr(response, "_iterator"):
+        return LangfuseResponseGeneratorAsync(
+            resource=resource,
+            response=response,
+            generation=generation,
+        )
+
+    items: list[Any] = []
+    raw_iterator = response._iterator
+    completion_start_time: Optional[datetime] = None
+    is_finalized = False
+    close = response.close
+
+    async def finalize_once() -> None:
+        nonlocal is_finalized
+        if is_finalized:
+            return
+
+        is_finalized = True
+        _finalize_stream_response(
+            resource=resource,
+            items=items,
+            generation=generation,
+            completion_start_time=completion_start_time,
+        )
+
+    async def traced_iterator() -> Any:
+        nonlocal completion_start_time
+        try:
+            async for item in raw_iterator:
+                items.append(item)
+
+                if completion_start_time is None:
+                    completion_start_time = _get_timestamp()
+
+                yield item
+        finally:
+            await finalize_once()
+
+    async def traced_close() -> Any:
+        try:
+            return await close()
+        finally:
+            await finalize_once()
+
+    async def traced_aclose() -> Any:
+        return await traced_close()
+
+    response._iterator = traced_iterator()
+    response.close = traced_close
+    response.aclose = traced_aclose
+
+    return response
+
+
 @_langfuse_wrapper
 def _wrap(
     open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
@@ -863,7 +1035,13 @@ def _wrap(
     try:
         openai_response = wrapped(**arg_extractor.get_openai_args())
 
-        if _is_streaming_response(openai_response):
+        if _is_openai_v1() and isinstance(openai_response, openai.Stream):
+            return _instrument_openai_stream(
+                resource=open_ai_resource,
+                response=openai_response,
+                generation=generation,
+            )
+        elif _is_streaming_response(openai_response):
             return LangfuseResponseGeneratorSync(
                 resource=open_ai_resource,
                 response=openai_response,
@@ -934,7 +1112,13 @@ async def _wrap_async(
     try:
         openai_response = await wrapped(**arg_extractor.get_openai_args())
 
-        if _is_streaming_response(openai_response):
+        if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
+            return _instrument_openai_async_stream(
+                resource=open_ai_resource,
+                response=openai_response,
+                generation=generation,
+            )
+        elif _is_streaming_response(openai_response):
             return LangfuseResponseGeneratorAsync(
                 resource=open_ai_resource,
                 response=openai_response,
@@ -994,6 +1178,7 @@ def register_tracing() -> None:
 
 
 register_tracing()
+_install_openai_stream_iteration_hooks()
 
 
 class LangfuseResponseGeneratorSync:
@@ -1010,6 +1195,7 @@ def __init__(
         self.response = response
         self.generation = generation
         self.completion_start_time: Optional[datetime] = None
+        self._is_finalized = False
 
     def __iter__(self) -> Any:
         try:
@@ -1045,26 +1231,16 @@ def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
         pass
 
     def _finalize(self) -> None:
-        try:
-            model, completion, usage, metadata = (
-                _extract_streamed_response_api_response(self.items)
-                if self.resource.object == "Responses"
-                or self.resource.object == "AsyncResponses"
-                else _extract_streamed_openai_response(self.resource, self.items)
-            )
-
-            _create_langfuse_update(
-                completion,
-                self.generation,
-                self.completion_start_time,
-                model=model,
-                usage=usage,
-                metadata=metadata,
-            )
-        except Exception:
-            pass
-        finally:
-            self.generation.end()
+        if self._is_finalized:
+            return
+
+        self._is_finalized = True
+        _finalize_stream_response(
+            resource=self.resource,
+            items=self.items,
+            generation=self.generation,
+            completion_start_time=self.completion_start_time,
+        )
 
 
 class LangfuseResponseGeneratorAsync:
@@ -1081,6 +1257,7 @@ def __init__(
         self.response = response
         self.generation = generation
         self.completion_start_time: Optional[datetime] = None
+        self._is_finalized = False
 
     async def __aiter__(self) -> Any:
         try:
@@ -1116,26 +1293,16 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
         pass
 
     async def _finalize(self) -> None:
-        try:
-            model, completion, usage, metadata = (
-                _extract_streamed_response_api_response(self.items)
-                if self.resource.object == "Responses"
-                or self.resource.object == "AsyncResponses"
-                else _extract_streamed_openai_response(self.resource, self.items)
-            )
-
-            _create_langfuse_update(
-                completion,
-                self.generation,
-                self.completion_start_time,
-                model=model,
-                usage=usage,
-                metadata=metadata,
-            )
-        except Exception:
-            pass
-        finally:
-            self.generation.end()
+        if self._is_finalized:
+            return
+
+        self._is_finalized = True
+        _finalize_stream_response(
+            resource=self.resource,
+            items=self.items,
+            generation=self.generation,
+            completion_start_time=self.completion_start_time,
+        )
 
     async def close(self) -> None:
         """Close the response and release the connection.
diff --git a/tests/unit/test_openai.py b/tests/unit/test_openai.py
index 6ef51ff54..10a731a31 100644
--- a/tests/unit/test_openai.py
+++ b/tests/unit/test_openai.py
@@ -3,10 +3,109 @@
 
 import pytest
 
+import langfuse.openai as lf_openai_module
 from langfuse._client.attributes import LangfuseOtelSpanAttributes
 from langfuse.openai import openai as lf_openai
 
 
+class DummySyncResponse:
+    def __init__(self) -> None:
+        self.closed = False
+
+    def close(self) -> None:
+        self.closed = True
+
+
+class DummyAsyncResponse:
+    def __init__(self) -> None:
+        self.closed = False
+
+    async def aclose(self) -> None:
+        self.closed = True
+
+
+class DummyOpenAIStream(lf_openai.Stream):
+    def __init__(self, items, response) -> None:
+        self.response = response
+        self._iterator = iter(items)
+
+
+class DummyOpenAIAsyncStream(lf_openai.AsyncStream):
+    def __init__(self, items, response) -> None:
+        self.response = response
+        self._iterator = self._stream(items)
+
+    async def _stream(self, items):
+        for item in items:
+            yield item
+
+
+class DummyGeneration:
+    def __init__(self) -> None:
+        self.end_calls = 0
+
+    def update(self, **kwargs):
+        return self
+
+    def end(self) -> None:
+        self.end_calls += 1
+
+
+def _make_chat_stream_chunks():
+    usage = SimpleNamespace(prompt_tokens=3, completion_tokens=1, total_tokens=4)
+
+    return [
+        SimpleNamespace(
+            model="gpt-4o-mini",
+            choices=[
+                SimpleNamespace(
+                    delta=SimpleNamespace(
+                        role="assistant",
+                        content="2",
+                        function_call=None,
+                        tool_calls=None,
+                    ),
+                    finish_reason=None,
+                )
+            ],
+            usage=None,
+        ),
+        SimpleNamespace(
+            model="gpt-4o-mini",
+            choices=[
+                SimpleNamespace(
+                    delta=SimpleNamespace(
+                        role=None,
+                        content=None,
+                        function_call=None,
+                        tool_calls=None,
+                    ),
+                    finish_reason="stop",
+                )
+            ],
+            usage=usage,
+        ),
+    ]
+
+
+def _make_single_chunk_stream():
+    return SimpleNamespace(
+        model="gpt-4o-mini",
+        choices=[
+            SimpleNamespace(
+                delta=SimpleNamespace(
+                    role="assistant",
+                    content="2",
+                    function_call=None,
+                    tool_calls=None,
+                ),
+                finish_reason="stop",
+            )
+        ],
+        usage=None,
+    )
+
+
 def test_chat_completion_exports_generation_span(
     langfuse_memory_client, get_span, json_attr
 ):
@@ -161,6 +260,80 @@ def test_chat_completion_error_marks_generation_error(langfuse_memory_client, ge
     assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in span.attributes
 
 
+def test_openai_stream_preserves_original_stream_contract(
+    langfuse_memory_client, get_span, json_attr
+):
+    openai_client = lf_openai.OpenAI(api_key="test")
+    raw_response = DummySyncResponse()
+    raw_stream = DummyOpenAIStream(_make_chat_stream_chunks(), raw_response)
+
+    with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream):
+        stream = openai_client.chat.completions.create(
+            name="unit-openai-native-stream",
+            model="gpt-4o-mini",
+            messages=[{"role": "user", "content": "1 + 1 = ?"}],
+            temperature=0,
+            stream=True,
+        )
+
+    assert stream is raw_stream
+    assert isinstance(stream, lf_openai.Stream)
+    assert stream.response is raw_response
+
+    chunks = list(stream)
+    stream.close()
+
+    assert len(chunks) == 2
+    assert raw_response.closed is True
+
+    langfuse_memory_client.flush()
+    span = get_span("unit-openai-native-stream")
+
+    assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2"
+    assert (
+        span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME]
+        is not None
+    )
+    assert span.attributes["langfuse.observation.metadata.finish_reason"] == "stop"
+    assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == {
+        "prompt_tokens": 3,
+        "completion_tokens": 1,
+        "total_tokens": 4,
+    }
+
+
+def test_openai_stream_break_still_finalizes_generation(
+    langfuse_memory_client, get_span
+):
+    openai_client = lf_openai.OpenAI(api_key="test")
+    raw_response = DummySyncResponse()
+    raw_stream = DummyOpenAIStream(_make_chat_stream_chunks(), raw_response)
+
+    with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream):
+        stream = openai_client.chat.completions.create(
+            name="unit-openai-native-stream-break",
+            model="gpt-4o-mini",
+            messages=[{"role": "user", "content": "1 + 1 = ?"}],
+            temperature=0,
+            stream=True,
+        )
+
+    for chunk in stream:
+        assert chunk.choices[0].delta.content == "2"
+        break
+
+    assert raw_response.closed is False
+
+    langfuse_memory_client.flush()
+    span = get_span("unit-openai-native-stream-break")
+
+    assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2"
+    assert (
+        span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME]
+        is not None
+    )
+
+
 @pytest.mark.asyncio
 async def test_async_chat_completion_exports_generation_span(
     langfuse_memory_client, get_span, json_attr
@@ -206,6 +379,140 @@ async def test_async_chat_completion_exports_generation_span(
     }
 
 
+@pytest.mark.asyncio
+async def test_openai_async_stream_preserves_original_stream_contract(
+    langfuse_memory_client, get_span, json_attr
+):
+    openai_client = lf_openai.AsyncOpenAI(api_key="test")
+    raw_response = DummyAsyncResponse()
+    raw_stream = DummyOpenAIAsyncStream(_make_chat_stream_chunks(), raw_response)
+
+    with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream):
+        stream = await openai_client.chat.completions.create(
+            name="unit-openai-native-async-stream",
+            model="gpt-4o-mini",
+            messages=[{"role": "user", "content": "1 + 1 = ?"}],
+            temperature=0,
+            stream=True,
+        )
+
+    assert stream is raw_stream
+    assert isinstance(stream, lf_openai.AsyncStream)
+    assert stream.response is raw_response
+    assert hasattr(stream, "aclose")
+
+    chunks = []
+    async for chunk in stream:
+        chunks.append(chunk)
+
+    await stream.aclose()
+
+    assert len(chunks) == 2
+    assert raw_response.closed is True
+
+    langfuse_memory_client.flush()
+    span = get_span("unit-openai-native-async-stream")
+
+    assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2"
+    assert (
+        span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME]
+        is not None
+    )
+    assert span.attributes["langfuse.observation.metadata.finish_reason"] == "stop"
+    assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == {
+        "prompt_tokens": 3,
+        "completion_tokens": 1,
+        "total_tokens": 4,
+    }
+
+
+@pytest.mark.asyncio
+async def test_openai_async_stream_supports_anext(
+    langfuse_memory_client, get_span, json_attr
+):
+    openai_client = lf_openai.AsyncOpenAI(api_key="test")
+    raw_stream = DummyOpenAIAsyncStream(
+        _make_chat_stream_chunks(), DummyAsyncResponse()
+    )
+
+    with patch.object(openai_client.chat.completions, "_post", return_value=raw_stream):
+        stream = await openai_client.chat.completions.create(
+            name="unit-openai-native-async-anext",
+            model="gpt-4o-mini",
+            messages=[{"role": "user", "content": "1 + 1 = ?"}],
+            temperature=0,
+            stream=True,
+        )
+
+    first = await stream.__anext__()
+    second = await stream.__anext__()
+
+    assert first.choices[0].delta.content == "2"
+    assert second.choices[0].finish_reason == "stop"
+
+    with pytest.raises(StopAsyncIteration):
+        await stream.__anext__()
+
+    langfuse_memory_client.flush()
+    span = get_span("unit-openai-native-async-anext")
+
+    assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2"
+    assert (
+        span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME]
+        is not None
+    )
+    assert span.attributes["langfuse.observation.metadata.finish_reason"] == "stop"
+    assert json_attr(span, LangfuseOtelSpanAttributes.OBSERVATION_USAGE_DETAILS) == {
+        "prompt_tokens": 3,
+        "completion_tokens": 1,
+        "total_tokens": 4,
+    }
+
+
+def test_fallback_sync_stream_finalizes_once():
+    resource = SimpleNamespace(object="Completions", type="chat")
+    generation = DummyGeneration()
+
+    def fallback_stream():
+        yield _make_single_chunk_stream()
+
+    wrapper = lf_openai_module.LangfuseResponseGeneratorSync(
+        resource=resource,
+        response=fallback_stream(),
+        generation=generation,
+    )
+
+    list(wrapper)
+
+    with pytest.raises(StopIteration):
+        next(wrapper)
+
+    assert generation.end_calls == 1
+
+
+@pytest.mark.asyncio
+async def test_fallback_async_stream_finalizes_once():
+    resource = SimpleNamespace(object="Completions", type="chat")
+    generation = DummyGeneration()
+
+    async def fallback_stream():
+        yield _make_single_chunk_stream()
+
+    wrapper = lf_openai_module.LangfuseResponseGeneratorAsync(
+        resource=resource,
+        response=fallback_stream(),
+        generation=generation,
+    )
+
+    async for _ in wrapper:
+        pass
+
+    with pytest.raises(StopAsyncIteration):
+        await wrapper.__anext__()
+
+    assert generation.end_calls == 1
+
+
 def test_embedding_exports_dimensions_and_count(
     langfuse_memory_client, get_span, json_attr
 ):
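
Reviewer note, appended after the patch rather than inside it: the `_is_finalized` flag and `finalize_once()` closure exist so that every exit path (exhaustion, early `break`, explicit `close()`/`aclose()`, garbage collection of the wrapped generator) ends the Langfuse generation exactly once. A minimal sketch of the consumer-visible behavior the new tests pin down, assuming a configured client with credentials in the environment; everything except the `langfuse.openai` import is illustrative, not part of the patch:

    # Sketch only. Abandoning the stream early must still finalize the traced
    # generation, and a later close() must not finalize it a second time.
    from langfuse.openai import openai

    client = openai.OpenAI()
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "1 + 1 = ?"}],
        stream=True,
    )

    for chunk in stream:
        break  # early exit: generation is still recorded once

    stream.close()  # idempotent: does not end the generation again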