Spaces:
Sleeping
Sleeping
| # [file name]: core/graph_builder.py | |
| # Add this as the FIRST lines of code (after docstrings) | |
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver | |
| from langgraph.checkpoint.memory import InMemorySaver | |
| import logging | |
| from typing import Dict, List, Any, Union | |
| from langchain_core.runnables import RunnableConfig | |
| from models.state_models import MultiCountryLegalState | |
| from core.router import CountryRouter | |
| from core.retriever import LegalRetriever | |
| from core.conversation_repair import ConversationRepair | |
| from core.human_approval_node import HumanApprovalNode | |
| # Import modular components | |
| from core.nodes.routing_nodes import RoutingNodes | |
| from core.assistance.workflow_nodes import AssistanceWorkflowNodes | |
| from core.nodes.retrieval_nodes import RetrievalNodes | |
| from core.nodes.response_nodes import ResponseNodes | |
| from core.nodes.helper_nodes import HelperNodes | |
| from core.routing.routing_logic import RoutingLogic | |
| logger = logging.getLogger(__name__) | |
| class GraphBuilder: | |
| def __init__( | |
| self, | |
| router: CountryRouter, | |
| llm, | |
| checkpointer: Union[AsyncPostgresSaver, InMemorySaver], | |
| # Country retrievers as a dictionary for easy extension | |
| country_retrievers: Dict[str, LegalRetriever] = None | |
| ): | |
| self.router = router | |
| self.llm = llm | |
| self.checkpointer = checkpointer | |
| # Initialize country retrievers - easily extensible! | |
| self.country_retrievers = country_retrievers or {} | |
| # Initialize modular components | |
| self.conversation_repair = ConversationRepair() | |
| self.human_approval = HumanApprovalNode() | |
| self.routing_logic = RoutingLogic() | |
| # Initialize node groups | |
| self.routing_nodes = RoutingNodes(router, self.conversation_repair, llm) | |
| self.assistance_nodes = AssistanceWorkflowNodes() | |
| # Dynamic retrieval nodes based on available countries | |
| self.retrieval_nodes = RetrievalNodes(self.country_retrievers) | |
| self.response_nodes = ResponseNodes(llm) | |
| self.helper_nodes = HelperNodes(llm) | |
| # Log checkpointer type | |
| checkpointer_type = self._get_checkpointer_type() | |
| logger.info(f"GraphBuilder initialized with {checkpointer_type} checkpointer and countries: {list(self.country_retrievers.keys())}") | |
| def _get_checkpointer_type(self) -> str: | |
| """Determine the type of checkpointer being used""" | |
| if hasattr(self.checkpointer, '__class__'): | |
| class_name = self.checkpointer.__class__.__name__ | |
| if 'PostgresSaver' in class_name: | |
| return "PostgreSQL" | |
| elif 'InMemorySaver' in class_name: | |
| return "in-memory" | |
| return "unknown" | |
| def add_country(self, country_code: str, retriever: LegalRetriever): | |
| """Dynamically add a new country to the system""" | |
| self.country_retrievers[country_code] = retriever | |
| self.retrieval_nodes = RetrievalNodes(self.country_retrievers) # Re-initialize | |
| logger.info(f"Added country: {country_code}") | |
| def build_graph(self) -> StateGraph: | |
| """Build simplified flow with all routing categories""" | |
| workflow = StateGraph(MultiCountryLegalState) | |
| # Core nodes | |
| workflow.add_node("router", self.routing_nodes.router_node) | |
| workflow.add_node("response", self.response_nodes.response_generation_node) | |
| # Country retrieval nodes - dynamically created | |
| country_nodes = {} | |
| for country_code in self.country_retrievers.keys(): | |
| node_name = f"{country_code}_retrieval" | |
| workflow.add_node(node_name, self._create_country_retrieval_node(country_code)) | |
| country_nodes[country_code] = node_name | |
| # Handler nodes | |
| workflow.add_node("greeting_handler", self.routing_nodes.greeting_small_talk_node) | |
| workflow.add_node("repair_handler", self.routing_nodes.conversation_repair_node) | |
| workflow.add_node("summary_handler", self.helper_nodes.conversation_summarization_node) | |
| workflow.add_node("unclear_handler", self.helper_nodes.unclear_route_node) | |
| workflow.add_node("out_of_scope_handler", self.helper_nodes.out_of_scope_node) | |
| # Assistance nodes - Using wrapper methods to ensure correct signatures | |
| workflow.add_node("assistance_collect_info", self._create_assistance_collect_wrapper()) | |
| workflow.add_node("assistance_confirm", self._create_assistance_confirm_wrapper()) | |
| workflow.add_node("human_approval", self.human_approval.process_approval) | |
| workflow.add_node("process_assistance", self._create_process_assistance_node) | |
| # Main flow | |
| workflow.add_edge(START, "router") | |
| # Router directly routes to appropriate nodes | |
| workflow.add_conditional_edges( | |
| "router", | |
| self._route_after_router, | |
| { | |
| **country_nodes, # benin_retrieval, madagascar_retrieval, etc. | |
| "greeting_small_talk": "greeting_handler", | |
| "conversation_repair": "repair_handler", | |
| "conversation_summarization": "summary_handler", | |
| "unclear": "unclear_handler", | |
| "out_of_scope": "out_of_scope_handler", | |
| "assistance_request": "assistance_collect_info" | |
| } | |
| ) | |
| # All handlers go to response | |
| workflow.add_edge("greeting_handler", "response") | |
| workflow.add_edge("repair_handler", "response") | |
| workflow.add_edge("summary_handler", "response") | |
| workflow.add_edge("unclear_handler", "response") | |
| workflow.add_edge("out_of_scope_handler", "response") | |
| # Country nodes go to response | |
| for country_code in self.country_retrievers.keys(): | |
| workflow.add_edge(f"{country_code}_retrieval", "response") | |
| # Assistance sub-flow | |
| workflow.add_conditional_edges( | |
| "assistance_collect_info", | |
| self.routing_logic.route_after_info_collection, | |
| { | |
| "need_email": "response", # Ask for email | |
| "need_description": "response", # Ask for description | |
| "ready_to_confirm": "assistance_confirm", | |
| "cancelled": "response" | |
| } | |
| ) | |
| # CRITICAL FIX: After response, only continue assistance if we have new user input | |
| workflow.add_conditional_edges( | |
| "response", | |
| self._route_after_response, | |
| { | |
| "continue_assistance": "assistance_collect_info", | |
| "end": END | |
| } | |
| ) | |
| workflow.add_conditional_edges( | |
| "assistance_confirm", | |
| self.routing_logic.route_after_confirmation, | |
| { | |
| "confirmed": "human_approval", | |
| "cancelled": "response", | |
| "needs_correction": "response" | |
| } | |
| ) | |
| workflow.add_conditional_edges( | |
| "human_approval", | |
| self.routing_logic.route_after_human_approval, | |
| { | |
| "approved": "process_assistance", | |
| "rejected": "response", | |
| "interrupt": "response" | |
| } | |
| ) | |
| workflow.add_edge("process_assistance", "response") | |
| checkpointer_type = self._get_checkpointer_type() | |
| logger.info(f"Scalable graph built with {checkpointer_type} checkpointer for {len(self.country_retrievers)} countries: {list(self.country_retrievers.keys())}") | |
| return workflow | |
| def _create_assistance_collect_wrapper(self): | |
| """Wrapper to ensure proper method signature for assistance collection""" | |
| async def wrapper(state: MultiCountryLegalState, config: RunnableConfig) -> Dict[str, Any]: | |
| result = await self.assistance_nodes.collect_assistance_info_node(state, config) | |
| # Ensure supplemental_message is included if not present | |
| if "supplemental_message" not in result: | |
| result["supplemental_message"] = "" | |
| return result | |
| return wrapper | |
| def _create_assistance_confirm_wrapper(self): | |
| """Wrapper to ensure proper method signature for assistance confirmation""" | |
| async def wrapper(state: MultiCountryLegalState, config: RunnableConfig) -> Dict[str, Any]: | |
| result = await self.assistance_nodes.confirm_assistance_send_node(state, config) | |
| # Ensure supplemental_message is included if not present | |
| if "supplemental_message" not in result: | |
| result["supplemental_message"] = "" | |
| return result | |
| return wrapper | |
| def _route_after_router(self, state: MultiCountryLegalState) -> str: | |
| """Route directly from router - single source of truth""" | |
| router_decision = state.router_decision or "unclear" | |
| logger.debug(f"Routing from router: {router_decision}") | |
| return router_decision | |
| def _route_after_response(self, state: MultiCountryLegalState) -> str: | |
| """Route after response - check if we should continue assistance workflow""" | |
| # Check if we're in the middle of an assistance workflow | |
| assistance_step = state.assistance_step | |
| if assistance_step and assistance_step not in [None, "cancelled", "completed"]: | |
| # CRITICAL FIX: Only continue if we have new user input to process | |
| # This prevents infinite loops when no new user input is available | |
| has_new_user_input = self._has_new_user_input(state) | |
| if has_new_user_input: | |
| logger.info(f"🔄 Continuing assistance workflow from response: {assistance_step}") | |
| return "continue_assistance" | |
| else: | |
| logger.info("⏸️ No new user input - waiting for user response") | |
| return "end" | |
| # Normal end of conversation | |
| logger.debug("✅ Ending conversation - no assistance workflow active") | |
| return "end" | |
| def _has_new_user_input(self, state: MultiCountryLegalState) -> bool: | |
| """Check if there's new user input to process in assistance workflow""" | |
| if not state.messages: | |
| return False | |
| # Get the last message | |
| last_message = state.messages[-1] if state.messages else None | |
| # Check if the last message is from user and not already processed | |
| if last_message and last_message.get("role") == "user": | |
| # Check message metadata to see if it's been processed in current assistance step | |
| message_meta = last_message.get("meta", {}) | |
| processed_in_step = message_meta.get("processed_in_assistance_step") | |
| current_step = state.assistance_step | |
| # If this message hasn't been processed in the current assistance step, it's new input | |
| if processed_in_step != current_step: | |
| logger.info(f"📥 New user input detected for assistance step: {current_step}") | |
| return True | |
| logger.info("📭 No new user input detected") | |
| return False | |
| def _create_country_retrieval_node(self, country_code: str): | |
| """Create a dynamic country retrieval node (closure factory)""" | |
| async def country_retrieval_node(state: MultiCountryLegalState, config: RunnableConfig) -> Dict[str, Any]: | |
| logger.info(f"Country retrieval for: {country_code}") | |
| return await self.retrieval_nodes.country_retrieval_node(state, config, country_code) | |
| return country_retrieval_node | |
| async def _create_process_assistance_node(self, state: MultiCountryLegalState, config: RunnableConfig) -> Dict[str, Any]: | |
| """Process assistance after approval""" | |
| logger.info("Processing assistance request") | |
| # Mark assistance as completed with supplemental message | |
| return { | |
| "email_status": "sent", | |
| "approval_status": "approved", | |
| "assistance_step": "completed", | |
| "messages": [], | |
| # "supplemental_message": "Votre demande d'assistance a été traitée avec succès et envoyée à notre équipe juridique." | |
| } | |
| def debug_state(self, state: MultiCountryLegalState, step: str) -> None: | |
| """Debug state information""" | |
| if logger.isEnabledFor(logging.DEBUG): | |
| logger.debug(f"=== STATE DEBUG at {step} ===") | |
| logger.debug(f"Router decision: {getattr(state, 'router_decision', 'None')}") | |
| logger.debug(f"Assistance step: {getattr(state, 'assistance_step', 'None')}") | |
| logger.debug(f"User email: {getattr(state, 'user_email', 'None')}") | |
| logger.debug(f"Assistance description: {getattr(state, 'assistance_description', 'None')}") | |
| logger.debug(f"Supplemental message: {getattr(state, 'supplemental_message', 'None')}") | |
| logger.debug(f"Available countries: {list(self.country_retrievers.keys())}") | |
| logger.debug("=== END STATE DEBUG ===") |