← Back to MCP Tutorials

Multi-Tenant MCP Architecture

Build secure MCP servers that serve multiple users or organizations with proper isolation, authentication, and rate limiting.

February 202520 min readAdvanced

As MCP adoption grows, you'll likely want to build servers that can handle multiple users or organizations. This guide covers patterns for building production-ready multi-tenant MCP servers that maintain security, performance, and data isolation between tenants.

What You'll Learn

  • Tenant identification and authentication patterns
  • Data isolation strategies (database per tenant vs shared)
  • Rate limiting and quotas per tenant
  • Request context and middleware patterns
  • Deployment architectures for scale

Understanding Multi-Tenancy in MCP

Multi-tenancy means a single MCP server instance serves multiple independent users (tenants). Each tenant should feel like they have their own dedicated server while you benefit from shared infrastructure.

Key Challenges

  • Data Isolation — Tenant A must never see Tenant B's data
  • Authentication — Verify who's making requests
  • Resource Limits — Prevent one tenant from impacting others
  • Configuration — Different tenants may have different settings
  • Billing — Track usage per tenant for monetization

Tenant Identification Patterns

The first challenge is identifying which tenant is making a request. Here are common patterns:

Pattern 1: API Key Authentication

# tenant_auth.py
from dataclasses import dataclass
from typing import Optional
import hashlib
import secrets
import sqlite3

@dataclass
class Tenant:
    id: str
    name: str
    api_key_hash: str
    tier: str  # 'free', 'pro', 'enterprise'
    rate_limit: int
    created_at: str

class TenantAuth:
    def __init__(self, db_path: str = "tenants.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()
    
    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tenants (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                api_key_hash TEXT UNIQUE NOT NULL,
                tier TEXT DEFAULT 'free',
                rate_limit INTEGER DEFAULT 100,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()
    
    def create_tenant(self, name: str, tier: str = 'free') -> tuple[str, str]:
        """Create a tenant and return (tenant_id, api_key)"""
        tenant_id = secrets.token_hex(8)
        api_key = f"mcp_{secrets.token_hex(24)}"
        api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        
        rate_limits = {'free': 100, 'pro': 1000, 'enterprise': 10000}
        
        self.conn.execute(
            "INSERT INTO tenants (id, name, api_key_hash, tier, rate_limit) VALUES (?, ?, ?, ?, ?)",
            (tenant_id, name, api_key_hash, tier, rate_limits.get(tier, 100))
        )
        self.conn.commit()
        
        return tenant_id, api_key
    
    def authenticate(self, api_key: str) -> Optional[Tenant]:
        """Authenticate and return tenant, or None if invalid"""
        api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        
        cursor = self.conn.execute(
            "SELECT id, name, api_key_hash, tier, rate_limit, created_at FROM tenants WHERE api_key_hash = ?",
            (api_key_hash,)
        )
        row = cursor.fetchone()
        
        if row:
            return Tenant(*row)
        return None

Pattern 2: JWT/OAuth Token

# jwt_auth.py
import jwt
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timedelta

@dataclass
class TokenPayload:
    tenant_id: str
    user_id: str
    scopes: list[str]
    exp: datetime

class JWTAuth:
    def __init__(self, secret: str, algorithm: str = "HS256"):
        self.secret = secret
        self.algorithm = algorithm
    
    def create_token(
        self, 
        tenant_id: str, 
        user_id: str, 
        scopes: list[str],
        expires_in: timedelta = timedelta(hours=24)
    ) -> str:
        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "scopes": scopes,
            "exp": datetime.utcnow() + expires_in,
            "iat": datetime.utcnow()
        }
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)
    
    def verify_token(self, token: str) -> Optional[TokenPayload]:
        try:
            payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
            return TokenPayload(
                tenant_id=payload["tenant_id"],
                user_id=payload["user_id"],
                scopes=payload["scopes"],
                exp=datetime.fromtimestamp(payload["exp"])
            )
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def has_scope(self, token_payload: TokenPayload, required_scope: str) -> bool:
        return required_scope in token_payload.scopes or "admin" in token_payload.scopes

Request Context Pattern

Use a context object to carry tenant information through your MCP server. This pattern keeps your tool implementations clean while maintaining tenant awareness.

# context.py
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class RequestContext:
    tenant_id: str
    user_id: Optional[str] = None
    scopes: list[str] = None
    metadata: dict[str, Any] = None
    
    def __post_init__(self):
        if self.scopes is None:
            self.scopes = []
        if self.metadata is None:
            self.metadata = {}

# Thread-safe context variable
_request_context: ContextVar[Optional[RequestContext]] = ContextVar(
    'request_context', 
    default=None
)

def set_context(ctx: RequestContext):
    _request_context.set(ctx)

def get_context() -> Optional[RequestContext]:
    return _request_context.get()

def require_context() -> RequestContext:
    ctx = get_context()
    if ctx is None:
        raise RuntimeError("No request context set")
    return ctx

Data Isolation Strategies

Strategy 1: Row-Level Isolation (Shared Database)

All tenants share a database, with tenant_id columns filtering data. Best for smaller scale with many tenants.

# shared_db.py
import asyncpg
from context import require_context

class TenantAwareDB:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    
    async def get_documents(self, limit: int = 100) -> list[dict]:
        """Automatically filters by current tenant"""
        ctx = require_context()
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT id, title, content, created_at
                FROM documents
                WHERE tenant_id = $1
                ORDER BY created_at DESC
                LIMIT $2
            """, ctx.tenant_id, limit)
            
            return [dict(row) for row in rows]
    
    async def create_document(self, title: str, content: str) -> str:
        """Automatically sets tenant_id"""
        ctx = require_context()
        
        async with self.pool.acquire() as conn:
            doc_id = await conn.fetchval("""
                INSERT INTO documents (tenant_id, title, content)
                VALUES ($1, $2, $3)
                RETURNING id
            """, ctx.tenant_id, title, content)
            
            return doc_id
    
    async def get_document(self, doc_id: str) -> dict | None:
        """Only returns document if it belongs to current tenant"""
        ctx = require_context()
        
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT id, title, content, created_at
                FROM documents
                WHERE id = $1 AND tenant_id = $2
            """, doc_id, ctx.tenant_id)
            
            return dict(row) if row else None

Strategy 2: Schema-Per-Tenant

Each tenant gets their own database schema. Better isolation while sharing the same database server.

# schema_isolation.py
import asyncpg
from context import require_context

class SchemaIsolatedDB:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    
    def _schema_name(self, tenant_id: str) -> str:
        # Sanitize tenant_id for use as schema name
        return f"tenant_{tenant_id.replace('-', '_')}"
    
    async def provision_tenant(self, tenant_id: str):
        """Create schema and tables for new tenant"""
        schema = self._schema_name(tenant_id)
        
        async with self.pool.acquire() as conn:
            await conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {schema}.documents (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    title TEXT NOT NULL,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
    
    async def get_documents(self, limit: int = 100) -> list[dict]:
        ctx = require_context()
        schema = self._schema_name(ctx.tenant_id)
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(f"""
                SELECT id, title, content, created_at
                FROM {schema}.documents
                ORDER BY created_at DESC
                LIMIT $1
            """, limit)
            
            return [dict(row) for row in rows]

Strategy 3: Database-Per-Tenant

Maximum isolation — each tenant gets their own database. Best for enterprise/compliance requirements.

# db_per_tenant.py
import asyncpg
from typing import Dict
from context import require_context

class TenantDBManager:
    def __init__(self, base_dsn: str):
        self.base_dsn = base_dsn
        self._pools: Dict[str, asyncpg.Pool] = {}
    
    def _db_name(self, tenant_id: str) -> str:
        return f"tenant_{tenant_id.replace('-', '_')}"
    
    async def get_pool(self, tenant_id: str) -> asyncpg.Pool:
        """Get or create connection pool for tenant"""
        if tenant_id not in self._pools:
            db_name = self._db_name(tenant_id)
            dsn = f"{self.base_dsn}/{db_name}"
            self._pools[tenant_id] = await asyncpg.create_pool(dsn, min_size=1, max_size=5)
        
        return self._pools[tenant_id]
    
    async def provision_tenant(self, tenant_id: str):
        """Create database for new tenant"""
        db_name = self._db_name(tenant_id)
        
        # Connect to default database to create new one
        conn = await asyncpg.connect(f"{self.base_dsn}/postgres")
        try:
            await conn.execute(f"CREATE DATABASE {db_name}")
        finally:
            await conn.close()
        
        # Initialize schema in new database
        pool = await self.get_pool(tenant_id)
        async with pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    title TEXT NOT NULL,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """)
    
    async def execute_for_tenant(self, query: str, *args) -> list:
        """Execute query against current tenant's database"""
        ctx = require_context()
        pool = await self.get_pool(ctx.tenant_id)
        
        async with pool.acquire() as conn:
            return await conn.fetch(query, *args)

Rate Limiting Per Tenant

Implement per-tenant rate limits to ensure fair resource distribution and protect your service.

# rate_limiter.py
import time
import redis
from dataclasses import dataclass
from typing import Optional
from context import require_context

@dataclass
class RateLimitResult:
    allowed: bool
    remaining: int
    reset_at: float
    retry_after: Optional[float] = None

class TenantRateLimiter:
    def __init__(self, redis_client: redis.Redis, default_limit: int = 100):
        self.redis = redis_client
        self.default_limit = default_limit
        self.window_seconds = 60  # 1 minute window
    
    def _key(self, tenant_id: str) -> str:
        window = int(time.time() / self.window_seconds)
        return f"rate_limit:{tenant_id}:{window}"
    
    async def check_limit(self, limit: Optional[int] = None) -> RateLimitResult:
        """Check and increment rate limit for current tenant"""
        ctx = require_context()
        key = self._key(ctx.tenant_id)
        limit = limit or self.default_limit
        
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, self.window_seconds)
        results = pipe.execute()
        
        current = results[0]
        window_end = (int(time.time() / self.window_seconds) + 1) * self.window_seconds
        
        if current > limit:
            return RateLimitResult(
                allowed=False,
                remaining=0,
                reset_at=window_end,
                retry_after=window_end - time.time()
            )
        
        return RateLimitResult(
            allowed=True,
            remaining=limit - current,
            reset_at=window_end
        )

class TieredRateLimiter(TenantRateLimiter):
    """Rate limiter with different limits per tier"""
    
    TIER_LIMITS = {
        'free': 100,
        'pro': 1000,
        'enterprise': 10000
    }
    
    async def check_limit_for_tier(self, tier: str) -> RateLimitResult:
        limit = self.TIER_LIMITS.get(tier, self.default_limit)
        return await self.check_limit(limit)

Complete Multi-Tenant MCP Server

Here's a complete example putting all the patterns together:

# multi_tenant_server.py
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.exceptions import McpError
import asyncpg
import redis

from tenant_auth import TenantAuth, Tenant
from context import RequestContext, set_context, require_context
from rate_limiter import TieredRateLimiter

# Initialize
mcp = FastMCP("Multi-Tenant Document Server")
auth = TenantAuth()
redis_client = redis.Redis()
rate_limiter = TieredRateLimiter(redis_client)
db_pool: asyncpg.Pool = None

@mcp.on_startup
async def startup():
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://localhost/multitenant_mcp",
        min_size=5,
        max_size=20
    )

# Middleware for authentication and rate limiting
async def authenticate_request(api_key: str) -> Tenant:
    """Authenticate and set up request context"""
    tenant = auth.authenticate(api_key)
    if not tenant:
        raise McpError("INVALID_REQUEST", "Invalid API key")
    
    # Check rate limit
    ctx = RequestContext(tenant_id=tenant.id)
    set_context(ctx)
    
    result = await rate_limiter.check_limit_for_tier(tenant.tier)
    if not result.allowed:
        raise McpError(
            "RESOURCE_EXHAUSTED",
            "Rate limit exceeded. Retry after " + str(int(result.retry_after)) + " seconds"
        )
    
    return tenant

# Tools with tenant isolation

@mcp.tool()
async def list_documents(api_key: str, limit: int = 20) -> str:
    """List documents for the authenticated tenant"""
    tenant = await authenticate_request(api_key)
    ctx = require_context()
    
    async with db_pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT id, title, created_at
            FROM documents
            WHERE tenant_id = $1
            ORDER BY created_at DESC
            LIMIT $2
        """, ctx.tenant_id, limit)
    
    if not rows:
        return f"No documents found for {tenant.name}"
    
    docs = [f"- {row['title']} (ID: {row['id']})" for row in rows]
    return f"Documents for {tenant.name}:\n" + "\n".join(docs)

@mcp.tool()
async def create_document(api_key: str, title: str, content: str) -> str:
    """Create a new document for the authenticated tenant"""
    tenant = await authenticate_request(api_key)
    ctx = require_context()
    
    async with db_pool.acquire() as conn:
        doc_id = await conn.fetchval("""
            INSERT INTO documents (tenant_id, title, content)
            VALUES ($1, $2, $3)
            RETURNING id
        """, ctx.tenant_id, title, content)
    
    # Track for billing
    await track_usage(ctx.tenant_id, "document_created")
    
    return f"Created document '{title}' with ID: {doc_id}"

@mcp.tool()
async def get_document(api_key: str, document_id: str) -> str:
    """Retrieve a document (only if it belongs to the tenant)"""
    tenant = await authenticate_request(api_key)
    ctx = require_context()
    
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow("""
            SELECT title, content, created_at
            FROM documents
            WHERE id = $1 AND tenant_id = $2
        """, document_id, ctx.tenant_id)
    
    if not row:
        return "Document not found or access denied"
    
    return f"# {row['title']}\n\n{row['content']}\n\nCreated: {row['created_at']}"

@mcp.tool()
async def search_documents(api_key: str, query: str, limit: int = 10) -> str:
    """Search documents within the tenant's scope"""
    tenant = await authenticate_request(api_key)
    ctx = require_context()
    
    async with db_pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT id, title, 
                   ts_headline('english', content, plainto_tsquery($2)) as snippet
            FROM documents
            WHERE tenant_id = $1
              AND to_tsvector('english', title || ' ' || content) @@ plainto_tsquery($2)
            ORDER BY ts_rank(to_tsvector('english', title || ' ' || content), plainto_tsquery($2)) DESC
            LIMIT $3
        """, ctx.tenant_id, query, limit)
    
    if not rows:
        return f"No documents matching '{query}'"
    
    results = [f"**{row['title']}** (ID: {row['id']})\n{row['snippet']}" for row in rows]
    return f"Search results for '{query}':\n\n" + "\n\n".join(results)

# Usage tracking for billing
async def track_usage(tenant_id: str, event: str, count: int = 1):
    """Track usage events for billing"""
    key = f"usage:{tenant_id}:{event}"
    redis_client.incrby(key, count)

if __name__ == "__main__":
    mcp.run()

Deployment Architecture

Shared Infrastructure (Most Common)

┌─────────────────────────────────────────────────────────┐
│                    Load Balancer                         │
│                  (nginx / ALB / Cloudflare)              │
└─────────────────────────────────────────────────────────┘
                           │
           ┌───────────────┼───────────────┐
           ▼               ▼               ▼
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │ MCP Server  │ │ MCP Server  │ │ MCP Server  │
    │  Instance 1 │ │  Instance 2 │ │  Instance 3 │
    └─────────────┘ └─────────────┘ └─────────────┘
           │               │               │
           └───────────────┼───────────────┘
                           │
    ┌──────────────────────┼──────────────────────┐
    │                      ▼                      │
    │  ┌─────────────┐  ┌─────────────┐          │
    │  │  PostgreSQL │  │    Redis    │          │
    │  │  (Shared DB)│  │ (Rate Limit)│          │
    │  └─────────────┘  └─────────────┘          │
    └─────────────────────────────────────────────┘

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-multi-tenant
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-multi-tenant
  template:
    metadata:
      labels:
        app: mcp-multi-tenant
    spec:
      containers:
      - name: mcp-server
        image: your-registry/mcp-multi-tenant:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: database-url
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: redis-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-multi-tenant-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-multi-tenant
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: mcp_requests_per_second
      target:
        type: AverageValue
        averageValue: "100"

Billing & Usage Tracking

# billing.py
import redis
from datetime import datetime
from typing import Dict
from dataclasses import dataclass

@dataclass
class UsageReport:
    tenant_id: str
    period: str
    events: Dict[str, int]
    total_requests: int
    estimated_cost: float

class BillingTracker:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        
        # Pricing per event type
        self.pricing = {
            'tool_call': 0.001,        # $0.001 per tool call
            'document_created': 0.01,   # $0.01 per document
            'search_query': 0.005,      # $0.005 per search
        }
    
    def track(self, tenant_id: str, event_type: str, count: int = 1):
        """Track a billable event"""
        month = datetime.now().strftime("%Y-%m")
        key = f"billing:{tenant_id}:{month}:{event_type}"
        self.redis.incrby(key, count)
    
    def get_usage(self, tenant_id: str, month: str = None) -> UsageReport:
        """Get usage report for a tenant"""
        month = month or datetime.now().strftime("%Y-%m")
        pattern = f"billing:{tenant_id}:{month}:*"
        
        events = {}
        total_requests = 0
        estimated_cost = 0.0
        
        for key in self.redis.scan_iter(pattern):
            event_type = key.decode().split(":")[-1]
            count = int(self.redis.get(key) or 0)
            events[event_type] = count
            total_requests += count
            estimated_cost += count * self.pricing.get(event_type, 0)
        
        return UsageReport(
            tenant_id=tenant_id,
            period=month,
            events=events,
            total_requests=total_requests,
            estimated_cost=estimated_cost
        )

# Add billing tool to MCP server
@mcp.tool()
async def get_usage_report(api_key: str, month: str = None) -> str:
    """Get usage report for the current billing period"""
    tenant = await authenticate_request(api_key)
    report = billing.get_usage(tenant.id, month)
    
    lines = [
        "# Usage Report for " + tenant.name,
        "Period: " + report.period,
        "Total Requests: " + str(report.total_requests),
        "",
        "## Breakdown:",
    ]
    
    for event, count in report.events.items():
        cost = count * billing.pricing.get(event, 0)
        lines.append("- " + event + ": " + str(count) + " ($" + format(cost, '.2f') + ")")
    
    lines.append("\n**Estimated Total: $" + format(report.estimated_cost, '.2f') + "**")
    
    return "\n".join(lines)

Security Best Practices

Multi-Tenant Security Checklist

  • ✓ Data Isolation

    Every database query includes tenant_id filter. No exceptions.

  • ✓ Input Validation

    Validate all inputs. Use parameterized queries. Never trust client data.

  • ✓ API Key Rotation

    Support key rotation without downtime. Old keys grace period.

  • ✓ Audit Logging

    Log all sensitive operations with tenant context for compliance.

  • ✓ Resource Limits

    Rate limiting, request size limits, timeout enforcement per tenant.

  • ✓ Encryption

    TLS for transport. Consider tenant-specific encryption keys for sensitive data.

Next Steps