Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions backend/lib/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ def run(self):
self.abort()
except ProcessorException as e:
self.log.error(str(e), frame=e.frame)
self.mark_job_after_crash()
except Exception as e:
stack = traceback.extract_tb(e.__traceback__)
frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
location = "->".join(frames)
self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
self.mark_job_after_crash()
finally:
# Clean up after work successfully completed or terminates
try:
Expand All @@ -169,6 +171,29 @@ def run(self):
except Exception:
pass

def mark_job_after_crash(self):
"""
Decide what happens to the job after an unhandled crash

On a crash the job is neither finished nor released by default, which
would leave it claimed (and so invisible to the delegator) until the
next restart. Instead:

- recurring jobs are released so they retry on their next interval;
Comment thread
dale-wahl marked this conversation as resolved.
- one-shot jobs are parked, i.e. retried only on restart, to avoid a
tight crash loop.

See `Job.park()` and `Job.release()`.
"""
if self.job.is_finished:
# the worker already finalised the job before crashing
return

if self.job.is_recurring:
self.log.fatal("Worker %s crashed while processing recurring job %s - manaully release job for retry." % (self.type, self.job.data["remote_id"]))

self.job.park()

def clean_up(self):
"""
Clean up after a processor runs successfully or results in error.
Expand Down
38 changes: 21 additions & 17 deletions backend/workers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,35 +260,39 @@ def process_request(self, request, payload):
worker = candidate
break

is_claimed = job["timestamp_claimed"] > 0
if not is_claimed and not worker:
# a job's claim flag encodes three states:
# 0 -> queued/claimable
# >0 -> claimed (running if a live worker exists, else a likely
# hard-kill zombie)
# -1 -> parked after a crash (Job.STATUS_PARKED); retried on the
# next restart. see Job.park() / BasicWorker.run().
timestamp_claimed = job["timestamp_claimed"]
is_claimed = timestamp_claimed > 0
is_parked = timestamp_claimed == Job.STATUS_PARKED

if not worker and not is_claimed and not is_parked:
# truly queued and waiting to be claimed
if jobtype not in queue:
queue[jobtype] = 0
queue[jobtype] += 1
else:
# Claimed or has worker
if hasattr(worker, "dataset") and worker.dataset:
running_key = worker.dataset.key
running_user = worker.dataset.creator
running_parent = worker.dataset.top_parent().key
else:
running_key = None
running_user = None
running_parent = None

# has a live worker, or is claimed/parked without one. dataset
# resolution is left to the frontend (which treats remote_id as
# a dataset key only for processor jobtypes); the API just
# reports job state.
running.append({
"type": jobtype,
"queue_id": queue_key,
"remote_id": job["remote_id"],
"is_claimed": is_claimed,
"is_running": bool(worker),
"is_processor": hasattr(worker, "dataset"),
# Processors have DataSets
"is_processor": jobtype in self.modules.processors,
"is_recurring": (int(job["interval"]) > 0),
"is_parked": is_parked,
"is_maybe_crashed": is_claimed and not bool(worker),
"dataset_key": running_key,
"dataset_user": running_user,
"dataset_parent_key": running_parent,
"timestamp_queued": job["timestamp"],
"timestamp_claimed": job["timestamp_claimed"],
"timestamp_claimed": timestamp_claimed,
"timestamp_lastclaimed": job["timestamp_lastclaimed"],
})

Expand Down
64 changes: 57 additions & 7 deletions common/lib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ class Job:
data = {}
db = None

#: Sentinel value for `timestamp_claimed` marking a job that crashed and
#: should only be retried on restart. It is negative so it is excluded by
#: the `timestamp_claimed = 0` claimable check (the delegator never re-grabs
#: it), while `release_all()` resets it to 0 on the next restart, making it
#: claimable again. See `park()` and `queue.release_all()`.
STATUS_PARKED = -1

is_finished = False
is_claimed = False
is_parked = False

def __init__(self, data, database=None):
"""
Expand All @@ -32,6 +40,7 @@ def __init__(self, data, database=None):
try:
self.is_finished = "is_finished" in self.data and self.data["is_finished"]
self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
self.is_parked = self.data["timestamp_claimed"] == self.STATUS_PARKED
except KeyError:
raise Exception

Expand Down Expand Up @@ -86,7 +95,7 @@ def claim(self):

This marks it in the database so it cannot be claimed again.
"""
if self.data["interval"] == 0:
if not self.is_recurring:
claim_time = int(time.time())
else:
# the claim time should be a multiple of the interval to prevent
Expand Down Expand Up @@ -116,23 +125,29 @@ def finish(self, delete=False):
:param bool delete: Whether to force deleting the job even if it is a
job with an interval.
"""
if self.data["interval"] == 0 or delete:
if not self.is_recurring or delete:
self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
else:
self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})

self.is_finished = True

def release(self, delay=0, claim_after=0):
def release(self, delay=0, claim_after=0, increment_attempts=True):
"""
Release a job so it may be claimed again

:param int delay: Delay in seconds after which job may be reclaimed.
:param int claim_after: Timestamp after which job may be claimed. This
is overridden by `delay`.
"""
update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
:param bool increment_attempts: Whether to count this as a failed
attempt. True for soft failures/retries (the default, matching the
retry-budget logic in e.g. scraper.py); False for a deliberate re-queue
such as a manual retry, which is not itself a failure.
"""
update = {"timestamp_claimed": 0}
if increment_attempts:
update["attempts"] = self.data["attempts"] + 1
if delay > 0:
update["timestamp_after"] = int(time.time()) + delay

Expand All @@ -146,13 +161,48 @@ def release(self, delay=0, claim_after=0):
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
self.is_claimed = False

def park(self):
"""
Park a job after a crash

Marks the job as crashed by setting its claim to `STATUS_PARKED`. Parked
jobs are not claimable while the backend runs (the delegator only claims
jobs with `timestamp_claimed = 0`), so a crashing job is not retried in a
tight loop. `release_all()` resets the claim to 0 on the next restart, at
which point the job becomes claimable again ("retry on restart").

`attempts` is incremented so the crash count survives restarts
(`release_all()` does not reset it).
"""
self.db.update("jobs",
data={"timestamp_claimed": self.STATUS_PARKED, "attempts": self.data["attempts"] + 1},
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
self.data["timestamp_claimed"] = self.STATUS_PARKED
self.data["attempts"] = self.data["attempts"] + 1
self.is_claimed = False
self.is_parked = True

@property
def is_recurring(self):
"""
Is this a recurring job?

NOTE: the claimable filter in `JobQueue.get_all_jobs()` / `get_job()`
(`restrict_claimable`) encodes this same interval logic directly in SQL
and must be kept in sync with this property.

:return bool: Whether the job repeats.
"""
# adding this property in case we update the schema for other reoccuring job types (e.g. metronome)
return self.data["interval"] > 0

def is_claimable(self):
"""
Can this job be claimed?

:return bool: If the job is not claimed yet and also isn't finished.
:return bool: If the job is not claimed, parked or finished.
"""
return not self.is_claimed and not self.is_finished
return not self.is_claimed and not self.is_parked and not self.is_finished

def get_place_in_queue(self):
"""
Expand Down
5 changes: 5 additions & 0 deletions common/lib/module_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ def _load_datasource(subdirectory, expiration):
self.log_buffer += "Could not import %s: %s\n" % (module_name, e)
return

if getattr(datasource, "DATASOURCE_DISABLED", False):
# module deliberately declined to register (e.g. dev-only datasource
# gated behind an env var); not an error, so don't warn.
return

if not hasattr(datasource, "init_datasource") or not hasattr(datasource, "DATASOURCE"):
self.log_buffer += "Could not load datasource %s: missing init_datasource or DATASOURCE\n" % subdirectory
return
Expand Down
5 changes: 5 additions & 0 deletions common/lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ def get_all_jobs(self, jobtype="*", queue_id="*", limit=None, offset=None, remot
query = "SELECT * FROM jobs %s" % filter

if restrict_claimable:
# `timestamp_claimed = 0` excludes both running jobs (> 0) and
# crashed/parked jobs (Job.STATUS_PARKED, i.e. -1), which only become
# claimable again after release_all() on restart.
# NOTE: the `interval` recurrence logic here mirrors Job.is_recurring;
# keep the two in sync (e.g. if a "metronome" recurrence type is added).
query += (" AND timestamp_claimed = 0"
" AND timestamp_after < %s"
" AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
Expand Down
31 changes: 31 additions & 0 deletions datasources/test/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Test datasource (development only)

Provides a dummy "search" worker that creates datasets in deliberately distinct
states (completing normally, running forever, or crashing) so the worker-status
and queue admin pages can be exercised without running a real data collection.

The search worker only registers itself when the
``FOURCAT_ENABLE_TEST_DATASOURCE`` environment variable is set to a truthy
value, so this datasource is inert (no worker, nothing runnable) on a normal or
production instance even though the folder is present.

See ``helper-scripts/create_test_jobs.py`` for enqueuing one of each state.
"""
import os

# only register this datasource when explicitly enabled, so it is inert on a
# normal/production instance. This MUST match the gate on the search worker in
# search_test.py: if the datasource registers without its worker,
# manager.validate_datasources() errors with "No search worker defined".
if os.environ.get("FOURCAT_ENABLE_TEST_DATASOURCE", "").lower() in ("1", "true", "yes", "on"):
# Use default data source init function
from common.lib.helpers import init_datasource as init_datasource

# Internal identifier for this data source
DATASOURCE = "test"
NAME = "Test datasource (dev only)"
else:
# deliberately inert on a normal/production instance; tell the loader
# this is intentional so it doesn't warn about missing attributes
DATASOURCE_DISABLED = True
Loading
Loading