make async tools first-class: direct registration, survives pause/resume #5711
Open
longcw wants to merge 9 commits into
Extract `_AsyncToolManager` from `AsyncToolset` so the lifecycle pieces (`AsyncRunContext` creation, background task spawn, duplicate handling, progress reply coalescing) can be used independently of the toolset wrapper. Each `AgentActivity` now owns one such manager (`on_duplicate_call="reject"`), and at tool dispatch any tool whose signature includes `AsyncRunContext` is routed through that manager, with no upfront grouping or `AsyncToolset` wrapping needed.

`AsyncToolset` continues to work unchanged: it delegates to its own internal manager and still always exposes `get_running_tasks` / `cancel_task`. The activity surfaces them dynamically, only while at least one activity-managed task is live, so agents that never invoke an async tool see no extra tools in their LLM context. `ToolContext` dedupes by name, so both can coexist.

Tools placed inside an explicit `AsyncToolset` retain its richer semantics (toolset-scoped lifetime that survives agent handoff, configurable `on_duplicate_call`). Tools placed directly in `agent.tools` get the activity-scoped default: tasks die on handoff.

`AsyncRunContext` is now re-exported from `livekit.agents` next to `RunContext`, so tools can declare it without reaching into the internal module path.
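The signature-based routing described above can be illustrated with a minimal sketch. It assumes routing is decided by inspecting parameter annotations; the `RunContext` / `AsyncRunContext` classes here are empty stand-ins, and `wants_async_context` is a hypothetical helper name, not a function from the PR.

```python
import inspect
from typing import Any, Callable, get_type_hints


class RunContext:
    """Stand-in for the regular tool context (not the livekit class)."""


class AsyncRunContext(RunContext):
    """Stand-in for the async tool context (not the livekit class)."""


def wants_async_context(fn: Callable[..., Any]) -> bool:
    """Return True if any parameter of `fn` is annotated with AsyncRunContext."""
    hints = get_type_hints(fn)
    params = inspect.signature(fn).parameters
    return any(hints.get(name) is AsyncRunContext for name in params)


async def book_flight(ctx: AsyncRunContext, destination: str) -> str:
    # Declares AsyncRunContext, so a dispatcher would hand it to the manager.
    return f"booked {destination}"


async def get_weather(ctx: RunContext, city: str) -> str:
    # Declares the plain context, so it would run on the regular path.
    return f"sunny in {city}"
```

With a check like this, a dispatcher can pick the background path for `book_flight` and the regular path for `get_weather` without any upfront grouping of tools.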
…ync tools

Two PR-review fixes:

1. _close_session() now aclose()s the activity's _AsyncToolManager so the pause() path also cancels in-flight bare async tool tasks. Previously only aclose() did this, so a late ctx.update() / return from a paused activity could touch the next agent's chat context via _enqueue_reply.
2. mock_tools() now takes precedence over the activity manager. The check moved ahead of the use_async_manager branch in _execute_tools_task. For async tools with a mock, we pass raw JSON kwargs straight through; _run_mock trims to the mock's actual signature, so the AsyncRunContext param of the original tool isn't required on the mock.
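The "trim to the mock's actual signature" step in the second fix could look roughly like the sketch below. This is an illustration of the idea only: `run_mock` and `fake_book_flight` are hypothetical names, and the real `_run_mock` may differ in how it handles positional parameters or context injection.

```python
import inspect
from typing import Any, Callable


def run_mock(mock: Callable[..., Any], raw_kwargs: dict[str, Any]) -> Any:
    """Call `mock` with only the keyword arguments its signature accepts."""
    params = inspect.signature(mock).parameters
    # A mock that declares **kwargs accepts everything, so nothing is trimmed.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return mock(**raw_kwargs)
    accepted = {name: value for name, value in raw_kwargs.items() if name in params}
    return mock(**accepted)


def fake_book_flight(destination: str) -> str:
    # The mock omits the context parameter that the real tool would declare.
    return f"mock booking to {destination}"
```

Because unknown keys are dropped before the call, a mock can declare fewer parameters than the tool it replaces.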
Several layers of cleanup on top of the activity-scoped manager:

1. `prepare_function_arguments` now raises `ToolError` directly for argument validation failures (bad JSON, ValidationError, missing required params, context-type mismatch). Removes the matching try/except boilerplate from `execute_function_call`, `tool_proxy.py::_handle_call`, and the now-deleted helper in `generation.py`. One place owns the validation→ToolError contract; the message LLMs see is consistent.
2. `_execute_tools_task` now uses a single `_execute(ctx)` closure for sync and async tools. The closure preps args against the tool's signature and either runs the mock or the real tool. The sync path partial-binds with a plain `RunContext`; the async path hands the closure to `async_tool_manager.spawn`, which calls it with an `AsyncRunContext`. `_run_mock` moves to module level in `run_result.py` so it's shared between dispatch sites.
3. `_AsyncToolManager.spawn` now takes only a `function_callable` (the body) and `run_ctx`, with no `tool` or `raw_arguments`. Callers build the closure that captures whatever they need. The manager is a pure async-lifecycle service.
4. AsyncToolset wrappers handle their own mock lookup so mocks of wrapped async tools run through the toolset's manager (full async lifecycle). `_is_async_toolset_wrapper(tool)` is the internal contract that tells dispatch in `_execute_tools_task` to defer mock handling.
5. Activity-scoped manager teardown moved into `_close_session` so the pause() path (not just aclose) cancels in-flight bare async tool tasks. Otherwise a late `ctx.update()` from a paused activity could touch the next agent's chat context.

Tests collapsed into `tests/test_async_tool.py`:

- 13 unit tests for the manager and dispatch helpers, no real session.
- 2 e2e tests driving a real `AgentSession` with a scripted `FakeLLM` and `mock_tools`, including the post-update-raise path that proves errors surface to the LLM via `_enqueue_reply` → `REPLY_INSTRUCTIONS` → `generate_reply`.
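To make the "one place owns the validation→ToolError contract" idea from item 1 concrete, here is a small stand-alone sketch. `ToolError` is a local stand-in class and `prepare_arguments` is a hypothetical simplification; the actual `prepare_function_arguments` also covers model validation and context-type checks, which are omitted here.

```python
import inspect
import json
from typing import Any, Callable


class ToolError(Exception):
    """Stand-in for the error whose message is surfaced to the LLM."""


def prepare_arguments(fn: Callable[..., Any], raw_json: str) -> dict[str, Any]:
    """Parse raw JSON arguments for `fn`, raising ToolError on any failure."""
    prefix = f"Error parsing arguments for {fn.__name__}"
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError as exc:
        raise ToolError(f"{prefix}: invalid JSON") from exc
    if not isinstance(data, dict):
        raise ToolError(f"{prefix}: expected a JSON object")

    params = inspect.signature(fn).parameters
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    # Required parameters are those without a default value.
    missing = [
        name
        for name, p in params.items()
        if p.default is inspect.Parameter.empty
        and p.kind in keyword_kinds
        and name not in data
    ]
    if missing:
        raise ToolError(f"{prefix}: missing required parameters {missing}")
    return {name: data[name] for name in params if name in data}


def lookup_weather(city: str, units: str = "metric") -> str:
    return f"{city} ({units})"
```

Callers then need no try/except of their own: every validation failure arrives as a `ToolError` with the same message prefix.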
prepare_function_arguments now wraps validation failures as ToolError
("Error parsing arguments for ..."). Update the three tests that still
expected ValueError or the old "invalid parameters" JSON message.
ToolError from arg parsing is already logged inside prepare_function_arguments; user-raised ToolError is an intentional message to the LLM. Neither needs a traceback at logger.exception level.
Async toolsets and the activity-scoped async-tool manager now outlive a pause (e.g. handoff to a sub-task like GetEmailTask). Replies queued by a background tool are deferred until the originating activity is current, unpaused, and has no in-flight speech or tool dispatch.

Key pieces:

- Toolset setup moves from `_start_session` to `start()` so MCP and AsyncToolset state persist across resume. Tear-down only runs in `aclose()`, not in the pause path.
- `_AsyncToolManager` now knows its `owning_activity`; the eager chat_ctx insert and the deferred reply route through that activity's agent (or the current activity for session-scoped toolsets).
- `AgentActivity.wait_for_idle` / `AgentSession.wait_for_idle` are the new readiness gates; `wait_for_idle` also drains `_background_speeches` so an async tool that fires during another tool's dispatch waits for that dispatch to complete.
- The deferred-reply task is watched by the active `RunResult` so a run in flight doesn't return before the reply lands.
- `REPLY_INSTRUCTIONS` asks the LLM to return an empty response when the update was already conveyed, replacing the timestamp-based skip that produced silent drops.
- `ActivityClosedError` lives next to `AgentActivity`; `_deliver_reply` catches it via lazy import to avoid a runtime cycle.
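The deferred-reply behavior can be pictured with a toy asyncio model. This is only a sketch under simplifying assumptions: the `Activity` class below is hypothetical and reduces "current, unpaused, and idle" to a single event, whereas the real gate also tracks speech and tool dispatch.

```python
import asyncio


class Activity:
    """Hypothetical stand-in that reduces readiness to one idle flag."""

    def __init__(self) -> None:
        self._idle = asyncio.Event()
        self._idle.set()
        self.delivered: list[str] = []

    def pause(self) -> None:
        self._idle.clear()

    def resume(self) -> None:
        self._idle.set()

    async def wait_for_idle(self) -> None:
        await self._idle.wait()

    async def deliver_reply(self, text: str) -> None:
        # The reply is held back until the activity is idle again.
        await self.wait_for_idle()
        self.delivered.append(text)


async def demo() -> tuple[list[str], list[str]]:
    activity = Activity()
    activity.pause()  # e.g. a handoff to a sub-task
    task = asyncio.create_task(activity.deliver_reply("flight booked"))
    await asyncio.sleep(0)  # let the task run up to the gate
    while_paused = list(activity.delivered)
    activity.resume()
    await task
    return while_paused, activity.delivered
```

Running `asyncio.run(demo())` shows that nothing is delivered while the activity is paused and that the queued reply lands once it resumes.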
Drop the update_chat_ctx await between the _pending_updates snapshot/clear and return — preserves the 'no await after this line' invariant so a concurrent _enqueue_reply can't stash an update that the in-flight _deliver_reply never sees. The chat_ctx for the handoff case is built synchronously and threaded into session.generate_reply via the chat_ctx kwarg. Also mirror normal tool dispatch by inserting items into session.history at enqueue time.
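The "no await after this line" invariant is easier to see in a reduced model. The sketch below assumes a design in which an enqueue relies on an in-flight delivery task to pick up new updates; `ReplyCoalescer` and its members are hypothetical names, not the PR's code.

```python
import asyncio
from typing import Optional


class ReplyCoalescer:
    """Hypothetical model of coalescing progress updates into one reply."""

    def __init__(self) -> None:
        self._pending: list[str] = []
        self._task: Optional[asyncio.Task] = None
        self.batches: list[list[str]] = []

    def enqueue(self, update: str) -> None:
        self._pending.append(update)
        # Start a delivery only if none is in flight; otherwise the running
        # delivery is trusted to include this update in its snapshot.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._deliver())

    async def _deliver(self) -> None:
        await asyncio.sleep(0)  # stand-in for waiting until the activity is idle
        # Snapshot and clear in one synchronous step. No await after this line:
        # an await here would let enqueue() append to the fresh list while this
        # task still counts as in flight, and that update would be stranded.
        batch, self._pending = self._pending, []
        self.batches.append(batch)


async def demo() -> list[list[str]]:
    q = ReplyCoalescer()
    q.enqueue("50% done")
    q.enqueue("75% done")  # coalesced into the same delivery
    await q._task
    q.enqueue("finished")  # previous task is done, so a new delivery starts
    await q._task
    return q.batches
```

In this model the first two updates are delivered as one batch and the third as its own batch, with no update lost between deliveries.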
When _aclose_impl set _activity = None it left any pending wait_for_idle waiter blocked on _activity_changed_event forever. Set the event on close, and have wait_for_idle bail with ActivityClosedError once self._closing is set and no activity remains.
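The hang and its fix can be reproduced in a few lines of asyncio. This is a sketch with hypothetical stand-ins (`Session`, a string in place of the activity object, a local `ActivityClosedError`); it mirrors only the event and closing-flag logic described above.

```python
import asyncio
from typing import Optional


class ActivityClosedError(Exception):
    """Stand-in for the error raised when no activity will ever become current."""


class Session:
    def __init__(self, activity: Optional[str]) -> None:
        self._activity = activity
        self._closing = False
        self._activity_changed = asyncio.Event()

    async def wait_for_idle(self) -> str:
        while True:
            if self._activity is not None:
                return self._activity
            if self._closing:
                # Closing with no activity left: bail instead of waiting forever.
                raise ActivityClosedError("session closed while waiting")
            self._activity_changed.clear()
            await self._activity_changed.wait()

    def close(self) -> None:
        self._closing = True
        self._activity = None
        # Wake any waiter so it can observe the closing state.
        self._activity_changed.set()


async def demo() -> str:
    session = Session(activity=None)  # between activities
    waiter = asyncio.create_task(session.wait_for_idle())
    await asyncio.sleep(0)  # the waiter is now blocked on the event
    session.close()
    try:
        await waiter
    except ActivityClosedError:
        return "closed"
    return "idle"
```

Without the `set()` call in `close()`, the waiter in `demo()` would never wake up, which is the hang this commit removes.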
Summary

- Direct registration. Any `@function_tool` with an `AsyncRunContext` parameter is routed through an activity-scoped `_AsyncToolManager` automatically; there is no need to put it inside an `AsyncToolset`. `get_running_tasks` / `cancel_task` surface on the activity only while a task is in flight. `AsyncRunContext` is re-exported from `livekit.agents`.
- Survives pause/resume. Async toolsets and the activity's manager outlive a pause. When an agent hands off to a sub-task (e.g. `GetEmailTask`), in-flight tasks keep running; their replies are delivered once the original activity is current and idle again. Toolset setup moved into `start()` so MCP / async-tool state persists across `resume()`.
- `wait_for_idle` gates the deferred reply. New on both `AgentActivity` and `AgentSession`. Drains in-flight speech, the speech queue, and `_background_speeches`, so a deferred tool reply doesn't fire while a regular tool's dispatch (one that handed off to a sub-task) is still running. `wait_for_inactive` is deprecated and forwards.
- `mock_tools()` works with async tools. The mock-precedence check moved ahead of the async-manager branch in dispatch, so async tools can be replaced with mocks in tests the same way regular tools can.
- Unified `ToolError` contract. Argument validation raises `ToolError` directly (one place owns the message the LLM sees); `execute_function_call` no longer logs a traceback for `ToolError`. Regular and async dispatch share a single `_execute(ctx)` closure.

Backward compatibility

- `AsyncToolset(tools=[...])` unchanged: the `_wrap_tool` body delegates to its own `_AsyncToolManager`, but external behavior (duplicate handling, progress narration, cancel) is preserved.

Example

No `AsyncToolset` wrapping. The agent acknowledges immediately via the update, then narrates the final result.

See `examples/voice_agents/basic_agent.py` for the full example.