Skip to content
100 changes: 100 additions & 0 deletions event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,3 +1354,103 @@ def default(self, obj):
return obj.item()
return obj.tolist()
return json.JSONEncoder.default(self, obj)


class OrderError(ValueError):
    """Raised by validate_order when documents arrive out of order."""


def validate_order(run_iterable):
    """
    Validates the order of a Bluesky Run.

    The documents are consumed one at a time and the following constraints
    are enforced:

    * The first document is a start document and no second start document
      is received.
    * No document is received after the stop document.
    * A descriptor is received before the first event of its stream.
    * Within each stream (i.e. per descriptor) event times never decrease.
    * The keys of an event's ``filled`` mapping are exactly the data keys
      that the descriptor marks as ``'external'``. An event without a
      ``filled`` entry is treated as having no filled keys.
    * A datum is received before any event that references it through an
      external data key.
    * A resource is received before any datum that references it.

    Not enforced yet: ordering of Event[Page]s *across* streams. The
    intended rule is that each EventPage's highest time must be greater
    than or equal to the preceding EventPages' lowest times.

    Parameters
    ----------
    run_iterable: iterable
        A Bluesky run in the form of an iterable of name, doc pairs.

    Raises
    ------
    OrderError
        If one of the ordering constraints above is violated.
    ValueError
        If an event's filled keys do not match the descriptor's external
        keys. (``OrderError`` is itself a ``ValueError``.)
    """
    datum_cache = {}
    resource_cache = {}
    descriptor_cache = {}
    last_event_time = {}
    stop = None
    start = None

    def event_check(event):
        descriptor_uid = event['descriptor']

        # Check that descriptor doc is received before the first event of
        # that stream.
        descriptor = descriptor_cache.get(descriptor_uid)
        if descriptor is None:
            raise OrderError("Descriptor was not received before the "
                             "first event of the stream.")

        # For each stream check that events are in timestamp order.
        previous_time = last_event_time.get(descriptor_uid)
        if previous_time is not None and event['time'] < previous_time:
            raise OrderError("Events out of order.")
        last_event_time[descriptor_uid] = event['time']

        external_keys = {key for key, val in descriptor['data_keys'].items()
                         if 'external' in val}

        # Check that the filled keys match the external keys defined in the
        # descriptor. A missing 'filled' entry counts as "nothing filled"
        # instead of failing with a KeyError.
        filled = event.get('filled') or {}
        if external_keys != set(filled.keys()):
            raise ValueError("Filled keys do not match external_keys from "
                             f"the descriptor. external_keys:{external_keys}, "
                             f"filled_keys:{filled.keys()}")

        # Check that for each datum_id in the event, the datum document was
        # received first.
        # NOTE(review): an external key whose 'filled' entry is truthy
        # presumably holds loaded data rather than a datum_id; that case is
        # not special-cased here -- confirm whether it should be.
        for key, value in event['data'].items():
            if key in external_keys and datum_cache.get(value) is None:
                raise OrderError(f"Datum document {value} not received "
                                 "before event that references it. "
                                 f"event:{event}")

    def datum_check(datum):
        # Check that the referenced resource is received first. Look at the
        # datum that was passed in, not at the enclosing loop variable.
        if resource_cache.get(datum['resource']) is None:
            raise OrderError(f"Resource document {datum['resource']} was not "
                             f"received before the datum that refrences it. "
                             f"datum: {datum}")

    for name, doc in run_iterable:
        # Check that the start document is the first document.
        if start is None and name != 'start':
            raise OrderError("The first document of the run must be a start "
                             "document, but the first document received was "
                             f"{name},{doc}")

        # Check that the stop document is the last document.
        if stop is not None:
            raise OrderError("The stop document must be the last document of "
                             "the run. Documents were received following the "
                             "stop document.")

        if name == 'start':
            if start is not None:
                raise OrderError("A second start document was received. "
                                 f"{doc}")
            start = doc
        elif name == 'stop':
            stop = doc
        elif name == 'resource':
            resource_cache[doc['uid']] = doc
        elif name == 'descriptor':
            descriptor_cache[doc['uid']] = doc
        elif name == 'datum':
            datum_cache[doc['datum_id']] = doc
            datum_check(doc)
        elif name == 'datum_page':
            for datum in unpack_datum_page(doc):
                datum_cache[datum['datum_id']] = datum
                datum_check(datum)
        elif name == 'event':
            event_check(doc)
        elif name == 'event_page':
            # TODO: this only checks ordering within a stream. Ordering of
            # Event[Page]s across streams (up to the time resolution of a
            # page) is not validated; see the docstring.
            for event in unpack_event_page(doc):
                event_check(event)
146 changes: 146 additions & 0 deletions event_model/tests/test_em.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,3 +668,149 @@ def factory_with_subfactory_only(name, doc):
assert expected_item in collected
assert unexpected_item not in collected
collected.clear()


def test_validate_order(tmp_path):
    """Run validate_order on one valid run and on several invalid runs."""
    path_root = str(tmp_path)

    # Make some example documents to work with.
    run_bundle = event_model.compose_run()
    desc_bundle = run_bundle.compose_descriptor(
        data_keys={'motor': {'shape': [], 'dtype': 'number', 'source': '...'},
                   'image': {'shape': [512, 512], 'dtype': 'number',
                             'source': '...', 'external': 'FILESTORE:'}},
        name='primary')
    desc_bundle_baseline = run_bundle.compose_descriptor(
        data_keys={'motor': {'shape': [], 'dtype': 'number', 'source': '...'}},
        name='baseline')
    res_bundle = run_bundle.compose_resource(
        spec='DUMMY', root=path_root, resource_path='stack.tiff',
        resource_kwargs={'a': 1, 'b': 2})
    datum_doc = res_bundle.compose_datum(datum_kwargs={'c': 3, 'd': 4})
    raw_event = desc_bundle.compose_event(
        data={'motor': 0, 'image': datum_doc['datum_id']},
        timestamps={'motor': 0, 'image': 0}, filled={'image': False},
        seq_num=1)
    raw_event2 = desc_bundle.compose_event(
        data={'motor': 0, 'image': datum_doc['datum_id']},
        timestamps={'motor': 0, 'image': 0}, filled={'image': False},
        seq_num=1)
    # The out-of-order case below needs raw_event2 to be strictly later than
    # raw_event. Pin that explicitly so the test does not depend on the
    # clock advancing between the two compose_event calls.
    raw_event2['time'] = raw_event['time'] + 1
    stop_doc = run_bundle.compose_stop()

    # (name, doc) pairs shared by the runs below.
    start = ('start', run_bundle.start_doc)
    primary = ('descriptor', desc_bundle.descriptor_doc)
    baseline = ('descriptor', desc_bundle_baseline.descriptor_doc)
    resource = ('resource', res_bundle.resource_doc)
    datum = ('datum', datum_doc)
    stop = ('stop', stop_doc)

    def event(doc):
        # Every run gets its own copy of the event document.
        return ('event', copy.deepcopy(doc))

    # A run that should not throw any exceptions.
    good_run = [start,
                primary,
                baseline,
                resource,
                datum,
                ('datum_page', event_model.pack_datum_page(datum_doc)),
                event(raw_event),
                ('event_page',
                 event_model.pack_event_page(copy.deepcopy(raw_event))),
                stop]

    # Test a good run.
    event_model.validate_order(good_run)

    # Start doc is not the first document.
    start_not_first = [primary, start, baseline, resource, datum,
                       event(raw_event), stop]

    with pytest.raises(event_model.OrderError):
        event_model.validate_order(start_not_first)

    # Two start documents.
    two_starts = [start, primary, baseline, resource, start, datum,
                  event(raw_event), stop]

    with pytest.raises(ValueError):
        event_model.validate_order(two_starts)

    # Stop doc order.
    bad_stop = [start, primary, baseline, resource, datum, stop,
                event(raw_event)]

    with pytest.raises(event_model.OrderError):
        event_model.validate_order(bad_stop)

    # Check that referenced resource is received first.
    bad_resource = [start, primary, baseline, datum, resource,
                    event(raw_event), stop]

    with pytest.raises(event_model.OrderError):
        event_model.validate_order(bad_resource)

    # Descriptor received after event.
    late_descriptor = [start, resource, datum, event(raw_event), primary,
                       baseline, stop]

    with pytest.raises(event_model.OrderError):
        event_model.validate_order(late_descriptor)

    # Events out of order.
    wrong_event_order = [start, primary, baseline, resource, datum,
                         event(raw_event2), event(raw_event), stop]

    with pytest.raises(event_model.OrderError):
        event_model.validate_order(wrong_event_order)

    # Got datum after event that referenced it.
    wrong_datum_order = [start, primary, baseline, resource,
                         event(raw_event), datum, stop]

    with pytest.raises(event_model.OrderError):
        event_model.validate_order(wrong_datum_order)

    # Check filled.keys() is the same as the external keys defined in the
    # descriptor.
    bad_event = desc_bundle.compose_event(
        data={'motor': 0, 'image': datum_doc['datum_id']},
        timestamps={'motor': 0, 'image': 0}, filled={},
        seq_num=1)

    wrong_filled = [start, primary, baseline, resource, datum,
                    event(bad_event), stop]

    with pytest.raises(ValueError):
        event_model.validate_order(wrong_filled)