Overview
The Microsoft Agent Framework integrates with Durable Functions and Durable Task Framework to enable long-running, stateful agent workflows. This integration provides checkpoint and resume capabilities, parallel execution, human-in-the-loop patterns, and reliable state management for complex agent orchestrations.
Key Features
Durable orchestrations: Coordinate multiple agent calls with automatic checkpointing
State persistence: Agent conversation state survives restarts and failures
Parallel execution: Run multiple agents concurrently and aggregate results
External events: Human-in-the-loop and external approval workflows
Long-running tools: Agents can invoke durable activities for extended operations
Reliable streaming: Resumable streaming with cursor-based resumption
Architecture Patterns
Worker-Client Architecture
DurableTask uses a distributed worker-client pattern:
┌─────────────┐ ┌─────────────────────┐ ┌──────────────┐
│   Client    │────────▶│  DurableTask Server │◀────────│    Worker    │
│ │ │ (Orchestrator) │ │ (Executor) │
│ - Schedule │ │ - Task Hub │ │ - Agents │
│ - Query │ │ - State Storage │ │ - Activities │
│ - Events │ │ - Event Queue │ │ - Entities │
└─────────────┘ └─────────────────────┘ └──────────────┘
Hosting Options
Azure Functions: Fully managed with auto-scaling
Console Apps: Self-hosted for development or on-premises
ASP.NET: Embedded in web applications
Quick Start
Prerequisites
Start the Durable Task Scheduler emulator:
docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
The DTS dashboard is available at http://localhost:8082.
Python Setup
Install Package
pip install agent-framework-durabletask
Create Worker
Create worker.py:

from agent_framework.azure import AzureOpenAIChatClient, DurableAIAgentWorker
from azure.identity import AzureCliCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# Create agent
agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="Assistant",
    instructions="You are a helpful assistant."
)

# Create DurableTask worker
dts_worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)

# Wrap with agent worker
agent_worker = DurableAIAgentWorker(dts_worker)
agent_worker.add_agent(agent)

# Start worker
print("Worker started. Press Ctrl+C to stop.")
dts_worker.start()
Create Client
Create client.py:

import asyncio
from agent_framework_durabletask import DurableAIAgentClient
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def main():
    # Create DurableTask client
    dts_client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )

    # Wrap with agent client
    agent_client = DurableAIAgentClient(dts_client)
    agent = agent_client.get_agent("Assistant")

    # Run agent
    response = await agent.run("What is machine learning?")
    print(f"Response: {response.text}")

asyncio.run(main())
Run
In one terminal, start the worker: python worker.py
In another terminal, run the client: python client.py
C# Setup
Add Package
dotnet add package Microsoft.Agents.AI.Hosting.AzureFunctions
dotnet add package Microsoft.Azure.Functions.Worker.Extensions.DurableTask
Create Program.cs
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Hosting.AzureFunctions;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Hosting;
using OpenAI.Chat;

string endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
    ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
string deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
    ?? "gpt-4o-mini";

var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential());
AIAgent agent = client.GetChatClient(deploymentName)
    .AsAIAgent("You are a helpful assistant.", "Assistant");

using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableAgents(options => options.AddAIAgent(agent))
    .Build();

app.Run();
Configure Settings
Create local.settings.json:

{
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
    "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/",
    "AZURE_OPENAI_DEPLOYMENT_NAME": "gpt-4o-mini"
  }
}
Single Agent Pattern
Host a single durable agent with persistent state:
worker.py:

from agent_framework.azure import AzureOpenAIChatClient, DurableAIAgentWorker
from azure.identity import AzureCliCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# Create agent
agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="Joker",
    instructions="You are good at telling jokes."
)

# Setup worker
dts_worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)
agent_worker = DurableAIAgentWorker(dts_worker)
agent_worker.add_agent(agent)

print("Worker ready. Waiting for requests...")
dts_worker.start()
client.py:

import asyncio
from agent_framework_durabletask import DurableAIAgentClient
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def main():
    dts_client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )
    agent_client = DurableAIAgentClient(dts_client)
    joker = agent_client.get_agent("Joker")

    # Multi-turn conversation with persistent state
    response1 = await joker.run("Tell me a joke about clouds")
    print(f"Joke 1: {response1.text}")

    response2 = await joker.run("Tell me another one")
    print(f"Joke 2: {response2.text}")

asyncio.run(main())
Multi-Agent Pattern
Host multiple specialized agents:
worker.py:

from agent_framework.azure import AzureOpenAIChatClient, DurableAIAgentWorker
from azure.identity import AzureCliCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

client = AzureOpenAIChatClient(credential=AzureCliCredential())

# Create specialized agents
physicist = client.as_agent(
    name="Physicist",
    instructions="You are a physicist expert. Answer physics questions."
)
chemist = client.as_agent(
    name="Chemist",
    instructions="You are a chemistry expert. Answer chemistry questions."
)

# Register all agents
dts_worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)
agent_worker = DurableAIAgentWorker(dts_worker)
agent_worker.add_agent(physicist)
agent_worker.add_agent(chemist)

print(f"Registered agents: {physicist.name}, {chemist.name}")
dts_worker.start()
client.py:

import asyncio
from agent_framework_durabletask import DurableAIAgentClient
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def main():
    dts_client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )
    agent_client = DurableAIAgentClient(dts_client)

    # Get specific agents
    physicist = agent_client.get_agent("Physicist")
    chemist = agent_client.get_agent("Chemist")

    # Query different agents
    physics_answer = await physicist.run("Explain quantum entanglement")
    chemistry_answer = await chemist.run("Explain chemical bonds")

    print(f"Physicist: {physics_answer.text}")
    print(f"Chemist: {chemistry_answer.text}")

asyncio.run(main())
Orchestration Patterns
Sequential Chaining
Chain agent calls with shared session:
function_app.py
from azure.durable_functions import DurableOrchestrationContext
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient
from azure.identity import AzureCliCredential

WRITER_AGENT_NAME = "WriterAgent"

agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name=WRITER_AGENT_NAME,
    instructions="""You refine text. Given an initial sentence you enhance it;
    given an improved sentence you polish it further."""
)

app = AgentFunctionApp(agents=[agent])

@app.orchestration_trigger(context_name="context")
def writer_orchestration(context: DurableOrchestrationContext):
    """Sequential refinement with shared conversation."""
    writer = app.get_agent(context, WRITER_AGENT_NAME)
    session = writer.create_session()

    # First pass
    initial = yield writer.run(
        messages="Write a sentence about learning.",
        session=session
    )

    # Second pass with context
    refined = yield writer.run(
        messages=f"Improve this: {initial.text}",
        session=session
    )

    return refined.text
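Stripped of the Durable machinery, the session-sharing idea is just threading one message history through successive calls. A runnable sketch (plain Python; `refine` is a hypothetical stand-in for the agent invocation, not the framework API):

```python
# Minimal model of sequential chaining with a shared session: each
# call appends to, and therefore sees, the accumulated history.
# `refine` is a hypothetical stand-in for an agent run() call.

def refine(session: list[str], prompt: str) -> str:
    session.append("user: " + prompt)
    turn = len(session) // 2 + 1  # 1-based turn number
    response = f"pass {turn}: {prompt}"
    session.append("assistant: " + response)
    return response

session: list[str] = []
initial = refine(session, "Write a sentence about learning.")
refined = refine(session, f"Improve this: {initial}")

# Both turns share one session, so the second call carries the first
# exchange as context (2 prompts + 2 responses).
assert len(session) == 4
assert refined.startswith("pass 2:")
```

The Durable version adds what this sketch cannot: the session survives process restarts, and each `yield` is a checkpoint the orchestration can replay from.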
Parallel Execution
Run multiple agents concurrently:
function_app.py
from azure.durable_functions import DurableOrchestrationContext
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient

# physicist, chemist, and mathematician are created as in the
# multi-agent example above
app = AgentFunctionApp(agents=[physicist, chemist, mathematician])

@app.orchestration_trigger(context_name="context")
def parallel_orchestration(context: DurableOrchestrationContext):
    """Run multiple agents concurrently and aggregate results."""
    physicist = app.get_agent(context, "Physicist")
    chemist = app.get_agent(context, "Chemist")
    mathematician = app.get_agent(context, "Mathematician")

    topic = context.get_input()

    # Schedule all agents concurrently
    physics_task = physicist.run(f"Explain {topic} from a physics perspective")
    chemistry_task = chemist.run(f"Explain {topic} from a chemistry perspective")
    math_task = mathematician.run(f"Explain {topic} from a mathematics perspective")

    # Wait for all to complete
    results = yield [physics_task, chemistry_task, math_task]

    # Aggregate results
    combined = "\n\n".join([
        f"Physics: {results[0].text}",
        f"Chemistry: {results[1].text}",
        f"Mathematics: {results[2].text}"
    ])
    return combined
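The fan-out/fan-in shape of this orchestration mirrors ordinary asyncio fan-out. As an illustration of the pattern only (not the Durable API), with a hypothetical `ask` standing in for an agent's run call:

```python
import asyncio

# Fan-out/fan-in sketch: schedule every "agent" call concurrently,
# then aggregate once all results are in. `ask` is a hypothetical
# stand-in for an agent's run() call.

async def ask(agent: str, topic: str) -> str:
    await asyncio.sleep(0)  # simulate the I/O-bound model call
    return f"{agent}: perspective on {topic}"

async def fan_out(topic: str) -> str:
    # gather() plays the role of `yield [task, task, task]` above:
    # results come back in the order the tasks were scheduled.
    results = await asyncio.gather(
        ask("Physics", topic),
        ask("Chemistry", topic),
        ask("Mathematics", topic),
    )
    return "\n\n".join(results)

combined = asyncio.run(fan_out("entropy"))
```

The difference in the Durable version is that the fan-out is checkpointed: if the worker crashes after two of three agents finish, replay resumes with those two results already recorded.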
Conditional Routing
Route to different agents based on conditions:
function_app.py
from azure.durable_functions import DurableOrchestrationContext
from pydantic import BaseModel

class Classification(BaseModel):
    is_spam: bool
    confidence: float

@app.orchestration_trigger(context_name="context")
def conditional_orchestration(context: DurableOrchestrationContext):
    """Route based on spam detection."""
    spam_detector = app.get_agent(context, "SpamDetector")
    email_assistant = app.get_agent(context, "EmailAssistant")

    email = context.get_input()

    # Step 1: Detect spam
    classification = yield spam_detector.run(
        messages=f"Classify this email: {email}",
        response_format=Classification
    )

    # Step 2: Conditional routing
    if classification.is_spam and classification.confidence > 0.8:
        return {"action": "blocked", "reason": "spam detected"}
    else:
        # Process legitimate email
        response = yield email_assistant.run(
            messages=f"Draft a reply to: {email}"
        )
        return {"action": "replied", "draft": response.text}
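The routing decision itself is plain branching and can be unit-tested in isolation. A sketch using a dataclass in place of the pydantic model above, with the same 0.8 threshold:

```python
from dataclasses import dataclass

# Standalone sketch of the routing decision. The dataclass stands in
# for the pydantic Classification model; the 0.8 threshold mirrors
# the orchestration above.

@dataclass
class Classification:
    is_spam: bool
    confidence: float

def route(c: Classification) -> str:
    # Block only when the detector is both positive AND confident;
    # everything else falls through to the reply path.
    if c.is_spam and c.confidence > 0.8:
        return "blocked"
    return "replied"

assert route(Classification(is_spam=True, confidence=0.95)) == "blocked"
assert route(Classification(is_spam=True, confidence=0.5)) == "replied"   # low confidence
assert route(Classification(is_spam=False, confidence=0.99)) == "replied"
```

Keeping the threshold check in deterministic orchestrator code (rather than asking the agent to decide) makes the branch replay-safe and easy to audit.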
Human-in-the-Loop (HITL)
Incorporate human approval in workflows:
function_app.py
from datetime import timedelta
from azure.durable_functions import DurableOrchestrationContext

@app.orchestration_trigger(context_name="context")
def hitl_orchestration(context: DurableOrchestrationContext):
    """Request human approval before proceeding."""
    writer = app.get_agent(context, "WriterAgent")

    # Step 1: Generate draft
    draft = yield writer.run("Write a blog post about AI")

    # Step 2: Wait for human approval; the timer provides the timeout
    approval_event = context.wait_for_external_event("approval_received")
    timeout_event = context.create_timer(
        context.current_utc_datetime + timedelta(hours=24)
    )

    winner = yield context.task_any([approval_event, timeout_event])

    if winner == approval_event:
        approval_data = approval_event.result
        if approval_data.get("approved"):
            # Approved: publish
            return {"status": "published", "content": draft.text}
        else:
            # Revise based on feedback
            feedback = approval_data.get("feedback")
            revised = yield writer.run(
                f"Revise this based on feedback '{feedback}': {draft.text}"
            )
            return {"status": "revised", "content": revised.text}
    else:
        # Timeout: escalate
        return {"status": "escalated", "reason": "approval timeout"}
Send approval:
# Approve
curl -X POST http://localhost:7071/api/approval/{instanceId} \
-H "Content-Type: application/json" \
-d '{"approved": true}'
# Request revision
curl -X POST http://localhost:7071/api/approval/{instanceId} \
-H "Content-Type: application/json" \
-d '{"approved": false, "feedback": "Add more technical details"}'
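The approval-versus-timeout race has the same shape as asyncio.wait with FIRST_COMPLETED. A runnable model of the branch logic (delays are illustrative; this is not the Durable API):

```python
import asyncio

# Models task_any([approval, timeout]): whichever task finishes
# first decides the outcome. Delays are illustrative stand-ins for
# "a human clicks approve" vs. "the 24h timer fires".

async def wait_for_approval(delay: float) -> dict:
    await asyncio.sleep(delay)
    return {"approved": True}

async def run_race(approval_delay: float, timeout_s: float) -> str:
    approval = asyncio.ensure_future(wait_for_approval(approval_delay))
    timeout = asyncio.ensure_future(asyncio.sleep(timeout_s))
    done, pending = await asyncio.wait(
        {approval, timeout}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # the loser is abandoned, as in the orchestration
    if approval in done:
        return "published" if approval.result().get("approved") else "revised"
    return "escalated"  # the timer won

assert asyncio.run(run_race(approval_delay=0.01, timeout_s=0.5)) == "published"
assert asyncio.run(run_race(approval_delay=0.5, timeout_s=0.01)) == "escalated"
```

The Durable version differs in one crucial way: the orchestration is checkpointed while it waits, so the 24-hour wait consumes no compute and survives worker restarts.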
Reliable Streaming
Implement resumable streaming with Redis:
redis_stream_response_handler.py
import redis.asyncio as redis
from typing import AsyncIterator
from agent_framework_durabletask import AgentResponseCallbackProtocol, AgentCallbackContext

class RedisStreamResponseHandler(AgentResponseCallbackProtocol):
    """Reliable streaming using Redis Streams."""

    def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    async def handle_agent_response(
        self,
        context: AgentCallbackContext,
        response_stream: AsyncIterator[str]
    ):
        """Stream agent responses to Redis."""
        stream_key = f"agent:response:{context.correlation_id}"

        async for chunk in response_stream:
            # Write to Redis Stream
            await self.redis.xadd(
                stream_key,
                {"chunk": chunk, "timestamp": context.timestamp}
            )
            # Set expiration
            await self.redis.expire(stream_key, self.ttl)

        # Mark as complete
        await self.redis.xadd(
            stream_key,
            {"status": "complete"}
        )
Consume stream with cursor:
import redis.asyncio as redis

async def consume_stream(correlation_id: str, cursor: str = "0"):
    """Resume streaming from cursor position."""
    r = redis.from_url("redis://localhost:6379")
    stream_key = f"agent:response:{correlation_id}"
    current_cursor = cursor

    while True:
        # Read from cursor position
        entries = await r.xread(
            {stream_key: current_cursor},
            count=10,
            block=1000
        )
        if not entries:
            break

        for stream, messages in entries:
            for message_id, data in messages:
                if b"chunk" in data:
                    print(data[b"chunk"].decode(), end="")
                    current_cursor = message_id
                elif b"status" in data and data[b"status"] == b"complete":
                    return current_cursor

    # Return cursor for resumption
    return current_cursor
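The cursor contract itself is easy to see without Redis: entries carry ascending ids, and a read resumes strictly after the last id the consumer saved. An in-memory model of that contract:

```python
# In-memory model of cursor-based resumption: a stream is an ordered
# list of (id, chunk) entries, and a read returns entries with id
# strictly greater than the cursor -- the same contract as XREAD.

stream = [(1, "Machine "), (2, "learning "), (3, "is "), (4, "fun.")]

def read_after(cursor: int, count: int = 2):
    """Return up to `count` entries with id greater than `cursor`."""
    return [(i, c) for i, c in stream if i > cursor][:count]

# First session consumes two chunks, then "disconnects".
first = read_after(0)
cursor = first[-1][0]  # persist this id to resume later

# A new session resumes exactly where the old one stopped:
# no chunk is replayed, none is skipped.
resumed = read_after(cursor, count=10)
text = "".join(c for _, c in first + resumed)
assert text == "Machine learning is fun."
```

This is why the handler above writes each chunk as its own stream entry: the entry id doubles as the resumption cursor, so a client that reconnects after a failure loses nothing.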
Configuration
Connection Settings
Python
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from azure.identity import DefaultAzureCredential

# Local development
worker = DurableTaskSchedulerWorker(
    host_address="http://localhost:8080",
    secure_channel=False,
    taskhub="default"
)

# Azure production
worker = DurableTaskSchedulerWorker(
    host_address="https://my-dts.azurewebsites.net",
    secure_channel=True,
    taskhub="production",
    token_credential=DefaultAzureCredential()
)
Task Hub Configuration
Task hubs provide isolation between environments:
# Development
export TASKHUB="dev"

# Staging
export TASKHUB="staging"

# Production
export TASKHUB="production"
Monitoring
DTS Dashboard
View orchestration status at http://localhost:8082:
Orchestrations : List all running/completed orchestrations
Instance details : View inputs, outputs, and execution history
Timeline : See checkpoints and state transitions
Query Orchestration Status
from durabletask.azuremanaged.client import DurableTaskSchedulerClient

async def check_status(instance_id: str):
    client = DurableTaskSchedulerClient(
        host_address="http://localhost:8080",
        secure_channel=False,
        taskhub="default"
    )

    # Get instance metadata
    metadata = await client.get_instance_metadata(instance_id)
    print(f"Status: {metadata.runtime_status}")
    print(f"Input: {metadata.serialized_input}")
    print(f"Output: {metadata.serialized_output}")
Best Practices
Idempotent Activities
Orchestrations may replay, so an activity can be invoked more than once for the same logical step. Ensure activities are idempotent:

@app.activity_trigger(input_name="data")
def process_data(data: dict) -> dict:
    # Use idempotency key
    idempotency_key = data["id"]

    # Check if already processed
    if already_processed(idempotency_key):
        return get_cached_result(idempotency_key)

    # Process and cache
    result = do_work(data)
    cache_result(idempotency_key, result)
    return result
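The same pattern in a self-contained form, with an in-memory dict standing in for durable result storage, shows that a repeated call for the same key returns the cached result without repeating the work:

```python
# Self-contained sketch of the idempotency pattern: results are
# cached by key, so re-running the same logical step (as a replay
# would) performs the real work only once. The in-memory dict stands
# in for durable storage (e.g., a blob or table).

calls = {"count": 0}
_cache: dict = {}

def do_work(data: dict) -> dict:
    calls["count"] += 1  # count how many times real work happens
    return {"id": data["id"], "processed": True}

def process_data(data: dict) -> dict:
    key = data["id"]
    if key in _cache:        # repeat invocation: return recorded result
        return _cache[key]
    result = do_work(data)   # first invocation: do the real work
    _cache[key] = result
    return result

first = process_data({"id": "order-1"})
replayed = process_data({"id": "order-1"})
assert first == replayed
assert calls["count"] == 1  # the side effect ran exactly once
```

In production the cache must itself be durable and shared across workers; an in-process dict only illustrates the shape of the check.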
Minimize Orchestration State
Keep orchestration input/output small to reduce storage costs:

# Bad: Large data in orchestration
large_data = context.get_input()  # 10 MB
result = yield process_activity(large_data)

# Good: Pass references instead
blob_url = context.get_input()  # Just a URL
result = yield process_activity(blob_url)
Handle Timeouts Gracefully
Set appropriate timeouts for external events:

from datetime import timedelta

# Wait with timeout
approval = context.wait_for_external_event("approval")
timeout = context.create_timer(
    context.current_utc_datetime + timedelta(hours=24)
)

winner = yield context.task_any([approval, timeout])
if winner == timeout:
    # Handle timeout
    yield context.call_activity("send_escalation_email")
Use Task Hubs for Isolation
Separate environments with different task hubs:
Development: dev
Staging: staging
Production: production
Troubleshooting
Worker Not Receiving Tasks
Symptoms: Client schedules tasks but the worker doesn't process them.
Solutions:
Verify worker and client use same task hub name
Check DTS connection strings match
Ensure worker called start() and is running
Check DTS dashboard for pending tasks
Orchestration Stuck in Running State
Symptoms: Orchestration shows as "Running" indefinitely.
Solutions:
Check worker logs for errors
Verify all activities are registered
Look for unhandled exceptions in orchestration code
Check if waiting for external event that never arrives
Non-Deterministic Behavior Errors
Symptoms: "Non-deterministic behavior detected" errors.
Solutions:
Don’t use random numbers or current time in orchestrations
Don’t make non-deterministic API calls
Use context.current_utc_datetime instead of datetime.now()
Generate GUIDs with context.new_uuid()
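The reason these rules exist: on replay the framework re-executes the orchestrator code and substitutes recorded values for steps that already completed, so anything computed fresh (like datetime.now()) diverges between executions. A toy record/replay sketch of that mechanism:

```python
import itertools

# Toy record/replay sketch: the first execution records values into
# a history; a replay reads them back, so a "deterministic" accessor
# returns the same value on every execution -- this is how
# context.current_utc_datetime stays stable across replays, and why
# a bare datetime.now() would not.

class ReplayContext:
    def __init__(self, history: list):
        self.history = history
        self._pos = 0

    def deterministic_value(self, produce):
        if self._pos < len(self.history):
            value = self.history[self._pos]  # replay: use recorded value
        else:
            value = produce()                # first run: compute and record
            self.history.append(value)
        self._pos += 1
        return value

history: list = []
counter = itertools.count()  # stands in for a non-deterministic source

first_run = ReplayContext(history).deterministic_value(lambda: next(counter))
replay = ReplayContext(history).deterministic_value(lambda: next(counter))
assert first_run == replay == 0  # the replay sees the recorded value
```

Calling `next(counter)` directly in orchestrator code would return 0 on the first execution and 1 on replay; routing it through the recorded history is what keeps both executions identical.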
Next Steps
Azure Functions: Deploy durable agents to Azure Functions
A2A Protocol: Integrate with A2A agents in orchestrations