Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion sotodlib/io/bookbinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,13 @@ def __init__(self, book, obsdb, filedb, data_root, readout_ids,
self.file_idxs = None
self.meta_files = None

def close(self):
    """Close and detach every handler on ``self.log``.

    This is what fully closes the log file held by this object.
    """
    # Work from a snapshot: removeHandler() mutates ``self.log.handlers``
    # while we are walking it.
    attached = list(self.log.handlers)
    for log_handler in attached:
        log_handler.close()
        self.log.removeHandler(log_handler)

def set_min_max_ctime(self):
"""Function to be run after stream.preprocess is finished to set the ctimes.
Splitting this out because it is useful for debugging purposes as well
Expand Down Expand Up @@ -1343,6 +1350,9 @@ def __init__(
else:
self.compress_output = False

def close(self):
    """Do nothing and return ``None``."""
    return None

def get_metadata(self, telescope=None, tube_config={}):
return {
"book_id": self.book.bid,
Expand Down Expand Up @@ -1506,7 +1516,7 @@ def get_frame_times(frame, allow_bad_timing=False):

# Look for evidence of counters de-syncing.
stat = c2-(c0/480000) - np.round(c2-(c0/480000))
counters = ((np.ptp(stat)<0.001)+(np.abs(np.mean(stat))<0.0025))
counters = ((np.ptp(stat)<0.001) and (np.abs(np.mean(stat))<0.0025))

ts = np.round(c2 - (c0 / 480000) ) + c0 / 480000

Expand Down
16 changes: 13 additions & 3 deletions sotodlib/io/datapkg_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ def file_list_from_database(

def verify_timecode_deletable(
self, timecode, verify_with_librarian=True, include_hk=True,
raise_extra_db_files=False,
):
"""
Checks that all books in that timecode are uploaded to the librarian
Expand All @@ -589,6 +590,10 @@ def verify_timecode_deletable(
folders into this list since they aren't book bound but we'd like them
to be deleted
3. Compare the two lists and make sure they're the same.

If raise_extra_db_files is False, don't raise an error if we find files in
the database but not on disk. In the last 1 year, this has only happened
when a previous cleanup script failed for a locked database
"""
deletable = [True, ""]

Expand Down Expand Up @@ -630,9 +635,14 @@ def verify_timecode_deletable(
msg = f"Files in database but not on disk: {extra_files}"
for f in missed_files:
msg += f"\t{f}\n"
self.logger.error(msg)
deletable[0] = False
deletable[1] += msg
if raise_extra_db_files:
self.logger.error(msg)
deletable[0] = False
deletable[1] += msg
else:
self.logger.warning(msg)
deletable[0] = True
deletable[1] += msg
return deletable

def delete_timecode_level2(
Expand Down
8 changes: 7 additions & 1 deletion sotodlib/io/imprinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ def _run_book_binding(
assert book.type in VALID_OBSTYPES

err = None
binder = None
try:
# find appropriate binder for the book type
binder = self._get_binder_for_book(
Expand Down Expand Up @@ -878,7 +879,12 @@ def _run_book_binding(
message = f"{message}\ntrace={err_msg}" if message else err_msg
status = FAILED
err = e


## bookbinder creates a log file and holds the connection open.
## Doesn't matter during normal operations but makes some fixing operations annoying
if binder is not None:
binder.close()

return book.bid, status, message, err

def bind_book(
Expand Down
61 changes: 60 additions & 1 deletion sotodlib/io/imprinter_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import datetime as dt
from typing import Optional

import sotodlib.io.bookbinder as bbinder
from sotodlib.io.imprinter import Imprinter, Books, FAILED
import sotodlib.io.imprinter_utils as utils

Expand Down Expand Up @@ -346,11 +347,36 @@ def fix_book(self):
if np.all([x<=self.max_drops_to_fix for x in self.dropped.values()]):
utils.set_book_rebind(self.imprint, self.book)
self.imprint.bind_book(self.book, allow_bad_timing=True,)

## if any observations are over the limit, double check it's not a
## SMURF database error
dropped_oids = [k for k,x in self.dropped.items()
if x>self.max_drops_to_fix]
g3session, SMURF = self.imprint.get_g3tsmurf_session(
return_archive=True
)
for oid in dropped_oids:
print(f"Updating Level 2 observations for {oid}")
obs = SMURF.get_observation(oid, g3session)
SMURF.update_observation_files(obs, g3session, force=True)

utils.set_book_rebind(self.imprint, self.book)
try:
self.imprint.bind_book(self.book)
except bbinder.BadTimeSamples as err:
print(f"New error message is {err}")

print(self.report_error())
if sum([x>self.max_drops_to_fix for x in self.dropped.values()])==0:
print("problem solved!")
return

## if all our dropped values are more than the limit. Don't Bind
elif np.all([x>self.max_drops_to_fix for x in self.dropped.values()]):
if np.all([x>self.max_drops_to_fix for x in self.dropped.values()]):
print(f"All obs_ids have more than {self.max_drops_to_fix}"
" will not bind book")
utils.set_book_wont_bind(self.imprint, self.book)

## if only some of the observations have dropped timing. remove them
else:
remove_oid = [k for k,x in self.dropped.items()
Expand Down Expand Up @@ -416,6 +442,38 @@ def fix_book(self):
utils.set_book_rebind(self.imprint, self.book)
self.imprint.bind_book(self.book, require_monotonic_times=False,)

class TimingCounterError(BookError):
    """Autofix handler for books whose message mentions ``TimingCounterError``."""

    @staticmethod
    def has_error(book):
        """Return True when ``book.message`` contains "TimingCounterError"."""
        return "TimingCounterError" in book.message

    @staticmethod
    def _tagged_bad(obs):
        """Return True when the second comma-separated field of ``obs.tag`` is "bad".

        A missing tag, or a tag with fewer than two fields, counts as "not
        tagged bad" rather than raising ``AttributeError`` / ``IndexError``.
        """
        fields = (obs.tag or "").split(',')
        return len(fields) > 1 and fields[1] == "bad"

    def report_error(self):
        """Return a one-line description of the problem for this book."""
        return f"{self.book.bid} has bad counter statistics"

    def fix_book(self):
        """Re-index level 2 data where needed, then rebind allowing bad timing.

        For every level 2 observation in the book that is not already tagged
        "bad", its files are re-added to the SMURF archive and the observation
        is updated. The observation must report "bad" afterwards. The book is
        then split per observation and each resulting book is rebound with
        ``allow_bad_timing=True``.

        Raises
        ------
        AssertionError
            If an observation is still not tagged "bad" after re-indexing.
        """
        g3session, SMURF = self.imprint.get_g3tsmurf_session(
            return_archive=True
        )

        obsdb = self.imprint.get_g3tsmurf_obs_for_book(self.book)
        for obs_id, obs in obsdb.items():
            ## if this was tagged correctly at level 2, ready to bind
            if self._tagged_bad(obs):
                continue
            # otherwise, try and tag it correctly
            print(f"Updating level 2 files and observation for {obs_id}")
            for db_file in obs.files:
                SMURF.add_file(db_file.name, g3session, overwrite=True)
            SMURF.update_observation_files(obs, g3session, force=True)
            # Explicit raise (same exception type as the former ``assert``)
            # so the check is not stripped when Python runs with -O.
            if not self._tagged_bad(obs):
                raise AssertionError(
                    "level 2 file is not reporting bad counters"
                )

        # bad timing books can't be multiwafer
        books = utils.split_book_by_obs(self.imprint, self.book)
        for book in books:
            print(f"Binding book {book.bid} allowing bad timing")
            utils.set_book_rebind(self.imprint, book)
            self.imprint.bind_book(book, allow_bad_timing=True,)

AUTOFIX_ERRORS = [
SecondFail,
BookDirHasFiles,
Expand All @@ -430,6 +488,7 @@ def fix_book(self):
FileTooLargeError,
BadTimeSamples,
NonMonotonicAncillaryTimes,
TimingCounterError,
]

def process_book_failure(
Expand Down
94 changes: 92 additions & 2 deletions sotodlib/io/imprinter_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,14 @@ def delete_level2_obs_and_book(imprint, book, session=None):
session.commit()


def remove_level2_obs_from_book(imprint, book, bad_obs_id):
def remove_level2_obs_from_book(
imprint, book, bad_obs_id,
register_wont_bind=False
):
"""If one level2 observation is problematic, delete that book and re-register without it.

register_wont_bind: if true, register the removed level 2 observation as a
book and set its status to WONT_BIND.
"""
assert book.type == "obs", f"Book should be an 'obs' book"

Expand All @@ -146,6 +152,7 @@ def remove_level2_obs_from_book(imprint, book, bad_obs_id):
G3tObservations.obs_id == o
).one() for o in new_obs_list
]
tel_tube = book.tel_tube
oset = ObsSet(
olist,
mode=book.type,
Expand All @@ -157,7 +164,90 @@ def remove_level2_obs_from_book(imprint, book, bad_obs_id):
session.delete(o)
session.delete(book)
session.commit()
return imprint.register_book(oset, session=session)
new_book = imprint.register_book(oset, session=session)

if register_wont_bind:
bad_obs = g3session.query(G3tObservations).filter(
G3tObservations.obs_id == bad_obs_id
).one()
bad_oset = ObsSet(
[bad_obs],
mode='obs',
slots=imprint.tubes[tel_tube]['slots'],
tel_tube=tel_tube
)
bad_book = imprint.register_book(bad_oset, session=session)
set_book_wont_bind(
imprint, bad_book,
message=f"Removed from {new_book.bid}: observation {bad_obs_id} excluded during autofix",
session=session,
)

return new_book

def split_book_by_obs(imprint, book, session=None):
    """Replace a multi-observation book with one book per observation.

    A book with zero or one observation is returned untouched. Otherwise the
    book's observation rows are deleted, a new book is registered for each
    observation, and the original book is deleted from the BookDB.

    Parameters
    ----------
    imprint : Imprinter instance
    book : str or Book
        Book to split; a string is resolved through imprint.get_book().
    session : BookDB session, optional
        Defaults to imprint.get_session() when None.

    Returns
    -------
    new_books : list of Book
        The freshly registered single-observation books, or ``[book]`` when
        no split was needed.
    """
    session = imprint.get_session() if session is None else session

    if isinstance(book, str):
        book = imprint.get_book(book)

    observations = imprint.get_g3tsmurf_obs_for_book(book)
    n_obs = len(observations)

    # Guard clause: nothing to split.
    if n_obs <= 1:
        imprint.logger.info(
            f"Book {book.bid} has {n_obs} observation(s); no split needed."
        )
        return [book]

    imprint.logger.info(
        f"Splitting book {book.bid} with {n_obs} observations into "
        f"{n_obs} single-observation books."
    )

    # Staged files are cleaned up before anything is re-registered.
    set_book_rebind(imprint, book)

    # The old observation rows go first, to deal with uniqueness constraints.
    for linked_obs in book.obs:
        session.delete(linked_obs)
    session.commit()

    new_books = [
        imprint.register_book(
            ObsSet(
                [single_obs],
                mode=book.type,
                slots=imprint.tubes[book.tel_tube]['slots'],
                tel_tube=book.tel_tube,
            ),
            session=session,
        )
        for single_obs in observations.values()
    ]

    # Only now drop the original book itself.
    session.delete(book)
    session.commit()

    return new_books

def find_overlaps(imprint, obs_id, min_ctime, max_ctime):
""" helper function for when a level 2 observation could span multiple
Expand Down
52 changes: 40 additions & 12 deletions sotodlib/io/load_smurf.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,32 @@ def get_sample_timestamps(frame):
times = np.array([t.time / spt3g_core.G3Units.s for t in frame["data"].times()])
return times, TimingParadigm.G3Timestream

def check_frame_counters( frame ):
    """Return whether the timing counters in ``frame`` pass the sanity checks.

    bookbinder performs its own counter check too; running it here
    surfaces the warnings sooner.

    The result is falsy when the frame has no 'primary' entry, when
    'Counter0' or 'Counter2' repeats a value between consecutive samples,
    or when the de-sync statistic (``c2 - c0/480000`` minus its nearest
    integer) has a peak-to-peak of 0.001 or more or an absolute mean of
    0.0025 or more.
    """
    primary = frame.get('primary', None)
    if primary is None:
        # no good timing without primary
        return False

    row_of = {name: idx for idx, name in enumerate(primary.names)}
    c0 = primary.data[row_of['Counter0']]
    raw_c2 = primary.data[row_of['Counter2']]

    sec, nsec = split_ts_bits(raw_c2)
    # Add 20 years in seconds (accounting for leap years) to handle
    # offset between EPICS time referenced to 1990 relative to UNIX time.
    epics_offset = 5*(4*365 + 1)*24*60*60
    c2 = sec + nsec*1e-9 + epics_offset

    # Every consecutive pair of samples must differ, for both counters.
    incrementing = np.all(np.diff(c0) != 0)
    incrementing = incrementing and np.all(np.diff(c2) != 0)
    if not incrementing:
        return incrementing

    # Look for evidence of counters de-syncing.
    offset = c2 - (c0/480000)
    residual = offset - np.round(offset)
    return (np.ptp(residual) < 0.001) and (np.abs(np.mean(residual)) < 0.0025)

def _file_has_end_frames(filename):
ended = False
Expand Down Expand Up @@ -331,6 +357,16 @@ def last_update(self, time):
agent.time = time
session.commit()

def get_observation(self, obs_id, session=None, ):
    """
    Look up the observation row whose ``obs_id`` matches.

    A new session is created from ``self.Session()`` when none is passed.
    Returns the result of ``one_or_none()`` on the filtered query.
    """
    db = self.Session() if session is None else session
    matching = db.query(Observations).filter(
        Observations.obs_id==obs_id
    )
    return matching.one_or_none()

def add_file(self, path, session, overwrite=False):
"""
Indexes a single file and adds it to the sqlite database. Creates a
Expand Down Expand Up @@ -450,18 +486,7 @@ def add_file(self, path, session, overwrite=False):
timing = timing and (
frame.get("timing_paradigm", "") == "High Precision"
)
fo = frame.get('primary', None)
if fo is None:
timing = False # no good timing without primary
else:
key_map = {k: i for i, k in enumerate(fo.names)}
counters = np.all(
np.diff( fo.data[ key_map['Counter0']] ) != 0
) and np.all(
np.diff( fo.data[ key_map['Counter2']] ) != 0
)
# check all counters are incrementing
timing = timing and counters
timing = timing and check_frame_counters(frame)

else:
db_frame.n_samples = data.n_samples
Expand Down Expand Up @@ -498,6 +523,9 @@ def add_file(self, path, session, overwrite=False):
logger.debug(f"file {db_file.name} is an observation")
self.add_new_observation_from_status(status, session)




def index_archive(
self,
stop_at_error=False,
Expand Down
Loading