-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathcustom_agent_executors.py
More file actions
144 lines (115 loc) · 5.57 KB
/
custom_agent_executors.py
File metadata and controls
144 lines (115 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from agent_framework import (
Agent,
Executor,
Message,
WorkflowBuilder,
WorkflowContext,
handler,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
"""
Sample: Custom Agent Executors in a Workflow
This sample uses two custom executors. A Writer agent creates or edits content,
then hands the conversation to a Reviewer agent which evaluates and finalizes the result.
Purpose:
Show how to wrap chat agents created by AzureOpenAIResponsesClient inside workflow executors. Demonstrate the @handler
pattern with typed inputs and typed WorkflowContext[T] outputs, connect executors with the fluent WorkflowBuilder,
and finish by yielding outputs from the terminal node.
Note: When an agent is passed to a workflow, the workflow wraps the agent in a more sophisticated executor.
Prerequisites:
- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- Azure OpenAI configured for AzureOpenAIResponsesClient with required environment variables.
- Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample.
- Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming or non streaming runs.
"""
class Writer(Executor):
"""Custom executor that owns a domain specific agent responsible for generating content.
This class demonstrates:
- Attaching a Agent to an Executor so it participates as a node in a workflow.
- Using a @handler method to accept a typed input and forward a typed output via ctx.send_message.
"""
agent: Agent
def __init__(self, id: str = "writer"):
# Create a domain specific agent using your configured AzureOpenAIResponsesClient.
self.agent = AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
).as_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
)
# Associate the agent with this executor node. The base Executor stores it on self.agent.
super().__init__(id=id)
@handler
async def handle(self, message: Message, ctx: WorkflowContext[list[Message], str]) -> None:
"""Generate content using the agent and forward the updated conversation.
Contract for this handler:
- message is the inbound user Message.
- ctx is a WorkflowContext that expects a list[Message] to be sent downstream.
Pattern shown here:
1) Seed the conversation with the inbound message.
2) Run the attached agent to produce assistant messages.
3) Forward the cumulative messages to the next executor with ctx.send_message.
"""
# Start the conversation with the incoming user message.
messages: list[Message] = [message]
# Run the agent and extend the conversation with the agent's messages.
response = await self.agent.run(messages)
messages.extend(response.messages)
# Forward the accumulated messages to the next executor in the workflow.
await ctx.send_message(messages)
class Reviewer(Executor):
"""Custom executor that owns a review agent and completes the workflow.
This class demonstrates:
- Consuming a typed payload produced upstream.
- Yielding the final text outcome to complete the workflow.
"""
agent: Agent
def __init__(self, id: str = "reviewer"):
# Create a domain specific agent that evaluates and refines content.
self.agent = AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
).as_agent(
instructions=(
"You are an excellent content reviewer. You review the content and provide feedback to the writer."
),
)
super().__init__(id=id)
@handler
async def handle(self, messages: list[Message], ctx: WorkflowContext[list[Message], str]) -> None:
"""Review the full conversation transcript and complete with a final string.
This node consumes all messages so far. It uses its agent to produce the final text,
then signals completion by yielding the output.
"""
response = await self.agent.run(messages)
await ctx.yield_output(response.text)
async def main():
"""Build and run a simple two node agent workflow: Writer then Reviewer."""
# Create the executors
writer = Writer()
reviewer = Reviewer()
# Build the workflow using the fluent builder.
# Set the start node and connect an edge from writer to reviewer.
workflow = WorkflowBuilder(start_executor=writer).add_edge(writer, reviewer).build()
# Run the workflow with the user's initial message.
# For foundational clarity, use run (non streaming) and print the workflow output.
events = await workflow.run(
Message("user", ["Create a slogan for a new electric SUV that is affordable and fun to drive."])
)
# The terminal node yields output; print its contents.
outputs = events.get_outputs()
if outputs:
print(outputs[-1])
if __name__ == "__main__":
asyncio.run(main())