
Complete Guide: Building Your Own Model Context Protocol (MCP) Server for AI Tool Integration and Communication

A comprehensive guide to building a Model Context Protocol (MCP) server for managing contextual information in machine learning systems, including implementation, scaling, security, and best practices for enterprise AI applications.


Daniel Kliewer

Author, Sovereign AI

Tags: Model Context Protocol, MCP, AI Integration, Context Management, FastAPI, Kubernetes, Vector Databases, Scalable Architecture, AI Infrastructure, Enterprise AI

From the Book

This is from Sovereign AI: Building Local-First Intelligent Systems.


Building Your Own Model Context Protocol (MCP) Server: Comprehensive Guide

1. Introduction to MCP

What is the Model Context Protocol (MCP)?

The Model Context Protocol (MCP) is a standardized framework for managing, transmitting, and utilizing contextual information in machine learning systems. At its core, MCP defines how context—the set of relevant information surrounding a model's operation—should be captured, structured, passed to models, and used during inference.

Unlike traditional ML deployment approaches where models operate as isolated black boxes, MCP creates an ecosystem where models are constantly aware of their operational environment, historical interactions, and user-specific requirements. This context-aware approach enables models to make more informed, personalized, and accurate predictions.

The Importance of Context Management

Context management addresses a fundamental limitation in traditional ML deployments: the assumption that a model's input alone contains all information needed for an optimal response. In reality, several contextual factors affect how a model should perform:

  • Environmental context: Information about the deployment environment, including time, location, system resources, and operational constraints
  • User context: User preferences, history, demographics, interaction patterns, and specific requirements
  • Task context: The broader goal the model is helping to achieve, including prior steps in a multi-step process
  • Data context: Information about the data's source, quality, recency, and potential biases

By managing this context effectively, MCP allows models to:

  • Personalize responses based on user history
  • Adapt to environmental changes
  • Maintain conversation coherence across multiple interactions
  • Understand the intent behind ambiguous requests
  • Follow evolving guidelines or constraints

Benefits of MCP

Scalability

  • Horizontal Scaling: MCP's standardized context format allows for seamless distribution of model workloads across multiple servers
  • Decoupled Architecture: Context management can be scaled independently from model inference
  • Stateless Design: Models can be spun up or down as needed without losing contextual information
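
The stateless design point can be illustrated with a minimal sketch. It assumes a hypothetical `InMemoryContextStore` that stands in for an external store such as Redis or PostgreSQL, and a hypothetical `StatelessWorker` class; neither name comes from a real library.

```python
from typing import Any, Dict, List


class InMemoryContextStore:
    """Hypothetical stand-in for an external store (e.g. Redis or PostgreSQL)."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, Any]] = {}

    def load(self, context_id: str) -> Dict[str, Any]:
        # Return a copy so callers cannot mutate stored state by accident
        return dict(self._data.get(context_id, {}))

    def save(self, context_id: str, context: Dict[str, Any]) -> None:
        self._data[context_id] = dict(context)


class StatelessWorker:
    """Hypothetical model worker that keeps no per-session state of its own."""

    def __init__(self, name: str, store: InMemoryContextStore) -> None:
        self.name = name
        self.store = store

    def handle(self, context_id: str, message: str) -> int:
        # All session state is read from, and written back to, the shared store
        context = self.store.load(context_id)
        history: List[str] = list(context.get("history", []))
        history.append(message)
        context["history"] = history
        self.store.save(context_id, context)
        return len(history)


store = InMemoryContextStore()
worker_a = StatelessWorker("worker-a", store)
worker_b = StatelessWorker("worker-b", store)

first = worker_a.handle("ctx-1", "hello")
# worker_a could be shut down here; worker_b picks up the same context
second = worker_b.handle("ctx-1", "follow-up question")
```

Because the workers hold nothing between calls, either one can serve the next request for `ctx-1`, which is what makes scaling them up or down safe.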

Flexibility

  • Model Interchangeability: Different models can access the same context data through a standardized interface
  • Progressive Enhancement: New context attributes can be added without breaking existing functionality
  • Context Filtering: Only relevant context is passed to each model, improving efficiency
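
Context filtering can be sketched as a simple projection over the context sections. The mapping `MODEL_CONTEXT_REQUIREMENTS` and the function `filter_context` below are hypothetical names chosen for illustration; a real deployment would decide per model which sections are relevant.

```python
from typing import Any, Dict, Tuple

# Hypothetical mapping from model ID to the context sections it needs
MODEL_CONTEXT_REQUIREMENTS: Dict[str, Tuple[str, ...]] = {
    "gpt-model": ("user", "model"),
    "bert-embedding": ("user",),
}


def filter_context(model_id: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Return only the context sections that the given model declares it needs."""
    wanted = MODEL_CONTEXT_REQUIREMENTS.get(model_id, ())
    return {key: context[key] for key in wanted if key in context}


full_context = {
    "user": {"user_id": "u1", "preferences": {"language": "en"}},
    "environment": {"server_load": 0.2},
    "model": {"model_id": "gpt-model", "model_version": "1.0.0"},
    "data": {"data_source": "unknown"},
}

# The embedding model only receives the user section
embedding_context = filter_context("bert-embedding", full_context)
```

Unknown model IDs receive an empty context in this sketch, which is a conservative default rather than a requirement.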

Model Lifecycle Management

  • Version Control: Context includes model version information, enabling graceful transitions between versions
  • Performance Monitoring: Context tracking allows for detailed analysis of model behavior across different scenarios
  • Continuous Improvement: Historical context enables targeted retraining based on actual usage patterns

2. Prerequisites for Building Your Own MCP Server

Hardware Requirements

Compute Resources

  • CPU: Minimum 8 cores (16+ recommended for production), preferably server-grade processors like Intel Xeon or AMD EPYC
  • GPU: For transformer-based models, NVIDIA GPUs with at least 16GB VRAM (A100, V100, or RTX 3090/4090); multiple GPUs recommended for high workloads
  • Memory: 32GB RAM minimum (64-128GB recommended for production)
  • Storage:
    • 500GB+ SSD for OS and applications (NVMe preferred)
    • 1TB+ storage for model artifacts and context data (scalable based on expected usage)
    • High IOPS capability for context retrieval operations

Networking

  • Bandwidth: 10Gbps+ network interfaces for high-throughput model serving
  • Latency: Low-latency connections, especially if context data is stored separately from models

Software Requirements

Operating System

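  • Linux Distributions: Ubuntu 20.04/22.04 LTS or a RHEL-compatible distribution such as CentOS Stream 9 (CentOS Linux 8 reached end of life in December 2021); Linux is preferred for ML workloads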
  • Windows: Windows Server 2019/2022 (if required by organizational constraints)

Containerization

  • Docker: Engine 20.10+ for containerizing individual components
  • Kubernetes: v1.24+ for orchestrating multi-container deployments
  • Helm: For managing Kubernetes applications

Model Management

  • TensorFlow Serving: For TensorFlow models
  • TorchServe: For PyTorch models
  • Triton Inference Server: For multi-framework model serving
  • MLflow: For model lifecycle management
  • KServe/Seldon Core: For Kubernetes-native model serving

Database Systems

  • Vector Database: ChromaDB, Pinecone, or Milvus for storing and retrieving embeddings
  • Relational Database: PostgreSQL 14+ for structured context data and metadata
  • Redis: For high-speed context caching and session management
  • MongoDB: For schema-flexible context storage

Networking and APIs

  • REST Framework: FastAPI or Flask for creating REST endpoints
  • gRPC: For high-performance internal communication
  • Envoy/Istio: For API gateway and service mesh capabilities
  • Protocol Buffers: For efficient data serialization

Monitoring and Logging

  • Prometheus: For metrics collection
  • Grafana: For metrics visualization
  • Elasticsearch, Logstash, Kibana (ELK): For comprehensive logging
  • Jaeger/Zipkin: For distributed tracing

3. Installation and Setup

Operating System Setup

bash
# Example for Ubuntu Server 22.04 LTS
# 1. Download the Ubuntu Server ISO from ubuntu.com
# 2. Create a bootable USB and install Ubuntu Server
# 3. Update system packages
sudo apt update && sudo apt upgrade -y

# 4. Install basic utilities
sudo apt install -y build-essential curl wget git software-properties-common

Docker Installation

bash
# Install Docker on Ubuntu
sudo apt install -y apt-transport-https ca-certificates curl gnupg lsb-release
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install -y docker-ce docker-ce-cli containerd.io

# Add current user to docker group
sudo usermod -aG docker $USER

# Verify installation
# (newgrp starts a new shell with the docker group applied; run the next
# command inside that shell, or log out and back in instead)
newgrp docker
docker --version

Kubernetes Setup

bash
# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

# Install minikube for local development
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

# Start minikube
minikube start --driver=docker --memory=8g --cpus=4

# For production, consider using kubeadm or managed Kubernetes services

GPU Support

bash
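# Install NVIDIA drivers
sudo apt install -y nvidia-driver-535 # Choose the version appropriate for your GPU

# Install NVIDIA Container Toolkit
# (apt-key and the older nvidia-docker repository are deprecated; this uses a
# signed-by keyring and the libnvidia-container repository instead)
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg
curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt update && sudo apt install -y nvidia-container-toolkit
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker

# Verify GPU is accessible to Docker
docker run --rm --gpus all nvidia/cuda:11.8.0-base-ubuntu22.04 nvidia-smi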

Database Setup

bash
# PostgreSQL for structured context data
sudo apt install -y postgresql postgresql-contrib
sudo systemctl start postgresql
sudo systemctl enable postgresql

# Create database for MCP (replace the placeholder password)
sudo -u postgres psql -c "CREATE DATABASE mcp_context;"
sudo -u postgres psql -c "CREATE USER mcp_user WITH ENCRYPTED PASSWORD 'your_secure_password';"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE mcp_context TO mcp_user;"

# Redis for caching
sudo apt install -y redis-server
sudo systemctl start redis-server
sudo systemctl enable redis-server

# ChromaDB (Vector Database) using Docker
docker run -d -p 8000:8000 --name chromadb chromadb/chroma

Setting Up Model Serving Infrastructure

bash
# TensorFlow Serving with Docker
docker pull tensorflow/serving:latest

# TorchServe (prebuilt image; in the pytorch/serve repository the Dockerfile
# lives under docker/, so "docker build ." from the repository root fails)
docker pull pytorch/torchserve:latest

# Triton Inference Server
docker pull nvcr.io/nvidia/tritonserver:22.12-py3

# MLflow (this command runs in the foreground; --host 0.0.0.0 exposes it on
# all network interfaces)
pip install mlflow
mlflow server --host 0.0.0.0 --port 5000

4. Configuring the Model Context Protocol

Defining Context Parameters

The MCP server needs to track various context parameters. Create a schema that includes:

python
# Example context schema (context_schema.py)
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
import uuid

# Note: some Pydantic v2 releases warn about field names that start with
# "model_"; the warning is harmless for this schema.

class UserContext(BaseModel):
    user_id: str
    preferences: Dict[str, Any] = {}
    # Each entry is an interaction record (timestamp, input, output),
    # matching what context_store.py appends after inference
    session_history: List[Dict[str, Any]] = []
    demographics: Optional[Dict[str, Any]] = None

class EnvironmentContext(BaseModel):
    timestamp: datetime = Field(default_factory=datetime.now)
    deployment_environment: str = "production"  # or "staging", "development"
    server_load: float = 0.0
    available_resources: Dict[str, float] = {}

class ModelContext(BaseModel):
    model_id: str
    model_version: str
    parameters: Dict[str, Any] = {}
    constraints: Dict[str, Any] = {}

class DataContext(BaseModel):
    data_source: str = "unknown"
    data_timestamp: Optional[datetime] = None
    data_quality_metrics: Dict[str, float] = {}

class MCPContext(BaseModel):
    context_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    user: UserContext
    environment: EnvironmentContext = Field(default_factory=EnvironmentContext)
    model: ModelContext
    data: DataContext = Field(default_factory=DataContext)
    custom_attributes: Dict[str, Any] = {}
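
As a rough illustration of what a client would send, the sketch below builds a minimal JSON payload matching the schema above using only the standard library. It assumes the required fields are `user.user_id`, `model.model_id`, and `model.model_version` (the fields declared without defaults); the helper name `build_minimal_context` is hypothetical.

```python
import json
from typing import Any, Dict


def build_minimal_context(user_id: str, model_id: str, model_version: str) -> Dict[str, Any]:
    """Build the smallest payload the schema should accept.

    Sections and fields that are left out (environment, data, preferences,
    custom_attributes, context_id) fall back to the schema defaults.
    """
    return {
        "user": {"user_id": user_id},
        "model": {"model_id": model_id, "model_version": model_version},
    }


payload = build_minimal_context("user-123", "gpt-model", "1.0.0")
body = json.dumps(payload, indent=2)
print(body)
```

Anything beyond these three fields is optional in this reading of the schema, so clients can start small and add preferences or data-quality metrics later.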

API Integration

Create a FastAPI server to handle MCP context:

python
# app.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any, Optional

from context_schema import MCPContext
# database.py is not shown in this guide; it is assumed to export the
# SQLAlchemy engine, the SessionLocal session factory, and the declarative Base
from database import SessionLocal, engine, Base
import context_store
import model_manager

# Initialize database
Base.metadata.create_all(bind=engine)

app = FastAPI(title="MCP Server")

# Dependency: one database session per request
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.post("/api/v1/context", response_model=Dict[str, Any])
def create_context(context: MCPContext, db: Session = Depends(get_db)):
    """Create a new context record"""
    context_id = context_store.save_context(db, context)
    return {"context_id": context_id, "status": "created"}

@app.get("/api/v1/context/{context_id}")
def get_context(context_id: str, db: Session = Depends(get_db)):
    """Retrieve a specific context by ID"""
    context = context_store.get_context(db, context_id)
    if not context:
        raise HTTPException(status_code=404, detail="Context not found")
    return context

@app.post("/api/v1/inference/{model_id}")
async def model_inference(
    model_id: str,
    input_data: Dict[str, Any],
    context_id: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Run model inference with context"""
    context = None
    if context_id:
        context = context_store.get_context(db, context_id)
        if not context:
            raise HTTPException(status_code=404, detail="Context not found")

    # Get model and run inference
    try:
        result = await model_manager.run_inference(model_id, input_data, context)
    except ValueError as exc:
        # run_inference raises ValueError for an unknown model or model type
        raise HTTPException(status_code=404, detail=str(exc))

    # Update context with this interaction if needed
    if context_id:
        context_store.update_context_after_inference(db, context_id, input_data, result)

    return result

@app.put("/api/v1/context/{context_id}")
def update_context(
    context_id: str,
    updates: Dict[str, Any],
    db: Session = Depends(get_db)
):
    """Update specific fields in the context"""
    success = context_store.update_context(db, context_id, updates)
    if not success:
        raise HTTPException(status_code=404, detail="Context not found")
    return {"status": "updated"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Context Handling Implementation

Create the context store:

python
# context_store.py
from sqlalchemy.orm import Session
from sqlalchemy import Column, String, JSON, DateTime
import json
from datetime import datetime
from typing import Dict, Any, Optional
import uuid

from database import Base

class ContextRecord(Base):
    __tablename__ = "contexts"

    id = Column(String, primary_key=True, index=True)
    user_context = Column(JSON)
    environment_context = Column(JSON)
    model_context = Column(JSON)
    data_context = Column(JSON)
    custom_attributes = Column(JSON)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

def save_context(db: Session, context_data) -> str:
    """Save context to database"""
    # Round-trip through JSON so datetime fields become ISO strings;
    # a plain .dict() keeps datetime objects, which JSON columns cannot store
    context_dict = json.loads(context_data.json())
    context_id = context_dict.get("context_id", str(uuid.uuid4()))

    db_context = ContextRecord(
        id=context_id,
        user_context=context_dict.get("user"),
        environment_context=context_dict.get("environment"),
        model_context=context_dict.get("model"),
        data_context=context_dict.get("data"),
        custom_attributes=context_dict.get("custom_attributes", {})
    )

    db.add(db_context)
    db.commit()
    db.refresh(db_context)
    return context_id

def get_context(db: Session, context_id: str) -> Optional[Dict[str, Any]]:
    """Retrieve context from database"""
    db_context = db.query(ContextRecord).filter(ContextRecord.id == context_id).first()
    if not db_context:
        return None

    return {
        "context_id": db_context.id,
        "user": db_context.user_context,
        "environment": db_context.environment_context,
        "model": db_context.model_context,
        "data": db_context.data_context,
        "custom_attributes": db_context.custom_attributes,
        "created_at": db_context.created_at,
        "updated_at": db_context.updated_at
    }

def update_context(db: Session, context_id: str, updates: Dict[str, Any]) -> bool:
    """Update specific fields in the context"""
    db_context = db.query(ContextRecord).filter(ContextRecord.id == context_id).first()
    if not db_context:
        return False

    # Each top-level key replaces the whole corresponding section;
    # unknown keys are ignored
    for key, value in updates.items():
        if key == "user":
            db_context.user_context = value
        elif key == "environment":
            db_context.environment_context = value
        elif key == "model":
            db_context.model_context = value
        elif key == "data":
            db_context.data_context = value
        elif key == "custom_attributes":
            db_context.custom_attributes = value

    db_context.updated_at = datetime.utcnow()
    db.commit()
    return True

def update_context_after_inference(
    db: Session,
    context_id: str,
    input_data: Dict[str, Any],
    result: Dict[str, Any]
) -> bool:
    """Update context after model inference"""
    db_context = db.query(ContextRecord).filter(ContextRecord.id == context_id).first()
    if not db_context:
        return False

    # Copy the stored JSON before changing it: SQLAlchemy does not detect
    # in-place mutations of a plain JSON column, so the edit would not be saved
    user_context = dict(db_context.user_context or {})
    history = list(user_context.get("session_history", []))

    # Add interaction record
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "input": input_data,
        "output": result
    })

    # Limit history size to the most recent entries
    user_context["session_history"] = history[-100:]  # Example limit

    db_context.user_context = user_context
    db_context.updated_at = datetime.utcnow()
    db.commit()
    return True
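
The history cap in `update_context_after_inference` can also be expressed with a bounded queue. The sketch below is a standalone illustration, not part of the server code: `record_interaction` is a hypothetical helper, and `collections.deque` with `maxlen` drops the oldest entry automatically once the limit is reached.

```python
from collections import deque
from datetime import datetime, timezone
from typing import Any, Deque, Dict

HISTORY_LIMIT = 100  # Same example limit as the context store


def record_interaction(history: Deque[Dict[str, Any]],
                       input_data: Dict[str, Any],
                       output: Dict[str, Any]) -> None:
    """Append one interaction; a full deque discards its oldest entry."""
    history.append({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input": input_data,
        "output": output,
    })


history: Deque[Dict[str, Any]] = deque(maxlen=HISTORY_LIMIT)
for i in range(105):
    record_interaction(history, {"prompt": f"q{i}"}, {"text": f"a{i}"})

# Convert to a plain list before storing it in a JSON column
stored_history = list(history)
```

After 105 interactions only the most recent 100 remain, which matches the slice-based trimming used in the store.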

Model Manager Implementation

python
# model_manager.py
import copy
import hashlib
import json
import httpx
from typing import Dict, Any, Optional
import tensorflow as tf
import torch
import redis

# Redis client for caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Model registry - in production this would be a database
MODEL_REGISTRY = {
    "gpt-model": {
        "type": "http",
        "endpoint": "http://localhost:8001/v1/models/gpt:predict",
        "version": "1.0.0"
    },
    "bert-embedding": {
        "type": "tensorflow",
        "path": "/models/bert",
        "version": "2.1.0"
    },
    "image-classifier": {
        "type": "pytorch",
        "path": "/models/image_classifier.pt",
        "version": "1.2.0"
    }
}

async def run_inference(model_id: str, input_data: Dict[str, Any], context: Optional[Dict[str, Any]] = None):
    """Run model inference with context awareness"""
    if model_id not in MODEL_REGISTRY:
        raise ValueError(f"Model {model_id} not found in registry")

    model_info = MODEL_REGISTRY[model_id]

    # Prepare input with context
    inference_input = prepare_input_with_context(model_id, input_data, context)

    # Check cache for identical request if appropriate
    cache_key = None
    if model_info.get("cacheable", False):
        # Use a stable digest: the built-in hash() of a string changes
        # between processes, so it cannot be used for a shared cache
        payload = json.dumps(inference_input, sort_keys=True, default=str)
        cache_key = f"{model_id}:{hashlib.sha256(payload.encode('utf-8')).hexdigest()}"
        cached_result = redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

    # Run model based on type
    if model_info["type"] == "http":
        result = await http_inference(model_info["endpoint"], inference_input)
    elif model_info["type"] == "tensorflow":
        result = tf_inference(model_info["path"], inference_input)
    elif model_info["type"] == "pytorch":
        result = pytorch_inference(model_info["path"], inference_input)
    else:
        raise ValueError(f"Unsupported model type: {model_info['type']}")

    # Store in cache if appropriate
    if cache_key:
        redis_client.setex(
            cache_key,
            model_info.get("cache_ttl", 3600),  # Default 1 hour TTL
            json.dumps(result)
        )

    return result

def prepare_input_with_context(model_id: str, input_data: Dict[str, Any], context: Optional[Dict[str, Any]]):
    """Prepare model input with relevant context information"""
    if not context:
        return input_data

    # Deep copy to avoid modifying the caller's input (dict.copy() is shallow)
    enhanced_input = copy.deepcopy(input_data)

    # Add context based on model requirements
    if model_id == "gpt-model":
        # For a GPT-like model, we might include conversation history
        if "user" in context and "session_history" in context["user"]:
            # Format history appropriately for the model
            history = context["user"]["session_history"][-5:]  # Last 5 interactions
            enhanced_input["conversation_history"] = history

        # Add user preferences if available
        if "user" in context and "preferences" in context["user"]:
            enhanced_input["user_preferences"] = context["user"]["preferences"]

    elif model_id == "bert-embedding":
        # For embeddings, maybe we add language preference
        if "user" in context and "preferences" in context["user"]:
            enhanced_input["language"] = context["user"]["preferences"].get("language", "en")

    # Add model-specific parameters from context
    if "model" in context and "parameters" in context["model"]:
        enhanced_input["parameters"] = context["model"]["parameters"]

    # Add environmental context if relevant
    if "environment" in context:
        enhanced_input["environment"] = {
            "timestamp": context["environment"].get("timestamp"),
            "deployment": context["environment"].get("deployment_environment")
        }

    return enhanced_input

async def http_inference(endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Call a model exposed via HTTP endpoint"""
    async with httpx.AsyncClient() as client:
        response = await client.post(endpoint, json=data)
        response.raise_for_status()
        return response.json()

def tf_inference(model_path: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Run inference on a TensorFlow model"""
    # Load model (in production, this would be cached)
    model = tf.saved_model.load(model_path)

    # Prepare tensors, skipping the keys added by prepare_input_with_context
    input_tensors = {}
    for key, value in data.items():
        if key != "parameters" and key != "environment":
            if isinstance(value, list):
                input_tensors[key] = tf.constant(value)
            else:
                input_tensors[key] = tf.constant([value])

    # Run inference
    results = model.signatures["serving_default"](**input_tensors)

    # Convert results to Python types
    output = {}
    for key, tensor in results.items():
        output[key] = tensor.numpy().tolist()

    return output

def pytorch_inference(model_path: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Run inference on a PyTorch model"""
    # Load model (in production, this would be cached).
    # weights_only=False is needed to unpickle a full model object on recent
    # PyTorch releases; only load files from a trusted source.
    model = torch.load(model_path, weights_only=False)
    model.eval()

    # Prepare tensors, skipping the keys added by prepare_input_with_context
    input_tensors = {}
    for key, value in data.items():
        if key != "parameters" and key != "environment":
            if isinstance(value, list):
                input_tensors[key] = torch.tensor(value)
            else:
                input_tensors[key] = torch.tensor([value])

    # Run inference
    with torch.no_grad():
        results = model(**input_tensors)

    # Convert results to Python types (.cpu() also handles GPU tensors)
    if isinstance(results, tuple):
        output = {}
        for i, result in enumerate(results):
            output[f"output_{i}"] = result.cpu().numpy().tolist()
    else:
        output = {"output": results.cpu().numpy().tolist()}

    return output
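
The cache lookup in `run_inference` derives a key from the model ID and the serialized input. One way to make such a key deterministic is sketched below, using only the standard library. The helper name `make_cache_key` is hypothetical. The motivation is that Python's built-in `hash()` of a string differs between processes unless `PYTHONHASHSEED` is fixed, so a cryptographic digest is a safer basis for keys shared through Redis.

```python
import hashlib
import json
from typing import Any, Dict


def make_cache_key(model_id: str, inference_input: Dict[str, Any]) -> str:
    """Build a cache key that is stable across processes and key orderings."""
    # sort_keys gives a canonical ordering; compact separators avoid
    # whitespace differences between serializers
    canonical = json.dumps(inference_input, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{model_id}:{digest}"


key_a = make_cache_key(
    "gpt-model",
    {"prompt": "hi", "parameters": {"max_tokens": 10, "temperature": 0.7}},
)
key_b = make_cache_key(
    "gpt-model",
    {"parameters": {"temperature": 0.7, "max_tokens": 10}, "prompt": "hi"},
)
key_c = make_cache_key("gpt-model", {"prompt": "hello"})
```

Here `key_a` and `key_b` are identical because the two inputs differ only in key order, while `key_c` differs because the prompt changed.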

Contextual Adaptation

Implement a context adapter that modifies model behavior based on context:

python
# context_adapter.py
from typing import Dict, Any, List, Optional

class ContextAdapter:
    """Adapts model behavior based on context"""

    @staticmethod
    def adapt_model_parameters(model_id: str, default_params: Dict[str, Any],
                               context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Modify model parameters based on context"""
        if not context:
            return default_params

        params = default_params.copy()

        # User-specific adaptations
        if "user" in context:
            user = context["user"]

            # Adapt language model temperature based on user preference
            if model_id.startswith("gpt") and "preferences" in user:
                if "creativity" in user["preferences"]:
                    creativity = user["preferences"]["creativity"]
                    # Map creativity preference to temperature
                    if creativity == "high":
                        params["temperature"] = max(params.get("temperature", 0.7), 0.9)
                    elif creativity == "low":
                        params["temperature"] = min(params.get("temperature", 0.7), 0.3)

            # Adapt response length based on user preference
            if "preferences" in user and "verbosity" in user["preferences"]:
                verbosity = user["preferences"]["verbosity"]
                if verbosity == "concise":
                    params["max_tokens"] = min(params.get("max_tokens", 1024), 256)
                elif verbosity == "detailed":
                    params["max_tokens"] = max(params.get("max_tokens", 1024), 1024)

        # Environment-specific adaptations
        if "environment" in context:
            env = context["environment"]

            # Reduce complexity under high server load
            if "server_load" in env and env["server_load"] > 0.8:
                params["max_tokens"] = min(params.get("max_tokens", 1024), 512)
                if "top_k" in params:
                    params["top_k"] = min(params["top_k"], 10)

            # Adapt based on deployment environment
            if "deployment_environment" in env:
                if env["deployment_environment"] == "development":
                    # More logging in development
                    params["verbose"] = True
                elif env["deployment_environment"] == "production":
                    # Safer settings in production
                    params["safety_filter"] = True

        # Data-specific adaptations
        if "data" in context and "data_quality_metrics" in context["data"]:
            quality = context["data"]["data_quality_metrics"]

            # If input data quality is low, be more conservative
            if "noise_level" in quality and quality["noise_level"] > 0.6:
                params["temperature"] = min(params.get("temperature", 0.7), 0.4)
                if "top_p" in params:
                    params["top_p"] = min(params["top_p"], 0.92)

        # Model-specific adaptations from context
        if "model" in context and "parameters" in context["model"]:
            # Explicit parameter overrides from context are applied last,
            # so they take precedence over the adaptations above
            for k, v in context["model"]["parameters"].items():
                params[k] = v

        return params

    @staticmethod
    def adapt_response(model_id: str, response: Any,
                       context: Optional[Dict[str, Any]]) -> Any:
        """Post-process model response based on context"""
        if not context:
            return response

        # For text responses
        if isinstance(response, str) or (isinstance(response, dict) and "text" in response):
            text = response if isinstance(response, str) else response["text"]

            # Apply user preferences; everything that reads prefs stays inside
            # this block so a context without preferences cannot raise NameError
            if "user" in context and "preferences" in context["user"]:
                prefs = context["user"]["preferences"]
                if "language" in prefs and prefs["language"] != "en":
                    # In a real system, this would call a translation service
                    pass

                # Apply formality preference
                if "formality" in prefs:
                    if prefs["formality"] == "formal" and not text.startswith("Dear"):
                        text = "I would like to inform you that " + text
                    elif prefs["formality"] == "casual" and text.startswith("Dear"):
                        text = text.replace("Dear", "Hey").replace("Sincerely", "Cheers")

            # Format response appropriately
            if isinstance(response, dict):
                response["text"] = text
            else:
                response = text

        return response
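
The order in which `adapt_model_parameters` applies its rules matters: user preferences first, then environment limits, then explicit overrides from the model context. The sketch below restates only the `max_tokens` rules in a standalone function to make that precedence easy to trace. The name `resolve_max_tokens` is hypothetical, and the function is a simplification rather than a replacement for the adapter.

```python
from typing import Any, Dict


def resolve_max_tokens(default: int, context: Dict[str, Any]) -> int:
    """Simplified restatement of the max_tokens rules in adapt_model_parameters."""
    max_tokens = default

    # 1. User verbosity preference
    verbosity = context.get("user", {}).get("preferences", {}).get("verbosity")
    if verbosity == "concise":
        max_tokens = min(max_tokens, 256)
    elif verbosity == "detailed":
        max_tokens = max(max_tokens, 1024)

    # 2. High server load caps the budget
    if context.get("environment", {}).get("server_load", 0.0) > 0.8:
        max_tokens = min(max_tokens, 512)

    # 3. Explicit overrides from the model context win over everything above
    overrides = context.get("model", {}).get("parameters", {})
    return overrides.get("max_tokens", max_tokens)


detailed_under_load = {
    "user": {"preferences": {"verbosity": "detailed"}},
    "environment": {"server_load": 0.9},
}
with_override = {**detailed_under_load, "model": {"parameters": {"max_tokens": 2000}}}

capped = resolve_max_tokens(1024, detailed_under_load)
overridden = resolve_max_tokens(1024, with_override)
concise = resolve_max_tokens(1024, {"user": {"preferences": {"verbosity": "concise"}}})
```

A detailed-verbosity request under high load is capped at 512 tokens, yet an explicit override still raises it to 2000. Whether overrides should be allowed to bypass the load cap is a design decision worth making deliberately.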

5. Testing and Validation

Unit Tests

Create a test suite to validate the MCP server:

python
# test_mcp_server.py
import unittest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app import app, get_db
from database import Base

# Create in-memory database for testing
engine = create_engine(
    "sqlite:///:memory:",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)

def override_get_db():
    # Create the session before the try block so that db is always defined
    # when the finally clause runs
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)

class TestMCPServer(unittest.TestCase):
    def test_create_context(self):
        """Test creating a new context"""
        context_data = {
            "user": {
                "user_id": "test_user_1",
                "preferences": {"language": "en"}
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0"
            }
        }

        response = client.post("/api/v1/context", json=context_data)
        self.assertEqual(response.status_code, 200)
        result = response.json()
        self.assertIn("context_id", result)
        self.assertEqual(result["status"], "created")

        # Verify we can retrieve it
        context_id = result["context_id"]
        get_response = client.get(f"/api/v1/context/{context_id}")
        self.assertEqual(get_response.status_code, 200)
        context = get_response.json()
        self.assertEqual(context["user"]["user_id"], "test_user_1")

    def test_update_context(self):
        """Test updating an existing context"""
        # First create a context
        context_data = {
            "user": {
                "user_id": "test_user_2",
                "preferences": {"language": "en"}
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0"
            }
        }

        response = client.post("/api/v1/context", json=context_data)
        context_id = response.json()["context_id"]

        # Now update it
        update_data = {
            "user": {
                "user_id": "test_user_2",
                "preferences": {"language": "fr", "formality": "formal"}
            }
        }

        update_response = client.put(f"/api/v1/context/{context_id}", json=update_data)
        self.assertEqual(update_response.status_code, 200)

        # Verify the update
        get_response = client.get(f"/api/v1/context/{context_id}")
        context = get_response.json()
        self.assertEqual(context["user"]["preferences"]["language"], "fr")
        self.assertEqual(context["user"]["preferences"]["formality"], "formal")

    def test_model_inference_with_context(self):
        """Test model inference with context (model call mocked)"""
        # First create a context
        context_data = {
            "user": {
                "user_id": "test_user_3",
                "preferences": {"language": "en", "creativity": "high"}
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0",
                "parameters": {"max_tokens": 100}
            }
        }

        response = client.post("/api/v1/context", json=context_data)
        context_id = response.json()["context_id"]

        # Inference request body
        inference_data = {
            "prompt": "Tell me a story about a dragon"
        }

        # Replace the real model call with a mock so that no model server is
        # needed; swallowing the exception instead would hide real failures
        mock_result = {"text": "Once upon a time, there was a dragon."}
        with patch("model_manager.run_inference", new=AsyncMock(return_value=mock_result)):
            inference_response = client.post(
                f"/api/v1/inference/gpt-model?context_id={context_id}",
                json=inference_data
            )

        self.assertEqual(inference_response.status_code, 200)
        self.assertEqual(inference_response.json(), mock_result)

if __name__ == "__main__":
    unittest.main()
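
One way to exercise an async inference call without a running model is `unittest.mock.AsyncMock` from the standard library. The standalone sketch below shows the pattern in isolation. The function `answer_with_context` is a hypothetical caller that mirrors the `(model_id, input_data, context)` argument order used by `run_inference`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional
from unittest.mock import AsyncMock

InferenceFn = Callable[[str, Dict[str, Any], Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]]


async def answer_with_context(run_inference: InferenceFn, model_id: str, prompt: str,
                              context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical caller that awaits an injected inference coroutine."""
    return await run_inference(model_id, {"prompt": prompt}, context)


# AsyncMock returns an awaitable, so it can stand in for an async function
fake_inference = AsyncMock(return_value={"text": "mocked story"})
test_context = {"user": {"user_id": "u1"}}

result = asyncio.run(
    answer_with_context(fake_inference, "gpt-model", "Tell me a story", test_context)
)

# Verify both the call arguments and that the mock was actually awaited
fake_inference.assert_awaited_once_with(
    "gpt-model", {"prompt": "Tell me a story"}, test_context
)
```

The same mock can be installed over a module attribute with `unittest.mock.patch`, which keeps the test independent of any model server.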

Integration Tests

Create a script to test the entire system:

python
# integration_test.py
import asyncio
import httpx
import json
import time
from typing import Dict, Any

async def test_full_workflow():
    """Test the entire MCP workflow from context creation to inference"""
    async with httpx.AsyncClient() as client:
        print("Testing MCP Server Integration...")

        # 1. Create a context
        context_data = {
            "user": {
                "user_id": "integration_test_user",
                "preferences": {
                    "language": "en",
                    "creativity": "high",
                    "verbosity": "concise"
                }
            },
            "environment": {
                "deployment_environment": "test",
                "server_load": 0.2
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0",
                "parameters": {
                    "temperature": 0.7,
                    "max_tokens": 100
                }
            },
            "data": {
                "data_source": "integration_test",
                "data_quality_metrics": {
                    "noise_level": 0.1
                }
            }
        }

        print("Step 1: Creating context...")
        response = await client.post(
            "http://localhost:8080/api/v1/context",
            json=context_data
        )

        assert response.status_code == 200, f"Failed to create context: {response.text}"
        result = response.json()
        context_id = result["context_id"]
        print(f"Context created with ID: {context_id}")

        # 2. Retrieve the context to verify
        print("Step 2: Retrieving context...")
        get_response = await client.get(f"http://localhost:8080/api/v1/context/{context_id}")
        assert get_response.status_code == 200, f"Failed to retrieve context: {get_response.text}"
        context = get_response.json()
        assert context["user"]["user_id"] == "integration_test_user"
        print("Context retrieved successfully")

        # 3. Update the context with new preferences
        print("Step 3: Updating context...")
        update_data = {
            "user": {
                "user_id": "integration_test_user",
                "preferences": {
                    "language": "en",
                    "creativity": "low",  # Changed from high to low
                    "verbosity": "concise"
                }
            }
        }

        update_response = await client.put(
            f"http://localhost:8080/api/v1/context/{context_id}",
            json=update_data
        )

        assert update_response.status_code == 200, f"Failed to update context: {update_response.text}"
        print("Context updated successfully")

        # 4. Run model inference with context
        print("Step 4: Running model inference with context...")
        inference_data = {
            "prompt": "Write a short poem about artificial intelligence"
        }

        inference_response = await client.post(
            f"http://localhost:8080/api/v1/inference/gpt-model?context_id={context_id}",
            json=inference_data
        )

        # This might fail in a test environment without actual models
        # For a real test, check the response structure
        if inference_response.status_code == 200:
            result = inference_response.json()
            print(f"Model response: {result}")
        else:
            print(f"Inference failed (expected in test environment): {inference_response.text}")

        # 5. Verify context was updated after inference
        print("Step 5: Verifying context update after inference...")
        final_get_response = await client.get(f"http://localhost:8080/api/v1/context/{context_id}")
        final_context = final_get_response.json()

        # Check if session history was updated
        if "session_history" in final_context["user"]:
            history = final_context["user"]["session_history"]
            if history:
                print(f"Session history updated: {len(history)} interactions recorded")
            else:
                print("Session history exists but no interactions recorded")
        else:
            print("No session history found in context")

        print("Integration test completed successfully!")

if __name__ == "__main__":
    asyncio.run(test_full_workflow())

Performance Testing

Create a script to test performance:

python
# performance_test.py
import asyncio
import random
import statistics
import time
import uuid

import httpx

BASE_URL = "http://localhost:8080/api/v1"


async def create_context(client: httpx.AsyncClient, user_id: str) -> str:
    """Create a context and return its ID"""
    context_data = {
        "user": {
            "user_id": user_id,
            "preferences": {
                "language": "en",
                "creativity": random.choice(["high", "medium", "low"])
            }
        },
        "model": {
            "model_id": "gpt-model",
            "model_version": "1.0.0"
        }
    }

    response = await client.post(f"{BASE_URL}/context", json=context_data)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to create context: {response.text}")

    return response.json()["context_id"]


async def run_inference(client: httpx.AsyncClient, context_id: str, prompt: str) -> float:
    """Run inference and return the time taken; raise on a non-200 response"""
    start_time = time.perf_counter()
    response = await client.post(
        f"{BASE_URL}/inference/gpt-model",
        params={"context_id": context_id},
        json={"prompt": prompt}
    )
    elapsed = time.perf_counter() - start_time

    if response.status_code != 200:
        # Raising lets asyncio.gather(return_exceptions=True) count this as a failure
        raise RuntimeError(f"Inference failed: {response.text}")

    return elapsed


async def performance_test(num_users: int, requests_per_user: int):
    """Run performance test with multiple simulated users"""
    async with httpx.AsyncClient() as client:
        print(f"Starting performance test with {num_users} users, {requests_per_user} requests each")

        # Create contexts for all users
        print("Creating contexts...")
        context_ids = []
        for _ in range(num_users):
            user_id = f"perf_test_user_{uuid.uuid4()}"
            context_ids.append(await create_context(client, user_id))

        # Test prompts
        prompts = [
            "Tell me a story about a robot",
            "Explain quantum computing",
            "Write a poem about the ocean",
            "Give me a recipe for chocolate cake",
            "Describe the solar system"
        ]

        # Run inference requests concurrently
        print("Running inference requests...")
        tasks = []
        for context_id in context_ids:
            for _ in range(requests_per_user):
                tasks.append(run_inference(client, context_id, random.choice(prompts)))

        # Gather results and measure wall-clock time for the whole batch
        wall_start = time.perf_counter()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        wall_elapsed = time.perf_counter() - wall_start

        # Calculate statistics
        successful_times = [t for t in results if isinstance(t, float)]
        errors = [e for e in results if isinstance(e, Exception)]

        print("Performance results:")
        print(f"  Total requests: {len(results)}")
        print(f"  Successful: {len(successful_times)}")
        print(f"  Failed: {len(errors)}")

        if successful_times:
            ordered = sorted(successful_times)
            p95_time = ordered[min(len(ordered) - 1, int(len(ordered) * 0.95))]

            print(f"  Average response time: {statistics.mean(ordered):.4f}s")
            print(f"  Min response time: {ordered[0]:.4f}s")
            print(f"  Max response time: {ordered[-1]:.4f}s")
            print(f"  95th percentile: {p95_time:.4f}s")
            # Requests overlap, so throughput uses wall-clock time, not the sum of latencies
            print(f"  Requests per second: {len(successful_times) / wall_elapsed:.2f}")
        else:
            print("No successful requests to analyze")


if __name__ == "__main__":
    asyncio.run(performance_test(num_users=10, requests_per_user=5))
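
When requests run concurrently, two of these statistics are easy to get subtly wrong: the percentile index and the throughput figure. The following is a minimal sketch, assuming a nearest-rank percentile definition and hypothetical helper names (`nearest_rank_percentile`, `throughput`) that are not part of the test script itself.

```python
import math
from typing import List


def nearest_rank_percentile(samples: List[float], pct: float) -> float:
    """Return the nearest-rank percentile (0 < pct <= 100) of a non-empty list."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = math.ceil(pct / 100 * len(ordered))  # 1-based rank into the sorted list
    return ordered[max(rank, 1) - 1]


def throughput(completed: int, wall_clock_seconds: float) -> float:
    """Requests per second measured against elapsed wall-clock time."""
    return completed / wall_clock_seconds


# Ten concurrent requests that each take about one second finish in about
# one second of wall-clock time, so the throughput is 10 requests/second.
# Dividing by the sum of the latencies (10 s) would report 1 request/second.
latencies = [1.0] * 10
concurrent_rps = throughput(len(latencies), wall_clock_seconds=1.0)
naive_rps = len(latencies) / sum(latencies)
```

The sum-of-latencies figure is only meaningful when requests are issued strictly one after another.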

6. Scaling and Optimization

Scaling with Kubernetes

Create Kubernetes deployment files for your MCP server:

yaml
# kubernetes/mcp-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        image: mcp-server:latest
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: database-url
        - name: REDIS_HOST
          value: "redis-service"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  selector:
    app: mcp-server
  ports:
  - port: 80
    targetPort: 8080
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
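
To get a feel for how the autoscaler reacts to the 70% CPU target, it can help to work through the scaling rule by hand. The sketch below assumes the basic formula from the Kubernetes documentation, desired = ceil(current replicas × current utilization / target utilization), clamped to `minReplicas` and `maxReplicas`. It deliberately omits the tolerance band and stabilization windows that the real controller applies, and `desired_replicas` is a hypothetical helper name.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_utilization: float,
    target_utilization: float,
    min_replicas: int = 3,
    max_replicas: int = 10,
) -> int:
    """Approximate the HPA scaling rule for a single utilization metric."""
    raw = math.ceil(current_replicas * current_utilization / target_utilization)
    # Clamp to the bounds configured on the HorizontalPodAutoscaler
    return max(min_replicas, min(max_replicas, raw))


# 3 pods averaging 90% CPU against a 70% target -> scale out to 4
scale_out = desired_replicas(3, 90, 70)
# 3 pods averaging 35% CPU -> raw result is 2, but minReplicas keeps 3
scale_in = desired_replicas(3, 35, 70)
# 8 pods averaging 140% of their CPU request -> raw result is 16, capped at 10
capped = desired_replicas(8, 140, 70)
```

When several metrics are configured, as with CPU and memory here, the autoscaler evaluates each one and uses the largest resulting replica count.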

Database Scaling

yaml
# kubernetes/database-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
spec:
  serviceName: "postgres"
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:14
        ports:
        - containerPort: 5432
          name: postgres
        env:
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: postgres-password
        - name: POSTGRES_USER
          value: mcp_user
        - name: POSTGRES_DB
          value: mcp_context
        volumeMounts:
        - name: postgres-data
          mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
  - metadata:
      name: postgres-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
spec:
  selector:
    app: postgres
  ports:
  - port: 5432
    targetPort: 5432
  clusterIP: None

Redis for Caching

yaml
# kubernetes/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:6.2-alpine
        ports:
        - containerPort: 6379
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "200m"
        volumeMounts:
        - name: redis-data
          mountPath: /data
      volumes:
      - name: redis-data
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379

Optimizing Performance

Implement a request throttler:

python
# throttler.py
import time
from typing import Any, Awaitable, Callable, Dict

import redis.asyncio as redis  # Async client, so Redis calls do not block the event loop


class ThrottledError(Exception):
    """Raised when a request exceeds the configured rate limit"""


class RequestThrottler:
    """Throttles requests to manage load"""

    def __init__(self, redis_client: redis.Redis, limits: Dict[str, int]):
        """
        Initialize with Redis client and limits dict

        limits: Dict mapping model IDs to requests per minute
        """
        self.redis = redis_client
        self.limits = limits
        self.default_limit = 60  # Default to 60 RPM

    async def throttle(self, model_id: str) -> bool:
        """
        Check if request should be throttled

        Returns:
            True if request is allowed, False if it should be throttled
        """
        limit = self.limits.get(model_id, self.default_limit)
        # One counter per model per minute; the key name changes every minute
        key = f"throttle:{model_id}:{int(time.time() // 60)}"

        # Increment counter for this minute
        current = await self.redis.incr(key)

        # Set expiry on first use so old counters are cleaned up
        if current == 1:
            await self.redis.expire(key, 120)  # 2 minutes (to handle edge cases)

        # Check if under limit
        return current <= limit

    async def with_throttling(
        self,
        model_id: str,
        func: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any
    ) -> Any:
        """
        Execute function with throttling

        Raises:
            ThrottledError if throttled
        """
        if await self.throttle(model_id):
            return await func(*args, **kwargs)
        # In a real implementation, you might queue the request instead
        raise ThrottledError(f"Request throttled for model {model_id}")
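
The throttler counts requests in fixed one-minute windows, which has a known edge case worth seeing in isolation. The sketch below is an in-memory stand-in for the Redis counter (the class name `FixedWindowCounter` and the explicit `now` argument are assumptions for illustration, not part of the throttler) and shows that a client can briefly exceed the per-minute limit around a window boundary.

```python
from typing import Dict, Tuple


class FixedWindowCounter:
    """In-memory fixed-window rate limiter, mirroring the per-minute key scheme."""

    def __init__(self, limit: int, window_seconds: int = 60):
        self.limit = limit
        self.window_seconds = window_seconds
        self.counts: Dict[Tuple[str, int], int] = {}

    def allow(self, key: str, now: float) -> bool:
        # Requests in the same window share one counter
        window = int(now // self.window_seconds)
        bucket = (key, window)
        self.counts[bucket] = self.counts.get(bucket, 0) + 1
        return self.counts[bucket] <= self.limit


limiter = FixedWindowCounter(limit=2)
decisions = [
    limiter.allow("gpt-model", now=58.0),  # window 0, 1st request: allowed
    limiter.allow("gpt-model", now=59.0),  # window 0, 2nd request: allowed
    limiter.allow("gpt-model", now=59.5),  # window 0, 3rd request: throttled
    limiter.allow("gpt-model", now=60.0),  # window 1, counter resets: allowed
    limiter.allow("gpt-model", now=61.0),  # window 1, 2nd request: allowed
]
```

Four requests were accepted within about three seconds even though the limit is two per minute. If that burst matters for your models, a sliding-window or token-bucket scheme is the usual alternative.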

Add connection pooling for database:

python
# database.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Get DB URL from environment or use a local development default
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://mcp_user:password@localhost/mcp_context")

# Create engine with connection pooling
engine = create_engine(
    DATABASE_URL,
    pool_size=20,        # Connections kept open in the pool
    max_overflow=10,     # Extra connections allowed beyond pool_size under load
    pool_timeout=30,     # Wait up to 30 seconds for a connection
    pool_recycle=1800,   # Recycle connections after 30 minutes
    pool_pre_ping=True   # Check connection validity before using
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    """FastAPI dependency that yields a session and always closes it"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
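
Pool settings are per process, so they multiply with the number of replicas. As a rough sanity check, the sketch below estimates the worst-case number of connections the MCP servers could open and compares it with PostgreSQL's default `max_connections` of 100. The helper name `max_db_connections` and the one-worker-per-pod assumption are illustrative only.

```python
POSTGRES_DEFAULT_MAX_CONNECTIONS = 100


def max_db_connections(
    replicas: int,
    pool_size: int = 20,
    max_overflow: int = 10,
    workers_per_replica: int = 1,
) -> int:
    """Worst-case connections opened across all server processes."""
    return replicas * workers_per_replica * (pool_size + max_overflow)


# Baseline deployment: 3 replicas
baseline = max_db_connections(replicas=3)
# Autoscaler at its ceiling: 10 replicas
peak = max_db_connections(replicas=10)

fits_at_baseline = baseline <= POSTGRES_DEFAULT_MAX_CONNECTIONS
fits_at_peak = peak <= POSTGRES_DEFAULT_MAX_CONNECTIONS
```

With these numbers the peak case would exceed the default limit, so you would either lower the pool settings, raise `max_connections`, or place a connection pooler such as PgBouncer in front of the database.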

7. Security and Maintenance

Authentication and Authorization

Implement JWT authentication:

python
# auth.py
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Configuration
# The fallback key is for local development only; always set SECRET_KEY in production
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-for-development")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# Models
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
    role: str = "user"  # "user", "admin", "model_developer"


class UserInDB(User):
    hashed_password: str


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


# User database - in production, this would be in a real database
USERS_DB = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
        "role": "user"
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Smith",
        "email": "alice@example.com",
        "hashed_password": pwd_context.hash("password"),
        "disabled": False,
        "role": "admin"
    }
}


# Authentication functions
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_user(db: Dict[str, Dict[str, Any]], username: str) -> Optional[UserInDB]:
    if username in db:
        return UserInDB(**db[username])
    return None


def authenticate_user(db: Dict[str, Dict[str, Any]], username: str, password: str):
    """Return the user on success, or False if the credentials are invalid"""
    user = get_user(db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    # Fall back to a 15-minute lifetime when the caller does not specify one
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: Optional[str] = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(USERS_DB, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


def has_role(required_role: str):
    """Build a dependency that allows the given role; admins always pass"""
    async def role_checker(current_user: User = Depends(get_current_active_user)):
        if current_user.role != required_role and current_user.role != "admin":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Operation requires role: {required_role}"
            )
        return current_user
    return role_checker
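
It can be useful to see what an HS256 token actually contains, since the `sub` and `exp` claims are readable by anyone who holds the token and only the signature is secret-dependent. The following is a standard-library sketch of the signing and checking steps, written for illustration only: `sign_hs256` and `verify_hs256` are hypothetical names, the header's `alg` field is not validated, and a real deployment should keep using a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: Dict[str, Any], secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> Dict[str, Any]:
    """Check signature and expiry; raise ValueError if either fails."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token")
    expected = hmac.new(
        secret.encode("utf-8"), f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current_time = time.time() if now is None else now
    if payload.get("exp", float("inf")) <= current_time:
        raise ValueError("token expired")
    return payload


token = sign_hs256({"sub": "alice", "exp": 2000}, "dev-secret")
claims = verify_hs256(token, "dev-secret", now=1000)
```

Because the payload is only encoded, not encrypted, nothing sensitive belongs in the claims.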

Update your app to use authentication:

python
# Update app.py to include authentication

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from datetime import timedelta
from typing import Dict, Any

from auth import (
    User, Token, authenticate_user, create_access_token,
    ACCESS_TOKEN_EXPIRE_MINUTES, get_current_active_user, has_role, USERS_DB
)

# ... other imports as before

app = FastAPI(title="MCP Server")


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(USERS_DB, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


# Now protect your API endpoints with authentication
@app.post("/api/v1/context", response_model=Dict[str, Any])
def create_context(
    context: MCPContext,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Create a new context record (requires authentication)"""
    context_id = context_store.save_context(db, context)
    return {"context_id": context_id, "status": "created"}


# Admin-only endpoint
@app.delete("/api/v1/context/{context_id}")
def delete_context(
    context_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(has_role("admin"))
):
    """Delete a context (admin only)"""
    success = context_store.delete_context(db, context_id)
    if not success:
        raise HTTPException(status_code=404, detail="Context not found")
    return {"status": "deleted"}

# ... other endpoints

Security Best Practices

Implement data encryption for sensitive context data:

python
# encryption.py
import copy
import json
import os
from typing import Any, Dict, Optional

from cryptography.fernet import Fernet


class ContextEncryption:
    """Handles encryption of sensitive context data"""

    def __init__(self, key_path: Optional[str] = None):
        """Initialize with encryption key"""
        if key_path and os.path.exists(key_path):
            with open(key_path, "rb") as key_file:
                self.key = key_file.read()
        else:
            # Generate a key and save it
            self.key = Fernet.generate_key()
            if key_path:
                key_dir = os.path.dirname(key_path)
                if key_dir:  # dirname is empty for a bare filename
                    os.makedirs(key_dir, exist_ok=True)
                with open(key_path, "wb") as key_file:
                    key_file.write(self.key)

        self.cipher = Fernet(self.key)

    def encrypt_context(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive parts of the context"""
        # Deep copy so nested dicts in the caller's context are not modified
        encrypted_context = copy.deepcopy(context)

        # Encrypt user data if present
        if "user" in encrypted_context:
            user_data = encrypted_context["user"]

            # Encrypt user preferences
            if "preferences" in user_data:
                user_data["preferences"] = self._encrypt_data(user_data["preferences"])

            # Encrypt demographics
            if "demographics" in user_data:
                user_data["demographics"] = self._encrypt_data(user_data["demographics"])

        # Encrypt custom attributes
        if "custom_attributes" in encrypted_context:
            encrypted_context["custom_attributes"] = self._encrypt_data(
                encrypted_context["custom_attributes"]
            )

        return encrypted_context

    def decrypt_context(self, encrypted_context: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt encrypted parts of the context"""
        # Deep copy so nested dicts in the caller's context are not modified
        decrypted_context = copy.deepcopy(encrypted_context)

        # Decrypt user data if present
        if "user" in decrypted_context:
            user_data = decrypted_context["user"]

            # Decrypt user preferences (encrypted values are stored as strings)
            if "preferences" in user_data and isinstance(user_data["preferences"], str):
                user_data["preferences"] = self._decrypt_data(user_data["preferences"])

            # Decrypt demographics
            if "demographics" in user_data and isinstance(user_data["demographics"], str):
                user_data["demographics"] = self._decrypt_data(user_data["demographics"])

        # Decrypt custom attributes
        if "custom_attributes" in decrypted_context and isinstance(decrypted_context["custom_attributes"], str):
            decrypted_context["custom_attributes"] = self._decrypt_data(
                decrypted_context["custom_attributes"]
            )

        return decrypted_context

    def _encrypt_data(self, data: Any) -> str:
        """Encrypt any data by converting to JSON and encrypting"""
        json_data = json.dumps(data)
        encrypted_bytes = self.cipher.encrypt(json_data.encode('utf-8'))
        return encrypted_bytes.decode('utf-8')

    def _decrypt_data(self, encrypted_str: str) -> Any:
        """Decrypt data and convert from JSON"""
        decrypted_bytes = self.cipher.decrypt(encrypted_str.encode('utf-8'))
        return json.loads(decrypted_bytes.decode('utf-8'))
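
Because context dictionaries are nested, the kind of copy taken before encrypting matters. The sketch below uses a placeholder string in place of real ciphertext (the function names and the `"<ciphertext>"` value are illustrative assumptions) to show how `dict.copy()` lets a change to `user.preferences` leak back into the caller's object, while `copy.deepcopy` does not.

```python
import copy
from typing import Any, Dict


def mask_with_shallow_copy(context: Dict[str, Any]) -> Dict[str, Any]:
    masked = context.copy()  # Only the top-level dict is duplicated
    masked["user"]["preferences"] = "<ciphertext>"
    return masked


def mask_with_deep_copy(context: Dict[str, Any]) -> Dict[str, Any]:
    masked = copy.deepcopy(context)  # Nested dicts are duplicated too
    masked["user"]["preferences"] = "<ciphertext>"
    return masked


first = {"user": {"user_id": "u1", "preferences": {"language": "en"}}}
mask_with_shallow_copy(first)
shallow_leaked = first["user"]["preferences"]  # The original was overwritten

second = {"user": {"user_id": "u1", "preferences": {"language": "en"}}}
mask_with_deep_copy(second)
deep_preserved = second["user"]["preferences"]  # The original is intact
```

The same consideration applies anywhere a context is redacted or sanitized before logging.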

Monitoring and Logging

python
# monitoring.py
import copy
import json
import logging
import time
from functools import wraps
from typing import Any, Dict, Optional

from prometheus_client import Counter, Gauge, Histogram

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Prometheus metrics
MODEL_REQUESTS = Counter(
    'model_requests_total',
    'Total model inference requests',
    ['model_id', 'status']
)

CONTEXT_OPERATIONS = Counter(
    'context_operations_total',
    'Context CRUD operations',
    ['operation']
)

RESPONSE_TIME = Histogram(
    'response_time_seconds',
    'Response time in seconds',
    ['endpoint', 'method'],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)
)

ACTIVE_REQUESTS = Gauge(
    'active_requests',
    'Number of active requests',
    ['endpoint']
)


def log_context_operation(operation: str, context_id: str, data: Optional[Dict[str, Any]] = None):
    """Log context operations with standardized format"""
    log_data = {
        "operation": operation,
        "context_id": context_id,
        "timestamp": time.time()
    }

    if data:
        # Redact on a deep copy so the caller's context is not overwritten
        sanitized_data = copy.deepcopy(data)
        if "user" in sanitized_data and "preferences" in sanitized_data["user"]:
            sanitized_data["user"]["preferences"] = "[REDACTED]"
        if "user" in sanitized_data and "demographics" in sanitized_data["user"]:
            sanitized_data["user"]["demographics"] = "[REDACTED]"

        log_data["data"] = sanitized_data

    logger.info(f"Context {operation}: {json.dumps(log_data)}")
    CONTEXT_OPERATIONS.labels(operation=operation).inc()


def log_model_request(model_id: str, status: str, context_id: Optional[str] = None,
                      input_data: Optional[Dict[str, Any]] = None,
                      response: Optional[Dict[str, Any]] = None,
                      error: Optional[str] = None):
    """Log model inference requests"""
    log_data = {
        "model_id": model_id,
        "status": status,
        "timestamp": time.time()
    }

    if context_id:
        log_data["context_id"] = context_id

    if input_data:
        # Record only that input was present, never its content
        log_data["input"] = "[DATA]"

    if response and status == "success":
        # Only log response structure, not content
        log_data["response_type"] = type(response).__name__

    if error:
        log_data["error"] = error

    logger.info(f"Model request: {json.dumps(log_data)}")
    MODEL_REQUESTS.labels(model_id=model_id, status=status).inc()


def timer_decorator(endpoint: str, method: str):
    """Decorator to time and log endpoint execution"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"

            ACTIVE_REQUESTS.labels(endpoint=endpoint).inc()
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"Error in {endpoint}: {str(e)}")
                status = "error"
                raise
            finally:
                # Runs for both outcomes, so failed requests are timed and logged too
                ACTIVE_REQUESTS.labels(endpoint=endpoint).dec()
                execution_time = time.time() - start_time

                # Record metrics
                RESPONSE_TIME.labels(endpoint=endpoint, method=method).observe(execution_time)

                # Log request
                logger.info(f"{method} {endpoint} - {status} - {execution_time:.4f}s")
        return wrapper
    return decorator

System Health Checks

python
# health.py
import os

import psutil
import redis
from fastapi import APIRouter, Depends, Response
from fastapi.responses import JSONResponse
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from sqlalchemy import text
from sqlalchemy.orm import Session

from database import get_db

health_router = APIRouter()


@health_router.get("/health")
async def health_check():
    """Basic health check endpoint"""
    return {"status": "ok"}


@health_router.get("/ready")
def readiness_check(db: Session = Depends(get_db)):
    """Readiness check - ensures database and Redis connections work"""
    # Declared with plain `def` so FastAPI runs these blocking calls in a thread pool
    try:
        # Execute a simple query to check if database is responsive
        db.execute(text("SELECT 1"))
        db_status = "ok"
    except Exception as e:
        db_status = f"error: {str(e)}"

    try:
        # Check Redis connection
        r = redis.Redis(host=os.getenv("REDIS_HOST", "localhost"))
        r.ping()
        redis_status = "ok"
    except Exception as e:
        redis_status = f"error: {str(e)}"

    # Get system resources
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    system_status = {
        "cpu_percent": psutil.cpu_percent(),
        "memory_percent": memory.percent,
        "disk_percent": disk.percent
    }

    # Determine overall status
    ready = db_status == "ok" and redis_status == "ok"

    payload = {
        "status": "ready" if ready else "not ready",
        "database": db_status,
        "redis": redis_status,
        "system": system_status
    }
    # Return 503 when not ready so the Kubernetes readinessProbe actually fails
    return JSONResponse(status_code=200 if ready else 503, content=payload)


@health_router.get("/metrics")
def metrics():
    """Endpoint for Prometheus metrics"""
    # Serve the text exposition format with its content type, not a JSON-encoded string
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

8. Best Practices and Pitfalls to Avoid

Best Practices for MCP Server Deployment

Context Management

  1. Structured Context Schema: Always use a well-defined, versioned schema for context data. This ensures compatibility across models and prevents unexpected behavior.

  2. Context Lifetime Management: Implement policies to expire context data after a certain period of inactivity to prevent context bloat:

python
# Example context cleanup job (scheduled job)
# Assumes async_session, ContextRecord, and logger are defined elsewhere in the project
from datetime import datetime, timedelta

from sqlalchemy import delete


async def cleanup_stale_contexts():
    """Remove contexts that haven't been used for 30 days"""
    cutoff_date = datetime.utcnow() - timedelta(days=30)

    async with async_session() as session:
        # Find and delete old contexts
        result = await session.execute(
            delete(ContextRecord).where(ContextRecord.updated_at < cutoff_date)
        )
        await session.commit()

    num_deleted = result.rowcount
    logger.info(f"Cleaned up {num_deleted} stale contexts")

  3. Progressive Context Enhancement: Design your system to allow new context attributes to be added without breaking existing functionality.

python
# Example of context schema versioning and migration
class ContextMigration:
    @staticmethod
    def migrate_context(context_data, from_version, to_version):
        """Migrate context data between schema versions"""
        if from_version == 1 and to_version == 2:
            # Add new fields for version 2
            if "user" in context_data and "preferences" not in context_data["user"]:
                context_data["user"]["preferences"] = {}

            # Restructure existing fields
            if "environment" in context_data and "location" in context_data["environment"]:
                location = context_data["environment"].pop("location")
                if "geography" not in context_data:
                    context_data["geography"] = {}
                context_data["geography"]["location"] = location

            # Update version
            context_data["schema_version"] = 2

        # Add more migration paths as needed
        return context_data
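
As the number of schema versions grows, writing a branch for every (from, to) pair becomes unwieldy. One common alternative is to register a single step per version and apply them in sequence. The sketch below assumes that approach. The version 2 to 3 step and the names `MIGRATIONS` and `migrate_to` are hypothetical and only illustrate the chaining.

```python
import copy
from typing import Any, Callable, Dict


def _v1_to_v2(ctx: Dict[str, Any]) -> Dict[str, Any]:
    # Mirrors the version 1 -> 2 change: make sure user preferences exist
    if "user" in ctx:
        ctx["user"].setdefault("preferences", {})
    return ctx


def _v2_to_v3(ctx: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical later step, included only to demonstrate chaining
    ctx.setdefault("custom_attributes", {})
    return ctx


# Each entry upgrades a context from the keyed version to the next one
MIGRATIONS: Dict[int, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    1: _v1_to_v2,
    2: _v2_to_v3,
}


def migrate_to(context: Dict[str, Any], target_version: int) -> Dict[str, Any]:
    """Apply single-version steps until the context reaches target_version."""
    ctx = copy.deepcopy(context)  # Leave the caller's data untouched
    version = ctx.get("schema_version", 1)
    while version < target_version:
        step = MIGRATIONS.get(version)
        if step is None:
            raise ValueError(f"No migration registered from version {version}")
        ctx = step(ctx)
        version += 1
        ctx["schema_version"] = version
    return ctx


old_context = {"schema_version": 1, "user": {"user_id": "u1"}}
upgraded = migrate_to(old_context, target_version=3)
```

Contexts stored at any older version can then be upgraded lazily when they are read.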

Performance Optimization

  1. Efficient Context Retrieval: Ensure your database schema is properly indexed for quick context lookups:
sql
-- Example SQL for adding indexes to the contexts table
CREATE INDEX idx_contexts_user_id ON contexts ((user_context->>'user_id'));
CREATE INDEX idx_contexts_updated_at ON contexts (updated_at);

  2. Implement Caching Layers: Use Redis or a similar in-memory store to cache frequently accessed contexts:

python
# Example context caching implementation
# Assumes redis_client is a redis.asyncio.Redis instance and context_store is defined elsewhere
async def get_cached_context(context_id: str, db: Session):
    """Get context with caching"""
    # Try cache first
    cache_key = f"context:{context_id}"
    cached = await redis_client.get(cache_key)

    if cached:
        return json.loads(cached)

    # If not in cache, get from database
    context = context_store.get_context(db, context_id)
    if context:
        # Store in cache with a one-hour TTL
        await redis_client.setex(cache_key, 3600, json.dumps(context))

    return context

  3. Batch Processing: For high-volume applications, implement batch processing for context updates:

python
# Example batch context update
# Assumes async_session and ContextRecord are defined elsewhere in the project
from datetime import datetime
from typing import Any, Dict, List

from sqlalchemy import update


async def batch_update_contexts(updates: List[Dict[str, Any]]):
    """Update multiple contexts in a single transaction"""
    async with async_session() as session:
        async with session.begin():
            # The loop variable must not be named `update`, or it would shadow sqlalchemy.update
            for item in updates:
                context_id = item["context_id"]
                data = item["data"]

                stmt = (
                    update(ContextRecord)
                    .where(ContextRecord.id == context_id)
                    .values(updated_at=datetime.utcnow(), **data)
                )
                await session.execute(stmt)
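
One thing the caching layer in item 2 needs in practice is invalidation: with a one-hour TTL, an update written to the database would otherwise stay invisible to readers until the cached copy expires. The sketch below illustrates the read-through plus delete-on-write pattern with in-memory stand-ins. `InMemoryCache`, `read_context`, `update_context`, and the plain-dict `store` are all assumptions for illustration, not part of the server code.

```python
import json
from typing import Any, Dict, Optional


class InMemoryCache:
    """Tiny stand-in for the Redis calls used here; TTLs are accepted but ignored."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


def read_context(cache: InMemoryCache, store: Dict[str, Dict[str, Any]], context_id: str):
    """Read-through: serve from cache, fall back to the store and populate the cache."""
    key = f"context:{context_id}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    context = store.get(context_id)
    if context is not None:
        cache.setex(key, 3600, json.dumps(context))
    return context


def update_context(cache: InMemoryCache, store: Dict[str, Dict[str, Any]],
                   context_id: str, changes: Dict[str, Any]) -> None:
    """Write to the store first, then drop the cached copy so readers refetch."""
    store[context_id] = {**store[context_id], **changes}
    cache.delete(f"context:{context_id}")


cache = InMemoryCache()
store = {"c1": {"creativity": "high"}}
before = read_context(cache, store, "c1")   # Populates the cache
update_context(cache, store, "c1", {"creativity": "low"})
after = read_context(cache, store, "c1")    # Cache miss, so the new value is returned
```

Deleting the key rather than rewriting it keeps the write path simple and avoids caching a value that a concurrent writer is about to change.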

Security and Compliance

  1. Implement Role-Based Access Control: Ensure only authorized users can access context data:
python
# Complement has_role with a dependency that checks context ownership.
# FastAPI fills context_id from the route's path parameter, so use this as
# Depends(has_context_access) on routes that declare {context_id}.
async def has_context_access(
    context_id: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    # Admin always has access
    if current_user.role == "admin":
        return current_user

    # Get the context
    context = context_store.get_context(db, context_id)
    if not context:
        raise HTTPException(status_code=404, detail="Context not found")

    # Owners and model developers may access the context; everyone else is rejected
    if (context["user"]["user_id"] != current_user.username and
            current_user.role != "model_developer"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this context"
        )

    return current_user

  2. Implement Data Retention Policies: Ensure compliance with regulations like GDPR by implementing proper data retention:

python
# Example GDPR compliance handler
# Assumes async_session and ContextRecord are defined elsewhere in the project
from sqlalchemy import select


async def delete_user_data(user_id: str) -> int:
    """Delete all contexts associated with a user and return how many were removed"""
    async with async_session() as session:
        # Find contexts with this user
        result = await session.execute(
            select(ContextRecord)
            .where(ContextRecord.user_context.contains({"user_id": user_id}))
        )
        contexts = result.scalars().all()

        # Delete each context
        for context in contexts:
            await session.delete(context)

        await session.commit()

    return len(contexts)

Common Pitfalls to Avoid

Context Management Issues

  1. Overly Large Contexts: Contexts that grow unbounded can cause performance issues and increased latency:
python
# Implement context size limits
def validate_context_size(context):
    """Check if context is within size limits"""
    context_json = json.dumps(context)
    # Measure encoded bytes, since non-ASCII text takes more than one byte per character
    size_kb = len(context_json.encode("utf-8")) / 1024

    if size_kb > 100:  # Example: 100KB limit
        logger.warning(f"Context size {size_kb:.2f}KB exceeds recommended limit of 100KB")

        # Take action - could truncate history, remove less important fields, etc.
        if "user" in context and "session_history" in context["user"]:
            # Keep only last 10 interactions
            context["user"]["session_history"] = context["user"]["session_history"][-10:]

    return context

  2. Inconsistent Context Formats: Ensure all services create and consume context in consistent formats:

python
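# Use validation middleware
# Assumes app and MCPContext are defined elsewhere in the project
import json

from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError


@app.middleware("http")
async def validate_context_middleware(request: Request, call_next):
    """Validate context structure in create requests"""
    # Only POST carries a full context; GET and DELETE have no body, and PUT sends partial updates
    if request.method == "POST" and request.url.path == "/api/v1/context":
        try:
            body = await request.json()
            # Validate against schema
            MCPContext(**body)
        except (json.JSONDecodeError, TypeError):
            return JSONResponse(
                status_code=422,
                content={"detail": "Request body must be a JSON object"}
            )
        except ValidationError as e:
            return JSONResponse(
                status_code=422,
                content={"detail": "Invalid context format", "errors": e.errors()}
            )

    response = await call_next(request)
    return response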

Performance Pitfalls

  1. N+1 Query Problems: Avoid making multiple database queries in loops:
python
# BAD EXAMPLE - DON'T DO THIS
async def get_multiple_contexts(context_ids: List[str]):
    contexts = []
    for context_id in context_ids:
        # N+1 problem - one query per context
        context = await get_context(context_id)
        contexts.append(context)
    return contexts


# GOOD EXAMPLE - DO THIS INSTEAD
async def get_multiple_contexts_efficiently(context_ids: List[str]):
    # Single query to get all contexts
    async with async_session() as session:
        result = await session.execute(
            select(ContextRecord)
            .where(ContextRecord.id.in_(context_ids))
        )
        contexts = result.scalars().all()

    # Process results
    return [context.to_dict() for context in contexts]
  2. Synchronous I/O in Async Code: Ensure all I/O operations are properly async:
```python
import json
from typing import Any, Dict

import aiofiles  # Third-party package: pip install aiofiles

# BAD EXAMPLE - DON'T DO THIS
async def get_model_result(model_id: str, input_data: Dict[str, Any]):
    # Blocking I/O in async function!
    with open(f"/models/{model_id}/config.json", "r") as f:
        config = json.load(f)

    # More async code...

# GOOD EXAMPLE - DO THIS INSTEAD
async def get_model_result_correctly(model_id: str, input_data: Dict[str, Any]):
    # Use aiofiles for async file I/O
    async with aiofiles.open(f"/models/{model_id}/config.json", "r") as f:
        content = await f.read()
        config = json.loads(content)

    # More async code...
```
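When no async-native library exists for a blocking call, one common alternative is to run it in a worker thread so the event loop stays free. A minimal sketch using `asyncio.to_thread` from the standard library (Python 3.9+). The helper names `load_config_blocking` and `load_config` are illustrative assumptions, not part of the server code above.

```python
import asyncio
import json
from typing import Any, Dict


def load_config_blocking(path: str) -> Dict[str, Any]:
    """Ordinary blocking file read, intended to run in a worker thread."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


async def load_config(path: str) -> Dict[str, Any]:
    """Run the blocking read in a thread so the event loop is not stalled."""
    return await asyncio.to_thread(load_config_blocking, path)
```

The same pattern applies to any synchronous client or SDK call: wrap the blocking function and await it through `asyncio.to_thread`, keeping in mind that the default thread pool has a bounded number of workers.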

Security Pitfalls

  1. Inadequate Input Validation: Always validate all inputs, especially context data:
```python
from typing import Any, Dict, Optional

from pydantic import BaseModel

# Implement strict validation
class UpdateContextRequest(BaseModel):
    """Schema for context update requests"""
    user: Optional[Dict[str, Any]] = None
    environment: Optional[Dict[str, Any]] = None
    model: Optional[Dict[str, Any]] = None
    data: Optional[Dict[str, Any]] = None
    custom_attributes: Optional[Dict[str, Any]] = None

    class Config:
        # Prevent additional fields
        # (Pydantic v2 prefers: model_config = ConfigDict(extra="forbid"))
        extra = "forbid"
```
  2. Storing Passwords or Keys in Context: Never store sensitive authentication data in context:
```python
import copy
from typing import Any, Dict

# Context sanitizer example
def sanitize_context(context: Dict[str, Any]) -> Dict[str, Any]:
    """Remove sensitive data from context"""
    # Deep copy so deleting nested keys does not mutate the caller's context
    sanitized = copy.deepcopy(context)

    # Field names that might contain sensitive data (exact key matches)
    sensitive_fields = [
        "password", "token", "key", "secret", "auth", "credential", "api_key"
    ]

    # Check in custom attributes
    if "custom_attributes" in sanitized:
        for field in sensitive_fields:
            if field in sanitized["custom_attributes"]:
                del sanitized["custom_attributes"][field]

    # Check in user preferences
    if "user" in sanitized and "preferences" in sanitized["user"]:
        for field in sensitive_fields:
            if field in sanitized["user"]["preferences"]:
                del sanitized["user"]["preferences"][field]

    return sanitized
```
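The sanitizer above inspects only two known locations and matches key names exactly, so a secret stored elsewhere in the context or under a name like `auth_token` would pass through. As a hedged sketch of a broader approach, the function below walks the whole context recursively and drops any key whose lowercased name contains one of the sensitive markers. The name `redact_sensitive` and the substring-matching rule are assumptions for illustration.

```python
from typing import Any

SENSITIVE_MARKERS = ("password", "token", "key", "secret", "auth", "credential")


def redact_sensitive(value: Any) -> Any:
    """Return a copy of value with sensitive-looking dict keys removed."""
    if isinstance(value, dict):
        return {
            k: redact_sensitive(v)
            for k, v in value.items()
            if not (
                isinstance(k, str)
                and any(marker in k.lower() for marker in SENSITIVE_MARKERS)
            )
        }
    if isinstance(value, list):
        return [redact_sensitive(item) for item in value]
    # Scalars (str, int, None, ...) are returned unchanged
    return value
```

Substring matching is deliberately aggressive: it also removes harmless keys such as `keyboard_layout` or `author`. Where that is unacceptable, an allowlist of permitted keys is usually the safer design.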

9. Conclusion

MCP Server Benefits

Building your own MCP-compliant server provides several key advantages:

  1. Complete Control: With a custom MCP server, you have full control over how context is managed, stored, and utilized by your models.

  2. Tailored Performance: You can optimize performance based on your specific workloads and deployment environment, from resource allocation to caching strategies.

  3. Customized Security: Implement security measures that align with your organization's requirements and compliance needs.

  4. Integration Flexibility: Connect your MCP server to your existing systems, databases, and services with custom integrations.

  5. Cost Optimization: Avoid vendor lock-in and potentially reduce costs by implementing exactly what you need, especially for high-volume deployments.
