diff --git a/event_model/__init__.py b/event_model/__init__.py index cc16a40ec..95ecb3e2b 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1354,3 +1354,103 @@ def default(self, obj): return obj.item() return obj.tolist() return json.JSONEncoder.default(self, obj) + + +class OrderError(ValueError): + ... + + +def validate_order(run_iterable): + """ + Validates the order of a Bluesky Run. + + Parameters + --------- + run_iterable: iterable + A Bluesky run in the form of an iterable of name, doc pairs. + """ + datum_cache = {} + resource_cache = {} + descriptor_cache = {} + last_event_time = {} + stop = None + start = None + + def event_check(event): + # Check that descriptor doc is received before the first event of that + # stream. + if last_event_time.get(event['descriptor']) is None: + if descriptor_cache.get(event['descriptor']) is None: + raise OrderError("Descriptor was not received before the " + "first event of the stream.") + + # For each stream check that events are in timestamp order. + if last_event_time.get(event['descriptor']) is None: + last_event_time[event['descriptor']] = event['time'] + else: + if event['time'] < last_event_time[event['descriptor']]: + raise OrderError("Events out of order.") + last_event_time[event['descriptor']] = event['time'] + + external_keys = {key for key, val + in descriptor_cache[event['descriptor']]['data_keys'].items() + if 'external' in val} + + # Check that the filled keys match the external keys defined in the + # descriptor. + if external_keys != set(event['filled'].keys()): + raise ValueError("Filled keys do not match external_keys from " + f"the descriptor. external_keys:{external_keys}, " + f"filled_keys:{event['filled'].keys()}") + + # Check that for each datum_id in the event, the datum document was + # received first. + for key, value in event['data'].items(): + if key in external_keys: + if datum_cache.get(value) is None: + raise OrderError(f"Datum document {value} not received " + f"before event that references it. event:{event}") + + def datum_check(datum): + # Check that the referenced resource is received first. + if resource_cache.get(doc['resource']) is None: + raise OrderError(f"Resource document {datum['resource']} was not " + f"received before the datum that refrences it. " + f"datum: {datum}") + + for name, doc in run_iterable: + # Check that the start document is the first document. + if not start: + if name != 'start': + raise OrderError("The first document of the run must be a start " + "document, but the first document received was " + f"{name},{doc}") + + # Check that the stop document is the last document. + if stop: + raise OrderError("The stop document must be the last document of " + "the run. Documents were received following the " + "stop document.") + if name == 'start': + if start: + raise ValueError(f"A second start document was received. {doc}") + else: + start = doc + if name == 'stop': + stop = doc + if name == 'resource': + resource_cache[doc['uid']] = doc + if name == 'descriptor': + descriptor_cache[doc['uid']] = doc + if name == 'datum': + datum_cache[doc['datum_id']] = doc + datum_check(doc) + if name == 'datum_page': + for datum in unpack_datum_page(doc): + datum_cache[datum['datum_id']] = datum + datum_check(datum) + if name == 'event': + event_check(doc) + if name == 'event_page': + for event in unpack_event_page(doc): + event_check(event) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index dd3ea7338..f788481e2 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -668,3 +668,149 @@ def factory_with_subfactory_only(name, doc): assert expected_item in collected assert unexpected_item not in collected collected.clear() + + +def test_validate_order(tmp_path): + path_root = str(tmp_path) + + # Make some example documents to work with. + run_bundle = event_model.compose_run() + desc_bundle = run_bundle.compose_descriptor( + data_keys={'motor': {'shape': [], 'dtype': 'number', 'source': '...'}, + 'image': {'shape': [512, 512], 'dtype': 'number', + 'source': '...', 'external': 'FILESTORE:'}}, + name='primary') + desc_bundle_baseline = run_bundle.compose_descriptor( + data_keys={'motor': {'shape': [], 'dtype': 'number', 'source': '...'}}, + name='baseline') + res_bundle = run_bundle.compose_resource( + spec='DUMMY', root=path_root, resource_path='stack.tiff', + resource_kwargs={'a': 1, 'b': 2}) + datum_doc = res_bundle.compose_datum(datum_kwargs={'c': 3, 'd': 4}) + raw_event = desc_bundle.compose_event( + data={'motor': 0, 'image': datum_doc['datum_id']}, + timestamps={'motor': 0, 'image': 0}, filled={'image': False}, + seq_num=1) + raw_event2 = desc_bundle.compose_event( + data={'motor': 0, 'image': datum_doc['datum_id']}, + timestamps={'motor': 0, 'image': 0}, filled={'image': False}, + seq_num=1) + stop_doc = run_bundle.compose_stop() + + # A run that should not throw any exceptions. + good_run = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('datum', datum_doc), + ('datum_page', event_model.pack_datum_page(datum_doc)), + ('event', copy.deepcopy(raw_event)), + ('event_page', event_model.pack_event_page(copy.deepcopy(raw_event))), + ('stop', stop_doc)] + + # Test a good run. + event_model.validate_order(good_run) + + # Start doc is not the first document. + start_not_first = [('descriptor', desc_bundle.descriptor_doc), + ('start', run_bundle.start_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('datum', datum_doc), + ('event', copy.deepcopy(raw_event)), + ('stop', stop_doc)] + + with pytest.raises(event_model.OrderError): + event_model.validate_order(start_not_first) + + # Two start documents. + two_starts = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('start', run_bundle.start_doc), + ('datum', datum_doc), + ('event', copy.deepcopy(raw_event)), + ('stop', stop_doc)] + + with pytest.raises(ValueError): + event_model.validate_order(two_starts) + + # Stop doc order. + bad_stop = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('datum', datum_doc), + ('stop', stop_doc), + ('event', copy.deepcopy(raw_event))] + + with pytest.raises(event_model.OrderError): + event_model.validate_order(bad_stop) + + # Check that referenced resource is received first. + bad_resource = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('datum', datum_doc), + ('resource', res_bundle.resource_doc), + ('event', copy.deepcopy(raw_event)), + ('stop', stop_doc)] + + with pytest.raises(event_model.OrderError): + event_model.validate_order(bad_resource) + + # Descriptor received after event. + late_descriptor = [('start', run_bundle.start_doc), + ('resource', res_bundle.resource_doc), + ('datum', datum_doc), + ('event', copy.deepcopy(raw_event)), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('stop', stop_doc)] + + with pytest.raises(event_model.OrderError): + event_model.validate_order(late_descriptor) + + # Events out of order. + wrong_event_order = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('datum', datum_doc), + ('event', copy.deepcopy(raw_event2)), + ('event', copy.deepcopy(raw_event)), + ('stop', stop_doc)] + + with pytest.raises(event_model.OrderError): + event_model.validate_order(wrong_event_order) + + # Got datum afer event that referenced it. + wrong_datum_order = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('event', copy.deepcopy(raw_event)), + ('datum', datum_doc), + ('stop', stop_doc)] + + with pytest.raises(event_model.OrderError): + event_model.validate_order(wrong_datum_order) + + # Check filled.keys() is the same as the external keys defined in the + # descriptor. + bad_event = desc_bundle.compose_event( + data={'motor': 0, 'image': datum_doc['datum_id']}, + timestamps={'motor': 0, 'image': 0}, filled={}, + seq_num=1) + + wrong_filled = [('start', run_bundle.start_doc), + ('descriptor', desc_bundle.descriptor_doc), + ('descriptor', desc_bundle_baseline.descriptor_doc), + ('resource', res_bundle.resource_doc), + ('datum', datum_doc), + ('event', copy.deepcopy(bad_event)), + ('stop', stop_doc)] + + with pytest.raises(ValueError): + event_model.validate_order(wrong_filled)