diff --git a/sotodlib/io/bookbinder.py b/sotodlib/io/bookbinder.py index 6ec1d538d..ddbe27296 100644 --- a/sotodlib/io/bookbinder.py +++ b/sotodlib/io/bookbinder.py @@ -999,6 +999,13 @@ def __init__(self, book, obsdb, filedb, data_root, readout_ids, self.file_idxs = None self.meta_files = None + def close(self): + """Completely closes the log file + """ + for handler in self.log.handlers[:]: + handler.close() + self.log.removeHandler(handler) + def set_min_max_ctime(self): """Function to be run after stream.preprocess is finished to set the ctimes. Splitting this out because it is useful for debugging purposes as well @@ -1343,6 +1350,9 @@ def __init__( else: self.compress_output = False + def close(self): + pass + def get_metadata(self, telescope=None, tube_config={}): return { "book_id": self.book.bid, @@ -1506,7 +1516,7 @@ def get_frame_times(frame, allow_bad_timing=False): # Look for evidence of counters de-syncing. stat = c2-(c0/480000) - np.round(c2-(c0/480000)) - counters = ((np.ptp(stat)<0.001)+(np.abs(np.mean(stat))<0.0025)) + counters = ((np.ptp(stat)<0.001) and (np.abs(np.mean(stat))<0.0025)) ts = np.round(c2 - (c0 / 480000) ) + c0 / 480000 diff --git a/sotodlib/io/datapkg_completion.py b/sotodlib/io/datapkg_completion.py index 7f0c1631b..59e7c5aaf 100644 --- a/sotodlib/io/datapkg_completion.py +++ b/sotodlib/io/datapkg_completion.py @@ -576,6 +576,7 @@ def file_list_from_database( def verify_timecode_deletable( self, timecode, verify_with_librarian=True, include_hk=True, + raise_extra_db_files=False, ): """ Checkes that all books in that timecode are uploaded to the librarian @@ -589,6 +590,10 @@ def verify_timecode_deletable( folders into this list since they aren't book bound but we'd like them to be deleted 3. Compare the two lists and make sure they're the same. + + If raise_extra_db_files is False, don't raise an error if we find files in + the database but not on disk. In the last 1 year, this has only happened + when a previous cleanup script failed for a locked database """ deletable = [True, ""] @@ -630,9 +635,14 @@ def verify_timecode_deletable( msg = f"Files in database but not on disk: {extra_files}" for f in missed_files: msg += f"\t{f}\n" - self.logger.error(msg) - deletable[0] = False - deletable[1] += msg + if raise_extra_db_files: + self.logger.error(msg) + deletable[0] = False + deletable[1] += msg + else: + self.logger.warning(msg) + deletable[0] = True + deletable[1] += msg return deletable def delete_timecode_level2( diff --git a/sotodlib/io/imprinter.py b/sotodlib/io/imprinter.py index 761a40271..fc1b51261 100644 --- a/sotodlib/io/imprinter.py +++ b/sotodlib/io/imprinter.py @@ -839,6 +839,7 @@ def _run_book_binding( assert book.type in VALID_OBSTYPES err = None + binder = None try: # find appropriate binder for the book type binder = self._get_binder_for_book( @@ -878,7 +879,12 @@ def _run_book_binding( message = f"{message}\ntrace={err_msg}" if message else err_msg status = FAILED err = e - + + ## bookbinder creates a log file and holds the connection open. + ## Doesn't matter during normal operations but make some fixing operations annoying + if binder is not None: + binder.close() + return book.bid, status, message, err def bind_book( diff --git a/sotodlib/io/imprinter_cli.py b/sotodlib/io/imprinter_cli.py index 8a9dd86ca..5a2b6c9d8 100644 --- a/sotodlib/io/imprinter_cli.py +++ b/sotodlib/io/imprinter_cli.py @@ -19,6 +19,7 @@ import datetime as dt from typing import Optional +import sotodlib.io.bookbinder as bbinder from sotodlib.io.imprinter import Imprinter, Books, FAILED import sotodlib.io.imprinter_utils as utils @@ -346,11 +347,36 @@ def fix_book(self): if np.all([x<=self.max_drops_to_fix for x in self.dropped.values()]): utils.set_book_rebind(self.imprint, self.book) self.imprint.bind_book(self.book, allow_bad_timing=True,) + + ## if any observations are over the limit, double check it's not a + ## SMURF database error + dropped_oids = [k for k,x in self.dropped.items() + if x>self.max_drops_to_fix] + g3session, SMURF = self.imprint.get_g3tsmurf_session( + return_archive=True + ) + for oid in dropped_oids: + print(f"Updating Level 2 observations for {oid}") + obs = SMURF.get_observation(oid, g3session) + SMURF.update_observation_files(obs, g3session, force=True) + + utils.set_book_rebind(self.imprint, self.book) + try: + self.imprint.bind_book(self.book) + except bbinder.BadTimeSamples as err: + print(f"New error message is {err}") + + print(self.report_error()) + if sum([x>self.max_drops_to_fix for x in self.dropped.values()])==0: + print("problem solved!") + return + ## if all our dropped values are more than the limit. Don't Bind - elif np.all([x>self.max_drops_to_fix for x in self.dropped.values()]): + if np.all([x>self.max_drops_to_fix for x in self.dropped.values()]): print(f"All obs_ids have more than {self.max_drops_to_fix}" " will not bind book") utils.set_book_wont_bind(self.imprint, self.book) + ## if only some of the observations have dropped timing. remove them else: remove_oid = [k for k,x in self.dropped.items() @@ -416,6 +442,38 @@ def fix_book(self): utils.set_book_rebind(self.imprint, self.book) self.imprint.bind_book(self.book, require_monotonic_times=False,) +class TimingCounterError(BookError): + @staticmethod + def has_error(book): + return "TimingCounterError" in book.message + + def report_error(self): + return f"{self.book.bid} has bad counter statistics" + + def fix_book(self): + g3session, SMURF = self.imprint.get_g3tsmurf_session( + return_archive=True + ) + + obsdb = self.imprint.get_g3tsmurf_obs_for_book(self.book) + for obs_id, obs in obsdb.items(): + ## if this was tagged correctly at level 2, ready to bind + if "bad" == obs.tag.split(',')[1]: + continue + # otherwise, try and tag it correctly + print(f"Updating level 2 files and observation for {obs_id}") + for db_file in obs.files: + SMURF.add_file(db_file.name, g3session, overwrite=True) + SMURF.update_observation_files(obs, g3session, force=True) + assert "bad" == obs.tag.split(',')[1], "level 2 file is not reporting bad counters" + + # bad timing books can't be multiwafer + books = utils.split_book_by_obs(self.imprint, self.book) + for book in books: + print(f"Binding book {book.bid} allowing bad timing") + utils.set_book_rebind(self.imprint, book) + self.imprint.bind_book(book, allow_bad_timing=True,) + AUTOFIX_ERRORS = [ SecondFail, BookDirHasFiles, @@ -430,6 +488,7 @@ def fix_book(self): FileTooLargeError, BadTimeSamples, NonMonotonicAncillaryTimes, + TimingCounterError, ] def process_book_failure( diff --git a/sotodlib/io/imprinter_utils.py b/sotodlib/io/imprinter_utils.py index 1eebf1bec..f93c86312 100644 --- a/sotodlib/io/imprinter_utils.py +++ b/sotodlib/io/imprinter_utils.py @@ -128,8 +128,14 @@ def delete_level2_obs_and_book(imprint, book, session=None): session.commit() -def remove_level2_obs_from_book(imprint, book, bad_obs_id): +def remove_level2_obs_from_book( + imprint, book, bad_obs_id, + register_wont_bind=False +): """If one level2 observation is problematic, delete that book and re-register without it. + + register_wont_bind: if true, register the removed level 2 observation as a + book and set it's status to WONT_BIND. """ assert book.type == "obs", f"Book should be an 'obs' book" @@ -146,6 +152,7 @@ def remove_level2_obs_from_book(imprint, book, bad_obs_id): G3tObservations.obs_id == o ).one() for o in new_obs_list ] + tel_tube = book.tel_tube oset = ObsSet( olist, mode=book.type, @@ -157,7 +164,90 @@ def remove_level2_obs_from_book(imprint, book, bad_obs_id): session.delete(o) session.delete(book) session.commit() - return imprint.register_book(oset, session=session) + new_book = imprint.register_book(oset, session=session) + + if register_wont_bind: + bad_obs = g3session.query(G3tObservations).filter( + G3tObservations.obs_id == bad_obs_id + ).one() + bad_oset = ObsSet( + [bad_obs], + mode='obs', + slots=imprint.tubes[tel_tube]['slots'], + tel_tube=tel_tube + ) + bad_book = imprint.register_book(bad_oset, session=session) + set_book_wont_bind( + imprint, bad_book, + message=f"Removed from {new_book.bid}: observation {bad_obs_id} excluded during autofix", + session=session, + ) + + return new_book + +def split_book_by_obs(imprint, book, session=None): + """Split a book with data from multiple wafers into books for each individual wafer. + + If the book contains only one observation, returns the book and makes + no changes. Otherwise, deletes the original book from the BookDB and + registers a new book for each observation it contained. + + Parameters + ---------- + imprint : Imprinter instance + book : str or Book + The book to split. If a string, it is looked up via imprint.get_book(). + session : BookDB session, optional + If None, one is obtained from imprint.get_session(). + + Returns + ------- + new_books : list of Book + The newly registered single-observation books. If the original + book had only one observation returns [book]. + """ + if session is None: + session = imprint.get_session() + + if isinstance(book, str): + book = imprint.get_book(book) + + obs_dict = imprint.get_g3tsmurf_obs_for_book(book) + + if len(obs_dict) <= 1: + imprint.logger.info( + f"Book {book.bid} has {len(obs_dict)} observation(s); no split needed." + ) + return [book] + + imprint.logger.info( + f"Splitting book {book.bid} with {len(obs_dict)} observations into " + f"{len(obs_dict)} single-observation books." + ) + + # Clean up any staged files before re-registering + set_book_rebind(imprint, book) + + # Remove the observations from the book to deal with uniqueness constraints + for o in book.obs: + session.delete(o) + session.commit() + + new_books = [] + for obs_id, obs in obs_dict.items(): + oset = ObsSet( + [obs], + mode=book.type, + slots=imprint.tubes[book.tel_tube]['slots'], + tel_tube=book.tel_tube, + ) + new_book = imprint.register_book(oset, session=session) + new_books.append(new_book) + + session.delete(book) + session.commit() + + return new_books def find_overlaps(imprint, obs_id, min_ctime, max_ctime): """ helper function for when a level 2 observation could span multiple diff --git a/sotodlib/io/load_smurf.py b/sotodlib/io/load_smurf.py index cc921ade1..c667b4a29 100644 --- a/sotodlib/io/load_smurf.py +++ b/sotodlib/io/load_smurf.py @@ -116,6 +116,32 @@ def get_sample_timestamps(frame): times = np.array([t.time / spt3g_core.G3Units.s for t in frame["data"].times()]) return times, TimingParadigm.G3Timestream +def check_frame_counters( frame ): + """check counters in frame are doing what we want them to do. + bookbinder also checks but we'll see these warnings faster + """ + fo = frame.get('primary', None) + if fo is None: + return False # no good timing without primary + key_map = {k: i for i, k in enumerate(fo.names)} + c0 = fo.data[ key_map['Counter0']] + c2 = fo.data[ key_map['Counter2']] + + s, ns = split_ts_bits(c2) + # Add 20 years in seconds (accounting for leap years) to handle + # offset between EPICS time referenced to 1990 relative to UNIX time. + c2 = s + ns*1e-9 + 5*(4*365 + 1)*24*60*60 + + # check all counters are incrementing + counters = np.all( np.diff( c0 ) != 0 ) + counters = counters and np.all( np.diff( c2 ) != 0) + + # Look for evidence of counters de-syncing. + stat = c2-(c0/480000) - np.round(c2-(c0/480000)) + counters = counters and ( + (np.ptp(stat)<0.001) and (np.abs(np.mean(stat))<0.0025) + ) + return counters def _file_has_end_frames(filename): ended = False @@ -331,6 +357,16 @@ def last_update(self, time): agent.time = time session.commit() + def get_observation(self, obs_id, session=None, ): + """ + get the observation object for obs_id + """ + if session is None: + session = self.Session() + return session.query(Observations).filter( + Observations.obs_id==obs_id + ).one_or_none() + def add_file(self, path, session, overwrite=False): """ Indexes a single file and adds it to the sqlite database. Creates a @@ -450,18 +486,7 @@ def add_file(self, path, session, overwrite=False): timing = timing and ( frame.get("timing_paradigm", "") == "High Precision" ) - fo = frame.get('primary', None) - if fo is None: - timing = False # no good timing without primary - else: - key_map = {k: i for i, k in enumerate(fo.names)} - counters = np.all( - np.diff( fo.data[ key_map['Counter0']] ) != 0 - ) and np.all( - np.diff( fo.data[ key_map['Counter2']] ) != 0 - ) - # check all counters are incrementing - timing = timing and counters + timing = timing and check_frame_counters(frame) else: db_frame.n_samples = data.n_samples @@ -498,6 +523,9 @@ def add_file(self, path, session, overwrite=False): logger.debug(f"file {db_file.name} is an observation") self.add_new_observation_from_status(status, session) + + + def index_archive( self, stop_at_error=False,