# dhrumii's picture
# Rename filehegi.py to app.py
# 5c63b86 verified
"""
Smart Waste Segregation System - Hardware-Realistic Simulator with Real Model
Simulates complete edge deployment with realistic sensor protocols, motor control, and hardware timing
Dependencies: pip install streamlit numpy pillow opencv-python-headless torch torchvision (sqlite3 ships with the Python standard library and needs no install)
"""
import streamlit as st
import sqlite3
import json
import time
import random
import numpy as np
from PIL import Image
import io
import base64
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Tuple
import threading
import queue
from enum import Enum
import os
# PyTorch imports
import torch
import torch.nn as nn
from torchvision import transforms, models
# ==================== HARDWARE CONSTANTS ====================
class HardwareConfig:
    """Nominal figures for every simulated hardware part.

    A plain namespace of constants. The sensor, servo and camera simulators
    in this file read their timings, ranges and electrical values from here.
    The unit of each constant is given in the trailing comment.
    """

    # --- HC-SR04 ultrasonic range finder ---
    ULTRASONIC_MIN_DISTANCE = 2.0       # cm, lower clamp for readings
    ULTRASONIC_MAX_DISTANCE = 400.0     # cm, upper clamp for readings
    ULTRASONIC_ACCURACY = 0.3           # cm, std-dev of the simulated noise
    ULTRASONIC_READ_TIME = 0.015        # s (15 ms) per trigger/echo cycle

    # --- Capacitive moisture probe ---
    MOISTURE_MIN_VOLTAGE = 0.0          # V (dry)
    MOISTURE_MAX_VOLTAGE = 3.3          # V (wet)
    MOISTURE_ADC_RESOLUTION = 12        # bits (4096 levels)
    MOISTURE_READ_TIME = 0.010          # s (10 ms) per ADC read

    # --- Load cell behind an HX711 amplifier ---
    WEIGHT_MAX_CAPACITY = 5000.0        # g (5 kg)
    WEIGHT_RESOLUTION = 0.1             # g, readings are rounded to this step
    WEIGHT_STABILIZATION_TIME = 0.5     # s, mechanical settling
    WEIGHT_READ_TIME = 0.100            # s (100 ms) for one averaged read

    # --- MG996R servo ---
    SERVO_MIN_ANGLE = 0                 # degrees
    SERVO_MAX_ANGLE = 180               # degrees
    SERVO_SPEED = 60                    # degrees travelled per 0.17 s (at 4.8 V)
    SERVO_CURRENT_IDLE = 10             # mA
    SERVO_CURRENT_MOVING = 500          # mA
    SERVO_TORQUE = 11                   # kg-cm

    # --- Raspberry Pi Camera Module v2 ---
    CAMERA_RESOLUTION = (1920, 1080)    # pixels (width, height)
    CAMERA_CAPTURE_TIME = 0.150         # s per frame
    CAMERA_WARMUP_TIME = 2.0            # s before the first frame

    # --- Raspberry Pi 4 edge device ---
    CPU_CORES = 4
    RAM_MB = 4096
    INFERENCE_OVERHEAD = 0.050          # s
# ==================== DATABASE SETUP ====================
def init_database(db_path: str = 'waste_segregation.db') -> sqlite3.Connection:
    """Open the SQLite database and make sure the schema is current.

    Args:
        db_path: Database file to open. Defaults to the file name the app has
            always used; pass ``':memory:'`` for a throwaway database.

    Returns:
        An open connection created with ``check_same_thread=False`` so it can
        be used from a thread other than the one that created it.

    Raises:
        sqlite3.Error: If the database cannot be opened, inspected or altered.
    """
    conn = sqlite3.connect(db_path, check_same_thread=False)
    cursor = conn.cursor()

    # Migrate databases created before total_pipeline_time_ms existed.
    # PRAGMA table_info yields one row per column (name at index 1) and no
    # rows when the table is absent, so the schema is inspected directly
    # instead of provoking an OperationalError, which would also hide
    # unrelated failures such as a locked database.
    event_columns = {row[1] for row in cursor.execute("PRAGMA table_info(events)")}
    if event_columns and 'total_pipeline_time_ms' not in event_columns:
        cursor.execute("ALTER TABLE events ADD COLUMN total_pipeline_time_ms REAL")
        conn.commit()
        st.info("✅ Database schema updated successfully")

    # Events table: one row per completed segregation run
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            image_name TEXT,
            waste_class TEXT NOT NULL,
            confidence REAL NOT NULL,
            distance_cm REAL,
            moisture_raw INTEGER,
            moisture_percent REAL,
            weight_g REAL,
            servo_angle INTEGER,
            inference_time_ms REAL,
            total_pipeline_time_ms REAL,
            status TEXT,
            power_consumption_mw REAL
        )
    ''')

    # Telemetry table: free-form JSON payload per component
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS telemetry (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            component TEXT NOT NULL,
            data TEXT NOT NULL
        )
    ''')

    # Hardware diagnostics table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS hardware_diagnostics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            component TEXT NOT NULL,
            status TEXT NOT NULL,
            voltage REAL,
            current_ma REAL,
            temperature_c REAL,
            error_count INTEGER
        )
    ''')

    conn.commit()
    return conn
# ==================== DATA MODELS ====================
@dataclass
class SensorReading:
    """A single sample produced by one of the simulated sensors."""

    # Processed measurement in the sensor's own unit (each sensor class
    # decides what that unit is).
    value: float
    # Unprocessed integer the sensor produced (ADC value)
    raw_value: int
    # Voltage reported alongside the sample
    voltage: float
    # Wall-clock duration of the read, in milliseconds
    read_time_ms: float
    # Magnitude of the noise that went into this sample
    noise_level: float
    # ISO-8601 time at which the sample was produced
    timestamp: str
@dataclass
class SensorData:
    """Snapshot of the whole sensor suite for one pipeline run.

    Every physical quantity appears twice: a convenience scalar and the full
    SensorReading it was taken from.
    """

    # Distance in centimetres and the reading behind it
    distance_cm: float
    distance_raw: SensorReading
    # Moisture as a percentage and the reading behind it
    moisture_percent: float
    moisture_raw: SensorReading
    # Weight in grams and the reading behind it
    weight_g: float
    weight_raw: SensorReading
    # Temperature recorded with the snapshot
    temperature: float
    # ISO-8601 time the snapshot was assembled
    timestamp: str
    # Time spent reading all sensors, in milliseconds
    total_read_time_ms: float
@dataclass
class InferenceResult:
    """Outcome of one classification plus the edge-device metrics around it."""

    # Winning class name and its probability
    waste_class: str
    confidence: float
    # Stage timings, all in milliseconds
    inference_time_ms: float
    preprocessing_time_ms: float
    postprocessing_time_ms: float
    # Probability for every class, keyed by class name
    probabilities: Dict[str, float]
    # Version label of the model that produced the result
    model_version: str
    # Device temperature and CPU load reported with the result
    device_temperature: float
    cpu_usage_percent: float
@dataclass
class MotorControl:
    """Record of a single servo move."""

    # Angles in degrees: where the move was headed, where the servo ended up,
    # and where it began
    target_angle: int
    current_angle: int
    start_angle: int
    # Outcome label for the move
    status: str
    # Duration of the move in milliseconds
    duration_ms: float
    # Power figure for the move in milliwatts
    power_consumption_mw: float
    # Torque figure for the move
    torque_applied: float
    # Sampled trajectory as (time_ms, angle) pairs
    movement_profile: List[Tuple[float, int]]
class SensorStatus(Enum):
    """Lifecycle states a simulated sensor can report."""

    READY = "ready"                # idle, able to take a reading
    READING = "reading"            # a read is in progress
    STABILIZING = "stabilizing"    # waiting for the measurement to settle
    ERROR = "error"
    CALIBRATING = "calibrating"
# ==================== REALISTIC SENSOR SIMULATION ====================
class UltrasonicSensor:
    """Simulated HC-SR04 ultrasonic range finder."""

    def __init__(self):
        self.status = SensorStatus.READY
        self.error_count = 0
        self.last_reading = None
        # Fixed per-instance bias in cm, drawn once at construction
        self.calibration_offset = random.uniform(-0.2, 0.2)

    def trigger_read(self, actual_distance: float) -> SensorReading:
        """Take one distance sample around ``actual_distance`` (cm).

        Blocks for the configured read time, perturbs the true distance with
        Gaussian noise, the calibration offset and a temperature factor, and
        clamps the result to the sensor's range. Roughly 1% of reads fail;
        a failed read increments ``error_count`` and reports ``-1.0``.

        Returns:
            SensorReading whose ``value`` is the distance in cm (or ``-1.0``
            on failure) and whose ``raw_value`` is the echo time in whole
            microseconds (0 on failure).
        """
        began = time.time()
        self.status = SensorStatus.READING

        # Stand-in for the trigger pulse and the wait for the echo
        time.sleep(HardwareConfig.ULTRASONIC_READ_TIME)

        # Draw order matters for reproducibility: noise, temperature, failure
        noise = random.gauss(0, HardwareConfig.ULTRASONIC_ACCURACY)
        temperature_factor = 1.0 + random.uniform(-0.02, 0.02)
        read_failed = random.random() < 0.01

        if read_failed:
            self.error_count += 1
            distance = -1.0  # error code
        else:
            skewed = (actual_distance + noise + self.calibration_offset) * temperature_factor
            distance = max(
                HardwareConfig.ULTRASONIC_MIN_DISTANCE,
                min(HardwareConfig.ULTRASONIC_MAX_DISTANCE, skewed),
            )

        # distance = speed * time / 2, hence time = 2 * distance / speed
        if distance > 0:
            echo_time_us = (distance * 2) / 0.0343
        else:
            echo_time_us = 0

        elapsed_ms = (time.time() - began) * 1000
        self.status = SensorStatus.READY
        self.last_reading = distance

        return SensorReading(
            value=round(distance, 2),
            raw_value=int(echo_time_us),
            voltage=3.3,
            read_time_ms=round(elapsed_ms, 3),
            noise_level=abs(noise),
            timestamp=datetime.now().isoformat(),
        )
class MoistureSensor:
    """Simulated capacitive moisture probe read through an ADC."""

    def __init__(self):
        self.status = SensorStatus.READY
        # Number of distinct ADC codes (2 ** bits)
        self.adc_resolution = 2 ** HardwareConfig.MOISTURE_ADC_RESOLUTION
        # Per-instance calibration points: the ADC code seen when fully dry
        # is higher than the one seen when fully wet
        self.calibration_dry = random.randint(3200, 3400)
        self.calibration_wet = random.randint(1200, 1400)

    def read_moisture(self, actual_moisture: float) -> SensorReading:
        """Take one moisture sample.

        Args:
            actual_moisture: True moisture as a fraction (0.0 = dry, 1.0 = wet).

        Returns:
            SensorReading whose ``value`` is the measured fraction clamped to
            0.0-1.0 and whose ``raw_value`` is the ADC code.
        """
        began = time.time()
        self.status = SensorStatus.READING
        time.sleep(HardwareConfig.MOISTURE_READ_TIME)

        span = self.calibration_dry - self.calibration_wet

        # Fraction -> ideal ADC code, then add a few LSB of noise and clamp
        ideal_code = self.calibration_dry - (actual_moisture * span)
        adc_noise = random.randint(-2, 2)
        code = int(ideal_code + adc_noise)
        code = max(0, min(self.adc_resolution - 1, code))

        # ADC code -> voltage
        voltage = (code / self.adc_resolution) * HardwareConfig.MOISTURE_MAX_VOLTAGE

        # ADC code -> fraction again, the way firmware would decode it
        fraction = (self.calibration_dry - code) / span
        fraction = max(0.0, min(1.0, fraction))

        elapsed_ms = (time.time() - began) * 1000
        self.status = SensorStatus.READY

        if span != 0:
            relative_noise = abs(adc_noise / span)
        else:
            relative_noise = 0.0

        return SensorReading(
            value=round(fraction, 4),
            raw_value=code,
            voltage=round(voltage, 3),
            read_time_ms=round(elapsed_ms, 3),
            noise_level=relative_noise,
            timestamp=datetime.now().isoformat(),
        )
class LoadCellSensor:
    """Simulated load cell read through an HX711 amplifier."""

    def __init__(self):
        self.status = SensorStatus.READY
        # Raw count reported with nothing on the scale
        self.tare_value = random.randint(-50000, 50000)
        # Raw counts per gram
        self.calibration_factor = random.uniform(420, 450)
        self.last_stable_reading = 0.0
        self.reading_buffer = []

    def read_weight(self, actual_weight: float) -> SensorReading:
        """Take one averaged weight sample around ``actual_weight`` (grams).

        Waits for a shortened settling period, collects ten noisy raw
        samples, averages them, converts back to grams, clamps to the cell's
        capacity and snaps to the configured resolution.

        Returns:
            SensorReading whose ``value`` is the weight in grams and whose
            ``raw_value`` is the averaged raw count.
        """
        began = time.time()

        # Mechanical settling (scaled down so the simulation stays responsive)
        self.status = SensorStatus.STABILIZING
        time.sleep(HardwareConfig.WEIGHT_STABILIZATION_TIME * 0.2)

        self.status = SensorStatus.READING
        ideal_counts = self.tare_value + int(actual_weight * self.calibration_factor)
        sample_gap = HardwareConfig.WEIGHT_READ_TIME / 10

        samples = []
        for _ in range(10):
            adc_noise = random.randint(-2, 2)
            vibration = random.gauss(0, 5)
            samples.append(ideal_counts + int(adc_noise + vibration))
            time.sleep(sample_gap)

        # Average, undo tare and scale, then clamp and quantise
        mean_counts = int(np.mean(samples))
        grams = (mean_counts - self.tare_value) / self.calibration_factor
        grams = max(0.0, min(HardwareConfig.WEIGHT_MAX_CAPACITY, grams))
        grams = round(grams / HardwareConfig.WEIGHT_RESOLUTION) * HardwareConfig.WEIGHT_RESOLUTION

        elapsed_ms = (time.time() - began) * 1000
        self.status = SensorStatus.READY
        self.last_stable_reading = grams

        return SensorReading(
            value=round(grams, 2),
            raw_value=mean_counts,
            voltage=3.3,
            read_time_ms=round(elapsed_ms, 3),
            # Sample spread expressed in grams
            noise_level=np.std(samples) / self.calibration_factor,
            timestamp=datetime.now().isoformat(),
        )
class SensorHub:
    """Owns the three simulated sensors and reads them as one suite."""

    def __init__(self):
        self.ultrasonic = UltrasonicSensor()
        self.moisture = MoistureSensor()
        self.load_cell = LoadCellSensor()
        self.temperature = 28.0
        # "True" physical values the sensors sample around
        self.baseline = {
            'distance_cm': 25.0,
            'moisture': 0.3,
            'weight_g': 150.0
        }

    def read_all_sensors(self) -> SensorData:
        """Read distance, moisture and weight in that order and bundle them.

        The moisture reading is a fraction; it is scaled to a percentage in
        the returned SensorData. The temperature is re-drawn within two
        degrees of 28.0 on every call.
        """
        began = time.time()

        distance = self.ultrasonic.trigger_read(self.baseline['distance_cm'])
        moisture = self.moisture.read_moisture(self.baseline['moisture'])
        weight = self.load_cell.read_weight(self.baseline['weight_g'])
        self.temperature = 28.0 + random.uniform(-2, 2)

        elapsed_ms = (time.time() - began) * 1000

        return SensorData(
            distance_cm=distance.value,
            distance_raw=distance,
            moisture_percent=moisture.value * 100,
            moisture_raw=moisture,
            weight_g=weight.value,
            weight_raw=weight,
            temperature=round(self.temperature, 2),
            timestamp=datetime.now().isoformat(),
            total_read_time_ms=round(elapsed_ms, 2),
        )

    def set_baseline(self, distance=None, moisture=None, weight=None):
        """Override any of the simulated true values; ``None`` keeps the old one."""
        overrides = (
            ('distance_cm', distance),
            ('moisture', moisture),
            ('weight_g', weight),
        )
        for key, new_value in overrides:
            if new_value is not None:
                self.baseline[key] = new_value
# ==================== REAL EDGE AI INFERENCE ====================
class EdgeAIProcessor:
    """Waste classifier for the edge device: real PyTorch model with a mock fallback.

    On construction the processor tries to load an EfficientNetV2-S
    checkpoint from ``model_path``. When the file is missing or loading
    fails, ``model_loaded`` stays False and :meth:`infer` falls back to a
    sensor-driven mock so the rest of the pipeline keeps working.
    """

    # Class names used whenever no checkpoint supplies its own list. The
    # order doubles as the positional fallback for the mock inference.
    DEFAULT_CLASSES = ("organic", "recyclable", "non-recyclable")

    def __init__(self, model_path: Optional[str] = None):
        """Set up preprocessing and attempt to load the checkpoint.

        Args:
            model_path: Checkpoint file; defaults to
                ``'best_trash_classifier.pth'`` when falsy.
        """
        self.model_path = model_path or 'best_trash_classifier.pth'
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = None
        self.model_loaded = False
        self.model_version = "v1.0.0-mock"
        self.warmup_complete = False
        self.classes = []

        # Simulated edge-device metrics, nudged on every inference
        self.device_temp = 45.0
        self.cpu_usage = 25.0

        # Image preprocessing: 224x224 input with ImageNet mean/std
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # Try to load the real model; failure leaves the mock path active
        self.load_model()

    def load_model(self):
        """Load the trained checkpoint, or configure mock mode if that fails.

        Never raises: every failure is reported through Streamlit and leaves
        the processor in mock mode with the default class list.
        """
        try:
            if not os.path.exists(self.model_path):
                st.warning(f"⚠️ Model file '{self.model_path}' not found. Using mock inference.")
                self.classes = list(self.DEFAULT_CLASSES)
                return

            # SECURITY NOTE: torch.load unpickles the file, so a crafted
            # checkpoint can execute arbitrary code. Only load checkpoints
            # from a trusted source.
            checkpoint = torch.load(self.model_path, map_location=self.device)

            # Class names travel with the checkpoint when it provides them
            self.classes = checkpoint.get('class_names', list(self.DEFAULT_CLASSES))
            num_classes = len(self.classes)

            # Rebuild the architecture the stored weights expect
            self.model = models.efficientnet_v2_s(weights=None)
            num_features = self.model.classifier[1].in_features
            self.model.classifier = nn.Sequential(
                nn.Dropout(p=0.3, inplace=True),
                nn.Linear(num_features, 512),
                nn.ReLU(),
                nn.Dropout(p=0.3),
                nn.Linear(512, num_classes)
            )

            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.model = self.model.to(self.device)
            self.model.eval()

            self.model_loaded = True
            self.model_version = f"v1.0.0-efficientnet-epoch{checkpoint.get('epoch', 'unknown')}"
            st.success(f"✅ Model loaded successfully! Classes: {self.classes}")
        except Exception as e:
            st.error(f"❌ Failed to load model: {e}")
            st.info("Using mock inference mode")
            self.classes = list(self.DEFAULT_CLASSES)
            # Drop any half-built network so nothing can use it by accident
            self.model = None
            self.model_loaded = False

    def warmup(self):
        """Run one dummy forward pass so the first real inference is not the slow one.

        Best effort: a failure is reported as a warning and the warmup is
        attempted again on the next call.
        """
        if self.warmup_complete or not self.model_loaded:
            return
        try:
            dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
            with torch.no_grad():
                _ = self.model(dummy_input)
            self.warmup_complete = True
        except Exception as e:
            # Previously swallowed by a bare ``except``; surface it instead
            st.warning(f"⚠️ Model warmup failed: {e}")

    def preprocess_image(self, image: Image.Image) -> Tuple[torch.Tensor, float]:
        """Convert a PIL image into a batched model input.

        Returns:
            ``(tensor, elapsed_ms)`` where the tensor has a leading batch
            dimension of 1.
        """
        start_time = time.time()

        if image.mode != 'RGB':
            image = image.convert('RGB')

        img_tensor = self.transform(image)
        img_tensor = img_tensor.unsqueeze(0)  # add batch dimension

        preprocess_time = (time.time() - start_time) * 1000
        return img_tensor, preprocess_time

    def infer(self, image: Image.Image, sensor_data: SensorData) -> InferenceResult:
        """Classify one item with the real model or the sensor-driven mock.

        Args:
            image: Picture of the item.
            sensor_data: Sensor snapshot; only consulted by the mock path.

        Returns:
            InferenceResult with the winning class, per-class probabilities,
            stage timings and the updated device metrics.
        """
        self.warmup()

        input_tensor, preprocess_time = self.preprocess_image(image)

        inference_start = time.time()
        if self.model_loaded and self.model is not None:
            try:
                input_tensor = input_tensor.to(self.device)
                with torch.no_grad():
                    outputs = self.model(input_tensor)
                logits = outputs[0].cpu().numpy()
            except Exception as e:
                st.error(f"Inference error: {e}")
                # Keep the pipeline alive with the mock when the model fails
                logits = self._mock_inference_with_fusion(sensor_data)
        else:
            logits = self._mock_inference_with_fusion(sensor_data)
            # Simulated latency for the mock path.
            # NOTE(review): the flattened source does not show whether this
            # sleep belonged to the mock branch only; kept here because the
            # real model already spends real time - confirm.
            time.sleep(random.uniform(0.050, 0.150))
        inference_time = (time.time() - inference_start) * 1000

        postprocess_start = time.time()

        # Numerically stable softmax
        exp_logits = np.exp(logits - np.max(logits))
        probs = exp_logits / np.sum(exp_logits)

        class_idx = int(np.argmax(probs))
        waste_class = self.classes[class_idx]
        confidence = float(probs[class_idx])
        prob_dict = {cls: float(probs[i]) for i, cls in enumerate(self.classes)}

        postprocess_time = (time.time() - postprocess_start) * 1000

        # Random walk of the simulated device metrics, capped at 85 C and
        # kept within 20-100 % CPU
        self.device_temp = min(85.0, self.device_temp + random.uniform(-1, 2))
        self.cpu_usage = min(100.0, max(20.0, self.cpu_usage + random.uniform(-10, 15)))

        return InferenceResult(
            waste_class=waste_class,
            confidence=round(confidence, 4),
            inference_time_ms=round(inference_time, 2),
            preprocessing_time_ms=round(preprocess_time, 2),
            postprocessing_time_ms=round(postprocess_time, 2),
            probabilities=prob_dict,
            model_version=self.model_version,
            device_temperature=round(self.device_temp, 1),
            cpu_usage_percent=round(self.cpu_usage, 1)
        )

    def _class_index(self, name: str, fallback: int) -> Optional[int]:
        """Return the logit index for class ``name``.

        Falls back to position ``fallback`` when the name is not in
        ``self.classes``, and to ``None`` when that position does not exist.
        """
        try:
            return list(self.classes).index(name)
        except ValueError:
            return fallback if fallback < len(self.classes) else None

    def _mock_inference_with_fusion(self, sensor_data: SensorData) -> np.ndarray:
        """Produce mock logits from moisture, weight and distance.

        Classes are located by name, so a class list in a different order
        (for example one taken from a checkpoint) still receives the right
        boost; a class that cannot be located is skipped instead of raising
        IndexError.

        Returns:
            One logit per entry of ``self.classes``.
        """
        logits = np.random.randn(len(self.classes)) * 0.3

        organic = self._class_index("organic", 0)
        recyclable = self._class_index("recyclable", 1)
        non_recyclable = self._class_index("non-recyclable", 2)

        def boost(index: Optional[int], amount: float) -> None:
            if index is not None:
                logits[index] += amount

        # Moisture decides the main vote
        moisture = sensor_data.moisture_percent / 100.0
        if moisture > 0.5:
            boost(organic, 2.5)         # high moisture -> organic
        elif moisture < 0.2:
            boost(recyclable, 1.5)      # low moisture -> recyclable
        else:
            boost(non_recyclable, 1.0)  # medium moisture -> non-recyclable

        # Weight refines it
        weight = sensor_data.weight_g
        if weight < 50:
            boost(recyclable, 0.5)      # light items often recyclable
        elif weight > 300:
            boost(non_recyclable, 0.5)  # heavy items often non-recyclable

        # Distance only scales confidence. Clamp the factor so readings
        # beyond 100 cm (or the -1.0 error code) cannot push the multiplier
        # to zero or below, which would flatten or invert the ranking.
        distance_factor = 1.0 - (sensor_data.distance_cm / 100.0)
        distance_factor = max(0.0, min(1.0, distance_factor))
        logits *= (0.7 + 0.3 * distance_factor)

        return logits
# ==================== REALISTIC MOTOR CONTROL ====================
class ServoMotor:
    """Simulated MG996R servo that steers waste towards one of three bins."""

    # Servo angle (degrees) for each waste class
    BIN_POSITIONS = {
        "organic": 0,
        "recyclable": 90,
        "non-recyclable": 180
    }

    def __init__(self):
        # Starts parked at the middle position
        self.current_angle = 90
        self.target_angle = 90
        self.is_moving = False
        self.speed_deg_per_sec = HardwareConfig.SERVO_SPEED / 0.17
        self.voltage = 5.0
        self.current_ma = HardwareConfig.SERVO_CURRENT_IDLE
        self.position_history = []

    def calculate_movement_profile(self, start: int, end: int, duration_s: float) -> List[Tuple[float, int]]:
        """Sample the eased trajectory from ``start`` to ``end`` degrees.

        The first 30% of the move accelerates quadratically, the middle 40%
        progresses linearly and the last 30% decelerates quadratically.

        Returns:
            ``(time_ms, angle)`` pairs; at least 11 samples, otherwise about
            50 per second of ``duration_s``.
        """
        def eased(t: float) -> float:
            # Fraction of the travel completed at normalised time t (0..1)
            if t < 0.3:
                return (t / 0.3) ** 2 * 0.3
            if t > 0.7:
                return 1.0 - ((1.0 - t) / 0.3) ** 2 * 0.3
            return 0.3 + (t - 0.3) * (0.4 / 0.4)

        steps = max(10, int(duration_s * 50))
        span = end - start

        samples = []
        for i in range(steps + 1):
            t = i / steps
            angle = int(start + span * eased(t))
            time_ms = t * duration_s * 1000
            samples.append((round(time_ms, 1), angle))
        return samples

    def move_to_bin(self, waste_class: str) -> MotorControl:
        """Rotate to the bin for ``waste_class`` and report the move.

        Classes without an entry in ``BIN_POSITIONS`` go to 90 degrees. The
        call blocks for 30% of the computed travel time.
        """
        began = time.time()
        origin = self.current_angle
        destination = self.BIN_POSITIONS.get(waste_class, 90)

        self.is_moving = True
        self.target_angle = destination
        self.current_ma = HardwareConfig.SERVO_CURRENT_MOVING

        # Travel time at 60 degrees per 0.17 s
        sweep = abs(destination - origin)
        duration_s = (sweep / 60.0) * 0.17
        trajectory = self.calculate_movement_profile(origin, destination, duration_s)

        # Shortened wait standing in for the physical move
        time.sleep(duration_s * 0.3)

        # volts * milliamps = milliwatts, using the mean of moving and idle draw
        mean_current = (HardwareConfig.SERVO_CURRENT_MOVING + HardwareConfig.SERVO_CURRENT_IDLE) / 2
        power_mw = self.voltage * mean_current
        torque = HardwareConfig.SERVO_TORQUE * 0.7

        self.current_angle = destination
        self.is_moving = False
        self.current_ma = HardwareConfig.SERVO_CURRENT_IDLE

        elapsed_ms = (time.time() - began) * 1000

        return MotorControl(
            target_angle=destination,
            current_angle=destination,
            start_angle=origin,
            status="completed",
            duration_ms=round(elapsed_ms, 2),
            power_consumption_mw=round(power_mw, 2),
            torque_applied=round(torque, 2),
            movement_profile=trajectory,
        )

    def get_diagnostics(self) -> dict:
        """Return the servo's current state plus a randomised temperature."""
        diagnostics = {
            'current_angle': self.current_angle,
            'target_angle': self.target_angle,
            'is_moving': self.is_moving,
            'current_ma': self.current_ma,
            'voltage': self.voltage,
        }
        diagnostics['temperature_c'] = 35.0 + random.uniform(-2, 5)
        return diagnostics
# ==================== CAMERA SIMULATION ====================
class CameraModule:
    """Simulated Raspberry Pi Camera Module v2."""

    def __init__(self):
        self.resolution = HardwareConfig.CAMERA_RESOLUTION
        self.is_warmed_up = False
        self.frame_count = 0

    def warmup(self):
        """Pay the one-off warmup delay (auto-exposure, white balance) once."""
        if self.is_warmed_up:
            return
        # Only a tenth of the nominal warmup time is actually waited
        time.sleep(HardwareConfig.CAMERA_WARMUP_TIME * 0.1)
        self.is_warmed_up = True

    def capture_image(self, uploaded_image: Optional[Image.Image] = None) -> Tuple[Image.Image, float]:
        """Return a frame and how long the capture took.

        Args:
            uploaded_image: Image to hand back as the captured frame; when
                falsy a synthetic frame is generated instead.

        Returns:
            ``(image, capture_time_ms)``. The warmup delay is not part of the
            reported time.
        """
        self.warmup()

        began = time.time()
        # Half of the nominal capture time is actually waited
        time.sleep(HardwareConfig.CAMERA_CAPTURE_TIME * 0.5)

        frame = uploaded_image if uploaded_image else self._generate_synthetic_waste()

        elapsed_ms = (time.time() - began) * 1000
        self.frame_count += 1
        return frame, elapsed_ms

    def _generate_synthetic_waste(self) -> Image.Image:
        """Build a flat-colour 224x224 test frame.

        One of three base colours is picked at random and every channel is
        shifted by up to 30 levels, staying within 0-255.
        """
        palette = [
            (139, 90, 43),    # Brown (organic)
            (200, 200, 200),  # Gray (recyclable)
            (50, 50, 50)      # Dark (non-recyclable)
        ]
        base = random.choice(palette)
        jittered = tuple(max(0, min(255, channel + random.randint(-30, 30))) for channel in base)
        return Image.new('RGB', (224, 224), color=jittered)
# ==================== DATA LOGGER ====================
class DataLogger:
    """Writes events, telemetry and hardware diagnostics to SQLite and reads them back."""

    def __init__(self, conn):
        self.conn = conn
        # Serialises the insert-and-commit sequences of the log_* methods
        self.lock = threading.Lock()

    @staticmethod
    def _rows_as_dicts(cursor) -> List[dict]:
        """Turn the rows of an executed SELECT into column-name keyed dicts."""
        names = [column[0] for column in cursor.description]
        return [dict(zip(names, row)) for row in cursor.fetchall()]

    def log_event(self, sensor_data: SensorData, inference: InferenceResult,
                  motor: MotorControl, image_name: str = "sample.jpg"):
        """Store one completed segregation run in the ``events`` table.

        The stored pipeline time is the sum of the sensor read time, the
        three inference stage times and the motor duration.
        """
        pipeline_ms = (sensor_data.total_read_time_ms +
                       inference.preprocessing_time_ms +
                       inference.inference_time_ms +
                       inference.postprocessing_time_ms +
                       motor.duration_ms)
        row = (
            sensor_data.timestamp,
            image_name,
            inference.waste_class,
            inference.confidence,
            sensor_data.distance_cm,
            sensor_data.moisture_raw.raw_value,
            sensor_data.moisture_percent,
            sensor_data.weight_g,
            motor.target_angle,
            inference.inference_time_ms,
            pipeline_ms,
            motor.status,
            motor.power_consumption_mw,
        )
        with self.lock:
            self.conn.cursor().execute('''
                INSERT INTO events (
                    timestamp, image_name, waste_class, confidence,
                    distance_cm, moisture_raw, moisture_percent, weight_g,
                    servo_angle, inference_time_ms, total_pipeline_time_ms,
                    status, power_consumption_mw
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', row)
            self.conn.commit()

    def log_telemetry(self, component: str, data: dict):
        """Store ``data`` as a JSON string for ``component``, stamped with the current time."""
        row = (datetime.now().isoformat(), component, json.dumps(data))
        with self.lock:
            self.conn.cursor().execute('''
                INSERT INTO telemetry (timestamp, component, data)
                VALUES (?, ?, ?)
            ''', row)
            self.conn.commit()

    def log_hardware_diagnostics(self, component: str, status: str,
                                 voltage: float = 0, current_ma: float = 0,
                                 temperature: float = 0, error_count: int = 0):
        """Store one diagnostics row for ``component``, stamped with the current time."""
        row = (
            datetime.now().isoformat(),
            component,
            status,
            voltage,
            current_ma,
            temperature,
            error_count,
        )
        with self.lock:
            self.conn.cursor().execute('''
                INSERT INTO hardware_diagnostics
                (timestamp, component, status, voltage, current_ma, temperature_c, error_count)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', row)
            self.conn.commit()

    def get_recent_events(self, limit: int = 10) -> List[dict]:
        """Return up to ``limit`` events, newest timestamp first."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM events
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (limit,))
        return self._rows_as_dicts(cursor)

    def get_statistics(self) -> dict:
        """Aggregate the ``events`` table into summary figures.

        Averages and the power total are 0 when the table is empty.
        """
        cursor = self.conn.cursor()

        def scalar(sql: str):
            # First column of the first row of a single-value query
            cursor.execute(sql)
            return cursor.fetchone()[0]

        cursor.execute('''
            SELECT waste_class, COUNT(*) as count
            FROM events
            GROUP BY waste_class
        ''')
        per_class = dict(cursor.fetchall())

        mean_confidence = scalar('SELECT AVG(confidence) FROM events') or 0
        mean_pipeline_ms = scalar('SELECT AVG(total_pipeline_time_ms) FROM events') or 0
        event_count = scalar('SELECT COUNT(*) FROM events')
        power_sum = scalar('SELECT SUM(power_consumption_mw) FROM events') or 0

        return {
            'total_events': event_count,
            'class_distribution': per_class,
            'average_confidence': round(mean_confidence, 4),
            'average_pipeline_time_ms': round(mean_pipeline_ms, 2),
            # NOTE(review): this divides a sum of per-event power values (mW)
            # by 3,600,000 without multiplying by any duration, so the "mWh"
            # label looks dimensionally off - confirm the intended formula.
            'total_power_consumption_mwh': round(power_sum / 3600000, 4)
        }

    def get_hardware_diagnostics(self, limit: int = 20) -> List[dict]:
        """Return up to ``limit`` diagnostics rows, newest timestamp first."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM hardware_diagnostics
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (limit,))
        return self._rows_as_dicts(cursor)
# ==================== STREAMLIT DASHBOARD ====================
def main():
st.set_page_config(
page_title="Smart Waste Segregation System",
page_icon="♻️",
layout="wide"
)
# Initialize components
if 'db_conn' not in st.session_state:
st.session_state.db_conn = init_database()
if 'sensor_hub' not in st.session_state:
st.session_state.sensor_hub = SensorHub()
if 'edge_ai' not in st.session_state:
st.session_state.edge_ai = EdgeAIProcessor()
if 'servo' not in st.session_state:
st.session_state.servo = ServoMotor()
if 'camera' not in st.session_state:
st.session_state.camera = CameraModule()
if 'logger' not in st.session_state:
st.session_state.logger = DataLogger(st.session_state.db_conn)
if 'execution_log' not in st.session_state:
st.session_state.execution_log = []
# Header
st.title("♻️ Smart Waste Segregation System - Hardware Simulator")
st.markdown("**IDEA-ONE Hackathon | Team Trackify | Edge AI Deployment Demo**")
# Model status indicator
if st.session_state.edge_ai.model_loaded:
st.success(f"✅ Real Model Loaded: {st.session_state.edge_ai.model_version}")
else:
st.warning("⚠️ Using Mock Inference (Model not found)")
st.markdown("---")
# Hardware status bar
col1, col2, col3, col4, col5 = st.columns(5)
with col1:
st.metric("🌡️ Device Temp", f"{st.session_state.edge_ai.device_temp:.1f}°C")
with col2:
st.metric("💻 CPU Usage", f"{st.session_state.edge_ai.cpu_usage:.1f}%")
with col3:
st.metric("⚡ Servo Angle", f"{st.session_state.servo.current_angle}°")
with col4:
st.metric("📷 Frames", st.session_state.camera.frame_count)
with col5:
st.metric("🔌 Servo Current", f"{st.session_state.servo.current_ma} mA")
st.markdown("---")
# Tabs
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"🔄 Live Pipeline",
"📊 Dashboard",
"📋 Event Logs",
"🔧 Hardware Diagnostics",
"📡 Sensor Details"
])
# ==================== TAB 1: LIVE PIPELINE ====================
with tab1:
col1, col2 = st.columns([1, 2])
with col1:
st.subheader("⚙️ Sensor Configuration")
st.markdown("##### Ultrasonic Sensor (HC-SR04)")
distance = st.slider("Distance (cm)", 2.0, 60.0, 25.0, 0.5)
st.markdown("##### Capacitive Moisture Sensor")
moisture = st.slider("Moisture (%)", 0.0, 100.0, 30.0, 1.0) / 100.0
st.markdown("##### Load Cell (HX711)")
weight = st.slider("Weight (g)", 0.0, 1000.0, 150.0, 1.0)
st.session_state.sensor_hub.set_baseline(distance, moisture, weight)
st.markdown("---")
# Image upload
st.subheader("📷 Camera Input")
uploaded_file = st.file_uploader(
"Upload waste image (optional)",
type=['jpg', 'png', 'jpeg']
)
if uploaded_file:
image = Image.open(uploaded_file)
st.image(image, width=250, caption="Uploaded Image")
else:
st.info("No image uploaded. Will use synthetic waste image.")
st.markdown("---")
# Execute button
if st.button("🚀 Run Complete Pipeline", type="primary", use_container_width=True):
st.session_state.execution_log = []
with st.spinner("Processing..."):
pipeline_start = time.time()
# Step 1: Camera capture
st.session_state.execution_log.append("📷 Capturing image from camera...")
try:
if uploaded_file:
image = Image.open(uploaded_file)
capture_time = 0
else:
image, capture_time = st.session_state.camera.capture_image()
st.session_state.execution_log.append(f" ✓ Image captured ({capture_time:.2f}ms)")
except Exception as e:
st.session_state.execution_log.append(f" ⚠️ Image capture failed: {e}")
st.session_state.execution_log.append(" 🔄 Generating synthetic image...")
image = st.session_state.camera._generate_synthetic_waste()
capture_time = 0
# Step 2: Sensor reading
st.session_state.execution_log.append("📡 Reading sensor suite...")
sensor_data = st.session_state.sensor_hub.read_all_sensors()
st.session_state.execution_log.append(f" ✓ Sensors read ({sensor_data.total_read_time_ms:.2f}ms)")
st.session_state.execution_log.append(f" - Distance: {sensor_data.distance_cm} cm")
st.session_state.execution_log.append(f" - Moisture: {sensor_data.moisture_percent:.1f}%")
st.session_state.execution_log.append(f" - Weight: {sensor_data.weight_g} g")
# Log sensor telemetry
st.session_state.logger.log_telemetry("sensors", {
'distance': asdict(sensor_data.distance_raw),
'moisture': asdict(sensor_data.moisture_raw),
'weight': asdict(sensor_data.weight_raw),
'temperature': sensor_data.temperature
})
# Step 3: AI Inference
st.session_state.execution_log.append("🤖 Running edge AI inference...")
inference = st.session_state.edge_ai.infer(image, sensor_data)
st.session_state.execution_log.append(f" ✓ Inference complete ({inference.inference_time_ms:.2f}ms)")
st.session_state.execution_log.append(f" - Preprocessing: {inference.preprocessing_time_ms:.2f}ms")
st.session_state.execution_log.append(f" - Model inference: {inference.inference_time_ms:.2f}ms")
st.session_state.execution_log.append(f" - Postprocessing: {inference.postprocessing_time_ms:.2f}ms")
st.session_state.execution_log.append(f" - Classification: {inference.waste_class} ({inference.confidence*100:.2f}%)")
# Log AI telemetry
st.session_state.logger.log_telemetry("edge_ai", asdict(inference))
# Step 4: Motor control
st.session_state.execution_log.append(f"⚙️ Actuating servo to {inference.waste_class} bin...")
motor = st.session_state.servo.move_to_bin(inference.waste_class)
st.session_state.execution_log.append(f" ✓ Servo moved ({motor.duration_ms:.2f}ms)")
st.session_state.execution_log.append(f" - Start angle: {motor.start_angle}°")
st.session_state.execution_log.append(f" - Target angle: {motor.target_angle}°")
st.session_state.execution_log.append(f" - Power consumption: {motor.power_consumption_mw:.2f} mW")
# Log motor telemetry
st.session_state.logger.log_telemetry("servo_motor", asdict(motor))
# Step 5: Log event
st.session_state.execution_log.append("💾 Logging event to database...")
image_name = uploaded_file.name if uploaded_file else "synthetic.jpg"
st.session_state.logger.log_event(sensor_data, inference, motor, image_name)
# Log hardware diagnostics
st.session_state.logger.log_hardware_diagnostics(
"ultrasonic", "operational", 3.3, 15, sensor_data.temperature,
st.session_state.sensor_hub.ultrasonic.error_count
)
st.session_state.logger.log_hardware_diagnostics(
"servo_motor", "operational", 5.0, st.session_state.servo.current_ma,
35.0, 0
)
pipeline_time = (time.time() - pipeline_start) * 1000
st.session_state.execution_log.append(f"✅ Pipeline completed! Total time: {pipeline_time:.2f}ms")
# Store results
st.session_state.last_result = {
'sensor': sensor_data,
'inference': inference,
'motor': motor,
'image': image,
'pipeline_time': pipeline_time
}
st.success("✅ Segregation completed successfully!")
st.balloons()
with col2:
st.subheader("🔄 Pipeline Execution Log")
# Execution log
if st.session_state.execution_log:
log_container = st.container()
with log_container:
for log_entry in st.session_state.execution_log:
if "✓" in log_entry or "✅" in log_entry:
st.success(log_entry)
elif "📡" in log_entry or "🤖" in log_entry or "⚙️" in log_entry or "📷" in log_entry:
st.info(log_entry)
else:
st.write(log_entry)
st.markdown("---")
# Results
if 'last_result' in st.session_state:
res = st.session_state.last_result
st.subheader("📊 Pipeline Results")
# Display captured image
col_img, col_metrics = st.columns([1, 2])
with col_img:
st.image(res['image'], caption="Processed Image", use_container_width=True)
with col_metrics:
st.metric("🗑️ Classification", res['inference'].waste_class.upper())
st.metric("📊 Confidence", f"{res['inference'].confidence*100:.2f}%")
st.metric("🤖 Inference Time", f"{res['inference'].inference_time_ms:.2f} ms")
st.metric("⏱️ Total Pipeline Time", f"{res['pipeline_time']:.2f} ms")
st.markdown("---")
st.metric("⚙️ Servo Angle", f"{res['motor'].target_angle}°")
st.metric("⚡ Power Used", f"{res['motor'].power_consumption_mw:.2f} mW")
st.markdown("---")
# Class probabilities
st.markdown("#### 📈 Classification Probabilities")
for cls, prob in res['inference'].probabilities.items():
st.progress(prob, text=f"{cls.capitalize()}: {prob*100:.2f}%")
st.markdown("---")
# Detailed telemetry
st.subheader("📡 Detailed Component Telemetry")
tel_col1, tel_col2, tel_col3 = st.columns(3)
with tel_col1:
with st.expander("📏 Sensor Readings", expanded=False):
sensor_data = {
'distance_cm': res['sensor'].distance_cm,
'distance_raw_adc': res['sensor'].distance_raw.raw_value,
'moisture_percent': res['sensor'].moisture_percent,
'moisture_raw_adc': res['sensor'].moisture_raw.raw_value,
'weight_g': res['sensor'].weight_g,
'weight_raw_adc': res['sensor'].weight_raw.raw_value,
'temperature_c': res['sensor'].temperature,
'total_read_time_ms': res['sensor'].total_read_time_ms
}
st.json(sensor_data)
with tel_col2:
with st.expander("🤖 AI Inference", expanded=False):
inference_data = {
'waste_class': res['inference'].waste_class,
'confidence': res['inference'].confidence,
'probabilities': res['inference'].probabilities,
'preprocessing_ms': res['inference'].preprocessing_time_ms,
'inference_ms': res['inference'].inference_time_ms,
'postprocessing_ms': res['inference'].postprocessing_time_ms,
'model_version': res['inference'].model_version,
'device_temp_c': res['inference'].device_temperature,
'cpu_usage_percent': res['inference'].cpu_usage_percent
}
st.json(inference_data)
with tel_col3:
with st.expander("⚙️ Motor Control", expanded=False):
motor_data = {
'start_angle': res['motor'].start_angle,
'target_angle': res['motor'].target_angle,
'current_angle': res['motor'].current_angle,
'duration_ms': res['motor'].duration_ms,
'power_consumption_mw': res['motor'].power_consumption_mw,
'torque_applied_kgcm': res['motor'].torque_applied,
'status': res['motor'].status,
'movement_steps': len(res['motor'].movement_profile)
}
st.json(motor_data)
else:
st.info("👈 Configure sensors and click 'Run Complete Pipeline' to start")
st.markdown("### 🗺️ System Architecture")
st.markdown("""
**Hardware Components:**
- 📷 **Camera**: Raspberry Pi Camera Module v2 (1920x1080)
- 📏 **Ultrasonic**: HC-SR04 (2-400cm range)
- 💧 **Moisture**: Capacitive sensor (12-bit ADC)
- ⚖️ **Weight**: HX711 Load Cell (5kg capacity)
- ⚙️ **Servo**: MG996R (180° rotation, 11 kg-cm torque)
- 💻 **Edge Device**: Raspberry Pi 4 (4GB RAM, Quad-core)
**Pipeline Flow:**
1. Camera captures waste image
2. Sensors read physical properties
3. Edge AI processes multi-modal data
4. Servo routes waste to correct bin
5. Event logged to database
""")
# ==================== TAB 2: DASHBOARD ====================
with tab2:
st.subheader("📊 System Statistics")
stats = st.session_state.logger.get_statistics()
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Total Events", stats['total_events'])
with col2:
st.metric("Avg Confidence", f"{stats['average_confidence']*100:.2f}%")
with col3:
st.metric("Avg Pipeline Time", f"{stats.get('average_pipeline_time_ms', 0):.2f} ms")
with col4:
st.metric("Total Power Used", f"{stats.get('total_power_consumption_mwh', 0):.4f} mWh")
st.markdown("---")
# Class distribution chart
col_chart1, col_chart2 = st.columns(2)
with col_chart1:
st.subheader("📈 Waste Class Distribution")
if stats['class_distribution']:
import pandas as pd
df = pd.DataFrame(
list(stats['class_distribution'].items()),
columns=['Class', 'Count']
)
st.bar_chart(df.set_index('Class'))
else:
st.info("No data yet. Run some segregations first!")
with col_chart2:
st.subheader("⚡ System Performance")
if stats['total_events'] > 0:
st.metric("Average Inference", f"{stats.get('average_pipeline_time_ms', 0):.2f} ms")
st.metric("System Efficiency", f"{stats['average_confidence']*100:.1f}%")
st.metric("Power Efficiency", f"{stats.get('total_power_consumption_mwh', 0)*1000:.2f} mW")
else:
st.info("No performance data yet.")
# ==================== TAB 3: EVENT LOGS ====================
with tab3:
st.subheader("📋 Recent Events")
num_events = st.slider("Number of events to display", 5, 50, 10)
events = st.session_state.logger.get_recent_events(num_events)
if events:
import pandas as pd
df = pd.DataFrame(events)
display_cols = [
'timestamp', 'waste_class', 'confidence',
'distance_cm', 'moisture_percent', 'weight_g',
'servo_angle', 'inference_time_ms', 'total_pipeline_time_ms',
'power_consumption_mw', 'status'
]
available_cols = [col for col in display_cols if col in df.columns]
st.dataframe(
df[available_cols],
use_container_width=True,
hide_index=True
)
# Download button
csv = df.to_csv(index=False)
st.download_button(
"📥 Download CSV",
csv,
"events.csv",
"text/csv",
use_container_width=True
)
else:
st.info("No events logged yet.")
# ==================== TAB 4: HARDWARE DIAGNOSTICS ====================
with tab4:
st.subheader("🔧 Hardware Diagnostics")
diagnostics = st.session_state.logger.get_hardware_diagnostics(20)
if diagnostics:
import pandas as pd
df = pd.DataFrame(diagnostics)
st.dataframe(
df[['timestamp', 'component', 'status', 'voltage', 'current_ma', 'temperature_c', 'error_count']],
use_container_width=True,
hide_index=True
)
st.markdown("---")
# Component status
st.subheader("📊 Component Status")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("##### Ultrasonic Sensor")
st.write(f"Status: {st.session_state.sensor_hub.ultrasonic.status.value}")
st.write(f"Errors: {st.session_state.sensor_hub.ultrasonic.error_count}")
st.write(f"Last Reading: {st.session_state.sensor_hub.ultrasonic.last_reading} cm")
with col2:
st.markdown("##### Servo Motor")
motor_diag = st.session_state.servo.get_diagnostics()
st.write(f"Angle: {motor_diag['current_angle']}°")
st.write(f"Current: {motor_diag['current_ma']} mA")
st.write(f"Temperature: {motor_diag['temperature_c']:.1f}°C")
with col3:
st.markdown("##### Edge Device")
st.write(f"CPU: {st.session_state.edge_ai.cpu_usage:.1f}%")
st.write(f"Temperature: {st.session_state.edge_ai.device_temp:.1f}°C")
st.write(f"Model: {st.session_state.edge_ai.model_version}")
else:
st.info("No diagnostics data yet.")
# ==================== TAB 5: SENSOR DETAILS ====================
with tab5:
st.subheader("📡 Detailed Sensor Information")
col1, col2 = st.columns(2)
with col1:
st.markdown("### Ultrasonic Sensor (HC-SR04)")
st.markdown(f"""
- **Range**: {HardwareConfig.ULTRASONIC_MIN_DISTANCE} - {HardwareConfig.ULTRASONIC_MAX_DISTANCE} cm
- **Accuracy**: ±{HardwareConfig.ULTRASONIC_ACCURACY} cm
- **Read Time**: {HardwareConfig.ULTRASONIC_READ_TIME*1000:.1f} ms
- **Operating Voltage**: 5V DC
- **Current**: 15 mA
- **Frequency**: 40 kHz
""")
st.markdown("### Capacitive Moisture Sensor")
st.markdown(f"""
- **Voltage Range**: {HardwareConfig.MOISTURE_MIN_VOLTAGE} - {HardwareConfig.MOISTURE_MAX_VOLTAGE} V
- **ADC Resolution**: {HardwareConfig.MOISTURE_ADC_RESOLUTION} bits ({2**HardwareConfig.MOISTURE_ADC_RESOLUTION} levels)
- **Read Time**: {HardwareConfig.MOISTURE_READ_TIME*1000:.1f} ms
- **Operating Voltage**: 3.3V - 5.5V
- **Current**: 5 mA
""")
with col2:
st.markdown("### Load Cell (HX711)")
st.markdown(f"""
- **Capacity**: {HardwareConfig.WEIGHT_MAX_CAPACITY} g
- **Resolution**: {HardwareConfig.WEIGHT_RESOLUTION} g
- **Stabilization Time**: {HardwareConfig.WEIGHT_STABILIZATION_TIME*1000:.0f} ms
- **Read Time**: {HardwareConfig.WEIGHT_READ_TIME*1000:.0f} ms
- **ADC**: 24-bit
- **Sample Rate**: 10/80 Hz
""")
st.markdown("### Servo Motor (MG996R)")
st.markdown(f"""
- **Rotation**: {HardwareConfig.SERVO_MIN_ANGLE}° - {HardwareConfig.SERVO_MAX_ANGLE}°
- **Speed**: 60°/0.17s (at 4.8V)
- **Torque**: {HardwareConfig.SERVO_TORQUE} kg-cm
- **Operating Voltage**: 4.8V - 7.2V
- **Current (Idle)**: {HardwareConfig.SERVO_CURRENT_IDLE} mA
- **Current (Moving)**: {HardwareConfig.SERVO_CURRENT_MOVING} mA
""")
st.markdown("---")
st.markdown("### 📷 Camera Module (Raspberry Pi Camera v2)")
st.markdown(f"""
- **Sensor**: Sony IMX219 8MP
- **Resolution**: {HardwareConfig.CAMERA_RESOLUTION[0]}x{HardwareConfig.CAMERA_RESOLUTION[1]}
- **Capture Time**: {HardwareConfig.CAMERA_CAPTURE_TIME*1000:.0f} ms
- **Warmup Time**: {HardwareConfig.CAMERA_WARMUP_TIME:.1f} s
- **Interface**: CSI (Camera Serial Interface)
- **Frame Rate**: 30 fps (1080p)
""")
st.markdown("### 💻 Edge Device (Raspberry Pi 4)")
st.markdown(f"""
- **CPU**: Quad-core Cortex-A72 @ 1.5GHz
- **RAM**: {HardwareConfig.RAM_MB} MB
- **Cores**: {HardwareConfig.CPU_CORES}
- **Inference Overhead**: {HardwareConfig.INFERENCE_OVERHEAD*1000:.0f} ms
- **Operating System**: Raspberry Pi OS (Linux)
- **AI Framework**: PyTorch / ONNX Runtime
""")
# Footer
st.markdown("---")
st.caption("Smart Waste Segregation System | LDRP-ITR, Gandhinagar | IDEA-ONE Hackathon 2025")
st.caption("Hardware-Realistic Simulation | Edge AI Deployment | Multi-Modal Sensor Fusion")
# Script entry point: start the app only when this file is executed directly,
# never as a side effect of importing it as a module.
if __name__ == "__main__":
    main()