# Phase 4 Implementation Spec: Orchestrator & UI **Goal**: Connect the Brain and the Body, then give it a Face. **Philosophy**: "Streaming is Trust." **Estimated Effort**: 4-5 hours **Prerequisite**: Phases 1-3 complete --- ## 1. The Slice Definition This slice connects: 1. **Orchestrator**: The loop calling `SearchHandler` → `JudgeHandler`. 2. **UI**: Gradio app. **Files**: - `src/utils/models.py`: Add Orchestrator models - `src/orchestrator.py`: Main logic - `src/app.py`: UI --- ## 2. Models (`src/utils/models.py`) Add to models file: ```python from enum import Enum class AgentState(str, Enum): INITIALIZING = "initializing" SEARCHING = "searching" JUDGING = "judging" SYNTHESIZING = "synthesizing" COMPLETE = "complete" ERROR = "error" class AgentEvent(BaseModel): state: AgentState message: str iteration: int = 0 data: dict[str, Any] | None = None def to_display(self) -> str: """Format for UI display.""" emoji_map = { AgentState.INITIALIZING: "ā³", AgentState.SEARCHING: "šŸ”", AgentState.JUDGING: "🧠", AgentState.SYNTHESIZING: "šŸ“", AgentState.COMPLETE: "āœ…", AgentState.ERROR: "āŒ", } emoji = emoji_map.get(self.state, "") return f"{emoji} **[{self.state.value.upper()}]** {self.message}" class AgentResult(BaseModel): """Final result of the agent execution.""" question: str report: str evidence_count: int iterations: int candidates: list[Any] = Field(default_factory=list) quality_score: int = 0 ``` --- ## 3. Orchestrator (`src/orchestrator.py`) ```python """Main agent orchestrator.""" import structlog import asyncio from typing import AsyncGenerator from src.utils.config import settings from src.utils.exceptions import DeepCriticalError from src.tools.search_handler import SearchHandler from src.tools.pubmed import PubMedTool from src.tools.websearch import WebTool from src.agent_factory.judges import JudgeHandler from src.utils.models import AgentEvent, AgentState, Evidence, JudgeAssessment, AgentResult logger = structlog.get_logger() # Placeholder for Synthesis Agent (Phase 5) class MockSynthesisAgent: async def run(self, prompt): class Result: data = "Research Report (Synthesis not implemented yet)\n\n" + prompt[:500] + "..." return Result() synthesis_agent = MockSynthesisAgent() def build_synthesis_prompt(question, assessment, evidence): return f"Question: {question}\nAssessment: {assessment}\nEvidence: {len(evidence)} items" class Orchestrator: """Main orchestrator for the DeepCritical agent.""" def __init__( self, search_handler: SearchHandler | None = None, judge_handler: JudgeHandler | None = None, max_iterations: int | None = None, ): """Initialize the orchestrator. Args: search_handler: Optional SearchHandler (for testing). judge_handler: Optional JudgeHandler (for testing). max_iterations: Max search iterations (default from settings). """ self.search_handler = search_handler or SearchHandler([ PubMedTool(), WebTool(), ]) self.judge_handler = judge_handler or JudgeHandler() self.max_iterations = max_iterations or settings.max_iterations async def run(self, question: str) -> AsyncGenerator[AgentEvent, None]: """Run the agent loop, yielding events for streaming UI. Args: question: The research question to investigate. Yields: AgentEvent objects for each state change. """ logger.info("orchestrator_starting", question=question[:100]) # Track state all_evidence: list[Evidence] = [] iteration = 0 last_assessment: JudgeAssessment | None = None try: # Initial event yield AgentEvent( state=AgentState.INITIALIZING, message=f"Starting research on: {question[:100]}...", iteration=0, ) # Main search → judge loop while iteration < self.max_iterations: iteration += 1 # === SEARCH PHASE === yield AgentEvent( state=AgentState.SEARCHING, message=f"Searching (iteration {iteration}/{self.max_iterations})...", iteration=iteration, ) # Determine search query if last_assessment and last_assessment.next_search_queries: # Use judge's suggested queries search_query = last_assessment.next_search_queries[0] else: # Use original question search_query = question # Execute search search_result = await self.search_handler.execute( search_query, max_results_per_tool=10, ) # Accumulate evidence (deduplicate by URL) existing_urls = {e.citation.url for e in all_evidence} new_evidence = [ e for e in search_result.evidence if e.citation.url not in existing_urls ] all_evidence.extend(new_evidence) yield AgentEvent( state=AgentState.SEARCHING, message=f"Found {len(new_evidence)} new items ({len(all_evidence)} total)", iteration=iteration, data={ "new_count": len(new_evidence), "total_count": len(all_evidence), "sources": search_result.sources_searched, }, ) # === JUDGE PHASE === yield AgentEvent( state=AgentState.JUDGING, message="Evaluating evidence quality...", iteration=iteration, ) last_assessment = await self.judge_handler.assess( question, all_evidence[-20:], # Evaluate most recent 20 items ) yield AgentEvent( state=AgentState.JUDGING, message=( f"Quality: {last_assessment.overall_quality_score}/10, " f"Coverage: {last_assessment.coverage_score}/10" ), iteration=iteration, data={ "quality_score": last_assessment.overall_quality_score, "coverage_score": last_assessment.coverage_score, "sufficient": last_assessment.sufficient, "candidates": len(last_assessment.candidates), }, ) # Check if we should stop if not await self.judge_handler.should_continue(last_assessment): logger.info( "orchestrator_sufficient_evidence", iteration=iteration, evidence_count=len(all_evidence), ) break # Log why we're continuing if last_assessment.gaps: logger.info( "orchestrator_continuing", gaps=last_assessment.gaps[:3], next_query=last_assessment.next_search_queries[:1], ) # === SYNTHESIS PHASE === yield AgentEvent( state=AgentState.SYNTHESIZING, message="Generating research report...", iteration=iteration, ) report = await self._synthesize_report( question, all_evidence, last_assessment, ) # === COMPLETE === yield AgentEvent( state=AgentState.COMPLETE, message="Research complete!", iteration=iteration, data={ "evidence_count": len(all_evidence), "candidates": ( len(last_assessment.candidates) if last_assessment else 0 ), "report_length": len(report), }, ) # Yield final report as special event yield AgentEvent( state=AgentState.COMPLETE, message=report, # The report itself iteration=iteration, data={"is_report": True}, ) except Exception as e: logger.error("orchestrator_error", error=str(e)) yield AgentEvent( state=AgentState.ERROR, message=f"Error: {str(e)}", iteration=iteration, ) raise DeepCriticalError(f"Orchestrator failed: {e}") from e async def _synthesize_report( self, question: str, evidence: list[Evidence], assessment: JudgeAssessment | None, ) -> str: """Generate the final research report.""" if not assessment: # Fallback assessment assessment = JudgeAssessment( sufficient=True, recommendation="synthesize", reasoning="Manual synthesis requested.", overall_quality_score=5, coverage_score=5, ) # Build synthesis prompt prompt = build_synthesis_prompt(question, assessment, evidence) # Generate report result = await synthesis_agent.run(prompt) return result.data async def run_to_completion(self, question: str) -> AgentResult: """Run the agent and return final result (non-streaming).""" report = "" evidence_count = 0 iterations = 0 candidates = [] quality_score = 0 async for event in self.run(question): iterations = event.iteration if event.data: if event.data.get("is_report"): report = event.message if "evidence_count" in event.data: evidence_count = event.data["evidence_count"] if "candidates" in event.data: candidates = event.data.get("candidates", []) if "quality_score" in event.data: quality_score = event.data["quality_score"] return AgentResult( question=question, report=report, evidence_count=evidence_count, iterations=iterations, candidates=candidates, quality_score=quality_score, ) ``` --- ## 4. UI (`src/app.py`) ```python """Gradio UI for DeepCritical agent.""" import gradio as gr from typing import AsyncGenerator from src.orchestrator import Orchestrator from src.utils.models import AgentEvent, AgentState async def chat( message: str, history: list[list[str]], ) -> AsyncGenerator[str, None]: """Process a chat message and stream responses. Args: message: User's research question. history: Chat history (not used, fresh agent each time). Yields: Streaming response text. """ if not message.strip(): yield "Please enter a research question." return orchestrator = Orchestrator() full_response = "" try: async for event in orchestrator.run(message): # Format event for display if event.data and event.data.get("is_report"): # Final report - yield as-is full_response = event.message yield full_response else: # Status update status = event.to_display() full_response += f"\n{status}" yield full_response except Exception as e: yield f"\nāŒ **Error**: {str(e)}" def create_app() -> gr.Blocks: """Create the Gradio application.""" with gr.Blocks( title="DeepCritical - Drug Repurposing Research Agent", theme=gr.themes.Soft(), ) as app: gr.Markdown( """ # 🧬 DeepCritical ## AI-Powered Drug Repurposing Research Agent Enter a research question about drug repurposing to get started. The agent will search PubMed and the web, evaluate evidence quality, and generate a comprehensive research report. **Example questions:** - "Can metformin be repurposed to treat Alzheimer's disease?" - "What existing drugs might help treat long COVID fatigue?" - "Are there diabetes drugs that could treat Parkinson's?" """ ) chatbot = gr.Chatbot( label="Research Assistant", height=600, show_copy_button=True, render_markdown=True, ) msg = gr.Textbox( label="Research Question", placeholder="e.g., Can metformin be repurposed to treat Alzheimer's disease?", lines=2, max_lines=5, ) with gr.Row(): submit_btn = gr.Button("šŸ”¬ Research", variant="primary") clear_btn = gr.Button("šŸ—‘ļø Clear") # Examples gr.Examples( examples=[ "Can metformin be repurposed to treat Alzheimer's disease?", "What existing drugs might help treat long COVID fatigue?", "Are there cancer drugs that could treat autoimmune diseases?", "Can diabetes medications help with heart failure?", ], inputs=msg, ) # Event handlers async def respond(message: str, chat_history: list): """Handle user message and stream response.""" chat_history = chat_history or [] chat_history.append([message, ""]) async for response in chat(message, chat_history): chat_history[-1][1] = response yield "", chat_history submit_btn.click( respond, inputs=[msg, chatbot], outputs=[msg, chatbot], ) msg.submit( respond, inputs=[msg, chatbot], outputs=[msg, chatbot], ) clear_btn.click(lambda: (None, []), outputs=[msg, chatbot]) gr.Markdown( """ --- **Disclaimer**: This tool is for research purposes only. Always consult healthcare professionals for medical decisions. Built with ā¤ļø using PydanticAI, Gradio, and Claude. """ ) return app # Create the app instance app = create_app() if __name__ == "__main__": app.launch( server_name="0.0.0.0", server_port=7860, share=False, ) ``` --- ## 5. Deployment Files ### `Dockerfile` ```dockerfile # DeepCritical Docker Image FROM python:3.11-slim # Set working directory WORKDIR /app # Install uv for fast package management RUN pip install uv # Copy dependency files COPY pyproject.toml . COPY uv.lock* . # Install dependencies RUN uv sync --no-dev # Copy source code COPY src/ src/ # Expose Gradio port EXPOSE 7860 # Set environment variables ENV PYTHONUNBUFFERED=1 ENV PYTHONDONTWRITEBYTECODE=1 # Run the app CMD ["uv", "run", "python", "src/app.py"] ``` ### `README.md` (HuggingFace Space Config) > Note: This is for the HuggingFace Space, placed at project root. ```markdown --- title: DeepCritical emoji: 🧬 colorFrom: blue colorTo: green sdk: gradio sdk_version: 5.0.0 python_version: 3.11 app_file: src/app.py pinned: false license: mit --- # DeepCritical - Drug Repurposing Research Agent An AI-powered research assistant that searches biomedical literature to identify drug repurposing opportunities. ## Features - šŸ” Searches PubMed and web sources - āš–ļø Evaluates evidence quality using AI - šŸ“ Generates comprehensive research reports - šŸ’Š Identifies drug repurposing candidates ## How to Use 1. Enter a research question about drug repurposing 2. Wait for the agent to search and analyze literature 3. Review the generated research report ## Example Questions - "Can metformin be repurposed to treat Alzheimer's disease?" - "What existing drugs might help treat long COVID?" - "Are there diabetes drugs that could treat Parkinson's?" ## Technical Details Built with: - PydanticAI for structured LLM outputs - PubMed E-utilities for biomedical search - DuckDuckGo for web search - Gradio for the interface ## Disclaimer This tool is for research purposes only. Always consult healthcare professionals. ``` --- ## 6. TDD Workflow ### Test File: `tests/unit/test_orchestrator.py` ```python """Unit tests for Orchestrator.""" import pytest from unittest.mock import AsyncMock, MagicMock, patch class TestOrchestrator: """Tests for Orchestrator.""" @pytest.mark.asyncio async def test_run_yields_events(self, mocker): """Orchestrator.run should yield AgentEvents.""" from src.orchestrator import Orchestrator from src.utils.models import ( AgentEvent, AgentState, SearchResult, JudgeAssessment, Evidence, Citation, ) # Mock search handler mock_search = MagicMock() mock_search.execute = AsyncMock(return_value=SearchResult( query="test", evidence=[ Evidence( content="Test evidence", citation=Citation( source="pubmed", title="Test", url="https://example.com", date="2024", ), ) ], sources_searched=["pubmed", "web"], total_found=1, )) # Mock judge handler - return "synthesize" immediately mock_judge = MagicMock() mock_judge.assess = AsyncMock(return_value=JudgeAssessment( sufficient=True, recommendation="synthesize", reasoning="Good evidence.", overall_quality_score=8, coverage_score=8, candidates=[], )) mock_judge.should_continue = AsyncMock(return_value=False) # Mock synthesis mocker.patch( "src.orchestrator.synthesis_agent.run", new=AsyncMock(return_value=MagicMock(data="# Test Report")) ) orchestrator = Orchestrator( search_handler=mock_search, judge_handler=mock_judge, max_iterations=3, ) events = [] async for event in orchestrator.run("test question"): events.append(event) # Should have multiple events assert len(events) >= 4 # init, search, judge, complete # Check state progression states = [e.state for e in events] assert AgentState.INITIALIZING in states assert AgentState.SEARCHING in states assert AgentState.JUDGING in states assert AgentState.COMPLETE in states @pytest.mark.asyncio async def test_run_respects_max_iterations(self, mocker): """Orchestrator should stop at max_iterations.""" from src.orchestrator import Orchestrator from src.utils.models import SearchResult, JudgeAssessment, Evidence, Citation # Mock search mock_search = MagicMock() mock_search.execute = AsyncMock(return_value=SearchResult( query="test", evidence=[ Evidence( content="Test", citation=Citation( source="pubmed", title="Test", url="https://example.com", date="2024", ), ) ], sources_searched=["pubmed"], total_found=1, )) # Mock judge - always say "continue" mock_judge = MagicMock() mock_judge.assess = AsyncMock(return_value=JudgeAssessment( sufficient=False, recommendation="continue", reasoning="Need more evidence.", overall_quality_score=4, coverage_score=4, next_search_queries=["more research"], )) mock_judge.should_continue = AsyncMock(return_value=True) # Mock synthesis mocker.patch( "src.orchestrator.synthesis_agent.run", new=AsyncMock(return_value=MagicMock(data="# Report")) ) orchestrator = Orchestrator( search_handler=mock_search, judge_handler=mock_judge, max_iterations=2, # Low limit ) iterations_seen = set() async for event in orchestrator.run("test"): iterations_seen.add(event.iteration) # Should not exceed max_iterations assert max(iterations_seen) <= 2 @pytest.mark.asyncio async def test_run_handles_errors(self, mocker): """Orchestrator should yield error event on failure.""" from src.orchestrator import Orchestrator from src.utils.models import AgentState from src.utils.exceptions import DeepCriticalError # Mock search to raise error mock_search = MagicMock() mock_search.execute = AsyncMock(side_effect=Exception("Search failed")) orchestrator = Orchestrator( search_handler=mock_search, judge_handler=MagicMock(), max_iterations=3, ) events = [] with pytest.raises(DeepCriticalError): async for event in orchestrator.run("test"): events.append(event) # Should have error event error_events = [e for e in events if e.state == AgentState.ERROR] assert len(error_events) >= 1 @pytest.mark.asyncio async def test_run_to_completion_returns_result(self, mocker): """run_to_completion should return AgentResult.""" from src.orchestrator import Orchestrator from src.utils.models import SearchResult, JudgeAssessment, AgentResult, Evidence, Citation # Mock search mock_search = MagicMock() mock_search.execute = AsyncMock(return_value=SearchResult( query="test", evidence=[ Evidence( content="Test", citation=Citation( source="pubmed", title="Test", url="https://example.com", date="2024", ), ) ], sources_searched=["pubmed"], total_found=1, )) # Mock judge mock_judge = MagicMock() mock_judge.assess = AsyncMock(return_value=JudgeAssessment( sufficient=True, recommendation="synthesize", reasoning="Good.", overall_quality_score=8, coverage_score=8, )) mock_judge.should_continue = AsyncMock(return_value=False) # Mock synthesis mocker.patch( "src.orchestrator.synthesis_agent.run", new=AsyncMock(return_value=MagicMock(data="# Test Report\n\nContent here.")) ) orchestrator = Orchestrator( search_handler=mock_search, judge_handler=mock_judge, ) result = await orchestrator.run_to_completion("test question") assert isinstance(result, AgentResult) assert result.question == "test question" assert "Test Report" in result.report class TestAgentEvent: """Tests for AgentEvent model.""" def test_to_display_formats_correctly(self): """to_display should format event with icon.""" from src.utils.models import AgentEvent, AgentState event = AgentEvent( state=AgentState.SEARCHING, message="Searching PubMed...", iteration=1, ) display = event.to_display() assert "šŸ”" in display assert "SEARCHING" in display assert "Searching PubMed" in display def test_to_display_handles_all_states(self): """to_display should handle all AgentState values.""" from src.utils.models import AgentEvent, AgentState for state in AgentState: event = AgentEvent(state=state, message="Test") display = event.to_display() assert state.value.upper() in display ``` --- ## 7. Implementation Checklist - [ ] Add `AgentState`, `AgentEvent`, `AgentResult` models to `src/utils/models.py` - [ ] Implement `src/orchestrator.py` (complete Orchestrator class) - [ ] Implement `src/app.py` (complete Gradio UI) - [ ] Create `Dockerfile` - [ ] Update root `README.md` for HuggingFace Spaces - [ ] Run `uv run pytest tests/unit/test_orchestrator.py -v` — **ALL TESTS MUST PASS** - [ ] Run `uv run ruff check src` — **NO ERRORS** - [ ] Run `uv run mypy src` — **NO ERRORS** - [ ] Run `uv run python src/app.py` — **VERIFY UI LOADS** - [ ] Test with real query locally - [ ] Build Docker image: `docker build -t deepcritical .` - [ ] Commit: `git commit -m "feat: phase 4 orchestrator and UI complete"` --- ## 8. Definition of Done Phase 4 is **COMPLETE** when: 1. āœ… All unit tests pass 2. āœ… Orchestrator yields streaming AgentEvents 3. āœ… Orchestrator respects max_iterations 4. āœ… Graceful error handling with error events 5. āœ… Gradio UI renders streaming updates 6. āœ… Ruff and mypy pass with no errors 7. āœ… Docker builds successfully 8. āœ… Manual smoke test works: ```bash # Run locally uv run python src/app.py # Open http://localhost:7860 and test: # "What existing drugs might help treat long COVID fatigue?" # Verify: # - Status updates stream in real-time # - Final report is formatted as markdown # - No errors in console ``` ```