How Praevia's Context Engine Works: A Technical Deep Dive
Explore the architecture behind Praevia's intelligent context selection and compression engine, from hybrid search to sub-50ms latency optimization.
Understanding how Praevia achieves 50-90% token reduction while maintaining response quality requires looking under the hood at our sophisticated context optimization pipeline.
Architecture Overview
Praevia operates as an intelligent middleware layer between your application and your LLM provider. Built with FastAPI and PostgreSQL + PgVector, our engine processes every query through a multi-stage pipeline optimized for sub-50ms latency overhead.
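To make the middleware pattern concrete, here is a minimal sketch of what a proxy-style FastAPI endpoint could look like. The route path, request model, and helper names (optimize_context, call_llm_provider, count_tokens) are simplified placeholders for illustration, not the production API surface.

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class CompletionRequest(BaseModel):
    query: str
    target_tokens: int = 2000

@app.post("/v1/completions")
async def proxy_completion(req: CompletionRequest):
    # Placeholder helpers standing in for the pipeline stages described below.
    context = await optimize_context(req.query, target_tokens=req.target_tokens)
    answer = await call_llm_provider(req.query, context)
    return {"answer": answer, "context_tokens": count_tokens(context)}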
The Core Pipeline
Query → Context Selection → Compression → Prompt Assembly → LLM → Response
                 ↓                 ↓                ↓
          Hybrid Search      Deduplication    Token Counting
         (Vector + BM25)        Scoring       Metrics Logging
Stage 1: Intelligent Context Selection
The foundation of Praevia's efficiency is a hybrid retrieval system that combines semantic vector search with exact-match keyword scoring.
Vector Search with PgVector
We use PostgreSQL's PgVector extension for semantic similarity search:
async def vector_search(query_embedding: List[float], limit: int = 50):
    """
    Perform cosine similarity search against stored embeddings.
    Returns the most semantically relevant chunks.
    """
    # SQL query using PgVector's cosine distance operator (<=>)
    query = """
        SELECT id, content, tokens,
               1 - (embedding <=> $1::vector) AS similarity
        FROM chunks
        ORDER BY embedding <=> $1::vector
        LIMIT $2
    """
    return await db.fetch(query, query_embedding, limit)
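To keep that ORDER BY fast at scale, the chunks table needs a vector index. A simplified setup sketch follows; the embedding dimension (1536) and the HNSW index are illustrative choices rather than the exact production schema.

SETUP_SQL = """
    CREATE EXTENSION IF NOT EXISTS vector;

    CREATE TABLE IF NOT EXISTS chunks (
        id        BIGSERIAL PRIMARY KEY,
        content   TEXT NOT NULL,
        tokens    INTEGER NOT NULL,
        embedding VECTOR(1536)  -- dimension must match your embedding model
    );

    -- HNSW index on cosine distance so ORDER BY embedding <=> ... can use it
    CREATE INDEX IF NOT EXISTS chunks_embedding_idx
        ON chunks USING hnsw (embedding vector_cosine_ops);
"""

async def ensure_schema():
    # Assumes `db` exposes an execute() that accepts raw SQL (e.g., an asyncpg pool)
    await db.execute(SETUP_SQL)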
Keyword Search with BM25
Complementing vector search, our BM25-style keyword scoring captures exact term matches:
def calculate_keyword_score(chunk: str, keywords: List[str]) -> float:
    """
    TF-IDF style scoring for keyword relevance.
    Particularly effective for technical queries.
    """
    score = 0.0
    for keyword in keywords:
        tf = chunk.lower().count(keyword.lower())
        score += tf * keyword_weights.get(keyword, 1.0)
    return score
Hybrid Scoring
The magic happens when we combine both approaches:
async def hybrid_search(query: str, max_chunks: int = 100):
    """
    Combine vector and keyword search with weighted scoring.
    """
    # Extract keywords and generate the query embedding
    keywords = extract_keywords(query)
    query_embedding = await generate_embedding(query)

    # Semantic candidates from PgVector
    vector_results = await vector_search(query_embedding)

    # Keyword scores for the retrieved candidates
    keyword_scores = {
        chunk.id: calculate_keyword_score(chunk.content, keywords)
        for chunk in vector_results
    }

    # Combine scores (60% vector, 40% keyword)
    final_scores = {}
    for chunk in vector_results:
        vector_score = chunk.similarity
        keyword_score = keyword_scores.get(chunk.id, 0.0)
        final_scores[chunk.id] = 0.6 * vector_score + 0.4 * keyword_score

    return rank_chunks_by_score(final_scores, max_chunks)
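The extract_keywords helper is kept abstract above. A minimal illustrative version is a stop-word filter over the query terms; the stop-word list and length cutoff here are placeholders, not the production implementation.

from typing import List

# Illustrative keyword extractor; the production version is more involved.
STOP_WORDS = {
    "the", "a", "an", "is", "are", "of", "to", "in", "on",
    "for", "and", "or", "how", "what", "with", "do", "does"
}

def extract_keywords(query: str) -> List[str]:
    """Keep lowercased query terms that are not stop words or too short."""
    return [
        term for term in query.lower().split()
        if term not in STOP_WORDS and len(term) > 2
    ]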
Stage 2: Context Compression
Once we have the relevant chunks, we aggressively reduce token count without losing information.
Deduplication Engine
def deduplicate_sentences(text: str) -> str:
    """
    Remove exact and near-duplicate sentences.
    Uses fuzzy matching to catch paraphrases.
    """
    sentences = split_into_sentences(text)
    unique_sentences = []
    seen_fingerprints = set()

    for sentence in sentences:
        fingerprint = generate_fingerprint(sentence)
        if fingerprint not in seen_fingerprints:
            unique_sentences.append(sentence)
            seen_fingerprints.add(fingerprint)

    return " ".join(unique_sentences)
Importance Scoring
Not all sentences are created equal. We score and rank:
def score_sentence_importance(sentence: str, query: str) -> float:
    """
    Score sentences based on:
    - Query term overlap
    - Position in document
    - Sentence length (penalize too short/long)
    - Presence of key entities
    """
    score = 0.0

    # Query overlap (most important); guard against an empty query
    query_terms = set(query.lower().split())
    sentence_terms = set(sentence.lower().split())
    if query_terms:
        overlap = len(query_terms & sentence_terms) / len(query_terms)
        score += overlap * 10.0

    # Entity bonus
    if contains_named_entities(sentence):
        score += 2.0

    # Length penalty
    word_count = len(sentence.split())
    if 5 <= word_count <= 30:
        score += 1.0

    return score
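One simple way to implement contains_named_entities is with spaCy; this sketch assumes the en_core_web_sm model is installed, and a latency-sensitive hot path might substitute a lighter capitalization heuristic instead.

import spacy

# Loading a full NER model adds latency; a capitalization heuristic can be
# swapped in for the hot path. Assumes en_core_web_sm is installed.
_nlp = spacy.load("en_core_web_sm")

def contains_named_entities(sentence: str) -> bool:
    """Return True if the sentence contains at least one named entity."""
    return len(_nlp(sentence).ents) > 0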
Token-Based Trimming
Finally, we ensure we meet the target token limit:
async def compress_to_target(chunks: List[str], query: str, target_tokens: int) -> str:
    """
    Compress chunks to fit within the target token budget.
    Preserves the highest-scoring content.
    """
    # Deduplicate first
    combined = " ".join(chunks)
    deduped = deduplicate_sentences(combined)

    # Score all sentences against the query
    sentences = split_into_sentences(deduped)
    scored_sentences = [
        (sentence, score_sentence_importance(sentence, query))
        for sentence in sentences
    ]

    # Sort by score and take top sentences until we hit the token limit
    scored_sentences.sort(key=lambda x: x[1], reverse=True)

    final_text = []
    total_tokens = 0
    for sentence, score in scored_sentences:
        sentence_tokens = count_tokens(sentence)
        if total_tokens + sentence_tokens <= target_tokens:
            final_text.append(sentence)
            total_tokens += sentence_tokens
        else:
            break

    # Reorder by original position for coherence
    return reorder_by_position(final_text, deduped)
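count_tokens is the other helper the trimming loop depends on. It can be implemented with tiktoken; the cl100k_base encoding below is a stand-in and should match whichever model you are targeting.

import tiktoken

# cl100k_base is a placeholder; use the encoding for your target model.
_encoding = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    """Count tokens the same way the downstream model will."""
    return len(_encoding.encode(text))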
Stage 3: Prompt Assembly & LLM Integration
With optimized context in hand, we assemble the final prompt:
class PromptBuilder:
    """
    Assembles optimized context into well-structured prompts.
    """

    def build_prompt(
        self,
        query: str,
        optimized_context: str,
        system_instructions: Optional[str] = None
    ) -> List[Dict[str, str]]:
        """
        Build messages array for LLM API.
        """
        messages = []

        if system_instructions:
            messages.append({
                "role": "system",
                "content": system_instructions
            })

        # Context injection
        context_prompt = f"""
Use the following context to answer the query:

Context:
{optimized_context}

Query: {query}

Provide a comprehensive answer based solely on the context provided.
"""
        messages.append({
            "role": "user",
            "content": context_prompt.strip()
        })

        return messages
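For illustration, here is how the assembled messages could be sent to a provider using the OpenAI client; the model name is a placeholder, and any chat-completions-compatible backend works the same way.

from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def answer_with_context(query: str, optimized_context: str) -> str:
    messages = PromptBuilder().build_prompt(
        query=query,
        optimized_context=optimized_context,
        system_instructions="You are a precise, context-grounded assistant."
    )
    # Model name is a placeholder; swap in whichever model you target.
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content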
Performance Optimizations
Achieving sub-50ms overhead requires careful optimization:
Database Connection Pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Let PgBouncer handle pooling
    echo=False
)
Caching with LRU
from functools import lru_cache

@lru_cache(maxsize=10000)
def get_embedding(text: str) -> List[float]:
    """Cache embeddings to avoid redundant API calls."""
    return embedding_model.encode(text)
Async Everything
async def process_query(query: str, db: AsyncSession) -> Dict:
    """
    Fully async pipeline for maximum concurrency.
    """
    start_time = time.perf_counter()

    # Parallel operations
    embedding_task = generate_embedding(query)
    keywords_task = extract_keywords_async(query)

    query_embedding, keywords = await asyncio.gather(
        embedding_task,
        keywords_task
    )

    # Continue pipeline (this variant of hybrid_search accepts the
    # precomputed embedding and keywords instead of recomputing them)
    chunks = await hybrid_search(query_embedding, keywords)
    compressed = await compress_context(chunks, target_tokens=2000)

    duration_ms = (time.perf_counter() - start_time) * 1000

    return {
        "compressed_context": compressed,
        "metrics": {
            "latency_ms": duration_ms,
            "chunks_selected": len(chunks),
            "tokens_saved": calculate_savings(chunks, compressed)
        }
    }
Metrics & Monitoring
Every query logs comprehensive metrics:
class QueryLog(Base):
    __tablename__ = "query_logs"

    id = Column(Integer, primary_key=True)
    query = Column(Text, nullable=False)
    tokens_before = Column(Integer)
    tokens_after = Column(Integer)
    compression_ratio = Column(Float)
    selection_time_ms = Column(Float)
    compression_time_ms = Column(Float)
    total_time_ms = Column(Float)
    llm_provider = Column(String(50))
    created_at = Column(DateTime, default=datetime.utcnow)
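Persisting a row after each request takes only a few lines with an async SQLAlchemy session. The sketch below assumes the pipeline also records token counts before and after compression alongside the latency metrics shown earlier.

async def log_query_metrics(session: AsyncSession, query: str, metrics: Dict) -> None:
    """Persist per-query metrics; values are assumed to come from the pipeline above."""
    session.add(QueryLog(
        query=query,
        tokens_before=metrics["tokens_before"],
        tokens_after=metrics["tokens_after"],
        compression_ratio=metrics["tokens_after"] / max(metrics["tokens_before"], 1),
        total_time_ms=metrics["latency_ms"],
        llm_provider=metrics.get("llm_provider", "unknown"),
    ))
    await session.commit()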
Real-World Performance
In production, Praevia achieves:
| Metric | Target | Actual |
|--------|--------|--------|
| Latency Overhead | <50ms | 35ms avg |
| Token Reduction | 50-90% | 78% avg |
| Quality Retention | >95% | 98% |
| Throughput | 100 req/s | 150 req/s |
Deployment Architecture
Praevia is designed for flexible deployment:
Cloud API: Fully managed, hosted by us
- Zero infrastructure management
- Auto-scaling
- Global CDN
Self-Hosted: Docker-based deployment
- Full control over data
- On-premise compliance
- Custom tuning
Hybrid: Best of both worlds
- Context engine on-premise
- LLM calls via cloud
Conclusion
Praevia's architecture demonstrates that intelligent context optimization doesn't require complex models or excessive compute. By combining proven techniques like hybrid search, aggressive deduplication, and importance scoring, we achieve dramatic cost savings while maintaining response quality.
The key is in the details: careful algorithm selection, performance optimization, and comprehensive metrics that prove the value at every step.
Want to dive deeper? Check out our GitHub repository or request a technical demo.