diff --git a/.github/workflows/_testing.yml b/.github/workflows/_testing.yml index 0be313f0..21d69eed 100644 --- a/.github/workflows/_testing.yml +++ b/.github/workflows/_testing.yml @@ -10,7 +10,7 @@ jobs: strategy: matrix: host-os: ["ubuntu-latest"] - python-version: ["py310-cpu", "py311-cpu", "py312-cpu", "py313-cpu"] + python-version: ["py311-cpu", "py312-cpu", "py313-cpu"] fail-fast: false defaults: diff --git a/pixi.toml b/pixi.toml index 67129eab..d49a3a6c 100644 --- a/pixi.toml +++ b/pixi.toml @@ -9,7 +9,7 @@ platforms = ["linux-64", "osx-arm64"] version = "0.9.0" [dependencies] -python = ">=3.10.0,<3.14" +python = ">=3.11.0,<3.14" [feature.dev.dependencies] @@ -46,9 +46,6 @@ bluesky-tiled-plugins = "*" ophyd-async = "*" opencv-python = "*" -[feature.py310.dependencies] -python = "3.10.*" - [feature.py311.dependencies] python = "3.11.*" @@ -78,7 +75,6 @@ convert-tutorials-to-ipynb = "jupytext --to notebook docs/source/tutorials/*.md" dev = ["dev"] dev-cpu = ["dev-cpu"] docs = ["docs"] -py310-cpu = ["dev-cpu", "py310"] py311-cpu = ["dev-cpu", "py311"] py312-cpu = ["dev-cpu", "py312"] py313-cpu = ["dev-cpu", "py313"] diff --git a/pyproject.toml b/pyproject.toml index 90ac1139..db2b6df5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,9 +20,10 @@ maintainers = [ { name = "Jennefer Maldonado", email = "jmaldonad@bnl.gov" }, { name = "Roman Chernikov", email = "rcherniko@bnl.gov" }, ] -requires-python = ">=3.10" +requires-python = ">=3.11" dependencies = [ "ax-platform>=1.1.0,<1.3", + "xopt", "bluesky>=1.14.2", "bluesky-queueserver-api>=0.0.12", "torch", diff --git a/src/blop/__init__.py b/src/blop/__init__.py index c2d138e6..4327d6df 100644 --- a/src/blop/__init__.py +++ b/src/blop/__init__.py @@ -1,5 +1,6 @@ from .ax import DOF, Agent, ChoiceDOF, DOFConstraint, Objective, OutcomeConstraint, RangeDOF, ScalarizedObjective from .plans import acquire_baseline, default_acquire, optimize, optimize_step, sample_suggestions +from .xopt import XoptOptimizer try: from ._version import __version__ @@ -21,4 +22,5 @@ "optimize", "optimize_step", "sample_suggestions", + "XoptOptimizer", ] diff --git a/src/blop/tests/xopt/test_optimizer.py b/src/blop/tests/xopt/test_optimizer.py new file mode 100644 index 00000000..e8ef4f81 --- /dev/null +++ b/src/blop/tests/xopt/test_optimizer.py @@ -0,0 +1,382 @@ +from collections.abc import Callable + +import numpy as np +import pandas as pd +import pytest +from xopt.generators.bayesian import ExpectedImprovementGenerator +from xopt.generators.random import RandomGenerator +from xopt.vocs import VOCS + +from blop.xopt import optimizer as xopt_optimizer_module +from blop.xopt.optimizer import XoptOptimizer + + +def _random_optimizer(vocs: VOCS, *, checkpoint_path: str | None = None) -> XoptOptimizer: + return XoptOptimizer( + generator=RandomGenerator(vocs=vocs), + checkpoint_path=checkpoint_path, + ) + + +def _bo_optimizer(vocs: VOCS, *, checkpoint_path: str | None = None) -> XoptOptimizer: + return XoptOptimizer( + generator=ExpectedImprovementGenerator(vocs=vocs), + checkpoint_path=checkpoint_path, + ) + + +@pytest.fixture(params=[_random_optimizer, _bo_optimizer], ids=["random", "bo"]) +def optimizer_factory(request: pytest.FixtureRequest) -> Callable[[VOCS], XoptOptimizer]: + return request.param + + +def test_xopt_optimizer_init(optimizer_factory: Callable[[VOCS], XoptOptimizer]): + if optimizer_factory is _bo_optimizer: + vocs = VOCS( + variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0], "x3": [0.0, 5.0]}, + objectives={"y1": "MINIMIZE"}, + constraints={"y1": ["LESS_THAN", 10.0]}, + ) + else: + vocs = VOCS( + variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0], "x3": [0.0, 5.0]}, + objectives={"y1": "MAXIMIZE", "y2": "MINIMIZE"}, + constraints={"y1": ["GREATER_THAN", 0.0], "y2": ["LESS_THAN", 0.0]}, + ) + + optimizer = optimizer_factory(vocs) + assert optimizer.generator is not None + assert set(optimizer.vocs.variable_names) == {"x1", "x2", "x3"} + + +def test_xopt_fixed_parameters(optimizer_factory: Callable[[VOCS], XoptOptimizer]): + vocs = VOCS(variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0], "x3": [0.0, 5.0]}, objectives={"y1": "MINIMIZE"}) + optimizer = optimizer_factory(vocs) + + with pytest.raises(KeyError): + optimizer.fixed_parameters = {"x4": 3} + + optimizer.fixed_parameters = {"x3": 3} + assert optimizer.fixed_parameters == {"x3": 3} + + optimizer.fixed_parameters = {} + assert optimizer.fixed_parameters is None + + +def test_xopt_optimizer_suggest_ids_and_keys(): + vocs = VOCS(variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0], "x3": [0.0, 5.0]}, objectives={"y1": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + + suggestions = optimizer.suggest(num_points=2) + assert len(suggestions) == 2 + for i, suggestion in enumerate(suggestions): + assert suggestion["_id"] == i + assert "x1" in suggestion + assert "x2" in suggestion + assert "x3" in suggestion + + +def test_xopt_optimizer_suggest_defaults_to_single_point(): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + + suggestions = optimizer.suggest() + assert len(suggestions) == 1 + assert suggestions[0]["_id"] == 0 + + +def test_xopt_optimizer_ingest_multiple_columns(optimizer_factory: Callable[[VOCS], XoptOptimizer]): + if optimizer_factory is _bo_optimizer: + vocs = VOCS( + variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0], "x3": [0.0, 5.0]}, + objectives={"y1": "MINIMIZE"}, + ) + else: + vocs = VOCS( + variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0], "x3": [0.0, 5.0]}, + objectives={"y1": "MAXIMIZE", "y2": "MINIMIZE"}, + ) + optimizer = optimizer_factory(vocs) + + if optimizer_factory is _bo_optimizer: + optimizer.ingest( + [ + {"x1": 0.0, "x2": 0.0, "x3": 0.0, "y1": 1.0}, + {"x1": 0.1, "x2": 0.2, "x3": 1.0, "y1": 3.0}, + ] + ) + else: + optimizer.ingest( + [ + {"x1": 0.0, "x2": 0.0, "x3": 0.0, "y1": 1.0, "y2": 2.0}, + {"x1": 0.1, "x2": 0.2, "x3": 1.0, "y1": 3.0, "y2": 4.0}, + ] + ) + + data = optimizer.generator.data + assert data is not None + assert len(data) == 2 + assert np.allclose(data["x1"].to_numpy(dtype=float), [0.0, 0.1]) + assert np.allclose(data["x2"].to_numpy(dtype=float), [0.0, 0.2]) + assert np.allclose(data["x3"].to_numpy(dtype=float), [0.0, 1.0]) + assert np.allclose(data["y1"].to_numpy(dtype=float), [1.0, 3.0]) + if optimizer_factory is _bo_optimizer: + assert "y2" not in data.columns + else: + assert np.allclose(data["y2"].to_numpy(dtype=float), [2.0, 4.0]) + + +def test_xopt_optimizer_ingest_baseline_id(optimizer_factory: Callable[[VOCS], XoptOptimizer]): + vocs = VOCS(variables={"x1": [-5.0, 5.0]}, objectives={"y1": "MINIMIZE"}) + optimizer = optimizer_factory(vocs) + + optimizer.ingest([{"x1": 0.0, "y1": 1.0, "_id": "baseline"}]) + data = optimizer.generator.data + assert data is not None + assert len(data) == 1 + assert data.iloc[0]["_id"] == "baseline" + + +def test_xopt_optimizer_seeds_state_when_existing_data_lacks_id(): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MINIMIZE"}) + generator = RandomGenerator(vocs=vocs) + generator.add_data(pd.DataFrame([{"x": 0.4, "y": 1.2}])) + + optimizer = XoptOptimizer(generator=generator) + + assert optimizer._params_by_id[0] == {"x": 0.4} + assert optimizer._next_id == 1 + + +def test_xopt_optimizer_suggest_ingest(): + vocs = VOCS(variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0]}, objectives={"y1": "MINIMIZE", "y2": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + + suggestions = optimizer.suggest(num_points=2) + outcomes = [ + {"_id": suggestions[0]["_id"], "y1": 1.0, "y2": 2.0}, + {"_id": suggestions[1]["_id"], "y1": 3.0, "y2": 4.0}, + ] + optimizer.ingest(outcomes) + + data = optimizer.generator.data + assert data is not None + assert len(data) == 2 + assert np.allclose(data["y1"].to_numpy(dtype=float), [1.0, 3.0]) + assert np.allclose(data["y2"].to_numpy(dtype=float), [2.0, 4.0]) + + +def test_xopt_optimizer_register_failures(): + vocs = VOCS(variables={"x1": [-5.0, 5.0], "x2": [-5.0, 5.0]}, objectives={"y1": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + + suggestions = optimizer.suggest(num_points=5) + optimizer.register_failures(suggestions) + + assert all(suggestion["_id"] not in optimizer._params_by_id for suggestion in suggestions) + + +def test_xopt_optimizer_checkpoint_roundtrip(tmp_path): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MINIMIZE"}) + checkpoint_path = tmp_path / "xopt_optimizer.pkl" + optimizer = _random_optimizer(vocs, checkpoint_path=str(checkpoint_path)) + + suggestions = optimizer.suggest(1) + optimizer.ingest([{"_id": suggestions[0]["_id"], "y": 0.5}]) + optimizer.checkpoint() + + recovered = XoptOptimizer.from_checkpoint(str(checkpoint_path)) + assert recovered.generator.data is not None + assert len(recovered.generator.data) == 1 + assert recovered.checkpoint_path == str(checkpoint_path) + + +def test_xopt_optimizer_checkpoint_no_path(): + vocs = VOCS(variables={"x1": [-5.0, 5.0]}, objectives={"y1": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + + with pytest.raises(ValueError): + optimizer.checkpoint() + + +def test_xopt_optimizer_applies_fixed_parameters(): + vocs = VOCS(variables={"x": [0.0, 1.0], "z": [0.0, 2.0]}, objectives={"y": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + optimizer.fixed_parameters = {"z": 1.25} + + suggestions = optimizer.suggest(3) + assert all(suggestion["z"] == 1.25 for suggestion in suggestions) + + +def test_xopt_optimizer_get_best_points_single_objective_minimize(optimizer_factory: Callable[[VOCS], XoptOptimizer]): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MINIMIZE"}) + optimizer = optimizer_factory(vocs) + + optimizer.ingest( + [ + {"x": 0.1, "y": 5.0}, + {"x": 0.2, "y": 1.0}, + {"x": 0.3, "y": 3.0}, + ] + ) + + best_points = optimizer.get_best_points() + assert len(best_points) == 1 + _, params, outcomes = best_points[0] + assert params["x"] == 0.2 + assert outcomes["y"] == 1.0 + + +def test_xopt_optimizer_get_best_points_single_objective_maximize(optimizer_factory: Callable[[VOCS], XoptOptimizer]): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MAXIMIZE"}) + optimizer = optimizer_factory(vocs) + + optimizer.ingest( + [ + {"x": 0.1, "y": 5.0}, + {"x": 0.2, "y": 1.0}, + {"x": 0.3, "y": 3.0}, + ] + ) + + best_points = optimizer.get_best_points() + assert len(best_points) == 1 + _, params, outcomes = best_points[0] + assert params["x"] == 0.1 + assert outcomes["y"] == 5.0 + + +def test_xopt_optimizer_get_best_points_multi_objective(): + vocs = VOCS(variables={"x": [0.0, 10.0]}, objectives={"y1": "MAXIMIZE", "y2": "MAXIMIZE"}) + optimizer = _random_optimizer(vocs) + + optimizer.ingest( + [ + {"x": 1.0, "y1": 10.0, "y2": 1.0}, + {"x": 5.0, "y1": 1.0, "y2": 10.0}, + {"x": 3.0, "y1": 2.0, "y2": 2.0}, + ] + ) + + best_points = optimizer.get_best_points() + assert len(best_points) == 3 + for trial_id, params, metrics in best_points: + assert isinstance(trial_id, (int, float, str)) + assert "x" in params + assert "y1" in metrics + assert "y2" in metrics + + +def test_xopt_optimizer_get_best_points_returns_empty_when_all_infeasible(): + vocs = VOCS( + variables={"x": [0.0, 1.0]}, + objectives={"y": "MINIMIZE"}, + constraints={"c": ["LESS_THAN", 0.0]}, + ) + optimizer = _random_optimizer(vocs) + + optimizer.ingest( + [ + {"x": 0.1, "y": 5.0, "c": 1.0}, + {"x": 0.2, "y": 1.0, "c": 2.0}, + {"x": 0.3, "y": 3.0, "c": 3.0}, + ] + ) + + best_points = optimizer.get_best_points() + assert best_points == [] + + +def test_xopt_optimizer_get_best_points_selects_best_feasible_only(): + vocs = VOCS( + variables={"x": [0.0, 1.0]}, + objectives={"y": "MINIMIZE"}, + constraints={"c": ["LESS_THAN", 0.5]}, + ) + optimizer = _random_optimizer(vocs) + + optimizer.ingest( + [ + {"x": 0.1, "y": 10.0, "c": 0.1}, + {"x": 0.2, "y": 1.0, "c": 0.9}, + {"x": 0.3, "y": 2.0, "c": 0.2}, + ] + ) + + best_points = optimizer.get_best_points() + assert len(best_points) == 1 + _, params, outcomes = best_points[0] + assert params["x"] == 0.3 + assert outcomes["y"] == 2.0 + assert outcomes["c"] == 0.2 + + +def test_xopt_optimizer_get_best_points_empty_without_data(): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MINIMIZE"}) + optimizer = _random_optimizer(vocs) + + assert optimizer.get_best_points() == [] + + +def test_xopt_optimizer_feasible_mask_defaults_true_when_missing_feasible(monkeypatch: pytest.MonkeyPatch): + vocs = VOCS( + variables={"x": [0.0, 1.0]}, + objectives={"y": "MINIMIZE"}, + constraints={"c": ["LESS_THAN", 1.0]}, + ) + optimizer = _random_optimizer(vocs) + data = pd.DataFrame([{"x": 0.1, "y": 1.0, "c": 0.1}, {"x": 0.2, "y": 2.0, "c": 0.2}]) + + monkeypatch.setattr(xopt_optimizer_module, "get_feasibility_data", lambda _vocs, _data: {"c": [True, False]}) + + mask = optimizer._feasible_mask(data) + assert mask.tolist() == [True, True] + + +def test_xopt_optimizer_get_best_points_includes_observables(): + vocs = VOCS( + variables={"x": [0.0, 1.0]}, + objectives={"y": "MINIMIZE"}, + observables=["obs"], + ) + optimizer = _random_optimizer(vocs) + + optimizer.ingest( + [ + {"x": 0.1, "y": 2.0, "obs": 10.0}, + {"x": 0.2, "y": 1.0, "obs": 20.0}, + ] + ) + + best_points = optimizer.get_best_points() + assert len(best_points) == 1 + _, _, outcomes = best_points[0] + assert outcomes["y"] == 1.0 + assert outcomes["obs"] == 20.0 + + +def test_xopt_expected_improvement_runs_simple_minimization(): + vocs = VOCS(variables={"x": [0.0, 1.0]}, objectives={"y": "MINIMIZE"}) + optimizer = XoptOptimizer(generator=ExpectedImprovementGenerator(vocs=vocs)) + + # Seed EI with initial evaluations for model training. + optimizer.ingest( + [ + {"x": 0.0, "y": (0.0 - 0.25) ** 2}, + {"x": 0.5, "y": (0.5 - 0.25) ** 2}, + {"x": 1.0, "y": (1.0 - 0.25) ** 2}, + ] + ) + + for _ in range(3): + suggestion = optimizer.suggest(1)[0] + x_val = float(suggestion["x"]) + optimizer.ingest([{"_id": suggestion["_id"], "y": (x_val - 0.25) ** 2}]) + + assert optimizer.generator.data is not None + assert len(optimizer.generator.data) == 6 + + best_points = optimizer.get_best_points() + assert len(best_points) == 1 + _, _, outcomes = best_points[0] + assert outcomes["y"] <= 0.0625 diff --git a/src/blop/utils.py b/src/blop/utils.py index ecf8c2bb..d696f70f 100644 --- a/src/blop/utils.py +++ b/src/blop/utils.py @@ -1,6 +1,6 @@ import time from collections.abc import Sequence -from enum import Enum +from enum import StrEnum from typing import Any import networkx as nx @@ -12,7 +12,7 @@ from .protocols import ID_KEY, OptimizationProblem -class Source(str, Enum): +class Source(StrEnum): """An enum that helps describe where the data key comes from.""" OUTCOME = "optimization-outcome" diff --git a/src/blop/xopt/__init__.py b/src/blop/xopt/__init__.py new file mode 100644 index 00000000..959745c3 --- /dev/null +++ b/src/blop/xopt/__init__.py @@ -0,0 +1,3 @@ +from .optimizer import XoptOptimizer + +__all__ = ["XoptOptimizer"] diff --git a/src/blop/xopt/optimizer.py b/src/blop/xopt/optimizer.py new file mode 100644 index 00000000..835af2f7 --- /dev/null +++ b/src/blop/xopt/optimizer.py @@ -0,0 +1,235 @@ +import pickle +from collections.abc import Mapping +from pathlib import Path +from typing import Any + +import pandas as pd +from xopt import VOCS +from xopt.generator import Generator +from xopt.vocs import get_feasibility_data, select_best + +from ..protocols import ID_KEY, CanRegisterSuggestions, Checkpointable, Optimizer, TrialFaultAware + + +def _normalize_trial_id(value: Any) -> int | str: + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + return str(value) + + +def _is_missing_scalar(value: Any) -> bool: + return value is None or (isinstance(value, float) and pd.isna(value)) + + +class XoptOptimizer(Optimizer, Checkpointable, CanRegisterSuggestions, TrialFaultAware): + """Adapter that exposes an arbitrary Xopt generator through blop's Optimizer protocol.""" + + def __init__( + self, + generator: Generator, + *, + checkpoint_path: str | None = None, + ): + # Keep API simple: caller provides a fully configured Xopt generator instance. + self._generator = generator + + # Internal state tracks IDs, pending/known parameterizations, and checkpoint metadata. + self._checkpoint_path = checkpoint_path + self._fixed_parameters: dict[str, Any] | None = None + self._next_id = 0 + self._params_by_id: dict[int | str, dict[str, Any]] = {} + self._seed_state_from_existing_data() + + @classmethod + def from_checkpoint(cls, checkpoint_path: str) -> "XoptOptimizer": + # Restore all persistent adapter state from pickle payload. + path = Path(checkpoint_path) + with path.open("rb") as stream: + payload = pickle.load(stream) + + instance = object.__new__(cls) + instance._generator = payload["generator"] + instance._checkpoint_path = str(path) + instance._fixed_parameters = payload.get("fixed_parameters") + instance._next_id = payload.get("next_id", 0) + instance._params_by_id = payload.get("params_by_id", {}) + instance._seed_state_from_existing_data() + return instance + + @property + def checkpoint_path(self) -> str | None: + return self._checkpoint_path + + @property + def generator(self) -> Generator: + """Return the underlying Xopt generator instance.""" + return self._generator + + @property + def vocs(self) -> VOCS: + return self._generator.vocs + + @property + def fixed_parameters(self) -> dict[str, Any] | None: + return self._fixed_parameters + + @fixed_parameters.setter + def fixed_parameters(self, fixed_parameters: dict[str, Any] | None) -> None: + if not fixed_parameters: + self._fixed_parameters = None + return + + unknown_names = set(fixed_parameters) - set(self.vocs.variable_names) + if unknown_names: + raise KeyError(f"Unknown fixed parameter(s): {sorted(unknown_names)}") + + self._fixed_parameters = dict(fixed_parameters) + + def _seed_state_from_existing_data(self) -> None: + # Recover known trial IDs/parameters from existing generator data when available. + data = self._generator.data + if not isinstance(data, pd.DataFrame) or data.empty: + return + + for _, row in data.iterrows(): + # Reuse stored IDs when present, otherwise allocate synthetic IDs. + if ID_KEY in row and not _is_missing_scalar(row[ID_KEY]): + trial_id = _normalize_trial_id(row[ID_KEY]) + else: + trial_id = self._next_id + self._next_id += 1 + + self._params_by_id[trial_id] = {name: row[name] for name in self.vocs.variable_names if name in row} + if isinstance(trial_id, int): + self._next_id = max(self._next_id, trial_id + 1) + + def suggest(self, num_points: int | None = None) -> list[dict]: + # Default to single-point suggestion when caller does not specify cardinality. + if num_points is None: + num_points = 1 + + # Delegate candidate generation to Xopt and optionally enforce fixed variables. + suggestions = self._generator.generate(num_points) + if self._fixed_parameters: + suggestions = [{**suggestion, **self._fixed_parameters} for suggestion in suggestions] + return self.register_suggestions(suggestions) + + def register_suggestions(self, suggestions: list[dict]) -> list[dict]: + # Attach stable blop trial IDs and cache suggested parameterizations by ID. + registered: list[dict] = [] + for suggestion in suggestions: + trial_id = self._next_id + self._next_id += 1 + + params = {name: suggestion[name] for name in self.vocs.variable_names if name in suggestion} + self._params_by_id[trial_id] = params + registered.append({ID_KEY: trial_id, **suggestion}) + + return registered + + def ingest(self, points: list[dict]) -> None: + # Convert outcome payloads to DataFrame rows expected by Xopt generator.add_data(). + rows: list[dict[str, Any]] = [] + + for point in points: + # Preserve provided IDs when available, else allocate a new one. + raw_trial_id = point.get(ID_KEY) + if raw_trial_id is None: + trial_id: int | str = self._next_id + self._next_id += 1 + else: + trial_id = _normalize_trial_id(raw_trial_id) + + # Merge known suggested parameters with any explicit parameters in incoming point. + point_parameters = {name: point[name] for name in self.vocs.variable_names if name in point} + if trial_id in self._params_by_id: + parameters = {**self._params_by_id[trial_id], **point_parameters} + else: + parameters = point_parameters + + self._params_by_id[trial_id] = parameters + # Everything not in variables and not _id is treated as measured output. + outcomes = {k: v for k, v in point.items() if k not in set(self.vocs.variable_names) | {ID_KEY}} + rows.append({ID_KEY: trial_id, **parameters, **outcomes}) + + # Persist all new observations into the underlying generator state. + new_data = pd.DataFrame(rows) + self._generator.add_data(new_data) + + def register_failures(self, suggestions: list[dict]) -> None: + # Remove failed suggestions from pending parameter cache. + for suggestion in suggestions: + trial_id = suggestion.get(ID_KEY) + if trial_id in self._params_by_id: + self._params_by_id.pop(trial_id) + + def _feasible_mask(self, data: pd.DataFrame) -> pd.Series: + # Delegate feasibility computation to Xopt's native VOCS helper. + constraints = self.vocs.constraints + if not isinstance(constraints, Mapping) or len(constraints) == 0: + return pd.Series([True] * len(data), index=data.index) + + feasibility = get_feasibility_data(self.vocs, data) + if "feasible" not in feasibility: + return pd.Series([True] * len(data), index=data.index) + return pd.Series(feasibility["feasible"], index=data.index, dtype=bool) + + def _output_names(self) -> list[str]: + # Outputs include objectives, constraints, and optional observables. + names = list(self.vocs.objective_names) + if self.vocs.constraints: + names.extend(self.vocs.constraint_names) + if getattr(self.vocs, "observables", None): + names.extend(self.vocs.observables) + return names + + def get_best_points(self) -> list[tuple[int | str, Mapping, Mapping]]: + # Return no points when no data has been ingested. + data = self._generator.data + if not isinstance(data, pd.DataFrame) or data.empty: + return [] + + # Best points are only defined over feasible observations. + candidates: pd.DataFrame = data.loc[self._feasible_mask(data)] + if len(candidates) == 0: + return [] + + objective_names = list(self.vocs.objective_names) + # For single-objective problems, return the single extremum according to direction. + if len(objective_names) == 1 and objective_names[0] in candidates: + best_indices, _, _ = select_best(self.vocs, candidates, n=1) + best_index = best_indices[0] + selected = candidates.loc[[best_index]] + else: + # For multi-objective and objective-less modes, return the available candidate set. + selected = candidates + + output_names = self._output_names() + results: list[tuple[int | str, Mapping, Mapping]] = [] + for _, row in selected.iterrows(): + # Normalize IDs and split into parameter and outcome mappings. + trial_id = _normalize_trial_id(row[ID_KEY] if ID_KEY in row else _) + + parameterization = {name: row[name] for name in self.vocs.variable_names if name in row} + outcomes = {name: row[name] for name in output_names if name in row} + results.append((trial_id, parameterization, outcomes)) + + return results + + def checkpoint(self) -> None: + # Enforce explicit checkpoint path configuration before writing state. + if not self._checkpoint_path: + raise ValueError("Checkpoint path is not set. Please set a checkpoint path when initializing the optimizer.") + + # Persist generator and adapter bookkeeping to a single pickle artifact. + payload = { + "generator": self._generator, + "fixed_parameters": self._fixed_parameters, + "next_id": self._next_id, + "params_by_id": self._params_by_id, + } + path = Path(self._checkpoint_path) + with path.open("wb") as stream: + pickle.dump(payload, stream)