From e1e6aaac71f00d6eb92286f6539e17fc6e9245b6 Mon Sep 17 00:00:00 2001 From: ThrawnCA Date: Mon, 2 Mar 2026 16:23:41 +1000 Subject: [PATCH 1/6] allow configuration of multiple job queues --- README.md | 28 +++++++++++++++ ckanext/xloader/action.py | 45 ++++++------------------- ckanext/xloader/auth.py | 5 +-- ckanext/xloader/config_declaration.yaml | 7 ++++ ckanext/xloader/jobs.py | 21 ++++++++++++ ckanext/xloader/tests/test_action.py | 14 ++++++++ ckanext/xloader/tests/test_jobs.py | 15 ++++++--- test.ini | 1 + 8 files changed, 94 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 5e091308..c455b1ff 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,20 @@ Default value: `1000000000` The maximum file size that XLoader will attempt to load. +#### ckanext.xloader.queue_names + +Example: + +``` +ckanext.xloader.queue_names = foo baz +``` + +Default value: `default` + +A whitespace-separated list of worker queues that XLoader jobs can be sent to. + +By default, jobs for the same package will be sent to the same queue, +to reduce database lock contention. #### ckanext.xloader.use_type_guessing @@ -409,6 +423,20 @@ Controls whether or not the status badges display all of the statuses. By defaul the badges will display "pending", "running", and "error". With debug_badges enabled, they will also display "complete", "active", "inactive", and "unknown". +#### ckanext.xloader.queue_names + +Example: + +``` +ckanext.xloader.queue_names = foo,baz +``` + +Default value: 'default' + +Defines the queue(s) available to use for XLoader jobs. +Jobs for the same package will be sent to the same queue, +to reduce lock contention. + #### ckanext.xloader.validation.requires_successful_report Supports: __ckanext-validation__ diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index 108d7cc9..2c2b4f13 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -5,10 +5,11 @@ import json import logging -import ckan.lib.jobs as rq_jobs +from ckan.lib.jobs import get_queue import ckan.lib.navl.dictization_functions from ckan.logic import side_effect_free import ckan.plugins as p +from ckan.plugins.toolkit import config, enqueue_job, get_or_bust from dateutil.parser import parse as parse_date from dateutil.parser import isoparse as parse_iso_date @@ -16,13 +17,8 @@ from . import interfaces as xloader_interfaces, jobs, db, utils -enqueue_job = p.toolkit.enqueue_job -get_queue = rq_jobs.get_queue - log = logging.getLogger(__name__) -config = p.toolkit.config -_get_or_bust = p.toolkit.get_or_bust _validate = ckan.lib.navl.dictization_functions.validate @@ -48,7 +44,6 @@ def xloader_submit(context, data_dict): ''' p.toolkit.check_access('xloader_submit', context, data_dict) api_key = utils.get_xloader_user_apitoken() - custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME) schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema()) data_dict, errors = _validate(data_dict, schema, context) if errors: @@ -68,6 +63,8 @@ def xloader_submit(context, data_dict): }) except p.toolkit.ObjectNotFound: return False + package_id = resource_dict.get('package_id') + custom_queue = data_dict.pop('queue', jobs.get_default_queue_name(package_id)) for plugin in p.PluginImplementations(xloader_interfaces.IXloader): upload = plugin.can_upload(res_id) @@ -158,9 +155,10 @@ def xloader_submit(context, data_dict): 'set_url_type': data_dict.get('set_url_type', False), 'task_created': task['last_updated'], 'original_url': resource_dict.get('url'), + 'queue_name': custom_queue, } } - if custom_queue != rq_jobs.DEFAULT_QUEUE_NAME: + if custom_queue not in jobs.DEFAULT_QUEUE_NAMES: # Don't automatically retry if it's a custom run data['metadata']['tries'] = jobs.MAX_RETRIES @@ -173,7 +171,7 @@ def xloader_submit(context, data_dict): try: job = enqueue_job( jobs.xloader_data_into_datastore, [data], queue=custom_queue, - title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), + title="xloader_submit: package: {} resource: {}".format(package_id, res_id), rq_kwargs=dict(timeout=timeout, at_front=sync) ) except Exception: @@ -200,29 +198,6 @@ def xloader_submit(context, data_dict): return True -def _enqueue(fn, args=None, kwargs=None, title=None, queue='default', - timeout=180): - '''Same as latest ckan.lib.jobs.enqueue - earlier CKAN versions dont have - the timeout param - - This function can be removed when dropping support for 2.7 - ''' - if args is None: - args = [] - if kwargs is None: - kwargs = {} - job = get_queue(queue).enqueue_call(func=fn, args=args, kwargs=kwargs, - timeout=timeout) - job.meta[u'title'] = title - job.save() - msg = u'Added background job {}'.format(job.id) - if title: - msg = u'{} ("{}")'.format(msg, title) - msg = u'{} to queue "{}"'.format(msg, queue) - log.info(msg) - return job - - def xloader_hook(context, data_dict): ''' Update xloader task. This action is typically called by ckanext-xloader whenever the status of a job changes. @@ -254,9 +229,9 @@ def xloader_hook(context, data_dict): ''' - metadata, status = _get_or_bust(data_dict, ['metadata', 'status']) + metadata, status = get_or_bust(data_dict, ['metadata', 'status']) - res_id = _get_or_bust(metadata, 'resource_id') + res_id = get_or_bust(metadata, 'resource_id') # Pass metadata, not data_dict, as it contains the resource id needed # on the auth checks @@ -338,7 +313,7 @@ def xloader_status(context, data_dict): if 'id' in data_dict: data_dict['resource_id'] = data_dict['id'] - res_id = _get_or_bust(data_dict, 'resource_id') + res_id = get_or_bust(data_dict, 'resource_id') task = p.toolkit.get_action('task_status_show')(context, { 'entity_id': res_id, diff --git a/ckanext/xloader/auth.py b/ckanext/xloader/auth.py index 2547db6d..3f4a66bc 100644 --- a/ckanext/xloader/auth.py +++ b/ckanext/xloader/auth.py @@ -1,5 +1,6 @@ from ckan import authz -from ckan.lib import jobs as rq_jobs + +from .jobs import DEFAULT_QUEUE_NAMES import ckanext.datastore.logic.auth as auth @@ -7,7 +8,7 @@ def xloader_submit(context, data_dict): # only sysadmins can specify a custom processing queue custom_queue = data_dict.get('queue') - if custom_queue and custom_queue != rq_jobs.DEFAULT_QUEUE_NAME: + if custom_queue and custom_queue not in DEFAULT_QUEUE_NAMES: return authz.is_authorized('config_option_update', context, data_dict) return auth.datastore_auth(context, data_dict) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 13cd2ff7..c4587b01 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -163,6 +163,13 @@ groups: See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema for more details. + - key: ckanext.xloader.queue_names + default: default + example: foo baz + description: | + A whitespace-separated list of queues to enqueue XLoader jobs to. + Jobs for the same package will be sent to the same queue. + required: false - key: ckanext.xloader.clean_datastore_tables default: False example: True diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index cd550a32..b2739d11 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -18,6 +18,7 @@ from rq.timeouts import JobTimeoutException import sqlalchemy as sa +from ckan.lib.jobs import DEFAULT_QUEUE_NAME from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h from . import db, loader @@ -33,6 +34,7 @@ if not SSL_VERIFY: requests.packages.urllib3.disable_warnings() +DEFAULT_QUEUE_NAMES = config.get('ckanext.xloader.queue_names', DEFAULT_QUEUE_NAME).split() MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9) # Don't try Tabulator load on large files MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10) @@ -100,6 +102,25 @@ def is_retryable_error(error): # } # } + +def get_default_queue_name(package_id=None): + """ Retrieve the queue to be used in submitting jobs for the specified dataset. + + By sending all jobs for a dataset to the same queue, lock conflicts are reduced. + """ + if not DEFAULT_QUEUE_NAMES: + return DEFAULT_QUEUE_NAME + if not package_id: + return DEFAULT_QUEUE_NAMES[0] + + # Pick a queue by taking the first character of the package name + # and converting it into a numeric index to the list of queue names. + # We don't want a proper hash function, because those tend to add + # complications for the sake of (unnecessary) cryptographic strength. + queue_index = ord(package_id[0]) % len(DEFAULT_QUEUE_NAMES) + return DEFAULT_QUEUE_NAMES[queue_index] + + def xloader_data_into_datastore(input): '''This is the func that is queued. It is a wrapper for xloader_data_into_datastore_, and makes sure it finishes by calling diff --git a/ckanext/xloader/tests/test_action.py b/ckanext/xloader/tests/test_action.py index df0cb875..ac5cb837 100644 --- a/ckanext/xloader/tests/test_action.py +++ b/ckanext/xloader/tests/test_action.py @@ -32,6 +32,20 @@ def test_submit(self): resource_id=res["id"], ) assert 1 == enqueue_mock.call_count + assert enqueue_mock.call_args[1].get('queue') == 'default{}'.format(ord(res['package_id'][0]) % 2) + + def test_submit_nonexistent_resource(self): + user = factories.User() + with mock.patch( + "ckanext.xloader.action.enqueue_job", + return_value=mock.MagicMock(id=123), + ) as enqueue_mock: + assert helpers.call_action( + "xloader_submit", + context=dict(user=user["name"]), + resource_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + ) is False + assert enqueue_mock.call_count == 0 def test_submit_to_custom_queue_without_auth(self): # check that xloader_submit doesn't allow regular users to change queues diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index c586bd7e..880efef0 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -87,15 +87,20 @@ def data(create_with_upload, apikey): @pytest.mark.ckan_config("ckan.jobs.timeout", 2) class TestXLoaderJobs(helpers.FunctionalRQTestBase): + def test_derive_queue_name(self): + assert jobs.get_default_queue_name() == "default0" + assert jobs.get_default_queue_name("foo") == "default0" + assert jobs.get_default_queue_name("meh") == "default1" + def test_xloader_data_into_datastore(self, cli, data): self.enqueue(jobs.xloader_data_into_datastore, [data]) with mock.patch("ckanext.xloader.jobs.get_response", get_response): stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output - assert "File hash: d44fa65eda3675e11710682fdb5f1648" in stdout - assert "Fields: [{'id': 'x', 'type': 'text', 'strip_extra_white': True}, {'id': 'y', 'type': 'text', 'strip_extra_white': True}]" in stdout - assert "Copying to database..." in stdout - assert "Creating search index..." in stdout - assert "Express Load completed" in stdout + assert "File hash: d44fa65eda3675e11710682fdb5f1648" in stdout + assert "Fields: [{'id': 'x', 'type': 'text', 'strip_extra_white': True}, {'id': 'y', 'type': 'text', 'strip_extra_white': True}]" in stdout + assert "Copying to database..." in stdout + assert "Creating search index..." in stdout + assert "Express Load completed" in stdout resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] diff --git a/test.ini b/test.ini index c02827bd..bf80696a 100644 --- a/test.ini +++ b/test.ini @@ -17,6 +17,7 @@ use = config:../ckan/test-core.ini # tests here. ckan.plugins = xloader datastore ckanext.xloader.jobs_db.uri = sqlite:////tmp/jobs.db +ckanext.xloader.queue_names = default0 default1 # Logging configuration [loggers] From d0f9a373ab75b4adc12bfcd2a15e8f35dd14dec7 Mon Sep 17 00:00:00 2001 From: ThrawnCA Date: Mon, 2 Mar 2026 16:24:43 +1000 Subject: [PATCH 2/6] cleanup - Ensure temporary files are deleted after use - Flake8 fixes --- ckanext/xloader/jobs.py | 175 +++++++++++++-------------- ckanext/xloader/loader.py | 128 ++++++++++---------- ckanext/xloader/tests/test_chunks.py | 7 +- ckanext/xloader/tests/test_jobs.py | 2 + ckanext/xloader/utils.py | 9 ++ 5 files changed, 164 insertions(+), 157 deletions(-) diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index b2739d11..63bfe0d4 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -23,7 +23,7 @@ from . import db, loader from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, LoaderError, XLoaderTimeoutError -from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url +from .utils import cleanup_temp_file, datastore_resource_exists, set_resource_metadata, modify_input_url from ckan.lib.api_token import get_user_from_token @@ -270,93 +270,90 @@ def xloader_data_into_datastore_(input, job_dict, logger): tmp_file, file_hash = _download_resource_data(resource, data, api_key, logger) - if (resource.get('hash') == file_hash - and not data.get('ignore_hash')): - logger.info('Ignoring resource - the file hash hasn\'t changed: ' - '{hash}.'.format(hash=file_hash)) - return - logger.info('File hash: %s', file_hash) - resource['hash'] = file_hash - - def direct_load(allow_type_guessing=False): - fields = loader.load_csv( - tmp_file.name, - resource_id=resource['id'], - mimetype=resource.get('format'), - allow_type_guessing=allow_type_guessing, - logger=logger) - loader.calculate_record_count( - resource_id=resource['id'], logger=logger) - set_datastore_active(data, resource, logger) - if 'result_url' in input: - job_dict['status'] = 'running_but_viewable' - callback_xloader_hook(result_url=input['result_url'], - api_key=api_key, - job_dict=job_dict) - logger.info('Data now available to users: %s', resource_ckan_url) - loader.create_column_indexes( - fields=fields, - resource_id=resource['id'], - logger=logger) - update_resource(resource={'id': resource['id'], 'hash': resource['hash']}, - patch_only=True) - logger.info('File Hash updated for resource: %s', resource['hash']) - - def tabulator_load(): - try: - loader.load_table(tmp_file.name, - resource_id=resource['id'], - mimetype=resource.get('format'), - logger=logger) - except JobError as e: - logger.error('Error during tabulator load: %s', e) - raise - loader.calculate_record_count( - resource_id=resource['id'], logger=logger) - set_datastore_active(data, resource, logger) - logger.info('Finished loading with tabulator') - update_resource(resource={'id': resource['id'], 'hash': resource['hash']}, - patch_only=True) - logger.info('File Hash updated for resource: %s', resource['hash']) - - # Load it - logger.info('Loading CSV') - # If ckanext.xloader.use_type_guessing is not configured, fall back to - # deprecated ckanext.xloader.just_load_with_messytables - use_type_guessing = asbool( - config.get('ckanext.xloader.use_type_guessing', config.get( - 'ckanext.xloader.just_load_with_messytables', False))) \ - and not datastore_resource_exists(resource['id']) \ - and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH - logger.info("'use_type_guessing' mode is: %s", use_type_guessing) try: - if use_type_guessing: + if (resource.get('hash') == file_hash + and not data.get('ignore_hash')): + logger.info('Ignoring resource - the file hash hasn\'t changed: ' + '{hash}.'.format(hash=file_hash)) + return + logger.info('File hash: %s', file_hash) + resource['hash'] = file_hash + + def direct_load(allow_type_guessing=False): + fields = loader.load_csv( + tmp_file.name, + resource_id=resource['id'], + mimetype=resource.get('format'), + allow_type_guessing=allow_type_guessing, + logger=logger) + loader.calculate_record_count( + resource_id=resource['id'], logger=logger) + set_datastore_active(data, resource, logger) + if 'result_url' in input: + job_dict['status'] = 'running_but_viewable' + callback_xloader_hook(result_url=input['result_url'], + api_key=api_key, + job_dict=job_dict) + logger.info('Data now available to users: %s', resource_ckan_url) + loader.create_column_indexes( + fields=fields, + resource_id=resource['id'], + logger=logger) + update_resource(resource={'id': resource['id'], 'hash': resource['hash']}, + patch_only=True) + logger.info('File Hash updated for resource: %s', resource['hash']) + + def tabulator_load(): try: - tabulator_load() + loader.load_table(tmp_file.name, + resource_id=resource['id'], + mimetype=resource.get('format'), + logger=logger) except JobError as e: - logger.warning('Load using tabulator failed: %s', e) - logger.info('Trying again with direct COPY') - direct_load() - else: - try: - direct_load(allow_type_guessing=True) - except (JobError, LoaderError) as e: - logger.warning('Load using COPY failed: %s', e) - logger.info('Trying again with tabulator') - tabulator_load() - except JobTimeoutException: - logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) - raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) - except FileCouldNotBeLoadedError as e: - logger.warning('Loading excerpt for this format not supported.') - logger.error('Loading file raised an error: %s', e) - raise JobError('Loading file raised an error: {}'.format(e)) - finally: + logger.error('Error during tabulator load: %s', e) + raise + loader.calculate_record_count( + resource_id=resource['id'], logger=logger) + set_datastore_active(data, resource, logger) + logger.info('Finished loading with tabulator') + update_resource(resource={'id': resource['id'], 'hash': resource['hash']}, + patch_only=True) + logger.info('File Hash updated for resource: %s', resource['hash']) + + # Load it + logger.info('Loading CSV') + # If ckanext.xloader.use_type_guessing is not configured, fall back to + # deprecated ckanext.xloader.just_load_with_messytables + use_type_guessing = asbool( + config.get('ckanext.xloader.use_type_guessing', config.get( + 'ckanext.xloader.just_load_with_messytables', False))) \ + and not datastore_resource_exists(resource['id']) \ + and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH + logger.info("'use_type_guessing' mode is: %s", use_type_guessing) try: - tmp_file.close() - os.remove(tmp_file.name) - except FileNotFoundError: - pass + if use_type_guessing: + try: + tabulator_load() + except JobError as e: + logger.warning('Load using tabulator failed: %s', e) + logger.info('Trying again with direct COPY') + direct_load() + else: + try: + direct_load(allow_type_guessing=True) + except (JobError, LoaderError) as e: + logger.warning('Load using COPY failed: %s', e) + logger.info('Trying again with tabulator') + tabulator_load() + except JobTimeoutException: + logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) + raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) + except FileCouldNotBeLoadedError as e: + logger.warning('Loading excerpt for this format not supported.') + logger.error('Loading file raised an error: %s', e) + raise JobError('Loading file raised an error: {}'.format(e)) + finally: + cleanup_temp_file(tmp_file) logger.info('Express Load completed') @@ -388,7 +385,7 @@ def _download_resource_data(resource, data, api_key, logger): logger.info('Fetching from: {0}'.format(url)) tmp_file = get_tmp_file(url) length = 0 - m = hashlib.md5() + m = hashlib.md5(usedforsecurity=False) cl = None try: headers = {} @@ -425,7 +422,7 @@ def _download_resource_data(resource, data, api_key, logger): data['datastore_contains_all_records_of_source_file'] = True except DataTooBigError: - tmp_file.close() + cleanup_temp_file(tmp_file) message = 'Data too large to load into Datastore: ' \ '{cl} bytes > max {max_cl} bytes.' \ .format(cl=cl or length, max_cl=MAX_CONTENT_LENGTH) @@ -450,7 +447,7 @@ def _download_resource_data(resource, data, api_key, logger): response.close() data['datastore_contains_all_records_of_source_file'] = False except requests.exceptions.HTTPError as error: - tmp_file.close() + cleanup_temp_file(tmp_file) # status code error logger.debug('HTTP error: %s', error) raise HTTPError( @@ -462,7 +459,7 @@ def _download_resource_data(resource, data, api_key, logger): raise XLoaderTimeoutError('Connection timed out after {}s'.format( DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: - tmp_file.close() + cleanup_temp_file(tmp_file) try: err_message = str(e.reason) except AttributeError: @@ -472,7 +469,7 @@ def _download_resource_data(resource, data, api_key, logger): message=err_message, status_code=None, request_url=url, response=None) except JobTimeoutException: - tmp_file.close() + cleanup_temp_file(tmp_file) logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 8ce183e3..030b9534 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -20,7 +20,7 @@ from .job_exceptions import FileCouldNotBeLoadedError, LoaderError from .parser import CSV_SAMPLE_LINES, TypeConverter -from .utils import datastore_resource_exists, headers_guess, type_guess +from .utils import cleanup_temp_file, datastore_resource_exists, headers_guess, type_guess from ckan.plugins.toolkit import config @@ -161,7 +161,6 @@ def copy_file(csv_filepath, engine, logger, resource_id, headers, delimiter): with engine.begin() as conn: cur = conn.connection.cursor() - #cur.execute('SET DATESTYLE TO "SQL , MDY"') try: with open(csv_filepath, 'rb') as f: # can't use :param for table name because params are only @@ -186,12 +185,12 @@ def copy_file(csv_filepath, engine, logger, resource_id, headers, delimiter): # but logging and exceptions need a normal (7 bit) str error_str = str(e) logger.warning('%s: %s', resource_id, error_str) - raise LoaderError('Error during the load into PostgreSQL:' - ' {}'.format(error_str)) + raise LoaderError('Error during the load into PostgreSQL: {}'.format(error_str)) finally: cur.close() -def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimiter = ',', max_size=1024**3, encoding='utf-8'): # 1 Gigabyte + +def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimiter=',', max_size=1024**3, encoding='utf-8'): # 1 Gigabyte """ Reads a CSV file, splits it into chunks of maximum size, and writes each chunk to PostgreSQL COPY command to load the data into a table. @@ -209,7 +208,7 @@ def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimi file_size = os.path.getsize(input_file) logger.info('Starting chunked processing for file size: %s bytes with chunk size: %s bytes', file_size, max_size) - with open(input_file, 'r', encoding = encoding) as infile: + with open(input_file, 'r', encoding=encoding) as infile: current_file = None output_filename = f'/tmp/output_{resource_id}.csv' header = False @@ -228,7 +227,6 @@ def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimi if header: current_file.write(delimiter.join(headers) + '\n') current_file.write(row) - # Close the last file if open if current_file: @@ -243,7 +241,7 @@ def split_copy_by_size(input_file, engine, logger, resource_id, headers, delimi logger.info('Completed chunked processing: %s chunks processed for file size %s bytes', chunk_count, file_size) if infile: - infile.close() + cleanup_temp_file(infile) def _read_metadata(table_filepath, mimetype, logger): @@ -330,12 +328,6 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing # TODO worry about csv header name problems # e.g. duplicate names - # encoding (and line ending?)- use chardet - # It is easier to reencode it as UTF8 than convert the name of the encoding - # to one that pgloader will understand. - logger.info('Ensuring character coding is UTF8') - f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False) - # get column info from existing table existing, existing_info, existing_fields, existing_fields_by_headers = _read_existing_fields(resource_id) if existing: @@ -393,62 +385,70 @@ def strip_white_space_iter(): yield row return strip_white_space_iter - save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} - try: - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - skip_rows=skip_rows) as stream: - stream.iter = _make_whitespace_stripping_iter(stream.iter) - stream.save(**save_args) - except (EncodingError, UnicodeDecodeError): - with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, - skip_rows=skip_rows) as stream: - stream.iter = _make_whitespace_stripping_iter(stream.iter) - stream.save(**save_args) - csv_filepath = f_write.name - - # Create table - from ckan import model - context = {'model': model, 'ignore_auth': True} - data_dict = dict( - resource_id=resource_id, - fields=fields, - ) - data_dict['records'] = None # just create an empty table - data_dict['force'] = True # TODO check this - I don't fully - # understand read-only/datastore resources + # encoding (and line ending?)- use chardet + # It is easier to reencode it as UTF8 than convert the name of the encoding + # to one that pgloader will understand. + logger.info('Ensuring character coding is UTF8') + f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False) try: - p.toolkit.get_action('datastore_create')(context, data_dict) - except p.toolkit.ValidationError as e: - if 'fields' in e.error_dict: - # e.g. {'message': None, 'error_dict': {'fields': [u'"***" is not a valid field name']}, '_error_summary': None} # noqa - error_message = e.error_dict['fields'][0] - raise LoaderError('Error with field definition: {}' - .format(error_message)) - else: - raise LoaderError( - 'Validation error when creating the database table: {}' - .format(str(e))) - except Exception as e: - raise LoaderError('Could not create the database table: {}' - .format(e)) + save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} + try: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result, + skip_rows=skip_rows) as stream: + stream.iter = _make_whitespace_stripping_iter(stream.iter) + stream.save(**save_args) + except (EncodingError, UnicodeDecodeError): + with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, + skip_rows=skip_rows) as stream: + stream.iter = _make_whitespace_stripping_iter(stream.iter) + stream.save(**save_args) + csv_filepath = f_write.name + + # Create table + from ckan import model + context = {'model': model, 'ignore_auth': True} + data_dict = dict( + resource_id=resource_id, + fields=fields, + ) + data_dict['records'] = None # just create an empty table + data_dict['force'] = True # TODO check this - I don't fully + # understand read-only/datastore resources + try: + p.toolkit.get_action('datastore_create')(context, data_dict) + except p.toolkit.ValidationError as e: + if 'fields' in e.error_dict: + # e.g. {'message': None, 'error_dict': {'fields': [u'"***" is not a valid field name']}, '_error_summary': None} # noqa + error_message = e.error_dict['fields'][0] + raise LoaderError('Error with field definition: {}' + .format(error_message)) + else: + raise LoaderError( + 'Validation error when creating the database table: {}' + .format(str(e))) + except Exception as e: + raise LoaderError('Could not create the database table: {}' + .format(e)) - # datastore_active is switched on by datastore_create - # TODO temporarily disable it until the load is complete + # datastore_active is switched on by datastore_create + # TODO temporarily disable it until the load is complete - engine = get_write_engine() - with engine.begin() as conn: - _disable_fulltext_trigger(conn, resource_id) + engine = get_write_engine() + with engine.begin() as conn: + _disable_fulltext_trigger(conn, resource_id) - with engine.begin() as conn: - context['connection'] = conn - _drop_indexes(context, data_dict, False) + with engine.begin() as conn: + context['connection'] = conn + _drop_indexes(context, data_dict, False) - logger.info('Copying to database...') + logger.info('Copying to database...') - # Copy file to datastore db, split to chunks. - max_size = config.get('ckanext.xloader.copy_chunk_size', 1024**3) - logger.debug('Using chunk size: %s bytes for resource %s', max_size, resource_id) - split_copy_by_size(csv_filepath, engine, logger, resource_id, headers, delimiter, int(max_size)) + # Copy file to datastore db, split to chunks. + max_size = config.get('ckanext.xloader.copy_chunk_size', 1024**3) + logger.debug('Using chunk size: %s bytes for resource %s', max_size, resource_id) + split_copy_by_size(csv_filepath, engine, logger, resource_id, headers, delimiter, int(max_size)) + finally: + cleanup_temp_file(f_write) logger.info('...copying done') diff --git a/ckanext/xloader/tests/test_chunks.py b/ckanext/xloader/tests/test_chunks.py index b8478602..460a7e70 100644 --- a/ckanext/xloader/tests/test_chunks.py +++ b/ckanext/xloader/tests/test_chunks.py @@ -4,7 +4,7 @@ import tempfile import logging from typing import Callable, List, Tuple, Any -from unittest.mock import patch, MagicMock +from unittest.mock import patch import csv import sqlalchemy.orm as orm @@ -97,7 +97,7 @@ def test_chunked_processing_large_file(self, Session: Any) -> None: with patch('ckanext.xloader.loader.split_copy_by_size', side_effect=mock_split_copy): with patch('ckanext.xloader.loader.copy_file', side_effect=mock_copy_file): # Load the CSV with chunked processing - fields = loader.load_csv( + loader.load_csv( csv_filepath, resource_id=resource_id, mimetype="text/csv", @@ -153,7 +153,7 @@ def test_small_file_no_chunking(self, Session: Any) -> None: with patch('ckanext.xloader.loader.split_copy_by_size', side_effect=mock_split_copy): with patch('ckanext.xloader.loader.copy_file', side_effect=mock_copy_file): - fields = loader.load_csv( + loader.load_csv( csv_filepath, resource_id=resource_id, mimetype="text/csv", @@ -166,4 +166,3 @@ def test_small_file_no_chunking(self, Session: Any) -> None: # Verify data loaded correctly records = self._get_records(Session, resource_id) assert len(records) == 6 # Known number of records in simple.csv - diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 880efef0..cba399e6 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -1,3 +1,5 @@ +# encoding: utf-8 + import pytest import io import os diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 9e022091..a442ffd6 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -5,6 +5,7 @@ import json import datetime import logging +import os import re from six import text_type as str, binary_type from urllib.parse import urlunparse, urlparse @@ -390,3 +391,11 @@ def datastore_resource_exists(resource_id): except tk.ObjectNotFound: return False return response or {'fields': []} + + +def cleanup_temp_file(tmp_file): + try: + tmp_file.close() + os.remove(tmp_file.name) + except FileNotFoundError: + pass From 8a2ce03ff403ddc2999df2ad13b4d94c906aea57 Mon Sep 17 00:00:00 2001 From: ThrawnCA Date: Mon, 2 Mar 2026 16:25:08 +1000 Subject: [PATCH 3/6] don't retry HTTP 500 as it typically indicates a deeper issue --- ckanext/xloader/jobs.py | 3 +-- ckanext/xloader/tests/test_jobs.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 63bfe0d4..b0e63dce 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -67,7 +67,6 @@ def is_retryable_error(error): Retryable HTTP status codes: - 408 Request Timeout - 429 Too Many Requests - - 500 Internal Server Error - 502 Bad Gateway - 503 Service Unavailable - 504 Gateway Timeout @@ -81,7 +80,7 @@ def is_retryable_error(error): :rtype: bool """ if isinstance(error, HTTPError): - retryable_status_codes = {408, 429, 500, 502, 503, 504, 507, 522, 524} + retryable_status_codes = {408, 429, 502, 503, 504, 507, 522, 524} return error.status_code in retryable_status_codes else: return True diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index cba399e6..88161737 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -217,7 +217,6 @@ def test_data_with_rq_job_timeout(self, cli, data): # Retryable HTTP errors (status codes from is_retryable_error) ("HTTPError_408", True), ("HTTPError_429", True), - ("HTTPError_500", True), ("HTTPError_502", True), ("HTTPError_503", True), ("HTTPError_504", True), @@ -228,6 +227,7 @@ def test_data_with_rq_job_timeout(self, cli, data): ("HTTPError_400", False), ("HTTPError_404", False), ("HTTPError_403", False), + ("HTTPError_500", False), # Other non-retryable errors (not in RETRYABLE_ERRORS) ("ValueError", False), ("TypeError", False), From c12b61be5d7c3ca84f2763461d1100bd62e39af7 Mon Sep 17 00:00:00 2001 From: ThrawnCA Date: Mon, 2 Mar 2026 16:25:28 +1000 Subject: [PATCH 4/6] gracefully handle extra cells added by Excel, #272 --- ckanext/xloader/parser.py | 2 +- ckanext/xloader/tests/test_loader.py | 12 ++++++++++++ ckanext/xloader/tests/test_parser.py | 29 ++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py index d57b7313..936c2820 100644 --- a/ckanext/xloader/parser.py +++ b/ckanext/xloader/parser.py @@ -32,7 +32,7 @@ def convert_types(self, extended_rows): for cell_index, cell_value in enumerate(row): if cell_value is None: row[cell_index] = '' - if self.fields: + if self.fields and cell_index < len(self.fields): # only strip white space if strip_extra_white is True if self.fields[cell_index].get('info', {}).get('strip_extra_white', True) and isinstance(cell_value, six.text_type): cell_value = cell_value.strip() diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index c1ddacfb..2181a7a5 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -1522,6 +1522,18 @@ def test_with_extra_blank_cells(self, Session): ) assert len(self._get_records(Session, resource_id)) == 1 + def test_with_extra_blank_cells_data_only(self, Session): + csv_filepath = get_sample_filepath("extra_fields.csv") + resource = factories.Resource() + resource_id = resource['id'] + loader.load_table( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + assert len(self._get_records(Session, resource_id)) == 2 + def test_with_mixed_quotes(self, Session): csv_filepath = get_sample_filepath("sample_with_mixed_quotes.csv") resource = factories.Resource() diff --git a/ckanext/xloader/tests/test_parser.py b/ckanext/xloader/tests/test_parser.py index ac4047dd..e5f02b77 100644 --- a/ckanext/xloader/tests/test_parser.py +++ b/ckanext/xloader/tests/test_parser.py @@ -143,3 +143,32 @@ def test_yearfirst_dayfirst(self): 'Berkeley' ], ] + + def test_trailing_cells(self): + """ The parser should ignore the presence of extra cells beyond the header row. + These will be handled instead by the loader. + """ + csv_filepath = os.path.abspath( + os.path.join(os.path.dirname(__file__), "samples", "extra_fields.csv") + ) + with Stream(csv_filepath, format='csv', + post_parse=[TypeConverter().convert_types]) as stream: + assert stream.sample == [ + [ + 'col1', + 'col2', + 'col3' + ], + [ + 'value1', + 'value2', + 'value3', + '' + ], + [ + 'value4', + 'value5', + 'value6', + '' + ] + ] From 8391f0150a2796e2ec9a3429429c94373fd9e3a7 Mon Sep 17 00:00:00 2001 From: ThrawnCA Date: Mon, 9 Mar 2026 16:49:03 +1000 Subject: [PATCH 5/6] load config at plugin configure time instead of module load time, #270 --- ckanext/xloader/action.py | 4 +- ckanext/xloader/auth.py | 4 +- ckanext/xloader/jobs.py | 70 ++++++++++++++---------------- ckanext/xloader/plugin.py | 23 +++++++++- ckanext/xloader/tests/test_jobs.py | 6 +-- 5 files changed, 62 insertions(+), 45 deletions(-) diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index 2c2b4f13..5d3c72bc 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -158,9 +158,9 @@ def xloader_submit(context, data_dict): 'queue_name': custom_queue, } } - if custom_queue not in jobs.DEFAULT_QUEUE_NAMES: + if custom_queue not in jobs.default_queue_names: # Don't automatically retry if it's a custom run - data['metadata']['tries'] = jobs.MAX_RETRIES + data['metadata']['tries'] = jobs.max_retries # Expand timeout for resources that have to be type-guessed timeout = config.get( diff --git a/ckanext/xloader/auth.py b/ckanext/xloader/auth.py index 3f4a66bc..ea201c03 100644 --- a/ckanext/xloader/auth.py +++ b/ckanext/xloader/auth.py @@ -1,6 +1,6 @@ from ckan import authz -from .jobs import DEFAULT_QUEUE_NAMES +from .jobs import default_queue_names import ckanext.datastore.logic.auth as auth @@ -8,7 +8,7 @@ def xloader_submit(context, data_dict): # only sysadmins can specify a custom processing queue custom_queue = data_dict.get('queue') - if custom_queue and custom_queue not in DEFAULT_QUEUE_NAMES: + if custom_queue and custom_queue not in default_queue_names: return authz.is_authorized('config_option_update', context, data_dict) return auth.datastore_auth(context, data_dict) diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 042c4761..08cd5bc0 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -30,19 +30,9 @@ log = logging.getLogger(__name__) -SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True)) -if not SSL_VERIFY: - requests.packages.urllib3.disable_warnings() - -DEFAULT_QUEUE_NAMES = config.get('ckanext.xloader.queue_names', DEFAULT_QUEUE_NAME).split() -MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9) -# Don't try Tabulator load on large files -MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10) -MAX_EXCERPT_LINES = int(config.get('ckanext.xloader.max_excerpt_lines') or 0) CHUNK_SIZE = 16 * 1024 # 16kb DOWNLOAD_TIMEOUT = 30 -MAX_RETRIES = int(config.get('ckanext.xloader.max_retries', 1)) RETRYABLE_ERRORS = ( errors.DeadlockDetected, errors.LockNotAvailable, @@ -50,10 +40,16 @@ HTTPError, XLoaderTimeoutError ) -# Retries can only occur in cases where the datastore entry exists, -# so use the standard timeout -RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600') -APITOKEN_HEADER_NAME = config.get('apitoken_header_name', 'Authorization') + +# Variables that are set from config values and must be available to all jobs +ssl_verify = None +max_content_length = None +max_type_guessing_length = None +max_excerpt_lines = None +max_retries = None +retried_job_timeout = None +apitoken_header_name = None +default_queue_names = DEFAULT_QUEUE_NAME.split() def is_retryable_error(error): @@ -107,17 +103,17 @@ def get_default_queue_name(package_id=None): By sending all jobs for a dataset to the same queue, lock conflicts are reduced. """ - if not DEFAULT_QUEUE_NAMES: + if not default_queue_names: return DEFAULT_QUEUE_NAME if not package_id: - return DEFAULT_QUEUE_NAMES[0] + return default_queue_names[0] # Pick a queue by taking the first character of the package name # and converting it into a numeric index to the list of queue names. # We don't want a proper hash function, because those tend to add # complications for the sake of (unnecessary) cryptographic strength. - queue_index = ord(package_id[0]) % len(DEFAULT_QUEUE_NAMES) - return DEFAULT_QUEUE_NAMES[queue_index] + queue_index = ord(package_id[0]) % len(default_queue_names) + return default_queue_names[queue_index] def xloader_data_into_datastore(input): @@ -206,7 +202,7 @@ def handle_retryable_error(e, input, job_id, job_dict, logger, error_state): """ if isinstance(e, RETRYABLE_ERRORS) and is_retryable_error(e): tries = job_dict['metadata'].get('tries', 0) - if tries < MAX_RETRIES: + if tries < max_retries: tries = tries + 1 log.info("Job %s failed due to temporary error [%s], retrying", job_id, e) logger.info("Job failed due to temporary error [%s], retrying", e) @@ -217,7 +213,7 @@ def handle_retryable_error(e, input, job_id, job_dict, logger, error_state): [input], title="retry xloader_data_into_datastore: resource: {} attempt {}".format( job_dict['metadata']['resource_id'], tries), - rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT) + rq_kwargs=dict(timeout=retried_job_timeout) ) return True db.mark_job_as_errored( @@ -327,7 +323,7 @@ def tabulator_load(): config.get('ckanext.xloader.use_type_guessing', config.get( 'ckanext.xloader.just_load_with_messytables', False))) \ and not datastore_resource_exists(resource['id']) \ - and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH + and os.path.getsize(tmp_file.name) <= max_type_guessing_length logger.info("'use_type_guessing' mode is: %s", use_type_guessing) try: if use_type_guessing: @@ -345,8 +341,8 @@ def tabulator_load(): logger.info('Trying again with tabulator') tabulator_load() except JobTimeoutException: - logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) - raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) + logger.warning('Job timed out after %ss', retried_job_timeout) + raise JobError('Job timed out after {}s'.format(retried_job_timeout)) except FileCouldNotBeLoadedError as e: logger.warning('Loading excerpt for this format not supported.') logger.error('Loading file raised an error: %s', e) @@ -365,8 +361,8 @@ def _download_resource_data(resource, data, api_key, logger): :param api_key: CKAN api key - needed to obtain resources that are private :param logger: - If the download is bigger than MAX_CONTENT_LENGTH then it just downloads a - excerpt (of MAX_EXCERPT_LINES) for preview, and flags it by setting + If the download is bigger than max_content_length then it just downloads a + excerpt (of max_excerpt_lines) for preview, and flags it by setting data['datastore_contains_all_records_of_source_file'] = False which will be saved to the resource later on. ''' @@ -391,7 +387,7 @@ def _download_resource_data(resource, data, api_key, logger): if resource.get('url_type') == 'upload': # If this is an uploaded file to CKAN, authenticate the request, # otherwise we won't get file from private resources - headers[APITOKEN_HEADER_NAME] = api_key + headers[apitoken_header_name] = api_key # Add a constantly changing parameter to bypass URL caching. # If we're running XLoader, then either the resource has @@ -406,14 +402,14 @@ def _download_resource_data(resource, data, api_key, logger): response = get_response(download_url, headers) cl = response.headers.get('content-length') - if cl and int(cl) > MAX_CONTENT_LENGTH: + if cl and int(cl) > max_content_length: response.close() raise DataTooBigError() # download the file to a tempfile on disk for chunk in response.iter_content(CHUNK_SIZE): length += len(chunk) - if length > MAX_CONTENT_LENGTH: + if length > max_content_length: raise DataTooBigError tmp_file.write(chunk) m.update(chunk) @@ -424,13 +420,13 @@ def _download_resource_data(resource, data, api_key, logger): cleanup_temp_file(tmp_file) message = 'Data too large to load into Datastore: ' \ '{cl} bytes > max {max_cl} bytes.' \ - .format(cl=cl or length, max_cl=MAX_CONTENT_LENGTH) + .format(cl=cl or length, max_cl=max_content_length) logger.warning(message) - if MAX_EXCERPT_LINES <= 0: + if max_excerpt_lines <= 0: raise JobError(message) logger.info('Loading excerpt of ~{max_lines} lines to ' 'DataStore.' - .format(max_lines=MAX_EXCERPT_LINES)) + .format(max_lines=max_excerpt_lines)) tmp_file = get_tmp_file(url) response = get_response(url, headers) length = 0 @@ -441,7 +437,7 @@ def _download_resource_data(resource, data, api_key, logger): m.update(line) length += len(line) line_count += 1 - if length > MAX_CONTENT_LENGTH or line_count >= MAX_EXCERPT_LINES: + if length > max_content_length or line_count >= max_excerpt_lines: break response.close() data['datastore_contains_all_records_of_source_file'] = False @@ -469,8 +465,8 @@ def _download_resource_data(resource, data, api_key, logger): request_url=url, response=None) except JobTimeoutException: cleanup_temp_file(tmp_file) - logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) - raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) + logger.warning('Job timed out after %ss', retried_job_timeout) + raise JobError('Job timed out after {}s'.format(retried_job_timeout)) logger.info('Downloaded ok - %s', printable_file_size(length)) file_hash = m.hexdigest() @@ -481,7 +477,7 @@ def _download_resource_data(resource, data, api_key, logger): def get_response(url, headers): def get_url(): kwargs = {'headers': headers, 'timeout': DOWNLOAD_TIMEOUT, - 'verify': SSL_VERIFY, 'stream': True} # just gets the headers for now + 'verify': ssl_verify, 'stream': True} # just gets the headers for now if 'ckan.download_proxy' in config: proxy = config.get('ckan.download_proxy') kwargs['proxies'] = {'http': proxy, 'https': proxy} @@ -537,14 +533,14 @@ def callback_xloader_hook(result_url, api_key, job_dict): if ':' in api_key: header, key = api_key.split(':') else: - header, key = APITOKEN_HEADER_NAME, api_key + header, key = apitoken_header_name, api_key headers[header] = key try: result = requests.post( modify_input_url(result_url), # modify with local config data=json.dumps(job_dict, cls=DatetimeJsonEncoder), - verify=SSL_VERIFY, + verify=ssl_verify, headers=headers) except requests.ConnectionError: return False diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 91bf87ab..c8c95508 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -2,14 +2,16 @@ import logging +import requests from ckan import plugins from ckan.plugins import toolkit from ckan.model.domain_object import DomainObjectOperation from ckan.model.resource import Resource -from . import action, auth, helpers as xloader_helpers, utils +from . import action, auth, helpers as xloader_helpers, jobs, utils from ckanext.xloader.utils import XLoaderFormats +from ckan.lib.jobs import DEFAULT_QUEUE_NAME try: from ckanext.validation.interfaces import IPipeValidation @@ -70,6 +72,25 @@ def configure(self, config_): else: self.ignore_hash = False + # Set config values that need to be available to all jobs, without loading + # the config itself in every single job + jobs.ssl_verify = toolkit.asbool(config_.get('ckanext.xloader.ssl_verify', True)) + if not jobs.ssl_verify: + requests.packages.urllib3.disable_warnings() + + jobs.max_content_length = int(config_.get('ckanext.xloader.max_content_length') or 1e9) + # Don't try Tabulator load on large files + jobs.max_type_guessing_length = int( + config_.get('ckanext.xloader.max_type_guessing_length') or jobs.max_content_length / 10 + ) + jobs.max_excerpt_lines = int(config_.get('ckanext.xloader.max_excerpt_lines') or 0) + jobs.max_retries = int(config_.get('ckanext.xloader.max_retries', 1)) + # Retries can only occur in cases where the datastore entry exists, + # so use the standard timeout + jobs.retried_job_timeout = config_.get('ckanext.xloader.job_timeout', '3600') + jobs.apitoken_header_name = config_.get('apitoken_header_name', 'Authorization') + jobs.default_queue_names = config_.get('ckanext.xloader.queue_names', DEFAULT_QUEUE_NAME).split() + # IPipeValidation def receive_validation_report(self, validation_report): diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 5731fa63..fd71af76 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -188,12 +188,12 @@ def test_data_too_big_error_if_content_length_bigger_than_config(self, cli, data stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output assert "Data too large to load into Datastore:" in stdout + @pytest.mark.ckan_config("ckanext.xloader.max_excerpt_lines", 1) def test_data_max_excerpt_lines_config(self, cli, data): self.enqueue(jobs.xloader_data_into_datastore, [data]) with mock.patch("ckanext.xloader.jobs.get_response", get_large_response): - with mock.patch("ckanext.xloader.jobs.MAX_EXCERPT_LINES", 1): - stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output - assert "Loading excerpt of ~1 lines to DataStore." in stdout + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Loading excerpt of ~1 lines to DataStore." in stdout resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] is False From c92d51d5e6a2328b728e4528230849d96a56fd82 Mon Sep 17 00:00:00 2001 From: antuarc Date: Thu, 28 May 2026 11:35:23 +1000 Subject: [PATCH 6/6] don't fail the build if coverage reports are unavailable - Orgoro coverage action needs extra permissions, but shouldn't be mandatory --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e678af9f..d3d4156e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -107,7 +107,7 @@ jobs: - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v5 if: ${{ !cancelled() }} - continue-on-error: ${{ matrix.experimental }} + continue-on-error: true with: token: ${{ secrets.CODECOV_TOKEN }} verbose: false # optional (default = false) @@ -115,7 +115,7 @@ jobs: - name: Get Cover uses: orgoro/coverage@3f13a558c5af7376496aa4848bf0224aead366ac # v3.2 if: ${{ !cancelled() }} - continue-on-error: ${{ matrix.experimental }} + continue-on-error: true with: coverageFile: coverage.xml token: ${{ secrets.GITHUB_TOKEN }} @@ -123,7 +123,7 @@ jobs: - name: Upload test results to Codecov uses: codecov/test-results-action@v1 if: ${{ !cancelled() }} - continue-on-error: ${{ matrix.experimental }} + continue-on-error: true with: token: ${{ secrets.CODECOV_TOKEN }} verbose: false # optional (default = false)