Skip to content

Feature/dq#444

Open
bidhan-nagarro wants to merge 21 commits into
mainfrom
feature/dq
Open

Feature/dq#444
bidhan-nagarro wants to merge 21 commits into
mainfrom
feature/dq

Conversation

@bidhan-nagarro
Copy link
Copy Markdown
Collaborator

What type of PR is this?

  • build: Commits that affect build components like build tool, dependencies, project
    version
  • chore: Miscellaneous commits (e.g. modifying .gitignore)
  • ci: Commits are special build commits that affect the CI/CD pipeline
  • docs: Commits that affect documentation only
  • feat: Commits that add a new feature
  • fix: Commits that fix a bug
  • perf: Commits are special refactor commits that improve performance
  • refactor: Commits that rewrite/restructure your code, however does not change any
    behaviour
  • revert: Commits that revert another commit/PR, usually can be autogenerated on
    GitHub or using git revert
  • style: Commits are special refactor commits that edit the code to comply with a
    code style, linter, or formatter
  • test: Commits that add missing tests or correcting existing tests

Summary

What does this PR do

How to test

  1. Instructions on how to test
  2. Specify which files to review
  3. etc.

Link to Jira/Asana/Airtable task (if applicable)

placeholder

Wireframe screenshot/screencap (if applicable)

placeholder

Implementation screenshot/screencap (if applicable)

placeholder

Copy link
Copy Markdown
Collaborator

@brianmusisi brianmusisi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a bunch of comments. In the end, where I've added We need this
are the only areas we need to change.

from pyspark.sql.types import NullType, StructType
from sqlalchemy import select, update
from src.constants import DataTier
from src.data_quality_checks.dq_context import DQContext, DQMode
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No changes in this file or this flow

dt = DeltaTable.forName(s, f"school_master.{config.country_code}")
master = dt.toDF()

dq_context = DQContext(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, no changes in this file or this flow

TimestampType,
)
from src.constants import DataTier, constants
from src.data_quality_checks.dq_context import DQContext, DQMode
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No changes to this file/flow, this only affects school_geolocation

)
columns = get_schema_columns(s, f"coverage_{source}")
df = add_missing_columns(df, columns)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

else:
silver = s.createDataFrame(s.sparkContext.emptyRDD(), schema=schema)

dq_context = DQContext(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No changes here

country_code = filename_components.country_code
metadata = adls_file_client.fetch_metadata_for_blob(adls_filepath) or {}

# Only process files with dq_mode="master" for the full merge pipeline.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not relevant because the approval workflow from Giga Sync only sends data that can be merged. This only applies before data has been sent for approval

size = properties.size

run_key = str(path)
try:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what this is doing


renamed_bronze = casted_bronze.withColumnRenamed("signature", "dq_signature")

dq_context = DQContext(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the DQContext changes do not seem necessary in this P

elif dataset_type == "geolocation":
if mode == UploadMode.CREATE.value:

elif dq_context.dataset_type == "geolocation":
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this:
Here, we need to only run this if the DQmode is "master"

Or set the resulting is_not_create and is_not_update` columns to 0 so that they don't cause critical errors when DQMode is not master

schema = StructType(columns)

if check_table_exists(s, schema_name, country_code, DataTier.SILVER):
if dq_mode == DQMode.MASTER and check_table_exists(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this:

This requires no change

Copy link
Copy Markdown
Collaborator

@brianmusisi brianmusisi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the requested changes

Copy link
Copy Markdown
Collaborator

@brianmusisi brianmusisi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to add, is that when running on just the uploaded file, geolocation_staging shouldn't be run. We don't want to add data to the staging tables for approval

@bidhan-nagarro
Copy link
Copy Markdown
Collaborator Author

One thing to add, is that when running on just the uploaded file, geolocation_staging shouldn't be run. We don't want to add data to the staging tables for approval

@brianmusisi changes done.

Comment on lines +674 to +680
# For assessment-only (uploaded mode), skip cross-checks against silver
# but ensure the columns exist to avoid downstream errors.
if dq_context.upload_mode == UploadMode.CREATE.value:
df = df.withColumn("dq_is_not_create", f.lit(0))
elif dq_context.upload_mode == UploadMode.UPDATE.value:
df = df.withColumn("dq_is_not_update", f.lit(0))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't want to do this, so we can skip it


def row_level_checks_internal(
df: sql.DataFrame,
dq_context: DQContext,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we've added dq_context, it will break the uses of this function in row_level_checks for the master and reference checks

context.log.info(f"FILE: {path}")
yield RunRequest(
run_key=str(path),
run_key=f"{path}:{last_modified}:{dq_triggered_at}",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the run key will make all previous uploaded files run again since they will look new. I have seen this happen on dev already. Whatever we do, we should not change this

Copy link
Copy Markdown
Collaborator

@brianmusisi brianmusisi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at the comments I've added

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants