Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,11 @@ def __init__(
self._resources: dict = {}
self._stream_resources: dict = {}

# Map RunStart UID to the list of Resource UIDs and StreamResource UIDs.
# These are used to facilitate efficient cleanup of the caches above.
self._start_to_resources: defaultdict = defaultdict(list)
self._start_to_stream_resources: defaultdict = defaultdict(list)

# Old-style Resources that do not have a RunStart UID
self._unlabeled_resources: deque = deque(maxlen=10000)

Expand Down Expand Up @@ -1699,6 +1704,7 @@ def resource(self, doc: Resource) -> None:
else:
self._fillers[start_uid].resource(doc)
self._resources[doc["uid"]] = doc["run_start"]
self._start_to_resources[start_uid].append(doc["uid"])
for callback in self._factory_cbs_by_start[start_uid]:
callback("resource", doc)
for callback in self._subfactory_cbs_by_start[start_uid]:
Expand All @@ -1708,6 +1714,7 @@ def stream_resource(self, doc: StreamResource) -> None:
start_uid = doc["run_start"] # No need for Try
self._fillers[start_uid].stream_resource(doc)
self._stream_resources[doc["uid"]] = doc["run_start"]
self._start_to_stream_resources[start_uid].append(doc["uid"])
for callback in self._factory_cbs_by_start[start_uid]:
callback("stream_resource", doc)
for callback in self._subfactory_cbs_by_start[start_uid]:
Expand All @@ -1728,7 +1735,10 @@ def stop(self, doc: RunStop) -> None:
self._descriptor_to_start.pop(descriptor_uid, None)
self._factory_cbs_by_descriptor.pop(descriptor_uid, None)
self._subfactory_cbs_by_descriptor.pop(descriptor_uid, None)
self._resources.pop(start_uid, None)
for resource_uid in self._start_to_resources.pop(start_uid, ()):
self._resources.pop(resource_uid, None)
for stream_resource_uid in self._start_to_stream_resources.pop(start_uid, ()):
self._stream_resources.pop(stream_resource_uid, None)
self._start_to_start_doc.pop(start_uid, None)


Expand Down
3 changes: 3 additions & 0 deletions src/event_model/documents/run_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class RunStart(TypedDict):
Unix group to associate this data with
"""
hints: NotRequired[Hints]
"""
Start-level hints
"""
owner: NotRequired[str]
"""
Unix owner to associate this data with
Expand Down
78 changes: 78 additions & 0 deletions src/event_model/tests/test_run_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,81 @@ def exception_callback_factory(start_doc_name, start_doc):

event_document = {"descriptor": "ghijkl", "uid": "mnopqr"}
rr.event(event_document)


def test_run_router_resource_cleanup(tmp_path):
    """Check that a RunStop evicts that run's Resource and StreamResource entries.

    Guards against a regression in which ``_resources`` was popped with the
    RunStart uid instead of each Resource uid, so entries belonging to finished
    runs piled up without bound, and in which ``_stream_resources`` was never
    pruned at all.
    """
    router = event_model.RunRouter([])
    # Both StreamResource sections below point at the same location.
    frames_uri = "file://localhost" + str(tmp_path) + "/frames"

    # Run 1 exercises the classic Resource path.
    first_run = event_model.compose_run()
    first_start = first_run.start_doc
    resource_doc = first_run.compose_resource(
        spec="TIFF",
        root=str(tmp_path),
        resource_path="stack.tiff",
        resource_kwargs={},
    ).resource_doc
    first_stop = first_run.compose_stop()

    for doc_name, document in (
        ("start", first_start),
        ("resource", resource_doc),
        ("stop", first_stop),
    ):
        router(doc_name, document)

    # Once the run is stopped, none of its uids may linger in the caches.
    first_start_uid = first_start["uid"]
    assert resource_doc["uid"] not in router._resources, (
        "_resources still contains the resource uid after stop"
    )
    assert first_start_uid not in router._start_to_resources, (
        "_start_to_resources still contains the start uid after stop"
    )
    assert first_start_uid not in router._start_to_start_doc, (
        "_start_to_start_doc still contains the start uid after stop"
    )

    # Run 2 exercises the StreamResource path.
    second_run = event_model.compose_run()
    second_start = second_run.start_doc
    stream_resource_doc, _ = second_run.compose_stream_resource(
        mimetype="image/tiff",
        data_key="det",
        uri=frames_uri,
        parameters={},
    )
    second_stop = second_run.compose_stop()

    for doc_name, document in (
        ("start", second_start),
        ("stream_resource", stream_resource_doc),
        ("stop", second_stop),
    ):
        router(doc_name, document)

    assert stream_resource_doc["uid"] not in router._stream_resources, (
        "_stream_resources still contains the stream_resource uid after stop"
    )
    assert second_start["uid"] not in router._start_to_stream_resources, (
        "_start_to_stream_resources still contains the start uid after stop"
    )

    # Push several more complete runs through the same router.
    for _ in range(3):
        extra_run = event_model.compose_run()
        extra_stream_resource, _ = extra_run.compose_stream_resource(
            mimetype="image/tiff",
            data_key="det",
            uri=frames_uri,
            parameters={},
        )
        router("start", extra_run.start_doc)
        router("stream_resource", extra_stream_resource)
        router("stop", extra_run.compose_stop())

    # Every cache must be empty again once all runs have been stopped.
    assert not router._resources
    assert not router._stream_resources
    assert not router._start_to_resources
    assert not router._start_to_stream_resources
Loading