Skip to content
44 changes: 44 additions & 0 deletions ckanext/xloader/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,47 @@ def after_upload(self, context, resource_dict, dataset_dict):
the resource that was uploaded
"""
pass

def datastore_before_update(self, resource_id, existing_fields, new_headers):
""" Called by the loader just before it is about to modify the
DataStore table for a resource (truncate, drop+recreate, or create).
It allows plugins to inspect the difference between the current
DataStore columns and the ones detected in the incoming file, for
example to log an activity when columns are added, removed or
renamed.

Both ``existing_fields`` and ``new_headers`` are lists of field
dicts that contain at least an ``id`` key, so a plugin can compute
the diff symmetrically::

old_ids = {f['id'] for f in existing_fields or []}
new_ids = {h['id'] for h in new_headers}
added = new_ids - old_ids
removed = old_ids - new_ids

:param resource_id: the ID of the resource whose DataStore table is
about to be updated.
:type resource_id: string

:param existing_fields: the current columns of the DataStore table
(the internal ``_id`` column is excluded), or ``None`` if the
DataStore table does not yet exist. Each dict contains at least
``id`` and ``type`` and may include ``info`` for fields with a
Data Dictionary entry.
:type existing_fields: list of dicts or None

:param new_headers: the list of field dicts that will be written to
the DataStore. Each dict has at least an ``id`` and ``type``
key, and may include an ``info`` dict for fields that already
existed.
:type new_headers: list of dicts

The ``new_headers`` param is the same list the loader will use after the
hook returns, so mutating it (adding, removing, renaming fields)
will affect the subsequent DataStore operation.
The ``existing_fields`` is a snapshot with the internal ``_id`` column
excluded and should be treated as read-only.

The return value is ignored.
Comment thread
avdata99 marked this conversation as resolved.
"""
pass
38 changes: 38 additions & 0 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,28 @@

import ckan.plugins as p

from .interfaces import IXloader
from .job_exceptions import FileCouldNotBeLoadedError, LoaderError
from .parser import CSV_SAMPLE_LINES, TypeConverter
from .utils import cleanup_temp_file, datastore_resource_exists, headers_guess, type_guess


def _notify_datastore_before_update(resource_id, existing_fields, new_headers):
"""Notify IXloader plugins that the DataStore table for ``resource_id``
is about to change. See ``IXloader.datastore_before_update``.

The internal ``_id`` column is stripped from ``existing_fields`` so
consumers only see user-visible columns.
"""
if existing_fields is not None:
existing_fields = [f for f in existing_fields if f.get('id') != '_id']
for plugin in p.PluginImplementations(IXloader):
plugin.datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=new_headers,
)

from ckan.plugins.toolkit import config

import ckanext.datastore.backend.postgres as datastore_db
Expand Down Expand Up @@ -356,11 +374,21 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing
'''
fields_match = _fields_match(fields, existing_fields, logger)
if fields_match == FieldMatch.EXACT_MATCH:
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=fields,
)
logger.info('Clearing records for "%s" from DataStore.', resource_id)
_clear_datastore_resource(resource_id)
else:
logger.info('Deleting "%s" from DataStore.', resource_id)
delete_datastore_resource(resource_id)
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=fields,
)
# if file structure has changed,
# and it wasn't just from a Data Dictionary override,
# then we need to re-guess types
Expand All @@ -372,6 +400,11 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing
'type': 'text',
'strip_extra_white': True}
for header_name in headers]
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=None,
new_headers=fields,
)

logger.info('Fields: %s', fields)

Expand Down Expand Up @@ -590,6 +623,11 @@ def row_iterator():
Otherwise 'datastore_create' will append to the existing datastore.
And if the fields have significantly changed, it may also fail.
'''
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=headers_dicts,
)
if existing:
if _fields_match(headers_dicts, existing_fields, logger) == FieldMatch.EXACT_MATCH:
logger.info('Clearing records for "%s" from DataStore.', resource_id)
Expand Down
7 changes: 7 additions & 0 deletions ckanext/xloader/tests/samples/simple2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
date,temperature,city
2011-01-01,1,Galway
2011-01-02,-1,Galway
2011-01-03,0,Galway
2011-01-01,6,Berkeley
2011-01-02,8,Berkeley
2011-01-03,5,Berkeley
101 changes: 101 additions & 0 deletions ckanext/xloader/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function
from __future__ import absolute_import
import os
from unittest import mock
import pytest
import six
import sqlalchemy as sa
Expand Down Expand Up @@ -1094,6 +1095,106 @@ def test_shapefile_zip_python3(self, Session):
]


class TestDatastoreBeforeUpdateHook(TestLoadBase):
""" Verify that loader.load_csv / loader.load_table invoke
IXloader.datastore_before_update with the documented payload shape.
"""

def test_fires_on_new_table(self):
""" First load of a resource: no prior DataStore table, so
existing_fields is None and new_headers reflects the file columns.
"""
resource = factories.Resource()
with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource['id'],
mimetype="text/csv",
logger=logger,
)

notify.assert_called_once()
kwargs = notify.call_args.kwargs
assert kwargs['resource_id'] == resource['id']
assert not kwargs['existing_fields']
assert [h['id'] for h in kwargs['new_headers']] == ['date', 'temperature', 'place']

def test_fires_on_reload_same_columns(self):
""" Reload with the same file: existing_fields and new_headers
expose the same column ids, so a consumer computing
``set(existing) ^ set(new)`` sees no diff.
"""
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

notify.assert_called_once()
kwargs = notify.call_args.kwargs
assert kwargs['resource_id'] == resource_id
assert [f['id'] for f in kwargs['existing_fields']] == ['date', 'temperature', 'place']
assert [h['id'] for h in kwargs['new_headers']] == ['date', 'temperature', 'place']

def test_fires_on_reload_with_changed_columns(self):
""" Reload the resource with a renamed column (place -> city) and
verify the hook exposes both sides BEFORE the DataStore is updated,
so downstream plugins can diff them and log a 'columns changed'
activity.
"""
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_csv(
get_sample_filepath("simple2.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)
notify.assert_called_once()
kwargs = notify.call_args.kwargs
assert kwargs['resource_id'] == resource_id

old_ids = [f['id'] for f in kwargs['existing_fields']]
new_ids = [h['id'] for h in kwargs['new_headers']]
assert old_ids == ['date', 'temperature', 'place']
assert new_ids == ['date', 'temperature', 'city']

def test_fires_for_load_table(self):
resource = factories.Resource()
with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_table(
get_sample_filepath("simple.xls"),
resource_id=resource['id'],
mimetype="xls",
logger=logger,
)

notify.assert_called_once()
kwargs = notify.call_args.kwargs
assert kwargs['resource_id'] == resource['id']
assert not kwargs['existing_fields']
assert [h['id'] for h in kwargs['new_headers']] == ['date', 'temperature', 'place']


class TestLoadTabulator(TestLoadBase):
def test_simple(self, Session):
csv_filepath = get_sample_filepath("simple.xls")
Expand Down
Loading