Tags: architecture, engineering, context-engine, technical

How Praevia's Context Engine Works: A Technical Deep Dive

Explore the architecture behind Praevia's intelligent context selection and compression engine, from hybrid search to sub-50ms latency optimization.

Praevia Engineering Team
November 14, 2024


Understanding how Praevia achieves 50-90% token reduction while maintaining response quality requires looking under the hood at our sophisticated context optimization pipeline.

Architecture Overview

Praevia operates as an intelligent middleware layer between your application and your LLM provider. Built with FastAPI and PostgreSQL + PgVector, our engine processes every query through a multi-stage pipeline optimized for sub-50ms latency overhead.

The Core Pipeline

Query → Context Selection → Compression → Prompt Assembly → LLM → Response
          ↓                    ↓              ↓
      Hybrid Search      Deduplication   Token Counting
      (Vector + BM25)    Scoring         Metrics Logging

Stage 1: Intelligent Context Selection

The foundation of Praevia's efficiency is a hybrid retrieval system that combines semantic vector search with exact keyword matching.

Vector Search with PgVector

We use PostgreSQL's PgVector extension for semantic similarity search:

from typing import List

async def vector_search(query_embedding: List[float], limit: int = 50):
    """
    Perform cosine similarity search against stored embeddings.
    Returns the most semantically relevant chunks.
    """
    # SQL query using PgVector's cosine distance operator (<=>)
    query = """
        SELECT id, content, tokens,
               1 - (embedding <=> $1::vector) AS similarity
        FROM chunks
        ORDER BY embedding <=> $1::vector
        LIMIT $2
    """
    return await db.fetch(query, query_embedding, limit)
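
The query above assumes a chunks table with a pgvector column. A minimal schema sketch follows; the exact columns, embedding dimension, and index parameters are illustrative assumptions, not Praevia's actual DDL:

async def create_chunks_schema(db):
    """Create the chunks table and an ANN index used by vector_search (illustrative)."""
    await db.execute("""
        CREATE EXTENSION IF NOT EXISTS vector;

        CREATE TABLE IF NOT EXISTS chunks (
            id        BIGSERIAL PRIMARY KEY,
            content   TEXT NOT NULL,
            tokens    INTEGER NOT NULL,
            embedding VECTOR(1536)  -- dimension depends on the embedding model
        );

        -- Approximate nearest-neighbour index for the cosine distance operator (<=>)
        CREATE INDEX IF NOT EXISTS chunks_embedding_idx
            ON chunks USING ivfflat (embedding vector_cosine_ops)
            WITH (lists = 100);
    """)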

Keyword Search with BM25

Complementing vector search, our BM25-style keyword scoring captures exact term matches; shown here in simplified, term-frequency form:

from typing import Dict, List, Optional

def calculate_keyword_score(chunk: str, keywords: List[str],
                            keyword_weights: Optional[Dict[str, float]] = None) -> float:
    """
    Weighted term-frequency scoring for keyword relevance.
    Particularly effective for technical queries with exact identifiers.
    """
    weights = keyword_weights or {}
    score = 0.0
    for keyword in keywords:
        tf = chunk.lower().count(keyword.lower())
        score += tf * weights.get(keyword, 1.0)
    return score
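
The scorer relies on an extract_keywords helper that is not shown above. A minimal sketch, assuming simple stop-word filtering (the stop-word list and token pattern are illustrative):

import re
from typing import List

# Tiny stop-word list for illustration; a real deployment would use a fuller list.
STOP_WORDS = {"the", "a", "an", "is", "are", "of", "to", "in", "for", "and", "or", "how", "what"}

def extract_keywords(query: str) -> List[str]:
    """Tokenize the query and keep the informative terms for keyword scoring."""
    tokens = re.findall(r"[a-z0-9_]+", query.lower())
    return [t for t in tokens if t not in STOP_WORDS and len(t) > 2]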

Hybrid Scoring

The magic happens when we combine both approaches:

from typing import List, Optional

async def hybrid_search(
    query: str,
    max_chunks: int = 100,
    query_embedding: Optional[List[float]] = None,
    keywords: Optional[List[str]] = None,
):
    """
    Combine vector and keyword search with weighted scoring.
    Precomputed embeddings and keywords can be passed in to avoid duplicate work.
    """
    # Extract keywords and embed the query, unless the caller already did
    keywords = keywords or extract_keywords(query)
    if query_embedding is None:
        query_embedding = await generate_embedding(query)

    # Semantic candidates first, then keyword-score those same candidates
    vector_results = await vector_search(query_embedding)
    keyword_scores = {
        chunk.id: calculate_keyword_score(chunk.content, keywords)
        for chunk in vector_results
    }

    # Combine scores (60% vector, 40% keyword)
    final_scores = {}
    for chunk in vector_results:
        vector_score = chunk.similarity
        keyword_score = keyword_scores.get(chunk.id, 0.0)
        final_scores[chunk.id] = 0.6 * vector_score + 0.4 * keyword_score

    return rank_chunks_by_score(final_scores, max_chunks)
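
One subtlety: cosine similarity is already bounded in [0, 1], while raw keyword scores are unbounded counts, so the keyword side should be brought onto a comparable scale before the 60/40 blend. A min-max scaling sketch, one reasonable choice rather than necessarily the exact normalization Praevia applies:

from typing import Dict

def normalize_scores(scores: Dict[int, float]) -> Dict[int, float]:
    """Min-max scale raw keyword scores into [0, 1] so they blend cleanly with cosine similarity."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {chunk_id: 0.0 for chunk_id in scores}
    return {chunk_id: (s - lo) / (hi - lo) for chunk_id, s in scores.items()}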

Stage 2: Context Compression

Once we have the relevant chunks, we aggressively reduce token count without losing information.

Deduplication Engine

def deduplicate_sentences(text: str) -> str:
    """
    Remove exact and near-duplicate sentences.
    Uses normalized fingerprints to catch light rephrasings.
    """
    sentences = split_into_sentences(text)
    unique_sentences = []
    seen_fingerprints = set()
    
    for sentence in sentences:
        fingerprint = generate_fingerprint(sentence)
        if fingerprint not in seen_fingerprints:
            unique_sentences.append(sentence)
            seen_fingerprints.add(fingerprint)
    
    return " ".join(unique_sentences)

Importance Scoring

Not all sentences are created equal. We score and rank:

def score_sentence_importance(sentence: str, query: str) -> float:
    """
    Score sentences based on:
    - Query term overlap
    - Presence of key entities
    - Sentence length (penalize too short/long)
    (Document position is restored later, when the trimmed text is reordered.)
    """
    score = 0.0

    # Query overlap (most important)
    query_terms = set(query.lower().split())
    sentence_terms = set(sentence.lower().split())
    if query_terms:
        overlap = len(query_terms & sentence_terms) / len(query_terms)
        score += overlap * 10.0

    # Entity bonus
    if contains_named_entities(sentence):
        score += 2.0

    # Length bonus: favour sentences that are neither fragments nor run-ons
    word_count = len(sentence.split())
    if 5 <= word_count <= 30:
        score += 1.0

    return score
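
The entity bonus assumes a contains_named_entities helper. A minimal sketch using spaCy; the library and model name are illustrative assumptions, not a statement about Praevia's stack:

import spacy

# Loaded once at import time; "en_core_web_sm" is an illustrative model choice.
_nlp = spacy.load("en_core_web_sm")

def contains_named_entities(sentence: str) -> bool:
    """Return True if the sentence mentions at least one named entity (person, org, product, ...)."""
    return len(_nlp(sentence).ents) > 0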

Token-Based Trimming

Finally, we ensure we meet the target token limit:

from typing import List

async def compress_to_target(chunks: List[str], query: str, target_tokens: int) -> str:
    """
    Compress chunks to fit within a target token budget.
    Preserves the highest-scoring content.
    """
    # Deduplicate first
    combined = " ".join(chunks)
    deduped = deduplicate_sentences(combined)

    # Score all sentences against the query
    sentences = split_into_sentences(deduped)
    scored_sentences = [
        (sentence, score_sentence_importance(sentence, query))
        for sentence in sentences
    ]

    # Sort by score and take top sentences until we hit the token budget
    scored_sentences.sort(key=lambda x: x[1], reverse=True)

    final_text = []
    total_tokens = 0

    for sentence, score in scored_sentences:
        sentence_tokens = count_tokens(sentence)
        if total_tokens + sentence_tokens <= target_tokens:
            final_text.append(sentence)
            total_tokens += sentence_tokens
        else:
            break

    # Reorder by original position for coherence
    return reorder_by_position(final_text, deduped)
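
Two helpers above are worth spelling out: count_tokens, which must count tokens the same way the downstream model does, and reorder_by_position, which restores document order. Sketches of both, assuming tiktoken for tokenization (the encoding name is an assumption and should match the target model):

from typing import List

import tiktoken

# cl100k_base is an illustrative choice; pick the encoding that matches your LLM.
_encoding = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    """Count tokens with the same encoding the downstream model uses, so budgets hold."""
    return len(_encoding.encode(text))

def reorder_by_position(selected: List[str], source_text: str) -> str:
    """Return the selected sentences in their original document order for readability."""
    return " ".join(sorted(selected, key=lambda s: source_text.find(s)))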

Stage 3: Prompt Assembly & LLM Integration

With optimized context in hand, we assemble the final prompt:

from textwrap import dedent
from typing import Dict, List, Optional

class PromptBuilder:
    """
    Assembles optimized context into well-structured prompts.
    """

    def build_prompt(
        self,
        query: str,
        optimized_context: str,
        system_instructions: Optional[str] = None
    ) -> List[Dict[str, str]]:
        """
        Build the messages array for the LLM API.
        """
        messages = []

        if system_instructions:
            messages.append({
                "role": "system",
                "content": system_instructions
            })

        # Context injection; dedent the template so the prompt carries no stray indentation
        template = dedent("""\
            Use the following context to answer the query:

            Context:
            {context}

            Query: {query}

            Provide a comprehensive answer based solely on the context provided.
        """)
        context_prompt = template.format(context=optimized_context, query=query)

        messages.append({
            "role": "user",
            "content": context_prompt.strip()
        })

        return messages
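
To make the hand-off concrete, here is how the assembled messages could be sent to a provider. This usage sketch assumes the OpenAI Python SDK and a placeholder model name purely for illustration; Praevia itself is provider-agnostic:

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

compressed_context = "...optimized context produced by Stage 2..."  # placeholder

builder = PromptBuilder()
messages = builder.build_prompt(
    query="How does hybrid search work?",
    optimized_context=compressed_context,
    system_instructions="You are a concise technical assistant.",
)

response = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model choice
    messages=messages,
)
print(response.choices[0].message.content)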

Performance Optimizations

Achieving sub-50ms overhead requires careful optimization:

Database Connection Pooling

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Let PgBouncer handle pooling
    echo=False
)

Caching with LRU

from functools import lru_cache
from typing import List

@lru_cache(maxsize=10000)
def get_embedding(text: str) -> List[float]:
    """Cache embeddings so repeated text is never re-encoded."""
    return embedding_model.encode(text)

Async Everything

import asyncio
import time
from typing import Dict

async def process_query(query: str, db: AsyncSession) -> Dict:
    """
    Fully async pipeline for maximum concurrency.
    """
    start_time = time.perf_counter()

    # Run embedding generation and keyword extraction in parallel
    query_embedding, keywords = await asyncio.gather(
        generate_embedding(query),
        extract_keywords_async(query)
    )

    # Continue the pipeline with the precomputed values
    chunks = await hybrid_search(
        query, query_embedding=query_embedding, keywords=keywords
    )
    compressed = await compress_to_target(chunks, query=query, target_tokens=2000)

    duration_ms = (time.perf_counter() - start_time) * 1000

    return {
        "compressed_context": compressed,
        "metrics": {
            "latency_ms": duration_ms,
            "chunks_selected": len(chunks),
            "tokens_saved": calculate_savings(chunks, compressed)
        }
    }

Metrics & Monitoring

Every query logs comprehensive metrics:

from datetime import datetime

from sqlalchemy import Column, DateTime, Float, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class QueryLog(Base):
    __tablename__ = "query_logs"

    id = Column(Integer, primary_key=True)
    query = Column(Text, nullable=False)
    tokens_before = Column(Integer)
    tokens_after = Column(Integer)
    compression_ratio = Column(Float)
    selection_time_ms = Column(Float)
    compression_time_ms = Column(Float)
    total_time_ms = Column(Float)
    llm_provider = Column(String(50))
    created_at = Column(DateTime, default=datetime.utcnow)
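
Those logs make rollups straightforward. A sketch of one reporting query using the async SQLAlchemy session from earlier; the specific aggregation is illustrative:

from datetime import datetime, timedelta

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

async def daily_compression_stats(session: AsyncSession):
    """Average compression ratio and latency over the last 24 hours of query logs."""
    since = datetime.utcnow() - timedelta(hours=24)
    stmt = (
        select(
            func.avg(QueryLog.compression_ratio).label("avg_compression_ratio"),
            func.avg(QueryLog.total_time_ms).label("avg_latency_ms"),
            func.count(QueryLog.id).label("queries"),
        )
        .where(QueryLog.created_at >= since)
    )
    result = await session.execute(stmt)
    return result.one()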

Real-World Performance

In production, Praevia achieves:

| Metric | Target | Actual |
|--------|--------|--------|
| Latency Overhead | <50ms | 35ms avg |
| Token Reduction | 50-90% | 78% avg |
| Quality Retention | >95% | 98% |
| Throughput | 100 req/s | 150 req/s |

Deployment Architecture

Praevia is designed for flexible deployment:

Cloud API: Fully managed, hosted by us

  • Zero infrastructure management
  • Auto-scaling
  • Global CDN

Self-Hosted: Docker-based deployment

  • Full control over data
  • On-premise compliance
  • Custom tuning

Hybrid: Best of both worlds

  • Context engine on-premise
  • LLM calls via cloud

Conclusion

Praevia's architecture demonstrates that intelligent context optimization doesn't require complex models or excessive compute. By combining proven techniques like hybrid search, aggressive deduplication, and importance scoring, we achieve dramatic cost savings while maintaining response quality.

The key is in the details: careful algorithm selection, performance optimization, and comprehensive metrics that prove the value at every step.


Want to dive deeper? Check out our GitHub repository or request a technical demo.
