Complete Blueprint: Building a Local LLM Document Processing Pipeline with Advanced Extraction, Semantic Analysis, and Scalable Storage

A comprehensive guide to building a production-ready local LLM document processing pipeline with advanced extraction, semantic analysis, vector storage, and transformation capabilities for enterprise document management.

Daniel Kliewer

Author, Sovereign AI

Local LLMs, Document Processing, Semantic Analysis, Vector Databases, Text Extraction, Ollama, FastAPI, Docker, Scalable Architecture, Enterprise AI

From the Book

This is from Sovereign AI: Building Local-First Intelligent Systems.

Building a Local Document Processing Pipeline with LLMs: The Ultimate Architecture

"The ability to process, understand, and transform documents is not merely a technical challenge; it is the foundation of knowledge work in the digital age."

This guide presents a production-grade, locally hosted document processing pipeline. By the end, you'll have a system that extracts meaning from documents, structures the information, and supports further transformations of your content, all without sending sensitive data to external APIs.

📋 Architecture Overview

┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐
│                 │   │                 │   │                 │   │                 │
│    Document     │─→ │   Extraction    │─→ │    Semantic     │─→ │    Storage &    │
│    Ingestion    │   │     Engine      │   │   Processing    │   │    Retrieval    │
│                 │   │                 │   │                 │   │                 │
└─────────────────┘   └─────────────────┘   └─────────────────┘   └─────────────────┘
                               ↑                                           ↓
     ┌─────────────────────────┴────────────────────────────────────────────────┐
     │                                                                          │
     │                           Transformation Layer                           │
     │                                                                          │
     └──────────────────────────────────────────────────────────────────────────┘
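The diagram reads as a chain of stages, each consuming the previous stage's output. A minimal sketch of that composition follows. The `Pipeline` class and the four placeholder stage functions are hypothetical names used only for illustration; they stand in for the real modules developed in the sections below.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

# Assumption: every stage is a plain callable that takes the previous output.
Stage = Callable[[Any], Any]


@dataclass
class Pipeline:
    """Runs stages in order: ingestion -> extraction -> semantic -> storage."""
    stages: List[Stage]

    def run(self, item: Any) -> Any:
        for stage in self.stages:
            item = stage(item)
        return item


# Placeholder stages (hypothetical); each one adds a field to the record.
def ingest(path: str) -> Dict[str, Any]:
    return {"path": path}


def extract(doc: Dict[str, Any]) -> Dict[str, Any]:
    return {**doc, "text": f"text of {doc['path']}"}


def analyze(doc: Dict[str, Any]) -> Dict[str, Any]:
    return {**doc, "summary": doc["text"][:12]}


def store(doc: Dict[str, Any]) -> Dict[str, Any]:
    return {**doc, "stored": True}


pipeline = Pipeline(stages=[ingest, extract, analyze, store])
result = pipeline.run("sample.pdf")
print(result)
```

Keeping each stage behind the same call shape makes it straightforward to swap one implementation for another, for example a different extractor, without touching the rest of the chain.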

1. High-Fidelity Document Extraction System

The foundation of our pipeline is an extraction engine that handles PDF and DOCX files and preserves document structure (headings, paragraphs, list items, and tables) along with file metadata.

python
# document_extractor.py
from typing import Dict, Union, List, Optional
import pdfplumber
from docx import Document
import fitz  # PyMuPDF
import logging
import concurrent.futures
from dataclasses import dataclass

@dataclass
class DocumentMetadata:
    """Structured metadata for any document."""
    filename: str
    file_type: str
    page_count: int
    author: Optional[str] = None
    creation_date: Optional[str] = None
    last_modified: Optional[str] = None

@dataclass
class DocumentElement:
    """Represents a structural element of a document."""
    element_type: str  # 'paragraph', 'heading', 'list_item', 'table', etc.
    content: str
    metadata: Optional[Dict] = None
    position: Optional[Dict] = None  # For spatial positioning in the document

@dataclass
class DocumentContent:
    """Full representation of a document's content and structure."""
    metadata: DocumentMetadata
    elements: List[DocumentElement]
    raw_text: Optional[str] = None

class DocumentExtractor:
    """Universal document extraction class with advanced capabilities."""

    def __init__(self, max_workers: int = 4):
        self.logger = logging.getLogger(__name__)
        self.max_workers = max_workers

    def extract(self, file_path: str) -> DocumentContent:
        """Extract content from document with appropriate extractor."""
        lower_path = file_path.lower()

        if lower_path.endswith('.pdf'):
            return self._extract_pdf(file_path)
        elif lower_path.endswith('.docx'):
            return self._extract_docx(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")

    def _extract_pdf(self, file_path: str) -> DocumentContent:
        """Extract content from PDF with advanced structure recognition."""
        try:
            # Use PyMuPDF for metadata and pdfplumber for content.
            # The context manager closes the PyMuPDF handle once metadata is read.
            with fitz.open(file_path) as pdf_doc:
                page_count = len(pdf_doc)
                pdf_meta = pdf_doc.metadata or {}

            metadata = DocumentMetadata(
                filename=file_path.split('/')[-1],
                file_type="pdf",
                page_count=page_count,
                author=pdf_meta.get('author'),
                creation_date=pdf_meta.get('creationDate'),
                last_modified=pdf_meta.get('modDate')
            )

            elements = []
            raw_text = ""

            def process_page(page_num):
                # Each call opens its own pdfplumber handle so that no PDF
                # object is shared between worker threads
                with pdfplumber.open(file_path) as pdf:
                    page = pdf.pages[page_num]
                    page_text = page.extract_text() or ""

                    # Extract tables separately to maintain structure
                    tables = page.extract_tables()

                    # Extract words together with their font attributes
                    blocks = page.extract_words(
                        keep_blank_chars=True,
                        x_tolerance=3,
                        y_tolerance=3,
                        extra_attrs=['fontname', 'size']
                    )

                    page_elements = []

                    # Group consecutive words into paragraphs and headings
                    current_block = ""
                    current_metadata = {}

                    for word in blocks:
                        # Simplified logic: production code would use more sophisticated
                        # heading/paragraph detection based on font, size, etc.
                        if not current_metadata:
                            current_metadata = {
                                'font': word.get('fontname'),
                                'size': word.get('size'),
                                'page': page_num + 1
                            }

                        if word.get('size') != current_metadata.get('size'):
                            # Font size changed, likely a new element
                            if current_block:
                                element_type = 'heading' if (current_metadata.get('size') or 0) > 11 else 'paragraph'
                                page_elements.append(DocumentElement(
                                    element_type=element_type,
                                    content=current_block.strip(),
                                    metadata=current_metadata.copy(),
                                    position={'page': page_num + 1}
                                ))
                            current_block = ""
                            current_metadata = {
                                'font': word.get('fontname'),
                                'size': word.get('size'),
                                'page': page_num + 1
                            }

                        current_block += word.get('text', '') + " "

                    # Add the last block
                    if current_block:
                        element_type = 'heading' if (current_metadata.get('size') or 0) > 11 else 'paragraph'
                        page_elements.append(DocumentElement(
                            element_type=element_type,
                            content=current_block.strip(),
                            metadata=current_metadata,
                            position={'page': page_num + 1}
                        ))

                    # Add tables as structured elements
                    for i, table in enumerate(tables):
                        table_text = "\n".join([" | ".join([cell or "" for cell in row]) for row in table])
                        page_elements.append(DocumentElement(
                            element_type='table',
                            content=table_text,
                            metadata={'table_index': i},
                            position={'page': page_num + 1}
                        ))

                    return page_text, page_elements

            # Process pages in parallel. executor.map returns results in page
            # order, so pages without elements cannot be sorted out of place.
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                results = list(executor.map(process_page, range(page_count)))

            for page_text, page_elements in results:
                raw_text += page_text + "\n\n"
                elements.extend(page_elements)

            return DocumentContent(metadata=metadata, elements=elements, raw_text=raw_text.strip())

        except Exception as e:
            self.logger.error(f"Error extracting PDF content: {str(e)}")
            raise

    def _extract_docx(self, file_path: str) -> DocumentContent:
        """Extract content from DOCX with structure preservation."""
        try:
            doc = Document(file_path)

            # Extract metadata
            metadata = DocumentMetadata(
                filename=file_path.split('/')[-1],
                file_type="docx",
                page_count=0,  # Page count not directly available in python-docx
                author=doc.core_properties.author,
                creation_date=str(doc.core_properties.created) if doc.core_properties.created else None,
                last_modified=str(doc.core_properties.modified) if doc.core_properties.modified else None
            )

            elements = []
            raw_text = ""

            # Process paragraphs
            for i, para in enumerate(doc.paragraphs):
                if not para.text.strip():
                    continue

                # Determine element type based on paragraph style
                element_type = 'paragraph'
                if para.style.name.startswith('Heading'):
                    element_type = 'heading'
                elif para.style.name.startswith('List'):
                    element_type = 'list_item'

                # Extract formatting information
                runs_info = []
                for run in para.runs:
                    runs_info.append({
                        'text': run.text,
                        'bold': run.bold,
                        'italic': run.italic,
                        'underline': run.underline,
                        'font': run.font.name if run.font.name else None
                    })

                elements.append(DocumentElement(
                    element_type=element_type,
                    content=para.text,
                    metadata={
                        'style': para.style.name,
                        'runs': runs_info
                    },
                    position={'index': i}
                ))

                raw_text += para.text + "\n"

            # Process tables
            for i, table in enumerate(doc.tables):
                table_text = ""
                for row in table.rows:
                    row_text = " | ".join([cell.text for cell in row.cells])
                    table_text += row_text + "\n"

                elements.append(DocumentElement(
                    element_type='table',
                    content=table_text.strip(),
                    metadata={'table_index': i},
                    position={'index': len(doc.paragraphs) + i}
                ))

                raw_text += table_text + "\n\n"

            return DocumentContent(metadata=metadata, elements=elements, raw_text=raw_text.strip())

        except Exception as e:
            self.logger.error(f"Error extracting DOCX content: {str(e)}")
            raise

# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    extractor = DocumentExtractor()

    # Extract PDF content
    pdf_content = extractor.extract("sample.pdf")
    print(f"PDF Metadata: {pdf_content.metadata}")
    print(f"PDF Elements: {len(pdf_content.elements)}")

    # Extract DOCX content
    docx_content = extractor.extract("sample.docx")
    print(f"DOCX Metadata: {docx_content.metadata}")
    print(f"DOCX Elements: {len(docx_content.elements)}")
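
The heading-versus-paragraph decision in the PDF extractor rests on one idea: a change in font size marks the start of a new element. The sketch below isolates that grouping step so it can be tried without a PDF. `group_words_by_size` and `HEADING_MIN_SIZE` are hypothetical names, and the input is assumed to be a list of dicts with `text` and `size` keys, similar in shape to what `extract_words` returns when `size` is requested.

```python
from typing import Dict, List

HEADING_MIN_SIZE = 11  # assumption: mirrors the threshold used in the listing


def group_words_by_size(words: List[Dict]) -> List[Dict]:
    """Merge consecutive words that share a font size into one element."""
    elements: List[Dict] = []
    buffer: List[str] = []
    size = None

    # A trailing None acts as a sentinel that flushes the final run.
    for word in [*words, None]:
        run_ended = word is None or word["size"] != size
        if buffer and run_ended:
            kind = "heading" if size > HEADING_MIN_SIZE else "paragraph"
            elements.append({"type": kind, "text": " ".join(buffer), "size": size})
            buffer = []
        if word is not None:
            size = word["size"]
            buffer.append(word["text"])
    return elements


sample_words = [
    {"text": "Quarterly", "size": 16.0},
    {"text": "Report", "size": 16.0},
    {"text": "Revenue", "size": 10.0},
    {"text": "grew.", "size": 10.0},
]
grouped = group_words_by_size(sample_words)
print(grouped)
```

A fixed size threshold is a blunt instrument: documents set in 12 pt body text would have every paragraph labeled a heading, so a real deployment would likely compare each run against the most common size on the page instead.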

2. Semantic Processing with Local LLMs

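This module talks to a local LLM through Ollama's HTTP API. It retries transient connection failures, splits long documents into overlapping chunks sized for the model's context window, and tolerates malformed JSON in the model's output rather than failing the whole document.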
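
One detail worth seeing in isolation is the request body. Ollama's `/api/generate` endpoint reads sampling settings from an `options` object, where the output-length limit is named `num_predict`. The helper below is a hypothetical illustration of that shape (the function name `build_generate_payload` is not part of any library) and makes no network call.

```python
from typing import Any, Dict, Optional


def build_generate_payload(
    model: str,
    prompt: str,
    system_prompt: Optional[str] = None,
    temperature: float = 0.1,
    max_tokens: int = 1024,
) -> Dict[str, Any]:
    """Build a non-streaming request body for Ollama's /api/generate."""
    payload: Dict[str, Any] = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        # Sampling parameters belong under "options", not at the top level.
        "options": {"temperature": temperature, "num_predict": max_tokens},
    }
    if system_prompt:
        payload["system"] = system_prompt
    return payload


payload = build_generate_payload(
    "vanilj/Phi-4:latest", "Summarize this.", system_prompt="Be brief."
)
print(payload)
```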

python
# semantic_processor.py
from typing import Dict, List, Any, Optional, Union
import json
import logging
import time
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from document_extractor import DocumentContent, DocumentElement

class LLMProcessingError(Exception):
    """Raised when there is an error processing content with the LLM."""
    pass

class OllamaClient:
    """Client for interacting with Ollama local LLM server."""

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "vanilj/Phi-4:latest",
        timeout: int = 120,
        temperature: float = 0.1,
        max_tokens: int = 1024
    ):
        self.base_url = base_url
        self.model = model
        self.timeout = timeout
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.logger = logging.getLogger(__name__)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((httpx.ReadTimeout, httpx.ConnectError)),
        reraise=True
    )
    async def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """Generate text from the model with retry logic for robustness."""
        try:
            payload = {
                "model": self.model,
                "prompt": prompt,
                "stream": False,
                # Ollama reads sampling settings from "options";
                # "num_predict" caps the number of generated tokens
                "options": {
                    "temperature": self.temperature,
                    "num_predict": self.max_tokens
                }
            }

            if system_prompt:
                payload["system"] = system_prompt

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(f"{self.base_url}/api/generate", json=payload)
                response.raise_for_status()
                result = response.json()
                return result.get("response", "")

        except (httpx.ReadTimeout, httpx.ConnectError):
            # Re-raise unchanged so the retry decorator can see these errors
            raise
        except httpx.HTTPStatusError as e:
            self.logger.error(f"HTTP error: {e}")
            raise LLMProcessingError(f"Failed to get response from LLM: {str(e)}")
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}")
            raise LLMProcessingError(f"Error communicating with LLM: {str(e)}")

class SemanticProcessor:
    """Processes document content using a local LLM for intelligent extraction."""

    def __init__(
        self,
        llm_client: OllamaClient = None,
        chunk_size: int = 6000,
        chunk_overlap: int = 1000
    ):
        self.llm_client = llm_client or OllamaClient()
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.logger = logging.getLogger(__name__)

    def _chunk_document(self, doc_content: DocumentContent) -> List[str]:
        """Split document into manageable chunks that preserve semantic meaning."""
        elements = doc_content.elements
        chunks = []
        current_chunk = ""

        for element in elements:
            # If adding this element would exceed chunk size, save current chunk
            if len(current_chunk) + len(element.content) > self.chunk_size and current_chunk:
                chunks.append(current_chunk)
                # Keep some overlap for context preservation
                overlap_text = current_chunk[-self.chunk_overlap:] if self.chunk_overlap > 0 else ""
                current_chunk = overlap_text

            # Add element content with appropriate formatting
            if element.element_type == 'heading':
                current_chunk += f"\n## {element.content}\n\n"
            elif element.element_type == 'list_item':
                current_chunk += f"• {element.content}\n"
            elif element.element_type == 'table':
                current_chunk += f"\nTABLE:\n{element.content}\n\n"
            else:  # paragraph
                current_chunk += f"{element.content}\n\n"

        # Add the final chunk if there's content
        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    async def _process_chunk_to_json(self, chunk: str, schema: Dict) -> Dict:
        """Process a document chunk into structured JSON."""
        schema_str = json.dumps(schema, indent=2)

        system_prompt = """You are a document structuring expert.
Your task is to extract information from document text and structure it according to a given schema.
Always respond with valid JSON that exactly matches the provided schema structure."""

        user_prompt = f"""Extract structured information from the following document text.
Format your response as a valid JSON object that strictly follows this schema:

{schema_str}

DOCUMENT TEXT:
{chunk}

Return ONLY the JSON output without any additional text, explanations, or formatting."""

        try:
            response = await self.llm_client.generate(user_prompt, system_prompt)

            # Find JSON in the response (in case model adds comments)
            try:
                start_idx = response.find('{')
                end_idx = response.rfind('}') + 1
                if start_idx == -1 or end_idx == 0:
                    raise ValueError("No JSON found in response")

                json_str = response[start_idx:end_idx]
                result = json.loads(json_str)
                return result
            except json.JSONDecodeError:
                # Try to fix common JSON errors
                fixed_response = self._fix_json_response(response)
                return json.loads(fixed_response)

        except Exception as e:
            self.logger.error(f"Error processing chunk to JSON: {str(e)}")
            self.logger.error(f"Problematic chunk: {chunk[:100]}...")
            # Return partial data instead of failing completely
            return {"error": str(e), "partial_text": chunk[:100] + "..."}

    def _fix_json_response(self, response: str) -> str:
        """Attempt to fix common JSON errors in LLM responses."""
        # Find what looks like the JSON part of the response
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1

        if start_idx >= 0 and end_idx > start_idx:
            json_str = response[start_idx:end_idx]

            # Remove trailing commas before closing braces and brackets
            json_str = json_str.replace(',}', '}').replace(',\n}', '\n}')
            json_str = json_str.replace(',]', ']').replace(',\n]', '\n]')

            # Unescaped quotes inside string values are not repaired here.
            # A character scan cannot tell them apart from real delimiters
            # and would corrupt valid JSON, so json.loads is left to reject them.
            return json_str

        return response

    async def _merge_chunk_results(self, results: List[Dict], schema: Dict) -> Dict:
        """Intelligently merge results from multiple chunks."""
        if not results:
            return {}

        # If we only have one chunk, just return it
        if len(results) == 1:
            return results[0]

        # For multiple chunks, we need to merge them intelligently
        merged = {}

        # Basic strategy - iterate through schema keys and merge accordingly
        for key, value_type in schema.items():
            # String fields: use the non-empty value from the first chunk that has it
            if value_type == "string":
                for result in results:
                    if result.get(key) and isinstance(result.get(key), str) and result[key].strip():
                        merged[key] = result[key]
                        break
                if key not in merged:
                    merged[key] = ""

            # List fields: concatenate lists from all chunks and deduplicate
            elif isinstance(value_type, list) or (isinstance(value_type, str) and value_type.startswith("array")):
                all_items = []
                for result in results:
                    if result.get(key) and isinstance(result.get(key), list):
                        all_items.extend(result[key])

                # Simple deduplication - this could be more sophisticated
                deduplicated = []
                seen = set()
                for item in all_items:
                    item_str = str(item)
                    if item_str not in seen:
                        seen.add(item_str)
                        deduplicated.append(item)

                merged[key] = deduplicated

            # Object fields: recursively merge
            elif isinstance(value_type, dict):
                sub_results = [result.get(key, {}) for result in results if isinstance(result.get(key), dict)]
                merged[key] = await self._merge_chunk_results(sub_results, value_type)

            # Default case
            else:
                merged[key] = results[0].get(key, "")

        return merged

    async def process_document(self, doc_content: DocumentContent, schema: Dict) -> Dict:
        """
        Process a document into structured data according to the provided schema.

        Args:
            doc_content: The document content object from the extractor
            schema: JSON schema defining the output structure

        Returns:
            Dict containing the structured document data
        """
        start_time = time.time()
        self.logger.info(f"Starting document processing: {doc_content.metadata.filename}")

        # Split document into manageable chunks
        chunks = self._chunk_document(doc_content)
        self.logger.info(f"Document split into {len(chunks)} chunks")

        # Process chunks one at a time, in document order
        chunk_results = []
        for i, chunk in enumerate(chunks):
            self.logger.info(f"Processing chunk {i+1}/{len(chunks)}")
            result = await self._process_chunk_to_json(chunk, schema)
            chunk_results.append(result)

        # Merge results from all chunks
        final_result = await self._merge_chunk_results(chunk_results, schema)

        # Add document metadata
        final_result["_metadata"] = {
            "filename": doc_content.metadata.filename,
            "file_type": doc_content.metadata.file_type,
            "page_count": doc_content.metadata.page_count,
            "author": doc_content.metadata.author,
            "processing_time": time.time() - start_time
        }

        self.logger.info(f"Document processing completed in {time.time() - start_time:.2f} seconds")
        return final_result

# Example schema
DEFAULT_DOCUMENT_SCHEMA = {
    "title": "string",
    "summary": "string",
    "main_topics": ["string"],
    "sections": [
        {
            "heading": "string",
            "content": "string",
            "key_points": ["string"]
        }
    ],
    "entities": {
        "people": ["string"],
        "organizations": ["string"],
        "locations": ["string"],
        "dates": ["string"]
    }
}

# Usage example
async def process_document_example():
    from document_extractor import DocumentExtractor

    logging.basicConfig(level=logging.INFO)

    # Initialize components
    extractor = DocumentExtractor()
    llm_client = OllamaClient(model="vanilj/Phi-4:latest")
    processor = SemanticProcessor(llm_client=llm_client)

    # Extract document content
    doc_content = extractor.extract("sample.pdf")

    # Process document
    result = await processor.process_document(doc_content, DEFAULT_DOCUMENT_SCHEMA)

    # Print result
    print(json.dumps(result, indent=2))

if __name__ == "__main__":
    import asyncio
    asyncio.run(process_document_example())
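
Local models often wrap their JSON in commentary or leave a trailing comma behind. The processor above handles this by slicing from the first `{` to the last `}` and then applying a light repair. The standalone sketch below shows the same idea with a regular expression for the trailing-comma step; `extract_json_object` is a hypothetical helper, and the regex is a swapped-in alternative to the string replacements in the listing, not the author's exact method.

```python
import json
import re
from typing import Any, Dict


def extract_json_object(response: str) -> Dict[str, Any]:
    """Pull the outermost JSON object out of a noisy model response."""
    start = response.find("{")
    end = response.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in response")

    candidate = response[start:end + 1]
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        # Drop commas that sit directly before a closing brace or bracket.
        # Caveat: this would also alter such a sequence inside a string value.
        repaired = re.sub(r",\s*([}\]])", r"\1", candidate)
        return json.loads(repaired)


reply = (
    "Sure! Here is the JSON:\n"
    '{"title": "Q3 Report", "main_topics": ["revenue", "hiring",],}\n'
    "Let me know if you need anything else."
)
parsed = extract_json_object(reply)
print(parsed)
```

If the repaired text still fails to parse, `json.JSONDecodeError` propagates to the caller, which matches how the processor records an error for the chunk instead of guessing at the content.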

3. Robust Storage and Retrieval System

This module defines an abstract DocumentStore interface so that storage backends can be swapped, along with a SQLite implementation that adds tagging, FTS5 full-text search, and a version history for every document.
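
Before the full listing, the versioning idea can be shown in a few lines: the main table holds the current content and version number, and every write also appends a snapshot to a history table. This is a simplified sketch using the synchronous `sqlite3` module and an in-memory database; the `save` function and the reduced two-table schema are assumptions made for illustration, not the module's actual API.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE documents (
        doc_id  TEXT PRIMARY KEY,
        content TEXT NOT NULL,
        version INTEGER NOT NULL DEFAULT 1
    );
    CREATE TABLE document_versions (
        doc_id  TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
        content TEXT NOT NULL,
        version INTEGER NOT NULL
    );
""")


def save(doc_id: str, content: dict) -> int:
    """Insert or update a document and snapshot the resulting version."""
    body = json.dumps(content)
    row = conn.execute(
        "SELECT version FROM documents WHERE doc_id = ?", (doc_id,)
    ).fetchone()
    version = 1 if row is None else row[0] + 1

    with conn:  # commits on success, rolls back if any statement fails
        if row is None:
            conn.execute(
                "INSERT INTO documents (doc_id, content, version) VALUES (?, ?, ?)",
                (doc_id, body, version),
            )
        else:
            conn.execute(
                "UPDATE documents SET content = ?, version = ? WHERE doc_id = ?",
                (body, version, doc_id),
            )
        conn.execute(
            "INSERT INTO document_versions (doc_id, content, version) VALUES (?, ?, ?)",
            (doc_id, body, version),
        )
    return version


save("doc-1", {"title": "Draft"})
save("doc-1", {"title": "Final"})
history = conn.execute(
    "SELECT version, content FROM document_versions WHERE doc_id = ? ORDER BY version",
    ("doc-1",),
).fetchall()
print(history)
```

Writing the current row and the snapshot inside one transaction keeps the two tables consistent: either both the update and its history entry land, or neither does.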

python
# document_store.py
from typing import Dict, List, Any, Optional, Union, Tuple
import json
import logging
import sqlite3
import os
import datetime
from dataclasses import dataclass, asdict
from uuid import uuid4
import asyncio
import aiosqlite

@dataclass
class DocumentRecord:
    """Represents a document record in the storage system."""
    doc_id: str
    title: str
    content: Dict[str, Any]  # The structured JSON content
    file_path: str
    file_type: str
    created_at: str
    updated_at: str
    version: int = 1
    tags: Optional[List[str]] = None

    def to_dict(self) -> Dict:
        """Convert to dictionary representation."""
        result = asdict(self)
        # Convert content to JSON string for storage
        if isinstance(result['content'], dict):
            result['content'] = json.dumps(result['content'])
        if result['tags'] is None:
            result['tags'] = []
        return result

    @classmethod
    def from_dict(cls, data: Dict) -> 'DocumentRecord':
        """Create from dictionary representation."""
        # Parse content from JSON string if needed
        if isinstance(data.get('content'), str):
            try:
                data['content'] = json.loads(data['content'])
            except json.JSONDecodeError:
                # Keep as string if it's not valid JSON
                pass

        # Ensure tags is a list
        if data.get('tags') is None:
            data['tags'] = []

        return cls(**data)

class DocumentStore:
    """Abstract base class for document storage backends."""

    async def initialize(self):
        """Initialize the storage backend."""
        raise NotImplementedError

    async def store_document(self, document: DocumentRecord) -> str:
        """Store a document and return its ID."""
        raise NotImplementedError

    async def get_document(self, doc_id: str) -> Optional[DocumentRecord]:
        """Retrieve a document by ID."""
        raise NotImplementedError

    async def update_document(self, doc_id: str, content: Dict[str, Any],
                              increment_version: bool = True) -> Optional[DocumentRecord]:
        """Update a document's content."""
        raise NotImplementedError

    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document."""
        raise NotImplementedError

    async def list_documents(self, limit: int = 100, offset: int = 0,
                             tags: Optional[List[str]] = None) -> List[DocumentRecord]:
        """List documents with optional filtering."""
        raise NotImplementedError

    async def search_documents(self, query: str,
                               fields: Optional[List[str]] = None) -> List[DocumentRecord]:
        """Search documents by content."""
        raise NotImplementedError

    async def get_document_versions(self, doc_id: str) -> List[Dict]:
        """Get all versions of a document."""
        raise NotImplementedError

    async def add_tags(self, doc_id: str, tags: List[str]) -> bool:
        """Add tags to a document."""
        raise NotImplementedError

    async def close(self):
        """Close the storage connection."""
        raise NotImplementedError

class SQLiteDocumentStore(DocumentStore):
    """SQLite implementation of document storage."""

    def __init__(self, db_path: str = "documents.db"):
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)
        self.conn = None

    async def initialize(self):
        """Initialize the SQLite database."""
        self.logger.info(f"Initializing SQLite document store at {self.db_path}")

        # Ensure directory exists
        os.makedirs(os.path.dirname(os.path.abspath(self.db_path)), exist_ok=True)

        self.conn = await aiosqlite.connect(self.db_path)

        # Enable foreign keys
        await self.conn.execute("PRAGMA foreign_keys = ON")

        # Create documents table
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                doc_id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                file_path TEXT NOT NULL,
                file_type TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                version INTEGER NOT NULL DEFAULT 1
            )
        """)

        # Create document versions table
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS document_versions (
                version_id INTEGER PRIMARY KEY AUTOINCREMENT,
                doc_id TEXT NOT NULL,
                content TEXT NOT NULL,
                version INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE
            )
        """)

        # Create tags table
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tags (
                tag_id INTEGER PRIMARY KEY AUTOINCREMENT,
                tag_name TEXT NOT NULL UNIQUE
            )
        """)

        # Create document_tags junction table
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS document_tags (
                doc_id TEXT NOT NULL,
                tag_id INTEGER NOT NULL,
                PRIMARY KEY (doc_id, tag_id),
                FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE,
                FOREIGN KEY (tag_id) REFERENCES tags(tag_id) ON DELETE CASCADE
            )
        """)

        # Create full-text search index
        await self.conn.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS document_fts USING fts5(
                doc_id UNINDEXED,
                title,
                content,
                tokenize='porter unicode61'
            )
        """)

        # Create triggers to keep FTS index updated
        await self.conn.execute("""
            CREATE TRIGGER IF NOT EXISTS documents_ai AFTER INSERT ON documents BEGIN
                INSERT INTO document_fts(doc_id, title, content)
                VALUES (new.doc_id, new.title, new.content);
            END
        """)

        await self.conn.execute("""
            CREATE TRIGGER IF NOT EXISTS documents_au AFTER UPDATE ON documents BEGIN
                DELETE FROM document_fts WHERE doc_id = old.doc_id;
                INSERT INTO document_fts(doc_id, title, content)
                VALUES (new.doc_id, new.title, new.content);
            END
        """)

        await self.conn.execute("""
            CREATE TRIGGER IF NOT EXISTS documents_ad AFTER DELETE ON documents BEGIN
                DELETE FROM document_fts WHERE doc_id = old.doc_id;
            END
        """)

        await self.conn.commit()
        self.logger.info("SQLite document store initialized")

    async def store_document(self, document: DocumentRecord) -> str:
        """Store a document and return its ID."""
        if not self.conn:
            await self.initialize()

        if not document.doc_id:
            document.doc_id = str(uuid4())

        now = datetime.datetime.now().isoformat()
        if not document.created_at:
            document.created_at = now
        if not document.updated_at:
            document.updated_at = now

        document_dict = document.to_dict()

        try:
            # Insert document
            await self.conn.execute("""
                INSERT INTO documents
                (doc_id, title, content, file_path, file_type, created_at, updated_at, version)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                document_dict['doc_id'],
                document_dict['title'],
                document_dict['content'],
                document_dict['file_path'],
                document_dict['file_type'],
                document_dict['created_at'],
                document_dict['updated_at'],
                document_dict['version']
            ))

            # Store initial version
            await self.conn.execute("""
                INSERT INTO document_versions
                (doc_id, content, version, created_at)
                VALUES (?, ?, ?, ?)
            """, (
                document_dict['doc_id'],
                document_dict['content'],
                document_dict['version'],
                document_dict['created_at']
            ))

            # Add tags if present
            if document_dict['tags']:
                await self._add_tags_internal(document_dict['doc_id'], document_dict['tags'])

            await self.conn.commit()
            self.logger.info(f"Stored document with ID: {document_dict['doc_id']}")
            return document_dict['doc_id']

        except sqlite3.Error as e:
            self.logger.error(f"Error storing document: {str(e)}")
            await self.conn.rollback()
            raise

    async def _add_tags_internal(self, doc_id: str, tags: List[str]):
        """Internal method to add tags to a document."""
        for tag in tags:
            # Ensure tag exists in tags table
            cursor = await self.conn.execute(
                "INSERT OR IGNORE INTO tags (tag_name) VALUES (?)",
                (tag,)
            )
            await self.conn.commit()

            # Get tag ID
            cursor = await self.conn.execute(
                "SELECT tag_id FROM tags WHERE tag_name = ?",
                (tag,)
            )
            row = await cursor.fetchone()
            tag_id = row[0]

            # Associate tag with document
            await self.conn.execute(
                "INSERT OR IGNORE INTO document_tags (doc_id, tag_id) VALUES (?, ?)",
                (doc_id, tag_id)
            )

    async def get_document(self, doc_id: str) -> Optional[DocumentRecord]:
        """Retrieve a document by ID."""
        if not self.conn:
            await self.initialize()

        try:
            # Get document
            cursor = await self.conn.execute("""
                SELECT d.doc_id, d.title, d.content, d.file_path, d.file_type,
                       d.created_at, d.updated_at, d.version
                FROM documents d
                WHERE d.doc_id = ?
            """, (doc_id,))

            row = await cursor.fetchone()
            if not row:
                return None

            # Get tags for document
            cursor = await self.conn.execute("""
                SELECT t.tag_name
                FROM tags t
                JOIN document_tags dt ON t.tag_id = dt.tag_id
                WHERE dt.doc_id = ?
            """, (doc_id,))

            tags = [tag[0] for tag in await cursor.fetchall()]

            document_dict = {
                'doc_id': row[0],
                'title': row[1],
                'content': row[2],
                'file_path': row[3],
                'file_type': row[4],
                'created_at': row[5],
                'updated_at': row[6],
                'version': row[7],
                'tags': tags
            }

            return DocumentRecord.from_dict(document_dict)

        except sqlite3.Error as e:
            self.logger.error(f"Error getting document: {str(e)}")
            raise

    async def update_document(self, doc_id: str, content: Dict[str, Any],
                              increment_version: bool = True) -> Optional[DocumentRecord]:
        """Update a document's content."""
        if not self.conn:
            await self.initialize()

        try:
            # Get current document
            cursor = await self.conn.execute(
                "SELECT version FROM documents WHERE doc_id = ?",
                (doc_id,)
            )
            row = await cursor.fetchone()
            if not row:
                return None

            current_version = row[0]
            new_version = current_version + 1 if increment_version else current_version
            content_json = json.dumps(content)
            now = datetime.datetime.now().isoformat()

            # Update document
            await self.conn.execute("""
                UPDATE documents
                SET content = ?, updated_at = ?, version = ?
                WHERE doc_id = ?
            """, (content_json, now, new_version, doc_id))

            # Store new version if needed
            if increment_version:
                await self.conn.execute("""
                    INSERT INTO document_versions
                    (doc_id, content, version, created_at)
                    VALUES (?, ?, ?, ?)
                """, (doc_id, content_json, new_version, now))

            await self.conn.commit()

            # Return updated document
            return await self.get_document(doc_id)

        except sqlite3.Error as e:
            self.logger.error(f"Error updating document: {str(e)}")
            await self.conn.rollback()
            raise

373 async def delete_document(self, doc_id: str) -> bool:
374 """Delete a document."""
375 if not self.conn:
376 await self.initialize()
377
378 try:
379 cursor = await self.conn.execute(
380 "DELETE FROM documents WHERE doc_id = ?",
381 (doc_id,)
382 )
383 await self.conn.commit()
384
385 return cursor.rowcount > 0
386
387 except sqlite3.Error as e:
388 self.logger.error(f"Error deleting document: {str(e)}")
389 await self.conn.rollback()
390 raise
391
392 async def list_documents(self, limit: int = 100, offset: int = 0,
393 tags: Optional[List[str]] = None) -> List[DocumentRecord]:
394 """List documents with optional filtering."""
395 if not self.conn:
396 await self.initialize()
397
398 try:
399 documents = []
400
401 if tags:
402 # Query with tag filtering
403 placeholders = ','.join(['?'] * len(tags))
404 query = f"""
405 SELECT DISTINCT d.doc_id, d.title, d.content, d.file_path, d.file_type,
406 d.created_at, d.updated_at, d.version
407 FROM documents d
408 JOIN document_tags dt ON d.doc_id = dt.doc_id
409 JOIN tags t ON dt.tag_id = t.tag_id
410 WHERE t.tag_name IN ({placeholders})
411 ORDER BY d.updated_at DESC
412 LIMIT ? OFFSET ?
413 """
414 cursor = await self.conn.execute(query, (*tags, limit, offset))
415 else:
416 # Query without tag filtering
417 query = """
418 SELECT doc_id, title, content, file_path, file_type,
419 created_at, updated_at, version
420 FROM documents
421 ORDER BY updated_at DESC
422 LIMIT ? OFFSET ?
423 """
424 cursor = await self.conn.execute(query, (limit, offset))
425
426 rows = await cursor.fetchall()
427
428 for row in rows:
429 doc_id = row[0]
430
431 # Get tags for document
432 cursor = await self.conn.execute("""
433 SELECT t.tag_name
434 FROM tags t
435 JOIN document_tags dt ON t.tag_id = dt.tag_id
436 WHERE dt.doc_id = ?
437 """, (doc_id,))
438
439 doc_tags = [tag[0] for tag in await cursor.fetchall()]
440
441 document_dict = {
442 'doc_id': row[0],
443 'title': row[1],
444 'content': row[2],
445 'file_path': row[3],
446 'file_type': row[4],
447 'created_at': row[5],
448 'updated_at': row[6],
449 'version': row[7],
450 'tags': doc_tags
451 }
452
453 documents.append(DocumentRecord.from_dict(document_dict))
454
455 return documents
456
457 except sqlite3.Error as e:
458 self.logger.error(f"Error listing documents: {str(e)}")
459 raise
460
461 async def search_documents(self, query: str,
462 fields: Optional[List[str]] = None) -> List[DocumentRecord]:
463 """Search documents by content using FTS5."""
464 if not self.conn:
465 await self.initialize()
466
467 try:
468 documents = []
469
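            # Build an FTS5 prefix query ("term"* matches words starting with
            # term; this is prefix matching, not stemming). Quoting each term
            # keeps user input from being parsed as FTS5 operators.
            terms = [t.replace('"', '""') for t in query.split()]
            if not terms:
                return documents
            search_query = ' OR '.join(f'"{t}"*' for t in terms)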
472
473 cursor = await self.conn.execute("""
474 SELECT d.doc_id, d.title, d.content, d.file_path, d.file_type,
475 d.created_at, d.updated_at, d.version
476 FROM document_fts fts
477 JOIN documents d ON fts.doc_id = d.doc_id
478 WHERE document_fts MATCH ?
479 ORDER BY rank
480 LIMIT 100
481 """, (search_query,))
482
483 rows = await cursor.fetchall()
484
485 for row in rows:
486 doc_id = row[0]
487
488 # Get tags for document
489 cursor = await self.conn.execute("""
490 SELECT t.tag_name
491 FROM tags t
492 JOIN document_tags dt ON t.tag_id = dt.tag_id
493 WHERE dt.doc_id = ?
494 """, (doc_id,))
495
496 doc_tags = [tag[0] for tag in await cursor.fetchall()]
497
498 document_dict = {
499 'doc_id': row[0],
500 'title': row[1],
501 'content': row[2],
502 'file_path': row[3],
503 'file_type': row[4],
504 'created_at': row[5],
505 'updated_at': row[6],
506 'version': row[7],
507 'tags': doc_tags
508 }
509
510 documents.append(DocumentRecord.from_dict(document_dict))
511
512 return documents
513
514 except sqlite3.Error as e:
515 self.logger.error(f"Error searching documents: {str(e)}")
516 raise
517
518 async def get_document_versions(self, doc_id: str) -> List[Dict]:
519 """Get all versions of a document."""
520 if not self.conn:
521 await self.initialize()
522
523 try:
524 cursor = await self.conn.execute("""
525 SELECT content, version, created_at
526 FROM document_versions
527 WHERE doc_id = ?
528 ORDER BY version DESC
529 """, (doc_id,))
530
531 rows = await cursor.fetchall()
532
533 versions = []
534 for row in rows:
535 version = {
536 'content': row[0],
537 'version': row[1],
538 'created_at': row[2]
539 }
540
541 # Parse content from JSON string if needed
542 if isinstance(version['content'], str):
543 try:
544 version['content'] = json.loads(version['content'])
545 except json.JSONDecodeError:
546 # Keep as string if it's not valid JSON
547 pass
548
549 versions.append(version)
550
551 return versions
552
553 except sqlite3.Error as e:
554 self.logger.error(f"Error getting document versions: {str(e)}")
555 raise
556
557 async def add_tags(self, doc_id: str, tags: List[str]) -> bool:
558 """Add tags to a document."""
559 if not self.conn:
560 await self.initialize()
561
562 try:
563 # Check if document exists
564 cursor = await self.conn.execute(
565 "SELECT 1 FROM documents WHERE doc_id = ?",
566 (doc_id,)
567 )
568 if not await cursor.fetchone():
569 return False
570
571 await self._add_tags_internal(doc_id, tags)
572 await self.conn.commit()
573
574 return True
575
576 except sqlite3.Error as e:
577 self.logger.error(f"Error adding tags: {str(e)}")
578 await self.conn.rollback()
579 raise
580
581 async def close(self):
582 """Close the database connection."""
583 if self.conn:
584 await self.conn.close()
585 self.conn = None
586 self.logger.info("SQLite document store connection closed")
587
588# Usage example
589async def document_store_example():
590 logging.basicConfig(level=logging.INFO)
591
592 # Initialize store
593 store = SQLiteDocumentStore("documents.db")
594 await store.initialize()
595
596 # Create a document
597 doc = DocumentRecord(
598 doc_id="", # Will be auto-generated
599 title="Sample Document",
600 content={
601 "title": "Sample Document",
602 "summary": "This is a sample document for testing.",
603 "sections": [
604 {"heading": "Introduction", "content": "This is the introduction."}
605 ]
606 },
607 file_path="/path/to/sample.pdf",
608 file_type="pdf",
609 created_at="", # Will be auto-generated
610 updated_at="", # Will be auto-generated
611 tags=["sample", "test"]
612 )
613
614 # Store document
615 doc_id = await store.store_document(doc)
616 print(f"Stored document with ID: {doc_id}")
617
618 # Retrieve document
619 retrieved_doc = await store.get_document(doc_id)
620 print(f"Retrieved document: {retrieved_doc.title}")
621
622 # Update document
623 retrieved_doc.content["summary"] = "Updated summary for testing."
624 updated_doc = await store.update_document(doc_id, retrieved_doc.content)
625 print(f"Updated document version: {updated_doc.version}")
626
627 # List documents
628 documents = await store.list_documents(limit=10)
629 print(f"Listed {len(documents)} documents")
630
631 # Search documents
632 search_results = await store.search_documents("sample")
633 print(f"Found {len(search_results)} documents matching 'sample'")
634
635 # Clean up
636 await store.close()
637
638if __name__ == "__main__":
639 asyncio.run(document_store_example())
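
The `search_documents` method hands user text to an FTS5 `MATCH` expression, so it helps to see how such a query string can be built safely. The following is a minimal sketch under stated assumptions, not the store's actual implementation: `build_prefix_query` and `run_demo` are hypothetical names, and the in-memory `demo_fts` table only loosely mirrors `document_fts`. A trailing `*` in FTS5 requests prefix matching; stemming, by contrast, would come from a tokenizer such as `porter`.

```python
import re
import sqlite3


def build_prefix_query(text: str) -> str:
    """Turn free text into an FTS5 query of OR-ed prefix terms.

    Each word is wrapped in double quotes so that FTS5 operators typed by a
    user (AND, NOT, parentheses, ...) are treated as plain search terms.
    """
    words = re.findall(r"\w+", text)
    return " OR ".join(f'"{word}"*' for word in words)


def run_demo() -> list:
    """Query an in-memory FTS5 table (requires an SQLite build with FTS5)."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE VIRTUAL TABLE demo_fts USING fts5(doc_id, body)")
        conn.executemany(
            "INSERT INTO demo_fts (doc_id, body) VALUES (?, ?)",
            [("a", "Sample document for testing"), ("b", "Quarterly report")],
        )
        # An empty query string is a syntax error in FTS5, so skip the lookup
        match_expr = build_prefix_query("samp rep")
        if not match_expr:
            return []
        return conn.execute(
            "SELECT doc_id FROM demo_fts WHERE demo_fts MATCH ? ORDER BY rank",
            (match_expr,),
        ).fetchall()
    finally:
        conn.close()
```

Calling `run_demo()` exercises the query end to end. Inside the store, the same helper could replace the inline string building, with an early return when the helper yields an empty string.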

4. Transformation API with FastAPI

Expose upload, listing, search, tagging, deletion, and LLM-backed transformation endpoints through a FastAPI service:

python
1# transformation_api.py
2from typing import Dict, List, Optional, Any
3import logging
4import json
5import asyncio
6import time
7from datetime import datetime
8from fastapi import FastAPI, HTTPException, BackgroundTasks, File, UploadFile, Form, Depends
9from fastapi.middleware.cors import CORSMiddleware
10from fastapi.responses import JSONResponse
11from pydantic import BaseModel, Field
12import uvicorn
13import os
14
15from document_extractor import DocumentExtractor, DocumentContent
16from semantic_processor import SemanticProcessor, OllamaClient, DEFAULT_DOCUMENT_SCHEMA
17from document_store import SQLiteDocumentStore, DocumentRecord
18
19# Initialize FastAPI app
20app = FastAPI(
21 title="Document Processing API",
22 description="API for processing, analyzing, and transforming documents using local LLMs",
23 version="1.0.0"
24)
25
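# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # For production, specify allowed origins
    # Credentials should not be combined with wildcard origins; enable this
    # only together with an explicit allow_origins list.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)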
34
35# Configure logging
36logging.basicConfig(
37 level=logging.INFO,
38 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
39)
40logger = logging.getLogger(__name__)
41
42# Initialize components
43document_extractor = DocumentExtractor()
44llm_client = OllamaClient(model="vanilj/Phi-4:latest")
45semantic_processor = SemanticProcessor(llm_client=llm_client)
46document_store = None # Will be initialized on startup
47
48# Models
49class TransformationRequest(BaseModel):
50 doc_id: str
51 transformation_type: str = Field(..., description="Type of transformation: 'reword', 'summarize', 'extract_key_points', etc.")
52 parameters: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional parameters for the transformation")
53
54class TransformationResponse(BaseModel):
55 doc_id: str
56 transformation_type: str
57 transformed_content: Dict[str, Any]
58 execution_time: float
59
60class DocumentResponse(BaseModel):
61 doc_id: str
62 title: str
63 file_type: str
64 created_at: str
65 updated_at: str
66 version: int
67 tags: List[str]
68 content_preview: str = Field(..., description="Preview of the document content")
69
70class SearchRequest(BaseModel):
71 query: str
72 limit: int = 10
73 offset: int = 0
74
75# Dependency for getting the document store
76async def get_document_store():
77 return document_store
78
79# Background task for processing uploaded documents
80async def process_document_task(
81 file_path: str,
82 file_name: str,
83 file_type: str,
84 custom_schema: Optional[Dict] = None
85):
86 try:
87 # Extract document content
88 logger.info(f"Extracting content from {file_path}")
89 doc_content = document_extractor.extract(file_path)
90
91 # Process with LLM
92 logger.info(f"Processing document with LLM")
93 schema = custom_schema or DEFAULT_DOCUMENT_SCHEMA
94 result = await semantic_processor.process_document(doc_content, schema)
95
96 # Store in database
97 logger.info(f"Storing processed document")
98 doc = DocumentRecord(
99 doc_id="", # Auto-generated
100 title=result.get("title", file_name),
101 content=result,
102 file_path=file_path,
103 file_type=file_type,
104 created_at="", # Auto-generated
105 updated_at="", # Auto-generated
106 tags=[] # No initial tags
107 )
108
109 doc_id = await document_store.store_document(doc)
110 logger.info(f"Document processed and stored with ID: {doc_id}")
111
112 # Clean up temporary file if needed
113 if os.path.exists(file_path) and "/tmp/" in file_path:
114 os.remove(file_path)
115 logger.info(f"Temporary file {file_path} removed")
116
117 except Exception as e:
118 logger.error(f"Error processing document: {str(e)}")
119 # Could implement retry logic or notification system here
120
121# Event handlers
122@app.on_event("startup")
123async def startup_event():
124 global document_store
125 logger.info("Initializing document store")
    # Honor DB_PATH (set in docker-compose.yml); fall back to a local file
    document_store = SQLiteDocumentStore(os.environ.get("DB_PATH", "documents.db"))
127 await document_store.initialize()
128 logger.info("Document store initialized")
129
130@app.on_event("shutdown")
131async def shutdown_event():
132 logger.info("Shutting down document store")
133 if document_store:
134 await document_store.close()
135 logger.info("Document store closed")
136
137# Endpoints
@app.post("/documents/upload")
async def upload_document(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    custom_schema: Optional[str] = Form(None),
    store: SQLiteDocumentStore = Depends(get_document_store)
):
    """Upload and process a document."""
    try:
        # Validate file type; basename() drops any client-supplied directory parts
        file_name = os.path.basename(file.filename or "")
        if not file_name.lower().endswith(('.pdf', '.docx')):
            raise HTTPException(status_code=400, detail="Only PDF and DOCX files are supported")

        # Parse custom schema if provided (before anything is written to disk)
        schema = None
        if custom_schema:
            try:
                schema = json.loads(custom_schema)
            except json.JSONDecodeError:
                raise HTTPException(status_code=400, detail="Invalid JSON schema")

        # Save file temporarily
        file_path = f"/tmp/{int(time.time())}_{file_name}"
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        # Process document in background
        file_type = "pdf" if file_name.lower().endswith('.pdf') else "docx"
        background_tasks.add_task(
            process_document_task,
            file_path,
            file_name,
            file_type,
            schema
        )

        return {"message": "Document upload successful. Processing started."}

    except HTTPException:
        # Preserve the intended 4xx responses instead of converting them to 500
        raise
    except Exception as e:
        logger.error(f"Error in upload_document: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
180
181@app.get("/documents", response_model=List[DocumentResponse])
182async def list_documents(
183 limit: int = 10,
184 offset: int = 0,
185 tags: Optional[str] = None,
186 store: SQLiteDocumentStore = Depends(get_document_store)
187):
188 """List all documents with pagination and optional tag filtering."""
189 try:
190 tag_list = tags.split(',') if tags else None
191 documents = await store.list_documents(limit=limit, offset=offset, tags=tag_list)
192
193 # Create response objects with content previews
194 response = []
195 for doc in documents:
196 content_preview = ""
197 if isinstance(doc.content, dict):
198 # Try to extract a summary or the first section
199 if "summary" in doc.content and doc.content["summary"]:
200 content_preview = doc.content["summary"][:200] + "..." if len(doc.content["summary"]) > 200 else doc.content["summary"]
201 elif "sections" in doc.content and doc.content["sections"]:
202 first_section = doc.content["sections"][0]
203 if "content" in first_section:
204 content_preview = first_section["content"][:200] + "..." if len(first_section["content"]) > 200 else first_section["content"]
205
206 response.append(DocumentResponse(
207 doc_id=doc.doc_id,
208 title=doc.title,
209 file_type=doc.file_type,
210 created_at=doc.created_at,
211 updated_at=doc.updated_at,
212 version=doc.version,
213 tags=doc.tags or [],
214 content_preview=content_preview
215 ))
216
217 return response
218
219 except Exception as e:
220 logger.error(f"Error in list_documents: {str(e)}")
221 raise HTTPException(status_code=500, detail=str(e))
222
223@app.get("/documents/{doc_id}")
224async def get_document(
225 doc_id: str,
226 store: SQLiteDocumentStore = Depends(get_document_store)
227):
228 """Get a document by ID."""
229 try:
230 document = await store.get_document(doc_id)
231 if not document:
232 raise HTTPException(status_code=404, detail="Document not found")
233
234 return document
235
236 except HTTPException:
237 raise
238 except Exception as e:
239 logger.error(f"Error in get_document: {str(e)}")
240 raise HTTPException(status_code=500, detail=str(e))
241
242@app.post("/documents/search", response_model=List[DocumentResponse])
243async def search_documents(
244 search_request: SearchRequest,
245 store: SQLiteDocumentStore = Depends(get_document_store)
246):
247 """Search for documents."""
248 try:
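        # Apply the requested pagination (the store returns at most 100 matches)
        documents = await store.search_documents(search_request.query)
        documents = documents[search_request.offset:search_request.offset + search_request.limit]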
250
251 # Create response objects with content previews (similar to list_documents)
252 response = []
253 for doc in documents:
254 content_preview = ""
255 if isinstance(doc.content, dict):
256 if "summary" in doc.content and doc.content["summary"]:
257 content_preview = doc.content["summary"][:200] + "..." if len(doc.content["summary"]) > 200 else doc.content["summary"]
258 elif "sections" in doc.content and doc.content["sections"]:
259 first_section = doc.content["sections"][0]
260 if "content" in first_section:
261 content_preview = first_section["content"][:200] + "..." if len(first_section["content"]) > 200 else first_section["content"]
262
263 response.append(DocumentResponse(
264 doc_id=doc.doc_id,
265 title=doc.title,
266 file_type=doc.file_type,
267 created_at=doc.created_at,
268 updated_at=doc.updated_at,
269 version=doc.version,
270 tags=doc.tags or [],
271 content_preview=content_preview
272 ))
273
274 return response
275
276 except Exception as e:
277 logger.error(f"Error in search_documents: {str(e)}")
278 raise HTTPException(status_code=500, detail=str(e))
279
280@app.post("/documents/{doc_id}/transform", response_model=TransformationResponse)
281async def transform_document(
282 doc_id: str,
283 request: TransformationRequest,
284 store: SQLiteDocumentStore = Depends(get_document_store)
285):
286 """Transform a document with specified transformation type."""
287 try:
288 start_time = time.time()
289
290 # Get document
291 document = await store.get_document(doc_id)
292 if not document:
293 raise HTTPException(status_code=404, detail="Document not found")
294
295 # Prepare transformation prompt based on type
296 transformation_prompts = {
297 "reword": "Rewrite the following text to improve clarity and readability while preserving the meaning:",
298 "summarize": "Provide a concise summary of the following text:",
299 "extract_key_points": "Extract the key points from the following text:",
300 "change_tone": f"Rewrite the following text using a {request.parameters.get('tone', 'professional')} tone:",
301 "simplify": "Simplify the following text to make it more accessible:"
302 }
303
304 if request.transformation_type not in transformation_prompts:
305 raise HTTPException(status_code=400, detail=f"Unsupported transformation type: {request.transformation_type}")
306
307 # Get the content to transform
308 content_to_transform = ""
309 if request.parameters.get("section_index") is not None:
310 # Transform a specific section
311 section_index = request.parameters["section_index"]
            if (
                isinstance(document.content, dict) and
                "sections" in document.content and
                isinstance(section_index, int) and
                # Reject negative values, which would silently index from the end
                0 <= section_index < len(document.content["sections"])
            ):
317 section = document.content["sections"][section_index]
318 content_to_transform = section.get("content", "")
319 else:
320 raise HTTPException(status_code=400, detail="Invalid section index")
321 else:
322 # Transform the entire document or use the summary
323 if isinstance(document.content, dict) and "summary" in document.content:
324 content_to_transform = document.content["summary"]
325 elif isinstance(document.content, str):
326 content_to_transform = document.content
327 else:
328 # Try to reconstruct from sections
329 if isinstance(document.content, dict) and "sections" in document.content:
330 content_to_transform = "\n\n".join([
331 f"## {section.get('heading', 'Section')}\n{section.get('content', '')}"
332 for section in document.content["sections"]
333 ])
334
335 if not content_to_transform:
336 raise HTTPException(status_code=400, detail="No content available to transform")
337
338 # Prepare prompt for the LLM
339 prompt = f"{transformation_prompts[request.transformation_type]}\n\n{content_to_transform}"
340
341 # Set up system prompt based on transformation type
342 system_prompt = "You are an expert at document transformation and improvement."
343
344 # Process with LLM
345 response = await llm_client.generate(prompt, system_prompt)
346
347 # Create transformed content
348 transformed_content = {
349 "original_length": len(content_to_transform),
350 "transformed_length": len(response),
351 "transformed_text": response,
352 "transformation_type": request.transformation_type
353 }
354
355 execution_time = time.time() - start_time
356
357 # If requested, also update the document with the transformation
358 if request.parameters.get("update_document", False):
359 # Update the appropriate section
360 if request.parameters.get("section_index") is not None:
361 section_index = request.parameters["section_index"]
362 document.content["sections"][section_index]["content"] = response
            # Only dict content has a summary field that can be assigned
            elif isinstance(document.content, dict) and "summary" in document.content:
364 document.content["summary"] = response
365
366 # Save the updated document
367 await store.update_document(doc_id, document.content)
368
369 return TransformationResponse(
370 doc_id=doc_id,
371 transformation_type=request.transformation_type,
372 transformed_content=transformed_content,
373 execution_time=execution_time
374 )
375
376 except HTTPException:
377 raise
378 except Exception as e:
379 logger.error(f"Error in transform_document: {str(e)}")
380 raise HTTPException(status_code=500, detail=str(e))
381
382@app.put("/documents/{doc_id}/tags")
383async def add_tags(
384 doc_id: str,
385 tags: List[str],
386 store: SQLiteDocumentStore = Depends(get_document_store)
387):
388 """Add tags to a document."""
389 try:
390 success = await store.add_tags(doc_id, tags)
391 if not success:
392 raise HTTPException(status_code=404, detail="Document not found")
393
394 return {"message": "Tags added successfully", "doc_id": doc_id, "tags": tags}
395
396 except HTTPException:
397 raise
398 except Exception as e:
399 logger.error(f"Error in add_tags: {str(e)}")
400 raise HTTPException(status_code=500, detail=str(e))
401
402@app.delete("/documents/{doc_id}")
403async def delete_document(
404 doc_id: str,
405 store: SQLiteDocumentStore = Depends(get_document_store)
406):
407 """Delete a document."""
408 try:
409 success = await store.delete_document(doc_id)
410 if not success:
411 raise HTTPException(status_code=404, detail="Document not found")
412
413 return {"message": "Document deleted successfully", "doc_id": doc_id}
414
415 except HTTPException:
416 raise
417 except Exception as e:
418 logger.error(f"Error in delete_document: {str(e)}")
419 raise HTTPException(status_code=500, detail=str(e))
420
421# Run the server
422if __name__ == "__main__":
423 uvicorn.run("transformation_api:app", host="0.0.0.0", port=8000, reload=True)
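
The `list_documents` and `search_documents` endpoints above build `content_preview` with the same nested conditionals. As a rough sketch of how that logic could be factored out, the helper below reproduces the behavior (summary first, then the first section, truncated to 200 characters). `make_preview` is a hypothetical name that does not appear in the original code, and returning an empty string for non-string values is an assumption.

```python
from typing import Any


def make_preview(content: Any, limit: int = 200) -> str:
    """Return a short preview: the summary if present, else the first section."""
    if not isinstance(content, dict):
        return ""

    # Prefer the summary; fall back to the first section's content
    text = content.get("summary") or ""
    if not text:
        sections = content.get("sections") or []
        if sections and isinstance(sections[0], dict):
            text = sections[0].get("content") or ""

    # Guard against non-string values coming back from the LLM output
    if not isinstance(text, str):
        return ""

    return text[:limit] + "..." if len(text) > limit else text


if __name__ == "__main__":
    doc = {
        "summary": "",
        "sections": [{"heading": "Introduction", "content": "This is the introduction."}],
    }
    print(make_preview(doc))
```

Each endpoint could then pass `content_preview=make_preview(doc.content)` when constructing `DocumentResponse`, which keeps the two code paths from drifting apart.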

5. Full System Integration with Docker Compose

Run the API, Ollama, and the web frontend together with Docker Compose:

yaml
# docker-compose.yml
version: '3.8'

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    volumes:
      - ./data:/app/data
    environment:
      - LOG_LEVEL=INFO
      - OLLAMA_HOST=ollama
      - OLLAMA_PORT=11434
      - DB_PATH=/app/data/documents.db
    depends_on:
      - ollama
    restart: unless-stopped

  ollama:
    image: ollama/ollama:latest
    volumes:
      - ./ollama-models:/root/.ollama
    ports:
      - "11434:11434"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped

  web:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - API_URL=http://api:8000
    depends_on:
      - api
    restart: unless-stopped
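
The compose file passes `LOG_LEVEL`, `OLLAMA_HOST`, `OLLAMA_PORT`, and `DB_PATH` to the `api` container, so the Python side needs to read them somewhere. One possible way to do that is sketched below. `Settings` and `load_settings` are hypothetical names, the fallback values are assumptions for running outside Docker, and how `OllamaClient` accepts a base URL is not shown in this guide, so the wiring into it is left out.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Runtime configuration taken from the container environment."""
    log_level: str
    ollama_host: str
    ollama_port: int
    db_path: str

    @property
    def ollama_base_url(self) -> str:
        # Inside the compose network the host is the service name "ollama"
        return f"http://{self.ollama_host}:{self.ollama_port}"


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the variables set in docker-compose.yml, with local-run defaults."""
    return Settings(
        log_level=env.get("LOG_LEVEL", "INFO"),
        ollama_host=env.get("OLLAMA_HOST", "localhost"),
        ollama_port=int(env.get("OLLAMA_PORT", "11434")),
        db_path=env.get("DB_PATH", "documents.db"),
    )


if __name__ == "__main__":
    settings = load_settings()
    print(settings.ollama_base_url, settings.db_path)
```

Passing the environment as a mapping keeps the loader easy to test: a plain dict can stand in for `os.environ`.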

6. Frontend Interface (React/Next.js)

Build a React interface with Material UI components for browsing, uploading, searching, and transforming documents:

jsx
1// App.jsx (simplified version)
2import React, { useState, useEffect } from 'react';
3import {
4 Container, Box, Typography, TextField, Button, CircularProgress,
5 Table, TableBody, TableCell, TableContainer, TableHead, TableRow,
6 Paper, Chip, Tab, Tabs, Dialog, DialogContent, DialogTitle,
7 DialogActions, Snackbar, Alert
8} from '@mui/material';
9import { UploadFile, Search, Transform, Delete } from '@mui/icons-material';
10
11function App() {
12 const [documents, setDocuments] = useState([]);
13 const [loading, setLoading] = useState(false);
14 const [activeTab, setActiveTab] = useState(0);
15 const [searchQuery, setSearchQuery] = useState('');
16 const [selectedDocument, setSelectedDocument] = useState(null);
17 const [transformationType, setTransformationType] = useState('summarize');
18 const [transformationResult, setTransformationResult] = useState(null);
19 const [dialogOpen, setDialogOpen] = useState(false);
20 const [uploadFile, setUploadFile] = useState(null);
21 const [isUploading, setIsUploading] = useState(false);
22 const [snackbar, setSnackbar] = useState({ open: false, message: '', severity: 'info' });
23
24 useEffect(() => {
25 fetchDocuments();
26 }, []);
27
28 const fetchDocuments = async () => {
29 setLoading(true);
30 try {
31 const response = await fetch('/api/documents');
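      if (!response.ok) {
        // A failed request returns an error object, not the expected array
        throw new Error(`Failed to load documents (HTTP ${response.status})`);
      }
      const data = await response.json();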
33 setDocuments(data);
34 } catch (error) {
35 console.error('Error fetching documents:', error);
36 showSnackbar('Failed to load documents', 'error');
37 } finally {
38 setLoading(false);
39 }
40 };
41
42 const searchDocuments = async () => {
43 if (!searchQuery) {
44 fetchDocuments();
45 return;
46 }
47
48 setLoading(true);
49 try {
50 const response = await fetch('/api/documents/search', {
51 method: 'POST',
52 headers: { 'Content-Type': 'application/json' },
53 body: JSON.stringify({ query: searchQuery })
54 });
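      if (!response.ok) {
        // A failed request returns an error object, not the expected array
        throw new Error(`Search failed (HTTP ${response.status})`);
      }
      const data = await response.json();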
56 setDocuments(data);
57 } catch (error) {
58 console.error('Error searching documents:', error);
59 showSnackbar('Search failed', 'error');
60 } finally {
61 setLoading(false);
62 }
63 };
64
65 const handleFileChange = (event) => {
66 setUploadFile(event.target.files[0]);
67 };
68
69 const uploadDocument = async () => {
70 if (!uploadFile) return;
71
72 setIsUploading(true);
73 const formData = new FormData();
74 formData.append('file', uploadFile);
75
76 try {
77 const response = await fetch('/api/documents/upload', {
78 method: 'POST',
79 body: formData,
80 });
81
82 if (response.ok) {
83 showSnackbar('Document upload started successfully', 'success');
84 setUploadFile(null);
85 setTimeout(fetchDocuments, 3000); // Refresh after a delay
86 } else {
87 const error = await response.json();
88 throw new Error(error.detail || 'Upload failed');
89 }
90 } catch (error) {
91 console.error('Error uploading document:', error);
92 showSnackbar(`Upload failed: ${error.message}`, 'error');
93 } finally {
94 setIsUploading(false);
95 }
96 };
97
98 const openDocument = async (docId) => {
99 setLoading(true);
100 try {
101 const response = await fetch(`/api/documents/${docId}`);
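      if (!response.ok) {
        // Do not open the dialog with an error payload (e.g. a 404 response)
        throw new Error(`Failed to open document (HTTP ${response.status})`);
      }
      const data = await response.json();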
103 setSelectedDocument(data);
104 setDialogOpen(true);
105 } catch (error) {
106 console.error('Error fetching document:', error);
107 showSnackbar('Failed to open document', 'error');
108 } finally {
109 setLoading(false);
110 }
111 };
112
113 const transformDocument = async () => {
114 if (!selectedDocument) return;
115
116 setLoading(true);
117 try {
118 const response = await fetch(`/api/documents/${selectedDocument.doc_id}/transform`, {
119 method: 'POST',
120 headers: { 'Content-Type': 'application/json' },
121 body: JSON.stringify({
122 doc_id: selectedDocument.doc_id,
123 transformation_type: transformationType,
124 parameters: {}
125 })
126 });
127
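      const result = await response.json();
      if (!response.ok) {
        // Surface the API's error detail instead of rendering an empty result
        throw new Error(result.detail || 'Transformation failed');
      }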
129 setTransformationResult(result.transformed_content);
130 } catch (error) {
131 console.error('Error transforming document:', error);
132 showSnackbar('Transformation failed', 'error');
133 } finally {
134 setLoading(false);
135 }
136 };
137
138 const deleteDocument = async (docId) => {
139 if (!confirm('Are you sure you want to delete this document?')) return;
140
141 try {
142 const response = await fetch(`/api/documents/${docId}`, {
143 method: 'DELETE'
144 });
145
146 if (response.ok) {
147 showSnackbar('Document deleted successfully', 'success');
148 fetchDocuments();
149 } else {
150 const error = await response.json();
151 throw new Error(error.detail || 'Deletion failed');
152 }
153 } catch (error) {
154 console.error('Error deleting document:', error);
155 showSnackbar(`Deletion failed: ${error.message}`, 'error');
156 }
157 };
158
159 const showSnackbar = (message, severity) => {
160 setSnackbar({ open: true, message, severity });
161 };
162
163 const handleCloseSnackbar = () => {
164 setSnackbar({ ...snackbar, open: false });
165 };
166
167 return (
168 <Container maxWidth="lg">
169 <Typography variant="h4" component="h1" gutterBottom sx={{ mt: 4 }}>
170 Document Processing System
171 </Typography>
172
173 <Tabs value={activeTab} onChange={(e, newValue) => setActiveTab(newValue)} sx={{ mb: 4 }}>
174 <Tab label="All Documents" />
175 <Tab label="Upload Document" />
176 <Tab label="Search" />
177 </Tabs>
178
179 {/* Document List Tab */}
180 {activeTab === 0 && (
181 <Box>
182 <Typography variant="h6" gutterBottom>
183 Your Documents
184 </Typography>
185
186 {loading ? (
187 <Box display="flex" justifyContent="center" my={4}>
188 <CircularProgress />
189 </Box>
190 ) : (
191 <TableContainer component={Paper}>
192 <Table>
193 <TableHead>
194 <TableRow>
195 <TableCell>Title</TableCell>
196 <TableCell>Type</TableCell>
197 <TableCell>Updated</TableCell>
198 <TableCell>Preview</TableCell>
199 <TableCell>Actions</TableCell>
200 </TableRow>
201 </TableHead>
202 <TableBody>
203 {documents.length === 0 ? (
204 <TableRow>
205 <TableCell colSpan={5} align="center">
206 No documents found
207 </TableCell>
208 </TableRow>
209 ) : (
210 documents.map(doc => (
211 <TableRow key={doc.doc_id}>
212 <TableCell>{doc.title}</TableCell>
213 <TableCell>
214 <Chip
215 label={doc.file_type.toUpperCase()}
216 color={doc.file_type === 'pdf' ? 'error' : 'primary'}
217 size="small"
218 />
219 </TableCell>
220 <TableCell>{new Date(doc.updated_at).toLocaleDateString()}</TableCell>
221 <TableCell sx={{ maxWidth: 300, whiteSpace: 'nowrap', overflow: 'hidden', textOverflow: 'ellipsis' }}>
222 {doc.content_preview}
223 </TableCell>
224 <TableCell>
225 <Button
226 size="small"
227 onClick={() => openDocument(doc.doc_id)}
228 sx={{ mr: 1 }}
229 >
230 Open
231 </Button>
232 <Button
233 size="small"
234 color="error"
235 onClick={() => deleteDocument(doc.doc_id)}
236 >
237 <Delete fontSize="small" />
238 </Button>
239 </TableCell>
240 </TableRow>
241 ))
242 )}
243 </TableBody>
244 </Table>
245 </TableContainer>
246 )}
247 </Box>
248 )}
249
250 {/* Upload Tab */}
251 {activeTab === 1 && (
252 <Box>
253 <Typography variant="h6" gutterBottom>
254 Upload New Document
255 </Typography>
256
257 <Box sx={{ border: '1px dashed grey', p: 4, borderRadius: 2, textAlign: 'center', mb: 3 }}>
258 <input
259 accept=".pdf,.docx"
260 style={{ display: 'none' }}
261 id="upload-file"
262 type="file"
263 onChange={handleFileChange}
264 />
265 <label htmlFor="upload-file">
266 <Button
267 variant="outlined"
268 component="span"
269 startIcon={<UploadFile />}
270 >
271 Select File
272 </Button>
273 </label>
274
275 {uploadFile && (
276 <Box mt={2}>
277 <Typography variant="body1">
278 Selected: {uploadFile.name}
279 </Typography>
280 <Button
281 variant="contained"
282 onClick={uploadDocument}
283 disabled={isUploading}
284 sx={{ mt: 2 }}
285 >
286 {isUploading ? <CircularProgress size={24} /> : 'Upload Document'}
287 </Button>
288 </Box>
289 )}
290 </Box>
291
292 <Typography variant="body2" color="text.secondary">
293 Supported formats: PDF, DOCX
294 </Typography>
295 </Box>
296 )}
297
      {/* Search Tab */}
      {activeTab === 2 && (
        <Box>
          <Typography variant="h6" gutterBottom>
            Search Documents
          </Typography>

          <Box display="flex" mb={3}>
            <TextField
              fullWidth
              label="Search query"
              value={searchQuery}
              onChange={(e) => setSearchQuery(e.target.value)}
              // onKeyDown replaces the deprecated onKeyPress; Enter triggers the search
              onKeyDown={(e) => e.key === 'Enter' && searchDocuments()}
              variant="outlined"
              sx={{ mr: 2 }}
            />
            <Button
              variant="contained"
              onClick={searchDocuments}
              startIcon={<Search />}
            >
              Search
            </Button>
          </Box>

          {loading ? (
            <Box display="flex" justifyContent="center" my={4}>
              <CircularProgress />
            </Box>
          ) : (
            <TableContainer component={Paper}>
              <Table>
                <TableHead>
                  <TableRow>
                    <TableCell>Title</TableCell>
                    <TableCell>Type</TableCell>
                    <TableCell>Preview</TableCell>
                    <TableCell>Actions</TableCell>
                  </TableRow>
                </TableHead>
                <TableBody>
                  {documents.length === 0 ? (
                    <TableRow>
                      <TableCell colSpan={4} align="center">
                        No results found
                      </TableCell>
                    </TableRow>
                  ) : (
                    documents.map(doc => (
                      <TableRow key={doc.doc_id}>
                        <TableCell>{doc.title}</TableCell>
                        <TableCell>
                          <Chip
                            label={doc.file_type.toUpperCase()}
                            color={doc.file_type === 'pdf' ? 'error' : 'primary'}
                            size="small"
                          />
                        </TableCell>
                        <TableCell sx={{ maxWidth: 300, whiteSpace: 'nowrap', overflow: 'hidden', textOverflow: 'ellipsis' }}>
                          {doc.content_preview}
                        </TableCell>
                        <TableCell>
                          <Button
                            size="small"
                            onClick={() => openDocument(doc.doc_id)}
                          >
                            Open
                          </Button>
                        </TableCell>
                      </TableRow>
                    ))
                  )}
                </TableBody>
              </Table>
            </TableContainer>
          )}
        </Box>
      )}

      {/* Document Dialog */}
      <Dialog
        open={dialogOpen}
        onClose={() => setDialogOpen(false)}
        maxWidth="md"
        fullWidth
      >
        {selectedDocument && (
          <>
            <DialogTitle>
              {selectedDocument.title}
              {selectedDocument.tags?.map(tag => (
                <Chip
                  key={tag}
                  label={tag}
                  size="small"
                  sx={{ ml: 1 }}
                />
              ))}
            </DialogTitle>
            <DialogContent dividers>
              <Box mb={3}>
                <Typography variant="subtitle1" gutterBottom>
                  Transform Document
                </Typography>
                <Box display="flex" alignItems="center">
                  <TextField
                    select
                    label="Transformation Type"
                    value={transformationType}
                    onChange={(e) => setTransformationType(e.target.value)}
                    SelectProps={{ native: true }}
                    variant="outlined"
                    sx={{ mr: 2, minWidth: 200 }}
                  >
                    <option value="summarize">Summarize</option>
                    <option value="reword">Reword</option>
                    <option value="extract_key_points">Extract Key Points</option>
                    <option value="change_tone">Change Tone</option>
                    <option value="simplify">Simplify</option>
                  </TextField>
                  <Button
                    variant="contained"
                    onClick={transformDocument}
                    startIcon={<Transform />}
                    disabled={loading}
                  >
                    Transform
                  </Button>
                </Box>
              </Box>

              {transformationResult && (
                <Box mb={4} p={2} bgcolor="#f5f5f5" borderRadius={1}>
                  <Typography variant="subtitle1" gutterBottom>
                    Transformation Result
                  </Typography>
                  <Typography variant="body1">
                    {transformationResult.transformed_text}
                  </Typography>
                </Box>
              )}

              <Typography variant="subtitle1" gutterBottom>
                Document Content
              </Typography>

              {selectedDocument.content.summary && (
                <Box mb={3}>
                  <Typography variant="h6">Summary</Typography>
                  <Typography variant="body1">{selectedDocument.content.summary}</Typography>
                </Box>
              )}

              {selectedDocument.content.sections?.map((section, index) => (
                <Box key={index} mb={3}>
                  <Typography variant="h6">{section.heading}</Typography>
                  <Typography variant="body1">{section.content}</Typography>

                  {section.key_points?.length > 0 && (
                    <Box mt={2}>
                      <Typography variant="subtitle2">Key Points:</Typography>
                      <ul>
                        {section.key_points.map((point, i) => (
                          <li key={i}>
                            <Typography variant="body2">{point}</Typography>
                          </li>
                        ))}
                      </ul>
                    </Box>
                  )}
                </Box>
              ))}

              {selectedDocument.content.entities && (
                <Box mb={3}>
                  <Typography variant="h6">Entities</Typography>

                  {selectedDocument.content.entities.people?.length > 0 && (
                    <Box mt={1}>
                      <Typography variant="subtitle2">People:</Typography>
                      {selectedDocument.content.entities.people.map((person, i) => (
                        <Chip key={i} label={person} size="small" sx={{ mr: 1, mb: 1 }} />
                      ))}
                    </Box>
                  )}

                  {selectedDocument.content.entities.organizations?.length > 0 && (
                    <Box mt={1}>
                      <Typography variant="subtitle2">Organizations:</Typography>
                      {selectedDocument.content.entities.organizations.map((org, i) => (
                        <Chip key={i} label={org} size="small" sx={{ mr: 1, mb: 1 }} />
                      ))}
                    </Box>
                  )}

                  {selectedDocument.content.entities.locations?.length > 0 && (
                    <Box mt={1}>
                      <Typography variant="subtitle2">Locations:</Typography>
                      {selectedDocument.content.entities.locations.map((loc, i) => (
                        <Chip key={i} label={loc} size="small" sx={{ mr: 1, mb: 1 }} />
                      ))}
                    </Box>
                  )}
                </Box>
              )}

              <Box mt={4}>
                <Typography variant="subtitle2" color="text.secondary">
                  File: {selectedDocument.file_path} •
                  Created: {new Date(selectedDocument.created_at).toLocaleString()} •
                  Version: {selectedDocument.version}
                </Typography>
              </Box>
            </DialogContent>
            <DialogActions>
              <Button onClick={() => setDialogOpen(false)}>Close</Button>
            </DialogActions>
          </>
        )}
      </Dialog>

      {/* Snackbar for notifications */}
      <Snackbar
        open={snackbar.open}
        autoHideDuration={6000}
        onClose={handleCloseSnackbar}
        anchorOrigin={{ vertical: 'bottom', horizontal: 'right' }}
      >
        <Alert
          onClose={handleCloseSnackbar}
          severity={snackbar.severity}
          sx={{ width: '100%' }}
        >
          {snackbar.message}
        </Alert>
      </Snackbar>
    </Container>
  );
}

export default App;

7. Comprehensive Deployment Guide

Create a /docs folder with full documentation:

markdown
# Local Document Processing Pipeline: Deployment Guide

This guide provides comprehensive instructions for deploying the Local Document Processing Pipeline, a self-contained system for extracting, processing, and transforming documents using local LLMs.

## System Requirements

- **Hardware**:
  - CPU: 4+ cores
  - RAM: 16GB+ (32GB recommended for larger models)
  - Storage: 20GB+ free space
  - GPU: NVIDIA GPU with 8GB+ VRAM (optional but highly recommended)

- **Software**:
  - Docker and Docker Compose
  - NVIDIA Container Toolkit (for GPU acceleration)
  - Git

## Quick Start

1. Clone the repository:
   ```bash
   git clone https://github.com/yourusername/document-pipeline.git
   cd document-pipeline
   ```
  2. Start the system with Docker Compose:

    bash
    docker-compose up -d
  3. Open your browser and navigate to http://localhost:3000

  4. The system will automatically download the needed LLM models on first run

Component Overview

The system consists of three main components:

  • API Server: Handles document processing, storage, and transformations
  • Ollama: Runs the local LLM models
  • Web Interface: Provides a user-friendly interface for the system

Configuration Options

Environment Variables

Edit the .env file to customize your deployment:

text
# API Server Configuration
LOG_LEVEL=INFO
DB_PATH=/app/data/documents.db
MAX_UPLOAD_SIZE=100MB

# Ollama Configuration
OLLAMA_MODEL=vanilj/Phi-4:latest
OLLAMA_CONCURRENCY=1

# Web Interface Configuration
NEXT_PUBLIC_API_URL=http://localhost:8000
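
How the API server reads these values is not shown in this guide. The following is a minimal sketch, assuming the server reads them from the process environment and that a size such as 100MB uses binary units (1MB = 1024 × 1024 bytes). The names `parse_size`, `Settings`, and `load_settings` are hypothetical and exist only for illustration:

```python
import os
import re
from dataclasses import dataclass

# Assumption: binary units. The guide does not say how MAX_UPLOAD_SIZE is interpreted.
_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3}


def parse_size(value: str) -> int:
    """Convert a string such as '100MB' into a number of bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMG]?B)\s*", value.upper())
    if match is None:
        raise ValueError(f"Unrecognized size: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


@dataclass(frozen=True)
class Settings:
    log_level: str
    db_path: str
    max_upload_bytes: int
    ollama_model: str
    ollama_concurrency: int


def load_settings(env=None) -> Settings:
    """Read the variables from the .env example, falling back to its values as defaults."""
    env = os.environ if env is None else env
    return Settings(
        log_level=env.get("LOG_LEVEL", "INFO"),
        db_path=env.get("DB_PATH", "/app/data/documents.db"),
        max_upload_bytes=parse_size(env.get("MAX_UPLOAD_SIZE", "100MB")),
        ollama_model=env.get("OLLAMA_MODEL", "vanilj/Phi-4:latest"),
        ollama_concurrency=int(env.get("OLLAMA_CONCURRENCY", "1")),
    )
```

Passing a plain dictionary, for example `load_settings({"OLLAMA_MODEL": "phi3:mini"})`, makes the defaults easy to check without touching the real environment. Parsing the size once at startup means a malformed value fails early instead of at the first upload.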

LLM Model Selection

By default, the system uses the vanilj/Phi-4 model, which offers a good balance of quality and performance. You can change this by editing the OLLAMA_MODEL variable in the .env file.

Recommended models:

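  • vanilj/Phi-4:latest: Great general-purpose model (14B parameters; roughly 9GB at 4-bit quantization)
  • mistral:7b: Excellent performance for complex text (roughly 4GB at 4-bit quantization; about 14GB unquantized at 16-bit)
  • phi3:mini: Smallest model with decent performance (roughly 2-3GB)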
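
Memory needs scale with parameter count and quantization width, so a quick sanity check helps when choosing among the models listed above. The sketch below estimates the size of the weights alone as parameters × bits per weight / 8. The KV cache and runtime overhead come on top, so treat the result as a lower bound. The function name and the example parameter counts are illustrative assumptions, not values read from the Ollama registry:

```python
def estimate_weight_gb(params_billions: float, bits_per_weight: float) -> float:
    """Lower-bound size of the model weights in gigabytes (10^9 bytes).

    params_billions * 1e9 weights * (bits_per_weight / 8) bytes each,
    divided by 1e9 bytes per gigabyte.
    """
    if params_billions <= 0 or bits_per_weight <= 0:
        raise ValueError("Both arguments must be positive")
    return params_billions * bits_per_weight / 8


if __name__ == "__main__":
    # Illustrative parameter counts; check the model card for exact figures.
    for label, params, bits in [
        ("7B model, 16-bit", 7, 16),
        ("7B model, 4-bit", 7, 4),
        ("14B model, 4-bit", 14, 4),
    ]:
        print(f"{label}: at least {estimate_weight_gb(params, bits):.1f} GB")
```

If the estimate already exceeds the available VRAM, the runtime has to keep part of the model in system memory, which usually slows generation noticeably.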

CPU-Only Deployment

If you don't have a GPU, modify the docker-compose.yml file to remove the GPU-specific settings:

yaml
ollama:
  image: ollama/ollama:latest
  volumes:
    - ./ollama-models:/root/.ollama
  ports:
    - "11434:11434"
  restart: unless-stopped
  # Remove the 'deploy' section for CPU-only mode

Troubleshooting

Common Issues

  1. System is slow or unresponsive:

    • Check if your system meets the hardware requirements
    • Try a smaller LLM model
    • Increase Docker container memory limits
  2. Cannot connect to API server:

    • Check if all containers are running: docker-compose ps
    • Check logs: docker-compose logs api
  3. Document processing fails:

    • Check if the Ollama service is running properly
    • Verify that the LLM model was downloaded successfully
    • Check logs: docker-compose logs ollama
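
The "verify that the LLM model was downloaded" check can be scripted. The following is a minimal sketch, assuming Ollama is reachable on port 11434 as mapped in the compose file, and using its `/api/tags` endpoint, which lists the models available locally. The function names are hypothetical:

```python
import json
import urllib.request


def _normalize(name: str) -> str:
    """Ollama treats a model name without a tag as ':latest'."""
    return name if ":" in name else f"{name}:latest"


def model_is_downloaded(tags_payload: dict, wanted: str) -> bool:
    """Return True if `wanted` appears in a parsed /api/tags response."""
    local = {
        _normalize(entry["name"])
        for entry in tags_payload.get("models", [])
        if entry.get("name")
    }
    return _normalize(wanted) in local


def fetch_tags(base_url: str = "http://localhost:11434", timeout: float = 5.0) -> dict:
    """Fetch the list of local models from a running Ollama instance."""
    with urllib.request.urlopen(f"{base_url}/api/tags", timeout=timeout) as resp:
        return json.load(resp)
```

With the containers running, `model_is_downloaded(fetch_tags(), "vanilj/Phi-4:latest")` reports whether the default model is present. A connection error from `fetch_tags` points at the Ollama container itself rather than at a missing model.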

Viewing Logs

bash
# All logs
docker-compose logs

# Specific component logs
docker-compose logs api
docker-compose logs ollama
docker-compose logs web

# Follow logs in real-time
docker-compose logs -f

Scaling for Production

For production environments, consider:

  1. Persistent Storage: Mount external volumes for database and document storage
  2. Load Balancing: Deploy multiple API server instances behind a load balancer
  3. Security: Add proper authentication, HTTPS, and firewall rules
  4. Monitoring: Implement Prometheus/Grafana for system metrics

Contributing

We welcome contributions! Please see our CONTRIBUTING.md file for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Conclusion: Beyond Document Processing

The architecture presented here goes far beyond a simple document processing system. It represents a paradigm shift in how we interact with documents and knowledge:

  1. Universal Content Extraction: The system extracts not just raw text but preserves document structure, formatting, and relationships, enabling intelligent processing of any document.

  2. Semantic Understanding: By integrating local LLMs, the system extracts meaning rather than just data: summaries, per-section key points, and entities such as people, organizations, and locations.

  3. Flexible Transformation: The transformation layer lets users reshape content according to their needs: summarizing dense research papers, simplifying technical documentation, or extracting key insights from lengthy reports.

  4. Self-Contained Intelligence: By operating entirely locally, this architecture avoids the privacy concerns, costs, and network dependencies of cloud-based solutions.

  5. Extensible Foundation: This architecture can serve as the foundation for a wide range of knowledge management applications, from research assistants to documentation systems to compliance tools.

This implementation balances elegance with power, providing production-ready code that handles real-world complexity while maintaining clean abstractions. The modular design allows for easy extension and customization, while the Docker-based deployment ensures consistent operation across environments.

By building on this foundation, you can create intelligent document systems that transform how your organization manages and extracts value from information.
