Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
169 commits
Select commit Hold shift + click to select a range
9fe8821
feat: Parquet To delta
bidhan-nagarro Jan 28, 2026
a9e3b15
feat: Parquet To delta
bidhan-nagarro Jan 29, 2026
6d77362
feat: Parquet To delta
bidhan-nagarro Jan 29, 2026
fbf03eb
feat: Parquet To delta
bidhan-nagarro Jan 29, 2026
69fa9eb
feat: Parquet To delta
bidhan-nagarro Feb 2, 2026
39fdd34
feat: Parquet To delta
bidhan-nagarro Feb 6, 2026
96680a6
feat: Parquet To delta
bidhan-nagarro Feb 6, 2026
fd9d374
feat: Parquet To delta
bidhan-nagarro Feb 6, 2026
f9c2ad4
feat: Parquet To delta
bidhan-nagarro Feb 9, 2026
95e5f54
feat: Parquet To delta
bidhan-nagarro Feb 9, 2026
f5f59c4
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Feb 26, 2026
219f5e9
feat: Data Quality dagster Changes
bidhan-nagarro Feb 27, 2026
22a5a82
feat: Data Quality dagster Changes
bidhan-nagarro Feb 27, 2026
8518dd3
feat: Unified Country for error checks
bidhan-nagarro Feb 27, 2026
8b9ecc7
feat: PR comments addressed
bidhan-nagarro Mar 6, 2026
41a9770
feat: PR comments addressed
bidhan-nagarro Mar 6, 2026
3148a92
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Mar 6, 2026
ff84c4b
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Mar 7, 2026
0a79875
feat: PR comments addressed
bidhan-nagarro Mar 9, 2026
1cda0b7
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Mar 10, 2026
336094e
feat: Error Table Changes
bidhan-nagarro Mar 10, 2026
e97837e
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Mar 11, 2026
7762c13
feat: Parquet To delta - pre-commit hook issue resolved
Mar 11, 2026
f032c8c
Merge branch 'main' into feature/parquet-to-delta
gauravgupta-nagarro Mar 11, 2026
37c73bf
feat: Parquet To delta - Sensor time reduced
Mar 11, 2026
c11fc13
feat: Data overwrite issue fix
bidhan-nagarro Mar 11, 2026
959b917
feat: Parquet To delta - Sensor name and Duration changed
Mar 12, 2026
ba42d5a
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Mar 18, 2026
1a25b9b
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Mar 19, 2026
d810b21
feat: Error table fixes
bidhan-nagarro Mar 19, 2026
727bbf1
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Mar 19, 2026
4939b5a
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Mar 21, 2026
b88c082
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Mar 21, 2026
bb39d84
feat: Deletion and renaming of columns with upgrade of delta sharing …
bidhan-nagarro Mar 23, 2026
3524d75
feat: Deletion and renaming of columns with upgrade of delta sharing …
bidhan-nagarro Mar 23, 2026
1aadce4
feat: Fixes regarding the school list
bidhan-nagarro Mar 25, 2026
4f56416
feat: Fixes regarding the school list
bidhan-nagarro Mar 25, 2026
5287132
feat: Reverted unnecessary changes
bidhan-nagarro Mar 26, 2026
654324b
fix: Review comments addressed
bidhan-nagarro Mar 27, 2026
8a8b0bf
chore: upgrade packages for pydantic 2 support
brianmusisi Mar 27, 2026
fa54ba6
fix: review comments addressed
bidhan-nagarro Mar 31, 2026
06d9731
Remove PING_RETRY_FILE_PATH constant
bidhan-nagarro Mar 31, 2026
217bca3
fix: error table
bidhan-nagarro Apr 1, 2026
efcb2f1
chore: pre commit issue fixed
Apr 1, 2026
7a1b795
chore: pre commit issue fixed
Apr 1, 2026
216b5a7
chore: pre commit issue fixed
Apr 1, 2026
a7bae4c
Merge branch 'main' into feature/dq
bidhan-nagarro Apr 1, 2026
96970d4
fix: improve processing time for large files (#445)
brianmusisi Apr 1, 2026
bf9ac0d
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Apr 2, 2026
43a3aa6
Merge branch 'main' into feature/dq
bidhan-nagarro Apr 2, 2026
a62dc2b
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Apr 2, 2026
728acec
feat: pre-commit fix
bidhan-nagarro Apr 2, 2026
1c20ea9
feat: pre-commit fix
bidhan-nagarro Apr 2, 2026
cb122b5
feat: staging table restructure (#443)
brianmusisi Mar 31, 2026
81d581c
fix: improve processing time for large files (#445)
brianmusisi Apr 1, 2026
7869c37
feat: Deletion and renaming of columns with upgrade of delta sharing …
bidhan-nagarro Mar 23, 2026
ff92198
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 2, 2026
464829b
feat: review comments addressed
bidhan-nagarro Apr 6, 2026
16c31d1
feat: review comments addressed
bidhan-nagarro Apr 6, 2026
8c7f707
feat: sensor duration reduced
Apr 7, 2026
f32071c
feat: Merged main branch
Apr 7, 2026
702b673
feat: review comments addressed
bidhan-nagarro Apr 7, 2026
600e188
feat: review comments addressed
bidhan-nagarro Apr 7, 2026
b9cff46
feat: review comments addressed
bidhan-nagarro Apr 7, 2026
9559e00
feat: migrate from wasbs to abfss and vectorize admin data functions …
brianmusisi Apr 8, 2026
9b1ffbc
feat: Parquet To delta
bidhan-nagarro Jan 28, 2026
d2bb8f3
feat: PR comments addressed
bidhan-nagarro Mar 6, 2026
9e693c8
feat: PR comments addressed
bidhan-nagarro Mar 9, 2026
783e097
feat: pre-commit fix
bidhan-nagarro Apr 2, 2026
a7c875a
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Apr 8, 2026
9d66f08
feat: review comments addressed
bidhan-nagarro Apr 8, 2026
6d95b8c
feat: error table fixes
bidhan-nagarro Apr 8, 2026
c4c9f81
fix: add country branch
bidhan-nagarro Apr 8, 2026
d829ddb
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Apr 8, 2026
0e78224
feat: TECH-6601 - Add Giga Spatial Lib & DQ Checks
Apr 8, 2026
37a44ff
fix: parquet to delta
bidhan-nagarro Apr 9, 2026
bdc0e99
fix: parquet to delta
bidhan-nagarro Apr 9, 2026
fc4d727
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Apr 9, 2026
6e249f8
feat: Sensor Time reduced
Apr 9, 2026
7e88c90
feat: TECH-6601 - school_area_type_smod removed & rural_urban renamed…
Apr 9, 2026
e59901e
fix: add FixedSASTokenProvider JAR to hive
brianmusisi Apr 9, 2026
8961693
Merge branch 'main' into feature/parquet-to-delta
brianmusisi Apr 9, 2026
8346105
Merge branch 'main' into feature/parquet-to-delta
brianmusisi Apr 9, 2026
fdc89e8
Merge branch 'main' into feature/parquet-to-delta
brianmusisi Apr 9, 2026
eb59a88
fix: error table
bidhan-nagarro Apr 14, 2026
e91ab64
Merge branch 'main' into feature/unified-country-error-table
bidhan-nagarro Apr 14, 2026
c90589c
update the setup to upgrade the Hive metastore
sharky93 Apr 6, 2026
adebbfe
Update metastore-site.template.xml
sharky93 Apr 6, 2026
398a987
feat: migrate from wasbs to abfss and vectorize admin data functions …
brianmusisi Apr 8, 2026
7e2abbe
fix: update asset key check for multi asets (#447)
brianmusisi Apr 8, 2026
a9da8cb
fix: make updates to geolocation_staging idempotent
brianmusisi Apr 8, 2026
fca9e95
feat: add SAS token support for hive with abfss
brianmusisi Apr 8, 2026
6e110b7
fix: add FixedSASTokenProvider JAR to hive
brianmusisi Apr 9, 2026
00ee468
fix: use correct version for jar build
brianmusisi Apr 9, 2026
9d6def7
update timeout for creating the physical table
sharky93 Apr 13, 2026
e731264
fix: merge main branch
bidhan-nagarro Apr 14, 2026
ebdf193
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 14, 2026
14331d8
fix: schema evolution changes
bidhan-nagarro Apr 15, 2026
958e12c
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Apr 15, 2026
a1fcce5
Merge branch 'main' into feature/unified-country-error-table
brianmusisi Apr 17, 2026
7dcc4d9
Merge branch 'main' into feature/dq
bidhan-nagarro Apr 20, 2026
fe9e6c5
Merge branch 'main' into feature/parquet-to-delta
bidhan-nagarro Apr 20, 2026
3c62591
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 20, 2026
fc5b11a
fix: review changes
bidhan-nagarro Apr 20, 2026
00b00a5
fix: review changes
bidhan-nagarro Apr 20, 2026
7e60f44
fix: Changes and Fixes
bidhan-nagarro Apr 20, 2026
896cb18
fix: DQ fixes
bidhan-nagarro Apr 21, 2026
096d9ff
fix: review comments addressed
bidhan-nagarro Apr 21, 2026
5bfc3b8
fix: review comments addressed
bidhan-nagarro Apr 21, 2026
e39b68b
fix: spark error addressed
bidhan-nagarro Apr 22, 2026
dc48213
fix: added parquet to delta
bidhan-nagarro Apr 22, 2026
b5dda44
fix: added dq
bidhan-nagarro Apr 22, 2026
da9fc18
fix: rename branch added
bidhan-nagarro Apr 22, 2026
45b2839
fix: DQ fixes
bidhan-nagarro Apr 22, 2026
848e743
fix: DQ fixes
bidhan-nagarro Apr 22, 2026
546b60c
fix: DQ fixes
bidhan-nagarro Apr 22, 2026
6874b40
fix: DQ fixes
bidhan-nagarro Apr 22, 2026
abbc539
fix: bug fix
bidhan-nagarro Apr 23, 2026
9dfd58a
fix: 'feature/TECH-7468/schema-evolution-with-deletions-and-renaming'…
bidhan-nagarro Apr 23, 2026
f6aa504
fix: dq fixes
bidhan-nagarro Apr 23, 2026
b88a7a9
fix: dq fixes
bidhan-nagarro Apr 23, 2026
1970bc5
fix: Merge branch 'feature/dq' into dev-testing
bidhan-nagarro Apr 23, 2026
b30a332
fix: skip staging check for dq_mode=uploaded
bidhan-nagarro Apr 23, 2026
9cee204
fix: Merge branch 'feature/dq' into dev-testing
bidhan-nagarro Apr 23, 2026
30b261a
fix: Merge branch 'feature/dq' into dev-testing
bidhan-nagarro Apr 23, 2026
1cefbe8
fix: bugfix
bidhan-nagarro Apr 23, 2026
75686dd
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro Apr 24, 2026
8d30924
fix: bugfix
bidhan-nagarro Apr 24, 2026
eab3643
fix: Merge branch 'feature/TECH-7468/schema-evolution-with-deletions-…
bidhan-nagarro Apr 24, 2026
bd74529
Merge branch 'main' into feature/dq
bidhan-nagarro Apr 24, 2026
9bb5fb8
fix: dq fixes
bidhan-nagarro Apr 24, 2026
df0d08b
fix: rename and delete fixed
bidhan-nagarro Apr 27, 2026
4594d73
feat: TECH-9186 School Registration Feature Added
Apr 28, 2026
c85bd3a
feat: Merged School Registration
Apr 28, 2026
7e7806a
feat: TECH-9186 School Registration Feature Added
Apr 28, 2026
b3f708f
feat: Merged School Registration
Apr 28, 2026
8e51fbd
feat: TECH-9186 School Registration Feature Added
Apr 28, 2026
bcdd06c
feat: Merged School Registration
Apr 28, 2026
8251fac
fix: Merge branch 'feature/TECH-7468/schema-evolution-with-deletions-…
bidhan-nagarro Apr 29, 2026
948a244
feat: TECH-9186 School Registration Feature Added
Apr 29, 2026
28de1bd
feat: Merged School Registration
Apr 29, 2026
3c8b18f
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro May 5, 2026
91835b0
Merge branch 'main' into dev-testing
bidhan-nagarro May 6, 2026
b6b5e5c
feat: TECH-9186 School Registration Feature Added
May 7, 2026
2c510e1
feat: Merged main
May 7, 2026
73f477b
feat: TECH-9186 School Registration Feature Added
May 7, 2026
c6df988
feat: Merge School Registration
May 7, 2026
4d1d5c0
Merge branch 'main' into feature/dq
bidhan-nagarro May 11, 2026
4f01fdb
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro May 12, 2026
39f56f2
Merge branch 'main' into dev-testing
bidhan-nagarro May 12, 2026
a52d0ab
fix: partition key bug fix
bidhan-nagarro May 12, 2026
68430ca
fix: Merge branch 'feature/TECH-7468/schema-evolution-with-deletions-…
bidhan-nagarro May 12, 2026
17a7002
Merge branch 'main' into feature/TECH-7468/schema-evolution-with-dele…
bidhan-nagarro May 13, 2026
64871b9
Merge branch 'main' into dev-testing
bidhan-nagarro May 13, 2026
9d7986d
fix: catch exception
bidhan-nagarro May 13, 2026
d323595
fix: Merge branch 'feature/TECH-7468/schema-evolution-with-deletions-…
bidhan-nagarro May 13, 2026
1519391
feat: Synced with main branch
May 21, 2026
ab4dc18
feat: Merged 6601
May 21, 2026
054a3b4
Merge branch 'main' into feature/dq
bidhan-nagarro May 21, 2026
da88992
fix: DQ enhancements
bidhan-nagarro May 21, 2026
c3cfd70
fix: Merge branch 'feature/dq' into dev-testing
bidhan-nagarro May 21, 2026
18ff605
feat: run_geospatial_checks added
May 21, 2026
f58f55b
fix: DQ enhancements
bidhan-nagarro May 22, 2026
dcf700b
fix: Merge branch 'feature/dq' into dev-testing
bidhan-nagarro May 22, 2026
f238920
fix: DQ enhancements
bidhan-nagarro May 22, 2026
35f76c3
fix: Merge branch 'feature/dq' into dev-testing
bidhan-nagarro May 22, 2026
577f1dc
feat: Merged main branch
May 25, 2026
528251f
feat: TECH - 6601: Pydentic Issue fixed
May 26, 2026
0e168a5
feat: Merged 6601
May 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,6 @@ metastore_db/

# Kubernetes/Helm
secrets.yaml

# GigaSpatial data (Google/MS buildings, GHSL, WorldPop)
dagster/bronze/
2 changes: 2 additions & 0 deletions azure/templates/create-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ jobs:
CATALOG_BASE: "$(catalogBase)"
DATABASE_ID: "$(databaseId)"
SLACK_WORKFLOW_WEBHOOK: "$(slackWorkflowWebhook)"
GIGAMETER_API_BASE_URL: "$(gigameterApiBaseUrl)"
GIGAMETER_API_TOKEN: "$(gigameterApiToken)"

- task: Kubernetes@1
displayName: Create authproxy secrets
Expand Down
2 changes: 2 additions & 0 deletions azure/templates/variables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ variables:
catalogBase: $(CATALOG_BASE)
databaseId: $(DATABASE_ID)
slackWorkflowWebhook: $(SLACK_WORKFLOW_WEBHOOK)
gigameterApiBaseUrl: $(GIGAMETER_API_BASE_URL)
gigameterApiToken: $(GIGAMETER_API_TOKEN)
system.debug: true
1 change: 1 addition & 0 deletions dagster/models/file_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class DQStatusEnum(Enum):
ERROR = "ERROR"
TIMEOUT = "TIMEOUT"
SKIPPED = "SKIPPED"
FILE_CHECKED = "FILE_CHECKED"


class FileUpload(BaseModel):
Expand Down
2 changes: 2 additions & 0 deletions dagster/models/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
VALID_PRIMITIVES = [
"string",
"integer",
"int",
"long",
"float",
"double",
Expand All @@ -22,6 +23,7 @@ class TypeMapping(BaseModel):
class TypeMappings(BaseModel):
string: TypeMapping
integer: TypeMapping
int: TypeMapping # Alias for integer
long: TypeMapping
float: TypeMapping
double: TypeMapping
Expand Down
3,145 changes: 2,590 additions & 555 deletions dagster/poetry.lock

Large diffs are not rendered by default.

22 changes: 13 additions & 9 deletions dagster/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ readme = "../docs/README.md"
python = ">=3.11,<3.13"
scipy = ">=1.10.0"
cryptography = ">=42.0.4"
sentry-sdk = "0.10.2"
sentry-sdk = ">=1.33.1"
openpyxl = "^3.1.2"
loguru = "^0.7.2"
icecream = "^2.1.3"
Expand All @@ -34,18 +34,19 @@ azure-storage-file-datalake = "^12.13.1"
pytest-cov = "4.0.0"
azure-identity = ">=1.16.1"
geopy = "^2.4.0"
h3 = "^3.7.6"
h3 = "^4.2"
fastparquet = "^2023.8.0"
grpcio-status = "^1.59.0"
acryl-datahub = { extras = ["azure-ad"], version = "0.14.0.2" }
acryl-datahub = { extras = ["azure-ad"], version = "^1.3.1" }
shapely = "^2.0.2"
geopandas = "^0.14.1"
geopandas = "^1.0"
thefuzz = "^0.20.0"
country-converter = "^1.1.1"
sqlalchemy = "^2.0.28"
psycopg2-binary = "^2.9.9"
cuid2 = "^2.0.0"
pydantic = { extras = ["email"], version = "<2.0.0" }
pydantic = { extras = ["email"], version = "^2.10" }
pydantic-settings = "^2.0"
msgraph-sdk = "^1.2.0"
unidecode = "^1.3.8"
aiohttp = ">=3.10.2"
Expand All @@ -60,13 +61,16 @@ urllib3 = ">=2.2.2"
tornado = ">=6.4.1"
xlrd = "^2.0.1"
chardet = "^5.2.0"
giga-spatial = "==0.9.2"
numpy = "^2.4.4"
pandas = "^2.1"
networkx = "^3.2"
earthengine-api = ">=0.1.400"

[tool.poetry.group.spark.dependencies]
pyspark = "3.5.0"
delta-spark = "3.0.0"
deltalake = "0.12.0"
pyarrow = "11.0.0"
pyarrow-hotfix = "^0.5"
pyarrow = "^16.0"
roapi = "0.9.0"

[tool.poetry.group.notebook.dependencies]
Expand Down Expand Up @@ -124,7 +128,7 @@ combine-as-imports = true
force-wrap-aliases = true

[tool.ruff.lint.pep8-naming]
classmethod-decorators = ["pydantic.validator"]
classmethod-decorators = ["pydantic.field_validator"]

[tool.bandit]
tests = [
Expand Down
65 changes: 0 additions & 65 deletions dagster/src/assets/adhoc/master_csv_to_gold.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
check_table_exists,
create_delta_table,
create_schema,
sync_schema,
)
from src.utils.logger import ContextLoggerWithLoguruFallback
from src.utils.metadata import get_output_metadata, get_table_preview
Expand Down Expand Up @@ -608,38 +607,6 @@ def adhoc__publish_master_to_gold(
)
gold = compute_row_hash(gold)

table_exists = check_table_exists(
spark=spark.spark_session,
schema_name="school_master",
table_name=config.country_code.lower(),
data_tier=DataTier.GOLD,
)

if table_exists:
table_name = f"{config.metastore_schema}.{config.country_code}"
updated_schema = StructType(
get_schema_columns(
spark=spark.spark_session, schema_name=config.metastore_schema
)
)

context.log.info(f"Existing table name: {table_name}")

spark.spark_session.catalog.refreshTable(table_name)
existing_df = DeltaTable.forName(
sparkSession=spark.spark_session, tableOrViewName=table_name
).toDF()

existing_schema = existing_df.schema

sync_schema(
table_name=table_name,
existing_schema=existing_schema,
updated_schema=updated_schema,
spark=spark.spark_session,
context=context,
)

schema_reference = get_schema_columns_datahub(
spark.spark_session,
config.metastore_schema,
Expand Down Expand Up @@ -690,38 +657,6 @@ def adhoc__publish_reference_to_gold(
)
gold = compute_row_hash(gold)

table_exists = check_table_exists(
spark=spark.spark_session,
schema_name="school_reference",
table_name=config.country_code.lower(),
data_tier=DataTier.GOLD,
)

if table_exists:
table_name = f"{config.metastore_schema}.{config.country_code}"
updated_schema = StructType(
get_schema_columns(
spark=spark.spark_session, schema_name=config.metastore_schema
)
)

context.log.info(f"Existing table name: {table_name}")

spark.spark_session.catalog.refreshTable(table_name)
existing_df = DeltaTable.forName(
sparkSession=spark.spark_session,
tableOrViewName=table_name,
).toDF()
existing_schema = existing_df.schema

sync_schema(
table_name=table_name,
existing_schema=existing_schema,
updated_schema=updated_schema,
spark=spark.spark_session,
context=context,
)

schema_reference = get_schema_columns_datahub(
spark.spark_session,
config.metastore_schema,
Expand Down
6 changes: 4 additions & 2 deletions dagster/src/assets/admin/assets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Annotated

from dagster_pyspark import PySparkResource
from delta import DeltaTable
from pydantic import conint
from pydantic import Field
from pyspark.sql import SparkSession
from src.constants.constants_class import constants
from src.settings import settings
Expand Down Expand Up @@ -47,7 +49,7 @@ def admin__terminate_all_runs(context: OpExecutionContext):
class RollbackTableConfig(Config):
schema_name: str
table_name: str
version: conint(ge=0)
version: Annotated[int, Field(ge=0)]


@asset
Expand Down
6 changes: 3 additions & 3 deletions dagster/src/assets/common/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ def reset_staging_table(
raise


def _handle_null_columns(schema_columns, primary_key):
def handle_null_columns(schema_columns, primary_key):
"""Handle null columns by providing default values based on data type.

If the column value is NULL, add a placeholder value if the following
Expand Down Expand Up @@ -790,7 +790,7 @@ def master(
else:
new_master = silver

column_actions = _handle_null_columns(schema_columns, primary_key)
column_actions = handle_null_columns(schema_columns, primary_key)
new_master = new_master.withColumns(column_actions)
new_master = compute_row_hash(new_master)

Expand Down Expand Up @@ -862,7 +862,7 @@ def reference(
else:
new_reference = silver

column_actions = _handle_null_columns(schema_columns, primary_key)
column_actions = handle_null_columns(schema_columns, primary_key)
new_reference = new_reference.withColumns(column_actions)
new_reference = compute_row_hash(new_reference)

Expand Down
4 changes: 2 additions & 2 deletions dagster/src/assets/debug/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ async def debug__send_test_email(
res = await client.post(
"/api/email/send-email",
headers={"Authorization": f"Bearer {settings.EMAIL_RENDERER_BEARER_TOKEN}"},
json=config.dict(),
json=config.model_dump(),
)
if res.is_error:
context.log.error(res.json())
res.raise_for_status()

return Output(None, metadata=config.dict())
return Output(None, metadata=config.model_dump())
29 changes: 24 additions & 5 deletions dagster/src/assets/migrations/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from models import VALID_PRIMITIVES, Schema
from pyspark import sql
from pyspark.sql.functions import col, when
from src.utils.delta import execute_query_with_error_handler
from src.utils.delta import (
create_delta_table,
persist_column_id_map,
sync_schema,
)

from dagster import OpExecutionContext

Expand Down Expand Up @@ -42,17 +46,32 @@ def save_schema_delta_table(context: OpExecutionContext, df: sql.DataFrame):
full_table_name = f"{schema_name}.{table_name}"

columns = Schema.fields
query = (
DeltaTable.createOrReplace(spark).tableName(full_table_name).addColumns(columns)
create_delta_table(
spark,
schema_name,
table_name,
columns,
context,
if_not_exists=True,
)
sync_schema(
table_name=full_table_name,
existing_schema=spark.table(full_table_name).schema,
updated_schema=Schema.schema,
spark=spark,
context=context,
)
execute_query_with_error_handler(spark, query, schema_name, table_name, context)

spark.catalog.refreshTable(full_table_name)
(
DeltaTable.forName(spark, full_table_name)
.alias("master")
.merge(df.alias("updates"), "master.name = updates.name")
.merge(df.alias("updates"), "master.id = updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)

# Persist column-ID mapping after merge succeeds
persist_column_id_map(spark, full_table_name, table_name)
32 changes: 26 additions & 6 deletions dagster/src/assets/school_geolocation/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pyspark.sql.types import StringType, StructField, StructType
from sqlalchemy import select
from src.constants import DataTier
from src.data_quality_checks.dq_context import DQContext, DQMode
from src.data_quality_checks.utils import (
aggregate_report_json,
aggregate_report_spark_df,
Expand Down Expand Up @@ -128,10 +129,19 @@ def geolocation_metadata(
context.log.info("Create spark dataframe")
metadata_df = s.createDataFrame(metadata_df)

table_columns = get_schema_columns(s, "school_geolocation_metadata")
table_name = "school_geolocation_metadata"
table_schema_name = "pipeline_tables"

context.log.info("Get schema columns for metadata table")
try:
table_columns = get_schema_columns(s, "school_geolocation_metadata")
except Exception:
context.log.warning(
"Schema table schemas.school_geolocation_metadata not found; "
"using DataFrame schema for metadata table creation"
)
table_columns = list(metadata_df.schema.fields)

context.log.info("Create the schema and table if they do not exist")
metadata_df = add_missing_columns(metadata_df, table_columns)
metadata_df = metadata_df.select(*StructType(table_columns).fieldNames())
Expand Down Expand Up @@ -228,7 +238,7 @@ def geolocation_bronze(
df = df.withColumn("school_id_govt", f.col("school_id_govt").cast(StringType()))

df = create_bronze_layer_columns_updated(
df, mode, uploaded_columns, country_code, s
df, mode, uploaded_columns, country_code, file_upload.source, s
)

t2 = time.time()
Expand Down Expand Up @@ -301,9 +311,13 @@ def geolocation_data_quality_results(
dq_results = row_level_checks(
df=renamed_bronze,
silver=casted_silver,
dataset_type=dataset_type,
_country_code_iso3=country_code,
mode=config.metadata["mode"],
dq_context=DQContext(
dq_mode=DQMode(config.metadata.get("dq_mode", "master")),
dataset_type=dataset_type,
country_code_iso3=country_code,
upload_id=id,
upload_mode=config.metadata.get("mode"),
),
context=context,
)

Expand All @@ -327,7 +341,9 @@ def geolocation_data_quality_results(
)

dq_results_schema_name = f"{schema_name}_dq_results"
table_name = f"{id}_{country_code}_{current_timestamp}"
# Replace hyphens with underscores so the identifier is valid in Spark SQL
safe_id = id.replace("-", "_")
table_name = f"{safe_id}_{country_code}_{current_timestamp}"

schema_columns = [
StructField(field.name, field.dataType, nullable=True)
Expand Down Expand Up @@ -696,6 +712,10 @@ def geolocation_staging(
spark: PySparkResource,
config: FileConfig,
) -> Output[None]:
if config.metadata.get("dq_mode") == "uploaded":
context.log.info("Skipping staging as dq_mode is 'uploaded'")
return Output(None)

if geolocation_dq_passed_rows.isEmpty():
context.log.warning("Skipping staging as there are no rows passing DQ checks")
return Output(None)
Expand Down
Loading