From 5a151c4df2d977462966956d33549a134d305d1b Mon Sep 17 00:00:00 2001 From: reanbrenda Date: Fri, 13 Mar 2026 03:33:46 +0100 Subject: [PATCH 1/5] fix(sensors): runs not reflecting; prefix upload paths with LAKEHOUSE_PATH --- dagster/.env.example | 1 + dagster/src/sensors/school_coverage.py | 2 +- dagster/src/sensors/school_geolocation.py | 2 +- dagster/src/sensors/unstructured.py | 2 +- dagster/src/settings.py | 5 +++++ dagster/src/utils/adls.py | 7 ++++--- 6 files changed, 13 insertions(+), 6 deletions(-) diff --git a/dagster/.env.example b/dagster/.env.example index b606abdfa..03013462c 100644 --- a/dagster/.env.example +++ b/dagster/.env.example @@ -29,6 +29,7 @@ SPARK_RPC_AUTHENTICATION_SECRET= SPARK_SSL_NEED_CLIENT_AUTH=yes HIVE_METASTORE_URI= WAREHOUSE_USERNAME= +# Match giga-data-ingestion when local so upload sensors find blobs (optional) LAKEHOUSE_USERNAME= GITHUB_ACCESS_TOKEN= diff --git a/dagster/src/sensors/school_coverage.py b/dagster/src/sensors/school_coverage.py index 893fa6bfb..8450ad8b0 100644 --- a/dagster/src/sensors/school_coverage.py +++ b/dagster/src/sensors/school_coverage.py @@ -27,7 +27,7 @@ def school_master_coverage__raw_file_uploads_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}" + source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip("/") for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/sensors/school_geolocation.py b/dagster/src/sensors/school_geolocation.py index ff075622b..aaefdc4dc 100644 --- a/dagster/src/sensors/school_geolocation.py +++ b/dagster/src/sensors/school_geolocation.py @@ -27,7 +27,7 @@ def school_master_geolocation__raw_file_uploads_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}" + source_directory = 
f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip("/") for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/sensors/unstructured.py b/dagster/src/sensors/unstructured.py index 2513d304c..40d4041f7 100644 --- a/dagster/src/sensors/unstructured.py +++ b/dagster/src/sensors/unstructured.py @@ -26,7 +26,7 @@ def unstructured__emit_metadata_to_datahub_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}" + source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip("/") for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/settings.py b/dagster/src/settings.py index 0ea900177..a3e60f7c1 100644 --- a/dagster/src/settings.py +++ b/dagster/src/settings.py @@ -167,6 +167,11 @@ def SPARK_WAREHOUSE_DIR(self) -> str: @property def LAKEHOUSE_PATH(self) -> str: + """Must match giga-data-ingestion LAKEHOUSE_PATH so the upload sensor finds blobs.""" + if self.PYTHON_ENV == Environment.LOCAL: + if self.LAKEHOUSE_USERNAME: + return f"lakehouse-local-{self.LAKEHOUSE_USERNAME}" + return "lakehouse-local" return "" @property diff --git a/dagster/src/utils/adls.py b/dagster/src/utils/adls.py index 1112dbd2f..1147a9e2a 100644 --- a/dagster/src/utils/adls.py +++ b/dagster/src/utils/adls.py @@ -119,10 +119,11 @@ def _get_container_client() -> ContainerClient: class ADLSFileClient(ConfigurableResource): @staticmethod def _get_metadata_path(filepath: str) -> Optional[str]: - # Normalize paths by stripping leading slashes for comparison + # Normalize paths by stripping leading slashes for comparison. + # Use same prefix as ingestion (LAKEHOUSE_PATH + UPLOAD_PATH_PREFIX) so we match blob paths from the upload sensor. 
normalized_filepath = filepath.lstrip("/") - normalized_prefix = constants.UPLOAD_PATH_PREFIX.lstrip("/") - normalized_metadata_prefix = constants.UPLOAD_METADATA_PATH_PREFIX.lstrip("/") + normalized_prefix = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}".lstrip("/") + normalized_metadata_prefix = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_METADATA_PATH_PREFIX}".lstrip("/") if normalized_filepath.startswith(normalized_prefix): return ( From 3133ffc523dc23d71fcca521a60c0ac3eb59684a Mon Sep 17 00:00:00 2001 From: reanbrenda Date: Wed, 13 May 2026 01:16:35 +0200 Subject: [PATCH 2/5] feat(health): mandatory column filter and dq-results paths Drop rows missing any non-nullable schema column; write failures to gold/dq-results/failed/{country}/. Point transforms destination at gold/dq-results/transforms/{country}/ in the health master sensor. Co-authored-by: Cursor --- .../assets/adhoc/health_master_csv_to_gold.py | 36 +++++++++++++++---- dagster/src/sensors/adhoc.py | 2 +- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/dagster/src/assets/adhoc/health_master_csv_to_gold.py b/dagster/src/assets/adhoc/health_master_csv_to_gold.py index a86c1bb8d..069270412 100644 --- a/dagster/src/assets/adhoc/health_master_csv_to_gold.py +++ b/dagster/src/assets/adhoc/health_master_csv_to_gold.py @@ -1,5 +1,6 @@ from io import BytesIO from pathlib import Path +from functools import reduce import numpy as np import pandas as pd @@ -85,20 +86,43 @@ def adhoc__health_master_data_transforms( ) sdf = sdf.withColumn("signature", sha2(concat_ws("|", *sorted(sdf.columns)), 256)) - df = sdf.toPandas() - df = df.drop_duplicates("health_id_giga") + mandatory_columns = [ + col.name for col in schema_columns if not col.nullable and col.name in sdf.columns + ] + context.log.info(f"Mandatory columns: {mandatory_columns}") + country_code = config.country_code path = Path(config.filepath) stem = path.stem - df_failed = df[df["health_id_giga"].isna()] - output_filepath = ( - 
f"{constants.gold_folder}/dq-results/health-master/failed/{stem}.csv" + + if mandatory_columns: + mandatory_null_condition = reduce( + lambda a, b: a | b, + [f.col(c).isNull() for c in mandatory_columns], + ) + sdf_failed = sdf.filter(mandatory_null_condition) + sdf = sdf.filter(~mandatory_null_condition) + + context.log.info( + f"Mandatory check: {sdf_failed.count()} rows dropped, {sdf.count()} rows passed" + ) + else: + sdf_failed = sdf.limit(0) + + df = sdf.toPandas() + df = df.drop_duplicates("health_id_giga") + + df_failed = sdf_failed.toPandas() + + failed_filepath = ( + f"{constants.gold_folder}/dq-results/failed/{country_code}/{stem}.csv" ) adls_file_client.upload_pandas_dataframe_as_file( context=context, data=df_failed, - filepath=str(output_filepath), + filepath=str(failed_filepath), ) + context.log.info(f"Failed rows written to {failed_filepath}: {len(df_failed)} rows") context.log.info(f"columns: {df.columns.tolist()}") context.log.info(f"row count: {len(df)}") diff --git a/dagster/src/sensors/adhoc.py b/dagster/src/sensors/adhoc.py index 012096d23..e7fb8afb7 100644 --- a/dagster/src/sensors/adhoc.py +++ b/dagster/src/sensors/adhoc.py @@ -261,7 +261,7 @@ def health_master__gold_csv_to_deltatable_sensor( ), "adhoc__health_master_data_transforms": OpDestinationMapping( source_filepath=str(path), - destination_filepath=f"{constants.gold_folder}/dq-results/health-master/transforms/{stem}.csv", + destination_filepath=f"{constants.gold_folder}/dq-results/transforms/{country_code}/{stem}.csv", metastore_schema=metastore_schema, tier=DataTier.TRANSFORMS, ), From 7c1da07ff77fd23225f7a6c234f931ab94ea501f Mon Sep 17 00:00:00 2001 From: reanbrenda Date: Wed, 13 May 2026 01:21:03 +0200 Subject: [PATCH 3/5] chore: revert .env.example comment for LAKEHOUSE_USERNAME Co-authored-by: Cursor --- dagster/.env.example | 1 - 1 file changed, 1 deletion(-) diff --git a/dagster/.env.example b/dagster/.env.example index 03013462c..b606abdfa 100644 --- a/dagster/.env.example +++ 
b/dagster/.env.example @@ -29,7 +29,6 @@ SPARK_RPC_AUTHENTICATION_SECRET= SPARK_SSL_NEED_CLIENT_AUTH=yes HIVE_METASTORE_URI= WAREHOUSE_USERNAME= -# Match giga-data-ingestion when local so upload sensors find blobs (optional) LAKEHOUSE_USERNAME= GITHUB_ACCESS_TOKEN= From d247fb169b5bb48fa2c892ecd01ea907043bd522 Mon Sep 17 00:00:00 2001 From: reanbrenda Date: Wed, 13 May 2026 01:28:23 +0200 Subject: [PATCH 4/5] style: ruff format after pre-commit --- dagster/src/assets/adhoc/health_master_csv_to_gold.py | 6 ++++-- dagster/src/sensors/school_coverage.py | 4 +++- dagster/src/sensors/school_geolocation.py | 4 +++- dagster/src/sensors/unstructured.py | 4 +++- dagster/src/utils/adls.py | 10 ++++++++-- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/dagster/src/assets/adhoc/health_master_csv_to_gold.py b/dagster/src/assets/adhoc/health_master_csv_to_gold.py index 069270412..21e28b41f 100644 --- a/dagster/src/assets/adhoc/health_master_csv_to_gold.py +++ b/dagster/src/assets/adhoc/health_master_csv_to_gold.py @@ -1,6 +1,6 @@ +from functools import reduce from io import BytesIO from pathlib import Path -from functools import reduce import numpy as np import pandas as pd @@ -87,7 +87,9 @@ def adhoc__health_master_data_transforms( sdf = sdf.withColumn("signature", sha2(concat_ws("|", *sorted(sdf.columns)), 256)) mandatory_columns = [ - col.name for col in schema_columns if not col.nullable and col.name in sdf.columns + col.name + for col in schema_columns + if not col.nullable and col.name in sdf.columns ] context.log.info(f"Mandatory columns: {mandatory_columns}") diff --git a/dagster/src/sensors/school_coverage.py b/dagster/src/sensors/school_coverage.py index 8450ad8b0..b7cd26c0e 100644 --- a/dagster/src/sensors/school_coverage.py +++ b/dagster/src/sensors/school_coverage.py @@ -27,7 +27,9 @@ def school_master_coverage__raw_file_uploads_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = 
f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip("/") + source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip( + "/" + ) for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/sensors/school_geolocation.py b/dagster/src/sensors/school_geolocation.py index aaefdc4dc..eb1635542 100644 --- a/dagster/src/sensors/school_geolocation.py +++ b/dagster/src/sensors/school_geolocation.py @@ -27,7 +27,9 @@ def school_master_geolocation__raw_file_uploads_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip("/") + source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip( + "/" + ) for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/sensors/unstructured.py b/dagster/src/sensors/unstructured.py index 40d4041f7..2d28379ad 100644 --- a/dagster/src/sensors/unstructured.py +++ b/dagster/src/sensors/unstructured.py @@ -26,7 +26,9 @@ def unstructured__emit_metadata_to_datahub_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip("/") + source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip( + "/" + ) for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/utils/adls.py b/dagster/src/utils/adls.py index 1147a9e2a..89b06e729 100644 --- a/dagster/src/utils/adls.py +++ b/dagster/src/utils/adls.py @@ -122,8 +122,14 @@ def _get_metadata_path(filepath: str) -> Optional[str]: # Normalize paths by stripping leading slashes for comparison. 
# Use same prefix as ingestion (LAKEHOUSE_PATH + UPLOAD_PATH_PREFIX) so we match blob paths from the upload sensor. normalized_filepath = filepath.lstrip("/") - normalized_prefix = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}".lstrip("/") - normalized_metadata_prefix = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_METADATA_PATH_PREFIX}".lstrip("/") + normalized_prefix = ( + f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}".lstrip("/") + ) + normalized_metadata_prefix = ( + f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_METADATA_PATH_PREFIX}".lstrip( + "/" + ) + ) if normalized_filepath.startswith(normalized_prefix): return ( From 4fdb4446108df6c4547edcc79b593f135312b37b Mon Sep 17 00:00:00 2001 From: reanbrenda Date: Wed, 13 May 2026 01:33:20 +0200 Subject: [PATCH 5/5] revert: align school sensors and lakehouse helpers with main Keep this branch focused on health master pipeline changes only. --- dagster/src/sensors/school_coverage.py | 4 +--- dagster/src/sensors/school_geolocation.py | 4 +--- dagster/src/sensors/unstructured.py | 4 +--- dagster/src/settings.py | 5 ----- dagster/src/utils/adls.py | 13 +++---------- 5 files changed, 6 insertions(+), 24 deletions(-) diff --git a/dagster/src/sensors/school_coverage.py b/dagster/src/sensors/school_coverage.py index b7cd26c0e..893fa6bfb 100644 --- a/dagster/src/sensors/school_coverage.py +++ b/dagster/src/sensors/school_coverage.py @@ -27,9 +27,7 @@ def school_master_coverage__raw_file_uploads_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip( - "/" - ) + source_directory = f"{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}" for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/sensors/school_geolocation.py b/dagster/src/sensors/school_geolocation.py index eb1635542..ff075622b 100644 --- 
a/dagster/src/sensors/school_geolocation.py +++ b/dagster/src/sensors/school_geolocation.py @@ -27,9 +27,7 @@ def school_master_geolocation__raw_file_uploads_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip( - "/" - ) + source_directory = f"{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}" for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/sensors/unstructured.py b/dagster/src/sensors/unstructured.py index 2d28379ad..2513d304c 100644 --- a/dagster/src/sensors/unstructured.py +++ b/dagster/src/sensors/unstructured.py @@ -26,9 +26,7 @@ def unstructured__emit_metadata_to_datahub_sensor( adls_file_client: ADLSFileClient, ): count = 0 - source_directory = f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}".lstrip( - "/" - ) + source_directory = f"{constants.UPLOAD_PATH_PREFIX}/{DOMAIN_DATASET_TYPE}" for file_data in adls_file_client.list_paths_generator( source_directory, recursive=True diff --git a/dagster/src/settings.py b/dagster/src/settings.py index a3e60f7c1..0ea900177 100644 --- a/dagster/src/settings.py +++ b/dagster/src/settings.py @@ -167,11 +167,6 @@ def SPARK_WAREHOUSE_DIR(self) -> str: @property def LAKEHOUSE_PATH(self) -> str: - """Must match giga-data-ingestion LAKEHOUSE_PATH so the upload sensor finds blobs.""" - if self.PYTHON_ENV == Environment.LOCAL: - if self.LAKEHOUSE_USERNAME: - return f"lakehouse-local-{self.LAKEHOUSE_USERNAME}" - return "lakehouse-local" return "" @property diff --git a/dagster/src/utils/adls.py b/dagster/src/utils/adls.py index 89b06e729..1112dbd2f 100644 --- a/dagster/src/utils/adls.py +++ b/dagster/src/utils/adls.py @@ -119,17 +119,10 @@ def _get_container_client() -> ContainerClient: class ADLSFileClient(ConfigurableResource): @staticmethod def _get_metadata_path(filepath: str) -> Optional[str]: - # Normalize paths 
by stripping leading slashes for comparison. - # Use same prefix as ingestion (LAKEHOUSE_PATH + UPLOAD_PATH_PREFIX) so we match blob paths from the upload sensor. + # Normalize paths by stripping leading slashes for comparison normalized_filepath = filepath.lstrip("/") - normalized_prefix = ( - f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_PATH_PREFIX}".lstrip("/") - ) - normalized_metadata_prefix = ( - f"{settings.LAKEHOUSE_PATH}/{constants.UPLOAD_METADATA_PATH_PREFIX}".lstrip( - "/" - ) - ) + normalized_prefix = constants.UPLOAD_PATH_PREFIX.lstrip("/") + normalized_metadata_prefix = constants.UPLOAD_METADATA_PATH_PREFIX.lstrip("/") if normalized_filepath.startswith(normalized_prefix): return (