## Overview

The Agent-to-Agent (A2A) protocol enables standardized communication between agents built with different frameworks and technologies. The Microsoft Agent Framework provides full support for both hosting A2A-compliant agents and consuming external A2A agents.
## Key Concepts

### Agent Discovery

A2A agents expose an AgentCard at `/.well-known/agent.json` that describes:

- Agent name and description
- Available skills and capabilities
- Supported input/output modes
- API endpoints
### Communication Modes

- **Non-streaming**: request/response pattern for simple interactions
- **Streaming**: Server-Sent Events (SSE) for real-time updates
- **Background tasks**: long-running operations with continuation tokens
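Streaming responses arrive as standard Server-Sent Events. As a rough illustration of the wire format (a generic helper, not part of the framework), an SSE body can be split into event payloads like this:

```python
def parse_sse(stream_text: str) -> list:
    """Split a raw SSE body into the `data:` payloads of each event.

    Per the SSE format, an event ends at a blank line, and multiple
    `data:` lines within one event are joined with newlines.
    """
    events, data_lines = [], []
    for line in stream_text.splitlines():
        if line.startswith("data:"):
            data_lines.append(line[5:].lstrip())
        elif line == "" and data_lines:
            events.append("\n".join(data_lines))
            data_lines = []
    if data_lines:  # flush a trailing event with no final blank line
        events.append("\n".join(data_lines))
    return events
```

In practice the framework's streaming client handles this for you; the helper only shows what travels over the wire.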
### Task States

A2A tasks progress through the following states:

- `working`: the task is being processed
- `completed`: the task finished successfully
- `failed`: the task encountered an error
- `cancelled`: the task was cancelled
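Only `working` is non-terminal; the other three states are final. A minimal sketch of a transition guard built from the states above (illustrative only, not the framework's API):

```python
# Legal A2A task state transitions: `working` may move to any terminal
# state; terminal states never change.
VALID_TRANSITIONS = {
    "working": {"completed", "failed", "cancelled"},
    "completed": set(),
    "failed": set(),
    "cancelled": set(),
}

def can_transition(current: str, new: str) -> bool:
    """Return True if moving from `current` to `new` is a legal transition."""
    return new in VALID_TRANSITIONS.get(current, set())
```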
## Consuming A2A Agents

Connect to external A2A-compliant agents:
```python
import asyncio

import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent


async def main():
    a2a_host = "https://example-agent.com"

    # 1. Discover agent capabilities
    async with httpx.AsyncClient(timeout=60.0) as http_client:
        resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_host)
        agent_card = await resolver.get_agent_card()
        print(f"Found agent: {agent_card.name}")
        print(f"Description: {agent_card.description}")
        print(f"Skills: {[s.name for s in agent_card.skills]}")

    # 2. Create an A2A agent instance
    async with A2AAgent(
        name=agent_card.name,
        description=agent_card.description,
        agent_card=agent_card,
        url=a2a_host,
    ) as agent:
        # 3. Simple request/response
        response = await agent.run("What can you do?")
        print(f"Response: {response.text}")

        # 4. Streaming response
        async with agent.run("Explain AI", stream=True) as stream:
            async for update in stream:
                for content in update.contents:
                    if content.text:
                        print(content.text, end="")
            final = await stream.get_final_response()
            print(f"\n\nComplete. {len(final.messages)} message(s) received.")


asyncio.run(main())
```
## Hosting A2A Agents

Expose your agents via the A2A protocol.

### Using Azure Functions

The `AgentFunctionApp` automatically exposes A2A-compliant endpoints:
```python
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient
from azure.identity import AzureCliCredential

agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="MyAgent",
    instructions="You are a helpful assistant.",
    description="A general-purpose AI assistant",
)

app = AgentFunctionApp(agents=[agent])

# Automatically available:
# - /.well-known/agent.json (AgentCard)
# - /api/agents/MyAgent/run (invoke endpoint)
# - /api/agents/MyAgent/task/{taskId} (task status)
```
### Custom A2A Server

Implement the A2A protocol manually for custom hosting:
```python
from agent_framework import Agent
from agent_framework.a2a import A2AServer
from fastapi import FastAPI

app = FastAPI()
agent: Agent = ...  # create your agent here

# Create the A2A server
a2a_server = A2AServer(agent)

# Expose the endpoints
@app.get("/.well-known/agent.json")
async def get_agent_card():
    return a2a_server.get_agent_card()

@app.post("/api/run")
async def run_agent(request: dict):
    return await a2a_server.handle_request(request)

@app.get("/api/task/{task_id}")
async def get_task_status(task_id: str):
    return await a2a_server.get_task_status(task_id)
```
## AgentCard Structure

The AgentCard describes your agent's capabilities:
```json
{
  "@type": "AgentCard",
  "name": "MyAgent",
  "description": "A helpful AI assistant",
  "version": "1.0",
  "url": "https://example.com",
  "skills": [
    {
      "name": "general_assistance",
      "description": "Answer questions and help with tasks",
      "tags": ["qa", "assistant"],
      "examples": [
        "What is the weather?",
        "Explain quantum computing"
      ],
      "inputModes": ["text/plain"],
      "outputModes": ["text/plain", "application/json"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "backgroundTasks": true,
    "multimodal": false
  }
}
```
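When building a card by hand, it helps to sanity-check its shape before serving it. A minimal structural check based on the fields in the example above (illustrative only, not an official schema validator):

```python
# Top-level fields every card in this doc's example carries.
REQUIRED_FIELDS = ("name", "description", "version", "skills")

def check_agent_card(card: dict) -> list:
    """Return a list of structural problems; an empty list means the card
    has the expected shape."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in card]
    for i, skill in enumerate(card.get("skills", [])):
        if "name" not in skill:
            problems.append(f"skill {i}: missing name")
    return problems
```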
## A2A Agent Skills as Tools

Use A2A agent skills as tools for another agent:
```python
from agent_framework.a2a import A2AAgent, a2a_skills_as_tools

# Discover the remote agent
a2a_agent = await A2AAgent.from_url("https://specialist-agent.com")

# Convert its skills to tools
tools = a2a_skills_as_tools(a2a_agent)

# Use them in the main agent
# (`client` is an existing chat client, e.g. AzureOpenAIChatClient)
main_agent = client.as_agent(
    name="MainAgent",
    instructions="You coordinate with specialist agents.",
    tools=tools,
)

# The main agent can now call the A2A agent's skills
response = await main_agent.run(
    "Use the specialist to analyze this data: {...}"
)
```
## Background Tasks

Handle long-running operations with continuation tokens:
```python
from agent_framework.a2a import A2AAgent

# Create an agent configured for background execution
agent = A2AAgent(
    name="LongRunningAgent",
    agent_card=agent_card,
    url=a2a_host,
    background=True,  # don't wait for completion
)

# Start the background task
response = await agent.run(
    "Perform a long computation",
    background=True,
)

# Get the task ID and continuation token
task_id = response.task_id
continuation_token = response.continuation_token

# Later: poll for status
status = await agent.get_task_status(task_id)
print(f"Status: {status.state}")  # working, completed, failed

if status.state == "completed":
    final_response = status.result
    print(f"Result: {final_response.text}")

# Or: resume streaming from the continuation token
if continuation_token:
    async with agent.resume_stream(task_id, continuation_token) as stream:
        async for update in stream:
            print(f"Update: {update.text}")
```
### Polling for Completion

Implement polling with exponential backoff:
```python
import asyncio

from agent_framework.a2a import A2AAgent, TaskState


async def poll_until_complete(
    agent: A2AAgent,
    task_id: str,
    max_attempts: int = 60,
    initial_delay: float = 1.0,
):
    """Poll task status until completion."""
    delay = initial_delay
    for attempt in range(max_attempts):
        status = await agent.get_task_status(task_id)
        if status.state == TaskState.COMPLETED:
            return status.result
        elif status.state == TaskState.FAILED:
            raise Exception(f"Task failed: {status.error}")
        elif status.state == TaskState.CANCELLED:
            raise Exception("Task was cancelled")
        # Exponential backoff, capped at 30 seconds
        await asyncio.sleep(delay)
        delay = min(delay * 1.5, 30.0)
    raise TimeoutError(f"Task did not complete within {max_attempts} attempts")


# Usage
response = await agent.run("Long task", background=True)
result = await poll_until_complete(agent, response.task_id)
print(f"Final result: {result.text}")
```
## Multi-Agent Communication

Orchestrate multiple A2A agents:
```python
import asyncio

from agent_framework.a2a import A2AAgent


async def orchestrate_agents():
    """Coordinate multiple A2A agents in a sequential workflow."""
    # Discover agents
    research_agent = await A2AAgent.from_url("https://research-agent.com")
    writing_agent = await A2AAgent.from_url("https://writing-agent.com")
    review_agent = await A2AAgent.from_url("https://review-agent.com")

    topic = "quantum computing"

    # Step 1: Research
    research = await research_agent.run(f"Research {topic}")
    print(f"Research complete: {len(research.text)} chars")

    # Step 2: Write a draft
    draft = await writing_agent.run(
        f"Write an article based on this research: {research.text}"
    )
    print(f"Draft complete: {len(draft.text)} chars")

    # Step 3: Review and revise
    final = await review_agent.run(
        f"Review and improve this article: {draft.text}"
    )
    print(f"Final article: {final.text}")
    return final.text


async def parallel_agents():
    """Run multiple agents concurrently."""
    analyst1 = await A2AAgent.from_url("https://analyst1.com")
    analyst2 = await A2AAgent.from_url("https://analyst2.com")
    analyst3 = await A2AAgent.from_url("https://analyst3.com")

    # Gather results concurrently
    results = await asyncio.gather(
        analyst1.run("Analyze market trends"),
        analyst2.run("Analyze competitor data"),
        analyst3.run("Analyze customer feedback"),
    )

    # Aggregate the results
    combined = "\n\n".join(r.text for r in results)
    return combined
```
## Error Handling

Handle A2A protocol errors gracefully:
```python
from agent_framework.a2a import (
    A2AAgent,
    A2AAuthenticationError,
    A2AConnectionError,
    A2ATaskError,
    TaskState,
)


async def robust_a2a_call(agent: A2AAgent, message: str):
    """Call an A2A agent with comprehensive error handling."""
    try:
        response = await agent.run(message)
        return response
    except A2AConnectionError as e:
        print(f"Connection failed: {e}")
        # Retry with exponential backoff
    except A2AAuthenticationError as e:
        print(f"Authentication failed: {e}")
        # Refresh credentials
    except A2ATaskError as e:
        print(f"Task failed: {e}")
        if e.state == TaskState.FAILED:
            print(f"Error details: {e.error_message}")
        # Handle task-specific errors
    except TimeoutError:
        print("Request timed out")
        # Handle the timeout
    except Exception as e:
        print(f"Unexpected error: {e}")
        # General error handling
```
## Testing A2A Agents

### Using the .NET Sample Server

For quick testing, use the .NET A2A sample server:
```shell
# Clone the repository
git clone https://github.com/microsoft/agent-framework.git
cd agent-framework/dotnet/samples/05-end-to-end/A2AClientServer

# Set environment variables
export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4o-mini"

# Run the server
dotnet run
# Server available at http://localhost:5001/
```
Then test with the Python client:

```shell
# Set the A2A host
export A2A_AGENT_HOST="http://localhost:5001/"

# Run the Python sample
python agent_with_a2a.py
```
### Mock A2A Server

Create a mock server for testing:
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class AgentCard(BaseModel):
    name: str
    description: str
    skills: list


@app.get("/.well-known/agent.json")
async def get_agent_card():
    return {
        "@type": "AgentCard",
        "name": "MockAgent",
        "description": "A mock agent for testing",
        "version": "1.0",
        "skills": [
            {
                "name": "echo",
                "description": "Echo input back",
                "inputModes": ["text/plain"],
                "outputModes": ["text/plain"],
            }
        ],
    }


@app.post("/api/run")
async def run_agent(request: dict):
    return {
        "status": "completed",
        "result": {"text": f"Echo: {request.get('message')}"},
    }

# Run: uvicorn mock_server:app --port 5001
```
## Best Practices

- **Clear descriptions**: Make skills discoverable with detailed descriptions.
- **Relevant tags**: Use tags for categorization and searchability.
- **Provide examples**: Include usage examples for each skill.
- **Specify modes**: Declare supported input/output MIME types.
- **Retry logic**: Implement exponential backoff for transient errors.
- **Timeout management**: Set appropriate timeouts for long-running tasks.
- **Graceful degradation**: Handle agent unavailability gracefully.
- **Detailed errors**: Return meaningful error messages in task failures.
- **Authentication**: Implement OAuth 2.0 or API key authentication.
- **HTTPS**: Always use HTTPS in production.
- **Rate limiting**: Protect against abuse with rate limits.
- **Input validation**: Validate all inputs before processing.
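The retry-logic recommendation above can be sketched as a small generic helper. This is illustrative only (the function name and the choice of `ConnectionError` as the transient error are assumptions, not framework API); exponential backoff plus jitter avoids thundering-herd retries against a recovering agent:

```python
import asyncio
import random


async def with_retries(call, max_attempts=5, base_delay=1.0):
    """Retry an async call on ConnectionError with exponential backoff.

    Delay doubles each attempt, with a little random jitter; the last
    failure is re-raised so callers still see the error.
    """
    for attempt in range(max_attempts):
        try:
            return await call()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
```

Usage would look like `await with_retries(lambda: agent.run("..."))`.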
## Troubleshooting

**Error**: Cannot retrieve AgentCard at `/.well-known/agent.json`

Solutions:
- Verify the URL is correct and accessible
- Check the CORS configuration if accessing from a browser
- Ensure the AgentCard endpoint returns valid JSON
- Test with curl: `curl https://agent.com/.well-known/agent.json`

**Error**: Request times out when calling the A2A agent

Solutions:
- Increase the timeout: `httpx.AsyncClient(timeout=120.0)`
- Use background tasks for long operations
- Implement polling instead of waiting for completion
- Check network connectivity and firewall rules

**Error**: Task remains in `working` state indefinitely

Solutions:
- Implement a timeout in the polling loop
- Check the A2A agent logs for errors
- Verify the task is actually processing (check metrics)
- Consider using continuation tokens to resume

**Error**: Streaming response disconnects prematurely

Solutions:
- Use continuation tokens to resume from the last position
- Implement retry logic with exponential backoff
- Check load-balancer timeout settings
- Verify the SSE configuration on both client and server
## Protocol Compliance

Ensure your A2A implementation follows the specification:

- ✅ AgentCard served at `/.well-known/agent.json`
- ✅ Proper content negotiation (Accept headers)
- ✅ Task state transitions (working → completed/failed)
- ✅ Continuation-token support for resumption
- ✅ SSE format for streaming responses
- ✅ Error responses with proper status codes

Validate compliance using the A2A validation tool.
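The content-negotiation item above boils down to matching a client's `Accept` header against the `outputModes` a skill declares. A deliberately simplified matcher (it ignores q-values and type wildcards other than `*/*`, so it is a sketch, not a spec-complete implementation):

```python
def negotiate_output_mode(accept_header: str, supported: list) -> str:
    """Return the first client-requested MIME type the agent supports,
    or '' if nothing matches. `*/*` accepts the agent's first mode."""
    for item in accept_header.split(","):
        mime = item.split(";")[0].strip()  # drop parameters like q=0.9
        if mime == "*/*" and supported:
            return supported[0]
        if mime in supported:
            return mime
    return ""
```

A compliant server would respond with `406 Not Acceptable` when the result is empty.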
## Next Steps

- **Azure Functions**: Host A2A agents on Azure Functions
- **DurableTask**: Orchestrate A2A agents with Durable Functions