Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions src/a2a/server/routes/jsonrpc_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
DefaultServerCallContextBuilder,
ServerCallContextBuilder,
)
from a2a.types import A2ARequest
from a2a.types.a2a_pb2 import (
AgentCard,
CancelTaskRequest,
Expand Down Expand Up @@ -349,7 +348,7 @@ async def handle_requests(self, request: Request) -> Response: # noqa: PLR0911,
else:
try:
raw_result = await self._process_non_streaming_request(
request_id, specific_request, call_context
specific_request, call_context
)
handler_result = JSONRPC20Response(
result=raw_result, _id=request_id
Expand Down Expand Up @@ -385,7 +384,7 @@ async def handle_requests(self, request: Request) -> Response: # noqa: PLR0911,
async def _process_streaming_request(
self,
request_id: str | int | None,
request_obj: A2ARequest,
request_obj: Any,
context: ServerCallContext,
) -> AsyncGenerator[dict[str, Any], None]:
"""Processes streaming requests (SendStreamingMessage or SubscribeToTask).
Expand All @@ -399,11 +398,12 @@ async def _process_streaming_request(
An `AsyncGenerator` object to stream results to the client.
"""
stream: AsyncGenerator | None = None
if isinstance(request_obj, SendMessageRequest):
method = context.state.get('method')
if method == 'SendStreamingMessage':
stream = self.request_handler.on_message_send_stream(
request_obj, context
)
elif isinstance(request_obj, SubscribeToTaskRequest):
elif method == 'SubscribeToTask':
stream = self.request_handler.on_subscribe_to_task(
request_obj, context
)
Expand Down Expand Up @@ -538,55 +538,53 @@ async def _handle_get_extended_agent_card(
@validate_version(constants.PROTOCOL_VERSION_1_0)
async def _process_non_streaming_request( # noqa: PLR0911
self,
request_id: str | int | None,
request_obj: A2ARequest,
request_obj: Any,
context: ServerCallContext,
) -> dict[str, Any] | None:
"""Processes non-streaming requests (message/send, tasks/get, tasks/cancel, tasks/pushNotificationConfig/*).
"""Processes non-streaming requests.

Args:
request_id: The ID of the request.
request_obj: The proto request message.
context: The ServerCallContext for the request.

Returns:
A dict containing the result or error.
"""
match request_obj:
case SendMessageRequest():
method = context.state.get('method')
match method:
case 'SendMessage':
return await self._handle_send_message(request_obj, context)
case CancelTaskRequest():
case 'CancelTask':
return await self._handle_cancel_task(request_obj, context)
case GetTaskRequest():
case 'GetTask':
return await self._handle_get_task(request_obj, context)
case ListTasksRequest():
case 'ListTasks':
return await self._handle_list_tasks(request_obj, context)
case TaskPushNotificationConfig():
case 'CreateTaskPushNotificationConfig':
return await self._handle_create_task_push_notification_config(
request_obj, context
)
case GetTaskPushNotificationConfigRequest():
case 'GetTaskPushNotificationConfig':
return await self._handle_get_task_push_notification_config(
request_obj, context
)
case ListTaskPushNotificationConfigsRequest():
case 'ListTaskPushNotificationConfigs':
return await self._handle_list_task_push_notification_configs(
request_obj, context
)
case DeleteTaskPushNotificationConfigRequest():
return await self._handle_delete_task_push_notification_config(
case 'DeleteTaskPushNotificationConfig':
await self._handle_delete_task_push_notification_config(
request_obj, context
)
case GetExtendedAgentCardRequest():
return None
case 'GetExtendedAgentCard':
return await self._handle_get_extended_agent_card(
request_obj, context
)
case _:
logger.error(
'Unhandled validated request type: %s', type(request_obj)
)
logger.error('Unhandled method: %s', method)
raise UnsupportedOperationError(
message=f'Request type {type(request_obj).__name__} is unknown.'
message=f'Method {method} is not supported.'
)

def _create_response(
Expand Down
Loading
Loading