diff --git a/dagster/models/file_upload.py b/dagster/models/file_upload.py index 30dc1f6f..bff516da 100644 --- a/dagster/models/file_upload.py +++ b/dagster/models/file_upload.py @@ -34,6 +34,7 @@ class FileUpload(BaseModel): metadata_json_path: Mapped[str] = mapped_column(nullable=True) bronze_path: Mapped[str] = mapped_column(nullable=True, default=None) is_processed_in_staging: Mapped[bool] = mapped_column(nullable=False, default=False) + approval_status: Mapped[str] = mapped_column(nullable=True, default=None) country: Mapped[str] = mapped_column(VARCHAR(3), nullable=False) dataset: Mapped[str] = mapped_column(nullable=False) source: Mapped[str] = mapped_column(nullable=True) diff --git a/dagster/src/internal/common_assets/staging.py b/dagster/src/internal/common_assets/staging.py index 49c9dc9f..9726068d 100644 --- a/dagster/src/internal/common_assets/staging.py +++ b/dagster/src/internal/common_assets/staging.py @@ -107,6 +107,7 @@ def __call__(self, upstream_df: sql.DataFrame | list[str]) -> sql.DataFrame | No self._write_pending_records(pending) self._update_approval_request_status() + self._stamp_file_upload_staging_complete() self._emit_lineage() return pending @@ -383,6 +384,45 @@ def _update_approval_request_status(self) -> None: f"{self.country_code} - {formatted_dataset}: {e}" ) + def _stamp_file_upload_staging_complete(self) -> None: + """Set is_processed_in_staging=True and approval_status='PENDING' when enabled.""" + upload_id = self.config.filename_components.id + formatted_dataset = f"School {self.config.dataset_type.capitalize()}" + try: + with get_db_context() as db: + with db.begin(): + file_upload = db.scalar( + select(FileUpload).where(FileUpload.id == upload_id) + ) + if file_upload is None: + self.context.log.warning( + f"FileUpload with id `{upload_id}` not found; " + "cannot stamp is_processed_in_staging." + ) + return + + approval_request = db.scalar( + select(ApprovalRequest).where( + (ApprovalRequest.country == self.country_code) + & (ApprovalRequest.dataset == formatted_dataset) + & (ApprovalRequest.enabled == True) # noqa: E712 + ) + ) + + file_upload.is_processed_in_staging = True + if approval_request: + file_upload.approval_status = "PENDING" + + self.context.log.info( + f"Stamped FileUpload {upload_id}: " + f"is_processed_in_staging=True, " + f"approval_status={'PENDING' if approval_request else 'null'}" + ) + except Exception as e: + self.context.log.error( + f"Failed to stamp FileUpload {upload_id} after staging: {e}" + ) + def _get_current_approval_request(self, db, formatted_dataset: str): return db.scalar( select(ApprovalRequest).where(