
Overview

The Agent-to-Agent (A2A) protocol enables standardized communication between agents built with different frameworks and technologies. The Microsoft Agent Framework provides full support for both hosting A2A-compliant agents and consuming external A2A agents.
Learn more about the A2A protocol at a2a-protocol.org

Key Concepts

Agent Discovery

A2A agents expose an AgentCard at /.well-known/agent.json that describes:
  • Agent name and description
  • Available skills and capabilities
  • Supported input/output modes
  • API endpoints
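Discovery boils down to fetching JSON from the well-known path. A minimal standalone sketch, assuming only that the host serves the path above (the `agent_card_url` and `fetch_agent_card` helpers here are illustrative, not framework APIs; the `A2ACardResolver` shown later wraps this for you):

```python
from urllib.parse import urljoin


def agent_card_url(base_url: str) -> str:
    """Build the well-known AgentCard URL from an agent's base URL."""
    return urljoin(base_url.rstrip("/") + "/", ".well-known/agent.json")


async def fetch_agent_card(base_url: str) -> dict:
    """Fetch and decode the AgentCard JSON (illustrative helper)."""
    import httpx  # imported here so the URL helper above stays stdlib-only

    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(agent_card_url(base_url))
        response.raise_for_status()
        return response.json()
```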

Communication Modes

  • Non-streaming: Request-response pattern for simple interactions
  • Streaming: Server-Sent Events (SSE) for real-time updates
  • Background tasks: Long-running operations with continuation tokens
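As a rough illustration of the streaming mode: SSE delivers `data:` lines, with a blank line terminating each event. A minimal parser for that framing (a sketch of the wire format, not the framework's internal implementation) looks like:

```python
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the payload of each SSE event from raw protocol lines.

    An event's data is the `data:` lines seen since the last blank line,
    joined with newlines, per the SSE framing rules.
    """
    buffer: list[str] = []
    for line in lines:
        if line == "":  # blank line terminates an event
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith("data:"):
            buffer.append(line[len("data:"):].lstrip())
    if buffer:  # flush a trailing, unterminated event
        yield "\n".join(buffer)
```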

Task States

A2A tasks progress through states:
  • working: Task is being processed
  • completed: Task finished successfully
  • failed: Task encountered an error
  • cancelled: Task was cancelled
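The states above can be modeled as a small enum. This is a standalone sketch mirroring the list, with a convenience check for terminal states (the framework exports its own `TaskState`, used in later examples on this page):

```python
from enum import Enum


class TaskState(str, Enum):
    """A2A task lifecycle states, as listed above."""

    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    @property
    def is_terminal(self) -> bool:
        """Only `working` can still transition; the rest are final."""
        return self is not TaskState.WORKING
```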

Consuming A2A Agents

Connect to external A2A-compliant agents:
import asyncio
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent

async def main():
    a2a_host = "https://example-agent.com"
    
    # 1. Discover agent capabilities
    async with httpx.AsyncClient(timeout=60.0) as http_client:
        resolver = A2ACardResolver(httpx_client=http_client, base_url=a2a_host)
        agent_card = await resolver.get_agent_card()
        print(f"Found agent: {agent_card.name}")
        print(f"Description: {agent_card.description}")
        print(f"Skills: {[s.name for s in agent_card.skills]}")
    
    # 2. Create A2A agent instance
    async with A2AAgent(
        name=agent_card.name,
        description=agent_card.description,
        agent_card=agent_card,
        url=a2a_host,
    ) as agent:
        # 3. Simple request/response
        response = await agent.run("What can you do?")
        print(f"Response: {response.text}")
        
        # 4. Streaming response
        async with agent.run("Explain AI", stream=True) as stream:
            async for update in stream:
                for content in update.contents:
                    if content.text:
                        print(content.text, end="")
            
            final = await stream.get_final_response()
            print(f"\n\nComplete. {len(final.messages)} message(s) received.")

asyncio.run(main())

Hosting A2A Agents

Expose your agents via the A2A protocol:

Using Azure Functions

The AgentFunctionApp automatically exposes A2A-compliant endpoints:
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient
from azure.identity import AzureCliCredential

agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    name="MyAgent",
    instructions="You are a helpful assistant.",
    description="A general-purpose AI assistant"
)

app = AgentFunctionApp(agents=[agent])

# Automatically available:
# - /.well-known/agent.json (AgentCard)
# - /api/agents/MyAgent/run (invoke endpoint)
# - /api/agents/MyAgent/task/{taskId} (task status)

Custom A2A Server

Implement A2A protocol manually for custom hosting:
from agent_framework.a2a import A2AServer
from agent_framework import Agent
from fastapi import FastAPI

app = FastAPI()
agent = ...  # create your agent here

# Create A2A server
a2a_server = A2AServer(agent)

# Expose endpoints
@app.get("/.well-known/agent.json")
async def get_agent_card():
    return a2a_server.get_agent_card()

@app.post("/api/run")
async def run_agent(request: dict):
    return await a2a_server.handle_request(request)

@app.get("/api/task/{task_id}")
async def get_task_status(task_id: str):
    return await a2a_server.get_task_status(task_id)

AgentCard Structure

The AgentCard describes your agent’s capabilities:
{
  "@type": "AgentCard",
  "name": "MyAgent",
  "description": "A helpful AI assistant",
  "version": "1.0",
  "url": "https://example.com",
  "skills": [
    {
      "name": "general_assistance",
      "description": "Answer questions and help with tasks",
      "tags": ["qa", "assistant"],
      "examples": [
        "What is the weather?",
        "Explain quantum computing"
      ],
      "inputModes": ["text/plain"],
      "outputModes": ["text/plain", "application/json"]
    }
  ],
  "capabilities": {
    "streaming": true,
    "backgroundTasks": true,
    "multimodal": false
  }
}

A2A Skills as Tools

Use A2A agent skills as tools for another agent:
from agent_framework.a2a import A2AAgent, a2a_skills_as_tools

# Discover remote agent
a2a_agent = await A2AAgent.from_url("https://specialist-agent.com")

# Convert skills to tools
tools = a2a_skills_as_tools(a2a_agent)

# Use in main agent
main_agent = client.as_agent(
    name="MainAgent",
    instructions="You coordinate with specialist agents.",
    tools=tools
)

# Main agent can now call A2A agent skills
response = await main_agent.run(
    "Use the specialist to analyze this data: {...}"
)

Background Tasks

Handle long-running operations with continuation tokens:
from agent_framework.a2a import A2AAgent

# Start background task
agent = A2AAgent(
    name="LongRunningAgent",
    agent_card=agent_card,
    url=a2a_host,
    background=True  # Don't wait for completion
)

response = await agent.run(
    "Perform a long computation",
    background=True
)

# Get continuation token
task_id = response.task_id
continuation_token = response.continuation_token

# Later: poll for status
status = await agent.get_task_status(task_id)
print(f"Status: {status.state}")  # working, completed, failed

if status.state == "completed":
    final_response = status.result
    print(f"Result: {final_response.text}")

# Or: resume streaming from continuation token
if continuation_token:
    async with agent.resume_stream(task_id, continuation_token) as stream:
        async for update in stream:
            print(f"Update: {update.text}")

Polling for Completion

Implement polling with exponential backoff:
import asyncio
from agent_framework.a2a import A2AAgent, TaskState

async def poll_until_complete(
    agent: A2AAgent,
    task_id: str,
    max_attempts: int = 60,
    initial_delay: float = 1.0
):
    """Poll task status until completion."""
    delay = initial_delay
    
    for attempt in range(max_attempts):
        status = await agent.get_task_status(task_id)
        
        if status.state == TaskState.COMPLETED:
            return status.result
        elif status.state == TaskState.FAILED:
            raise RuntimeError(f"Task failed: {status.error}")
        elif status.state == TaskState.CANCELLED:
            raise RuntimeError("Task was cancelled")
        
        # Exponential backoff
        await asyncio.sleep(delay)
        delay = min(delay * 1.5, 30.0)  # Cap at 30 seconds
    
    raise TimeoutError(f"Task did not complete within {max_attempts} attempts")

# Usage
response = await agent.run("Long task", background=True)
result = await poll_until_complete(agent, response.task_id)
print(f"Final result: {result.text}")

Multi-Agent Communication

Orchestrate multiple A2A agents:
import asyncio
from agent_framework.a2a import A2AAgent

async def orchestrate_agents():
    """Coordinate multiple A2A agents."""
    
    # Discover agents
    research_agent = await A2AAgent.from_url("https://research-agent.com")
    writing_agent = await A2AAgent.from_url("https://writing-agent.com")
    review_agent = await A2AAgent.from_url("https://review-agent.com")
    
    # Sequential workflow
    topic = "quantum computing"
    
    # Step 1: Research
    research = await research_agent.run(f"Research {topic}")
    print(f"Research complete: {len(research.text)} chars")
    
    # Step 2: Write draft
    draft = await writing_agent.run(
        f"Write an article based on this research: {research.text}"
    )
    print(f"Draft complete: {len(draft.text)} chars")
    
    # Step 3: Review and revise
    final = await review_agent.run(
        f"Review and improve this article: {draft.text}"
    )
    print(f"Final article: {final.text}")
    
    return final.text

# Parallel execution
async def parallel_agents():
    """Run multiple agents concurrently."""
    
    analyst1 = await A2AAgent.from_url("https://analyst1.com")
    analyst2 = await A2AAgent.from_url("https://analyst2.com")
    analyst3 = await A2AAgent.from_url("https://analyst3.com")
    
    # Gather results concurrently
    results = await asyncio.gather(
        analyst1.run("Analyze market trends"),
        analyst2.run("Analyze competitor data"),
        analyst3.run("Analyze customer feedback")
    )
    
    # Aggregate results
    combined = "\n\n".join([r.text for r in results])
    return combined

Error Handling

Handle A2A protocol errors gracefully:
from agent_framework.a2a import (
    A2AAgent,
    A2AConnectionError,
    A2AAuthenticationError,
    A2ATaskError,
    TaskState
)

async def robust_a2a_call(agent: A2AAgent, message: str):
    """Call A2A agent with comprehensive error handling."""
    
    try:
        response = await agent.run(message)
        return response
        
    except A2AConnectionError as e:
        print(f"Connection failed: {e}")
        # Retry with exponential backoff
        
    except A2AAuthenticationError as e:
        print(f"Authentication failed: {e}")
        # Refresh credentials
        
    except A2ATaskError as e:
        print(f"Task failed: {e}")
        if e.state == TaskState.FAILED:
            print(f"Error details: {e.error_message}")
        # Handle task-specific errors
        
    except TimeoutError:
        print("Request timed out")
        # Handle timeout
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        # General error handling

Testing A2A Agents

Using .NET Sample Server

For quick testing, use the .NET A2A sample server:
# Clone repository
git clone https://github.com/microsoft/agent-framework.git
cd agent-framework/dotnet/samples/05-end-to-end/A2AClientServer

# Set environment variables
export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4o-mini"

# Run server
dotnet run

# Server available at http://localhost:5001/
Then test with the Python client:
# Set A2A host
export A2A_AGENT_HOST="http://localhost:5001/"

# Run Python sample
python agent_with_a2a.py

Mock A2A Server

Create a mock server for testing:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class AgentCard(BaseModel):
    name: str
    description: str
    skills: list

@app.get("/.well-known/agent.json")
async def get_agent_card():
    return {
        "@type": "AgentCard",
        "name": "MockAgent",
        "description": "A mock agent for testing",
        "version": "1.0",
        "skills": [
            {
                "name": "echo",
                "description": "Echo input back",
                "inputModes": ["text/plain"],
                "outputModes": ["text/plain"]
            }
        ]
    }

@app.post("/api/run")
async def run_agent(request: dict):
    return {
        "status": "completed",
        "result": {
            "text": f"Echo: {request.get('message')}"
        }
    }

# Run: uvicorn mock_server:app --port 5001

Best Practices

  • Clear descriptions: Make skills discoverable with detailed descriptions
  • Relevant tags: Use tags for categorization and searchability
  • Provide examples: Include usage examples for each skill
  • Specify modes: Declare supported input/output MIME types
  • Retry logic: Implement exponential backoff for transient errors
  • Timeout management: Set appropriate timeouts for long-running tasks
  • Graceful degradation: Handle agent unavailability gracefully
  • Detailed errors: Return meaningful error messages in task failures
  • Connection pooling: Reuse HTTP connections for multiple requests
  • Parallel execution: Use asyncio.gather() for concurrent agent calls
  • Caching: Cache AgentCards to avoid repeated discovery
  • Streaming: Use streaming for long responses to reduce latency
  • Authentication: Implement OAuth 2.0 or API key authentication
  • HTTPS: Always use HTTPS in production
  • Rate limiting: Protect against abuse with rate limits
  • Input validation: Validate all inputs before processing
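The retry and backoff advice above can be sketched as a small reusable helper. This is a minimal sketch: it catches the builtin `ConnectionError` so it runs standalone; in real code you would substitute the framework's `A2AConnectionError` from the error-handling section.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 0.5,
) -> T:
    """Retry an async call on transient errors with exponential backoff.

    The delay doubles each attempt and gets random jitter to avoid
    synchronized retry storms across many clients.
    """
    for attempt in range(max_attempts):
        try:
            return await call()
        except ConnectionError:  # substitute A2AConnectionError in real code
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) * (0.5 + random.random())
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")
```

Usage then looks like `response = await with_retries(lambda: agent.run("Analyze this"))`, which keeps the retry policy out of the call sites.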

Troubleshooting

Error: Cannot retrieve AgentCard at /.well-known/agent.json
Solutions:
  • Verify URL is correct and accessible
  • Check CORS configuration if accessing from browser
  • Ensure AgentCard endpoint returns valid JSON
  • Test with curl: curl https://agent.com/.well-known/agent.json
Error: Request times out when calling A2A agent
Solutions:
  • Increase timeout: httpx.AsyncClient(timeout=120.0)
  • Use background tasks for long operations
  • Implement polling instead of waiting for completion
  • Check network connectivity and firewall rules
Error: Task remains in working state indefinitely
Solutions:
  • Implement timeout in polling loop
  • Check A2A agent logs for errors
  • Verify task is actually processing (check metrics)
  • Consider using continuation tokens to resume
Error: Streaming response disconnects prematurely
Solutions:
  • Use continuation tokens to resume from last position
  • Implement retry logic with exponential backoff
  • Check load balancer timeout settings
  • Verify SSE configuration on both client and server

Protocol Compliance

Ensure your A2A implementation follows the specification:
  • ✅ AgentCard at /.well-known/agent.json
  • ✅ Proper content negotiation (Accept headers)
  • ✅ Task state transitions (working → completed/failed)
  • ✅ Continuation token support for resumption
  • ✅ SSE format for streaming responses
  • ✅ Error responses with proper status codes
Validate compliance using the A2A validation tool.
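A quick way to smoke-test the first checklist item is to check the fetched card's shape before relying on it. The required-field sets below are inferred from the AgentCard example earlier on this page, not taken from the A2A specification, so treat this as a hedged sketch:

```python
REQUIRED_CARD_FIELDS = {"name", "description", "version", "skills"}
REQUIRED_SKILL_FIELDS = {"name", "description", "inputModes", "outputModes"}


def card_problems(card: dict) -> list[str]:
    """Return human-readable problems found in an AgentCard dict."""
    problems = [
        f"card missing field: {f}"
        for f in sorted(REQUIRED_CARD_FIELDS - card.keys())
    ]
    for i, skill in enumerate(card.get("skills", [])):
        problems += [
            f"skill {i} missing field: {f}"
            for f in sorted(REQUIRED_SKILL_FIELDS - skill.keys())
        ]
    return problems
```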

Next Steps

Azure Functions

Host A2A agents on Azure Functions

DurableTask

Orchestrate A2A agents with Durable Functions