Skip to content

Commit 9ccf99c

Browse files
authored
feat: Create EventQueue interface and make tap() async. (#914)
Refactor `EventQueue` to introduce a formal interface (ABC) and a more robust v2 implementation (`EventQueueSource`/`EventQueueSink`) while maintaining EventQueueLegacy. The previous `EventQueue` implementation (now `EventQueueLegacy`) include multiple concurency issue and provide fragile not documented synchronization contracts. New EventQueue/EventQueueSource/EventQueueSink will be used in new version of DefaultRequestHandler. Fixes #869
1 parent ca7edc3 commit 9ccf99c

File tree

9 files changed

+1353
-343
lines changed

9 files changed

+1353
-343
lines changed

src/a2a/server/events/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Event handling components for the A2A server."""
22

33
from a2a.server.events.event_consumer import EventConsumer
4-
from a2a.server.events.event_queue import Event, EventQueue
4+
from a2a.server.events.event_queue import Event, EventQueue, EventQueueLegacy
55
from a2a.server.events.in_memory_queue_manager import InMemoryQueueManager
66
from a2a.server.events.queue_manager import (
77
NoTaskQueue,
@@ -14,6 +14,7 @@
1414
'Event',
1515
'EventConsumer',
1616
'EventQueue',
17+
'EventQueueLegacy',
1718
'InMemoryQueueManager',
1819
'NoTaskQueue',
1920
'QueueManager',

src/a2a/server/events/event_consumer.py

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
TaskState,
1313
TaskStatusUpdateEvent,
1414
)
15-
from a2a.utils.errors import InternalError
1615
from a2a.utils.telemetry import SpanKind, trace_class
1716

1817

@@ -34,31 +33,6 @@ def __init__(self, queue: EventQueue):
3433
self._exception: BaseException | None = None
3534
logger.debug('EventConsumer initialized')
3635

37-
async def consume_one(self) -> Event:
38-
"""Consume one event from the agent event queue non-blocking.
39-
40-
Returns:
41-
The next event from the queue.
42-
43-
Raises:
44-
InternalError: If the queue is empty when attempting to dequeue
45-
immediately.
46-
"""
47-
logger.debug('Attempting to consume one event.')
48-
try:
49-
event = await self.queue.dequeue_event(no_wait=True)
50-
except asyncio.QueueEmpty as e:
51-
logger.warning('Event queue was empty in consume_one.')
52-
raise InternalError(
53-
message='Agent did not return any response'
54-
) from e
55-
56-
logger.debug('Dequeued event of type: %s in consume_one.', type(event))
57-
58-
self.queue.task_done()
59-
60-
return event
61-
6236
async def consume_all(self) -> AsyncGenerator[Event]:
6337
"""Consume all the generated streaming events from the agent.
6438

src/a2a/server/events/event_queue.py

Lines changed: 130 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import logging
33
import sys
44

5+
from abc import ABC, abstractmethod
56
from types import TracebackType
6-
from typing import Any
7+
from typing import Any, cast
78

89
from typing_extensions import Self
910

@@ -46,8 +47,121 @@ def _create_async_queue(maxsize: int = 0) -> AsyncQueue[Any]:
4647
DEFAULT_MAX_QUEUE_SIZE = 1024
4748

4849

50+
class EventQueue(ABC):
51+
"""Base class and factory for EventQueueSource.
52+
53+
EventQueue provides an abstraction for a queue of events that can be tapped
54+
by multiple consumers.
55+
EventQueue maintain main queue and source and maintain child queues in sync.
56+
GUARANTEE: All sinks (including the default one) will receive events in the exact same order.
57+
58+
WARNING (Concurrency): All events from all sinks (both the default queue and any
59+
tapped child queues) must be regularly consumed and marked as done. If any single
60+
consumer stops processing and its queue reaches capacity, it can block the event
61+
dispatcher and stall the entire system, causing a widespread deadlock.
62+
63+
WARNING (Memory Leak): Event queues spawn background tasks. To prevent memory
64+
and task leaks, all queue objects (both source and sinks) MUST be explicitly
65+
closed via `await queue.close()` or by using the async context manager (`async with queue:`).
66+
Child queues are automatically closed when parent queue is closed, but you
67+
should still close them explicitly to prevent queues from reaching capacity by
68+
unconsumed events.
69+
70+
Typical usage:
71+
queue = EventQueue()
72+
child_queue1 = await queue.tap()
73+
child_queue2 = await queue.tap()
74+
75+
async for event in child_queue1:
76+
do_some_work(event)
77+
child_queue1.task_done()
78+
"""
79+
80+
def __new__(cls, *args: Any, **kwargs: Any) -> Self:
81+
"""Redirects instantiation to EventQueueLegacy for backwards compatibility."""
82+
if cls is EventQueue:
83+
instance = EventQueueLegacy.__new__(EventQueueLegacy)
84+
EventQueueLegacy.__init__(instance, *args, **kwargs)
85+
return cast('Self', instance)
86+
return super().__new__(cls)
87+
88+
@abstractmethod
89+
async def enqueue_event(self, event: Event) -> None:
90+
"""Pushes an event into the queue.
91+
92+
Only main queue can enqueue events. Child queues can only dequeue events.
93+
"""
94+
95+
@abstractmethod
96+
async def dequeue_event(self) -> Event:
97+
"""Pulls an event from the queue."""
98+
99+
@abstractmethod
100+
def task_done(self) -> None:
101+
"""Signals that a work on dequeued event is complete."""
102+
103+
@abstractmethod
104+
async def tap(
105+
self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE
106+
) -> 'EventQueue':
107+
"""Creates a child queue that receives future events.
108+
109+
Note: The tapped queue may receive some old events if the incoming event
110+
queue is lagging behind and hasn't dispatched them yet.
111+
"""
112+
113+
@abstractmethod
114+
async def close(self, immediate: bool = False) -> None:
115+
"""Closes the queue.
116+
117+
For parent queue: it closes the main queue and all its child queues.
118+
For child queue: it closes only child queue.
119+
120+
It is safe to call it multiple times.
121+
If immediate is True, the queue will be closed without waiting for all events to be processed.
122+
If immediate is False, the queue will be closed after all events are processed (and confirmed with task_done() calls).
123+
124+
WARNING: Closing the parent queue with immediate=False is a deadlock risk if there are unconsumed events
125+
in any of the child sinks and the consumer has crashed without draining its queue.
126+
It is highly recommended to wrap graceful shutdowns with a timeout, e.g.,
127+
`asyncio.wait_for(queue.close(immediate=False), timeout=...)`.
128+
"""
129+
130+
@abstractmethod
131+
def is_closed(self) -> bool:
132+
"""[DEPRECATED] Checks if the queue is closed.
133+
134+
NOTE: Relying on this for enqueue logic introduces race conditions.
135+
It is maintained primarily for backwards compatibility, workarounds for
136+
Python 3.10/3.12 async queues in consumers, and for the test suite.
137+
"""
138+
139+
@abstractmethod
140+
async def __aenter__(self) -> Self:
141+
"""Enters the async context manager, returning the queue itself.
142+
143+
WARNING: See `__aexit__` for important deadlock risks associated with
144+
exiting this context manager if unconsumed events remain.
145+
"""
146+
147+
@abstractmethod
148+
async def __aexit__(
149+
self,
150+
exc_type: type[BaseException] | None,
151+
exc_val: BaseException | None,
152+
exc_tb: TracebackType | None,
153+
) -> None:
154+
"""Exits the async context manager, ensuring close() is called.
155+
156+
WARNING: The context manager calls `close(immediate=False)` by default.
157+
If a consumer exits the `async with` block early (e.g., due to an exception
158+
or an explicit `break`) while unconsumed events remain in the queue,
159+
`__aexit__` will deadlock waiting for `task_done()` to be called on those events.
160+
"""
161+
162+
49163
@trace_class(kind=SpanKind.SERVER)
50-
class EventQueue:
164+
class EventQueueLegacy(EventQueue):
51165
"""Event queue for A2A responses from agent.
52166
53167
Acts as a buffer between the agent's asynchronous execution and the
@@ -63,14 +177,19 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
63177
if max_queue_size <= 0:
64178
raise ValueError('max_queue_size must be greater than 0')
65179

66-
self.queue: AsyncQueue[Event] = _create_async_queue(
180+
self._queue: AsyncQueue[Event] = _create_async_queue(
67181
maxsize=max_queue_size
68182
)
69183
self._children: list[EventQueue] = []
70184
self._is_closed = False
71185
self._lock = asyncio.Lock()
72186
logger.debug('EventQueue initialized.')
73187

188+
@property
189+
def queue(self) -> AsyncQueue[Event]:
190+
"""[DEPRECATED] Returns the underlying asyncio.Queue."""
191+
return self._queue
192+
74193
async def __aenter__(self) -> Self:
75194
"""Enters the async context manager, returning the queue itself."""
76195
return self
@@ -106,7 +225,7 @@ async def enqueue_event(self, event: Event) -> None:
106225
for child in self._children:
107226
await child.enqueue_event(event)
108227

109-
async def dequeue_event(self, no_wait: bool = False) -> Event:
228+
async def dequeue_event(self) -> Event:
110229
"""Dequeues an event from the queue.
111230
112231
This implementation expects that dequeue to raise an exception when
@@ -115,38 +234,23 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
115234
the user is awaiting the queue.get method. Python<=3.12 this needs to
116235
manage this lifecycle itself. The current implementation can lead to
117236
blocking if the dequeue_event is called before the EventQueue has been
118-
closed but when there are no events on the queue. Two ways to avoid this
119-
are to call this with no_wait = True which won't block, but is the
120-
callers responsibility to retry as appropriate. Alternatively, one can
121-
use an async Task management solution to cancel the get task if the queue
237+
closed but when there are no events on the queue. One way to avoid this
238+
is to use an async Task management solution to cancel the get task if the queue
122239
has closed or some other condition is met. The implementation of the
123240
EventConsumer uses an async.wait with a timeout to abort the
124241
dequeue_event call and retry, when it will return with a closed error.
125242
126-
Args:
127-
no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`.
128-
If False (default), wait until an event is available.
129-
130243
Returns:
131244
The next event from the queue.
132245
133246
Raises:
134-
asyncio.QueueEmpty: If `no_wait` is True and the queue is empty.
135247
asyncio.QueueShutDown: If the queue has been closed and is empty.
136248
"""
137249
async with self._lock:
138250
if self._is_closed and self.queue.empty():
139251
logger.warning('Queue is closed. Event will not be dequeued.')
140252
raise QueueShutDown('Queue is closed.')
141253

142-
if no_wait:
143-
logger.debug('Attempting to dequeue event (no_wait=True).')
144-
event = self.queue.get_nowait()
145-
logger.debug(
146-
'Dequeued event (no_wait=True) of type: %s', type(event)
147-
)
148-
return event
149-
150254
logger.debug('Attempting to dequeue event (waiting).')
151255
event = await self.queue.get()
152256
logger.debug('Dequeued event (waited) of type: %s', type(event))
@@ -160,15 +264,17 @@ def task_done(self) -> None:
160264
logger.debug('Marking task as done in EventQueue.')
161265
self.queue.task_done()
162266

163-
def tap(self) -> 'EventQueue':
164-
"""Taps the event queue to create a new child queue that receives all future events.
267+
async def tap(
268+
self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE
269+
) -> 'EventQueueLegacy':
270+
"""Taps the event queue to create a new child queue that receives future events.
165271
166272
Returns:
167273
A new `EventQueue` instance that will receive all events enqueued
168274
to this parent queue from this point forward.
169275
"""
170276
logger.debug('Tapping EventQueue to create a child queue.')
171-
queue = EventQueue()
277+
queue = EventQueueLegacy(max_queue_size=max_queue_size)
172278
self._children.append(queue)
173279
return queue
174280

@@ -199,48 +305,3 @@ async def close(self, immediate: bool = False) -> None:
199305
def is_closed(self) -> bool:
200306
"""Checks if the queue is closed."""
201307
return self._is_closed
202-
203-
async def clear_events(self, clear_child_queues: bool = True) -> None:
204-
"""Clears all events from the current queue and optionally all child queues.
205-
206-
This method removes all pending events from the queue without processing them.
207-
Child queues can be optionally cleared based on the clear_child_queues parameter.
208-
209-
Args:
210-
clear_child_queues: If True (default), clear all child queues as well.
211-
If False, only clear the current queue, leaving child queues untouched.
212-
"""
213-
logger.debug('Clearing all events from EventQueue and child queues.')
214-
215-
# Clear all events from the queue, even if closed
216-
cleared_count = 0
217-
async with self._lock:
218-
try:
219-
while True:
220-
event = self.queue.get_nowait()
221-
logger.debug(
222-
'Discarding unprocessed event of type: %s, content: %s',
223-
type(event),
224-
event,
225-
)
226-
self.queue.task_done()
227-
cleared_count += 1
228-
except asyncio.QueueEmpty:
229-
pass
230-
except QueueShutDown:
231-
pass
232-
233-
if cleared_count > 0:
234-
logger.debug(
235-
'Cleared %d unprocessed events from EventQueue.',
236-
cleared_count,
237-
)
238-
239-
# Clear all child queues (lock released before awaiting child tasks)
240-
if clear_child_queues and self._children:
241-
child_tasks = [
242-
asyncio.create_task(child.clear_events())
243-
for child in self._children
244-
]
245-
246-
await asyncio.gather(*child_tasks, return_exceptions=True)

0 commit comments

Comments
 (0)