diff --git a/dagster/src/assets/adhoc/health_master_csv_to_gold.py b/dagster/src/assets/adhoc/health_master_csv_to_gold.py index a86c1bb8..21e28b41 100644 --- a/dagster/src/assets/adhoc/health_master_csv_to_gold.py +++ b/dagster/src/assets/adhoc/health_master_csv_to_gold.py @@ -1,3 +1,4 @@ +from functools import reduce from io import BytesIO from pathlib import Path @@ -85,20 +86,45 @@ def adhoc__health_master_data_transforms( ) sdf = sdf.withColumn("signature", sha2(concat_ws("|", *sorted(sdf.columns)), 256)) - df = sdf.toPandas() - df = df.drop_duplicates("health_id_giga") + mandatory_columns = [ + col.name + for col in schema_columns + if not col.nullable and col.name in sdf.columns + ] + context.log.info(f"Mandatory columns: {mandatory_columns}") + country_code = config.country_code path = Path(config.filepath) stem = path.stem - df_failed = df[df["health_id_giga"].isna()] - output_filepath = ( - f"{constants.gold_folder}/dq-results/health-master/failed/{stem}.csv" + + if mandatory_columns: + mandatory_null_condition = reduce( + lambda a, b: a | b, + [f.col(c).isNull() for c in mandatory_columns], + ) + sdf_failed = sdf.filter(mandatory_null_condition) + sdf = sdf.filter(~mandatory_null_condition) + + context.log.info( + f"Mandatory check: {sdf_failed.count()} rows dropped, {sdf.count()} rows passed" + ) + else: + sdf_failed = sdf.limit(0) + + df = sdf.toPandas() + df = df.drop_duplicates("health_id_giga") + + df_failed = sdf_failed.toPandas() + + failed_filepath = ( + f"{constants.gold_folder}/dq-results/failed/{country_code}/{stem}.csv" ) adls_file_client.upload_pandas_dataframe_as_file( context=context, data=df_failed, - filepath=str(output_filepath), + filepath=str(failed_filepath), ) + context.log.info(f"Failed rows written to {failed_filepath}: {len(df_failed)} rows") context.log.info(f"columns: {df.columns.tolist()}") context.log.info(f"row count: {len(df)}") diff --git a/dagster/src/sensors/adhoc.py b/dagster/src/sensors/adhoc.py index 012096d2..e7fb8afb 100644 --- a/dagster/src/sensors/adhoc.py +++ b/dagster/src/sensors/adhoc.py @@ -261,7 +261,7 @@ def health_master__gold_csv_to_deltatable_sensor( ), "adhoc__health_master_data_transforms": OpDestinationMapping( source_filepath=str(path), - destination_filepath=f"{constants.gold_folder}/dq-results/health-master/transforms/{stem}.csv", + destination_filepath=f"{constants.gold_folder}/dq-results/transforms/{country_code}/{stem}.csv", metastore_schema=metastore_schema, tier=DataTier.TRANSFORMS, ),