"""Simplified performance predictor for MLPerf configurations using XGBoost."""
import logging
import random
from collections import Counter, defaultdict
from typing import Any, Optional
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from utils import FEATURE_TYPES
logger = logging.getLogger(__name__)
class PerformancePredictor:
"""Predicts performance for hardware configurations."""
def __init__(self, dataset: pd.DataFrame, test_size: float = 0.2):
"""Initialize with benchmark dataset."""
self.df = dataset
self.model = None
self.target = "metrics.result_per_accelerator"
self.features = []
self.test_size = test_size
self.evaluation_data = pd.DataFrame()
self.evaluation_metrics = {}
self.feature_importance = pd.DataFrame(columns=["Feature", "Importance"])
self.excluded_features = {
"model.name",
"model.mlperf_name",
"software.framework",
"system.name",
}
self.excluded_features.update(
{
col
for col in dataset.columns
if col.startswith("submission.") or col.startswith("metrics.")
}
)
self.distributions = {}
self.max_accelerators = int(dataset["system.accelerator.total_count"].max())
self.max_gpu_memory = float(dataset["system.accelerator.memory_capacity"].max())
self.max_cpu_memory = float(dataset["system.memory.capacity"].max())
        self.frameworks = sorted(
            {
                col.replace("software.framework.", "")
                for col in dataset.columns
                if col.startswith("software.framework.")
                and col != "software.framework"
            }
        )
logger.info(
f"Found {len(self.frameworks)} unique frameworks: {', '.join(self.frameworks)}"
)
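        # Setup happens in three passes: select usable feature columns,
        # profile their empirical distributions (reused later to synthesize
        # plausible configurations), then fit the XGBoost regressor.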
self._identify_features()
self._analyze_data_distributions()
self._train_model()
def _identify_features(self):
"""Identify features for model training."""
all_columns = set(self.df.columns)
available_features = all_columns - self.excluded_features - {self.target}
self.features = [f for f in available_features if not self.df[f].isna().all()]
logger.info(f"Identified {len(self.features)} features for model training")
def _analyze_data_distributions(self):
"""Analyze feature distributions for realistic data generation."""
categorical_features = {
col
for col in self.df.columns
if self.df[col].dtype == "object"
or col in FEATURE_TYPES.get("categorical", [])
}
for feature in categorical_features:
values = self.df[feature].dropna().tolist()
if values:
counter = Counter(values)
total = sum(counter.values())
self.distributions[feature] = {
value: count / total for value, count in counter.items()
}
        continuous_features = {
            col
            for col in self.df.columns
            if (
                col in FEATURE_TYPES.get("continuous", [])
                or pd.api.types.is_numeric_dtype(self.df[col].dtype)
            )
            and col not in categorical_features
            and not col.startswith("metrics.")
        }
for feature in continuous_features:
values = self.df[feature].dropna()
if len(values) > 0:
self.distributions[feature] = {
"min": float(values.min()),
"max": float(values.max()),
"mean": float(values.mean()),
"std": float(values.std()),
"median": float(values.median()),
"values": sorted(values.unique().tolist()),
}
self._analyze_feature_relationships()
logger.info(f"Analyzed distributions for {len(self.distributions)} features")
def _analyze_feature_relationships(self):
"""Analyze relationships between related features."""
self._analyze_vendor_accelerator_relations()
self._analyze_vendor_cpu_relations()
self._analyze_accelerator_memory_relations()
self._analyze_interconnect_relations()
self._analyze_software_relations()
self._analyze_device_counts()
def _analyze_vendor_accelerator_relations(self):
"""Map vendors to their accelerators."""
vendor_accelerators = defaultdict(list)
        for _, row in self.df.iterrows():
            vendor = row.get("system.accelerator.vendor")
            acc = row.get("system.accelerator.name")
            # NaN is truthy, so guard with pd.notna to skip missing values.
            if pd.notna(vendor) and pd.notna(acc):
                vendor_accelerators[vendor].append(acc)
self.distributions["vendor_accelerators"] = {}
for vendor, accelerators in vendor_accelerators.items():
counter = Counter(accelerators)
total = sum(counter.values())
self.distributions["vendor_accelerators"][vendor] = {
acc: count / total for acc, count in counter.items()
}
def _analyze_vendor_cpu_relations(self):
"""Map CPU vendors to their models."""
vendor_cpus = defaultdict(list)
        for _, row in self.df.iterrows():
            vendor = row.get("system.cpu.vendor")
            model = row.get("system.cpu.model")
            if pd.notna(vendor) and pd.notna(model):
                vendor_cpus[vendor].append(model)
self.distributions["vendor_cpus"] = {}
for vendor, models in vendor_cpus.items():
counter = Counter(models)
total = sum(counter.values())
self.distributions["vendor_cpus"][vendor] = {
model: count / total for model, count in counter.items()
}
def _analyze_accelerator_memory_relations(self):
"""Map accelerator models to memory capacities."""
acc_memory = defaultdict(list)
        for _, row in self.df.iterrows():
            acc = row.get("system.accelerator.name")
            memory = row.get("system.accelerator.memory_capacity")
            if pd.notna(acc) and pd.notna(memory):
                acc_memory[acc].append(memory)
self.distributions["accelerator_memory"] = {}
for acc, memories in acc_memory.items():
if memories:
counter = Counter(memories)
most_common = counter.most_common(1)[0][0] if counter else None
self.distributions["accelerator_memory"][acc] = {
"min": min(memories),
"max": max(memories),
"mean": sum(memories) / len(memories),
"most_common": most_common,
"values": sorted(set(memories)),
}
def _analyze_interconnect_relations(self):
"""Map vendors to interconnect types."""
vendor_interconnects = defaultdict(list)
        for _, row in self.df.iterrows():
            vendor = row.get("system.accelerator.vendor")
            interconnect = row.get("system.interconnect.accelerator")
            if pd.notna(vendor) and pd.notna(interconnect):
                vendor_interconnects[vendor].append(interconnect)
self.distributions["vendor_interconnects"] = {}
for vendor, interconnects in vendor_interconnects.items():
counter = Counter(interconnects)
total = sum(counter.values())
self.distributions["vendor_interconnects"][vendor] = {
ic: count / total for ic, count in counter.items()
}
def _analyze_software_relations(self):
"""Map vendors to software stacks."""
vendor_software = defaultdict(lambda: defaultdict(list))
        for _, row in self.df.iterrows():
            vendor = row.get("system.accelerator.vendor")
            if pd.isna(vendor):
                continue
            os_name = row.get("software.operating_system")
            if pd.notna(os_name):
                vendor_software[vendor]["os"].append(os_name)
            for col in self.df.columns:
                if (
                    col.startswith("software.framework.")
                    and col != "software.framework"
                ):
                    framework = col.replace("software.framework.", "")
                    version = row.get(col)
                    if pd.notna(version):
                        vendor_software[vendor][framework].append(version)
self.distributions["vendor_software"] = {}
for vendor, software_dict in vendor_software.items():
self.distributions["vendor_software"][vendor] = {}
for software_type, values in software_dict.items():
counter = Counter(values)
total = sum(counter.values())
self.distributions["vendor_software"][vendor][software_type] = {
value: count / total for value, count in counter.items()
}
def _analyze_device_counts(self):
"""Analyze distribution of device counts."""
counts = self.df["system.accelerator.total_count"].dropna().astype(int).tolist()
if counts:
counter = Counter(counts)
total = sum(counter.values())
self.distributions["device_count"] = {
count: freq / total for count, freq in counter.items()
}
self.distributions["device_count_values"] = sorted(list(set(counts)))
node_counts = self.df["system.number_of_nodes"].dropna().astype(int).tolist()
if node_counts:
counter = Counter(node_counts)
total = sum(counter.values())
self.distributions["node_count"] = {
count: freq / total for count, freq in counter.items()
}
self.distributions["node_count_values"] = sorted(list(set(node_counts)))
        device_node_pairs = [
            (
                int(row["system.number_of_nodes"]),
                int(row["system.accelerator.total_count"]),
            )
            for _, row in self.df.iterrows()
            # pd.notna guards against NaN, which is truthy and would crash int().
            if pd.notna(row.get("system.number_of_nodes"))
            and pd.notna(row.get("system.accelerator.total_count"))
        ]
node_to_devices = defaultdict(list)
for nodes, devices in device_node_pairs:
node_to_devices[nodes].append(devices)
self.distributions["node_device_relation"] = {}
for node_count, device_counts in node_to_devices.items():
counter = Counter(device_counts)
total = sum(counter.values())
self.distributions["node_device_relation"][node_count] = {
count: freq / total for count, freq in counter.items()
}
def _train_model(self):
"""Train XGBoost model on available data with train/test split."""
df_clean = self.df.dropna(subset=[self.target])
        # Work on an explicit copy so the dtype conversion below cannot
        # trigger chained-assignment warnings or mutate the source frame.
        X = df_clean[self.features].copy()
        y = df_clean[self.target]
        for col in X.select_dtypes(include=["object"]).columns:
            X[col] = X[col].astype("category")
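        # Stratify the split by accelerator model when possible so rare
        # accelerators appear on both sides; train_test_split raises
        # ValueError when stratification is infeasible (e.g. a class with
        # fewer than two samples), in which case we fall back below.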
try:
strat_column = df_clean["system.accelerator.name"].fillna("unknown")
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=self.test_size, stratify=strat_column, random_state=42
)
logger.info(
f"Created stratified train/test split ({100 - self.test_size * 100:.0f}%/{self.test_size * 100:.0f}%) with {len(X_train)} training and {len(X_test)} test samples"
)
except ValueError:
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=self.test_size, random_state=42
)
logger.info(
f"Created regular train/test split with {len(X_train)} training and {len(X_test)} test samples"
)
        self.model = xgb.XGBRegressor(
            objective="reg:squarederror",
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            subsample=0.8,
            enable_categorical=True,
            # Fixed seed keeps row subsampling reproducible, matching the
            # seeded train/test split above.
            random_state=42,
        )
self.model.fit(X_train, y_train)
logger.info(f"Trained XGBoost model on {len(X_train)} rows")
self._evaluate_model(X_test, y_test, df_clean.loc[X_test.index])
def _evaluate_model(self, X_test, y_test, test_df):
"""Evaluate model performance on test set."""
if X_test.empty:
logger.warning("No test data available for evaluation")
return
y_pred = self.model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
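        # MAPE divides by the actuals; the target is a throughput
        # (tokens/s per accelerator, per the log below), so the division
        # is assumed safe.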
mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100
self.evaluation_metrics = {
"rmse": rmse,
"mae": mae,
"r2": r2,
"mape": mape,
"test_size": len(y_test),
"training_size": len(self.df) - len(y_test),
}
eval_data = test_df[
[
"system.accelerator.name",
"system.accelerator.vendor",
"system.accelerator.total_count",
]
].copy()
eval_data["actual"] = y_test
eval_data["predicted"] = y_pred
eval_data["error"] = y_pred - y_test
eval_data["error_percent"] = (y_pred - y_test) / y_test * 100
self.evaluation_data = eval_data.copy()
logger.info(
f"Model evaluation - RMSE: {rmse:.2f}, MAE: {mae:.2f}, R²: {r2:.3f}, MAPE: {mape:.2f}%"
)
logger.info(
f"Evaluation data shape: {eval_data.shape}, with columns: {list(eval_data.columns)}"
)
logger.info(f"Evaluation data sample: {eval_data.head(2).to_dict()}")
logger.info(
f"Evaluation data stored as class attribute with shape: {self.evaluation_data.shape}"
)
importance = self.model.feature_importances_
feature_importance = pd.DataFrame(
{"Feature": self.model.feature_names_in_, "Importance": importance}
).sort_values("Importance", ascending=False)
self.feature_importance = feature_importance.head(10).copy()
logger.info(
f"Top 5 important features: {', '.join(self.feature_importance['Feature'].head(5).tolist())}"
)
def get_evaluation_metrics(self) -> dict:
"""Return model evaluation metrics."""
logger.info(f"Getting evaluation metrics: {self.evaluation_metrics}")
return self.evaluation_metrics.copy() if self.evaluation_metrics else {}
    def get_evaluation_data(self) -> Optional[pd.DataFrame]:
"""Return evaluation data for visualization."""
data_shape = (
"empty" if self.evaluation_data.empty else self.evaluation_data.shape
)
logger.info(f"Getting evaluation data with shape: {data_shape}")
return self.evaluation_data.copy() if not self.evaluation_data.empty else None
def get_feature_importance(self) -> pd.DataFrame:
"""Return feature importance data."""
logger.info(
f"Getting feature importance with shape: {self.feature_importance.shape}"
)
return (
self.feature_importance.copy()
if not self.feature_importance.empty
else pd.DataFrame(columns=["Feature", "Importance"])
)
def generate_predictions(
self,
architecture: str,
parameters: float,
        constraints: Optional[dict] = None,
num_configs: int = 10,
) -> pd.DataFrame:
"""Generate and predict performance for hardware configurations."""
constraints = constraints or {}
logger.info(
f"Generating {num_configs} predictions for {architecture} model with {parameters}B parameters"
)
configs = self._generate_configs(
architecture, parameters, constraints, num_configs
)
if not configs:
return pd.DataFrame()
configs_df = pd.DataFrame(configs)
model_features = getattr(self.model, "feature_names_in_", self.features)
for feature in model_features:
if feature not in configs_df.columns:
configs_df[feature] = None
        X_pred = configs_df[model_features].copy()
        for col in X_pred.select_dtypes(include=["object"]).columns:
            X_pred[col] = X_pred[col].astype("category")
configs_df[self.target] = self.model.predict(X_pred)
configs_df["predicted"] = True
configs_df["metrics.result"] = (
configs_df[self.target] * configs_df["system.accelerator.total_count"]
)
configs_df["system.name"] = "Hypothetical system - ongoing work"
logger.info(
f"Performance range: {configs_df[self.target].min():.2f} - {configs_df[self.target].max():.2f} tokens/s per accelerator"
)
return configs_df
    def _sample_from_distribution(self, distribution: dict) -> Any:
"""Sample a value from a categorical distribution."""
items = list(distribution.keys())
probabilities = list(distribution.values())
return np.random.choice(items, p=probabilities)
def _sample_continuous_value(self, feature: str) -> float:
"""Sample a continuous value from the feature distribution."""
dist = self.distributions[feature]
if "values" in dist and dist["values"]:
if len(dist["values"]) > 3:
value = np.random.normal(dist["mean"], max(dist["std"], 1.0))
value = max(dist["min"], min(dist["max"], value))
closest_idx = min(
range(len(dist["values"])),
key=lambda i: abs(dist["values"][i] - value),
)
return dist["values"][closest_idx]
else:
return random.choice(dist["values"])
elif all(k in dist for k in ["min", "max", "mean", "std"]):
value = np.random.normal(dist["mean"], max(dist["std"], 1.0))
return max(dist["min"], min(dist["max"], value))
return np.random.uniform(dist["min"], dist["max"])
def _get_device_count(self, min_devices=None, max_devices=None) -> int:
"""Get a realistic device count based on distribution and constraints."""
        valid_counts = [
            count
            for count in self.distributions.get("device_count_values", [])
            if (min_devices is None or count >= min_devices)
            and (max_devices is None or count <= max_devices)
        ]
if valid_counts:
probs = {
count: self.distributions["device_count"][count]
for count in valid_counts
if count in self.distributions["device_count"]
}
if probs:
total = sum(probs.values())
items = list(probs.keys())
weights = [probs[item] / total for item in items]
                return int(np.random.choice(items, p=weights))
return random.choice(valid_counts)
        if min_devices is not None and max_devices is not None:
            valid_powers = [
                2**i for i in range(10) if min_devices <= 2**i <= max_devices
            ]
            if valid_powers:
                return random.choice(valid_powers)
            # Bounds may be floats after the 10% constraint margin;
            # randint requires integers.
            return random.randint(int(min_devices), int(max_devices))
return random.choice([1, 2, 4, 8, 16])
def _get_vendor_accelerator(self, vendor=None) -> tuple:
"""Get a vendor and accelerator pair."""
if vendor is None or vendor == "Any":
vendor = self._sample_from_distribution(
self.distributions["system.accelerator.vendor"]
)
if vendor in self.distributions["vendor_accelerators"]:
accelerator = self._sample_from_distribution(
self.distributions["vendor_accelerators"][vendor]
)
else:
accelerator = self._sample_from_distribution(
self.distributions["system.accelerator.name"]
)
return vendor, accelerator
    def _get_memory_for_accelerator(
        self, vendor: str, accelerator: str, min_memory=None, max_memory=None
    ) -> Optional[float]:
"""Get appropriate memory capacity for a given accelerator."""
if accelerator in self.distributions["accelerator_memory"]:
mem_dist = self.distributions["accelerator_memory"][accelerator]
if "values" in mem_dist:
valid_values = [
m
for m in mem_dist["values"]
if (min_memory is None or m >= min_memory)
and (max_memory is None or m <= max_memory)
]
if valid_values:
return random.choice(valid_values)
if "most_common" in mem_dist:
most_common = mem_dist["most_common"]
if (min_memory is None or most_common >= min_memory) and (
max_memory is None or most_common <= max_memory
):
return most_common
dist = self.distributions["system.accelerator.memory_capacity"]
valid_values = [
m
for m in dist["values"]
if (min_memory is None or m >= min_memory)
and (max_memory is None or m <= max_memory)
]
if valid_values:
return random.choice(valid_values)
min_val = max(dist["min"], min_memory or dist["min"])
max_val = min(dist["max"], max_memory or dist["max"])
if min_val <= max_val:
mean = min(max(dist["mean"], min_val), max_val)
std = max(dist["std"], 1.0)
for _ in range(5):
value = np.random.normal(mean, std)
if min_val <= value <= max_val:
return value
return np.random.uniform(min_val, max_val)
return None
    def _get_node_config(self, total_devices: int) -> tuple:
        """Determine number of nodes and devices per node."""
        VALID_GPUS_PER_NODE = [1, 2, 4, 8]
        # Prefer the densest packing that evenly divides the device count;
        # the 1-GPU-per-node case always divides, so the loop always returns.
        for gpus_per_node in sorted(VALID_GPUS_PER_NODE, reverse=True):
            if total_devices % gpus_per_node == 0:
                return total_devices // gpus_per_node, gpus_per_node
        return 1, 1
def _get_cpu_config(self) -> dict:
"""Generate a CPU configuration."""
cpu_config = {}
cpu_config["system.cpu.vendor"] = self._sample_from_distribution(
self.distributions["system.cpu.vendor"]
)
cpu_vendor = cpu_config["system.cpu.vendor"]
if cpu_vendor in self.distributions["vendor_cpus"]:
cpu_config["system.cpu.model"] = self._sample_from_distribution(
self.distributions["vendor_cpus"][cpu_vendor]
)
else:
cpu_config["system.cpu.model"] = self._sample_from_distribution(
self.distributions["system.cpu.model"]
)
for feature in [
"system.cpu.core_count",
"system.cpu.count_per_node",
"system.cpu.frequency",
]:
value = self._sample_continuous_value(feature)
if value is not None:
if feature in ["system.cpu.core_count", "system.cpu.count_per_node"]:
value = int(value)
cpu_config[feature] = value
if "system.cpu.caches" in self.distributions:
cpu_config["system.cpu.caches"] = self._sample_from_distribution(
self.distributions["system.cpu.caches"]
)
return cpu_config
def _get_software_config(self, vendor: str, constraints=None) -> dict:
"""Generate a software configuration based on hardware vendor."""
constraints = constraints or {}
software_config = {}
if vendor in self.distributions["vendor_software"]:
vendor_sw = self.distributions["vendor_software"][vendor]
if "os" in vendor_sw:
os_constraint = constraints.get("software.operating_system")
if os_constraint and os_constraint != "Any":
software_config["software.operating_system"] = os_constraint
else:
software_config["software.operating_system"] = (
self._sample_from_distribution(vendor_sw["os"])
)
for framework, versions in vendor_sw.items():
if framework != "os":
framework_key = f"software.framework.{framework}"
version_constraint = constraints.get(framework_key)
if version_constraint and version_constraint != "Any":
software_config[framework_key] = version_constraint
else:
software_config[framework_key] = self._sample_from_distribution(
versions
)
if (
"software.operating_system" not in software_config
and "software.operating_system" in self.distributions
):
os_constraint = constraints.get("software.operating_system")
if os_constraint and os_constraint != "Any":
software_config["software.operating_system"] = os_constraint
else:
software_config["software.operating_system"] = (
self._sample_from_distribution(
self.distributions["software.operating_system"]
)
)
for framework in self.frameworks:
framework_key = f"software.framework.{framework}"
if (
framework_key not in software_config
and framework_key in self.distributions
):
version_constraint = constraints.get(framework_key)
if version_constraint and version_constraint != "Any":
software_config[framework_key] = version_constraint
else:
software_config[framework_key] = self._sample_from_distribution(
self.distributions[framework_key]
)
return software_config
def _get_memory_config(self, min_memory=None, max_memory=None) -> dict:
"""Generate a memory configuration."""
memory_config = {}
dist = self.distributions["system.memory.capacity"]
if "values" in dist:
valid_values = [
m
for m in dist["values"]
if (min_memory is None or m >= min_memory)
and (max_memory is None or m <= max_memory)
]
if valid_values:
memory_config["system.memory.capacity"] = random.choice(valid_values)
if "system.memory.capacity" not in memory_config:
min_val = max(dist["min"], min_memory or dist["min"])
max_val = min(dist["max"], max_memory or dist["max"])
if min_val <= max_val:
mean = min(max(dist["mean"], min_val), max_val)
std = max(dist["std"], (max_val - min_val) / 6.0)
value = np.random.normal(mean, std)
if min_val <= value <= max_val:
memory_config["system.memory.capacity"] = value
else:
memory_config["system.memory.capacity"] = np.random.uniform(
min_val, max_val
)
if "system.memory.configuration" in self.distributions:
memory_config["system.memory.configuration"] = (
self._sample_from_distribution(
self.distributions["system.memory.configuration"]
)
)
return memory_config
def _get_interconnect_config(self, vendor: str) -> dict:
"""Generate interconnect configuration based on vendor."""
interconnect_config = {}
if vendor in self.distributions["vendor_interconnects"]:
interconnect_config["system.interconnect.accelerator"] = (
self._sample_from_distribution(
self.distributions["vendor_interconnects"][vendor]
)
)
elif "system.interconnect.accelerator" in self.distributions:
interconnect_config["system.interconnect.accelerator"] = (
self._sample_from_distribution(
self.distributions["system.interconnect.accelerator"]
)
)
if "system.interconnect.accelerator_host" in self.distributions:
interconnect_config["system.interconnect.accelerator_host"] = (
self._sample_from_distribution(
self.distributions["system.interconnect.accelerator_host"]
)
)
return interconnect_config
def _generate_configs(
self, architecture: str, parameters: float, constraints=None, count: int = 10
) -> list:
"""Generate configurations respecting user constraints."""
constraints = constraints or {}
configs = []
vendor = constraints.get("system.accelerator.vendor")
acc_name = constraints.get("system.accelerator.name")
        def apply_margin(value, is_min=True):
            # Treat missing or "Any" constraints as unbounded; widen numeric
            # bounds by 10% so near-miss configurations remain reachable.
            if value is None or value == "Any" or not isinstance(value, (int, float)):
                return None
            return value * (0.9 if is_min else 1.1)
min_gpu_memory = apply_margin(constraints.get("min_gpu_memory"), is_min=True)
max_gpu_memory = apply_margin(
constraints.get("max_gpu_memory"), is_min=False
) or (self.max_gpu_memory * 1.1)
min_cpu_memory = apply_margin(constraints.get("min_cpu_memory"), is_min=True)
max_cpu_memory = apply_margin(
constraints.get("max_cpu_memory"), is_min=False
) or (self.max_cpu_memory * 1.1)
min_devices = apply_margin(constraints.get("min_accelerators"), is_min=True)
max_devices = (
apply_margin(constraints.get("max_accelerators"), is_min=False)
or self.max_accelerators
)
interconnect = constraints.get("system.interconnect.accelerator")
nodes = constraints.get("system.number_of_nodes")
VALID_GPUS_PER_NODE = [1, 2, 4, 8]
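        # Attempt up to 3x the requested count, since draws below are
        # rejected when no memory capacity or node layout satisfies the
        # constraints; stop early once enough valid configs are collected.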
for _ in range(count * 3):
if len(configs) >= count:
break
device_count = self._get_device_count(min_devices, max_devices)
acc_vendor, acc_model = self._get_vendor_accelerator(vendor)
if acc_name and acc_name != "Any":
acc_model = acc_name
if nodes and nodes != "Any":
node_count = int(nodes)
                valid_device_counts = []
                for gpus in VALID_GPUS_PER_NODE:
                    total = node_count * gpus
                    if (min_devices or 1) <= total <= (max_devices or float("inf")):
                        valid_device_counts.append(gpus)
if not valid_device_counts:
continue
devices_per_node = random.choice(valid_device_counts)
device_count = node_count * devices_per_node
            else:
                # _get_node_config picks the densest per-node packing
                # (8, 4, 2, or 1 GPUs) that evenly divides the sampled
                # device count.
                node_count, devices_per_node = self._get_node_config(device_count)
                device_count = node_count * devices_per_node
config = {
"model.architecture": architecture,
"model.number_of_parameters": parameters,
"system.accelerator.vendor": acc_vendor,
"system.accelerator.name": acc_model,
"system.accelerator.total_count": device_count,
"system.number_of_nodes": node_count,
"system.accelerator.count_per_node": devices_per_node,
}
gpu_memory = self._get_memory_for_accelerator(
acc_vendor,
acc_model,
min_memory=min_gpu_memory,
max_memory=max_gpu_memory,
)
if gpu_memory is None:
continue
config["system.accelerator.memory_capacity"] = gpu_memory
if "system.accelerator.memory_config" in self.distributions:
config["system.accelerator.memory_config"] = (
self._sample_from_distribution(
self.distributions["system.accelerator.memory_config"]
)
)
interconnect_config = self._get_interconnect_config(acc_vendor)
if interconnect and interconnect != "Any":
interconnect_config["system.interconnect.accelerator"] = interconnect
config.update(interconnect_config)
config.update(self._get_cpu_config())
memory_config = self._get_memory_config(
min_memory=min_cpu_memory, max_memory=max_cpu_memory
)
if "system.memory.capacity" not in memory_config:
continue
config.update(memory_config)
for feature_name in [
"system.type",
"system.cooling",
"model.weight_data_types",
]:
if feature_name in self.distributions:
config[feature_name] = self._sample_from_distribution(
self.distributions[feature_name]
)
config.update(self._get_software_config(acc_vendor, constraints))
for key, value in constraints.items():
if (
not key.startswith("software.framework.")
and key != "software.operating_system"
and key
not in [
"min_gpu_memory",
"max_gpu_memory",
"min_cpu_memory",
"max_cpu_memory",
"min_accelerators",
"max_accelerators",
]
and key not in config
and value != "Any"
and value is not None
):
config[key] = value
configs.append(config)
return configs[:count]
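if __name__ == "__main__":
    # Minimal usage sketch. Assumptions: "results.csv" is a hypothetical path
    # standing in for the benchmark table FlexBoard loads elsewhere; it must
    # contain the columns referenced above (system.*, software.*, model.*,
    # and metrics.result_per_accelerator). The vendor constraint and
    # architecture label are illustrative values, not required ones.
    logging.basicConfig(level=logging.INFO)
    benchmark_df = pd.read_csv("results.csv")  # hypothetical dataset path
    predictor = PerformancePredictor(benchmark_df, test_size=0.2)
    print("Evaluation:", predictor.get_evaluation_metrics())
    predictions = predictor.generate_predictions(
        architecture="transformer",  # hypothetical architecture label
        parameters=70.0,  # model size in billions of parameters
        constraints={"system.accelerator.vendor": "NVIDIA"},  # hypothetical
        num_configs=5,
    )
    if not predictions.empty:
        print(predictions[["system.accelerator.name", predictor.target]])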