Skip to content
2 changes: 2 additions & 0 deletions bec_ipython_client/tests/end-2-end/test_actors_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
def bec_with_delay_device(bec_ipython_client_fixture):
bec = bec_ipython_client_fixture
bec.builtin_actors.scan_interlock.enabled = True
bec.builtin_actors.scan_interlock.trigger_setting = "restart_scan"
dev = bec.device_manager.devices
dev.ramp_up.min_val.put(0)
dev.ramp_up.max_val.put(400)
Expand Down Expand Up @@ -61,6 +62,7 @@ def test_scan_interlock(
actors: BuiltinActorHli = bec.builtin_actors
assert bec.beamline_states.beam_intensity_sufficient.get()["status"] == "valid"
assert actors.scan_interlock.enabled
assert actors.scan_interlock.trigger_setting == "restart_scan"
current_q_status_msg: ScanQueueStatus = bec.queue.queue_storage.current_scan_queue["primary"]
assert current_q_status_msg.status == "RUNNING"
actors.scan_interlock.add_state_to_interlock("beam_intensity_sufficient", "valid")
Expand Down
60 changes: 30 additions & 30 deletions bec_lib/bec_lib/builtin_actor_hli.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
from typing import TYPE_CHECKING

from bec_lib.config_values import RedisConfigValue
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messages import (
BlStateStatus,
BuiltinActorStateChangeNotification,
InterlockTargetState,
ScanInterlockModifyStateTableMessage,
ScanInterlockTriggerSetting,
Comment thread
d-perl marked this conversation as resolved.
)

if TYPE_CHECKING:
from bec_lib.client import BECClient

VAR_PREFIX = "_BuiltinActors"


def builtin_actor_enabled_var(actor_name: str):
return f"{VAR_PREFIX}/enabled/{actor_name}"


class ScanInterlockHli:
def __init__(self, client: "BECClient", parent: "BuiltinActorHli") -> None:
self._client = client
self._parent = parent
self._actor_name = "ScanInterlockActor"
self._enabled = RedisConfigValue(
connector=self._client.connector, endpoint=MessageEndpoints.scan_interlock_enabled()
)
self._trigger_setting = RedisConfigValue(
connector=self._client.connector,
endpoint=MessageEndpoints.scan_interlock_trigger_setting(),
)

@property
def enabled(self):
return self._parent.check_enabled(self._actor_name)
return self._enabled.value

@enabled.setter
def enabled(self, enabled: bool):
if enabled:
self._parent.set_enabled(self._actor_name)
else:
self._parent.set_disabled(self._actor_name)
self._enabled.value = enabled

@property
def trigger_setting(self):
return self._trigger_setting.value

@trigger_setting.setter
def trigger_setting(self, trigger_setting: str | ScanInterlockTriggerSetting):
accepted_values = [str(v) for v in ScanInterlockTriggerSetting]
if isinstance(trigger_setting, str) and trigger_setting not in accepted_values:
raise ValueError(f"Scan interlock trigger setting must be one of {accepted_values}!")
self._trigger_setting.value = ScanInterlockTriggerSetting(trigger_setting)

@property
def states_watched(self) -> dict[str, InterlockTargetState]:
Expand Down Expand Up @@ -90,25 +98,17 @@ def clear_all(self):
{"data": ScanInterlockModifyStateTableMessage(action="remove_all")},
)

def shutdown(self):
"""Unregister the config-value stream subscriptions from the connector."""
self._enabled.unregister_all()
self._trigger_setting.unregister_all()


class BuiltinActorHli:
def __init__(self, client: "BECClient") -> None:
self._client = client
self.scan_interlock = ScanInterlockHli(self._client, self)

def _notify(self, actor_name):
self._client.connector.send(
MessageEndpoints.builtin_actor_update_req_notif(),
BuiltinActorStateChangeNotification(actor_name=actor_name),
)

def check_enabled(self, actor_name: str):
return bool(self._client.get_global_var(builtin_actor_enabled_var(actor_name)))

def set_enabled(self, actor_name: str):
self._client.set_global_var(builtin_actor_enabled_var(actor_name), True)
self._notify(actor_name)

def set_disabled(self, actor_name: str):
self._client.set_global_var(builtin_actor_enabled_var(actor_name), False)
self._notify(actor_name)
def shutdown(self):
"""Tear down builtin-actor client subscriptions (called on client shutdown)."""
self.scan_interlock.shutdown()
10 changes: 7 additions & 3 deletions bec_lib/bec_lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def __init__(
if self._initialized:
return
self.__init_params: _InitParams = {
"config": config if config is not None else ServiceConfig(config_name="client"),
"connector_cls": connector_cls if connector_cls is not None else RedisConnector,
"config": (config if config is not None else ServiceConfig(config_name="client")),
"connector_cls": (connector_cls if connector_cls is not None else RedisConnector),
"wait_for_server": wait_for_server,
"prompt_for_acl": prompt_for_acl,
}
Expand Down Expand Up @@ -164,7 +164,7 @@ def __init__(
self._system_user = ""
self.beamline_states = None
self.messaging: MessagingContainer = None # type: ignore
self.builtin_actors = BuiltinActorHli(self)
self.builtin_actors: BuiltinActorHli = None # type: ignore

def __new__(cls, *args, forced=False, **kwargs):
if forced or BECClient._client is None:
Expand Down Expand Up @@ -217,6 +217,7 @@ def start(self):
)
builtins.bec = self._parent
self.macros = UserMacros(self)
self.builtin_actors = BuiltinActorHli(self)
self._start_services()
self.proc = ProcedureHli(self.connector) if self._init_procedure_hli else None
default_namespace = {"dev": self.device_manager.devices, "scans": self.scans_namespace}
Expand Down Expand Up @@ -336,6 +337,9 @@ def shutdown(self, per_thread_timeout_s: float | None = None):
if self.history is not None:
# pylint: disable=protected-access
self.history._shutdown()
if self.builtin_actors is not None:
self.builtin_actors.shutdown()
self.builtin_actors = None # type: ignore
bec_logger.logger.remove()
self.started = False

Expand Down
102 changes: 102 additions & 0 deletions bec_lib/bec_lib/config_values.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from threading import Event
from typing import Any, Callable, Generic, TypeVar
from weakref import ReferenceType

from louie.saferef import BoundMethodWeakref, safe_ref

from bec_lib.endpoints import EndpointInfo, MessageOp
from bec_lib.logger import bec_logger
from bec_lib.messages import ManagedConfigMessage
from bec_lib.redis_connector import RedisConnector

logger = bec_logger.logger
ValueT = TypeVar("ValueT")


class RedisConfigValue(property, Generic[ValueT]):
def __init__(
self,
connector: RedisConnector,
endpoint: EndpointInfo[type[ManagedConfigMessage[ValueT]]],
wait_for_writes: bool = True,
) -> None:
"""A config value bound to a value in Redis, which uses the ManagedConfigMessage and an associated endpoint,
and which can be subscribed to."""

if endpoint.message_op != MessageOp.STREAM or not issubclass(
endpoint.message_type, ManagedConfigMessage
):
raise TypeError(
"RedisConfigManager needs a STREAM endpoint with a message type which is a subclass of ManagedConfigMessage"
)

self._ep = endpoint
self._connector = connector

self._writing_wait_event = Event()
self._writing_wait_event.set()
self._wait_for_writes = wait_for_writes

self._cbs: set[ReferenceType[Callable[[ValueT]]] | BoundMethodWeakref] = set()

self._config = self._fetch()
self._connector.register(self._ep, cb=self._update_cb)

def __del__(self):
if hasattr(self, "_connector"):
self.unregister_all()

def __bool__(self):
raise ValueError(f"Maybe you meant to check {self}.value?")

def _fetch(self) -> ManagedConfigMessage[ValueT]:
existing = self._connector.xread(self._ep, from_start=True)
if existing is None or existing == []:
logger.warning(
f"No value found in redis for managed config var {self._ep.endpoint}, resetting to default."
)
config = self._ep.message_type() # type: ignore # concrete classes must have a default
self._write(config, False)
return config
return existing[-1]["config"]

def _write(self, updated: ManagedConfigMessage[ValueT], wait):
if wait:
self._writing_wait_event.clear()
self._connector.xadd(self._ep, {"config": updated}, max_size=1)
self._writing_wait_event.wait(timeout=2)
if not self._writing_wait_event.is_set():
logger.error(
f"Timed out waiting for config variable {self._ep.endpoint} to return from Redis"
)

def _update_cb(self, msg_dict: dict):
try:
self._config = msg_dict["config"]
for cb_ref in list(self._cbs):
if cb := cb_ref():
try:
cb(self._config.value)
except Exception as e:
logger.error(f"Exception in managed config value callback {cb}: {e}")
else:
self._cbs.discard(cb_ref)
finally:
self._writing_wait_event.set()

@property
def value(self) -> ValueT:
return self._config.value

@value.setter
def value(self, value: ValueT):
self._write(self._ep.message_type(value=value), self._wait_for_writes)

def subscribe(self, cb: Callable[[ValueT], Any]):
self._cbs.add(safe_ref(cb))

def unsubscribe(self, cb: Callable[[ValueT], Any]):
self._cbs.discard(safe_ref(cb))

def unregister_all(self):
self._connector.unregister(self._ep, cb=self._update_cb)
47 changes: 28 additions & 19 deletions bec_lib/bec_lib/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class MessageOp(list[str], enum.Enum):
SET = ["remove_from_set", "get_set_members", "delete"]


MessageType = TypeVar("MessageType", bound="type[messages.BECMessage]")
MessageType = TypeVar("MessageType", bound="type[messages.BECMessage]", covariant=True)


@dataclass
Expand Down Expand Up @@ -1417,7 +1417,7 @@ def gui_heartbeat(gui_id: str):
# Procedures

@staticmethod
def available_procedures() -> EndpointInfo:
def available_procedures():
"""
Endpoint for available procedures. This endpoint is used to publish the available procedures
using an AvailableResourceMessage.
Expand All @@ -1433,7 +1433,7 @@ def available_procedures() -> EndpointInfo:
)

@staticmethod
def procedure_request() -> EndpointInfo:
def procedure_request():
"""
Endpoint for requesting new procedures.
The request is sent using a messages.ProcedureRequestMessage message.
Expand All @@ -1449,7 +1449,7 @@ def procedure_request() -> EndpointInfo:
)

@staticmethod
def procedure_request_response() -> EndpointInfo:
def procedure_request_response():
"""
Endpoint for procedure request responses. This endpoint is used to publish the
information on whether the procedure request was accepted or rejected. The response
Expand Down Expand Up @@ -1561,7 +1561,7 @@ def procedure_worker_status_update(queue_id: str):
)

@staticmethod
def procedure_status_update() -> EndpointInfo:
def procedure_status_update():
"""
Endpoint for individual procedure status updates. Mainly for use in updating procedure statuses in the helper.
For general queue monitoring, use the procedure_queue_notif endpoint and read the queues instead.
Expand Down Expand Up @@ -1637,7 +1637,7 @@ def actor_stop(exec_id: str):
)

@staticmethod
def actor_request_response() -> EndpointInfo:
def actor_request_response():
endpoint = f"{EndpointType.INFO.value}/actor/request_response"
return EndpointInfo(
endpoint=endpoint,
Expand All @@ -1646,17 +1646,7 @@ def actor_request_response() -> EndpointInfo:
)

@staticmethod
def builtin_actor_update_req_notif() -> EndpointInfo:
"""Endpoint to notify a builtin actor of a pending state change request."""
endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/state_change_request_notification"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.BuiltinActorStateChangeNotification,
message_op=MessageOp.SEND,
)

@staticmethod
def builtin_actor_update_notif(actor_name: str) -> EndpointInfo:
def builtin_actor_update_notif(actor_name: str):
"""Endpoint to notify clients of builtin actor state changes."""
endpoint = f"{EndpointType.INFO.value}/actor/builtin/{actor_name}/state_change_done"
return EndpointInfo(
Expand All @@ -1666,7 +1656,7 @@ def builtin_actor_update_notif(actor_name: str) -> EndpointInfo:
)

@staticmethod
def modify_interlock_table() -> EndpointInfo:
def modify_interlock_table():
endpoint = f"{EndpointType.INTERNAL.value}/actor/builtin/scan_interlock/table_mod"
return EndpointInfo(
endpoint=endpoint,
Expand All @@ -1675,14 +1665,33 @@ def modify_interlock_table() -> EndpointInfo:
)

@staticmethod
def scan_interlock_states() -> EndpointInfo:
def scan_interlock_states():
endpoint = f"{EndpointType.INFO.value}/actor/builtin/scan_interlock/current_states_watched"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ScanInterlockStateTableContent,
message_op=MessageOp.KEY_VALUE,
)

@staticmethod
def scan_interlock_enabled():
endpoint = f"{EndpointType.INFO.value}/actor/builtin/scan_interlock/config/enabled"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.BoolConfigDefaultFalse,
message_op=MessageOp.STREAM,
)

@staticmethod
def scan_interlock_trigger_setting():
"""Whether to restart the scan when the interlock is triggered."""
endpoint = f"{EndpointType.INFO.value}/actor/builtin/scan_interlock/config/trigger_setting"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.ScanInterlockTriggerSettingMessage,
message_op=MessageOp.STREAM,
)

@staticmethod
def gui_registry_state(gui_id: str):
"""
Expand Down
Loading
Loading