-
Notifications
You must be signed in to change notification settings - Fork 23
Test Jobdb batch locking #1572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Test Jobdb batch locking #1572
Changes from all commits
c33f5fd
4c28ef8
3332d97
5f5c4f6
74f14d1
c0e8666
141d6f2
948c850
0449720
8c0513d
bce30b4
dc6397c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,7 +67,7 @@ | |
| Delete some jobs:: | ||
|
|
||
| for j in jdb.get_jobs(jclass='my_analysis'): | ||
| jdb.remove_job(j.id) | ||
| jdb.remove_jobs(j.id) | ||
|
|
||
|
|
||
| """ | ||
|
|
@@ -304,55 +304,103 @@ def get_jobs(self, | |
| [session.expunge(j) for j in jobs] | ||
| return jobs | ||
|
|
||
| def lock(self, job_id, owner=None, force=False): | ||
| """Lock a Job record by id. If the Job is already locked, a | ||
| JobLockedError is raised. | ||
| def lock(self, job_ids, owner=None, force=False): | ||
| """Lock one or more Jobs record by id. If a Job is already locked, | ||
| a JobLockedError is raised. | ||
|
|
||
| Returns a Job object that has been expunged from the database | ||
| session. The object attributes can be modified, but won't be | ||
| written back to the database unless the object is merged into | ||
| a new session. | ||
| Returns the Job object(s) that has been expunged from the database | ||
| session. If a single job id is input one Job will be returned, | ||
| otherwise a list of Jobs will be returned. The object attributes can | ||
| be modified, but won't be written back to the database unless the | ||
| object is merged into a new session. | ||
|
|
||
| """ | ||
| if owner is None: | ||
| owner = self._lockstr() | ||
|
|
||
| now = time.time() | ||
|
|
||
| if isinstance(job_ids, int): | ||
| job_ids = [job_ids] | ||
|
|
||
| job_ids = [(j.id if isinstance(j, Job) else j) for j in job_ids] | ||
|
|
||
| with self.session_scope() as session: | ||
| q = session.query(Job) | ||
| if force: | ||
| q = q.filter(sqy.and_(Job.id == job_id)) | ||
| else: | ||
| q = q.filter(sqy.and_(Job.id == job_id, | ||
| Job.lock == None)) # noqa: E711 | ||
| n = q.update({Job.lock: time.time(), Job.lock_owner: owner}) | ||
| q = session.query(Job).filter(Job.id.in_(job_ids)) | ||
|
|
||
| if not force: | ||
| q = q.filter(Job.lock == None) | ||
|
|
||
| n = q.update( | ||
| {Job.lock: now, Job.lock_owner: owner}, | ||
| synchronize_session=False | ||
| ) | ||
|
|
||
| session.commit() | ||
|
|
||
| with self.session_scope() as session: | ||
| job = session.get(Job, job_id) | ||
| session.expunge(job) | ||
| jobs = ( | ||
| session.query(Job) | ||
| .filter(Job.id.in_(job_ids)) | ||
| .all() | ||
| ) | ||
|
|
||
| for job in jobs: | ||
| session.expunge(job) | ||
|
|
||
| if n == 0 or job.lock_owner != owner: | ||
| locked_jobs = [j for j in jobs if j.lock_owner == owner] | ||
|
|
||
| if n !=len(job_ids) or len(locked_jobs) != len(job_ids): | ||
| raise JobLockedError() | ||
|
Comment on lines
+353
to
354
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This behavior should be revisited -- as implemented here, if one of the many jobs is locked already, the code will lock them all and then raise an error. This is a sure-fire way for a small problem of a stray locked row to compound into multiple stray locked rows. Reasonable behaviors:
|
||
|
|
||
| return job | ||
| return locked_jobs[0] if len(locked_jobs) == 1 else locked_jobs | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As written, if I pass in [job_id] (a list) I will get back a scalar. That will cause trouble in some cases; better to capture scalarness in a variable at the top of this function, and use that to decide whether to unpack the list in the return. |
||
|
|
||
| def unlock(self, jobs, merge=True): | ||
| """Unlock one or more jobs.""" | ||
| if not isinstance(jobs, (list, tuple, set)): | ||
| jobs = [jobs] | ||
|
Comment on lines
+360
to
+361
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this check the same as what is done in |
||
|
|
||
| job_ids = [] | ||
| job_objs = [] | ||
|
|
||
| for j in jobs: | ||
| if isinstance(j, Job): | ||
| job_ids.append(j.id) | ||
| job_objs.append(j) | ||
| else: | ||
| job_ids.append(j) | ||
|
|
||
| def unlock(self, job, merge=True): | ||
| if not merge or isinstance(job, int): | ||
| if isinstance(job, Job): | ||
| job = job.id | ||
| if not merge or not job_objs: | ||
| with self.session_scope() as session: | ||
| session.query(Job).filter(Job.id == job).update( | ||
| {Job.lock: None, Job.lock_owner: None}) | ||
| session.query(Job).filter(Job.id.in_(job_ids)).update( | ||
| {Job.lock: None, Job.lock_owner: None}, | ||
| synchronize_session=False | ||
| ) | ||
| session.commit() | ||
|
|
||
| else: | ||
| with self.session_scope() as session: | ||
| j1 = session.query(Job).filter(Job.id == job.id).one() | ||
| if j1.lock_owner is None: | ||
| raise JobNotLockedError() | ||
| if j1.lock_owner != job.lock_owner: | ||
| raise JobNotOwnedError() | ||
| job.lock = None | ||
| job.lock_owner = None | ||
| session.merge(job) | ||
| db_jobs = ( | ||
| session.query(Job) | ||
| .filter(Job.id.in_(job_ids)) | ||
| .all() | ||
| ) | ||
|
|
||
| db_map = {j.id: j for j in db_jobs} | ||
|
|
||
| for job in job_objs: | ||
| j1 = db_map[job.id] | ||
|
|
||
| if j1.lock_owner is None: | ||
| raise JobNotLockedError() | ||
|
|
||
| if j1.lock_owner != job.lock_owner: | ||
| raise JobNotOwnedError() | ||
|
Comment on lines
+394
to
+398
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above, error handling in the multiple-jobs should be done with some care. Perhaps on failure, you make a note and then try the next job? Then raise if any failures? In a multi-job case it would be helpful to provide traceback about what job failed user's assumptions here. |
||
|
|
||
| job.lock = None | ||
| job.lock_owner = None | ||
| session.merge(job) | ||
|
|
||
| session.commit() | ||
|
|
||
| def clear_locks(self, jobs=None): | ||
|
|
@@ -368,26 +416,30 @@ def clear_locks(self, jobs=None): | |
| q = q.filter(Job.id == j) | ||
| q.update({Job.lock: None, Job.lock_owner: None}) | ||
|
|
||
| def remove_job(self, job_id, check_locked=False): | ||
| def remove_jobs(self, job_ids, check_locked=False): | ||
| if isinstance(job_ids, (int, Job)): | ||
| job_ids = [job_ids] | ||
|
|
||
| job_ids = [j.id if isinstance(j, Job) else j for j in job_ids] | ||
|
|
||
| with self.session_scope() as session: | ||
| q = session.query(Job).filter(Job.id.in_(job_ids)) | ||
|
|
||
| if check_locked: | ||
| q = session.query(Job).filter( | ||
| sqy.and_(Job.id == job_id, | ||
| Job.lock == None)) # noqa: E711 | ||
| else: | ||
| q = session.query(Job).filter(Job.id == job_id) | ||
| q = q.filter(Job.lock == None) # noqa: E711 | ||
|
|
||
| n = q.delete() | ||
| n = q.delete(synchronize_session=False) | ||
| session.commit() | ||
| if n == 0: | ||
|
|
||
| if n != len(job_ids): | ||
| raise JobLockedError() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe a custom error indicating that "only n of m jobs were successfully deleted"? |
||
|
|
||
| @contextmanager | ||
| def locked(self, jobs, count=None, owner=None): | ||
| """Context Manager to grant exclusive access to one or more | ||
| Job. Job record is marked as locked, and this process may | ||
| freely work on the job and alter the job data. When execution | ||
| leaves the context, the Job will be marked as unlocked. Note | ||
| Jobs. Job records are marked as locked, and this process may | ||
| freely work on the jobs and alter the job data. When execution | ||
| leaves the context, the Jobs will be marked as unlocked. Note | ||
| the _database_ is only explicitly locked while this lock is | ||
| being acquired and released. In between, other entities can | ||
| do other database stuff. | ||
|
|
@@ -411,30 +463,38 @@ def locked(self, jobs, count=None, owner=None): | |
| """ | ||
| if owner is None: | ||
| owner = self._lockstr() | ||
|
|
||
| if isinstance(jobs, (int, Job)): | ||
| jobs = [jobs] | ||
|
|
||
| job_ids = [j.id if isinstance(j, Job) else j for j in jobs] | ||
|
|
||
| locked = [] | ||
| for job in jobs: | ||
| if len(locked) >= (1 if count is None else count): | ||
| break | ||
| if isinstance(job, Job): | ||
| job = job.id | ||
| try: | ||
| j = self.lock(job) | ||
| except JobLockedError: | ||
| continue | ||
| locked.append(j) | ||
|
|
||
| try: | ||
| if count is None: | ||
| if len(locked): | ||
| yield locked[0] | ||
| else: | ||
| yield None | ||
| else: | ||
| yield locked | ||
| with self.session_scope() as session: | ||
| unlocked_ids = { | ||
| j.id for j in session.query(Job.id) | ||
| .filter(Job.id.in_(job_ids), Job.lock == None) | ||
| } | ||
|
|
||
| selected = [] | ||
| limit = count if count is not None else 1 | ||
|
|
||
| for jid in job_ids: | ||
| if jid in unlocked_ids: | ||
| selected.append(jid) | ||
| if len(selected) >= limit: | ||
| break | ||
|
|
||
| if selected: | ||
| locked = self.lock(selected, owner=owner) | ||
|
Comment on lines
+484
to
+491
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Depending on how self.lock() error handling is modified --) There's a race condition here that will not necessarily produce count newly locked items. Although it will be slow on slow databases, you probably need a loop here to keep locking items until you reach count or run out of candidates. There may be atomic one-liners to do this sort of thing; the original code didn't optimize for that (partly because I don't know sqlalchemy well enough). But consider something like: |
||
|
|
||
| yield locked | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs special handling to yield a single job if count is None. |
||
|
|
||
| finally: | ||
| for j in locked: | ||
| self.unlock(j) | ||
| if locked: | ||
| self.unlock(locked) | ||
|
|
||
| def get_resource(self, jclass, n=None, jstate='open', tags={}): | ||
| jobs = self.get_jobs(jclass, jstate=jstate, tags=tags) | ||
|
|
@@ -552,7 +612,7 @@ def cli(args=None): | |
| elif args.action == 'delete': | ||
| print('Removing jobs ...') | ||
| for j in jobs: | ||
| jdb.remove_job(j) | ||
| jdb.remove_jobs(j) | ||
| elif args.action.startswith('set-'): | ||
| for k in ['open', 'done', 'failed', 'ignored']: | ||
| if args.action == f'set-{k}': | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either outlaw passing in Job(s) entirely, or accept that a user might pass in a single Job; e.g.
isinstance(job_ids, (int, Job))