diff --git a/backend/lib/worker.py b/backend/lib/worker.py index fe3a862fb..b0686256a 100644 --- a/backend/lib/worker.py +++ b/backend/lib/worker.py @@ -140,11 +140,13 @@ def run(self): self.abort() except ProcessorException as e: self.log.error(str(e), frame=e.frame) + self.mark_job_after_crash() except Exception as e: stack = traceback.extract_tb(e.__traceback__) frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack] location = "->".join(frames) self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack) + self.mark_job_after_crash() finally: # Clean up after work successfully completed or terminates try: @@ -169,6 +171,29 @@ def run(self): except Exception: pass + def mark_job_after_crash(self): + """ + Decide what happens to the job after an unhandled crash + + On a crash the job is neither finished nor released by default, which + would leave it claimed (and so invisible to the delegator) until the + next restart. Instead: + + - every unfinished job is parked, i.e. retried only on restart, to + avoid a tight crash loop; + - recurring jobs also log a fatal message asking for a manual release. + + See `Job.park()` and `Job.release()`. + """ + if self.job.is_finished: + # the worker already finalised the job before crashing + return + + if self.job.is_recurring: + self.log.fatal("Worker %s crashed while processing recurring job %s - manually release job for retry." % (self.type, self.job.data["remote_id"])) + + self.job.park() + def clean_up(self): """ Clean up after a processor runs successfully or results in error.
diff --git a/backend/workers/api.py b/backend/workers/api.py index 5fc20af59..33767f331 100644 --- a/backend/workers/api.py +++ b/backend/workers/api.py @@ -260,35 +260,39 @@ def process_request(self, request, payload): worker = candidate break - is_claimed = job["timestamp_claimed"] > 0 - if not is_claimed and not worker: + # a job's claim flag encodes three states: + # 0 -> queued/claimable + # >0 -> claimed (running if a live worker exists, else a likely + # hard-kill zombie) + # -1 -> parked after a crash (Job.STATUS_PARKED); retried on the + # next restart. see Job.park() / BasicWorker.run(). + timestamp_claimed = job["timestamp_claimed"] + is_claimed = timestamp_claimed > 0 + is_parked = timestamp_claimed == Job.STATUS_PARKED + + if not worker and not is_claimed and not is_parked: + # truly queued and waiting to be claimed if jobtype not in queue: queue[jobtype] = 0 queue[jobtype] += 1 else: - # Claimed or has worker - if hasattr(worker, "dataset") and worker.dataset: - running_key = worker.dataset.key - running_user = worker.dataset.creator - running_parent = worker.dataset.top_parent().key - else: - running_key = None - running_user = None - running_parent = None - + # has a live worker, or is claimed/parked without one. dataset + # resolution is left to the frontend (which treats remote_id as + # a dataset key only for processor jobtypes); the API just + # reports job state. 
running.append({ "type": jobtype, "queue_id": queue_key, + "remote_id": job["remote_id"], "is_claimed": is_claimed, "is_running": bool(worker), - "is_processor": hasattr(worker, "dataset"), + # Processors have DataSets + "is_processor": jobtype in self.modules.processors, "is_recurring": (int(job["interval"]) > 0), + "is_parked": is_parked, "is_maybe_crashed": is_claimed and not bool(worker), - "dataset_key": running_key, - "dataset_user": running_user, - "dataset_parent_key": running_parent, "timestamp_queued": job["timestamp"], - "timestamp_claimed": job["timestamp_claimed"], + "timestamp_claimed": timestamp_claimed, "timestamp_lastclaimed": job["timestamp_lastclaimed"], }) diff --git a/common/lib/job.py b/common/lib/job.py index 1cd9aadee..cae973951 100644 --- a/common/lib/job.py +++ b/common/lib/job.py @@ -14,8 +14,16 @@ class Job: data = {} db = None + #: Sentinel value for `timestamp_claimed` marking a job that crashed and + #: should only be retried on restart. It is negative so it is excluded by + #: the `timestamp_claimed = 0` claimable check (the delegator never re-grabs + #: it), while `release_all()` resets it to 0 on the next restart, making it + #: claimable again. See `park()` and `queue.release_all()`. + STATUS_PARKED = -1 + is_finished = False is_claimed = False + is_parked = False def __init__(self, data, database=None): """ @@ -32,6 +40,7 @@ def __init__(self, data, database=None): try: self.is_finished = "is_finished" in self.data and self.data["is_finished"] self.is_claimed = self.data["timestamp_claimed"] and self.data["timestamp_claimed"] > 0 + self.is_parked = self.data["timestamp_claimed"] == self.STATUS_PARKED except KeyError: raise Exception @@ -86,7 +95,7 @@ def claim(self): This marks it in the database so it cannot be claimed again. 
""" - if self.data["interval"] == 0: + if not self.is_recurring: claim_time = int(time.time()) else: # the claim time should be a multiple of the interval to prevent @@ -116,7 +125,7 @@ def finish(self, delete=False): :param bool delete: Whether to force deleting the job even if it is a job with an interval. """ - if self.data["interval"] == 0 or delete: + if not self.is_recurring or delete: self.db.delete("jobs", where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) else: self.db.update("jobs", data={"timestamp_claimed": 0, "attempts": 0}, @@ -124,15 +133,21 @@ def finish(self, delete=False): self.is_finished = True - def release(self, delay=0, claim_after=0): + def release(self, delay=0, claim_after=0, increment_attempts=True): """ Release a job so it may be claimed again :param int delay: Delay in seconds after which job may be reclaimed. :param int claim_after: Timestamp after which job may be claimed. This is overridden by `delay`. - """ - update = {"timestamp_claimed": 0, "attempts": self.data["attempts"] + 1} + :param bool increment_attempts: Whether to count this as a failed + attempt. True for soft failures/retries (the default, matching the + retry-budget logic in e.g. scraper.py); False for a deliberate re-queue + such as a manual retry, which is not itself a failure. + """ + update = {"timestamp_claimed": 0} + if increment_attempts: + update["attempts"] = self.data["attempts"] + 1 if delay > 0: update["timestamp_after"] = int(time.time()) + delay @@ -146,13 +161,48 @@ def release(self, delay=0, claim_after=0): where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) self.is_claimed = False + def park(self): + """ + Park a job after a crash + + Marks the job as crashed by setting its claim to `STATUS_PARKED`. Parked + jobs are not claimable while the backend runs (the delegator only claims + jobs with `timestamp_claimed = 0`), so a crashing job is not retried in a + tight loop. 
`release_all()` resets the claim to 0 on the next restart, at + which point the job becomes claimable again ("retry on restart"). + + `attempts` is incremented so the crash count survives restarts + (`release_all()` does not reset it). + """ + self.db.update("jobs", + data={"timestamp_claimed": self.STATUS_PARKED, "attempts": self.data["attempts"] + 1}, + where={"jobtype": self.data["jobtype"], "remote_id": self.data["remote_id"]}) + self.data["timestamp_claimed"] = self.STATUS_PARKED + self.data["attempts"] = self.data["attempts"] + 1 + self.is_claimed = False + self.is_parked = True + + @property + def is_recurring(self): + """ + Is this a recurring job? + + NOTE: the claimable filter in `JobQueue.get_all_jobs()` / `get_job()` + (`restrict_claimable`) encodes this same interval logic directly in SQL + and must be kept in sync with this property. + + :return bool: Whether the job repeats. + """ + # adding this property in case we update the schema for other recurring job types (e.g. metronome) + return self.data["interval"] > 0 + def is_claimable(self): """ Can this job be claimed? - :return bool: If the job is not claimed yet and also isn't finished. + :return bool: If the job is not claimed, parked or finished. """ - return not self.is_claimed and not self.is_finished + return not self.is_claimed and not self.is_parked and not self.is_finished def get_place_in_queue(self): """ diff --git a/common/lib/module_loader.py b/common/lib/module_loader.py index 427ac6b1a..498599996 100644 --- a/common/lib/module_loader.py +++ b/common/lib/module_loader.py @@ -217,6 +217,11 @@ def _load_datasource(subdirectory, expiration): self.log_buffer += "Could not import %s: %s\n" % (module_name, e) return + if getattr(datasource, "DATASOURCE_DISABLED", False): + # module deliberately declined to register (e.g. dev-only datasource + # gated behind an env var); not an error, so don't warn.
+ return + if not hasattr(datasource, "init_datasource") or not hasattr(datasource, "DATASOURCE"): self.log_buffer += "Could not load datasource %s: missing init_datasource or DATASOURCE\n" % subdirectory return diff --git a/common/lib/queue.py b/common/lib/queue.py index 28a57e64f..9d4613eac 100644 --- a/common/lib/queue.py +++ b/common/lib/queue.py @@ -97,6 +97,11 @@ def get_all_jobs(self, jobtype="*", queue_id="*", limit=None, offset=None, remot query = "SELECT * FROM jobs %s" % filter if restrict_claimable: + # `timestamp_claimed = 0` excludes both running jobs (> 0) and + # crashed/parked jobs (Job.STATUS_PARKED, i.e. -1), which only become + # claimable again after release_all() on restart. + # NOTE: the `interval` recurrence logic here mirrors Job.is_recurring; + # keep the two in sync (e.g. if a "metronome" recurrence type is added). query += (" AND timestamp_claimed = 0" " AND timestamp_after < %s" " AND (interval = 0 OR timestamp_lastclaimed + interval < %s)") diff --git a/datasources/test/__init__.py b/datasources/test/__init__.py new file mode 100644 index 000000000..c698aade1 --- /dev/null +++ b/datasources/test/__init__.py @@ -0,0 +1,31 @@ +""" +Test datasource (development only) + +Provides a dummy "search" worker that creates datasets in deliberately distinct +states (completing normally, running forever, or crashing) so the worker-status +and queue admin pages can be exercised without running a real data collection. + +The search worker only registers itself when the +``FOURCAT_ENABLE_TEST_DATASOURCE`` environment variable is set to a truthy +value, so this datasource is inert (no worker, nothing runnable) on a normal or +production instance even though the folder is present. + +See ``helper-scripts/create_test_jobs.py`` for enqueuing one of each state. +""" +import os + +# only register this datasource when explicitly enabled, so it is inert on a +# normal/production instance. 
This MUST match the gate on the search worker in +# search_test.py: if the datasource registers without its worker, +# manager.validate_datasources() errors with "No search worker defined". +if os.environ.get("FOURCAT_ENABLE_TEST_DATASOURCE", "").lower() in ("1", "true", "yes", "on"): + # Use default data source init function + from common.lib.helpers import init_datasource as init_datasource + + # Internal identifier for this data source + DATASOURCE = "test" + NAME = "Test datasource (dev only)" +else: + # deliberately inert on a normal/production instance; tell the loader + # this is intentional so it doesn't warn about missing attributes + DATASOURCE_DISABLED = True diff --git a/datasources/test/search_test.py b/datasources/test/search_test.py new file mode 100644 index 000000000..2fca9b0a6 --- /dev/null +++ b/datasources/test/search_test.py @@ -0,0 +1,139 @@ +""" +Test datasource search worker (development only) + +This worker only registers itself when the ``FOURCAT_ENABLE_TEST_DATASOURCE`` +environment variable is set to a truthy value, so it never loads on a normal or +production instance. It produces dummy datasets in one of three deliberately +distinct states, selected via the ``mode`` parameter: + +- ``complete``: writes a few dummy rows and finishes normally +- ``forever``: runs indefinitely (until interrupted), updating progress +- ``crash``: raises a generic exception; the crash handling in + ``BasicWorker.run()`` then parks the job (``Job.park()``), so it is + reported as ``is_parked`` rather than ``is_maybe_crashed`` + +Use ``helper-scripts/create_test_jobs.py`` to enqueue one of each. Note that the +backend daemon must also have ``FOURCAT_ENABLE_TEST_DATASOURCE`` set for the +jobs to actually be picked up and run.
+""" +import os +import time + +from backend.lib.search import Search +from common.lib.user_input import UserInput +from common.lib.item_mapping import MappedItem +from common.lib.exceptions import ProcessorInterruptedException, ProcessorException + +# only make this worker available when explicitly enabled, so it never loads on +# a normal/production instance (the datasource folder is always discovered, but +# without this class there is no `test-search` worker and nothing can run) +TEST_DATASOURCE_ENABLED = os.environ.get("FOURCAT_ENABLE_TEST_DATASOURCE", "").lower() in ("1", "true", "yes", "on") + +if TEST_DATASOURCE_ENABLED: + + class SearchTest(Search): + """ + Dummy search worker for exercising the worker/queue status pages + """ + type = "test-search" # job ID + category = "Search" # category + title = "Test datasource (dev only)" # title displayed in UI + description = "Development-only datasource that creates dummy datasets in various states (complete, forever, crash) to exercise admin status pages." + extension = "ndjson" # extension of result file + + # not offered as a processor for existing datasets + accepts = [None] + + @classmethod + def get_queue_id(cls, remote_id, details, dataset) -> str: + # one queue per job so the dummy jobs run concurrently instead of + # serialising behind one another (a 'forever' job would otherwise block + # the rest). 
+ return f"{cls.type}-{remote_id}" + + @classmethod + def get_options(cls, parent_dataset=None, config=None): + return { + "mode": { + "type": UserInput.OPTION_CHOICE, + "help": "Test mode", + "options": { + "complete": "Complete normally (writes dummy rows)", + "forever": "Run forever (until interrupted)", + "crash": "Crash (raise an exception)", + }, + "default": "complete", + }, + "amount": { + "type": UserInput.OPTION_TEXT, + "help": "Number of dummy rows (complete mode)", + "coerce_type": int, + "default": 5, + "min": 0, + }, + } + + def get_items(self, query): + """ + Generate dummy items, or run forever, or crash - depending on mode + + :param dict query: Query parameters, expects a `mode` key + :return: List of dummy items (complete mode); the other modes only raise + """ + mode = query.get("mode", "complete") + + if mode == "crash": + # the exception ends up in the crash handling of BasicWorker.run(), + # which parks the job -> shows up as `is_parked` on the status page + self.dataset.update_status("Test datasource: about to raise an exception") + raise Exception("Test datasource intentional crash (mode=crash)") + + if mode == "forever": + # block here until interrupted; this holds a worker slot so the + # job shows up as actively running with a moving progress bar + tick = 0 + while True: + if self.interrupted: + raise ProcessorInterruptedException("Interrupted while running forever (mode=forever)") + tick += 1 + self.dataset.update_status("Test datasource: running forever (tick %i)" % tick) + # oscillate progress 0..1 so the bar is visibly active + self.dataset.update_progress((tick % 20) / 20) + time.sleep(2) + + if self.job.is_recurring: + # recurring jobs are not expected to produce any items, so don't + # return any; just update the status and progress so the job + # shows up as active on the status page.
+ self.dataset.update_status("Test datasource: recurring job (mode=%s)" % mode) + self.dataset.update_progress(0.5) + raise ProcessorException("Recurring jobs are not expected to produce DataSets; this is a test datasource.") + + # mode == "complete": write some dummy rows and finish + amount = query.get("amount", 5) + items = [] + for i in range(amount): + if self.interrupted: + raise ProcessorInterruptedException("Interrupted while generating dummy data (mode=complete)") + self.dataset.update_progress((i + 1) / amount if amount else 1) + items.append({ + "id": str(i), + "thread_id": str(i), + "subject": "Dummy item %i" % i, + "body": "This is dummy test item %i." % i, + "author": "test_user", + "timestamp": "1970-01-01 00:00:00", + }) + + return items + + @staticmethod + def map_item(item): + return MappedItem({ + "id": item.get("id", ""), + "thread_id": item.get("thread_id", ""), + "subject": item.get("subject", ""), + "body": item.get("body", ""), + "author": item.get("author", ""), + "timestamp": item.get("timestamp", "1970-01-01 00:00:00"), + }) diff --git a/helper-scripts/create_test_jobs.py b/helper-scripts/create_test_jobs.py new file mode 100644 index 000000000..5f0a0c78f --- /dev/null +++ b/helper-scripts/create_test_jobs.py @@ -0,0 +1,77 @@ +""" +Enqueue dummy "test datasource" jobs in various states. + +This creates datasets/jobs for the development-only `test` datasource (see +`datasources/test/`) so the worker-status / queue admin pages can be exercised +without a real data collection. 
It can enqueue any combination of: + +- complete: finishes normally (a healthy dataset; not shown as "running") +- forever: runs indefinitely -> shows as actively running with progress +- crash: raises an exception -> job is parked, shows as `is_parked` (no worker) + +The backend daemon must ALSO have FOURCAT_ENABLE_TEST_DATASOURCE set (in its +environment) for these jobs to be picked up and run; otherwise the +`test-search` worker is not registered and the jobs sit in the queue. + +Usage: + python helper-scripts/create_test_jobs.py # one of each + python helper-scripts/create_test_jobs.py -m forever crash +""" +import argparse +import sys +import os + +# make sure the test datasource worker is registered for THIS process, so +# DataSet/ModuleCollector can resolve the `test-search` type while enqueuing +os.environ.setdefault("FOURCAT_ENABLE_TEST_DATASOURCE", "1") + +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)) + "/..") +from common.lib.database import Database +from common.lib.logger import Logger +from common.lib.queue import JobQueue +from common.lib.dataset import DataSet +from common.lib.job import Job +from common.lib.module_loader import ModuleCollector +from common.config_manager import ConfigManager + +cli = argparse.ArgumentParser() +cli.add_argument("-m", "--modes", nargs="+", default=["complete", "forever", "crash"], + choices=["complete", "forever", "crash"], help="Which dummy job state(s) to enqueue") +cli.add_argument("-i", "--interval", default=0, type=int, help="Interval between jobs (seconds)") +cli.add_argument("-u", "--user", default="anonymous", help="Username to assign the datasets to") +cli.add_argument("-a", "--amount", type=int, default=5, help="Dummy rows for 'complete' mode") +args = cli.parse_args() + +config = ConfigManager() +logger = Logger(log_path=config.get("PATH_LOGS").joinpath("create-test-jobs.log")) +db = Database(logger=logger, dbname=config.get("DB_NAME"), user=config.get("DB_USER"), +
password=config.get("DB_PASSWORD"), host=config.get("DB_HOST"), port=config.get("DB_PORT"), + appname="create-test-jobs") +config.with_db(db) +modules = ModuleCollector(config) +queue = JobQueue(logger=logger, database=db) + +if "test-search" not in modules.workers: + print("The 'test-search' worker is not registered - cannot enqueue test jobs.") + print("(This should not happen here since the env var is forced on; check datasources/test/.)") + sys.exit(1) + +worker = modules.workers["test-search"] + +for mode in args.modes: + dataset = DataSet( + parameters={"datasource": "test", "type": "test-search", "mode": mode, "amount": args.amount}, + db=db, + type="test-search", + extension="ndjson", + is_private=False, + owner=args.user, + modules=modules + ) + dataset.update_label("Test job (%s)" % mode) + queue.add_job(worker_or_type=worker, dataset=dataset, interval=args.interval) + job = Job.get_by_remote_ID(dataset.key, db) + dataset.link_job(job) + print("Queued '%s' test job: dataset %s" % (mode, dataset.key)) + +print("Done. Make sure the backend has FOURCAT_ENABLE_TEST_DATASOURCE set so the jobs run.") diff --git a/processors/visualisation/word-trees.py b/processors/visualisation/word-trees.py index ed820f8bc..246156cb6 100644 --- a/processors/visualisation/word-trees.py +++ b/processors/visualisation/word-trees.py @@ -703,7 +703,6 @@ def render( # determine how high this block will be based on the available # height and the nodes we'll need to fit in it - parent_node = node.parent if not node.is_root else node block_width, block_height = self.get_bbox(node, side) own_width, own_height = self.get_bbox(node, side, False) diff --git a/webtool/templates/controlpanel/jobs.html b/webtool/templates/controlpanel/jobs.html index cd2602ea5..bbc75a8b2 100644 --- a/webtool/templates/controlpanel/jobs.html +++ b/webtool/templates/controlpanel/jobs.html @@ -47,9 +47,11 @@