MCP Error Handling Patterns: Build Robust AI Integrations
Published February 3, 2025 · 18 min read
When an MCP tool fails, the LLM needs to understand what went wrong and how to recover. Poor error handling leads to confused AI responses, frustrated users, and unreliable integrations. This guide covers patterns for building MCP servers that fail gracefully and help LLMs recover intelligently.
The MCP Error Model
MCP uses JSON-RPC 2.0 error codes. Understanding these is crucial for proper error handling:
# Standard JSON-RPC error codes
ERROR_CODES = {
-32700: "Parse error", # Invalid JSON
-32600: "Invalid request", # Not valid JSON-RPC
-32601: "Method not found", # Unknown method
-32602: "Invalid params", # Bad parameters
-32603: "Internal error", # Server-side failure
# Custom MCP error codes (application-specific)
# Use codes in the -32000 to -32099 range
-32000: "Resource not found",
-32001: "Permission denied",
-32002: "Rate limited",
-32003: "External service error",
-32004: "Validation failed",
}Basic Error Handling in FastMCP
FastMCP provides a clean way to raise errors that the LLM can understand:
from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData, INVALID_PARAMS, INTERNAL_ERROR
mcp = FastMCP("error-demo")
@mcp.tool()
async def get_user(user_id: str) -> dict:
"""Get user by ID."""
# Validate input
if not user_id:
raise McpError(
ErrorData(
code=INVALID_PARAMS,
message="user_id is required"
)
)
if not user_id.isdigit():
raise McpError(
ErrorData(
code=INVALID_PARAMS,
message=f"user_id must be numeric, got: {user_id}"
)
)
# Simulate database lookup
users = {"1": {"name": "Alice"}, "2": {"name": "Bob"}}
if user_id not in users:
raise McpError(
ErrorData(
code=-32000, # Custom: resource not found
message=f"User {user_id} not found. Valid IDs: {list(users.keys())}"
)
)
return users[user_id]Pattern 1: Structured Error Responses
Instead of raising exceptions, return structured error objects that give the LLM context to recover:
from typing import TypedDict, Literal
from dataclasses import dataclass
import traceback
@dataclass
class ToolResult:
"""Structured result that always succeeds at transport level."""
success: bool
data: dict | None = None
error: dict | None = None
@mcp.tool()
async def search_files(
query: str,
path: str = ".",
max_results: int = 10
) -> dict:
"""
Search for files matching a query.
Returns structured result with success/error status.
"""
result = {"success": False, "data": None, "error": None}
try:
# Validate inputs
if not query or len(query) < 2:
result["error"] = {
"type": "validation_error",
"field": "query",
"message": "Query must be at least 2 characters",
"suggestion": "Try a longer search term"
}
return result
if max_results < 1 or max_results > 100:
result["error"] = {
"type": "validation_error",
"field": "max_results",
"message": f"max_results must be 1-100, got {max_results}",
"suggestion": "Use max_results between 1 and 100"
}
return result
# Check path exists
from pathlib import Path
search_path = Path(path)
if not search_path.exists():
result["error"] = {
"type": "not_found",
"resource": "path",
"message": f"Path does not exist: {path}",
"suggestion": "Check the path or use '.' for current directory"
}
return result
# Do the search
matches = []
for file in search_path.rglob(f"*{query}*"):
if len(matches) >= max_results:
break
matches.append(str(file))
result["success"] = True
result["data"] = {
"matches": matches,
"count": len(matches),
"truncated": len(matches) == max_results,
"searched_path": str(search_path.absolute())
}
except PermissionError as e:
result["error"] = {
"type": "permission_denied",
"message": f"Cannot access path: {e}",
"suggestion": "Try a different directory or check permissions"
}
except Exception as e:
result["error"] = {
"type": "internal_error",
"message": str(e),
"suggestion": "This is unexpected. Try with different parameters."
}
return resultPattern 2: Retry-Friendly Errors
For transient failures (network issues, rate limits), provide information that helps the LLM decide whether and when to retry:
import asyncio
import httpx
from datetime import datetime, timedelta
class RetryInfo:
def __init__(
self,
should_retry: bool,
retry_after_seconds: int | None = None,
max_retries: int = 3,
backoff_multiplier: float = 2.0
):
self.should_retry = should_retry
self.retry_after_seconds = retry_after_seconds
self.max_retries = max_retries
self.backoff_multiplier = backoff_multiplier
@mcp.tool()
async def fetch_api_data(
endpoint: str,
attempt: int = 1
) -> dict:
"""
Fetch data from external API with retry guidance.
Args:
endpoint: API endpoint to call
attempt: Current attempt number (for retry tracking)
"""
max_attempts = 3
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(f"https://api.example.com/{endpoint}")
if response.status_code == 200:
return {
"success": True,
"data": response.json(),
"attempt": attempt
}
elif response.status_code == 429: # Rate limited
retry_after = int(response.headers.get("Retry-After", 60))
return {
"success": False,
"error": {
"type": "rate_limited",
"message": "API rate limit exceeded",
"retry": {
"should_retry": attempt < max_attempts,
"retry_after_seconds": retry_after,
"next_attempt": attempt + 1,
"max_attempts": max_attempts
},
"suggestion": f"Wait {retry_after} seconds, then call with attempt={attempt + 1}"
}
}
elif response.status_code >= 500: # Server error
backoff = min(2 ** attempt, 30) # Exponential backoff, max 30s
return {
"success": False,
"error": {
"type": "server_error",
"status_code": response.status_code,
"message": "External API is having issues",
"retry": {
"should_retry": attempt < max_attempts,
"retry_after_seconds": backoff,
"next_attempt": attempt + 1,
"max_attempts": max_attempts
},
"suggestion": f"Server error, try again in {backoff}s with attempt={attempt + 1}"
}
}
else: # Client error (4xx except 429)
return {
"success": False,
"error": {
"type": "client_error",
"status_code": response.status_code,
"message": response.text[:200],
"retry": {
"should_retry": False,
"reason": "Client errors are not retryable"
},
"suggestion": "Check the endpoint parameter and try a different value"
}
}
except httpx.TimeoutException:
backoff = min(2 ** attempt, 30)
return {
"success": False,
"error": {
"type": "timeout",
"message": "Request timed out after 10 seconds",
"retry": {
"should_retry": attempt < max_attempts,
"retry_after_seconds": backoff,
"next_attempt": attempt + 1
},
"suggestion": f"Network slow, retry in {backoff}s with attempt={attempt + 1}"
}
}
except httpx.ConnectError:
return {
"success": False,
"error": {
"type": "connection_failed",
"message": "Could not connect to API",
"retry": {
"should_retry": attempt < max_attempts,
"retry_after_seconds": 5,
"next_attempt": attempt + 1
},
"suggestion": "Network issue, retry in 5s"
}
}Pattern 3: Graceful Degradation
When full functionality isn't available, return partial results with clear explanation of what's missing:
@mcp.tool()
async def get_weather(
location: str,
include_forecast: bool = True,
include_alerts: bool = True
) -> dict:
"""
Get weather for a location with optional forecast and alerts.
Gracefully degrades if some data sources are unavailable.
"""
result = {
"success": True,
"data": {},
"degraded": False,
"warnings": []
}
# Always try to get current conditions
try:
current = await fetch_current_weather(location)
result["data"]["current"] = current
except Exception as e:
# Current weather is required - fail completely
return {
"success": False,
"error": {
"type": "service_unavailable",
"message": f"Cannot fetch current weather: {e}",
"suggestion": "Weather service may be down, try again later"
}
}
# Forecast is optional - degrade gracefully
if include_forecast:
try:
forecast = await fetch_forecast(location)
result["data"]["forecast"] = forecast
except Exception as e:
result["degraded"] = True
result["warnings"].append({
"component": "forecast",
"reason": str(e),
"impact": "7-day forecast unavailable, only current conditions shown"
})
# Alerts are optional - degrade gracefully
if include_alerts:
try:
alerts = await fetch_alerts(location)
result["data"]["alerts"] = alerts
except Exception as e:
result["degraded"] = True
result["warnings"].append({
"component": "alerts",
"reason": str(e),
"impact": "Weather alerts unavailable"
})
# Add metadata about response completeness
result["metadata"] = {
"components_requested": ["current"] +
(["forecast"] if include_forecast else []) +
(["alerts"] if include_alerts else []),
"components_returned": list(result["data"].keys()),
"completeness": len(result["data"]) / (1 + include_forecast + include_alerts)
}
return resultPattern 4: Context-Aware Error Messages
Error messages should help the LLM understand not just what failed, but how to fix it:
from enum import Enum
from typing import Any
class ErrorContext:
"""Build informative error context for LLMs."""
@staticmethod
def validation_error(
field: str,
value: Any,
expected: str,
valid_examples: list[str] | None = None
) -> dict:
"""Create a validation error with helpful context."""
error = {
"type": "validation_error",
"field": field,
"received": str(value),
"expected": expected,
"message": f"Invalid {field}: expected {expected}, got {type(value).__name__}"
}
if valid_examples:
error["examples"] = valid_examples
error["suggestion"] = f"Try one of: {', '.join(valid_examples)}"
return error
@staticmethod
def not_found(
resource_type: str,
identifier: str,
similar: list[str] | None = None
) -> dict:
"""Create a not-found error with alternatives."""
error = {
"type": "not_found",
"resource_type": resource_type,
"identifier": identifier,
"message": f"{resource_type} '{identifier}' not found"
}
if similar:
error["similar_matches"] = similar[:5]
error["suggestion"] = f"Did you mean: {similar[0]}?"
else:
error["suggestion"] = f"Check the {resource_type} identifier and try again"
return error
@staticmethod
def permission_error(
action: str,
resource: str,
required_permission: str | None = None
) -> dict:
"""Create a permission error with required access info."""
error = {
"type": "permission_denied",
"action": action,
"resource": resource,
"message": f"Permission denied: cannot {action} {resource}"
}
if required_permission:
error["required_permission"] = required_permission
error["suggestion"] = f"This action requires '{required_permission}' permission"
else:
error["suggestion"] = "Check access permissions for this resource"
return error
@mcp.tool()
async def manage_project(
action: str,
project_name: str,
settings: dict | None = None
) -> dict:
"""Manage project with context-aware errors."""
valid_actions = ["create", "update", "delete", "archive"]
if action not in valid_actions:
return {
"success": False,
"error": ErrorContext.validation_error(
field="action",
value=action,
expected="one of: create, update, delete, archive",
valid_examples=valid_actions
)
}
# Check if project exists
projects = await get_all_projects()
project_names = [p["name"] for p in projects]
if action in ["update", "delete", "archive"]:
if project_name not in project_names:
# Find similar names for suggestions
similar = find_similar(project_name, project_names)
return {
"success": False,
"error": ErrorContext.not_found(
resource_type="project",
identifier=project_name,
similar=similar
)
}
if action == "create":
if project_name in project_names:
return {
"success": False,
"error": {
"type": "already_exists",
"resource_type": "project",
"identifier": project_name,
"message": f"Project '{project_name}' already exists",
"suggestion": "Use 'update' action to modify existing project, or choose a different name"
}
}
# Check permissions
user_perms = await get_user_permissions()
required_perms = {"create": "project.create", "delete": "project.delete"}
if action in required_perms:
if required_perms[action] not in user_perms:
return {
"success": False,
"error": ErrorContext.permission_error(
action=action,
resource=f"project/{project_name}",
required_permission=required_perms[action]
)
}
# Perform action...
return {"success": True, "data": {"action": action, "project": project_name}}Pattern 5: Error Aggregation for Batch Operations
When processing multiple items, collect all errors and report them together:
from dataclasses import dataclass, field
from typing import Generic, TypeVar
T = TypeVar("T")
@dataclass
class BatchResult(Generic[T]):
"""Result of a batch operation with partial success support."""
total: int = 0
succeeded: int = 0
failed: int = 0
results: list[T] = field(default_factory=list)
errors: list[dict] = field(default_factory=list)
@property
def success_rate(self) -> float:
return self.succeeded / self.total if self.total > 0 else 0.0
@property
def partial_success(self) -> bool:
return self.succeeded > 0 and self.failed > 0
@mcp.tool()
async def process_files(
file_paths: list[str],
operation: str = "analyze"
) -> dict:
"""
Process multiple files with detailed error reporting.
Continues processing even if some files fail.
"""
batch = BatchResult[dict](total=len(file_paths))
for path in file_paths:
try:
# Validate path
from pathlib import Path
file = Path(path)
if not file.exists():
batch.failed += 1
batch.errors.append({
"file": path,
"error_type": "not_found",
"message": f"File does not exist: {path}",
"recoverable": False
})
continue
if not file.is_file():
batch.failed += 1
batch.errors.append({
"file": path,
"error_type": "invalid_type",
"message": f"Path is not a file: {path}",
"recoverable": False
})
continue
# Process file
result = await process_single_file(file, operation)
batch.succeeded += 1
batch.results.append({"file": path, **result})
except PermissionError:
batch.failed += 1
batch.errors.append({
"file": path,
"error_type": "permission_denied",
"message": f"Cannot read file: {path}",
"recoverable": True,
"suggestion": "Check file permissions"
})
except Exception as e:
batch.failed += 1
batch.errors.append({
"file": path,
"error_type": "processing_error",
"message": str(e),
"recoverable": False
})
# Build response
response = {
"success": batch.succeeded > 0,
"summary": {
"total": batch.total,
"succeeded": batch.succeeded,
"failed": batch.failed,
"success_rate": round(batch.success_rate, 2)
},
"results": batch.results
}
if batch.errors:
response["errors"] = batch.errors
# Categorize errors for easy understanding
error_types = {}
for err in batch.errors:
t = err["error_type"]
error_types[t] = error_types.get(t, 0) + 1
response["error_summary"] = {
"by_type": error_types,
"recoverable_count": sum(1 for e in batch.errors if e.get("recoverable")),
"suggestion": generate_batch_suggestion(batch.errors)
}
return response
def generate_batch_suggestion(errors: list[dict]) -> str:
"""Generate helpful suggestion based on error patterns."""
types = [e["error_type"] for e in errors]
if types.count("not_found") == len(types):
return "All files not found. Check the file paths."
elif types.count("permission_denied") > len(types) / 2:
return "Many permission errors. Try running with elevated permissions."
elif "not_found" in types:
return f"{types.count('not_found')} files not found. Remove missing files and retry."
else:
return "Mixed errors. Review individual error messages."Pattern 6: Error Recovery Hints
Include explicit recovery steps that the LLM can follow:
@dataclass
class RecoveryStep:
"""A single step the LLM can take to recover."""
action: str
tool: str | None = None
parameters: dict | None = None
description: str = ""
@mcp.tool()
async def deploy_application(
app_name: str,
environment: str,
version: str | None = None
) -> dict:
"""Deploy application with recovery guidance on failure."""
try:
# Check app exists
app = await get_application(app_name)
if not app:
return {
"success": False,
"error": {
"type": "not_found",
"message": f"Application '{app_name}' not found",
"recovery_steps": [
{
"step": 1,
"action": "List available applications",
"tool": "list_applications",
"parameters": {},
"description": "Find the correct app name"
},
{
"step": 2,
"action": "Retry deployment with correct name",
"tool": "deploy_application",
"parameters": {"environment": environment}
}
]
}
}
# Check environment
valid_envs = ["staging", "production", "development"]
if environment not in valid_envs:
return {
"success": False,
"error": {
"type": "invalid_environment",
"message": f"Unknown environment: {environment}",
"recovery_steps": [
{
"step": 1,
"action": "Use valid environment",
"tool": "deploy_application",
"parameters": {
"app_name": app_name,
"environment": "staging" # Suggest safest option
},
"description": f"Valid environments: {', '.join(valid_envs)}"
}
]
}
}
# Check version exists
if version:
available_versions = await get_app_versions(app_name)
if version not in available_versions:
return {
"success": False,
"error": {
"type": "version_not_found",
"message": f"Version {version} not found",
"recovery_steps": [
{
"step": 1,
"action": "List available versions",
"tool": "list_app_versions",
"parameters": {"app_name": app_name}
},
{
"step": 2,
"action": "Deploy latest version",
"tool": "deploy_application",
"parameters": {
"app_name": app_name,
"environment": environment
},
"description": "Omit version to deploy latest"
}
]
}
}
# Attempt deployment
deployment = await start_deployment(app, environment, version)
return {
"success": True,
"data": {
"deployment_id": deployment["id"],
"status": "in_progress",
"estimated_duration": "2-5 minutes"
},
"next_steps": [
{
"action": "Monitor deployment",
"tool": "check_deployment_status",
"parameters": {"deployment_id": deployment["id"]}
}
]
}
except DeploymentInProgressError as e:
return {
"success": False,
"error": {
"type": "deployment_in_progress",
"message": "Another deployment is running",
"current_deployment": e.deployment_id,
"recovery_steps": [
{
"step": 1,
"action": "Check current deployment status",
"tool": "check_deployment_status",
"parameters": {"deployment_id": e.deployment_id}
},
{
"step": 2,
"action": "Wait for completion or cancel",
"options": [
{"tool": "wait_for_deployment", "parameters": {"deployment_id": e.deployment_id}},
{"tool": "cancel_deployment", "parameters": {"deployment_id": e.deployment_id}}
]
}
]
}
}Pattern 7: Error Logging and Observability
Track errors for debugging while keeping responses LLM-friendly:
import logging
import uuid
from datetime import datetime
from functools import wraps
logger = logging.getLogger(__name__)
def track_errors(func):
"""Decorator to track errors with correlation IDs."""
@wraps(func)
async def wrapper(*args, **kwargs):
correlation_id = str(uuid.uuid4())[:8]
start_time = datetime.utcnow()
try:
result = await func(*args, **kwargs)
# Log success
logger.info(
f"[{correlation_id}] {func.__name__} succeeded",
extra={
"correlation_id": correlation_id,
"function": func.__name__,
"duration_ms": (datetime.utcnow() - start_time).total_seconds() * 1000,
"args": str(args)[:100],
"kwargs": str(kwargs)[:100]
}
)
return result
except Exception as e:
# Log detailed error for debugging
logger.error(
f"[{correlation_id}] {func.__name__} failed: {e}",
extra={
"correlation_id": correlation_id,
"function": func.__name__,
"error_type": type(e).__name__,
"error_message": str(e),
"args": str(args)[:100],
"kwargs": str(kwargs)[:100]
},
exc_info=True
)
# Return LLM-friendly error with reference ID
return {
"success": False,
"error": {
"type": "internal_error",
"message": f"An error occurred: {type(e).__name__}",
"reference_id": correlation_id,
"suggestion": f"If this persists, report error ID: {correlation_id}"
}
}
return wrapper
@mcp.tool()
@track_errors
async def complex_operation(data: dict) -> dict:
"""Operation with automatic error tracking."""
# Your logic here...
passTypeScript Error Handling
The same patterns apply in TypeScript with the MCP SDK:
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { z } from "zod";
const server = new Server(
{ name: "error-demo", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
// Custom error types
interface ToolError {
type: string;
message: string;
suggestion?: string;
recoverySteps?: RecoveryStep[];
}
interface RecoveryStep {
step: number;
action: string;
tool?: string;
parameters?: Record<string, unknown>;
}
interface ToolResult<T> {
success: boolean;
data?: T;
error?: ToolError;
}
// Helper functions
function validationError(
field: string,
received: unknown,
expected: string
): ToolError {
return {
type: "validation_error",
message: `Invalid ${field}: expected ${expected}, got ${typeof received}`,
suggestion: `Provide a valid ${field}`
};
}
function notFoundError(
resourceType: string,
id: string,
alternatives?: string[]
): ToolError {
const error: ToolError = {
type: "not_found",
message: `${resourceType} '${id}' not found`
};
if (alternatives?.length) {
error.suggestion = `Did you mean: ${alternatives[0]}?`;
}
return error;
}
// Tool implementation
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
if (name === "get_item") {
const input = GetItemSchema.safeParse(args);
if (!input.success) {
// Return structured error instead of throwing
const result: ToolResult<never> = {
success: false,
error: {
type: "validation_error",
message: "Invalid parameters",
suggestion: input.error.issues
.map(i => `${i.path.join(".")}: ${i.message}`)
.join("; ")
}
};
return {
content: [{
type: "text",
text: JSON.stringify(result, null, 2)
}]
};
}
try {
const item = await database.getItem(input.data.id);
if (!item) {
const similar = await database.findSimilar(input.data.id);
return {
content: [{
type: "text",
text: JSON.stringify({
success: false,
error: notFoundError("item", input.data.id, similar)
}, null, 2)
}]
};
}
return {
content: [{
type: "text",
text: JSON.stringify({ success: true, data: item }, null, 2)
}]
};
} catch (err) {
// Unexpected error - log and return generic message
console.error("get_item failed:", err);
return {
content: [{
type: "text",
text: JSON.stringify({
success: false,
error: {
type: "internal_error",
message: "An unexpected error occurred",
suggestion: "Try again or use different parameters"
}
}, null, 2)
}]
};
}
}
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${name}`
);
});Testing Error Handling
Verify your error handling works correctly:
import pytest
from mcp.shared.exceptions import McpError
@pytest.mark.asyncio
async def test_validation_error_includes_suggestion():
"""Errors should include actionable suggestions."""
result = await get_user("")
assert not result["success"]
assert result["error"]["type"] == "validation_error"
assert "suggestion" in result["error"]
assert len(result["error"]["suggestion"]) > 0
@pytest.mark.asyncio
async def test_not_found_includes_alternatives():
"""Not found errors should suggest alternatives."""
result = await search_items("nonexistent-item")
assert not result["success"]
assert result["error"]["type"] == "not_found"
assert "similar_matches" in result["error"] or "suggestion" in result["error"]
@pytest.mark.asyncio
async def test_batch_operation_continues_on_error():
"""Batch operations should process all items even with failures."""
files = ["exists.txt", "missing.txt", "also-exists.txt"]
result = await process_files(files)
# Should have partial success
assert result["success"] # At least some succeeded
assert result["summary"]["succeeded"] == 2
assert result["summary"]["failed"] == 1
assert len(result["errors"]) == 1
@pytest.mark.asyncio
async def test_retry_info_for_transient_errors():
"""Transient errors should include retry guidance."""
# Simulate rate limit
result = await fetch_with_rate_limit("endpoint", force_rate_limit=True)
assert not result["success"]
assert result["error"]["type"] == "rate_limited"
assert result["error"]["retry"]["should_retry"]
assert "retry_after_seconds" in result["error"]["retry"]
@pytest.mark.asyncio
async def test_recovery_steps_are_executable():
"""Recovery steps should reference valid tools with valid params."""
result = await deploy_application("missing-app", "production")
assert not result["success"]
steps = result["error"]["recovery_steps"]
for step in steps:
assert "action" in step
if "tool" in step:
# Verify tool exists
assert step["tool"] in available_tools
# Verify parameters are valid
if "parameters" in step:
assert isinstance(step["parameters"], dict)Best Practices Summary
- Always include suggestions — Tell the LLM how to fix the problem
- Use structured errors — Consistent format helps LLM parsing
- Provide alternatives — "Did you mean...?" helps recovery
- Include retry guidance — Should retry? When? How many times?
- Degrade gracefully — Partial results beat total failure
- Reference IDs for debugging — Let humans trace issues
- Recovery steps — Explicit next actions the LLM can take
- Test error paths — They're as important as success paths
What's Next?
Now that you can handle errors gracefully, explore these related topics:
- Testing MCP Servers — Comprehensive testing guide
- Performance Optimization — Handle errors without slowing down
- Troubleshooting Guide — Debug common MCP issues
Questions about error handling? Reach out on Twitter.