From c33f5fdb116f3ad3173db307585782257ff6ae8f Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Fri, 6 Mar 2026 08:43:50 -0800 Subject: [PATCH 01/10] test batch job lock --- sotodlib/site_pipeline/jobdb.py | 145 +++++++++++++++++++++----------- 1 file changed, 97 insertions(+), 48 deletions(-) diff --git a/sotodlib/site_pipeline/jobdb.py b/sotodlib/site_pipeline/jobdb.py index eedfe6911..3cc306368 100644 --- a/sotodlib/site_pipeline/jobdb.py +++ b/sotodlib/site_pipeline/jobdb.py @@ -300,11 +300,11 @@ def get_jobs(self, [session.expunge(j) for j in jobs] return jobs - def lock(self, job_id, owner=None, force=False): - """Lock a Job record by id. If the Job is already locked, a - JobLockedError is raised. + def lock(self, job_ids, owner=None, force=False): + """Lock one or more Jobs record by id. If a Job is already locked, + a JobLockedError is raised. - Returns a Job object that has been expunged from the database + Returns the Job objects that has been expunged from the database session. The object attributes can be modified, but won't be written back to the database unless the object is merged into a new session. @@ -312,43 +312,84 @@ def lock(self, job_id, owner=None, force=False): """ if owner is None: owner = self._lockstr() + + now = time.time() + with self.session_scope() as session: - q = session.query(Job) - if force: - q = q.filter(sqy.and_(Job.id == job_id)) - else: - q = q.filter(sqy.and_(Job.id == job_id, - Job.lock == None)) # noqa: E711 - n = q.update({Job.lock: time.time(), Job.lock_owner: owner}) + q = session.query(Job).filter(Job.id.in_(job_ids)) + + if not force: + q = q.filter(Job.lock == None) + + n = q.update( + {Job.lock: now, Job.lock_owner: owner}, + synchronize_session=False + ) + session.commit() with self.session_scope() as session: - job = session.get(Job, job_id) - session.expunge(job) + jobs = ( + session.query(Job) + .filter(Job.id.in_(job_ids)) + .all() + ) + + for job in jobs: + session.expunge(job) - if n == 0 or job.lock_owner != owner: + locked_jobs = [j for j in jobs if j.lock_owner == owner] + + if len(locked_jobs) != len(job_ids): raise JobLockedError() - return job + return locked_jobs + + def unlock(self, jobs, merge=True): + if not isinstance(jobs, (list, tuple, set)): + jobs = [jobs] + + job_ids = [] + job_objs = [] - def unlock(self, job, merge=True): - if not merge or isinstance(job, int): - if isinstance(job, Job): - job = job.id + for j in jobs: + if isinstance(j, Job): + job_ids.append(j.id) + job_objs.append(j) + else: + job_ids.append(j) + + if not merge or not job_objs: with self.session_scope() as session: - session.query(Job).filter(Job.id == job).update( - {Job.lock: None, Job.lock_owner: None}) + session.query(Job).filter(Job.id.in_(job_ids)).update( + {Job.lock: None, Job.lock_owner: None}, + synchronize_session=False + ) session.commit() + else: with self.session_scope() as session: - j1 = session.query(Job).filter(Job.id == job.id).one() - if j1.lock_owner is None: - raise JobNotLockedError() - if j1.lock_owner != job.lock_owner: - raise JobNotOwnedError() - job.lock = None - job.lock_owner = None - session.merge(job) + db_jobs = ( + session.query(Job) + .filter(Job.id.in_(job_ids)) + .all() + ) + + db_map = {j.id: j for j in db_jobs} + + for job in job_objs: + j1 = db_map[job.id] + + if j1.lock_owner is None: + raise JobNotLockedError() + + if j1.lock_owner != job.lock_owner: + raise JobNotOwnedError() + + job.lock = None + job.lock_owner = None + session.merge(job) + session.commit() def clear_locks(self, jobs=None): @@ -407,30 +448,38 @@ def locked(self, jobs, count=None, owner=None): """ if owner is None: owner = self._lockstr() + if isinstance(jobs, (int, Job)): jobs = [jobs] + + job_ids = [j.id if isinstance(j, Job) else j for j in jobs] + locked = [] - for job in jobs: - if len(locked) >= (1 if count is None else count): - break - if isinstance(job, Job): - job = job.id - try: - j = self.lock(job) - except JobLockedError: - continue - locked.append(j) + try: - if count is None: - if len(locked): - yield locked[0] - else: - yield None - else: - yield locked + with self.session_scope() as session: + unlocked_ids = { + j.id for j in session.query(Job.id) + .filter(Job.id.in_(job_ids), Job.lock == None) + } + + selected = [] + limit = count if count is not None else 1 + + for jid in job_ids: + if jid in unlocked_ids: + selected.append(jid) + if len(selected) >= limit: + break + + if selected: + locked = self.lock(selected, owner=owner) + + yield locked + finally: - for j in locked: - self.unlock(j) + if locked: + self.unlock(locked) def get_resource(self, jclass, n=None, jstate='open', tags={}): jobs = self.get_jobs(jclass, jstate=jstate, tags=tags) From 4c28ef88aeb9aa7618745e6180f40c103f587fe0 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Fri, 6 Mar 2026 12:51:17 -0800 Subject: [PATCH 02/10] preproc jdb batch update --- .../multilayer_preprocess_tod.py | 71 +++++++++++++++---- sotodlib/site_pipeline/preprocess_tod.py | 58 +++++++++++---- 2 files changed, 103 insertions(+), 26 deletions(-) diff --git a/sotodlib/site_pipeline/multilayer_preprocess_tod.py b/sotodlib/site_pipeline/multilayer_preprocess_tod.py index f55686e0d..1112e107e 100644 --- a/sotodlib/site_pipeline/multilayer_preprocess_tod.py +++ b/sotodlib/site_pipeline/multilayer_preprocess_tod.py @@ -208,7 +208,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], exist_ok=True) # jobdb - jobdb_path = configs_proc.get("jobdb", None) + jobdb_path = configs_proc["jobdb"].get("path", None) if jobdb_path is not None: jdb = JobManager(sqlite_file=jobdb_path) # get init jobs @@ -404,6 +404,25 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], batch_size_init = configs_init['archive'].get('batch_size', 1) batch_size_proc = configs_proc['archive'].get('batch_size', 1) + if jobdb_path is not None: + # batch updates to JobDb + jdb_batch_size = configs['jobdb'].get('batch_size', 1) + batched_job_count = 0 + batched_job_fields = [] + + # get jobs up front since it is faster + init_jobs = jdb.get_jobs(jclass="init", jstate=JState.open) + tags_to_job_init = { + frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j + for j in init_jobs + } + + proc_jobs = jdb.get_jobs(jclass="proc", jstate=JState.open) + tags_to_job_proc = { + frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j + for j in proc_jobs + } + pb_name = f"pb_{str(int(time.time()))}.txt" with open(pb_name, 'w') as f: with MultiDbBatchManager( @@ -439,26 +458,50 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], configs_proc, logger, overwrite, db_manager=db_mgr_proc) + # update jobdb if jobdb_path is not None: tags = {} tags["obs:obs_id"] = obs_id for gb, g in zip(group_by, group): tags['dets:' + gb] = g - jobs = jdb.get_jobs(jstate=JState.open, tags=tags) + # get both init and proc + init_job = tags_to_job_init.get(fozenset(tags.items()), None) + proc_job = tags_to_job_proc.get(fozenset(tags.items()), None) + + if errors[0] is not None: + jstate = JState.failed + jerror = errors[0] + else: + jstate = JState.done + jerror = None + + for job in [init_job, proc_job]: + if job is not None: + batched_job_fields.append( + { + "job": job, + "error": jerror, + "jstate": jstate, + } + ) + + batched_job_count += 1 + + if (batched_job_count >= jdb_batch_size) or (len(futures) == 0): + jobs = [j['job'] for j in batched_job_fields] + job_idx = 0 + with jdb.locked(jobs, count=len(jobs)) as j: + for job in j: + job.mark_visited() + job.jstate = batched_job_fields[job_idx]["jstate"] + for _t in job._tags: + if _t.key == "error": + _t.value = batched_job_fields[job_idx]["error"] + job_idx += 1 + batched_job_count = 0 + batched_job_fields = [] - for job in jobs: - # init layer state will be JState.done if already run - if job.jstate == JState.open: - with jdb.locked(job) as j: - j.mark_visited() - if errors[0] is not None: - j.jstate = JState.failed - for _t in j._tags: - if _t.key == "error": - _t.value = errors[0] - else: - j.jstate = JState.done if raise_error: n_obs_fail = 0 n_groups_fail = 0 diff --git a/sotodlib/site_pipeline/preprocess_tod.py b/sotodlib/site_pipeline/preprocess_tod.py index d25cf06f5..72fd7476f 100644 --- a/sotodlib/site_pipeline/preprocess_tod.py +++ b/sotodlib/site_pipeline/preprocess_tod.py @@ -173,7 +173,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], os.makedirs(os.path.dirname(configs['archive']['policy']['filename']), exist_ok=True) - jobdb_path = configs.get("jobdb", None) + jobdb_path = configs["jobdb"].get("path", None) if jobdb_path is not None: jdb = JobManager(sqlite_file=jobdb_path) elif run_from_jobdb: @@ -312,6 +312,19 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], # batch updates to ManifestDb batch_size = configs['archive'].get('batch_size', 1) + if jobdb_path is not None: + # batch updates to JobDb + jdb_batch_size = configs['jobdb'].get('batch_size', 1) + batched_job_count = 0 + batched_job_fields = [] + + # get jobs up front since it is faster + jobs = jdb.get_jobs(jclass="init", jstate=JState.open) + tags_to_job = { + frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j + for j in existing_jobs + } + pb_name = f"pb_{str(int(time.time()))}.txt" with open(pb_name, 'w') as f: with DbBatchManager(db, batch_size=batch_size, logger=logger) as db_manager: @@ -336,22 +349,43 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], pp_util.cleanup_mandb(out_dict, out_meta, errors, configs, logger, overwrite, db_manager=db_manager) - # update jobdb + if jobdb_path is not None: tags = {} tags["obs:obs_id"] = obs_id for gb, g in zip(group_by, group): tags['dets:' + gb] = g - job = jdb.get_jobs(jclass="init", jstate=JState.open, tags=tags) - with jdb.locked(job) as j: - j.mark_visited() - if errors[0] is not None: - j.jstate = JState.failed - for _t in j._tags: - if _t.key == "error": - _t.value = errors[0] - else: - j.jstate = JState.done + + if errors[0] is not None: + jstate = JState.failed + jerror = errors[0] + else: + jstate = JState.done + jerror = None + + batched_job_fields.append( + { + "job": tags_to_job[frozenset(tags.items())], + "error": jerror, + "jstate": jstate, + } + ) + + batched_job_count += 1 + + if (batched_job_count >= jdb_batch_size) or (len(futures) == 0): + jobs = [j['job'] for j in batched_job_fields] + job_idx = 0 + with jdb.locked(jobs, count=len(jobs)) as j: + for job in j: + job.mark_visited() + job.jstate = batched_job_fields[job_idx]["jstate"] + for _t in job._tags: + if _t.key == "error": + _t.value = batched_job_fields[job_idx]["error"] + job_idx += 1 + batched_job_count = 0 + batched_job_fields = [] if raise_error: n_obs_fail = 0 From 3332d9700e6297a9e438a8063260cba0134adac6 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Fri, 6 Mar 2026 12:56:34 -0800 Subject: [PATCH 03/10] fix for jdb test --- sotodlib/site_pipeline/jobdb.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sotodlib/site_pipeline/jobdb.py b/sotodlib/site_pipeline/jobdb.py index 3cc306368..93eadef17 100644 --- a/sotodlib/site_pipeline/jobdb.py +++ b/sotodlib/site_pipeline/jobdb.py @@ -315,6 +315,9 @@ def lock(self, job_ids, owner=None, force=False): now = time.time() + if isinstance(job_ids, int): + job_ids = [job_ids] + with self.session_scope() as session: q = session.query(Job).filter(Job.id.in_(job_ids)) From 5f5c4f6b5fada7efd275488c752de3645f94d615 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Fri, 6 Mar 2026 13:56:52 -0800 Subject: [PATCH 04/10] more fixes --- sotodlib/site_pipeline/jobdb.py | 4 ++-- sotodlib/site_pipeline/preprocess_tod.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sotodlib/site_pipeline/jobdb.py b/sotodlib/site_pipeline/jobdb.py index 93eadef17..3e01d40f2 100644 --- a/sotodlib/site_pipeline/jobdb.py +++ b/sotodlib/site_pipeline/jobdb.py @@ -343,10 +343,10 @@ def lock(self, job_ids, owner=None, force=False): locked_jobs = [j for j in jobs if j.lock_owner == owner] - if len(locked_jobs) != len(job_ids): + if n == 0 or len(locked_jobs) != len(job_ids): raise JobLockedError() - return locked_jobs + return locked_jobs[0] if len(locked_jobs) == 1 else locked_jobs def unlock(self, jobs, merge=True): if not isinstance(jobs, (list, tuple, set)): diff --git a/sotodlib/site_pipeline/preprocess_tod.py b/sotodlib/site_pipeline/preprocess_tod.py index 72fd7476f..4d80ff96b 100644 --- a/sotodlib/site_pipeline/preprocess_tod.py +++ b/sotodlib/site_pipeline/preprocess_tod.py @@ -322,7 +322,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], jobs = jdb.get_jobs(jclass="init", jstate=JState.open) tags_to_job = { frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j - for j in existing_jobs + for j in jobs } pb_name = f"pb_{str(int(time.time()))}.txt" From 74f14d19fbffe49616efa4e99cbd5f8c3a9af9d5 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Fri, 6 Mar 2026 14:43:48 -0800 Subject: [PATCH 05/10] tests, batch remove --- sotodlib/site_pipeline/jobdb.py | 24 +++++++++++++++--------- tests/test_jobdb.py | 28 ++++++++++++++++++++++++---- 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/sotodlib/site_pipeline/jobdb.py b/sotodlib/site_pipeline/jobdb.py index 3e01d40f2..25a8f5c53 100644 --- a/sotodlib/site_pipeline/jobdb.py +++ b/sotodlib/site_pipeline/jobdb.py @@ -318,6 +318,8 @@ def lock(self, job_ids, owner=None, force=False): if isinstance(job_ids, int): job_ids = [job_ids] + job_ids = [(j.id if isinstance(j, Job) else j) for j in job_ids] + with self.session_scope() as session: q = session.query(Job).filter(Job.id.in_(job_ids)) @@ -343,7 +345,7 @@ def lock(self, job_ids, owner=None, force=False): locked_jobs = [j for j in jobs if j.lock_owner == owner] - if n == 0 or len(locked_jobs) != len(job_ids): + if n !=len(job_ids) or len(locked_jobs) != len(job_ids): raise JobLockedError() return locked_jobs[0] if len(locked_jobs) == 1 else locked_jobs @@ -408,18 +410,22 @@ def clear_locks(self, jobs=None): q = q.filter(Job.id == j) q.update({Job.lock: None, Job.lock_owner: None}) - def remove_job(self, job_id, check_locked=False): + def remove_job(self, job_ids, check_locked=False): + if isinstance(job_ids, (int, Job)): + job_ids = [job_ids] + + job_ids = [j.id if isinstance(j, Job) else j for j in job_ids] + with self.session_scope() as session: + q = session.query(Job).filter(Job.id.in_(job_ids)) + if check_locked: - q = session.query(Job).filter( - sqy.and_(Job.id == job_id, - Job.lock == None)) # noqa: E711 - else: - q = session.query(Job).filter(Job.id == job_id) + q = q.filter(Job.lock == None) # noqa: E711 - n = q.delete() + n = q.delete(synchronize_session=False) session.commit() - if n == 0: + + if n != len(job_ids): raise JobLockedError() @contextmanager diff --git a/tests/test_jobdb.py b/tests/test_jobdb.py index cbdbc9c11..ad6bb896c 100644 --- a/tests/test_jobdb.py +++ b/tests/test_jobdb.py @@ -40,6 +40,12 @@ def test_00_smoke(self): job = jdb.lock(j.id) jdb.unlock(job.id, merge=False) + # Locking many jobs at once + jobs = jdb.lock(jobs_to_do) + with self.assertRaises(jobdb.JobLockedError): + job = jdb.lock(jobs) + jdb.unlock(jobs, merge=False) + # State write-back for row in jobs_to_do: print(f'Finishing {row.id} ...') @@ -51,15 +57,21 @@ def test_00_smoke(self): self.assertNotEqual(len(jdb.get_jobs(jclass='jclass1', jstate='all')), 0) - # Deleting + # Deleting one job jobs_to_delete = jdb.get_jobs(jclass='jclass1', jstate='done') - for j in jobs_to_delete: - jdb.remove_job(j.id) + jdb.remove_job(jobs_to_delete[0].id) + self.assertEqual( + len(jdb.get_jobs(jclass='jclass1', jstate='all')), + len(jobs_to_delete) - 1 + ) + # Deleting many jobs + jobs_to_delete = jdb.get_jobs(jclass='jclass1', jstate='done') + jdb.remove_job(jobs_to_delete) self.assertEqual(len(jdb.get_jobs(jclass='jclass1', jstate='all')), 0) # Create-and-operate - j = jdb.create_job('jlcass2', {'obs_id': '123455'}) + j = jdb.create_job('jclass2', {'obs_id': '123455'}) with jdb.locked(j) as job: job.mark_visited() @@ -94,6 +106,14 @@ def test_20_locks(self): jdb.clear_locks('all') jdb.lock(jobs[0].id) + # Test locking many at once + with jdb.locked(jobs, count=len(jobs)): + for j in jobs: + with self.assertRaises(jobdb.JobLockedError): + jdb.lock(j.id) + jdb.clear_locks('all') + jdb.lock(jobs) + with self.assertRaises(jobdb.JobNotOwnedError): with jdb.locked(jobs, count=10) as jobs: # Simulate another entity stealing a lock. From c0e8666cb4012e99fd1dd71003522783ec44871f Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Mon, 9 Mar 2026 07:50:10 -0700 Subject: [PATCH 06/10] more changes --- sotodlib/site_pipeline/multilayer_preprocess_tod.py | 8 +++----- sotodlib/site_pipeline/preprocess_tod.py | 4 +--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sotodlib/site_pipeline/multilayer_preprocess_tod.py b/sotodlib/site_pipeline/multilayer_preprocess_tod.py index 1112e107e..b287fbeb5 100644 --- a/sotodlib/site_pipeline/multilayer_preprocess_tod.py +++ b/sotodlib/site_pipeline/multilayer_preprocess_tod.py @@ -406,7 +406,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], if jobdb_path is not None: # batch updates to JobDb - jdb_batch_size = configs['jobdb'].get('batch_size', 1) + jdb_batch_size = configs_proc['jobdb'].get('batch_size', 1) batched_job_count = 0 batched_job_fields = [] @@ -490,15 +490,13 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], if (batched_job_count >= jdb_batch_size) or (len(futures) == 0): jobs = [j['job'] for j in batched_job_fields] - job_idx = 0 with jdb.locked(jobs, count=len(jobs)) as j: - for job in j: + for job_idx, job in enumerate(j): job.mark_visited() job.jstate = batched_job_fields[job_idx]["jstate"] for _t in job._tags: if _t.key == "error": _t.value = batched_job_fields[job_idx]["error"] - job_idx += 1 batched_job_count = 0 batched_job_fields = [] @@ -599,7 +597,7 @@ def get_parser(parser=None): def main(configs_init: str, configs_proc: str, - query: Optional[str] = None, + query: str = '', obs_id: Optional[str] = None, overwrite: bool = False, min_ctime: Optional[int] = None, diff --git a/sotodlib/site_pipeline/preprocess_tod.py b/sotodlib/site_pipeline/preprocess_tod.py index 4d80ff96b..115528a07 100644 --- a/sotodlib/site_pipeline/preprocess_tod.py +++ b/sotodlib/site_pipeline/preprocess_tod.py @@ -375,15 +375,13 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], if (batched_job_count >= jdb_batch_size) or (len(futures) == 0): jobs = [j['job'] for j in batched_job_fields] - job_idx = 0 with jdb.locked(jobs, count=len(jobs)) as j: - for job in j: + for job_idx, job in enumerate(j): job.mark_visited() job.jstate = batched_job_fields[job_idx]["jstate"] for _t in job._tags: if _t.key == "error": _t.value = batched_job_fields[job_idx]["error"] - job_idx += 1 batched_job_count = 0 batched_job_fields = [] From 948c850229a3c113d3e0ad5b10bf26cee641ef57 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Mon, 9 Mar 2026 08:21:32 -0700 Subject: [PATCH 07/10] fixes --- sotodlib/site_pipeline/multilayer_preprocess_tod.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sotodlib/site_pipeline/multilayer_preprocess_tod.py b/sotodlib/site_pipeline/multilayer_preprocess_tod.py index b287fbeb5..fa6cd9cf5 100644 --- a/sotodlib/site_pipeline/multilayer_preprocess_tod.py +++ b/sotodlib/site_pipeline/multilayer_preprocess_tod.py @@ -14,6 +14,7 @@ from sotodlib.coords import demod as demod_mm from sotodlib.hwp import hwp_angle_model from sotodlib import core +from sotodlib.core.metadata.manifest import MultiDbBatchManager from sotodlib.site_pipeline.jobdb import JobManager, JState from sotodlib.preprocess import _Preprocess, Pipeline, processes import sotodlib.preprocess.preprocess_util as pp_util @@ -426,7 +427,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], pb_name = f"pb_{str(int(time.time()))}.txt" with open(pb_name, 'w') as f: with MultiDbBatchManager( - [db_init, db_proc], batch_sizes=[batch_size_init, batch_size_proc], logger=logger + [db_init, db_proc], batch_size=[batch_size_init, batch_size_proc], logger=logger ) as (db_mgr_init, db_mgr_proc): for future in tqdm(as_completed_callable(futures), total=total, desc="multilayer_preprocess_tod", file=f, @@ -466,8 +467,8 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], for gb, g in zip(group_by, group): tags['dets:' + gb] = g # get both init and proc - init_job = tags_to_job_init.get(fozenset(tags.items()), None) - proc_job = tags_to_job_proc.get(fozenset(tags.items()), None) + init_job = tags_to_job_init.get(frozenset(tags.items()), None) + proc_job = tags_to_job_proc.get(frozenset(tags.items()), None) if errors[0] is not None: jstate = JState.failed From 0449720ba7b941fd7fa8662b8de9c14d50744cd1 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Mon, 9 Mar 2026 08:39:32 -0700 Subject: [PATCH 08/10] make jobdb optional again --- sotodlib/site_pipeline/multilayer_preprocess_tod.py | 2 +- sotodlib/site_pipeline/preprocess_tod.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sotodlib/site_pipeline/multilayer_preprocess_tod.py b/sotodlib/site_pipeline/multilayer_preprocess_tod.py index fa6cd9cf5..45e44b747 100644 --- a/sotodlib/site_pipeline/multilayer_preprocess_tod.py +++ b/sotodlib/site_pipeline/multilayer_preprocess_tod.py @@ -209,7 +209,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], exist_ok=True) # jobdb - jobdb_path = configs_proc["jobdb"].get("path", None) + jobdb_path = configs_proc.get("jobdb", {}).get("path") if jobdb_path is not None: jdb = JobManager(sqlite_file=jobdb_path) # get init jobs diff --git a/sotodlib/site_pipeline/preprocess_tod.py b/sotodlib/site_pipeline/preprocess_tod.py index 115528a07..e1c4e1ee5 100644 --- a/sotodlib/site_pipeline/preprocess_tod.py +++ b/sotodlib/site_pipeline/preprocess_tod.py @@ -173,7 +173,7 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"], os.makedirs(os.path.dirname(configs['archive']['policy']['filename']), exist_ok=True) - jobdb_path = configs["jobdb"].get("path", None) + jobdb_path = configs.get("jobdb", {}).get("path") if jobdb_path is not None: jdb = JobManager(sqlite_file=jobdb_path) elif run_from_jobdb: From 8c0513d145dd7134c44aad9d121503a489ba6827 Mon Sep 17 00:00:00 2001 From: Michael McCrackan Date: Mon, 9 Mar 2026 08:50:46 -0700 Subject: [PATCH 09/10] rename remove_job, docs --- sotodlib/site_pipeline/jobdb.py | 22 ++++++++++++---------- tests/test_jobdb.py | 4 ++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sotodlib/site_pipeline/jobdb.py b/sotodlib/site_pipeline/jobdb.py index 9e409454f..8a93fc128 100644 --- a/sotodlib/site_pipeline/jobdb.py +++ b/sotodlib/site_pipeline/jobdb.py @@ -67,7 +67,7 @@ Delete some jobs:: for j in jdb.get_jobs(jclass='my_analysis'): - jdb.remove_job(j.id) + jdb.remove_jobs(j.id) """ @@ -308,10 +308,11 @@ def lock(self, job_ids, owner=None, force=False): """Lock one or more Jobs record by id. If a Job is already locked, a JobLockedError is raised. - Returns the Job objects that has been expunged from the database - session. The object attributes can be modified, but won't be - written back to the database unless the object is merged into - a new session. + Returns the Job object(s) that has been expunged from the database + session. If a single job id is input one Job will be returned, + otherwise a list of Jobs will be returned. The object attributes can + be modified, but won't be written back to the database unless the + object is merged into a new session. """ if owner is None: @@ -355,6 +356,7 @@ def lock(self, job_ids, owner=None, force=False): return locked_jobs[0] if len(locked_jobs) == 1 else locked_jobs def unlock(self, jobs, merge=True): + """Unlock one or more jobs.""" if not isinstance(jobs, (list, tuple, set)): jobs = [jobs] @@ -414,7 +416,7 @@ def clear_locks(self, jobs=None): q = q.filter(Job.id == j) q.update({Job.lock: None, Job.lock_owner: None}) - def remove_job(self, job_ids, check_locked=False): + def remove_jobs(self, job_ids, check_locked=False): if isinstance(job_ids, (int, Job)): job_ids = [job_ids] @@ -435,9 +437,9 @@ def remove_job(self, job_ids, check_locked=False): @contextmanager def locked(self, jobs, count=None, owner=None): """Context Manager to grant exclusive access to one or more - Job. Job record is marked as locked, and this process may - freely work on the job and alter the job data. When execution - leaves the context, the Job will be marked as unlocked. Note + Jobs. Job records are marked as locked, and this process may + freely work on the jobs and alter the job data. When execution + leaves the context, the Jobs will be marked as unlocked. Note the _database_ is only explicitly locked while this lock is being acquired and released. In between, other entities can do other database stuff. @@ -610,7 +612,7 @@ def cli(args=None): elif args.action == 'delete': print('Removing jobs ...') for j in jobs: - jdb.remove_job(j) + jdb.remove_jobs(j) elif args.action.startswith('set-'): for k in ['open', 'done', 'failed', 'ignored']: if args.action == f'set-{k}': diff --git a/tests/test_jobdb.py b/tests/test_jobdb.py index ad6bb896c..cbc3a00b4 100644 --- a/tests/test_jobdb.py +++ b/tests/test_jobdb.py @@ -59,7 +59,7 @@ def test_00_smoke(self): # Deleting one job jobs_to_delete = jdb.get_jobs(jclass='jclass1', jstate='done') - jdb.remove_job(jobs_to_delete[0].id) + jdb.remove_jobs(jobs_to_delete[0].id) self.assertEqual( len(jdb.get_jobs(jclass='jclass1', jstate='all')), len(jobs_to_delete) - 1 @@ -67,7 +67,7 @@ def test_00_smoke(self): # Deleting many jobs jobs_to_delete = jdb.get_jobs(jclass='jclass1', jstate='done') - jdb.remove_job(jobs_to_delete) + jdb.remove_jobs(jobs_to_delete) self.assertEqual(len(jdb.get_jobs(jclass='jclass1', jstate='all')), 0) # Create-and-operate From dc6397cb1ed037c1b5b336dbe5e71d6046408cfc Mon Sep 17 00:00:00 2001 From: msilvafe Date: Mon, 30 Mar 2026 17:33:35 -0400 Subject: [PATCH 10/10] Fix data processing issues introduced by new tf sim handling in t2p. --- sotodlib/preprocess/processes.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sotodlib/preprocess/processes.py b/sotodlib/preprocess/processes.py index 07a72b25f..c1d0e7c6b 100644 --- a/sotodlib/preprocess/processes.py +++ b/sotodlib/preprocess/processes.py @@ -2609,8 +2609,10 @@ def process(self, aman, proc_aman, sim=False, data_aman=None): T_signal=data_aman.dsT ) else: + pcfg = copy.deepcopy(self.process_cfgs) + pcfg.pop("fit_in_freq", None) tod_ops.t2pleakage.subtract_t2p(aman, proc_aman['t2p'], - **self.process_cfgs) + **pcfg) return aman, proc_aman class SplitFlags(_Preprocess):