diff --git a/ckanext/xloader/interfaces.py b/ckanext/xloader/interfaces.py index cdecf52a..36bd61fb 100644 --- a/ckanext/xloader/interfaces.py +++ b/ckanext/xloader/interfaces.py @@ -47,3 +47,47 @@ def after_upload(self, context, resource_dict, dataset_dict): the resource that was uploaded """ pass + + def datastore_before_update(self, resource_id, existing_fields, new_headers): + """ Called by the loader just before it is about to modify the + DataStore table for a resource (truncate, drop+recreate, or create). + It allows plugins to inspect the difference between the current + DataStore columns and the ones detected in the incoming file, for + example to log an activity when columns are added, removed or + renamed. + + Both ``existing_fields`` and ``new_headers`` are lists of field + dicts that contain at least an ``id`` key, so a plugin can compute + the diff symmetrically:: + + old_ids = {f['id'] for f in existing_fields or []} + new_ids = {h['id'] for h in new_headers} + added = new_ids - old_ids + removed = old_ids - new_ids + + :param resource_id: the ID of the resource whose DataStore table is + about to be updated. + :type resource_id: string + + :param existing_fields: the current columns of the DataStore table + (the internal ``_id`` column is excluded), or ``None`` if the + DataStore table does not yet exist. Each dict contains at least + ``id`` and ``type`` and may include ``info`` for fields with a + Data Dictionary entry. + :type existing_fields: list of dicts or None + + :param new_headers: the list of field dicts that will be written to + the DataStore. Each dict has at least an ``id`` and ``type`` + key, and may include an ``info`` dict for fields that already + existed. + :type new_headers: list of dicts + + The ``new_headers`` param is the same list the loader will use after the + hook returns, so mutating it (adding, removing, renaming fields) + will affect the subsequent DataStore operation. + The ``existing_fields`` is a snapshot with the internal ``_id`` column + excluded and should be treated as read-only. + + The return value is ignored. + """ + pass diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 76a4a8d9..d48aeefd 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -18,10 +18,28 @@ import ckan.plugins as p +from .interfaces import IXloader from .job_exceptions import FileCouldNotBeLoadedError, LoaderError from .parser import CSV_SAMPLE_LINES, TypeConverter from .utils import cleanup_temp_file, datastore_resource_exists, headers_guess, type_guess + +def _notify_datastore_before_update(resource_id, existing_fields, new_headers): + """Notify IXloader plugins that the DataStore table for ``resource_id`` + is about to change. See ``IXloader.datastore_before_update``. + + The internal ``_id`` column is stripped from ``existing_fields`` so + consumers only see user-visible columns. + """ + if existing_fields is not None: + existing_fields = [f for f in existing_fields if f.get('id') != '_id'] + for plugin in p.PluginImplementations(IXloader): + plugin.datastore_before_update( + resource_id=resource_id, + existing_fields=existing_fields, + new_headers=new_headers, + ) + from ckan.plugins.toolkit import config import ckanext.datastore.backend.postgres as datastore_db @@ -356,11 +374,21 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing ''' fields_match = _fields_match(fields, existing_fields, logger) if fields_match == FieldMatch.EXACT_MATCH: + _notify_datastore_before_update( + resource_id=resource_id, + existing_fields=existing_fields, + new_headers=fields, + ) logger.info('Clearing records for "%s" from DataStore.', resource_id) _clear_datastore_resource(resource_id) else: logger.info('Deleting "%s" from DataStore.', resource_id) delete_datastore_resource(resource_id) + _notify_datastore_before_update( + resource_id=resource_id, + existing_fields=existing_fields, + new_headers=fields, + ) # if file structure has changed, # and it wasn't just from a Data Dictionary override, # then we need to re-guess types @@ -372,6 +400,11 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing 'type': 'text', 'strip_extra_white': True} for header_name in headers] + _notify_datastore_before_update( + resource_id=resource_id, + existing_fields=None, + new_headers=fields, + ) logger.info('Fields: %s', fields) @@ -590,6 +623,11 @@ def row_iterator(): Otherwise 'datastore_create' will append to the existing datastore. And if the fields have significantly changed, it may also fail. ''' + _notify_datastore_before_update( + resource_id=resource_id, + existing_fields=existing_fields, + new_headers=headers_dicts, + ) if existing: if _fields_match(headers_dicts, existing_fields, logger) == FieldMatch.EXACT_MATCH: logger.info('Clearing records for "%s" from DataStore.', resource_id) diff --git a/ckanext/xloader/tests/samples/simple2.csv b/ckanext/xloader/tests/samples/simple2.csv new file mode 100644 index 00000000..28a9f5dd --- /dev/null +++ b/ckanext/xloader/tests/samples/simple2.csv @@ -0,0 +1,7 @@ +date,temperature,city +2011-01-01,1,Galway +2011-01-02,-1,Galway +2011-01-03,0,Galway +2011-01-01,6,Berkeley +2011-01-02,8,Berkeley +2011-01-03,5,Berkeley diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index 2181a7a5..16aaecbb 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -2,6 +2,7 @@ from __future__ import print_function from __future__ import absolute_import import os +from unittest import mock import pytest import six import sqlalchemy as sa @@ -1094,6 +1095,106 @@ def test_shapefile_zip_python3(self, Session): ] +class TestDatastoreBeforeUpdateHook(TestLoadBase): + """ Verify that loader.load_csv / loader.load_table invoke + IXloader.datastore_before_update with the documented payload shape. + """ + + def test_fires_on_new_table(self): + """ First load of a resource: no prior DataStore table, so + existing_fields is None and new_headers reflects the file columns. + """ + resource = factories.Resource() + with mock.patch.object(loader, '_notify_datastore_before_update') as notify: + loader.load_csv( + get_sample_filepath("simple.csv"), + resource_id=resource['id'], + mimetype="text/csv", + logger=logger, + ) + + notify.assert_called_once() + kwargs = notify.call_args.kwargs + assert kwargs['resource_id'] == resource['id'] + assert not kwargs['existing_fields'] + assert [h['id'] for h in kwargs['new_headers']] == ['date', 'temperature', 'place'] + + def test_fires_on_reload_same_columns(self): + """ Reload with the same file: existing_fields and new_headers + expose the same column ids, so a consumer computing + ``set(existing) ^ set(new)`` sees no diff. + """ + resource = factories.Resource() + resource_id = resource['id'] + loader.load_csv( + get_sample_filepath("simple.csv"), + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + + with mock.patch.object(loader, '_notify_datastore_before_update') as notify: + loader.load_csv( + get_sample_filepath("simple.csv"), + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + + notify.assert_called_once() + kwargs = notify.call_args.kwargs + assert kwargs['resource_id'] == resource_id + assert [f['id'] for f in kwargs['existing_fields']] == ['date', 'temperature', 'place'] + assert [h['id'] for h in kwargs['new_headers']] == ['date', 'temperature', 'place'] + + def test_fires_on_reload_with_changed_columns(self): + """ Reload the resource with a renamed column (place -> city) and + verify the hook exposes both sides BEFORE the DataStore is updated, + so downstream plugins can diff them and log a 'columns changed' + activity. + """ + resource = factories.Resource() + resource_id = resource['id'] + loader.load_csv( + get_sample_filepath("simple.csv"), + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + + with mock.patch.object(loader, '_notify_datastore_before_update') as notify: + loader.load_csv( + get_sample_filepath("simple2.csv"), + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + notify.assert_called_once() + kwargs = notify.call_args.kwargs + assert kwargs['resource_id'] == resource_id + + old_ids = [f['id'] for f in kwargs['existing_fields']] + new_ids = [h['id'] for h in kwargs['new_headers']] + assert old_ids == ['date', 'temperature', 'place'] + assert new_ids == ['date', 'temperature', 'city'] + + def test_fires_for_load_table(self): + resource = factories.Resource() + with mock.patch.object(loader, '_notify_datastore_before_update') as notify: + loader.load_table( + get_sample_filepath("simple.xls"), + resource_id=resource['id'], + mimetype="xls", + logger=logger, + ) + + notify.assert_called_once() + kwargs = notify.call_args.kwargs + assert kwargs['resource_id'] == resource['id'] + assert not kwargs['existing_fields'] + assert [h['id'] for h in kwargs['new_headers']] == ['date', 'temperature', 'place'] + + class TestLoadTabulator(TestLoadBase): def test_simple(self, Session): csv_filepath = get_sample_filepath("simple.xls")