Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,6 @@ metastore_db/

# Kubernetes/Helm
secrets.yaml

# GigaSpatial data (Google/MS buildings, GHSL, WorldPop)
dagster/bronze/
3,145 changes: 2,590 additions & 555 deletions dagster/poetry.lock

Large diffs are not rendered by default.

22 changes: 13 additions & 9 deletions dagster/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ readme = "../docs/README.md"
python = ">=3.11,<3.13"
scipy = ">=1.10.0"
cryptography = ">=42.0.4"
sentry-sdk = "0.10.2"
sentry-sdk = ">=1.33.1"
openpyxl = "^3.1.2"
loguru = "^0.7.2"
icecream = "^2.1.3"
Expand All @@ -34,18 +34,19 @@ azure-storage-file-datalake = "^12.13.1"
pytest-cov = "4.0.0"
azure-identity = ">=1.16.1"
geopy = "^2.4.0"
h3 = "^3.7.6"
h3 = "^4.2"
fastparquet = "^2023.8.0"
grpcio-status = "^1.59.0"
acryl-datahub = { extras = ["azure-ad"], version = "0.14.0.2" }
acryl-datahub = { extras = ["azure-ad"], version = "^1.3.1" }
shapely = "^2.0.2"
geopandas = "^0.14.1"
geopandas = "^1.0"
thefuzz = "^0.20.0"
country-converter = "^1.1.1"
sqlalchemy = "^2.0.28"
psycopg2-binary = "^2.9.9"
cuid2 = "^2.0.0"
pydantic = { extras = ["email"], version = "<2.0.0" }
pydantic = { extras = ["email"], version = "^2.10" }
pydantic-settings = "^2.0"
msgraph-sdk = "^1.2.0"
unidecode = "^1.3.8"
aiohttp = ">=3.10.2"
Expand All @@ -60,13 +61,16 @@ urllib3 = ">=2.2.2"
tornado = ">=6.4.1"
xlrd = "^2.0.1"
chardet = "^5.2.0"
giga-spatial = "==0.9.2"
numpy = "^2.4.4"
pandas = "^2.1"
networkx = "^3.2"
earthengine-api = ">=0.1.400"

[tool.poetry.group.spark.dependencies]
pyspark = "3.5.0"
delta-spark = "3.0.0"
deltalake = "0.12.0"
pyarrow = "11.0.0"
pyarrow-hotfix = "^0.5"
pyarrow = "^16.0"
roapi = "0.9.0"

[tool.poetry.group.notebook.dependencies]
Expand Down Expand Up @@ -124,7 +128,7 @@ combine-as-imports = true
force-wrap-aliases = true

[tool.ruff.lint.pep8-naming]
classmethod-decorators = ["pydantic.validator"]
classmethod-decorators = ["pydantic.field_validator"]

[tool.bandit]
tests = [
Expand Down
6 changes: 4 additions & 2 deletions dagster/src/assets/admin/assets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Annotated

from dagster_pyspark import PySparkResource
from delta import DeltaTable
from pydantic import conint
from pydantic import Field
from pyspark.sql import SparkSession
from src.constants.constants_class import constants
from src.settings import settings
Expand Down Expand Up @@ -47,7 +49,7 @@ def admin__terminate_all_runs(context: OpExecutionContext):
class RollbackTableConfig(Config):
schema_name: str
table_name: str
version: conint(ge=0)
version: Annotated[int, Field(ge=0)]


@asset
Expand Down
4 changes: 2 additions & 2 deletions dagster/src/assets/debug/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ async def debug__send_test_email(
res = await client.post(
"/api/email/send-email",
headers={"Authorization": f"Bearer {settings.EMAIL_RENDERER_BEARER_TOKEN}"},
json=config.dict(),
json=config.model_dump(),
)
if res.is_error:
context.log.error(res.json())
res.raise_for_status()

return Output(None, metadata=config.dict())
return Output(None, metadata=config.model_dump())
53 changes: 30 additions & 23 deletions dagster/src/constants/constants_class.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import ClassVar

from datahub.metadata.schema_classes import (
BooleanTypeClass,
Expand All @@ -7,7 +8,7 @@
StringTypeClass,
)
from models import TypeMapping, TypeMappings
from pydantic import BaseSettings
from pydantic_settings import BaseSettings
from pyspark.sql.types import (
BooleanType,
DoubleType,
Expand All @@ -22,32 +23,38 @@
class Constants(BaseSettings):
UPLOAD_PATH_PREFIX: str = "raw/uploads"
UPLOAD_METADATA_PATH_PREFIX: str = "raw/upload_metadata"
datetime_partition_key_format = "%Y-%m-%d-%H:%M"
datetime_partition_key_format: ClassVar[str] = "%Y-%m-%d-%H:%M"

connectivity_updates_folder = "raw/connectivity_updates"
raw_folder = "raw" # if settings.IN_PRODUCTION else "adls-testing-raw"
raw_schema_folder = "raw/schema"
raw_schema_folder_source = "raw/schema"
bronze_folder = "bronze"
silver_folder = "silver"
gold_folder = "gold"
dq_results_folder = "data-quality-results"
staging_folder = "staging"
connectivity_updates_folder: ClassVar[str] = "raw/connectivity_updates"
raw_folder: ClassVar[str] = (
"raw" # if settings.IN_PRODUCTION else "adls-testing-raw"
)
raw_schema_folder: ClassVar[str] = "raw/schema"
raw_schema_folder_source: ClassVar[str] = "raw/schema"
bronze_folder: ClassVar[str] = "bronze"
silver_folder: ClassVar[str] = "silver"
gold_folder: ClassVar[str] = "gold"
dq_results_folder: ClassVar[str] = "data-quality-results"
staging_folder: ClassVar[str] = "staging"

dq_passed_folder = "staging/pending-review"
staging_approved_folder = "staging/approved"
archive_manual_review_rejected_folder = "archive/manual-review-rejected"
gold_source_folder = "updated_master_schema"
adhoc_master_updates_source_folder = "updated_master_schema/master_updates"
qos_source_folder = "gold/qos"
qos_raw_source_folder = "gold/qos-raw"
error_folder = "error"
dq_passed_folder: ClassVar[str] = "staging/pending-review"
staging_approved_folder: ClassVar[str] = "staging/approved"
archive_manual_review_rejected_folder: ClassVar[str] = (
"archive/manual-review-rejected"
)
gold_source_folder: ClassVar[str] = "updated_master_schema"
adhoc_master_updates_source_folder: ClassVar[str] = (
"updated_master_schema/master_updates"
)
qos_source_folder: ClassVar[str] = "gold/qos"
qos_raw_source_folder: ClassVar[str] = "gold/qos-raw"
error_folder: ClassVar[str] = "error"

# can't set infinite, just set to a value most likely beyond the extinction of the human race
school_master_retention_period = "interval 1000000 weeks"
qos_retention_period = "interval 90 days"
PING_PARQUET_PATH = "giga_meter/ping"
PING_RETRY_FILE_PATH = "giga_meter/ping/.retry.json"
school_master_retention_period: ClassVar[str] = "interval 1000000 weeks"
qos_retention_period: ClassVar[str] = "interval 90 days"
PING_PARQUET_PATH: ClassVar[str] = "giga_meter/ping"
PING_RETRY_FILE_PATH: ClassVar[str] = "giga_meter/ping/.retry.json"

TYPE_MAPPINGS: TypeMappings = TypeMappings(
string=TypeMapping(
Expand Down
Loading