diff --git a/src/fprime_gds/common/dp/decoder.py b/src/fprime_gds/common/dp/decoder.py index c76da11f..4603fe6f 100644 --- a/src/fprime_gds/common/dp/decoder.py +++ b/src/fprime_gds/common/dp/decoder.py @@ -19,6 +19,9 @@ from pathlib import Path from typing import Dict, List, Any, Optional import dataclasses +import re +from io import BytesIO +import zlib from fprime_gds.common.dp.common import ( ChecksumConfig, @@ -85,7 +88,7 @@ class DataProductDecoder: - both these assumptions can be resolved by loading dictionaries (see executables/data_products.py) """ - def __init__(self, dictionaries: Dictionaries, binary_file_path: str, output_json_path: Optional[str] = None): + def __init__(self, dictionaries: Dictionaries, binary_file_path: str, disable_decompression: bool, output_json_path: Optional[str] = None): """Initialize the decoder. Args: @@ -96,6 +99,7 @@ def __init__(self, dictionaries: Dictionaries, binary_file_path: str, output_jso """ self.dictionaries = dictionaries self.binary_file_path = binary_file_path + self.disable_decompression = disable_decompression if output_json_path is None: # Generate default output path if not provided as same path with .json extension self.output_json_path = str(Path(binary_file_path).with_suffix('.json')) @@ -145,7 +149,7 @@ def decode_record(self, file_handle, record_id: int) -> Dict[str, Any]: Raises: RecordNotFoundError: If record ID not found """ - + # Query ConfigManager for record definition record_template: DpRecordTemplate = self.dictionaries.dp_record_id.get(record_id) @@ -197,6 +201,46 @@ def read_element(element_type): return record + def decode_records(self, r_io, data_size) -> List[Any]: + records = list() + position_at_start = r_io.tell() + while (r_io.tell() - position_at_start) < data_size: + # Read record ID + record_id_bin = r_io.read(ConfigManager().get_type("FwDpIdType").getSize()) + record_id_obj = ConfigManager().get_type("FwDpIdType")() + record_id_obj.deserialize(record_id_bin, 0) + record_id = record_id_obj.val + + # decode the record + record = self.decode_record(r_io, record_id) + records.append(record) + + return records + + def is_compression_record(self, record): + return re.search("dpCompressProc.CompressionRecord$", record["Record"]["record_name"]) is not None + + def decompress_records(self, records): + uncomp_bytes = bytearray() + CompressionMetadata = ConfigManager().get_type("Svc.CompressionMetadata") + + for record in records: + if not self.is_compression_record(record): + # All records must be compressed, otherwise bail + return None + + record_io = BytesIO(bytes(record["Data"]["values"])) + + record_meta = CompressionMetadata() + record_meta_data = record_io.read(record_meta.getMaxSize()) + record_meta.deserialize(record_meta_data, 0) + if record_meta.val['algorithm'] == 'UNCOMPRESSED': + uncomp_bytes.extend(record_io.read()) + elif record_meta.val['algorithm'] == 'ZLIB_DEFLATE': + uncomp_bytes.extend(zlib.decompress(record_io.read())) + + return uncomp_bytes + def decode(self) -> List[Dict[str, Any]]: """decode the entire data product file. @@ -223,16 +267,7 @@ def decode(self) -> List[Dict[str, Any]]: ##################### data_size = header_json['DataSize']["value"] position_at_start = f.tell() - while (f.tell() - position_at_start) < data_size: - # Read record ID - record_id_bin = f.read(ConfigManager().get_type("FwDpIdType").getSize()) - record_id_obj = ConfigManager().get_type("FwDpIdType")() - record_id_obj.deserialize(record_id_bin, 0) - record_id = record_id_obj.val - - # decode the record - record = self.decode_record(f, record_id) - results["Records"].append(record) + results["Records"] = self.decode_records(f, data_size) ##################### # Validate checksum # @@ -250,6 +285,16 @@ def decode(self) -> List[Dict[str, Any]]: if computed_crc != dp_crc.val: raise CRCError("Data", dp_crc.val, computed_crc) + if not self.disable_decompression and self.is_compression_record(results["Records"][0]): + + # Compressed records. Decompress and re-process + uncomp_bytes = self.decompress_records(results["Records"]) + if uncomp_bytes is not None: + uncomp_io = BytesIO(uncomp_bytes) + uncomp_records = self.decode_records(uncomp_io, len(uncomp_bytes)) + if uncomp_records is not None: + results["Records"] = uncomp_records + return results def process(self): diff --git a/src/fprime_gds/executables/data_products.py b/src/fprime_gds/executables/data_products.py index bf8eee76..b2d5b80d 100644 --- a/src/fprime_gds/executables/data_products.py +++ b/src/fprime_gds/executables/data_products.py @@ -13,6 +13,7 @@ def main(): decode_parser.add_argument("-b", "--bin-file", required=True, help="Path to input data product binary file (.fdp)") decode_parser.add_argument("-d", "--dictionary", required=True, help="Path to F Prime JSON Dictionary") decode_parser.add_argument("-o", "--output", required=False, help="Path to output JSON file (defaults to .json)") + decode_parser.add_argument("-z", "--disable-decompression", action='store_true', help="Disable automatic decompression of data products") validate_parser = subcommands_parser.add_parser('validate', help='Validate a data product') validate_parser.add_argument("-b", "--bin-file", required=True, help="Path to input data product binary file (.fdp)") @@ -29,7 +30,7 @@ def main(): if args.command == "decode": assert args.dictionaries is not None, "Dictionaries must be loaded" - DataProductDecoder(args.dictionaries, args.bin_file, args.output).process() + DataProductDecoder(args.dictionaries, args.bin_file, args.disable_decompression, args.output).process() elif args.command == "validate": success = DataProductValidator(