From 3ce2daad60bb6ee2b0f245de2ce705ea7c5de2de Mon Sep 17 00:00:00 2001 From: Javiershenbc Date: Mon, 1 Jun 2026 15:28:45 +0200 Subject: [PATCH 1/2] feat: add approval status Once we success on the upload checks, we update this approval status "PENDING" value on the upload. --- dagster/models/file_upload.py | 1 + dagster/src/internal/common_assets/staging.py | 50 +++++++++++ docs/dagster-approval-status-pending.md | 86 +++++++++++++++++++ 3 files changed, 137 insertions(+) create mode 100644 docs/dagster-approval-status-pending.md diff --git a/dagster/models/file_upload.py b/dagster/models/file_upload.py index 30dc1f6f8..bff516da9 100644 --- a/dagster/models/file_upload.py +++ b/dagster/models/file_upload.py @@ -34,6 +34,7 @@ class FileUpload(BaseModel): metadata_json_path: Mapped[str] = mapped_column(nullable=True) bronze_path: Mapped[str] = mapped_column(nullable=True, default=None) is_processed_in_staging: Mapped[bool] = mapped_column(nullable=False, default=False) + approval_status: Mapped[str] = mapped_column(nullable=True, default=None) country: Mapped[str] = mapped_column(VARCHAR(3), nullable=False) dataset: Mapped[str] = mapped_column(nullable=False) source: Mapped[str] = mapped_column(nullable=True) diff --git a/dagster/src/internal/common_assets/staging.py b/dagster/src/internal/common_assets/staging.py index 49c9dc9fa..9726068d7 100644 --- a/dagster/src/internal/common_assets/staging.py +++ b/dagster/src/internal/common_assets/staging.py @@ -107,6 +107,7 @@ def __call__(self, upstream_df: sql.DataFrame | list[str]) -> sql.DataFrame | No self._write_pending_records(pending) self._update_approval_request_status() + self._stamp_file_upload_staging_complete() self._emit_lineage() return pending @@ -383,6 +384,55 @@ def _update_approval_request_status(self) -> None: f"{self.country_code} - {formatted_dataset}: {e}" )
+    def _stamp_file_upload_staging_complete(self) -> None:
+        """Mark the FileUpload row as staged and flag it for review if required.
+
+        Always sets ``is_processed_in_staging=True``. Sets
+        ``approval_status="PENDING"`` only when an enabled ApprovalRequest
+        exists for this country/dataset; otherwise the column is not modified.
+        Failures are logged and not re-raised.
+        """
+        upload_id = self.config.filename_components.id
+        formatted_dataset = f"School {self.config.dataset_type.capitalize()}"
+        try:
+            with get_db_context() as db:
+                with db.begin():
+                    file_upload = db.scalar(
+                        select(FileUpload).where(FileUpload.id == upload_id)
+                    )
+                    if file_upload is None:
+                        self.context.log.warning(
+                            f"FileUpload with id `{upload_id}` not found; "
+                            "cannot stamp is_processed_in_staging."
+                        )
+                        return
+
+                    approval_request = db.scalar(
+                        select(ApprovalRequest).where(
+                            (ApprovalRequest.country == self.country_code)
+                            & (ApprovalRequest.dataset == formatted_dataset)
+                            & (ApprovalRequest.enabled == True)  # noqa: E712
+                        )
+                    )
+
+                    file_upload.is_processed_in_staging = True
+                    if approval_request:
+                        file_upload.approval_status = "PENDING"
+
+                    # Log the value the row actually holds: when no enabled
+                    # ApprovalRequest is found the column is not modified, so
+                    # it may still carry a previously stored status.
+                    self.context.log.info(
+                        f"Stamped FileUpload {upload_id}: "
+                        f"is_processed_in_staging=True, "
+                        f"approval_status={file_upload.approval_status or 'null'}"
+                    )
+        except Exception as e:
+            # Best-effort bookkeeping: log and continue rather than re-raise.
+            self.context.log.error(
+                f"Failed to stamp FileUpload {upload_id} after staging: {e}"
+            )
+
 def _get_current_approval_request(self, db, formatted_dataset: str): return db.scalar( select(ApprovalRequest).where( diff --git a/docs/dagster-approval-status-pending.md b/docs/dagster-approval-status-pending.md new file mode 100644 index 000000000..53b777510 --- /dev/null +++ b/docs/dagster-approval-status-pending.md @@ -0,0 +1,86 @@ +# Dagster change: set `approval_status = "PENDING"` on file upload when staging completes + +## Context + +The `file_uploads` table in the primary Postgres DB now has a new nullable column: + +``` +approval_status: String (nullable) +``` + +Possible values: `"PENDING"`, `"APPROVED"`, `"REJECTED"`, `null`. + +- `null` — approval is not required for this upload (country/dataset combo has `ApprovalRequest.enabled = False`, or the upload type doesn't go through approval). +- `"PENDING"` — rows have been pushed to staging and are waiting for a reviewer to approve or reject them.
**This is the value Dagster must set.** +- `"APPROVED"` / `"REJECTED"` — set by the API endpoint `POST /api/approval-requests/{country_code}/{upload_id}/submit` when the reviewer submits their decision. + +## What the API already does + +In `api/data_ingestion/routers/approval_requests.py`, the `submit_upload_review` endpoint now sets: + +```python +if approved_change_ids: + file_upload.approval_status = "APPROVED" +elif rejected_change_ids: + file_upload.approval_status = "REJECTED" +``` + +This covers the transition from `PENDING` → `APPROVED` / `REJECTED`. + +## What Dagster needs to do + +When Dagster finishes pushing rows for an upload into the staging Delta Lake table (the step that currently sets `is_processed_in_staging = True` on the `file_uploads` record), it must also set `approval_status = "PENDING"` **if and only if** the `ApprovalRequest` for that country/dataset has `enabled = True`. + +### Where to look in the Dagster repo + +The relevant op/asset is wherever `is_processed_in_staging` is set to `True` on the `FileUpload` model. Search for: + +``` +is_processed_in_staging +``` + +That is the exact place to add the `approval_status` update. + +### Logic to add + +```python +from sqlalchemy import select +from data_ingestion.models.approval_requests import ApprovalRequest +from data_ingestion.models.file_upload import FileUpload + +# After rows are successfully written to the staging Delta table: +approval_request = db.scalar( + select(ApprovalRequest).where( + ApprovalRequest.country == country_code.upper(), + ApprovalRequest.dataset == formatted_dataset, # e.g. "School Geolocation" + ApprovalRequest.enabled == True, + ) +) + +file_upload.is_processed_in_staging = True +if approval_request: + file_upload.approval_status = "PENDING" + +db.commit() +``` + +### Important details + +- `formatted_dataset` follows the pattern `"School {dataset.capitalize()}"` (e.g. `"School Geolocation"`, `"School Coverage"`). 
This matches how `ApprovalRequest.dataset` is stored — see `approval_requests.py:448`. +- If `ApprovalRequest` does not exist or `enabled = False`, leave `approval_status` as `null`. Do not set it to `"PENDING"`. +- This must run inside the same DB transaction (or commit right after) as setting `is_processed_in_staging = True`, so both fields are always consistent. + +## DB model reference + +```python +# api/data_ingestion/models/file_upload.py +class FileUpload(BaseModel): + ... + is_processed_in_staging: Mapped[bool] = mapped_column(nullable=False, default=False) + approval_status: Mapped[str] = mapped_column(nullable=True, default=None) + ... +``` + +## Migration + +The column is added in migration `c4e5f6a7b8c9` (`2026_04_25_1000_c4e5f6a7b8c9_add_mode_to_file_uploads.py`). Make sure the migration has been applied to the target environment before deploying the Dagster change. From 52638bfdb572490a07083cc7814e525e3936eca1 Mon Sep 17 00:00:00 2001 From: Javiershenbc Date: Tue, 2 Jun 2026 13:39:58 +0200 Subject: [PATCH 2/2] fix: remove extra doc file Remove extra docs --- docs/dagster-approval-status-pending.md | 86 ------------------------- 1 file changed, 86 deletions(-) delete mode 100644 docs/dagster-approval-status-pending.md diff --git a/docs/dagster-approval-status-pending.md b/docs/dagster-approval-status-pending.md deleted file mode 100644 index 53b777510..000000000 --- a/docs/dagster-approval-status-pending.md +++ /dev/null @@ -1,86 +0,0 @@ -# Dagster change: set `approval_status = "PENDING"` on file upload when staging completes - -## Context - -The `file_uploads` table in the primary Postgres DB now has a new nullable column: - -``` -approval_status: String (nullable) -``` - -Possible values: `"PENDING"`, `"APPROVED"`, `"REJECTED"`, `null`. - -- `null` — approval is not required for this upload (country/dataset combo has `ApprovalRequest.enabled = False`, or the upload type doesn't go through approval). 
-- `"PENDING"` — rows have been pushed to staging and are waiting for a reviewer to approve or reject them. **This is the value Dagster must set.** -- `"APPROVED"` / `"REJECTED"` — set by the API endpoint `POST /api/approval-requests/{country_code}/{upload_id}/submit` when the reviewer submits their decision. - -## What the API already does - -In `api/data_ingestion/routers/approval_requests.py`, the `submit_upload_review` endpoint now sets: - -```python -if approved_change_ids: - file_upload.approval_status = "APPROVED" -elif rejected_change_ids: - file_upload.approval_status = "REJECTED" -``` - -This covers the transition from `PENDING` → `APPROVED` / `REJECTED`. - -## What Dagster needs to do - -When Dagster finishes pushing rows for an upload into the staging Delta Lake table (the step that currently sets `is_processed_in_staging = True` on the `file_uploads` record), it must also set `approval_status = "PENDING"` **if and only if** the `ApprovalRequest` for that country/dataset has `enabled = True`. - -### Where to look in the Dagster repo - -The relevant op/asset is wherever `is_processed_in_staging` is set to `True` on the `FileUpload` model. Search for: - -``` -is_processed_in_staging -``` - -That is the exact place to add the `approval_status` update. - -### Logic to add - -```python -from sqlalchemy import select -from data_ingestion.models.approval_requests import ApprovalRequest -from data_ingestion.models.file_upload import FileUpload - -# After rows are successfully written to the staging Delta table: -approval_request = db.scalar( - select(ApprovalRequest).where( - ApprovalRequest.country == country_code.upper(), - ApprovalRequest.dataset == formatted_dataset, # e.g. 
"School Geolocation" - ApprovalRequest.enabled == True, - ) -) - -file_upload.is_processed_in_staging = True -if approval_request: - file_upload.approval_status = "PENDING" - -db.commit() -``` - -### Important details - -- `formatted_dataset` follows the pattern `"School {dataset.capitalize()}"` (e.g. `"School Geolocation"`, `"School Coverage"`). This matches how `ApprovalRequest.dataset` is stored — see `approval_requests.py:448`. -- If `ApprovalRequest` does not exist or `enabled = False`, leave `approval_status` as `null`. Do not set it to `"PENDING"`. -- This must run inside the same DB transaction (or commit right after) as setting `is_processed_in_staging = True`, so both fields are always consistent. - -## DB model reference - -```python -# api/data_ingestion/models/file_upload.py -class FileUpload(BaseModel): - ... - is_processed_in_staging: Mapped[bool] = mapped_column(nullable=False, default=False) - approval_status: Mapped[str] = mapped_column(nullable=True, default=None) - ... -``` - -## Migration - -The column is added in migration `c4e5f6a7b8c9` (`2026_04_25_1000_c4e5f6a7b8c9_add_mode_to_file_uploads.py`). Make sure the migration has been applied to the target environment before deploying the Dagster change.