""" Data Processing and Validation Module This module provides comprehensive data processing capabilities for TimesFM, including CSV loading, covariate preparation, and data validation. Key Features: - CSV data loading with flexible column configuration - Automatic data type inference and conversion - Covariates data preparation and validation - Data structure formatting for TimesFM input requirements - Support for dynamic and static, numerical and categorical covariates """ import pandas as pd import numpy as np import json import logging from typing import Dict, List, Union, Optional, Tuple, Any from datetime import datetime, timedelta logger = logging.getLogger(__name__) class DataProcessor: """ Handles data loading, processing, and validation for TimesFM forecasting. This class provides methods to load CSV data, process covariates according to TimesFM requirements, and validate data structures before forecasting. Example: >>> processor = DataProcessor() >>> data = processor.load_csv_data("data.csv", data_definition) >>> inputs, covariates = processor.prepare_forecast_data( ... data, context_len=100, horizon_len=24 ... ) """ def __init__(self): """Initialize the DataProcessor.""" self.data = None self.data_definition = None self.processed_data = None def load_csv_data( self, csv_file_path: str, data_definition: Union[str, Dict[str, str]] ) -> pd.DataFrame: """ Load CSV data with proper column type conversion based on data definition. Args: csv_file_path: Path to the CSV file data_definition: Either JSON file path or dictionary defining column types Returns: Loaded and processed DataFrame Raises: FileNotFoundError: If CSV or JSON file not found ValueError: If data definition is invalid """ logger.info(f"Loading CSV data from: {csv_file_path}") # Load data definition if isinstance(data_definition, str): with open(data_definition, 'r') as f: self.data_definition = json.load(f) else: self.data_definition = data_definition.copy() logger.info(f"Data definition: {self.data_definition}") # Load CSV try: self.data = pd.read_csv(csv_file_path).dropna(axis=0) logger.info(f"Loaded CSV with shape: {self.data.shape}") logger.info(f"Columns: {list(self.data.columns)}") except FileNotFoundError: raise FileNotFoundError(f"CSV file not found: {csv_file_path}") # Validate that 'date' column exists if 'date' not in self.data.columns: raise ValueError("CSV file must contain a 'date' column as the first column") # Convert date column self.data['date'] = pd.to_datetime(self.data['date']) logger.info(f"Date range: {self.data['date'].min()} to {self.data['date'].max()}") # Apply data type conversions based on definition self._apply_data_types() # Validate data definition self._validate_data_definition() logger.info("✅ CSV data loaded and processed successfully") return self.data.copy() def _apply_data_types(self) -> None: """Apply appropriate data types based on the data definition.""" logger.info("Applying data type conversions...") for column, data_type in self.data_definition.items(): if column not in self.data.columns: logger.warning(f"Column '{column}' in data definition not found in CSV") continue try: if data_type in ['target', 'dynamic_numerical', 'static_numerical']: # Convert to float self.data[column] = pd.to_numeric(self.data[column], errors='coerce') logger.info(f"Converted '{column}' to numerical (float)") elif data_type in ['dynamic_categorical', 'static_categorical']: # Convert to string self.data[column] = self.data[column].astype(str) logger.info(f"Converted '{column}' to categorical (string)") else: logger.warning(f"Unknown data type '{data_type}' for column '{column}'") except Exception as e: logger.error(f"Failed to convert column '{column}': {str(e)}") raise def _validate_data_definition(self) -> None: """Validate the data definition against the loaded data.""" logger.info("Validating data definition...") # Check for required data types target_columns = [col for col, dtype in self.data_definition.items() if dtype == 'target'] if not target_columns: raise ValueError("Data definition must contain at least one 'target' column") if len(target_columns) > 1: logger.warning(f"Multiple target columns found: {target_columns}. Using first one for univariate forecasting.") # Validate column existence missing_columns = set(self.data_definition.keys()) - set(self.data.columns) if missing_columns: raise ValueError(f"Columns defined in data_definition but missing from CSV: {missing_columns}") # Check for data quality issues for column in target_columns: if self.data[column].isnull().any(): null_count = self.data[column].isnull().sum() logger.warning(f"Target column '{column}' has {null_count} null values") logger.info("✅ Data definition validation passed") def prepare_forecast_data( self, data: pd.DataFrame, context_len: int, horizon_len: int, target_column: Optional[str] = None ) -> Tuple[List[float], Dict[str, Any]]: """ Prepare data for TimesFM forecasting with covariates. Args: data: Input DataFrame context_len: Length of context window for forecasting horizon_len: Length of forecast horizon target_column: Target column name (auto-detected if None) Returns: Tuple of (target_inputs, covariates_dict) Raises: ValueError: If insufficient data or invalid configuration """ logger.info(f"Preparing forecast data (context: {context_len}, horizon: {horizon_len})") # Auto-detect target column if not specified if target_column is None: target_columns = [col for col, dtype in self.data_definition.items() if dtype == 'target'] if not target_columns: raise ValueError("No target column found in data definition") target_column = target_columns[0] logger.info(f"Using target column: {target_column}") # Validate data length - only need context_len for the data if len(data) < context_len: raise ValueError(f"Insufficient data: need {context_len} points, have {len(data)}") # Prepare target inputs using the most recent context window target_series = data[target_column].values context_start = max(0, len(data) - context_len) context_end = len(data) # Use last context_len periods target_inputs = target_series[context_start:context_end].tolist() logger.info(f"Target data preparation:") logger.info(f" - Target column: {target_column}") logger.info(f" - Context start index: {context_start}") logger.info(f" - Context end index: {context_end}") logger.info(f" - Target inputs length: {len(target_inputs)}") logger.info(f" - Target range: {min(target_inputs):.2f} - {max(target_inputs):.2f}") # Prepare covariates covariates = self._prepare_covariates(data, context_len, horizon_len) logger.info(f"✅ Prepared forecast data:") logger.info(f" Target inputs length: {len(target_inputs)}") logger.info(f" Target range: {min(target_inputs):.2f} - {max(target_inputs):.2f}") logger.info(f" Covariates: {list(covariates.keys())}") return target_inputs, covariates def _prepare_covariates( self, data: pd.DataFrame, context_len: int, horizon_len: int ) -> Dict[str, Dict[str, List]]: """ Prepare covariates data structure for TimesFM. Args: data: Input DataFrame context_len: Context window length horizon_len: Forecast horizon length Returns: Dictionary containing organized covariates """ covariates = { 'dynamic_numerical_covariates': {}, 'dynamic_categorical_covariates': {}, 'static_numerical_covariates': {}, 'static_categorical_covariates': {} } # For dynamic covariates, we need context_len + horizon_len total periods # Context period: last context_len periods of available data # Horizon period: horizon_len periods (padded with last known values) total_len = context_len + horizon_len logger.info(f"Covariate preparation debug:") logger.info(f" - Data length: {len(data)}") logger.info(f" - Context length: {context_len}") logger.info(f" - Horizon length: {horizon_len}") logger.info(f" - Total periods needed: {total_len}") logger.info(f" - Data date range: {data['date'].min()} to {data['date'].max()}") logger.info(f" - Context period: last {context_len} periods of data") logger.info(f" - Horizon period: {horizon_len} periods (padded with last known values)") for column, data_type in self.data_definition.items(): if column == 'date' or data_type == 'target': continue if data_type == 'dynamic_numerical': # Dynamic numerical: need context + horizon values # Context: last context_len periods of available data # Horizon: horizon_len periods (padded with last known value) if len(data) < context_len: logger.warning(f"Insufficient data for dynamic covariate '{column}': need {context_len} for context, have {len(data)}") continue # Get context values (last context_len periods) context_values = data[column].iloc[-context_len:].tolist() # Get horizon values (pad with last known value) last_value = context_values[-1] horizon_values = [last_value] * horizon_len # Combine context + horizon values = context_values + horizon_values covariates['dynamic_numerical_covariates'][column] = [values] logger.info(f"Added dynamic numerical covariate '{column}': {len(values)} values") logger.info(f" - Context period: {len(context_values)} values (last {context_len} periods)") logger.info(f" - Horizon period: {len(horizon_values)} values (padded with {last_value})") elif data_type == 'dynamic_categorical': # Dynamic categorical: need context + horizon values # Context: last context_len periods of available data # Horizon: horizon_len periods (padded with last known value) if len(data) < context_len: logger.warning(f"Insufficient data for dynamic covariate '{column}': need {context_len} for context, have {len(data)}") continue # Get context values (last context_len periods) context_values = data[column].astype(str).iloc[-context_len:].tolist() # Get horizon values (pad with last known value) last_value = context_values[-1] horizon_values = [last_value] * horizon_len # Combine context + horizon values = context_values + horizon_values covariates['dynamic_categorical_covariates'][column] = [values] logger.info(f"Added dynamic categorical covariate '{column}': {len(values)} values") logger.info(f" - Context period: {len(context_values)} values (last {context_len} periods)") logger.info(f" - Horizon period: {len(horizon_values)} values (padded with '{last_value}')") elif data_type == 'static_numerical': # Static numerical: single value per time series value = float(data[column].iloc[0]) covariates['static_numerical_covariates'][column] = [value] logger.info(f"Added static numerical covariate '{column}': {value}") elif data_type == 'static_categorical': # Static categorical: single value per time series value = str(data[column].iloc[0]) covariates['static_categorical_covariates'][column] = [value] logger.info(f"Added static categorical covariate '{column}': {value}") # Remove empty covariate types covariates = {k: v for k, v in covariates.items() if v} return covariates def get_data_summary(self) -> Dict[str, Any]: """ Get a summary of the loaded data. Returns: Dictionary containing data summary statistics """ if self.data is None: return {"status": "No data loaded"} summary = { "status": "loaded", "shape": self.data.shape, "date_range": { "start": str(self.data['date'].min().date()), "end": str(self.data['date'].max().date()), "total_periods": len(self.data) }, "columns": list(self.data.columns), "data_definition": self.data_definition } # Add column-specific statistics column_stats = {} for column in self.data.columns: if column == 'date': continue col_data = self.data[column] data_type = self.data_definition.get(column, 'unknown') if data_type in ['target', 'dynamic_numerical', 'static_numerical']: column_stats[column] = { "type": data_type, "dtype": str(col_data.dtype), "min": float(col_data.min()) if not col_data.isnull().all() else None, "max": float(col_data.max()) if not col_data.isnull().all() else None, "mean": float(col_data.mean()) if not col_data.isnull().all() else None, "null_count": int(col_data.isnull().sum()) } else: column_stats[column] = { "type": data_type, "dtype": str(col_data.dtype), "unique_values": int(col_data.nunique()), "null_count": int(col_data.isnull().sum()), "sample_values": col_data.dropna().unique()[:5].tolist() } summary["column_statistics"] = column_stats return summary def validate_forecast_inputs( self, inputs: List[float], covariates: Dict[str, Any], context_len: int, horizon_len: int ) -> bool: """ Validate that forecast inputs are properly formatted for TimesFM. Args: inputs: Target time series inputs covariates: Covariates dictionary context_len: Expected context length horizon_len: Expected horizon length Returns: True if validation passes Raises: ValueError: If validation fails """ logger.info("Validating forecast inputs...") # Validate inputs length if len(inputs) != context_len: raise ValueError(f"Input length {len(inputs)} doesn't match context_len {context_len}") # Validate inputs are numeric if not all(isinstance(x, (int, float)) and not np.isnan(x) for x in inputs): raise ValueError("All inputs must be numeric and non-NaN") # Validate covariates structure total_len = context_len + horizon_len for cov_type, cov_dict in covariates.items(): if cov_type in ['dynamic_numerical_covariates', 'dynamic_categorical_covariates']: for name, values_list in cov_dict.items(): if len(values_list) != 1: raise ValueError(f"Dynamic covariate '{name}' must have exactly 1 time series") if len(values_list[0]) != total_len: raise ValueError(f"Dynamic covariate '{name}' must have {total_len} values, got {len(values_list[0])}") elif cov_type in ['static_numerical_covariates', 'static_categorical_covariates']: for name, values_list in cov_dict.items(): if len(values_list) != 1: raise ValueError(f"Static covariate '{name}' must have exactly 1 value") logger.info("✅ Forecast inputs validation passed") return True def create_sample_data_definition(self, output_path: str) -> None: """ Create a sample data definition JSON file. Args: output_path: Path where to save the sample JSON file """ sample_definition = { "btc_price": "target", "eth_price": "dynamic_numerical", "vix_index": "dynamic_numerical", "sp500_price": "dynamic_numerical", "quarter": "dynamic_categorical", "asset_category": "static_categorical", "base_price": "static_numerical" } with open(output_path, 'w') as f: json.dump(sample_definition, f, indent=2) logger.info(f"Sample data definition saved to: {output_path}") print(f"Sample data definition structure:") print(json.dumps(sample_definition, indent=2)) def prepare_visualization_data( processed_data: pd.DataFrame, target_inputs: Union[List[float], List[List[float]]], target_column: str, context_len: int, horizon_len: int, extended_data: Optional[pd.DataFrame] = None ) -> Dict[str, Any]: """ Centralized function to prepare visualization data from processed data. This function creates the visualization data structure used by both the webapp and notebook for consistent data handling. Args: processed_data: Processed DataFrame with date column target_inputs: Target input data used for forecasting (flattenable to a single series) target_column: Name of the target column context_len: Context length used for forecasting horizon_len: Horizon length for forecasting Returns: Dictionary containing visualization data with keys: - 'historical_data': Context window used for forecasting (chronologically ordered) - 'dates_historical': Corresponding historical dates - 'dates_future': Future dates aligned with the forecast horizon - 'target_name': Name of the target column - 'actual_future': Optional actual values for the forecast horizon (if available) """ if processed_data.empty: return { 'historical_data': [], 'dates_historical': [], 'dates_future': [], 'target_name': target_column, 'actual_future': [] } # Work on a chronologically sorted copy to ensure alignment df = processed_data.dropna(axis=0).sort_values('date').reset_index(drop=True) # Flatten target inputs (they may arrive as List[List[float]] or List[float]) if isinstance(target_inputs, (list, tuple)) and target_inputs: if isinstance(target_inputs[0], (list, tuple, np.ndarray)): target_inputs_flat = list(target_inputs[0]) else: target_inputs_flat = list(target_inputs) else: target_inputs_flat = [] # Respect the actual context length used context_len_effective = len(target_inputs_flat) or context_len available_len = len(df) # Use target_inputs as historical data to ensure exact alignment with forecasting # This guarantees that the historical data in visualization matches what was used for forecasting if target_inputs_flat: historical_slice = list(map(float, target_inputs_flat)) # For dates, we need to find the corresponding dates for the target_inputs # Since target_inputs represents the last context_len periods used for forecasting, # we need to find the dates that correspond to those exact data points if len(df) >= context_len_effective: # Get the dates for the last context_len periods (same as target_inputs) dates_historical = df['date'].iloc[-context_len_effective:].tolist() else: # If we don't have enough data, use what we have dates_historical = df['date'].tolist() logger.info(f"Using target_inputs for historical data to ensure forecasting alignment") else: # Fallback to data-based extraction if target_inputs not available if len(df) >= context_len_effective: historical_slice = df[target_column].iloc[-context_len_effective:].astype(float).tolist() dates_historical = df['date'].iloc[-context_len_effective:].tolist() else: historical_slice = df[target_column].astype(float).tolist() dates_historical = df['date'].tolist() logger.info(f"Using data-based extraction for historical data") logger.info(f"Visualization data preparation:") logger.info(f" - Processed data shape: {df.shape}") logger.info(f" - Target column: {target_column}") logger.info(f" - Context length effective: {context_len_effective}") logger.info(f" - Historical slice length: {len(historical_slice)}") logger.info(f" - Target inputs flat length: {len(target_inputs_flat)}") logger.info(f" - Dates historical length: {len(dates_historical)}") logger.info(f" - Historical data range: {min(historical_slice) if historical_slice else 'N/A'} to {max(historical_slice) if historical_slice else 'N/A'}") if dates_historical: logger.info(f" - First historical date: {dates_historical[0]}") logger.info(f" - Last historical date: {dates_historical[-1]}") # For future dates, we need to generate them since we only have context data # Extract actual future values when present (useful for overlaying actuals) # The actual future values should start from the day after the last historical date # Use extended_data if available (includes horizon period), otherwise use df data_for_future_extraction = extended_data if extended_data is not None else df if len(data_for_future_extraction) > context_len_effective and dates_historical: # Find the last historical date (this is the context end date) last_historical_date = dates_historical[-1] # Find data points that come after the last historical date future_mask = data_for_future_extraction['date'] > last_historical_date future_data = data_for_future_extraction[future_mask] if len(future_data) > 0: # Take only the first horizon_len periods of future data future_slice = future_data[target_column].iloc[:horizon_len].astype(float).tolist() dates_future = future_data['date'].iloc[:horizon_len].tolist() logger.info(f"Actual future values extracted:") logger.info(f" - Data for extraction length: {len(data_for_future_extraction)}") logger.info(f" - Context length effective: {context_len_effective}") logger.info(f" - Last historical date (context end): {last_historical_date}") logger.info(f" - Future data available: {len(future_data)} periods") logger.info(f" - Future slice length: {len(future_slice)}") logger.info(f" - Future dates length: {len(dates_future)}") if future_slice and dates_future: logger.info(f" - Future values range: {min(future_slice):.4f} to {max(future_slice):.4f}") logger.info(f" - First future date: {dates_future[0]}") logger.info(f" - Last future date: {dates_future[-1]}") else: future_slice = [] dates_future = [] logger.info("No actual future values available - no data after last historical date") else: # No actual future values available future_slice = [] dates_future = [] logger.info("No actual future values available - data doesn't extend beyond context period") if len(dates_future) < horizon_len: # Generate future dates if the dataset stops at the forecast boundary inferred_delta: Optional[pd.Timedelta] = None if len(dates_historical) >= 2: inferred_delta = dates_historical[-1] - dates_historical[-2] last_date = dates_historical[-1] if hasattr(last_date, 'to_pydatetime'): last_date = last_date.to_pydatetime() elif isinstance(last_date, np.datetime64): last_date = pd.to_datetime(last_date).to_pydatetime() step = inferred_delta if isinstance(inferred_delta, pd.Timedelta) and inferred_delta != pd.Timedelta(0) else timedelta(days=1) dates_future = [last_date + step * (i + 1) for i in range(horizon_len)] future_slice = [] # No actual future data in this case visualization_data = { 'historical_data': historical_slice, 'dates_historical': [d.isoformat() if hasattr(d, 'isoformat') else str(d) for d in dates_historical], 'dates_future': [d.isoformat() if hasattr(d, 'isoformat') else str(d) for d in dates_future], 'target_name': target_column, 'actual_future': future_slice } return visualization_data