Skip to content
69 changes: 57 additions & 12 deletions src/fprime_gds/common/dp/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from pathlib import Path
from typing import Dict, List, Any, Optional
import dataclasses
import re
from io import BytesIO
import zlib

from fprime_gds.common.dp.common import (
ChecksumConfig,
Expand Down Expand Up @@ -85,7 +88,7 @@
- both these assumptions can be resolved by loading dictionaries (see executables/data_products.py)
"""

def __init__(self, dictionaries: Dictionaries, binary_file_path: str, output_json_path: Optional[str] = None):
def __init__(self, dictionaries: Dictionaries, binary_file_path: str, disable_decompression: bool, output_json_path: Optional[str] = None):
"""Initialize the decoder.

Args:
Expand All @@ -96,6 +99,7 @@
"""
self.dictionaries = dictionaries
self.binary_file_path = binary_file_path
self.disable_decompression = disable_decompression
if output_json_path is None:
# Generate default output path if not provided as same path with .json extension
self.output_json_path = str(Path(binary_file_path).with_suffix('.json'))
Expand Down Expand Up @@ -145,7 +149,7 @@
Raises:
RecordNotFoundError: If record ID not found
"""

# Query ConfigManager for record definition
record_template: DpRecordTemplate = self.dictionaries.dp_record_id.get(record_id)

Expand Down Expand Up @@ -197,6 +201,46 @@

return record

def decode_records(self, r_io, data_size) -> List[Any]:
records = list()
position_at_start = r_io.tell()
while (r_io.tell() - position_at_start) < data_size:
# Read record ID
record_id_bin = r_io.read(ConfigManager().get_type("FwDpIdType").getSize())
record_id_obj = ConfigManager().get_type("FwDpIdType")()
record_id_obj.deserialize(record_id_bin, 0)
record_id = record_id_obj.val
Comment on lines +207 to +212

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk about that error message but this seems like a good check to add


# decode the record
record = self.decode_record(r_io, record_id)
records.append(record)

return records

def is_compression_record(self, record):
return re.search("dpCompressProc.CompressionRecord$", record["Record"]["record_name"]) is not None

def decompress_records(self, records):
uncomp_bytes = bytearray()

Check failure on line 224 in src/fprime_gds/common/dp/decoder.py

View workflow job for this annotation

GitHub Actions / Spell checking

`uncomp` is not a recognized word (unrecognized-spelling)
CompressionMetadata = ConfigManager().get_type("Svc.CompressionMetadata")

for record in records:
if not self.is_compression_record(record):
# All records must be compressed, otherwise bail
return None

record_io = BytesIO(bytes(record["Data"]["values"]))

record_meta = CompressionMetadata()
record_meta_data = record_io.read(record_meta.getMaxSize())
record_meta.deserialize(record_meta_data, 0)
if record_meta.val['algorithm'] == 'UNCOMPRESSED':
uncomp_bytes.extend(record_io.read())

Check failure on line 238 in src/fprime_gds/common/dp/decoder.py

View workflow job for this annotation

GitHub Actions / Spell checking

`uncomp` is not a recognized word (unrecognized-spelling)
elif record_meta.val['algorithm'] == 'ZLIB_DEFLATE':
uncomp_bytes.extend(zlib.decompress(record_io.read()))

Check failure on line 240 in src/fprime_gds/common/dp/decoder.py

View workflow job for this annotation

GitHub Actions / Spell checking

`uncomp` is not a recognized word (unrecognized-spelling)
Comment on lines +237 to +240

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on also adding an else case to log an INFO/WARNING if algorithm is neither of the two supported ones? Later we could even make it a GDS plugin

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That makes sense


return uncomp_bytes

Check failure on line 242 in src/fprime_gds/common/dp/decoder.py

View workflow job for this annotation

GitHub Actions / Spell checking

`uncomp` is not a recognized word (unrecognized-spelling)

def decode(self) -> List[Dict[str, Any]]:
"""decode the entire data product file.

Expand All @@ -223,16 +267,7 @@
#####################
data_size = header_json['DataSize']["value"]
position_at_start = f.tell()
while (f.tell() - position_at_start) < data_size:
# Read record ID
record_id_bin = f.read(ConfigManager().get_type("FwDpIdType").getSize())
record_id_obj = ConfigManager().get_type("FwDpIdType")()
record_id_obj.deserialize(record_id_bin, 0)
record_id = record_id_obj.val

# decode the record
record = self.decode_record(f, record_id)
results["Records"].append(record)
results["Records"] = self.decode_records(f, data_size)

#####################
# Validate checksum #
Expand All @@ -250,6 +285,16 @@
if computed_crc != dp_crc.val:
raise CRCError("Data", dp_crc.val, computed_crc)

if not self.disable_decompression and self.is_compression_record(results["Records"][0]):

# Compressed records. Decompress and re-process
uncomp_bytes = self.decompress_records(results["Records"])

Check failure on line 291 in src/fprime_gds/common/dp/decoder.py

View workflow job for this annotation

GitHub Actions / Spell checking

`uncomp` is not a recognized word (unrecognized-spelling)
if uncomp_bytes is not None:
uncomp_io = BytesIO(uncomp_bytes)
uncomp_records = self.decode_records(uncomp_io, len(uncomp_bytes))
if uncomp_records is not None:
results["Records"] = uncomp_records
Comment on lines +288 to +296

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not required, but would be appreciated (AI can write the test scripts if you provide test inputs/outputs)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I missed the test_decoder.py script. I'll add a test case for the compressed products


return results

def process(self):
Expand Down
3 changes: 2 additions & 1 deletion src/fprime_gds/executables/data_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def main():
decode_parser.add_argument("-b", "--bin-file", required=True, help="Path to input data product binary file (.fdp)")
decode_parser.add_argument("-d", "--dictionary", required=True, help="Path to F Prime JSON Dictionary")
decode_parser.add_argument("-o", "--output", required=False, help="Path to output JSON file (defaults to <binFilename>.json)")
decode_parser.add_argument("-z", "--disable-decompression", action='store_true', help="Disable automatic decompression of data products")

validate_parser = subcommands_parser.add_parser('validate', help='Validate a data product')
validate_parser.add_argument("-b", "--bin-file", required=True, help="Path to input data product binary file (.fdp)")
Expand All @@ -29,7 +30,7 @@ def main():

if args.command == "decode":
assert args.dictionaries is not None, "Dictionaries must be loaded"
DataProductDecoder(args.dictionaries, args.bin_file, args.output).process()
DataProductDecoder(args.dictionaries, args.bin_file, args.disable_decompression, args.output).process()

elif args.command == "validate":
success = DataProductValidator(
Expand Down
Loading