-
Notifications
You must be signed in to change notification settings - Fork 86
A support for parsing compressed data products in fprime-dp #309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: devel
Are you sure you want to change the base?
Changes from all commits
9071fab
be68320
52156b2
6ea177f
153c962
cca3be7
057ee67
e4c8f9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,9 @@ | |
| from pathlib import Path | ||
| from typing import Dict, List, Any, Optional | ||
| import dataclasses | ||
| import re | ||
| from io import BytesIO | ||
| import zlib | ||
|
|
||
| from fprime_gds.common.dp.common import ( | ||
| ChecksumConfig, | ||
|
|
@@ -85,7 +88,7 @@ | |
| - both these assumptions can be resolved by loading dictionaries (see executables/data_products.py) | ||
| """ | ||
|
|
||
| def __init__(self, dictionaries: Dictionaries, binary_file_path: str, output_json_path: Optional[str] = None): | ||
| def __init__(self, dictionaries: Dictionaries, binary_file_path: str, disable_decompression: bool, output_json_path: Optional[str] = None): | ||
| """Initialize the decoder. | ||
|
|
||
| Args: | ||
|
|
@@ -96,6 +99,7 @@ | |
| """ | ||
| self.dictionaries = dictionaries | ||
| self.binary_file_path = binary_file_path | ||
| self.disable_decompression = disable_decompression | ||
| if output_json_path is None: | ||
| # Generate default output path if not provided as same path with .json extension | ||
| self.output_json_path = str(Path(binary_file_path).with_suffix('.json')) | ||
|
|
@@ -145,7 +149,7 @@ | |
| Raises: | ||
| RecordNotFoundError: If record ID not found | ||
| """ | ||
|
|
||
| # Query ConfigManager for record definition | ||
| record_template: DpRecordTemplate = self.dictionaries.dp_record_id.get(record_id) | ||
|
|
||
|
|
@@ -197,6 +201,46 @@ | |
|
|
||
| return record | ||
|
|
||
| def decode_records(self, r_io, data_size) -> List[Any]: | ||
| records = list() | ||
| position_at_start = r_io.tell() | ||
| while (r_io.tell() - position_at_start) < data_size: | ||
| # Read record ID | ||
| record_id_bin = r_io.read(ConfigManager().get_type("FwDpIdType").getSize()) | ||
| record_id_obj = ConfigManager().get_type("FwDpIdType")() | ||
| record_id_obj.deserialize(record_id_bin, 0) | ||
| record_id = record_id_obj.val | ||
|
|
||
| # decode the record | ||
| record = self.decode_record(r_io, record_id) | ||
| records.append(record) | ||
|
|
||
| return records | ||
|
|
||
| def is_compression_record(self, record): | ||
| return re.search("dpCompressProc.CompressionRecord$", record["Record"]["record_name"]) is not None | ||
|
|
||
| def decompress_records(self, records): | ||
| uncomp_bytes = bytearray() | ||
| CompressionMetadata = ConfigManager().get_type("Svc.CompressionMetadata") | ||
|
|
||
| for record in records: | ||
| if not self.is_compression_record(record): | ||
| # All records must be compressed, otherwise bail | ||
| return None | ||
|
|
||
| record_io = BytesIO(bytes(record["Data"]["values"])) | ||
|
|
||
| record_meta = CompressionMetadata() | ||
| record_meta_data = record_io.read(record_meta.getMaxSize()) | ||
| record_meta.deserialize(record_meta_data, 0) | ||
| if record_meta.val['algorithm'] == 'UNCOMPRESSED': | ||
| uncomp_bytes.extend(record_io.read()) | ||
| elif record_meta.val['algorithm'] == 'ZLIB_DEFLATE': | ||
| uncomp_bytes.extend(zlib.decompress(record_io.read())) | ||
|
Comment on lines
+237
to
+240
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thoughts on also adding an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. That makes sense |
||
|
|
||
| return uncomp_bytes | ||
|
|
||
| def decode(self) -> List[Dict[str, Any]]: | ||
| """decode the entire data product file. | ||
|
|
||
|
|
@@ -223,16 +267,7 @@ | |
| ##################### | ||
| data_size = header_json['DataSize']["value"] | ||
| position_at_start = f.tell() | ||
| while (f.tell() - position_at_start) < data_size: | ||
| # Read record ID | ||
| record_id_bin = f.read(ConfigManager().get_type("FwDpIdType").getSize()) | ||
| record_id_obj = ConfigManager().get_type("FwDpIdType")() | ||
| record_id_obj.deserialize(record_id_bin, 0) | ||
| record_id = record_id_obj.val | ||
|
|
||
| # decode the record | ||
| record = self.decode_record(f, record_id) | ||
| results["Records"].append(record) | ||
| results["Records"] = self.decode_records(f, data_size) | ||
|
|
||
| ##################### | ||
| # Validate checksum # | ||
|
|
@@ -250,6 +285,16 @@ | |
| if computed_crc != dp_crc.val: | ||
| raise CRCError("Data", dp_crc.val, computed_crc) | ||
|
|
||
| if not self.disable_decompression and self.is_compression_record(results["Records"][0]): | ||
|
|
||
|
|
||
| # Compressed records. Decompress and re-process | ||
| uncomp_bytes = self.decompress_records(results["Records"]) | ||
| if uncomp_bytes is not None: | ||
| uncomp_io = BytesIO(uncomp_bytes) | ||
| uncomp_records = self.decode_records(uncomp_io, len(uncomp_bytes)) | ||
| if uncomp_records is not None: | ||
| results["Records"] = uncomp_records | ||
|
Comment on lines
+288
to
+296
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not required, but would be appreciated (AI can write the test scripts if you provide test inputs/outputs)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I missed the test_decoder.py script. I'll add a test case for the compressed products |
||
|
|
||
| return results | ||
|
|
||
| def process(self): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idk about that error message but this seems like a good check to add