Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions api/data_ingestion/routers/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,3 +944,107 @@ async def validate_fuzzy_matching(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error running fuzzy matching validation: {str(e)}",
) from e


@router.get("/dq_kit/{upload_id}/download")
async def download_dq_kit(
    upload_id: str,
    db: AsyncSession = Depends(get_db),
    is_privileged: bool = Depends(IsPrivileged.raises(False)),
    user: User = Depends(azure_scheme),
):
    """Download the DQ Kit ZIP for a given upload.

    Looks up the upload, verifies the caller may access it, requires the
    upload's DQ status to be COMPLETED, and then streams the ZIP obtained
    from ``generate_dq_kit_zip``.

    Raises:
        HTTPException: 404 if the upload does not exist or no DQ Kit ZIP is
            available for it; 403 if the caller is neither privileged nor
            the uploader; 400 if the DQ status is not COMPLETED; 500 if
            retrieving the ZIP fails unexpectedly.
    """
    from data_ingestion.utils.dq_kit_generator import generate_dq_kit_zip

    file_upload = await db.scalar(select(FileUpload).where(FileUpload.id == upload_id))
    if file_upload is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="File Upload ID does not exist",
        )

    if (
        not is_privileged
        and file_upload.uploader_email != user.claims.get("emails", ["NONE"])[0]
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to access this file.",
        )

    if file_upload.dq_status != DQStatusEnum.COMPLETED:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"DQ Kit is not available. DQ Status: {file_upload.dq_status.value}",
        )

    # Only the generator call is guarded so that the 404 raised below is not
    # swallowed by the broad handler and rewrapped as a 500.
    try:
        logger.info(f"Generating DQ Kit for upload_id: {upload_id}")
        zip_buffer, filename = generate_dq_kit_zip(file_upload)
    except Exception as e:
        logger.error(f"Error generating DQ Kit: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error generating DQ Kit: {str(e)}",
        ) from e

    # The generator yields no buffer when the pre-built ZIP is missing;
    # calling .read() on it would crash with an opaque AttributeError.
    if zip_buffer is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="DQ Kit not found. It may not have been generated yet.",
        )

    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )


@router.get("/map/{upload_id}")
async def get_school_map(
    upload_id: str,
    db: AsyncSession = Depends(get_db),
    is_privileged: bool = Depends(IsPrivileged.raises(False)),
    user: User = Depends(azure_scheme),
):
    """Stream the school-location HTML map stored for an upload.

    Raises:
        HTTPException: 404 if the upload or its map blob does not exist;
            403 if the caller is neither privileged nor the uploader; 500
            if starting the blob download fails.
    """
    from data_ingestion.utils.dq_kit_generator import get_map_blob_path

    query = select(FileUpload).where(FileUpload.id == upload_id)
    file_upload = await db.scalar(query)
    if file_upload is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="File Upload ID does not exist",
        )

    # Non-privileged callers may only see maps for their own uploads.
    if not is_privileged:
        requester_email = user.claims.get("emails", ["NONE"])[0]
        if file_upload.uploader_email != requester_email:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You do not have permission to access this file.",
            )

    map_path = get_map_blob_path(file_upload)
    map_filename = Path(map_path).name
    logger.info(f"Attempting to serve map from: {map_path}")

    blob = storage_client.get_blob_client(map_path)
    if not blob.exists():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Map not found. It may not have been generated yet.",
        )

    response_headers = {
        "Content-Disposition": f"inline; filename={map_filename}",
        "X-Frame-Options": "SAMEORIGIN",
    }
    try:
        downloader = blob.download_blob()
        return StreamingResponse(
            downloader.chunks(),
            media_type="text/html",
            headers=response_headers,
        )
    except Exception as e:
        logger.error(f"Error serving map: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error loading map: {str(e)}",
        ) from e
110 changes: 110 additions & 0 deletions api/data_ingestion/utils/dq_kit_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
Utility for serving DQ Kit ZIP files in the ingestion API.
Serves the pre-generated ZIP produced by Dagster when it exists; there is no on-demand fallback.
"""

import io
from pathlib import Path

from loguru import logger

from azure.core.exceptions import ResourceNotFoundError
from data_ingestion.internal.storage import storage_client
from data_ingestion.models.file_upload import FileUpload


class DQKitManager:
"""Serve DQ Kit ZIP files from individual ADLS artifacts."""

def __init__(self, file_upload: FileUpload):
    """Capture the upload and derive the file stem used in blob paths."""
    self.file_upload = file_upload
    self.dataset = file_upload.dataset
    self.country = file_upload.country

    # The stem comes from the DQ full-report path when set, otherwise from
    # the original filename (empty string when neither is available).
    name_source = file_upload.dq_full_path or file_upload.original_filename or ""
    self.stem = Path(name_source).stem

@staticmethod
def _get_blob_if_exists(blob_path: str | None) -> bytes | None:
    """Return the blob's bytes, or None if it is missing or unreadable.

    A falsy path returns None without contacting storage. Storage errors
    are logged and reported as None rather than raised.
    """
    if not blob_path:
        return None

    payload: bytes | None = None
    try:
        client = storage_client.get_blob_client(blob_path)
        if not client.exists():
            logger.warning(f"File not found: {blob_path}")
        else:
            logger.info(f"Found file: {blob_path}")
            payload = client.download_blob().readall()
    except ResourceNotFoundError:
        logger.warning(f"File not found: {blob_path}")
    except Exception as e:
        logger.error(f"Error downloading {blob_path}: {e}")
    return payload

@property
def _dataset_prefix(self) -> str:
    """Return the dataset folder name used under the DQ results root.

    "structured" and "unstructured" are used as-is; every other dataset
    is prefixed with "school-".
    """
    if self.dataset in ("unstructured", "structured"):
        return self.dataset
    return f"school-{self.dataset}"

def _file_paths(self) -> dict[str, str | None]:
    """Map each DQ artifact name to its blob path.

    Paths taken directly from the upload record may be None; the rest are
    built from the dataset prefix, country and file stem.
    """
    upload = self.file_upload
    root = f"data-quality-results/{self._dataset_prefix}"
    cc = self.country
    name = self.stem

    return {
        # Pre-built ZIP path (from Dagster `geolocation_dq_kit_zip` asset)
        "prebuilt_zip": f"{root}/dq-kit/{cc}/DQ_Kit_{cc}_{self.dataset}_{name}.zip",
        "raw_data": upload.upload_path,
        "dq_summary_json": upload.dq_report_path,
        "dq_report_txt": f"{root}/dq-report/{cc}/{name}.txt",
        "passed_rows": f"{root}/dq-passed-rows-human-readable/{cc}/{name}.csv",
        "failed_rows": f"{root}/dq-failed-rows-human-readable/{cc}/{name}.csv",
        "dq_full_report": upload.dq_full_path,
        "map_html": f"{root}/dq-map/{cc}/school_map_{cc}_{name}.html",
    }

def map_blob_path(self) -> str:
    """Return the blob path of this upload's map HTML file."""
    map_path = self._file_paths()["map_html"]
    return map_path  # type: ignore[return-value]

def generate_zip(self) -> io.BytesIO | None:
    """Return the pre-built DQ Kit ZIP from Dagster, if one exists.

    Returns:
        A BytesIO positioned at the start of the ZIP bytes, or None when
        the pre-built ZIP is missing, empty, or could not be downloaded.
    """
    prebuilt = self._get_blob_if_exists(self._file_paths()["prebuilt_zip"])
    if not prebuilt:
        # NOTE: per the review discussion on this change, on-demand building
        # is intentionally not implemented (older pipelines do not require
        # it) and the UI is expected to hide the download button in that
        # case. Callers must handle the None return.
        logger.warning("Pre-built DQ Kit not found; no on-demand build is available.")
        return None

    logger.info("Serving pre-built DQ Kit ZIP from Dagster")
    # A freshly constructed BytesIO already starts at position 0.
    return io.BytesIO(prebuilt)

def get_zip_filename(self) -> str:
    """Return the download filename: DQ_Kit_<country>_<dataset>_<upload id>.zip."""
    country, dataset = self.country, self.dataset
    upload_id = self.file_upload.id
    return f"DQ_Kit_{country}_{dataset}_{upload_id}.zip"


def generate_dq_kit_zip(file_upload: FileUpload) -> tuple[io.BytesIO | None, str]:
    """Return ``(zip_buffer, filename)`` for an upload's DQ Kit.

    ``zip_buffer`` is None when no pre-built ZIP could be retrieved, so
    callers must check it before reading.
    """
    manager = DQKitManager(file_upload)
    zip_buffer = manager.generate_zip()
    return zip_buffer, manager.get_zip_filename()


def get_map_blob_path(file_upload: FileUpload) -> str:
    """Return the blob path of the map HTML file for an upload."""
    manager = DQKitManager(file_upload)
    return manager.map_blob_path()
18 changes: 18 additions & 0 deletions ui/src/api/routers/uploads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ export default function routes(axi: AxiosInstance) {
});
},

// Fetch the DQ Kit ZIP for an upload as a binary blob.
download_dq_kit: ({
  upload_id,
}: {
  upload_id: string;
}): Promise<AxiosResponse<Blob>> =>
  axi.get(`upload/dq_kit/${upload_id}/download`, { responseType: "blob" }),

// Fetch the school map HTML for an upload as a binary blob.
download_map: ({
  upload_id,
}: {
  upload_id: string;
}): Promise<AxiosResponse<Blob>> =>
  axi.get(`upload/map/${upload_id}`, { responseType: "blob" }),

list_basic_checks: (
dataset: string,
source: string | null,
Expand Down
20 changes: 20 additions & 0 deletions ui/src/components/check-file-uploads/Downloadlogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ export function useDownloadHelpers(uploadData: UploadResponse) {
mutationFn: api.uploads.download_raw_file,
});

// Mutation wrapping the DQ Kit download API call; exposed as an async function.
const { mutateAsync: downloadDqKit } = useMutation({
mutationFn: api.uploads.download_dq_kit,
});

// Mutation wrapping the map download API call; exposed as an async function.
const { mutateAsync: downloadMap } = useMutation({
mutationFn: api.uploads.download_map,
});

function getFilenameFromFullPath(): string {
const pathParts = uploadData.dq_full_path?.split("/") || [];
return pathParts[pathParts.length - 1];
Expand Down Expand Up @@ -69,10 +77,22 @@ export function useDownloadHelpers(uploadData: UploadResponse) {
if (blob) saveFile(blob);
}

async function handleDownloadDqKit() {
  // Request the DQ Kit for the current upload and save it when a result comes back.
  const response = await downloadDqKit({ upload_id: uploadData.id });
  if (!response) return;
  saveFile(response);
}

async function handleDownloadMap() {
  // Request the map for the current upload and save it when a result comes back.
  const response = await downloadMap({ upload_id: uploadData.id });
  if (!response) return;
  saveFile(response);
}

return {
handleDownloadFailedRows,
handleDownloadPassedRows,
handleDownloadDqSummary,
handleDownloadRawFile,
handleDownloadDqKit,
handleDownloadMap,
};
}
Loading