Complete Guide: Building Your Own Model Context Protocol (MCP) Server for AI Tool Integration and Communication
A comprehensive guide to building a Model Context Protocol (MCP) server for managing contextual information in machine learning systems, including implementation, scaling, security, and best practices for enterprise AI applications.
Daniel Kliewer
Author, Sovereign AI


Building Your Own Model Context Protocol (MCP) Server: Comprehensive Guide
1. Introduction to MCP
What is the Model Context Protocol (MCP)?
The Model Context Protocol (MCP) is a standardized framework for managing, transmitting, and utilizing contextual information in machine learning systems. At its core, MCP defines how context—the set of relevant information surrounding a model's operation—should be captured, structured, passed to models, and used during inference.
Unlike traditional ML deployment approaches where models operate as isolated black boxes, MCP creates an ecosystem where models are constantly aware of their operational environment, historical interactions, and user-specific requirements. This context-aware approach enables models to make more informed, personalized, and accurate predictions.
The Importance of Context Management
Context management addresses a fundamental limitation in traditional ML deployments: the assumption that a model's input alone contains all information needed for an optimal response. In reality, several contextual factors affect how a model should perform:
- Environmental context: Information about the deployment environment, including time, location, system resources, and operational constraints
- User context: User preferences, history, demographics, interaction patterns, and specific requirements
- Task context: The broader goal the model is helping to achieve, including prior steps in a multi-step process
- Data context: Information about the data's source, quality, recency, and potential biases
By managing this context effectively, MCP allows models to:
- Personalize responses based on user history
- Adapt to environmental changes
- Maintain conversation coherence across multiple interactions
- Understand the intent behind ambiguous requests
- Follow evolving guidelines or constraints
Benefits of MCP
Scalability
- Horizontal Scaling: MCP's standardized context format allows for seamless distribution of model workloads across multiple servers
- Decoupled Architecture: Context management can be scaled independently from model inference
- Stateless Design: Models can be spun up or down as needed without losing contextual information
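The stateless design point can be made concrete with a small sketch. It assumes a hypothetical in-memory `ContextStore` standing in for the external database or cache; the class and function names are illustrative only and do not come from any library.

```python
from typing import Any, Dict, Optional


class ContextStore:
    """Hypothetical external store; a dict stands in for Redis or PostgreSQL."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, Any]] = {}

    def get(self, context_id: str) -> Optional[Dict[str, Any]]:
        return self._data.get(context_id)

    def put(self, context_id: str, context: Dict[str, Any]) -> None:
        self._data[context_id] = context


def handle_request(store: ContextStore, context_id: str, prompt: str) -> Dict[str, Any]:
    """A stateless worker: everything it needs arrives via the store."""
    context = store.get(context_id) or {"history": []}
    # Placeholder for a real model call.
    reply = f"echo({len(context['history'])} prior turns): {prompt}"
    # Write the updated context back so any other worker can continue.
    store.put(context_id, {"history": context["history"] + [prompt]})
    return {"reply": reply}


store = ContextStore()
# These two calls could be served by two different worker processes.
first = handle_request(store, "ctx-1", "hello")
second = handle_request(store, "ctx-1", "and again")
```

Because the worker keeps nothing between calls, it can be restarted or replicated freely as long as the store remains reachable.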
Flexibility
- Model Interchangeability: Different models can access the same context data through a standardized interface
- Progressive Enhancement: New context attributes can be added without breaking existing functionality
- Context Filtering: Only relevant context is passed to each model, improving efficiency
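Context filtering can be sketched as a function that copies only an allow-list of fields out of the full context. The dotted-path convention and the name `filter_context` are assumptions made for this illustration, not part of the protocol.

```python
from typing import Any, Dict, Iterable


def filter_context(context: Dict[str, Any], allowed: Iterable[str]) -> Dict[str, Any]:
    """Return only the dotted paths in `allowed`, e.g. "user.preferences"."""
    filtered: Dict[str, Any] = {}
    for path in allowed:
        source: Any = context
        keys = path.split(".")
        # Walk down the context; skip the path if any segment is missing.
        for key in keys:
            if not isinstance(source, dict) or key not in source:
                break
            source = source[key]
        else:
            # Rebuild the same nesting in the filtered result.
            # Values are shared with the original context, not copied.
            target = filtered
            for key in keys[:-1]:
                target = target.setdefault(key, {})
            target[keys[-1]] = source
    return filtered


full_context = {
    "user": {"user_id": "u1", "preferences": {"language": "fr"}, "demographics": {"age": 30}},
    "environment": {"server_load": 0.4},
}
# An embedding model might only need the user's preferences.
embedding_view = filter_context(full_context, ["user.preferences", "data.data_source"])
```

Here `embedding_view` contains the preferences but neither the demographics nor the environment, and the missing `data.data_source` path is skipped silently.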
Model Lifecycle Management
- Version Control: Context includes model version information, enabling graceful transitions between versions
- Performance Monitoring: Context tracking allows for detailed analysis of model behavior across different scenarios
- Continuous Improvement: Historical context enables targeted retraining based on actual usage patterns
2. Prerequisites for Building Your Own MCP Server
Hardware Requirements
Compute Resources
- CPU: Minimum 8 cores (16+ recommended for production), preferably server-grade processors like Intel Xeon or AMD EPYC
- GPU: For transformer-based models, NVIDIA GPUs with at least 16GB VRAM (A100, V100, or RTX 3090/4090); multiple GPUs recommended for high workloads
- Memory: 32GB RAM minimum (64-128GB recommended for production)
- Storage:
  - 500GB+ SSD for OS and applications (NVMe preferred)
  - 1TB+ storage for model artifacts and context data (scalable based on expected usage)
  - High IOPS capability for context retrieval operations
Networking
- Bandwidth: 10Gbps+ network interfaces for high-throughput model serving
- Latency: Low-latency connections, especially if context data is stored separately from models
Software Requirements
Operating System
- Linux Distributions: Ubuntu 20.04/22.04 LTS (preferred for ML workloads) or CentOS Stream 9; note that CentOS Linux 8 has reached end of life
- Windows: Windows Server 2019/2022 (if required by organizational constraints)
Containerization
- Docker: Engine 20.10+ for containerizing individual components
- Kubernetes: v1.24+ for orchestrating multi-container deployments
- Helm: For managing Kubernetes applications
Model Management
- TensorFlow Serving: For TensorFlow models
- TorchServe: For PyTorch models
- Triton Inference Server: For multi-framework model serving
- MLflow: For model lifecycle management
- KServe/Seldon Core: For Kubernetes-native model serving
Database Systems
- Vector Database: ChromaDB, Pinecone, or Milvus for storing and retrieving embeddings
- Relational Database: PostgreSQL 14+ for structured context data and metadata
- Redis: For high-speed context caching and session management
- MongoDB: For schema-flexible context storage
Networking and APIs
- REST Framework: FastAPI or Flask for creating REST endpoints
- gRPC: For high-performance internal communication
- Envoy/Istio: For API gateway and service mesh capabilities
- Protocol Buffers: For efficient data serialization
Monitoring and Logging
- Prometheus: For metrics collection
- Grafana: For metrics visualization
- Elasticsearch, Logstash, Kibana (ELK): For comprehensive logging
- Jaeger/Zipkin: For distributed tracing
3. Installation and Setup
Operating System Setup
```bash
# Example for Ubuntu Server 22.04 LTS
# 1. Download Ubuntu Server ISO from ubuntu.com
# 2. Create bootable USB and install Ubuntu Server
# 3. Update system packages
sudo apt update && sudo apt upgrade -y

# 4. Install basic utilities
sudo apt install -y build-essential curl wget git software-properties-common
```
Docker Installation
```bash
# Install Docker on Ubuntu
sudo apt install -y apt-transport-https ca-certificates curl gnupg lsb-release
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt update
sudo apt install -y docker-ce docker-ce-cli containerd.io

# Add current user to docker group
sudo usermod -aG docker "$USER"

# Verify installation
# newgrp opens a new shell with the docker group active; run the
# version check inside that shell (or log out and back in instead)
newgrp docker
docker --version
```
Kubernetes Setup
```bash
# Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

# Install minikube for local development
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

# Start minikube
minikube start --driver=docker --memory=8g --cpus=4

# For production, consider using kubeadm or managed Kubernetes services
```
GPU Support
```bash
# Install NVIDIA drivers
sudo apt install -y nvidia-driver-535  # Choose appropriate version

# Install NVIDIA Container Toolkit
# Note: apt-key is deprecated on recent Ubuntu releases; check NVIDIA's
# current installation guide if these repository steps fail
distribution=$(. /etc/os-release; echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list
sudo apt update && sudo apt install -y nvidia-container-toolkit
sudo systemctl restart docker

# Verify GPU is accessible to Docker
docker run --rm --gpus all nvidia/cuda:11.8.0-base-ubuntu22.04 nvidia-smi
```
Database Setup
```bash
# PostgreSQL for structured context data
sudo apt install -y postgresql postgresql-contrib
sudo systemctl start postgresql
sudo systemctl enable postgresql

# Create database for MCP
sudo -u postgres psql -c "CREATE DATABASE mcp_context;"
sudo -u postgres psql -c "CREATE USER mcp_user WITH ENCRYPTED PASSWORD 'your_secure_password';"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE mcp_context TO mcp_user;"

# Redis for caching
sudo apt install -y redis-server
sudo systemctl start redis-server
sudo systemctl enable redis-server

# ChromaDB (Vector Database) using Docker
docker run -d -p 8000:8000 --name chromadb chromadb/chroma
```
Setting Up Model Serving Infrastructure
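```bash
# TensorFlow Serving with Docker
docker pull tensorflow/serving:latest

# TorchServe
# The pytorch/serve repository keeps its Dockerfile under docker/, so
# building from the repository root fails; pulling the published image
# is the simpler route
docker pull pytorch/torchserve:latest

# Triton Inference Server
docker pull nvcr.io/nvidia/tritonserver:22.12-py3

# MLflow
pip install mlflow
# Runs in the foreground; use a separate terminal or a service manager
mlflow server --host 0.0.0.0 --port 5000
```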
4. Configuring the Model Context Protocol
Defining Context Parameters
The MCP server needs to track various context parameters. Create a schema that includes:
```python
# Example context schema (context_schema.py)
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
import uuid


class UserContext(BaseModel):
    user_id: str
    preferences: Dict[str, Any] = {}
    # Each entry is an interaction record (timestamp, input, output),
    # as appended by context_store.update_context_after_inference
    session_history: List[Dict[str, Any]] = []
    demographics: Optional[Dict[str, Any]] = None


class EnvironmentContext(BaseModel):
    timestamp: datetime = Field(default_factory=datetime.now)
    deployment_environment: str = "production"  # or "staging", "development"
    server_load: float = 0.0
    available_resources: Dict[str, float] = {}


class ModelContext(BaseModel):
    model_id: str
    model_version: str
    parameters: Dict[str, Any] = {}
    constraints: Dict[str, Any] = {}


class DataContext(BaseModel):
    data_source: str = "unknown"
    data_timestamp: Optional[datetime] = None
    data_quality_metrics: Dict[str, float] = {}


class MCPContext(BaseModel):
    context_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    user: UserContext
    environment: EnvironmentContext = Field(default_factory=EnvironmentContext)
    model: ModelContext
    data: DataContext = Field(default_factory=DataContext)
    custom_attributes: Dict[str, Any] = {}
```
API Integration
Create a FastAPI server to handle MCP context:
```python
# app.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Dict, Any, Optional

from context_schema import MCPContext
from database import SessionLocal, engine, Base
import context_store  # must be imported before create_all so its table is registered
import model_manager

# Initialize database
Base.metadata.create_all(bind=engine)

app = FastAPI(title="MCP Server")


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/api/v1/context", response_model=Dict[str, Any])
def create_context(context: MCPContext, db: Session = Depends(get_db)):
    """Create a new context record"""
    context_id = context_store.save_context(db, context)
    return {"context_id": context_id, "status": "created"}


@app.get("/api/v1/context/{context_id}")
def get_context(context_id: str, db: Session = Depends(get_db)):
    """Retrieve a specific context by ID"""
    context = context_store.get_context(db, context_id)
    if not context:
        raise HTTPException(status_code=404, detail="Context not found")
    return context


@app.post("/api/v1/inference/{model_id}")
async def model_inference(
    model_id: str,
    input_data: Dict[str, Any],
    context_id: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Run model inference with context"""
    context = None
    if context_id:
        context = context_store.get_context(db, context_id)
        if not context:
            raise HTTPException(status_code=404, detail="Context not found")

    # Get model and run inference
    try:
        result = await model_manager.run_inference(model_id, input_data, context)
    except ValueError as exc:
        # run_inference raises ValueError for unknown models or model types
        raise HTTPException(status_code=404, detail=str(exc))

    # Update context with this interaction if needed
    if context_id:
        context_store.update_context_after_inference(db, context_id, input_data, result)

    return result


@app.put("/api/v1/context/{context_id}")
def update_context(
    context_id: str,
    updates: Dict[str, Any],
    db: Session = Depends(get_db)
):
    """Update specific fields in the context"""
    success = context_store.update_context(db, context_id, updates)
    if not success:
        raise HTTPException(status_code=404, detail="Context not found")
    return {"status": "updated"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
```
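The server imports `SessionLocal`, `engine`, and `Base` from a `database` module that this guide does not show. A minimal sketch of such a module follows, assuming SQLAlchemy 1.4 or later. The environment variable name `MCP_DATABASE_URL` and the SQLite fallback are assumptions made for local development, not part of the original setup.

```python
# database.py (minimal sketch)
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# MCP_DATABASE_URL is a hypothetical variable name. For the PostgreSQL
# database created earlier it would look like:
#   postgresql://mcp_user:your_secure_password@localhost/mcp_context
DATABASE_URL = os.getenv("MCP_DATABASE_URL", "sqlite:///./mcp_context.db")

# SQLite needs check_same_thread=False when a connection is used from
# more than one thread, which can happen under FastAPI.
connect_args = {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}

engine = create_engine(DATABASE_URL, connect_args=connect_args)
SessionLocal = sessionmaker(autoflush=False, bind=engine)
Base = declarative_base()
```

Pointing the variable at PostgreSQL additionally requires a driver such as `psycopg2` to be installed.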
Context Handling Implementation
Create the context store:
```python
# context_store.py
from sqlalchemy.orm import Session
from sqlalchemy import Column, String, JSON, DateTime
import json
from datetime import datetime
from typing import Dict, Any, Optional
import uuid

from database import Base


class ContextRecord(Base):
    __tablename__ = "contexts"

    id = Column(String, primary_key=True, index=True)
    user_context = Column(JSON)
    environment_context = Column(JSON)
    model_context = Column(JSON)
    data_context = Column(JSON)
    custom_attributes = Column(JSON)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


def save_context(db: Session, context_data) -> str:
    """Save context to database"""
    # Round-trip through JSON so datetime fields become ISO strings that
    # the JSON columns can store (on Pydantic v2, model_dump(mode="json")
    # is the equivalent call)
    context_dict = json.loads(context_data.json())
    context_id = context_dict.get("context_id") or str(uuid.uuid4())

    db_context = ContextRecord(
        id=context_id,
        user_context=context_dict.get("user"),
        environment_context=context_dict.get("environment"),
        model_context=context_dict.get("model"),
        data_context=context_dict.get("data"),
        custom_attributes=context_dict.get("custom_attributes", {})
    )

    db.add(db_context)
    db.commit()
    db.refresh(db_context)
    return context_id


def get_context(db: Session, context_id: str) -> Optional[Dict[str, Any]]:
    """Retrieve context from database"""
    db_context = db.query(ContextRecord).filter(ContextRecord.id == context_id).first()
    if not db_context:
        return None

    return {
        "context_id": db_context.id,
        "user": db_context.user_context,
        "environment": db_context.environment_context,
        "model": db_context.model_context,
        "data": db_context.data_context,
        "custom_attributes": db_context.custom_attributes,
        "created_at": db_context.created_at,
        "updated_at": db_context.updated_at
    }


def update_context(db: Session, context_id: str, updates: Dict[str, Any]) -> bool:
    """Update specific fields in the context"""
    db_context = db.query(ContextRecord).filter(ContextRecord.id == context_id).first()
    if not db_context:
        return False

    # Each top-level section is replaced as a whole by the supplied value
    for key, value in updates.items():
        if key == "user":
            db_context.user_context = value
        elif key == "environment":
            db_context.environment_context = value
        elif key == "model":
            db_context.model_context = value
        elif key == "data":
            db_context.data_context = value
        elif key == "custom_attributes":
            db_context.custom_attributes = value

    db_context.updated_at = datetime.utcnow()
    db.commit()
    return True


def update_context_after_inference(
    db: Session,
    context_id: str,
    input_data: Dict[str, Any],
    result: Dict[str, Any]
) -> bool:
    """Update context after model inference"""
    db_context = db.query(ContextRecord).filter(ContextRecord.id == context_id).first()
    if not db_context:
        return False

    # Work on copies: SQLAlchemy does not detect in-place changes to a
    # plain JSON column, so a new object must be assigned to persist them
    user_context = dict(db_context.user_context or {})
    history = list(user_context.get("session_history", []))

    # Add interaction record
    history.append({
        "timestamp": datetime.utcnow().isoformat(),
        "input": input_data,
        "output": result
    })

    # Limit history size
    if len(history) > 100:  # Example limit
        history = history[-100:]

    user_context["session_history"] = history
    db_context.user_context = user_context
    db_context.updated_at = datetime.utcnow()
    db.commit()
    return True
```
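Note that `update_context` replaces each top-level section wholesale, so a PUT that sends only new preferences under `user` also discards the stored `session_history`. If partial updates are wanted, one option is a recursive merge. The helper below is a sketch under that assumption; `deep_merge` is an illustrative name, not an existing function in this codebase.

```python
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `updates` merged into `base` recursively."""
    merged = dict(base)
    for key, value in updates.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them instead of overwriting.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


stored_user = {
    "user_id": "u1",
    "preferences": {"language": "en", "verbosity": "concise"},
    "session_history": [{"input": "hi", "output": "hello"}],
}
patch = {"preferences": {"language": "fr"}}
merged_user = deep_merge(stored_user, patch)
```

The merge returns a new dictionary and leaves `stored_user` untouched, so assigning the result back to a JSON column also gives SQLAlchemy a changed value to persist.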
Model Manager Implementation
```python
# model_manager.py
import copy
import hashlib
import json
import httpx
from typing import Dict, Any, Optional
import tensorflow as tf
import torch
import redis

# Redis client for caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Model registry - in production this would be a database
MODEL_REGISTRY = {
    "gpt-model": {
        "type": "http",
        "endpoint": "http://localhost:8001/v1/models/gpt:predict",
        "version": "1.0.0"
    },
    "bert-embedding": {
        "type": "tensorflow",
        "path": "/models/bert",
        "version": "2.1.0"
    },
    "image-classifier": {
        "type": "pytorch",
        "path": "/models/image_classifier.pt",
        "version": "1.2.0"
    }
}


async def run_inference(model_id: str, input_data: Dict[str, Any], context: Optional[Dict[str, Any]] = None):
    """Run model inference with context awareness"""
    if model_id not in MODEL_REGISTRY:
        raise ValueError(f"Model {model_id} not found in registry")

    model_info = MODEL_REGISTRY[model_id]

    # Prepare input with context
    inference_input = prepare_input_with_context(model_id, input_data, context)

    # Check cache for identical request if appropriate
    cache_key = None
    if model_info.get("cacheable", False):
        # Use a SHA-256 digest: the built-in hash() is salted per process,
        # so its values cannot be shared between workers or restarts
        payload = json.dumps(inference_input, sort_keys=True)
        cache_key = f"{model_id}:{hashlib.sha256(payload.encode('utf-8')).hexdigest()}"
        cached_result = redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

    # Run model based on type
    if model_info["type"] == "http":
        result = await http_inference(model_info["endpoint"], inference_input)
    elif model_info["type"] == "tensorflow":
        result = tf_inference(model_info["path"], inference_input)
    elif model_info["type"] == "pytorch":
        result = pytorch_inference(model_info["path"], inference_input)
    else:
        raise ValueError(f"Unsupported model type: {model_info['type']}")

    # Store in cache if appropriate
    if cache_key:
        redis_client.setex(
            cache_key,
            model_info.get("cache_ttl", 3600),  # Default 1 hour TTL
            json.dumps(result)
        )

    return result


def prepare_input_with_context(model_id: str, input_data: Dict[str, Any], context: Optional[Dict[str, Any]]):
    """Prepare model input with relevant context information"""
    if not context:
        return input_data

    # Deep copy to avoid modifying original
    enhanced_input = copy.deepcopy(input_data)

    # Add context based on model requirements
    if model_id == "gpt-model":
        # For a GPT-like model, we might include conversation history
        if "user" in context and "session_history" in context["user"]:
            # Format history appropriately for the model
            history = context["user"]["session_history"][-5:]  # Last 5 interactions
            enhanced_input["conversation_history"] = history

        # Add user preferences if available
        if "user" in context and "preferences" in context["user"]:
            enhanced_input["user_preferences"] = context["user"]["preferences"]

    elif model_id == "bert-embedding":
        # For embeddings, maybe we add language preference
        if "user" in context and "preferences" in context["user"]:
            enhanced_input["language"] = context["user"]["preferences"].get("language", "en")

    # Add model-specific parameters from context
    if "model" in context and "parameters" in context["model"]:
        enhanced_input["parameters"] = context["model"]["parameters"]

    # Add environmental context if relevant
    if "environment" in context:
        enhanced_input["environment"] = {
            "timestamp": context["environment"].get("timestamp"),
            "deployment": context["environment"].get("deployment_environment")
        }

    return enhanced_input


async def http_inference(endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Call a model exposed via HTTP endpoint"""
    async with httpx.AsyncClient() as client:
        response = await client.post(endpoint, json=data)
        response.raise_for_status()
        return response.json()


def tf_inference(model_path: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Run inference on a TensorFlow model"""
    # Load model (in production, this would be cached)
    model = tf.saved_model.load(model_path)

    # Prepare tensors
    input_tensors = {}
    for key, value in data.items():
        if key != "parameters" and key != "environment":
            if isinstance(value, list):
                input_tensors[key] = tf.constant(value)
            else:
                input_tensors[key] = tf.constant([value])

    # Run inference
    results = model.signatures["serving_default"](**input_tensors)

    # Convert results to Python types
    output = {}
    for key, tensor in results.items():
        output[key] = tensor.numpy().tolist()

    return output


def pytorch_inference(model_path: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Run inference on a PyTorch model"""
    # Load model (in production, this would be cached)
    # Note: recent PyTorch releases default to weights_only=True, which
    # rejects fully pickled models; pass weights_only=False only for
    # files you trust
    model = torch.load(model_path)
    model.eval()

    # Prepare tensors
    input_tensors = {}
    for key, value in data.items():
        if key != "parameters" and key != "environment":
            if isinstance(value, list):
                input_tensors[key] = torch.tensor(value)
            else:
                input_tensors[key] = torch.tensor([value])

    # Run inference
    with torch.no_grad():
        results = model(**input_tensors)

    # Convert results to Python types (.cpu() also covers GPU outputs)
    if isinstance(results, tuple):
        output = {}
        for i, result in enumerate(results):
            output[f"output_{i}"] = result.cpu().numpy().tolist()
    else:
        output = {"output": results.cpu().numpy().tolist()}

    return output
```
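A cache key is only useful if every server process computes the same key for the same request. Python's built-in `hash()` is salted per process for strings, so its values differ between workers and across restarts. The following is a minimal sketch of a process-independent key using only the standard library; `make_cache_key` is an illustrative name.

```python
import hashlib
import json
from typing import Any, Dict


def make_cache_key(model_id: str, inference_input: Dict[str, Any]) -> str:
    """Build a deterministic cache key from the model ID and its input."""
    # sort_keys makes the serialization independent of dict ordering;
    # default=str is a fallback for values such as datetimes.
    payload = json.dumps(inference_input, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{model_id}:{digest}"


# The same request with keys in a different order maps to the same key.
key_a = make_cache_key("gpt-model", {"prompt": "hi", "parameters": {"max_tokens": 100}})
key_b = make_cache_key("gpt-model", {"parameters": {"max_tokens": 100}, "prompt": "hi"})
key_c = make_cache_key("gpt-model", {"prompt": "bye", "parameters": {"max_tokens": 100}})
```

One design consideration: if the input includes conversation history or timestamps, almost every request produces a distinct key, so caching tends to pay off only for models whose input excludes such fields.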
Contextual Adaptation
Implement a context adapter that modifies model behavior based on context:
```python
# context_adapter.py
from typing import Dict, Any, Optional


class ContextAdapter:
    """Adapts model behavior based on context"""

    @staticmethod
    def adapt_model_parameters(model_id: str, default_params: Dict[str, Any],
                               context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Modify model parameters based on context"""
        if not context:
            return default_params

        params = default_params.copy()

        # User-specific adaptations
        if "user" in context:
            user = context["user"]

            # Adapt language model temperature based on user preference
            if model_id.startswith("gpt") and "preferences" in user:
                if "creativity" in user["preferences"]:
                    creativity = user["preferences"]["creativity"]
                    # Map creativity preference to temperature
                    if creativity == "high":
                        params["temperature"] = max(params.get("temperature", 0.7), 0.9)
                    elif creativity == "low":
                        params["temperature"] = min(params.get("temperature", 0.7), 0.3)

            # Adapt response length based on user preference
            if "preferences" in user and "verbosity" in user["preferences"]:
                verbosity = user["preferences"]["verbosity"]
                if verbosity == "concise":
                    params["max_tokens"] = min(params.get("max_tokens", 1024), 256)
                elif verbosity == "detailed":
                    params["max_tokens"] = max(params.get("max_tokens", 1024), 1024)

        # Environment-specific adaptations
        if "environment" in context:
            env = context["environment"]

            # Reduce complexity under high server load
            if "server_load" in env and env["server_load"] > 0.8:
                params["max_tokens"] = min(params.get("max_tokens", 1024), 512)
                if "top_k" in params:
                    params["top_k"] = min(params["top_k"], 10)

            # Adapt based on deployment environment
            if "deployment_environment" in env:
                if env["deployment_environment"] == "development":
                    # More logging in development
                    params["verbose"] = True
                elif env["deployment_environment"] == "production":
                    # Safer settings in production
                    params["safety_filter"] = True

        # Data-specific adaptations
        if "data" in context and "data_quality_metrics" in context["data"]:
            quality = context["data"]["data_quality_metrics"]

            # If input data quality is low, be more conservative
            if "noise_level" in quality and quality["noise_level"] > 0.6:
                params["temperature"] = min(params.get("temperature", 0.7), 0.4)
                if "top_p" in params:
                    params["top_p"] = min(params["top_p"], 0.92)

        # Model-specific adaptations from context
        if "model" in context and "parameters" in context["model"]:
            # Explicit parameter overrides from context
            for k, v in context["model"]["parameters"].items():
                params[k] = v

        return params

    @staticmethod
    def adapt_response(model_id: str, response: Any,
                       context: Optional[Dict[str, Any]]) -> Any:
        """Post-process model response based on context"""
        if not context:
            return response

        # For text responses
        if isinstance(response, str) or (isinstance(response, dict) and "text" in response):
            text = response if isinstance(response, str) else response["text"]

            # Preference checks stay inside this block so that `prefs`
            # is always defined before it is used
            if "user" in context and "preferences" in context["user"]:
                prefs = context["user"]["preferences"]

                # Apply user language preference
                if "language" in prefs and prefs["language"] != "en":
                    # In a real system, this would call a translation service
                    pass

                # Apply formality preference
                if "formality" in prefs:
                    if prefs["formality"] == "formal" and not text.startswith("Dear"):
                        text = "I would like to inform you that " + text
                    elif prefs["formality"] == "casual" and text.startswith("Dear"):
                        text = text.replace("Dear", "Hey").replace("Sincerely", "Cheers")

            # Format response appropriately
            if isinstance(response, dict):
                response["text"] = text
            else:
                response = text

        return response
```
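The adapter applies its rules in a fixed order (user, then environment, then data, then explicit overrides), so later rules can undo earlier ones. The reduced sketch below isolates just the temperature rules for a GPT-style model, with thresholds copied from the adapter; the function name `adapt_temperature` is chosen for illustration.

```python
from typing import Any, Dict


def adapt_temperature(default: float, context: Dict[str, Any]) -> float:
    """Apply the user, data, and override rules for temperature in order."""
    temperature = default

    # 1. User preference raises or lowers the temperature.
    creativity = context.get("user", {}).get("preferences", {}).get("creativity")
    if creativity == "high":
        temperature = max(temperature, 0.9)
    elif creativity == "low":
        temperature = min(temperature, 0.3)

    # 2. Noisy input data caps the temperature, even for "high" creativity.
    noise = context.get("data", {}).get("data_quality_metrics", {}).get("noise_level", 0.0)
    if noise > 0.6:
        temperature = min(temperature, 0.4)

    # 3. Explicit parameters in the model context win over everything else.
    overrides = context.get("model", {}).get("parameters", {})
    return overrides.get("temperature", temperature)


creative_user = {"user": {"preferences": {"creativity": "high"}}}
noisy_data = {**creative_user, "data": {"data_quality_metrics": {"noise_level": 0.7}}}
forced = {**noisy_data, "model": {"parameters": {"temperature": 1.0}}}

creative_temp = adapt_temperature(0.7, creative_user)
noisy_temp = adapt_temperature(0.7, noisy_data)
forced_temp = adapt_temperature(0.7, forced)
```

With a default of 0.7, the three contexts yield 0.9, 0.4, and 1.0 respectively, which shows that the data-quality cap overrides the user preference and that an explicit override bypasses both.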
5. Testing and Validation
Unit Tests
Create a test suite to validate the MCP server:
```python
# test_mcp_server.py
import unittest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app import app, get_db
from database import Base

# Create in-memory database for testing
engine = create_engine(
    "sqlite:///:memory:",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)


def override_get_db():
    # Create the session before the try block so `db` is always defined
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()


app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)


class TestMCPServer(unittest.TestCase):
    def test_create_context(self):
        """Test creating a new context"""
        context_data = {
            "user": {
                "user_id": "test_user_1",
                "preferences": {"language": "en"}
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0"
            }
        }

        response = client.post("/api/v1/context", json=context_data)
        self.assertEqual(response.status_code, 200)
        result = response.json()
        self.assertIn("context_id", result)
        self.assertEqual(result["status"], "created")

        # Verify we can retrieve it
        context_id = result["context_id"]
        get_response = client.get(f"/api/v1/context/{context_id}")
        self.assertEqual(get_response.status_code, 200)
        context = get_response.json()
        self.assertEqual(context["user"]["user_id"], "test_user_1")

    def test_update_context(self):
        """Test updating an existing context"""
        # First create a context
        context_data = {
            "user": {
                "user_id": "test_user_2",
                "preferences": {"language": "en"}
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0"
            }
        }

        response = client.post("/api/v1/context", json=context_data)
        context_id = response.json()["context_id"]

        # Now update it
        update_data = {
            "user": {
                "user_id": "test_user_2",
                "preferences": {"language": "fr", "formality": "formal"}
            }
        }

        update_response = client.put(f"/api/v1/context/{context_id}", json=update_data)
        self.assertEqual(update_response.status_code, 200)

        # Verify the update
        get_response = client.get(f"/api/v1/context/{context_id}")
        context = get_response.json()
        self.assertEqual(context["user"]["preferences"]["language"], "fr")
        self.assertEqual(context["user"]["preferences"]["formality"], "formal")

    def test_model_inference_with_context(self):
        """Test model inference with context (mocked)"""
        # First create a context
        context_data = {
            "user": {
                "user_id": "test_user_3",
                "preferences": {"language": "en", "creativity": "high"}
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0",
                "parameters": {"max_tokens": 100}
            }
        }

        response = client.post("/api/v1/context", json=context_data)
        context_id = response.json()["context_id"]

        # Mock inference request
        inference_data = {
            "prompt": "Tell me a story about a dragon"
        }

        # Replace the real model call with a mock so that no model
        # server is needed in the test environment
        mocked = AsyncMock(return_value={"text": "Once upon a time..."})
        with patch("model_manager.run_inference", mocked):
            inference_response = client.post(
                f"/api/v1/inference/gpt-model?context_id={context_id}",
                json=inference_data
            )

        self.assertEqual(inference_response.status_code, 200)
        self.assertEqual(inference_response.json()["text"], "Once upon a time...")


if __name__ == "__main__":
    unittest.main()
```
Integration Tests
Create a script to test the entire system:
```python
# integration_test.py
import asyncio
import httpx
import json
import time
from typing import Dict, Any


async def test_full_workflow():
    """Test the entire MCP workflow from context creation to inference"""
    async with httpx.AsyncClient() as client:
        print("Testing MCP Server Integration...")

        # 1. Create a context
        context_data = {
            "user": {
                "user_id": "integration_test_user",
                "preferences": {
                    "language": "en",
                    "creativity": "high",
                    "verbosity": "concise"
                }
            },
            "environment": {
                "deployment_environment": "test",
                "server_load": 0.2
            },
            "model": {
                "model_id": "gpt-model",
                "model_version": "1.0.0",
                "parameters": {
                    "temperature": 0.7,
                    "max_tokens": 100
                }
            },
            "data": {
                "data_source": "integration_test",
                "data_quality_metrics": {
                    "noise_level": 0.1
                }
            }
        }

        print("Step 1: Creating context...")
        response = await client.post(
            "http://localhost:8080/api/v1/context",
            json=context_data
        )

        assert response.status_code == 200, f"Failed to create context: {response.text}"
        result = response.json()
        context_id = result["context_id"]
        print(f"Context created with ID: {context_id}")

        # 2. Retrieve the context to verify
        print("Step 2: Retrieving context...")
        get_response = await client.get(f"http://localhost:8080/api/v1/context/{context_id}")
        assert get_response.status_code == 200, f"Failed to retrieve context: {get_response.text}"
        context = get_response.json()
        assert context["user"]["user_id"] == "integration_test_user"
        print("Context retrieved successfully")

        # 3. Update the context with new preferences
        print("Step 3: Updating context...")
        update_data = {
            "user": {
                "user_id": "integration_test_user",
                "preferences": {
                    "language": "en",
                    "creativity": "low",  # Changed from high to low
                    "verbosity": "concise"
                }
            }
        }

        update_response = await client.put(
            f"http://localhost:8080/api/v1/context/{context_id}",
            json=update_data
        )

        assert update_response.status_code == 200, f"Failed to update context: {update_response.text}"
        print("Context updated successfully")

        # 4. Run model inference with context
        print("Step 4: Running model inference with context...")
        inference_data = {
            "prompt": "Write a short poem about artificial intelligence"
        }

        inference_response = await client.post(
            f"http://localhost:8080/api/v1/inference/gpt-model?context_id={context_id}",
            json=inference_data
        )

        # This might fail in a test environment without actual models
        # For a real test, check the response structure
        if inference_response.status_code == 200:
            result = inference_response.json()
            print(f"Model response: {result}")
        else:
            print(f"Inference failed (expected in test environment): {inference_response.text}")

        # 5. Verify context was updated after inference
        print("Step 5: Verifying context update after inference...")
        final_get_response = await client.get(f"http://localhost:8080/api/v1/context/{context_id}")
        final_context = final_get_response.json()

        # Check if session history was updated
        if "session_history" in final_context["user"]:
            history = final_context["user"]["session_history"]
            if history:
                print(f"Session history updated: {len(history)} interactions recorded")
            else:
                print("Session history exists but no interactions recorded")
        else:
            print("No session history found in context")

        print("Integration test completed successfully!")


if __name__ == "__main__":
    asyncio.run(test_full_workflow())
```
Performance Testing
Create a script to test performance:
```python
# performance_test.py
import asyncio
import random
import statistics
import time
import uuid

import httpx


async def create_context(client: httpx.AsyncClient, user_id: str) -> str:
    """Create a context and return its ID"""
    context_data = {
        "user": {
            "user_id": user_id,
            "preferences": {
                "language": "en",
                "creativity": random.choice(["high", "medium", "low"])
            }
        },
        "model": {
            "model_id": "gpt-model",
            "model_version": "1.0.0"
        }
    }

    response = await client.post("http://localhost:8080/api/v1/context", json=context_data)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to create context: {response.text}")

    return response.json()["context_id"]


async def run_inference(client: httpx.AsyncClient, context_id: str, prompt: str) -> float:
    """Run inference and return time taken; raise if the request fails"""
    inference_data = {"prompt": prompt}

    start_time = time.perf_counter()
    response = await client.post(
        f"http://localhost:8080/api/v1/inference/gpt-model?context_id={context_id}",
        json=inference_data
    )
    elapsed = time.perf_counter() - start_time

    if response.status_code != 200:
        # Raise so that gather(return_exceptions=True) counts this request as failed
        raise RuntimeError(f"Inference failed: {response.text}")

    return elapsed


async def performance_test(num_users: int, requests_per_user: int):
    """Run performance test with multiple simulated users"""
    async with httpx.AsyncClient() as client:
        print(f"Starting performance test with {num_users} users, {requests_per_user} requests each")

        # Create contexts for all users
        print("Creating contexts...")
        context_ids = []
        for i in range(num_users):
            user_id = f"perf_test_user_{uuid.uuid4()}"
            context_id = await create_context(client, user_id)
            context_ids.append(context_id)

        # Test prompts
        prompts = [
            "Tell me a story about a robot",
            "Explain quantum computing",
            "Write a poem about the ocean",
            "Give me a recipe for chocolate cake",
            "Describe the solar system"
        ]

        # Run inference requests concurrently
        print("Running inference requests...")
        tasks = []
        for i in range(num_users):
            for j in range(requests_per_user):
                prompt = random.choice(prompts)
                tasks.append(run_inference(client, context_ids[i], prompt))

        # Gather results, timing the whole batch to compute throughput
        batch_start = time.perf_counter()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        wall_clock = time.perf_counter() - batch_start

        # Calculate statistics
        successful_times = [t for t in results if isinstance(t, float)]
        errors = [e for e in results if isinstance(e, Exception)]

        if successful_times:
            avg_time = statistics.mean(successful_times)
            min_time = min(successful_times)
            max_time = max(successful_times)
            p95_time = sorted(successful_times)[int(len(successful_times) * 0.95)]

            print("Performance results:")
            print(f"  Total requests: {len(results)}")
            print(f"  Successful: {len(successful_times)}")
            print(f"  Failed: {len(errors)}")
            print(f"  Average response time: {avg_time:.4f}s")
            print(f"  Min response time: {min_time:.4f}s")
            print(f"  Max response time: {max_time:.4f}s")
            print(f"  95th percentile: {p95_time:.4f}s")
            # Requests overlap, so throughput is measured against wall-clock time
            print(f"  Requests per second: {len(successful_times) / wall_clock:.2f}")
        else:
            print("No successful requests to analyze")


if __name__ == "__main__":
    asyncio.run(performance_test(num_users=10, requests_per_user=5))
```
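When requests run concurrently, throughput is more meaningfully measured against elapsed wall-clock time than against the sum of individual latencies, and a percentile can be taken from the standard library instead of indexing a sorted list by hand. The following is a minimal sketch of such a summary step; the helper name `summarize_latencies` and its parameters are illustrative and not part of the script above.

```python
import statistics
from typing import Dict, List


def summarize_latencies(latencies: List[float], wall_clock_seconds: float) -> Dict[str, float]:
    """Summarize per-request latencies collected from a concurrent test run."""
    if not latencies:
        raise ValueError("no latencies to summarize")
    if wall_clock_seconds <= 0:
        raise ValueError("wall_clock_seconds must be positive")

    if len(latencies) >= 2:
        # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile
        p95 = statistics.quantiles(latencies, n=100, method="inclusive")[94]
    else:
        # quantiles() needs at least two data points
        p95 = latencies[0]

    return {
        "count": len(latencies),
        "mean": statistics.mean(latencies),
        "min": min(latencies),
        "max": max(latencies),
        "p95": p95,
        # Throughput over the whole batch, not over summed latencies
        "requests_per_second": len(latencies) / wall_clock_seconds,
    }
```

The `inclusive` method interpolates between observed values, so the reported percentile never falls outside the measured minimum and maximum.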
6. Scaling and Optimization
Scaling with Kubernetes
Create Kubernetes deployment files for your MCP server:
```yaml
# kubernetes/mcp-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
        - name: mcp-server
          image: mcp-server:latest
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: database-url
            - name: REDIS_HOST
              value: "redis-service"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  selector:
    app: mcp-server
  ports:
    - port: 80
      targetPort: 8080
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```
Database Scaling
```yaml
# kubernetes/database-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
spec:
  serviceName: "postgres"
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:14
          ports:
            - containerPort: 5432
              name: postgres
          env:
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: postgres-password
            - name: POSTGRES_USER
              value: mcp_user
            - name: POSTGRES_DB
              value: mcp_context
          volumeMounts:
            - name: postgres-data
              mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
    - metadata:
        name: postgres-data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
spec:
  selector:
    app: postgres
  ports:
    - port: 5432
      targetPort: 5432
  clusterIP: None
```
Redis for Caching
```yaml
# kubernetes/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:6.2-alpine
          ports:
            - containerPort: 6379
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "200m"
          volumeMounts:
            - name: redis-data
              mountPath: /data
      volumes:
        - name: redis-data
          emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - port: 6379
      targetPort: 6379
```
Optimizing Performance
Implement a request throttler:
```python
# throttler.py
import time
from typing import Any, Awaitable, Callable, Dict

import redis.asyncio as redis  # async client, so Redis calls do not block the event loop


class RequestThrottler:
    """Throttles requests to manage load"""

    def __init__(self, redis_client: redis.Redis, limits: Dict[str, int]):
        """
        Initialize with Redis client and limits dict

        limits: Dict mapping model IDs to requests per minute
        """
        self.redis = redis_client
        self.limits = limits
        self.default_limit = 60  # Default to 60 RPM

    async def throttle(self, model_id: str) -> bool:
        """
        Check if request should be throttled

        Returns:
            True if request is allowed, False if it should be throttled
        """
        limit = self.limits.get(model_id, self.default_limit)
        # One counter per model per minute; the key changes when the minute rolls over
        key = f"throttle:{model_id}:{int(time.time() // 60)}"

        # Increment counter for this minute
        current = await self.redis.incr(key)

        # Set expiry to ensure cleanup
        if current == 1:
            await self.redis.expire(key, 120)  # 2 minutes (to handle edge cases)

        # Check if under limit
        return current <= limit

    async def with_throttling(
        self,
        model_id: str,
        func: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any
    ) -> Any:
        """
        Execute function with throttling

        Raises:
            Exception if throttled
        """
        if await self.throttle(model_id):
            return await func(*args, **kwargs)
        else:
            # In a real implementation, you might queue the request instead
            raise Exception(f"Request throttled for model {model_id}")
```
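The throttler above is a fixed-window counter: each model gets one counter per one-minute window, and a request is allowed while the counter stays at or below the limit. To make that windowing logic easy to test without a Redis instance, here is a sketch of the same idea using an in-memory dict and an injectable clock. `InMemoryThrottler` and its `clock` parameter are hypothetical names for illustration; this version only works within a single process.

```python
import time
from typing import Callable, Dict, Tuple


class InMemoryThrottler:
    """Fixed-window request counter mirroring the key-per-minute scheme."""

    def __init__(
        self,
        limits: Dict[str, int],
        default_limit: int = 60,
        clock: Callable[[], float] = time.time,
    ):
        self.limits = limits
        self.default_limit = default_limit
        self.clock = clock  # injectable so tests can control time
        self._counts: Dict[Tuple[str, int], int] = {}

    def allow(self, model_id: str) -> bool:
        """Return True if the request fits in the current one-minute window."""
        window = int(self.clock() // 60)
        # Drop counters from earlier windows (plays the role of the key expiry)
        self._counts = {k: v for k, v in self._counts.items() if k[1] >= window}
        key = (model_id, window)
        self._counts[key] = self._counts.get(key, 0) + 1
        return self._counts[key] <= self.limits.get(model_id, self.default_limit)
```

One property of fixed windows worth keeping in mind: a client can send a full quota at the end of one minute and another full quota at the start of the next, so short bursts of up to twice the limit are possible. A sliding-window or token-bucket scheme avoids this at the cost of more bookkeeping.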
Add connection pooling for the database:
```python
# database.py
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Get DB URL from environment or use default
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://mcp_user:password@localhost/mcp_context")

# Create engine with connection pooling
engine = create_engine(
    DATABASE_URL,
    pool_size=20,        # Connections kept open in the pool
    max_overflow=10,     # Allow 10 connections beyond pool_size (30 total at peak)
    pool_timeout=30,     # Wait up to 30 seconds for a connection
    pool_recycle=1800,   # Recycle connections after 30 minutes
    pool_pre_ping=True   # Check connection validity before using
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
```
7. Security and Maintenance
Authentication and Authorization
Implement JWT authentication:
```python
# auth.py
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Configuration
# The fallback key is for local development only; always set SECRET_KEY in production
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-for-development")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# Models
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
    role: str = "user"  # "user", "admin", "model_developer"


class UserInDB(User):
    hashed_password: str


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


# User database - in production, this would be in a real database
USERS_DB = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
        "role": "user"
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Smith",
        "email": "alice@example.com",
        "hashed_password": pwd_context.hash("password"),
        "disabled": False,
        "role": "admin"
    }
}


# Authentication functions
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # Timezone-aware UTC timestamps; datetime.utcnow() is deprecated in recent Python versions
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(USERS_DB, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


def has_role(required_role: str):
    """Build a dependency that allows the required role (admins always pass)"""
    async def role_checker(current_user: User = Depends(get_current_active_user)):
        if current_user.role != required_role and current_user.role != "admin":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Operation requires role: {required_role}"
            )
        return current_user
    return role_checker
```
Update your app to use authentication:
```python
# Update app.py to include authentication

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from datetime import timedelta
from typing import Dict, Any

from auth import (
    User, Token, authenticate_user, create_access_token,
    ACCESS_TOKEN_EXPIRE_MINUTES, get_current_active_user, has_role, USERS_DB
)

# ... other imports as before

app = FastAPI(title="MCP Server")


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(USERS_DB, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


# Now protect your API endpoints with authentication
@app.post("/api/v1/context", response_model=Dict[str, Any])
def create_context(
    context: MCPContext,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    """Create a new context record (requires authentication)"""
    context_id = context_store.save_context(db, context)
    return {"context_id": context_id, "status": "created"}


# Admin-only endpoint
@app.delete("/api/v1/context/{context_id}")
def delete_context(
    context_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(has_role("admin"))
):
    """Delete a context (admin only)"""
    success = context_store.delete_context(db, context_id)
    if not success:
        raise HTTPException(status_code=404, detail="Context not found")
    return {"status": "deleted"}

# ... other endpoints
```
Security Best Practices
Implement data encryption for sensitive context data:
```python
# encryption.py
import copy
import json
import os
from typing import Any, Dict, Optional

from cryptography.fernet import Fernet


class ContextEncryption:
    """Handles encryption of sensitive context data"""

    def __init__(self, key_path: Optional[str] = None):
        """Initialize with encryption key"""
        if key_path and os.path.exists(key_path):
            with open(key_path, "rb") as key_file:
                self.key = key_file.read()
        else:
            # Generate a key and save it
            self.key = Fernet.generate_key()
            if key_path:
                key_dir = os.path.dirname(key_path)
                if key_dir:  # dirname is empty for a bare filename
                    os.makedirs(key_dir, exist_ok=True)
                with open(key_path, "wb") as key_file:
                    key_file.write(self.key)

        self.cipher = Fernet(self.key)

    def encrypt_context(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive parts of the context"""
        # Deep copy so nested dicts in the caller's context are not modified
        encrypted_context = copy.deepcopy(context)

        # Encrypt user data if present
        if "user" in encrypted_context:
            user_data = encrypted_context["user"]

            # Encrypt user preferences
            if "preferences" in user_data:
                user_data["preferences"] = self._encrypt_data(user_data["preferences"])

            # Encrypt demographics
            if "demographics" in user_data:
                user_data["demographics"] = self._encrypt_data(user_data["demographics"])

        # Encrypt custom attributes
        if "custom_attributes" in encrypted_context:
            encrypted_context["custom_attributes"] = self._encrypt_data(
                encrypted_context["custom_attributes"]
            )

        return encrypted_context

    def decrypt_context(self, encrypted_context: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt encrypted parts of the context"""
        # Deep copy so nested dicts in the caller's context are not modified
        decrypted_context = copy.deepcopy(encrypted_context)

        # Decrypt user data if present
        if "user" in decrypted_context:
            user_data = decrypted_context["user"]

            # Decrypt user preferences
            if "preferences" in user_data and isinstance(user_data["preferences"], str):
                user_data["preferences"] = self._decrypt_data(user_data["preferences"])

            # Decrypt demographics
            if "demographics" in user_data and isinstance(user_data["demographics"], str):
                user_data["demographics"] = self._decrypt_data(user_data["demographics"])

        # Decrypt custom attributes
        if "custom_attributes" in decrypted_context and isinstance(decrypted_context["custom_attributes"], str):
            decrypted_context["custom_attributes"] = self._decrypt_data(
                decrypted_context["custom_attributes"]
            )

        return decrypted_context

    def _encrypt_data(self, data: Any) -> str:
        """Encrypt any data by converting to JSON and encrypting"""
        json_data = json.dumps(data)
        encrypted_bytes = self.cipher.encrypt(json_data.encode('utf-8'))
        return encrypted_bytes.decode('utf-8')

    def _decrypt_data(self, encrypted_str: str) -> Any:
        """Decrypt data and convert from JSON"""
        decrypted_bytes = self.cipher.decrypt(encrypted_str.encode('utf-8'))
        return json.loads(decrypted_bytes.decode('utf-8'))
```
Monitoring and Logging
```python
# monitoring.py
import copy
import json
import logging
import time
from functools import wraps
from typing import Any, Dict, Optional

from prometheus_client import Counter, Gauge, Histogram

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Prometheus metrics
MODEL_REQUESTS = Counter(
    'model_requests_total',
    'Total model inference requests',
    ['model_id', 'status']
)

CONTEXT_OPERATIONS = Counter(
    'context_operations_total',
    'Context CRUD operations',
    ['operation']
)

RESPONSE_TIME = Histogram(
    'response_time_seconds',
    'Response time in seconds',
    ['endpoint', 'method'],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)
)

ACTIVE_REQUESTS = Gauge(
    'active_requests',
    'Number of active requests',
    ['endpoint']
)


def log_context_operation(operation: str, context_id: str, data: Optional[Dict[str, Any]] = None):
    """Log context operations with standardized format"""
    log_data = {
        "operation": operation,
        "context_id": context_id,
        "timestamp": time.time()
    }

    if data:
        # Exclude sensitive data from logging.
        # Deep copy so redaction does not overwrite the caller's nested "user" dict.
        sanitized_data = copy.deepcopy(data)
        if "user" in sanitized_data and "preferences" in sanitized_data["user"]:
            sanitized_data["user"]["preferences"] = "[REDACTED]"
        if "user" in sanitized_data and "demographics" in sanitized_data["user"]:
            sanitized_data["user"]["demographics"] = "[REDACTED]"

        log_data["data"] = sanitized_data

    logger.info(f"Context {operation}: {json.dumps(log_data)}")
    CONTEXT_OPERATIONS.labels(operation=operation).inc()


def log_model_request(model_id: str, status: str, context_id: Optional[str] = None,
                      input_data: Optional[Dict[str, Any]] = None,
                      response: Optional[Dict[str, Any]] = None,
                      error: Optional[str] = None):
    """Log model inference requests"""
    log_data = {
        "model_id": model_id,
        "status": status,
        "timestamp": time.time()
    }

    if context_id:
        log_data["context_id"] = context_id

    if input_data:
        # Sanitize input data for logging
        log_data["input"] = "[DATA]"

    if response and status == "success":
        # Only log response structure, not content
        log_data["response_type"] = type(response).__name__

    if error:
        log_data["error"] = error

    logger.info(f"Model request: {json.dumps(log_data)}")
    MODEL_REQUESTS.labels(model_id=model_id, status=status).inc()


def timer_decorator(endpoint: str, method: str):
    """Decorator to time and log endpoint execution"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "error"  # overwritten on success; also covers cancellation

            ACTIVE_REQUESTS.labels(endpoint=endpoint).inc()
            try:
                result = await func(*args, **kwargs)
                status = "success"
            except Exception as e:
                logger.error(f"Error in {endpoint}: {str(e)}")
                raise
            finally:
                ACTIVE_REQUESTS.labels(endpoint=endpoint).dec()

                end_time = time.time()
                execution_time = end_time - start_time

                # Record metrics
                RESPONSE_TIME.labels(endpoint=endpoint, method=method).observe(execution_time)

                # Log request
                logger.info(f"{method} {endpoint} - {status} - {execution_time:.4f}s")

            # Returned outside `finally` so exceptions are not swallowed
            return result
        return wrapper
    return decorator
```
System Health Checks
```python
# health.py
import os

import psutil
import redis
from fastapi import APIRouter, Depends, Response, status
from fastapi.responses import JSONResponse
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from sqlalchemy import text
from sqlalchemy.orm import Session

from database import get_db

health_router = APIRouter()


@health_router.get("/health")
async def health_check():
    """Basic health check endpoint"""
    return {"status": "ok"}


@health_router.get("/ready")
def readiness_check(db: Session = Depends(get_db)):
    """Readiness check - ensures database and Redis connections work"""
    # Declared with plain `def` so FastAPI runs the blocking calls in a threadpool
    try:
        # Execute a simple query to check if database is responsive
        db.execute(text("SELECT 1"))
        db_status = "ok"
    except Exception as e:
        db_status = f"error: {str(e)}"

    try:
        # Check Redis connection
        r = redis.Redis(host=os.getenv("REDIS_HOST", "localhost"))
        r.ping()
        redis_status = "ok"
    except Exception as e:
        redis_status = f"error: {str(e)}"

    # Get system resources
    cpu_percent = psutil.cpu_percent()
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    system_status = {
        "cpu_percent": cpu_percent,
        "memory_percent": memory.percent,
        "disk_percent": disk.percent
    }

    # Determine overall status
    ready = db_status == "ok" and redis_status == "ok"

    # Return 503 when not ready so the Kubernetes readiness probe fails
    return JSONResponse(
        status_code=status.HTTP_200_OK if ready else status.HTTP_503_SERVICE_UNAVAILABLE,
        content={
            "status": "ready" if ready else "not ready",
            "database": db_status,
            "redis": redis_status,
            "system": system_status
        }
    )


@health_router.get("/metrics")
async def metrics():
    """Endpoint for Prometheus metrics"""
    # generate_latest() returns bytes in the Prometheus text format
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
```
8. Best Practices and Pitfalls to Avoid
Best Practices for MCP Server Deployment
Context Management
- Structured Context Schema: Always use a well-defined, versioned schema for context data. This ensures compatibility across models and prevents unexpected behavior.
- Context Lifetime Management: Implement policies to expire context data after a certain period of inactivity to prevent context bloat:
```python
# Example context cleanup job (scheduled job)
from datetime import datetime, timedelta

from sqlalchemy import delete

# async_session, ContextRecord, and logger are assumed to be defined elsewhere in the project


async def cleanup_stale_contexts():
    """Remove contexts that haven't been used for X days"""
    cutoff_date = datetime.utcnow() - timedelta(days=30)

    async with async_session() as session:
        # Find and delete old contexts
        result = await session.execute(
            delete(ContextRecord).where(ContextRecord.updated_at < cutoff_date)
        )
        await session.commit()

        num_deleted = result.rowcount
        logger.info(f"Cleaned up {num_deleted} stale contexts")
```
- Progressive Context Enhancement: Design your system to allow new context attributes to be added without breaking existing functionality:
```python
# Example of context schema versioning and migration
class ContextMigration:
    @staticmethod
    def migrate_context(context_data, from_version, to_version):
        """Migrate context data between schema versions"""
        if from_version == 1 and to_version == 2:
            # Add new fields for version 2
            if "user" in context_data and "preferences" not in context_data["user"]:
                context_data["user"]["preferences"] = {}

            # Restructure existing fields
            if "environment" in context_data and "location" in context_data["environment"]:
                location = context_data["environment"].pop("location")
                if "geography" not in context_data:
                    context_data["geography"] = {}
                context_data["geography"]["location"] = location

            # Update version
            context_data["schema_version"] = 2

        # Add more migration paths as needed
        return context_data
```
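As schema versions accumulate, one way to keep migrations manageable is to register one function per version step and apply them in sequence. The sketch below assumes integer `schema_version` values and treats records without one as version 1; the `MIGRATIONS` registry and the `upgrade_context` helper are illustrative names, not part of the code above.

```python
import copy
from typing import Any, Callable, Dict

Context = Dict[str, Any]


def _v1_to_v2(ctx: Context) -> Context:
    """Version 2 guarantees a preferences dict and moves location under geography."""
    if "user" in ctx:
        ctx["user"].setdefault("preferences", {})
    env = ctx.get("environment", {})
    if "location" in env:
        ctx.setdefault("geography", {})["location"] = env.pop("location")
    return ctx


# Maps a source version to the function that upgrades it by exactly one version
MIGRATIONS: Dict[int, Callable[[Context], Context]] = {1: _v1_to_v2}


def upgrade_context(ctx: Context, target_version: int) -> Context:
    """Apply single-step migrations in order until target_version is reached."""
    result = copy.deepcopy(ctx)  # leave the caller's data untouched
    version = result.get("schema_version", 1)
    while version < target_version:
        if version not in MIGRATIONS:
            raise ValueError(f"no migration registered from version {version}")
        result = MIGRATIONS[version](result)
        version += 1
        result["schema_version"] = version
    return result
```

Chaining single-step functions means a record at version 1 can reach version 3 without a dedicated 1-to-3 path, so each new schema version only requires one new function.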
Performance Optimization
- Efficient Context Retrieval: Ensure your database schema is properly indexed for quick context lookups:
```sql
-- Example SQL for adding indexes to the contexts table
CREATE INDEX idx_contexts_user_id ON contexts ((user_context->>'user_id'));
CREATE INDEX idx_contexts_updated_at ON contexts (updated_at);
```
- Implement Caching Layers: Use Redis or a similar in-memory store to cache frequently accessed contexts:
```python
# Example context caching implementation
async def get_cached_context(context_id: str, db: Session):
    """Get context with caching"""
    # Try cache first
    cache_key = f"context:{context_id}"
    cached = redis_client.get(cache_key)

    if cached:
        return json.loads(cached)

    # If not in cache, get from database
    context = context_store.get_context(db, context_id)
    if context:
        # Store in cache with TTL
        redis_client.setex(cache_key, 3600, json.dumps(context))

    return context
```
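With a one-hour TTL, a cached context can be served for up to an hour after the underlying record changes unless the cache entry is removed on update. A minimal cache-aside sketch with explicit invalidation follows; `ContextCache` is a hypothetical wrapper that assumes only that the client exposes `get`, `setex`, and `delete` the way redis-py does, and that the `load` callable fetches the context from the database.

```python
import json
from typing import Any, Callable, Dict, Optional


class ContextCache:
    """Cache-aside helper around a Redis-like client (get / setex / delete)."""

    def __init__(self, client: Any, ttl_seconds: int = 3600):
        self.client = client
        self.ttl_seconds = ttl_seconds

    @staticmethod
    def _key(context_id: str) -> str:
        return f"context:{context_id}"

    def get(
        self,
        context_id: str,
        load: Callable[[str], Optional[Dict[str, Any]]],
    ) -> Optional[Dict[str, Any]]:
        """Return the cached context, falling back to load() on a miss."""
        cached = self.client.get(self._key(context_id))
        if cached:
            return json.loads(cached)
        context = load(context_id)
        if context:
            self.client.setex(self._key(context_id), self.ttl_seconds, json.dumps(context))
        return context

    def invalidate(self, context_id: str) -> None:
        """Remove the cached entry; call this after every update or delete."""
        self.client.delete(self._key(context_id))
```

Calling `invalidate` from the update and delete handlers keeps reads consistent with the database, while the TTL remains as a safety net for entries that are never explicitly invalidated.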
- Batch Processing: For high-volume applications, implement batch processing for context updates:
```python
# Example batch context update
from datetime import datetime
from typing import Any, Dict, List

from sqlalchemy import update


async def batch_update_contexts(updates: List[Dict[str, Any]]):
    """Update multiple contexts in a single transaction"""
    async with async_session() as session:
        async with session.begin():
            # Loop variable is named `item` so it does not shadow sqlalchemy's update()
            for item in updates:
                context_id = item["context_id"]
                data = item["data"]

                stmt = (
                    update(ContextRecord)
                    .where(ContextRecord.id == context_id)
                    .values(updated_at=datetime.utcnow(), **data)
                )
                await session.execute(stmt)
```
Security and Compliance
- Implement Role-Based Access Control: Ensure only authorized users can access context data:
```python
# Extend the has_role function to check context ownership
def has_context_access(context_id: str):
    async def context_access_checker(current_user: User = Depends(get_current_active_user),
                                     db: Session = Depends(get_db)):
        # Admin always has access
        if current_user.role == "admin":
            return current_user

        # Get the context
        context = context_store.get_context(db, context_id)
        if not context:
            raise HTTPException(status_code=404, detail="Context not found")

        # Check if user owns this context
        if (context["user"]["user_id"] != current_user.username and
                current_user.role != "model_developer"):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have access to this context"
            )

        return current_user
    return context_access_checker
```
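The ownership rule inside the dependency can also be expressed as a small pure function, which makes it straightforward to unit test apart from FastAPI. This is a sketch assuming the same three roles used in `auth.py`; `can_access_context` is a hypothetical helper name.

```python
from typing import Any, Dict


def can_access_context(role: str, username: str, context: Dict[str, Any]) -> bool:
    """Return True if the user may read the given context.

    Mirrors the rule above: admins and model developers can access any
    context; other users only contexts whose user_id matches their username.
    """
    if role in ("admin", "model_developer"):
        return True
    owner = context.get("user", {}).get("user_id")
    # A context without an owner is treated as inaccessible to regular users
    return owner is not None and owner == username
```

The dependency would then only need to load the context, call this function, and raise a 403 when it returns `False`.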
- Implement Data Retention and Erasure Policies: Support regulations like GDPR by being able to delete all contexts that belong to a user on request:
```python
# Example GDPR compliance handler
from sqlalchemy import select


async def delete_user_data(user_id: str):
    """Delete all contexts associated with a user"""
    async with async_session() as session:
        # Find contexts with this user
        result = await session.execute(
            select(ContextRecord)
            .where(ContextRecord.user_context.contains({"user_id": user_id}))
        )
        contexts = result.scalars().all()

        # Delete each context
        for context in contexts:
            await session.delete(context)

        await session.commit()

        return len(contexts)
```
Common Pitfalls to Avoid
Context Management Issues
- Overly Large Contexts: Contexts that grow unbounded can cause performance issues and increased latency:
```python
# Implement context size limits
def validate_context_size(context):
    """Check if context is within size limits"""
    context_json = json.dumps(context)
    # Measure encoded bytes rather than characters
    size_kb = len(context_json.encode("utf-8")) / 1024

    if size_kb > 100:  # Example: 100KB limit
        logger.warning(f"Context size {size_kb:.2f}KB exceeds recommended limit of 100KB")

        # Take action - could truncate history, remove less important fields, etc.
        if "user" in context and "session_history" in context["user"]:
            # Keep only last 10 interactions
            context["user"]["session_history"] = context["user"]["session_history"][-10:]

    return context
```
- Inconsistent Context Formats: Ensure all services create and consume context in consistent formats:
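```python
# Use validation middleware
from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError


@app.middleware("http")
async def validate_context_middleware(request: Request, call_next):
    """Validate context structure in requests"""
    # Only context-creation requests carry a full MCPContext body;
    # GET and DELETE requests under this path have no JSON body to parse
    if request.method == "POST" and request.url.path == "/api/v1/context":
        try:
            body = await request.json()
            # Validate against schema
            MCPContext(**body)
        except ValidationError as e:
            return JSONResponse(
                status_code=422,
                content={"detail": "Invalid context format", "errors": e.errors()}
            )
        except ValueError:
            # Raised when the body is not valid JSON
            return JSONResponse(
                status_code=400,
                content={"detail": "Request body is not valid JSON"}
            )

    response = await call_next(request)
    return response
```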
Performance Pitfalls
- N+1 Query Problems: Avoid making multiple database queries in loops:
```python
# BAD EXAMPLE - DON'T DO THIS
async def get_multiple_contexts(context_ids: List[str]):
    contexts = []
    for context_id in context_ids:
        # N+1 problem - one query per context
        context = await get_context(context_id)
        contexts.append(context)
    return contexts


# GOOD EXAMPLE - DO THIS INSTEAD
async def get_multiple_contexts_efficiently(context_ids: List[str]):
    # Single query to get all contexts
    async with async_session() as session:
        result = await session.execute(
            select(ContextRecord)
            .where(ContextRecord.id.in_(context_ids))
        )
        contexts = result.scalars().all()

        # Process results
        return [context.to_dict() for context in contexts]
```
- Synchronous I/O in Async Code: Ensure all I/O operations are properly async:
```python
# BAD EXAMPLE - DON'T DO THIS
async def get_model_result(model_id: str, input_data: Dict[str, Any]):
    # Blocking I/O in async function!
    with open(f"/models/{model_id}/config.json", "r") as f:
        config = json.load(f)

    # More async code...


# GOOD EXAMPLE - DO THIS INSTEAD
async def get_model_result_correctly(model_id: str, input_data: Dict[str, Any]):
    # Use aiofiles for async file I/O
    import aiofiles

    async with aiofiles.open(f"/models/{model_id}/config.json", "r") as f:
        content = await f.read()
        config = json.loads(content)

    # More async code...
```
Security Pitfalls
- Inadequate Input Validation: Always validate all inputs, especially context data:
```python
# Implement strict validation
from typing import Any, Dict, Optional

from pydantic import BaseModel


class UpdateContextRequest(BaseModel):
    """Schema for context update requests"""
    user: Optional[Dict[str, Any]] = None
    environment: Optional[Dict[str, Any]] = None
    model: Optional[Dict[str, Any]] = None
    data: Optional[Dict[str, Any]] = None
    custom_attributes: Optional[Dict[str, Any]] = None

    class Config:
        # Prevent additional fields
        extra = "forbid"
```
- Using Passwords or Keys in Context: Never store sensitive authentication data in context:
```python
# Context sanitizer example
import copy
from typing import Any, Dict


def sanitize_context(context: Dict[str, Any]) -> Dict[str, Any]:
    """Remove sensitive data from context"""
    # Deep copy so deletions below do not affect the caller's nested dicts
    sanitized = copy.deepcopy(context)

    # Remove any fields that might contain sensitive data
    sensitive_fields = [
        "password", "token", "key", "secret", "auth", "credential", "api_key"
    ]

    # Check in custom attributes
    if "custom_attributes" in sanitized:
        for field in sensitive_fields:
            if field in sanitized["custom_attributes"]:
                del sanitized["custom_attributes"][field]

    # Check in user preferences
    if "user" in sanitized and "preferences" in sanitized["user"]:
        for field in sensitive_fields:
            if field in sanitized["user"]["preferences"]:
                del sanitized["user"]["preferences"][field]

    return sanitized
```
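The sanitizer above only checks exact key names at the top level of two sections. If sensitive values might be nested deeper, or stored under names such as `openai_api_key`, a recursive variant with case-insensitive substring matching is one option. This is a sketch; the `SENSITIVE_MARKERS` tuple and the `redact_sensitive` name are illustrative.

```python
from typing import Any

# Any key containing one of these substrings (case-insensitive) is dropped
SENSITIVE_MARKERS = ("password", "token", "key", "secret", "auth", "credential")


def _is_sensitive(key: Any) -> bool:
    return isinstance(key, str) and any(m in key.lower() for m in SENSITIVE_MARKERS)


def redact_sensitive(value: Any) -> Any:
    """Return a copy of value with sensitive-looking dict keys removed at any depth."""
    if isinstance(value, dict):
        return {k: redact_sensitive(v) for k, v in value.items() if not _is_sensitive(k)}
    if isinstance(value, list):
        return [redact_sensitive(item) for item in value]
    return value
```

Substring matching errs on the side of removing too much: keys such as `author` or `keyboard_layout` would also be dropped. An explicit allow-list of permitted keys is the stricter alternative when the schema is known in advance.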
9. Conclusion
MCP Server Benefits
Building your own MCP-compliant server provides several key advantages:
- Complete Control: With a custom MCP server, you have full control over how context is managed, stored, and utilized by your models.
- Tailored Performance: You can optimize performance based on your specific workloads and deployment environment, from resource allocation to caching strategies.
- Customized Security: Implement security measures that align with your organization's requirements and compliance needs.
- Integration Flexibility: Connect your MCP server to your existing systems, databases, and services with custom integrations.
- Cost Optimization: Avoid vendor lock-in and potentially reduce costs by implementing exactly what you need, especially for high-volume deployments.
