Skip to content

Commit 7fe642d

Browse files
longcw and davidzhao authored
improve EndCallTool (#4563)
Co-authored-by: David Zhao <dz@livekit.io>
1 parent 883823c commit 7fe642d

3 files changed

Lines changed: 149 additions & 54 deletions

File tree

examples/voice_agents/session_close_callback.py

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,9 @@
22

33
from dotenv import load_dotenv
44

5-
from livekit.agents import (
6-
Agent,
7-
AgentServer,
8-
AgentSession,
9-
CloseEvent,
10-
JobContext,
11-
cli,
12-
room_io,
13-
utils,
14-
)
5+
from livekit.agents import Agent, AgentServer, AgentSession, CloseEvent, JobContext, cli
156
from livekit.agents.beta.tools import EndCallTool
16-
from livekit.plugins import silero
7+
from livekit.plugins import google, silero # noqa: F401
178

189
logger = logging.getLogger("my-worker")
1910
logger.setLevel(logging.INFO)
@@ -25,26 +16,25 @@
2516
# or when the worker is shutting down. When closing the session, agent will be interrupted
2617
# and the last agent message will be added to the chat context.
2718

28-
server = AgentServer()
29-
3019

3120
class MyAgent(Agent):
    """Example agent that can hang up the call through ``EndCallTool``."""

    def __init__(self):
        """Configure the instructions, STT/LLM/TTS models, and the end-call tool."""
        end_call = EndCallTool(
            end_instructions="thanks the user for calling and tell them goodbye",
            delete_room=True,  # this will disconnect all remote participants, including SIP callers
        )
        super().__init__(
            instructions="You are a helpful assistant.",
            stt="assemblyai/universal-streaming",
            llm="openai/gpt-4.1-mini",
            tts="cartesia/sonic-3",
            # llm=google.realtime.RealtimeModel(),
            tools=[end_call],
        )

    async def on_enter(self) -> None:
        """Ask the session to generate a greeting for the user."""
        self.session.generate_reply(instructions="say hello to the user")
4838

4939

5040
server = AgentServer()
@@ -53,22 +43,10 @@ async def on_exit(self) -> None:
5343
@server.rtc_session()
5444
async def entrypoint(ctx: JobContext):
5545
session = AgentSession(
56-
stt="assemblyai/universal-streaming",
57-
llm="openai/gpt-4.1-mini",
58-
tts="rime/arcana",
5946
vad=silero.VAD.load(),
6047
)
6148

62-
# session will be closed automatically when the linked participant disconnects
63-
# with reason CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED
64-
# or you can disable it by setting the RoomInputOptions.close_on_disconnect to False
65-
await session.start(
66-
agent=MyAgent(),
67-
room=ctx.room,
68-
room_options=room_io.RoomOptions(
69-
delete_room_on_close=True,
70-
),
71-
)
49+
await session.start(agent=MyAgent(), room=ctx.room)
7250

7351
@session.on("close")
7452
def on_close(ev: CloseEvent):
Lines changed: 120 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,132 @@
1-
from ...llm import Tool, Toolset, function_tool
1+
import asyncio
2+
from collections.abc import Awaitable
3+
from typing import Any, Callable
4+
5+
from ...job import get_job_context
6+
from ...llm import RealtimeModel, Tool, Toolset, function_tool
27
from ...log import logger
3-
from ...voice.events import RunContext
8+
from ...voice.events import CloseEvent, RunContext, SpeechCreatedEvent
9+
from ...voice.speech_handle import SpeechHandle
10+
11+
END_CALL_DESCRIPTION = """
Ends the current call and disconnects immediately.

Call when:
- The user clearly indicates they are done (e.g., “that’s all, bye”).
- The agent determines the conversation is complete and should end.

Do not call when:
- The user asks to pause, hold, or transfer.
- Intent is unclear.

This is the final action the agent can take.
Once called, no further interaction is possible with the user.
Don't generate any other text or response when the tool is called.
"""


class EndCallTool(Toolset):
    """Toolset exposing a single ``end_call`` function tool.

    When the tool is invoked, the session is shut down once the associated speech
    has finished; when the session then closes, the job is shut down and, if
    requested, the room is deleted.
    """

    def __init__(
        self,
        *,
        extra_description: str = "",
        delete_room: bool = True,
        end_instructions: str | None = "say goodbye to the user",
        on_tool_called: Callable[[Toolset.ToolCalledEvent], Awaitable[None]] | None = None,
        on_tool_completed: Callable[[Toolset.ToolCompletedEvent], Awaitable[None]] | None = None,
    ):
        """
        This tool allows the agent to end the call and disconnect from the room.

        Args:
            extra_description: Additional text appended (after a newline) to
                ``END_CALL_DESCRIPTION`` to form the tool description.
            delete_room: Whether to delete the room once the session has closed after
                the tool was called. Deleting the room disconnects all remote users,
                including SIP callers.
            end_instructions: Value returned as the tool output, i.e. what the LLM
                receives for generating the tool response.
            on_tool_called: Async callback awaited when the tool is called.
            on_tool_completed: Async callback awaited with the completion event
                before that event's ``output`` is returned as the tool result.
        """
        super().__init__()
        self._delete_room = delete_room
        self._extra_description = extra_description

        self._end_instructions = end_instructions
        self._on_tool_called = on_tool_called
        self._on_tool_completed = on_tool_completed

        # The description is passed explicitly so it can include `extra_description`.
        self._end_call_tool = function_tool(
            self._end_call,
            name="end_call",
            description=f"{END_CALL_DESCRIPTION}\n{extra_description}",
        )
        # Pending delayed-shutdown task, if any; cancelled in `_on_session_close`.
        self._shutdown_session_task: asyncio.Task[None] | None = None

    # NOTE: deliberately no docstring on `_end_call`; the tool description is the
    # `description=` argument given to `function_tool` in `__init__`.
    async def _end_call(self, ctx: RunContext) -> Any | None:
        logger.debug("end_call tool called")
        llm_v = ctx.session.current_agent._get_activity_or_raise().llm

        def _on_speech_done(_: SpeechHandle) -> None:
            if (
                not isinstance(llm_v, RealtimeModel)
                or not llm_v.capabilities.auto_tool_reply_generation
            ):
                # tool reply will reuse the same speech handle, so we can shutdown the session
                # directly after this speech handle is done
                ctx.session.shutdown()
            else:
                # the realtime model generates the tool reply on its own, so wait
                # for that reply before shutting down
                self._shutdown_session_task = asyncio.create_task(
                    self._delayed_session_shutdown(ctx)
                )

        ctx.speech_handle.add_done_callback(_on_speech_done)
        ctx.session.once("close", self._on_session_close)

        if self._on_tool_called:
            await self._on_tool_called(Toolset.ToolCalledEvent(ctx=ctx, arguments={}))

        completed_ev = Toolset.ToolCompletedEvent(ctx=ctx, output=self._end_instructions)
        if self._on_tool_completed:
            await self._on_tool_completed(completed_ev)

        # read back from the event so a callback that replaced `output` is honored
        return completed_ev.output

    async def _delayed_session_shutdown(self, ctx: RunContext) -> None:
        """Shutdown the session after the tool reply is played out.

        Waits up to 5 seconds for the next ``speech_created`` event, then waits for
        that speech to finish. The session is shut down in every case, including
        timeout and cancellation of this task.
        """
        speech_created_fut = asyncio.Future[SpeechHandle]()

        def _on_speech_created(ev: SpeechCreatedEvent) -> None:
            # the future may already be resolved, or cancelled by the `wait_for` timeout
            if not speech_created_fut.done():
                speech_created_fut.set_result(ev.speech_handle)

        # Registered with `on` rather than `once`: `once` may register a wrapper
        # callable, in which case `off(..., _on_speech_created)` would not remove it.
        # The `done()` guard above makes repeated invocations harmless.
        ctx.session.on("speech_created", _on_speech_created)

        try:
            speech_handle = await asyncio.wait_for(speech_created_fut, timeout=5.0)
            await speech_handle
        except asyncio.TimeoutError:
            logger.warning("tool reply timed out, shutting down session")
        finally:
            ctx.session.off("speech_created", _on_speech_created)
            ctx.session.shutdown()

    def _on_session_close(self, ev: CloseEvent) -> None:
        """Close the job process when AgentSession is closed"""
        if self._shutdown_session_task:
            # cleanup
            self._shutdown_session_task.cancel()
            self._shutdown_session_task = None

        job_ctx = get_job_context()

        if self._delete_room:

            async def _on_shutdown() -> None:
                logger.info("deleting the room because the user ended the call")
                await job_ctx.delete_room()

            # the room is deleted from a job shutdown callback, not inline here
            job_ctx.add_shutdown_callback(_on_shutdown)

        # shutdown the job process
        job_ctx.shutdown(reason=ev.reason.value)

    @property
    def tools(self) -> list[Tool]:
        """Return the single ``end_call`` function tool built in ``__init__``."""
        return [self._end_call_tool]

livekit-agents/livekit/agents/llm/tool_context.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
from collections.abc import Awaitable, Sequence
2222
from dataclasses import dataclass
2323
from enum import Flag, auto
24-
from typing import Any, Callable, Generic, Literal, TypeVar, Union, overload
24+
from typing import TYPE_CHECKING, Any, Callable, Generic, Literal, TypeVar, Union, overload
2525

2626
from typing_extensions import NotRequired, ParamSpec, Required, Self, TypedDict, TypeGuard
2727

2828
from . import _provider_format
2929

30+
if TYPE_CHECKING:
31+
from ..voice.events import RunContext
32+
3033

3134
class Tool(ABC): # noqa: B024
3235
pass
@@ -39,6 +42,16 @@ class ProviderTool(Tool):
3942

4043

4144
class Toolset(ABC):
45+
@dataclass
46+
class ToolCalledEvent:
47+
ctx: RunContext
48+
arguments: dict[str, Any]
49+
50+
@dataclass
51+
class ToolCompletedEvent:
52+
ctx: RunContext
53+
output: Any | Exception | None
54+
4255
@property
4356
@abstractmethod
4457
def tools(self) -> list[Tool]:

0 commit comments

Comments
 (0)