diff --git a/suitcase/mongo_embedded/__init__.py b/suitcase/mongo_embedded/__init__.py index 0c86b0f..5c2741b 100644 --- a/suitcase/mongo_embedded/__init__.py +++ b/suitcase/mongo_embedded/__init__.py @@ -3,10 +3,11 @@ from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from pymongo import UpdateOne +import pymongo from threading import Event -import sys import time import queue +import bson __version__ = get_versions()['version'] del get_versions @@ -21,17 +22,8 @@ class Serializer(event_model.DocumentRouter): and resource documents. The event_pages are stored in the event colleciton, and datum_pages are stored in the datum collection. - To ensure data integrity two databases are required. A volatile database - that allows the update of existing documents. And a permanent database that - does not allow updates to documents. When the stop document is received - from the RunEngine the volatile data is "frozen" by moving it to the - permanent database. - This Serializer ensures that when the stop document or close request is - received all documents we be written to the volatile database. After - everything has been successfully writen to volatile database, it will copy - all of the runs documents to the permanent database, check that it has been - correctly copied, and then delete the volatile data. + received all documents we be written to the database. This Serializer assumes that documents have been previously validated according to the bluesky event-model. @@ -48,18 +40,15 @@ class Serializer(event_model.DocumentRouter): >>> from ophyd.sim import det, motor >>> from suitcase.mongo_embedded import Serializer - >>> # Create two sandboxed mongo instances - >>> volatile_box = MongoBox() - >>> permanent_box = MongoBox() - >>> volatile_box.start() - >>> permanent_box.start() + >>> # Create a sandboxed mongo instance. + >>> mongo_box = MongoBox() + >>> mongo_box.start() - >>> # Get references to the mongo databases - >>> volatile_db = volatile_box.client().db - >>> permanent_db = permanent_box.client().db + >>> # Get a reference to the mongo database. + >>> db = mongo_box.client().db >>> # Create the Serializer - >>> serializer = Serializer(volatile_db, permanent_db) + >>> serializer = Serializer(db) >>> RE = RunEngine({}) >>> RE.subscribe(serializer) @@ -68,19 +57,16 @@ class Serializer(event_model.DocumentRouter): >>> RE(scan([det], motor, 1, 10, 10)) """ - def __init__(self, volatile_db, permanent_db, num_threads=1, - queue_size=20, embedder_size=1000000, page_size=5000000, - max_insert_time=10, **kwargs): + def __init__(self, db, num_threads=1, queue_size=100, + embedder_size=1000000, page_size=5000000, + max_insert_time=5, **kwargs): """ Insert documents into MongoDB using an embedded data model. Parameters ---------- - volatile_db: pymongo database - database for temporary storage - permanent_db: pymongo database - database for permanent storage + db: pymongo database num_theads: int, optional number of workers that read from the buffer and write to the database. Must be 5 or less. Default is 1. @@ -117,8 +103,8 @@ def __init__(self, volatile_db, permanent_db, num_threads=1, self._EMBED_SIZE = embedder_size self._PAGE_SIZE = page_size self._MAX_INSERT = max_insert_time - self._permanent_db = permanent_db - self._volatile_db = volatile_db + self._QUEUE_TIMEOUT = 0.2 + self._db = db self._event_queue = queue.Queue(maxsize=self._QUEUE_SIZE) self._datum_queue = queue.Queue(maxsize=self._QUEUE_SIZE) self._event_embedder = Embedder('event', self._EMBED_SIZE) @@ -149,6 +135,45 @@ def __init__(self, volatile_db, permanent_db, num_threads=1, self._datum_executor.submit(self._datum_worker) self._count_executor.submit(self._count_worker) + self._create_indexes() + + def _create_indexes(self): + """ + Create indexes on the various collections. + If the index already exists, this has no effect. + """ + self._db.header.create_index('resources.uid', unique=True, sparse=True) + self._db.header.create_index('resources.resource_id') # legacy + self._db.header.create_index( + [('start.uid', pymongo.DESCENDING)], unique=True, sparse=True) + self._db.header.create_index( + [('start.time', pymongo.DESCENDING), + ('start.scan_id', pymongo.DESCENDING)], + unique=False, background=True) + self._db.header.create_index([("$**", "text")]) + self._db.header.create_index('stop.run_start', unique=True, sparse=True) + self._db.header.create_index('stop.uid', unique=True, sparse=True) + self._db.header.create_index( + [('stop.time', pymongo.DESCENDING)], unique=False, + background=True, sparse=True) + self._db.header.create_index( + [('descriptors.uid', pymongo.DESCENDING)], unique=True, sparse=True) + self._db.header.create_index( + [('descriptors.run_start', pymongo.DESCENDING), + ('time', pymongo.DESCENDING)], + unique=False, background=True) + self._db.header.create_index( + [('descriptors.time', pymongo.DESCENDING)], + unique=False, background=True) + self._db.event.create_index( + [('uid', pymongo.DESCENDING)], unique=True, sparse=True) + self._db.event.create_index( + [('descriptor', pymongo.DESCENDING), + ('time.0', pymongo.ASCENDING)], + unique=False, background=True) + self._db.datum.create_index('datum_id', unique=True, sparse=True) + self._db.datum.create_index('resource') + def __call__(self, name, doc): # Before inserting into mongo, convert any numpy objects into built-in # Python types compatible with pymongo. @@ -175,7 +200,7 @@ def inner(self): @_try_wrapper def _event_worker(self): # Gets events from the queue, embedds them, and writes them to the - # volatile database. + # database. last_push = 0 event = None @@ -186,7 +211,7 @@ def _event_worker(self): do_push = False try: if event is None: - event = self._event_queue.get(timeout=0.5) + event = self._event_queue.get(timeout=self._QUEUE_TIMEOUT) except queue.Empty: do_push = True else: @@ -200,8 +225,8 @@ def _event_worker(self): or do_push or time.monotonic() > (last_push + self._MAX_INSERT)): if not self._event_embedder.empty(): - event_dump = self._event_embedder.dump() - self._bulkwrite_event(event_dump) + event_dump, dump_sizes = self._event_embedder.dump() + self._bulkwrite_event(event_dump, dump_sizes) for descriptor, event_page in event_dump.items(): self._db_event_count['count_' + descriptor] += len( event_page['seq_num']) @@ -211,7 +236,7 @@ def _event_worker(self): @_try_wrapper def _datum_worker(self): # Gets datum from the queue, embedds them, and writes them to the - # volatile database. + # database. last_push = 0 datum = None @@ -223,7 +248,7 @@ def _datum_worker(self): do_push = False try: if datum is None: - datum = self._datum_queue.get(timeout=0.5) + datum = self._datum_queue.get(timeout=self._QUEUE_TIMEOUT) except queue.Empty: do_push = True else: @@ -238,8 +263,8 @@ def _datum_worker(self): or time.monotonic() > (last_push + self._MAX_INSERT)): if not self._datum_embedder.empty(): - datum_dump = self._datum_embedder.dump() - self._bulkwrite_datum(datum_dump) + datum_dump, dump_sizes = self._datum_embedder.dump() + self._bulkwrite_datum(datum_dump, dump_sizes) for resource, datum_page in datum_dump.items(): self._db_datum_count['count_' + resource] += len( datum_page['datum_id']) @@ -262,7 +287,7 @@ def _count_worker(self): or (sum(self._db_datum_count.values()) > sum(last_datum_count.values())) ): - self._volatile_db.header.update_one( + self._db.header.update_one( {'run_id': self._run_uid}, {'$set': {**dict(self._db_event_count), **dict(self._db_datum_count)}}) @@ -300,21 +325,23 @@ def datum(self, doc): return doc def event_page(self, doc): - self._bulkwrite_event({doc['descriptor']: doc}) + doc_size = len(bson.BSON.encode(doc)) + self._bulkwrite_event({doc['descriptor']: doc}, + {doc['descriptor']: doc_size}) return doc def datum_page(self, doc): - self._bulkwrite_datum({doc['resource']: doc}) + doc_size = len(bson.BSON.encode(doc)) + self._bulkwrite_datum({doc['resource']: doc}, + {doc['resource']: doc_size}) return doc def close(self): - self.freeze(self._run_uid) + self.finalize(self._run_uid) - def freeze(self, run_uid): + def finalize(self, run_uid): """ - Freeze the run by flushing the buffers and moving all of the run's - documents to the permanent database. This method checks that the data - has been transfered correcly, and then deletes the volatile data. + Finalize insertion of the run. """ # Freeze the serializer. self._frozen = True @@ -324,8 +351,6 @@ def freeze(self, run_uid): # Interupt the count worker sleep self._count.set() - # No need to wait the 5 seconds for count_executor to finish because we - # update the final count after explicitly durring freeze. self._count_executor.shutdown(wait=True) self._event_executor.shutdown(wait=True) self._datum_executor.shutdown(wait=True) @@ -345,93 +370,44 @@ def freeze(self, run_uid): # Insert the stop doc. self._insert_header('stop', self._stop_doc) - # Copy the run to the permanent database. - volatile_run = self._get_run(self._volatile_db, run_uid) - self._insert_run(self._permanent_db, volatile_run) - permanent_run = self._get_run(self._permanent_db, run_uid) - - # Check that it has been copied correctly to permanent database, then - # delete the run from the volatile database. - if volatile_run != permanent_run: - raise IOError("Failed to move data to permanent database.") - else: - self._volatile_db.header.drop() - self._volatile_db.event.drop() - self._volatile_db.datum.drop() - - def _get_run(self, db, run_uid): - """ - Gets a run from a database. Returns a list of the run's documents. - """ - run = list() - - # Get the header. - header = db.header.find_one({'run_id': run_uid}, {'_id': False}) - if header is None: - raise RuntimeError(f"Run not found {run_uid}") - - run.append(('header', header)) - - # Get the events. - if 'descriptors' in header.keys(): - for descriptor in header['descriptors']: - run += [('event', doc) for doc in - db.event.find({'descriptor': descriptor['uid']}, - {'_id': False})] - - # Get the datum. - if 'resources' in header.keys(): - for resource in header['resources']: - run += [('datum', doc) for doc in - db.datum.find({'resource': resource['uid']}, - {'_id': False})] - return run - - def _insert_run(self, db, run): - """ - Inserts a run into a database. run is a list of the run's documents. - """ - for collection, doc in run: - db[collection].insert_one(doc) - # del doc['_id'] is needed because insert_one mutates doc. - del doc['_id'] - def _insert_header(self, name, doc): """ Inserts header document into the run's header document. """ - self._volatile_db.header.update_one({'run_id': self._run_uid}, - {'$push': {name: doc}}, - upsert=True) + self._db.header.update_one({'run_id': self._run_uid}, + {'$push': {name: doc}}, + upsert=True) def _set_header(self, name, doc): """ Inserts header document into the run's header document. """ - self._volatile_db.header.update_one({'run_id': self._run_uid}, - {'$set': {name: doc}}) + self._db.header.update_one({'run_id': self._run_uid}, + {'$set': {name: doc}}) - def _bulkwrite_datum(self, datum_buffer): + def _bulkwrite_datum(self, datum_buffer, dump_sizes): """ Bulk writes datum_pages to Mongo datum collection. """ - operations = [self._updateone_datumpage(resource, datum_page) + operations = [self._updateone_datumpage(resource, datum_page, + dump_sizes[resource]) for resource, datum_page in datum_buffer.items()] - self._volatile_db.datum.bulk_write(operations, ordered=False) + self._db.datum.bulk_write(operations, ordered=False) - def _bulkwrite_event(self, event_buffer): + def _bulkwrite_event(self, event_buffer, dump_sizes): """ Bulk writes event_pages to Mongo event collection. """ - operations = [self._updateone_eventpage(descriptor, event_page) + operations = [self._updateone_eventpage(descriptor, event_page, + dump_sizes[descriptor]) for descriptor, event_page in event_buffer.items()] - self._volatile_db.event.bulk_write(operations, ordered=False) + self._db.event.bulk_write(operations, ordered=False) - def _updateone_eventpage(self, descriptor_id, event_page): + def _updateone_eventpage(self, descriptor_id, event_page, size): """ Creates the UpdateOne command that gets used with bulk_write. """ - event_size = sys.getsizeof(event_page) + event_size = size data_string = {'data.' + key: {'$each': value_array} for key, value_array in event_page['data'].items()} @@ -459,11 +435,11 @@ def _updateone_eventpage(self, descriptor_id, event_page): '$max': {'last_index': self._event_count[descriptor_id] - 1}}, upsert=True) - def _updateone_datumpage(self, resource_id, datum_page): + def _updateone_datumpage(self, resource_id, datum_page, size): """ Creates the UpdateOne command that gets used with bulk_write. """ - datum_size = sys.getsizeof(datum_page) + datum_size = size kwargs_string = {'datum_kwargs.' + key: {'$each': value_array} for key, value_array @@ -490,46 +466,6 @@ def _check_start(self, doc): else: self._start_found = True - def explicit_freeze(self, run_uid): - """ - Freeze the run by flushing the buffers and moving all of the run's - documents to the permanent database. This method is inteded to be used - if a run fails, and the freeze method is not called. This method can be - used to freeze a partial run. - """ - # Freeze the serializer. - self._frozen = True - - self._event_queue.put(False) - self._datum_queue.put(False) - - self._count_executor.shutdown(wait=False) - self._event_executor.shutdown(wait=True) - self._datum_executor.shutdown(wait=True) - - if self._worker_error: - raise RuntimeError("Worker exception: ") from self._worker_error - - # Raise exception if buffers are not empty. - assert self._event_queue.empty() - assert self._datum_queue.empty() - assert self._event_embedder.empty() - assert self._datum_embedder.empty() - - # Copy the run to the permanent database. - volatile_run = self._get_run(self._volatile_db, run_uid) - self._insert_run(self._permanent_db, volatile_run) - permanent_run = self._get_run(self._permanent_db, run_uid) - - # Check that it has been copied correctly to permanent database, then - # delete the run from the volatile database. - if volatile_run != permanent_run: - raise IOError("Failed to move data to permanent database.") - else: - self._volatile_db.header.drop() - self._volatile_db.event.drop() - self._volatile_db.datum.drop() - class Embedder(): @@ -572,6 +508,7 @@ def __init__(self, doc_type, max_size): self._embedder = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) self.current_size = 0 + self.stream_size = defaultdict(lambda: 0) if (max_size >= 1000) and (max_size <= 15000000): self._max_size = max_size @@ -608,8 +545,10 @@ def dump(self): embedder_dump = self._embedder self._embedder = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + dump_sizes = dict(self.stream_size) + self.stream_size = defaultdict(lambda: 0) self.current_size = 0 - return embedder_dump + return embedder_dump, dump_sizes def insert(self, doc): """ @@ -623,7 +562,13 @@ def insert(self, doc): result: bool True if insert is successful, False if it failed. """ - doc_size = sys.getsizeof(doc) + doc_size = len(bson.BSON.encode(doc)) + if doc_size > self._max_size: + raise ValueError(f"Document size is too large to fit in the " + f"embedder. doc_size={doc_size}, " + f"embedder_size={self._max_size}, " + f"doc_uid={doc['uid']}") + if (self.current_size + doc_size) > self._max_size: return doc @@ -640,6 +585,7 @@ def insert(self, doc): self._embedder[doc[self._stream_id_key]][key] = value self.current_size += doc_size + self.stream_size[doc[self._stream_id_key]] += doc_size return None def empty(self): diff --git a/suitcase/mongo_embedded/tests/tests.py b/suitcase/mongo_embedded/tests/tests.py index 86c9989..448f352 100644 --- a/suitcase/mongo_embedded/tests/tests.py +++ b/suitcase/mongo_embedded/tests/tests.py @@ -10,9 +10,8 @@ def test_export(db_factory, example_data): """ Test suitcase-mongo-embedded serializer with default parameters. """ - volatile_db = db_factory() permanent_db = db_factory() - serializer = Serializer(volatile_db, permanent_db) + serializer = Serializer(permanent_db) run(example_data, serializer, permanent_db) if not serializer._frozen: serializer.close() @@ -22,9 +21,8 @@ def test_multithread(db_factory, example_data): """ Test suitcase-mongo-embedded serializer with multiple worker threads. """ - volatile_db = db_factory() permanent_db = db_factory() - serializer = Serializer(volatile_db, permanent_db, num_threads=5) + serializer = Serializer(permanent_db, num_threads=5) run(example_data, serializer, permanent_db) if not serializer._frozen: serializer.close() @@ -34,9 +32,8 @@ def test_smallbuffer(db_factory, example_data): """ Test suitcase-mongo-embedded serializer with a small buffer. """ - volatile_db = db_factory() permanent_db = db_factory() - serializer = Serializer(volatile_db, permanent_db, embedder_size=1000) + serializer = Serializer(permanent_db, embedder_size=3000) run(example_data, serializer, permanent_db) if not serializer._frozen: serializer.close() @@ -46,9 +43,8 @@ def test_smallqueue(db_factory, example_data): """ Test suitcase-mongo-embedded serializer with a small buffer. """ - volatile_db = db_factory() permanent_db = db_factory() - serializer = Serializer(volatile_db, permanent_db, queue_size=1) + serializer = Serializer(permanent_db, queue_size=1) run(example_data, serializer, permanent_db) if not serializer._frozen: serializer.close() @@ -58,9 +54,8 @@ def test_smallpage(db_factory, example_data): """ Test suitcase-mongo-embedded serializer with a small mongo page saize. """ - volatile_db = db_factory() permanent_db = db_factory() - serializer = Serializer(volatile_db, permanent_db, page_size=10000) + serializer = Serializer(permanent_db, page_size=10000) run(example_data, serializer, permanent_db) if not serializer._frozen: serializer.close() @@ -74,9 +69,8 @@ def test_evil_db(db_factory, example_data): def evil_func(*args, **kwargs): raise RuntimeError - volatile_db = db_factory() permanent_db = db_factory() - serializer = Serializer(volatile_db, permanent_db) + serializer = Serializer(permanent_db) serializer._bulkwrite_event = evil_func serializer._bulkwrite_datum = evil_func with pytest.raises(RuntimeError): @@ -101,7 +95,6 @@ def run(example_data, serializer, permanent_db): # Fix formatting for JSON. item = event_model.sanitize_doc(item) - # Send the bluesky doc to the serializer mongo_serializer(*item)