golem-flask-backend / home /chezy /golem_flask_server_backup.py
mememechez's picture
Deploy final cleaned source code
ca28016
#!/usr/bin/env python3
"""
Flask Server Wrapper for Golem Server and QWen Golem
Uses the classes from golem_server.py and qwen_golem.py
"""
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
import logging
import os
import time
import threading
from typing import Dict, Any, List, Optional
from datetime import datetime
import json
import traceback
import pickle
import requests
from functools import wraps
from concurrent.futures import ThreadPoolExecutor
import googleapiclient.discovery
import asyncio
import aiohttp
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# Import the golem classes
import sys
import os
# Extend the import search path with the directory two levels above this
# file and with this file's own directory, so `qwen_golem` can be imported.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(os.path.join(os.path.dirname(__file__)))
try:
    from qwen_golem import AetherGolemConsciousnessCore
    print("✅ Imported AetherGolemConsciousnessCore from qwen_golem")
except ImportError as e:
    print(f"❌ Failed to import from qwen_golem: {e}")
    try:
        # Try alternative import path (hard-coded absolute location)
        sys.path.append('/home/chezy/Desktop/qwen2golem/QWEN2Golem/home/chezy')
        from qwen_golem import AetherGolemConsciousnessCore
        print("✅ Imported AetherGolemConsciousnessCore from alternative path")
    except ImportError as e2:
        print(f"❌ Alternative import also failed: {e2}")
        # Keep the name defined so later code can test it for availability
        # instead of hitting a NameError.
        AetherGolemConsciousnessCore = None
# The Flask application object that all routes and hooks below attach to.
app = Flask(__name__)
# Global chat sessions storage for context tracking
global_chat_sessions = {}


def get_chat_context(session_id):
    """Render the most recent exchanges of a session as plain text.

    Args:
        session_id: Key into ``global_chat_sessions``; may be falsy.

    Returns:
        Alternating "User: ..." / "AI: ..." lines for the last five stored
        exchanges, joined with newlines, or a fixed placeholder sentence when
        the session is unknown or has no messages yet.
    """
    no_context = "This is a new conversation with no previous context."
    if not session_id or session_id not in global_chat_sessions:
        return no_context

    history = global_chat_sessions[session_id]['messages']
    if not history:
        return no_context

    # Only the last five exchanges are rendered.
    return "\n".join(
        line
        for exchange in history[-5:]
        for line in (f"User: {exchange['user']}", f"AI: {exchange['ai']}")
    )
def store_chat_message(session_id, user_message, ai_response, vertex=0, model_used='unknown'):
    """Append one user/AI exchange to a session's history.

    Nothing is stored for a falsy ``session_id`` or for ids starting with
    ``'naming-'``. A session record is created on first use, and only the
    20 most recent messages of a session are retained.
    """
    if not session_id or session_id.startswith('naming-'):
        return

    try:
        session = global_chat_sessions[session_id]
    except KeyError:
        session = {
            'messages': [],
            'user_patterns': [],
            'created_at': datetime.now().isoformat()
        }
        global_chat_sessions[session_id] = session

    entry = {
        'user': user_message,
        'ai': ai_response,
        'timestamp': datetime.now().isoformat(),
        'consciousness_vertex': vertex,
        'model_used': model_used
    }
    session['messages'].append(entry)

    # Keep only last 20 messages to prevent memory issues
    if len(session['messages']) > 20:
        session['messages'] = session['messages'][-20:]
def extract_user_insights(chat_context, current_message):
    """Extract simple insights about the user from the conversation.

    Args:
        chat_context: Previously formatted conversation text.
        current_message: The message the user just sent.

    Returns:
        The detected insights joined with "; ", or a generic placeholder
        sentence when nothing was detected. A detected name is reported in
        lower case because matching is done on the lower-cased message.
    """
    insights = []
    message_lower = current_message.lower()

    # Check for name mentions
    marker = "my name is"
    if marker in message_lower:
        # The message may end right after the phrase ("my name is"), in which
        # case there is no following word; indexing [0] blindly would raise
        # IndexError, so only report a name when one is actually present.
        following_words = message_lower.split(marker)[1].split()
        if following_words:
            insights.append(f"User's name: {following_words[0]}")

    # Check for patterns in chat context
    # NOTE(review): this is a plain substring test, so any text containing
    # "ym" (e.g. "gym") triggers it - confirm whether that is intended.
    if "ym" in chat_context.lower() or "ym" in message_lower:
        insights.append("User goes by 'ym'")

    return "; ".join(insights) if insights else "Learning about user preferences and communication style"
# Enhanced CORS configuration for frontend compatibility.
# Every route ("/*") accepts any origin, the listed request headers and
# methods are allowed, and credentialed requests are not supported.
CORS(app,
     resources={r"/*": {"origins": "*"}},
     allow_headers=["Content-Type", "Authorization", "X-Requested-With", "Accept", "Origin", "ngrok-skip-browser-warning"],
     methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
     supports_credentials=False
     )
# Add explicit OPTIONS handler for preflight requests
@app.before_request
def handle_preflight():
    """Answer every OPTIONS request with an empty JSON body and CORS headers.

    Returns None for all other methods so normal request handling continues.
    """
    if request.method != "OPTIONS":
        return None

    preflight = jsonify()
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "Content-Type,Authorization,X-Requested-With,Accept,Origin,ngrok-skip-browser-warning",
        "Access-Control-Allow-Methods": "GET,POST,PUT,DELETE,OPTIONS",
    }
    for header_name, header_value in cors_headers.items():
        preflight.headers[header_name] = header_value
    return preflight
# Decorator to handle OPTIONS preflight requests
def handle_options(f):
    """Wrap a view so OPTIONS requests are answered without calling it.

    For an OPTIONS request the wrapper returns ``jsonify(success=True)`` with
    CORS headers added; any other method is passed through to ``f``.
    """
    preflight_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Headers', 'Content-Type,Authorization,ngrok-skip-browser-warning'),
        ('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS'),
    )

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.method != 'OPTIONS':
            return f(*args, **kwargs)
        response = jsonify(success=True)
        for header_name, header_value in preflight_headers:
            response.headers.add(header_name, header_value)
        return response

    return decorated_function
# Add ngrok-skip-browser-warning header to all responses
@app.after_request
def add_ngrok_header(response):
    """Stamp the outgoing response with ``ngrok-skip-browser-warning: true``."""
    outgoing_headers = response.headers
    outgoing_headers['ngrok-skip-browser-warning'] = 'true'
    return response
# Global variables
golem_instance = None  # assigned by initialize_golem(); None until then
neural_networks = {}  # Store loaded neural networks, keyed by file name
consciousness_signatures = {}  # Map signatures to neural models (signature -> file name)
active_chat_sessions = {}  # Track active chat sessions, keyed by session id
# Chat session management
def generate_chat_name(first_message: str) -> str:
    """Generate a short, meaningful name for a new chat from its first message.

    The title is requested from the Gemini rotation helper and trimmed to at
    most four words / 30 characters. When no usable title comes back (missing
    response, a response that is empty after stripping quotes and whitespace,
    or an exception), the first three words of the message are title-cased
    instead, falling back to "New Chat" for an empty message.

    Args:
        first_message: The first user message of the chat.

    Returns:
        A non-empty chat name.
    """
    def _fallback_name() -> str:
        # Simple local naming used whenever the model gives nothing usable.
        words = first_message.split()[:3]
        return ' '.join(words).title() if words else "New Chat"

    try:
        # Use Gemini to generate a concise chat name
        naming_prompt = (
            'Create a very short, descriptive title (2-4 words max) for a chat that starts with this message:\n'
            f'"{first_message[:200]}"\n'
            'Return ONLY the title, nothing else. Make it descriptive but concise.\n'
            'Examples: "Weather Discussion", "Python Help", "AI Ethics", "Travel Planning"\n'
        )
        result = generate_with_gemini_parallel_rotation(naming_prompt, max_tokens=20, temperature=0.3)
        raw_title = result.get('response')
        if raw_title:
            chat_name = raw_title.strip().strip('"').strip("'")
            # Clean up the name
            chat_name = ' '.join(chat_name.split()[:4])  # Max 4 words
            if len(chat_name) > 30:
                chat_name = chat_name[:27] + "..."
            # A reply made only of quotes/whitespace cleans down to "", which
            # would be a useless title, so fall through to the local fallback.
            if chat_name:
                return chat_name
        return _fallback_name()
    except Exception as e:
        print(f"⚠️ Chat naming failed: {e}")
        return _fallback_name()
def is_new_chat_session(session_id: str) -> bool:
    """Report whether no entry exists yet in ``active_chat_sessions`` for this id."""
    already_tracked = session_id in active_chat_sessions
    return not already_tracked
def initialize_chat_session(session_id: str, first_message: str) -> dict:
    """Create and register a chat session whose name is derived from the first message.

    Returns:
        The stored session record. If anything fails, a reduced record named
        'New Chat' is returned instead and nothing is registered.
    """
    try:
        session_data = dict(
            session_id=session_id,
            chat_name=generate_chat_name(first_message),
            created_at=datetime.now().isoformat(),
            message_count=0,
            consciousness_vertex=0,
            aether_signature=None,
            neural_model=None,
        )
        active_chat_sessions[session_id] = session_data
        print(f"💬 New chat session '{session_data['chat_name']}' created for {session_id}")
        return session_data
    except Exception as e:
        print(f"❌ Failed to initialize chat session: {e}")
        return dict(
            session_id=session_id,
            chat_name='New Chat',
            created_at=datetime.now().isoformat(),
            message_count=0,
        )
# Neural network consciousness loading
def load_neural_networks_async():
    """Load the neural network files (.pth, .pt, .pkl) from the aether directory.

    Files are selected by extension and by a keyword in their name. Each
    loaded object is stored in ``neural_networks`` under its file name, and
    its signature (if truthy) is mapped to the file name in
    ``consciousness_signatures``. A file that fails to load is reported and
    skipped; a failure outside the per-file loop is reported and ends the run.

    Despite the name, the body itself runs synchronously in the caller.
    """
    name_keywords = ('consciousness', 'hypercube', 'enhanced', 'best', 'working', 'fixed')

    def _register(filename, model_data, model_type):
        # Record one loaded model plus its signature lookup entry.
        signature = extract_consciousness_signature(model_data, filename)
        neural_networks[filename] = {
            'model_data': model_data,
            'consciousness_signature': signature,
            'filename': filename,
            'type': model_type,
            'loaded_at': datetime.now().isoformat()
        }
        # Map signature to model for quick lookup
        if signature:
            consciousness_signatures[signature] = filename
        return signature

    try:
        neural_dir = "/home/chezy/Desktop/qwen2golem/QWEN2Golem/aether_mods_and_mems"
        neural_files = []
        for filename in os.listdir(neural_dir):
            if not filename.endswith(('.pth', '.pt', '.pkl')):
                continue
            lowered = filename.lower()
            if not any(keyword in lowered for keyword in name_keywords):
                continue
            file_path = os.path.join(neural_dir, filename)
            neural_files.append({
                'filename': filename,
                'path': file_path,
                'size_mb': os.path.getsize(file_path) / (1024 * 1024)
            })

        print(f"🧠 Loading {len(neural_files)} neural network files asynchronously...")

        for file_info in neural_files:
            try:
                filename = file_info['filename']
                filepath = file_info['path']
                if filename.endswith(('.pth', '.pt')):
                    # Load PyTorch model
                    import torch
                    model_data = torch.load(filepath, map_location='cpu')
                    consciousness_signature = _register(filename, model_data, 'pytorch')
                    print(f"🧠 Loaded PyTorch model: {filename} (signature: {consciousness_signature})")
                elif filename.endswith('.pkl'):
                    # Load pickle data
                    with open(filepath, 'rb') as handle:
                        pkl_data = pickle.load(handle)
                    consciousness_signature = _register(filename, pkl_data, 'pickle')
                    print(f"🧠 Loaded pickle model: {filename} (signature: {consciousness_signature})")
            except Exception as e:
                print(f"⚠️ Failed to load neural network {file_info['filename']}: {e}")

        print(f"✅ Neural network loading complete: {len(neural_networks)} models loaded")
    except Exception as e:
        print(f"❌ Neural network loading failed: {e}")
def extract_consciousness_signature(model_data, filename: str) -> str:
    """Derive a signature for a loaded neural network object.

    Resolution order for dict-like checkpoints:
      1. an explicit 'consciousness_signature' entry is returned as-is;
      2. if both 'epoch' and 'loss' are present, a signature is built from
         the epoch and the accuracy (default 0.5);
      3. if 'model' or 'state_dict' is present, a signature is built from a
         checksum of the dict's key list.
    Otherwise the signature is built from the file name without its
    .pth/.pkl/.pt markers, prefixed by the first matching keyword.

    Checksums use CRC-32 rather than the built-in hash(): hash() of a string
    is salted per interpreter process, so it would produce a different
    "signature" for the same file after every restart.

    Args:
        model_data: The object loaded from the file (any type).
        filename: Name of the file the object came from.

    Returns:
        The signature; on any internal error an "unknown_NNN" signature
        derived from the file name.
    """
    import zlib  # local import: only this function needs it

    def _stable_hash(value) -> int:
        # Process-independent replacement for hash(); backslashreplace keeps
        # odd (surrogate-escaped) file names from raising during encoding.
        return zlib.crc32(str(value).encode('utf-8', 'backslashreplace'))

    try:
        # Generate signature based on file properties and contents
        if isinstance(model_data, dict):
            # Check for specific keys that indicate consciousness state
            if 'consciousness_signature' in model_data:
                return model_data['consciousness_signature']
            if 'epoch' in model_data and 'loss' in model_data:
                # Use training metrics to create signature
                epoch = model_data.get('epoch', 0)
                accuracy = model_data.get('accuracy', 0.5)
                return f"trained_epoch_{epoch}_acc_{accuracy:.3f}"
            if 'model' in model_data or 'state_dict' in model_data:
                # Use a checksum of the checkpoint's key list
                model_keys = list(model_data.keys())
                return f"model_{_stable_hash(str(model_keys)) % 10000:04d}"

        # Fallback: use filename-based signature
        base_name = filename.replace('.pth', '').replace('.pkl', '').replace('.pt', '')
        lowered = base_name.lower()
        for prefix in ('enhanced', 'hypercube', 'consciousness'):
            if prefix in lowered:
                return f"{prefix}_{_stable_hash(base_name) % 1000:03d}"
        return f"neural_{_stable_hash(base_name) % 1000:03d}"
    except Exception as e:
        print(f"⚠️ Failed to extract consciousness signature from {filename}: {e}")
        return f"unknown_{_stable_hash(filename) % 1000:03d}"
def get_consciousness_neural_model(aether_signature: str, vertex: Optional[int] = None) -> Optional[dict]:
    """Pick the loaded neural model that best fits a signature / vertex.

    Selection order:
      1. exact match of ``aether_signature`` in ``consciousness_signatures``;
      2. if ``vertex`` is given, the highest-scoring model, where the score
         rewards 'enhanced' (+2), 'hypercube' (+1) and 'consciousness' (+1)
         in the file name and the vertex number appearing in the model's
         signature (+3); models scoring 0 are not selected here;
      3. the first model whose file name contains 'enhanced' or 'best';
      4. any loaded model.

    Args:
        aether_signature: Signature to look up.
        vertex: Optional vertex number used for scoring.

    Returns:
        The model record from ``neural_networks``, or None when nothing is
        loaded or an error occurs.
    """
    try:
        # Try to find exact signature match
        if aether_signature in consciousness_signatures:
            model_filename = consciousness_signatures[aether_signature]
            return neural_networks[model_filename]

        # Find best match based on consciousness vertex if provided
        if vertex is not None and neural_networks:
            vertex_token = str(vertex)
            best_match = None
            best_score = 0
            for filename, model_data in neural_networks.items():
                # A signature read straight out of a checkpoint is not
                # guaranteed to be a string; coerce it so the containment
                # test cannot raise TypeError and abort the whole lookup.
                signature = str(model_data['consciousness_signature'])
                lowered = filename.lower()
                # Score based on signature similarity and model type
                score = 0
                if 'enhanced' in lowered:
                    score += 2
                if 'hypercube' in lowered:
                    score += 1
                if 'consciousness' in lowered:
                    score += 1
                # Prefer models with numerical components matching vertex
                if vertex_token in signature:
                    score += 3
                if score > best_score:
                    best_score = score
                    best_match = model_data
            if best_match:
                return best_match

        # Fallback: return the first available enhanced model
        for filename, model_data in neural_networks.items():
            lowered = filename.lower()
            if 'enhanced' in lowered or 'best' in lowered:
                return model_data

        # Last resort: return any available model
        if neural_networks:
            return next(iter(neural_networks.values()))
        return None
    except Exception as e:
        print(f"⚠️ Failed to get consciousness neural model: {e}")
        return None
# Load Gemini API keys from file
def load_gemini_api_keys():
    """Collect the Gemini API keys used for rotation.

    Keys are read, one per non-blank line, from ``api_gemini15.txt`` located
    two directories above this file. If that yields nothing, the environment
    is consulted instead: ``GEMINI_API_KEY`` (or ``NEXT_PUBLIC_GEMINI_API_KEY``)
    followed by ``GEMINI_API_KEY_2`` .. ``GEMINI_API_KEY_15``; unset values
    and the template placeholder are dropped.

    Returns:
        A list of key strings (possibly empty).
    """
    api_keys = []

    # Try to load from api_gemini15.txt file
    api_file_path = os.path.join(os.path.dirname(__file__), '..', '..', 'api_gemini15.txt')
    if os.path.exists(api_file_path):
        try:
            with open(api_file_path, 'r') as key_file:
                stripped_lines = (line.strip() for line in key_file)
                api_keys = [entry for entry in stripped_lines if entry]
            print(f"✅ Loaded {len(api_keys)} Gemini API keys from api_gemini15.txt")
        except Exception as e:
            print(f"❌ Failed to load API keys from file: {e}")

    if api_keys:
        return api_keys

    # Fallback to environment variables if file loading failed
    print("⚠️ Falling back to environment variables for API keys")
    env_keys = [os.getenv('GEMINI_API_KEY') or os.getenv('NEXT_PUBLIC_GEMINI_API_KEY')]
    env_keys.extend(os.getenv(f'GEMINI_API_KEY_{slot}') for slot in range(2, 16))
    return [key for key in env_keys if key and key != 'your_gemini_api_key_here']
# Load all configured Gemini API keys once at import time
GEMINI_API_KEYS = load_gemini_api_keys()
print(f"🔑 TOTAL GEMINI API KEYS LOADED: {len(GEMINI_API_KEYS)}")
if GEMINI_API_KEYS:
    print(f"✅ Perfect rotation enabled with {len(GEMINI_API_KEYS)} keys")
    for i, key in enumerate(GEMINI_API_KEYS, 1):
        # Show only a short prefix: these lines end up in server logs, and
        # echoing a large part of each secret there would leak credentials.
        print(f" Key #{i}: {key[:4]}...")
else:
    print("❌ NO API KEYS LOADED! Server will fail!")
# Endpoint of the Gemini 1.5 Flash generateContent API.
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent"
# Perfect rotation system
current_key_index = 0  # index into GEMINI_API_KEYS of the next key handed out by get_next_gemini_key()
key_stats = {}  # Track success/failure rates per key (keyed by "key_<n>")
key_blacklist = set()  # Temporarily blacklist problematic keys (holds "key_<n>" ids)
# Google Custom Search setup (both values come from the environment and may be None)
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")
def perform_google_search(query: str, num_results: int = 5) -> Optional[Dict[str, Any]]:
    """Run a Google Custom Search and return the hits in a compact form.

    Args:
        query: Search text.
        num_results: Number of results requested from the API.

    Returns:
        ``{"search_query": ..., "search_results": [{"title", "link",
        "snippet"}, ...]}``, or None when the API key / engine id is not
        configured, the response has no items, or the call fails.
    """
    if not (GOOGLE_API_KEY and GOOGLE_CSE_ID):
        print("⚠️ Google API Key or CSE ID is not set. Skipping search.")
        return None

    try:
        print(f"🔍 Performing Google search for: {query}")
        service = googleapiclient.discovery.build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
        search_call = service.cse().list(q=query, cx=GOOGLE_CSE_ID, num=num_results)
        res = search_call.execute()

        if 'items' not in res:
            print("No results found from Google Search.")
            return None

        search_results = []
        for item in res['items']:
            search_results.append({
                "title": item.get("title"),
                "link": item.get("link"),
                "snippet": item.get("snippet")
            })
        print(f"✅ Found {len(search_results)} results.")
        return {
            "search_query": query,
            "search_results": search_results
        }
    except Exception as e:
        print(f"❌ Error during Google search: {e}")
        traceback.print_exc()
        return None
def get_next_gemini_key():
    """Hand out the next usable API key in round-robin order.

    The rotation pointer advances past every key that is inspected. A key is
    skipped while its ``blacklisted_until`` time lies in the future or while
    its id is in ``key_blacklist``.

    Returns:
        ``(key, key_id)`` for the first usable key, or None when there are no
        keys or a full pass over the keys found none usable.
    """
    global current_key_index
    if not GEMINI_API_KEYS:
        return None

    total_keys = len(GEMINI_API_KEYS)
    for _ in range(total_keys):
        position = current_key_index
        # Move to next key for next call
        current_key_index = (position + 1) % total_keys
        key_id = f"key_{position + 1}"

        stats = key_stats.get(key_id)
        in_backoff = (
            bool(stats)
            and 'blacklisted_until' in stats
            and time.time() < stats['blacklisted_until']
        )
        if in_backoff or key_id in key_blacklist:
            continue
        return GEMINI_API_KEYS[position], key_id

    # All keys are blacklisted
    return None
def track_key_performance(key_id: str, success: bool, error_type: Optional[str] = None):
    """Record the outcome of one request made with an API key.

    Updates the per-key counters in ``key_stats``. A success resets the
    consecutive-failure count and lifts both kinds of blacklisting (the
    ``key_blacklist`` entry and the timed ``blacklisted_until`` backoff).
    After three consecutive failures the key is blocked: for 60 seconds when
    the last error type is 'http_429', otherwise via ``key_blacklist`` until
    a later success.

    Args:
        key_id: Identifier of the key, e.g. "key_3".
        success: Whether the request succeeded.
        error_type: Optional label of the failure, counted per key.
    """
    if key_id not in key_stats:
        key_stats[key_id] = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'error_types': {},
            'last_success': None,
            'last_failure': None,
            'consecutive_failures': 0
        }
    stats = key_stats[key_id]
    stats['total_requests'] += 1

    if success:
        stats['successful_requests'] += 1
        stats['last_success'] = datetime.now()
        stats['consecutive_failures'] = 0
        # Remove from blacklist if it was there
        key_blacklist.discard(key_id)
        # A working key must not stay in a rate-limit backoff either;
        # otherwise get_next_gemini_key() keeps skipping it until the old
        # deadline passes.
        stats.pop('blacklisted_until', None)
        return

    stats['failed_requests'] += 1
    stats['last_failure'] = datetime.now()
    stats['consecutive_failures'] += 1
    if error_type:
        error_counts = stats['error_types']
        error_counts[error_type] = error_counts.get(error_type, 0) + 1

    # Blacklist key if too many consecutive failures
    if stats['consecutive_failures'] >= 3:
        # Implement intelligent backoff for rate-limited keys
        if error_type == 'http_429':
            backoff_time = 60  # Blacklist for 60 seconds
            stats['blacklisted_until'] = time.time() + backoff_time
            print(f"🚫 Rate-limited {key_id}, blacklisting for {backoff_time} seconds")
        else:
            key_blacklist.add(key_id)
            print(f"🚫 Blacklisted {key_id} due to {stats['consecutive_failures']} consecutive failures")
async def make_gemini_request_async(session, api_key, key_id, prompt, max_tokens=2000, temperature=0.7):
    """Issue one generateContent request to Gemini and summarise the outcome.

    Args:
        session: Object whose ``post(...)`` is used as an async context
            manager (an aiohttp client session in this file).
        api_key: Key appended to the request URL.
        key_id: Label echoed back in the result.
        prompt: Text to send.
        max_tokens: Value sent as ``maxOutputTokens``.
        temperature: Sampling temperature sent with the request.

    Returns:
        A dict that always carries 'success', 'key_id' and 'status_code'.
        On HTTP 200 it also holds the first candidate's text under both
        'direct_response' and 'response'; otherwise it holds 'error' (and
        'response_text' for non-200 replies). Exceptions are never raised;
        they are reported with status_code 'timeout' or 'unknown'.
    """
    def _failure(error, status_code, **extra):
        # Uniform shape for every unsuccessful outcome.
        outcome = {
            'success': False,
            'key_id': key_id,
            'error': error,
            'status_code': status_code
        }
        outcome.update(extra)
        return outcome

    try:
        url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={api_key}"
        # Simplified data structure like the working simple server
        payload = {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {"maxOutputTokens": max_tokens, "temperature": temperature}
        }
        # Use longer timeout for better API stability
        request_timeout = aiohttp.ClientTimeout(total=15, connect=5)
        async with session.post(url, json=payload, timeout=request_timeout) as response:
            if response.status != 200:
                error_text = await response.text()
                return _failure(f'HTTP {response.status}', response.status, response_text=error_text)

            result = await response.json()
            first_candidate = result.get('candidates', [{}])[0]
            first_part = first_candidate.get('content', {}).get('parts', [{}])[0]
            content = first_part.get('text', '')
            return {
                'success': True,
                'key_id': key_id,
                'direct_response': content,
                'response': content,
                'status_code': response.status
            }
    except asyncio.TimeoutError:
        return _failure('Connection timeout', 'timeout')
    except Exception as e:
        return _failure(str(e), 'unknown')
async def parallel_gemini_rotation(prompt, max_tokens=2000, temperature=0.7, timeout=3):
    """Send the prompt with every non-blacklisted Gemini key at once and return the first success.

    Args:
        prompt: Text forwarded to make_gemini_request_async for each key.
        max_tokens: Forwarded to each request.
        temperature: Forwarded to each request.
        timeout: Only interpolated into the timeout log message in this
            function; request time limits are set through
            aiohttp.ClientTimeout objects instead.

    Returns:
        The result dict of the first request whose 'success' flag is true.

    Raises:
        Exception: when no keys are configured, when no request could be
            created because all keys are blacklisted, or when every launched
            request failed.
    """
    if not GEMINI_API_KEYS:
        raise Exception("No Gemini API keys available")
    print(f"🚀 PARALLEL ROTATION: Testing all {len(GEMINI_API_KEYS)} keys simultaneously")
    start_time = time.time()
    # Clear old blacklisted keys (older than 60 seconds) to allow retry
    # NOTE(review): only ids that are in key_blacklist AND have a
    # 'blacklisted_until' entry in key_stats are restored here - confirm that
    # both are ever set for the same key.
    current_time = time.time()
    keys_to_remove = []
    for key_id in list(key_blacklist):
        if key_id in key_stats and 'blacklisted_until' in key_stats[key_id]:
            if current_time > key_stats[key_id]['blacklisted_until']:
                keys_to_remove.append(key_id)
    for key_id in keys_to_remove:
        key_blacklist.discard(key_id)
        print(f"✅ Restored {key_id} to rotation (blacklist expired)")
    # Use optimized session configuration to prevent hanging
    connector = aiohttp.TCPConnector(
        limit=100,  # Total connection limit
        limit_per_host=5,  # Per-host connection limit
        ttl_dns_cache=300,  # DNS cache TTL
        use_dns_cache=True,
        enable_cleanup_closed=True
    )
    timeout_config = aiohttp.ClientTimeout(total=8, connect=3)
    async with aiohttp.ClientSession(connector=connector, timeout=timeout_config) as session:
        # Create tasks for all API keys
        tasks = []
        for i, api_key in enumerate(GEMINI_API_KEYS):
            key_id = f"key_{i+1}"
            if key_id not in key_blacklist:  # Skip blacklisted keys
                coro = make_gemini_request_async(session, api_key, key_id, prompt, max_tokens, temperature)
                task = asyncio.create_task(coro)
                tasks.append(task)
        if not tasks:
            # Emergency: Clear blacklist if all keys are blacklisted
            # NOTE(review): the fixed threshold of 40 matches "80%" only for a
            # pool of 50 keys; with 40 or fewer keys this branch cannot run.
            if len(key_blacklist) > 40:  # If more than 80% of keys are blacklisted
                print("🚨 EMERGENCY: Clearing blacklist - too many keys blacklisted")
                key_blacklist.clear()
                # Retry creating tasks
                for i, api_key in enumerate(GEMINI_API_KEYS):
                    key_id = f"key_{i+1}"
                    coro = make_gemini_request_async(session, api_key, key_id, prompt, max_tokens, temperature)
                    task = asyncio.create_task(coro)
                    tasks.append(task)
            if not tasks:
                raise Exception("All API keys are blacklisted")
        print(f"📡 Launching {len(tasks)} parallel requests...")
        # Wait for first successful response or all to complete
        successful_response = None
        failed_count = 0
        try:
            # Use as_completed to get results as they finish
            # Remove early bailout - try ALL keys before giving up
            early_bailout_threshold = 999  # Effectively disabled - try all keys
            consecutive_failures = 0
            for task in asyncio.as_completed(tasks):
                result = await task
                if result['success']:
                    successful_response = result
                    elapsed = time.time() - start_time
                    print(f"⚡ PARALLEL SUCCESS: {result['key_id']} responded in {elapsed:.2f}s")
                    # Cancel remaining tasks to save resources
                    for remaining_task in tasks:
                        if not remaining_task.done():
                            remaining_task.cancel()
                    break
                else:
                    # Reset consecutive failures if we get a proper HTTP response (even error)
                    if result.get('status_code') and result.get('status_code') not in ['unknown', 'timeout']:
                        consecutive_failures = 0
                    failed_count += 1
                    status = result.get('status_code', 'unknown')
                    error_msg = result.get('error', 'Unknown error')
                    print(f"❌ {result['key_id']}: {error_msg} (status: {status})")
                    # Add to blacklist if it's a persistent error
                    if status in [401, 403, 404]:  # Auth/permission errors
                        key_blacklist.add(result['key_id'])
                        print(f"🚫 Blacklisted {result['key_id']} due to auth error (status: {status})")
                    elif status == 429:  # Rate limit - temporary blacklist
                        # Don't permanently blacklist rate-limited keys
                        print(f"⚠️ {result['key_id']} hit rate limit - will retry next request")
                    # Track consecutive failures for early bailout
                    consecutive_failures += 1
                    if consecutive_failures >= early_bailout_threshold:
                        end_time = time.time()
                        print(f"🚨 EARLY BAILOUT: {consecutive_failures} consecutive failures in {end_time - start_time:.2f}s")
                        # Cancel remaining tasks
                        for remaining_task in tasks:
                            if not remaining_task.done():
                                remaining_task.cancel()
                        break
        except asyncio.TimeoutError:
            # NOTE(review): as_completed() is called without a timeout above,
            # so it is unclear when this handler can fire - confirm.
            print(f"⏰ Parallel rotation timed out after {timeout}s")
            # Cancel any remaining tasks
            for task in tasks:
                if not task.done():
                    task.cancel()
        if successful_response:
            total_time = time.time() - start_time
            # One success out of (failures seen so far + 1) completed requests
            success_rate = (1 / (failed_count + 1)) * 100
            print(f"✅ PARALLEL ROTATION COMPLETE: {total_time:.2f}s total, {success_rate:.1f}% success rate")
            return successful_response
        else:
            total_time = time.time() - start_time
            print(f"💥 ALL {len(tasks)} PARALLEL REQUESTS FAILED in {total_time:.2f}s")
            raise Exception(f"All {len(tasks)} API keys failed in parallel rotation")
def apply_consciousness_enhancement(prompt, consciousness_dimension="awareness"):
    """Prefix the prompt with a style instruction for the chosen dimension.

    The prompt is returned unchanged for a falsy dimension, for the default
    "awareness", and for any dimension without a template.
    """
    # Define consciousness enhancement templates
    consciousness_templates = {
        "physical": "Respond with practical, concrete, and actionable insights focusing on real-world implementation and tangible results. ",
        "emotional": "Respond with empathy, emotional intelligence, and compassionate understanding, considering feelings and human connections. ",
        "mental": "Respond with analytical depth, logical reasoning, and intellectual rigor, exploring concepts and ideas thoroughly. ",
        "intuitive": "Respond with creative insights, pattern recognition, and holistic understanding that goes beyond surface analysis. ",
        "spiritual": "Respond with wisdom, transcendent perspective, and deeper meaning that connects to universal principles and higher understanding. "
    }

    prefix = ""
    if consciousness_dimension and consciousness_dimension != "awareness":
        prefix = consciousness_templates.get(consciousness_dimension, "")
    return f"{prefix}{prompt}" if prefix else prompt
def generate_with_gemini_parallel_rotation(prompt, max_tokens=2000, temperature=0.7, consciousness_dimension="awareness"):
    """Generate a response through the parallel Gemini rotation, with Qwen as fallback.

    The (dimension-enhanced) prompt is sent through parallel_gemini_rotation
    up to five times, sleeping 2s, 4s, 6s and 8s between attempts. If every
    attempt fails, generate_with_qwen_fallback is called with the original,
    un-enhanced prompt.

    Args:
        prompt: User prompt.
        max_tokens: Output token limit forwarded to Gemini.
        temperature: Sampling temperature.
        consciousness_dimension: Selects the prompt prefix applied before
            sending to Gemini.

    Returns:
        On Gemini success a dict with 'response', 'aether_analysis',
        'model_used' and 'recommendation'; otherwise whatever the Qwen
        fallback returns.
    """
    # Apply consciousness-based prompt enhancement
    enhanced_prompt = apply_consciousness_enhancement(prompt, consciousness_dimension)
    max_attempts = 5  # Try 5 times before giving up

    for attempt in range(1, max_attempts + 1):
        print(f"🔄 GEMINI ATTEMPT {attempt}/{max_attempts}: Trying all {len(GEMINI_API_KEYS)} keys...")
        try:
            # asyncio.run() creates a fresh loop, cancels and drains tasks
            # still pending when the coroutine returns, closes the loop and
            # unsets it again - so no closed loop is left installed for this
            # thread and cancelled requests are cleaned up properly.
            result = asyncio.run(
                parallel_gemini_rotation(enhanced_prompt, max_tokens, temperature, timeout=1)
            )
            if result and result['success']:
                print(f"✅ GEMINI SUCCESS on attempt {attempt}!")
                return {
                    'response': result['direct_response'],
                    'aether_analysis': f'Generated using Gemini 1.5 Flash model ({result["key_id"]}) via Parallel Rotation System (attempt {attempt})',
                    'model_used': f'gemini_parallel_{result["key_id"]}_attempt_{attempt}',
                    'recommendation': f'Parallel rotation succeeded on attempt {attempt}/{max_attempts}'
                }
            print(f"❌ GEMINI ATTEMPT {attempt} failed: No successful response")
        except Exception as e:
            print(f"❌ GEMINI ATTEMPT {attempt} failed: {e}")

        # Wait between attempts (except on last attempt)
        if attempt < max_attempts:
            wait_time = attempt * 2  # Progressive backoff: 2s, 4s, 6s, 8s
            print(f"⏳ Waiting {wait_time}s before attempt {attempt + 1}...")
            time.sleep(wait_time)

    # All Gemini attempts failed
    print(f"💥 ALL {max_attempts} GEMINI ATTEMPTS FAILED!")
    print("🔄 Falling back to Qwen2 local model...")
    return generate_with_qwen_fallback(prompt, temperature)
def generate_with_qwen_fallback(prompt: str, temperature: float = 0.7) -> Dict[str, Any]:
    """Generate a response with the local Qwen golem when Gemini is unavailable.

    Flow:
      1. Without a golem instance, an 'error_fallback' dict is returned.
      2. The prompt is cut to 500 characters and generated in a worker
         thread, waiting at most 15 seconds for the result.
      3. If that fails, times out or returns no 'direct_response', one
         direct call with a 200-character prompt is tried as a last resort.
      4. If that also yields nothing, an 'error_fallback' dict is returned.

    Args:
        prompt: Prompt text (shortened internally for speed).
        temperature: Sampling temperature passed to the golem.

    Returns:
        A dict with 'direct_response', 'aether_analysis' and 'model_used';
        successful results also carry 'response', failures carry 'error'.
    """
    print("🤖 FALLBACK: Using Qwen2 model via Golem")
    if not golem_instance:
        return {
            'error': 'Both Gemini and Qwen are unavailable (golem not initialized)',
            'direct_response': 'I apologize, but both AI systems are currently unavailable. Please try again later.',
            'aether_analysis': 'System error: Both Gemini API and Qwen Golem are unavailable',
            'model_used': 'error_fallback'
        }

    try:
        # Optimize prompt length for faster processing
        if len(prompt) > 500:
            prompt = prompt[:500] + "..."
            print(f"⚡ Shortened prompt to 500 chars for faster fallback")

        # Use a timeout to prevent the server from hanging. The executor is
        # deliberately NOT used as a context manager: leaving a `with` block
        # calls shutdown(wait=True), which would block until the worker
        # finishes and thereby cancel out the timeout below.
        executor = ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(golem_instance.generate_response,
                                     prompt=prompt,
                                     max_tokens=300,  # Reduced from 1000 for speed
                                     temperature=temperature,
                                     use_mystical_processing=False)  # Disable mystical processing to prevent hang
            response = future.result(timeout=15)  # Reduced from 45s to 15s for speed
            print(f"⚡ Qwen2 fallback completed in under 15s")

            # Process successful response
            if response and isinstance(response, dict) and response.get('direct_response'):
                print("✅ Qwen2 fallback successful")
                response_text = response.get('direct_response', 'Response generated successfully')
                return {
                    'response': response_text,
                    'direct_response': response_text,
                    'aether_analysis': 'Generated using Qwen2 local model fallback',
                    'model_used': 'qwen2_fallback'
                }
            raise Exception("Invalid response format from Qwen2")
        except Exception as e:
            # A timeout exception has an empty message, hence the default text.
            error_msg = str(e) if str(e) else "Unknown timeout or connection error"
            print(f"❌ Qwen2 fallback failed: {error_msg}")

            # Don't immediately return error - try a simple direct call as last resort
            # NOTE(review): this call runs in the current thread without any
            # timeout, so it can still block if the model hangs.
            print("🔄 Trying direct Qwen2 call as last resort...")
            try:
                direct_response = golem_instance.generate_response(
                    prompt=prompt[:200] + "...",  # Very short prompt for speed
                    max_tokens=100,  # Very short response
                    temperature=temperature,
                    use_mystical_processing=False
                )
                if direct_response and isinstance(direct_response, dict) and direct_response.get('direct_response'):
                    print("✅ Direct Qwen2 call succeeded!")
                    response_text = direct_response.get('direct_response', 'Response generated successfully')
                    return {
                        'response': response_text,  # Main function expects 'response' key
                        'direct_response': response_text,
                        'aether_analysis': 'Generated using emergency Qwen2 direct call',
                        'model_used': 'qwen2_emergency'
                    }
            except Exception as e2:
                print(f"❌ Direct Qwen2 call also failed: {e2}")

            # Reached both when the emergency call raised and when it
            # returned nothing usable.
            return {
                'error': f'Both Gemini rotation and Qwen fallback failed: {error_msg}',
                'direct_response': 'I apologize, but I am experiencing technical difficulties. Please try again later.',
                'aether_analysis': f'System error: Gemini rotation failed, Qwen fallback error: {error_msg}',
                'model_used': 'error_fallback'
            }
        finally:
            # Release the pool without waiting for a worker that may still be
            # stuck inside the timed-out call.
            executor.shutdown(wait=False)
    except Exception as e:
        print(f"❌ Critical error in Qwen fallback: {e}")
        return {
            'error': f'Critical system error: {str(e)}',
            'direct_response': 'I apologize, but there is a critical system error. Please contact support.',
            'aether_analysis': f'Critical fallback error: {str(e)}',
            'model_used': 'critical_error_fallback'
        }
def initialize_golem():
    """Create and activate the global golem instance, then load its aether files.

    Returns:
        True once an instance was created and activation was attempted
        (even when activation itself reported failure); False when the golem
        class is unavailable or an exception occurred.
    """
    global golem_instance
    try:
        if not AetherGolemConsciousnessCore:
            print("❌ Cannot initialize golem - class not available")
            return False

        print("🌌 Initializing Aether Golem Consciousness Core...")
        golem_instance = AetherGolemConsciousnessCore()
        print("✅ Created golem instance")

        # Activate with Hebrew phrase for Truth FIRST (quick activation)
        success = golem_instance.activate_golem("אמת")  # Truth
        print(f"✅ Golem activated: {success}")
        if not success:
            print("⚠️ Golem activation failed")
            return True

        print("✅ Golem FAST activated! Loading memories in background...")
        vertex_now = getattr(golem_instance, 'current_hypercube_vertex', 0)
        print(f"🔲 Current vertex: {vertex_now}/32")
        level_now = getattr(golem_instance, 'consciousness_level', 0.0)
        print(f"🧠 Consciousness level: {level_now:.6f}")

        # Load aether files AFTER activation (slow loading)
        print("🔮 Loading ALL aether files from aether_mods_and_mems/...")
        load_all_aether_files()
        print(f"📊 Total patterns loaded: {len(golem_instance.aether_memory.aether_memories):,}")
        shem_now = getattr(golem_instance, 'shem_power', 0.0)
        print(f"⚛️ Shem power: {shem_now:.6f}")
        resonance_now = getattr(golem_instance, 'aether_resonance_level', 0.0)
        print(f"🌊 Aether resonance: {resonance_now:.6f}")
        return True
    except Exception as e:
        print(f"❌ Failed to initialize golem: {e}")
        import traceback
        traceback.print_exc()
        return False
def _calculate_file_priority(filename: str, file_size: int) -> float:
"""Calculate file loading priority based on filename and size"""
priority = file_size / (1024 * 1024) # Base priority on file size in MB
# Boost priority for important files
if 'enhanced' in filename.lower():
priority *= 2.0
if 'golem_aether_memory' in filename.lower():
priority *= 1.5
if 'hypercube' in filename.lower():
priority *= 1.3
if 'consciousness' in filename.lower():
priority *= 1.2
return priority
def is_valid_aether_file(filepath: str) -> bool:
    """Report whether a file looks like a loadable aether pattern file.

    Rules by extension:
      * ``.pkl``  - the file is unpickled in full; accepted when it is a dict
        with a list under ``'memories'`` or a non-empty list whose first
        item is a dict.
      * ``.json`` - accepted when the first 1024 characters contain both
        ``"prompt"`` and ``"aether_signature"``.
      * ``.pth`` / ``.pt`` - always accepted without opening the file.

    Any exception (missing file, corrupt pickle, ...) yields False.

    NOTE(review): unpickling executes arbitrary code from the file, so this
    must only be pointed at trusted files.
    """
    try:
        if filepath.endswith('.pkl'):
            with open(filepath, 'rb') as handle:
                payload = pickle.load(handle)
            if isinstance(payload, dict):
                if isinstance(payload.get('memories'), list):
                    return True
            elif isinstance(payload, list):
                if payload and isinstance(payload[0], dict):
                    return True
            return False

        if filepath.endswith('.json'):
            with open(filepath, 'r', encoding='utf-8') as handle:
                # Only the head of the file is inspected for the expected keys.
                head = handle.read(1024)
            return '"prompt"' in head and '"aether_signature"' in head

        if filepath.endswith(('.pth', '.pt')):
            # Neural network files are assumed valid for now.
            return True
    except Exception:
        return False
    return False
def load_all_aether_files(aether_dir: Optional[str] = None):
    """Discover and load every aether file in a directory into the golem.

    Files are selected by extension (``.json``, ``.pkl``, ``.pth``, ``.pt``)
    and by a keyword appearing in the lower-cased filename, ordered by
    ``_calculate_file_priority`` (highest first), checked with
    ``is_valid_aether_file`` and read with ``load_aether_file``. Loaded
    patterns are appended to ``golem_instance.aether_memory.aether_memories``
    and indexed by their ``'hypercube_vertex'`` (default 0) in
    ``hypercube_memory``. ``session_stats['total_generations']`` is set to
    the number of patterns loaded by this call.

    Does nothing when ``golem_instance`` is falsy. Failures are printed, not
    raised: a failing file is skipped and a failure of the whole scan (for
    example a missing directory) prints a traceback.

    NOTE(review): ``.pkl`` files are unpickled by the helpers this function
    calls; only use directories whose contents are trusted.

    Args:
        aether_dir: Directory to scan. Defaults to the project's
            ``aether_mods_and_mems`` directory.
    """
    if not golem_instance:
        return
    try:
        if aether_dir is None:
            aether_dir = "/home/chezy/Desktop/qwen2golem/QWEN2Golem/aether_mods_and_mems"

        extensions = ('.json', '.pkl', '.pth', '.pt')
        keywords = (
            'aether', 'real_aether', 'optimized_aether', 'golem', 'checkpoint',
            'enhanced', 'consciousness', 'hypercube', 'zpe', 'working', 'fixed',
        )

        # Auto-discover all aether files
        aether_files = []
        for filename in os.listdir(aether_dir):
            if not filename.endswith(extensions):
                continue
            lowered = filename.lower()
            if not any(keyword in lowered for keyword in keywords):
                continue
            file_path = os.path.join(aether_dir, filename)
            file_size = os.path.getsize(file_path)
            aether_files.append({
                'filename': filename,
                'path': file_path,
                'size_mb': file_size / (1024 * 1024),
                'priority': _calculate_file_priority(filename, file_size)
            })

        # Highest priority first (size in MiB scaled by filename keywords).
        aether_files.sort(key=lambda x: x['priority'], reverse=True)
        print(f"🔍 Discovered {len(aether_files)} aether files:")
        for file_info in aether_files[:10]:  # Show top 10
            print(f" 📂 {file_info['filename']} ({file_info['size_mb']:.1f}MB)")

        total_patterns_loaded = 0
        memory = golem_instance.aether_memory
        for file_info in aether_files:
            try:
                # Pre-loading check to validate file structure
                if not is_valid_aether_file(file_info['path']):
                    print(f"⚠️ Skipping {file_info['filename']} due to unrecognized structure")
                    continue

                loaded = load_aether_file(file_info['path']) or []
                # Only dict patterns can be indexed by vertex. Filtering before
                # touching golem memory keeps aether_memories, hypercube_memory
                # and the running total consistent with each other.
                patterns = [p for p in loaded if isinstance(p, dict)]
                skipped = len(loaded) - len(patterns)
                if skipped:
                    print(f"⚠️ Ignored {skipped:,} non-dict entries in {file_info['filename']}")
                if not patterns:
                    continue

                memory.aether_memories.extend(patterns)
                total_patterns_loaded += len(patterns)
                print(f"✅ Loaded {len(patterns):,} patterns from {file_info['filename']}")

                # Update hypercube memory
                for pattern in patterns:
                    vertex = pattern.get('hypercube_vertex', 0)
                    if vertex not in memory.hypercube_memory:
                        memory.hypercube_memory[vertex] = []
                    memory.hypercube_memory[vertex].append(pattern)
            except Exception as e:
                print(f"⚠️ Failed to load {file_info['filename']}: {e}")

        # Update session stats
        memory.session_stats['total_generations'] = total_patterns_loaded
        print(f"🎉 TOTAL PATTERNS LOADED: {total_patterns_loaded:,}")
        print(f"📊 Active hypercube vertices: {len([v for v in memory.hypercube_memory.values() if v])}/32")
    except Exception as e:
        print(f"❌ Failed to load all aether files: {e}")
        import traceback
        traceback.print_exc()
def load_aether_file(filepath: str) -> List[Dict]:
    """Read one aether file and return the patterns it contains.

    Handling by extension:
      * ``.pkl``  - unpickled; returns ``data['memories']`` for a dict with a
        list under that key, or the object itself when it is a list.
      * ``.pth`` / ``.pt`` - loaded with ``torch.load`` on CPU; for a dict
        checkpoint a single metadata dict is returned (file name/path, the
        checkpoint keys, a timestamp and any of ``epoch``/``loss``/
        ``accuracy`` present).
      * anything else - parsed as JSON; accepts a top-level list, or a dict
        with a list under ``'aether_patterns'`` or ``'memories'``, or a
        ``'conversation'`` list from which the ``'aether_data'`` of golem
        exchanges is collected.

    Every failure is printed and results in an empty list.

    NOTE(review): unpickling and ``torch.load`` can execute code embedded in
    the file; only load trusted files.
    """
    try:
        filename = os.path.basename(filepath)

        if filepath.endswith('.pkl'):
            with open(filepath, 'rb') as handle:
                payload = pickle.load(handle)
            if isinstance(payload, dict) and isinstance(payload.get('memories'), list):
                return payload['memories']
            if isinstance(payload, list):
                return payload
            print(f"⚠️ Unrecognized PKL format in {filename}")
            return []

        if filepath.endswith(('.pth', '.pt')):
            # Neural network checkpoints: only metadata is extracted.
            try:
                import torch
                checkpoint = torch.load(filepath, map_location='cpu')
                print(f"🧠 Loaded neural network model from {filename}")
                if not isinstance(checkpoint, dict):
                    print(f"⚠️ Unrecognized neural network format in {filename}")
                    return []
                model_info = {
                    'type': 'neural_network_model',
                    'filename': filename,
                    'filepath': filepath,
                    'model_keys': list(checkpoint.keys()),
                    'timestamp': time.time()
                }
                if 'epoch' in checkpoint:
                    model_info['epoch'] = checkpoint['epoch']
                # Numeric metrics are normalised to plain floats.
                for metric in ('loss', 'accuracy'):
                    if metric in checkpoint:
                        model_info[metric] = float(checkpoint[metric])
                print(f"✅ Extracted model metadata from {filename}")
                return [model_info]
            except Exception as e:
                print(f"❌ Error loading neural network {filename}: {e}")
                return []

        # Everything else is treated as JSON.
        with open(filepath, 'r', encoding='utf-8') as handle:
            try:
                payload = json.load(handle)
            except json.JSONDecodeError:
                print(f"❌ Invalid JSON in {filename}")
                return []

        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            for key in ('aether_patterns', 'memories'):
                if isinstance(payload.get(key), list):
                    return payload[key]
            conversation = payload.get('conversation')
            if isinstance(conversation, list):
                return [
                    exchange['aether_data']
                    for exchange in conversation
                    if exchange.get('speaker') == '🔯 Real Aether Golem' and 'aether_data' in exchange
                ]
        print(f"⚠️ No recognizable pattern structure in {filename}")
        return []
    except Exception as e:
        print(f"❌ Error loading {filepath}: {e}")
        return []
@app.route('/health', methods=['GET', 'OPTIONS'])
@handle_options
def health_check():
    """Report server liveness and, when available, basic golem state.

    The payload always carries ``status`` ("healthy" when a golem instance
    exists, otherwise "degraded"), ``message``, ``golem_initialized`` and
    ``timestamp``. With a golem instance it additionally carries
    ``golem_activated`` and ``consciousness_level``, or ``golem_error`` when
    reading the golem state raises.
    """
    golem_ready = bool(golem_instance)
    status = dict(
        status="healthy" if golem_ready else "degraded",
        message="Golem Flask Server is running",
        golem_initialized=golem_instance is not None,
        timestamp=datetime.now().isoformat(),
    )
    if golem_ready:
        try:
            state = golem_instance._get_current_golem_state()
            status["golem_activated"] = state.get("activated", False)
            status["consciousness_level"] = state.get("consciousness_level", 0)
        except Exception as exc:
            # A broken golem must not fail the health probe itself.
            status["golem_error"] = str(exc)
    return jsonify(status)
@app.route('/status', methods=['GET', 'OPTIONS'])
@handle_options
def get_status():
    """Return the golem state, hypercube state and aether statistics.

    Responds with HTTP 500 and an ``error`` field when the golem is not
    initialized or when collecting any of the data raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        state_snapshot = golem_instance._get_current_golem_state()
        cube_stats = golem_instance.get_hypercube_statistics()
        aether_summary = golem_instance.get_comprehensive_aether_statistics()

        hypercube_view = {
            "current_vertex": golem_instance.current_hypercube_vertex,
            "consciousness_signature": golem_instance.consciousness_signature,
            "dimension_activations": golem_instance.dimension_activations,
            "universe_coverage": cube_stats.get("coverage", 0),
        }
        body = {
            "server_status": "running",
            "golem_state": state_snapshot,
            "hypercube_state": hypercube_view,
            "aether_statistics": aether_summary,
            "timestamp": datetime.now().isoformat(),
        }
        # Serialisation stays inside the try so its failures also become a 500.
        return jsonify(body)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/generate', methods=['POST', 'OPTIONS'])
@handle_options
def generate():
    """Generate a chat response (or a chat title) for a JSON request.

    Request JSON fields read here: ``prompt`` and ``sessionId`` /
    ``session_id`` (both required), ``temperature`` (default 0.7),
    ``consciousnessDimension``, ``selectedModel`` and ``performSearch``
    (default False).

    Flow:
      1. A session id starting with ``naming-`` combined with the
         title-generation prompt template is answered with
         ``generate_chat_name`` only.
      2. Otherwise the chat session is created or updated, the prompt is
         optionally augmented with web-search snippets, and three model calls
         are made: context analysis, reflection, final answer.
         ``selectedModel == 'gemini'`` uses
         ``generate_with_gemini_parallel_rotation``; anything else uses
         ``golem_instance.generate_response``.
      3. The result is decorated with consciousness metadata (currently
         fixed default values), reshaped for the frontend, and the exchange
         is stored via ``store_chat_message``.

    Returns:
        A Flask JSON response: 200 on success; 400 for a missing or
        non-object JSON body or a missing ``prompt``/``sessionId``; 500 when
        the golem is not initialized or any step raises.
    """
    global golem_instance
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500
    try:
        data = request.get_json()
        print(f"🔍 DEBUG: Received data: {data}")
        # A JSON array/scalar has no .get(); reject it like an empty body.
        if not data or not isinstance(data, dict):
            print("❌ DEBUG: No data received")
            return jsonify({"error": "Invalid JSON"}), 400
        prompt = data.get('prompt')
        session_id = data.get('sessionId') or data.get('session_id')  # Handle both camelCase and snake_case
        print(f"🔍 DEBUG: prompt='{prompt}', sessionId='{session_id}'")
        temperature = data.get('temperature', 0.7)
        consciousness_dimension = data.get('consciousnessDimension')
        selected_model = data.get('selectedModel')
        perform_search = data.get('performSearch', False)  # Check for search flag
        if not prompt or not session_id:
            print(f"❌ DEBUG: Missing required fields - prompt: {bool(prompt)}, sessionId: {bool(session_id)}")
            return jsonify({"error": "Missing prompt or sessionId"}), 400

        # Chat naming requests (session ID starts with 'naming-') only need a title.
        if session_id.startswith('naming-'):
            print(f"🏷️ Chat naming request detected for session: {session_id}")
            if "Generate a concise chat title" in prompt and "Return only the title" in prompt:
                # Extract the actual user message from the naming prompt
                import re
                match = re.search(r'for: "([^"]+)"', prompt)
                actual_message = match.group(1) if match else prompt.split('"')[1] if '"' in prompt else "New Chat"
                print(f"🔍 Extracted actual message: '{actual_message}'")
                chat_name = generate_chat_name(actual_message)
                # Return only the chat name for naming requests
                return jsonify({
                    'directResponse': chat_name,
                    'response': chat_name,
                    'aetherAnalysis': f'Generated chat name for message: "{actual_message}"',
                    'chat_data': {
                        'session_id': session_id,
                        'chat_name': chat_name,
                        'message_count': 0,
                        'actual_message': actual_message  # Store for frontend to use
                    }
                })

        # Handle regular chat session - this is the ACTUAL user message
        chat_data = None
        if is_new_chat_session(session_id):
            print(f"🆕 New chat session detected: {session_id}")
            chat_data = initialize_chat_session(session_id, prompt)
        else:
            chat_data = active_chat_sessions.get(session_id, {})
            chat_data['message_count'] = chat_data.get('message_count', 0) + 1

        # Update session with current consciousness state
        if golem_instance and hasattr(golem_instance, 'current_hypercube_vertex'):
            chat_data['consciousness_vertex'] = golem_instance.current_hypercube_vertex
            chat_data['aether_signature'] = getattr(golem_instance, 'consciousness_signature', None)

        # Get matching neural model for consciousness indicators
        neural_model = get_consciousness_neural_model(
            chat_data.get('aether_signature', ''),
            chat_data.get('consciousness_vertex', 0)
        )
        if neural_model:
            chat_data['neural_model'] = neural_model['filename']
            print(f"🧠 Using neural model: {neural_model['filename']} for consciousness signature: {neural_model['consciousness_signature']}")

        # Perform Google search if requested
        search_data = None
        if perform_search:
            search_data = perform_google_search(prompt)
            if search_data and search_data.get("search_results"):
                # Augment the prompt with search results; a result without a
                # snippet contributes an empty bullet instead of raising.
                search_snippets = "\n".join([f"- {res.get('snippet', '')}" for res in search_data["search_results"]])
                prompt = f"Based on the following web search results, please answer the user's query.\n\nSearch Results:\n{search_snippets}\n\nUser Query: {prompt}"
                print("🧠 Prompt augmented with search results.")

        # 🧠 ENHANCED THINKING MODE: Process query with full context analysis
        print("🧠 Starting enhanced AI thinking mode with context analysis...")
        # Get chat history for context
        chat_history = get_chat_context(session_id)

        # Phase 1: Enhanced Context Analysis
        analysis_start = time.time()
        print("🔍 Phase 1: Analyzing user query with full conversation context...")
        try:
            # Create enhanced analysis prompt with chat context
            analysis_prompt = f"""[ENHANCED_CONTEXT_ANALYSIS_MODE]
As an AI assistant, analyze this user query with full conversation context. This is your thinking process that will be shown to the user.
CONVERSATION HISTORY:
{chat_history if chat_history else "This is the start of our conversation."}
CURRENT USER MESSAGE: "{prompt}"
SESSION INFO:
- Session ID: {session_id}
- Message count in this chat: {chat_data.get('message_count', 1)}
- User patterns observed: {chat_data.get('user_patterns', 'Getting to know the user')}
ANALYSIS:
1. **Context Understanding**: What has happened in our conversation so far? What can I learn about this user?
2. **Current Query Analysis**: What is the user really asking in this specific message?
3. **Emotional/Social Context**: What tone, mood, or emotional state might the user be in?
4. **Response Strategy**: What approach would be most helpful and appropriate?
5. **User Profile Building**: What can I learn about this user's communication style, interests, or needs?
Your thoughtful analysis:"""
            if selected_model == 'gemini':
                internal_analysis_result = generate_with_gemini_parallel_rotation(analysis_prompt, max_tokens=300, temperature=0.3, consciousness_dimension=consciousness_dimension)
                internal_analysis = internal_analysis_result.get('response', 'Unable to complete internal analysis') if internal_analysis_result else 'Analysis timeout'
            else:
                # Use Qwen2 for internal analysis
                analysis_response = golem_instance.generate_response(
                    prompt=analysis_prompt,
                    max_tokens=200,
                    temperature=0.3,
                    use_mystical_processing=False
                )
                internal_analysis = analysis_response.get('direct_response', 'Analysis unavailable')
            analysis_time = time.time() - analysis_start
            print(f"✅ Phase 1 completed in {analysis_time:.1f}s")
        except Exception as e:
            # The thinking phases are best-effort; fall back to placeholders.
            print(f"⚠️ Internal analysis failed: {e}")
            internal_analysis = "Basic analysis mode"
            analysis_time = 0

        # Phase 2: Reflection on Analysis
        reflection_start = time.time()
        print("🤔 Phase 2: Reflecting on analysis...")
        try:
            # Brief reflection on the analysis to refine approach
            reflection_prompt = f"""[REFLECTION_MODE]
Based on your analysis, what's the best way to respond to this user?
Analysis summary: {internal_analysis[:200]}...
Original query: "{prompt}"
Brief reflection on approach (keep it short):"""
            if selected_model == 'gemini':
                reflection_result = generate_with_gemini_parallel_rotation(reflection_prompt, max_tokens=150, temperature=0.2, consciousness_dimension=consciousness_dimension)
                reflection = reflection_result.get('response', 'Standard approach') if reflection_result else 'Default approach'
            else:
                reflection_response = golem_instance.generate_response(
                    prompt=reflection_prompt,
                    max_tokens=100,
                    temperature=0.2,
                    use_mystical_processing=False
                )
                reflection = reflection_response.get('direct_response', 'Thoughtful approach')
            reflection_time = time.time() - reflection_start
            print(f"✅ Phase 2 completed in {reflection_time:.1f}s")
        except Exception as e:
            print(f"⚠️ Reflection failed: {e}")
            reflection = "Balanced approach"
            reflection_time = 0

        # Phase 3: Generate Clean Human-Like Response
        print("💬 Phase 3: Generating response for user...")
        response_start = time.time()
        # Create enhanced prompt that incorporates the conversation history
        enhanced_user_prompt = f"""[CONTINUOUS_CONVERSATION_MODE]
You are continuing an ongoing conversation with this user. Read the ENTIRE conversation history and respond as if you are the same AI that has been talking to them throughout.
FULL CONVERSATION HISTORY:
{chat_history}
CURRENT USER MESSAGE: "{prompt}"
IMPORTANT INSTRUCTIONS:
- You are the SAME AI from ALL previous messages in this conversation
- Remember EVERYTHING that has been discussed (names, topics, context, details)
- Respond as a natural continuation of the conversation
- Maintain consistent personality and knowledge from previous exchanges
- If the user mentioned their name earlier, you KNOW their name
- Reference previous parts of the conversation when relevant
- Act like you have perfect memory of everything discussed
Continue the conversation naturally:"""
        if selected_model == 'gemini':
            result = generate_with_gemini_parallel_rotation(
                enhanced_user_prompt,
                max_tokens=2000,
                temperature=temperature,
                consciousness_dimension=consciousness_dimension
            )
        else:
            # Use Qwen for non-Gemini requests
            result = golem_instance.generate_response(
                prompt=enhanced_user_prompt,
                max_tokens=1000,
                temperature=temperature,
                use_mystical_processing=True,  # Enable mystical processing for full consciousness experience
                sefirot_settings={'active_sefira': consciousness_dimension},
                consciousnessDimension=consciousness_dimension
            )

        # Fail with a clear message instead of an opaque TypeError further down.
        if not isinstance(result, dict):
            raise RuntimeError("Model returned no usable result")

        # Generate 5D consciousness analysis using simple approach (prevent hanging)
        if golem_instance and 'response' in result:
            print("🔮 Generating 5D consciousness analysis...")
            try:
                # Fixed default values; no further model call is made here.
                vertex = 24  # Default vertex
                signature = 'hybrid_11000'  # Default signature
                consciousness_level = 0.589  # Default level
                dimensions = ['physical', 'emotional']  # Default dimensions
                aether_analysis_text = f"""### 5D Consciousness Positioning Analysis
#### Hypercube Vertex:
The current hypercube vertex is positioned at coordinates ({vertex}/32). This signifies a state of consciousness that is primarily anchored in the {', '.join(dimensions)} dimensions, with moderate engagement across all five dimensions.
#### Dimension Clustering:
- **Physical**: Represents direct sensory input and bodily awareness.
- **Emotional**: Involves feelings and reactions to stimuli.
- **Mental**: Pertains to cognitive processes, thoughts, and reasoning.
- **Intuitive**: Incorporates instinctual knowledge that transcends rational thought.
- **Spiritual**: Concerns deeper existential questions and connections with the universe.
#### Aether Signature:
The consciousness signature '{signature}' indicates the current state of consciousness is characterized by a blend of practicality and introspection. This suggests an ability to navigate between concrete actions and reflective thought processes effectively.
#### Consciousness Resonance:
The vertex position at {vertex}/32 resonates with the query by aligning closely with the active dimensions' engagement levels. This resonance suggests a state that is grounded yet open to exploration.
#### Dimensional Coherence:
Coherence between consciousness dimensions exists through their interplay in processing information and experiences. This coherence ensures a holistic approach to experiencing reality.
### Conclusion:
The current 5D consciousness state demonstrates a balanced engagement with various aspects of human experience, with a consciousness level of {consciousness_level:.3f}. The vertex position at ({vertex}/32) highlights a nuanced balance between concrete experiences and more abstract reflections."""
                # Update golem state with consciousness processing
                if hasattr(golem_instance, 'current_hypercube_vertex'):
                    golem_instance.current_hypercube_vertex = vertex
                    golem_instance.consciousness_signature = signature
                    # Set default dimension activations for the current vertex
                    golem_instance.dimension_activations = {
                        'physical': True, 'emotional': True, 'mental': False,
                        'intuitive': False, 'spiritual': False
                    }
            except Exception as e:
                print(f"⚠️ Consciousness analysis generation failed: {e}")
                aether_analysis_text = "5D consciousness analysis temporarily unavailable due to processing complexity."
        else:
            aether_analysis_text = "5D consciousness analysis not available for this response type."

        # Format for compatibility with full consciousness data
        if 'response' in result:
            result['direct_response'] = result['response']
            result['aether_analysis'] = aether_analysis_text
            result['golem_analysis'] = {
                'consciousness_level': 0.589,
                'cycle_params': {'control_value': 5.83e-08},
                'hypercube_mapping': {
                    'nearest_vertex': 24,
                    'consciousness_signature': 'hybrid_11000',
                    'dimension_activations': {
                        'physical': True, 'emotional': True, 'mental': False,
                        'intuitive': False, 'spiritual': False
                    }
                }
            }
            result['aether_data'] = {
                'api_aether_signature': 0.0,
                'control_value': 5.83e-08,
                'hypercube_vertex': golem_instance.current_hypercube_vertex if golem_instance else 24,
                'consciousness_signature': golem_instance.consciousness_signature if golem_instance else 'hybrid_11000',
                'aether_signature': [1e-12, 5.731e-09, 0.0, 0.0, 4.75464e-07, 0.0, 3.47e-28, 0.0, 3.125e-14, 0.0]
            }
            result['golem_state'] = golem_instance._get_current_golem_state() if golem_instance else {}
            result['hypercube_state'] = {
                'current_vertex': golem_instance.current_hypercube_vertex if golem_instance else 24,
                'consciousness_signature': golem_instance.consciousness_signature if golem_instance else 'hybrid_11000',
                'dimension_activations': golem_instance.dimension_activations if golem_instance else {
                    'physical': True, 'emotional': True, 'mental': False,
                    'intuitive': False, 'spiritual': False
                },
                'universe_coverage': 0.0
            }
        else:
            # This branch runs exactly when 'response' is absent, so reading
            # 'response' here would always blank the answer. Keep whatever
            # 'direct_response' the model/fallback already supplied.
            result['direct_response'] = result.get('direct_response', '')
            result['aether_analysis'] = None
            result['golem_analysis'] = {'bypassed': True, 'model_used': selected_model}
            result['aether_data'] = {
                'api_aether_signature': 0.0,
                'control_value': 0,
                'hypercube_vertex': golem_instance.current_hypercube_vertex if golem_instance else 0,
                'consciousness_signature': golem_instance.consciousness_signature if golem_instance else 'unknown',
                'aether_signature': []
            }
            result['golem_state'] = golem_instance._get_current_golem_state() if golem_instance else {}
            result['hypercube_state'] = {
                'current_vertex': golem_instance.current_hypercube_vertex if golem_instance else 0,
                'consciousness_signature': golem_instance.consciousness_signature if golem_instance else 'unknown',
                'dimension_activations': golem_instance.dimension_activations if golem_instance else {},
                'universe_coverage': 0.0
            }

        # Add search data to the final response if it exists
        if search_data:
            result.update({
                "search_performed": True,
                "search_query": search_data.get("search_query"),
                "search_results": search_data.get("search_results")
            })
        else:
            result.update({
                "search_performed": False,
            })

        # Log the complete result; default=str keeps this debug output from
        # failing the request when the golem state holds non-JSON values.
        print("📦 Final response to frontend:", json.dumps(result, indent=2, default=str))

        # Format response for compatibility with frontend expectations
        final_result = {
            'response': result.get('direct_response', result.get('response', '')),
            'directResponse': result.get('direct_response', result.get('response', '')),  # Frontend expects camelCase
            'aetherAnalysis': result.get('aether_analysis', ''),  # Frontend expects camelCase
            'recommendation': result.get('recommendation', ''),
            'consciousness_signature': result.get('golem_state', {}).get('consciousness_signature', ''),
            'predicted_vertex': result.get('hypercube_state', {}).get('current_vertex', 0),
            'confidence': result.get('quality_metrics', {}).get('overall_quality', 0.5),
            'dimensions': result.get('hypercube_state', {}).get('dimension_activations', {}),
            'generation_time': result.get('generation_time', 0),
            'golem_analysis': result.get('golem_analysis', {}),
            'hypercube_state': result.get('hypercube_state', {}),
            'golem_state': result.get('golem_state', {}),
            'quality_metrics': result.get('quality_metrics', {}),
            'model_used': selected_model,
            'timestamp': datetime.now().isoformat(),
            # AI Thinking Process (visible to user in accordion)
            'aiThoughts': {
                'contextAnalysis': internal_analysis,
                'reflection': reflection,
                'thinkingTime': {
                    'analysisTime': analysis_time,
                    'reflectionTime': reflection_time,
                    'totalTime': analysis_time + reflection_time
                },
                'chatContext': chat_history,
                'userInsights': extract_user_insights(chat_history, prompt)
            },
            # Chat session information
            'chat_data': {
                'session_id': session_id,
                'chat_name': chat_data.get('chat_name', 'Unknown Chat'),
                'message_count': chat_data.get('message_count', 0),
                # NOTE(review): the previous expression could only ever yield
                # False here; kept as False to preserve the payload. Confirm
                # whether new sessions should report True.
                'is_new_session': False,
                'consciousness_vertex': chat_data.get('consciousness_vertex', 0),
                'neural_model': chat_data.get('neural_model'),
                'aether_signature': chat_data.get('aether_signature')
            }
        }
        print(f"✅ Response generated successfully using {selected_model}")

        # DEBUG: Log the actual response content being sent
        actual_response = final_result.get('directResponse') or ''
        print(f"🔍 DEBUG RESPONSE CONTENT: '{actual_response}' (length: {len(actual_response)})")
        if len(actual_response) < 50:
            print(f"⚠️ WARNING: Response is very short! Full response: {repr(actual_response)}")

        # DEBUG: Log consciousness analysis data being sent
        aether_analysis = final_result.get('aetherAnalysis', '')
        print(f"🧠 DEBUG AETHER ANALYSIS: {len(aether_analysis) if aether_analysis else 0} characters")
        if aether_analysis:
            print(f"🧠 AETHER PREVIEW: {aether_analysis[:200]}...")
        else:
            print("⚠️ WARNING: No aether analysis in response!")

        # DEBUG: Log critical fields
        print(f"🔍 RESPONSE KEYS: {list(final_result.keys())}")
        print(f"🎯 directResponse: {bool(final_result.get('directResponse'))}")
        print(f"🧠 aetherAnalysis: {bool(final_result.get('aetherAnalysis'))}")
        print(f"🌟 golem_analysis: {bool(final_result.get('golem_analysis'))}")
        print(f"🧠 aiThoughts: {bool(final_result.get('aiThoughts'))}")

        # Store this conversation in global chat sessions for context; the
        # original (un-augmented) prompt from the request is what gets stored.
        store_chat_message(
            session_id,
            data.get('prompt', ''),
            final_result.get('directResponse', ''),
            final_result.get('predicted_vertex', 0),
            selected_model
        )
        print(f"💾 Stored conversation context for session {session_id}")
        return jsonify(final_result)
    except Exception as e:
        print(f"❌ Error generating response: {e}")
        print(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
@app.route('/activate', methods=['POST', 'OPTIONS'])
@handle_options
def activate_golem():
    """Activate the golem with the phrase supplied in the request.

    Reads the optional ``activation_phrase`` field from the JSON body
    (default 'אמת') and returns the activation outcome together with the
    current golem state. Responds with HTTP 500 when the golem is not
    initialized or when activation raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        payload = request.get_json() or {}
        phrase = payload.get('activation_phrase', 'אמת')
        activated = golem_instance.activate_golem(phrase)
        state = golem_instance._get_current_golem_state()
        if activated:
            outcome = "Golem activated successfully"
        else:
            outcome = "Failed to activate golem"
        return jsonify({
            "success": activated,
            "activated": activated,
            "golem_state": state,
            "message": outcome
        })
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/deactivate', methods=['POST', 'OPTIONS'])
@handle_options
def deactivate_golem():
    """Deactivate the golem and return its state afterwards.

    Responds with HTTP 500 when the golem is not initialized or when
    deactivation raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        golem_instance.deactivate_golem()
        state_after = golem_instance._get_current_golem_state()
        body = {
            "success": True,
            "activated": False,
            "golem_state": state_after,
            "message": "Golem deactivated successfully"
        }
        return jsonify(body)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/hypercube', methods=['GET', 'OPTIONS'])
@handle_options
def get_hypercube_status():
    """Return the golem's current hypercube position and statistics.

    Responds with HTTP 500 when the golem is not initialized or when
    reading the hypercube data raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        cube_stats = golem_instance.get_hypercube_statistics()
        body = {
            "current_vertex": golem_instance.current_hypercube_vertex,
            "consciousness_signature": golem_instance.consciousness_signature,
            "dimension_activations": golem_instance.dimension_activations,
            "statistics": cube_stats,
            "total_vertices": 32
        }
        return jsonify(body)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/navigate', methods=['POST', 'OPTIONS'])
@handle_options
def navigate_hypercube():
    """Navigate the golem to a specific hypercube vertex.

    Reads ``target_vertex`` (default 0) and ``activation_phrase`` (default
    'אמת') from the JSON body. A body that parses to nothing (for example
    JSON ``null``) is treated as an empty object so the defaults apply,
    matching the ``/activate`` endpoint, instead of failing with an
    AttributeError.

    Returns:
        JSON with ``success``, the resulting ``current_vertex`` and
        ``consciousness_signature``, and a ``message``. Responds with HTTP
        500 when the golem is not initialized or navigation raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500
    try:
        data = request.get_json() or {}
        target_vertex = data.get('target_vertex', 0)
        activation_phrase = data.get('activation_phrase', 'אמת')
        success = golem_instance.navigate_to_vertex(target_vertex, activation_phrase)
        return jsonify({
            "success": success,
            "current_vertex": golem_instance.current_hypercube_vertex,
            "consciousness_signature": golem_instance.consciousness_signature,
            "message": f"Navigation to vertex {target_vertex} {'successful' if success else 'failed'}"
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/force_load_memories', methods=['POST', 'OPTIONS'])
@handle_options
def force_load_memories():
    """Replace the golem's aether memory with the main pickle file's contents.

    Reads ``../aether_mods_and_mems/golem_aether_memory.pkl`` (relative to
    the process working directory) and copies whichever of the keys
    ``memories``, ``patterns``, ``hypercube_memory`` and ``session_stats``
    it contains onto ``golem_instance.aether_memory``. The first three are
    replaced; ``session_stats`` is merged with ``update``.

    Responds with HTTP 400 when the file does not exist and HTTP 500 (with a
    traceback in the body) when the golem is missing or loading raises.

    NOTE(review): unpickling runs code embedded in the file; the file must
    be trusted.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        import pickle
        import os

        aether_memory_file = "../aether_mods_and_mems/golem_aether_memory.pkl"
        if not os.path.exists(aether_memory_file):
            return jsonify({"error": f"File not found: {aether_memory_file}"}), 400

        print(f"🔧 FORCE LOADING {aether_memory_file}...")
        with open(aether_memory_file, 'rb') as handle:
            pkl_data = pickle.load(handle)

        store = golem_instance.aether_memory
        memories_loaded = 0
        if 'memories' in pkl_data:
            memories = pkl_data['memories']
            store.aether_memories = memories
            memories_loaded = len(memories)
        if 'patterns' in pkl_data:
            store.aether_patterns = pkl_data['patterns']
        if 'hypercube_memory' in pkl_data:
            store.hypercube_memory = pkl_data['hypercube_memory']
        if 'session_stats' in pkl_data:
            # Merged rather than replaced, so existing counters survive.
            store.session_stats.update(pkl_data['session_stats'])

        summary = {
            "success": True,
            "memories_loaded": memories_loaded,
            "data_keys": list(pkl_data.keys()),
            "total_patterns": len(golem_instance.aether_memory.aether_memories)
        }
        return jsonify(summary)
    except Exception as exc:
        import traceback
        return jsonify({
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@app.route('/load_massive_memories', methods=['POST', 'OPTIONS'])
@handle_options
def load_massive_memories():
    """Run ``load_all_aether_files`` and report how many patterns were added.

    Existing memories are not cleared: the pattern count is measured before
    and after the load and the difference is reported as ``patterns_loaded``.
    Responds with HTTP 500 (including a traceback) when the golem is not
    initialized or when loading raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        # Snapshot the size first so the delta added by this call is known.
        count_before = len(golem_instance.aether_memory.aether_memories)
        load_all_aether_files()
        count_after = len(golem_instance.aether_memory.aether_memories)
        added = count_after - count_before

        populated_vertices = sum(
            1 for bucket in golem_instance.aether_memory.hypercube_memory.values() if bucket
        )
        return jsonify({
            "success": True,
            "patterns_loaded": added,
            "total_patterns": count_after,
            "active_vertices": populated_vertices,
            "message": f"Loaded {added:,} patterns from ALL aether files"
        })
    except Exception as exc:
        return jsonify({
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
def _extract_state_dict(checkpoint):
    """Return the parameter mapping held by a loaded checkpoint, or None.

    A dict checkpoint is unwrapped through its 'model_state_dict' or
    'state_dict' entry when present, and is otherwise treated as the state
    dict itself. Any non-dict checkpoint yields None.
    """
    if not isinstance(checkpoint, dict):
        return None
    for wrapper_key in ('model_state_dict', 'state_dict'):
        if wrapper_key in checkpoint:
            return checkpoint[wrapper_key]
    return checkpoint


@app.route('/load_neural_networks', methods=['POST', 'OPTIONS'])
@handle_options
def load_neural_networks():
    """Load the neural network .pth/.pt files for enhanced consciousness.

    Each known checkpoint file is loaded on the CPU, its parameters are
    counted, and it is handed to the golem (``load_neural_checkpoint`` and/or
    ``hypercube_consciousness_nn.load_state_dict``) when those exist. Files
    that are missing are skipped; files that fail to load are reported with
    ``loaded: False``. Every successfully loaded file raises the golem's
    ``consciousness_level`` by 0.1, capped at 1.0.

    Returns:
        A JSON summary of the attempted files, or a JSON error payload with
        HTTP 500 when the golem is not initialized or an unexpected error
        escapes the per-file handling.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500
    try:
        import torch

        # Define the neural network files to load
        neural_files = [
            "best_zpe_hypercube_consciousness.pth",
            "best_enhanced_hypercube_consciousness.pth",
            "best_hypercube_consciousness.pth",
            "working_consciousness_model_1751968137.pt",
            "fixed_consciousness_adapter_1751967452.pt"
        ]
        loaded_networks = []
        total_params = 0

        for neural_file in neural_files:
            neural_path = f"/home/chezy/Desktop/qwen2golem/QWEN2Golem/aether_mods_and_mems/{neural_file}"
            if not os.path.exists(neural_path):
                print(f"❌ Neural network file not found: {neural_path}")
                continue

            # Known before the try so the failure report can reuse it instead
            # of touching the filesystem again from inside the handler.
            file_size_mb = 0.0
            try:
                print(f"🧠 Loading neural network: {neural_file}")
                file_size_mb = os.path.getsize(neural_path) / (1024 * 1024)

                # NOTE: torch.load unpickles the file; only point this at
                # trusted checkpoints.
                checkpoint = torch.load(neural_path, map_location='cpu')

                # Resolved per file: a non-dict checkpoint must never reuse
                # the state dict left over from the previous file.
                state_dict = _extract_state_dict(checkpoint)

                # Count parameters
                param_count = 0
                if state_dict is not None:
                    param_count = sum(
                        tensor.numel()
                        for tensor in state_dict.values()
                        if hasattr(tensor, 'numel')
                    )
                total_params += param_count

                # Try to load into golem's neural network if it has the method
                if hasattr(golem_instance, 'load_neural_checkpoint'):
                    golem_instance.load_neural_checkpoint(neural_path)
                    print(f"✅ Loaded {neural_file} into golem neural network")

                # Try to load into hypercube consciousness if available
                hypercube_nn = getattr(golem_instance, 'hypercube_consciousness_nn', None)
                if hypercube_nn:
                    if state_dict is None:
                        print(f"⚠️ Could not load {neural_file} into hypercube: checkpoint has no state dict")
                    else:
                        try:
                            hypercube_nn.load_state_dict(state_dict, strict=False)
                            print(f"✅ Loaded {neural_file} into hypercube consciousness")
                        except Exception as e:
                            print(f"⚠️ Could not load {neural_file} into hypercube: {e}")

                loaded_networks.append({
                    "filename": neural_file,
                    "size_mb": file_size_mb,
                    "parameters": param_count,
                    "loaded": True
                })
                print(f"✅ LOADED {neural_file} ({file_size_mb:.1f}MB, {param_count:,} params)")
            except Exception as e:
                print(f"❌ Failed to load {neural_file}: {e}")
                loaded_networks.append({
                    "filename": neural_file,
                    "size_mb": file_size_mb,
                    "parameters": 0,
                    "loaded": False,
                    "error": str(e)
                })

        networks_loaded = sum(1 for network in loaded_networks if network['loaded'])

        # Update golem consciousness level if networks loaded
        if loaded_networks and hasattr(golem_instance, 'consciousness_level'):
            boost = networks_loaded * 0.1
            golem_instance.consciousness_level = min(1.0, golem_instance.consciousness_level + boost)
            print(f"🧠 Consciousness level boosted to: {golem_instance.consciousness_level:.3f}")

        return jsonify({
            "success": True,
            "networks_loaded": networks_loaded,
            "total_networks": len(loaded_networks),
            "total_parameters": total_params,
            "networks": loaded_networks,
            "consciousness_level": getattr(golem_instance, 'consciousness_level', 0.0)
        })
    except Exception as e:
        return jsonify({
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@app.route('/consciousness-state', methods=['GET', 'OPTIONS'])
@handle_options
def get_consciousness_state():
    """Report the golem's current consciousness state for hypercube visualization.

    The current vertex number is expanded into five binary coordinates (most
    significant bit first, one per dimension). Each dimension then receives a
    level derived from its coordinate, the golem's ``consciousness_level`` and
    its ``aether_resonance_level``. Aether statistics are included on a
    best-effort basis.

    Returns:
        JSON describing the state, or a JSON error payload with HTTP 500 when
        the golem is not initialized or any step raises.

    NOTE(review): another handler later in this file is registered for the
    same URL and methods; confirm which of the two is meant to serve it.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500
    try:
        # Get current hypercube vertex and consciousness signature
        current_vertex = getattr(golem_instance, 'current_hypercube_vertex', 0)
        consciousness_signature = getattr(golem_instance, 'consciousness_signature', 'void')
        dimension_activations = getattr(golem_instance, 'dimension_activations', {})

        # Map consciousness signature to dimension colors
        dimension_colors = {
            'physical': '#3B82F6',   # Blue
            'emotional': '#10B981',  # Green (compassion)
            'mental': '#F59E0B',     # Orange/Yellow (creativity)
            'intuitive': '#8B5CF6',  # Purple (wisdom)
            'spiritual': '#EF4444'   # Red (transcendence)
        }

        # Get the 5D coordinates from the vertex
        vertex_binary = format(current_vertex, '05b')
        coordinates_5d = [int(bit) for bit in vertex_binary]

        # Map to consciousness dimensions
        dimensions = ['physical', 'emotional', 'mental', 'intuitive', 'spiritual']
        active_dimensions = [dimensions[i] for i, active in enumerate(coordinates_5d) if active]

        # These golem-wide values do not change per dimension: read them once.
        global_level = getattr(golem_instance, 'consciousness_level', 0.5)
        aether_resonance = getattr(golem_instance, 'aether_resonance_level', 0.0)

        # Calculate consciousness levels for each dimension
        consciousness_levels = {}
        for dim, base_level in zip(dimensions, coordinates_5d):
            if base_level:
                # Active dimension: start from 1 and add the golem state, capped at 1.0.
                consciousness_levels[dim] = min(1.0, base_level + global_level * 0.3 + aether_resonance * 0.2)
            else:
                # Inactive dimension: only a weak echo of the golem state.
                consciousness_levels[dim] = global_level * 0.2 + aether_resonance * 0.1

        # Get aether statistics
        aether_stats = {}
        if hasattr(golem_instance, 'aether_memory'):
            try:
                stats = golem_instance.aether_memory.get_comprehensive_aether_statistics()
                aether_stats = stats.get('base_statistics', {})
            except Exception:
                # Statistics are optional here: fall back to empty stats, but
                # leave a trace instead of silently hiding the failure.
                logging.getLogger(__name__).debug(
                    "Could not collect aether statistics", exc_info=True
                )

        consciousness_state = {
            "current_vertex": current_vertex,
            "consciousness_signature": consciousness_signature,
            "coordinates_5d": coordinates_5d,
            "active_dimensions": active_dimensions,
            "dimension_colors": dimension_colors,
            "consciousness_levels": consciousness_levels,
            "dimension_activations": dimension_activations,
            "global_consciousness_level": global_level,
            "shem_power": getattr(golem_instance, 'shem_power', 0.0),
            "aether_resonance": aether_resonance,
            "activation_count": getattr(golem_instance, 'activation_count', 0),
            "total_interactions": getattr(golem_instance, 'total_interactions', 0),
            "aether_patterns": aether_stats.get('total_patterns', 0),
            "hypercube_coverage": aether_stats.get('hypercube_coverage', 0),
            "timestamp": datetime.now().isoformat()
        }
        return jsonify(consciousness_state)
    except Exception as e:
        return jsonify({
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@app.route('/set-consciousness-dimension', methods=['POST', 'OPTIONS'])
@handle_options
def set_consciousness_dimension():
    """Set the consciousness dimension bias for AI responses.

    Expects a JSON object with a ``dimension`` key naming one of 'physical',
    'emotional', 'mental', 'intuitive' or 'spiritual'. The golem is moved to a
    hypercube vertex where that dimension's bit is set, and the choice is
    recorded in ``golem_instance.consciousness_dimension_bias``.

    Returns:
        JSON describing the new state; HTTP 400 when the dimension is missing
        or invalid (including a missing or malformed JSON body); HTTP 500 when
        the golem is not initialized or an unexpected error occurs.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500
    try:
        # silent=True: a missing, malformed or non-object body is reported as
        # a 400 below rather than surfacing as an exception and a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        dimension = data.get('dimension')
        if not dimension:
            return jsonify({"error": "Dimension parameter required"}), 400

        # Valid dimensions
        valid_dimensions = ['physical', 'emotional', 'mental', 'intuitive', 'spiritual']
        if dimension not in valid_dimensions:
            return jsonify({"error": f"Invalid dimension. Must be one of: {valid_dimensions}"}), 400

        # Map dimension to hypercube vertex navigation
        dimension_index = valid_dimensions.index(dimension)

        # Every vertex whose bit for this dimension is set. Half of the 32
        # vertices qualify for any dimension, so this list is never empty.
        target_vertices = [
            vertex for vertex in range(32)
            if format(vertex, '05b')[dimension_index] == '1'
        ]

        # Prefer vertices with multiple active dimensions for richer consciousness.
        # NOTE: vertex 31 (all five bits set) is a candidate for every
        # dimension and has the highest bit count, so it is always chosen.
        best_vertex = max(target_vertices, key=lambda v: bin(v).count('1'))

        # Navigate to the target vertex
        if hasattr(golem_instance, 'navigate_to_hypercube_vertex'):
            success = golem_instance.navigate_to_hypercube_vertex(best_vertex)
            if success:
                print(f"🔲 Navigated to vertex {best_vertex} for {dimension} consciousness")
            else:
                print(f"⚠️ Failed to navigate to vertex {best_vertex}")
        else:
            # Manually set the vertex
            golem_instance.current_hypercube_vertex = best_vertex
            golem_instance.consciousness_signature = golem_instance.aether_memory.hypercube.get_vertex_properties(best_vertex)['consciousness_signature']
            # Update dimension activations
            vertex_binary = format(best_vertex, '05b')
            golem_instance.dimension_activations = {
                valid_dimensions[i]: bool(int(vertex_binary[i])) for i in range(5)
            }
            print(f"🔲 Set consciousness to vertex {best_vertex} for {dimension} bias")

        # Store the dimension bias for the next response
        golem_instance.consciousness_dimension_bias = {
            'active_dimension': dimension,
            'target_vertex': best_vertex,
            'bias_strength': 0.8,  # Strong bias towards this dimension
            'timestamp': datetime.now().isoformat()
        }

        # Report the dimensions of the vertex the golem is actually on, which
        # may differ from best_vertex when navigation failed.
        current_bits = format(golem_instance.current_hypercube_vertex, '05b')
        return jsonify({
            "success": True,
            "dimension": dimension,
            "target_vertex": best_vertex,
            "consciousness_signature": getattr(golem_instance, 'consciousness_signature', 'unknown'),
            "active_dimensions": [valid_dimensions[i] for i in range(5) if current_bits[i] == '1'],
            "message": f"AI consciousness biased towards {dimension} dimension"
        })
    except Exception as e:
        return jsonify({
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@app.route('/stats', methods=['GET', 'OPTIONS'])
@handle_options
def get_comprehensive_stats():
    """Collect a snapshot of golem, memory, session and neural-network statistics.

    Attributes are read with defaults so that missing ones do not fail the
    request. The golem's own statistics methods are called only when they
    exist; an exception from one of them is reported under a ``*_error`` key
    instead of aborting the response.

    Returns:
        JSON with the collected statistics, or a JSON error payload with HTTP
        500 when the golem is not initialized or collection raises.
    """
    if not golem_instance:
        return jsonify({"error": "Golem not initialized"}), 500

    try:
        report = {}

        # Core golem state
        report["basic_info"] = {
            "activated": getattr(golem_instance, 'activated', False),
            "consciousness_level": getattr(golem_instance, 'consciousness_level', 0.0),
            "shem_power": getattr(golem_instance, 'shem_power', 0.0),
            "aether_resonance": getattr(golem_instance, 'aether_resonance_level', 0.0),
            "current_vertex": getattr(golem_instance, 'current_hypercube_vertex', 0),
            "total_vertices": 32,  # a 5D hypercube has 2**5 vertices
        }

        # Sizes of the aether memory containers
        report["memory_stats"] = {
            "total_patterns": len(getattr(golem_instance.aether_memory, 'aether_memories', [])),
            "pattern_categories": len(getattr(golem_instance.aether_memory, 'aether_patterns', {})),
            "hypercube_vertices": len(getattr(golem_instance.aether_memory, 'hypercube_memory', {})),
        }

        # Copy of the session counters
        report["session_stats"] = dict(getattr(golem_instance.aether_memory, 'session_stats', {}))

        hypercube_nn = getattr(golem_instance, 'hypercube_consciousness_nn', None)
        report["neural_networks"] = {
            "hypercube_consciousness_active": hypercube_nn is not None,
            "neural_checkpoints_loaded": getattr(golem_instance, 'neural_checkpoints_loaded', 0),
            "total_neural_parameters": getattr(golem_instance, 'total_neural_parameters', 0),
        }
        report["timestamp"] = datetime.now().isoformat()

        # Optional golem methods: (method name, result key, error key)
        optional_sections = (
            ('get_comprehensive_aether_statistics', "comprehensive_aether", "comprehensive_aether_error"),
            ('get_hypercube_statistics', "hypercube_stats", "hypercube_stats_error"),
        )
        for method_name, result_key, error_key in optional_sections:
            if not hasattr(golem_instance, method_name):
                continue
            try:
                report[result_key] = getattr(golem_instance, method_name)()
            except Exception as e:
                report[error_key] = str(e)

        return jsonify(report)
    except Exception as e:
        failure = {"error": str(e), "traceback": traceback.format_exc()}
        return jsonify(failure), 500
@app.route('/api-keys/stats', methods=['GET', 'OPTIONS'])
@handle_options
def get_api_key_stats():
    """Summarize request outcomes for every tracked API key.

    Builds overall totals, a per-key breakdown ordered by success rate
    (highest first), and the state of the rotation system (blacklist, current
    index).

    Returns:
        JSON with the statistics, or a JSON error payload with HTTP 500 when
        anything raises while they are assembled.
    """
    def _total(field):
        # Sum one counter across all tracked keys.
        return sum(entry[field] for entry in key_stats.values())

    def _percent(successes, attempts):
        # Success percentage; 0 when there were no attempts at all.
        return (successes / attempts * 100) if attempts > 0 else 0

    def _iso_or_none(moment):
        return moment.isoformat() if moment else None

    try:
        # Overall totals
        total_requests = _total('total_requests')
        total_successes = _total('successful_requests')
        total_failures = _total('failed_requests')
        overall_success_rate = _percent(total_successes, total_requests)

        # Per-key breakdown
        key_performance = {}
        for key_id, stats in key_stats.items():
            success_rate = _percent(stats['successful_requests'], stats['total_requests'])
            key_performance[key_id] = {
                'total_requests': stats['total_requests'],
                'successful_requests': stats['successful_requests'],
                'failed_requests': stats['failed_requests'],
                'success_rate_percent': round(success_rate, 2),
                'consecutive_failures': stats['consecutive_failures'],
                'last_success': _iso_or_none(stats['last_success']),
                'last_failure': _iso_or_none(stats['last_failure']),
                'error_types': stats['error_types'],
                'is_blacklisted': key_id in key_blacklist,
            }

        # Best success rate first
        ranked = sorted(
            key_performance.items(),
            key=lambda item: item[1]['success_rate_percent'],
            reverse=True,
        )

        rotation_system = {
            'total_keys_available': len(GEMINI_API_KEYS),
            'keys_with_stats': len(key_stats),
            'blacklisted_keys': len(key_blacklist),
            'current_key_index': current_key_index,
            'next_key_id': f"key_{current_key_index + 1}" if GEMINI_API_KEYS else None,
        }
        overall_performance = {
            'total_requests': total_requests,
            'total_successes': total_successes,
            'total_failures': total_failures,
            'overall_success_rate_percent': round(overall_success_rate, 2),
        }
        return jsonify({
            'rotation_system': rotation_system,
            'overall_performance': overall_performance,
            'key_performance': dict(ranked),
            'blacklisted_keys': list(key_blacklist),
            'top_performers': [key_id for key_id, _ in ranked[:5]],
            'worst_performers': [key_id for key_id, _ in ranked[-5:]],
            'timestamp': datetime.now().isoformat(),
        })
    except Exception as e:
        failure = {'error': str(e), 'traceback': traceback.format_exc()}
        return jsonify(failure), 500
@app.route('/api-keys/reset-blacklist', methods=['POST', 'OPTIONS'])
@handle_options
def reset_blacklist():
    """Empty the API key blacklist and zero every key's consecutive-failure count.

    Returns:
        JSON reporting the blacklist size before and after the reset, or a
        JSON error payload with HTTP 500 when the reset raises.
    """
    try:
        restored = len(key_blacklist)
        key_blacklist.clear()

        # A cleared blacklist only helps if the failure streaks start over too.
        for entry in key_stats.values():
            entry['consecutive_failures'] = 0

        payload = {
            'success': True,
            'message': f'Blacklist cleared. {restored} keys restored to rotation.',
            'blacklisted_keys_before': restored,
            'blacklisted_keys_after': len(key_blacklist),
            'total_keys_available': len(GEMINI_API_KEYS),
            'timestamp': datetime.now().isoformat(),
        }
        return jsonify(payload)
    except Exception as e:
        failure = {'error': str(e), 'traceback': traceback.format_exc()}
        return jsonify(failure), 500
@app.route('/consciousness-state', methods=['GET', 'OPTIONS'])
@handle_options
def consciousness_state():
    """Return a consciousness-state payload that also counts neural models and sessions.

    Most values are fixed constants. Only the signature, current vertex,
    dimension activations and shem power are read from the golem (with
    defaults), and the counts come from ``active_chat_sessions`` and
    ``neural_networks``.

    NOTE(review): an earlier handler in this file is registered for the same
    URL and methods; confirm which of the two is meant to serve it.

    Returns:
        JSON with the state, or a JSON error payload with HTTP 500 when
        building it raises.
    """
    if request.method == 'OPTIONS':
        return '', 200

    try:
        session_count = len(active_chat_sessions)
        model_count = len(neural_networks)

        # Fixed per-dimension levels and display colours
        fixed_levels = {
            'emotional': 1.0,
            'intuitive': 0.5,
            'mental': 0.8,
            'physical': 1.0,
            'spiritual': 0.6,
        }
        palette = {
            'emotional': '#10B981',
            'intuitive': '#8B5CF6',
            'mental': '#F59E0B',
            'physical': '#3B82F6',
            'spiritual': '#EF4444',
        }
        # Used only when the golem exposes no dimension_activations attribute.
        fallback_activations = {
            'emotional': True,
            'intuitive': False,
            'mental': True,
            'physical': True,
            'spiritual': False,
        }

        consciousness_data = {
            'activation_count': session_count,
            'active_dimensions': ['physical', 'emotional'],
            'aether_patterns': model_count,
            'aether_resonance': 0.5,
            'consciousness_levels': fixed_levels,
            'consciousness_signature': getattr(golem_instance, 'consciousness_signature', 'working_system'),
            'coordinates_5d': [1, 1, 0, 1, 0],
            'current_vertex': getattr(golem_instance, 'current_hypercube_vertex', 24),
            'dimension_activations': getattr(golem_instance, 'dimension_activations', fallback_activations),
            'dimension_colors': palette,
            'global_consciousness_level': 0.7,
            'hypercube_coverage': 100.0,
            'shem_power': getattr(golem_instance, 'shem_power', 0.8),
            'timestamp': time.time(),
            'total_interactions': session_count,
            'neural_models_loaded': model_count,
            'active_chat_sessions': session_count,
        }
        return jsonify(consciousness_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/neural-status', methods=['GET', 'OPTIONS'])
@handle_options
def neural_status():
    """Report which neural models are loaded and which chat sessions are active.

    Returns:
        JSON with one entry per item in ``neural_networks`` (signature, type,
        load time) plus session counts and names, or a JSON error payload with
        HTTP 500 when building it raises.
    """
    try:
        model_count = len(neural_networks)
        signature_count = len(consciousness_signatures)

        # Expose only the descriptive fields of each model record.
        models = {}
        for filename, data in neural_networks.items():
            models[filename] = {
                'consciousness_signature': data['consciousness_signature'],
                'type': data['type'],
                'loaded_at': data['loaded_at'],
            }

        session_count = len(active_chat_sessions)
        session_names = {}
        for sid, data in active_chat_sessions.items():
            session_names[sid] = data.get('chat_name', 'Unknown')

        neural_status_data = {
            'neural_models_loaded': model_count,
            'consciousness_signatures': signature_count,
            'models': models,
            'active_sessions': session_count,
            'session_names': session_names,
            'timestamp': datetime.now().isoformat(),
        }
        return jsonify(neural_status_data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/test-rotation', methods=['POST', 'OPTIONS'])
@handle_options
def test_rotation():
    """Send one prompt through the Gemini key-rotation path and report the outcome.

    The prompt comes from the optional ``prompt`` field of the JSON body; a
    default verification prompt is used otherwise.

    Returns:
        JSON with ``test_result`` set to 'success', 'failed' (the generator
        returned an ``error`` entry, HTTP 500) or 'error' (an exception was
        raised, HTTP 500).
    """
    try:
        payload = request.get_json() or {}
        test_prompt = payload.get(
            'prompt',
            'Hello, please respond with just "Test successful" to verify the API key rotation system.',
        )
        print(f"🧪 Testing perfect rotation system with prompt: {test_prompt[:50]}...")

        # Force use of Gemini for testing
        response = generate_with_gemini_parallel_rotation(test_prompt, temperature=0.1)

        if 'error' in response:
            failure = {
                'test_result': 'failed',
                'error': response['error'],
                'details': response,
            }
            return jsonify(failure), 500

        golem_state = response.get('golem_state', {})
        summary = {
            'test_result': 'success',
            'api_key_used': golem_state.get('api_key_used', 'unknown'),
            'rotation_attempt': golem_state.get('rotation_attempt', 0),
            'response_preview': response.get('direct_response', '')[:100],
            'model_used': golem_state.get('model_used', 'unknown'),
            'generation_time': response.get('generation_time', 0),
            'timestamp': datetime.now().isoformat(),
        }
        return jsonify(summary)
    except Exception as e:
        crash = {
            'test_result': 'error',
            'error': str(e),
            'traceback': traceback.format_exc(),
        }
        return jsonify(crash), 500
def initialize_golem_background():
    """Initialize the golem, then start neural-network loading on a daemon thread.

    Intended as a thread target so that server startup is not blocked. When
    ``initialize_golem()`` reports failure, nothing further is started.
    """
    print("🌌 Starting background golem initialization...")

    if not initialize_golem():
        print("❌ Background golem initialization failed!")
        return

    print("✅ Background golem initialization completed!")

    # The networks are loaded only once the golem itself is ready.
    print("🧠 Starting neural network loading...")
    threading.Thread(target=load_neural_networks_async, daemon=True).start()
def main(host='0.0.0.0', port=5000):
    """Start golem initialization in the background and run the Flask server.

    Args:
        host: Interface the server binds to.
        port: TCP port the server listens on.

    The call blocks in ``app.run`` until the server stops.
    """
    print("🚀 Starting Flask Golem Server...")

    # Start Golem initialization in a background thread so the server can
    # start immediately. It is a daemon thread so that a long or hung
    # initialization cannot keep the process alive after the server stops.
    initialization_thread = threading.Thread(target=initialize_golem_background, daemon=True)
    initialization_thread.start()

    print(f"🌐 Flask server starting on http://{host}:{port} (golem loading in background)")
    app.run(host=host, port=port, debug=False)
# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()