Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
bb39d84
feat: Deletion and renaming of columns with upgrade of delta sharing …
bidhan-nagarro Mar 23, 2026
3524d75
feat: Deletion and renaming of columns with upgrade of delta sharing …
bidhan-nagarro Mar 23, 2026
efcb2f1
chore: pre commit issue fixed
Apr 1, 2026
96970d4
fix: improve processing time for large files (#445)
brianmusisi Apr 1, 2026
cb122b5
feat: staging table restructure (#443)
brianmusisi Mar 31, 2026
81d581c
fix: improve processing time for large files (#445)
brianmusisi Apr 1, 2026
7869c37
feat: Deletion and renaming of columns with upgrade of delta sharing …
bidhan-nagarro Mar 23, 2026
ff92198
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 2, 2026
c90589c
update the setup to upgrade the Hive metastore
sharky93 Apr 6, 2026
adebbfe
Update metastore-site.template.xml
sharky93 Apr 6, 2026
398a987
feat: migrate from wasbs to abfss and vectorize admin data functions …
brianmusisi Apr 8, 2026
7e2abbe
fix: update asset key check for multi asets (#447)
brianmusisi Apr 8, 2026
a9da8cb
fix: make updates to geolocation_staging idempotent
brianmusisi Apr 8, 2026
fca9e95
feat: add SAS token support for hive with abfss
brianmusisi Apr 8, 2026
6e110b7
fix: add FixedSASTokenProvider JAR to hive
brianmusisi Apr 9, 2026
00ee468
fix: use correct version for jar build
brianmusisi Apr 9, 2026
9d6def7
update timeout for creating the physical table
sharky93 Apr 13, 2026
e731264
fix: merge main branch
bidhan-nagarro Apr 14, 2026
ebdf193
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 14, 2026
14331d8
fix: schema evolution changes
bidhan-nagarro Apr 15, 2026
3c62591
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 20, 2026
e39b68b
fix: spark error addressed
bidhan-nagarro Apr 22, 2026
abbc539
fix: bug fix
bidhan-nagarro Apr 23, 2026
1cefbe8
fix: bugfix
bidhan-nagarro Apr 23, 2026
75686dd
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 24, 2026
8d30924
fix: bugfix
bidhan-nagarro Apr 24, 2026
df0d08b
fix: rename and delete fixed
bidhan-nagarro Apr 27, 2026
3c8b18f
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro May 5, 2026
4f01fdb
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro May 12, 2026
a52d0ab
fix: partition key bug fix
bidhan-nagarro May 12, 2026
17a7002
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro May 13, 2026
9d7986d
fix: catch exception
bidhan-nagarro May 13, 2026
becc0b7
fix: added one time script for column id mapping
bidhan-nagarro May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 0 additions & 65 deletions dagster/src/assets/adhoc/master_csv_to_gold.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
check_table_exists,
create_delta_table,
create_schema,
sync_schema,
)
from src.utils.logger import ContextLoggerWithLoguruFallback
from src.utils.metadata import get_output_metadata, get_table_preview
Expand Down Expand Up @@ -608,38 +607,6 @@ def adhoc__publish_master_to_gold(
)
gold = compute_row_hash(gold)

table_exists = check_table_exists(
spark=spark.spark_session,
schema_name="school_master",
table_name=config.country_code.lower(),
data_tier=DataTier.GOLD,
)

if table_exists:
table_name = f"{config.metastore_schema}.{config.country_code}"
updated_schema = StructType(
get_schema_columns(
spark=spark.spark_session, schema_name=config.metastore_schema
)
)

context.log.info(f"Existing table name: {table_name}")

spark.spark_session.catalog.refreshTable(table_name)
existing_df = DeltaTable.forName(
sparkSession=spark.spark_session, tableOrViewName=table_name
).toDF()

existing_schema = existing_df.schema

sync_schema(
table_name=table_name,
existing_schema=existing_schema,
updated_schema=updated_schema,
spark=spark.spark_session,
context=context,
)

schema_reference = get_schema_columns_datahub(
spark.spark_session,
config.metastore_schema,
Expand Down Expand Up @@ -690,38 +657,6 @@ def adhoc__publish_reference_to_gold(
)
gold = compute_row_hash(gold)

table_exists = check_table_exists(
spark=spark.spark_session,
schema_name="school_reference",
table_name=config.country_code.lower(),
data_tier=DataTier.GOLD,
)

if table_exists:
table_name = f"{config.metastore_schema}.{config.country_code}"
updated_schema = StructType(
get_schema_columns(
spark=spark.spark_session, schema_name=config.metastore_schema
)
)

context.log.info(f"Existing table name: {table_name}")

spark.spark_session.catalog.refreshTable(table_name)
existing_df = DeltaTable.forName(
sparkSession=spark.spark_session,
tableOrViewName=table_name,
).toDF()
existing_schema = existing_df.schema

sync_schema(
table_name=table_name,
existing_schema=existing_schema,
updated_schema=updated_schema,
spark=spark.spark_session,
context=context,
)

schema_reference = get_schema_columns_datahub(
spark.spark_session,
config.metastore_schema,
Expand Down
6 changes: 3 additions & 3 deletions dagster/src/assets/common/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ def reset_staging_table(
raise


def _handle_null_columns(schema_columns, primary_key):
def handle_null_columns(schema_columns, primary_key):
"""Handle null columns by providing default values based on data type.

If the column value is NULL, add a placeholder value if the following
Expand Down Expand Up @@ -771,7 +771,7 @@ def master(
else:
new_master = silver

column_actions = _handle_null_columns(schema_columns, primary_key)
column_actions = handle_null_columns(schema_columns, primary_key)
new_master = new_master.withColumns(column_actions)
new_master = compute_row_hash(new_master)

Expand Down Expand Up @@ -829,7 +829,7 @@ def reference(
else:
new_reference = silver

column_actions = _handle_null_columns(schema_columns, primary_key)
column_actions = handle_null_columns(schema_columns, primary_key)
new_reference = new_reference.withColumns(column_actions)
new_reference = compute_row_hash(new_reference)

Expand Down
191 changes: 191 additions & 0 deletions dagster/src/assets/migrations/bootstrap_column_id_maps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""One-time migration: bootstrap giga.columnId.* table properties for all existing
Delta tables that predate the UUID-based rename/delete detection feature.

For tables where the schema CSV has already been renamed before this feature was
deployed, the script also detects the mismatch and renames the physical column to
match the current schema before persisting the mapping — eliminating the need for
manual per-table ALTER TABLE statements.

Run once per environment after deploying the rename/delete detection feature.
Safe to re-run: tables that already have the mapping stored are skipped.
"""

from dagster_pyspark import PySparkResource
from pyspark.sql import SparkSession
from src.constants import DataTier
from src.utils.delta import (
enable_column_mapping,
get_stored_column_id_map,
persist_column_id_map,
)
from src.utils.schema import (
construct_schema_name_for_tier,
get_schema_columns_with_id,
)

from dagster import OpExecutionContext, asset


def _get_all_country_tables(
spark: SparkSession,
tier_schema: str,
) -> list[str]:
"""Return all fully-qualified table names in a given schema."""
if not spark.catalog.databaseExists(tier_schema):
return []
rows = spark.sql(f"SHOW TABLES IN `{tier_schema}`").collect()
return [f"{tier_schema}.{row['tableName']}" for row in rows]


def _resolve_schema_name(dataset_type: str) -> str:
"""Return the schemas-namespace table name for a dataset type, e.g. 'school_geolocation'."""
return dataset_type


def bootstrap_table(
spark: SparkSession,
context: OpExecutionContext,
full_table_name: str,
schema_name: str,
dry_run: bool = False,
) -> None:
"""Bootstrap or repair the column-ID mapping for a single Delta table.

Steps:
1. If the table already has a complete mapping, skip it.
2. Build updated_id_map from the schema CSV (name -> uuid).
3. For physical columns that have no stored UUID and no direct name match in
the schema, try to match them to unmatched schema columns by comparing
the set of columns that differ between table and schema.
If the mismatch is unambiguous (1 old name : 1 new name), rename the
physical column and proceed. If ambiguous, log a warning and skip.
4. Call persist_column_id_map to store the final mapping.
"""
if not spark.catalog.tableExists(full_table_name):
context.log.info(f"Table {full_table_name} does not exist — skipping.")
return

existing_map = get_stored_column_id_map(spark, full_table_name)

try:
columns_with_id = get_schema_columns_with_id(spark, schema_name)
except Exception as exc:
context.log.warning(
f"Could not load schema '{schema_name}' for {full_table_name}: {exc} — skipping."
)
return

updated_id_map: dict[str, str] = {
field.name: csv_id for csv_id, field in columns_with_id
}

spark.catalog.refreshTable(full_table_name)
physical_columns: list[str] = spark.table(full_table_name).columns

# Columns in the table that have no stored UUID and no direct name match in schema
orphan_table_cols = [
c for c in physical_columns if c not in existing_map and c not in updated_id_map
]
# Schema columns that are not physically present in the table
missing_schema_cols = [c for c in updated_id_map if c not in physical_columns]

if orphan_table_cols:
context.log.info(
f"{full_table_name}: orphan table columns (no UUID, no schema match): "
f"{orphan_table_cols}"
)
context.log.info(
f"{full_table_name}: missing schema columns (in schema but not in table): "
f"{missing_schema_cols}"
)

if orphan_table_cols and missing_schema_cols:
if len(orphan_table_cols) == len(missing_schema_cols):
# Unambiguous 1-to-1 mismatch — safe to rename
renames = dict(zip(orphan_table_cols, missing_schema_cols, strict=False))
context.log.info(f"{full_table_name}: detected probable renames: {renames}")
if not dry_run:
enable_column_mapping(spark, full_table_name)
for old_name, new_name in renames.items():
stmt = (
f"ALTER TABLE {full_table_name} "
f"RENAME COLUMN `{old_name}` TO `{new_name}`"
)
context.log.info(f"Executing: {stmt}")
spark.sql(stmt)
spark.catalog.refreshTable(full_table_name)
else:
context.log.info(
f"[DRY RUN] Would rename columns in {full_table_name}: {renames}"
)
else:
# Ambiguous — multiple columns differ; cannot safely auto-rename
context.log.warning(
f"{full_table_name}: ambiguous column mismatch "
f"(orphan table cols: {orphan_table_cols}, "
f"missing schema cols: {missing_schema_cols}). "
f"Cannot auto-rename. Manual intervention required."
)
# Still persist what we can so the rest of the mapping is stored
elif not orphan_table_cols and not missing_schema_cols:
context.log.info(
f"{full_table_name}: all physical columns match the schema by name."
)

if not dry_run:
persist_column_id_map(spark, full_table_name, schema_name)
context.log.info(f"{full_table_name}: column-ID mapping persisted.")
else:
context.log.info(
f"[DRY RUN] Would persist column-ID mapping for {full_table_name}."
)


@asset
def bootstrap_column_id_maps(
context: OpExecutionContext,
spark: PySparkResource,
) -> None:
"""One-time asset: bootstrap giga.columnId.* props for all existing Delta tables.

Set DRY_RUN = True to log what would happen without making any changes.
"""
DRY_RUN = False

s: SparkSession = spark.spark_session

# Map of (dataset_type, tier) -> schema_name used for rename/delete detection
# Add more entries here if other dataset types are introduced.
dataset_configs: list[tuple[str, DataTier | None]] = [
("school_geolocation", DataTier.SILVER),
("school_geolocation", DataTier.STAGING),
("school_geolocation", None), # master: schema = "school_master"
]

for dataset_type, tier in dataset_configs:
if tier is None:
tier_schema = "school_master"
schema_name = dataset_type
else:
tier_schema = construct_schema_name_for_tier(
f"school_{dataset_type.replace('school_', '')}", tier
)
schema_name = dataset_type

context.log.info(
f"Processing schema '{tier_schema}' with schema_name='{schema_name}' ..."
)
table_names = _get_all_country_tables(s, tier_schema)

if not table_names:
context.log.info(f"No tables found in '{tier_schema}' — skipping.")
continue

for full_table_name in table_names:
bootstrap_table(
spark=s,
context=context,
full_table_name=full_table_name,
schema_name=schema_name,
dry_run=DRY_RUN,
)
29 changes: 24 additions & 5 deletions dagster/src/assets/migrations/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from models import VALID_PRIMITIVES, Schema
from pyspark import sql
from pyspark.sql.functions import col, when
from src.utils.delta import execute_query_with_error_handler
from src.utils.delta import (
create_delta_table,
persist_column_id_map,
sync_schema,
)

from dagster import OpExecutionContext

Expand Down Expand Up @@ -42,17 +46,32 @@ def save_schema_delta_table(context: OpExecutionContext, df: sql.DataFrame):
full_table_name = f"{schema_name}.{table_name}"

columns = Schema.fields
query = (
DeltaTable.createOrReplace(spark).tableName(full_table_name).addColumns(columns)
create_delta_table(
spark,
schema_name,
table_name,
columns,
context,
if_not_exists=True,
)
sync_schema(
table_name=full_table_name,
existing_schema=spark.table(full_table_name).schema,
updated_schema=Schema.schema,
spark=spark,
context=context,
)
execute_query_with_error_handler(spark, query, schema_name, table_name, context)

spark.catalog.refreshTable(full_table_name)
(
DeltaTable.forName(spark, full_table_name)
.alias("master")
.merge(df.alias("updates"), "master.name = updates.name")
.merge(df.alias("updates"), "master.id = updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)

# Persist column-ID mapping after merge succeeds
persist_column_id_map(spark, full_table_name, table_name)
15 changes: 13 additions & 2 deletions dagster/src/assets/school_geolocation/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,19 @@ def geolocation_metadata(
context.log.info("Create spark dataframe")
metadata_df = s.createDataFrame(metadata_df)

table_columns = get_schema_columns(s, "school_geolocation_metadata")
table_name = "school_geolocation_metadata"
table_schema_name = "pipeline_tables"

context.log.info("Get schema columns for metadata table")
try:
table_columns = get_schema_columns(s, "school_geolocation_metadata")
except Exception:
context.log.warning(
"Schema table schemas.school_geolocation_metadata not found; "
"using DataFrame schema for metadata table creation"
)
table_columns = list(metadata_df.schema.fields)

context.log.info("Create the schema and table if they do not exist")
metadata_df = add_missing_columns(metadata_df, table_columns)
metadata_df = metadata_df.select(*StructType(table_columns).fieldNames())
Expand Down Expand Up @@ -327,7 +336,9 @@ def geolocation_data_quality_results(
)

dq_results_schema_name = f"{schema_name}_dq_results"
table_name = f"{id}_{country_code}_{current_timestamp}"
# Replace hyphens with underscores so the identifier is valid in Spark SQL
safe_id = id.replace("-", "_")
table_name = f"{safe_id}_{country_code}_{current_timestamp}"

schema_columns = [
StructField(field.name, field.dataType, nullable=True)
Expand Down
Loading