#!/usr/bin/env python3
"""
COMPREHENSIVE SERVER MANAGEMENT AND STATUS MONITORING SYSTEM
Manages and monitors the Golem Flask server and the Aether memory loader
Real-time status display with comprehensive statistics
"""

import subprocess
import time
import threading
import signal
import sys
import os
import tempfile
import requests
import json
import psutil
from typing import Dict, List, Optional, Any
from datetime import datetime
from collections import defaultdict, deque
import colorama
from colorama import Fore, Style, Back

# Initialize colorama for cross-platform colored output
colorama.init()

# Working directory of the managed processes; their relative script paths are
# resolved against it. Can be overridden with the GOLEM_PROJECT_ROOT
# environment variable.
PROJECT_ROOT = os.environ.get(
    "GOLEM_PROJECT_ROOT", "/home/chezy/Desktop/qwen2golem/QWEN2Golem"
)

# Set by main(); kept at module level for backward compatibility.
manager = None


class ServerProcess:
    """One managed child process plus its health and resource bookkeeping."""

    def __init__(self, name: str, command: List[str], port: Optional[int],
                 expected_endpoints: Optional[List[str]] = None,
                 cwd: Optional[str] = None):
        """
        Args:
            name: Human-readable name used in console output.
            command: argv list passed to ``subprocess.Popen``.
            port: HTTP port used for health checks, or None for a process
                that exposes no port.
            expected_endpoints: Stored as given; not used by this class.
            cwd: Working directory for the child (defaults to PROJECT_ROOT).
        """
        self.name = name
        self.command = command
        self.port = port
        self.cwd = cwd or PROJECT_ROOT
        self.process: Optional[subprocess.Popen] = None
        self.expected_endpoints = expected_endpoints or []
        self.start_time: Optional[datetime] = None
        self.status = "stopped"
        self.last_health_check = None
        self.health_status = "unknown"
        self.error_count = 0
        self.restart_count = 0
        self.cpu_usage = 0.0
        self.memory_usage = 0.0
        self.response_time = 0.0
        # File receiving the child's stdout/stderr (created on first start).
        self.log_path: Optional[str] = None
        # Cached psutil handle; cpu_percent() needs the same object across calls.
        self._ps_proc = None

    def _open_log(self):
        """Open (truncating) the file that receives the child's output."""
        if self.log_path is None:
            fd, self.log_path = tempfile.mkstemp(prefix="server_manager_", suffix=".log")
            return os.fdopen(fd, "wb")
        return open(self.log_path, "wb")

    def _read_log_tail(self, max_bytes: int = 4096) -> str:
        """Return up to the last ``max_bytes`` of the child's output log."""
        if not self.log_path:
            return ""
        try:
            with open(self.log_path, "rb") as fh:
                fh.seek(0, os.SEEK_END)
                fh.seek(max(0, fh.tell() - max_bytes))
                return fh.read().decode("utf-8", errors="replace").strip()
        except OSError:
            return ""

    def _signal_group(self, sig) -> None:
        """Send ``sig`` to the child's process group, ignoring an already-dead group."""
        try:
            os.killpg(os.getpgid(self.process.pid), sig)
        except ProcessLookupError:
            pass

    def _sync_status(self) -> None:
        """Downgrade a stale running/starting state once the process has exited."""
        if self.status in ("running", "starting") and not self.is_running():
            self.status = "exited"
            self.health_status = "down"

    def start(self) -> bool:
        """Start the server process.

        Returns:
            True if the process is (already) running two seconds after launch,
            False if it exited early or could not be launched.
        """
        try:
            if self.is_running():
                print(f"{Fore.YELLOW}⚠️ {self.name} already running on port {self.port}{Style.RESET_ALL}")
                return True

            print(f"{Fore.CYAN}🚀 Starting {self.name}...{Style.RESET_ALL}")

            is_restart = self.start_time is not None

            # Output goes to a file rather than a pipe: nothing drains a pipe
            # while the child runs, so it would block once the buffer filled.
            with self._open_log() as log_file:
                self.process = subprocess.Popen(
                    self.command,
                    stdout=log_file,
                    stderr=subprocess.STDOUT,
                    cwd=self.cwd,
                    start_new_session=True,  # own process group for clean shutdown
                )

            self._ps_proc = None
            self.start_time = datetime.now()
            self.status = "starting"
            if is_restart:
                self.restart_count += 1

            # Wait a moment for startup
            time.sleep(2)

            if self.process.poll() is None:
                self.status = "running"
                print(f"{Fore.GREEN}✅ {self.name} started successfully (PID: {self.process.pid}){Style.RESET_ALL}")
                return True

            self.status = "failed"
            error_output = self._read_log_tail()
            print(f"{Fore.RED}❌ {self.name} failed to start{Style.RESET_ALL}")
            if error_output:
                print(f"{Fore.RED}Error: {error_output}{Style.RESET_ALL}")
            return False

        except Exception as e:
            self.status = "error"
            self.error_count += 1
            print(f"{Fore.RED}❌ Failed to start {self.name}: {e}{Style.RESET_ALL}")
            return False

    def stop(self) -> bool:
        """Stop the server process (SIGTERM, then SIGKILL after 5 seconds).

        Returns:
            True if the process is stopped (or was not running), False on error.
        """
        try:
            if not self.is_running():
                self._sync_status()
                return True

            print(f"{Fore.YELLOW}🛑 Stopping {self.name}...{Style.RESET_ALL}")

            proc = self.process
            # Try graceful shutdown first
            self._signal_group(signal.SIGTERM)
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Force kill if still running, then reap to avoid a zombie
                self._signal_group(signal.SIGKILL)
                proc.wait(timeout=5)

            self.status = "stopped"
            self.health_status = "down"
            self.process = None
            self._ps_proc = None
            print(f"{Fore.GREEN}✅ {self.name} stopped{Style.RESET_ALL}")
            return True

        except Exception as e:
            print(f"{Fore.RED}❌ Error stopping {self.name}: {e}{Style.RESET_ALL}")
            return False

    def is_running(self) -> bool:
        """Check if the process is running"""
        return self.process is not None and self.process.poll() is None

    def health_check(self) -> Dict[str, Any]:
        """Perform health check on the server.

        Returns:
            A dict with at least a "status" key. For a healthy HTTP server this
            is the JSON object returned by ``/health`` (or ``{"status": "ok"}``)
            with "response_time" in seconds added.
        """
        if not self.is_running():
            self.health_status = "down"
            return {"status": "down", "reason": "process_not_running"}

        # For processes without ports (like aether loader), just check if running
        if self.port is None:
            self.health_status = "running"
            self.last_health_check = datetime.now()
            return {"status": "running", "reason": "process_active"}

        try:
            started = time.perf_counter()
            response = requests.get(f"http://localhost:{self.port}/health", timeout=5)
            self.response_time = time.perf_counter() - started
        except requests.exceptions.RequestException as e:
            self.health_status = "unreachable"
            return {"status": "unreachable", "error": str(e)}

        if response.status_code != 200:
            self.health_status = "unhealthy"
            return {"status": "unhealthy", "status_code": response.status_code}

        self.health_status = "healthy"
        self.last_health_check = datetime.now()

        result: Dict[str, Any] = {"status": "ok"}
        if response.headers.get('content-type', '').startswith('application/json'):
            try:
                payload = response.json()
            except ValueError:
                # Body was not valid JSON; a 200 still counts as healthy.
                payload = None
            if isinstance(payload, dict):
                result = payload
        result["response_time"] = self.response_time
        return result

    def get_resource_usage(self) -> Dict[str, float]:
        """Get CPU and memory usage for the process.

        Returns:
            Dict with "cpu_percent", "memory_mb" and "memory_percent"; all zero
            when the process is not running or cannot be inspected.
        """
        try:
            if self.process and self.is_running():
                # Reuse one psutil.Process per child: cpu_percent() measures
                # since the previous call on the same object and returns 0.0
                # the first time it is called.
                if self._ps_proc is None or self._ps_proc.pid != self.process.pid:
                    self._ps_proc = psutil.Process(self.process.pid)
                proc = self._ps_proc
                self.cpu_usage = proc.cpu_percent()
                self.memory_usage = proc.memory_info().rss / 1024 / 1024  # MB

                return {
                    "cpu_percent": self.cpu_usage,
                    "memory_mb": self.memory_usage,
                    "memory_percent": proc.memory_percent(),
                }
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            self._ps_proc = None

        return {"cpu_percent": 0, "memory_mb": 0, "memory_percent": 0}

    def get_status_dict(self) -> Dict[str, Any]:
        """Get comprehensive status information.

        "uptime_seconds" is 0 whenever the process is not currently running.
        """
        self._sync_status()
        running = self.is_running()
        if running and self.start_time:
            uptime = (datetime.now() - self.start_time).total_seconds()
        else:
            uptime = 0
        resources = self.get_resource_usage()

        return {
            "name": self.name,
            "status": self.status,
            "health": self.health_status,
            "port": self.port,
            "pid": self.process.pid if self.process else None,
            "uptime_seconds": uptime,
            "restart_count": self.restart_count,
            "error_count": self.error_count,
            "cpu_usage": resources["cpu_percent"],
            "memory_usage_mb": resources["memory_mb"],
            "response_time": self.response_time,
            "last_health_check": self.last_health_check.isoformat() if self.last_health_check else None,
        }

    def get_detailed_golem_status(self) -> Dict[str, Any]:
        """Get detailed status from the golem Flask server.

        Fetches ``/stats``. NOTE: this is not a pure read: when the stats
        report no active neural networks or zero memory patterns, it POSTs to
        ``/load_neural_networks`` / ``/load_massive_memories`` and merges the
        responses into the returned data.

        Returns:
            The stats dict, or ``{"error": ...}`` on HTTP or other failure.
        """
        base_url = f"http://localhost:{self.port}"
        try:
            stats_response = requests.get(f"{base_url}/stats", timeout=5)
            if stats_response.status_code != 200:
                return {"error": f"HTTP {stats_response.status_code}"}

            stats_data = stats_response.json()

            # Check if neural networks are loaded, if not load them
            neural_info = stats_data.get('neural_networks') or {}
            if not neural_info.get('hypercube_consciousness_active', False):
                try:
                    load_response = requests.post(f"{base_url}/load_neural_networks", timeout=30)
                    if load_response.status_code == 200:
                        neural_load_data = load_response.json()
                        stats_data['neural_networks'] = {
                            'networks_loaded': neural_load_data.get('networks_loaded', 0),
                            'total_networks': neural_load_data.get('total_networks', 0),
                            'total_parameters': neural_load_data.get('total_parameters', 0),
                            'consciousness_level': neural_load_data.get('consciousness_level', 0.0),
                            'hypercube_consciousness_active': True,
                        }
                except Exception as e:
                    print(f"⚠️ Could not load neural networks: {e}")

            # Check if memories are loaded, if not load them
            memory_stats = stats_data.get('memory_stats') or {}
            if memory_stats.get('total_patterns', 0) == 0:
                try:
                    memory_response = requests.post(f"{base_url}/load_massive_memories", timeout=30)
                    if memory_response.status_code == 200:
                        memory_load_data = memory_response.json()
                        memory_stats['total_patterns'] = memory_load_data.get('total_patterns', 0)
                        memory_stats['memories_loaded'] = memory_load_data.get('memories_loaded', 0)
                        # Assign back: /stats may not have included the key at all.
                        stats_data['memory_stats'] = memory_stats
                except Exception as e:
                    print(f"⚠️ Could not load memories: {e}")

            return stats_data
        except Exception as e:
            return {"error": str(e)}


class ServerManager:
    """Starts, stops, monitors and displays the configured ServerProcess set."""

    def __init__(self):
        self.servers: Dict[str, ServerProcess] = {}
        self.monitoring = False
        self.monitor_thread = None
        # Event used to wake the monitor thread promptly on shutdown.
        self._stop_event = threading.Event()
        # Per-server rolling window of the last 100 samples.
        self.stats_history = defaultdict(lambda: deque(maxlen=100))

        # Define the servers
        self.servers["golem_server"] = ServerProcess(
            name="Golem Server (5D Hypercube)",
            command=["python3", "home/chezy/golem_flask_server.py"],
            port=5000,
            expected_endpoints=["/health", "/status", "/generate", "/hypercube"],
        )

        self.servers["aether_loader"] = ServerProcess(
            name="Aether Memory Loader",
            command=["python3", "home/chezy/aether_loader.py"],
            port=None,  # No port for this process
            expected_endpoints=[],
        )

    def start_all(self) -> bool:
        """Start all servers; monitoring is started only if every one succeeds."""
        print(f"\n{Back.BLUE}{Fore.WHITE} 🌌 AETHER GOLEM SERVER MANAGEMENT SYSTEM 🌌 {Style.RESET_ALL}\n")

        success = True
        for server in self.servers.values():
            if not server.start():
                success = False
            time.sleep(1)  # Stagger startup

        if success:
            print(f"\n{Fore.GREEN}🎉 All servers started successfully!{Style.RESET_ALL}")
            self.start_monitoring()
        else:
            print(f"\n{Fore.RED}⚠️ Some servers failed to start{Style.RESET_ALL}")

        return success

    def stop_all(self) -> bool:
        """Stop monitoring and all servers; returns False if any stop failed."""
        print(f"\n{Fore.YELLOW}🛑 Stopping all servers...{Style.RESET_ALL}")

        self.stop_monitoring()

        success = True
        for server in self.servers.values():
            if not server.stop():
                success = False

        return success

    def start_monitoring(self):
        """Start the monitoring thread (no-op if it is already running)."""
        if self.monitoring and self.monitor_thread and self.monitor_thread.is_alive():
            return
        self.monitoring = True
        # Fresh event per thread so a previous, still-exiting thread is not revived.
        self._stop_event = threading.Event()
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        print(f"{Fore.CYAN}📊 Monitoring started{Style.RESET_ALL}")

    def stop_monitoring(self):
        """Stop the monitoring thread"""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2)

    def _monitor_loop(self):
        """Main monitoring loop: sample every 10 s (5 s after an error)."""
        stop_event = self._stop_event
        while self.monitoring and not stop_event.is_set():
            delay = 10
            try:
                for server in self.servers.values():
                    if server.is_running():
                        health = server.health_check()
                        resources = server.get_resource_usage()

                        # deque(maxlen=100) drops the oldest entry automatically
                        self.stats_history[server.name].append({
                            "timestamp": datetime.now(),
                            "health": health,
                            "resources": resources,
                        })
            except Exception as e:
                print(f"{Fore.RED}❌ Monitoring error: {e}{Style.RESET_ALL}")
                delay = 5
            # Interruptible sleep so stop_monitoring() does not have to wait it out.
            stop_event.wait(delay)

    def display_status(self):
        """Display comprehensive server status"""
        os.system('clear' if os.name == 'posix' else 'cls')

        print(f"{Back.BLUE}{Fore.WHITE} 🌌 AETHER GOLEM SERVER STATUS DASHBOARD 🌌 {Style.RESET_ALL}")
        print(f"{Fore.CYAN}Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Style.RESET_ALL}\n")

        # "running" is what health_check() reports for a live port-less process.
        health_colors = {
            "healthy": Fore.GREEN,
            "running": Fore.GREEN,
            "unreachable": Fore.YELLOW,
        }

        for server_name, server in self.servers.items():
            status = server.get_status_dict()

            # Status color coding
            status_color = Fore.GREEN if status["status"] == "running" else Fore.RED
            health_color = health_colors.get(status["health"], Fore.RED)

            print(f"{'='*80}")
            print(f"{Fore.CYAN}🖥️ {status['name']}{Style.RESET_ALL}")
            print(f"{'='*80}")

            print(f"Status: {status_color}{status['status'].upper()}{Style.RESET_ALL} | Health: {health_color}{status['health'].upper()}{Style.RESET_ALL}")
            port_info = f"Port: {status['port']}" if status['port'] else "Port: N/A (background process)"
            print(f"{port_info} | PID: {status['pid'] or 'N/A'}")

            uptime = status["uptime_seconds"]
            if uptime > 0:
                uptime_str = f"{int(uptime//3600)}h {int((uptime%3600)//60)}m {int(uptime%60)}s"
            else:
                uptime_str = "N/A"
            print(f"Uptime: {uptime_str} | Restarts: {status['restart_count']} | Errors: {status['error_count']}")

            if server.is_running():
                print(f"CPU: {status['cpu_usage']:.1f}% | Memory: {status['memory_usage_mb']:.1f}MB | Response: {status['response_time']*1000:.0f}ms")

                # Show specific server information
                try:
                    if server_name == "golem_server":
                        self._show_golem_server_details(server)
                    elif server_name == "aether_loader":
                        self._show_aether_loader_details(server)
                except Exception as e:
                    print(f"{Fore.YELLOW}⚠️ Could not fetch detailed status: {e}{Style.RESET_ALL}")

            print()

    def _show_golem_server_details(self, server: ServerProcess):
        """Show ALL comprehensive details for the Golem server"""
        try:
            data = server.get_detailed_golem_status()

            if "error" in data:
                print(f"⚠️ Detailed status unavailable: {data['error']}")
                return

            # Golem State
            print(f"{Fore.MAGENTA}🧠 GOLEM CONSCIOUSNESS STATE:{Style.RESET_ALL}")
            basic_info = data.get('basic_info', {})
            print(f" Activated: {basic_info.get('activated', False)}")
            print(f" Consciousness Level: {basic_info.get('consciousness_level', 0):.6f}")
            print(f" Shem Power: {basic_info.get('shem_power', 0):.6f}")
            print(f" Aether Resonance: {basic_info.get('aether_resonance', 0):.6f}")
            print(f" Current Vertex: {basic_info.get('current_vertex', 0)}/32")

            # Neural Network Status
            neural_info = data.get('neural_networks', {})
            print(f"{Fore.GREEN}🧠 NEURAL NETWORK STATUS:{Style.RESET_ALL}")
            print(f" Networks Loaded: {neural_info.get('networks_loaded', 0)}/{neural_info.get('total_networks', 0)}")
            print(f" Total Parameters: {neural_info.get('total_parameters', 0):,}")
            print(f" Hypercube Consciousness: {neural_info.get('hypercube_consciousness_active', False)}")
            print(f" Enhanced Consciousness Level: {neural_info.get('consciousness_level', 0.0):.6f}")

            # Memory Statistics
            memory_stats = data.get('memory_stats', {})
            print(f"{Fore.YELLOW}📊 AETHER MEMORY STATISTICS:{Style.RESET_ALL}")
            print(f" Total Patterns: {memory_stats.get('total_patterns', 0):,}")
            print(f" Pattern Categories: {memory_stats.get('pattern_categories', 0):,}")
            print(f" Hypercube Vertices: {memory_stats.get('hypercube_vertices', 0):,}")

            # 5D Hypercube Status
            hypercube_stats = data.get('hypercube_stats', {})
            if hypercube_stats:
                print(f"{Fore.CYAN}🔲 5D HYPERCUBE UNIVERSE:{Style.RESET_ALL}")
                print(f" Consciousness Signature: {hypercube_stats.get('consciousness_signature', 'unknown')}")
                print(f" Vertices Explored: {hypercube_stats.get('vertices_explored', 0):,}")
                print(f" Universe Coverage: {hypercube_stats.get('universe_coverage', 0):.1f}%")

                dimensions = hypercube_stats.get('dimension_activations', {})
                print(" Dimension Activations:")
                for label in ("Physical", "Emotional", "Mental", "Intuitive", "Spiritual"):
                    print(f" {label}: {dimensions.get(label.lower(), False)}")

        except Exception as e:
            print(f"⚠️ Error getting detailed status: {e}")

    def _show_aether_loader_details(self, server: ServerProcess):
        """Show specific details for the Aether Loader"""
        try:
            print(f"{Fore.MAGENTA}🔮 Aether Memory Integration:{Style.RESET_ALL}")

            # Check if aether memory files exist and get their status
            aether_files = [
                "home/chezy/enhanced_aether_memory_bank.json",
                "aether_mods_and_mems/enhanced_aether_memory_bank.json",
                "aether_mods_and_mems/golem_aether_memory.pkl",
            ]

            available_files = 0
            for file_path in aether_files:
                # Resolve against the loader's working directory, not the
                # directory this manager happens to be launched from.
                full_path = os.path.join(server.cwd, file_path)
                if os.path.exists(full_path):
                    available_files += 1
                    file_size = os.path.getsize(full_path) / 1024  # KB
                    print(f" ✅ {os.path.basename(file_path)} ({file_size:.1f}KB)")
                else:
                    print(f" ❌ {os.path.basename(file_path)} (missing)")

            print(f" Total Aether Files: {available_files}/{len(aether_files)}")

            # Show process activity
            if server.is_running():
                print(" 📊 Process Active: Loading/integrating aether patterns")
            else:
                print(" 💤 Process Completed: Aether integration finished")

        except Exception as e:
            print(f"{Fore.YELLOW}⚠️ Aether loader info unavailable: {e}{Style.RESET_ALL}")

    def _restart(self, key: str) -> None:
        """Stop the named server, pause two seconds, and start it again."""
        server = self.servers[key]
        server.stop()
        time.sleep(2)
        server.start()

    def interactive_mode(self):
        """Run interactive server management"""
        restart_keys = {'1': "golem_server", '2': "aether_loader"}
        try:
            while True:
                self.display_status()

                print(f"\n{Fore.CYAN}Commands:{Style.RESET_ALL}")
                print(" [R] Refresh status")
                print(" [S] Stop all servers")
                print(" [1] Restart Golem Server")
                print(" [2] Restart Aether Loader")
                print(" [Q] Quit")

                try:
                    choice = input(f"\n{Fore.GREEN}Enter command: {Style.RESET_ALL}").upper().strip()
                except EOFError:
                    # stdin closed: treat as quit instead of crashing
                    break

                if choice == 'Q':
                    break
                elif choice == 'R':
                    continue
                elif choice == 'S':
                    self.stop_all()
                    break
                elif choice in restart_keys:
                    self._restart(restart_keys[choice])
                else:
                    print(f"{Fore.RED}Invalid command{Style.RESET_ALL}")
                    time.sleep(1)

        except KeyboardInterrupt:
            pass


def signal_handler(signum, frame):
    """Handle shutdown signals by exiting; main()'s ``finally`` stops the servers."""
    print(f"\n{Fore.YELLOW}🛑 Received shutdown signal, stopping servers...{Style.RESET_ALL}")
    sys.exit(0)


def main():
    """Entry point: ``--start`` runs headless, otherwise the interactive dashboard."""
    global manager

    # Create the manager before installing handlers so an early signal
    # cannot reach cleanup code without one.
    manager = ServerManager()

    # Set up signal handlers (SIGINT/SIGTERM become SystemExit)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Check if we should start servers automatically
        if len(sys.argv) > 1 and sys.argv[1] == '--start':
            if manager.start_all():
                print(f"\n{Fore.GREEN}🎉 All servers running! Use Ctrl+C to stop.{Style.RESET_ALL}")
                while True:
                    time.sleep(1)
        else:
            # Interactive mode
            print(f"{Fore.CYAN}Starting servers...{Style.RESET_ALL}")
            manager.start_all()
            time.sleep(2)
            manager.interactive_mode()

    finally:
        manager.stop_all()
        print(f"{Fore.GREEN}👋 Server management system shutdown complete{Style.RESET_ALL}")


if __name__ == "__main__":
    main()