Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"flacarray>=0.3.4",
"h5py",
"mapcat",
"opentelemetry",
]
dynamic=["version"]
classifiers = [
Expand Down
62 changes: 44 additions & 18 deletions sotodlib/io/imprinter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
from .g3thk_db import G3tHk, HKFiles
from ..site_pipeline.utils.logging import init_logger

from opentelemetry import trace

tracer = trace.get_tracer("sotodlib")

####################
# useful constants #
Expand Down Expand Up @@ -1756,25 +1759,48 @@ def upload_book_to_librarian(self, book, session=None, raise_on_error=True):
if self.librarian is None:
self._librarian_connect()

assert book.status == BOUND, "cannot upload unbound books"
with tracer.start_as_current_span("librarian_upload", attributes={
"book.bid": book.bid,
"book.type": book.type,
"book.status": book.status,
"librarian.host": self.librarian.host,
}) as span:
self.logger.info(f"Uploading book {book.bid} to librarian")
try:
assert book.status == BOUND, "cannot upload unbound books"

local_path = Path(self.get_book_abs_path(book))
dest_path = Path(book.path)

span.set_attributes(
{
"book.local_path": str(local_path),
"librarian.dest_path": str(dest_path),
}
)

self.logger.info(f"Uploading book {book.bid} to librarian")
try:
self.librarian.upload(
Path( self.get_book_abs_path(book) ),
Path( book.path ),
)
book.status = UPLOADED
session.commit()
except Exception as e:
self.logger.error(
f"Failed to upload book {book.bid}."
)
if raise_on_error:
raise e
else:
return False, e
return True, None
self.librarian.upload(
local_path,
dest_path,
)

book.status = UPLOADED
session.commit()
except Exception as e:
self.logger.error(
f"Failed to upload book {book.bid}."
)
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR))

if raise_on_error:
raise e
else:
return False, e

span.set_status(trace.Status(trace.StatusCode.OK))

return True, None

def check_book_in_librarian(
self,
Expand Down
61 changes: 40 additions & 21 deletions sotodlib/site_pipeline/update_librarian.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from pathlib import Path
from typing import Optional

from opentelemetry import trace

from sotodlib.io.imprinter import Imprinter, BOUND, UPLOADED
from sotodlib.site_pipeline.utils.logging import init_logger

Expand Down Expand Up @@ -49,29 +51,46 @@ def core(config: str):
Path to config file for imprinter
"""

imprinter = Imprinter(
config,
db_args={'connect_args': {'check_same_thread': False}},
)

session = imprinter.get_session()
to_upload = imprinter.get_bound_books(session=session)
tracer = trace.get_tracer("sotodlib")

failed_list = []
for book in to_upload:
success, err = imprinter.upload_book_to_librarian(
book, session=session, raise_on_error=False
with tracer.start_as_current_span("update_librarian"):
imprinter = Imprinter(
config,
db_args={'connect_args': {'check_same_thread': False}},
)
if not success:
failed_list.append( (book.bid, err) )
## don't just continually fail
if len(failed_list) > 5:
break

if len(failed_list) != 0:
# raise the first error so we know something is wrong
logger.error(f"Failed to upload books {[f[0] for f in failed_list]}")
raise failed_list[0][1]

session = imprinter.get_session()
to_upload = imprinter.get_bound_books(session=session)

failed_list = []
for book in to_upload:
with tracer.start_as_current_span("upload_book_to_librarian") as span:
span.set_attribute("book_id", book.bid)

success, err = imprinter.upload_book_to_librarian(
book, session=session, raise_on_error=False
)

if not success:
span.set_status(
trace.Status(
trace.StatusCode.ERROR,
f"Failed to upload book to librarian with error {err}"
)
)

failed_list.append( (book.bid, err) )
else:
span.set_status(trace.Status(trace.StatusCode.OK))

# Pause if we fail too many in total.
if len(failed_list) > 5:
break

if len(failed_list) != 0:
# raise the first error so we know something is wrong
logger.error(f"Failed to upload books {[f[0] for f in failed_list]}")
raise failed_list[0][1]


def get_parser(parser=None):
Expand Down
Loading