| """ | |
| Smart Waste Segregation System - Hardware-Realistic Simulator with Real Model | |
| Simulates complete edge deployment with realistic sensor protocols, motor control, and hardware timing | |
| Dependencies: pip install streamlit numpy pillow opencv-python-headless sqlite3 torch torchvision | |
| """ | |
import streamlit as st
import sqlite3
import json
import time
import random
import numpy as np
from PIL import Image
import io
import base64
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Tuple
import threading
import queue
from enum import Enum
import os

# PyTorch imports
import torch
import torch.nn as nn
from torchvision import transforms, models
# ==================== HARDWARE CONSTANTS ====================
class HardwareConfig:
    """Realistic hardware specifications"""
    # Ultrasonic Sensor (HC-SR04)
    ULTRASONIC_MIN_DISTANCE = 2.0      # cm
    ULTRASONIC_MAX_DISTANCE = 400.0    # cm
    ULTRASONIC_ACCURACY = 0.3          # cm
    ULTRASONIC_READ_TIME = 0.015       # seconds (15 ms)

    # Capacitive Moisture Sensor
    MOISTURE_MIN_VOLTAGE = 0.0         # V (dry)
    MOISTURE_MAX_VOLTAGE = 3.3         # V (wet)
    MOISTURE_ADC_RESOLUTION = 12       # bits (4096 levels)
    MOISTURE_READ_TIME = 0.010         # seconds (10 ms)

    # Load Cell (HX711)
    WEIGHT_MAX_CAPACITY = 5000.0       # grams (5 kg)
    WEIGHT_RESOLUTION = 0.1            # grams
    WEIGHT_STABILIZATION_TIME = 0.5    # seconds
    WEIGHT_READ_TIME = 0.100           # seconds (100 ms)

    # Servo Motor (MG996R)
    SERVO_MIN_ANGLE = 0
    SERVO_MAX_ANGLE = 180
    SERVO_SPEED = 60                   # degrees per 0.17 seconds (at 4.8 V)
    SERVO_CURRENT_IDLE = 10            # mA
    SERVO_CURRENT_MOVING = 500         # mA
    SERVO_TORQUE = 11                  # kg-cm

    # Camera (Raspberry Pi Camera Module v2)
    CAMERA_RESOLUTION = (1920, 1080)
    CAMERA_CAPTURE_TIME = 0.150        # seconds
    CAMERA_WARMUP_TIME = 2.0           # seconds

    # Edge Device (Raspberry Pi 4)
    CPU_CORES = 4
    RAM_MB = 4096
    INFERENCE_OVERHEAD = 0.050         # seconds
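# Rough per-cycle sensor timing budget implied by the constants above (simulation only):
# ultrasonic ~15 ms + moisture ~10 ms + load cell ~200 ms (settle 100 ms + 10 averaged samples)
# ≈ 225 ms before camera capture and inference.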
# ==================== DATABASE SETUP ====================
def init_database():
    """Initialize SQLite database with required tables"""
    conn = sqlite3.connect('waste_segregation.db', check_same_thread=False)
    cursor = conn.cursor()

    # Check if we need to migrate an old database
    try:
        cursor.execute("SELECT total_pipeline_time_ms FROM events LIMIT 1")
    except sqlite3.OperationalError:
        # Column doesn't exist, need to add it
        try:
            cursor.execute("ALTER TABLE events ADD COLUMN total_pipeline_time_ms REAL")
            conn.commit()
            st.info("✅ Database schema updated successfully")
        except sqlite3.OperationalError:
            # Table doesn't exist at all, will be created below
            pass

    # Events table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            image_name TEXT,
            waste_class TEXT NOT NULL,
            confidence REAL NOT NULL,
            distance_cm REAL,
            moisture_raw INTEGER,
            moisture_percent REAL,
            weight_g REAL,
            servo_angle INTEGER,
            inference_time_ms REAL,
            total_pipeline_time_ms REAL,
            status TEXT,
            power_consumption_mw REAL
        )
    ''')

    # Telemetry table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS telemetry (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            component TEXT NOT NULL,
            data TEXT NOT NULL
        )
    ''')

    # Hardware diagnostics table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS hardware_diagnostics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            component TEXT NOT NULL,
            status TEXT NOT NULL,
            voltage REAL,
            current_ma REAL,
            temperature_c REAL,
            error_count INTEGER
        )
    ''')

    conn.commit()
    return conn
# ==================== DATA MODELS ====================
# These containers are built with keyword arguments and serialized via asdict(),
# so they must be dataclasses (the decorator was missing).
@dataclass
class SensorReading:
    """Raw sensor reading with hardware-realistic properties"""
    value: float
    raw_value: int  # ADC value
    voltage: float
    read_time_ms: float
    noise_level: float
    timestamp: str


@dataclass
class SensorData:
    """Complete sensor suite readings"""
    distance_cm: float
    distance_raw: SensorReading
    moisture_percent: float
    moisture_raw: SensorReading
    weight_g: float
    weight_raw: SensorReading
    temperature: float
    timestamp: str
    total_read_time_ms: float


@dataclass
class InferenceResult:
    """AI model inference output with edge device metrics"""
    waste_class: str
    confidence: float
    inference_time_ms: float
    preprocessing_time_ms: float
    postprocessing_time_ms: float
    probabilities: Dict[str, float]
    model_version: str
    device_temperature: float
    cpu_usage_percent: float


@dataclass
class MotorControl:
    """Servo motor control with realistic physics"""
    target_angle: int
    current_angle: int
    start_angle: int
    status: str
    duration_ms: float
    power_consumption_mw: float
    torque_applied: float
    movement_profile: List[Tuple[float, int]]  # (time_ms, angle)


class SensorStatus(Enum):
    """Sensor operational status"""
    READY = "ready"
    READING = "reading"
    STABILIZING = "stabilizing"
    ERROR = "error"
    CALIBRATING = "calibrating"
# ==================== REALISTIC SENSOR SIMULATION ====================
class UltrasonicSensor:
    """HC-SR04 Ultrasonic Sensor Simulation"""
    def __init__(self):
        self.status = SensorStatus.READY
        self.error_count = 0
        self.last_reading = None
        self.calibration_offset = random.uniform(-0.2, 0.2)

    def trigger_read(self, actual_distance: float) -> SensorReading:
        """Simulate ultrasonic pulse and echo timing"""
        start_time = time.time()
        self.status = SensorStatus.READING

        # Simulate trigger pulse (10 µs) and echo wait
        time.sleep(HardwareConfig.ULTRASONIC_READ_TIME)

        # Add realistic noise and environmental factors
        noise = random.gauss(0, HardwareConfig.ULTRASONIC_ACCURACY)
        temperature_factor = 1.0 + random.uniform(-0.02, 0.02)

        # Simulate occasional read errors (1% chance)
        if random.random() < 0.01:
            self.error_count += 1
            measured_distance = -1.0  # Error code
        else:
            measured_distance = (actual_distance + noise + self.calibration_offset) * temperature_factor
            measured_distance = max(HardwareConfig.ULTRASONIC_MIN_DISTANCE,
                                    min(HardwareConfig.ULTRASONIC_MAX_DISTANCE, measured_distance))

        # Calculate echo time (distance = speed * time / 2)
        echo_time_us = (measured_distance * 2) / 0.0343 if measured_distance > 0 else 0

        read_time = (time.time() - start_time) * 1000
        self.status = SensorStatus.READY
        self.last_reading = measured_distance

        return SensorReading(
            value=round(measured_distance, 2),
            raw_value=int(echo_time_us),
            voltage=3.3,
            read_time_ms=round(read_time, 3),
            noise_level=abs(noise),
            timestamp=datetime.now().isoformat()
        )
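# Worked example of the HC-SR04 echo-time conversion used above: at 25 cm the round trip
# is (25 * 2) / 0.0343 cm/µs ≈ 1458 µs, which trigger_read() reports as raw_value.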
class MoistureSensor:
    """Capacitive Soil Moisture Sensor Simulation"""
    def __init__(self):
        self.status = SensorStatus.READY
        self.adc_resolution = 2 ** HardwareConfig.MOISTURE_ADC_RESOLUTION
        self.calibration_dry = random.randint(3200, 3400)
        self.calibration_wet = random.randint(1200, 1400)

    def read_moisture(self, actual_moisture: float) -> SensorReading:
        """Read capacitive moisture sensor via ADC"""
        start_time = time.time()
        self.status = SensorStatus.READING
        time.sleep(HardwareConfig.MOISTURE_READ_TIME)

        # Convert moisture percentage to ADC value
        adc_range = self.calibration_dry - self.calibration_wet
        adc_value = self.calibration_dry - (actual_moisture * adc_range)

        # Add ADC noise (±2 LSB typical)
        adc_noise = random.randint(-2, 2)
        adc_value = int(adc_value + adc_noise)
        adc_value = max(0, min(self.adc_resolution - 1, adc_value))

        # Convert ADC to voltage
        voltage = (adc_value / self.adc_resolution) * HardwareConfig.MOISTURE_MAX_VOLTAGE

        # Convert back to percentage
        measured_moisture = (self.calibration_dry - adc_value) / adc_range
        measured_moisture = max(0.0, min(1.0, measured_moisture))

        read_time = (time.time() - start_time) * 1000
        self.status = SensorStatus.READY

        return SensorReading(
            value=round(measured_moisture, 4),
            raw_value=adc_value,
            voltage=round(voltage, 3),
            read_time_ms=round(read_time, 3),
            noise_level=abs(adc_noise / adc_range) if adc_range != 0 else 0.0,
            timestamp=datetime.now().isoformat()
        )
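# Worked example of the capacitive-sensor mapping used above: with calibration_dry = 3300 and
# calibration_wet = 1300, an ADC reading of 2300 maps to (3300 - 2300) / 2000 = 50% moisture.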
class LoadCellSensor:
    """HX711 Load Cell Amplifier Simulation"""
    def __init__(self):
        self.status = SensorStatus.READY
        self.tare_value = random.randint(-50000, 50000)
        self.calibration_factor = random.uniform(420, 450)
        self.last_stable_reading = 0.0
        self.reading_buffer = []

    def read_weight(self, actual_weight: float) -> SensorReading:
        """Read load cell with stabilization"""
        start_time = time.time()
        self.status = SensorStatus.STABILIZING

        # Simulate mechanical settling time
        time.sleep(HardwareConfig.WEIGHT_STABILIZATION_TIME * 0.2)
        self.status = SensorStatus.READING

        # Take multiple readings and average
        readings = []
        for _ in range(10):
            adc_value = self.tare_value + int(actual_weight * self.calibration_factor)
            noise = random.randint(-2, 2)
            vibration = random.gauss(0, 5)
            adc_value += int(noise + vibration)
            readings.append(adc_value)
            time.sleep(HardwareConfig.WEIGHT_READ_TIME / 10)

        # Average and filter
        avg_adc = int(np.mean(readings))
        measured_weight = (avg_adc - self.tare_value) / self.calibration_factor
        measured_weight = max(0.0, min(HardwareConfig.WEIGHT_MAX_CAPACITY, measured_weight))
        measured_weight = round(measured_weight / HardwareConfig.WEIGHT_RESOLUTION) * HardwareConfig.WEIGHT_RESOLUTION

        read_time = (time.time() - start_time) * 1000
        self.status = SensorStatus.READY
        self.last_stable_reading = measured_weight

        return SensorReading(
            value=round(measured_weight, 2),
            raw_value=avg_adc,
            voltage=3.3,
            read_time_ms=round(read_time, 3),
            # Cast to a plain float: np.std returns numpy.float64, which json.dumps cannot serialize
            noise_level=float(np.std(readings) / self.calibration_factor),
            timestamp=datetime.now().isoformat()
        )
class SensorHub:
    """Integrated sensor management system"""
    def __init__(self):
        self.ultrasonic = UltrasonicSensor()
        self.moisture = MoistureSensor()
        self.load_cell = LoadCellSensor()
        self.temperature = 28.0
        self.baseline = {
            'distance_cm': 25.0,
            'moisture': 0.3,
            'weight_g': 150.0
        }

    def read_all_sensors(self) -> SensorData:
        """Read all sensors with realistic timing"""
        start_time = time.time()

        distance_reading = self.ultrasonic.trigger_read(self.baseline['distance_cm'])
        moisture_reading = self.moisture.read_moisture(self.baseline['moisture'])
        weight_reading = self.load_cell.read_weight(self.baseline['weight_g'])
        self.temperature = 28.0 + random.uniform(-2, 2)

        total_time = (time.time() - start_time) * 1000

        return SensorData(
            distance_cm=distance_reading.value,
            distance_raw=distance_reading,
            moisture_percent=moisture_reading.value * 100,
            moisture_raw=moisture_reading,
            weight_g=weight_reading.value,
            weight_raw=weight_reading,
            temperature=round(self.temperature, 2),
            timestamp=datetime.now().isoformat(),
            total_read_time_ms=round(total_time, 2)
        )

    def set_baseline(self, distance=None, moisture=None, weight=None):
        """Update sensor baselines for simulation"""
        if distance is not None:
            self.baseline['distance_cm'] = distance
        if moisture is not None:
            self.baseline['moisture'] = moisture
        if weight is not None:
            self.baseline['weight_g'] = weight
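# Typical usage (mirrors the dashboard pipeline): hub = SensorHub(); data = hub.read_all_sensors().
# hub.set_baseline(distance=30.0, moisture=0.6, weight=220.0) retargets the simulated readings.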
# ==================== REAL EDGE AI INFERENCE ====================
class EdgeAIProcessor:
    """Edge device AI inference with REAL trained model"""
    def __init__(self, model_path: Optional[str] = None):
        self.model_path = model_path or 'best_trash_classifier.pth'
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = None
        self.model_loaded = False
        self.model_version = "v1.0.0-mock"
        self.warmup_complete = False
        self.classes = []

        # Edge device specs
        self.device_temp = 45.0
        self.cpu_usage = 25.0

        # Image preprocessing transform
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # Try to load real model
        self.load_model()

    def load_model(self):
        """Load the trained PyTorch model"""
        try:
            if not os.path.exists(self.model_path):
                st.warning(f"⚠️ Model file '{self.model_path}' not found. Using mock inference.")
                self.classes = ["organic", "recyclable", "non-recyclable"]
                return

            # Load checkpoint
            checkpoint = torch.load(self.model_path, map_location=self.device)

            # Get class names from checkpoint
            self.classes = checkpoint.get('class_names', ["organic", "recyclable", "non-recyclable"])
            num_classes = len(self.classes)

            # Build model architecture (same as training)
            self.model = models.efficientnet_v2_s(weights=None)
            num_features = self.model.classifier[1].in_features
            self.model.classifier = nn.Sequential(
                nn.Dropout(p=0.3, inplace=True),
                nn.Linear(num_features, 512),
                nn.ReLU(),
                nn.Dropout(p=0.3),
                nn.Linear(512, num_classes)
            )

            # Load weights
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.model = self.model.to(self.device)
            self.model.eval()

            self.model_loaded = True
            self.model_version = f"v1.0.0-efficientnet-epoch{checkpoint.get('epoch', 'unknown')}"
            st.success(f"✅ Model loaded successfully! Classes: {self.classes}")
        except Exception as e:
            st.error(f"❌ Failed to load model: {e}")
            st.info("Using mock inference mode")
            self.classes = ["organic", "recyclable", "non-recyclable"]
            self.model_loaded = False

    def warmup(self):
        """Warm up model (first inference is slower)"""
        if not self.warmup_complete and self.model_loaded:
            try:
                dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
                with torch.no_grad():
                    _ = self.model(dummy_input)
                self.warmup_complete = True
            except Exception:
                # Warmup failures are non-fatal; the first real inference will just be slower
                pass
    def preprocess_image(self, image: Image.Image) -> Tuple[torch.Tensor, float]:
        """Preprocess image for inference with timing"""
        start_time = time.time()

        # Convert to RGB if needed
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Apply transforms
        img_tensor = self.transform(image)
        img_tensor = img_tensor.unsqueeze(0)  # Add batch dimension

        preprocess_time = (time.time() - start_time) * 1000
        return img_tensor, preprocess_time

    def infer(self, image: Image.Image, sensor_data: SensorData) -> InferenceResult:
        """Run inference with real or mock model"""
        self.warmup()

        # Preprocessing
        input_tensor, preprocess_time = self.preprocess_image(image)

        # Inference
        inference_start = time.time()
        if self.model_loaded and self.model is not None:
            # REAL MODEL INFERENCE
            try:
                input_tensor = input_tensor.to(self.device)
                with torch.no_grad():
                    outputs = self.model(input_tensor)
                logits = outputs[0].cpu().numpy()
            except Exception as e:
                st.error(f"Inference error: {e}")
                # Fallback to mock
                logits = self._mock_inference_with_fusion(sensor_data)
        else:
            # MOCK INFERENCE with sensor fusion
            logits = self._mock_inference_with_fusion(sensor_data)

        # Simulate realistic inference time
        time.sleep(random.uniform(0.050, 0.150))
        inference_time = (time.time() - inference_start) * 1000

        # Postprocessing
        postprocess_start = time.time()

        # Softmax
        exp_logits = np.exp(logits - np.max(logits))
        probs = exp_logits / np.sum(exp_logits)

        class_idx = int(np.argmax(probs))
        waste_class = self.classes[class_idx]
        confidence = float(probs[class_idx])
        prob_dict = {cls: float(probs[i]) for i, cls in enumerate(self.classes)}

        postprocess_time = (time.time() - postprocess_start) * 1000

        # Update device metrics
        self.device_temp = min(85.0, self.device_temp + random.uniform(-1, 2))
        self.cpu_usage = min(100.0, max(20.0, self.cpu_usage + random.uniform(-10, 15)))

        return InferenceResult(
            waste_class=waste_class,
            confidence=round(confidence, 4),
            inference_time_ms=round(inference_time, 2),
            preprocessing_time_ms=round(preprocess_time, 2),
            postprocessing_time_ms=round(postprocess_time, 2),
            probabilities=prob_dict,
            model_version=self.model_version,
            device_temperature=round(self.device_temp, 1),
            cpu_usage_percent=round(self.cpu_usage, 1)
        )
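    # Note on infer(): class probabilities come from a numerically stable softmax,
    # p_i = exp(z_i - max(z)) / sum_j exp(z_j - max(z)), applied to the real or mock logits.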
    def _mock_inference_with_fusion(self, sensor_data: SensorData) -> np.ndarray:
        """Mock inference using multi-modal sensor fusion"""
        logits = np.random.randn(len(self.classes)) * 0.3

        # Moisture-based classification
        moisture = sensor_data.moisture_percent / 100.0
        if moisture > 0.5:
            logits[0] += 2.5  # High moisture -> organic
        elif moisture < 0.2:
            logits[1] += 1.5  # Low moisture -> recyclable
        else:
            logits[2] += 1.0  # Medium moisture -> non-recyclable

        # Weight-based refinement
        weight = sensor_data.weight_g
        if weight < 50:
            logits[1] += 0.5  # Light items often recyclable
        elif weight > 300:
            logits[2] += 0.5  # Heavy items often non-recyclable

        # Distance-based confidence adjustment
        distance_factor = 1.0 - (sensor_data.distance_cm / 100.0)
        logits *= (0.7 + 0.3 * distance_factor)

        return logits
# ==================== REALISTIC MOTOR CONTROL ====================
class ServoMotor:
    """MG996R Servo Motor Simulation with realistic physics"""
    # Bin routing angles
    BIN_POSITIONS = {
        "organic": 0,
        "recyclable": 90,
        "non-recyclable": 180
    }

    def __init__(self):
        self.current_angle = 90
        self.target_angle = 90
        self.is_moving = False
        self.speed_deg_per_sec = HardwareConfig.SERVO_SPEED / 0.17
        self.voltage = 5.0
        self.current_ma = HardwareConfig.SERVO_CURRENT_IDLE
        self.position_history = []

    def calculate_movement_profile(self, start: int, end: int, duration_s: float) -> List[Tuple[float, int]]:
        """Calculate realistic servo movement with acceleration/deceleration"""
        profile = []
        steps = max(10, int(duration_s * 50))

        for i in range(steps + 1):
            t = i / steps
            # S-curve motion profile
            if t < 0.3:
                progress = (t / 0.3) ** 2 * 0.3
            elif t > 0.7:
                progress = 1.0 - ((1.0 - t) / 0.3) ** 2 * 0.3
            else:
                progress = 0.3 + (t - 0.3) * (0.4 / 0.4)

            angle = int(start + (end - start) * progress)
            time_ms = t * duration_s * 1000
            profile.append((round(time_ms, 1), angle))

        return profile
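    # The profile above is a simple S-curve: quadratic ease-in for t < 0.3, a linear middle
    # segment (progress = t), and a mirrored quadratic ease-out for t > 0.7, so the pieces
    # join continuously at 0.3 and 0.7.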
    def move_to_bin(self, waste_class: str) -> MotorControl:
        """Move servo to route waste to correct bin with realistic physics"""
        target = self.BIN_POSITIONS.get(waste_class, 90)
        start_angle = self.current_angle
        start_time = time.time()

        self.is_moving = True
        self.target_angle = target
        self.current_ma = HardwareConfig.SERVO_CURRENT_MOVING

        angle_distance = abs(target - start_angle)
        duration_s = (angle_distance / 60.0) * 0.17
        movement_profile = self.calculate_movement_profile(start_angle, target, duration_s)

        # Simulate movement
        time.sleep(duration_s * 0.3)

        avg_current = (HardwareConfig.SERVO_CURRENT_MOVING + HardwareConfig.SERVO_CURRENT_IDLE) / 2
        power_mw = self.voltage * avg_current
        torque_applied = HardwareConfig.SERVO_TORQUE * 0.7

        self.current_angle = target
        self.is_moving = False
        self.current_ma = HardwareConfig.SERVO_CURRENT_IDLE

        actual_duration = (time.time() - start_time) * 1000

        return MotorControl(
            target_angle=target,
            current_angle=target,
            start_angle=start_angle,
            status="completed",
            duration_ms=round(actual_duration, 2),
            power_consumption_mw=round(power_mw, 2),
            torque_applied=round(torque_applied, 2),
            movement_profile=movement_profile
        )

    def get_diagnostics(self) -> dict:
        """Get motor diagnostic information"""
        return {
            'current_angle': self.current_angle,
            'target_angle': self.target_angle,
            'is_moving': self.is_moving,
            'current_ma': self.current_ma,
            'voltage': self.voltage,
            'temperature_c': 35.0 + random.uniform(-2, 5)
        }
# ==================== CAMERA SIMULATION ====================
class CameraModule:
    """Raspberry Pi Camera Module v2 Simulation"""
    def __init__(self):
        self.resolution = HardwareConfig.CAMERA_RESOLUTION
        self.is_warmed_up = False
        self.frame_count = 0

    def warmup(self):
        """Camera warmup (auto-exposure, white balance)"""
        if not self.is_warmed_up:
            time.sleep(HardwareConfig.CAMERA_WARMUP_TIME * 0.1)
            self.is_warmed_up = True

    def capture_image(self, uploaded_image: Optional[Image.Image] = None) -> Tuple[Image.Image, float]:
        """Capture image with realistic timing"""
        self.warmup()
        start_time = time.time()
        time.sleep(HardwareConfig.CAMERA_CAPTURE_TIME * 0.5)

        # Explicit None check: PIL images should not be truth-tested directly
        if uploaded_image is not None:
            image = uploaded_image
        else:
            image = self._generate_synthetic_waste()

        capture_time = (time.time() - start_time) * 1000
        self.frame_count += 1
        return image, capture_time

    def _generate_synthetic_waste(self) -> Image.Image:
        """Generate synthetic waste image for testing"""
        colors = [
            (139, 90, 43),    # Brown (organic)
            (200, 200, 200),  # Gray (recyclable)
            (50, 50, 50)      # Dark (non-recyclable)
        ]
        color = random.choice(colors)
        color = tuple(max(0, min(255, c + random.randint(-30, 30))) for c in color)
        image = Image.new('RGB', (224, 224), color=color)
        return image
# ==================== DATA LOGGER ====================
class DataLogger:
    """Logs all events, telemetry, and hardware diagnostics to SQLite"""
    def __init__(self, conn):
        self.conn = conn
        self.lock = threading.Lock()

    def log_event(self, sensor_data: SensorData, inference: InferenceResult,
                  motor: MotorControl, image_name: str = "sample.jpg"):
        """Log complete segregation event with hardware metrics"""
        with self.lock:
            cursor = self.conn.cursor()
            total_time = (sensor_data.total_read_time_ms +
                          inference.preprocessing_time_ms +
                          inference.inference_time_ms +
                          inference.postprocessing_time_ms +
                          motor.duration_ms)
            cursor.execute('''
                INSERT INTO events (
                    timestamp, image_name, waste_class, confidence,
                    distance_cm, moisture_raw, moisture_percent, weight_g,
                    servo_angle, inference_time_ms, total_pipeline_time_ms,
                    status, power_consumption_mw
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                sensor_data.timestamp,
                image_name,
                inference.waste_class,
                inference.confidence,
                sensor_data.distance_cm,
                sensor_data.moisture_raw.raw_value,
                sensor_data.moisture_percent,
                sensor_data.weight_g,
                motor.target_angle,
                inference.inference_time_ms,
                total_time,
                motor.status,
                motor.power_consumption_mw
            ))
            self.conn.commit()

    def log_telemetry(self, component: str, data: dict):
        """Log component telemetry"""
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO telemetry (timestamp, component, data)
                VALUES (?, ?, ?)
            ''', (
                datetime.now().isoformat(),
                component,
                json.dumps(data)
            ))
            self.conn.commit()

    def log_hardware_diagnostics(self, component: str, status: str,
                                 voltage: float = 0, current_ma: float = 0,
                                 temperature: float = 0, error_count: int = 0):
        """Log hardware diagnostics"""
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute('''
                INSERT INTO hardware_diagnostics
                (timestamp, component, status, voltage, current_ma, temperature_c, error_count)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                datetime.now().isoformat(),
                component,
                status,
                voltage,
                current_ma,
                temperature,
                error_count
            ))
            self.conn.commit()

    def get_recent_events(self, limit: int = 10) -> List[dict]:
        """Retrieve recent events"""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM events
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (limit,))
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def get_statistics(self) -> dict:
        """Get overall statistics"""
        cursor = self.conn.cursor()

        # Class distribution
        cursor.execute('''
            SELECT waste_class, COUNT(*) as count
            FROM events
            GROUP BY waste_class
        ''')
        class_dist = dict(cursor.fetchall())

        # Average confidence
        cursor.execute('SELECT AVG(confidence) FROM events')
        avg_conf = cursor.fetchone()[0] or 0

        # Average pipeline time
        cursor.execute('SELECT AVG(total_pipeline_time_ms) FROM events')
        avg_time = cursor.fetchone()[0] or 0

        # Total events
        cursor.execute('SELECT COUNT(*) FROM events')
        total = cursor.fetchone()[0]

        # Total power consumption
        cursor.execute('SELECT SUM(power_consumption_mw) FROM events')
        total_power = cursor.fetchone()[0] or 0

        return {
            'total_events': total,
            'class_distribution': class_dist,
            'average_confidence': round(avg_conf, 4),
            'average_pipeline_time_ms': round(avg_time, 2),
            'total_power_consumption_mwh': round(total_power / 3600000, 4)
        }

    def get_hardware_diagnostics(self, limit: int = 20) -> List[dict]:
        """Get recent hardware diagnostics"""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM hardware_diagnostics
            ORDER BY timestamp DESC
            LIMIT ?
        ''', (limit,))
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
# ==================== STREAMLIT DASHBOARD ====================
def main():
    st.set_page_config(
        page_title="Smart Waste Segregation System",
        page_icon="♻️",
        layout="wide"
    )

    # Initialize components
    if 'db_conn' not in st.session_state:
        st.session_state.db_conn = init_database()
    if 'sensor_hub' not in st.session_state:
        st.session_state.sensor_hub = SensorHub()
    if 'edge_ai' not in st.session_state:
        st.session_state.edge_ai = EdgeAIProcessor()
    if 'servo' not in st.session_state:
        st.session_state.servo = ServoMotor()
    if 'camera' not in st.session_state:
        st.session_state.camera = CameraModule()
    if 'logger' not in st.session_state:
        st.session_state.logger = DataLogger(st.session_state.db_conn)
    if 'execution_log' not in st.session_state:
        st.session_state.execution_log = []

    # Header
    st.title("♻️ Smart Waste Segregation System - Hardware Simulator")
    st.markdown("**IDEA-ONE Hackathon | Team Trackify | Edge AI Deployment Demo**")

    # Model status indicator
    if st.session_state.edge_ai.model_loaded:
        st.success(f"✅ Real Model Loaded: {st.session_state.edge_ai.model_version}")
    else:
        st.warning("⚠️ Using Mock Inference (Model not found)")

    st.markdown("---")

    # Hardware status bar
    col1, col2, col3, col4, col5 = st.columns(5)
    with col1:
        st.metric("🌡️ Device Temp", f"{st.session_state.edge_ai.device_temp:.1f}°C")
    with col2:
        st.metric("💻 CPU Usage", f"{st.session_state.edge_ai.cpu_usage:.1f}%")
    with col3:
        st.metric("⚡ Servo Angle", f"{st.session_state.servo.current_angle}°")
    with col4:
        st.metric("📷 Frames", st.session_state.camera.frame_count)
    with col5:
        st.metric("🔌 Servo Current", f"{st.session_state.servo.current_ma} mA")

    st.markdown("---")

    # Tabs
    tab1, tab2, tab3, tab4, tab5 = st.tabs([
        "🔄 Live Pipeline",
        "📊 Dashboard",
        "📋 Event Logs",
        "🔧 Hardware Diagnostics",
        "📡 Sensor Details"
    ])
    # ==================== TAB 1: LIVE PIPELINE ====================
    with tab1:
        col1, col2 = st.columns([1, 2])

        with col1:
            st.subheader("⚙️ Sensor Configuration")

            st.markdown("##### Ultrasonic Sensor (HC-SR04)")
            distance = st.slider("Distance (cm)", 2.0, 60.0, 25.0, 0.5)

            st.markdown("##### Capacitive Moisture Sensor")
            moisture = st.slider("Moisture (%)", 0.0, 100.0, 30.0, 1.0) / 100.0

            st.markdown("##### Load Cell (HX711)")
            weight = st.slider("Weight (g)", 0.0, 1000.0, 150.0, 1.0)

            st.session_state.sensor_hub.set_baseline(distance, moisture, weight)

            st.markdown("---")

            # Image upload
            st.subheader("📷 Camera Input")
            uploaded_file = st.file_uploader(
                "Upload waste image (optional)",
                type=['jpg', 'png', 'jpeg']
            )
            if uploaded_file:
                image = Image.open(uploaded_file)
                st.image(image, width=250, caption="Uploaded Image")
            else:
                st.info("No image uploaded. Will use synthetic waste image.")

            st.markdown("---")

            # Execute button
            if st.button("🚀 Run Complete Pipeline", type="primary", use_container_width=True):
                st.session_state.execution_log = []
                with st.spinner("Processing..."):
                    pipeline_start = time.time()

                    # Step 1: Camera capture
                    st.session_state.execution_log.append("📷 Capturing image from camera...")
                    try:
                        if uploaded_file:
                            image = Image.open(uploaded_file)
                            capture_time = 0
                        else:
                            image, capture_time = st.session_state.camera.capture_image()
                        st.session_state.execution_log.append(f" ✓ Image captured ({capture_time:.2f}ms)")
                    except Exception as e:
                        st.session_state.execution_log.append(f" ⚠️ Image capture failed: {e}")
                        st.session_state.execution_log.append(" 🔄 Generating synthetic image...")
                        image = st.session_state.camera._generate_synthetic_waste()
                        capture_time = 0

                    # Step 2: Sensor reading
                    st.session_state.execution_log.append("📡 Reading sensor suite...")
                    sensor_data = st.session_state.sensor_hub.read_all_sensors()
                    st.session_state.execution_log.append(f" ✓ Sensors read ({sensor_data.total_read_time_ms:.2f}ms)")
                    st.session_state.execution_log.append(f" - Distance: {sensor_data.distance_cm} cm")
                    st.session_state.execution_log.append(f" - Moisture: {sensor_data.moisture_percent:.1f}%")
                    st.session_state.execution_log.append(f" - Weight: {sensor_data.weight_g} g")

                    # Log sensor telemetry
                    st.session_state.logger.log_telemetry("sensors", {
                        'distance': asdict(sensor_data.distance_raw),
                        'moisture': asdict(sensor_data.moisture_raw),
                        'weight': asdict(sensor_data.weight_raw),
                        'temperature': sensor_data.temperature
                    })

                    # Step 3: AI Inference
                    st.session_state.execution_log.append("🤖 Running edge AI inference...")
                    inference = st.session_state.edge_ai.infer(image, sensor_data)
                    st.session_state.execution_log.append(f" ✓ Inference complete ({inference.inference_time_ms:.2f}ms)")
                    st.session_state.execution_log.append(f" - Preprocessing: {inference.preprocessing_time_ms:.2f}ms")
                    st.session_state.execution_log.append(f" - Model inference: {inference.inference_time_ms:.2f}ms")
                    st.session_state.execution_log.append(f" - Postprocessing: {inference.postprocessing_time_ms:.2f}ms")
                    st.session_state.execution_log.append(f" - Classification: {inference.waste_class} ({inference.confidence*100:.2f}%)")

                    # Log AI telemetry
                    st.session_state.logger.log_telemetry("edge_ai", asdict(inference))

                    # Step 4: Motor control
                    st.session_state.execution_log.append(f"⚙️ Actuating servo to {inference.waste_class} bin...")
                    motor = st.session_state.servo.move_to_bin(inference.waste_class)
                    st.session_state.execution_log.append(f" ✓ Servo moved ({motor.duration_ms:.2f}ms)")
                    st.session_state.execution_log.append(f" - Start angle: {motor.start_angle}°")
                    st.session_state.execution_log.append(f" - Target angle: {motor.target_angle}°")
                    st.session_state.execution_log.append(f" - Power consumption: {motor.power_consumption_mw:.2f} mW")

                    # Log motor telemetry
                    st.session_state.logger.log_telemetry("servo_motor", asdict(motor))

                    # Step 5: Log event
                    st.session_state.execution_log.append("💾 Logging event to database...")
                    image_name = uploaded_file.name if uploaded_file else "synthetic.jpg"
                    st.session_state.logger.log_event(sensor_data, inference, motor, image_name)

                    # Log hardware diagnostics
                    st.session_state.logger.log_hardware_diagnostics(
                        "ultrasonic", "operational", 3.3, 15, sensor_data.temperature,
                        st.session_state.sensor_hub.ultrasonic.error_count
                    )
                    st.session_state.logger.log_hardware_diagnostics(
                        "servo_motor", "operational", 5.0, st.session_state.servo.current_ma,
                        35.0, 0
                    )

                    pipeline_time = (time.time() - pipeline_start) * 1000
                    st.session_state.execution_log.append(f"✅ Pipeline completed! Total time: {pipeline_time:.2f}ms")

                    # Store results
                    st.session_state.last_result = {
                        'sensor': sensor_data,
                        'inference': inference,
                        'motor': motor,
                        'image': image,
                        'pipeline_time': pipeline_time
                    }

                st.success("✅ Segregation completed successfully!")
                st.balloons()
        with col2:
            st.subheader("🔄 Pipeline Execution Log")

            # Execution log
            if st.session_state.execution_log:
                log_container = st.container()
                with log_container:
                    for log_entry in st.session_state.execution_log:
                        if "✓" in log_entry or "✅" in log_entry:
                            st.success(log_entry)
                        elif "📡" in log_entry or "🤖" in log_entry or "⚙️" in log_entry or "📷" in log_entry:
                            st.info(log_entry)
                        else:
                            st.write(log_entry)

            st.markdown("---")

            # Results
            if 'last_result' in st.session_state:
                res = st.session_state.last_result
                st.subheader("📊 Pipeline Results")

                # Display captured image
                col_img, col_metrics = st.columns([1, 2])
                with col_img:
                    st.image(res['image'], caption="Processed Image", use_container_width=True)
                with col_metrics:
                    st.metric("🗑️ Classification", res['inference'].waste_class.upper())
                    st.metric("📊 Confidence", f"{res['inference'].confidence*100:.2f}%")
                    st.metric("🤖 Inference Time", f"{res['inference'].inference_time_ms:.2f} ms")
                    st.metric("⏱️ Total Pipeline Time", f"{res['pipeline_time']:.2f} ms")
                    st.markdown("---")
                    st.metric("⚙️ Servo Angle", f"{res['motor'].target_angle}°")
                    st.metric("⚡ Power Used", f"{res['motor'].power_consumption_mw:.2f} mW")

                st.markdown("---")

                # Class probabilities
                st.markdown("#### 📈 Classification Probabilities")
                for cls, prob in res['inference'].probabilities.items():
                    st.progress(prob, text=f"{cls.capitalize()}: {prob*100:.2f}%")

                st.markdown("---")

                # Detailed telemetry
                st.subheader("📡 Detailed Component Telemetry")
                tel_col1, tel_col2, tel_col3 = st.columns(3)

                with tel_col1:
                    with st.expander("📏 Sensor Readings", expanded=False):
                        sensor_data = {
                            'distance_cm': res['sensor'].distance_cm,
                            'distance_raw_adc': res['sensor'].distance_raw.raw_value,
                            'moisture_percent': res['sensor'].moisture_percent,
                            'moisture_raw_adc': res['sensor'].moisture_raw.raw_value,
                            'weight_g': res['sensor'].weight_g,
                            'weight_raw_adc': res['sensor'].weight_raw.raw_value,
                            'temperature_c': res['sensor'].temperature,
                            'total_read_time_ms': res['sensor'].total_read_time_ms
                        }
                        st.json(sensor_data)

                with tel_col2:
                    with st.expander("🤖 AI Inference", expanded=False):
                        inference_data = {
                            'waste_class': res['inference'].waste_class,
                            'confidence': res['inference'].confidence,
                            'probabilities': res['inference'].probabilities,
                            'preprocessing_ms': res['inference'].preprocessing_time_ms,
                            'inference_ms': res['inference'].inference_time_ms,
                            'postprocessing_ms': res['inference'].postprocessing_time_ms,
                            'model_version': res['inference'].model_version,
                            'device_temp_c': res['inference'].device_temperature,
                            'cpu_usage_percent': res['inference'].cpu_usage_percent
                        }
                        st.json(inference_data)

                with tel_col3:
                    with st.expander("⚙️ Motor Control", expanded=False):
                        motor_data = {
                            'start_angle': res['motor'].start_angle,
                            'target_angle': res['motor'].target_angle,
                            'current_angle': res['motor'].current_angle,
                            'duration_ms': res['motor'].duration_ms,
                            'power_consumption_mw': res['motor'].power_consumption_mw,
                            'torque_applied_kgcm': res['motor'].torque_applied,
                            'status': res['motor'].status,
                            'movement_steps': len(res['motor'].movement_profile)
                        }
                        st.json(motor_data)
            else:
                st.info("👈 Configure sensors and click 'Run Complete Pipeline' to start")
                st.markdown("### 🗺️ System Architecture")
                st.markdown("""
**Hardware Components:**
- 📷 **Camera**: Raspberry Pi Camera Module v2 (1920x1080)
- 📏 **Ultrasonic**: HC-SR04 (2-400 cm range)
- 💧 **Moisture**: Capacitive sensor (12-bit ADC)
- ⚖️ **Weight**: HX711 Load Cell (5 kg capacity)
- ⚙️ **Servo**: MG996R (180° rotation, 11 kg-cm torque)
- 💻 **Edge Device**: Raspberry Pi 4 (4GB RAM, Quad-core)

**Pipeline Flow:**
1. Camera captures the waste image
2. Sensors read physical properties
3. Edge AI processes the multi-modal data
4. Servo routes waste to the correct bin
5. Event logged to the database
""")
    # ==================== TAB 2: DASHBOARD ====================
    with tab2:
        st.subheader("📊 System Statistics")
        stats = st.session_state.logger.get_statistics()

        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Total Events", stats['total_events'])
        with col2:
            st.metric("Avg Confidence", f"{stats['average_confidence']*100:.2f}%")
        with col3:
            st.metric("Avg Pipeline Time", f"{stats.get('average_pipeline_time_ms', 0):.2f} ms")
        with col4:
            st.metric("Total Power Used", f"{stats.get('total_power_consumption_mwh', 0):.4f} mWh")

        st.markdown("---")

        # Class distribution chart
        col_chart1, col_chart2 = st.columns(2)
        with col_chart1:
            st.subheader("📈 Waste Class Distribution")
            if stats['class_distribution']:
                import pandas as pd
                df = pd.DataFrame(
                    list(stats['class_distribution'].items()),
                    columns=['Class', 'Count']
                )
                st.bar_chart(df.set_index('Class'))
            else:
                st.info("No data yet. Run some segregations first!")

        with col_chart2:
            st.subheader("⚡ System Performance")
            if stats['total_events'] > 0:
                st.metric("Average Inference", f"{stats.get('average_pipeline_time_ms', 0):.2f} ms")
                st.metric("System Efficiency", f"{stats['average_confidence']*100:.1f}%")
                st.metric("Power Efficiency", f"{stats.get('total_power_consumption_mwh', 0)*1000:.2f} mW")
            else:
                st.info("No performance data yet.")

    # ==================== TAB 3: EVENT LOGS ====================
    with tab3:
        st.subheader("📋 Recent Events")
        num_events = st.slider("Number of events to display", 5, 50, 10)
        events = st.session_state.logger.get_recent_events(num_events)

        if events:
            import pandas as pd
            df = pd.DataFrame(events)
            display_cols = [
                'timestamp', 'waste_class', 'confidence',
                'distance_cm', 'moisture_percent', 'weight_g',
                'servo_angle', 'inference_time_ms', 'total_pipeline_time_ms',
                'power_consumption_mw', 'status'
            ]
            available_cols = [col for col in display_cols if col in df.columns]
            st.dataframe(
                df[available_cols],
                use_container_width=True,
                hide_index=True
            )

            # Download button
            csv = df.to_csv(index=False)
            st.download_button(
                "📥 Download CSV",
                csv,
                "events.csv",
                "text/csv",
                use_container_width=True
            )
        else:
            st.info("No events logged yet.")
    # ==================== TAB 4: HARDWARE DIAGNOSTICS ====================
    with tab4:
        st.subheader("🔧 Hardware Diagnostics")
        diagnostics = st.session_state.logger.get_hardware_diagnostics(20)

        if diagnostics:
            import pandas as pd
            df = pd.DataFrame(diagnostics)
            st.dataframe(
                df[['timestamp', 'component', 'status', 'voltage', 'current_ma', 'temperature_c', 'error_count']],
                use_container_width=True,
                hide_index=True
            )

            st.markdown("---")

            # Component status
            st.subheader("📊 Component Status")
            col1, col2, col3 = st.columns(3)
            with col1:
                st.markdown("##### Ultrasonic Sensor")
                st.write(f"Status: {st.session_state.sensor_hub.ultrasonic.status.value}")
                st.write(f"Errors: {st.session_state.sensor_hub.ultrasonic.error_count}")
                st.write(f"Last Reading: {st.session_state.sensor_hub.ultrasonic.last_reading} cm")
            with col2:
                st.markdown("##### Servo Motor")
                motor_diag = st.session_state.servo.get_diagnostics()
                st.write(f"Angle: {motor_diag['current_angle']}°")
                st.write(f"Current: {motor_diag['current_ma']} mA")
                st.write(f"Temperature: {motor_diag['temperature_c']:.1f}°C")
            with col3:
                st.markdown("##### Edge Device")
                st.write(f"CPU: {st.session_state.edge_ai.cpu_usage:.1f}%")
                st.write(f"Temperature: {st.session_state.edge_ai.device_temp:.1f}°C")
                st.write(f"Model: {st.session_state.edge_ai.model_version}")
        else:
            st.info("No diagnostics data yet.")

    # ==================== TAB 5: SENSOR DETAILS ====================
    with tab5:
        st.subheader("📡 Detailed Sensor Information")

        col1, col2 = st.columns(2)
        with col1:
            st.markdown("### Ultrasonic Sensor (HC-SR04)")
            st.markdown(f"""
- **Range**: {HardwareConfig.ULTRASONIC_MIN_DISTANCE} - {HardwareConfig.ULTRASONIC_MAX_DISTANCE} cm
- **Accuracy**: ±{HardwareConfig.ULTRASONIC_ACCURACY} cm
- **Read Time**: {HardwareConfig.ULTRASONIC_READ_TIME*1000:.1f} ms
- **Operating Voltage**: 5V DC
- **Current**: 15 mA
- **Frequency**: 40 kHz
""")

            st.markdown("### Capacitive Moisture Sensor")
            st.markdown(f"""
- **Voltage Range**: {HardwareConfig.MOISTURE_MIN_VOLTAGE} - {HardwareConfig.MOISTURE_MAX_VOLTAGE} V
- **ADC Resolution**: {HardwareConfig.MOISTURE_ADC_RESOLUTION} bits ({2**HardwareConfig.MOISTURE_ADC_RESOLUTION} levels)
- **Read Time**: {HardwareConfig.MOISTURE_READ_TIME*1000:.1f} ms
- **Operating Voltage**: 3.3V - 5.5V
- **Current**: 5 mA
""")

        with col2:
            st.markdown("### Load Cell (HX711)")
            st.markdown(f"""
- **Capacity**: {HardwareConfig.WEIGHT_MAX_CAPACITY} g
- **Resolution**: {HardwareConfig.WEIGHT_RESOLUTION} g
- **Stabilization Time**: {HardwareConfig.WEIGHT_STABILIZATION_TIME*1000:.0f} ms
- **Read Time**: {HardwareConfig.WEIGHT_READ_TIME*1000:.0f} ms
- **ADC**: 24-bit
- **Sample Rate**: 10/80 Hz
""")

            st.markdown("### Servo Motor (MG996R)")
            st.markdown(f"""
- **Rotation**: {HardwareConfig.SERVO_MIN_ANGLE}° - {HardwareConfig.SERVO_MAX_ANGLE}°
- **Speed**: 60°/0.17s (at 4.8V)
- **Torque**: {HardwareConfig.SERVO_TORQUE} kg-cm
- **Operating Voltage**: 4.8V - 7.2V
- **Current (Idle)**: {HardwareConfig.SERVO_CURRENT_IDLE} mA
- **Current (Moving)**: {HardwareConfig.SERVO_CURRENT_MOVING} mA
""")

        st.markdown("---")

        st.markdown("### 📷 Camera Module (Raspberry Pi Camera v2)")
        st.markdown(f"""
- **Sensor**: Sony IMX219 8MP
- **Resolution**: {HardwareConfig.CAMERA_RESOLUTION[0]}x{HardwareConfig.CAMERA_RESOLUTION[1]}
- **Capture Time**: {HardwareConfig.CAMERA_CAPTURE_TIME*1000:.0f} ms
- **Warmup Time**: {HardwareConfig.CAMERA_WARMUP_TIME:.1f} s
- **Interface**: CSI (Camera Serial Interface)
- **Frame Rate**: 30 fps (1080p)
""")

        st.markdown("### 💻 Edge Device (Raspberry Pi 4)")
        st.markdown(f"""
- **CPU**: Quad-core Cortex-A72 @ 1.5GHz
- **RAM**: {HardwareConfig.RAM_MB} MB
- **Cores**: {HardwareConfig.CPU_CORES}
- **Inference Overhead**: {HardwareConfig.INFERENCE_OVERHEAD*1000:.0f} ms
- **Operating System**: Raspberry Pi OS (Linux)
- **AI Framework**: PyTorch / ONNX Runtime
""")

    # Footer
    st.markdown("---")
    st.caption("Smart Waste Segregation System | LDRP-ITR, Gandhinagar | IDEA-ONE Hackathon 2025")
    st.caption("Hardware-Realistic Simulation | Edge AI Deployment | Multi-Modal Sensor Fusion")


if __name__ == "__main__":
    main()
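# Headless smoke test (sketch only, intentionally left as comments): the hardware classes can
# be exercised without the dashboard, e.g.
#   hub, servo, cam = SensorHub(), ServoMotor(), CameraModule()
#   image, _ = cam.capture_image()
#   data = hub.read_all_sensors()
#   result = EdgeAIProcessor().infer(image, data)
#   servo.move_to_bin(result.waste_class)
# Note: EdgeAIProcessor reports status via st.* calls, so running it outside `streamlit run`
# may emit ScriptRunContext warnings.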