diff --git a/dagster/models/mappings.py b/dagster/models/mappings.py index 94dc212e..ef1654cf 100644 --- a/dagster/models/mappings.py +++ b/dagster/models/mappings.py @@ -5,6 +5,7 @@ VALID_PRIMITIVES = [ "string", "integer", + "int", "long", "float", "double", @@ -22,6 +23,7 @@ class TypeMapping(BaseModel): class TypeMappings(BaseModel): string: TypeMapping integer: TypeMapping + int: TypeMapping # Alias for integer long: TypeMapping float: TypeMapping double: TypeMapping diff --git a/dagster/src/assets/school_geolocation/assets.py b/dagster/src/assets/school_geolocation/assets.py index 226614aa..0549a26e 100644 --- a/dagster/src/assets/school_geolocation/assets.py +++ b/dagster/src/assets/school_geolocation/assets.py @@ -15,6 +15,7 @@ from pyspark.sql.types import StringType, StructField, StructType from sqlalchemy import select from src.constants import DataTier +from src.data_quality_checks.dq_context import DQContext, DQMode from src.data_quality_checks.utils import ( aggregate_report_json, aggregate_report_spark_df, @@ -301,9 +302,13 @@ def geolocation_data_quality_results( dq_results = row_level_checks( df=renamed_bronze, silver=casted_silver, - dataset_type=dataset_type, - _country_code_iso3=country_code, - mode=config.metadata["mode"], + dq_context=DQContext( + dq_mode=DQMode(config.metadata.get("dq_mode", "master")), + dataset_type=dataset_type, + country_code_iso3=country_code, + upload_id=id, + upload_mode=config.metadata.get("mode"), + ), context=context, ) @@ -696,6 +701,10 @@ def geolocation_staging( spark: PySparkResource, config: FileConfig, ) -> Output[None]: + if config.metadata.get("dq_mode") == "uploaded": + context.log.info("Skipping staging as dq_mode is 'uploaded'") + return Output(None) + if geolocation_dq_passed_rows.isEmpty(): context.log.warning("Skipping staging as there are no rows passing DQ checks") return Output(None) diff --git a/dagster/src/constants/constants_class.py b/dagster/src/constants/constants_class.py index b8de8b94..a4d18d96 100644 --- a/dagster/src/constants/constants_class.py +++ b/dagster/src/constants/constants_class.py @@ -60,6 +60,11 @@ class Constants(BaseSettings): pyspark=IntegerType, datahub=NumberTypeClass, ), + int=TypeMapping( + native=int, + pyspark=IntegerType, + datahub=NumberTypeClass, + ), long=TypeMapping( native=int, pyspark=LongType, diff --git a/dagster/src/data_quality_checks/dq_context.py b/dagster/src/data_quality_checks/dq_context.py new file mode 100644 index 00000000..a3199bb3 --- /dev/null +++ b/dagster/src/data_quality_checks/dq_context.py @@ -0,0 +1,21 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class DQMode(str, Enum): + UPLOADED = "uploaded" + MASTER = "master" + + +@dataclass(frozen=True) +class DQContext: + dq_mode: DQMode + dataset_type: str + country_code_iso3: str + upload_id: Optional[int] = None + upload_mode: Optional[str] = None + + @property + def mode(self) -> DQMode: + return self.dq_mode diff --git a/dagster/src/data_quality_checks/utils.py b/dagster/src/data_quality_checks/utils.py index 0522eae1..6471b2bc 100644 --- a/dagster/src/data_quality_checks/utils.py +++ b/dagster/src/data_quality_checks/utils.py @@ -1,4 +1,5 @@ from datetime import UTC, datetime +from typing import Any import pandas as pd from jinja2 import BaseLoader, Environment @@ -23,6 +24,7 @@ update_checks, ) from src.data_quality_checks.critical import critical_error_checks +from src.data_quality_checks.dq_context import DQContext, DQMode from src.data_quality_checks.duplicates import ( duplicate_all_except_checks, duplicate_set_checks, @@ -630,101 +632,204 @@ def dq_geolocation_extract_relevant_columns( return df, human_readable_mappings -def row_level_checks( +def run_master_checks( + df: sql.DataFrame, + dq_context: DQContext, + context: OpExecutionContext = None, +) -> sql.DataFrame: + df = is_not_within_country(df, dq_context.country_code_iso3, context) + df = similar_name_level_within_110_check(df, context) + df = school_density_check(df, context) + df = standard_checks(df, dq_context.dataset_type, context) + df = duplicate_all_except_checks( + df, + CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dq_context.dataset_type], + context, + ) + df = precision_check(df, config.PRECISION, context) + df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context) + df = duplicate_name_level_110_check(df, context) + df = column_relation_checks(df, dq_context.dataset_type, context) + df = critical_error_checks( + df, + dq_context.dataset_type, + CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type], + context, + ) + return df + + +def run_geolocation_checks( df: sql.DataFrame, - dataset_type: str, - _country_code_iso3: str, + dq_context: DQContext, silver: sql.DataFrame = None, - mode=None, context: OpExecutionContext = None, ) -> sql.DataFrame: - logger = get_context_with_fallback_logger(context) - logger.info("Starting row level checks...") - - if dataset_type == "master": - df = is_not_within_country(df, _country_code_iso3, context) - df = similar_name_level_within_110_check(df, context) - df = school_density_check(df, context) - df = standard_checks(df, dataset_type, context) - df = duplicate_all_except_checks( - df, - CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dataset_type], - context, - ) - df = precision_check(df, config.PRECISION, context) - df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context) - df = duplicate_name_level_110_check(df, context) - df = column_relation_checks(df, dataset_type, context) - df = critical_error_checks( - df, - dataset_type, - CONFIG_NONEMPTY_COLUMNS[dataset_type], - context, - ) - elif dataset_type == "geolocation": - if mode == UploadMode.CREATE.value: + if dq_context.dq_mode == DQMode.MASTER: + if dq_context.upload_mode == UploadMode.CREATE.value: df = create_checks(bronze=df, silver=silver, context=context) - elif mode == UploadMode.UPDATE.value: + elif dq_context.upload_mode == UploadMode.UPDATE.value: df = update_checks(bronze=df, silver=silver, context=context) + else: + # For assessment-only (uploaded mode), skip cross-checks against silver + # but ensure the columns exist to avoid downstream errors. + if dq_context.upload_mode == UploadMode.CREATE.value: + df = df.withColumn("dq_is_not_create", f.lit(0)) + elif dq_context.upload_mode == UploadMode.UPDATE.value: + df = df.withColumn("dq_is_not_update", f.lit(0)) + + df = is_not_within_country(df, dq_context.country_code_iso3, context) + df = similar_name_level_within_110_check(df, context) + df = school_density_check(df, context) + df = standard_checks(df, dq_context.dataset_type, context) + df = duplicate_all_except_checks( + df, + CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dq_context.dataset_type], + context, + ) + df = precision_check(df, config.PRECISION, context) + df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context) + df = duplicate_name_level_110_check(df, context) + df = critical_error_checks( + df, + dq_context.dataset_type, + CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type], + dq_context.upload_mode, + context, + ) + df = column_relation_checks(df, dq_context.dataset_type, context) + return df + + +def run_reference_checks( + df: sql.DataFrame, + dq_context: DQContext, + context: OpExecutionContext = None, +) -> sql.DataFrame: + df = standard_checks(df, dq_context.dataset_type, context) + df = critical_error_checks( + df, + dq_context.dataset_type, + CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type], + context, + ) + return df + + +def run_coverage_checks( + df: sql.DataFrame, + dq_context: DQContext, + context: OpExecutionContext = None, +) -> sql.DataFrame: + df = standard_checks(df, dq_context.dataset_type, context) + df = column_relation_checks(df, dq_context.dataset_type, context) + df = critical_error_checks( + df, + dq_context.dataset_type, + CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type], + context, + ) + return df + + +def run_coverage_fb_checks( + df: sql.DataFrame, + dq_context: DQContext, + context: OpExecutionContext = None, +) -> sql.DataFrame: + df = standard_checks( + df, dq_context.dataset_type, context, domain=False, range_=False + ) + df = fb_percent_sum_to_100_check(df, context) + df = column_relation_checks(df, dq_context.dataset_type, context) + df = critical_error_checks( + df, + dq_context.dataset_type, + CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type], + context, + ) + return df + + +def run_qos_checks( + df: sql.DataFrame, + dq_context: DQContext, + context: OpExecutionContext = None, +) -> sql.DataFrame: + df = standard_checks( + df, dq_context.dataset_type, context, domain=False, range_=False + ) + df = critical_error_checks( + df, + dq_context.dataset_type, + CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type], + context, + ) + return df + + +def row_level_checks_internal( + df: sql.DataFrame, + dq_context: DQContext, + silver: sql.DataFrame = None, + context: OpExecutionContext = None, +) -> sql.DataFrame: + logger = get_context_with_fallback_logger(context) + logger.info( + "Starting row level checks", + extra={ + "dq_mode": dq_context.mode.value, + "dataset_type": dq_context.dataset_type, + "country": dq_context.country_code_iso3, + "upload_mode": dq_context.upload_mode, + }, + ) + + if dq_context.dataset_type == "master": + df = run_master_checks(df, dq_context, context) + elif dq_context.dataset_type == "geolocation": + df = run_geolocation_checks(df, dq_context, silver, context) + elif dq_context.dataset_type == "reference": + df = run_reference_checks(df, dq_context, context) + elif dq_context.dataset_type in ["coverage", "coverage_itu"]: + df = run_coverage_checks(df, dq_context, context) + elif dq_context.dataset_type == "coverage_fb": + df = run_coverage_fb_checks(df, dq_context, context) + elif dq_context.dataset_type == "qos": + df = run_qos_checks(df, dq_context, context) - df = is_not_within_country(df, _country_code_iso3, context) - df = similar_name_level_within_110_check(df, context) - df = school_density_check(df, context) - df = standard_checks(df, dataset_type, context) - df = duplicate_all_except_checks( - df, - CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dataset_type], - context, - ) - df = precision_check(df, config.PRECISION, context) - df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context) - df = duplicate_name_level_110_check(df, context) - df = critical_error_checks( - df, - dataset_type, - CONFIG_NONEMPTY_COLUMNS[dataset_type], - mode, - context, - ) - df = column_relation_checks(df, dataset_type, context) - elif dataset_type == "reference": - df = standard_checks(df, dataset_type, context) - df = critical_error_checks( - df, - dataset_type, - CONFIG_NONEMPTY_COLUMNS[dataset_type], - context, - ) - elif dataset_type in ["coverage", "coverage_itu"]: - df = standard_checks(df, dataset_type, context) - df = column_relation_checks(df, dataset_type, context) - df = critical_error_checks( - df, - dataset_type, - CONFIG_NONEMPTY_COLUMNS[dataset_type], - context, - ) - elif dataset_type == "coverage_fb": - df = standard_checks(df, dataset_type, context, domain=False, range_=False) - df = fb_percent_sum_to_100_check(df, context) - df = column_relation_checks(df, dataset_type, context) - df = critical_error_checks( - df, - dataset_type, - CONFIG_NONEMPTY_COLUMNS[dataset_type], - context, - ) - elif dataset_type == "qos": - df = standard_checks(df, dataset_type, context, domain=False, range_=False) - df = critical_error_checks( - df, - dataset_type, - CONFIG_NONEMPTY_COLUMNS[dataset_type], - context, - ) return df +def row_level_checks( + df: sql.DataFrame, + dq_context: Any = None, + _country_code_iso3: str = None, + silver: sql.DataFrame = None, + mode: str = None, + context: OpExecutionContext = None, + dataset_type: str = None, +) -> sql.DataFrame: + # Resolve which signature is being used + if isinstance(dq_context, DQContext): + # Modern signature: row_level_checks(df, dq_context=DQContext(...), ...) + return row_level_checks_internal(df, dq_context, silver, context) + else: + # Legacy signature: row_level_checks(df, dataset_type, country_code, ...) + if dq_context is not None and dataset_type is None: + dataset_type = dq_context # Positional dataset_type + + # Build a temporary DQContext for internal processing + + internal_context = DQContext( + dq_mode=DQMode.MASTER, # Legacy calls default to MASTER + dataset_type=dataset_type, + country_code_iso3=_country_code_iso3, + upload_mode=mode, + ) + return row_level_checks_internal(df, internal_context, silver, context) + + def extract_school_id_govt_duplicates(df: sql.DataFrame): window = w.Window.partitionBy("school_id_govt").orderBy(f.lit(1)) diff --git a/dagster/src/sensors/school_geolocation.py b/dagster/src/sensors/school_geolocation.py index ff075622..c4e5531b 100644 --- a/dagster/src/sensors/school_geolocation.py +++ b/dagster/src/sensors/school_geolocation.py @@ -136,9 +136,12 @@ def school_master_geolocation__raw_file_uploads_sensor( country_code=country_code, ) + last_modified = properties.last_modified.strftime("%Y%m%d-%H%M%S") + dq_triggered_at = metadata.get("dq_triggered_at", "") + context.log.info(f"FILE: {path}") yield RunRequest( - run_key=str(path), + run_key=f"{path}:{last_modified}:{dq_triggered_at}", run_config=RunConfig(ops=run_ops), tags={"country": country_code}, )