diff --git a/README.md b/README.md
index 99085f6..70e63fd 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,8 @@ uv add agent-client-protocol
 
 - **Spec parity:** Generated Pydantic models in `acp.schema` track every ACP release so payloads stay valid.
 - **Runtime ergonomics:** Async base classes, stdio JSON-RPC plumbing, and lifecycle helpers keep custom agents tiny.
-- **Examples ready:** Streaming, permissions, Gemini bridge, and duet demos live under `examples/`.
+- **Multiple transports:** Stdio for local agents, HTTP/WebSocket for remote deployments — same `Connection` API for both.
+- **Examples ready:** Streaming, permissions, Gemini bridge, HTTP/WebSocket, and duet demos live under `examples/`.
 - **Helper builders:** `acp.helpers` mirrors the Go/TS SDK APIs for content blocks, tool calls, and session updates.
 - **Contrib utilities:** Session accumulators, tool call trackers, and permission brokers share patterns from real deployments.
 
@@ -53,9 +54,10 @@ See real adopters like kimi-cli in the [Use Cases list](https://agentclientproto
 ## Project layout
 
 - `src/acp/`: runtime package (agents, clients, transports, helpers, schema bindings, contrib utilities)
+- `src/acp/http/`: HTTP/WebSocket transport — `WebSocketStreamAdapter`, `connect_http_agent`, and Starlette wrapper
 - `schema/`: upstream JSON schema sources (regenerate via `make gen-all`)
 - `docs/`: MkDocs content backing the published documentation
-- `examples/`: runnable scripts covering stdio orchestration patterns
+- `examples/`: runnable scripts covering stdio and HTTP/WebSocket orchestration patterns
 - `tests/`: pytest suite with golden fixtures and optional Gemini coverage
 
 ## Developer commands
diff --git a/docs/http-transport.md b/docs/http-transport.md
new file mode 100644
index 0000000..4815216
--- /dev/null
+++ b/docs/http-transport.md
@@ -0,0 +1,152 @@
+# HTTP/WebSocket Transport
+
+The `acp.http` module provides a WebSocket-based transport layer for ACP, enabling agents and clients to
communicate over HTTP instead of stdio. This is useful for deploying agents as remote web services.
+
+## Overview
+
+While stdio transport works well for local agents spawned as child processes, many deployment scenarios require network-based communication:
+
+- Agents running on cloud infrastructure (e.g., AWS Bedrock AgentCore)
+- Multiple clients connecting to a shared agent service
+- Agents behind load balancers or API gateways
+
+The HTTP/WebSocket transport uses the same `Connection` API as stdio, so existing agent and client code works without changes.
+
+## Architecture
+
+The adapter uses a 4-queue architecture to bridge WebSocket messages with asyncio streams:
+
+```
+INCOMING: WebSocket.recv() → StreamReader.feed_data() → ACP Connection
+OUTGOING: ACP Connection → StreamWriter.write() → deque buffer → WebSocket.send()
+```
+
+Two background tasks pump messages through the bridge:
+
+- **Receive loop**: Reads from WebSocket, feeds bytes to `asyncio.StreamReader`
+- **Send loop**: Drains the outgoing `deque` buffer and sends via WebSocket
+
+This design allows `AgentSideConnection` and `ClientSideConnection` to work unchanged — they read/write asyncio streams as usual.
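The receive and send loops described in the Architecture section can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the `acp.http` implementation: `FakeSocket`, `receive_loop`, `drain_outgoing`, and `demo` are hypothetical names, the socket is an in-memory stand-in, and the newline framing follows the newline-delimited JSON format the docs mention.

```python
import asyncio
from collections import deque


class FakeSocket:
    """Hypothetical in-memory stand-in for a WebSocket; not part of acp.http."""

    def __init__(self, incoming: list[str]) -> None:
        self._incoming = deque(incoming)
        self.sent: list[str] = []

    async def recv(self) -> str:
        if not self._incoming:
            raise ConnectionError("peer closed the socket")
        return self._incoming.popleft()

    async def send(self, data: str) -> None:
        self.sent.append(data)


async def receive_loop(ws: FakeSocket, reader: asyncio.StreamReader) -> None:
    """INCOMING: copy each frame into the StreamReader as one JSON line."""
    try:
        while True:
            frame = await ws.recv()
            data = frame.encode("utf-8")
            if not data.endswith(b"\n"):
                data += b"\n"  # assumed framing: newline-delimited JSON
            reader.feed_data(data)
    except ConnectionError:
        reader.feed_eof()  # lets consumers of the reader see end-of-stream


async def drain_outgoing(ws: FakeSocket, outgoing: deque) -> None:
    """OUTGOING: send everything currently buffered, oldest first."""
    while outgoing:
        await ws.send(outgoing.popleft().decode("utf-8"))


async def demo() -> tuple:
    ws = FakeSocket(['{"jsonrpc":"2.0","id":1,"method":"ping"}'])
    reader = asyncio.StreamReader()
    await receive_loop(ws, reader)
    # A connection would read lines from the reader exactly as it does on stdio.
    received = [line async for line in reader]

    outgoing = deque([b'{"jsonrpc":"2.0","id":1,"result":null}\n'])
    await drain_outgoing(ws, outgoing)
    return received, ws.sent
```

Calling `asyncio.run(demo())` returns the lines read from the `StreamReader` and the frames handed to the socket. A real adapter would presumably run both directions as long-lived background tasks and wake the send side whenever the writer buffers new data; the sketch runs each direction once to keep the data flow visible.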
+
+## Components
+
+### `WebSocketStreamAdapter`
+
+Bridges any WebSocket connection with asyncio `StreamReader`/`StreamWriter`:
+
+```python
+from acp.http import WebSocketStreamAdapter
+
+adapter = WebSocketStreamAdapter(websocket)
+await adapter.start()
+
+# Use adapter.reader and adapter.writer with ACP Connection
+conn = AgentSideConnection(agent, adapter.writer, adapter.reader)
+
+# Clean up
+await adapter.close()
+```
+
+### `connect_http_agent`
+
+Async context manager for client-side WebSocket connections:
+
+```python
+from acp.http import connect_http_agent
+
+async with connect_http_agent(client, "ws://localhost:8080/ws") as conn:
+    await conn.initialize(protocol_version=PROTOCOL_VERSION)
+    session = await conn.new_session(mcp_servers=[], cwd=".")
+    await conn.prompt(session_id=session.session_id, prompt=[text_block("Hello")])
+```
+
+Accepts optional `**ws_kwargs` passed through to `websockets.connect()` (e.g., `max_size`, `extra_headers`, `compression`).
+
+### `WebSocketLike` Protocol
+
+Any object implementing `recv()`, `send()`, and `close()` works with the adapter:
+
+```python
+class WebSocketLike(Protocol):
+    async def recv(self) -> str | bytes: ...
+    async def send(self, data: str | bytes) -> None: ...
+    async def close(self) -> None: ...
+```
+
+### `StarletteWebSocketWrapper`
+
+Pre-built wrapper for Starlette's WebSocket (which uses `receive_text()`/`send_text()` instead of `recv()`/`send()`):
+
+```python
+from acp.http import StarletteWebSocketWrapper
+
+wrapped = StarletteWebSocketWrapper(starlette_websocket)
+adapter = WebSocketStreamAdapter(wrapped)
+```
+
+## Server-Side Example
+
+Serve an ACP agent over WebSocket using Starlette:
+
+```python
+from starlette.applications import Starlette
+from starlette.routing import WebSocketRoute
+from starlette.websockets import WebSocket
+
+from acp.agent.connection import AgentSideConnection
+from acp.http import StarletteWebSocketWrapper, WebSocketStreamAdapter
+
+async def websocket_endpoint(websocket: WebSocket):
+    await websocket.accept()
+    wrapped = StarletteWebSocketWrapper(websocket)
+    adapter = WebSocketStreamAdapter(wrapped)
+
+    await adapter.start()
+
+    agent = MyAgent()
+    conn = AgentSideConnection(
+        to_agent=agent,
+        input_stream=adapter.writer,
+        output_stream=adapter.reader,
+        listening=False,
+    )
+
+    await conn.listen()
+    await adapter.close()
+
+app = Starlette(routes=[WebSocketRoute("/ws", websocket_endpoint)])
+```
+
+See `examples/http_echo_agent.py` for a complete runnable server.
+
+## Client-Side Example
+
+Connect to a remote agent:
+
+```python
+from acp.http import connect_http_agent
+from acp import PROTOCOL_VERSION, text_block
+from acp.schema import ClientCapabilities, Implementation
+
+async with connect_http_agent(MyClient(), "ws://agent.example.com/ws") as conn:
+    await conn.initialize(
+        protocol_version=PROTOCOL_VERSION,
+        client_capabilities=ClientCapabilities(),
+        client_info=Implementation(name="my-client", title="My Client", version="0.1.0"),
+    )
+    session = await conn.new_session(mcp_servers=[], cwd="/workspace")
+    await conn.prompt(session_id=session.session_id, prompt=[text_block("Hello!")])
+```
+
+See `examples/http_client.py` for a complete runnable client with interactive prompt.
+
+## Dependencies
+
+- **`websockets>=12.0`** — Required (core dependency). Used by `connect_http_agent` for client-side connections.
+- **`starlette`** — Optional, for server-side `StarletteWebSocketWrapper`. Install separately: `pip install starlette uvicorn`.
+
+## Limitations
+
+- Messages use newline-delimited JSON format (one JSON-RPC message per line), consistent with stdio transport.
+- Default maximum WebSocket message size is 50MB (matching stdio buffer limit). Override via `max_size` kwarg.
+- The `StarletteWebSocketWrapper` currently handles text frames only. Binary WebSocket frames are converted to UTF-8.
diff --git a/docs/quickstart.md b/docs/quickstart.md
index 401b7d5..2b2e89b 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -81,6 +81,48 @@ Open the Agents panel and start the session. Each message you send should be ech
 
 Any ACP client that communicates over stdio can spawn the same script; no additional transport configuration is required.
 
+### HTTP/WebSocket (remote agents)
+
+For agents deployed as web services (e.g., on AWS Bedrock AgentCore), you can connect via WebSocket instead of stdio.
Start the HTTP echo agent server:
+
+```bash
+# Terminal 1: Start the server
+pip install starlette uvicorn
+python examples/http_echo_agent.py
+```
+
+Then connect a client from another terminal:
+
+```bash
+# Terminal 2: Connect via WebSocket
+python examples/http_client.py ws://localhost:8080/ws
+```
+
+Or connect programmatically:
+
+```python
+import asyncio
+from acp import PROTOCOL_VERSION, text_block
+from acp.http import connect_http_agent
+from acp.schema import ClientCapabilities, Implementation
+
+async def main():
+    async with connect_http_agent(MyClient(), "ws://localhost:8080/ws") as conn:
+        await conn.initialize(
+            protocol_version=PROTOCOL_VERSION,
+            client_capabilities=ClientCapabilities(),
+            client_info=Implementation(name="my-client", title="My Client", version="0.1.0"),
+        )
+        session = await conn.new_session(mcp_servers=[], cwd=".")
+        await conn.prompt(session_id=session.session_id, prompt=[text_block("Hello!")])
+
+asyncio.run(main())
+```
+
+The `connect_http_agent` context manager handles WebSocket connection lifecycle and uses the same `ClientSideConnection` API as stdio, so all existing client code works unchanged.
+
+> **Note:** The server-side adapter requires `starlette` and `uvicorn` (listed as dev dependencies). For custom server frameworks, implement the `WebSocketLike` protocol (`recv`, `send`, `close`) and pass to `WebSocketStreamAdapter`.
+
 ### Programmatic launch
 
 Prefer to drive agents directly from Python? The `spawn_agent_process` helper wires stdio and lifecycle management for you:
diff --git a/examples/http_client.py b/examples/http_client.py
new file mode 100644
index 0000000..e741306
--- /dev/null
+++ b/examples/http_client.py
@@ -0,0 +1,218 @@
+"""HTTP/WebSocket client example for ACP.
+
+This example shows how to connect to an ACP agent over HTTP/WebSocket using
+the connect_http_agent function. This is the client-side counterpart to
+http_echo_agent.py.
+
+Requirements:
+    pip install agent-client-protocol
+
+Usage:
+    # Start the server first (in another terminal):
+    python examples/http_echo_agent.py
+
+    # Then run the client:
+    python examples/http_client.py ws://localhost:8080/ws
+"""
+
+import asyncio
+import logging
+import sys
+from typing import Any
+
+from acp import (
+    PROTOCOL_VERSION,
+    Client,
+    RequestError,
+    text_block,
+)
+from acp.http import connect_http_agent
+from acp.schema import (
+    AgentMessageChunk,
+    AgentPlanUpdate,
+    AgentThoughtChunk,
+    AudioContentBlock,
+    AvailableCommandsUpdate,
+    ClientCapabilities,
+    CreateTerminalResponse,
+    CurrentModeUpdate,
+    EmbeddedResourceContentBlock,
+    EnvVariable,
+    ImageContentBlock,
+    Implementation,
+    KillTerminalCommandResponse,
+    PermissionOption,
+    ReadTextFileResponse,
+    ReleaseTerminalResponse,
+    RequestPermissionResponse,
+    ResourceContentBlock,
+    TerminalOutputResponse,
+    TextContentBlock,
+    ToolCall,
+    ToolCallProgress,
+    ToolCallStart,
+    UserMessageChunk,
+    WaitForTerminalExitResponse,
+    WriteTextFileResponse,
+)
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class ExampleClient(Client):
+    """Example client implementation with minimal method stubs."""
+
+    async def request_permission(
+        self, options: list[PermissionOption], session_id: str, tool_call: ToolCall, **kwargs: Any
+    ) -> RequestPermissionResponse:
+        raise RequestError.method_not_found("session/request_permission")
+
+    async def write_text_file(
+        self, content: str, path: str, session_id: str, **kwargs: Any
+    ) -> WriteTextFileResponse | None:
+        raise RequestError.method_not_found("fs/write_text_file")
+
+    async def read_text_file(
+        self, path: str, session_id: str, limit: int | None = None, line: int | None = None, **kwargs: Any
+    ) -> ReadTextFileResponse:
+        raise RequestError.method_not_found("fs/read_text_file")
+
+    async def create_terminal(
+        self,
+        command: str,
+        session_id: str,
+        args: list[str] | None = None,
+        cwd: str | None = 
None,
+        env: list[EnvVariable] | None = None,
+        output_byte_limit: int | None = None,
+        **kwargs: Any,
+    ) -> CreateTerminalResponse:
+        raise RequestError.method_not_found("terminal/create")
+
+    async def terminal_output(self, session_id: str, terminal_id: str, **kwargs: Any) -> TerminalOutputResponse:
+        raise RequestError.method_not_found("terminal/output")
+
+    async def release_terminal(
+        self, session_id: str, terminal_id: str, **kwargs: Any
+    ) -> ReleaseTerminalResponse | None:
+        raise RequestError.method_not_found("terminal/release")
+
+    async def wait_for_terminal_exit(
+        self, session_id: str, terminal_id: str, **kwargs: Any
+    ) -> WaitForTerminalExitResponse:
+        raise RequestError.method_not_found("terminal/wait_for_exit")
+
+    async def kill_terminal(
+        self, session_id: str, terminal_id: str, **kwargs: Any
+    ) -> KillTerminalCommandResponse | None:
+        raise RequestError.method_not_found("terminal/kill")
+
+    async def session_update(
+        self,
+        session_id: str,
+        update: UserMessageChunk
+        | AgentMessageChunk
+        | AgentThoughtChunk
+        | ToolCallStart
+        | ToolCallProgress
+        | AgentPlanUpdate
+        | AvailableCommandsUpdate
+        | CurrentModeUpdate,
+        **kwargs: Any,
+    ) -> None:
+        """Handle session updates from the agent."""
+        if not isinstance(update, AgentMessageChunk):
+            return
+
+        content = update.content
+        text: str
+        if isinstance(content, TextContentBlock):
+            text = content.text
+        elif isinstance(content, ImageContentBlock):
+            text = ""
+        elif isinstance(content, AudioContentBlock):
+            text = "