Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dagster/models/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
VALID_PRIMITIVES = [
"string",
"integer",
"int",
"long",
"float",
"double",
Expand All @@ -22,6 +23,7 @@ class TypeMapping(BaseModel):
class TypeMappings(BaseModel):
string: TypeMapping
integer: TypeMapping
int: TypeMapping # Alias for integer
long: TypeMapping
float: TypeMapping
double: TypeMapping
Expand Down
15 changes: 12 additions & 3 deletions dagster/src/assets/school_geolocation/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pyspark.sql.types import StringType, StructField, StructType
from sqlalchemy import select
from src.constants import DataTier
from src.data_quality_checks.dq_context import DQContext, DQMode
from src.data_quality_checks.utils import (
aggregate_report_json,
aggregate_report_spark_df,
Expand Down Expand Up @@ -301,9 +302,13 @@ def geolocation_data_quality_results(
dq_results = row_level_checks(
df=renamed_bronze,
silver=casted_silver,
dataset_type=dataset_type,
_country_code_iso3=country_code,
mode=config.metadata["mode"],
dq_context=DQContext(
dq_mode=DQMode(config.metadata.get("dq_mode", "master")),
dataset_type=dataset_type,
country_code_iso3=country_code,
upload_id=id,
upload_mode=config.metadata.get("mode"),
),
context=context,
)

Expand Down Expand Up @@ -696,6 +701,10 @@ def geolocation_staging(
spark: PySparkResource,
config: FileConfig,
) -> Output[None]:
if config.metadata.get("dq_mode") == "uploaded":
context.log.info("Skipping staging as dq_mode is 'uploaded'")
return Output(None)

if geolocation_dq_passed_rows.isEmpty():
context.log.warning("Skipping staging as there are no rows passing DQ checks")
return Output(None)
Expand Down
5 changes: 5 additions & 0 deletions dagster/src/constants/constants_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class Constants(BaseSettings):
pyspark=IntegerType,
datahub=NumberTypeClass,
),
int=TypeMapping(
native=int,
pyspark=IntegerType,
datahub=NumberTypeClass,
),
long=TypeMapping(
native=int,
pyspark=LongType,
Expand Down
21 changes: 21 additions & 0 deletions dagster/src/data_quality_checks/dq_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class DQMode(str, Enum):
UPLOADED = "uploaded"
MASTER = "master"


@dataclass(frozen=True)
class DQContext:
dq_mode: DQMode
dataset_type: str
country_code_iso3: str
upload_id: Optional[int] = None
upload_mode: Optional[str] = None

@property
def mode(self) -> DQMode:
return self.dq_mode
275 changes: 190 additions & 85 deletions dagster/src/data_quality_checks/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import UTC, datetime
from typing import Any

import pandas as pd
from jinja2 import BaseLoader, Environment
Expand All @@ -23,6 +24,7 @@
update_checks,
)
from src.data_quality_checks.critical import critical_error_checks
from src.data_quality_checks.dq_context import DQContext, DQMode
from src.data_quality_checks.duplicates import (
duplicate_all_except_checks,
duplicate_set_checks,
Expand Down Expand Up @@ -630,101 +632,204 @@ def dq_geolocation_extract_relevant_columns(
return df, human_readable_mappings


def row_level_checks(
def run_master_checks(
df: sql.DataFrame,
dq_context: DQContext,
context: OpExecutionContext = None,
) -> sql.DataFrame:
df = is_not_within_country(df, dq_context.country_code_iso3, context)
df = similar_name_level_within_110_check(df, context)
df = school_density_check(df, context)
df = standard_checks(df, dq_context.dataset_type, context)
df = duplicate_all_except_checks(
df,
CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dq_context.dataset_type],
context,
)
df = precision_check(df, config.PRECISION, context)
df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context)
df = duplicate_name_level_110_check(df, context)
df = column_relation_checks(df, dq_context.dataset_type, context)
df = critical_error_checks(
df,
dq_context.dataset_type,
CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type],
context,
)
return df


def run_geolocation_checks(
df: sql.DataFrame,
dataset_type: str,
_country_code_iso3: str,
dq_context: DQContext,
silver: sql.DataFrame = None,
mode=None,
context: OpExecutionContext = None,
) -> sql.DataFrame:
logger = get_context_with_fallback_logger(context)
logger.info("Starting row level checks...")

if dataset_type == "master":
df = is_not_within_country(df, _country_code_iso3, context)
df = similar_name_level_within_110_check(df, context)
df = school_density_check(df, context)
df = standard_checks(df, dataset_type, context)
df = duplicate_all_except_checks(
df,
CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dataset_type],
context,
)
df = precision_check(df, config.PRECISION, context)
df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context)
df = duplicate_name_level_110_check(df, context)
df = column_relation_checks(df, dataset_type, context)
df = critical_error_checks(
df,
dataset_type,
CONFIG_NONEMPTY_COLUMNS[dataset_type],
context,
)
elif dataset_type == "geolocation":
if mode == UploadMode.CREATE.value:
if dq_context.dq_mode == DQMode.MASTER:
if dq_context.upload_mode == UploadMode.CREATE.value:
df = create_checks(bronze=df, silver=silver, context=context)
elif mode == UploadMode.UPDATE.value:
elif dq_context.upload_mode == UploadMode.UPDATE.value:
df = update_checks(bronze=df, silver=silver, context=context)
else:
# For assessment-only (uploaded mode), skip cross-checks against silver
# but ensure the columns exist to avoid downstream errors.
if dq_context.upload_mode == UploadMode.CREATE.value:
df = df.withColumn("dq_is_not_create", f.lit(0))
elif dq_context.upload_mode == UploadMode.UPDATE.value:
df = df.withColumn("dq_is_not_update", f.lit(0))

Comment on lines +674 to +680
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't want to do this, so we can skip it

df = is_not_within_country(df, dq_context.country_code_iso3, context)
df = similar_name_level_within_110_check(df, context)
df = school_density_check(df, context)
df = standard_checks(df, dq_context.dataset_type, context)
df = duplicate_all_except_checks(
df,
CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dq_context.dataset_type],
context,
)
df = precision_check(df, config.PRECISION, context)
df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context)
df = duplicate_name_level_110_check(df, context)
df = critical_error_checks(
df,
dq_context.dataset_type,
CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type],
dq_context.upload_mode,
context,
)
df = column_relation_checks(df, dq_context.dataset_type, context)
return df


def run_reference_checks(
df: sql.DataFrame,
dq_context: DQContext,
context: OpExecutionContext = None,
) -> sql.DataFrame:
df = standard_checks(df, dq_context.dataset_type, context)
df = critical_error_checks(
df,
dq_context.dataset_type,
CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type],
context,
)
return df


def run_coverage_checks(
df: sql.DataFrame,
dq_context: DQContext,
context: OpExecutionContext = None,
) -> sql.DataFrame:
df = standard_checks(df, dq_context.dataset_type, context)
df = column_relation_checks(df, dq_context.dataset_type, context)
df = critical_error_checks(
df,
dq_context.dataset_type,
CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type],
context,
)
return df


def run_coverage_fb_checks(
df: sql.DataFrame,
dq_context: DQContext,
context: OpExecutionContext = None,
) -> sql.DataFrame:
df = standard_checks(
df, dq_context.dataset_type, context, domain=False, range_=False
)
df = fb_percent_sum_to_100_check(df, context)
df = column_relation_checks(df, dq_context.dataset_type, context)
df = critical_error_checks(
df,
dq_context.dataset_type,
CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type],
context,
)
return df


def run_qos_checks(
df: sql.DataFrame,
dq_context: DQContext,
context: OpExecutionContext = None,
) -> sql.DataFrame:
df = standard_checks(
df, dq_context.dataset_type, context, domain=False, range_=False
)
df = critical_error_checks(
df,
dq_context.dataset_type,
CONFIG_NONEMPTY_COLUMNS[dq_context.dataset_type],
context,
)
return df


def row_level_checks_internal(
df: sql.DataFrame,
dq_context: DQContext,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we've added dq_context, it will break the uses of this function in row_level_checks for the master and reference checks

silver: sql.DataFrame = None,
context: OpExecutionContext = None,
) -> sql.DataFrame:
logger = get_context_with_fallback_logger(context)
logger.info(
"Starting row level checks",
extra={
"dq_mode": dq_context.mode.value,
"dataset_type": dq_context.dataset_type,
"country": dq_context.country_code_iso3,
"upload_mode": dq_context.upload_mode,
},
)

if dq_context.dataset_type == "master":
df = run_master_checks(df, dq_context, context)
elif dq_context.dataset_type == "geolocation":
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this:
Here, we need to only run this if the DQmode is "master"

Or set the resulting is_not_create and is_not_update` columns to 0 so that they don't cause critical errors when DQMode is not master

df = run_geolocation_checks(df, dq_context, silver, context)
elif dq_context.dataset_type == "reference":
df = run_reference_checks(df, dq_context, context)
elif dq_context.dataset_type in ["coverage", "coverage_itu"]:
df = run_coverage_checks(df, dq_context, context)
elif dq_context.dataset_type == "coverage_fb":
df = run_coverage_fb_checks(df, dq_context, context)
elif dq_context.dataset_type == "qos":
df = run_qos_checks(df, dq_context, context)

df = is_not_within_country(df, _country_code_iso3, context)
df = similar_name_level_within_110_check(df, context)
df = school_density_check(df, context)
df = standard_checks(df, dataset_type, context)
df = duplicate_all_except_checks(
df,
CONFIG_COLUMNS_EXCEPT_SCHOOL_ID[dataset_type],
context,
)
df = precision_check(df, config.PRECISION, context)
df = duplicate_set_checks(df, config.UNIQUE_SET_COLUMNS, context)
df = duplicate_name_level_110_check(df, context)
df = critical_error_checks(
df,
dataset_type,
CONFIG_NONEMPTY_COLUMNS[dataset_type],
mode,
context,
)
df = column_relation_checks(df, dataset_type, context)
elif dataset_type == "reference":
df = standard_checks(df, dataset_type, context)
df = critical_error_checks(
df,
dataset_type,
CONFIG_NONEMPTY_COLUMNS[dataset_type],
context,
)
elif dataset_type in ["coverage", "coverage_itu"]:
df = standard_checks(df, dataset_type, context)
df = column_relation_checks(df, dataset_type, context)
df = critical_error_checks(
df,
dataset_type,
CONFIG_NONEMPTY_COLUMNS[dataset_type],
context,
)
elif dataset_type == "coverage_fb":
df = standard_checks(df, dataset_type, context, domain=False, range_=False)
df = fb_percent_sum_to_100_check(df, context)
df = column_relation_checks(df, dataset_type, context)
df = critical_error_checks(
df,
dataset_type,
CONFIG_NONEMPTY_COLUMNS[dataset_type],
context,
)
elif dataset_type == "qos":
df = standard_checks(df, dataset_type, context, domain=False, range_=False)
df = critical_error_checks(
df,
dataset_type,
CONFIG_NONEMPTY_COLUMNS[dataset_type],
context,
)
return df


def row_level_checks(
df: sql.DataFrame,
dq_context: Any = None,
_country_code_iso3: str = None,
silver: sql.DataFrame = None,
mode: str = None,
context: OpExecutionContext = None,
dataset_type: str = None,
) -> sql.DataFrame:
# Resolve which signature is being used
if isinstance(dq_context, DQContext):
# Modern signature: row_level_checks(df, dq_context=DQContext(...), ...)
return row_level_checks_internal(df, dq_context, silver, context)
else:
# Legacy signature: row_level_checks(df, dataset_type, country_code, ...)
if dq_context is not None and dataset_type is None:
dataset_type = dq_context # Positional dataset_type

# Build a temporary DQContext for internal processing

internal_context = DQContext(
dq_mode=DQMode.MASTER, # Legacy calls default to MASTER
dataset_type=dataset_type,
country_code_iso3=_country_code_iso3,
upload_mode=mode,
)
return row_level_checks_internal(df, internal_context, silver, context)


def extract_school_id_govt_duplicates(df: sql.DataFrame):
window = w.Window.partitionBy("school_id_govt").orderBy(f.lit(1))

Expand Down
Loading