Use this file to discover all available pages before exploring further.
Workflows enable you to orchestrate multiple agents and executors in a graph-based structure. They provide control flow, state management, and coordination for complex multi-step processes.
from agent_framework import WorkflowBuilderfrom agent_framework.azure import AzureOpenAIResponsesClientfrom azure.identity import AzureCliCredentialclient = AzureOpenAIResponsesClient(credential=AzureCliCredential())writer = client.as_agent( name="Writer", instructions="You are a content writer.")reviewer = client.as_agent( name="Reviewer", instructions="You are a content reviewer. Provide feedback.")# Agents are executors and can be connected with edgesworkflow = WorkflowBuilder(start_executor=writer) \ .add_edge(writer, reviewer) \ .build()
from agent_framework import WorkflowBuilderfrom agent_framework.azure import AzureOpenAIResponsesClientfrom azure.identity import AzureCliCredentialimport osclient = AzureOpenAIResponsesClient( project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"], deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"], credential=AzureCliCredential())writer = client.as_agent( instructions="You are a content writer. Create content based on requests.", name="Writer")reviewer = client.as_agent( instructions="You are a content reviewer. Provide concise, actionable feedback.", name="Reviewer")workflow = WorkflowBuilder(start_executor=writer) \ .add_edge(writer, reviewer) \ .build()events = await workflow.run( "Create a slogan for an affordable electric SUV")for output in events.get_outputs(): print(f"{output.messages[0].author_name}: {output.text}")
Specify input/output types explicitly on @handler:
class FlexibleProcessor(Executor): def __init__(self, id: str): super().__init__(id=id) @handler(input=str | int, output=str) async def process(self, message, ctx) -> None: # type: ignore """Process either string or int input. The input and output types are explicitly declared on @handler, so we don't need type hints on the function parameters. """ result = f"Processed: {message}" await ctx.send_message(result) # type: ignore
# Run and wait for completionevents = await workflow.run("initial input")# Get all outputsoutputs = events.get_outputs()print(outputs)# Get final statefinal_state = events.get_final_state()print(final_state) # WorkflowRunState.IDLE or COMPLETED
workflow: name: content_pipeline start: writer executors: - id: writer type: agent instructions: You are a content writer. - id: reviewer type: agent instructions: You are a content reviewer. edges: - from: writer to: reviewer
Load and run:
from agent_framework.declarative import load_workflow_from_yamlworkflow = load_workflow_from_yaml("workflow.yaml", client=client)events = await workflow.run("Create a blog post")
async for event in workflow.run("input", stream=True): match event.type: case "workflow_start": print("Workflow started") case "executor_start": print(f"Executor {event.executor_id} started") case "executor_output": print(f"Output: {event.data}") case "executor_complete": print(f"Executor {event.executor_id} completed") case "workflow_output": print(f"Final output: {event.data}") case "workflow_complete": print("Workflow completed") case "error": print(f"Error: {event.error}")
Workflows automatically emit OpenTelemetry traces and metrics:
from opentelemetry import tracefrom opentelemetry.sdk.trace import TracerProvider# Configure tracingtrace.set_tracer_provider(TracerProvider())# Workflows will emit spans for each executorevents = await workflow.run("input")
from agent_framework import WorkflowBuilderfrom agent_framework.azure import AzureOpenAIResponsesClientfrom azure.identity import AzureCliCredentialclient = AzureOpenAIResponsesClient(credential=AzureCliCredential())# Create specialized agentsresearcher = client.as_agent( name="Researcher", instructions="Research topics and gather information.")writer = client.as_agent( name="Writer", instructions="Write engaging content based on research.")editor = client.as_agent( name="Editor", instructions="Edit and improve content for clarity and style.")reviewer = client.as_agent( name="Reviewer", instructions="Review content and provide final approval.")# Build the pipelineworkflow = WorkflowBuilder(start_executor=researcher) \ .add_edge(researcher, writer) \ .add_edge(writer, editor) \ .add_edge(editor, reviewer) \ .build()# Run the pipelineevents = await workflow.run( "Create a blog post about quantum computing")for output in events.get_outputs(): print(output.text)