Overview

Microsoft Agent Framework provides multiple orchestration patterns for coordinating agents and executors. Choose the pattern that best fits your use case:
  • Sequential: Process steps in order, one after another
  • Concurrent: Run multiple agents/executors in parallel
  • Conditional: Route based on message content or state
  • Loops: Iterate until a condition is met
  • Complex graphs: Combine patterns for sophisticated workflows

Sequential Orchestration

Sequential workflows execute executors in a linear pipeline, where each step processes the output of the previous step.
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIResponsesClient

client = AzureOpenAIResponsesClient(...)

researcher = client.as_agent(
    name="researcher",
    instructions="Research the topic and provide insights."
)

writer = client.as_agent(
    name="writer",
    instructions="Write content based on research."
)

editor = client.as_agent(
    name="editor",
    instructions="Edit and polish the content."
)

workflow = (
    WorkflowBuilder(start_executor=researcher)
    .add_edge(researcher, writer)
    .add_edge(writer, editor)
    .build()
)

events = await workflow.run("Write an article about electric vehicles")

Concurrent Orchestration

Concurrent workflows fan out work to multiple executors in parallel, then aggregate results.
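
Conceptually, fan-out/fan-in resembles the plain-asyncio sketch below. It does not use Agent Framework and is not how the framework is implemented; `worker` and `fan_out_fan_in` are hypothetical names that only illustrate the shape of the pattern.

```python
import asyncio


async def worker(name: str, task: str) -> str:
    """Hypothetical stand-in for an agent or executor."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    return f"{name}: processed {task!r}"


async def fan_out_fan_in(task: str, names: list[str]) -> list[str]:
    # Fan out: create one coroutine per worker, all with the same input.
    pending = [worker(name, task) for name in names]
    # Fan in: wait for every worker and collect results in input order.
    return await asyncio.gather(*pending)


results = asyncio.run(
    fan_out_fan_in("launch plan", ["researcher", "marketer", "legal"])
)
```

`asyncio.gather` returns results in the order the coroutines were passed in, regardless of which one finishes first.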

High-Level API

Use ConcurrentBuilder for agent-based concurrent workflows:
from agent_framework.orchestrations import ConcurrentBuilder

researcher = client.as_agent(
    name="researcher",
    instructions="Provide market research insights."
)

marketer = client.as_agent(
    name="marketer",
    instructions="Create marketing strategy."
)

legal = client.as_agent(
    name="legal",
    instructions="Review for compliance concerns."
)

# Build concurrent workflow - fans out to all agents
workflow = ConcurrentBuilder(
    participants=[researcher, marketer, legal]
).build()

events = await workflow.run("Launch a new electric bike for urban commuters")

# Get aggregated messages from all agents
outputs = events.get_outputs()
for output in outputs:
    messages = output  # list[Message]
    for msg in messages:
        print(f"{msg.author_name}: {msg.text}")

Custom Aggregation

Define custom aggregation logic for concurrent results:
from agent_framework import Message

def custom_aggregator(results: list[list[Message]]) -> list[Message]:
    """Custom aggregation: concatenate all messages."""
    all_messages = []
    for agent_messages in results:
        all_messages.extend(agent_messages)
    return all_messages

workflow = ConcurrentBuilder(
    participants=[agent1, agent2, agent3],
    aggregator=custom_aggregator
).build()

Low-Level Fan-Out/Fan-In

For custom concurrent patterns, use add_fan_out_edge and add_fan_in_edge:
from typing_extensions import Never

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

class Dispatcher(Executor):
    @handler
    async def dispatch(self, task: dict, ctx: WorkflowContext[dict]) -> None:
        await ctx.send_message(task)

class Worker(Executor):
    def __init__(self, id: str, name: str):
        super().__init__(id=id)
        self.name = name
    
    @handler
    async def work(self, task: dict, ctx: WorkflowContext[dict]) -> None:
        result = {"worker": self.name, "result": f"Processed {task}"}
        await ctx.send_message(result)

class Aggregator(Executor):
    @handler
    async def aggregate(self, result: dict, ctx: WorkflowContext[Never, list]) -> None:
        # Collect results from all workers
        results = ctx.get_state("results", [])
        results.append(result)
        ctx.set_state("results", results)
        
        # When all workers complete, yield output
        if len(results) == 3:
            await ctx.yield_output(results)

dispatcher = Dispatcher(id="dispatcher")
workers = [Worker(id=f"worker_{i}", name=f"Worker {i}") for i in range(3)]
aggregator = Aggregator(id="aggregator")

workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(dispatcher, workers)
    .add_fan_in_edge(workers, aggregator)
    .build()
)

Conditional Routing

Route messages to different executors based on conditions.

Conditional Edges

class Router(Executor):
    @handler
    async def route(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        await ctx.send_message(data)

def is_urgent(data: dict) -> bool:
    return data.get("priority") == "urgent"

def is_normal(data: dict) -> bool:
    return data.get("priority") == "normal"

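def is_other(data: dict) -> bool:
    return not (is_urgent(data) or is_normal(data))

router = Router(id="router")

# urgent_handler, normal_handler, and default_handler are executors defined elsewhere.
# Conditions on separate edges are evaluated independently, so the fallback
# must exclude the cases handled above; otherwise it would receive them too.
workflow = (
    WorkflowBuilder(start_executor=router)
    .add_edge(router, urgent_handler, condition=is_urgent)
    .add_edge(router, normal_handler, condition=is_normal)
    .add_edge(router, default_handler, condition=is_other)  # Fallback
    .build()
)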

Switch-Case Routing

For mutually exclusive routing (exactly one target):
from agent_framework import Case, Default

def is_high_value(order: dict) -> bool:
    return order.get("total", 0) > 1000

def is_medium_value(order: dict) -> bool:
    total = order.get("total", 0)
    return 100 < total <= 1000

workflow = (
    WorkflowBuilder(start_executor=order_classifier)
    .add_switch_case_edge_group(
        order_classifier,
        [
            Case(condition=is_high_value, target=premium_handler),
            Case(condition=is_medium_value, target=standard_handler),
            Default(target=basic_handler),  # Fallback
        ]
    )
    .build()
)

Switch-case evaluates cases in order, and the first matching condition routes the message. Always include a Default case so that unmatched messages still have a target.
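
The first-match behavior can be illustrated without the framework. The sketch below is a plain-Python model, not Agent Framework's implementation; `pick_target` and the string target names are hypothetical.

```python
from typing import Callable

Condition = Callable[[dict], bool]


def is_high_value(order: dict) -> bool:
    return order.get("total", 0) > 1000


def is_medium_value(order: dict) -> bool:
    return 100 < order.get("total", 0) <= 1000


def pick_target(
    order: dict, cases: list[tuple[Condition, str]], default: str
) -> str:
    """Return the target of the first case whose condition holds, else the default."""
    for condition, target in cases:
        if condition(order):
            return target
    return default


cases = [
    (is_high_value, "premium_handler"),
    (is_medium_value, "standard_handler"),
]

high = pick_target({"total": 5000}, cases, "basic_handler")
medium = pick_target({"total": 250}, cases, "basic_handler")
low = pick_target({"total": 20}, cases, "basic_handler")
missing = pick_target({}, cases, "basic_handler")
```

Because evaluation stops at the first match, order matters: when conditions overlap, place the most specific one first.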

Loop Patterns

Create feedback loops for iterative refinement.
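
Stripped of the framework, a feedback loop is a generate-and-review cycle with an attempt budget. The sketch below is a framework-independent model under that assumption; `refine`, `generate`, and `approve` are hypothetical names.

```python
from typing import Callable


def refine(
    generate: Callable[[int], str],
    approve: Callable[[str], bool],
    max_attempts: int,
) -> tuple[str, int]:
    """Generate drafts until one is approved or the attempt budget runs out."""
    draft = ""
    for attempt in range(1, max_attempts + 1):
        draft = generate(attempt)
        if approve(draft):
            return draft, attempt
    # Budget exhausted: return the last draft rather than looping forever.
    return draft, max_attempts


draft, attempts = refine(
    generate=lambda n: f"draft v{n}",
    approve=lambda text: text.endswith("v3"),
    max_attempts=5,
)
never_approved = refine(lambda n: f"draft v{n}", lambda text: False, max_attempts=2)
```

The explicit budget is what guarantees termination when the reviewer never approves.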

Simple Loop

from enum import Enum

class Signal(Enum):
    CONTINUE = "continue"
    DONE = "done"

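class Generator(Executor):
    def __init__(self, id: str, max_attempts: int = 3):
        super().__init__(id=id)
        self.attempts = 0
        self.max_attempts = max_attempts
        self.last_content = ""

    @handler
    async def generate(
        self,
        signal: Signal,
        ctx: WorkflowContext[str, str]
    ) -> None:
        # The reviewer approved the previous attempt: emit it and stop.
        if signal is Signal.DONE:
            await ctx.yield_output(self.last_content)
            return

        self.attempts += 1
        content = f"Attempt {self.attempts}: Generated content"
        self.last_content = content

        # Stop once the attempt budget is exhausted; otherwise send for review.
        if self.attempts >= self.max_attempts:
            await ctx.yield_output(content)
        else:
            await ctx.send_message(content)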

class Reviewer(Executor):
    @handler
    async def review(self, content: str, ctx: WorkflowContext[Signal]) -> None:
        # Simple review logic
        if "good" in content.lower():
            await ctx.send_message(Signal.DONE)
        else:
            await ctx.send_message(Signal.CONTINUE)

generator = Generator(id="generator", max_attempts=5)
reviewer = Reviewer(id="reviewer")

workflow = (
    WorkflowBuilder(start_executor=generator)
    .add_edge(generator, reviewer)
    .add_edge(reviewer, generator)  # Loop back
    .build()
)

events = await workflow.run(Signal.CONTINUE)

Agent Feedback Loop

Real-world example with agents:
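# Import locations are assumed; the original snippet does not show them.
from agent_framework import (
    AgentExecutorRequest,
    AgentResponse,
    Executor,
    Message,
    WorkflowBuilder,
    WorkflowContext,
    handler,
)

writer = client.as_agent(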
    name="writer",
    instructions="Write content based on feedback."
)

reviewer = client.as_agent(
    name="reviewer",
    instructions="Review content. Reply 'approved' or provide feedback."
)

class FeedbackRouter(Executor):
    @handler
    async def route(
        self, 
        response: AgentResponse, 
        ctx: WorkflowContext[AgentExecutorRequest | str, str]
    ) -> None:
        if "approved" in response.text.lower():
            await ctx.yield_output(response.text)
        else:
            # Send feedback back to writer
            await ctx.send_message(
                AgentExecutorRequest(
                    messages=[Message("user", text=f"Revise based on: {response.text}")],
                    should_respond=True
                ),
                target_id="writer"
            )

feedback_router = FeedbackRouter(id="feedback_router")

workflow = (
    WorkflowBuilder(start_executor=writer, max_iterations=10)
    .add_edge(writer, reviewer)
    .add_edge(reviewer, feedback_router)
    .add_edge(feedback_router, writer)  # Loop back
    .build()
)

Complex Graph Patterns

Combine patterns for sophisticated orchestration.

Multi-Stage Pipeline with Fan-Out

# Stage 1: Initial processing
preprocessor = Preprocessor(id="preprocessor")

# Stage 2: Parallel analysis
sentiment_analyzer = SentimentAnalyzer(id="sentiment")
entity_extractor = EntityExtractor(id="entities")
topic_classifier = TopicClassifier(id="topics")

# Stage 3: Synthesis
synthesizer = Synthesizer(id="synthesizer")

workflow = (
    WorkflowBuilder(start_executor=preprocessor)
    # Fan out to parallel analyzers
    .add_fan_out_edge(
        preprocessor,
        [sentiment_analyzer, entity_extractor, topic_classifier]
    )
    # Fan in to synthesizer
    .add_fan_in_edge(
        [sentiment_analyzer, entity_extractor, topic_classifier],
        synthesizer
    )
    .build()
)

Conditional Fan-Out with Dynamic Routing

def select_analyzers(task: dict, available: list[str]) -> list[str]:
    """Dynamically select analyzers based on task type."""
    task_type = task.get("type", "")
    
    if task_type == "text":
        return ["sentiment", "topics"]
    elif task_type == "image":
        return ["vision", "ocr"]
    return available  # Use all

workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(
        dispatcher,
        [sentiment, topics, vision, ocr],
        selection_func=select_analyzers
    )
    .add_fan_in_edge([sentiment, topics, vision, ocr], aggregator)
    .build()
)
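
Because the selection function is ordinary Python, it can be exercised on its own before it is wired into a workflow. The check below assumes, as the example above implies, that the returned strings are executor IDs; `available_ids` is a hypothetical stand-in for whatever the framework passes as the second argument.

```python
def select_analyzers(task: dict, available: list[str]) -> list[str]:
    """Dynamically select analyzers based on task type."""
    task_type = task.get("type", "")
    if task_type == "text":
        return ["sentiment", "topics"]
    if task_type == "image":
        return ["vision", "ocr"]
    return available  # Unknown type: use all


# Hypothetical list of target IDs supplied by the caller.
available_ids = ["sentiment", "topics", "vision", "ocr"]

text_targets = select_analyzers({"type": "text"}, available_ids)
image_targets = select_analyzers({"type": "image"}, available_ids)
other_targets = select_analyzers({"type": "audio"}, available_ids)
```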

Decision Tree Pattern

from agent_framework import Case, Default

# Root decision
classifier = Classifier(id="classifier")

# Category-specific sub-workflows
category_a_handler = CategoryAHandler(id="category_a")
category_b_handler = CategoryBHandler(id="category_b")
category_c_handler = CategoryCHandler(id="category_c")

# Sub-handlers for category A
a_priority_high = APriorityHigh(id="a_high")
a_priority_low = APriorityLow(id="a_low")

def is_category_a(data: dict) -> bool:
    return data.get("category") == "A"

def is_category_b(data: dict) -> bool:
    return data.get("category") == "B"

def is_high_priority(data: dict) -> bool:
    return data.get("priority") == "high"

workflow = (
    WorkflowBuilder(start_executor=classifier)
    # First level: category routing
    .add_switch_case_edge_group(
        classifier,
        [
            Case(condition=is_category_a, target=category_a_handler),
            Case(condition=is_category_b, target=category_b_handler),
            Default(target=category_c_handler),
        ]
    )
    # Second level: priority routing for category A
    .add_switch_case_edge_group(
        category_a_handler,
        [
            Case(condition=is_high_priority, target=a_priority_high),
            Default(target=a_priority_low),
        ]
    )
    .build()
)

Streaming and Events

Monitor orchestration progress in real time by iterating over workflow events as they are emitted:
async for event in workflow.run(initial_input, stream=True):
    if event.type == "executor_invoked":
        print(f"→ {event.executor_id} started")
    elif event.type == "executor_completed":
        print(f"✓ {event.executor_id} completed")
    elif event.type == "superstep_completed":
        print(f"Super step {event.iteration} completed")
    elif event.type == "output":
        print(f"Output: {event.data}")

Best Practices

  • Use sequential for linear pipelines where order matters
  • Use concurrent when tasks are independent and can run in parallel
  • Use conditional when routing depends on message content
  • Use loops for iterative refinement with feedback
  • Always include a Default case in switch-case routing
  • Set max_iterations to prevent infinite loops
  • Add error handling in executors to gracefully handle failures
  • Use type hints for better error detection
  • Use concurrent execution when possible (parallelism)
  • Minimize state size - only store what’s needed
  • Use selective fan-out to reduce unnecessary work
  • Stream results for long-running workflows
  • Use descriptive executor IDs
  • Log important state transitions
  • Monitor workflow events for debugging
  • Use structured data for messages
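
For the advice on type hints and structured messages, one option is a small dataclass in place of a free-form dict. The sketch below is framework-independent; `AnalysisTask` and its fields are hypothetical names, and how such a type interacts with checkpointing or serialization is not covered here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AnalysisTask:
    """Hypothetical structured message passed between executors."""

    task_id: str
    type: str  # e.g. "text" or "image"
    payload: str
    priority: str = "normal"


def is_urgent(task: AnalysisTask) -> bool:
    # A misspelled attribute raises AttributeError here, whereas a
    # misspelled key passed to dict.get() would silently return None.
    return task.priority == "urgent"


task = AnalysisTask(task_id="t-1", type="text", payload="Quarterly report", priority="urgent")
default_task = AnalysisTask(task_id="t-2", type="image", payload="scan.png")
```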

Next Steps

  • Checkpoints: Save and restore workflow state for fault tolerance
  • Human-in-the-Loop: Integrate human decision points into workflows