diff --git a/src/event_model/__init__.py b/src/event_model/__init__.py index 3e39b96e..0a50efd8 100644 --- a/src/event_model/__init__.py +++ b/src/event_model/__init__.py @@ -1529,6 +1529,11 @@ def __init__( self._resources: dict = {} self._stream_resources: dict = {} + # Map RunStart UID to the list of Resource UIDs and StreamResource UIDs. + # These are used to facilitate efficient cleanup of the caches above. + self._start_to_resources: defaultdict = defaultdict(list) + self._start_to_stream_resources: defaultdict = defaultdict(list) + # Old-style Resources that do not have a RunStart UID self._unlabeled_resources: deque = deque(maxlen=10000) @@ -1699,6 +1704,7 @@ def resource(self, doc: Resource) -> None: else: self._fillers[start_uid].resource(doc) self._resources[doc["uid"]] = doc["run_start"] + self._start_to_resources[start_uid].append(doc["uid"]) for callback in self._factory_cbs_by_start[start_uid]: callback("resource", doc) for callback in self._subfactory_cbs_by_start[start_uid]: @@ -1708,6 +1714,7 @@ def stream_resource(self, doc: StreamResource) -> None: start_uid = doc["run_start"] # No need for Try self._fillers[start_uid].stream_resource(doc) self._stream_resources[doc["uid"]] = doc["run_start"] + self._start_to_stream_resources[start_uid].append(doc["uid"]) for callback in self._factory_cbs_by_start[start_uid]: callback("stream_resource", doc) for callback in self._subfactory_cbs_by_start[start_uid]: @@ -1728,7 +1735,10 @@ def stop(self, doc: RunStop) -> None: self._descriptor_to_start.pop(descriptor_uid, None) self._factory_cbs_by_descriptor.pop(descriptor_uid, None) self._subfactory_cbs_by_descriptor.pop(descriptor_uid, None) - self._resources.pop(start_uid, None) + for resource_uid in self._start_to_resources.pop(start_uid, ()): + self._resources.pop(resource_uid, None) + for stream_resource_uid in self._start_to_stream_resources.pop(start_uid, ()): + self._stream_resources.pop(stream_resource_uid, None) self._start_to_start_doc.pop(start_uid, 
None) diff --git a/src/event_model/documents/run_start.py b/src/event_model/documents/run_start.py index b2971709..ce515a0b 100644 --- a/src/event_model/documents/run_start.py +++ b/src/event_model/documents/run_start.py @@ -137,6 +137,9 @@ class RunStart(TypedDict): Unix group to associate this data with """ hints: NotRequired[Hints] + """ + Start-level hints + """ owner: NotRequired[str] """ Unix owner to associate this data with diff --git a/src/event_model/tests/test_run_router.py b/src/event_model/tests/test_run_router.py index 8912ff4f..f1465f8d 100644 --- a/src/event_model/tests/test_run_router.py +++ b/src/event_model/tests/test_run_router.py @@ -359,3 +359,81 @@ def exception_callback_factory(start_doc_name, start_doc): event_document = {"descriptor": "ghijkl", "uid": "mnopqr"} rr.event(event_document) + + +def test_run_router_resource_cleanup(tmp_path): + """Resources and StreamResources must be removed from internal caches after stop. + + Regression test for a bug where _resources was popped using the wrong key + (start_uid instead of resource_uid), so the Resource UID entries of completed + runs accumulated in _resources indefinitely. StreamResource entries were + never cleaned up at all. + """ + rr = event_model.RunRouter([]) + + # --- Run 1: classic Resource document (no Datum is emitted) --- + bundle1 = event_model.compose_run() + start1 = bundle1.start_doc + resource_bundle = bundle1.compose_resource( + spec="TIFF", + root=str(tmp_path), + resource_path="stack.tiff", + resource_kwargs={}, + ) + resource_doc = resource_bundle.resource_doc + stop1 = bundle1.compose_stop() + + rr("start", start1) + rr("resource", resource_doc) + rr("stop", stop1) + + # After stop, the resource and start UIDs must be absent from these caches. 
+ assert resource_doc["uid"] not in rr._resources, ( + "_resources still contains the resource uid after stop" + ) + assert start1["uid"] not in rr._start_to_resources, ( + "_start_to_resources still contains the start uid after stop" + ) + assert start1["uid"] not in rr._start_to_start_doc, ( + "_start_to_start_doc still contains the start uid after stop" + ) + + # --- Run 2: StreamResource document (no StreamDatum is emitted) --- + bundle2 = event_model.compose_run() + start2 = bundle2.start_doc + sres_doc, _ = bundle2.compose_stream_resource( + mimetype="image/tiff", + data_key="det", + uri="file://localhost" + str(tmp_path) + "/frames", + parameters={}, + ) + stop2 = bundle2.compose_stop() + + rr("start", start2) + rr("stream_resource", sres_doc) + rr("stop", stop2) + + assert sres_doc["uid"] not in rr._stream_resources, ( + "_stream_resources still contains the stream_resource uid after stop" + ) + assert start2["uid"] not in rr._start_to_stream_resources, ( + "_start_to_stream_resources still contains the start uid after stop" + ) + + # --- Caches must stay empty across further StreamResource-only runs --- + for _ in range(3): + bundle = event_model.compose_run() + sres, _ = bundle.compose_stream_resource( + mimetype="image/tiff", + data_key="det", + uri="file://localhost" + str(tmp_path) + "/frames", + parameters={}, + ) + rr("start", bundle.start_doc) + rr("stream_resource", sres) + rr("stop", bundle.compose_stop()) + + assert len(rr._resources) == 0 + assert len(rr._stream_resources) == 0 + assert len(rr._start_to_resources) == 0 + assert len(rr._start_to_stream_resources) == 0