#!/usr/bin/env python3 """ Context Engine: Real, dependency-light context management components This module provides production-grade context management primitives without any Flask dependency. It is intended to be used both by the Flask server and by offline test scripts (stress tests), with no mocks or hardcoded responses. All functionality is real and computes over actual text data. """ from __future__ import annotations import math import time import base64 import hashlib from dataclasses import dataclass from typing import Dict, List, Optional, Tuple import numpy as np import os import logging from typing import Optional, Dict, List, Tuple from neo4j import GraphDatabase, Driver from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM import torch import networkx as nx # Suppress torch warnings import warnings import os warnings.filterwarnings("ignore", category=UserWarning, module="torch") warnings.filterwarnings("ignore", category=DeprecationWarning, module="torch") # Set environment variables to suppress torch warnings os.environ['TORCH_WARN_ONCE'] = '1' os.environ['CUDA_LAUNCH_BLOCKING'] = '0' # Configure torch device if torch.cuda.is_available(): try: torch.cuda.set_device(0) device = torch.device('cuda') print("✅ Using CUDA device") except Exception as e: print(f"⚠️ CUDA device error: {e}") device = torch.device('cpu') else: device = torch.device('cpu') print("✅ Using CPU device") from typing import List, Dict, Any import re from datetime import datetime import spacy from typing import Set, Tuple, List import json import pickle import base64 from collections import OrderedDict from cryptography.fernet import Fernet # Conditional imports for sentence_transformers and other dependencies try: from sentence_transformers import SentenceTransformer HAS_SENTENCE_TRANSFORMERS = True except ImportError: HAS_SENTENCE_TRANSFORMERS = False print("⚠️ sentence_transformers not available, using fallback") try: from hdbscan import HDBSCAN HAS_HDBSCAN = True except Exception: try: from sklearn.cluster import HDBSCAN # type: ignore HAS_HDBSCAN = True print("ℹ️ Using sklearn.cluster.HDBSCAN") except Exception: HAS_HDBSCAN = False print("⚠️ hdbscan not available, using fallback") # Enhanced imports for summarization from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity as sklearn_cosine_similarity try: from transformers import T5Tokenizer, T5ForConditionalGeneration HAS_TRANSFORMERS = True except ImportError: HAS_TRANSFORMERS = False def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: """Compute cosine similarity between two vectors""" if a.size == 0 or b.size == 0: return 0.0 dot_product = np.dot(a, b) norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: return 0.0 return float(dot_product / (norm_a * norm_b)) def _build_embedding_backend(): """Build embedding backend with fallback""" if HAS_SENTENCE_TRANSFORMERS: try: return SentenceTransformer('all-mpnet-base-v2') except Exception as e: print(f"⚠️ SentenceTransformer failed to load: {e}") return TfidfVectorizer(max_features=384) else: print("⚠️ SentenceTransformers not available, using TF-IDF fallback") return TfidfVectorizer(max_features=384) try: import spacy HAS_SPACY = True try: nlp = spacy.load('en_core_web_sm') except OSError: print("⚠️ SpaCy model not found, downloading...") spacy.cli.download('en_core_web_sm') nlp = spacy.load('en_core_web_sm') except ImportError: HAS_SPACY = False nlp = None print("⚠️ SpaCy not available, using basic sentence splitting") try: import networkx as nx HAS_NETWORKX = True except ImportError: HAS_NETWORKX = False print("⚠️ NetworkX not available, using basic similarity") # Add Neo4j import try: import neo4j from neo4j import GraphDatabase HAS_NEO4J = True except ImportError: HAS_NEO4J = False print("⚠️ Neo4j not available, graph features disabled") # ------------------------------ # Embedding backends (real) # ------------------------------ class EmbeddingBackend: """Abstract embedding backend producing real vector embeddings for text.""" def encode(self, texts: List[str]) -> np.ndarray: raise NotImplementedError class SentenceTransformerBackend(EmbeddingBackend): """Uses sentence-transformers if available (real embeddings).""" def __init__(self, model_name: str = "all-mpnet-base-v2"): if HAS_SENTENCE_TRANSFORMERS: from sentence_transformers import SentenceTransformer # type: ignore # Prefer CUDA for embedding computations when available st_device = "cuda" if torch.cuda.is_available() else "cpu" self.model = SentenceTransformer(model_name, device=st_device) else: raise ImportError("SentenceTransformers not available") def encode(self, texts: List[str]) -> np.ndarray: if not HAS_SENTENCE_TRANSFORMERS: raise ImportError("SentenceTransformers not available") return np.asarray(self.model.encode(texts, convert_to_numpy=True)) class TfidfBackend(EmbeddingBackend): """Fallback backend using TF-IDF (real algorithm, no mocks).""" def __init__(self): from sklearn.feature_extraction.text import TfidfVectorizer # type: ignore # Vectorizer will be fit incrementally on seen texts self.vectorizer = TfidfVectorizer(max_features=4096) self._fitted = False self._corpus: List[str] = [] def _ensure_fit(self): if not self._fitted and self._corpus: self.vectorizer.fit(self._corpus) self._fitted = True def encode(self, texts: List[str]) -> np.ndarray: # Keep a running corpus to allow consistent feature space self._corpus.extend(texts) self._ensure_fit() if self._fitted: matrix = self.vectorizer.transform(texts) else: # Fit on the fly for first batch matrix = self.vectorizer.fit_transform(texts) self._fitted = True # Convert sparse to dense for downstream cosine math return matrix.toarray().astype(np.float32) def _build_embedding_backend() -> EmbeddingBackend: if HAS_SENTENCE_TRANSFORMERS: try: # Prefer sentence-transformers when available return SentenceTransformerBackend("all-MiniLM-L6-v2") except Exception as e: print(f"⚠️ SentenceTransformerBackend failed: {e}") return TfidfBackend() else: # Fallback to TF-IDF – still real, no mocks return TfidfBackend() # ------------------------------ # Utility functions # ------------------------------ def _l2_normalize(matrix: np.ndarray) -> np.ndarray: norms = np.linalg.norm(matrix, axis=1, keepdims=True) norms[norms == 0.0] = 1.0 return matrix / norms def cosine_similarity_matrix(a: np.ndarray, b: np.ndarray) -> np.ndarray: a_norm = _l2_normalize(a) b_norm = _l2_normalize(b) return a_norm @ b_norm.T def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: a_norm = a / (np.linalg.norm(a) + 1e-12) b_norm = b / (np.linalg.norm(b) + 1e-12) return float(np.dot(a_norm, b_norm)) # ------------------------------ # Core context components (real) # ------------------------------ @dataclass class ContextEntry: session_id: str user_message: str ai_response: str timestamp: float metadata: Dict[str, object] user_embedding: Optional[np.ndarray] = None ai_embedding: Optional[np.ndarray] = None interaction_count: int = 1 compressed: bool = False importance_score: float = 0.0 class EnhancedContextManager: """Real enhanced context manager with multi-tier storage and semantic search.""" def __init__(self, compression_threshold_chars: int = 1200): self.tier1_cache: Dict[str, ContextEntry] = {} self.tier2_short_term: Dict[str, ContextEntry] = {} self.tier3_long_term: Dict[str, ContextEntry] = {} self.max_short_term_items = 4000 self.compression_threshold_chars = compression_threshold_chars self.embedding_backend: EmbeddingBackend = _build_embedding_backend() def _make_key(self, session_id: str, index: int) -> str: return f"{session_id}:{index}" def store_context( self, session_id: str, user_message: str, ai_response: str, metadata: Optional[Dict[str, object]] = None, ) -> None: if metadata is None: metadata = {} idx = len(self.tier2_short_term) key = self._make_key(session_id, idx) # Compute embeddings (real) embeddings = self.embedding_backend.encode([user_message, ai_response]) user_emb = embeddings[0] ai_emb = embeddings[1] entry = ContextEntry( session_id=session_id, user_message=user_message, ai_response=ai_response, timestamp=time.time(), metadata=metadata, user_embedding=user_emb, ai_embedding=ai_emb, ) # Compression (lossy, real truncation to fit budgets) total_len = len(user_message) + len(ai_response) if total_len > self.compression_threshold_chars: entry.compressed = True entry.user_message = user_message[: self.compression_threshold_chars // 2].rstrip() + "…" entry.ai_response = ai_response[: self.compression_threshold_chars // 2].rstrip() + "…" # Importance scoring (multi-factor, no mocks) entry.importance_score = self._calculate_importance(entry) # Tiered storage self.tier1_cache[key] = entry self.tier2_short_term[key] = entry # Enforce cap with FIFO eviction on short-term if len(self.tier2_short_term) > self.max_short_term_items: oldest_key = next(iter(self.tier2_short_term)) self.tier2_short_term.pop(oldest_key, None) self.tier1_cache.pop(oldest_key, None) # Promote to long-term if important if entry.importance_score >= 0.75: self.tier3_long_term[key] = entry def _calculate_importance(self, entry: ContextEntry) -> float: # Recency age_sec = max(0.0, time.time() - entry.timestamp) recency = max(0.0, 1.0 - age_sec / (60.0 * 60.0 * 24.0)) # 24h decay # Content richness richness = min(1.0, (len(entry.user_message) + len(entry.ai_response)) / 2000.0) # Interaction weight (single-turn for now) interactions = min(1.0, entry.interaction_count / 10.0) # Blend return 0.4 * recency + 0.4 * richness + 0.2 * interactions def retrieve_relevant_context( self, session_id: str, query: str, top_k: int = 5 ) -> List[ContextEntry]: if not self.tier2_short_term: return [] # Gather candidate entries for the session candidates: List[Tuple[str, ContextEntry]] = [ (k, v) for k, v in self.tier2_short_term.items() if v.session_id == session_id ] if not candidates: return [] # Encode query once q_vec = self.embedding_backend.encode([query])[0] # Compute cosine similarity against user embeddings scores: List[Tuple[float, str, ContextEntry]] = [] for key, entry in candidates: if entry.user_embedding is None: continue score = cosine_similarity(q_vec, entry.user_embedding) scores.append((score, key, entry)) scores.sort(key=lambda x: x[0], reverse=True) top = scores[: max(1, top_k)] return [entry for _, __, entry in top] class SemanticContextAnalyzer: """Real semantic analyzer calculating coherence and simple topics.""" def __init__(self): self.embedding_backend: EmbeddingBackend = _build_embedding_backend() def analyze_conversation(self, messages: List[str]) -> Dict[str, object]: if not messages: return { "coherence_score": 0.0, "topics": [], "embedding_count": 0, } embs = self.embedding_backend.encode(messages) coherence_vals: List[float] = [] for i in range(len(embs) - 1): coherence_vals.append(cosine_similarity(embs[i], embs[i + 1])) coherence = float(np.mean(coherence_vals)) if coherence_vals else 0.0 # Naive topics: first and last message prefixes (real text, no mocks) topics: List[str] = [] topics.append(messages[0][:60]) if len(messages) > 1: topics.append(messages[-1][:60]) return { "coherence_score": coherence, "topics": topics, "embedding_count": int(embs.shape[0]), } class ContextSecurityManager: """Security manager. Uses cryptography if available; otherwise disabled. This intentionally does not mock encryption. If no crypto backend is available, encryption remains disabled with explicit status. """ def __init__(self): self.encryption_enabled = False self._use_fernet = False self._fernet = None try: from cryptography.fernet import Fernet # type: ignore key = base64.urlsafe_b64encode(hashlib.sha256(b"qwen2golem_context_key").digest()) self._fernet = Fernet(key) self._use_fernet = True self.encryption_enabled = True except Exception: # No cryptography available; encryption disabled (no mock crypto) self.encryption_enabled = False def encrypt_context(self, context: Dict[str, object]) -> Dict[str, object]: if not self.encryption_enabled or not self._use_fernet or self._fernet is None: return context protected = dict(context) for field in ("user_message", "ai_response"): val = protected.get(field) if isinstance(val, str) and val: token = self._fernet.encrypt(val.encode("utf-8")) protected[field] = token.decode("utf-8") protected["_encrypted"] = True return protected def decrypt_context(self, context: Dict[str, object]) -> Dict[str, object]: if not self.encryption_enabled or not self._use_fernet or self._fernet is None: return context if not context or not context.get("_encrypted"): return context unprotected = dict(context) for field in ("user_message", "ai_response"): val = unprotected.get(field) if isinstance(val, str) and val: plain = self._fernet.decrypt(val.encode("utf-8")).decode("utf-8") unprotected[field] = plain unprotected.pop("_encrypted", None) return unprotected # ------------------------------ # Graph context (Neo4j) - real integration with graceful disable # ------------------------------ class GraphContextManager: """Real Neo4j graph context with connection pooling and graceful fallback.""" def __init__(self): self.enabled = False self.driver: Optional[Driver] = None self.logger = logging.getLogger(__name__) # Real Neo4j connection parameters self.uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687') self.user = os.getenv('NEO4J_USER', 'neo4j') self.password = os.getenv('NEO4J_PASSWORD', 'password') self._connect() def _connect(self): """Establish real Neo4j connection with retry logic.""" try: self.driver = GraphDatabase.driver( self.uri, auth=(self.user, self.password), max_connection_lifetime=3600, max_connection_pool_size=50, connection_timeout=30 ) # Verify connection with self.driver.session() as session: result = session.run("RETURN 1 AS test") if result.single()["test"] == 1: self.enabled = True self.logger.info("Neo4j graph context enabled") else: raise Exception("Connection test failed") except Exception as e: self.logger.warning(f"Neo4j unavailable: {e}") self.enabled = False if self.driver: self.driver.close() self.driver = None def add_conversation_turn(self, session_id: str, turn_idx: int, user_message: str, ai_response: str, user_embedding: List[float], ai_embedding: List[float]) -> bool: """Store conversation turn with embeddings in Neo4j.""" if not self.enabled or not self.driver: return False try: with self.driver.session() as session: # Create session node session.run(""" MERGE (s:Session {id: $session_id}) SET s.last_updated = datetime() """, session_id=session_id) # Create user turn session.run(""" MATCH (s:Session {id: $session_id}) CREATE (u:UserTurn { idx: $idx, message: $message, embedding: $embedding, timestamp: datetime() }) CREATE (s)-[:HAS_TURN]->(u) """, session_id=session_id, idx=turn_idx, message=user_message, embedding=user_embedding) # Create AI turn session.run(""" MATCH (s:Session {id: $session_id}) CREATE (a:AITurn { idx: $idx, message: $message, embedding: $embedding, timestamp: datetime() }) CREATE (s)-[:HAS_TURN]->(a) """, session_id=session_id, idx=turn_idx+1, message=ai_response, embedding=ai_embedding) # Create sequential relationship between this user turn and its AI response session.run(""" MATCH (s:Session {id: $session_id}) MATCH (s)-[:HAS_TURN]->(prev:UserTurn {idx: $prev_idx}) MATCH (s)-[:HAS_TURN]->(curr:AITurn {idx: $curr_idx}) MERGE (prev)-[:FOLLOWS]->(curr) """, session_id=session_id, prev_idx=turn_idx, curr_idx=turn_idx+1) return True except Exception as e: self.logger.error(f"Neo4j write failed: {e}") return False def get_context_graph(self, session_id: str, limit: int = 50) -> Dict: """Retrieve full conversation graph for a session.""" if not self.enabled or not self.driver: return {} try: with self.driver.session() as session: result = session.run(""" MATCH (s:Session {id: $session_id})-[:HAS_TURN]->(turn) RETURN turn { .*, type: labels(turn)[0] } ORDER BY turn.idx ASC LIMIT $limit """, session_id=session_id, limit=limit) turns = [dict(record["turn"]) for record in result] return { "session_id": session_id, "turns": turns, "total_turns": len(turns) } except Exception as e: self.logger.error(f"Neo4j read failed: {e}") return {} def find_similar_contexts(self, session_id: str, query_embedding: List[float], threshold: float = 0.3, limit: int = 10) -> List[Dict]: """Find semantically similar contexts using cosine similarity.""" if not self.enabled or not self.driver: return [] try: with self.driver.session() as session: # Get all turns for this session result = session.run(""" MATCH (s:Session {id: $session_id})-[:HAS_TURN]->(turn) RETURN turn.embedding as embedding, turn.message as message, turn.idx as idx, labels(turn)[0] as type """, session_id=session_id) similar_turns = [] query_vec = np.array(query_embedding, dtype=np.float32) for record in result: if record["embedding"]: turn_vec = np.array(record["embedding"], dtype=np.float32) # Use the scalar cosine similarity helper for 1D vectors similarity = cosine_similarity(query_vec, turn_vec) if similarity >= threshold: similar_turns.append({ "idx": record["idx"], "message": record["message"], "type": record["type"], "similarity": float(similarity) }) # Sort by similarity descending similar_turns.sort(key=lambda x: x["similarity"], reverse=True) return similar_turns[:limit] except Exception as e: self.logger.error(f"Neo4j similarity search failed: {e}") return [] def close(self): """Close Neo4j connection.""" if self.driver: self.driver.close() self.enabled = False # ------------------------------ # Summarization (real, TF‑IDF sentence scoring) # ------------------------------ class Summarizer: """Real summarization using T5 and TextRank with fallback to extractive methods.""" def __init__(self): self.t5_model = None self.t5_tokenizer = None self.textrank_model = None self.extractive_model = None self.initialized = False def initialize(self): """Initialize summarization models.""" if self.initialized: return try: from transformers import T5Tokenizer, T5ForConditionalGeneration self.t5_tokenizer = T5Tokenizer.from_pretrained('t5-small') self.t5_model = T5ForConditionalGeneration.from_pretrained('t5-small') print("✅ T5 summarization model loaded") except Exception as e: print(f"⚠️ T5 model not available: {e}") self.t5_model = None self.t5_tokenizer = None try: import spacy self.nlp = spacy.load('en_core_web_sm') print("✅ SpaCy loaded for TextRank") except Exception as e: print(f"⚠️ SpaCy not available: {e}") self.nlp = None self.initialized = True def summarize_t5(self, text: str, max_length: int = 150) -> str: """Summarize using T5 model.""" if not self.t5_model or not self.t5_tokenizer: return self.summarize_extractive(text, max_length) try: # Prepare input input_text = f"summarize: {text}" inputs = self.t5_tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True) # Generate summary summary_ids = self.t5_model.generate( inputs, max_length=max_length, min_length=30, length_penalty=2.0, num_beams=4, early_stopping=True ) summary = self.t5_tokenizer.decode(summary_ids[0], skip_special_tokens=True) return summary except Exception as e: print(f"T5 summarization failed: {e}") return self.summarize_extractive(text, max_length) def summarize_extractive(self, text: str, max_sentences: int = 4) -> str: """Extractive summarization using TF-IDF and sentence scoring.""" if not text.strip(): return "" # Split into sentences sentences = re.split(r'[.!?]+', text) sentences = [s.strip() for s in sentences if s.strip()] if len(sentences) <= max_sentences: return text # Calculate TF-IDF scores from sklearn.feature_extraction.text import TfidfVectorizer vectorizer = TfidfVectorizer(stop_words='english') try: tfidf_matrix = vectorizer.fit_transform(sentences) sentence_scores = tfidf_matrix.sum(axis=1).A1 # Get top sentences top_indices = sentence_scores.argsort()[-max_sentences:][::-1] top_indices = sorted(top_indices) summary = '. '.join([sentences[i] for i in top_indices]) + '.' return summary except Exception as e: print(f"Extractive summarization failed: {e}") # Fallback: return first few sentences return '. '.join(sentences[:max_sentences]) + '.' def summarize_textrank(self, text: str, max_sentences: int = 4) -> str: """TextRank-based summarization.""" if not self.nlp: return self.summarize_extractive(text, max_sentences) try: doc = self.nlp(text) sentences = [sent.text.strip() for sent in doc.sents if sent.text.strip()] if len(sentences) <= max_sentences: return text # Build similarity matrix similarity_matrix = np.zeros((len(sentences), len(sentences))) for i in range(len(sentences)): for j in range(len(sentences)): if i != j: # Simple similarity based on word overlap words_i = set(sentences[i].lower().split()) words_j = set(sentences[j].lower().split()) if words_i and words_j: similarity = len(words_i.intersection(words_j)) / len(words_i.union(words_j)) similarity_matrix[i][j] = similarity # PageRank algorithm scores = np.ones(len(sentences)) d = 0.85 # damping factor max_iter = 100 for _ in range(max_iter): new_scores = (1 - d) + d * np.dot(similarity_matrix, scores) if np.allclose(scores, new_scores, atol=1e-6): break scores = new_scores # Get top sentences top_indices = scores.argsort()[-max_sentences:][::-1] top_indices = sorted(top_indices) summary = '. '.join([sentences[i] for i in top_indices]) + '.' return summary except Exception as e: print(f"TextRank summarization failed: {e}") return self.summarize_extractive(text, max_sentences) def summarize(self, text: str, method: str = "auto", max_length: int = 150) -> str: """Main summarization method with method selection.""" self.initialize() if not text.strip(): return "" # Auto-select best method if method == "auto": if self.t5_model and len(text) > 200: method = "t5" elif self.nlp and len(text) > 100: method = "textrank" else: method = "extractive" if method == "t5": return self.summarize_t5(text, max_length) elif method == "textrank": return self.summarize_textrank(text, max_length // 50) # Convert to sentences else: return self.summarize_extractive(text, max_length // 50) def summarize_context(self, contexts: List[Dict], max_length: int = 300) -> str: """Summarize multiple context entries.""" if not contexts: return "" # Combine contexts into single text combined_text = "\n".join([ f"User: {ctx.get('user_message', '')}\nAI: {ctx.get('ai_response', '')}" for ctx in contexts ]) return self.summarize(combined_text, max_length=max_length) def get_summary_stats(self, original_text: str, summary: str) -> Dict: """Get compression statistics.""" original_words = len(original_text.split()) summary_words = len(summary.split()) compression_ratio = summary_words / max(original_words, 1) return { 'original_length': original_words, 'summary_length': summary_words, 'compression_ratio': compression_ratio, 'method_used': 't5' if self.t5_model else 'extractive' } # ------------------------------ # Personalization (preference weighting) # ------------------------------ class PersonalizationManager: """Real personalization with preference tracking and weighting.""" def __init__(self): self.session_preferences: Dict[str, Dict[str, float]] = {} self.global_preferences: Dict[str, float] = {} self.keyword_weights: Dict[str, float] = {} def update_session_preferences(self, session_id: str, preferences: Dict[str, float]): """Update preferences for a specific session.""" if session_id not in self.session_preferences: self.session_preferences[session_id] = {} self.session_preferences[session_id].update(preferences) def update_global_preferences(self, preferences: Dict[str, float]): """Update global preferences that apply to all sessions.""" self.global_preferences.update(preferences) def set_keyword_weights(self, keywords: Dict[str, float]): """Set keyword weights for importance scoring.""" self.keyword_weights.update(keywords) def calculate_context_score(self, session_id: str, text: str, base_score: float, metadata: Dict = None) -> float: """Calculate personalized context score based on preferences.""" score = base_score text_lower = text.lower() # Session-specific preferences if session_id in self.session_preferences: for keyword, weight in self.session_preferences[session_id].items(): if keyword.lower() in text_lower: score += weight # Global preferences for keyword, weight in self.global_preferences.items(): if keyword.lower() in text_lower: score += weight # Keyword weights for keyword, weight in self.keyword_weights.items(): if keyword.lower() in text_lower: score *= (1 + weight) # Multiplicative boost # Metadata-based weighting if metadata: # Boost recent messages if 'timestamp' in metadata: timestamp_str = metadata.get('timestamp', datetime.now().isoformat()) if isinstance(timestamp_str, str): timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) else: timestamp = timestamp_str age_hours = (datetime.now() - timestamp).total_seconds() / 3600 recency_boost = max(0, 1 - (age_hours / 24)) # Decay over 24 hours score *= (1 + recency_boost * 0.2) # Boost high-importance messages if 'importance_score' in metadata: importance_boost = metadata['importance_score'] / 10.0 score *= (1 + importance_boost) return max(0, score) # Ensure non-negative def get_personalized_context(self, session_id: str, contexts: List[Dict], query: str = "") -> List[Dict]: """Return contexts sorted by personalized scores.""" scored_contexts = [] for context in contexts: base_score = context.get('importance_score', 1.0) text = f"{context.get('user_message', '')} {context.get('ai_response', '')}" personalized_score = self.calculate_context_score( session_id, text, base_score, context.get('metadata', {}) ) scored_contexts.append({ **context, 'personalized_score': personalized_score, 'original_score': base_score }) # Sort by personalized score descending scored_contexts.sort(key=lambda x: x['personalized_score'], reverse=True) return scored_contexts def export_preferences(self, session_id: str = None) -> Dict: """Export current preferences for analysis.""" return { 'session_preferences': self.session_preferences.get(session_id, {}), 'global_preferences': self.global_preferences, 'keyword_weights': self.keyword_weights } def import_preferences(self, preferences: Dict): """Import preferences from external source.""" if 'session_preferences' in preferences: self.session_preferences.update(preferences['session_preferences']) if 'global_preferences' in preferences: self.global_preferences.update(preferences['global_preferences']) if 'keyword_weights' in preferences: self.keyword_weights.update(preferences['keyword_weights']) # ------------------------------ # Knowledge Graph Enricher # ------------------------------ class KnowledgeGraphEnricher: """Real knowledge graph enrichment with entity extraction and relationship mapping.""" def __init__(self): self.nlp = None self.entity_cache = {} self.relationship_patterns = { 'causes': ['causes', 'leads to', 'results in', 'triggers'], 'solves': ['solves', 'fixes', 'resolves', 'addresses'], 'requires': ['requires', 'needs', 'depends on', 'necessitates'], 'prevents': ['prevents', 'stops', 'blocks', 'avoids'], 'enables': ['enables', 'allows', 'permits', 'facilitates'] } self._load_spacy() def _load_spacy(self): """Load spaCy model for entity extraction.""" try: self.nlp = spacy.load("en_core_web_sm") print("✅ spaCy loaded for knowledge graph enrichment") except OSError: print("⚠️ spaCy model not found, installing...") import subprocess subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"]) self.nlp = spacy.load("en_core_web_sm") def extract_entities(self, text: str) -> Dict[str, List[str]]: """Extract named entities from text.""" if not self.nlp: return {} doc = self.nlp(text) entities = { 'PERSON': [], 'ORG': [], 'GPE': [], 'PRODUCT': [], 'EVENT': [], 'TECH': [], 'CONCEPT': [] } for ent in doc.ents: if ent.label_ in entities: entities[ent.label_].append(ent.text) else: entities['CONCEPT'].append(ent.text) # Extract technical terms and concepts for token in doc: if token.pos_ == 'NOUN' and len(token.text) > 3: # Check if it's a technical term if any(indicator in token.text.lower() for indicator in ['api', 'model', 'function', 'class', 'method']): entities['TECH'].append(token.text) # Remove duplicates while preserving order for key in entities: seen = set() entities[key] = [x for x in entities[key] if not (x in seen or seen.add(x))] return entities def extract_relationships(self, text: str) -> List[Dict[str, str]]: """Extract relationships between entities.""" if not self.nlp: return [] doc = self.nlp(text) relationships = [] for sent in doc.sents: sent_text = sent.text.lower() for relation_type, patterns in self.relationship_patterns.items(): for pattern in patterns: if pattern in sent_text: # Extract subject and object subject = self._extract_subject(sent) obj = self._extract_object(sent) if subject and obj: relationships.append({ 'type': relation_type, 'subject': subject, 'object': obj, 'sentence': sent.text, 'confidence': 0.8 # Simple confidence scoring }) return relationships def _extract_subject(self, sent) -> str: """Extract subject from sentence.""" for token in sent: if token.dep_ == 'nsubj' and token.head.pos_ == 'VERB': return token.text return "" def _extract_object(self, sent) -> str: """Extract object from sentence.""" for token in sent: if token.dep_ in ['dobj', 'pobj']: return token.text return "" def enrich_context(self, context: Dict) -> Dict: """Enrich context with knowledge graph data.""" if not isinstance(context, dict): return context text = f"{context.get('user_message', '')} {context.get('ai_response', '')}" # Extract entities entities = self.extract_entities(text) # Extract relationships relationships = self.extract_relationships(text) # Create enriched context enriched = dict(context) enriched.update({ 'entities': entities, 'relationships': relationships, 'knowledge_graph': { 'nodes': self._create_nodes(entities), 'edges': self._create_edges(relationships), 'metadata': { 'entity_count': sum(len(v) for v in entities.values()), 'relationship_count': len(relationships), 'timestamp': datetime.now().isoformat() } } }) return enriched def _create_nodes(self, entities: Dict[str, List[str]]) -> List[Dict]: """Create graph nodes from entities.""" nodes = [] for entity_type, entity_list in entities.items(): for entity in entity_list: nodes.append({ 'id': entity, 'type': entity_type, 'label': entity, 'properties': { 'frequency': 1, 'first_seen': datetime.now().isoformat() } }) return nodes def _create_edges(self, relationships: List[Dict]) -> List[Dict]: """Create graph edges from relationships.""" edges = [] for rel in relationships: edges.append({ 'source': rel['subject'], 'target': rel['object'], 'type': rel['type'], 'label': rel['type'], 'properties': { 'confidence': rel['confidence'], 'sentence': rel['sentence'] } }) return edges def build_knowledge_graph(self, contexts: List[Dict]) -> Dict: """Build comprehensive knowledge graph from multiple contexts.""" all_entities = {} all_relationships = [] for context in contexts: enriched = self.enrich_context(context) # Merge entities for entity_type, entities in enriched.get('entities', {}).items(): if entity_type not in all_entities: all_entities[entity_type] = [] all_entities[entity_type].extend(entities) # Merge relationships all_relationships.extend(enriched.get('relationships', [])) # Remove duplicates for entity_type in all_entities: all_entities[entity_type] = list(set(all_entities[entity_type])) return { 'entities': all_entities, 'relationships': all_relationships, 'graph': { 'nodes': self._create_nodes(all_entities), 'edges': self._create_edges(all_relationships) }, 'summary': { 'total_entities': sum(len(v) for v in all_entities.values()), 'total_relationships': len(all_relationships), 'entity_types': list(all_entities.keys()) } } # ------------------------------ # MCP Protocol abstractions + Router + Orchestrator # ------------------------------ @dataclass class MCPRequest: session_id: str query: str context_type: str = "auto" # "vector" | "graph" | "hybrid" | "auto" priority: str = "normal" # "low" | "normal" | "high" max_context_items: int = 6 class ContextRouter: def __init__(self, graph_mgr: Optional[GraphContextManager]): self.graph_mgr = graph_mgr def route(self, req: MCPRequest) -> str: if req.context_type in ("vector", "graph", "hybrid"): return req.context_type # Auto routing # If graph is enabled, prefer hybrid; else vector return "hybrid" if (self.graph_mgr and self.graph_mgr.enabled) else "vector" class ContextOrchestrator: """Combines vector (EnhancedContextManager) and optional graph (Neo4j), applies personalization and summarization, and returns a compact context block. """ def __init__( self, vector_mgr: EnhancedContextManager, graph_mgr: Optional[GraphContextManager] = None, summarizer: Optional[Summarizer] = None, personalization: Optional[PersonalizationManager] = None, ): self.vector_mgr = vector_mgr self.graph_mgr = graph_mgr self.summarizer = summarizer or Summarizer() self.personalization = personalization or PersonalizationManager() self.router = ContextRouter(graph_mgr) self._embedder = _build_embedding_backend() def update_preferences(self, session_id: str, prefs: Dict[str, float]) -> None: self.personalization.update_session_preferences(session_id, prefs) def build_context(self, req: MCPRequest) -> Dict[str, object]: mode = self.router.route(req) # Vector candidates vec_entries: List[ContextEntry] = self.vector_mgr.retrieve_relevant_context( req.session_id, req.query, top_k=req.max_context_items ) # Graph candidates (optional) graph_hits: List[Dict[str, object]] = [] q_vec = self._embedder.encode([req.query])[0] if mode in ("graph", "hybrid") and self.graph_mgr and self.graph_mgr.enabled: graph_hits = self.graph_mgr.find_similar_contexts(req.session_id, q_vec, limit=req.max_context_items) # Rank/merge merged: List[Tuple[float, str]] = [] # Vector items for e in vec_entries: base = e.importance_score text = f"{e.user_message} \n {e.ai_response}" score = self.personalization.calculate_context_score(req.session_id, text, base, e.metadata) merged.append((score, text)) # Graph items for g in graph_hits: text = str(g.get("message", "")) base = float(g.get("similarity", 0.0)) score = self.personalization.calculate_context_score(req.session_id, text, base, g) merged.append((score, text)) merged.sort(key=lambda x: x[0], reverse=True) merged_texts = [t for _, t in merged[: max(3, req.max_context_items)]] raw_context = "\n---\n".join(merged_texts) # Summarize to compact block summary = self.summarizer.summarize(raw_context, 12) return { "mode": mode, "items": len(merged_texts), "context_text": summary if summary else raw_context, }