diff --git a/pyproject.toml b/pyproject.toml index 0d2200a65..4c5c41b99 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "flacarray>=0.3.4", "h5py", "mapcat", + "opentelemetry", ] dynamic=["version"] classifiers = [ diff --git a/sotodlib/io/imprinter.py b/sotodlib/io/imprinter.py index 7037c20c9..24e7242de 100644 --- a/sotodlib/io/imprinter.py +++ b/sotodlib/io/imprinter.py @@ -32,6 +32,9 @@ from .g3thk_db import G3tHk, HKFiles from ..site_pipeline.utils.logging import init_logger +from opentelemetry import trace + +tracer = trace.get_tracer("sotodlib") #################### # useful constants # @@ -1756,25 +1759,48 @@ def upload_book_to_librarian(self, book, session=None, raise_on_error=True): if self.librarian is None: self._librarian_connect() - assert book.status == BOUND, "cannot upload unbound books" + with tracer.start_as_current_span("librarian_upload", attributes={ + "book.bid": book.bid, + "book.type": book.type, + "book.status": book.status, + "librarian.host": self.librarian.host, + }) as span: + self.logger.info(f"Uploading book {book.bid} to librarian") + try: + assert book.status == BOUND, "cannot upload unbound books" + + local_path = Path(self.get_book_abs_path(book)) + dest_path = Path(book.path) + + span.set_attributes( + { + "book.local_path": str(local_path), + "librarian.dest_path": str(dest_path), + } + ) - self.logger.info(f"Uploading book {book.bid} to librarian") - try: - self.librarian.upload( - Path( self.get_book_abs_path(book) ), - Path( book.path ), - ) - book.status = UPLOADED - session.commit() - except Exception as e: - self.logger.error( - f"Failed to upload book {book.bid}." - ) - if raise_on_error: - raise e - else: - return False, e - return True, None + self.librarian.upload( + local_path, + dest_path, + ) + + book.status = UPLOADED + session.commit() + except Exception as e: + self.logger.error( + f"Failed to upload book {book.bid}." + ) + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + + if raise_on_error: + raise e + else: + return False, e + + span.set_status(trace.Status(trace.StatusCode.OK)) + + return True, None def check_book_in_librarian( self, diff --git a/sotodlib/site_pipeline/update_librarian.py b/sotodlib/site_pipeline/update_librarian.py index f432dad57..3498b801c 100644 --- a/sotodlib/site_pipeline/update_librarian.py +++ b/sotodlib/site_pipeline/update_librarian.py @@ -3,6 +3,8 @@ from pathlib import Path from typing import Optional +from opentelemetry import trace + from sotodlib.io.imprinter import Imprinter, BOUND, UPLOADED from sotodlib.site_pipeline.utils.logging import init_logger @@ -49,29 +51,46 @@ def core(config: str): Path to config file for imprinter """ - imprinter = Imprinter( - config, - db_args={'connect_args': {'check_same_thread': False}}, - ) - - session = imprinter.get_session() - to_upload = imprinter.get_bound_books(session=session) + tracer = trace.get_tracer("sotodlib") - failed_list = [] - for book in to_upload: - success, err = imprinter.upload_book_to_librarian( - book, session=session, raise_on_error=False + with tracer.start_as_current_span("update_librarian"): + imprinter = Imprinter( + config, + db_args={'connect_args': {'check_same_thread': False}}, ) - if not success: - failed_list.append( (book.bid, err) ) - ## don't just continually fail - if len(failed_list) > 5: - break - - if len(failed_list) != 0: - # raise the first error so we know something is wrong - logger.error(f"Failed to upload books {[f[0] for f in failed_list]}") - raise failed_list[0][1] + + session = imprinter.get_session() + to_upload = imprinter.get_bound_books(session=session) + + failed_list = [] + for book in to_upload: + with tracer.start_as_current_span("upload_book_to_librarian") as span: + span.set_attribute("book_id", book.bid) + + success, err = imprinter.upload_book_to_librarian( + book, session=session, raise_on_error=False + ) + + if not success: + span.set_status( + trace.Status( + trace.StatusCode.ERROR, + f"Failed to upload book to librarian with error {err}" + ) + ) + + failed_list.append( (book.bid, err) ) + else: + span.set_status(trace.Status(trace.StatusCode.OK)) + + # Pause if we fail too many in total. + if len(failed_list) > 5: + break + + if len(failed_list) != 0: + # raise the first error so we know something is wrong + logger.error(f"Failed to upload books {[f[0] for f in failed_list]}") + raise failed_list[0][1] def get_parser(parser=None):