| | """ |
| | Tests for the controller plugin system. |
| | |
| | This module tests: |
| | 1. Base controller interface |
| | 2. Controller registry |
| | 3. Individual plugin controllers |
| | 4. Composite controllers |
| | 5. Integration with simulator |
| | """ |
| |
|
| | import pytest |
| | import numpy as np |
| | from unittest.mock import MagicMock |
| |
|
| | from tep.controller_base import ( |
| | BaseController, |
| | ControllerRegistry, |
| | register_controller, |
| | CompositeController, |
| | ) |
| | from tep.controllers import ( |
| | DecentralizedController, |
| | ManualController, |
| | PIController, |
| | ) |
| | from tep.controller_plugins import ( |
| | ReactorTemperatureController, |
| | SeparatorLevelController, |
| | StripperLevelController, |
| | ReactorSubsystemController, |
| | SeparatorSubsystemController, |
| | FeedSubsystemController, |
| | ProductQualityController, |
| | ReactorCompositionController, |
| | ProportionalOnlyController, |
| | EconomicMPCController, |
| | PassthroughController, |
| | create_composite_controller, |
| | ) |
| | from tep.simulator import TEPSimulator, ControlMode |
| | from tep.constants import NUM_MANIPULATED_VARS, NUM_MEASUREMENTS |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestBaseController: |
| | """Tests for BaseController abstract class.""" |
| |
|
| | def test_cannot_instantiate_directly(self): |
| | """BaseController cannot be instantiated directly.""" |
| | with pytest.raises(TypeError): |
| | BaseController() |
| |
|
| | def test_subclass_must_implement_calculate(self): |
| | """Subclasses must implement calculate method.""" |
| | class IncompleteController(BaseController): |
| | def reset(self): |
| | pass |
| |
|
| | with pytest.raises(TypeError): |
| | IncompleteController() |
| |
|
| | def test_subclass_must_implement_reset(self): |
| | """Subclasses must implement reset method.""" |
| | class IncompleteController(BaseController): |
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | with pytest.raises(TypeError): |
| | IncompleteController() |
| |
|
| | def test_valid_subclass(self): |
| | """Valid subclass can be instantiated.""" |
| | class ValidController(BaseController): |
| | name = "valid" |
| | description = "A valid controller" |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv.copy() |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ctrl = ValidController() |
| | assert ctrl.name == "valid" |
| | assert ctrl.description == "A valid controller" |
| |
|
| | def test_get_info(self): |
| | """get_info returns controller metadata.""" |
| | class TestController(BaseController): |
| | name = "test_ctrl" |
| | description = "Test controller" |
| | version = "2.0.0" |
| | controlled_mvs = [1, 2, 3] |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ctrl = TestController() |
| | info = ctrl.get_info() |
| |
|
| | assert info["name"] == "test_ctrl" |
| | assert info["description"] == "Test controller" |
| | assert info["version"] == "2.0.0" |
| | assert info["controlled_mvs"] == [1, 2, 3] |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestControllerRegistry: |
| | """Tests for ControllerRegistry.""" |
| |
|
| | def setup_method(self): |
| | """Clear registry before each test.""" |
| | |
| | self._saved = ControllerRegistry._controllers.copy() |
| |
|
| | def teardown_method(self): |
| | """Restore registry after each test.""" |
| | ControllerRegistry._controllers = self._saved |
| |
|
| | def test_register_controller(self): |
| | """Can register a controller class.""" |
| | class MyController(BaseController): |
| | name = "my_ctrl" |
| | description = "My controller" |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ControllerRegistry.register(MyController) |
| | assert "my_ctrl" in ControllerRegistry.list_available() |
| |
|
| | def test_register_with_custom_name(self): |
| | """Can register with a custom name.""" |
| | class MyController(BaseController): |
| | name = "original_name" |
| | description = "My controller" |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ControllerRegistry.register(MyController, name="custom_name") |
| | assert "custom_name" in ControllerRegistry.list_available() |
| |
|
| | def test_register_non_basecontroller_fails(self): |
| | """Registering non-BaseController class raises TypeError.""" |
| | class NotAController: |
| | pass |
| |
|
| | with pytest.raises(TypeError): |
| | ControllerRegistry.register(NotAController) |
| |
|
| | def test_get_controller(self): |
| | """Can get a registered controller class.""" |
| | class MyController(BaseController): |
| | name = "get_test" |
| | description = "Test" |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ControllerRegistry.register(MyController) |
| | cls = ControllerRegistry.get("get_test") |
| | assert cls is MyController |
| |
|
| | def test_get_unknown_controller_fails(self): |
| | """Getting unknown controller raises KeyError.""" |
| | with pytest.raises(KeyError): |
| | ControllerRegistry.get("nonexistent_controller") |
| |
|
| | def test_create_controller(self): |
| | """Can create a controller instance.""" |
| | class MyController(BaseController): |
| | name = "create_test" |
| | description = "Test" |
| |
|
| | def __init__(self, param1=10): |
| | self.param1 = param1 |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ControllerRegistry.register(MyController) |
| | ctrl = ControllerRegistry.create("create_test", param1=42) |
| |
|
| | assert isinstance(ctrl, MyController) |
| | assert ctrl.param1 == 42 |
| |
|
| | def test_create_with_default_params(self): |
| | """Create uses registered default params.""" |
| | class MyController(BaseController): |
| | name = "defaults_test" |
| | description = "Test" |
| |
|
| | def __init__(self, value=0): |
| | self.value = value |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ControllerRegistry.register( |
| | MyController, |
| | default_params={"value": 100} |
| | ) |
| | ctrl = ControllerRegistry.create("defaults_test") |
| |
|
| | assert ctrl.value == 100 |
| |
|
| | def test_list_available(self): |
| | """list_available returns registered controller names.""" |
| | |
| | available = ControllerRegistry.list_available() |
| | assert "decentralized" in available |
| | assert "manual" in available |
| |
|
| | def test_get_info(self): |
| | """get_info returns controller metadata.""" |
| | info = ControllerRegistry.get_info("decentralized") |
| | assert info["name"] == "decentralized" |
| | assert "description" in info |
| |
|
| | def test_unregister(self): |
| | """Can unregister a controller.""" |
| | class TempController(BaseController): |
| | name = "temp_ctrl" |
| | description = "Temporary" |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | ControllerRegistry.register(TempController) |
| | assert "temp_ctrl" in ControllerRegistry.list_available() |
| |
|
| | ControllerRegistry.unregister("temp_ctrl") |
| | assert "temp_ctrl" not in ControllerRegistry.list_available() |
| |
|
| |
|
| | class TestRegisterDecorator: |
| | """Tests for @register_controller decorator.""" |
| |
|
| | def setup_method(self): |
| | self._saved = ControllerRegistry._controllers.copy() |
| |
|
| | def teardown_method(self): |
| | ControllerRegistry._controllers = self._saved |
| |
|
| | def test_decorator_registers_controller(self): |
| | """Decorator registers the controller.""" |
| | @register_controller(name="decorated_ctrl") |
| | class DecoratedController(BaseController): |
| | name = "decorated_ctrl" |
| | description = "Decorated" |
| |
|
| | def calculate(self, xmeas, xmv, step): |
| | return xmv |
| |
|
| | def reset(self): |
| | pass |
| |
|
| | assert "decorated_ctrl" in ControllerRegistry.list_available() |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestReactorTemperatureController: |
| | """Tests for ReactorTemperatureController.""" |
| |
|
| | def test_initialization(self): |
| | """Controller initializes with correct parameters.""" |
| | ctrl = ReactorTemperatureController(setpoint=125.0, gain=-2.0) |
| | assert ctrl.setpoint == 125.0 |
| | assert ctrl.gain == -2.0 |
| |
|
| | def test_calculate_returns_correct_shape(self): |
| | """calculate returns 12-element array.""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| | assert result.shape == (NUM_MANIPULATED_VARS,) |
| |
|
| | def test_only_modifies_mv10(self): |
| | """Only modifies XMV 10 (reactor cooling water).""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 130.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | for i in range(NUM_MANIPULATED_VARS): |
| | if i == 9: |
| | assert result[i] != 50.0 |
| | else: |
| | assert result[i] == 50.0 |
| |
|
| | def test_reset_clears_state(self): |
| | """reset clears controller state.""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 130.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | |
| | for step in range(0, 12, 3): |
| | ctrl.calculate(xmeas, xmv, step) |
| |
|
| | ctrl.reset() |
| | assert ctrl._controller.err_old == 0.0 |
| |
|
| |
|
| | class TestSeparatorLevelController: |
| | """Tests for SeparatorLevelController.""" |
| |
|
| | def test_only_modifies_mv7(self): |
| | """Only modifies XMV 7 (separator underflow).""" |
| | ctrl = SeparatorLevelController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[11] = 60.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | for i in range(NUM_MANIPULATED_VARS): |
| | if i == 6: |
| | assert result[i] != 50.0 |
| | else: |
| | assert result[i] == 50.0 |
| |
|
| |
|
| | class TestStripperLevelController: |
| | """Tests for StripperLevelController.""" |
| |
|
| | def test_only_modifies_mv8(self): |
| | """Only modifies XMV 8 (stripper product flow).""" |
| | ctrl = StripperLevelController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[14] = 60.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | for i in range(NUM_MANIPULATED_VARS): |
| | if i == 7: |
| | assert result[i] != 50.0 |
| | else: |
| | assert result[i] == 50.0 |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestReactorSubsystemController: |
| | """Tests for ReactorSubsystemController.""" |
| |
|
| | def test_modifies_correct_mvs(self): |
| | """Modifies XMV 4 and 10.""" |
| | ctrl = ReactorSubsystemController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[3] = 10.0 |
| | xmeas[7] = 80.0 |
| | xmeas[8] = 125.0 |
| | xmeas[20] = 95.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | controlled = [3, 9] |
| | for i in range(NUM_MANIPULATED_VARS): |
| | if i not in controlled: |
| | assert result[i] == 50.0, f"MV {i+1} should not change" |
| |
|
| | def test_cascade_structure(self): |
| | """Controller has proper cascade structure.""" |
| | ctrl = ReactorSubsystemController() |
| | assert hasattr(ctrl, 'cw_temp_setpoint') |
| | assert hasattr(ctrl, 'ac_feed_setpoint') |
| |
|
| |
|
| | class TestFeedSubsystemController: |
| | """Tests for FeedSubsystemController.""" |
| |
|
| | def test_modifies_mvs_1_to_4(self): |
| | """Modifies XMV 1, 2, 3, 4.""" |
| | ctrl = FeedSubsystemController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[0] = 0.3 |
| | xmeas[1] = 4000.0 |
| | xmeas[2] = 5000.0 |
| | xmeas[3] = 10.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | controlled = [0, 1, 2, 3] |
| | for i in range(NUM_MANIPULATED_VARS): |
| | if i not in controlled: |
| | assert result[i] == 50.0, f"MV {i+1} should not change" |
| |
|
| | def test_set_setpoints(self): |
| | """Can update setpoints dynamically.""" |
| | ctrl = FeedSubsystemController() |
| | ctrl.set_setpoints(d_feed=4000.0, e_feed=5000.0) |
| |
|
| | assert ctrl.d_feed_sp == 4000.0 |
| | assert ctrl.e_feed_sp == 5000.0 |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestProductQualityController: |
| | """Tests for ProductQualityController.""" |
| |
|
| | def test_only_modifies_mv9(self): |
| | """Only modifies XMV 9 (steam valve).""" |
| | ctrl = ProductQualityController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[17] = 66.0 |
| | xmeas[18] = 240.0 |
| | xmeas[37] = 0.85 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | for i in range(NUM_MANIPULATED_VARS): |
| | if i == 8: |
| | pass |
| | else: |
| | assert result[i] == 50.0 |
| |
|
| | def test_slow_loop_timing(self): |
| | """Slow loop only executes every 900 steps.""" |
| | ctrl = ProductQualityController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[37] = 0.9 |
| | xmeas[17] = 66.0 |
| | xmeas[18] = 230.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | initial_sp = ctrl.stripper_temp_setpoint |
| |
|
| | |
| | |
| | for step in [3, 6, 9, 300, 600]: |
| | ctrl.calculate(xmeas, xmv, step) |
| |
|
| | |
| | |
| | assert ctrl.stripper_temp_setpoint == initial_sp, \ |
| | f"Stripper temp setpoint changed before step 900: {ctrl.stripper_temp_setpoint}" |
| |
|
| | |
| | ctrl.calculate(xmeas, xmv, step=900) |
| |
|
| | |
| | assert ctrl.stripper_temp_setpoint != initial_sp, \ |
| | f"Stripper temp setpoint should change at step 900" |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestProportionalOnlyController: |
| | """Tests for ProportionalOnlyController.""" |
| |
|
| | def test_controls_mvs_1_to_11(self): |
| | """Controls MVs 1-11.""" |
| | ctrl = ProportionalOnlyController() |
| | assert ctrl.controlled_mvs == list(range(1, 12)) |
| |
|
| | def test_calculate_updates_multiple_mvs(self): |
| | """calculate updates multiple MVs.""" |
| | ctrl = ProportionalOnlyController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | |
| | xmeas[1] = 3664.0 |
| | xmeas[2] = 4509.3 |
| | xmeas[11] = 60.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | changed_count = sum(1 for i in range(11) if result[i] != 50.0) |
| | assert changed_count > 0 |
| |
|
| |
|
| | class TestEconomicMPCController: |
| | """Tests for EconomicMPCController.""" |
| |
|
| | def test_safety_override_high_temp(self): |
| | """Safety layer activates on high reactor temperature.""" |
| | ctrl = EconomicMPCController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 148.0 |
| | xmeas[6] = 2800.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | assert result[9] > 50.0 |
| |
|
| | def test_safety_override_high_pressure(self): |
| | """Safety layer activates on high reactor pressure.""" |
| | ctrl = EconomicMPCController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 120.0 |
| | xmeas[6] = 2860.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | assert result[5] > 50.0 |
| |
|
| | def test_production_rate_adjustment(self): |
| | """Can adjust production rate target.""" |
| | ctrl = EconomicMPCController() |
| | ctrl.set_production_rate(0.8) |
| |
|
| | assert ctrl.production_rate_target == 0.8 |
| |
|
| |
|
| | class TestPassthroughController: |
| | """Tests for PassthroughController.""" |
| |
|
| | def test_returns_unchanged_mvs(self): |
| | """Returns MVs unchanged.""" |
| | ctrl = PassthroughController() |
| | xmeas = np.random.rand(NUM_MEASUREMENTS) |
| | xmv = np.array([10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 50, 50], dtype=float) |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | np.testing.assert_array_equal(result, xmv) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestCompositeController: |
| | """Tests for CompositeController.""" |
| |
|
| | def test_combines_multiple_controllers(self): |
| | """Combines outputs from multiple sub-controllers.""" |
| | composite = CompositeController() |
| |
|
| | |
| | composite.add_controller(ReactorTemperatureController(), mvs=[10]) |
| |
|
| | |
| | composite.add_controller(SeparatorLevelController(), mvs=[7]) |
| |
|
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 130.0 |
| | xmeas[11] = 60.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = composite.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | assert result[9] != 50.0 |
| | assert result[6] != 50.0 |
| |
|
| | def test_mv_conflict_raises_error(self): |
| | """Adding conflicting MV assignments raises error.""" |
| | composite = CompositeController() |
| | composite.add_controller(ReactorTemperatureController(), mvs=[10]) |
| |
|
| | |
| | with pytest.raises(ValueError): |
| | composite.add_controller(SeparatorLevelController(), mvs=[10]) |
| |
|
| | def test_fallback_controller(self): |
| | """Fallback controller handles unassigned MVs.""" |
| | |
| | composite = CompositeController( |
| | fallback_controller=PassthroughController() |
| | ) |
| | composite.add_controller(ReactorTemperatureController(), mvs=[10]) |
| |
|
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | result = composite.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | assert result[0] == 50.0 |
| |
|
| | def test_reset_resets_all(self): |
| | """reset calls reset on all sub-controllers.""" |
| | composite = CompositeController() |
| | composite.add_controller(ReactorTemperatureController(), mvs=[10]) |
| | composite.add_controller(SeparatorLevelController(), mvs=[7]) |
| |
|
| | |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 130.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| | for step in range(0, 30, 3): |
| | composite.calculate(xmeas, xmv, step) |
| |
|
| | composite.reset() |
| |
|
| | |
| | for ctrl, _ in composite._sub_controllers: |
| | assert ctrl._controller.err_old == 0.0 |
| |
|
| |
|
| | class TestCreateCompositeController: |
| | """Tests for create_composite_controller factory function.""" |
| |
|
| | def test_creates_composite_from_names(self): |
| | """Creates composite controller from subsystem names.""" |
| | ctrl = create_composite_controller( |
| | ["reactor_temp", "separator_level"], |
| | fallback="passthrough" |
| | ) |
| |
|
| | assert isinstance(ctrl, CompositeController) |
| | assert len(ctrl._sub_controllers) == 2 |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestSimulatorIntegration: |
| | """Tests for controller integration with TEPSimulator.""" |
| |
|
| | @pytest.fixture |
| | def fortran_available(self): |
| | """Check if Fortran backend is available.""" |
| | try: |
| | from tep._fortran import teprob |
| | return True |
| | except ImportError: |
| | return False |
| |
|
| | def test_decentralized_controller_registered(self): |
| | """DecentralizedController is registered.""" |
| | assert "decentralized" in ControllerRegistry.list_available() |
| |
|
| | def test_manual_controller_registered(self): |
| | """ManualController is registered.""" |
| | assert "manual" in ControllerRegistry.list_available() |
| |
|
| | def test_custom_controller_with_simulator(self, fortran_available): |
| | """Custom controller works with simulator.""" |
| | if not fortran_available: |
| | pytest.skip("Fortran backend not available") |
| |
|
| | sim = TEPSimulator(control_mode=ControlMode.CLOSED_LOOP) |
| | sim.initialize() |
| |
|
| | |
| | custom_ctrl = ReactorTemperatureController() |
| | sim.controller = custom_ctrl |
| |
|
| | |
| | for _ in range(10): |
| | sim.step() |
| |
|
| | |
| | assert sim.step_count == 10 |
| |
|
| | def test_simulate_with_controller_method(self, fortran_available): |
| | """simulate_with_controller accepts custom controllers.""" |
| | if not fortran_available: |
| | pytest.skip("Fortran backend not available") |
| |
|
| | sim = TEPSimulator(control_mode=ControlMode.CLOSED_LOOP) |
| | sim.initialize() |
| |
|
| | |
| | result = sim.simulate_with_controller( |
| | duration_hours=0.01, |
| | controller=PassthroughController(), |
| | record_interval=36 |
| | ) |
| |
|
| | assert result.time[-1] > 0 |
| |
|
| | def test_plugin_controller_via_registry(self, fortran_available): |
| | """Can use registry to create controller for simulator.""" |
| | if not fortran_available: |
| | pytest.skip("Fortran backend not available") |
| |
|
| | ctrl = ControllerRegistry.create("reactor_temp") |
| |
|
| | sim = TEPSimulator(control_mode=ControlMode.CLOSED_LOOP) |
| | sim.initialize() |
| | sim.controller = ctrl |
| |
|
| | |
| | result = sim.simulate(duration_hours=0.01, record_interval=36) |
| | assert not result.shutdown |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestDecentralizedControllerInheritance: |
| | """Tests that DecentralizedController properly inherits from BaseController.""" |
| |
|
| | def test_is_base_controller(self): |
| | """DecentralizedController inherits from BaseController.""" |
| | ctrl = DecentralizedController() |
| | assert isinstance(ctrl, BaseController) |
| |
|
| | def test_has_required_methods(self): |
| | """Has calculate and reset methods.""" |
| | ctrl = DecentralizedController() |
| | assert hasattr(ctrl, 'calculate') |
| | assert hasattr(ctrl, 'reset') |
| | assert callable(ctrl.calculate) |
| | assert callable(ctrl.reset) |
| |
|
| | def test_has_class_attributes(self): |
| | """Has required class attributes.""" |
| | ctrl = DecentralizedController() |
| | assert ctrl.name == "decentralized" |
| | assert ctrl.description is not None |
| | assert ctrl.version is not None |
| |
|
| | def test_get_parameters(self): |
| | """get_parameters returns controller state.""" |
| | ctrl = DecentralizedController() |
| | params = ctrl.get_parameters() |
| |
|
| | assert "setpoints" in params |
| | assert "purge_flag" in params |
| |
|
| |
|
| | class TestManualControllerInheritance: |
| | """Tests that ManualController properly inherits from BaseController.""" |
| |
|
| | def test_is_base_controller(self): |
| | """ManualController inherits from BaseController.""" |
| | ctrl = ManualController() |
| | assert isinstance(ctrl, BaseController) |
| |
|
| | def test_has_required_methods(self): |
| | """Has calculate and reset methods.""" |
| | ctrl = ManualController() |
| | assert hasattr(ctrl, 'calculate') |
| | assert hasattr(ctrl, 'reset') |
| |
|
| | def test_reset_restores_defaults(self): |
| | """reset restores default MV values.""" |
| | ctrl = ManualController() |
| | ctrl.set_mv(1, 100.0) |
| |
|
| | ctrl.reset() |
| |
|
| | |
| | assert ctrl.mv_values[0] != 100.0 |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestEdgeCases: |
| | """Tests for edge cases and error handling.""" |
| |
|
| | def test_controller_with_zero_step(self): |
| | """Controllers handle step=0.""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | |
| | result = ctrl.calculate(xmeas, xmv, step=0) |
| | assert result.shape == (NUM_MANIPULATED_VARS,) |
| |
|
| | def test_controller_with_negative_measurements(self): |
| | """Controllers handle negative measurements gracefully.""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.ones(NUM_MEASUREMENTS) * -10.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | |
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| | assert result.shape == (NUM_MANIPULATED_VARS,) |
| |
|
| | def test_controller_with_nan_measurements(self): |
| | """Controllers produce output even with NaN measurements.""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.ones(NUM_MEASUREMENTS) * np.nan |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | |
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| | assert result.shape == (NUM_MANIPULATED_VARS,) |
| |
|
| | def test_controller_preserves_mv_limits(self): |
| | """Controller outputs respect MV limits (0-100).""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 200.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 99.0 |
| |
|
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| |
|
| | |
| | assert 0.0 <= result[9] <= 100.0 |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class TestControllerTiming: |
| | """Tests for controller timing behavior.""" |
| |
|
| | def test_fast_loop_timing(self): |
| | """Fast loops execute every 3 steps.""" |
| | ctrl = ReactorTemperatureController() |
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[8] = 130.0 |
| | xmv_base = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | |
| | for step in [1, 2, 4, 5, 7, 8]: |
| | xmv = xmv_base.copy() |
| | result = ctrl.calculate(xmeas, xmv, step) |
| | |
| | assert result[9] == 50.0, f"MV10 changed on step {step}" |
| |
|
| | |
| | ctrl.reset() |
| | xmv = xmv_base.copy() |
| | result = ctrl.calculate(xmeas, xmv, step=3) |
| | |
| | assert result[9] != 50.0, "MV10 should change on step 3" |
| |
|
| | def test_medium_loop_timing(self): |
| | """Medium loops (6 min) execute every 360 steps.""" |
| | ctrl = ReactorCompositionController() |
| |
|
| | xmeas = np.zeros(NUM_MEASUREMENTS) |
| | xmeas[22] = 40.0 |
| | xmv = np.ones(NUM_MANIPULATED_VARS) * 50.0 |
| |
|
| | |
| | initial_sp = ctrl.a_feed_setpoint |
| |
|
| | |
| | |
| | for step in [3, 6, 9, 300]: |
| | ctrl.calculate(xmeas, xmv, step) |
| |
|
| | |
| | |
| | assert ctrl.a_feed_setpoint == initial_sp, \ |
| | f"Setpoint changed before step 360: {ctrl.a_feed_setpoint} != {initial_sp}" |
| |
|
| | |
| | ctrl.calculate(xmeas, xmv, step=360) |
| |
|
| | |
| | assert ctrl.a_feed_setpoint != initial_sp, \ |
| | f"Setpoint should have changed at step 360" |
| |
|