Skip to content
47 changes: 47 additions & 0 deletions ckanext/xloader/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,50 @@ def after_upload(self, context, resource_dict, dataset_dict):
the resource that was uploaded
"""
pass

def datastore_before_update(self, resource_id, existing_fields, new_headers):
""" Called by the loader just before it is about to modify the
DataStore table for a resource (truncate, drop+recreate, or create).
It allows plugins to inspect the difference between the current
DataStore columns and the ones detected in the incoming file, for
example to log an activity when columns are added, removed or
renamed.

Both ``existing_fields`` and ``new_headers`` are lists of field
dicts that contain at least an ``id`` key, so a plugin can compute
the diff symmetrically::

old_ids = {f['id'] for f in existing_fields or []}
new_ids = {h['id'] for h in new_headers}
added = new_ids - old_ids
removed = old_ids - new_ids

:param resource_id: the ID of the resource whose DataStore table is
about to be updated.
:type resource_id: string

:param existing_fields: the current columns of the DataStore table
(the internal ``_id`` column is excluded), or ``None`` if the
DataStore table does not yet exist. Each dict contains at least
``id`` and ``type`` and may include ``info`` for fields with a
Data Dictionary entry.
:type existing_fields: list of dicts or None

:param new_headers: the list of field dicts that will be written to
the DataStore. Each dict has at least an ``id`` and ``type``
key, and may include an ``info`` dict for fields that already
existed.
:type new_headers: list of dicts

.. warning::

The ``existing_fields`` and ``new_headers`` lists are the

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the existing_fields object is the same after removing _id.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated here 6e13330

same objects that the loader will use after this hook returns.
Mutating them (e.g. adding, removing or renaming fields) will
affect the subsequent DataStore operation. This hook is
intended for **read-only observation**; modify the lists only
if you fully understand the downstream consequences.

The return value is ignored.
Comment thread
avdata99 marked this conversation as resolved.
"""
pass
38 changes: 38 additions & 0 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,28 @@

import ckan.plugins as p

from .interfaces import IXloader
from .job_exceptions import FileCouldNotBeLoadedError, LoaderError
from .parser import CSV_SAMPLE_LINES, TypeConverter
from .utils import cleanup_temp_file, datastore_resource_exists, headers_guess, type_guess


def _notify_datastore_before_update(resource_id, existing_fields, new_headers):
"""Notify IXloader plugins that the DataStore table for ``resource_id``
is about to change. See ``IXloader.datastore_before_update``.

The internal ``_id`` column is stripped from ``existing_fields`` so
consumers only see user-visible columns.
"""
if existing_fields is not None:
existing_fields = [f for f in existing_fields if f.get('id') != '_id']
for plugin in p.PluginImplementations(IXloader):
plugin.datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=new_headers,
)

from ckan.plugins.toolkit import config

import ckanext.datastore.backend.postgres as datastore_db
Expand Down Expand Up @@ -356,6 +374,11 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing
'''
fields_match = _fields_match(fields, existing_fields, logger)
if fields_match == FieldMatch.EXACT_MATCH:
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=fields,
)
logger.info('Clearing records for "%s" from DataStore.', resource_id)
_clear_datastore_resource(resource_id)
else:
Expand All @@ -366,12 +389,22 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing
# then we need to re-guess types
if allow_type_guessing and fields_match == FieldMatch.MISMATCH:
raise LoaderError("File structure has changed, reverting to Tabulator")
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=fields,
)
else:
fields = [
{'id': header_name,
'type': 'text',
'strip_extra_white': True}
for header_name in headers]
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=None,
new_headers=fields,
)

logger.info('Fields: %s', fields)

Expand Down Expand Up @@ -590,6 +623,11 @@ def row_iterator():
Otherwise 'datastore_create' will append to the existing datastore.
And if the fields have significantly changed, it may also fail.
'''
_notify_datastore_before_update(
resource_id=resource_id,
existing_fields=existing_fields,
new_headers=headers_dicts,
)
if existing:
if _fields_match(headers_dicts, existing_fields, logger) == FieldMatch.EXACT_MATCH:
logger.info('Clearing records for "%s" from DataStore.', resource_id)
Expand Down
7 changes: 7 additions & 0 deletions ckanext/xloader/tests/samples/simple2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
date,temperature,city
2011-01-01,1,Galway
2011-01-02,-1,Galway
2011-01-03,0,Galway
2011-01-01,6,Berkeley
2011-01-02,8,Berkeley
2011-01-03,5,Berkeley
102 changes: 102 additions & 0 deletions ckanext/xloader/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import print_function
from __future__ import absolute_import
import os
from unittest import mock
import pytest
import six
import sqlalchemy as sa
Expand Down Expand Up @@ -1094,6 +1095,107 @@ def test_shapefile_zip_python3(self, Session):
]


class TestDatastoreBeforeUpdateHook(TestLoadBase):
""" Verify that loader.load_csv / loader.load_table invoke
IXloader.datastore_before_update with the documented payload shape.
"""

def test_fires_on_new_table(self):
""" First load of a resource: no prior DataStore table, so
existing_fields is None and new_headers reflects the file columns.
"""
resource = factories.Resource()
with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource['id'],
mimetype="text/csv",
logger=logger,
)

notify.assert_called_once()
resource_id, existing_fields, new_headers = notify.call_args.args
assert resource_id == resource['id']
assert not existing_fields
assert [h['id'] for h in new_headers] == ['date', 'temperature', 'place']

def test_fires_on_reload_same_columns(self):
""" Reload with the same file: existing_fields and new_headers
expose the same column ids, so a consumer computing
``set(existing) ^ set(new)`` sees no diff.
"""
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

notify.assert_called_once()
called_resource_id, existing_fields, new_headers = notify.call_args.args
assert called_resource_id == resource_id
assert [f['id'] for f in existing_fields] == ['date', 'temperature', 'place']
assert [h['id'] for h in new_headers] == ['date', 'temperature', 'place']

def test_fires_on_reload_with_changed_columns(self):
""" Reload the resource with a renamed column (place -> city) and
verify the hook exposes both sides BEFORE the DataStore is updated,
so downstream plugins can diff them and log a 'columns changed'
activity.
"""
resource = factories.Resource()
resource_id = resource['id']
loader.load_csv(
get_sample_filepath("simple.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_csv(
get_sample_filepath("simple2.csv"),
resource_id=resource_id,
mimetype="text/csv",
logger=logger,
)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why two blank lines?

@avdata99 avdata99 Apr 22, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One line removed here
b6a8b2e

notify.assert_called_once()
called_resource_id, existing_fields, new_headers = notify.call_args.args
assert called_resource_id == resource_id

old_ids = [f['id'] for f in existing_fields]
new_ids = [h['id'] for h in new_headers]
assert old_ids == ['date', 'temperature', 'place']
assert new_ids == ['date', 'temperature', 'city']

def test_fires_for_load_table(self):
resource = factories.Resource()
with mock.patch.object(loader, '_notify_datastore_before_update') as notify:
loader.load_table(
get_sample_filepath("simple.xls"),
resource_id=resource['id'],
mimetype="xls",
logger=logger,
)

notify.assert_called_once()
resource_id, existing_fields, new_headers = notify.call_args.args
assert resource_id == resource['id']
assert not existing_fields
assert [h['id'] for h in new_headers] == ['date', 'temperature', 'place']


class TestLoadTabulator(TestLoadBase):
def test_simple(self, Session):
csv_filepath = get_sample_filepath("simple.xls")
Expand Down
Loading