Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,8 @@ tasks:
desc: Spawn a Beeline shell
cmds:
- task exec -- -it hive-metastore beeline -u {{.HMS_DATABASE_URL}} -n {{.HMS_POSTGRESQL_USERNAME}} -p {{.HMS_POSTGRESQL_PASSWORD}}
test:
desc: Run tests
dir: dagster
cmds:
- poetry run pytest {{.CLI_ARGS}}
30 changes: 30 additions & 0 deletions azure/templates/test-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,33 @@ stages:

- script: pre-commit run --all-files
displayName: Run pre-commit

- job: Pytest
displayName: Run pytest
strategy:
matrix:
Python311:
python.version: '3.11'
steps:
- task: UsePythonVersion@0
displayName: 'Use Python $(python.version)'
inputs:
versionSpec: '$(python.version)'

- script: python -m pip install --upgrade pip poetry
displayName: Install Poetry

- script: poetry install --with dev,pipelines --no-root
displayName: Install dependencies
workingDirectory: $(Build.SourcesDirectory)/dagster

- script: poetry run pytest --junitxml=junit/test-results.xml
displayName: Run tests
workingDirectory: $(Build.SourcesDirectory)/dagster

- task: PublishTestResults@2
displayName: Publish test results
condition: succeededOrFailed()
inputs:
testResultsFiles: '**/test-results.xml'
testRunTitle: 'Pytest $(python.version)'
53 changes: 37 additions & 16 deletions dagster/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dagster/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ urllib3 = ">=2.2.2"
tornado = ">=6.4.1"
xlrd = "^2.0.1"
chardet = "^5.2.0"
pytest-asyncio = "^1.3.0"

[tool.poetry.group.spark.dependencies]
pyspark = "3.5.0"
Expand Down
19 changes: 19 additions & 0 deletions dagster/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# pytest.ini
[pytest]
addopts =
-v
--disable-warnings
--maxfail=1
--cov=src
--cov-report=term-missing
--cov-report=xml
--cov-report=html
--cov-fail-under=70
--junitxml=junit/test-results.xml
testpaths = tests
python_files = test_*.py
asyncio_mode = auto
python_classes = Test*
python_functions = test_*
env_files =
.env
22 changes: 22 additions & 0 deletions dagster/tests/assets/adhoc/test_custom_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from src.assets.adhoc.custom_dataset import custom_dataset_raw
from src.constants import DataTier
from src.utils.op_config import FileConfig


def test_custom_dataset_raw_downloads_file(spark_session, mock_adls_client, op_context):
mock_adls_client.download_raw.return_value = b"test,data\n1,2\n"
config = FileConfig(
filepath="custom_data/TEST/file.csv",
dataset_type="custom",
tier=DataTier.RAW,
country_code="TEST",
destination_filepath="",
file_size_bytes=10,
metastore_schema="custom",
domain="custom",
table_name="table",
)
result = custom_dataset_raw(op_context, mock_adls_client, config)
assert result.value == b"test,data\n1,2\n"
assert result.metadata is not None
mock_adls_client.download_raw.assert_called_once_with("custom_data/TEST/file.csv")
23 changes: 23 additions & 0 deletions dagster/tests/assets/adhoc/test_generate_mock_table_with_cdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from src.assets.adhoc import generate_mock_table_with_cdf
from src.assets.adhoc.generate_mock_table_with_cdf import (
adhoc__copy_original,
adhoc__generate_v2,
adhoc__generate_v3,
)


def test_adhoc_copy_original_constants():
assert generate_mock_table_with_cdf.SOURCE_TABLE_NAME == "school_master.ben"
assert generate_mock_table_with_cdf.ZCDF_TABLE_NAME == "school_master.zcdf"


def test_adhoc_copy_original_asset_exists():
assert callable(adhoc__copy_original)


def test_adhoc_generate_v2_asset_exists():
assert callable(adhoc__generate_v2)


def test_adhoc_generate_v3_asset_exists():
assert callable(adhoc__generate_v3)
63 changes: 63 additions & 0 deletions dagster/tests/assets/adhoc/test_generate_silver_from_gold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from src.assets.adhoc import generate_silver_from_gold
from src.assets.adhoc.generate_silver_from_gold import (
DataTier,
DeltaTable,
add_missing_columns,
adhoc__generate_silver_coverage_from_gold,
adhoc__generate_silver_geolocation_from_gold,
check_table_exists,
compute_row_hash,
constants,
execute_query_with_error_handler,
f,
get_schema_columns,
get_table_preview,
transform_types,
)


def test_module_imports():
assert generate_silver_from_gold is not None


def test_adhoc_generate_silver_geolocation_from_gold_exists():
assert callable(adhoc__generate_silver_geolocation_from_gold)


def test_adhoc_generate_silver_coverage_from_gold_exists():
assert callable(adhoc__generate_silver_coverage_from_gold)


def test_imports_delta_table():
assert DeltaTable is not None


def test_imports_spark_functions():
assert f is not None


def test_imports_constants():
assert DataTier is not None
assert constants is not None


def test_imports_transform_functions():
assert callable(add_missing_columns)


def test_imports_delta_utils():
assert callable(check_table_exists)
assert callable(execute_query_with_error_handler)


def test_imports_metadata_utils():
assert callable(get_table_preview)


def test_imports_schema_utils():
assert callable(get_schema_columns)


def test_imports_spark_utils():
assert callable(compute_row_hash)
assert callable(transform_types)
79 changes: 79 additions & 0 deletions dagster/tests/assets/adhoc/test_health_master_csv_to_gold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from pyspark.sql.types import (
StringType,
StructField,
)
from src.assets.adhoc.health_master_csv_to_gold import (
adhoc__health_master_data_transforms,
adhoc__load_health_master_csv,
)

from dagster import Output


@pytest.mark.asyncio
async def test_adhoc__load_health_master_csv(
mock_adls_client, mock_file_config, op_context
):
mock_adls_client.download_raw.return_value = b"raw_content"
result = adhoc__load_health_master_csv(
op_context, mock_adls_client, mock_file_config
)
assert result.value == b"raw_content"


@pytest.mark.asyncio
async def test_adhoc__health_master_data_transforms_functional(
spark_session, mock_file_config, op_context
):
# Setup test data
raw_csv = b"name,lat,lon\nHealth A,12.3,45.6"

mock_adls_client = MagicMock()
mock_spark = MagicMock()
mock_spark.spark_session = spark_session

# Mock schema columns
mock_cols = [
StructField("name", StringType()),
StructField("lat", StringType()),
StructField("lon", StringType()),
StructField("health_id_giga", StringType()),
]

with (
patch(
"src.assets.adhoc.health_master_csv_to_gold.get_schema_columns",
return_value=mock_cols,
),
patch(
"src.assets.adhoc.health_master_csv_to_gold.get_output_metadata",
return_value={},
),
patch(
"src.assets.adhoc.health_master_csv_to_gold.get_table_preview",
return_value="preview",
),
patch(
"src.spark.transform_functions.get_admin_boundaries", return_value=None
), # Mock to return 'Unknown'
):
result = await adhoc__health_master_data_transforms(
context=op_context,
adhoc__load_health_master_csv=raw_csv,
spark=mock_spark,
adls_file_client=mock_adls_client,
config=mock_file_config,
)

assert isinstance(result, Output)
df = result.value
assert isinstance(df, pd.DataFrame)
assert len(df) == 1
assert "health_id_giga" in df.columns
assert "admin1" in df.columns
assert df.iloc[0]["admin1"] == "Unknown"
mock_adls_client.upload_pandas_dataframe_as_file.assert_called()
Loading
Loading