Skip to content

Commit 7da5236

Browse files
authored
Add a RestateTracer and RestateTracerProvider for AI observability (#187)
* Add AI Agent tracing integration * Formatting * rename langfuse to tracing * Fix CI errors
1 parent 35b109c commit 7da5236

6 files changed

Lines changed: 160 additions & 7 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ client = ["httpx[http2]"]
2929
adk = ["google-adk>=1.20.0"]
3030
openai = ["openai-agents>=0.6.1"]
3131
pydantic_ai = ["pydantic-ai-slim>=1.68.0"]
32+
tracing = ["opentelemetry-api>=1.36.0"]
3233

3334
[build-system]
3435
requires = ["maturin>=1.6,<2.0"]

python/restate/ext/adk/summarizer.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,14 @@ def from_summarizer(
6767
"""Create a RestateEventSummarizer wrapping a custom summarizer."""
6868
return RestateEventSummarizer(summarizer, max_retries=max_retries)
6969

70-
async def maybe_summarize_events(
71-
self, *, events: list[Event]
72-
) -> Optional[Event]:
70+
async def maybe_summarize_events(self, *, events: list[Event]) -> Optional[Event]:
7371
if not events:
7472
return None
7573

7674
ctx = current_context()
7775
if ctx is None:
7876
raise RuntimeError(
79-
"No Restate context found. "
80-
"RestateEventSummarizer must be used from within a Restate handler."
77+
"No Restate context found. RestateEventSummarizer must be used from within a Restate handler."
8178
)
8279

8380
inner = self._inner
@@ -92,4 +89,4 @@ async def call_inner() -> Optional[Event]:
9289
max_attempts=self._max_retries,
9390
initial_retry_interval=timedelta(seconds=1),
9491
),
95-
)
92+
)

python/restate/ext/pydantic/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from ._serde import PydanticTypeAdapter
99
from ._toolset import RestateContextRunToolSet
1010

11+
1112
def restate_object_context() -> ObjectContext:
1213
"""Get the current Restate ObjectContext."""
1314
ctx = current_context()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from ._tracing import RestateTracer, RestateTracerProvider
2+
3+
__all__ = ["RestateTracer", "RestateTracerProvider"]
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
"""Restate OTEL tracer wrapper that flattens all spans under the Restate trace.
2+
3+
Wraps any tracer so that every span — regardless of framework nesting — becomes a
4+
direct child of the Restate invocation trace. Works transparently with any
5+
OTEL-integrated agent framework (Google ADK, Pydantic AI, OpenAI Agents, etc.).
6+
7+
Usage:
8+
tracer = RestateTracer(trace_api.get_tracer("my-tracer"))
9+
# All spans created by this tracer are flat children of the Restate trace.
10+
"""
11+
12+
from opentelemetry.trace import INVALID_SPAN, use_span, Tracer, TracerProvider
13+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
14+
from restate.server_context import (
15+
current_context,
16+
get_extension_data,
17+
set_extension_data,
18+
restate_context_is_replaying,
19+
)
20+
21+
_propagator = TraceContextTextMapPropagator()
22+
_EXTENSION_KEY = "otel_span_cleanup"
23+
24+
25+
class _SpanCleanup:
26+
"""Stored as Restate extension data. ``__close__`` is called automatically
27+
when the Restate invocation context is torn down, ending any spans that
28+
were never properly closed (e.g. because the handler raised)."""
29+
30+
def __init__(self):
31+
self._spans = []
32+
33+
def track(self, span):
34+
self._spans.append(span)
35+
36+
def __close__(self):
37+
for span in self._spans:
38+
if span.is_recording():
39+
span.end()
40+
self._spans.clear()
41+
42+
43+
class RestateTracer(Tracer):
44+
"""Wraps a ``Tracer`` to always parent spans under the Restate root context.
45+
46+
During Restate replay, returns no-op spans to avoid duplicates."""
47+
48+
def __init__(self, tracer):
49+
self._tracer = tracer
50+
51+
@staticmethod
52+
def _get_root_context():
53+
"""Extract the Restate trace parent from the current handler, or None."""
54+
ctx = current_context()
55+
if ctx is None:
56+
raise Exception("You are not in a Restate handler")
57+
return _propagator.extract(ctx.request().attempt_headers)
58+
59+
def start_span(
60+
self,
61+
name,
62+
context=None,
63+
kind=None,
64+
attributes=None,
65+
links=None,
66+
start_time=None,
67+
record_exception=True,
68+
set_status_on_exception=True,
69+
):
70+
if restate_context_is_replaying.get(False):
71+
return INVALID_SPAN
72+
root = self._get_root_context()
73+
if root is not None:
74+
context = root
75+
span = self._tracer.start_span(
76+
name,
77+
context=context,
78+
kind=kind,
79+
attributes=attributes,
80+
links=links,
81+
start_time=start_time,
82+
record_exception=record_exception,
83+
set_status_on_exception=set_status_on_exception,
84+
)
85+
self._track_span(span)
86+
return span
87+
88+
def start_as_current_span(
89+
self,
90+
name,
91+
context=None,
92+
kind=None,
93+
attributes=None,
94+
links=None,
95+
start_time=None,
96+
record_exception=True,
97+
set_status_on_exception=True,
98+
end_on_exit=True,
99+
):
100+
if restate_context_is_replaying.get(False):
101+
return use_span(INVALID_SPAN, end_on_exit=False)
102+
root = self._get_root_context()
103+
if root is not None:
104+
context = root
105+
return self._tracer.start_as_current_span(
106+
name,
107+
context=context,
108+
kind=kind,
109+
attributes=attributes,
110+
links=links,
111+
start_time=start_time,
112+
record_exception=record_exception,
113+
set_status_on_exception=set_status_on_exception,
114+
end_on_exit=end_on_exit,
115+
)
116+
117+
@staticmethod
118+
def _track_span(span):
119+
"""Register a span for cleanup when the Restate invocation ends."""
120+
ctx = current_context()
121+
if ctx is None:
122+
return
123+
cleanup = get_extension_data(ctx, _EXTENSION_KEY)
124+
if cleanup is None:
125+
cleanup = _SpanCleanup()
126+
set_extension_data(ctx, _EXTENSION_KEY, cleanup)
127+
cleanup.track(span)
128+
129+
def __getattr__(self, name):
130+
return getattr(self._tracer, name)
131+
132+
133+
class RestateTracerProvider(TracerProvider):
134+
"""Wraps a ``TracerProvider`` to return ``RestateTracer`` instances.
135+
136+
Pass this to instrumentors (e.g. ``GoogleADKInstrumentor``) so that every
137+
span they create is automatically parented under the Restate invocation."""
138+
139+
def __init__(self, provider):
140+
self._provider = provider
141+
142+
def get_tracer(self, *args, **kwargs):
143+
return RestateTracer(self._provider.get_tracer(*args, **kwargs))
144+
145+
def __getattr__(self, name):
146+
return getattr(self._provider, name)

uv.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)