diff --git a/src/country_workspace/config/__init__.py b/src/country_workspace/config/__init__.py index 464041ff..2c0ca99f 100644 --- a/src/country_workspace/config/__init__.py +++ b/src/country_workspace/config/__init__.py @@ -197,6 +197,12 @@ class Group(Enum): "IGNORE_UNSUPPORTED_FEATURES": (bool, False, True), "REQUESTS_TIMEOUT": (int, 30, 30), "DEBUG_API_REQUESTS": (bool, False, False), + "BITCASTER_API_URL": (str, "", "", False, "Bitcaster API URL"), + "BITCASTER_ENABLED": (bool, False, False, False, "Enable Bitcaster notifications"), + "BITCASTER_API_KEY": (str, "", "", False, "Bitcaster API key"), + "BITCASTER_ORGANIZATION_SLUG": (str, "unicef", "unicef", False, "Bitcaster organization slug"), + "BITCASTER_PROJECT_SLUG": (str, "hope", "hope", False, "Bitcaster project slug"), + "BITCASTER_APPLICATION_SLUG": (str, "", "country-workspace", False, "Bitcaster application slug"), } env = SmartEnv(**CONFIG) diff --git a/src/country_workspace/config/settings.py b/src/country_workspace/config/settings.py index c293e4d2..54d1c861 100644 --- a/src/country_workspace/config/settings.py +++ b/src/country_workspace/config/settings.py @@ -56,6 +56,7 @@ "country_workspace.workspaces.apps.Config", "country_workspace.versioning", "country_workspace.cache", + "country_workspace.notifications", # these should be optional in the future "country_workspace.contrib.hope.apps.Config", "country_workspace.contrib.aurora.apps.Config", @@ -149,6 +150,15 @@ }, }, } + +# Bitcaster Configuration +BITCASTER_API_URL = env("BITCASTER_API_URL") +BITCASTER_ENABLED = env("BITCASTER_ENABLED") +BITCASTER_API_KEY = env("BITCASTER_API_KEY") +BITCASTER_ORGANIZATION_SLUG = env("BITCASTER_ORGANIZATION_SLUG") +BITCASTER_PROJECT_SLUG = env("BITCASTER_PROJECT_SLUG") +BITCASTER_APPLICATION_SLUG = env("BITCASTER_APPLICATION_SLUG") + SELECT2_CACHE_BACKEND = "select2" X_FRAME_OPTIONS = "SAMEORIGIN" diff --git a/src/country_workspace/contrib/aurora/import_processing.py b/src/country_workspace/contrib/aurora/import_processing.py index ca39bf8b..dac971c1 100644 --- a/src/country_workspace/contrib/aurora/import_processing.py +++ b/src/country_workspace/contrib/aurora/import_processing.py @@ -15,6 +15,7 @@ from country_workspace.utils.imports import get_aurora_originating_id from country_workspace.utils.import_flow import build_import_processor, run_batch_postprocessing from country_workspace.utils.sync_log import get_aurora_sync_log_name +from country_workspace.notifications.signals import data_imported_signal logger = logging.getLogger(__name__) @@ -78,6 +79,14 @@ def import_data(job: AsyncJob) -> ImportResult: batch.status = Batch.BatchStatus.COMPLETE batch.save(update_fields=["status"]) + + data_imported_signal.send( + sender=Batch, + program_id=batch.program_id, + batch_id=batch.id, + record_count=total_people + total_households, + source=Batch.BatchSource.AURORA, + ) return ImportResult(people=total_people, households=total_households) diff --git a/src/country_workspace/contrib/hope/push/orchestration.py b/src/country_workspace/contrib/hope/push/orchestration.py index ea4cbe36..dc1cd46b 100644 --- a/src/country_workspace/contrib/hope/push/orchestration.py +++ b/src/country_workspace/contrib/hope/push/orchestration.py @@ -14,6 +14,10 @@ from country_workspace.contrib.hope.exceptions import HopePushError from country_workspace.exceptions import RemoteError, RemoteUnavailableError from country_workspace.models import AsyncJob, Rdp +from country_workspace.notifications.signals import ( + rdi_push_completed_signal, + rdp_push_status_changed_signal, +) from .config import CreateRdpConfig, PushWorkflowConfig from .policy import ActionCheck, get_rdp_policy @@ -314,6 +318,12 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]: status=Rdp.PushStatus.FAILURE, hope_rdi_id=hope_processor.hope_rdi_id or "N/A", ) + rdp_push_status_changed_signal.send( + sender=Rdp, + program_id=rdp.program_id, + rdp_id=rdp.pk, + status=Rdp.PushStatus.FAILURE, + ) raise HopePushError(hope_processor.total) with transaction.atomic(): @@ -333,6 +343,25 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]: processor=hope_processor, ) + pushed_count = ( + hope_processor.total.get("households", 0) + + hope_processor.total.get("individuals", 0) + + hope_processor.total.get("people", 0) + ) + + rdi_push_completed_signal.send( + sender=Rdp, + program_id=rdp.program_id, + pushed_count=pushed_count, + ) + + rdp_push_status_changed_signal.send( + sender=Rdp, + program_id=rdp.program_id, + rdp_id=rdp.pk, + status=Rdp.PushStatus.SUCCESS, + ) + return hope_processor.total diff --git a/src/country_workspace/contrib/kobo/sync.py b/src/country_workspace/contrib/kobo/sync.py index 6038ba90..2ffe0fd7 100644 --- a/src/country_workspace/contrib/kobo/sync.py +++ b/src/country_workspace/contrib/kobo/sync.py @@ -27,6 +27,7 @@ from country_workspace.utils.sync_log import get_kobo_sync_log_name from country_workspace.workspaces.admin.cleaners.validate import create_validation_jobs from country_workspace.models.jobs import GracefulJobCancellationError +from country_workspace.notifications.signals import data_imported_signal if TYPE_CHECKING: from hope_flex_fields.models import DataChecker @@ -468,11 +469,20 @@ def import_data(job: AsyncJob) -> ImportResult: owner=job.owner, program=job.program, queryset=batch.household_set.filter(removed=False).prefetch_related("members"), # type: ignore[attr-defined] + context="rdi", ) job.ensure_not_cancelled(refresh=True) # Mark batch complete in a dedicated transaction. with transaction.atomic(): Batch.objects.select_for_update().filter(pk=batch.pk).update(status=Batch.BatchStatus.COMPLETE) + + data_imported_signal.send( + sender=Batch, + program_id=batch.program_id, + batch_id=batch.id, + record_count=household_counter + individual_counter, + source=Batch.BatchSource.KOBO, + ) else: job.ensure_not_cancelled(refresh=True) AsyncJob.objects.create( diff --git a/src/country_workspace/datasources/rdi/processors.py b/src/country_workspace/datasources/rdi/processors.py index a72d3e16..6f3bdc21 100644 --- a/src/country_workspace/datasources/rdi/processors.py +++ b/src/country_workspace/datasources/rdi/processors.py @@ -19,6 +19,7 @@ from country_workspace.utils.functional import compose from country_workspace.utils.import_flow import build_import_processor, run_batch_postprocessing from country_workspace.workspaces.admin.cleaners.validate import create_validation_jobs +from country_workspace.notifications.signals import data_imported_signal from .config import Config, SheetName, Sheet from .exceptions import ColumnConfigurationError, SheetProcessingError, SheetNotFoundError @@ -220,6 +221,13 @@ def import_from_rdi(job: AsyncJob) -> dict[str, int]: if not config.get("validate_after_import"): batch.status = Batch.BatchStatus.COMPLETE batch.save(update_fields=["status"]) + data_imported_signal.send( + sender=Batch, + program_id=batch.program_id, + batch_id=batch.id, + record_count=sum(result.values()), + source=Batch.BatchSource.RDI, + ) return result job.ensure_not_cancelled(refresh=True) @@ -233,9 +241,17 @@ def import_from_rdi(job: AsyncJob) -> dict[str, int]: owner=job.owner, program=job.program, queryset=queryset, + context="rdi", ) batch.status = Batch.BatchStatus.COMPLETE batch.save(update_fields=["status"]) + data_imported_signal.send( + sender=Batch, + program_id=batch.program_id, + batch_id=batch.id, + record_count=sum(result.values()), + source=Batch.BatchSource.RDI, + ) return result diff --git a/src/country_workspace/notifications/__init__.py b/src/country_workspace/notifications/__init__.py new file mode 100644 index 00000000..1e87d37b --- /dev/null +++ b/src/country_workspace/notifications/__init__.py @@ -0,0 +1,4 @@ +# Expose the client for easier imports +from .bitcaster_client import BitcasterClient + +__all__ = ["BitcasterClient"] diff --git a/src/country_workspace/notifications/apps.py b/src/country_workspace/notifications/apps.py new file mode 100644 index 00000000..b20dfdf3 --- /dev/null +++ b/src/country_workspace/notifications/apps.py @@ -0,0 +1,10 @@ +from django.apps import AppConfig + + +class NotificationsConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "country_workspace.notifications" + + def ready(self) -> None: + # Import handlers to ensure signals are registered when Django starts + import country_workspace.notifications.handlers # noqa: F401 diff --git a/src/country_workspace/notifications/bitcaster_client.py b/src/country_workspace/notifications/bitcaster_client.py new file mode 100644 index 00000000..5ae7ad3f --- /dev/null +++ b/src/country_workspace/notifications/bitcaster_client.py @@ -0,0 +1,51 @@ +import logging +from typing import Any +from urllib.parse import urlparse + +from bitcaster_sdk.client import Client as SDKClient +from django.conf import settings + +logger = logging.getLogger(__name__) + + +class BitcasterClient: + """Client wrapper around bitcaster-sdk.""" + + def __init__( + self, + api_url: str | None = None, + api_key: str | None = None, + organization_slug: str | None = None, + project_slug: str | None = None, + application_slug: str | None = None, + ) -> None: + # Allow overriding settings, but default to Django settings + self.api_url = api_url or settings.BITCASTER_API_URL + self.api_key = api_key or settings.BITCASTER_API_KEY + self.organization_slug = organization_slug or settings.BITCASTER_ORGANIZATION_SLUG + self.project_slug = project_slug or settings.BITCASTER_PROJECT_SLUG + self.application_slug = application_slug or settings.BITCASTER_APPLICATION_SLUG + + @property + def is_configured(self) -> bool: + return bool( + self.api_url and self.api_key and self.organization_slug and self.project_slug and self.application_slug + ) + + def _build_bae(self) -> str: + parsed = urlparse(self.api_url) + return f"{parsed.scheme}://{self.api_key}@{parsed.netloc}/api/o/{self.organization_slug}/" + + def trigger_event(self, event_name: str, payload: dict[str, Any]) -> bool: + if not self.is_configured: + logger.warning("Bitcaster client is not fully configured. Skipping event '%s'.", event_name) + return False + + SDKClient(bae=self._build_bae()).trigger( + project=self.project_slug, + application=self.application_slug, + event=event_name, + context=payload, + ) + logger.info("Successfully triggered Bitcaster event: %s", event_name) + return True diff --git a/src/country_workspace/notifications/handlers.py b/src/country_workspace/notifications/handlers.py new file mode 100644 index 00000000..07548cde --- /dev/null +++ b/src/country_workspace/notifications/handlers.py @@ -0,0 +1,55 @@ +import logging +from typing import Any +from django.dispatch import receiver + +from country_workspace.notifications.signals import ( + data_imported_signal, + rdi_push_completed_signal, + rdp_push_status_changed_signal, + validation_completed_signal, +) +from country_workspace.notifications.tasks import send_bitcaster_event_task + +logger = logging.getLogger(__name__) + + +@receiver(data_imported_signal) +def handle_data_imported(sender: Any, **kwargs: Any) -> None: + # Build a standard payload dictionary using kwargs + payload = { + "program_id": kwargs.get("program_id"), + "batch_id": kwargs.get("batch_id"), + "record_count": kwargs.get("record_count"), + "source": kwargs.get("source"), + } + # Queue the Celery task + send_bitcaster_event_task.delay("data_imported", payload) + + +@receiver(validation_completed_signal) +def handle_validation_completed(sender: Any, **kwargs: Any) -> None: + payload = { + "program_id": kwargs.get("program_id"), + "context": kwargs.get("context"), + "results": kwargs.get("results", {}), + } + send_bitcaster_event_task.delay("validation_completed", payload) + + +@receiver(rdi_push_completed_signal) +def handle_rdi_pushed(sender: Any, **kwargs: Any) -> None: + payload = { + "program_id": kwargs.get("program_id"), + "pushed_count": kwargs.get("pushed_count"), + } + send_bitcaster_event_task.delay("rdi_push_completed", payload) + + +@receiver(rdp_push_status_changed_signal) +def handle_rdp_pushed(sender: Any, **kwargs: Any) -> None: + payload = { + "program_id": kwargs.get("program_id"), + "rdp_id": kwargs.get("rdp_id"), + "status": kwargs.get("status"), + } + send_bitcaster_event_task.delay("rdp_push_status_changed", payload) diff --git a/src/country_workspace/notifications/notifier.py b/src/country_workspace/notifications/notifier.py new file mode 100644 index 00000000..494ba007 --- /dev/null +++ b/src/country_workspace/notifications/notifier.py @@ -0,0 +1,16 @@ +from typing import Any, Protocol + +from country_workspace.notifications.bitcaster_client import BitcasterClient + + +class NotificationBackend(Protocol): + def trigger_event(self, event_name: str, payload: dict[str, Any]) -> bool: ... + + +def get_notification_backend() -> NotificationBackend: + return BitcasterClient() + + +def send_notification_event(event_name: str, payload: dict[str, Any]) -> bool: + backend = get_notification_backend() + return backend.trigger_event(event_name, payload) diff --git a/src/country_workspace/notifications/signals.py b/src/country_workspace/notifications/signals.py new file mode 100644 index 00000000..0aad6fe6 --- /dev/null +++ b/src/country_workspace/notifications/signals.py @@ -0,0 +1,18 @@ +import django.dispatch + +# Bitcaster Integration Signals +# Triggered when a batch of data has been successfully imported. +# Expected kwargs: program_id (int), batch_id (int), record_count (int), source (str) +data_imported_signal = django.dispatch.Signal() + +# Triggered when validation processing is finished (either full database or RDI validation) +# Expected kwargs: program_id (int), context (str - e.g. "total", "rdi"), results (dict) +validation_completed_signal = django.dispatch.Signal() + +# Triggered when an RDI push cycle successfully completes. +# Expected kwargs: program_id (int), pushed_count (int) +rdi_push_completed_signal = django.dispatch.Signal() + +# Triggered when an RDP record status changes. +# Expected kwargs: program_id (int), rdp_id (int), status (str) +rdp_push_status_changed_signal = django.dispatch.Signal() diff --git a/src/country_workspace/notifications/tasks.py b/src/country_workspace/notifications/tasks.py new file mode 100644 index 00000000..0a1505a2 --- /dev/null +++ b/src/country_workspace/notifications/tasks.py @@ -0,0 +1,33 @@ +import logging +from typing import Any + +from django.conf import settings + +from country_workspace.config.celery import app +from country_workspace.notifications.notifier import get_notification_backend, send_notification_event + +logger = logging.getLogger(__name__) + + +class NotifyError(Exception): + """Raise the exception when a notification fails.""" + + +@app.task() +def send_bitcaster_event_task(event_name: str, payload: dict[str, Any]) -> None: + """Celery task to asynchronously send an event to Bitcaster.""" + if not settings.BITCASTER_ENABLED: + logger.info("Skipping Bitcaster task: integration disabled (event='%s').", event_name) + return + + backend = get_notification_backend() + if not getattr(backend, "is_configured", False): + logger.warning("Skipping Bitcaster task: client not configured.") + return + + try: + success = send_notification_event(event_name, payload) + if not success: + logger.warning("Bitcaster client returned false for event '%s'", event_name) + except NotifyError as exc: # pragma: no cover + logger.error("Bitcaster send failed for event '%s': %s", event_name, str(exc)) diff --git a/src/country_workspace/workspaces/admin/batch/reprocessing.py b/src/country_workspace/workspaces/admin/batch/reprocessing.py index a3fc1555..d3b6cfaf 100644 --- a/src/country_workspace/workspaces/admin/batch/reprocessing.py +++ b/src/country_workspace/workspaces/admin/batch/reprocessing.py @@ -350,6 +350,7 @@ def reprocess_batch(job: AsyncJob) -> dict[str, Any]: owner=job.owner, program=batch.program, queryset=households.prefetch_related("members"), + context="rdi", ) elif individual_count > 0: create_validation_jobs( @@ -357,6 +358,7 @@ def reprocess_batch(job: AsyncJob) -> dict[str, Any]: owner=job.owner, program=batch.program, queryset=individuals, + context="rdi", ) response = { diff --git a/src/country_workspace/workspaces/admin/cleaners/actions.py b/src/country_workspace/workspaces/admin/cleaners/actions.py index f1d07e7d..7a3290ac 100644 --- a/src/country_workspace/workspaces/admin/cleaners/actions.py +++ b/src/country_workspace/workspaces/admin/cleaners/actions.py @@ -39,6 +39,7 @@ def validate_records( owner=request.user, program=state.program, queryset=queryset, + context="total", ) model_admin.message_user(request, "Task scheduled", messages.SUCCESS) return diff --git a/src/country_workspace/workspaces/admin/cleaners/validate.py b/src/country_workspace/workspaces/admin/cleaners/validate.py index 1285e617..8d6dd9bb 100644 --- a/src/country_workspace/workspaces/admin/cleaners/validate.py +++ b/src/country_workspace/workspaces/admin/cleaners/validate.py @@ -1,10 +1,13 @@ import logging +import math +import uuid from typing import Any from itertools import batched from collections.abc import Iterable from concurrency.utils import fqn from constance import config +from django.core.cache import cache from django.db.models import Model, QuerySet, Prefetch from django.db.models.query import prefetch_related_objects @@ -12,8 +15,61 @@ from country_workspace.models import AsyncJob, Household, Individual, Program from country_workspace.state import state from country_workspace.utils.imports import validate_alien_fields +from country_workspace.notifications.signals import validation_completed_signal logger = logging.getLogger(__name__) +VALIDATION_PROGRESS_TTL_SECONDS = 24 * 60 * 60 + + +def _emit_validation_completed(program_id: int, context: str, valid: int, invalid: int, sender: type[Model]) -> None: + validation_completed_signal.send( + sender=sender, + program_id=program_id, + context=context, + results={"valid": valid, "invalid": invalid}, + ) + + +def _aggregate_validation_result( # noqa: PLR0913 + *, + validation_run_id: str, + total_chunks: int, + program_id: int, + context: str, + valid: int, + invalid: int, + sender: type[Model], +) -> None: + cache_key = f"validation-run:{validation_run_id}" + progress: dict[str, int | str] = cache.get(cache_key, {}) + if not progress: + progress = { + "valid": 0, + "invalid": 0, + "completed_chunks": 0, + "total_chunks": total_chunks, + "program_id": program_id, + "context": context, + } + + progress["valid"] = int(progress.get("valid", 0)) + valid + progress["invalid"] = int(progress.get("invalid", 0)) + invalid + progress["completed_chunks"] = int(progress.get("completed_chunks", 0)) + 1 + cache.set(cache_key, progress, timeout=VALIDATION_PROGRESS_TTL_SECONDS) + + completed_chunks = int(progress["completed_chunks"]) + required_chunks = int(progress.get("total_chunks", total_chunks)) + if completed_chunks < required_chunks: + return + + _emit_validation_completed( + program_id=int(progress["program_id"]), + context=str(progress["context"]), + valid=int(progress["valid"]), + invalid=int(progress["invalid"]), + sender=sender, + ) + cache.delete(cache_key) def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwargs: Any) -> dict[str, int]: @@ -46,6 +102,29 @@ def validate_queryset(queryset: QuerySet[Model], chunk_size: int = 2000, **kwarg dv, di = _validate_and_count(queryset.iterator(chunk_size=chunk_size)) # stream rows from DB valid, invalid = valid + dv, invalid + di + context = kwargs.get("context", "total") + validation_run_id = kwargs.get("validation_run_id") + total_chunks = kwargs.get("validation_total_chunks") + + if validation_run_id and total_chunks: + _aggregate_validation_result( + validation_run_id=validation_run_id, + total_chunks=int(total_chunks), + program_id=first.program.id, + context=context, + valid=valid, + invalid=invalid, + sender=queryset.model, + ) + else: + _emit_validation_completed( + program_id=first.program.id, + context=context, + valid=valid, + invalid=invalid, + sender=queryset.model, + ) + except Exception as e: # pragma: no cover logger.error("Error during queryset validation: %s", e) raise @@ -71,16 +150,35 @@ def _validate_and_count(objs: Iterable[Model]) -> tuple[int, int]: return valid, invalid -def create_validation_jobs(description: str, owner: str, program: Program, queryset: QuerySet) -> AsyncJob: +def create_validation_jobs( + description: str, owner: str, program: Program, queryset: QuerySet, *, context: str = "total" +) -> AsyncJob | None: opts = queryset.model._meta queryset = queryset.order_by("pk").values_list("pk", flat=True) - for chunk in batched(queryset, config.CHUNK_SIZE_FOR_VALIDATION_TASK): + chunk_size = config.CHUNK_SIZE_FOR_VALIDATION_TASK + total_records = queryset.count() + if total_records == 0: + return None + total_chunks = math.ceil(total_records / chunk_size) + validation_run_id = uuid.uuid4().hex + + job: AsyncJob | None = None + for chunk in batched(queryset, chunk_size): job = AsyncJob.objects.create( description=f"{description} (PKs {chunk[0]} - {chunk[-1]})", type=AsyncJob.JobType.ACTION, owner=owner, action=fqn(validate_queryset), program=program, - config={"pks": chunk, "model_name": opts.label}, + config={ + "pks": chunk, + "model_name": opts.label, + "kwargs": { + "context": context, + "validation_run_id": validation_run_id, + "validation_total_chunks": total_chunks, + }, + }, ) job.queue() + return job diff --git a/src/country_workspace/workspaces/admin/hh_ind.py b/src/country_workspace/workspaces/admin/hh_ind.py index 2e03db2a..ccb0f2d8 100644 --- a/src/country_workspace/workspaces/admin/hh_ind.py +++ b/src/country_workspace/workspaces/admin/hh_ind.py @@ -200,6 +200,7 @@ def validate_program(self, request: HttpRequest) -> "HttpResponse": owner=request.user, program=program, queryset=queryset, + context="total", ) self.message_user(request, _("Task scheduled"), messages.SUCCESS) diff --git a/tests/contrib/aurora/test_import_processing.py b/tests/contrib/aurora/test_import_processing.py index 80509fca..fe11cd72 100644 --- a/tests/contrib/aurora/test_import_processing.py +++ b/tests/contrib/aurora/test_import_processing.py @@ -27,6 +27,11 @@ def config() -> Config: } +@pytest.fixture(autouse=True) +def _mock_bitcaster_dispatch(mocker: MockerFixture): + return mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + @pytest.fixture def job(mocker: MockerFixture, config: Config): job = mocker.MagicMock() diff --git a/tests/contrib/hope/push/test_orchestration.py b/tests/contrib/hope/push/test_orchestration.py index 0aaf272c..6d56bd3c 100644 --- a/tests/contrib/hope/push/test_orchestration.py +++ b/tests/contrib/hope/push/test_orchestration.py @@ -943,6 +943,8 @@ def test_push_existing_rdp_core_success( mark_removed = mocker.patch(f"{MOD}._mark_rdp_beneficiaries_removed") set_status = mocker.patch(f"{MOD}.set_rdp_push_status") approve = mocker.patch(f"{MOD}._approve_deduplication_set_after_successful_push") + rdi_pushed = mocker.patch(f"{MOD}.rdi_push_completed_signal.send") + rdp_pushed = mocker.patch(f"{MOD}.rdp_push_status_changed_signal.send") assert push_existing_rdp_core(job) == {"errors": []} @@ -964,6 +966,17 @@ def test_push_existing_rdp_core_success( deduplication_set_id=ds_id, processor=processor, ) + rdi_pushed.assert_called_once_with( + sender=Rdp, + program_id=rdp.program_id, + pushed_count=0, + ) + rdp_pushed.assert_called_once_with( + sender=Rdp, + program_id=rdp.program_id, + rdp_id=rdp.pk, + status=Rdp.PushStatus.SUCCESS, + ) def test_push_existing_rdp_core_failure(mocker: MockerFixture, err_contains) -> None: @@ -988,6 +1001,8 @@ def fail_step() -> None: mocker.patch(f"{MOD}.lock_rdp_for_update", return_value=locked) set_status = mocker.patch(f"{MOD}.set_rdp_push_status") mark_removed = mocker.patch(f"{MOD}._mark_rdp_beneficiaries_removed") + rdi_pushed = mocker.patch(f"{MOD}.rdi_push_completed_signal.send") + rdp_pushed = mocker.patch(f"{MOD}.rdp_push_status_changed_signal.send") with pytest.raises(HopePushError) as exc: push_existing_rdp_core(job) @@ -1000,4 +1015,11 @@ def fail_step() -> None: status=Rdp.PushStatus.FAILURE, hope_rdi_id="N/A", ) + rdi_pushed.assert_not_called() + rdp_pushed.assert_called_once_with( + sender=Rdp, + program_id=rdp.program_id, + rdp_id=rdp.pk, + status=Rdp.PushStatus.FAILURE, + ) next_step.assert_not_called() diff --git a/tests/contrib/kobo/test_kobo_sync.py b/tests/contrib/kobo/test_kobo_sync.py index 52dcbcb2..0ee99180 100644 --- a/tests/contrib/kobo/test_kobo_sync.py +++ b/tests/contrib/kobo/test_kobo_sync.py @@ -66,6 +66,11 @@ def config() -> Config: } +@pytest.fixture(autouse=True) +def _mock_bitcaster_dispatch(mocker: MockerFixture): + return mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + @pytest.fixture def submission(mocker: MockerFixture) -> Callable[[int], object]: def create(pk: int) -> object: diff --git a/tests/datasources/rdi/test_processors.py b/tests/datasources/rdi/test_processors.py index 9b761229..216efb0d 100644 --- a/tests/datasources/rdi/test_processors.py +++ b/tests/datasources/rdi/test_processors.py @@ -41,6 +41,11 @@ FULL_NAME_COLUMN = "full_name" +@pytest.fixture(autouse=True) +def _mock_bitcaster_dispatch(mocker: MockerFixture): + return mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + @pytest.fixture def skip_if_not_master_detail(config: Config) -> None: if not config["master_detail"]: diff --git a/tests/datasources/rdi/test_processors_transformers.py b/tests/datasources/rdi/test_processors_transformers.py index d9d74fac..925fd9d6 100644 --- a/tests/datasources/rdi/test_processors_transformers.py +++ b/tests/datasources/rdi/test_processors_transformers.py @@ -1,8 +1,14 @@ +import pytest from pytest_mock import MockerFixture from country_workspace.datasources.rdi import processors +@pytest.fixture(autouse=True) +def _mock_bitcaster_dispatch(mocker: MockerFixture): + return mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + def _base_config() -> dict: return { "batch_name": "batch", diff --git a/tests/notifications/test_bitcaster_notifications.py b/tests/notifications/test_bitcaster_notifications.py new file mode 100644 index 00000000..efb8cd86 --- /dev/null +++ b/tests/notifications/test_bitcaster_notifications.py @@ -0,0 +1,115 @@ +from unittest.mock import Mock + +import pytest + +from country_workspace.notifications.bitcaster_client import BitcasterClient +from country_workspace.notifications.notifier import send_notification_event +from country_workspace.notifications.tasks import NotifyError, send_bitcaster_event_task + + +def _configure_bitcaster_settings(settings) -> None: + settings.BITCASTER_ENABLED = True + settings.BITCASTER_API_URL = "https://bitcaster.example" + settings.BITCASTER_API_KEY = "secret-key" + settings.BITCASTER_ORGANIZATION_SLUG = "org" + settings.BITCASTER_PROJECT_SLUG = "project" + settings.BITCASTER_APPLICATION_SLUG = "workspace" + + +def test_trigger_event_uses_bitcaster_trigger_contract(settings, mocker) -> None: + _configure_bitcaster_settings(settings) + sdk_client = mocker.patch("country_workspace.notifications.bitcaster_client.SDKClient") + + client = BitcasterClient() + result = client.trigger_event("data_imported", {"program_id": 123}) + + assert result is True + sdk_client.assert_called_once_with(bae="https://secret-key@bitcaster.example/api/o/org/") + sdk_client.return_value.trigger.assert_called_once_with( + project="project", + application="workspace", + event="data_imported", + context={"program_id": 123}, + ) + + +def test_trigger_event_propagates_sdk_errors(settings, mocker) -> None: + _configure_bitcaster_settings(settings) + sdk_client = mocker.patch("country_workspace.notifications.bitcaster_client.SDKClient") + sdk_client.return_value.trigger.side_effect = RuntimeError("boom") + + with pytest.raises(RuntimeError, match="boom"): + BitcasterClient().trigger_event("rdi_push_completed", {"program_id": 12}) + + +def test_trigger_event_returns_false_when_client_not_configured(settings) -> None: + settings.BITCASTER_API_URL = "" + settings.BITCASTER_API_KEY = "" + settings.BITCASTER_ORGANIZATION_SLUG = "" + settings.BITCASTER_PROJECT_SLUG = "" + settings.BITCASTER_APPLICATION_SLUG = "" + + assert BitcasterClient().trigger_event("data_imported", {"program_id": 12}) is False + + +def test_send_notification_event_delegates_to_backend(mocker) -> None: + backend = Mock() + backend.trigger_event.return_value = True + get_backend = mocker.patch( + "country_workspace.notifications.notifier.get_notification_backend", + return_value=backend, + ) + + result = send_notification_event("rdi_push_completed", {"program_id": 7}) + + assert result is True + get_backend.assert_called_once_with() + backend.trigger_event.assert_called_once_with("rdi_push_completed", {"program_id": 7}) + + +def test_send_bitcaster_event_task_logs_error_on_exception(settings, mocker) -> None: + _configure_bitcaster_settings(settings) + backend = Mock(is_configured=True) + logger_error = mocker.patch("country_workspace.notifications.tasks.logger.error") + mocker.patch("country_workspace.notifications.tasks.get_notification_backend", return_value=backend) + mocker.patch( + "country_workspace.notifications.tasks.send_notification_event", side_effect=NotifyError("bad request") + ) + + send_bitcaster_event_task.run("data_imported", {"program_id": 12}) + logger_error.assert_called_once_with("Bitcaster send failed for event '%s': %s", "data_imported", "bad request") + + +def test_send_bitcaster_event_task_logs_warning_when_backend_returns_false(settings, mocker) -> None: + _configure_bitcaster_settings(settings) + backend = Mock(is_configured=True) + warning = mocker.patch("country_workspace.notifications.tasks.logger.warning") + mocker.patch("country_workspace.notifications.tasks.get_notification_backend", return_value=backend) + mocker.patch("country_workspace.notifications.tasks.send_notification_event", return_value=False) + + send_bitcaster_event_task.run("data_imported", {"program_id": 12}) + + warning.assert_any_call("Bitcaster client returned false for event '%s'", "data_imported") + + +def test_send_bitcaster_event_task_skips_when_backend_not_configured(settings, mocker) -> None: + _configure_bitcaster_settings(settings) + backend = Mock(is_configured=False) + send_event = mocker.patch("country_workspace.notifications.tasks.send_notification_event") + mocker.patch("country_workspace.notifications.tasks.get_notification_backend", return_value=backend) + + send_bitcaster_event_task.run("data_imported", {"program_id": 12}) + + send_event.assert_not_called() + + +def test_send_bitcaster_event_task_skips_when_bitcaster_disabled(settings, mocker) -> None: + _configure_bitcaster_settings(settings) + settings.BITCASTER_ENABLED = False + logger_info = mocker.patch("country_workspace.notifications.tasks.logger.info") + get_backend = mocker.patch("country_workspace.notifications.tasks.get_notification_backend") + + send_bitcaster_event_task.run("data_imported", {"program_id": 12}) + + logger_info.assert_called_once_with("Skipping Bitcaster task: integration disabled (event='%s').", "data_imported") + get_backend.assert_not_called() diff --git a/tests/notifications/test_handlers.py b/tests/notifications/test_handlers.py new file mode 100644 index 00000000..10413407 --- /dev/null +++ b/tests/notifications/test_handlers.py @@ -0,0 +1,86 @@ +from country_workspace.notifications.handlers import ( + handle_data_imported, + handle_rdi_pushed, + handle_rdp_pushed, + handle_validation_completed, +) + + +def test_handle_data_imported_enqueues_expected_event_and_payload(mocker) -> None: + delay = mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + handle_data_imported( + sender=object(), + program_id=10, + batch_id=20, + record_count=30, + source="KOBO", + ) + + delay.assert_called_once_with( + "data_imported", + { + "program_id": 10, + "batch_id": 20, + "record_count": 30, + "source": "KOBO", + }, + ) + + +def test_handle_validation_completed_enqueues_expected_event_and_payload(mocker) -> None: + delay = mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + handle_validation_completed( + sender=object(), + program_id=10, + context="rdi", + results={"valid": 3, "invalid": 1}, + ) + + delay.assert_called_once_with( + "validation_completed", + { + "program_id": 10, + "context": "rdi", + "results": {"valid": 3, "invalid": 1}, + }, + ) + + +def test_handle_rdi_pushed_enqueues_expected_event_and_payload(mocker) -> None: + delay = mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + handle_rdi_pushed( + sender=object(), + program_id=44, + pushed_count=77, + ) + + delay.assert_called_once_with( + "rdi_push_completed", + { + "program_id": 44, + "pushed_count": 77, + }, + ) + + +def test_handle_rdp_pushed_enqueues_expected_event_and_payload(mocker) -> None: + delay = mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + handle_rdp_pushed( + sender=object(), + program_id=55, + rdp_id=66, + status="FAILURE", + ) + + delay.assert_called_once_with( + "rdp_push_status_changed", + { + "program_id": 55, + "rdp_id": 66, + "status": "FAILURE", + }, + ) diff --git a/tests/workspace/actions/test_ws_validate.py b/tests/workspace/actions/test_ws_validate.py index 98772713..794b2bca 100644 --- a/tests/workspace/actions/test_ws_validate.py +++ b/tests/workspace/actions/test_ws_validate.py @@ -3,12 +3,14 @@ import freezegun import pytest +from django.core.cache import cache from django.urls import reverse from testutils.utils import select_office from country_workspace.models import Household, Individual +from country_workspace.notifications.signals import validation_completed_signal from country_workspace.state import state -from country_workspace.workspaces.admin.cleaners.validate import validate_queryset +from country_workspace.workspaces.admin.cleaners.validate import create_validation_jobs, validate_queryset if TYPE_CHECKING: from django_webtest import DjangoTestApp @@ -61,6 +63,11 @@ def app(django_app_factory: "MixinWithInstanceVariables") -> "DjangoTestApp": return django_app +@pytest.fixture(autouse=True) +def _mock_bitcaster_dispatch(mocker): + return mocker.patch("country_workspace.notifications.handlers.send_bitcaster_event_task.delay") + + def test_ws_validate( app: "DjangoTestApp", force_migrated_records, settings: "SettingsWrapper", household: "CountryHousehold" ) -> None: @@ -120,3 +127,93 @@ def test_validate_queryset_individuals(program, force_migrated_records): result = validate_queryset(qs) assert result["valid"] + result["invalid"] == 2 + + +@pytest.mark.django_db +def test_validate_queryset_emits_single_notification_per_validation_run(program, force_migrated_records, mocker): + from testutils.factories import IndividualFactory + + ind1 = IndividualFactory( + household=None, + batch__program=program, + batch__country_office=program.country_office, + ) + ind2 = IndividualFactory( + household=None, + batch__program=program, + batch__country_office=program.country_office, + ) + + cache.delete("validation-run:run-id") + send_mock = mocker.patch.object(validation_completed_signal, "send") + mocker.patch( + "country_workspace.workspaces.admin.cleaners.validate._validate_and_count", + return_value=(1, 0), + ) + + validate_queryset( + Individual.objects.filter(pk=ind1.pk), + context="rdi", + validation_run_id="run-id", + validation_total_chunks=2, + ) + send_mock.assert_not_called() + + validate_queryset( + Individual.objects.filter(pk=ind2.pk), + context="rdi", + validation_run_id="run-id", + validation_total_chunks=2, + ) + send_mock.assert_called_once_with( + sender=Individual, + program_id=program.id, + context="rdi", + results={"valid": 2, "invalid": 0}, + ) + + +@pytest.mark.django_db +def test_create_validation_jobs_sets_context_and_validation_metadata(program, force_migrated_records, mocker): + from testutils.factories import IndividualFactory + + individual = IndividualFactory( + household=None, + batch__program=program, + batch__country_office=program.country_office, + ) + job_mock = mocker.Mock() + create_mock = mocker.patch( + "country_workspace.workspaces.admin.cleaners.validate.AsyncJob.objects.create", + return_value=job_mock, + ) + + result = create_validation_jobs( + description="Validate records", + owner=mocker.Mock(), + program=program, + queryset=Individual.objects.filter(pk=individual.pk), + context="total", + ) + + assert result is job_mock + assert create_mock.call_args.kwargs["config"]["kwargs"]["context"] == "total" + assert create_mock.call_args.kwargs["config"]["kwargs"]["validation_total_chunks"] == 1 + assert create_mock.call_args.kwargs["config"]["kwargs"]["validation_run_id"] + job_mock.queue.assert_called_once() + + +@pytest.mark.django_db +def test_create_validation_jobs_returns_none_for_empty_queryset(program, force_migrated_records, mocker): + create_mock = mocker.patch("country_workspace.workspaces.admin.cleaners.validate.AsyncJob.objects.create") + + result = create_validation_jobs( + description="Validate records", + owner=mocker.Mock(), + program=program, + queryset=Individual.objects.none(), + context="total", + ) + + assert result is None + create_mock.assert_not_called()