Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions backend/lib/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ def run(self):
self.abort()
except ProcessorException as e:
self.log.error(str(e), frame=e.frame)
self.mark_job_after_crash()
except Exception as e:
stack = traceback.extract_tb(e.__traceback__)
frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
location = "->".join(frames)
self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
self.mark_job_after_crash()
finally:
# Clean up after work successfully completed or terminates
try:
Expand All @@ -169,6 +171,29 @@ def run(self):
except Exception:
pass

def mark_job_after_crash(self):
"""
Decide what happens to the job after an unhandled crash

On a crash the job is neither finished nor released by default, which
would leave it claimed (and so invisible to the delegator) until the
next restart. Instead:

- recurring jobs are released so they retry on their next interval;
Comment thread
dale-wahl marked this conversation as resolved.
- one-shot jobs are parked, i.e. retried only on restart, to avoid a
tight crash loop.

See `Job.park()` and `Job.release()`.
"""
if self.job.is_finished:
# the worker already finalised the job before crashing
return

if self.job.is_recurring:
self.job.release(delay=10)
else:
self.job.park()

def clean_up(self):
"""
Clean up after a processor runs successfully or results in error.
Expand Down
38 changes: 21 additions & 17 deletions backend/workers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,35 +260,39 @@ def process_request(self, request, payload):
worker = candidate
break

is_claimed = job["timestamp_claimed"] > 0
if not is_claimed and not worker:
# a job's claim flag encodes three states:
# 0 -> queued/claimable
# >0 -> claimed (running if a live worker exists, else a likely
# hard-kill zombie)
# -1 -> parked after a crash (Job.STATUS_PARKED); retried on the
# next restart. see Job.park() / BasicWorker.run().
timestamp_claimed = job["timestamp_claimed"]
is_claimed = timestamp_claimed > 0
is_parked = timestamp_claimed == Job.STATUS_PARKED

if not worker and not is_claimed and not is_parked:
# truly queued and waiting to be claimed
if jobtype not in queue:
queue[jobtype] = 0
queue[jobtype] += 1
else:
# Claimed or has worker
if hasattr(worker, "dataset") and worker.dataset:
running_key = worker.dataset.key
running_user = worker.dataset.creator
running_parent = worker.dataset.top_parent().key
else:
running_key = None
running_user = None
running_parent = None

# has a live worker, or is claimed/parked without one. dataset
# resolution is left to the frontend (which treats remote_id as
# a dataset key only for processor jobtypes); the API just
# reports job state.
running.append({
"type": jobtype,
"queue_id": queue_key,
"remote_id": job["remote_id"],
"is_claimed": is_claimed,
"is_running": bool(worker),
"is_processor": hasattr(worker, "dataset"),
# Processors have DataSets
"is_processor": jobtype in self.modules.processors,
"is_recurring": (int(job["interval"]) > 0),
"is_parked": is_parked,
"is_maybe_crashed": is_claimed and not bool(worker),
"dataset_key": running_key,
"dataset_user": running_user,
"dataset_parent_key": running_parent,
"timestamp_queued": job["timestamp"],
"timestamp_claimed": job["timestamp_claimed"],
"timestamp_claimed": timestamp_claimed,
"timestamp_lastclaimed": job["timestamp_lastclaimed"],
})

Expand Down
64 changes: 57 additions & 7 deletions common/lib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ class Job:
data = {}
db = None

#: Sentinel value for `timestamp_claimed` marking a job that crashed and
#: should only be retried on restart. It is negative so it is excluded by
#: the `timestamp_claimed = 0` claimable check (the delegator never re-grabs
#: it), while `release_all()` resets it to 0 on the next restart, making it
#: claimable again. See `park()` and `queue.release_all()`.
STATUS_PARKED = -1

is_finished = False
is_claimed = False
is_parked = False

def __init__(self, data, database=None):
"""
Expand All @@ -32,6 +40,7 @@ def __init__(self, data, database=None):
try:
self.is_finished = "is_finished" in self.data and self.data["is_finished"]
self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0
self.is_parked = self.data["timestamp_claimed"] == self.STATUS_PARKED
except KeyError:
raise Exception

Expand Down Expand Up @@ -86,7 +95,7 @@ def claim(self):

This marks it in the database so it cannot be claimed again.
"""
if self.data["interval"] == 0:
if not self.is_recurring:
claim_time = int(time.time())
else:
# the claim time should be a multiple of the interval to prevent
Expand Down Expand Up @@ -116,23 +125,29 @@ def finish(self, delete=False):
:param bool delete: Whether to force deleting the job even if it is a
job with an interval.
"""
if self.data["interval"] == 0 or delete:
if not self.is_recurring or delete:
self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
else:
self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0},
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})

self.is_finished = True

def release(self, delay=0, claim_after=0):
def release(self, delay=0, claim_after=0, increment_attempts=True):
"""
Release a job so it may be claimed again

:param int delay: Delay in seconds after which job may be reclaimed.
:param int claim_after: Timestamp after which job may be claimed. This
is overridden by `delay`.
"""
update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1}
:param bool increment_attempts: Whether to count this as a failed
attempt. True for soft failures/retries (the default, matching the
retry-budget logic in e.g. scraper.py); False for a deliberate re-queue
such as a manual retry, which is not itself a failure.
"""
update = {"timestamp_claimed": 0}
if increment_attempts:
update["attempts"] = self.data["attempts"] + 1
if delay > 0:
update["timestamp_after"] = int(time.time()) + delay

Expand All @@ -146,13 +161,48 @@ def release(self, delay=0, claim_after=0):
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
self.is_claimed = False

def park(self):
"""
Park a job after a crash

Marks the job as crashed by setting its claim to `STATUS_PARKED`. Parked
jobs are not claimable while the backend runs (the delegator only claims
jobs with `timestamp_claimed = 0`), so a crashing job is not retried in a
tight loop. `release_all()` resets the claim to 0 on the next restart, at
which point the job becomes claimable again ("retry on restart").

`attempts` is incremented so the crash count survives restarts
(`release_all()` does not reset it).
"""
self.db.update("jobs",
data={"timestamp_claimed": self.STATUS_PARKED, "attempts": self.data["attempts"] + 1},
where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]})
self.data["timestamp_claimed"] = self.STATUS_PARKED
self.data["attempts"] = self.data["attempts"] + 1
self.is_claimed = False
self.is_parked = True

@property
def is_recurring(self):
"""
Is this a recurring job?

NOTE: the claimable filter in `JobQueue.get_all_jobs()` / `get_job()`
(`restrict_claimable`) encodes this same interval logic directly in SQL
and must be kept in sync with this property.

:return bool: Whether the job repeats.
"""
# adding this property in case we update the schema for other reoccuring job types (e.g. metronome)
return self.data["interval"] > 0

def is_claimable(self):
"""
Can this job be claimed?

:return bool: If the job is not claimed yet and also isn't finished.
:return bool: If the job is not claimed, parked or finished.
"""
return not self.is_claimed and not self.is_finished
return not self.is_claimed and not self.is_parked and not self.is_finished

def get_place_in_queue(self):
"""
Expand Down
5 changes: 5 additions & 0 deletions common/lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ def get_all_jobs(self, jobtype="*", queue_id="*", limit=None, offset=None, remot
query = "SELECT * FROM jobs %s" % filter

if restrict_claimable:
# `timestamp_claimed = 0` excludes both running jobs (> 0) and
# crashed/parked jobs (Job.STATUS_PARKED, i.e. -1), which only become
# claimable again after release_all() on restart.
# NOTE: the `interval` recurrence logic here mirrors Job.is_recurring;
# keep the two in sync (e.g. if a "metronome" recurrence type is added).
query += (" AND timestamp_claimed = 0"
" AND timestamp_after < %s"
" AND (interval = 0 OR timestamp_lastclaimed + interval < %s)")
Expand Down
27 changes: 27 additions & 0 deletions datasources/test/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
Test datasource (development only)

Provides a dummy "search" worker that creates datasets in deliberately distinct
states (completing normally, running forever, or crashing) so the worker-status
and queue admin pages can be exercised without running a real data collection.

The search worker only registers itself when the
``FOURCAT_ENABLE_TEST_DATASOURCE`` environment variable is set to a truthy
value, so this datasource is inert (no worker, nothing runnable) on a normal or
production instance even though the folder is present.

See ``helper-scripts/create_test_jobs.py`` for enqueuing one of each state.
"""
import os

# only register this datasource when explicitly enabled, so it is inert on a
# normal/production instance. This MUST match the gate on the search worker in
# search_test.py: if the datasource registers without its worker,
# manager.validate_datasources() errors with "No search worker defined".
if os.environ.get("FOURCAT_ENABLE_TEST_DATASOURCE", "").lower() in ("1", "true", "yes", "on"):
# Use default data source init function
from common.lib.helpers import init_datasource as init_datasource

# Internal identifier for this data source
DATASOURCE = "test"
NAME = "Test datasource (dev only)"
131 changes: 131 additions & 0 deletions datasources/test/search_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Test datasource search worker (development only)

This worker only registers itself when the ``FOURCAT_ENABLE_TEST_DATASOURCE``
environment variable is set to a truthy value, so it never loads on a normal or
production instance. It produces dummy datasets in one of three deliberately
distinct states, selected via the ``mode`` parameter:

- ``complete``: writes a few dummy rows and finishes normally
- ``forever``: runs indefinitely (until interrupted), updating progress
- ``crash``: raises a generic exception; because an unhandled exception
leaves the job claimed but releases no worker, this reproduces
the ``is_maybe_crashed`` state (claimed job, no live worker)

Use ``helper-scripts/create_test_jobs.py`` to enqueue one of each. Note that the
backend daemon must also have ``FOURCAT_ENABLE_TEST_DATASOURCE`` set for the
jobs to actually be picked up and run.
"""
import os
import time

from backend.lib.search import Search
from common.lib.user_input import UserInput
from common.lib.item_mapping import MappedItem
from common.lib.exceptions import ProcessorInterruptedException

# only make this worker available when explicitly enabled, so it never loads on
# a normal/production instance (the datasource folder is always discovered, but
# without this class there is no `test-search` worker and nothing can run)
TEST_DATASOURCE_ENABLED = os.environ.get("FOURCAT_ENABLE_TEST_DATASOURCE", "").lower() in ("1", "true", "yes", "on")

if TEST_DATASOURCE_ENABLED:

class SearchTest(Search):
"""
Dummy search worker for exercising the worker/queue status pages
"""
type = "test-search" # job ID
category = "Search" # category
title = "Test datasource (dev only)" # title displayed in UI
description = "Development-only datasource that creates dummy datasets in various states (complete, forever, crash) to exercise admin status pages."
extension = "ndjson" # extension of result file

# not offered as a processor for existing datasets
accepts = [None]

@classmethod
def get_queue_id(cls, remote_id, details, dataset) -> str:
# one queue per job so the dummy jobs run concurrently instead of
# serialising behind one another (a 'forever' job would otherwise block
# the rest).
return f"{cls.type}-{remote_id}"

@classmethod
def get_options(cls, parent_dataset=None, config=None):
return {
"mode": {
"type": UserInput.OPTION_CHOICE,
"help": "Test mode",
"options": {
"complete": "Complete normally (writes dummy rows)",
"forever": "Run forever (until interrupted)",
"crash": "Crash (raise an exception)",
},
"default": "complete",
},
"amount": {
"type": UserInput.OPTION_TEXT,
"help": "Number of dummy rows (complete mode)",
"coerce_type": int,
"default": 5,
"min": 0,
},
}

def get_items(self, query):
"""
Generate dummy items, or run forever, or crash - depending on mode

:param dict query: Query parameters, expects a `mode` key
:return: Iterable of dummy items (complete mode) or None (forever)
"""
mode = query.get("mode", "complete")

if mode == "crash":
# leaves the job claimed with no live worker once the thread
# ends -> shows up as `is_maybe_crashed` on the status page
self.dataset.update_status("Test datasource: about to raise an exception")
raise Exception("Test datasource intentional crash (mode=crash)")

if mode == "forever":
# block here until interrupted; this holds a worker slot so the
# job shows up as actively running with a moving progress bar
tick = 0
while True:
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while running forever (mode=forever)")
tick += 1
self.dataset.update_status("Test datasource: running forever (tick %i)" % tick)
# oscillate progress 0..1 so the bar is visibly active
self.dataset.update_progress((tick % 20) / 20)
time.sleep(2)

# mode == "complete": write some dummy rows and finish
amount = query.get("amount", 5)
items = []
for i in range(amount):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while generating dummy data (mode=complete)")
self.dataset.update_progress((i + 1) / amount if amount else 1)
items.append({
"id": str(i),
"thread_id": str(i),
"subject": "Dummy item %i" % i,
"body": "This is dummy test item %i." % i,
"author": "test_user",
"timestamp": "1970-01-01 00:00:00",
})

return items

@staticmethod
def map_item(item):
return MappedItem({
"id": item.get("id", ""),
"thread_id": item.get("thread_id", ""),
"subject": item.get("subject", ""),
"body": item.get("body", ""),
"author": item.get("author", ""),
"timestamp": item.get("timestamp", "1970-01-01 00:00:00"),
})
Loading
Loading