diff --git a/src/country_workspace/contrib/hope/push/orchestration.py b/src/country_workspace/contrib/hope/push/orchestration.py
index ea4cbe36..2928e875 100644
--- a/src/country_workspace/contrib/hope/push/orchestration.py
+++ b/src/country_workspace/contrib/hope/push/orchestration.py
@@ -13,7 +13,8 @@
)
from country_workspace.contrib.hope.exceptions import HopePushError
from country_workspace.exceptions import RemoteError, RemoteUnavailableError
-from country_workspace.models import AsyncJob, Rdp
+from country_workspace.models import AsyncJob, Household, Individual, Rdp
+from country_workspace.workspaces.models import CountryIndividual
from .config import CreateRdpConfig, PushWorkflowConfig
from .policy import ActionCheck, get_rdp_policy
@@ -60,6 +61,50 @@ def _deduplication_snapshot(status: DedupClientStatus | None) -> dict[str, Any]:
}
+def archive_removed_unique_values(rdp: Rdp, is_master_detail: bool) -> None:
+ """Persist unique-field values for records about to be marked as removed after a successful push."""
+ program = rdp.program
+ owner = selection_owner_for_rdp(rdp=rdp)
+
+ if is_master_detail:
+ hh_field = program.get_unique_field_for(Household)
+ ind_field = program.get_unique_field_for(Individual)
+ if not (hh_field or ind_field):
+ return
+ hh_pks = list(owner.households.filter(removed=False).values_list("pk", flat=True))
+ if not hh_pks:
+ return
+ if hh_field:
+ hh_values = owner.households.filter(pk__in=hh_pks).values_list(f"flex_fields__{hh_field}", flat=True)
+ program.add_removed_unique_values_for(Household, hh_values.iterator())
+ if ind_field:
+ ind_values = CountryIndividual.objects.filter(household_id__in=hh_pks, removed=False).values_list(
+ f"flex_fields__{ind_field}", flat=True
+ )
+ program.add_removed_unique_values_for(Individual, ind_values.iterator())
+ return
+
+ if ind_field := program.get_unique_field_for(Individual):
+ ind_values = owner.individuals.filter(removed=False).values_list(f"flex_fields__{ind_field}", flat=True)
+ program.add_removed_unique_values_for(Individual, ind_values.iterator())
+
+
+def steps(processor: PushProcessor, config: PushWorkflowConfig) -> Iterator[Callable[[], None]]:
+ """Yield the ordered workflow callables; each step appends errors to processor.total."""
+ pks = config["pks"]
+
+ yield processor.preflight
+ yield processor.rdi_create
+ if config["master_detail"]:
+ yield from (
+ partial(processor.run_with, qs_individuals_by_household_pks(pks), processor.rdi_push_individuals),
+ partial(processor.run_with, qs_households(pks=pks), processor.rdi_push_households),
+ )
+ else:
+ yield partial(processor.run_with, qs_individuals_by_pks(pks), processor.rdi_push_people)
+ yield processor.rdi_complete
+
+
def _save_current_deduplication_snapshot(*, rdp: Rdp, key: str) -> None:
status = get_rdp_policy(rdp).deduplication_status(rdp)
snapshot = _deduplication_snapshot(status)
@@ -318,6 +363,7 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
with transaction.atomic():
locked = lock_rdp_for_update(pk=rdp.pk)
+ archive_removed_unique_values(locked, config["master_detail"])
_mark_rdp_beneficiaries_removed(locked, config["master_detail"])
set_rdp_push_status(
rdp=locked,
diff --git a/src/country_workspace/models/program.py b/src/country_workspace/models/program.py
index 2a95d5ce..f16f8247 100644
--- a/src/country_workspace/models/program.py
+++ b/src/country_workspace/models/program.py
@@ -228,3 +228,73 @@ def apply_default_fields(self, m: type[Validable] | Validable, data: dict[str, A
if field_name not in data or data[field_name] is None:
data[field_name] = default_value
return data
+
+ def has_any_data(self) -> bool:
+ if not self.pk:
+ return False
+ from country_workspace.models import Batch
+
+ return Batch.objects.filter(program_id=self.pk).exists()
+
+ def get_unique_field_for(self, m: type[Validable] | Validable) -> str | None:
+ scope = self._scope_for(m).value
+ unique_fields = (self.system_fields or {}).get("unique_fields") or {}
+ value = unique_fields.get(scope)
+ return value if isinstance(value, str) and value.strip() else None
+
+ def save_unique_field_for(self, m: type[Validable] | Validable, field_name: str | None) -> None:
+ scope = self._scope_for(m).value
+ normalized = (field_name or "").strip() or None
+
+ system_fields = dict(self.system_fields or {})
+ unique_fields = dict(system_fields.get("unique_fields") or {})
+ removed_unique_values = dict(system_fields.get("removed_unique_values") or {})
+ scope_removed_values = dict(removed_unique_values.get(scope) or {})
+
+ if normalized is None:
+ unique_fields.pop(scope, None)
+ else:
+ unique_fields[scope] = normalized
+ scope_removed_values.setdefault(normalized, [])
+ removed_unique_values[scope] = scope_removed_values
+
+ system_fields["unique_fields"] = unique_fields
+ system_fields["removed_unique_values"] = removed_unique_values
+ self.system_fields = system_fields
+ self.save(update_fields=["system_fields"])
+
+ def get_removed_unique_values_for(self, m: type[Validable] | Validable) -> list[str]:
+ """Return archived unique values for configured scope+field."""
+ if not (field_name := self.get_unique_field_for(m)):
+ return []
+
+ scope = self._scope_for(m).value
+ removed_unique_values = (self.system_fields or {}).get("removed_unique_values") or {}
+ scope_values = removed_unique_values.get(scope) or {}
+ values = scope_values.get(field_name) or []
+ if not isinstance(values, list):
+ return []
+ return [str(value) for value in values if value is not None and str(value).strip()]
+
+ def add_removed_unique_values_for(self, m: type[Validable] | Validable, values: Iterable[Any]) -> None:
+ """Merge removed values for configured unique field in the given scope."""
+ if not (field_name := self.get_unique_field_for(m)):
+ return
+ scope = self._scope_for(m).value
+
+ normalized_values = {str(value).strip() for value in values if value is not None and str(value).strip()}
+ if not normalized_values:
+ return
+
+ system_fields = dict(self.system_fields or {})
+ removed_unique_values = dict(system_fields.get("removed_unique_values") or {})
+ scope_values = dict(removed_unique_values.get(scope) or {})
+ existing_values = scope_values.get(field_name) or []
+ existing_set = {str(value).strip() for value in existing_values if value is not None and str(value).strip()}
+
+ scope_values[field_name] = sorted(existing_set | normalized_values)
+ removed_unique_values[scope] = scope_values
+ system_fields["removed_unique_values"] = removed_unique_values
+
+ self.system_fields = system_fields
+ self.save(update_fields=["system_fields"])
diff --git a/src/country_workspace/workspaces/admin/cleaners/validate.py b/src/country_workspace/workspaces/admin/cleaners/validate.py
index 1285e617..49309240 100644
--- a/src/country_workspace/workspaces/admin/cleaners/validate.py
+++ b/src/country_workspace/workspaces/admin/cleaners/validate.py
@@ -7,6 +7,7 @@
from constance import config
from django.db.models import Model, QuerySet, Prefetch
from django.db.models.query import prefetch_related_objects
+from django.utils import timezone
from country_workspace.context import batch_ctx
from country_workspace.models import AsyncJob, Household, Individual, Program
@@ -14,6 +15,78 @@
from country_workspace.utils.imports import validate_alien_fields
logger = logging.getLogger(__name__)
+UNIQUE_VALIDATION_ERROR = "Value must be unique within the programme."
+ARCHIVED_UNIQUE_VALIDATION_ERROR = "Value must be unique and cannot match previously pushed records."
+
+
+def _normalize_unique_value(value: object) -> str | None:
+ normalized = str(value).strip() if value is not None else ""
+ return normalized or None
+
+
+def _append_unique_error(obj: Model, field_name: str, message: str) -> None:
+ errors = dict(getattr(obj, "errors", {}) or {})
+ current = errors.get(field_name) or []
+ if not isinstance(current, list):
+ current = [str(current)]
+ if message in current:
+ return
+ current.append(message)
+ errors[field_name] = current
+ obj.errors = errors
+ obj.last_checked = timezone.now()
+ obj.save(update_fields=["errors", "last_checked"])
+
+
+def _append_household_member_invalid_error(obj: Model) -> None:
+ errors = dict(getattr(obj, "errors", {}) or {})
+ details = errors.get("dct") or []
+ if not isinstance(details, list):
+ details = [str(details)]
+ marker = "Some members did not validate"
+ if marker in details:
+ return
+ details.append(marker)
+ errors["dct"] = details
+ obj.errors = errors
+ obj.last_checked = timezone.now()
+ obj.save(update_fields=["errors", "last_checked"])
+
+
+class UniqueValidationState:
+ def __init__(self, *, field_name: str, archived_values: set[str]) -> None:
+ self.field_name = field_name
+ self.archived_values = archived_values
+ self.seen_by_value: dict[str, Model] = {}
+
+ def validate(self, obj: Model) -> set[int]:
+ invalid_pks: set[int] = set()
+ flex_fields = getattr(obj, "flex_fields", {}) or {}
+ value = _normalize_unique_value(flex_fields.get(self.field_name))
+ if not value:
+ return invalid_pks
+
+ if value in self.archived_values:
+ _append_unique_error(obj, self.field_name, ARCHIVED_UNIQUE_VALIDATION_ERROR)
+ invalid_pks.add(obj.pk)
+ return invalid_pks
+
+ if previous := self.seen_by_value.get(value):
+ _append_unique_error(previous, self.field_name, UNIQUE_VALIDATION_ERROR)
+ _append_unique_error(obj, self.field_name, UNIQUE_VALIDATION_ERROR)
+ invalid_pks.add(previous.pk)
+ invalid_pks.add(obj.pk)
+ return invalid_pks
+
+ self.seen_by_value[value] = obj
+ return invalid_pks
+
+
+def _build_unique_state(program: Program, model: type[Model]) -> UniqueValidationState | None:
+ if not (field_name := program.get_unique_field_for(model)):
+ return None
+ archived_values = {value for value in program.get_removed_unique_values_for(model) if value}
+ return UniqueValidationState(field_name=field_name, archived_values=archived_values)
def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwargs: Any) -> dict[str, int]:
@@ -27,7 +100,9 @@ def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwarg
return {"valid": valid, "invalid": invalid}
with state.set(tenant=first.country_office, program=first.program):
+ unique_state = _build_unique_state(first.program, queryset.model)
if issubclass(queryset.model, Household):
+ individual_unique_state = _build_unique_state(first.program, Individual)
# Reverse-FK prefetch for Household.members; include forward FKs for Individuals
prefetch_members = Prefetch(
"members",
@@ -39,11 +114,17 @@ def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwarg
for chunk in batched(it, chunk_size):
# Populate members for all objects in this batch (no N+1 on members access).
prefetch_related_objects(chunk, prefetch_members)
- dv, di = _validate_and_count(chunk)
+ dv, di = _validate_and_count(
+ chunk,
+ unique_state=unique_state,
+ member_unique_state=individual_unique_state,
+ )
valid, invalid = valid + dv, invalid + di
else: # Individual
# Just stream.
- dv, di = _validate_and_count(queryset.iterator(chunk_size=chunk_size)) # stream rows from DB
+ dv, di = _validate_and_count(
+ queryset.iterator(chunk_size=chunk_size), unique_state=unique_state
+ ) # stream rows from DB
valid, invalid = valid + dv, invalid + di
except Exception as e: # pragma: no cover
@@ -53,21 +134,46 @@ def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwarg
return {"valid": valid, "invalid": invalid}
-def _validate_and_count(objs: Iterable[Model]) -> tuple[int, int]:
- valid = invalid = 0
+def _validate_and_count( # noqa: C901
+ objs: Iterable[Model],
+ unique_state: UniqueValidationState | None = None,
+ member_unique_state: UniqueValidationState | None = None,
+) -> tuple[int, int]:
+ total = 0
+ invalid_pks: set[int] = set()
+ member_household_by_member_pk: dict[int, int] = {}
aliens_checked = False
for obj in objs:
+ total += 1
if not aliens_checked:
validate_alien_fields(obj)
aliens_checked = True
with batch_ctx(obj.batch_id):
- if obj.validate_with_checker():
- valid += 1
- else:
- invalid += 1
-
+ if not obj.validate_with_checker():
+ invalid_pks.add(obj.pk)
+ if unique_state:
+ invalid_pks |= unique_state.validate(obj)
+ if member_unique_state and isinstance(obj, Household):
+ member_invalid = False
+ for member in obj.members.all():
+ member_household_by_member_pk[member.pk] = obj.pk
+ invalid_member_pks = member_unique_state.validate(member)
+ if not invalid_member_pks:
+ continue
+
+ member_invalid = True
+ for member_pk in invalid_member_pks:
+ if household_pk := member_household_by_member_pk.get(member_pk):
+ invalid_pks.add(household_pk)
+
+ if member_invalid:
+ invalid_pks.add(obj.pk)
+ _append_household_member_invalid_error(obj)
+
+ invalid = len(invalid_pks)
+ valid = total - invalid
return valid, invalid
diff --git a/src/country_workspace/workspaces/admin/program.py b/src/country_workspace/workspaces/admin/program.py
index d5b5849d..f36aa535 100644
--- a/src/country_workspace/workspaces/admin/program.py
+++ b/src/country_workspace/workspaces/admin/program.py
@@ -71,6 +71,19 @@ class SelectIndividualColumnsForm(SelectColumnsForm):
model_core_fields = [("name", "name"), ("id", "id"), ("household", "household")]
+class SelectUniqueFieldForm(forms.Form):
+ field = forms.ChoiceField(choices=(), required=True)
+
+ def __init__(self, *args: Any, checker: "DataChecker", **kwargs: Any) -> None:
+ super().__init__(*args, **kwargs)
+ checker_form_class = checker.get_form_class()
+ choices: list[tuple[str, str]] = []
+ for name, field in checker_form_class.base_fields.items():
+ label = getattr(field, "label", "") or name
+ choices.append((name, f"{label} ({name})"))
+ self.fields["field"].choices = choices
+
+
class ProgramForm(forms.ModelForm):
class Meta:
model = CountryProgram
@@ -398,6 +411,45 @@ def _set_defaults(
context["selected_fields"] = selected_fields
return render(request, "workspace/program/set_defaults.html", context)
+ @staticmethod
+ def _can_update_unique_field(program: CountryProgram) -> bool:
+ return not program.has_any_data()
+
+ def _set_unique_field(
+ self,
+ request: HttpRequest,
+ form_class: type[SelectUniqueFieldForm],
+ context: dict[str, Any],
+ ) -> HttpResponse:
+ program: CountryProgram = context["original"]
+ checker: "DataChecker" = context["checker"]
+ scope_model: type[Validable] = context["unique_scope_model"]
+
+ if not self._can_update_unique_field(program):
+ self.message_user(
+ request,
+ _("Unique field can only be changed before importing any programme data."),
+ level=messages.ERROR,
+ )
+ return HttpResponseRedirect(reverse("workspace:workspaces_countryprogram_change", args=[program.pk]))
+
+ initial_unique_field = program.get_unique_field_for(scope_model)
+ if request.method == "POST":
+ form = form_class(request.POST, checker=checker)
+ if form.is_valid():
+ program.save_unique_field_for(scope_model, form.cleaned_data["field"])
+ self.message_user(
+ request,
+ _("Unique field has been updated."),
+ level=messages.SUCCESS,
+ )
+ return HttpResponseRedirect(reverse("workspace:workspaces_countryprogram_change", args=[program.pk]))
+ else:
+ form = form_class(checker=checker, initial={"field": initial_unique_field})
+
+ context["form"] = form
+ return render(request, "workspace/program/set_unique_field.html", context)
+
@view(
label=_("Configure Columns"),
permission=can_change_country_program,
@@ -460,6 +512,18 @@ def household_defaults(self, request: HttpRequest, pk: str) -> HttpResponse:
context["defaults_scope_model"] = Household
return self._set_defaults(request, MassDefaultsForm, context)
+ @view(
+ label=_("Set Unique Field"),
+ permission=can_change_country_program,
+ visible=lambda btn: CountryProgramAdmin._can_configure(btn, Household, require_master_detail=True),
+ )
+ def household_unique_field(self, request: HttpRequest, pk: str) -> HttpResponse:
+ context = self.get_common_context(request, pk, title=_("Set Household unique field"))
+ program: CountryProgram = context["original"]
+ context["checker"] = program.household_checker
+ context["unique_scope_model"] = Household
+ return self._set_unique_field(request, SelectUniqueFieldForm, context)
+
@choice(
label=_("Household Columns"),
change_form=True,
@@ -471,6 +535,7 @@ def household_group(self, button: ChoiceButton) -> None:
button.choices = [
self.household_columns,
self.household_defaults,
+ self.household_unique_field,
self.household_alien_fields_to_ignore,
]
@@ -504,6 +569,18 @@ def individual_defaults(self, request: HttpRequest, pk: str) -> HttpResponse:
context["defaults_scope_model"] = Individual
return self._set_defaults(request, MassDefaultsForm, context)
+ @view(
+ label=_("Set Unique Field"),
+ permission=can_change_country_program,
+ visible=lambda btn: CountryProgramAdmin._can_configure(btn, Individual),
+ )
+ def individual_unique_field(self, request: HttpRequest, pk: str) -> HttpResponse:
+ context = self.get_common_context(request, pk, title=_("Set Individual unique field"))
+ program: CountryProgram = context["original"]
+ context["checker"] = program.individual_checker
+ context["unique_scope_model"] = Individual
+ return self._set_unique_field(request, SelectUniqueFieldForm, context)
+
@choice(
label=_("Individual Columns"),
change_form=True,
@@ -515,6 +592,7 @@ def individual_group(self, button: ChoiceButton) -> None:
button.choices = [
self.individual_columns,
self.individual_defaults,
+ self.individual_unique_field,
self.individual_alien_fields_to_ignore,
]
diff --git a/src/country_workspace/workspaces/templates/workspace/program/set_unique_field.html b/src/country_workspace/workspaces/templates/workspace/program/set_unique_field.html
new file mode 100644
index 00000000..3ac91711
--- /dev/null
+++ b/src/country_workspace/workspaces/templates/workspace/program/set_unique_field.html
@@ -0,0 +1,57 @@
+{% extends "workspace/change_form.html" %}
+{% load i18n workspace_urls %}
+{% block page-title %}
+ › {{ original }}{% admin_url original %}
+ › {% translate "Set unique field" %}
+{% endblock page-title %}
+{% block content %}
+
+{% endblock content %}
diff --git a/tests/contrib/hope/push/test_orchestration.py b/tests/contrib/hope/push/test_orchestration.py
index 0aaf272c..11d388b5 100644
--- a/tests/contrib/hope/push/test_orchestration.py
+++ b/tests/contrib/hope/push/test_orchestration.py
@@ -19,6 +19,7 @@
_require_policy_check,
_save_current_deduplication_snapshot,
_steps,
+ archive_removed_unique_values,
claim_rdp_deduplication,
clone_rdp_core,
create_rdp_core,
@@ -940,6 +941,7 @@ def test_push_existing_rdp_core_success(
processor_cls = mocker.patch(f"{MOD}.PushProcessor", return_value=processor)
steps_spy = mocker.patch(f"{MOD}._steps", return_value=[step1, step2])
mocker.patch(f"{MOD}.lock_rdp_for_update", return_value=locked)
+ archive = mocker.patch(f"{MOD}.archive_removed_unique_values")
mark_removed = mocker.patch(f"{MOD}._mark_rdp_beneficiaries_removed")
set_status = mocker.patch(f"{MOD}.set_rdp_push_status")
approve = mocker.patch(f"{MOD}._approve_deduplication_set_after_successful_push")
@@ -953,6 +955,7 @@ def test_push_existing_rdp_core_success(
steps_spy.assert_called_once_with(processor, config)
step1.assert_called_once_with()
step2.assert_called_once_with()
+ archive.assert_called_once_with(locked, True)
mark_removed.assert_called_once_with(locked, True)
set_status.assert_called_once_with(
rdp=locked,
@@ -987,6 +990,7 @@ def fail_step() -> None:
mocker.patch(f"{MOD}._steps", return_value=[fail_step, next_step])
mocker.patch(f"{MOD}.lock_rdp_for_update", return_value=locked)
set_status = mocker.patch(f"{MOD}.set_rdp_push_status")
+ archive = mocker.patch(f"{MOD}.archive_removed_unique_values")
mark_removed = mocker.patch(f"{MOD}._mark_rdp_beneficiaries_removed")
with pytest.raises(HopePushError) as exc:
@@ -994,6 +998,7 @@ def fail_step() -> None:
assert err_contains(exc.value.args[0]["errors"], "boom")
save_snapshot.assert_called_once_with(rdp=rdp, key="before_push")
+ archive.assert_not_called()
mark_removed.assert_not_called()
set_status.assert_called_once_with(
rdp=locked,
@@ -1001,3 +1006,119 @@ def fail_step() -> None:
hope_rdi_id="N/A",
)
next_step.assert_not_called()
+
+
+def test_archive_removed_unique_values_master_detail(mocker: MockerFixture) -> None:
+ program = mocker.MagicMock()
+ program.get_unique_field_for.side_effect = lambda model: {
+ "Household": "household_id",
+ "Individual": "document_number",
+ }[model.__name__]
+
+ hh_values = mocker.MagicMock()
+ hh_values.iterator.return_value = iter(["HH-1"])
+ hh_values_qs = mocker.MagicMock()
+ hh_values_qs.values_list.return_value = hh_values
+
+ owner = mocker.MagicMock()
+ owner.households.filter.side_effect = [
+ mocker.MagicMock(values_list=mocker.Mock(return_value=[10, 11])),
+ hh_values_qs,
+ ]
+ mocker.patch(f"{MOD}.selection_owner_for_rdp", return_value=owner)
+
+ ci_values = mocker.MagicMock()
+ ci_values.iterator.return_value = iter(["DOC-1", "DOC-2"])
+ ci_qs = mocker.MagicMock()
+ ci_qs.values_list.return_value = ci_values
+ ci_filter = mocker.patch(f"{MOD}.CountryIndividual.objects.filter", return_value=ci_qs)
+
+ rdp = mocker.MagicMock(program=program)
+
+ archive_removed_unique_values(rdp, True)
+
+ ci_filter.assert_called_once_with(household_id__in=[10, 11], removed=False)
+ assert program.add_removed_unique_values_for.call_count == 2
+
+
+def test_archive_removed_unique_values_master_detail_no_records(mocker: MockerFixture) -> None:
+ program = mocker.MagicMock()
+ program.get_unique_field_for.return_value = "document_number"
+
+ owner = mocker.MagicMock()
+ owner.households.filter.return_value = mocker.MagicMock(values_list=mocker.Mock(return_value=[]))
+ mocker.patch(f"{MOD}.selection_owner_for_rdp", return_value=owner)
+
+ archive_removed_unique_values(mocker.MagicMock(program=program), True)
+
+ program.add_removed_unique_values_for.assert_not_called()
+
+
+def test_archive_removed_unique_values_master_detail_no_config(mocker: MockerFixture) -> None:
+ program = mocker.MagicMock()
+ program.get_unique_field_for.return_value = None
+
+ owner = mocker.MagicMock()
+ mocker.patch(f"{MOD}.selection_owner_for_rdp", return_value=owner)
+
+ archive_removed_unique_values(mocker.MagicMock(program=program), True)
+
+ owner.households.filter.assert_not_called()
+ program.add_removed_unique_values_for.assert_not_called()
+
+
+def test_archive_removed_unique_values_people_only(mocker: MockerFixture) -> None:
+ program = mocker.MagicMock()
+ program.get_unique_field_for.side_effect = lambda model: (
+ "document_number" if model.__name__ == "Individual" else None
+ )
+
+ ind_values = mocker.MagicMock()
+ ind_values.iterator.return_value = iter(["DOC-1"])
+ ind_qs = mocker.MagicMock()
+ ind_qs.values_list.return_value = ind_values
+
+ owner = mocker.MagicMock()
+ owner.individuals.filter.return_value = ind_qs
+ mocker.patch(f"{MOD}.selection_owner_for_rdp", return_value=owner)
+
+ archive_removed_unique_values(mocker.MagicMock(program=program), False)
+
+ owner.individuals.filter.assert_called_once_with(removed=False)
+ program.add_removed_unique_values_for.assert_called_once()
+
+
+def test_archive_removed_unique_values_people_only_no_config(mocker: MockerFixture) -> None:
+ program = mocker.MagicMock()
+ program.get_unique_field_for.return_value = None
+
+ owner = mocker.MagicMock()
+ mocker.patch(f"{MOD}.selection_owner_for_rdp", return_value=owner)
+
+ archive_removed_unique_values(mocker.MagicMock(program=program), False)
+
+ owner.individuals.filter.assert_not_called()
+ program.add_removed_unique_values_for.assert_not_called()
+
+
+def test_archive_removed_unique_values_master_detail_individual_only(mocker: MockerFixture) -> None:
+ program = mocker.MagicMock()
+ program.get_unique_field_for.side_effect = lambda model: (
+ None if model.__name__ == "Household" else "document_number"
+ )
+
+ owner = mocker.MagicMock()
+ owner.households.filter.return_value = mocker.MagicMock(values_list=mocker.Mock(return_value=[10]))
+ mocker.patch(f"{MOD}.selection_owner_for_rdp", return_value=owner)
+
+ ci_values = mocker.MagicMock()
+ ci_values.iterator.return_value = iter(["DOC-1"])
+ ci_qs = mocker.MagicMock()
+ ci_qs.values_list.return_value = ci_values
+ mocker.patch(f"{MOD}.CountryIndividual.objects.filter", return_value=ci_qs)
+
+ archive_removed_unique_values(mocker.MagicMock(program=program), True)
+
+ program.add_removed_unique_values_for.assert_called_once()
+ args, _ = program.add_removed_unique_values_for.call_args
+ assert args[0].__name__ == "Individual"
diff --git a/tests/models/test_m_program.py b/tests/models/test_m_program.py
index 045617a1..4c28889f 100644
--- a/tests/models/test_m_program.py
+++ b/tests/models/test_m_program.py
@@ -234,6 +234,123 @@ def test_program_apply_default_fields_applies_only_missing_or_none(program: Prog
assert result == {"a": "keep", "b": 2, "c": 3}
+@pytest.mark.parametrize(
+ ("model_cls", "field_name"),
+ [
+ (Household, "household_id"),
+ (Individual, "national_id"),
+ ],
+)
+def test_program_unique_field_for_scope(program: Program, model_cls: type, field_name: str) -> None:
+ program.system_fields = {
+ "unique_fields": {
+ "household": "household_id",
+ "individual": "national_id",
+ }
+ }
+ assert program.get_unique_field_for(model_cls) == field_name
+
+
+@pytest.mark.parametrize(
+ ("model_cls", "scope_key"),
+ [
+ (Household, "household"),
+ (Individual, "individual"),
+ ],
+)
+def test_program_save_unique_field_for_updates_scope(program: Program, model_cls: type, scope_key: str) -> None:
+ program.system_fields = {"removed_unique_values": {"other": {"keep": ["x"]}}}
+
+ program.save_unique_field_for(model_cls, "field_1")
+
+ assert program.system_fields["unique_fields"][scope_key] == "field_1"
+ assert program.system_fields["removed_unique_values"][scope_key]["field_1"] == []
+ assert program.system_fields["removed_unique_values"]["other"] == {"keep": ["x"]}
+
+
+def test_program_add_removed_unique_values_for(program: Program) -> None:
+ program.system_fields = {
+ "unique_fields": {"individual": "national_id"},
+ "removed_unique_values": {"individual": {"national_id": ["A"]}},
+ }
+
+ program.add_removed_unique_values_for(Individual, ["A", "B", " ", None, 123])
+
+ assert set(program.get_removed_unique_values_for(Individual)) == {"A", "B", "123"}
+
+
+def test_program_has_any_data(program: Program) -> None:
+ from tests.extras.testutils.factories import BatchFactory
+
+ assert not program.has_any_data()
+ batch = BatchFactory(program=program, country_office=program.country_office)
+ IndividualFactory(batch=batch)
+ assert program.has_any_data()
+
+
+def test_program_has_any_data_unsaved_program() -> None:
+ unsaved_program = ProgramFactory.build()
+ assert unsaved_program.has_any_data() is False
+
+
+def test_program_has_any_data_proxy_instance() -> None:
+ from tests.extras.testutils.factories import CountryProgramFactory, BatchFactory
+
+ program = CountryProgramFactory()
+ assert program.has_any_data() is False
+
+ BatchFactory(program=program, country_office=program.country_office)
+ assert program.has_any_data() is True
+
+
+def test_program_save_unique_field_for_none_removes_scope(program: Program) -> None:
+ program.system_fields = {
+ "unique_fields": {"individual": "national_id", "household": "household_id"},
+ "removed_unique_values": {"individual": {"national_id": ["A"]}},
+ }
+
+ program.save_unique_field_for(Individual, None)
+
+ assert "individual" not in program.system_fields["unique_fields"]
+ assert program.system_fields["unique_fields"]["household"] == "household_id"
+
+
+def test_program_get_removed_unique_values_for_without_unique_field(program: Program) -> None:
+ program.system_fields = {"removed_unique_values": {"individual": {"national_id": ["A"]}}}
+ assert program.get_removed_unique_values_for(Individual) == []
+
+
+def test_program_add_removed_unique_values_for_without_unique_field(program: Program) -> None:
+ program.system_fields = {}
+ program.add_removed_unique_values_for(Individual, ["A"])
+ assert program.system_fields == {}
+
+
+def test_program_add_removed_unique_values_for_skips_empty_values(program: Program) -> None:
+ program.system_fields = {
+ "unique_fields": {"individual": "national_id"},
+ "removed_unique_values": {"individual": {"national_id": ["A"]}},
+ }
+
+ program.add_removed_unique_values_for(Individual, [None, "", " "])
+
+ assert program.system_fields["removed_unique_values"]["individual"]["national_id"] == ["A"]
+
+
+def test_program_get_removed_unique_values_for_handles_non_list(program: Program) -> None:
+ program.system_fields = {
+ "unique_fields": {"individual": "national_id"},
+ "removed_unique_values": {"individual": {"national_id": "not-a-list"}},
+ }
+
+ assert program.get_removed_unique_values_for(Individual) == []
+
+
+def test_program_scope_for_unsupported_model_raises(program: Program) -> None:
+ with pytest.raises(TypeError):
+ program._scope_for(Program)
+
+
def test_apply_mapping_importer_with_mapping_id(program: Program, mocker: MockerFixture) -> None:
mapping_id = 123
data: dict[str, Any] = {"name": "Test"}
diff --git a/tests/workspace/actions/test_ws_validate.py b/tests/workspace/actions/test_ws_validate.py
index 98772713..8c2a7dde 100644
--- a/tests/workspace/actions/test_ws_validate.py
+++ b/tests/workspace/actions/test_ws_validate.py
@@ -1,14 +1,27 @@
import datetime
from typing import TYPE_CHECKING
+from unittest.mock import MagicMock
import freezegun
import pytest
from django.urls import reverse
+from pytest_mock import MockerFixture
from testutils.utils import select_office
from country_workspace.models import Household, Individual
from country_workspace.state import state
-from country_workspace.workspaces.admin.cleaners.validate import validate_queryset
+from country_workspace.workspaces.admin.cleaners import validate as validate_mod
+from country_workspace.workspaces.admin.cleaners.validate import (
+ ARCHIVED_UNIQUE_VALIDATION_ERROR,
+ UNIQUE_VALIDATION_ERROR,
+ UniqueValidationState,
+ _append_household_member_invalid_error,
+ _append_unique_error,
+ _build_unique_state,
+ _normalize_unique_value,
+ _validate_and_count,
+ validate_queryset,
+)
if TYPE_CHECKING:
from django_webtest import DjangoTestApp
@@ -120,3 +133,286 @@ def test_validate_queryset_individuals(program, force_migrated_records):
result = validate_queryset(qs)
assert result["valid"] + result["invalid"] == 2
+
+
+@pytest.mark.django_db
+def test_validate_queryset_individual_unique_field_duplicates(program, force_migrated_records):
+ from testutils.factories import IndividualFactory
+
+ program.save_unique_field_for(Individual, "full_name")
+
+ ind1: "CountryIndividual" = IndividualFactory(
+ household=None,
+ batch__program=program,
+ batch__country_office=program.country_office,
+ flex_fields={"full_name": "John Doe"},
+ )
+ ind2: "CountryIndividual" = IndividualFactory(
+ household=None,
+ batch__program=program,
+ batch__country_office=program.country_office,
+ flex_fields={"full_name": "John Doe"},
+ )
+
+ result = validate_queryset(Individual.objects.filter(pk__in=[ind1.pk, ind2.pk]))
+
+ assert result == {"valid": 0, "invalid": 2}
+ ind1.refresh_from_db()
+ ind2.refresh_from_db()
+ assert "full_name" in ind1.errors
+ assert "full_name" in ind2.errors
+
+
+@pytest.mark.django_db
+def test_validate_queryset_individual_unique_field_against_archived_values(program, force_migrated_records):
+ from testutils.factories import IndividualFactory
+
+ program.save_unique_field_for(Individual, "full_name")
+ program.add_removed_unique_values_for(Individual, ["John Doe"])
+
+ individual: "CountryIndividual" = IndividualFactory(
+ household=None,
+ batch__program=program,
+ batch__country_office=program.country_office,
+ flex_fields={"full_name": "John Doe"},
+ )
+
+ result = validate_queryset(Individual.objects.filter(pk=individual.pk))
+
+ assert result == {"valid": 0, "invalid": 1}
+ individual.refresh_from_db()
+ assert "full_name" in individual.errors
+
+
+@pytest.mark.django_db
+def test_validate_queryset_households_marks_invalid_when_member_unique_duplicates(program, force_migrated_records):
+ from testutils.factories import HouseholdFactory, IndividualFactory
+
+ program.beneficiary_group.master_detail = True
+ program.beneficiary_group.save()
+ program.save_unique_field_for(Individual, "full_name")
+
+ household: "CountryHousehold" = HouseholdFactory(
+ batch__program=program,
+ batch__country_office=program.country_office,
+ flex_fields={"size": 2},
+ )
+ IndividualFactory(
+ household=household,
+ batch=household.batch,
+ flex_fields={"full_name": "Member X"},
+ )
+ IndividualFactory(
+ household=household,
+ batch=household.batch,
+ flex_fields={"full_name": "Member X"},
+ )
+
+ result = validate_queryset(Household.objects.filter(pk=household.pk).prefetch_related("members"))
+ assert result == {"valid": 0, "invalid": 1}
+
+ household.refresh_from_db()
+ assert "dct" in household.errors
+
+
+@pytest.mark.django_db
+def test_validate_queryset_households_marks_both_invalid_for_member_unique_duplicates_across_households(
+ program, force_migrated_records
+):
+ from testutils.factories import HouseholdFactory, IndividualFactory
+
+ program.beneficiary_group.master_detail = True
+ program.beneficiary_group.save()
+ program.save_unique_field_for(Individual, "full_name")
+
+ hh1: "CountryHousehold" = HouseholdFactory(
+ batch__program=program,
+ batch__country_office=program.country_office,
+ flex_fields={"size": 1},
+ )
+ IndividualFactory(household=hh1, batch=hh1.batch, flex_fields={"full_name": "Member X"})
+
+ hh2: "CountryHousehold" = HouseholdFactory(
+ batch__program=program,
+ batch__country_office=program.country_office,
+ flex_fields={"size": 1},
+ )
+ IndividualFactory(household=hh2, batch=hh2.batch, flex_fields={"full_name": "Member X"})
+
+ result = validate_queryset(Household.objects.filter(pk__in=[hh1.pk, hh2.pk]), chunk_size=1)
+
+ assert result == {"valid": 0, "invalid": 2}
+
+
+@pytest.mark.parametrize(
+ ("raw", "expected"),
+ [
+ (None, None),
+ ("", None),
+ (" ", None),
+ ("hello", "hello"),
+ (" spaced ", "spaced"),
+ (123, "123"),
+ ],
+)
+def test_normalize_unique_value(raw: object, expected: str | None) -> None:
+ assert _normalize_unique_value(raw) == expected
+
+
+def _mock_obj(errors: object = None) -> MagicMock:
+ obj = MagicMock()
+ obj.errors = errors
+ return obj
+
+
+def test_append_unique_error_appends_first_message() -> None:
+ obj = _mock_obj({})
+
+ _append_unique_error(obj, "national_id", UNIQUE_VALIDATION_ERROR)
+
+ assert obj.errors["national_id"] == [UNIQUE_VALIDATION_ERROR]
+ obj.save.assert_called_once_with(update_fields=["errors", "last_checked"])
+
+
+def test_append_unique_error_wraps_non_list_current() -> None:
+ obj = _mock_obj({"national_id": "previous"})
+
+ _append_unique_error(obj, "national_id", UNIQUE_VALIDATION_ERROR)
+
+ assert obj.errors["national_id"] == ["previous", UNIQUE_VALIDATION_ERROR]
+
+
+def test_append_unique_error_skips_when_message_already_present() -> None:
+ obj = _mock_obj({"national_id": [UNIQUE_VALIDATION_ERROR]})
+
+ _append_unique_error(obj, "national_id", UNIQUE_VALIDATION_ERROR)
+
+ assert obj.errors == {"national_id": [UNIQUE_VALIDATION_ERROR]}
+ obj.save.assert_not_called()
+
+
+def test_append_household_member_invalid_error_appends() -> None:
+ obj = _mock_obj({})
+
+ _append_household_member_invalid_error(obj)
+
+ assert obj.errors["dct"] == ["Some members did not validate"]
+ obj.save.assert_called_once_with(update_fields=["errors", "last_checked"])
+
+
+def test_append_household_member_invalid_error_wraps_non_list() -> None:
+ obj = _mock_obj({"dct": "scalar"})
+
+ _append_household_member_invalid_error(obj)
+
+ assert obj.errors["dct"] == ["scalar", "Some members did not validate"]
+
+
+def test_append_household_member_invalid_error_skips_when_marker_present() -> None:
+ obj = _mock_obj({"dct": ["Some members did not validate"]})
+
+ _append_household_member_invalid_error(obj)
+
+ obj.save.assert_not_called()
+
+
+def test_unique_validation_state_skips_empty_values() -> None:
+ state_ = UniqueValidationState(field_name="national_id", archived_values=set())
+ obj = MagicMock(flex_fields={"national_id": " "})
+
+ assert state_.validate(obj) == set()
+
+
+def test_unique_validation_state_handles_missing_flex_fields() -> None:
+ state_ = UniqueValidationState(field_name="national_id", archived_values=set())
+ obj = MagicMock(flex_fields=None)
+
+ assert state_.validate(obj) == set()
+
+
+def test_unique_validation_state_archived_value(mocker: MockerFixture) -> None:
+ spy = mocker.patch.object(validate_mod, "_append_unique_error")
+ state_ = UniqueValidationState(field_name="national_id", archived_values={"A"})
+ obj = MagicMock(pk=1, flex_fields={"national_id": "A"})
+
+ assert state_.validate(obj) == {1}
+
+ spy.assert_called_once_with(obj, "national_id", ARCHIVED_UNIQUE_VALIDATION_ERROR)
+
+
+def test_unique_validation_state_first_occurrence_records_value() -> None:
+ state_ = UniqueValidationState(field_name="national_id", archived_values=set())
+ obj = MagicMock(pk=1, flex_fields={"national_id": "A"})
+
+ assert state_.validate(obj) == set()
+ assert state_.seen_by_value == {"A": obj}
+
+
+def test_unique_validation_state_duplicate_marks_both(mocker: MockerFixture) -> None:
+ spy = mocker.patch.object(validate_mod, "_append_unique_error")
+ state_ = UniqueValidationState(field_name="national_id", archived_values=set())
+ first = MagicMock(pk=1, flex_fields={"national_id": "A"})
+ second = MagicMock(pk=2, flex_fields={"national_id": "A"})
+
+ state_.validate(first)
+ invalid = state_.validate(second)
+
+ assert invalid == {1, 2}
+ assert spy.call_count == 2
+ assert spy.call_args_list[0].args == (first, "national_id", UNIQUE_VALIDATION_ERROR)
+ assert spy.call_args_list[1].args == (second, "national_id", UNIQUE_VALIDATION_ERROR)
+
+
+def test_build_unique_state_returns_none_when_no_field() -> None:
+ program = MagicMock()
+ program.get_unique_field_for.return_value = None
+
+ assert _build_unique_state(program, Individual) is None
+
+
+def test_build_unique_state_filters_falsy_archived_values() -> None:
+ program = MagicMock()
+ program.get_unique_field_for.return_value = "national_id"
+ program.get_removed_unique_values_for.return_value = ["A", "", "B"]
+
+ state_ = _build_unique_state(program, Individual)
+
+ assert state_ is not None
+ assert state_.field_name == "national_id"
+ assert state_.archived_values == {"A", "B"}
+
+
+def test_validate_and_count_without_states(mocker: MockerFixture) -> None:
+ mocker.patch.object(validate_mod, "validate_alien_fields")
+ mocker.patch.object(validate_mod, "batch_ctx")
+ obj = MagicMock(pk=1, batch_id=1)
+ obj.validate_with_checker.return_value = True
+
+ valid, invalid = _validate_and_count([obj])
+
+ assert (valid, invalid) == (1, 0)
+
+
+def test_validate_and_count_invalid_obj_only(mocker: MockerFixture) -> None:
+ mocker.patch.object(validate_mod, "validate_alien_fields")
+ mocker.patch.object(validate_mod, "batch_ctx")
+ obj = MagicMock(pk=1, batch_id=1)
+ obj.validate_with_checker.return_value = False
+
+ valid, invalid = _validate_and_count([obj])
+
+ assert (valid, invalid) == (0, 1)
+
+
+def test_validate_and_count_member_unique_only_for_household(mocker: MockerFixture) -> None:
+ mocker.patch.object(validate_mod, "validate_alien_fields")
+ mocker.patch.object(validate_mod, "batch_ctx")
+ member_state = MagicMock(spec=UniqueValidationState)
+ member_state.validate.return_value = set()
+ obj = MagicMock(pk=1, batch_id=1)
+ obj.validate_with_checker.return_value = True
+
+ valid, invalid = _validate_and_count([obj], member_unique_state=member_state)
+
+ assert (valid, invalid) == (1, 0)
+ member_state.validate.assert_not_called()
diff --git a/tests/workspace/admin/program/test_program.py b/tests/workspace/admin/program/test_program.py
index ed7fcd96..edd86dd9 100644
--- a/tests/workspace/admin/program/test_program.py
+++ b/tests/workspace/admin/program/test_program.py
@@ -1,6 +1,7 @@
import pytest
from django.contrib import messages
from django.http import HttpResponseRedirect, QueryDict
+from unittest.mock import MagicMock
from pytest_mock import MockerFixture
from country_workspace.exceptions import RemoteError
@@ -61,6 +62,13 @@ def test_import_aurora_returns_form_when_invalid(
assert result is form
+@pytest.fixture
+def mock_program(mocker: MockerFixture):
+ program = mocker.MagicMock()
+ program.pk = 1
+ return program
+
+
def test_import_kobo_returns_form_when_invalid(
program_admin,
mock_request,
@@ -363,3 +371,169 @@ def test_update_dedup_settings_post_success(
)
assert isinstance(response, HttpResponseRedirect)
assert response.url == "/program/1/change/"
+
+
+def test_set_unique_field_get(program_admin, mock_request, mock_program, mocker: MockerFixture) -> None:
+ mock_request.method = "GET"
+ mock_program.get_unique_field_for.return_value = "field_2"
+ mock_program.has_any_data.return_value = False
+
+ form_class = mocker.MagicMock()
+ render = mocker.patch("country_workspace.workspaces.admin.program.render")
+ context = {
+ "original": mock_program,
+ "checker": "checker",
+ "unique_scope_model": "Model",
+ }
+
+ response = program_admin._set_unique_field(mock_request, form_class, context)
+
+ form_class.assert_called_once_with(checker="checker", initial={"field": "field_2"})
+ render.assert_called_once_with(mock_request, "workspace/program/set_unique_field.html", context)
+ assert response is render.return_value
+
+
+@pytest.mark.parametrize("has_data", [True, False])
+def test_set_unique_field_post(
+ program_admin, mock_request, mock_program, mocker: MockerFixture, has_data: bool
+) -> None:
+ mock_request.method = "POST"
+ mock_request.POST = {"field": "national_id"}
+ mock_program.pk = 42
+ mock_program.has_any_data.return_value = has_data
+
+ context = {
+ "original": mock_program,
+ "checker": "checker",
+ "unique_scope_model": "Model",
+ }
+ reverse = mocker.patch(
+ "country_workspace.workspaces.admin.program.reverse",
+ return_value="/workspaces/countryprogram/42/change/",
+ )
+ form = MagicMock()
+ form.is_valid.return_value = True
+ form.cleaned_data = {"field": "national_id"}
+ form_class = mocker.MagicMock(return_value=form)
+
+ response = program_admin._set_unique_field(mock_request, form_class, context)
+ assert isinstance(response, HttpResponseRedirect)
+ reverse.assert_called_once()
+
+ if has_data:
+ form_class.assert_not_called()
+ mock_program.save_unique_field_for.assert_not_called()
+ else:
+ form_class.assert_called_once_with(mock_request.POST, checker="checker")
+ mock_program.save_unique_field_for.assert_called_once_with("Model", "national_id")
+
+
+def test_set_unique_field_get_blocked_when_program_has_data(
+ program_admin, mock_request, mock_program, mocker: MockerFixture
+) -> None:
+ mock_request.method = "GET"
+ mock_program.has_any_data.return_value = True
+ mock_program.pk = 99
+ reverse_mock = mocker.patch(
+ "country_workspace.workspaces.admin.program.reverse",
+ return_value="/program/99/change/",
+ )
+
+ response = program_admin._set_unique_field(
+ mock_request,
+ mocker.MagicMock(),
+ {
+ "original": mock_program,
+ "checker": "checker",
+ "unique_scope_model": "Model",
+ },
+ )
+
+ assert isinstance(response, HttpResponseRedirect)
+ assert response.url == "/program/99/change/"
+ reverse_mock.assert_called_once()
+ program_admin.message_user.assert_called_once()
+ _, kwargs = program_admin.message_user.call_args
+ assert kwargs.get("level") == messages.ERROR
+
+
+def test_set_unique_field_post_invalid_form_renders_again(
+ program_admin, mock_request, mock_program, mocker: MockerFixture
+) -> None:
+ mock_request.method = "POST"
+ mock_request.POST = {"field": ""}
+ mock_program.has_any_data.return_value = False
+ mock_program.get_unique_field_for.return_value = None
+
+ form = mocker.MagicMock()
+ form.is_valid.return_value = False
+ form_class = mocker.MagicMock(return_value=form)
+ render = mocker.patch("country_workspace.workspaces.admin.program.render")
+
+ context = {
+ "original": mock_program,
+ "checker": "checker",
+ "unique_scope_model": "Model",
+ }
+ response = program_admin._set_unique_field(mock_request, form_class, context)
+
+ form_class.assert_called_once_with(mock_request.POST, checker="checker")
+ mock_program.save_unique_field_for.assert_not_called()
+ render.assert_called_once_with(mock_request, "workspace/program/set_unique_field.html", context)
+ assert response is render.return_value
+
+
+def test_select_unique_field_form_builds_choices_from_checker(mocker: MockerFixture) -> None:
+ labelled = mocker.MagicMock()
+ labelled.label = "National ID"
+ unlabelled = mocker.MagicMock()
+ unlabelled.label = ""
+
+ form_class = mocker.MagicMock()
+ form_class.base_fields = {"national_id": labelled, "iban": unlabelled}
+
+ checker = mocker.MagicMock()
+ checker.get_form_class.return_value = form_class
+
+ form = program_admin_mod.SelectUniqueFieldForm(checker=checker)
+
+ assert form.fields["field"].choices == [
+ ("national_id", "National ID (national_id)"),
+ ("iban", "iban (iban)"),
+ ]
+
+
+def test_household_unique_field_view_dispatches(program_admin, mock_request, mocker: MockerFixture) -> None:
+ set_unique = mocker.patch.object(program_admin, "_set_unique_field")
+
+ program_admin.household_unique_field.func(program_admin, mock_request, pk="1")
+
+ set_unique.assert_called_once()
+ _, _, ctx = set_unique.call_args.args
+ assert ctx["unique_scope_model"].__name__ == "Household"
+ assert "checker" in ctx
+
+
+def test_individual_unique_field_view_dispatches(program_admin, mock_request, mocker: MockerFixture) -> None:
+ set_unique = mocker.patch.object(program_admin, "_set_unique_field")
+
+ program_admin.individual_unique_field.func(program_admin, mock_request, pk="1")
+
+ set_unique.assert_called_once()
+ _, _, ctx = set_unique.call_args.args
+ assert ctx["unique_scope_model"].__name__ == "Individual"
+ assert "checker" in ctx
+
+
+def test_household_group_includes_unique_field(program_admin) -> None:
+ button = MagicMock()
+ button.choices = []
+ program_admin.household_group.func(program_admin, button)
+ assert program_admin.household_unique_field in button.choices
+
+
+def test_individual_group_includes_unique_field(program_admin) -> None:
+ button = MagicMock()
+ button.choices = []
+ program_admin.individual_group.func(program_admin, button)
+ assert program_admin.individual_unique_field in button.choices
diff --git a/tests/workspace/admin/program/test_program_buttons.py b/tests/workspace/admin/program/test_program_buttons.py
index 104e2525..d8d6de9c 100644
--- a/tests/workspace/admin/program/test_program_buttons.py
+++ b/tests/workspace/admin/program/test_program_buttons.py
@@ -75,8 +75,10 @@ def test_program_action_permissions(
("workspace:workspaces_countryprogram_individual_columns", "country_program"),
("workspace:workspaces_countryprogram_household_defaults", "country_program_md_true"),
("workspace:workspaces_countryprogram_individual_defaults", "country_program"),
+ ("workspace:workspaces_countryprogram_household_unique_field", "country_program_md_true"),
+ ("workspace:workspaces_countryprogram_individual_unique_field", "country_program"),
],
- ids=["hh_columns", "ind_columns", "hh_defaults", "ind_defaults"],
+ ids=["hh_columns", "ind_columns", "hh_defaults", "ind_defaults", "hh_unique_field", "ind_unique_field"],
)
def test_columns_and_defaults_permissions(
user: User,
@@ -100,10 +102,22 @@ def test_columns_and_defaults_permissions(
@pytest.mark.parametrize(
- ("group_method", "columns_method", "defaults_method", "ignore_method"),
+ ("group_method", "columns_method", "defaults_method", "unique_method", "ignore_method"),
[
- ("household_group", "household_columns", "household_defaults", "household_alien_fields_to_ignore"),
- ("individual_group", "individual_columns", "individual_defaults", "individual_alien_fields_to_ignore"),
+ (
+ "household_group",
+ "household_columns",
+ "household_defaults",
+ "household_unique_field",
+ "household_alien_fields_to_ignore",
+ ),
+ (
+ "individual_group",
+ "individual_columns",
+ "individual_defaults",
+ "individual_unique_field",
+ "individual_alien_fields_to_ignore",
+ ),
],
ids=["household_group", "individual_group"],
)
@@ -113,6 +127,7 @@ def test_group_choice_buttons_choices(
group_method: str,
columns_method: str,
defaults_method: str,
+ unique_method: str,
ignore_method: str,
) -> None:
admin = country_program_admin_instance
@@ -124,6 +139,7 @@ def test_group_choice_buttons_choices(
assert button.choices == [
getattr(admin, columns_method),
getattr(admin, defaults_method),
+ getattr(admin, unique_method),
getattr(admin, ignore_method),
]
diff --git a/tests/workspace/test_ws_program.py b/tests/workspace/test_ws_program.py
index b83b4321..b7215f24 100644
--- a/tests/workspace/test_ws_program.py
+++ b/tests/workspace/test_ws_program.py
@@ -103,6 +103,10 @@ def test_configure_hh_columns(app, household: "CountryHousehold", master_detail:
# 4) Ensure options contain links to *_columns and *_defaults views
assert reverse("workspace:workspaces_countryprogram_household_columns", args=[program.pk]) in option_values
assert reverse("workspace:workspaces_countryprogram_household_defaults", args=[program.pk]) in option_values
+ assert (
+ reverse("workspace:workspaces_countryprogram_household_unique_field", args=[program.pk])
+ in option_values
+ )
# 5) Configure columns via household_columns view
url = reverse("workspace:workspaces_countryprogram_household_columns", args=[program.pk])
@@ -154,6 +158,9 @@ def test_configure_ind_columns(app, household: "CountryHousehold"):
# 3) Ensure options contain links to *_columns and *_defaults views
assert reverse("workspace:workspaces_countryprogram_individual_columns", args=[program.pk]) in option_values
assert reverse("workspace:workspaces_countryprogram_individual_defaults", args=[program.pk]) in option_values
+ assert (
+ reverse("workspace:workspaces_countryprogram_individual_unique_field", args=[program.pk]) in option_values
+ )
# 4) Configure columns via individual_columns view
url = reverse("workspace:workspaces_countryprogram_individual_columns", args=[program.pk])