diff --git a/dagster/src/assets/adhoc/master_csv_to_gold.py b/dagster/src/assets/adhoc/master_csv_to_gold.py index 648fd2a35..3d0d2f480 100644 --- a/dagster/src/assets/adhoc/master_csv_to_gold.py +++ b/dagster/src/assets/adhoc/master_csv_to_gold.py @@ -42,7 +42,6 @@ check_table_exists, create_delta_table, create_schema, - sync_schema, ) from src.utils.logger import ContextLoggerWithLoguruFallback from src.utils.metadata import get_output_metadata, get_table_preview @@ -608,38 +607,6 @@ def adhoc__publish_master_to_gold( ) gold = compute_row_hash(gold) - table_exists = check_table_exists( - spark=spark.spark_session, - schema_name="school_master", - table_name=config.country_code.lower(), - data_tier=DataTier.GOLD, - ) - - if table_exists: - table_name = f"{config.metastore_schema}.{config.country_code}" - updated_schema = StructType( - get_schema_columns( - spark=spark.spark_session, schema_name=config.metastore_schema - ) - ) - - context.log.info(f"Existing table name: {table_name}") - - spark.spark_session.catalog.refreshTable(table_name) - existing_df = DeltaTable.forName( - sparkSession=spark.spark_session, tableOrViewName=table_name - ).toDF() - - existing_schema = existing_df.schema - - sync_schema( - table_name=table_name, - existing_schema=existing_schema, - updated_schema=updated_schema, - spark=spark.spark_session, - context=context, - ) - schema_reference = get_schema_columns_datahub( spark.spark_session, config.metastore_schema, @@ -690,38 +657,6 @@ def adhoc__publish_reference_to_gold( ) gold = compute_row_hash(gold) - table_exists = check_table_exists( - spark=spark.spark_session, - schema_name="school_reference", - table_name=config.country_code.lower(), - data_tier=DataTier.GOLD, - ) - - if table_exists: - table_name = f"{config.metastore_schema}.{config.country_code}" - updated_schema = StructType( - get_schema_columns( - spark=spark.spark_session, schema_name=config.metastore_schema - ) - ) - - context.log.info(f"Existing table name: 
{table_name}") - - spark.spark_session.catalog.refreshTable(table_name) - existing_df = DeltaTable.forName( - sparkSession=spark.spark_session, - tableOrViewName=table_name, - ).toDF() - existing_schema = existing_df.schema - - sync_schema( - table_name=table_name, - existing_schema=existing_schema, - updated_schema=updated_schema, - spark=spark.spark_session, - context=context, - ) - schema_reference = get_schema_columns_datahub( spark.spark_session, config.metastore_schema, diff --git a/dagster/src/assets/common/assets.py b/dagster/src/assets/common/assets.py index 0dbd7364d..ef263fec2 100644 --- a/dagster/src/assets/common/assets.py +++ b/dagster/src/assets/common/assets.py @@ -629,7 +629,7 @@ def reset_staging_table( raise -def _handle_null_columns(schema_columns, primary_key): +def handle_null_columns(schema_columns, primary_key): """Handle null columns by providing default values based on data type. If the column value is NULL, add a placeholder value if the following @@ -771,7 +771,7 @@ def master( else: new_master = silver - column_actions = _handle_null_columns(schema_columns, primary_key) + column_actions = handle_null_columns(schema_columns, primary_key) new_master = new_master.withColumns(column_actions) new_master = compute_row_hash(new_master) @@ -829,7 +829,7 @@ def reference( else: new_reference = silver - column_actions = _handle_null_columns(schema_columns, primary_key) + column_actions = handle_null_columns(schema_columns, primary_key) new_reference = new_reference.withColumns(column_actions) new_reference = compute_row_hash(new_reference) diff --git a/dagster/src/assets/migrations/bootstrap_column_id_maps.py b/dagster/src/assets/migrations/bootstrap_column_id_maps.py new file mode 100644 index 000000000..8a3e7f663 --- /dev/null +++ b/dagster/src/assets/migrations/bootstrap_column_id_maps.py @@ -0,0 +1,191 @@ +"""One-time migration: bootstrap giga.columnId.* table properties for all existing +Delta tables that predate the UUID-based rename/delete 
def bootstrap_table(
    spark: SparkSession,
    context: OpExecutionContext,
    full_table_name: str,
    schema_name: str,
    dry_run: bool = False,
) -> None:
    """Bootstrap or repair the column-ID mapping for a single Delta table.

    Steps:
      1. Return early if the table does not exist or the reference schema
         cannot be loaded.
      2. Build ``updated_id_map`` (column name -> UUID) from the reference
         schema.
      3. Collect "orphan" physical columns (no stored UUID and no schema column
         with the same name) and schema columns that are missing from the
         table. Only when there is exactly ONE orphan and ONE missing column
         is the mismatch treated as a rename and the physical column renamed.
         Any other non-empty combination is ambiguous: a warning is logged and
         no column is renamed.
      4. Call ``persist_column_id_map`` to store the mapping. This also runs
         after an ambiguous mismatch so the remaining columns are still mapped.

    Args:
        spark: Active Spark session.
        context: Dagster execution context; only ``context.log`` is used.
        full_table_name: Fully-qualified name of the Delta table.
        schema_name: Reference schema name passed to
            ``get_schema_columns_with_id`` and ``persist_column_id_map``.
        dry_run: When true, log the rename/persist actions instead of
            executing them.
    """
    if not spark.catalog.tableExists(full_table_name):
        context.log.info(f"Table {full_table_name} does not exist — skipping.")
        return

    existing_map = get_stored_column_id_map(spark, full_table_name)

    try:
        columns_with_id = get_schema_columns_with_id(spark, schema_name)
    except Exception as exc:
        # Best-effort migration: a schema that cannot be loaded must not abort
        # the processing of the remaining tables.
        context.log.warning(
            f"Could not load schema '{schema_name}' for {full_table_name}: {exc} — skipping."
        )
        return

    updated_id_map: dict[str, str] = {
        field.name: csv_id for csv_id, field in columns_with_id
    }

    spark.catalog.refreshTable(full_table_name)
    physical_columns: list[str] = spark.table(full_table_name).columns
    physical_column_set = set(physical_columns)

    # Columns in the table that have no stored UUID and no direct name match in schema
    orphan_table_cols = [
        c for c in physical_columns if c not in existing_map and c not in updated_id_map
    ]
    # Schema columns that are not physically present in the table
    missing_schema_cols = [c for c in updated_id_map if c not in physical_column_set]

    if orphan_table_cols:
        context.log.info(
            f"{full_table_name}: orphan table columns (no UUID, no schema match): "
            f"{orphan_table_cols}"
        )
        context.log.info(
            f"{full_table_name}: missing schema columns (in schema but not in table): "
            f"{missing_schema_cols}"
        )

    if orphan_table_cols and missing_schema_cols:
        if len(orphan_table_cols) == 1 and len(missing_schema_cols) == 1:
            # Exactly one old name and one new name: the only mismatch that can
            # be paired without guessing. Equal-length lists with more than one
            # entry have no reliable ordering, so pairing them positionally
            # could rename the wrong columns.
            old_name, new_name = orphan_table_cols[0], missing_schema_cols[0]
            renames = {old_name: new_name}
            context.log.info(f"{full_table_name}: detected probable renames: {renames}")
            if not dry_run:
                enable_column_mapping(spark, full_table_name)
                stmt = (
                    f"ALTER TABLE {full_table_name} "
                    f"RENAME COLUMN `{old_name}` TO `{new_name}`"
                )
                context.log.info(f"Executing: {stmt}")
                spark.sql(stmt)
                spark.catalog.refreshTable(full_table_name)
            else:
                context.log.info(
                    f"[DRY RUN] Would rename columns in {full_table_name}: {renames}"
                )
        else:
            # Ambiguous — multiple columns differ; cannot safely auto-rename
            context.log.warning(
                f"{full_table_name}: ambiguous column mismatch "
                f"(orphan table cols: {orphan_table_cols}, "
                f"missing schema cols: {missing_schema_cols}). "
                f"Cannot auto-rename. Manual intervention required."
            )
            # Still persist what we can so the rest of the mapping is stored
    elif not orphan_table_cols and not missing_schema_cols:
        context.log.info(
            f"{full_table_name}: all physical columns match the schema by name."
        )

    if not dry_run:
        persist_column_id_map(spark, full_table_name, schema_name)
        context.log.info(f"{full_table_name}: column-ID mapping persisted.")
    else:
        context.log.info(
            f"[DRY RUN] Would persist column-ID mapping for {full_table_name}."
        )
+ dataset_configs: list[tuple[str, DataTier | None]] = [ + ("school_geolocation", DataTier.SILVER), + ("school_geolocation", DataTier.STAGING), + ("school_geolocation", None), # master: schema = "school_master" + ] + + for dataset_type, tier in dataset_configs: + if tier is None: + tier_schema = "school_master" + schema_name = dataset_type + else: + tier_schema = construct_schema_name_for_tier( + f"school_{dataset_type.replace('school_', '')}", tier + ) + schema_name = dataset_type + + context.log.info( + f"Processing schema '{tier_schema}' with schema_name='{schema_name}' ..." + ) + table_names = _get_all_country_tables(s, tier_schema) + + if not table_names: + context.log.info(f"No tables found in '{tier_schema}' — skipping.") + continue + + for full_table_name in table_names: + bootstrap_table( + spark=s, + context=context, + full_table_name=full_table_name, + schema_name=schema_name, + dry_run=DRY_RUN, + ) diff --git a/dagster/src/assets/migrations/core.py b/dagster/src/assets/migrations/core.py index 06c18f153..323cd4dac 100644 --- a/dagster/src/assets/migrations/core.py +++ b/dagster/src/assets/migrations/core.py @@ -4,7 +4,11 @@ from models import VALID_PRIMITIVES, Schema from pyspark import sql from pyspark.sql.functions import col, when -from src.utils.delta import execute_query_with_error_handler +from src.utils.delta import ( + create_delta_table, + persist_column_id_map, + sync_schema, +) from dagster import OpExecutionContext @@ -42,17 +46,32 @@ def save_schema_delta_table(context: OpExecutionContext, df: sql.DataFrame): full_table_name = f"{schema_name}.{table_name}" columns = Schema.fields - query = ( - DeltaTable.createOrReplace(spark).tableName(full_table_name).addColumns(columns) + create_delta_table( + spark, + schema_name, + table_name, + columns, + context, + if_not_exists=True, + ) + sync_schema( + table_name=full_table_name, + existing_schema=spark.table(full_table_name).schema, + updated_schema=Schema.schema, + spark=spark, + 
context=context, ) - execute_query_with_error_handler(spark, query, schema_name, table_name, context) spark.catalog.refreshTable(full_table_name) ( DeltaTable.forName(spark, full_table_name) .alias("master") - .merge(df.alias("updates"), "master.name = updates.name") + .merge(df.alias("updates"), "master.id = updates.id") .whenMatchedUpdateAll() .whenNotMatchedInsertAll() + .whenNotMatchedBySourceDelete() .execute() ) + + # Persist column-ID mapping after merge succeeds + persist_column_id_map(spark, full_table_name, table_name) diff --git a/dagster/src/assets/school_geolocation/assets.py b/dagster/src/assets/school_geolocation/assets.py index 226614aa4..c02c487d0 100644 --- a/dagster/src/assets/school_geolocation/assets.py +++ b/dagster/src/assets/school_geolocation/assets.py @@ -128,10 +128,19 @@ def geolocation_metadata( context.log.info("Create spark dataframe") metadata_df = s.createDataFrame(metadata_df) - table_columns = get_schema_columns(s, "school_geolocation_metadata") table_name = "school_geolocation_metadata" table_schema_name = "pipeline_tables" + context.log.info("Get schema columns for metadata table") + try: + table_columns = get_schema_columns(s, "school_geolocation_metadata") + except Exception: + context.log.warning( + "Schema table schemas.school_geolocation_metadata not found; " + "using DataFrame schema for metadata table creation" + ) + table_columns = list(metadata_df.schema.fields) + context.log.info("Create the schema and table if they do not exist") metadata_df = add_missing_columns(metadata_df, table_columns) metadata_df = metadata_df.select(*StructType(table_columns).fieldNames()) @@ -327,7 +336,9 @@ def geolocation_data_quality_results( ) dq_results_schema_name = f"{schema_name}_dq_results" - table_name = f"{id}_{country_code}_{current_timestamp}" + # Replace hyphens with underscores so the identifier is valid in Spark SQL + safe_id = id.replace("-", "_") + table_name = f"{safe_id}_{country_code}_{current_timestamp}" schema_columns = 
[ StructField(field.name, field.dataType, nullable=True) diff --git a/dagster/src/internal/common_assets/staging.py b/dagster/src/internal/common_assets/staging.py index 49c9dc9fa..6588f6d2d 100644 --- a/dagster/src/internal/common_assets/staging.py +++ b/dagster/src/internal/common_assets/staging.py @@ -13,6 +13,7 @@ LongType, StringType, StructField, + StructType, TimestampType, ) from sqlalchemy import select, update @@ -28,6 +29,8 @@ check_table_exists, create_delta_table, create_schema, + persist_column_id_map, + sync_schema, ) from src.utils.op_config import FileConfig from src.utils.schema import ( @@ -290,6 +293,17 @@ def _write_pending_records(self, pending: sql.DataFrame) -> None: partition_by=["upload_id"], ) else: + # Synchronise staging schema (handles renames/deletions) + existing_schema = self.spark.table(self.staging_table_name).schema + sync_schema( + table_name=self.staging_table_name, + existing_schema=existing_schema, + updated_schema=StructType(pending_schema), + spark=self.spark, + context=self.context, + schema_name=self.schema_name, + ) + upload_id = self.config.filename_components.id DeltaTable.forName(self.spark, self.staging_table_name).delete( f.col("upload_id") == upload_id @@ -315,6 +329,10 @@ def _write_pending_records(self, pending: sql.DataFrame) -> None: .saveAsTable(self.staging_table_name) ) + # Persist column-ID mapping after staging data is written + + persist_column_id_map(self.spark, self.staging_table_name, self.schema_name) + def _update_approval_request_status(self) -> None: """Enable the ApprovalRequest if any actionable (non-UNCHANGED) rows exist.""" actionable = ( diff --git a/dagster/src/resources/io_managers/adls_delta.py b/dagster/src/resources/io_managers/adls_delta.py index fe7f0687a..8110b39b5 100644 --- a/dagster/src/resources/io_managers/adls_delta.py +++ b/dagster/src/resources/io_managers/adls_delta.py @@ -12,7 +12,11 @@ from src.settings import settings from src.spark.transform_functions import 
add_missing_columns from src.utils.adls import ADLSFileClient -from src.utils.delta import build_deduped_merge_query, execute_query_with_error_handler +from src.utils.delta import ( + build_deduped_merge_query, + execute_query_with_error_handler, + persist_column_id_map, +) from src.utils.op_config import FileConfig from src.utils.schema import ( construct_full_table_name, @@ -191,35 +195,33 @@ def _upsert_data( columns = incoming_schema.fields primary_key = "gigasync_id" else: + from src.utils.delta import sync_schema + columns = get_schema_columns(spark, schema_name) primary_key = get_primary_key(spark, schema_name) - updated_schema = StructType(columns) - updated_columns = sorted(updated_schema.fieldNames()) - - existing_df = DeltaTable.forName(spark, full_table_name).toDF() - existing_columns = sorted(existing_df.schema.fieldNames()) - - context.log.info(f"incoming schema {data.schema}") - context.log.info(f"existing schema {existing_df.schema}") - - if updated_columns != existing_columns: - context.log.info("Updating schema...") - - empty_data = spark.sparkContext.emptyRDD() - updated_schema_df = spark.createDataFrame( - data=empty_data, schema=updated_schema - ) + # Break the lazy lineage to avoid DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS + # if sync_schema renames columns or changes types. 
+ data = spark.createDataFrame(data.rdd, data.schema).cache() - ( - updated_schema_df.write.option("mergeSchema", "true") - .format("delta") - .mode("append") - .saveAsTable(full_table_name) - ) + updated_schema = StructType(columns) + spark.catalog.refreshTable(full_table_name) + existing_schema = DeltaTable.forName(spark, full_table_name).toDF().schema + + sync_schema( + table_name=full_table_name, + existing_schema=existing_schema, + updated_schema=updated_schema, + spark=spark, + context=context, + schema_name=schema_name, + ) update_columns = [c.name for c in columns if c.name != primary_key] + + spark.catalog.refreshTable(full_table_name) master = DeltaTable.forName(spark, full_table_name) + query = build_deduped_merge_query( master, data, @@ -233,9 +235,14 @@ def _upsert_data( if query is not None: query.execute() + # Persist column-ID mapping after merge succeeds + # This ensures mapping is only updated when data is successfully merged + if not is_qos: + persist_column_id_map(spark, full_table_name, schema_name) + def _overwrite_data( self, - data: sql.dataframe, + data: sql.DataFrame, schema_name: str, full_table_name: str, context: OutputContext = None, diff --git a/dagster/src/spark/config_expectations.py b/dagster/src/spark/config_expectations.py index 829541924..3c36a7b06 100644 --- a/dagster/src/spark/config_expectations.py +++ b/dagster/src/spark/config_expectations.py @@ -136,6 +136,7 @@ class Config(BaseSettings): ("school_id_giga", "STRING"), ("school_id_govt", "STRING"), ("school_name", "STRING"), + ("official_school_name", "STRING"), ("school_establishment_year", "INT"), ("latitude", "DOUBLE"), ("longitude", "DOUBLE"), @@ -222,6 +223,7 @@ class Config(BaseSettings): "school_id_giga", "school_id_govt", "school_name", + "official_school_name", "longitude", "latitude", "education_level", diff --git a/dagster/src/spark/transform_functions.py b/dagster/src/spark/transform_functions.py index 8d3ebe410..b1ba6836c 100644 --- 
def enable_column_mapping(spark: SparkSession, table_name: str) -> None:
    """Enable column mapping mode on an existing Delta table if not already enabled.

    The table's current properties are read with ``DESCRIBE DETAIL``. If
    ``delta.columnMapping.mode`` is already ``name`` or ``id`` the function
    returns without issuing any ``ALTER TABLE``, so repeated calls do not
    re-apply the same properties. Otherwise the table is switched to ``name``
    mode with reader version 2 and writer version 5.

    Args:
        spark: Active Spark session.
        table_name: Name of the Delta table, interpolated into the SQL as-is.
    """
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").collect()[0]
    # NOTE(review): assumes the `properties` map of DESCRIBE DETAIL exposes
    # delta.columnMapping.mode once it has been set — confirm for the Delta
    # version in use. If the key is absent the ALTER below simply runs again.
    properties = detail["properties"] or {}
    if properties.get("delta.columnMapping.mode") in ("name", "id"):
        return

    spark.sql(
        f"ALTER TABLE {table_name} SET TBLPROPERTIES ("
        f" 'delta.columnMapping.mode' = 'name',"
        f" 'delta.minReaderVersion' = '2',"
        f" 'delta.minWriterVersion' = '5'"
        f")"
    )
def remove_column_id_props(
    spark: SparkSession,
    table_name: str,
    column_names: list[str],
) -> None:
    """Unset the ``giga.columnId.<name>`` table property for each given column.

    Does nothing when ``column_names`` is empty; otherwise issues one
    ``ALTER TABLE ... UNSET TBLPROPERTIES IF EXISTS`` statement that covers
    all names.
    """
    if column_names:
        quoted_keys = [f"'giga.columnId.{column}'" for column in column_names]
        key_list = ", ".join(quoted_keys)
        spark.sql(f"ALTER TABLE {table_name} UNSET TBLPROPERTIES IF EXISTS ({key_list})")
def detect_renames_and_deletes(
    existing_id_map: dict[str, str],
    updated_id_map: dict[str, str],
) -> tuple[dict[str, str], list[str]]:
    """Derive column renames and deletes by comparing two name -> ID mappings.

    Parameters
    ----------
    existing_id_map : dict[str, str]
        ``{column_name: csv_id}`` as currently stored for the table.
    updated_id_map : dict[str, str]
        ``{column_name: csv_id}`` taken from the latest reference schema.

    Returns
    -------
    renames : dict[str, str]
        ``{old_name: new_name}`` for every ID present in both mappings under
        different names.
    deletes : list[str]
        Existing column names whose ID does not appear in ``updated_id_map``.
    """
    # Index both sides by ID; if an ID occurs more than once in a mapping the
    # last name seen for it wins.
    old_name_for = {csv_id: name for name, csv_id in existing_id_map.items()}
    new_name_for = {csv_id: name for name, csv_id in updated_id_map.items()}

    renames: dict[str, str] = {
        old_name: new_name_for[csv_id]
        for csv_id, old_name in old_name_for.items()
        if csv_id in new_name_for and new_name_for[csv_id] != old_name
    }
    deletes: list[str] = [
        old_name
        for csv_id, old_name in old_name_for.items()
        if csv_id not in new_name_for
    ]
    return renames, deletes
def build_excluded_columns(
    partition_columns: set[str],
    updated_schema: StructType | None,
    updated_id_map: dict[str, str],
) -> set[str]:
    """Compute the column names to leave out of rename/delete detection.

    The result always contains every partition column. When ``updated_schema``
    is given, it additionally contains each of that schema's field names that
    has no entry in ``updated_id_map``. A new set is returned;
    ``partition_columns`` is not modified.
    """
    excluded = set(partition_columns)
    if updated_schema is None:
        return excluded

    unmapped_fields = (
        field_name
        for field_name in updated_schema.fieldNames()
        if field_name not in updated_id_map
    )
    return excluded.union(unmapped_fields)
def filter_pk_changes(
    existing_id_map: dict[str, str],
    updated_id_map: dict[str, str],
    renames: dict[str, str],
    deletes: list[str],
    primary_key_columns: set[str],
    persisted_pk_uuids: set[str],
    context: OpExecutionContext,
) -> tuple[dict[str, str], list[str], set[str], dict[str, str]]:
    """Filter renames and deletes that touch primary-key columns.

    A column counts as a primary key when the UUID stored for it in
    ``existing_id_map`` is either the UUID of a name in ``primary_key_columns``
    (looked up through ``updated_id_map``) or a member of
    ``persisted_pk_uuids``. Renames and deletes of such columns are dropped
    and reported through ``context.log.warning``.

    The ``renames`` and ``deletes`` arguments are not modified; filtered
    copies are returned instead.

    Returns (filtered_renames, filtered_deletes, blocked_renames, renames_original).
    """
    pk_uuids = {
        updated_id_map[name] for name in primary_key_columns if name in updated_id_map
    } | set(persisted_pk_uuids)
    renames_original = dict(renames)

    def is_primary_key(column_name: str) -> bool:
        # With no known PK UUIDs this is always False, so nothing is blocked.
        return existing_id_map.get(column_name) in pk_uuids

    blocked_renames: set[str] = {
        old_name for old_name in renames if is_primary_key(old_name)
    }
    if blocked_renames:
        context.log.warning(f"Blocked rename of primary key columns: {blocked_renames}")

    blocked_deletes = [col_name for col_name in deletes if is_primary_key(col_name)]
    if blocked_deletes:
        context.log.warning(f"Blocked delete of primary key columns: {blocked_deletes}")

    # Build fresh containers rather than deleting from the caller's objects.
    filtered_renames = {
        old_name: new_name
        for old_name, new_name in renames.items()
        if old_name not in blocked_renames
    }
    filtered_deletes = [
        col_name for col_name in deletes if not is_primary_key(col_name)
    ]

    return filtered_renames, filtered_deletes, blocked_renames, renames_original
def apply_renames_and_deletes(
    spark: SparkSession,
    table_name: str,
    schema_name: str,
    context: OpExecutionContext,
    updated_schema: StructType | None = None,
) -> tuple[bool, set[str]]:
    """Detect and apply column renames and deletes on a Delta table.

    The ``(id, field)`` pairs of the reference schema are compared with the
    column-ID map stored on the table. When the table has no stored map yet,
    the map is bootstrapped via ``persist_column_id_map`` and nothing is
    renamed or dropped on this run.

    Args:
        spark: Active Spark session.
        table_name: Name of the Delta table to alter.
        schema_name: Reference schema name; also used to query
            ``schemas.<schema_name>`` for primary key columns.
        context: Dagster execution context, used for logging.
        updated_schema: Optional target schema, forwarded to
            ``detect_and_filter_changes``.

    Returns:
        A ``(changed, blocked_new_names)`` tuple. ``changed`` is True when at
        least one rename or delete was executed. ``blocked_new_names`` holds
        the target names of renames that were blocked, so the caller can avoid
        adding them as brand-new columns.
    """
    from src.utils.schema import get_schema_columns_with_id

    columns_with_id = get_schema_columns_with_id(spark, schema_name)
    updated_id_map = {field.name: csv_id for csv_id, field in columns_with_id}
    existing_id_map = get_stored_column_id_map(spark, table_name)

    if not existing_id_map:
        context.log.info(
            f"No stored column-ID mapping found for {table_name}. "
            "Bootstrapping UUID props from current schema. "
            "Rename/delete detection will be active from the next run onwards."
        )
        persist_column_id_map(spark, table_name, schema_name)
        return False, set()

    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").collect()[0]
    partition_columns: set[str] = set(detail["partitionColumns"] or [])

    # Discover primary key columns from the reference schema.
    # The schemas Delta table has a ``primary_key`` boolean column.
    primary_key_columns: set[str] = set()
    try:
        pk_rows = spark.sql(
            f"SELECT name FROM schemas.{schema_name} WHERE primary_key = true"  # nosec B608
        ).collect()
        primary_key_columns = {row["name"] for row in pk_rows}
    except Exception as exc:
        # Best effort: the schema table may not exist or may lack the
        # primary_key column. Surface the failure instead of hiding it,
        # because it means schema-based PK protection is off for this run.
        context.log.warning(
            f"Could not read primary key columns from schemas.{schema_name}; "
            f"continuing with persisted PK UUIDs only: {exc}"
        )

    if primary_key_columns:
        context.log.info(
            f"Primary key columns detected: {sorted(primary_key_columns)}"
        )

    persisted_pk_uuids: set[str] = get_stored_pk_uuids(spark, table_name)
    if persisted_pk_uuids:
        context.log.info(f"Persisted PK UUIDs on table: {sorted(persisted_pk_uuids)}")

    renames, deletes, blocked_renames, renames_original = detect_and_filter_changes(
        spark,
        table_name,
        existing_id_map,
        updated_id_map,
        updated_schema,
        partition_columns,
        primary_key_columns,
        persisted_pk_uuids,
        context,
    )

    execute_renames_and_deletes(spark, table_name, renames, deletes, context)

    # Collect the new names of blocked renames so that sync_schema can
    # avoid adding them as new columns (since the rename was blocked).
    blocked_new_names: set[str] = {
        new_name
        for old_name, new_name in renames_original.items()
        if old_name in blocked_renames
    }

    return bool(renames or deletes), blocked_new_names
def apply_datatype_changes(
    spark: SparkSession,
    table_name: str,
    changed_datatypes: dict,
    context: OpExecutionContext,
) -> None:
    """Cast changed columns to their new types and overwrite the table schema.

    Args:
        spark: Active Spark session.
        table_name: Name of the Delta table to rewrite.
        changed_datatypes: Mapping of column name to target data type.
            An empty mapping makes this a no-op.
        context: Dagster execution context, used for logging.
    """
    if not changed_datatypes:
        return

    context.log.info("Updating datatype...")
    context.log.info(f"Changed datatypes: {changed_datatypes}")
    existing_dataframe = spark.table(table_name)
    updated_df = existing_dataframe

    for column, datatype in changed_datatypes.items():
        # ``typeName()`` yields only the bare type name (e.g. "decimal"), which
        # drops precision/scale and cannot describe nested types. Casting to
        # the DataType object keeps the full definition. A bare class carries
        # no parameters, so it keeps the original name-based cast.
        target = datatype.typeName() if isinstance(datatype, type) else datatype
        updated_df = updated_df.withColumn(
            column, existing_dataframe[column].cast(target)
        )

    (
        updated_df.write.option("overwriteSchema", "true")
        .format("delta")
        .mode("overwrite")
        .saveAsTable(table_name)
    )
    # Make sure later reads in this session see the rewritten schema.
    spark.catalog.refreshTable(table_name)
+ + When ``schema_name`` is None (legacy callers, schema-tables migration), + fall back to dropping by name as before. + """ + if not removed_columns: + return + + if schema_name is not None: + context.log.warning( + f"Columns exist in table but not in updated schema: " + f"{removed_columns}. These were NOT handled by " + f"apply_renames_and_deletes - leaving them in place to avoid " + f"unintended data loss. If you intend to drop them, remove the " + f"column from the schema CSV (with its UUID) and re-run." + ) + return + + context.log.info(f"Dropping columns not in updated schema: {removed_columns}") + for col_name in removed_columns: + stmt = f"ALTER TABLE {table_name} DROP COLUMN `{col_name}`" + context.log.info(f"Executing: {stmt}") + spark.sql(stmt) + + def sync_schema( table_name: str, existing_schema: StructType, updated_schema: StructType, spark: SparkSession, context: OpExecutionContext, + schema_name: str | None = None, ): + """Synchronise a Delta table's schema with the reference schema. + + Supports: + * Adding columns (existing behaviour via ``mergeSchema``) + * Renaming columns (via ``ALTER TABLE RENAME COLUMN``) + * Dropping columns (via ``ALTER TABLE DROP COLUMN``) + * Changing data types (via overwrite with ``overwriteSchema``) + * Changing nullability constraints + + Column renames and deletes require ``schema_name`` so that the stable + UUID column IDs from the schema CSV can be compared against the IDs + stored in the table properties. + """ + # ------------------------------------------------------------------ + # 1. Detect and apply renames & deletes + # ------------------------------------------------------------------ + any_renames_deletes = False + blocked_new_names: set[str] = set() + if schema_name is not None: + any_renames_deletes, blocked_new_names = apply_renames_and_deletes( + spark, table_name, schema_name, context, updated_schema=updated_schema + ) + + # ------------------------------------------------------------------ + # 2. 
Refresh schemas after rename/delete to get accurate comparison + # ------------------------------------------------------------------ + if any_renames_deletes: + spark.catalog.refreshTable(table_name) + existing_schema = spark.table(table_name).schema + + # ------------------------------------------------------------------ + # 2a. Remove blocked-new names from updated_schema so they are not + # incorrectly added as new columns (e.g. a primary key rename + # that was blocked should not create a duplicate column). + # ------------------------------------------------------------------ + if blocked_new_names: + context.log.info( + f"Blocked rename targets excluded from add logic: {blocked_new_names}" + ) + filtered_fields = [ + f for f in updated_schema.fields if f.name not in blocked_new_names + ] + updated_schema = StructType(filtered_fields) + + # ------------------------------------------------------------------ + # 3. Detect added columns & datatype changes (existing logic) + # ------------------------------------------------------------------ alter_stmts = build_nullability_queries( context=context, existing_schema=existing_schema, @@ -310,38 +951,20 @@ def sync_schema( context.log.info(f"alter_stmts {alter_stmts}") has_nullability_changed = len(alter_stmts) > 0 - existing_columns = {field.name: field.name for field in existing_schema} - updated_columns = {field.name: field.name for field in updated_schema} - - added_columns = set(updated_columns.keys()) - set(existing_columns.keys()) - removed_columns = set(existing_columns.keys()) - set(updated_columns.keys()) + existing_columns = {field.name for field in existing_schema} + updated_columns_set = {field.name for field in updated_schema} - has_schema_changed = len(added_columns) + len(removed_columns) > 0 + added_columns = updated_columns_set - existing_columns + # Recalculate removed_columns after renames/deletes were applied + # This ensures we correctly identify columns that should be dropped + removed_columns 
def get_type_mapping(data_type: str):
    """Map a data type string to its entry on ``constants.TYPE_MAPPINGS``.

    Matching is case-insensitive, ignores surrounding whitespace, and
    resolves the alias ``int`` to ``integer``.

    Args:
        data_type: Type name as written in the schema definition.

    Returns:
        The attribute of ``constants.TYPE_MAPPINGS`` named after the
        normalised type.

    Raises:
        AttributeError: If ``TYPE_MAPPINGS`` has no entry for the type.
    """
    normalized_type = data_type.strip().lower()
    # Handle common aliases from config_expectations
    if normalized_type == "int":
        normalized_type = "integer"

    try:
        return getattr(constants.TYPE_MAPPINGS, normalized_type)
    except AttributeError as exc:
        # Same exception type as before, but name the offending value so a
        # bad schema row can be located without a debugger.
        raise AttributeError(
            f"Unsupported data type {data_type!r}: no entry named "
            f"{normalized_type!r} in TYPE_MAPPINGS"
        ) from exc
By comparing + IDs between the reference schema and an existing Delta table we can + detect: + + * **Renames** – same ID, different ``StructField.name`` + * **Deletes** – ID present in the table but absent from the reference + * **Adds** – ID present in the reference but absent from the table + """ + df = get_schema_table(spark, schema_name) + return [ + ( + row.id, + StructField( + row.name, + get_type_mapping(row.data_type).pyspark(), + row.is_nullable, + ), + ) + for row in df.collect() + ] + + def get_schema_column_descriptions( spark: SparkSession, schema_name: str ) -> dict[str:str]: @@ -78,8 +119,7 @@ def get_schema_column_descriptions( def get_schema_columns_datahub(spark: SparkSession, schema_name: str) -> list[tuple]: df = get_schema_table(spark, schema_name) return [ - (row.name, getattr(constants.TYPE_MAPPINGS, row.data_type).datahub()) - for row in df.collect() + (row.name, get_type_mapping(row.data_type).datahub()) for row in df.collect() ] diff --git a/dagster/src/utils/spark.py b/dagster/src/utils/spark.py index 2acabc5f4..f24383dea 100644 --- a/dagster/src/utils/spark.py +++ b/dagster/src/utils/spark.py @@ -56,6 +56,9 @@ def _get_host_ip() -> str: "spark.databricks.delta.properties.defaults.appendOnly": "false", "spark.databricks.delta.schema.autoMerge.enabled": "false", "spark.databricks.delta.catalog.update.enabled": "true", + "spark.databricks.delta.properties.defaults.columnMapping.mode": "name", + "spark.databricks.delta.properties.defaults.minReaderVersion": "2", + "spark.databricks.delta.properties.defaults.minWriterVersion": "5", "spark.sql.legacy.parquet.nanosAsLong": "true", } diff --git a/dagster/tests/utils/test_delta_sync_schema.py b/dagster/tests/utils/test_delta_sync_schema.py new file mode 100644 index 000000000..a314b83a7 --- /dev/null +++ b/dagster/tests/utils/test_delta_sync_schema.py @@ -0,0 +1,1179 @@ +"""Tests for the Delta Lake column rename/delete detection helpers in src.utils.delta.""" + +from src.utils.delta import 
detect_renames_and_deletes + + +class TestDetectRenamesAndDeletes: + """Unit tests for detect_renames_and_deletes.""" + + def test_no_changes(self): + existing = {"col_a": "id-1", "col_b": "id-2", "col_c": "id-3"} + updated = {"col_a": "id-1", "col_b": "id-2", "col_c": "id-3"} + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {} + assert deletes == [] + + def test_column_renamed(self): + existing = {"old_name": "id-1", "col_b": "id-2"} + updated = {"new_name": "id-1", "col_b": "id-2"} + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {"old_name": "new_name"} + assert deletes == [] + + def test_column_deleted(self): + existing = {"col_a": "id-1", "col_b": "id-2", "col_c": "id-3"} + updated = {"col_a": "id-1", "col_b": "id-2"} + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {} + assert deletes == ["col_c"] + + def test_column_added_only(self): + """Adding a column (ID in updated but not existing) should not trigger renames or deletes.""" + existing = {"col_a": "id-1"} + updated = {"col_a": "id-1", "col_new": "id-new"} + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {} + assert deletes == [] + + def test_rename_and_delete_combined(self): + existing = { + "old_name": "id-1", + "col_b": "id-2", + "col_to_drop": "id-3", + } + updated = { + "new_name": "id-1", + "col_b": "id-2", + } + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {"old_name": "new_name"} + assert deletes == ["col_to_drop"] + + def test_rename_delete_and_add(self): + existing = { + "old_name": "id-1", + "col_b": "id-2", + "col_drop": "id-3", + } + updated = { + "new_name": "id-1", + "col_b": "id-2", + "col_new": "id-4", + } + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {"old_name": "new_name"} + assert deletes == ["col_drop"] + + def test_multiple_renames(self): + existing = 
class TestSyncSchemaRemovedColumns:
    """Checks of the set difference used to derive ``removed_columns``.

    These tests exercise only the set arithmetic (existing minus updated);
    they do not call ``sync_schema`` itself.
    """

    def test_removed_columns_detection(self):
        """Columns missing from the updated set are reported as removed."""
        in_table = {"col_a", "col_b", "col_c"}
        in_schema = {"col_a", "col_x"}
        assert in_table.difference(in_schema) == {"col_b", "col_c"}

    def test_removed_columns_after_rename_applied(self):
        """Once a rename is reflected in the table, only the orphan remains."""
        in_table_after_rename = {"col_a", "col_x", "col_c"}
        in_schema = {"col_a", "col_x"}
        assert in_table_after_rename.difference(in_schema) == {"col_c"}

    def test_removed_columns_empty_when_no_deletions(self):
        """Identical column sets produce an empty removed set."""
        in_table = {"col_a", "col_b", "col_c"}
        in_schema = {"col_a", "col_b", "col_c"}
        assert in_table.difference(in_schema) == set()
class TestFalseDeletePrevention:
    """Regression tests for the drop-and-re-add bug.

    With a partially populated stored ID map, columns present in both the
    table and the schema CSV used to be given synthetic IDs. Those never
    matched the real schema UUIDs, so the columns were reported as deletes,
    dropped, and then re-added empty.

    The supplementing step modelled here assigns the real schema UUID when
    the column name appears in ``updated_id_map`` and a synthetic
    ``table_<name>`` ID otherwise.
    """

    @staticmethod
    def _supplement_logic(
        stored_id_map: dict,
        updated_id_map: dict,
        current_table_columns: list,
        excluded: set | None = None,
    ):
        """Return a copy of ``stored_id_map`` completed for every table column."""
        skipped = excluded or set()
        supplemented = dict(stored_id_map)
        for column in current_table_columns:
            if column in skipped or column in supplemented:
                continue
            # Real UUID when the schema knows the name, synthetic otherwise.
            supplemented[column] = updated_id_map.get(column, f"table_{column}")
        return supplemented

    def test_unchanged_columns_not_falsely_deleted(self):
        """Only one column has a stored UUID; nothing may be renamed or deleted."""
        table_columns = [
            "school_id_giga",
            "longitude",
            "school_name",
            "num_students",
            "latitude",
        ]
        schema_ids = {
            "school_id_giga": "uuid-sig",
            "longitude": "uuid-lon",
            "school_name": "uuid-sn",
            "num_students": "uuid-ns",
            "latitude": "uuid-lat",
        }
        # partial: only one stored UUID
        stored = {"school_id_giga": "uuid-sig"}

        completed = self._supplement_logic(stored, schema_ids, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        # All columns should be recognized as unchanged (real UUIDs match)
        assert renames == {}
        assert deletes == []

    def test_mixed_unchanged_and_orphan(self):
        """Columns known to the CSV survive; the orphan is the only delete."""
        stored = {"col_a": "uuid-a"}
        schema_ids = {
            "col_a": "uuid-a",
            "col_b": "uuid-b",  # in table but no stored UUID
            "col_c": "uuid-c",  # in table but no stored UUID
        }
        table_columns = ["col_a", "col_b", "col_c", "orphan_col"]

        completed = self._supplement_logic(stored, schema_ids, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        assert renames == {}
        # Only orphan_col should be deleted (not col_b or col_c)
        assert deletes == ["orphan_col"]

    def test_rename_still_works_with_fix(self):
        """A stored UUID that reappears under a new CSV name is still a rename."""
        stored = {"old_name": "uuid-1", "col_b": "uuid-2"}
        schema_ids = {
            "new_name": "uuid-1",  # rename via UUID
            "col_b": "uuid-2",
            "col_c": "uuid-c",  # in table but no stored UUID
        }
        table_columns = ["old_name", "col_b", "col_c"]

        completed = self._supplement_logic(stored, schema_ids, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        assert renames == {"old_name": "new_name"}
        assert deletes == []

    def test_old_synthetic_logic_would_cause_false_deletes(self):
        """Contrast the always-synthetic supplementing with the name-aware one."""
        stored = {"school_id_giga": "uuid-sig"}
        schema_ids = {
            "school_id_giga": "uuid-sig",
            "longitude": "uuid-lon",
            "school_name": "uuid-sn",
        }
        table_columns = ["school_id_giga", "longitude", "school_name"]

        # OLD logic: every column without a stored UUID gets a synthetic ID.
        legacy = dict(stored)
        legacy.update(
            {name: f"table_{name}" for name in table_columns if name not in stored}
        )
        _, legacy_deletes = detect_renames_and_deletes(legacy, schema_ids)
        # OLD: longitude and school_name are falsely detected as deletes
        assert "longitude" in legacy_deletes
        assert "school_name" in legacy_deletes

        # NEW logic: use real UUID when name matches
        completed = self._supplement_logic(stored, schema_ids, table_columns)
        _, current_deletes = detect_renames_and_deletes(completed, schema_ids)
        # NEW: no false deletes
        assert current_deletes == []
test_multiple_deletes_simultaneous(self): + """Test that multiple columns can be deleted at once.""" + existing = { + "col_a": "id-1", + "col_to_drop_1": "id-2", + "col_b": "id-3", + "col_to_drop_2": "id-4", + "col_to_drop_3": "id-5", + } + updated = { + "col_a": "id-1", + "col_b": "id-3", + } + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {} + assert sorted(deletes) == ["col_to_drop_1", "col_to_drop_2", "col_to_drop_3"] + + def test_multiple_renames_and_deletes_simultaneous(self): + """Test multiple renames and deletes happening together.""" + existing = { + "old_a": "id-1", + "to_drop_1": "id-2", + "old_b": "id-3", + "to_drop_2": "id-4", + "unchanged": "id-5", + } + updated = { + "new_a": "id-1", + "new_b": "id-3", + "unchanged": "id-5", + } + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == {"old_a": "new_a", "old_b": "new_b"} + assert sorted(deletes) == ["to_drop_1", "to_drop_2"] + + def test_full_schema_evolution_add_rename_delete(self): + """Test complete schema evolution: adds, renames, and deletes simultaneously.""" + existing = { + "school_id": "id-1", + "old_funding_type": "id-2", + "num_teachers_female": "id-3", + "num_teachers_male": "id-4", + "old_tablet_count": "id-5", + } + updated = { + "school_id": "id-1", + "school_funding_source": "id-2", + "num_tablets_used": "id-5", + } + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == { + "old_funding_type": "school_funding_source", + "old_tablet_count": "num_tablets_used", + } + assert sorted(deletes) == ["num_teachers_female", "num_teachers_male"] + + def test_complex_multi_country_scenario(self): + """Test scenario matching the user's Gambia case with multiple changes.""" + existing = { + "school_id_giga": "csv-id-001", + "school_name": "csv-id-002", + "school_funding_type": "csv-id-010", + "num_tablets": "csv-id-015", + "num_teachers_female": "csv-id-020", + "num_teachers_male": "csv-id-021", + 
"latitude": "csv-id-030", + "longitude": "csv-id-031", + } + updated = { + "school_id_giga": "csv-id-001", + "school_name": "csv-id-002", + "school_funding_source": "csv-id-010", + "num_tablets_used": "csv-id-015", + "latitude": "csv-id-030", + "longitude": "csv-id-031", + } + renames, deletes = detect_renames_and_deletes(existing, updated) + assert renames == { + "school_funding_type": "school_funding_source", + "num_tablets": "num_tablets_used", + } + assert sorted(deletes) == ["num_teachers_female", "num_teachers_male"] + + def test_multiple_adds_simultaneous(self): + """Test that multiple columns can be added at once.""" + existing_columns = {"col_a", "col_b", "col_c"} + updated_columns_set = { + "col_a", + "col_b", + "col_c", + "new_col_x", + "new_col_y", + "new_col_z", + } + added_columns = updated_columns_set - existing_columns + assert added_columns == {"new_col_x", "new_col_y", "new_col_z"} + + def test_complete_workflow_add_rename_delete_together(self): + """Test the complete workflow: adds, renames, and deletes all together.""" + existing_id_map = { + "school_id": "uuid-001", + "old_name_a": "uuid-002", + "old_name_b": "uuid-003", + "to_delete_1": "uuid-100", + "to_delete_2": "uuid-101", + } + updated_id_map = { + "school_id": "uuid-001", + "new_name_a": "uuid-002", + "new_name_b": "uuid-003", + "new_col_x": "uuid-200", + "new_col_y": "uuid-201", + "new_col_z": "uuid-202", + } + renames, deletes = detect_renames_and_deletes(existing_id_map, updated_id_map) + assert renames == {"old_name_a": "new_name_a", "old_name_b": "new_name_b"} + assert sorted(deletes) == ["to_delete_1", "to_delete_2"] + existing_after_renames = { + "school_id", + "new_name_a", + "new_name_b", + "to_delete_1", + "to_delete_2", + } + updated_columns_set = set(updated_id_map.keys()) + added_columns = updated_columns_set - existing_after_renames + removed_columns = existing_after_renames - updated_columns_set + assert added_columns == {"new_col_x", "new_col_y", "new_col_z"} + assert 
class TestPartialColumnIdMap:
    """Regression tests for a partially populated column ID map.

    When only some table columns have a stored UUID, the remaining columns
    used to be invisible to ``detect_renames_and_deletes`` and were later
    dropped by name. These tests model the supplementing step that gives
    every such column a synthetic ``table_<name>`` ID, so it surfaces in the
    explicit delete list instead.
    """

    @staticmethod
    def _with_synthetic_ids(stored_id_map, current_table_columns):
        """Copy ``stored_id_map`` and add ``table_<name>`` IDs for unmapped columns."""
        completed = dict(stored_id_map)
        for column in current_table_columns:
            completed.setdefault(column, f"table_{column}")
        return completed

    def test_simulates_managers_bug_report(self):
        """Two intended renames, but only one column has a stored UUID.

        ``num_tablets_used`` matches by UUID and is renamed.
        ``school_funding_source`` has no stored UUID, receives a synthetic
        ID, and is therefore reported as an explicit delete.
        """
        # Simulate stored column_id_map (partial - missing school_funding_source)
        stored = {"num_tablets_used": "uuid-tablets"}
        # Schema CSV has new names
        schema_ids = {
            "school_funding_type": "uuid-funding",
            "num_tablets": "uuid-tablets",
        }
        table_columns = ["school_funding_source", "num_tablets_used"]

        completed = self._with_synthetic_ids(stored, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        # num_tablets_used -> num_tablets is detected as rename via UUID
        assert renames == {"num_tablets_used": "num_tablets"}
        # school_funding_source is detected as DELETE (synthetic ID, no UUID match)
        assert deletes == ["school_funding_source"]

    def test_partial_map_with_all_renames_intent(self):
        """A column without a stored UUID cannot be renamed; it becomes a delete."""
        stored = {"col_a": "uuid-a"}
        schema_ids = {"col_a_renamed": "uuid-a", "col_b_new": "uuid-b"}
        table_columns = ["col_a", "col_b_old"]

        completed = self._with_synthetic_ids(stored, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        # col_a -> col_a_renamed: detected via UUID match
        assert renames == {"col_a": "col_a_renamed"}
        # col_b_old has no UUID -> detected as delete (synthetic ID never matches)
        assert deletes == ["col_b_old"]

    def test_full_map_rename_works_correctly(self):
        """With a complete stored map, every rename is found and nothing is deleted."""
        stored = {
            "school_funding_source": "uuid-funding",
            "num_tablets_used": "uuid-tablets",
        }
        schema_ids = {
            "school_funding_type": "uuid-funding",
            "num_tablets": "uuid-tablets",
        }
        table_columns = ["school_funding_source", "num_tablets_used"]

        # With full map, supplementing adds nothing extra
        completed = self._with_synthetic_ids(stored, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        # Both renames detected correctly
        assert renames == {
            "school_funding_source": "school_funding_type",
            "num_tablets_used": "num_tablets",
        }
        assert deletes == []

    def test_orphan_column_not_silently_dropped(self):
        """A table column absent from both the stored map and the CSV is an explicit delete."""
        stored = {"col_a": "uuid-a"}
        schema_ids = {"col_a": "uuid-a"}
        # orphan_col is in table but neither in stored map nor in CSV
        table_columns = ["col_a", "orphan_col"]

        completed = self._with_synthetic_ids(stored, table_columns)
        renames, deletes = detect_renames_and_deletes(completed, schema_ids)

        assert renames == {}
        assert deletes == ["orphan_col"]
+ """ + + def test_stale_columns_identified(self): + """Test the logic for identifying stale columns to remove.""" + current_props = { + "old_name_a": "uuid-a", + "old_name_b": "uuid-b", + "unchanged": "uuid-c", + } + new_map = { + "new_name_a": "uuid-a", # renamed from old_name_a + "new_name_b": "uuid-b", # renamed from old_name_b + "unchanged": "uuid-c", + } + stale = [name for name in current_props if name not in new_map] + assert sorted(stale) == ["old_name_a", "old_name_b"] + + def test_no_stale_columns(self): + """If new map matches current props, no cleanup needed.""" + current_props = {"col_a": "uuid-a", "col_b": "uuid-b"} + new_map = {"col_a": "uuid-a", "col_b": "uuid-b"} + stale = [name for name in current_props if name not in new_map] + assert stale == [] + + def test_removed_columns_in_stale(self): + """Deleted columns appear in stale list.""" + current_props = {"col_a": "uuid-a", "col_to_delete": "uuid-b"} + new_map = {"col_a": "uuid-a"} + stale = [name for name in current_props if name not in new_map] + assert stale == ["col_to_delete"] + + +class TestPartitionColumnExclusion: + """Regression tests for DELTA_UNSUPPORTED_DROP_PARTITION_COLUMN bug. + + Staging tables have technical partition columns (upload_id, etc.) that are + never present in the business schema CSV. Before the fix these were tagged + with synthetic IDs and detected as deletes, causing: + ALTER TABLE ... DROP COLUMN `upload_id` + which Delta Lake rejects with DELTA_UNSUPPORTED_DROP_PARTITION_COLUMN. + + The fix: skip partition columns when supplementing existing_id_map. 
+ """ + + def test_partition_columns_excluded_from_synthetic_tagging(self): + """Partition columns must not appear in the deletes list.""" + stored_id_map = {"electricity_type": "uuid-elec", "school_name": "uuid-name"} + updated_id_map = { + "electricity_type_test": "uuid-elec", + "school_name": "uuid-name", + } + partition_columns = {"upload_id"} + + # Simulate staging table columns including partition column + current_table_columns = [ + "electricity_type", + "school_name", + "upload_id", # partition column — must be skipped + "change_type", # technical column without UUID + ] + + existing_id_map = dict(stored_id_map) + for col_name in current_table_columns: + if col_name in partition_columns: + continue # THE FIX: skip partition columns + if col_name not in existing_id_map: + existing_id_map[col_name] = f"table_{col_name}" + + renames, deletes = detect_renames_and_deletes(existing_id_map, updated_id_map) + + assert "upload_id" not in deletes + assert renames == {"electricity_type": "electricity_type_test"} + assert deletes == ["change_type"] + + def test_partition_columns_not_in_deletes_without_fix(self): + """Demonstrates the bug: without the fix, upload_id would be in deletes.""" + stored_id_map = {"electricity_type": "uuid-elec"} + updated_id_map = {"electricity_type_test": "uuid-elec"} + current_table_columns = ["electricity_type", "upload_id"] + + # WITHOUT the fix (no partition exclusion) + existing_id_map_buggy = dict(stored_id_map) + for col_name in current_table_columns: + if col_name not in existing_id_map_buggy: + existing_id_map_buggy[col_name] = f"table_{col_name}" + + _, deletes_buggy = detect_renames_and_deletes( + existing_id_map_buggy, updated_id_map + ) + assert "upload_id" in deletes_buggy # proves the bug existed + + # WITH the fix (partition columns skipped) + partition_columns = {"upload_id"} + existing_id_map_fixed = dict(stored_id_map) + for col_name in current_table_columns: + if col_name in partition_columns: + continue + if col_name 
not in existing_id_map_fixed: + existing_id_map_fixed[col_name] = f"table_{col_name}" + + _, deletes_fixed = detect_renames_and_deletes( + existing_id_map_fixed, updated_id_map + ) + assert "upload_id" not in deletes_fixed # fix works + + def test_rollback_rename_with_partition_columns(self): + """The exact rollback scenario: electricity_type_test -> electricity_type. + + Staging table has partition column upload_id plus technical columns. + The rollback rename must succeed without attempting to drop upload_id. + """ + stored_id_map = { + "electricity_type_test": "uuid-elec", + "school_name": "uuid-name", + "change_type": "uuid-change", + } + updated_id_map = { + "electricity_type": "uuid-elec", # rollback rename + "school_name": "uuid-name", + "change_type": "uuid-change", + } + partition_columns = {"upload_id"} + current_table_columns = [ + "electricity_type_test", + "school_name", + "change_type", + "upload_id", # partition — must be excluded + "uploaded_columns", + "status", + ] + + existing_id_map = dict(stored_id_map) + for col_name in current_table_columns: + if col_name in partition_columns: + continue + if col_name not in existing_id_map: + existing_id_map[col_name] = f"table_{col_name}" + + renames, deletes = detect_renames_and_deletes(existing_id_map, updated_id_map) + + assert renames == {"electricity_type_test": "electricity_type"} + assert "upload_id" not in deletes + # uploaded_columns and status are technical orphans — correctly detected as deletes + # but NOT partition columns so Delta won't reject them (they can be dropped or ignored + # by execute_deletes' safety guard) + assert "upload_id" not in renames.keys() + assert "upload_id" not in renames.values() + + +class TestPartitionAndCallerManagedExclusion: + """Comprehensive coverage for the exclusion logic in apply_renames_and_deletes. + + These tests model the in-memory logic without touching Spark. 
Each test + simulates the EXACT logic in apply_renames_and_deletes after my fixes: + excluded = partition_columns | (caller_managed - business_csv) + # Strip excluded from existing_id_map and updated_id_map + # Skip excluded when supplementing synthetic IDs + """ + + @staticmethod + def _apply_logic( + stored_id_map: dict, + business_csv_map: dict, + current_table_columns: list, + partition_columns: set, + caller_managed_columns: set | None = None, + ): + """Mirror the production logic in apply_renames_and_deletes.""" + existing = dict(stored_id_map) + updated = dict(business_csv_map) + excluded = set(partition_columns) + if caller_managed_columns: + excluded |= {c for c in caller_managed_columns if c not in updated} + + # Strip excluded from both maps + for c in list(existing.keys()): + if c in excluded: + del existing[c] + for c in list(updated.keys()): + if c in excluded: + del updated[c] + + # Supplement with synthetic IDs (skip excluded) + for c in current_table_columns: + if c in excluded: + continue + if c not in existing: + existing[c] = f"table_{c}" + + return detect_renames_and_deletes(existing, updated) + + def test_multi_partition_table_all_protected(self): + """All partition columns (not just upload_id) must be protected.""" + # caller_managed_columns reflects the FUTURE (updated) schema + renames, deletes = self._apply_logic( + stored_id_map={"electricity_type": "u-elec"}, + business_csv_map={"electricity_type_test": "u-elec"}, + current_table_columns=[ + "electricity_type", + "year", + "month", + "country_code", + ], + partition_columns={"year", "month", "country_code"}, + caller_managed_columns={ + "electricity_type_test", # new name (from updated_schema) + "year", + "month", + "country_code", + }, + ) + assert renames == {"electricity_type": "electricity_type_test"} + assert "year" not in deletes + assert "month" not in deletes + assert "country_code" not in deletes + assert deletes == [] + + def 
test_partition_col_with_stored_uuid_not_dropped(self): + """Even if a partition column has a stored UUID (loophole A), it is excluded.""" + renames, deletes = self._apply_logic( + stored_id_map={ + "electricity_type": "u-elec", + "upload_id": "u-bogus-stored-uuid", # should never be acted on + }, + business_csv_map={"electricity_type": "u-elec"}, + current_table_columns=["electricity_type", "upload_id"], + partition_columns={"upload_id"}, + caller_managed_columns={"electricity_type", "upload_id"}, + ) + assert renames == {} + assert "upload_id" not in deletes + assert deletes == [] + + def test_partition_col_in_business_csv_is_ignored(self): + """If a partition column is mistakenly in the business CSV, exclusion still wins.""" + renames, deletes = self._apply_logic( + stored_id_map={"electricity_type": "u-elec"}, + business_csv_map={ + "electricity_type": "u-elec", + "upload_id": "u-bogus-csv", # mistakenly added to business CSV + }, + current_table_columns=["electricity_type", "upload_id"], + partition_columns={"upload_id"}, + caller_managed_columns={"electricity_type", "upload_id"}, + ) + assert renames == {} + assert deletes == [] + + def test_caller_managed_with_stored_uuid_not_dropped(self): + """Loophole A for technical columns: stored UUID for tech col must be ignored.""" + renames, deletes = self._apply_logic( + stored_id_map={ + "school_name": "u-sn", + "change_type": "u-bogus-tech", # tech col should never have UUID + }, + business_csv_map={"school_name": "u-sn"}, + current_table_columns=["school_name", "change_type", "status"], + partition_columns=set(), + caller_managed_columns={"school_name", "change_type", "status"}, + ) + assert renames == {} + assert "change_type" not in deletes + assert "status" not in deletes + assert deletes == [] + + def test_master_table_no_partitions_no_change(self): + """Master tables have no partitions and no tech cols — must work as before.""" + renames, deletes = self._apply_logic( + stored_id_map={ + "school_name": 
"u-sn", + "old_funding": "u-fund", + "to_drop": "u-drop", + }, + business_csv_map={ + "school_name": "u-sn", + "new_funding": "u-fund", + }, + current_table_columns=["school_name", "old_funding", "to_drop"], + partition_columns=set(), # no partitions + caller_managed_columns=None, # no updated_schema passed + ) + assert renames == {"old_funding": "new_funding"} + assert deletes == ["to_drop"] + + def test_orphan_business_col_still_detected(self): + """Business columns NOT in updated_schema NOR partitions are still subject to delete.""" + renames, deletes = self._apply_logic( + stored_id_map={"keep_me": "u-keep"}, + business_csv_map={"keep_me": "u-keep"}, + # orphan_biz is in the table but NOT in updated_schema (was supposed to be deleted) + current_table_columns=["keep_me", "orphan_biz", "upload_id"], + partition_columns={"upload_id"}, + caller_managed_columns={"keep_me", "upload_id"}, # orphan_biz NOT here + ) + # orphan_biz is not partition, not caller-managed → tagged synthetic → delete + assert renames == {} + assert deletes == ["orphan_biz"] + + def test_rollback_rename_full_scenario(self): + """End-to-end model of the manager's rollback scenario.""" + partition_cols = {"upload_id"} + tech_cols = { + "change_type", + "uploaded_columns", + "status", + "change_id", + "created_at", + "processed_at", + "approval_request_log_id", + "master_version", + } + # All 28 staging columns + current = ["school_id_giga", "school_name", "electricity_type_test"] + current += list(tech_cols) + list(partition_cols) + + # Stored UUIDs (only business) + stored = { + "school_id_giga": "u1", + "school_name": "u2", + "electricity_type_test": "u-elec", # current name in table + } + # New CSV: rollback rename + csv = { + "school_id_giga": "u1", + "school_name": "u2", + "electricity_type": "u-elec", # rollback to old name + } + # Updated schema (full pending) = business + tech + partition + updated_schema_names = set(csv.keys()) | tech_cols | partition_cols + + renames, deletes = 
self._apply_logic( + stored_id_map=stored, + business_csv_map=csv, + current_table_columns=current, + partition_columns=partition_cols, + caller_managed_columns=updated_schema_names, + ) + + assert renames == {"electricity_type_test": "electricity_type"} + assert deletes == [] + + def test_combined_multi_operations_with_partitions(self): + """Multi rename + multi delete + multi add, all with partition cols.""" + partition_cols = {"upload_id"} + tech_cols = {"change_type", "status"} + current = [ + "keep_a", + "old_b", + "old_c", # business + "to_drop_x", + "to_drop_y", # business deletes + "upload_id", # partition + "change_type", + "status", # tech + ] + stored = { + "keep_a": "u1", + "old_b": "u2", + "old_c": "u3", + "to_drop_x": "u4", + "to_drop_y": "u5", + } + csv = { + "keep_a": "u1", + "new_b": "u2", + "new_c": "u3", # 2 renames + "added_p": "u6", + "added_q": "u7", # 2 adds (handled elsewhere) + } + updated_schema_names = set(csv.keys()) | tech_cols | partition_cols + + renames, deletes = self._apply_logic( + stored_id_map=stored, + business_csv_map=csv, + current_table_columns=current, + partition_columns=partition_cols, + caller_managed_columns=updated_schema_names, + ) + + assert renames == {"old_b": "new_b", "old_c": "new_c"} + assert sorted(deletes) == ["to_drop_x", "to_drop_y"] + + +class TestSyncSchemaSafeDropPath: + """Tests for the sync_schema secondary drop path safety fix. + + Bug: When schema_name was provided and apply_renames_and_deletes + missed a column (e.g. due to incomplete column_id_map), the secondary + path in sync_schema would silently drop the column by name comparison, + causing data loss. + + Fix: When schema_name is provided, the secondary path now logs a + WARNING and leaves the column in place. apply_renames_and_deletes + is the authoritative path for both renames and deletes. + + Note: These tests verify the LOGIC of the new safe drop path; they + don't run sync_schema directly (which requires Spark). 
+ """ + + def test_safe_drop_path_with_schema_name(self): + """When schema_name is provided, leftover columns should NOT be dropped.""" + schema_name = "school_geolocation" + removed_columns = {"school_funding_source"} + + # Simulate the new safe-drop logic + should_drop = [] + if removed_columns: + if schema_name is not None: + # New behavior: warn but don't drop + pass + else: + # Legacy behavior: drop by name + should_drop = list(removed_columns) + + assert should_drop == [] + + def test_safe_drop_path_without_schema_name(self): + """Legacy callers (no schema_name) keep old drop-by-name behavior.""" + schema_name = None + removed_columns = {"col_to_drop"} + + should_drop = [] + if removed_columns: + if schema_name is not None: + pass + else: + should_drop = list(removed_columns) + + assert should_drop == ["col_to_drop"] + + +class TestPrimaryKeyProtection: + """Unit tests for primary-key column protection in rename/delete detection. + + A primary key column must NEVER be renamed or deleted, even if the + schema CSV is updated with a different name for the same UUID. + """ + + def _apply_with_pk( + self, + stored_id_map: dict[str, str], + business_csv_map: dict[str, str], + pk_names: set[str], + persisted_pk_uuids: set[str] | None = None, + ) -> tuple[dict[str, str], list[str], set[str]]: + """Mimic the detection + PK-filtering logic from apply_renames_and_deletes. + + ``persisted_pk_uuids`` mimics the ``giga.pkColumnIds`` table property + that persists PK UUIDs even when the CSV removes the column. 
+ """ + existing_id_map = dict(stored_id_map) + updated_id_map = dict(business_csv_map) + persisted_pk_uuids = set(persisted_pk_uuids or ()) + + # detect + id_to_name = {v: k for k, v in updated_id_map.items()} + renames: dict[str, str] = {} + deletes: list[str] = [] + for name, uid in existing_id_map.items(): + if uid in id_to_name and id_to_name[uid] != name: + renames[name] = id_to_name[uid] + elif uid not in id_to_name: + deletes.append(name) + + # PK filtering (same logic as production code). + # PK UUIDs come from BOTH the CSV-declared PK names and any persisted + # PK UUIDs (the latter mimics ``giga.pkColumnIds`` on the data table). + pk_uuids = {updated_id_map[n] for n in pk_names if n in updated_id_map} + pk_uuids |= persisted_pk_uuids + renames_original = dict(renames) + if pk_uuids: + blocked_renames = { + old_name + for old_name, new_name in renames.items() + if existing_id_map.get(old_name) in pk_uuids + } + for old_name in blocked_renames: + del renames[old_name] + blocked_deletes = [ + col_name + for col_name in deletes + if existing_id_map.get(col_name) in pk_uuids + ] + for col_name in blocked_deletes: + deletes.remove(col_name) + + # blocked new names to exclude from add logic + blocked_new_names = set() + if pk_uuids: + blocked_new_names = { + new_name + for old_name, new_name in renames_original.items() + if old_name + in { + o + for o, _ in renames_original.items() + if existing_id_map.get(o) in pk_uuids + } + } + + return renames, deletes, blocked_new_names + + def test_pk_rename_is_blocked(self): + """A primary key column rename must be blocked.""" + stored = {"school_id_giga": "u1", "latitude": "u2"} + csv = {"school_id_giga_renamed": "u1", "latitude": "u2"} + pk_names = {"school_id_giga_renamed"} + + renames, deletes, blocked = self._apply_with_pk(stored, csv, pk_names) + assert renames == {} + assert deletes == [] + assert blocked == {"school_id_giga_renamed"} + + def test_pk_delete_is_blocked(self): + """A primary key column delete must 
be blocked. + + Scenario: the schema CSV no longer contains the PK column at all, + so ``primary_key_columns`` (derived from the CSV) is empty. The + PK UUID is still recorded on the data table via + ``giga.pkColumnIds``; that persisted set must keep the column + protected from being dropped. + """ + stored = {"school_id_giga": "u1", "latitude": "u2"} + csv = {"latitude": "u2"} # school_id_giga entirely removed from CSV + pk_names: set[str] = set() # CSV has no PK markers + persisted_pks = {"u1"} # but UUID u1 was historically the PK + + renames, deletes, blocked = self._apply_with_pk( + stored, csv, pk_names, persisted_pks + ) + assert renames == {} + # PK delete must be blocked — school_id_giga stays + assert deletes == [] + assert blocked == set() + + def test_pk_delete_without_persisted_uuid_is_allowed(self): + """Without persisted PK UUIDs, the legacy behaviour is preserved. + + This documents the prior (pre-fix) behaviour for tables that have + not yet had ``giga.pkColumnIds`` written — the delete proceeds. 
+ """ + stored = {"school_id_giga": "u1", "latitude": "u2"} + csv = {"latitude": "u2"} + pk_names: set[str] = set() + + renames, deletes, blocked = self._apply_with_pk( + stored, csv, pk_names, persisted_pk_uuids=set() + ) + assert renames == {} + # No PK protection available → delete proceeds + assert deletes == ["school_id_giga"] + assert blocked == set() + + def test_non_pk_rename_is_allowed(self): + """Non-PK column renames should still proceed normally.""" + stored = {"school_id_giga": "u1", "latitude": "u2"} + csv = {"school_id_giga": "u1", "lat_renamed": "u2"} + pk_names = {"school_id_giga"} + + renames, deletes, blocked = self._apply_with_pk(stored, csv, pk_names) + assert renames == {"latitude": "lat_renamed"} + assert deletes == [] + assert blocked == set() + + def test_pk_and_regular_rename_together(self): + """Mixed scenario: PK rename blocked, regular rename allowed.""" + stored = {"school_id_giga": "u1", "latitude": "u2", "old_col": "u3"} + csv = { + "school_id_giga_renamed": "u1", + "lat_renamed": "u2", + "new_col": "u3", + } + pk_names = {"school_id_giga_renamed"} + + renames, deletes, blocked = self._apply_with_pk(stored, csv, pk_names) + assert renames == {"latitude": "lat_renamed", "old_col": "new_col"} + assert deletes == [] + assert blocked == {"school_id_giga_renamed"}