Skip to content
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ maintainers = [
requires-python = ">=3.10"
dependencies = [
"ax-platform>=1.1.0,<1.3",
"xopt",
"bluesky>=1.14.2",
"bluesky-queueserver-api>=0.0.12",
"torch",
Expand Down
4 changes: 4 additions & 0 deletions src/blop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .ax import DOF, Agent, ChoiceDOF, DOFConstraint, Objective, OutcomeConstraint, RangeDOF, ScalarizedObjective
from .plans import acquire_baseline, default_acquire, optimize, optimize_step, sample_suggestions
from .xopt import XoptAgent, XoptOptimizer, build_vocs

try:
from ._version import __version__
Expand All @@ -21,4 +22,7 @@
"optimize",
"optimize_step",
"sample_suggestions",
"XoptAgent",
"XoptOptimizer",
"build_vocs",
]
97 changes: 97 additions & 0 deletions src/blop/tests/xopt/test_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from unittest.mock import MagicMock

import pytest
from bluesky.run_engine import RunEngine

from xopt.generators.bayesian import ExpectedImprovementGenerator
from xopt.generators.random import RandomGenerator

from blop.ax.dof import RangeDOF
from blop.ax.objective import Objective
from blop.tests.conftest import MovableSignal, ReadableSignal
from blop.xopt.agent import XoptAgent


@pytest.fixture(scope="function")
def RE():
return RunEngine({})


def test_xopt_agent_init_and_suggest():
movable = MovableSignal(name="x")
readable = ReadableSignal(name="det")
dof = RangeDOF(actuator=movable, bounds=(0.0, 1.0), parameter_type="float")
objective = Objective(name="score", minimize=True)

evaluation_function = MagicMock(return_value=[{"_id": 0, "score": 0.0}])
agent = XoptAgent(
sensors=[readable],
dofs=[dof],
objectives=[objective],
evaluation_function=evaluation_function,
generator=RandomGenerator,
)

suggestions = agent.suggest(1)
assert len(suggestions) == 1
assert "_id" in suggestions[0]
assert "x" in suggestions[0]


def test_xopt_agent_optimize_runs(RE):
movable = MovableSignal(name="x")
readable = ReadableSignal(name="det")
dof = RangeDOF(actuator=movable, bounds=(0.0, 1.0), parameter_type="float")
objective = Objective(name="score", minimize=True)

def evaluate(uid, suggestions):
return [{"_id": suggestion["_id"], "score": float(suggestion["x"])} for suggestion in suggestions]

agent = XoptAgent(
sensors=[readable],
dofs=[dof],
objectives=[objective],
evaluation_function=evaluate,
generator=RandomGenerator,
)

RE(agent.optimize(iterations=2, n_points=1))

assert agent.optimizer.generator.data is not None
assert len(agent.optimizer.generator.data) == 2
assert len(agent.get_best_points()) >= 1


def test_xopt_agent_expected_improvement_simple_minimization(RE):
movable = MovableSignal(name="x")
dof = RangeDOF(actuator=movable, bounds=(0.0, 1.0), parameter_type="float")
objective = Objective(name="score", minimize=True)

def evaluate(uid, suggestions):
return [{"_id": suggestion["_id"], "score": (float(suggestion["x"]) - 0.25) ** 2} for suggestion in suggestions]

agent = XoptAgent(
sensors=[],
dofs=[dof],
objectives=[objective],
evaluation_function=evaluate,
generator=ExpectedImprovementGenerator,
)

# Seed EI with initial measurements before optimization iterations.
agent.ingest(
[
{"x": 0.0, "score": (0.0 - 0.25) ** 2},
{"x": 0.5, "score": (0.5 - 0.25) ** 2},
{"x": 1.0, "score": (1.0 - 0.25) ** 2},
]
)

RE(agent.optimize(iterations=3, n_points=1))

assert agent.optimizer.generator.data is not None
assert len(agent.optimizer.generator.data) == 6
best_points = agent.get_best_points()
assert len(best_points) == 1
_, _, outcomes = best_points[0]
assert outcomes["score"] <= 0.0625
60 changes: 60 additions & 0 deletions src/blop/tests/xopt/test_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest

from blop.ax.dof import ChoiceDOF, DOFConstraint, RangeDOF
from blop.ax.objective import Objective, OutcomeConstraint, ScalarizedObjective
from blop.tests.conftest import ReadableSignal
from blop.xopt.mapping import build_vocs


def test_build_vocs_maps_basic_objects():
dof_x = RangeDOF(name="x", bounds=(0.0, 10.0), parameter_type="float")
dof_mode = ChoiceDOF(name="mode", values=[0, 1], parameter_type="int")
objective = Objective(name="score", minimize=True)
outcome_constraint = OutcomeConstraint("s <= 2.5", s=objective)

vocs = build_vocs(
dofs=[dof_x, dof_mode],
objectives=[objective],
outcome_constraints=[outcome_constraint],
sensors=[ReadableSignal(name="detector")],
)

assert vocs.variables["x"].domain == [0.0, 10.0]
assert vocs.variables["mode"].domain == [0.0, 1.0]
assert "score" in vocs.objectives
assert "score" in vocs.constraints
assert "detector" in vocs.observables


def test_build_vocs_applies_single_variable_dof_constraint():
dof_x = RangeDOF(name="x", bounds=(0.0, 10.0), parameter_type="float")
objective = Objective(name="score", minimize=True)
dof_constraint = DOFConstraint("x >= 3.0", x=dof_x)

vocs = build_vocs(
dofs=[dof_x],
objectives=[objective],
dof_constraints=[dof_constraint],
)

assert vocs.variables["x"].domain == [3.0, 10.0]


def test_build_vocs_rejects_scalarized_objective_mapping():
with pytest.raises(ValueError):
build_vocs(
dofs=[RangeDOF(name="x", bounds=(0.0, 1.0), parameter_type="float")],
objectives=ScalarizedObjective("a + b", minimize=True, a="oa", b="ob"),
)


def test_build_vocs_rejects_multivariable_dof_constraint():
x = RangeDOF(name="x", bounds=(0.0, 10.0), parameter_type="float")
y = RangeDOF(name="y", bounds=(0.0, 10.0), parameter_type="float")

with pytest.raises(ValueError):
build_vocs(
dofs=[x, y],
objectives=[Objective(name="score", minimize=True)],
dof_constraints=[DOFConstraint("x + y <= 1", x=x, y=y)],
)
Loading
Loading