Complete Blueprint: Building a Local LLM Document Processing Pipeline with Advanced Extraction, Semantic Analysis, and Scalable Storage
A comprehensive guide to building a production-ready local LLM document processing pipeline with advanced extraction, semantic analysis, vector storage, and transformation capabilities for enterprise document management.
Daniel Kliewer
Author, Sovereign AI

From the Book
This is from Sovereign AI: Building Local-First Intelligent Systems.


Building a Local Document Processing Pipeline with LLMs: The Ultimate Architecture
"The ability to process, understand, and transform documents is not merely a technical challenge; it is the foundation of knowledge work in the digital age."
This comprehensive guide presents a production-grade, locally-hosted document processing pipeline that combines elegance with power. By the end, you'll have a system that extracts meaning from documents, structures information intelligently, and enables limitless transformations of your content, all without sending sensitive data to external APIs.
Architecture Overview
Document Ingestion -> Extraction Engine -> Semantic Processing -> Storage & Retrieval
Transformation Layer (drawn beneath the four stages and connected to them)
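One way to read the diagram is as a chain of stages, each consuming the output of the one before it. The sketch below is only an illustration of that idea: `run_pipeline` and the three stand-in stage functions are hypothetical names chosen for this example, not part of the modules presented in the rest of the guide.

```python
from typing import Any, Callable, Dict, List

# A stage receives the running context and returns an updated copy.
Stage = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_pipeline(path: str, stages: List[Stage]) -> Dict[str, Any]:
    """Pass a shared context dict through each stage in order."""
    context: Dict[str, Any] = {"path": path}
    for stage in stages:
        context = stage(context)
    return context


# Stand-in stages (hypothetical): real ones would call the extractor,
# the semantic processor, and the document store.
def extract(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {**ctx, "raw_text": f"text of {ctx['path']}"}


def analyze(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {**ctx, "structured": {"summary": ctx["raw_text"][:20]}}


def store(ctx: Dict[str, Any]) -> Dict[str, Any]:
    return {**ctx, "doc_id": "doc-1"}


result = run_pipeline("sample.pdf", [extract, analyze, store])
print(sorted(result))
```

Keeping each stage a plain function makes it straightforward to swap one out, for example replacing the storage stage in tests.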
1. High-Fidelity Document Extraction System
The foundation of our pipeline is a robust extraction engine that preserves document structure while efficiently handling multiple formats.
```python
# document_extractor.py
import concurrent.futures
import logging
import os
from dataclasses import dataclass
from typing import Dict, List, Optional

import fitz  # PyMuPDF
import pdfplumber
from docx import Document


@dataclass
class DocumentMetadata:
    """Structured metadata for any document."""
    filename: str
    file_type: str
    page_count: int
    author: Optional[str] = None
    creation_date: Optional[str] = None
    last_modified: Optional[str] = None


@dataclass
class DocumentElement:
    """Represents a structural element of a document."""
    element_type: str  # 'paragraph', 'heading', 'list_item', 'table', etc.
    content: str
    metadata: Optional[Dict] = None
    position: Optional[Dict] = None  # For spatial positioning in the document


@dataclass
class DocumentContent:
    """Full representation of a document's content and structure."""
    metadata: DocumentMetadata
    elements: List[DocumentElement]
    raw_text: Optional[str] = None


class DocumentExtractor:
    """Universal document extraction class with advanced capabilities."""

    def __init__(self, max_workers: int = 4):
        self.logger = logging.getLogger(__name__)
        self.max_workers = max_workers

    def extract(self, file_path: str) -> DocumentContent:
        """Extract content from document with appropriate extractor."""
        lower_path = file_path.lower()

        if lower_path.endswith('.pdf'):
            return self._extract_pdf(file_path)
        elif lower_path.endswith('.docx'):
            return self._extract_docx(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")

    def _extract_pdf(self, file_path: str) -> DocumentContent:
        """Extract content from PDF with advanced structure recognition."""
        try:
            # Using PyMuPDF for metadata and pdfplumber for content
            pdf_doc = fitz.open(file_path)
            try:
                pdf_meta = pdf_doc.metadata or {}
                page_count = len(pdf_doc)
            finally:
                pdf_doc.close()

            metadata = DocumentMetadata(
                filename=os.path.basename(file_path),
                file_type="pdf",
                page_count=page_count,
                author=pdf_meta.get('author'),
                creation_date=pdf_meta.get('creationDate'),
                last_modified=pdf_meta.get('modDate')
            )

            def text_element(block: str, block_metadata: Dict, page_num: int) -> DocumentElement:
                # Simplified logic - in production would have more sophisticated
                # heading/paragraph detection based on font, size, etc.
                is_heading = (block_metadata.get('size') or 0) > 11
                return DocumentElement(
                    element_type='heading' if is_heading else 'paragraph',
                    content=block.strip(),
                    metadata=block_metadata.copy(),
                    position={'page': page_num + 1}
                )

            def process_page(page_num: int):
                # Each worker opens its own handle to the file
                with pdfplumber.open(file_path) as pdf:
                    page = pdf.pages[page_num]
                    page_text = page.extract_text() or ""

                    # Extract tables separately to maintain structure
                    tables = page.extract_tables()

                    # Identify words with their font attributes
                    words = page.extract_words(
                        keep_blank_chars=True,
                        x_tolerance=3,
                        y_tolerance=3,
                        extra_attrs=['fontname', 'size']
                    )

                    page_elements = []

                    # Group words into blocks; a change in font size starts a new block
                    current_block = ""
                    current_metadata = {}

                    for word in words:
                        if not current_metadata:
                            current_metadata = {
                                'font': word.get('fontname'),
                                'size': word.get('size'),
                                'page': page_num + 1
                            }

                        if word.get('size') != current_metadata.get('size'):
                            # Font size changed, likely a new element
                            if current_block:
                                page_elements.append(
                                    text_element(current_block, current_metadata, page_num)
                                )
                            current_block = ""
                            current_metadata = {
                                'font': word.get('fontname'),
                                'size': word.get('size'),
                                'page': page_num + 1
                            }

                        current_block += word.get('text', '') + " "

                    # Add the last block
                    if current_block:
                        page_elements.append(
                            text_element(current_block, current_metadata, page_num)
                        )

                    # Add tables as structured elements
                    for i, table in enumerate(tables):
                        table_text = "\n".join(
                            " | ".join(cell or "" for cell in row) for row in table
                        )
                        page_elements.append(DocumentElement(
                            element_type='table',
                            content=table_text,
                            metadata={'table_index': i},
                            position={'page': page_num + 1}
                        ))

                    return page_text, page_elements

            # Process pages in parallel; executor.map returns results in page
            # order even when pages finish out of order
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                results = list(executor.map(process_page, range(page_count)))

            elements = []
            raw_text = ""
            for page_text, page_elements in results:
                raw_text += page_text + "\n\n"
                elements.extend(page_elements)

            return DocumentContent(metadata=metadata, elements=elements, raw_text=raw_text.strip())

        except Exception as e:
            self.logger.error(f"Error extracting PDF content: {str(e)}")
            raise

    def _extract_docx(self, file_path: str) -> DocumentContent:
        """Extract content from DOCX with structure preservation."""
        try:
            doc = Document(file_path)
            props = doc.core_properties

            # Extract metadata
            metadata = DocumentMetadata(
                filename=os.path.basename(file_path),
                file_type="docx",
                page_count=0,  # Page count not directly available in python-docx
                author=props.author,
                creation_date=str(props.created) if props.created else None,
                last_modified=str(props.modified) if props.modified else None
            )

            elements = []
            raw_text = ""

            # Process paragraphs
            for i, para in enumerate(doc.paragraphs):
                if not para.text.strip():
                    continue

                # Determine element type based on paragraph style
                element_type = 'paragraph'
                if para.style.name.startswith('Heading'):
                    element_type = 'heading'
                elif para.style.name.startswith('List'):
                    element_type = 'list_item'

                # Extract formatting information
                runs_info = []
                for run in para.runs:
                    runs_info.append({
                        'text': run.text,
                        'bold': run.bold,
                        'italic': run.italic,
                        'underline': run.underline,
                        'font': run.font.name if run.font.name else None
                    })

                elements.append(DocumentElement(
                    element_type=element_type,
                    content=para.text,
                    metadata={
                        'style': para.style.name,
                        'runs': runs_info
                    },
                    position={'index': i}
                ))

                raw_text += para.text + "\n"

            # Process tables
            for i, table in enumerate(doc.tables):
                table_text = ""
                for row in table.rows:
                    row_text = " | ".join([cell.text for cell in row.cells])
                    table_text += row_text + "\n"

                elements.append(DocumentElement(
                    element_type='table',
                    content=table_text.strip(),
                    metadata={'table_index': i},
                    position={'index': len(doc.paragraphs) + i}
                ))

                raw_text += table_text + "\n\n"

            return DocumentContent(metadata=metadata, elements=elements, raw_text=raw_text.strip())

        except Exception as e:
            self.logger.error(f"Error extracting DOCX content: {str(e)}")
            raise


# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    extractor = DocumentExtractor()

    # Extract PDF content
    pdf_content = extractor.extract("sample.pdf")
    print(f"PDF Metadata: {pdf_content.metadata}")
    print(f"PDF Elements: {len(pdf_content.elements)}")

    # Extract DOCX content
    docx_content = extractor.extract("sample.docx")
    print(f"DOCX Metadata: {docx_content.metadata}")
    print(f"DOCX Elements: {len(docx_content.elements)}")
```
2. Semantic Processing with Local LLMs
This module integrates with local LLMs using Ollama while providing a flexible, performant interface that handles model limitations gracefully.
```python
# semantic_processor.py
import json
import logging
import re
import time
from typing import Dict, List, Optional

import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from document_extractor import DocumentContent


class LLMProcessingError(Exception):
    """Raised when there is an error processing content with the LLM."""
    pass


class OllamaClient:
    """Client for interacting with Ollama local LLM server."""

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "vanilj/Phi-4:latest",
        timeout: int = 120,
        temperature: float = 0.1,
        max_tokens: int = 1024
    ):
        self.base_url = base_url
        self.model = model
        self.timeout = timeout
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.logger = logging.getLogger(__name__)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((httpx.ReadTimeout, httpx.ConnectError)),
        reraise=True
    )
    async def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """Generate text from the model with retry logic for robustness."""
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            # Ollama reads sampling parameters from "options";
            # "num_predict" caps the number of generated tokens
            "options": {
                "temperature": self.temperature,
                "num_predict": self.max_tokens
            }
        }

        if system_prompt:
            payload["system"] = system_prompt

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(f"{self.base_url}/api/generate", json=payload)
                response.raise_for_status()
                result = response.json()
                return result.get("response", "")

        except (httpx.ReadTimeout, httpx.ConnectError):
            # Re-raise transient errors unchanged so the retry policy can see them
            raise
        except httpx.HTTPStatusError as e:
            self.logger.error(f"HTTP error: {e}")
            raise LLMProcessingError(f"Failed to get response from LLM: {str(e)}")
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}")
            raise LLMProcessingError(f"Error communicating with LLM: {str(e)}")


class SemanticProcessor:
    """Processes document content using a local LLM for intelligent extraction."""

    def __init__(
        self,
        llm_client: Optional[OllamaClient] = None,
        chunk_size: int = 6000,
        chunk_overlap: int = 1000
    ):
        self.llm_client = llm_client or OllamaClient()
        self.chunk_size = chunk_size  # measured in characters
        self.chunk_overlap = chunk_overlap
        self.logger = logging.getLogger(__name__)

    def _chunk_document(self, doc_content: DocumentContent) -> List[str]:
        """Split document into manageable chunks that preserve semantic meaning."""
        elements = doc_content.elements
        chunks = []
        current_chunk = ""

        for element in elements:
            # If adding this element would exceed chunk size, save current chunk
            if len(current_chunk) + len(element.content) > self.chunk_size and current_chunk:
                chunks.append(current_chunk)
                # Keep some overlap for context preservation
                overlap_text = current_chunk[-self.chunk_overlap:] if self.chunk_overlap > 0 else ""
                current_chunk = overlap_text

            # Add element content with appropriate formatting
            if element.element_type == 'heading':
                current_chunk += f"\n## {element.content}\n\n"
            elif element.element_type == 'list_item':
                current_chunk += f"• {element.content}\n"
            elif element.element_type == 'table':
                current_chunk += f"\nTABLE:\n{element.content}\n\n"
            else:  # paragraph
                current_chunk += f"{element.content}\n\n"

        # Add the final chunk if there's content
        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    async def _process_chunk_to_json(self, chunk: str, schema: Dict) -> Dict:
        """Process a document chunk into structured JSON."""
        schema_str = json.dumps(schema, indent=2)

        system_prompt = """You are a document structuring expert.
Your task is to extract information from document text and structure it according to a given schema.
Always respond with valid JSON that exactly matches the provided schema structure."""

        user_prompt = f"""Extract structured information from the following document text.
Format your response as a valid JSON object that strictly follows this schema:

{schema_str}

DOCUMENT TEXT:
{chunk}

Return ONLY the JSON output without any additional text, explanations, or formatting."""

        try:
            response = await self.llm_client.generate(user_prompt, system_prompt)

            # Find JSON in the response (in case model adds comments)
            try:
                start_idx = response.find('{')
                end_idx = response.rfind('}') + 1
                if start_idx == -1 or end_idx == 0:
                    raise ValueError("No JSON found in response")

                json_str = response[start_idx:end_idx]
                return json.loads(json_str)
            except json.JSONDecodeError:
                # Try to fix common JSON errors
                fixed_response = self._fix_json_response(response)
                return json.loads(fixed_response)

        except Exception as e:
            self.logger.error(f"Error processing chunk to JSON: {str(e)}")
            self.logger.error(f"Problematic chunk: {chunk[:100]}...")
            # Return partial data instead of failing completely
            return {"error": str(e), "partial_text": chunk[:100] + "..."}

    def _fix_json_response(self, response: str) -> str:
        """Attempt to fix common JSON errors in LLM responses."""
        # Find what looks like the JSON part of the response
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1

        if start_idx >= 0 and end_idx > 0:
            json_str = response[start_idx:end_idx]

            # Remove trailing commas before a closing brace or bracket.
            # This is a simplistic approach: it does not repair unescaped
            # quotes, and it can alter a string value that contains ",}"
            return re.sub(r',\s*([}\]])', r'\1', json_str)

        return response

    async def _merge_chunk_results(self, results: List[Dict], schema: Dict) -> Dict:
        """Intelligently merge results from multiple chunks."""
        if not results:
            return {}

        # If we only have one chunk, just return it
        if len(results) == 1:
            return results[0]

        # For multiple chunks, we need to merge them intelligently
        merged = {}

        # Basic strategy - iterate through schema keys and merge accordingly
        for key, value_type in schema.items():
            # String fields: use the non-empty value from the first chunk that has it
            if value_type == "string":
                for result in results:
                    if result.get(key) and isinstance(result.get(key), str) and result[key].strip():
                        merged[key] = result[key]
                        break
                if key not in merged:
                    merged[key] = ""

            # List fields: concatenate lists from all chunks and deduplicate
            elif isinstance(value_type, list) or (isinstance(value_type, str) and value_type.startswith("array")):
                all_items = []
                for result in results:
                    if result.get(key) and isinstance(result.get(key), list):
                        all_items.extend(result[key])

                # Simple deduplication - this could be more sophisticated
                deduplicated = []
                seen = set()
                for item in all_items:
                    item_str = str(item)
                    if item_str not in seen:
                        seen.add(item_str)
                        deduplicated.append(item)

                merged[key] = deduplicated

            # Object fields: recursively merge
            elif isinstance(value_type, dict):
                sub_results = [result.get(key, {}) for result in results if isinstance(result.get(key), dict)]
                merged[key] = await self._merge_chunk_results(sub_results, value_type)

            # Default case
            else:
                merged[key] = results[0].get(key, "")

        return merged

    async def process_document(self, doc_content: DocumentContent, schema: Dict) -> Dict:
        """
        Process a document into structured data according to the provided schema.

        Args:
            doc_content: The document content object from the extractor
            schema: JSON schema defining the output structure

        Returns:
            Dict containing the structured document data
        """
        start_time = time.time()
        self.logger.info(f"Starting document processing: {doc_content.metadata.filename}")

        # Split document into manageable chunks
        chunks = self._chunk_document(doc_content)
        self.logger.info(f"Document split into {len(chunks)} chunks")

        # Process the chunks one at a time, in document order
        chunk_results = []
        for i, chunk in enumerate(chunks):
            self.logger.info(f"Processing chunk {i+1}/{len(chunks)}")
            result = await self._process_chunk_to_json(chunk, schema)
            chunk_results.append(result)

        # Merge results from all chunks
        final_result = await self._merge_chunk_results(chunk_results, schema)

        # Add document metadata
        final_result["_metadata"] = {
            "filename": doc_content.metadata.filename,
            "file_type": doc_content.metadata.file_type,
            "page_count": doc_content.metadata.page_count,
            "author": doc_content.metadata.author,
            "processing_time": time.time() - start_time
        }

        self.logger.info(f"Document processing completed in {time.time() - start_time:.2f} seconds")
        return final_result


# Example schema
DEFAULT_DOCUMENT_SCHEMA = {
    "title": "string",
    "summary": "string",
    "main_topics": ["string"],
    "sections": [
        {
            "heading": "string",
            "content": "string",
            "key_points": ["string"]
        }
    ],
    "entities": {
        "people": ["string"],
        "organizations": ["string"],
        "locations": ["string"],
        "dates": ["string"]
    }
}


# Usage example
async def process_document_example():
    from document_extractor import DocumentExtractor

    logging.basicConfig(level=logging.INFO)

    # Initialize components
    extractor = DocumentExtractor()
    llm_client = OllamaClient(model="vanilj/Phi-4:latest")
    processor = SemanticProcessor(llm_client=llm_client)

    # Extract document content
    doc_content = extractor.extract("sample.pdf")

    # Process document
    result = await processor.process_document(doc_content, DEFAULT_DOCUMENT_SCHEMA)

    # Print result
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    import asyncio
    asyncio.run(process_document_example())
```
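The prompt asks the model to follow the schema strictly, but a local model can still omit keys or return the wrong type. A small check before merging can surface those cases. The sketch below assumes the same schema shorthand as `DEFAULT_DOCUMENT_SCHEMA` (`"string"`, a one-item list, or a nested dict); the helper name `find_schema_mismatches` is hypothetical and not part of the module above.

```python
from typing import Any, List


def find_schema_mismatches(data: Any, schema: Any, path: str = "$") -> List[str]:
    """Return readable mismatches between `data` and a shorthand schema.

    Extra keys in `data` are ignored; only missing keys and wrong types are reported.
    """
    if schema == "string":
        return [] if isinstance(data, str) else [f"{path}: expected string"]

    if isinstance(schema, list):
        if not isinstance(data, list):
            return [f"{path}: expected list"]
        if not schema:  # an empty list schema places no constraint on items
            return []
        problems: List[str] = []
        for i, item in enumerate(data):
            problems += find_schema_mismatches(item, schema[0], f"{path}[{i}]")
        return problems

    if isinstance(schema, dict):
        if not isinstance(data, dict):
            return [f"{path}: expected object"]
        problems = []
        for key, sub_schema in schema.items():
            if key not in data:
                problems.append(f"{path}.{key}: missing")
            else:
                problems += find_schema_mismatches(data[key], sub_schema, f"{path}.{key}")
        return problems

    return [f"{path}: unsupported schema node {schema!r}"]


schema = {
    "title": "string",
    "main_topics": ["string"],
    "entities": {"people": ["string"]},
}
llm_output = {"title": "Q3 Report", "main_topics": ["revenue", 42], "entities": {}}

problems = find_schema_mismatches(llm_output, schema)
for problem in problems:
    print(problem)
```

A caller could log these messages, or re-prompt the model with them, before passing the chunk result on to the merge step.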
3. Robust Storage and Retrieval System
This module provides a flexible data storage layer with support for multiple backends, efficient querying, and versioning.
python1# document_store.py2from typing import Dict, List, Any, Optional, Union, Tuple3import json4import logging5import sqlite36import os7import datetime8from dataclasses import dataclass, asdict9from uuid import uuid410import asyncio11import aiosqlite1213@dataclass14class DocumentRecord:15 """Represents a document record in the storage system."""16 doc_id: str17 title: str18 content: Dict[str, Any] # The structured JSON content19 file_path: str20 file_type: str21 created_at: str22 updated_at: str23 version: int = 124 tags: List[str] = None2526 def to_dict(self) -> Dict:27 """Convert to dictionary representation."""28 result = asdict(self)29 # Convert content to JSON string for storage30 if isinstance(result['content'], dict):31 result['content'] = json.dumps(result['content'])32 if result['tags'] is None:33 result['tags'] = []34 return result3536 @classmethod37 def from_dict(cls, data: Dict) -> 'DocumentRecord':38 """Create from dictionary representation."""39 # Parse content from JSON string if needed40 if isinstance(data.get('content'), str):41 try:42 data['content'] = json.loads(data['content'])43 except json.JSONDecodeError:44 # Keep as string if it's not valid JSON45 pass4647 # Ensure tags is a list48 if data.get('tags') is None:49 data['tags'] = []5051 return cls(**data)5253class DocumentStore:54 """Abstract base class for document storage backends."""5556 async def initialize(self):57 """Initialize the storage backend."""58 raise NotImplementedError5960 async def store_document(self, document: DocumentRecord) -> str:61 """Store a document and return its ID."""62 raise NotImplementedError6364 async def get_document(self, doc_id: str) -> Optional[DocumentRecord]:65 """Retrieve a document by ID."""66 raise NotImplementedError6768 async def update_document(self, doc_id: str, content: Dict[str, Any],69 increment_version: bool = True) -> Optional[DocumentRecord]:70 """Update a document's content."""71 raise NotImplementedError7273 async def delete_document(self, 
doc_id: str) -> bool:74 """Delete a document."""75 raise NotImplementedError7677 async def list_documents(self, limit: int = 100, offset: int = 0,78 tags: Optional[List[str]] = None) -> List[DocumentRecord]:79 """List documents with optional filtering."""80 raise NotImplementedError8182 async def search_documents(self, query: str,83 fields: Optional[List[str]] = None) -> List[DocumentRecord]:84 """Search documents by content."""85 raise NotImplementedError8687 async def get_document_versions(self, doc_id: str) -> List[Dict]:88 """Get all versions of a document."""89 raise NotImplementedError9091 async def add_tags(self, doc_id: str, tags: List[str]) -> bool:92 """Add tags to a document."""93 raise NotImplementedError9495 async def close(self):96 """Close the storage connection."""97 raise NotImplementedError9899class SQLiteDocumentStore(DocumentStore):100 """SQLite implementation of document storage."""101102 def __init__(self, db_path: str = "documents.db"):103 self.db_path = db_path104 self.logger = logging.getLogger(__name__)105 self.conn = None106107 async def initialize(self):108 """Initialize the SQLite database."""109 self.logger.info(f"Initializing SQLite document store at {self.db_path}")110111 # Ensure directory exists112 os.makedirs(os.path.dirname(os.path.abspath(self.db_path)), exist_ok=True)113114 self.conn = await aiosqlite.connect(self.db_path)115116 # Enable foreign keys117 await self.conn.execute("PRAGMA foreign_keys = ON")118119 # Create documents table120 await self.conn.execute("""121 CREATE TABLE IF NOT EXISTS documents (122 doc_id TEXT PRIMARY KEY,123 title TEXT NOT NULL,124 content TEXT NOT NULL,125 file_path TEXT NOT NULL,126 file_type TEXT NOT NULL,127 created_at TEXT NOT NULL,128 updated_at TEXT NOT NULL,129 version INTEGER NOT NULL DEFAULT 1130 )131 """)132133 # Create document versions table134 await self.conn.execute("""135 CREATE TABLE IF NOT EXISTS document_versions (136 version_id INTEGER PRIMARY KEY AUTOINCREMENT,137 doc_id TEXT 
NOT NULL,138 content TEXT NOT NULL,139 version INTEGER NOT NULL,140 created_at TEXT NOT NULL,141 FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE142 )143 """)144145 # Create tags table146 await self.conn.execute("""147 CREATE TABLE IF NOT EXISTS tags (148 tag_id INTEGER PRIMARY KEY AUTOINCREMENT,149 tag_name TEXT NOT NULL UNIQUE150 )151 """)152153 # Create document_tags junction table154 await self.conn.execute("""155 CREATE TABLE IF NOT EXISTS document_tags (156 doc_id TEXT NOT NULL,157 tag_id INTEGER NOT NULL,158 PRIMARY KEY (doc_id, tag_id),159 FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE,160 FOREIGN KEY (tag_id) REFERENCES tags(tag_id) ON DELETE CASCADE161 )162 """)163164 # Create full-text search index165 await self.conn.execute("""166 CREATE VIRTUAL TABLE IF NOT EXISTS document_fts USING fts5(167 doc_id UNINDEXED,168 title,169 content,170 tokenize='porter unicode61'171 )172 """)173174 # Create triggers to keep FTS index updated175 await self.conn.execute("""176 CREATE TRIGGER IF NOT EXISTS documents_ai AFTER INSERT ON documents BEGIN177 INSERT INTO document_fts(doc_id, title, content)178 VALUES (new.doc_id, new.title, new.content);179 END180 """)181182 await self.conn.execute("""183 CREATE TRIGGER IF NOT EXISTS documents_au AFTER UPDATE ON documents BEGIN184 DELETE FROM document_fts WHERE doc_id = old.doc_id;185 INSERT INTO document_fts(doc_id, title, content)186 VALUES (new.doc_id, new.title, new.content);187 END188 """)189190 await self.conn.execute("""191 CREATE TRIGGER IF NOT EXISTS documents_ad AFTER DELETE ON documents BEGIN192 DELETE FROM document_fts WHERE doc_id = old.doc_id;193 END194 """)195196 await self.conn.commit()197 self.logger.info("SQLite document store initialized")198199 async def store_document(self, document: DocumentRecord) -> str:200 """Store a document and return its ID."""201 if not self.conn:202 await self.initialize()203204 if not document.doc_id:205 document.doc_id = str(uuid4())206207 
now = datetime.datetime.now().isoformat()208 if not document.created_at:209 document.created_at = now210 if not document.updated_at:211 document.updated_at = now212213 document_dict = document.to_dict()214215 try:216 # Insert document217 await self.conn.execute("""218 INSERT INTO documents219 (doc_id, title, content, file_path, file_type, created_at, updated_at, version)220 VALUES (?, ?, ?, ?, ?, ?, ?, ?)221 """, (222 document_dict['doc_id'],223 document_dict['title'],224 document_dict['content'],225 document_dict['file_path'],226 document_dict['file_type'],227 document_dict['created_at'],228 document_dict['updated_at'],229 document_dict['version']230 ))231232 # Store initial version233 await self.conn.execute("""234 INSERT INTO document_versions235 (doc_id, content, version, created_at)236 VALUES (?, ?, ?, ?)237 """, (238 document_dict['doc_id'],239 document_dict['content'],240 document_dict['version'],241 document_dict['created_at']242 ))243244 # Add tags if present245 if document_dict['tags']:246 await self._add_tags_internal(document_dict['doc_id'], document_dict['tags'])247248 await self.conn.commit()249 self.logger.info(f"Stored document with ID: {document_dict['doc_id']}")250 return document_dict['doc_id']251252 except sqlite3.Error as e:253 self.logger.error(f"Error storing document: {str(e)}")254 await self.conn.rollback()255 raise256257 async def _add_tags_internal(self, doc_id: str, tags: List[str]):258 """Internal method to add tags to a document."""259 for tag in tags:260 # Ensure tag exists in tags table261 cursor = await self.conn.execute(262 "INSERT OR IGNORE INTO tags (tag_name) VALUES (?)",263 (tag,)264 )265 await self.conn.commit()266267 # Get tag ID268 cursor = await self.conn.execute(269 "SELECT tag_id FROM tags WHERE tag_name = ?",270 (tag,)271 )272 row = await cursor.fetchone()273 tag_id = row[0]274275 # Associate tag with document276 await self.conn.execute(277 "INSERT OR IGNORE INTO document_tags (doc_id, tag_id) VALUES (?, ?)",278 (doc_id, 
tag_id)279 )280281 async def get_document(self, doc_id: str) -> Optional[DocumentRecord]:282 """Retrieve a document by ID."""283 if not self.conn:284 await self.initialize()285286 try:287 # Get document288 cursor = await self.conn.execute("""289 SELECT d.doc_id, d.title, d.content, d.file_path, d.file_type,290 d.created_at, d.updated_at, d.version291 FROM documents d292 WHERE d.doc_id = ?293 """, (doc_id,))294295 row = await cursor.fetchone()296 if not row:297 return None298299 # Get tags for document300 cursor = await self.conn.execute("""301 SELECT t.tag_name302 FROM tags t303 JOIN document_tags dt ON t.tag_id = dt.tag_id304 WHERE dt.doc_id = ?305 """, (doc_id,))306307 tags = [tag[0] for tag in await cursor.fetchall()]308309 document_dict = {310 'doc_id': row[0],311 'title': row[1],312 'content': row[2],313 'file_path': row[3],314 'file_type': row[4],315 'created_at': row[5],316 'updated_at': row[6],317 'version': row[7],318 'tags': tags319 }320321 return DocumentRecord.from_dict(document_dict)322323 except sqlite3.Error as e:324 self.logger.error(f"Error getting document: {str(e)}")325 raise326327 async def update_document(self, doc_id: str, content: Dict[str, Any],328 increment_version: bool = True) -> Optional[DocumentRecord]:329 """Update a document's content."""330 if not self.conn:331 await self.initialize()332333 try:334 # Get current document335 cursor = await self.conn.execute(336 "SELECT version FROM documents WHERE doc_id = ?",337 (doc_id,)338 )339 row = await cursor.fetchone()340 if not row:341 return None342343 current_version = row[0]344 new_version = current_version + 1 if increment_version else current_version345 content_json = json.dumps(content)346 now = datetime.datetime.now().isoformat()347348 # Update document349 await self.conn.execute("""350 UPDATE documents351 SET content = ?, updated_at = ?, version = ?352 WHERE doc_id = ?353 """, (content_json, now, new_version, doc_id))354355 # Store new version if needed356 if increment_version:357 
await self.conn.execute("""358 INSERT INTO document_versions359 (doc_id, content, version, created_at)360 VALUES (?, ?, ?, ?)361 """, (doc_id, content_json, new_version, now))362363 await self.conn.commit()364365 # Return updated document366 return await self.get_document(doc_id)367368 except sqlite3.Error as e:369 self.logger.error(f"Error updating document: {str(e)}")370 await self.conn.rollback()371 raise372373 async def delete_document(self, doc_id: str) -> bool:374 """Delete a document."""375 if not self.conn:376 await self.initialize()377378 try:379 cursor = await self.conn.execute(380 "DELETE FROM documents WHERE doc_id = ?",381 (doc_id,)382 )383 await self.conn.commit()384385 return cursor.rowcount > 0386387 except sqlite3.Error as e:388 self.logger.error(f"Error deleting document: {str(e)}")389 await self.conn.rollback()390 raise391392 async def list_documents(self, limit: int = 100, offset: int = 0,393 tags: Optional[List[str]] = None) -> List[DocumentRecord]:394 """List documents with optional filtering."""395 if not self.conn:396 await self.initialize()397398 try:399 documents = []400401 if tags:402 # Query with tag filtering403 placeholders = ','.join(['?'] * len(tags))404 query = f"""405 SELECT DISTINCT d.doc_id, d.title, d.content, d.file_path, d.file_type,406 d.created_at, d.updated_at, d.version407 FROM documents d408 JOIN document_tags dt ON d.doc_id = dt.doc_id409 JOIN tags t ON dt.tag_id = t.tag_id410 WHERE t.tag_name IN ({placeholders})411 ORDER BY d.updated_at DESC412 LIMIT ? OFFSET ?413 """414 cursor = await self.conn.execute(query, (*tags, limit, offset))415 else:416 # Query without tag filtering417 query = """418 SELECT doc_id, title, content, file_path, file_type,419 created_at, updated_at, version420 FROM documents421 ORDER BY updated_at DESC422 LIMIT ? 
                OFFSET ?
            """
            cursor = await self.conn.execute(query, (limit, offset))

            rows = await cursor.fetchall()

            for row in rows:
                doc_id = row[0]

                # Get tags for document
                cursor = await self.conn.execute("""
                    SELECT t.tag_name
                    FROM tags t
                    JOIN document_tags dt ON t.tag_id = dt.tag_id
                    WHERE dt.doc_id = ?
                """, (doc_id,))

                doc_tags = [tag[0] for tag in await cursor.fetchall()]

                document_dict = {
                    'doc_id': row[0],
                    'title': row[1],
                    'content': row[2],
                    'file_path': row[3],
                    'file_type': row[4],
                    'created_at': row[5],
                    'updated_at': row[6],
                    'version': row[7],
                    'tags': doc_tags
                }

                documents.append(DocumentRecord.from_dict(document_dict))

            return documents

        except sqlite3.Error as e:
            self.logger.error(f"Error listing documents: {str(e)}")
            raise

    async def search_documents(self, query: str,
                               fields: Optional[List[str]] = None) -> List[DocumentRecord]:
        """Search documents by content using FTS5 prefix matching.

        Note: `fields` is accepted for API compatibility but is not used yet.
        """
        if not self.conn:
            await self.initialize()

        try:
            documents = []

            # Build a prefix query from the individual terms. Each term is
            # quoted so that FTS5 operators or punctuation in user input are
            # treated as literal text instead of query syntax.
            terms = [term.replace('"', '""') for term in query.split()]
            if not terms:
                return documents
            search_query = ' OR '.join(f'"{term}"*' for term in terms)

            # The FTS table is referenced by its own name (no alias) so the
            # MATCH clause and the join agree on what it is called.
            cursor = await self.conn.execute("""
                SELECT d.doc_id, d.title, d.content, d.file_path, d.file_type,
                       d.created_at, d.updated_at, d.version
                FROM document_fts
                JOIN documents d ON document_fts.doc_id = d.doc_id
                WHERE document_fts MATCH ?
                ORDER BY rank
                LIMIT 100
            """, (search_query,))

            rows = await cursor.fetchall()

            for row in rows:
                doc_id = row[0]

                # Get tags for document
                cursor = await self.conn.execute("""
                    SELECT t.tag_name
                    FROM tags t
                    JOIN document_tags dt ON t.tag_id = dt.tag_id
                    WHERE dt.doc_id = ?
                """, (doc_id,))

                doc_tags = [tag[0] for tag in await cursor.fetchall()]

                document_dict = {
                    'doc_id': row[0],
                    'title': row[1],
                    'content': row[2],
                    'file_path': row[3],
                    'file_type': row[4],
                    'created_at': row[5],
                    'updated_at': row[6],
                    'version': row[7],
                    'tags': doc_tags
                }

                documents.append(DocumentRecord.from_dict(document_dict))

            return documents

        except sqlite3.Error as e:
            self.logger.error(f"Error searching documents: {str(e)}")
            raise

    async def get_document_versions(self, doc_id: str) -> List[Dict]:
        """Get all versions of a document, newest first."""
        if not self.conn:
            await self.initialize()

        try:
            cursor = await self.conn.execute("""
                SELECT content, version, created_at
                FROM document_versions
                WHERE doc_id = ?
                ORDER BY version DESC
            """, (doc_id,))

            rows = await cursor.fetchall()

            versions = []
            for row in rows:
                version = {
                    'content': row[0],
                    'version': row[1],
                    'created_at': row[2]
                }

                # Parse content from JSON string if needed
                if isinstance(version['content'], str):
                    try:
                        version['content'] = json.loads(version['content'])
                    except json.JSONDecodeError:
                        # Keep as string if it's not valid JSON
                        pass

                versions.append(version)

            return versions

        except sqlite3.Error as e:
            self.logger.error(f"Error getting document versions: {str(e)}")
            raise

    async def add_tags(self, doc_id: str, tags: List[str]) -> bool:
        """Add tags to a document. Returns False if the document does not exist."""
        if not self.conn:
            await self.initialize()

        try:
            # Check if document exists
            cursor = await self.conn.execute(
                "SELECT 1 FROM documents WHERE doc_id = ?",
                (doc_id,)
            )
            if not await cursor.fetchone():
                return False

            await self._add_tags_internal(doc_id, tags)
            await self.conn.commit()

            return True

        except sqlite3.Error as e:
            self.logger.error(f"Error adding tags: {str(e)}")
            await self.conn.rollback()
            raise

    async def close(self):
        """Close the database connection."""
        if self.conn:
            await self.conn.close()
            self.conn = None
            self.logger.info("SQLite document store connection closed")

# Usage example
async def document_store_example():
    logging.basicConfig(level=logging.INFO)

    # Initialize store
    store = SQLiteDocumentStore("documents.db")
    await store.initialize()

    # Create a document
    doc = DocumentRecord(
        doc_id="",  # Will be auto-generated
        title="Sample Document",
        content={
            "title": "Sample Document",
            "summary": "This is a sample document for testing.",
            "sections": [
                {"heading": "Introduction", "content": "This is the introduction."}
            ]
        },
        file_path="/path/to/sample.pdf",
        file_type="pdf",
        created_at="",  # Will be auto-generated
        updated_at="",  # Will be auto-generated
        tags=["sample", "test"]
    )

    # Store document
    doc_id = await store.store_document(doc)
    print(f"Stored document with ID: {doc_id}")

    # Retrieve document
    retrieved_doc = await store.get_document(doc_id)
    print(f"Retrieved document: {retrieved_doc.title}")

    # Update document
    retrieved_doc.content["summary"] = "Updated summary for testing."
    updated_doc = await store.update_document(doc_id, retrieved_doc.content)
    print(f"Updated document version: {updated_doc.version}")

    # List documents
    documents = await store.list_documents(limit=10)
    print(f"Listed {len(documents)} documents")

    # Search documents
    search_results = await store.search_documents("sample")
    print(f"Found {len(search_results)} documents matching 'sample'")

    # Clean up
    await store.close()

if __name__ == "__main__":
    asyncio.run(document_store_example())
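The search method hands user input to an FTS5 `MATCH` expression, and that expression is a small query language of its own: a stray quote or a bare word such as `NEAR` can change the meaning of the search or raise a syntax error. The following is a minimal sketch of one way to build a prefix query defensively, using only the standard-library `sqlite3` module. The helper name `build_fts_query`, the `demo_search` function, and the throwaway `docs` table are illustrative assumptions, not part of the store above, and the demo assumes your SQLite build includes FTS5.

```python
import sqlite3


def build_fts_query(user_input: str) -> str:
    """Turn free text into an FTS5 prefix query, e.g. '"samp"* OR "doc"*'.

    Each term is wrapped in double quotes (embedded quotes are doubled) so
    FTS5 reads it as a literal string rather than as query syntax.
    """
    terms = [term.replace('"', '""') for term in user_input.split()]
    return " OR ".join(f'"{term}"*' for term in terms)


def demo_search(user_input: str) -> list:
    """Run the generated query against a throwaway in-memory FTS5 table."""
    match = build_fts_query(user_input)
    if not match:
        return []  # nothing to search for

    conn = sqlite3.connect(":memory:")
    try:
        # Hypothetical table used only for this demonstration.
        conn.execute("CREATE VIRTUAL TABLE docs USING fts5(title, content)")
        conn.executemany(
            "INSERT INTO docs (title, content) VALUES (?, ?)",
            [
                ("Sample Document", "This is a sample document for testing."),
                ("Quarterly Report", "Revenue figures for the third quarter."),
            ],
        )
        rows = conn.execute(
            "SELECT title FROM docs WHERE docs MATCH ? ORDER BY rank",
            (match,),
        ).fetchall()
        return [row[0] for row in rows]
    finally:
        conn.close()
```

Joining the terms with `OR` keeps recall high; switching the separator to a space (implicit `AND` in FTS5) would require every term to match, which may suit larger collections better.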
4. Transformation API with FastAPI
Create a modern, responsive API for document transformations:
```python
# transformation_api.py
from typing import Dict, List, Optional, Any
import logging
import json
import time
import os

from fastapi import FastAPI, HTTPException, BackgroundTasks, File, UploadFile, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn

from document_extractor import DocumentExtractor
from semantic_processor import SemanticProcessor, OllamaClient, DEFAULT_DOCUMENT_SCHEMA
from document_store import SQLiteDocumentStore, DocumentRecord

# Initialize FastAPI app
app = FastAPI(
    title="Document Processing API",
    description="API for processing, analyzing, and transforming documents using local LLMs",
    version="1.0.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # For production, specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Initialize components
document_extractor = DocumentExtractor()
llm_client = OllamaClient(model="vanilj/Phi-4:latest")
semantic_processor = SemanticProcessor(llm_client=llm_client)
document_store = None  # Will be initialized on startup

# Models
class TransformationRequest(BaseModel):
    doc_id: str
    transformation_type: str = Field(..., description="Type of transformation: 'reword', 'summarize', 'extract_key_points', etc.")
    parameters: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional parameters for the transformation")

class TransformationResponse(BaseModel):
    doc_id: str
    transformation_type: str
    transformed_content: Dict[str, Any]
    execution_time: float

class DocumentResponse(BaseModel):
    doc_id: str
    title: str
    file_type: str
    created_at: str
    updated_at: str
    version: int
    tags: List[str]
    content_preview: str = Field(..., description="Preview of the document content")

class SearchRequest(BaseModel):
    query: str
    limit: int = 10
    offset: int = 0

# Dependency for getting the document store
async def get_document_store():
    return document_store

def _truncate(text: str, max_length: int = 200) -> str:
    """Cut text to max_length characters, marking the cut with an ellipsis."""
    return text[:max_length] + "..." if len(text) > max_length else text

def _content_preview(content: Any) -> str:
    """Build a short preview from the summary, or else from the first section."""
    if not isinstance(content, dict):
        return ""
    if content.get("summary"):
        return _truncate(content["summary"])
    if content.get("sections"):
        first_section = content["sections"][0]
        if "content" in first_section:
            return _truncate(first_section["content"])
    return ""

def _to_response(doc: DocumentRecord) -> DocumentResponse:
    """Convert a stored record into the API response model."""
    return DocumentResponse(
        doc_id=doc.doc_id,
        title=doc.title,
        file_type=doc.file_type,
        created_at=doc.created_at,
        updated_at=doc.updated_at,
        version=doc.version,
        tags=doc.tags or [],
        content_preview=_content_preview(doc.content)
    )

# Background task for processing uploaded documents
async def process_document_task(
    file_path: str,
    file_name: str,
    file_type: str,
    custom_schema: Optional[Dict] = None
):
    try:
        # Extract document content
        logger.info(f"Extracting content from {file_path}")
        doc_content = document_extractor.extract(file_path)

        # Process with LLM
        logger.info("Processing document with LLM")
        schema = custom_schema or DEFAULT_DOCUMENT_SCHEMA
        result = await semantic_processor.process_document(doc_content, schema)

        # Store in database
        logger.info("Storing processed document")
        doc = DocumentRecord(
            doc_id="",  # Auto-generated
            title=result.get("title", file_name),
            content=result,
            file_path=file_path,
            file_type=file_type,
            created_at="",  # Auto-generated
            updated_at="",  # Auto-generated
            tags=[]  # No initial tags
        )

        doc_id = await document_store.store_document(doc)
        logger.info(f"Document processed and stored with ID: {doc_id}")

        # Clean up temporary file if needed
        if os.path.exists(file_path) and "/tmp/" in file_path:
            os.remove(file_path)
            logger.info(f"Temporary file {file_path} removed")

    except Exception as e:
        logger.error(f"Error processing document: {str(e)}")
        # Could implement retry logic or notification system here

# Event handlers
@app.on_event("startup")
async def startup_event():
    global document_store
    logger.info("Initializing document store")
    document_store = SQLiteDocumentStore("documents.db")
    await document_store.initialize()
    logger.info("Document store initialized")

@app.on_event("shutdown")
async def shutdown_event():
    logger.info("Shutting down document store")
    if document_store:
        await document_store.close()
        logger.info("Document store closed")

# Endpoints
@app.post("/documents/upload")
async def upload_document(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    custom_schema: Optional[str] = Form(None),
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Upload and process a document."""
    try:
        # Validate file type. basename() drops any directory components a
        # client may have put in the filename, so the file stays under /tmp.
        file_name = os.path.basename(file.filename or "")
        if not (file_name.lower().endswith('.pdf') or file_name.lower().endswith('.docx')):
            raise HTTPException(status_code=400, detail="Only PDF and DOCX files are supported")

        # Save file temporarily
        file_path = f"/tmp/{int(time.time())}_{file_name}"
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        # Parse custom schema if provided
        schema = None
        if custom_schema:
            try:
                schema = json.loads(custom_schema)
            except json.JSONDecodeError:
                raise HTTPException(status_code=400, detail="Invalid JSON schema")

        # Process document in background
        file_type = "pdf" if file_name.lower().endswith('.pdf') else "docx"
        background_tasks.add_task(
            process_document_task,
            file_path,
            file_name,
            file_type,
            schema
        )

        return {"message": "Document upload successful. Processing started."}

    except HTTPException:
        # Let the 400 responses above through instead of turning them into 500s
        raise
    except Exception as e:
        logger.error(f"Error in upload_document: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/documents", response_model=List[DocumentResponse])
async def list_documents(
    limit: int = 10,
    offset: int = 0,
    tags: Optional[str] = None,
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """List all documents with pagination and optional tag filtering."""
    try:
        tag_list = tags.split(',') if tags else None
        documents = await store.list_documents(limit=limit, offset=offset, tags=tag_list)

        # Create response objects with content previews
        return [_to_response(doc) for doc in documents]

    except Exception as e:
        logger.error(f"Error in list_documents: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/documents/{doc_id}")
async def get_document(
    doc_id: str,
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Get a document by ID."""
    try:
        document = await store.get_document(doc_id)
        if not document:
            raise HTTPException(status_code=404, detail="Document not found")

        return document

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in get_document: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/documents/search", response_model=List[DocumentResponse])
async def search_documents(
    search_request: SearchRequest,
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Search for documents."""
    try:
        documents = await store.search_documents(search_request.query)

        # The store returns up to 100 ranked matches; apply the requested page here
        start = max(search_request.offset, 0)
        page = documents[start:start + max(search_request.limit, 0)]

        return [_to_response(doc) for doc in page]

    except Exception as e:
        logger.error(f"Error in search_documents: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/documents/{doc_id}/transform", response_model=TransformationResponse)
async def transform_document(
    doc_id: str,
    request: TransformationRequest,
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Transform a document with specified transformation type."""
    try:
        start_time = time.time()
        params = request.parameters or {}

        # Get document
        document = await store.get_document(doc_id)
        if not document:
            raise HTTPException(status_code=404, detail="Document not found")

        # Prepare transformation prompt based on type
        transformation_prompts = {
            "reword": "Rewrite the following text to improve clarity and readability while preserving the meaning:",
            "summarize": "Provide a concise summary of the following text:",
            "extract_key_points": "Extract the key points from the following text:",
            "change_tone": f"Rewrite the following text using a {params.get('tone', 'professional')} tone:",
            "simplify": "Simplify the following text to make it more accessible:"
        }

        if request.transformation_type not in transformation_prompts:
            raise HTTPException(status_code=400, detail=f"Unsupported transformation type: {request.transformation_type}")

        # Get the content to transform
        content_to_transform = ""
        section_index = params.get("section_index")
        if section_index is not None:
            # Transform a specific section
            if (
                isinstance(section_index, int) and
                isinstance(document.content, dict) and
                "sections" in document.content and
                0 <= section_index < len(document.content["sections"])
            ):
                section = document.content["sections"][section_index]
                content_to_transform = section.get("content", "")
            else:
                raise HTTPException(status_code=400, detail="Invalid section index")
        else:
            # Transform the entire document or use the summary
            if isinstance(document.content, dict) and "summary" in document.content:
                content_to_transform = document.content["summary"]
            elif isinstance(document.content, str):
                content_to_transform = document.content
            else:
                # Try to reconstruct from sections
                if isinstance(document.content, dict) and "sections" in document.content:
                    content_to_transform = "\n\n".join([
                        f"## {section.get('heading', 'Section')}\n{section.get('content', '')}"
                        for section in document.content["sections"]
                    ])

        if not content_to_transform:
            raise HTTPException(status_code=400, detail="No content available to transform")

        # Prepare prompt for the LLM
        prompt = f"{transformation_prompts[request.transformation_type]}\n\n{content_to_transform}"

        # Set up system prompt based on transformation type
        system_prompt = "You are an expert at document transformation and improvement."

        # Process with LLM
        response = await llm_client.generate(prompt, system_prompt)

        # Create transformed content
        transformed_content = {
            "original_length": len(content_to_transform),
            "transformed_length": len(response),
            "transformed_text": response,
            "transformation_type": request.transformation_type
        }

        execution_time = time.time() - start_time

        # If requested, also update the document with the transformation
        if params.get("update_document", False) and isinstance(document.content, dict):
            # Update the appropriate section
            if section_index is not None:
                document.content["sections"][section_index]["content"] = response
            elif "summary" in document.content:
                document.content["summary"] = response

            # Save the updated document
            await store.update_document(doc_id, document.content)

        return TransformationResponse(
            doc_id=doc_id,
            transformation_type=request.transformation_type,
            transformed_content=transformed_content,
            execution_time=execution_time
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in transform_document: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.put("/documents/{doc_id}/tags")
async def add_tags(
    doc_id: str,
    tags: List[str],
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Add tags to a document."""
    try:
        success = await store.add_tags(doc_id, tags)
        if not success:
            raise HTTPException(status_code=404, detail="Document not found")

        return {"message": "Tags added successfully", "doc_id": doc_id, "tags": tags}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in add_tags: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.delete("/documents/{doc_id}")
async def delete_document(
    doc_id: str,
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Delete a document."""
    try:
        success = await store.delete_document(doc_id)
        if not success:
            raise HTTPException(status_code=404, detail="Document not found")

        return {"message": "Document deleted successfully", "doc_id": doc_id}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in delete_document: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

# Run the server
if __name__ == "__main__":
    uvicorn.run("transformation_api:app", host="0.0.0.0", port=8000, reload=True)
```
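To make the request shape of the transform endpoint concrete, here is a small client sketch built on the standard library only. It illustrates one detail that is easy to miss: `TransformationRequest` declares `doc_id` as a required field, so the ID has to appear in the JSON body as well as in the URL path. The function names `build_transform_request` and `transform`, the base URL, and the timeout are assumptions for illustration.

```python
import json
import urllib.parse
import urllib.request


def build_transform_request(base_url: str, doc_id: str,
                            transformation_type: str,
                            **parameters) -> urllib.request.Request:
    """Build (but do not send) a POST request for /documents/{doc_id}/transform."""
    payload = {
        "doc_id": doc_id,  # required by the request model, in addition to the path
        "transformation_type": transformation_type,
        "parameters": parameters,  # e.g. tone, section_index, update_document
    }
    quoted_id = urllib.parse.quote(doc_id, safe="")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/documents/{quoted_id}/transform",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def transform(base_url: str, doc_id: str,
              transformation_type: str, **parameters) -> dict:
    """Send the request and return the decoded JSON response."""
    request = build_transform_request(base_url, doc_id, transformation_type, **parameters)
    # LLM calls can be slow, so the timeout here is deliberately generous.
    with urllib.request.urlopen(request, timeout=300) as response:
        return json.load(response)
```

With the server running, a call such as `transform("http://localhost:8000", doc_id, "change_tone", tone="friendly")` would return the `TransformationResponse` fields as a dictionary.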
5. Full System Integration with Docker Compose
Bring everything together in a deployable package:
```yaml
# docker-compose.yml
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    volumes:
      - ./data:/app/data
    environment:
      - LOG_LEVEL=INFO
      - OLLAMA_HOST=ollama
      - OLLAMA_PORT=11434
      - DB_PATH=/app/data/documents.db
    depends_on:
      - ollama
    restart: unless-stopped

  ollama:
    image: ollama/ollama:latest
    volumes:
      - ./ollama-models:/root/.ollama
    ports:
      - "11434:11434"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped

  web:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - API_URL=http://api:8000
    depends_on:
      - api
    restart: unless-stopped
```
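The compose file passes `OLLAMA_HOST`, `OLLAMA_PORT`, `DB_PATH`, and `LOG_LEVEL` to the API container, but the API module shown earlier opens `"documents.db"` directly and never reads them. A minimal sketch of how those variables could be collected in one place follows. The `Settings` class, the `load_settings` function, and the fallback defaults are assumptions for illustration, and how the resulting base URL is handed to `OllamaClient` depends on that class's constructor, which is not shown here.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Runtime configuration derived from environment variables."""
    ollama_base_url: str
    db_path: str
    log_level: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read the variables set in docker-compose.yml, with local-dev fallbacks."""
    env = os.environ if env is None else env
    host = env.get("OLLAMA_HOST", "localhost")
    port = env.get("OLLAMA_PORT", "11434")
    return Settings(
        ollama_base_url=f"http://{host}:{port}",
        db_path=env.get("DB_PATH", "documents.db"),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
    )
```

Inside the container this resolves the Ollama address to `http://ollama:11434`, the service name on the compose network, while a bare `python transformation_api.py` on a workstation falls back to `localhost`.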
6. Frontend Interface (React/Next.js)
Create a modern user interface:
```jsx
// App.jsx (simplified version)
// Note: the fetch calls use /api/* paths, which assumes the frontend proxies
// or rewrites /api/* to the FastAPI service (whose routes start at /documents).
import React, { useState, useEffect } from 'react';
import {
  Container, Box, Typography, TextField, Button, CircularProgress,
  Table, TableBody, TableCell, TableContainer, TableHead, TableRow,
  Paper, Chip, Tab, Tabs, Dialog, DialogContent, DialogTitle,
  DialogActions, Snackbar, Alert
} from '@mui/material';
import { UploadFile, Search, Transform, Delete } from '@mui/icons-material';

function App() {
  const [documents, setDocuments] = useState([]);
  const [loading, setLoading] = useState(false);
  const [activeTab, setActiveTab] = useState(0);
  const [searchQuery, setSearchQuery] = useState('');
  const [selectedDocument, setSelectedDocument] = useState(null);
  const [transformationType, setTransformationType] = useState('summarize');
  const [transformationResult, setTransformationResult] = useState(null);
  const [dialogOpen, setDialogOpen] = useState(false);
  const [uploadFile, setUploadFile] = useState(null);
  const [isUploading, setIsUploading] = useState(false);
  const [snackbar, setSnackbar] = useState({ open: false, message: '', severity: 'info' });

  useEffect(() => {
    fetchDocuments();
  }, []);

  const fetchDocuments = async () => {
    setLoading(true);
    try {
      const response = await fetch('/api/documents');
      const data = await response.json();
      setDocuments(data);
    } catch (error) {
      console.error('Error fetching documents:', error);
      showSnackbar('Failed to load documents', 'error');
    } finally {
      setLoading(false);
    }
  };

  const searchDocuments = async () => {
    if (!searchQuery) {
      fetchDocuments();
      return;
    }

    setLoading(true);
    try {
      const response = await fetch('/api/documents/search', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query: searchQuery })
      });
      const data = await response.json();
      setDocuments(data);
    } catch (error) {
      console.error('Error searching documents:', error);
      showSnackbar('Search failed', 'error');
    } finally {
      setLoading(false);
    }
  };

  const handleFileChange = (event) => {
    setUploadFile(event.target.files[0]);
  };

  const uploadDocument = async () => {
    if (!uploadFile) return;

    setIsUploading(true);
    const formData = new FormData();
    formData.append('file', uploadFile);

    try {
      const response = await fetch('/api/documents/upload', {
        method: 'POST',
        body: formData,
      });

      if (response.ok) {
        showSnackbar('Document upload started successfully', 'success');
        setUploadFile(null);
        setTimeout(fetchDocuments, 3000); // Refresh after a delay
      } else {
        const error = await response.json();
        throw new Error(error.detail || 'Upload failed');
      }
    } catch (error) {
      console.error('Error uploading document:', error);
      showSnackbar(`Upload failed: ${error.message}`, 'error');
    } finally {
      setIsUploading(false);
    }
  };

  const openDocument = async (docId) => {
    setLoading(true);
    try {
      const response = await fetch(`/api/documents/${docId}`);
      const data = await response.json();
      setSelectedDocument(data);
      setTransformationResult(null); // Clear any result left from a previous document
      setDialogOpen(true);
    } catch (error) {
      console.error('Error fetching document:', error);
      showSnackbar('Failed to open document', 'error');
    } finally {
      setLoading(false);
    }
  };

  const transformDocument = async () => {
    if (!selectedDocument) return;

    setLoading(true);
    try {
      const response = await fetch(`/api/documents/${selectedDocument.doc_id}/transform`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          doc_id: selectedDocument.doc_id,
          transformation_type: transformationType,
          parameters: {}
        })
      });

      const result = await response.json();
      if (!response.ok) {
        throw new Error(result.detail || 'Transformation failed');
      }
      setTransformationResult(result.transformed_content);
    } catch (error) {
      console.error('Error transforming document:', error);
      showSnackbar('Transformation failed', 'error');
    } finally {
      setLoading(false);
    }
  };

  const deleteDocument = async (docId) => {
    if (!window.confirm('Are you sure you want to delete this document?')) return;

    try {
      const response = await fetch(`/api/documents/${docId}`, {
        method: 'DELETE'
      });

      if (response.ok) {
        showSnackbar('Document deleted successfully', 'success');
        fetchDocuments();
      } else {
        const error = await response.json();
        throw new Error(error.detail || 'Deletion failed');
      }
    } catch (error) {
      console.error('Error deleting document:', error);
      showSnackbar(`Deletion failed: ${error.message}`, 'error');
    }
  };

  const showSnackbar = (message, severity) => {
    setSnackbar({ open: true, message, severity });
  };

  const handleCloseSnackbar = () => {
    setSnackbar({ ...snackbar, open: false });
  };

  return (
    <Container maxWidth="lg">
      <Typography variant="h4" component="h1" gutterBottom sx={{ mt: 4 }}>
        Document Processing System
      </Typography>

      <Tabs value={activeTab} onChange={(e, newValue) => setActiveTab(newValue)} sx={{ mb: 4 }}>
        <Tab label="All Documents" />
        <Tab label="Upload Document" />
        <Tab label="Search" />
      </Tabs>

      {/* Document List Tab */}
      {activeTab === 0 && (
        <Box>
          <Typography variant="h6" gutterBottom>
            Your Documents
          </Typography>

          {loading ? (
            <Box display="flex" justifyContent="center" my={4}>
              <CircularProgress />
            </Box>
          ) : (
            <TableContainer component={Paper}>
              <Table>
                <TableHead>
                  <TableRow>
                    <TableCell>Title</TableCell>
                    <TableCell>Type</TableCell>
                    <TableCell>Updated</TableCell>
                    <TableCell>Preview</TableCell>
                    <TableCell>Actions</TableCell>
                  </TableRow>
                </TableHead>
                <TableBody>
                  {documents.length === 0 ? (
                    <TableRow>
                      <TableCell colSpan={5} align="center">
                        No documents found
                      </TableCell>
                    </TableRow>
                  ) : (
                    documents.map(doc => (
                      <TableRow key={doc.doc_id}>
                        <TableCell>{doc.title}</TableCell>
                        <TableCell>
                          <Chip
                            label={doc.file_type.toUpperCase()}
                            color={doc.file_type === 'pdf' ? 'error' : 'primary'}
                            size="small"
                          />
                        </TableCell>
                        <TableCell>{new Date(doc.updated_at).toLocaleDateString()}</TableCell>
                        <TableCell sx={{ maxWidth: 300, whiteSpace: 'nowrap', overflow: 'hidden', textOverflow: 'ellipsis' }}>
                          {doc.content_preview}
                        </TableCell>
                        <TableCell>
                          <Button
                            size="small"
                            onClick={() => openDocument(doc.doc_id)}
                            sx={{ mr: 1 }}
                          >
                            Open
                          </Button>
                          <Button
                            size="small"
                            color="error"
                            onClick={() => deleteDocument(doc.doc_id)}
                          >
                            <Delete fontSize="small" />
                          </Button>
                        </TableCell>
                      </TableRow>
                    ))
                  )}
                </TableBody>
              </Table>
            </TableContainer>
          )}
        </Box>
      )}

      {/* Upload Tab */}
      {activeTab === 1 && (
        <Box>
          <Typography variant="h6" gutterBottom>
            Upload New Document
          </Typography>

          <Box sx={{ border: '1px dashed grey', p: 4, borderRadius: 2, textAlign: 'center', mb: 3 }}>
            <input
              accept=".pdf,.docx"
              style={{ display: 'none' }}
              id="upload-file"
              type="file"
              onChange={handleFileChange}
            />
            <label htmlFor="upload-file">
              <Button
                variant="outlined"
                component="span"
                startIcon={<UploadFile />}
              >
                Select File
              </Button>
            </label>

            {uploadFile && (
              <Box mt={2}>
                <Typography variant="body1">
                  Selected: {uploadFile.name}
                </Typography>
                <Button
                  variant="contained"
                  onClick={uploadDocument}
                  disabled={isUploading}
                  sx={{ mt: 2 }}
                >
                  {isUploading ? <CircularProgress size={24} /> : 'Upload Document'}
                </Button>
              </Box>
            )}
          </Box>

          <Typography variant="body2" color="text.secondary">
            Supported formats: PDF, DOCX
          </Typography>
        </Box>
      )}

      {/* Search Tab */}
      {activeTab === 2 && (
        <Box>
          <Typography variant="h6" gutterBottom>
            Search Documents
          </Typography>

          <Box display="flex" mb={3}>
            <TextField
              fullWidth
              label="Search query"
              value={searchQuery}
              onChange={(e) => setSearchQuery(e.target.value)}
              onKeyDown={(e) => e.key === 'Enter' && searchDocuments()}
              variant="outlined"
              sx={{ mr: 2 }}
            />
            <Button
              variant="contained"
              onClick={searchDocuments}
              startIcon={<Search />}
            >
              Search
            </Button>
          </Box>

          {loading ? (
            <Box display="flex" justifyContent="center" my={4}>
              <CircularProgress />
            </Box>
          ) : (
            <TableContainer component={Paper}>
              <Table>
                <TableHead>
                  <TableRow>
                    <TableCell>Title</TableCell>
                    <TableCell>Type</TableCell>
                    <TableCell>Preview</TableCell>
                    <TableCell>Actions</TableCell>
                  </TableRow>
                </TableHead>
                <TableBody>
                  {documents.length === 0 ? (
                    <TableRow>
                      <TableCell colSpan={4} align="center">
                        No results found
                      </TableCell>
                    </TableRow>
                  ) : (
                    documents.map(doc => (
                      <TableRow key={doc.doc_id}>
                        <TableCell>{doc.title}</TableCell>
                        <TableCell>
                          <Chip
                            label={doc.file_type.toUpperCase()}
                            color={doc.file_type === 'pdf' ? 'error' : 'primary'}
                            size="small"
                          />
                        </TableCell>
                        <TableCell sx={{ maxWidth: 300, whiteSpace: 'nowrap', overflow: 'hidden', textOverflow: 'ellipsis' }}>
                          {doc.content_preview}
                        </TableCell>
                        <TableCell>
                          <Button
                            size="small"
                            onClick={() => openDocument(doc.doc_id)}
                          >
                            Open
                          </Button>
                        </TableCell>
                      </TableRow>
                    ))
                  )}
                </TableBody>
              </Table>
            </TableContainer>
          )}
        </Box>
      )}

      {/* Document Dialog */}
      <Dialog
        open={dialogOpen}
        onClose={() => setDialogOpen(false)}
        maxWidth="md"
        fullWidth
      >
        {selectedDocument && (
          <>
            <DialogTitle>
              {selectedDocument.title}
              {selectedDocument.tags?.map(tag => (
                <Chip
                  key={tag}
                  label={tag}
                  size="small"
                  sx={{ ml: 1 }}
                />
              ))}
            </DialogTitle>
            <DialogContent dividers>
              <Box mb={3}>
                <Typography variant="subtitle1" gutterBottom>
                  Transform Document
                </Typography>
                <Box display="flex" alignItems="center">
                  <TextField
                    select
                    label="Transformation Type"
                    value={transformationType}
                    onChange={(e) => setTransformationType(e.target.value)}
                    SelectProps={{ native: true }}
                    variant="outlined"
                    sx={{ mr: 2, minWidth: 200 }}
                  >
                    <option value="summarize">Summarize</option>
                    <option value="reword">Reword</option>
                    <option value="extract_key_points">Extract Key Points</option>
                    <option value="change_tone">Change Tone</option>
                    <option value="simplify">Simplify</option>
                  </TextField>
                  <Button
                    variant="contained"
                    onClick={transformDocument}
                    startIcon={<Transform />}
                    disabled={loading}
                  >
                    Transform
                  </Button>
                </Box>
              </Box>

              {transformationResult && (
                <Box mb={4} p={2} bgcolor="#f5f5f5" borderRadius={1}>
                  <Typography variant="subtitle1" gutterBottom>
                    Transformation Result
                  </Typography>
                  <Typography variant="body1">
                    {transformationResult.transformed_text}
                  </Typography>
                </Box>
              )}

              <Typography variant="subtitle1" gutterBottom>
                Document Content
              </Typography>

              {selectedDocument.content.summary && (
                <Box mb={3}>
                  <Typography variant="h6">Summary</Typography>
                  <Typography variant="body1">{selectedDocument.content.summary}</Typography>
                </Box>
              )}

              {selectedDocument.content.sections?.map((section, index) => (
                <Box key={index} mb={3}>
                  <Typography variant="h6">{section.heading}</Typography>
                  <Typography variant="body1">{section.content}</Typography>

                  {section.key_points?.length > 0 && (
                    <Box mt={2}>
                      <Typography variant="subtitle2">Key Points:</Typography>
                      <ul>
                        {section.key_points.map((point, i) => (
                          <li key={i}>
                            <Typography variant="body2">{point}</Typography>
                          </li>
                        ))}
                      </ul>
                    </Box>
                  )}
                </Box>
              ))}

              {selectedDocument.content.entities && (
                <Box mb={3}>
                  <Typography variant="h6">Entities</Typography>

                  {selectedDocument.content.entities.people?.length > 0 && (
                    <Box mt={1}>
                      <Typography variant="subtitle2">People:</Typography>
                      {selectedDocument.content.entities.people.map((person, i) => (
                        <Chip key={i} label={person} size="small" sx={{ mr: 1, mb: 1 }} />
                      ))}
                    </Box>
                  )}

                  {selectedDocument.content.entities.organizations?.length > 0 && (
                    <Box mt={1}>
                      <Typography variant="subtitle2">Organizations:</Typography>
                      {selectedDocument.content.entities.organizations.map((org, i) => (
                        <Chip key={i} label={org} size="small" sx={{ mr: 1, mb: 1 }} />
                      ))}
                    </Box>
                  )}

                  {selectedDocument.content.entities.locations?.length > 0 && (
                    <Box mt={1}>
                      <Typography variant="subtitle2">Locations:</Typography>
                      {selectedDocument.content.entities.locations.map((loc, i) => (
                        <Chip key={i} label={loc} size="small" sx={{ mr: 1, mb: 1 }} />
                      ))}
                    </Box>
                  )}
                </Box>
              )}

              <Box mt={4}>
                <Typography variant="subtitle2" color="text.secondary">
                  File: {selectedDocument.file_path} •
                  Created: {new Date(selectedDocument.created_at).toLocaleString()} •
                  Version: {selectedDocument.version}
                </Typography>
              </Box>
            </DialogContent>
            <DialogActions>
              <Button onClick={() => setDialogOpen(false)}>Close</Button>
            </DialogActions>
          </>
        )}
      </Dialog>

      {/* Snackbar for notifications */}
      <Snackbar
        open={snackbar.open}
        autoHideDuration={6000}
        onClose={handleCloseSnackbar}
        anchorOrigin={{ vertical: 'bottom', horizontal: 'right' }}
      >
        <Alert
          onClose={handleCloseSnackbar}
          severity={snackbar.severity}
          sx={{ width: '100%' }}
        >
          {snackbar.message}
        </Alert>
      </Snackbar>
    </Container>
  );
}

export default App;
```
7. Comprehensive Deployment Guide
Create a /docs folder with full documentation:
# Local Document Processing Pipeline: Deployment Guide

This guide provides comprehensive instructions for deploying the Local Document Processing Pipeline, a self-contained system for extracting, processing, and transforming documents using local LLMs.

## System Requirements

- **Hardware**:
  - CPU: 4+ cores
  - RAM: 16GB+ (32GB recommended for larger models)
  - Storage: 20GB+ free space
  - GPU: NVIDIA GPU with 8GB+ VRAM (optional but highly recommended)

- **Software**:
  - Docker and Docker Compose
  - NVIDIA Container Toolkit (for GPU acceleration)
  - Git

## Quick Start

1. Clone the repository:
   ```bash
   git clone https://github.com/yourusername/document-pipeline.git
   cd document-pipeline
   ```

2. Start the system with Docker Compose:
   ```bash
   docker-compose up -d
   ```

3. Open your browser and navigate to http://localhost:3000

4. The system will automatically download the needed LLM models on first run
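Because the first run downloads models in the background, it can help to confirm that the configured model is actually present before uploading documents. The following is a minimal sketch, not part of the pipeline itself: it assumes Ollama is published on port 11434 and reads its `/api/tags` endpoint, which lists locally installed models. The helper names (`installed_models`, `model_is_installed`, `fetch_tags`) are hypothetical, and the case-insensitive name comparison is an assumption made for convenience.

```python
import json
import urllib.request

OLLAMA_URL = "http://localhost:11434"  # assumed host; port as published by the Ollama container


def installed_models(tags_payload: dict) -> set[str]:
    """Return the lower-cased model names found in an Ollama /api/tags response."""
    return {
        entry["name"].lower()
        for entry in tags_payload.get("models", [])
        if "name" in entry
    }


def model_is_installed(tags_payload: dict, wanted: str) -> bool:
    """True if `wanted` is listed; a name without a tag is treated as ':latest'."""
    if ":" not in wanted:
        wanted = f"{wanted}:latest"
    return wanted.lower() in installed_models(tags_payload)


def fetch_tags(base_url: str = OLLAMA_URL, timeout: float = 5.0) -> dict:
    """Ask the Ollama server which models are available locally."""
    with urllib.request.urlopen(f"{base_url}/api/tags", timeout=timeout) as resp:
        return json.load(resp)
```

A typical use is to call `model_is_installed(fetch_tags(), "vanilj/Phi-4:latest")` in a loop with a short sleep until it returns `True`. `fetch_tags` raises `urllib.error.URLError` while the container is still starting, so a caller would normally catch that and retry.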
## Component Overview
The system consists of three main components:
- API Server: Handles document processing, storage, and transformations
- Ollama: Runs the local LLM models
- Web Interface: Provides a user-friendly interface for the system
## Configuration Options

### Environment Variables

Edit the .env file to customize your deployment:

```text
# API Server Configuration
LOG_LEVEL=INFO
DB_PATH=/app/data/documents.db
MAX_UPLOAD_SIZE=100MB

# Ollama Configuration
OLLAMA_MODEL=vanilj/Phi-4:latest
OLLAMA_CONCURRENCY=1

# Web Interface Configuration
NEXT_PUBLIC_API_URL=http://localhost:8000
```
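On the Python side, these variables could be read into a typed settings object at startup. The sketch below is an illustration only: the `Settings` class, `load_settings`, and `parse_size` are hypothetical names, the defaults mirror the values listed above, and treating `MB` as a binary multiple (1024 × 1024 bytes) is an assumption.

```python
import os
import re
from dataclasses import dataclass

# Assumption: sizes use binary multiples (1 MB = 1024 * 1024 bytes).
_SIZE_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3}


def parse_size(text: str) -> int:
    """Convert a size string such as '100MB' into a number of bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?B)\s*", text, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"Unrecognized size: {text!r}")
    return int(match.group(1)) * _SIZE_UNITS[match.group(2).upper()]


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the values defined in the .env file."""
    log_level: str
    db_path: str
    max_upload_bytes: int
    ollama_model: str
    ollama_concurrency: int
    api_url: str


def load_settings(env=None) -> Settings:
    """Build Settings from a mapping (os.environ by default), using the listed defaults."""
    env = os.environ if env is None else env
    return Settings(
        log_level=env.get("LOG_LEVEL", "INFO"),
        db_path=env.get("DB_PATH", "/app/data/documents.db"),
        max_upload_bytes=parse_size(env.get("MAX_UPLOAD_SIZE", "100MB")),
        ollama_model=env.get("OLLAMA_MODEL", "vanilj/Phi-4:latest"),
        ollama_concurrency=int(env.get("OLLAMA_CONCURRENCY", "1")),
        api_url=env.get("NEXT_PUBLIC_API_URL", "http://localhost:8000"),
    )
```

Parsing and validating the values once, at startup, means a malformed entry such as `MAX_UPLOAD_SIZE=lots` fails immediately with a clear error instead of surfacing later during an upload.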
### LLM Model Selection
By default, the system uses the vanilj/Phi-4 model, which offers a good balance of quality and performance. You can change this by editing the OLLAMA_MODEL variable in the .env file.
Recommended models (VRAM figures are approximate and vary with quantization and context length):
- vanilj/Phi-4:latest: Great general-purpose model (4.7GB VRAM)
- mistral:7b: Excellent performance for complex text (14GB VRAM)
- phi3:mini: Smallest model with decent performance (2.8GB VRAM)
### CPU-Only Deployment

If you don't have a GPU, modify the docker-compose.yml file to remove the GPU-specific settings:

```yaml
ollama:
  image: ollama/ollama:latest
  volumes:
    - ./ollama-models:/root/.ollama
  ports:
    - "11434:11434"
  restart: unless-stopped
  # Remove the 'deploy' section for CPU-only mode
```
## Troubleshooting

### Common Issues

1. **System is slow or unresponsive**:
   - Check if your system meets the hardware requirements
   - Try a smaller LLM model
   - Increase Docker container memory limits

2. **Cannot connect to API server**:
   - Check if all containers are running: `docker-compose ps`
   - Check logs: `docker-compose logs api`

3. **Document processing fails**:
   - Check if the Ollama service is running properly
   - Verify that the LLM model was downloaded successfully
   - Check logs: `docker-compose logs ollama`
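Before digging through logs, a quick connectivity probe can narrow down which component is unreachable. The sketch below is an assumption-laden helper, not part of the shipped system: it only tests whether a TCP connection succeeds on the ports used in this guide (3000 for the web interface, 8000 for the API, 11434 for Ollama). The names `SERVICES`, `port_is_open`, and `check_services` are hypothetical.

```python
import socket

# Ports as used in this guide; the keys are labels for the report only.
SERVICES = {"web": 3000, "api": 8000, "ollama": 11434}


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


def check_services(host: str = "localhost", services=None) -> dict:
    """Map each service label to whether its port currently accepts connections."""
    services = SERVICES if services is None else services
    return {name: port_is_open(host, port) for name, port in services.items()}
```

Calling `check_services()` returns a dictionary such as `{"web": True, "api": False, "ollama": True}`, which points to the container whose logs are worth reading first. An open port only shows that something is listening; it does not prove the service is healthy.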
### Viewing Logs

```bash
# All logs
docker-compose logs

# Specific component logs
docker-compose logs api
docker-compose logs ollama
docker-compose logs web

# Follow logs in real-time
docker-compose logs -f
```
## Scaling for Production
For production environments, consider:
- Persistent Storage: Mount external volumes for database and document storage
- Load Balancing: Deploy multiple API server instances behind a load balancer
- Security: Add proper authentication, HTTPS, and firewall rules
- Monitoring: Implement Prometheus/Grafana for system metrics
## Contributing

We welcome contributions! Please see our CONTRIBUTING.md file for guidelines.

## License

This project is licensed under the MIT License. See the LICENSE file for details.
Conclusion: Beyond Document Processing
The architecture presented here goes far beyond a simple document processing system. It represents a paradigm shift in how we interact with documents and knowledge:
- Universal Content Extraction: The system does not just extract raw text; it preserves document structure, formatting, and relationships, so later stages can work with headings, lists, and tables instead of a flat string.
- Semantic Understanding: By integrating local LLMs, the system can interpret what a document says, extracting meaning rather than just data.
- Flexible Transformation: The transformation layer lets users reshape content according to their needs, whether that means summarizing dense research papers, simplifying technical documentation, or extracting key insights from lengthy reports.
- Self-Contained Intelligence: By operating entirely locally, this architecture avoids the privacy concerns, costs, and network dependencies of cloud-based solutions.
- Extensible Foundation: This architecture can serve as the foundation for a wide range of knowledge management applications, from research assistants to documentation systems to compliance tools.
This implementation balances elegance with power, providing production-ready code that handles real-world complexity while maintaining clean abstractions. The modular design allows for easy extension and customization, while the Docker-based deployment ensures consistent operation across environments.
By building on this foundation, you can create intelligent document systems that transform how your organization manages and extracts value from information.

Sovereign AI: Building Local-First Intelligent Systems
by Daniel Kliewer · Paperback · 72 pages
The hands-on guide to building AI that runs on your hardware, keeps your data private, and eliminates cloud dependence. Working code included.