# core/retriever.py # Add this as the FIRST lines of code (after docstrings) import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) import re import logging import asyncio from typing import List, Dict, Any, Tuple from langchain_core.documents import Document from langchain_mongodb.vectorstores import MongoDBAtlasVectorSearch from config.settings import settings from config.constants import ARTICLE_PATTERNS, CATEGORY_KEYWORDS, DOCUMENT_TYPE_KEYWORDS logger = logging.getLogger(__name__) class LegalRetriever: def __init__(self, vectorstore: MongoDBAtlasVectorSearch, collection): self.vectorstore = vectorstore self.collection = collection async def smart_legal_query(self, user_query: str, country: str) -> Tuple[List[Document], List[str], Dict[str, Any], str]: """Perform smart legal search with automatic fallback and custom messages - ASYNC VERSION""" try: # Détection initiale du type de document initial_doc_type = self._detect_document_type(user_query.lower()) pre_filter = self._build_pre_filters(user_query, country) logger.info(f"📋 Filtre doc_type initial: {initial_doc_type}") logger.info(f"🔍 Recherche {country} avec filtres: {pre_filter}") # Première recherche enhanced_docs, detected_articles, applied_filters = await self._perform_search_async( user_query, country, pre_filter ) message_supplementaire = "" # Fallback automatique si aucun résultat pour case_study if not enhanced_docs and initial_doc_type == "case_study": logger.info("🔄 Fallback: Aucun case_study trouvé, recherche dans les articles") # Create new filter for articles - DON'T rebuild, just modify fallback_filter = pre_filter.copy() # Copy the original filter fallback_filter["doc_type"] = "articles" # Force articles type logger.info(f"🔄 Fallback filter: {fallback_filter}") # Log the fallback filter enhanced_docs, detected_articles, applied_filters = await self._perform_search_async( user_query, country, fallback_filter ) # Mark that fallback was used applied_filters["original_search"] = "case_study" applied_filters["fallback_to"] = "articles" applied_filters["fallback_used"] = True # Message personnalisé pour le fallback if enhanced_docs: message_supplementaire = ( "⚠️ Nous nous excusons, mais aucune décision de justice n'a été trouvée pour votre requête. " "La base de données sera enrichie avec des décisions de justice prochainement. " "En attendant, voici des articles de loi pertinents qui peuvent vous aider." ) else: # Check if it's a MongoDB error if "mongodb_error" in applied_filters: message_supplementaire = ( "⚠️ Nous nous excusons, mais une erreur technique s'est produite lors de la recherche. " "Nous travaillons à résoudre ce problème. Veuillez réessayer dans quelques instants." ) else: message_supplementaire = ( "⚠️ Nous nous excusons, mais aucune décision de justice n'a été trouvée pour votre requête. " "La base de données sera enrichie avec des décisions de justice prochainement. " "De plus, aucun article de loi correspondant n'a été trouvé. " "Essayez de reformuler votre question avec des termes plus généraux." ) logger.info(f"🔍 Search completed: {len(enhanced_docs)} documents found") logger.info(f"📢 Supplemental message: {message_supplementaire[:100] if message_supplementaire else 'None'}") return enhanced_docs, detected_articles, applied_filters, message_supplementaire except Exception as e: logger.error(f"Error in smart_legal_query: {str(e)}") # Return empty results on error return [], [], {"error": str(e)}, f"Erreur lors de la recherche: {str(e)}" async def _perform_search_async(self, user_query: str, country: str, pre_filter: Dict) -> Tuple[List[Document], List[str], Dict[str, Any]]: """Perform search with given filters - ASYNC VERSION""" try: detected_articles = self._detect_articles(user_query) enhanced_query = self._enhance_query(user_query, detected_articles) logger.info(f"🔢 Articles détectés: {detected_articles}") logger.info(f"🔍 Requête enrichie: {enhanced_query[:100]}...") # CRITICAL FIX: Run synchronous vectorstore operation in thread pool docs = await asyncio.get_event_loop().run_in_executor( None, # Use default thread pool lambda: self.vectorstore.similarity_search( enhanced_query, k=settings.MAX_SEARCH_RESULTS, pre_filter=pre_filter ) ) logger.info(f"🎯 Vector search returned {len(docs)} raw documents") if docs: logger.info(f"📄 First result metadata: {docs[0].metadata}") else: logger.warning(f"⚠️ No documents found with filters: {pre_filter}") await self._debug_search_issue(pre_filter) enhanced_docs = self.enhance_with_article_context(docs) return enhanced_docs, detected_articles, pre_filter except Exception as e: logger.error(f"Error in _perform_search_async: {str(e)}") # Mark the filter with MongoDB error for better error handling error_filter = pre_filter.copy() error_filter["mongodb_error"] = str(e) return [], [], error_filter async def _debug_search_issue(self, pre_filter: Dict): """Debug why search returned no results""" try: # Check total document count total_count = await asyncio.get_event_loop().run_in_executor( None, lambda: self.collection.count_documents({}) ) logger.info(f"🔢 Total documents in collection: {total_count}") # Check documents matching country filter country_count = await asyncio.get_event_loop().run_in_executor( None, lambda: self.collection.count_documents({"pays": pre_filter.get("pays")}) ) logger.info(f"🌍 Documents for country {pre_filter.get('pays')}: {country_count}") # Check documents with doc_type doc_type_count = await asyncio.get_event_loop().run_in_executor( None, lambda: self.collection.count_documents({ "pays": pre_filter.get("pays"), "doc_type": pre_filter.get("doc_type") }) ) logger.info(f"📋 Documents with doc_type {pre_filter.get('doc_type')}: {doc_type_count}") # Check documents with embeddings embedding_count = await asyncio.get_event_loop().run_in_executor( None, lambda: self.collection.count_documents({ "pays": pre_filter.get("pays"), "embedding": {"$exists": True, "$ne": None} }) ) logger.info(f"🎯 Documents with embeddings: {embedding_count}") # Sample document check sample_doc = await asyncio.get_event_loop().run_in_executor( None, lambda: self.collection.find_one({"pays": pre_filter.get("pays")}) ) if sample_doc: logger.info(f"📄 Sample document keys: {list(sample_doc.keys())}") logger.info(f"📄 Sample doc_type: {sample_doc.get('doc_type', 'NOT_SET')}") else: logger.error("❌ No sample document found!") except Exception as e: logger.error(f"Error in debug: {str(e)}") def _build_pre_filters(self, query: str, country: str) -> Dict[str, Any]: """Build search filters based on query and country""" # Filtre pays obligatoire - MAKE SURE EXACT MATCH country_mapping = { "benin": "Bénin", "madagascar": "Madagascar" } pre_filter = {"pays": country_mapping.get(country.lower(), country)} # Filtre doc_type pour différencier articles et études de cas query_lower = query.lower() detected_doc_type = self._detect_document_type(query_lower) pre_filter["doc_type"] = detected_doc_type logger.info(f"🏷️ Using country filter: {pre_filter['pays']}") logger.info(f"📋 Using doc_type filter: {detected_doc_type}") # Filtres par catégorie (optionnels) logger.info("ℹ️ No category filter applied - using all available family law documents") # for keyword, category in CATEGORY_KEYWORDS.items(): # if keyword in query_lower: # pre_filter["categorie"] = category # logger.info(f"🏷️ Filtre catégorie: {category}") # break return pre_filter def _detect_document_type(self, query_lower: str) -> str: """Détecte le type de document basé sur les mots-clés de la requête""" # Mots-clés pour les études de cas case_study_indicators = [ "jurisprudence", "arrêt", "décision", "tribunal", "cours", "jugement", "affaire", "procès", "litige", "contentieux", "précédent", "cas", "cour d'appel", "cour suprême", "conseil d'état", "juridiction" ] # Mots-clés pour les articles articles_indicators = [ "article", "loi", "code", "décret", "texte", "disposition", "règlement", "ordonnance", "prescription", "norme", "chapitre", "titre" ] case_study_score = sum(1 for keyword in case_study_indicators if keyword in query_lower) articles_score = sum(1 for keyword in articles_indicators if keyword in query_lower) if case_study_score > articles_score and case_study_score > 0: return "case_study" elif articles_score > 0: return "articles" else: # Par défaut, on cherche les articles de loi return "articles" def _detect_articles(self, query: str) -> List[str]: """Detect article references in query""" detected_articles = [] for pattern in ARTICLE_PATTERNS: matches = re.findall(pattern, query.lower()) for match in matches: if isinstance(match, tuple): nums = [n for n in match if n.isdigit()] detected_articles.extend(nums) else: nums = re.findall(r"\d+", match) detected_articles.extend(nums) return sorted(list(set(detected_articles))) def _enhance_query(self, query: str, detected_articles: List[str]) -> str: """Enhance query with article context""" if detected_articles: enhanced = f"article {' '.join(detected_articles)} {query}" logger.info(f"🔢 Requête enrichie avec articles: {detected_articles}") return enhanced return query def enhance_with_article_context(self, results: List[Document]) -> List[Document]: """Enhance search results with referenced article context""" enhanced_results = [] for result in results: enhanced_results.append(result) # Pour les documents de type "articles", on peut ajouter les références if result.metadata.get("doc_type") == "articles": article_refs = result.metadata.get("article_references", []) resolved_refs = result.metadata.get("resolved_references", {}) for article_num in article_refs[:3]: if article_num in resolved_refs: ref_doc = Document( page_content=f"Article {article_num} (Référencé): {resolved_refs[article_num][:500]}...", metadata={ **result.metadata, "is_reference": True, "referenced_article": article_num, "doc_type": "article_reference" }, ) enhanced_results.append(ref_doc) return enhanced_results def format_search_results(self, query: str, enhanced_docs: List[Document], detected_articles: List[str], applied_filters: Dict[str, Any], country: str, supplemental_message: str = "") -> str: """Format search results for system prompt""" country_name = "Bénin" if country == "benin" else "Madagascar" if not enhanced_docs: doc_type = applied_filters.get("doc_type", "articles") # Check if this was an error case if "error" in applied_filters: return f""" **🚨 ERREUR DE RECHERCHE - {country_name.upper()}** Une erreur s'est produite lors de la recherche: {applied_filters['error']} **Informations de débogage:** - **Requête**: "{query}" - **Pays**: {country_name} - **Type de document recherché**: {doc_type} - **Filtres**: {applied_filters} Veuillez réessayer ou contacter le support technique. """ if applied_filters.get("fallback_used"): # Cas où le fallback a été utilisé mais n'a rien trouvé non plus mongodb_error_note = "" if "mongodb_error" in applied_filters: mongodb_error_note = f"\n\n**⚠️ Erreur technique**: {applied_filters['mongodb_error'][:200]}..." return f""" **🔍 RECHERCHE JURIDIQUE - {country_name.upper()}** {supplemental_message} **💡 Informations :** - Votre recherche portait sur des **décisions de justice (jurisprudence)** - Aucune décision de justice n'a été trouvée dans la base de données - Aucun article de loi correspondant n'a été trouvé non plus {mongodb_error_note} **Suggestion**: Essayez de reformuler votre requête avec des termes plus généraux. **Recherche effectuée**: - Type initial: {applied_filters.get('original_search', 'N/A')} - Fallback vers: {applied_filters.get('fallback_to', 'N/A')} - Pays: {country_name} """ else: # Cas normal sans fallback return f""" **🔍 RECHERCHE JURIDIQUE - {country_name.upper()}** Aucun document trouvé avec les critères suivants: - **Type de document**: {doc_type} - **Catégorie**: {applied_filters.get('categorie', 'Toutes')} - **Requête**: "{query}" **Suggestion**: Essayez avec des termes plus généraux ou vérifiez l'orthographe. **Filtres appliqués**: {applied_filters} """ # Si des documents sont trouvés doc_type = applied_filters.get("doc_type", "articles") doc_type_fr = "articles de loi" if doc_type == "articles" else "études de cas/jurisprudence" fallback_note = "" if applied_filters.get("fallback_used"): fallback_note = f""" **💡 {supplemental_message}** --- """ search_results = f""" **🔍 RECHERCHE JURIDIQUE - {country_name.upper()}** **Type de documents**: {doc_type_fr} **Requête**: "{query}" **Juridiction**: {country_name} **Articles détectés**: {', '.join(detected_articles) if detected_articles else 'Aucun'} **Documents trouvés**: {len(enhanced_docs)} {fallback_note} """ # Formatage des documents trouvés main_docs = [doc for doc in enhanced_docs if not doc.metadata.get("is_reference", False)] for i, doc in enumerate(main_docs[:5]): doc_type = doc.metadata.get("doc_type", "inconnu") source = doc.metadata.get('source', 'Non spécifié') content = doc.page_content[:600] search_results += f""" **📄 DOCUMENT {i+1}** (Type: {doc_type}) - **Source**: {source} - **Contenu**: {content}... """ return search_results # BACKWARD COMPATIBILITY: Keep sync version for any remaining sync calls def smart_legal_query_sync(self, user_query: str, country: str) -> Tuple[List[Document], List[str], Dict[str, Any], str]: """Synchronous version for backward compatibility""" logger.warning("Using sync version of smart_legal_query - consider migrating to async") return asyncio.run(self.smart_legal_query(user_query, country))