Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
self._children: list[EventQueue] = []
self._is_closed = False
self._lock = asyncio.Lock()
self._bg_tasks: set[asyncio.Task[None]] = set()
logger.debug('EventQueue initialized.')

async def enqueue_event(self, event: Event) -> None:
"""Enqueues an event to this queue and all its children.
"""Enqueues an event to this queue and propagates it to all child queues.

Args:
event: The event object to enqueue.
Expand All @@ -59,7 +60,12 @@ async def enqueue_event(self, event: Event) -> None:
# Make sure to use put instead of put_nowait to avoid blocking the event loop.
await self.queue.put(event)
for child in self._children:
await child.enqueue_event(event)
# We use a background task to enqueue to children to avoid blocking
# the parent queue if a child queue is full (e.g. slow consumer).
# This prevents deadlocks where a slow consumer blocks the producer.
task = asyncio.create_task(child.enqueue_event(event))
self._bg_tasks.add(task)
task.add_done_callback(self._bg_tasks.discard)

async def dequeue_event(self, no_wait: bool = False) -> Event:
"""Dequeues an event from the queue.
Expand Down Expand Up @@ -132,6 +138,17 @@ def tap(self) -> 'EventQueue':
self._children.append(queue)
return queue

async def flush(self) -> None:
"""Waits for all pending background propagation tasks to complete recursively."""
while self._bg_tasks:
# Copy the set to avoid "Set changed size during iteration"
tasks = list(self._bg_tasks)
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
Comment on lines +146 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The if tasks: check is redundant because the while self._bg_tasks: loop on line 143 already ensures that tasks will be a non-empty list here. You can remove this conditional and un-indent the following line for simplification.

            await asyncio.gather(*tasks, return_exceptions=True)


if self._children:
await asyncio.gather(*(child.flush() for child in self._children))

async def close(self, immediate: bool = False) -> None:
"""Closes the queue for future push events and also closes all child queues.

Expand Down Expand Up @@ -161,6 +178,12 @@ async def close(self, immediate: bool = False) -> None:
return
if not self._is_closed:
self._is_closed = True

if immediate:
# Cancel all pending background propagation tasks
for task in self._bg_tasks:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Iterating directly over self._bg_tasks is not safe here. If a task being cancelled finishes quickly, its done_callback will modify the set, leading to a RuntimeError: Set changed size during iteration. You should iterate over a copy of the set to prevent this.

            for task in list(self._bg_tasks):

task.cancel()

# If using python 3.13 or higher, use shutdown but match <3.13 semantics
if sys.version_info >= (3, 13):
if immediate:
Expand All @@ -170,10 +193,12 @@ async def close(self, immediate: bool = False) -> None:
for child in self._children:
await child.close(True)
return
# Graceful: prevent further gets/puts via shutdown, then wait for drain and children
# Graceful: prevent further gets/puts via shutdown, then wait for drain, propagation and children
self.queue.shutdown(False)
await asyncio.gather(
self.queue.join(), *(child.close() for child in self._children)
self.queue.join(),
self.flush(),
*(child.close() for child in self._children),
)
# Otherwise, join the queue
else:
Expand All @@ -182,8 +207,11 @@ async def close(self, immediate: bool = False) -> None:
for child in self._children:
await child.close(immediate)
return
# Graceful: wait for drain, propagation and children
await asyncio.gather(
self.queue.join(), *(child.close() for child in self._children)
self.queue.join(),
self.flush(),
*(child.close() for child in self._children),
)

def is_closed(self) -> bool:
Expand Down
39 changes: 39 additions & 0 deletions tests/server/events/test_event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ async def test_enqueue_event_propagates_to_children(
await event_queue.enqueue_event(event1)
await event_queue.enqueue_event(event2)

# Wait for all background tasks to complete
await event_queue.flush()

# Check parent queue
assert await event_queue.dequeue_event(no_wait=True) == event1
assert await event_queue.dequeue_event(no_wait=True) == event2
Expand Down Expand Up @@ -203,6 +206,36 @@ async def test_enqueue_event_when_closed(
await child_queue.dequeue_event(no_wait=True)


@pytest.mark.asyncio
async def test_event_queue_slow_consumer_does_not_block_parent(
event_queue: EventQueue,
) -> None:
"""Test that a slow or blocked consumer on a tapped queue doesn't block the parent queue."""
child_queue = event_queue.tap()

# Artificially limit the child queue to a size of 1 so it fills up instantly
child_queue.queue = asyncio.Queue(maxsize=1)

# Enqueue first event. It should fit in the child queue.
event1 = create_sample_message('1')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The event1 variable is assigned but never used. You can enqueue the event directly to improve readability.

Suggested change
event1 = create_sample_message('1')
await event_queue.enqueue_event(create_sample_message('1'))

await event_queue.enqueue_event(event1)

# Enqueue second event. The child queue is now full.
# If the parent blocks on `await child_queue.enqueue_event()`, this will hang.
event2 = create_sample_message('2')
try:
# Give it a short timeout. If it hangs, it means the parent is blocked.
await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1)
Comment on lines +225 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The event2 variable is assigned but never used. You can remove it and create the message directly inside the enqueue_event call to make the code cleaner.

Suggested change
event2 = create_sample_message('2')
try:
# Give it a short timeout. If it hangs, it means the parent is blocked.
await asyncio.wait_for(event_queue.enqueue_event(event2), timeout=0.1)
try:
# Give it a short timeout. If it hangs, it means the parent is blocked.
await asyncio.wait_for(event_queue.enqueue_event(create_sample_message('2')), timeout=0.1)

except asyncio.TimeoutError:
pytest.fail(
'Parent EventQueue was blocked by a full child queue (slow consumer)!'
)

# Clean up to prevent background tasks from leaking or complaining
await child_queue.dequeue_event()
await child_queue.dequeue_event()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current cleanup logic might be racy. After the first dequeue_event, the blocked background task for the second event can proceed, but it's not guaranteed to have completed before the second dequeue_event is called. This could lead to a flaky test. Using event_queue.flush() would make this more robust.

Suggested change
await child_queue.dequeue_event()
await event_queue.flush() # Wait for the background task for event2 to complete
await child_queue.dequeue_event() # Dequeue event2



@pytest.fixture
def expected_queue_closed_exception() -> type[Exception]:
if sys.version_info < (3, 13):
Expand Down Expand Up @@ -420,6 +453,9 @@ async def test_close_immediate_propagates_to_children(
event = create_sample_message()
await event_queue.enqueue_event(event)

# Wait for background propagation to finish
await event_queue.flush()

assert child_queue.is_closed() is False
assert child_queue.queue.empty() is False

Expand All @@ -440,6 +476,9 @@ async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None:
await event_queue.enqueue_event(event1)
await event_queue.enqueue_event(event2)

# Wait for all background tasks to complete
await event_queue.flush()

# Clear only parent queue
await event_queue.clear_events(clear_child_queues=False)

Expand Down
Loading