Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/country_workspace/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ class Group(Enum):
"IGNORE_UNSUPPORTED_FEATURES": (bool, False, True),
"REQUESTS_TIMEOUT": (int, 30, 30),
"DEBUG_API_REQUESTS": (bool, False, False),
"BITCASTER_API_URL": (str, "", "http://localhost:8002", False, "Bitcaster API URL"),
"BITCASTER_API_KEY": (str, "", "", False, "Bitcaster API key"),
"BITCASTER_ORGANIZATION_SLUG": (str, "unicef", "unicef", False, "Bitcaster organization slug"),
"BITCASTER_PROJECT_SLUG": (str, "hope", "hope", False, "Bitcaster project slug"),
"BITCASTER_APPLICATION_SLUG": (str, "", "country-workspace", False, "Bitcaster application slug"),
}

env = SmartEnv(**CONFIG)
9 changes: 9 additions & 0 deletions src/country_workspace/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"country_workspace.workspaces.apps.Config",
"country_workspace.versioning",
"country_workspace.cache",
"country_workspace.notifications",
# these should be optional in the future
"country_workspace.contrib.hope.apps.Config",
"country_workspace.contrib.aurora.apps.Config",
Expand Down Expand Up @@ -149,6 +150,14 @@
},
},
}

# Bitcaster Configuration
BITCASTER_API_URL = env("BITCASTER_API_URL")
BITCASTER_API_KEY = env("BITCASTER_API_KEY")
BITCASTER_ORGANIZATION_SLUG = env("BITCASTER_ORGANIZATION_SLUG")
BITCASTER_PROJECT_SLUG = env("BITCASTER_PROJECT_SLUG")
BITCASTER_APPLICATION_SLUG = env("BITCASTER_APPLICATION_SLUG")

SELECT2_CACHE_BACKEND = "select2"

X_FRAME_OPTIONS = "SAMEORIGIN"
Expand Down
9 changes: 9 additions & 0 deletions src/country_workspace/contrib/aurora/import_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from country_workspace.utils.imports import get_aurora_originating_id
from country_workspace.utils.import_flow import build_import_processor, run_batch_postprocessing
from country_workspace.utils.sync_log import get_aurora_sync_log_name
from country_workspace.notifications.signals import data_imported_signal

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -78,6 +79,14 @@ def import_data(job: AsyncJob) -> ImportResult:

batch.status = Batch.BatchStatus.COMPLETE
batch.save(update_fields=["status"])

data_imported_signal.send(
sender=Batch,
program_id=batch.program_id,
batch_id=batch.id,
record_count=total_people + total_households,
source=Batch.BatchSource.AURORA,
)
return ImportResult(people=total_people, households=total_households)


Expand Down
27 changes: 27 additions & 0 deletions src/country_workspace/contrib/hope/push/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from country_workspace.contrib.hope.exceptions import HopePushError
from country_workspace.exceptions import RemoteError, RemoteUnavailableError
from country_workspace.models import AsyncJob, Rdp
from country_workspace.notifications.signals import rdi_pushed_signal, rdp_pushed_signal

from .config import CreateRdpConfig, PushWorkflowConfig
from .policy import ActionCheck, get_rdp_policy
Expand Down Expand Up @@ -314,6 +315,12 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
status=Rdp.PushStatus.FAILURE,
hope_rdi_id=hope_processor.hope_rdi_id or "N/A",
)
rdp_pushed_signal.send(
sender=Rdp,
program_id=rdp.program_id,
rdp_id=rdp.pk,
status=Rdp.PushStatus.FAILURE,
)
raise HopePushError(hope_processor.total)

with transaction.atomic():
Expand All @@ -333,6 +340,26 @@ def push_existing_rdp_core(job: AsyncJob) -> dict[str, Any]:
processor=hope_processor,
)

pushed_count = (
hope_processor.total.get("households", 0)
+ hope_processor.total.get("individuals", 0)
+ hope_processor.total.get("people", 0)
)

rdi_pushed_signal.send(
sender=Rdp,
program_id=rdp.program_id,
target="HOPE",
pushed_count=pushed_count,
)

rdp_pushed_signal.send(
sender=Rdp,
program_id=rdp.program_id,
rdp_id=rdp.pk,
status=Rdp.PushStatus.SUCCESS,
)

return hope_processor.total


Expand Down
10 changes: 10 additions & 0 deletions src/country_workspace/contrib/kobo/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from country_workspace.utils.sync_log import get_kobo_sync_log_name
from country_workspace.workspaces.admin.cleaners.validate import create_validation_jobs
from country_workspace.models.jobs import GracefulJobCancellationError
from country_workspace.notifications.signals import data_imported_signal

if TYPE_CHECKING:
from hope_flex_fields.models import DataChecker
Expand Down Expand Up @@ -461,11 +462,20 @@ def import_data(job: AsyncJob) -> ImportResult:
owner=job.owner,
program=job.program,
queryset=batch.household_set.filter(removed=False).prefetch_related("members"), # type: ignore[attr-defined]
context="rdi",
)
job.ensure_not_cancelled(refresh=True)
# Mark batch complete in a dedicated transaction.
with transaction.atomic():
Batch.objects.select_for_update().filter(pk=batch.pk).update(status=Batch.BatchStatus.COMPLETE)

data_imported_signal.send(
sender=Batch,
program_id=batch.program_id,
batch_id=batch.id,
record_count=household_counter + individual_counter,
source=Batch.BatchSource.KOBO,
)
else:
job.ensure_not_cancelled(refresh=True)
AsyncJob.objects.create(
Expand Down
16 changes: 16 additions & 0 deletions src/country_workspace/datasources/rdi/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from country_workspace.utils.functional import compose
from country_workspace.utils.import_flow import build_import_processor, run_batch_postprocessing
from country_workspace.workspaces.admin.cleaners.validate import create_validation_jobs
from country_workspace.notifications.signals import data_imported_signal

from .config import Config, SheetName, Sheet
from .exceptions import ColumnConfigurationError, SheetProcessingError, SheetNotFoundError
Expand Down Expand Up @@ -220,6 +221,13 @@ def import_from_rdi(job: AsyncJob) -> dict[str, int]:
if not config.get("validate_after_import"):
batch.status = Batch.BatchStatus.COMPLETE
batch.save(update_fields=["status"])
data_imported_signal.send(
sender=Batch,
program_id=batch.program_id,
batch_id=batch.id,
record_count=sum(result.values()),
source=Batch.BatchSource.RDI,
)
return result

job.ensure_not_cancelled(refresh=True)
Expand All @@ -233,9 +241,17 @@ def import_from_rdi(job: AsyncJob) -> dict[str, int]:
owner=job.owner,
program=job.program,
queryset=queryset,
context="rdi",
)
batch.status = Batch.BatchStatus.COMPLETE
batch.save(update_fields=["status"])
data_imported_signal.send(
sender=Batch,
program_id=batch.program_id,
batch_id=batch.id,
record_count=sum(result.values()),
source=Batch.BatchSource.RDI,
)
return result


Expand Down
4 changes: 4 additions & 0 deletions src/country_workspace/notifications/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Expose the client for easier imports
from .bitcaster_client import BitcasterClient

__all__ = ["BitcasterClient"]
10 changes: 10 additions & 0 deletions src/country_workspace/notifications/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from django.apps import AppConfig


class NotificationsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "country_workspace.notifications"

def ready(self) -> None:
# Import handlers to ensure signals are registered when Django starts
import country_workspace.notifications.handlers # noqa: F401
79 changes: 79 additions & 0 deletions src/country_workspace/notifications/bitcaster_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logging
from typing import Any

import requests
from django.conf import settings

logger = logging.getLogger(__name__)


class RetryableBitcasterError(Exception):
"""Raised when an event delivery can be retried safely."""


class BitcasterClient:
"""A generic client for interacting with the Bitcaster REST API."""

def __init__(
self,
api_url: str | None = None,
api_key: str | None = None,
organization_slug: str | None = None,
project_slug: str | None = None,
application_slug: str | None = None,
) -> None:
# Allow overriding settings, but default to Django settings
self.api_url = api_url or settings.BITCASTER_API_URL
self.api_key = api_key or settings.BITCASTER_API_KEY
self.organization_slug = organization_slug or settings.BITCASTER_ORGANIZATION_SLUG
self.project_slug = project_slug or settings.BITCASTER_PROJECT_SLUG
self.application_slug = application_slug or settings.BITCASTER_APPLICATION_SLUG

@property
def is_configured(self) -> bool:
return bool(
self.api_url and self.api_key and self.organization_slug and self.project_slug and self.application_slug
)

def trigger_event(self, event_name: str, payload: dict[str, Any]) -> bool:
"""
Trigger an event in Bitcaster.

Args:
event_name: The name of the event/signal in Bitcaster.
payload: A dictionary of context data to send to Bitcaster.

Returns:
bool: True if the request was successful, False otherwise.

"""
if not self.is_configured:
logger.warning("Bitcaster client is not fully configured. Skipping event '%s'.", event_name)
return False

endpoint = (
f"{self.api_url}/api/o/{self.organization_slug}/p/{self.project_slug}/"
f"a/{self.application_slug}/e/{event_name}/trigger/"
)

headers = {
"Authorization": self.api_key,
"Content-Type": "application/json",
}

try:
data = {"context": payload}

response = requests.post(endpoint, json=data, headers=headers, timeout=10)
if response.status_code >= 500:
raise RetryableBitcasterError(
f"Bitcaster server error while triggering event '{event_name}': {response.status_code}"
)
response.raise_for_status()
logger.info("Successfully triggered Bitcaster event: %s", event_name)
except requests.exceptions.HTTPError:
raise
except requests.exceptions.RequestException as e:
raise RetryableBitcasterError(f"Network error while triggering '{event_name}'") from e

return True
56 changes: 56 additions & 0 deletions src/country_workspace/notifications/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
from typing import Any
from django.dispatch import receiver

from country_workspace.notifications.signals import (
data_imported_signal,
validation_completed_signal,
rdi_pushed_signal,
rdp_pushed_signal,
)
from country_workspace.notifications.tasks import send_bitcaster_event_task

logger = logging.getLogger(__name__)


@receiver(data_imported_signal)
def handle_data_imported(sender: Any, **kwargs: Any) -> None:
# Build a standard payload dictionary using kwargs
payload = {
"program_id": kwargs.get("program_id"),
"batch_id": kwargs.get("batch_id"),
"record_count": kwargs.get("record_count"),
"source": kwargs.get("source"),
}
# Queue the Celery task
send_bitcaster_event_task.delay("data_imported", payload)


@receiver(validation_completed_signal)
def handle_validation_completed(sender: Any, **kwargs: Any) -> None:
payload = {
"program_id": kwargs.get("program_id"),
"context": kwargs.get("context"),
"results": kwargs.get("results", {}),
}
send_bitcaster_event_task.delay("validation_completed", payload)


@receiver(rdi_pushed_signal)
def handle_rdi_pushed(sender: Any, **kwargs: Any) -> None:
payload = {
"program_id": kwargs.get("program_id"),
"target": kwargs.get("target"),
"pushed_count": kwargs.get("pushed_count"),
}
send_bitcaster_event_task.delay("rdi_pushed", payload)


@receiver(rdp_pushed_signal)
def handle_rdp_pushed(sender: Any, **kwargs: Any) -> None:
payload = {
"program_id": kwargs.get("program_id"),
"rdp_id": kwargs.get("rdp_id"),
"status": kwargs.get("status"),
}
send_bitcaster_event_task.delay("rdp_pushed", payload)
16 changes: 16 additions & 0 deletions src/country_workspace/notifications/notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any, Protocol

from country_workspace.notifications.bitcaster_client import BitcasterClient


class NotificationBackend(Protocol):
def trigger_event(self, event_name: str, payload: dict[str, Any]) -> bool: ...


def get_notification_backend() -> NotificationBackend:
return BitcasterClient()


def send_notification_event(event_name: str, payload: dict[str, Any]) -> bool:
backend = get_notification_backend()
return backend.trigger_event(event_name, payload)
18 changes: 18 additions & 0 deletions src/country_workspace/notifications/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import django.dispatch

# Bitcaster Integration Signals
# Triggered when a batch of data has been successfully imported.
# Expected kwargs: program_id (int), batch_id (int), record_count (int), source (str)
data_imported_signal = django.dispatch.Signal()

# Triggered when validation processing is finished (either full database or RDI validation)
# Expected kwargs: program_id (int), context (str - e.g. "total", "rdi"), results (dict)
validation_completed_signal = django.dispatch.Signal()

# Triggered when an RDI push cycle successfully sends records to a target.
# Expected kwargs: program_id (int), target (str), pushed_count (int)
rdi_pushed_signal = django.dispatch.Signal()

# Triggered when an RDP record status transitions to SUCCESS.
# Expected kwargs: program_id (int), rdp_id (int), status (str)
rdp_pushed_signal = django.dispatch.Signal()
35 changes: 35 additions & 0 deletions src/country_workspace/notifications/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
from typing import Any

import requests

from country_workspace.config.celery import app
from country_workspace.notifications.bitcaster_client import RetryableBitcasterError
from country_workspace.notifications.notifier import get_notification_backend, send_notification_event

logger = logging.getLogger(__name__)


@app.task(bind=True, max_retries=3)
def send_bitcaster_event_task(self: Any, event_name: str, payload: dict[str, Any]) -> None:
"""Celery task to asynchronously send an event to Bitcaster."""
backend = get_notification_backend()
if not getattr(backend, "is_configured", False):
logger.warning("Skipping Bitcaster task: client not configured.")
return

try:
success = send_notification_event(event_name, payload)
if not success:
logger.warning("Bitcaster client returned false for event '%s'", event_name)
except RetryableBitcasterError as exc:
logger.warning("Retryable Bitcaster error for event '%s': %s", event_name, str(exc))
self.retry(exc=exc, countdown=2**self.request.retries * 5)
except requests.exceptions.HTTPError as exc:
status_code = getattr(getattr(exc, "response", None), "status_code", None)
logger.error(
"Non-retryable Bitcaster HTTP error for event '%s' (status=%s): %s",
event_name,
status_code,
str(exc),
)
Loading