diff --git a/create-notice.sh b/create-notice.sh index 60d965d7b..202929106 100755 --- a/create-notice.sh +++ b/create-notice.sh @@ -70,6 +70,9 @@ function main { add_license "wheel" "https://raw.githubusercontent.com/pypa/wheel/main/LICENSE.txt" add_license "pip" "https://raw.githubusercontent.com/pypa/pip/main/LICENSE.txt" add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE" + # OTLP corpus preparation dependencies (otlp extra) + add_license "opentelemetry-proto" "https://raw.githubusercontent.com/open-telemetry/opentelemetry-python/main/LICENSE" + add_license "protobuf" "https://raw.githubusercontent.com/protocolbuffers/protobuf/main/LICENSE" # transitive dependencies # Jinja2 dependencies diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 64086c4c5..aab67b1b9 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -2129,16 +2129,19 @@ def _parse_headers(e): # Some runners return a raw response, causing the 'error' property to be a string literal of the bytes/BytesIO object, # we should avoid bubbling that up # e.g. ApiError(413, '<_io.BytesIO object at 0xffffaf146a70>') + # errors="replace" so binary response bodies (e.g. OTLP ingest returns binary protobuf + # error frames) don't crash the driver with UnicodeDecodeError. Undecodable bytes become + # U+FFFD in the logged/recorded message. if isinstance(e.body, bytes): # could be an empty body - if error_body := e.body.decode("utf-8"): + if error_body := e.body.decode("utf-8", errors="replace"): error_message = error_body else: # to be consistent with an empty 'e.error' error_message = str(None) elif isinstance(e.body, BytesIO): # could be an empty body - if error_body := e.body.read().decode("utf-8"): + if error_body := e.body.read().decode("utf-8", errors="replace"): error_message = error_body else: # to be consistent with an empty 'e.error' @@ -2146,17 +2149,17 @@ def _parse_headers(e): # fallback to 'error' property if the body isn't bytes/BytesIO else: if isinstance(e.error, bytes): - error_message = e.error.decode("utf-8") + error_message = e.error.decode("utf-8", errors="replace") elif isinstance(e.error, BytesIO): - error_message = e.error.read().decode("utf-8") + error_message = e.error.read().decode("utf-8", errors="replace") else: # if the 'error' is empty, we get back str(None) error_message = e.error if isinstance(e.info, bytes): - error_info = e.info.decode("utf-8") + error_info = e.info.decode("utf-8", errors="replace") elif isinstance(e.info, BytesIO): - error_info = e.info.read().decode("utf-8") + error_info = e.info.read().decode("utf-8", errors="replace") else: error_info = e.info diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 443704ca2..8ce87918c 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -340,6 +340,13 @@ def first_existing(root_dirs, f): return p return None + def first_existing_with_suffix(root_dirs, f, suffix): + for root_dir in root_dirs: + candidate = os.path.join(root_dir, f) + if os.path.exists(candidate + suffix): + return candidate + return None + for corpus in t.corpora: data_root = data_dir(cfg, t.name, corpus.name) for document_set in corpus.documents: @@ -347,7 +354,14 @@ def first_existing(root_dirs, f): if document_set.document_archive: document_set.document_archive = first_existing(data_root, document_set.document_archive) if document_set.document_file: - document_set.document_file = first_existing(data_root, document_set.document_file) + resolved = first_existing(data_root, document_set.document_file) + # For OTLP corpora, the hot path reads the .pb (and .pb.offset) — the source JSON is + # only needed during prepare-track when generating the .pb locally. If only the .pb + # has been downloaded, the JSON path won't exist; resolve document_file to the path + # the JSON *would* have so the derived ``.pb`` path is correct. + if resolved is None and document_set.is_otlp: + resolved = first_existing_with_suffix(data_root, document_set.document_file, ".pb") + document_set.document_file = resolved def is_simple_track_mode(cfg: types.Config): @@ -507,6 +521,14 @@ def on_prepare_track(self, track, data_root_dir) -> Generator[tuple[Callable, di "preparator": prep, "document_set": document_set, } + elif document_set.is_otlp: + yield prepare_otlp_document, { + "cfg": self.cfg, + "track": track, + "corpus": corpus, + "preparator": prep, + "document_set": document_set, + } def prepare_document(cfg: types.Config, track, corpus, preparator, document_set): @@ -519,6 +541,15 @@ def prepare_document(cfg: types.Config, track, corpus, preparator, document_set) preparator.prepare_document_set(document_set, data_root[1]) +def prepare_otlp_document(cfg: types.Config, track, corpus, preparator, document_set): + data_root = data_dir(cfg, track.name, corpus.name) + LOG.info("Resolved data root directory for OTLP corpus [%s] in track [%s] to [%s].", corpus.name, track.name, data_root) + if len(data_root) == 1: + preparator.prepare_otlp_document_set(document_set, data_root[0]) + elif not preparator.prepare_bundled_otlp_document_set(document_set, data_root[0]): + preparator.prepare_otlp_document_set(document_set, data_root[1]) + + class Decompressor: def decompress(self, archive_path, documents_path, uncompressed_size): @@ -751,6 +782,104 @@ def prepare_bundled_document_set(self, document_set, data_root): else: return False + def prepare_otlp_document_set(self, document_set, data_root): + """ + Prepares an OTLP binary protobuf corpus file locally. + + Strategy: + 1. If a valid .pb file already exists, nothing to do. + 2. Try downloading the .pb from the corpus base URL (avoids downloading the JSON source). + 3. Download and decompress the JSON source, then convert it to .pb locally. + + :param document_set: A document set with source_format == SOURCE_FORMAT_OTLP_PROTOBUF. + :param data_root: The data root directory for this document set. + """ + doc_path = os.path.join(data_root, document_set.document_file) + archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None + pb_file = io.OtlpProtobufFile.for_source_file(doc_path) + + # 1. Valid .pb already present + if pb_file.is_valid(): + return + + # 2. Try downloading .pb directly — avoids downloading the larger JSON source + if document_set.base_url: + pb_path = doc_path + ".pb" + try: + self.downloader.download(document_set.base_url, pb_path) + if pb_file.is_valid(): + return + except exceptions.DataError: + pass # .pb not available remotely, fall through + + # 3. Ensure JSON source is available + while True: + if self.is_locally_available(doc_path) and self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): + break + if ( + archive_path + and self.is_locally_available(archive_path) + and self.has_expected_size(archive_path, document_set.compressed_size_in_bytes) + ): + self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) + else: + if document_set.has_compressed_corpus(): + target_path = archive_path + expected_size = document_set.compressed_size_in_bytes + elif document_set.has_uncompressed_corpus(): + target_path = doc_path + expected_size = document_set.uncompressed_size_in_bytes + else: + raise exceptions.RallyAssertionError(f"Track {self.track_name} specifies documents but no corpus") + try: + self.downloader.download(document_set.base_url, target_path, expected_size) + except exceptions.DataError as e: + if e.message == "Cannot download data because no base URL is provided." and self.is_locally_available(target_path): + raise exceptions.DataError( + f"[{target_path}] is present but does not have the expected " + f"size of [{expected_size}] bytes and it cannot be downloaded " + f"because no base URL is provided." + ) from None + raise + + # 4. Convert JSON to .pb + pb_file.create() + + def prepare_bundled_otlp_document_set(self, document_set, data_root): + """ + Prepares a bundled OTLP document set (files in the same directory as the track). + + :return: True if the .pb file is ready, False if required files were not found locally. + """ + doc_path = os.path.join(data_root, document_set.document_file) + archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None + pb_file = io.OtlpProtobufFile.for_source_file(doc_path) + + if pb_file.is_valid(): + return True + + if self.is_locally_available(doc_path): + if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): + pb_file.create() + return True + else: + raise exceptions.DataError( + f"[{doc_path}] is present but does not have the expected size " f"of [{document_set.uncompressed_size_in_bytes}] bytes." + ) + + if archive_path and self.is_locally_available(archive_path): + if self.has_expected_size(archive_path, document_set.compressed_size_in_bytes): + self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) + pb_file.create() + return True + else: + raise exceptions.DataError( + f"[{archive_path}] is present but does not have " + f"the expected size of [{document_set.compressed_size_in_bytes}] bytes." + ) + + return False + class TemplateSource: """ @@ -1585,6 +1714,29 @@ def _create_corpora(self, corpora_specs, indices, data_streams): meta_data=doc_meta_data, ) corpus.documents.append(docs) + elif source_format == track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF: + source_file = self._r(doc_spec, "source-file") + if io.is_archive(source_file): + document_archive = source_file + document_file = io.splitext(source_file)[0] + else: + document_archive = None + document_file = source_file + num_docs = self._r(doc_spec, "document-count") + compressed_bytes = self._r(doc_spec, "compressed-bytes", mandatory=False) + uncompressed_bytes = self._r(doc_spec, "uncompressed-bytes", mandatory=False) + doc_meta_data = self._r(doc_spec, "meta", error_ctx=name, mandatory=False) + docs = track.Documents( + source_format=source_format, + document_file=document_file, + document_archive=document_archive, + base_url=base_url, + number_of_documents=num_docs, + compressed_size_in_bytes=compressed_bytes, + uncompressed_size_in_bytes=uncompressed_bytes, + meta_data=doc_meta_data, + ) + corpus.documents.append(docs) else: self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name)) document_corpora.append(corpus) diff --git a/esrally/track/params.py b/esrally/track/params.py index 91c7816de..e957d55f1 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -719,6 +719,191 @@ def params(self): raise exceptions.RallyError("Do not use a BulkIndexParamSource without partitioning") +class OtlpParamSource(ParamSource): + """ + Parameter source for OTLP binary protobuf corpus files (source_format: otlp-proto). + + Reads pre-generated ExportMetricsServiceRequest records from a .pb file. + Supports multi-client partitioning via the companion .pb.offset index. + """ + + def __init__(self, track_obj, params, **kwargs): + super().__init__(track_obj, params, **kwargs) + self._partition_index = 0 + self._total_partitions = 1 + # Streaming state — generators can't be pickled, so they're created lazily on the first + # params() call inside the worker process (after Rally has done its actor pickling). + # The cursor and end_record bounds let us track progress without materializing records. + self._record_iter = None + self._cursor = 0 + self._partition_size: int | None = None + self.looped = params.get("looped", False) + + otlp_docs = self._find_otlp_docs() + if not otlp_docs: + requested = self._params.get("corpora") + if requested: + raise exceptions.InvalidSyntax( + f"No OTLP corpus matching 'corpora'={requested!r} found in track [{track_obj}]. " + f"Available corpora: {[c.name for c in self.track.corpora]}." + ) + raise exceptions.InvalidSyntax( + f"No OTLP corpus found in track [{track_obj}]. " + f"Add at least one document corpus with source_format={track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF!r}." + ) + # expose corpora so used_corpora() in loader.py includes OTLP corpora in the prepare-track phase + seen_names: set[str] = set() + self.corpora = [] + for corpus, _ in otlp_docs: + if corpus.name not in seen_names: + seen_names.add(corpus.name) + self.corpora.append(corpus) + # use the first matching document set + _, self._doc = otlp_docs[0] + + def _find_otlp_docs(self): + # honor the operation's "corpora" param so a track with multiple OTLP corpora can pick + # between them — without this we'd silently use whichever corpus comes first in track.corpora. + track_corpora_names = [corpus.name for corpus in self.track.corpora] + corpora_names = self._params.get("corpora", track_corpora_names) + if isinstance(corpora_names, str): + corpora_names = [corpora_names] + return [ + (corpus, doc) + for corpus in self.track.corpora + if corpus.name in corpora_names + for doc in corpus.documents + if doc.source_format == track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF + ] + + def partition(self, partition_index, total_partitions): + # pylint: disable=protected-access + # the "copy" is another OtlpParamSource instance, so accessing its private state is fine + copy = OtlpParamSource.__new__(OtlpParamSource) + copy.__dict__.update(self.__dict__) + copy._partition_index = partition_index + copy._total_partitions = total_partitions + copy._record_iter = None # streaming iterator created lazily on first params() call + copy._cursor = 0 + copy._partition_size = None + return copy + + def _total_records(self): + """ + Source of truth for partitioning: count records in the .pb file directly. The track's + document-count is only used as a fallback (e.g. when the .pb doesn't exist yet, such as + during initial track parsing — corpora preparation comes later). Trusting the track value + unconditionally is a footgun: if it doesn't match the actual .pb, most workers either get + empty partitions or seek past EOF and the benchmark silently finishes after only one + client does any work. + """ + if not hasattr(self, "_cached_total_records"): + pb_file = io.OtlpProtobufFile.for_source_file(self._doc.document_file) + actual = pb_file.count_records() + self._cached_total_records = actual if actual is not None else self._doc.number_of_documents + return self._cached_total_records + + def size(self): + """ + Return the partition size so Rally treats this as a finite (non-infinite) source. + Without this, the base class returns None → infinite=True → Rally defaults + iterations=1 and each worker runs exactly one operation. + """ + total_records = self._total_records() + if total_records <= 0: + return None + if self._total_partitions > 1: + records_per_partition = total_records / self._total_partitions + start_record = round(records_per_partition * self._partition_index) + end_record = round(records_per_partition * (self._partition_index + 1)) + return end_record - start_record + return total_records + + @property + def percent_completed(self): + """ + Fraction of this partition's records that have been yielded so far. + + Rally pulls this per-client and averages across all clients to render the [N% done] bar. + Without this property the bar stays stuck at 0% because the loop control falls into the + ``infinite`` branch (we don't set a time_period/iterations explicitly — the param source + itself terminates the task by raising StopIteration in non-looped mode). + + Returns ``None`` in looped mode because the cursor cycles back to 0 indefinitely, so a + cursor-based percentage is meaningless. The schedule's time_period / iterations bounds + progress instead in that case. + """ + if self.looped: + return None + partition_size = self.size() + if not partition_size: + return None + return min(self._cursor / partition_size, 1.0) + + def _open_iter(self): + """ + Create a new streaming iterator over this partition's records. We do NOT materialize them + into memory — for large corpora (e.g. 1 MB protobuf records, 6k records per partition) that + would consume 6+ GB per worker. The iterator holds an open file handle and yields one + record at a time; the file handle closes when the generator completes or is GC'd. + """ + if not self._doc.document_file: + raise exceptions.SystemSetupError( + f"OTLP corpus document_file is unset for [{self._doc}]. This usually means neither the " + "source .json nor the pre-built .pb was found in any data root after prepare-track. " + "Check that the prepare-track phase completed successfully (look for the " + "'Successfully downloaded binary protobuf file from ...' log line) and that the " + "data directory is the same as Rally is reading from." + ) + pb_file = io.OtlpProtobufFile.for_source_file(self._doc.document_file) + total_records = self._total_records() + + if total_records > 0 and self._total_partitions > 1: + records_per_partition = total_records / self._total_partitions + start_record = round(records_per_partition * self._partition_index) + end_record = round(records_per_partition * (self._partition_index + 1)) + else: + start_record = 0 + end_record = None + + # cache the partition size so percent_completed/size don't have to recompute + if self._partition_size is None: + self._partition_size = (end_record - start_record) if end_record is not None else total_records + + logger = logging.getLogger(__name__) + logger.info( + "OtlpParamSource partition %d/%d: total_records=%s start=%d end=%s (streaming, not preloading)", + self._partition_index, + self._total_partitions, + total_records, + start_record, + end_record, + ) + return pb_file.read_records(start_record, end_record) + + def params(self): + if self._record_iter is None: + self._record_iter = self._open_iter() + + try: + payload = next(self._record_iter) + except StopIteration: + if not self.looped: + raise + # restart the iterator from the start of this partition. If even the fresh iterator + # is empty (partition has no records at all), the StopIteration here propagates. + self._record_iter = self._open_iter() + self._cursor = 0 + payload = next(self._record_iter) + + self._cursor += 1 + + result = {"body": payload} + if "request-timeout" in self._params: + result["request-timeout"] = self._params["request-timeout"] + return result + + class PartitionBulkIndexParamSource: def __init__( self, @@ -1431,6 +1616,7 @@ def read_bulk(self): register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource) register_param_source_for_operation(track.OperationType.ForceMerge, ForceMergeParamSource) register_param_source_for_operation(track.OperationType.Downsample, DownsampleParamSource) +register_param_source_for_operation(track.OperationType.OtlpIngest, OtlpParamSource) # Also register by name, so users can use it too register_param_source_for_name("file-reader", BulkIndexParamSource) diff --git a/esrally/track/track.py b/esrally/track/track.py index 5d38af7a6..050797b14 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -181,6 +181,7 @@ def __eq__(self, other): class Documents: SOURCE_FORMAT_BULK = "bulk" + SOURCE_FORMAT_OTLP_PROTOBUF = "otlp-proto" def __init__( self, @@ -275,6 +276,10 @@ def number_of_lines(self): def is_bulk(self): return self.source_format == Documents.SOURCE_FORMAT_BULK + @property + def is_otlp(self): + return self.source_format == Documents.SOURCE_FORMAT_OTLP_PROTOBUF + def __str__(self): return "%s documents from %s" % (self.source_format, self.document_file) @@ -745,6 +750,7 @@ class OperationType(Enum): # this is classed the same as RawRequest, but could potentially be used to call endpoints that are blocked RunUntil = (58, AdminStatus.No, serverless.Status.Public) EnrichPolicy = (59, AdminStatus.Yes, serverless.Status.Public) + OtlpIngest = (60, AdminStatus.No, serverless.Status.Public) def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status): self.id = id @@ -884,6 +890,8 @@ def from_hyphenated_string(cls, v): return OperationType.RunUntil elif v == "enrich-policy": return OperationType.EnrichPolicy + elif v == "otlp-ingest": + return OperationType.OtlpIngest else: raise KeyError(f"No enum value for [{v}]") diff --git a/esrally/utils/io.py b/esrally/utils/io.py index 2a52c0719..a041fb6eb 100644 --- a/esrally/utils/io.py +++ b/esrally/utils/io.py @@ -23,11 +23,12 @@ import mmap import os import shutil +import struct import subprocess import sys import tarfile import zipfile -from collections.abc import Collection, Mapping, Sequence +from collections.abc import Collection, Iterator, Mapping, Sequence from types import TracebackType from typing import IO, Any, AnyStr, Callable, Generic, Literal, Optional, overload @@ -37,6 +38,7 @@ # but they are treated the same by mypy, so I'm not going to use conditional imports here from typing_extensions import Self +from esrally import exceptions from esrally.utils import console, net SUPPORTED_ARCHIVE_FORMATS = [".zip", ".bz2", ".gz", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".zst"] @@ -705,6 +707,300 @@ def remove_file_offset_table(data_file_path: str) -> None: FileOffsetTable.remove(data_file_path) +class OtlpProtobufFile: + """ + Manages the binary protobuf corpus file derived from an OTLP JSON source. + + On-disk format: sequence of length-prefixed records — + 4-byte big-endian uint32 (payload length) + binary ExportMetricsServiceRequest bytes. + + A companion ``{pb_path}.offset`` file maps record numbers to byte offsets for efficient + multi-client partitioning, using the same ``record_number;byte_offset`` text format as + FileOffsetTable. One entry is written every OFFSET_SAMPLING_INTERVAL records. + """ + + OFFSET_SAMPLING_INTERVAL = 1000 + + def __init__(self, source_json_path: str, pb_path: str): + self.source_json_path = source_json_path + self.pb_path = pb_path + + def exists(self) -> bool: + return os.path.exists(self.pb_path) and os.path.getsize(self.pb_path) > 0 + + def is_valid(self) -> bool: + if not self.exists(): + return False + # if the source JSON is present, the .pb must be newer than it + if os.path.exists(self.source_json_path): + if os.path.getmtime(self.pb_path) < os.path.getmtime(self.source_json_path): + return False + return True + + def try_download_from_corpus_location(self, corpus_base_url: str | None) -> bool: + """ + Attempts to download the pre-built .pb file from the corpus URL. Also attempts to + download the companion .pb.offset file; if that's not available, partitioning will + still work by scanning the .pb from the start (just slightly slower on startup). + + :return: True if the .pb file was downloaded successfully, False otherwise. + """ + if not corpus_base_url: + return False + logger = logging.getLogger(__name__) + pb_name = os.path.basename(self.pb_path) + remote_url = f"{corpus_base_url.rstrip('/')}/{pb_name}" + logger.info("Attempting to download binary protobuf file from [%s]", remote_url) + os.makedirs(os.path.dirname(self.pb_path), exist_ok=True) + try: + net.download(remote_url, self.pb_path) + logger.info("Successfully downloaded binary protobuf file from [%s]", remote_url) + except Exception: + logger.debug("Could not download binary protobuf file from [%s]", remote_url) + return False + + # Best-effort: also fetch the offset index. Failure is non-fatal — read_records() + # falls back to scanning from the start of the .pb if .offset is missing. + offset_path = self.pb_path + ".offset" + offset_url = f"{corpus_base_url.rstrip('/')}/{os.path.basename(offset_path)}" + try: + net.download(offset_url, offset_path) + logger.info("Successfully downloaded offset index from [%s]", offset_url) + except Exception: + logger.debug("Could not download offset index from [%s] (will scan .pb directly)", offset_url) + + return True + + def create(self) -> int: + """ + Parse the source OTLP JSON file and write binary protobuf records to the .pb file, + also writing a companion .offset file for fast multi-client partitioning. + + :return: Total number of records written. + :raises exceptions.SystemSetupError: if opentelemetry-proto is not installed. + """ + # opentelemetry-proto is an optional dependency, only needed when preparing an OTLP corpus. + # pylint: disable=import-outside-toplevel + try: + from google.protobuf.json_format import Parse + from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( + ExportMetricsServiceRequest, + ) + except ImportError: + raise exceptions.SystemSetupError( + "The 'opentelemetry-proto' package is required to pre-process OTLP corpus files. " + "Install it with: pip install opentelemetry-proto" + ) + + offset_path = self.pb_path + ".offset" + record_count = 0 + byte_offset = 0 + with ( + open(self.source_json_path, encoding="utf-8") as src, + open(self.pb_path, "wb") as dst, + open(offset_path, "w", encoding="utf-8") as off, + ): + for line in src: + line = line.strip() + if not line: + continue + msg = Parse(line, ExportMetricsServiceRequest()) + payload = msg.SerializeToString() + length_bytes = struct.pack(">I", len(payload)) + if record_count % self.OFFSET_SAMPLING_INTERVAL == 0: + print(f"{record_count};{byte_offset}", file=off) + dst.write(length_bytes) + dst.write(payload) + byte_offset += 4 + len(payload) + record_count += 1 + return record_count + + def read_records(self, start_record: int, end_record: int | None = None) -> Iterator[bytes]: + """ + Generator yielding raw binary payloads from start_record up to (but not including) end_record. + Uses the companion .offset file to seek efficiently. + """ + seek_byte, records_to_skip = self._find_offset(start_record) + with open(self.pb_path, "rb") as f: + f.seek(seek_byte) + for _ in range(records_to_skip): + length_data = f.read(4) + if len(length_data) < 4: + return + (length,) = struct.unpack(">I", length_data) + f.seek(length, 1) + count = 0 + target = None if end_record is None else (end_record - start_record) + while target is None or count < target: + length_data = f.read(4) + if len(length_data) < 4: + return + (length,) = struct.unpack(">I", length_data) + payload = f.read(length) + if len(payload) < length: + return + count += 1 + yield payload + + def count_records(self) -> int | None: + """ + Return the exact number of records in the .pb file. Uses the companion .pb.offset index + (entries every OFFSET_SAMPLING_INTERVAL records) to jump near the end, then scans only the + final partial chunk by reading length prefixes — so this is fast even for multi-GB files. + + Side effect: if no .pb.offset is present, generates one while scanning the file. The + offset file lets subsequent workers (and future runs) seek directly to their partition's + start record instead of walking from byte 0 — a big win for high-index partitions on + multi-GB corpora. + + Returns ``None`` if the .pb file is missing. + """ + if not os.path.exists(self.pb_path): + return None + offset_path = self.pb_path + ".offset" + # if an offset file exists, use it to skip to the last sampled position before scanning the tail + last_record = 0 + last_byte = 0 + if os.path.exists(offset_path): + try: + with open(offset_path, encoding="utf-8") as f: + for line in f: + parts = line.strip().split(";") + if len(parts) != 2: + continue + try: + rec_num, byte_off = int(parts[0]), int(parts[1]) + except ValueError: + continue + if rec_num >= last_record: + last_record = rec_num + last_byte = byte_off + except OSError: + pass + + # If no offset file exists, generate one as we scan. Write to a temp file and atomically + # rename at the end so concurrent workers don't see a half-written file. If another worker + # beat us to the rename, that's fine — both versions of the file are byte-identical. + offset_tmp_path: str | None = None + offset_out: Optional[IO[str]] = None + if not os.path.exists(offset_path): + offset_tmp_path = f"{offset_path}.tmp.{os.getpid()}" + offset_out = open(offset_tmp_path, "w", encoding="utf-8", buffering=64 * 1024) + logging.getLogger(__name__).info("Generating %s while scanning .pb (one-time cost per machine).", offset_path) + + count = last_record + try: + with open(self.pb_path, "rb") as f: + f.seek(last_byte) + byte_offset = last_byte + while True: + if offset_out is not None and count % self.OFFSET_SAMPLING_INTERVAL == 0: + offset_out.write(f"{count};{byte_offset}\n") + header = f.read(4) + if len(header) < 4: + break + (length,) = struct.unpack(">I", header) + # skip the payload without reading it into memory + f.seek(length, 1) + byte_offset += 4 + length + count += 1 + except OSError: + if offset_out is not None and offset_tmp_path is not None: + offset_out.close() + try: + os.remove(offset_tmp_path) + except OSError: + pass + return None + + if offset_out is not None and offset_tmp_path is not None: + offset_out.close() + try: + os.replace(offset_tmp_path, offset_path) + except OSError: + # best-effort — if we can't rename (e.g. another worker beat us), clean up our temp + try: + os.remove(offset_tmp_path) + except OSError: + pass + return count + + def _find_offset(self, target_record: int) -> tuple[int, int]: + """Return (byte_offset, records_still_to_skip) for the sampled position closest to target_record.""" + offset_path = self.pb_path + ".offset" + if not os.path.exists(offset_path): + return 0, target_record + prior_byte = 0 + prior_remaining = target_record + try: + with open(offset_path, encoding="utf-8") as f: + for line in f: + parts = line.strip().split(";") + if len(parts) != 2: + continue + rec_num, byte_off = int(parts[0]), int(parts[1]) + if rec_num <= target_record: + prior_byte = byte_off + prior_remaining = target_record - rec_num + else: + break + except OSError: + pass + return prior_byte, prior_remaining + + @classmethod + def for_source_file(cls, source_json_path: str) -> "OtlpProtobufFile": + if not source_json_path: + raise ValueError( + "OtlpProtobufFile.for_source_file got an empty/None source path. " + "This usually means set_absolute_data_path could not resolve the corpus path: " + "neither the source .json nor the pre-built .pb was found in any data root. " + "Check that prepare-track ran successfully and the .pb is on disk where Rally expects it." + ) + return cls(source_json_path, f"{source_json_path}.pb") + + +def prepare_otlp_protobuf_file(source_json_path: str, corpus_base_url: str | None) -> int | None: + """ + Ensures a binary protobuf (.pb) file exists for the given OTLP JSON corpus. + + Strategy: + 1. If .pb is already valid locally, return None immediately. + 2. Try downloading the .pb from corpus_base_url (avoids downloading the larger JSON source). + 3. If JSON is present locally, convert it to .pb. + + Returns the record count if the .pb was created locally, or None if it already existed or + was downloaded. Returns None without creating the file if the JSON source is absent and the + download failed — the caller is responsible for ensuring the JSON is present if needed. + """ + pb_file = OtlpProtobufFile.for_source_file(source_json_path) + if pb_file.is_valid(): + return None + + if corpus_base_url: + console.info( + "Attempting to download binary protobuf file for [%s] ... " % os.path.basename(source_json_path), + end="", + flush=True, + ) + if pb_file.try_download_from_corpus_location(corpus_base_url) and pb_file.is_valid(): + console.println("[DOWNLOADED]") + return None + console.println("[NOT FOUND - will create locally]") + + if not os.path.exists(source_json_path): + return None + + console.info( + "Converting OTLP JSON to binary protobuf for [%s] ... " % os.path.basename(source_json_path), + end="", + flush=True, + ) + record_count = pb_file.create() + console.println("[OK]") + return record_count + + def skip_lines(data_file_path: str, data_file: IO[AnyStr], number_of_lines_to_skip: int) -> None: """ Skips the first `number_of_lines_to_skip` lines in `data_file` as a side effect. diff --git a/pyproject.toml b/pyproject.toml index 141c96302..d4d30b416 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,15 @@ dependencies = [ ] [project.optional-dependencies] +# These packages are required for pre-processing OTLP JSON corpus files into binary protobuf format. +# Only needed during corpus preparation (prepare-track phase), not during the benchmark hot path. +otlp = [ + # License: Apache 2.0 + "opentelemetry-proto==1.34.0", + # License: BSD + "protobuf>=5.29.0", +] + # These packages are required to download files from a private AWS S3 bucket s3 = [ # License: Apache 2.0 @@ -115,6 +124,9 @@ s3 = [ develop = [ # s3 "boto3==1.34.68", + # otlp — required for the OtlpProtobufFile / OtlpParamSource / OtlpIngest test suites + "opentelemetry-proto==1.34.0", + "protobuf>=5.29.0", # tests "ujson", "pytest==8.4.2", @@ -137,6 +149,7 @@ develop = [ "types-tabulate==0.8.9", "types-requests>=2.31.0.7,<3", "types-jsonschema==3.2.0", + "types-protobuf>=4.24,<8", # Python dead library removed in version 3.13 and used to build Rally docs. "standard-imghdr==3.13.0", ] diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index 4c2ce8753..613433b24 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -915,6 +915,35 @@ def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file assert prepare_file_offset_table.call_count == 0 +class TestOtlpDocumentPreparation: + """Tests for the OTLP-specific path in DocumentSetPreparator — specifically the compressed + .pb download support that mirrors the JSON corpus's archive compression.""" + + def _doc_set(self, *, archive=None, compressed_size=0): + return track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + document_file="metrics.otlp.json", + document_archive=archive, + number_of_documents=10, + base_url="http://example.com/otlp", + uncompressed_size_in_bytes=2000, + compressed_size_in_bytes=compressed_size, + ) + + def _preparator(self): + return loader.DocumentSetPreparator( + track_name="unit-test", + downloader=mock.MagicMock(spec=loader.Downloader), + decompressor=mock.MagicMock(spec=loader.Decompressor), + ) + + def test_skips_when_pb_already_valid(self): + p = self._preparator() + with mock.patch.object(io.OtlpProtobufFile, "is_valid", return_value=True): + p.prepare_otlp_document_set(self._doc_set(), data_root="/tmp") + p.downloader.download.assert_not_called() + + class TestTemplateSource: @mock.patch("esrally.utils.io.dirname") @mock.patch.object(loader.TemplateSource, "read_glob_files") @@ -1542,6 +1571,40 @@ def test_sets_absolute_path(self, path_exists): assert t.corpora[0].documents[0].document_file == "/data/unittest/docs/documents.json" assert t.corpora[0].documents[0].document_archive == "/data/unittest/docs/documents.json.bz2" + @mock.patch("os.path.exists") + def test_otlp_resolves_via_pb_when_json_missing(self, path_exists): + # OTLP corpora are special: the hot path only needs the .pb (and .pb.offset). If we + # downloaded just the .pb (no source JSON), document_file would otherwise stay None and + # the param source would later try to open "None.pb". + def fake_exists(path: str) -> bool: + # JSON doesn't exist locally; only the .pb does + return path.endswith(".pb") + + path_exists.side_effect = fake_exists + + cfg = config.Config() + cfg.add(config.Scope.application, "benchmarks", "local.dataset.cache", "/data") + + t = track.Track( + name="u", + corpora=[ + track.DocumentCorpus( + "otlp", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + document_file="metrics.otlp.json", + ) + ], + ) + ], + ) + + loader.set_absolute_data_path(cfg, t) + + # document_file points to where the JSON would be — so the derived .pb path is correct + assert t.corpora[0].documents[0].document_file == "/data/otlp/metrics.otlp.json" + class TestTrackFilter: def filter(self, track_specification, *, include_tasks=None, exclude_tasks=None): diff --git a/tests/track/params_test.py b/tests/track/params_test.py index bd598f217..b59c53863 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -3450,3 +3450,295 @@ def test_downsample_empty_params(self): assert p["fixed-interval"] == "1h" assert p["target-index"] == f"{p['source-index']}-{p['fixed-interval']}" assert p.get("sampling-method") is None + + +class TestOtlpParamSource: + """Tests for OtlpParamSource — covers partitioning, finite size signalling, and looping.""" + + _SAMPLE_OTLP_JSON_LINE = ( + '{"resourceMetrics":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"host-0"}}]},' + '"scopeMetrics":[{"scope":{"name":"hostmetrics"},"metrics":[' + '{"name":"system.cpu.utilization","gauge":{"dataPoints":[' + '{"timeUnixNano":"1700000000000000000","asDouble":0.42,' + '"attributes":[{"key":"cpu","value":{"stringValue":"0"}}]}]}}]}]}]}' + ) + + def _build_corpus(self, tmp_path, num_records, corpus_name="otlp-corpus"): + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text("\n".join([self._SAMPLE_OTLP_JSON_LINE] * num_records) + "\n") + pb = io.OtlpProtobufFile.for_source_file(str(json_path)) + pb.create() + corpus = track.DocumentCorpus( + name=corpus_name, + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=num_records, + document_file=str(json_path), + ) + ], + ) + return corpus + + def test_selects_corpus_by_operation_param(self, tmp_path): + # two OTLP corpora — make sure the operation's `corpora` param picks the right one + d_a = tmp_path / "a" + d_b = tmp_path / "b" + d_a.mkdir() + d_b.mkdir() + corpus_a = self._build_corpus(d_a, num_records=3, corpus_name="corpus-60m") + corpus_b = self._build_corpus(d_b, num_records=5, corpus_name="corpus-270m") + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus_a, corpus_b]), + params={"corpora": "corpus-270m"}, + ) + # _doc must come from corpus-270m, not the first-listed one (corpus-60m) + assert source._doc.number_of_documents == 5 + # corpora exposed to prepare-track only contains the selected corpus + assert [c.name for c in source.corpora] == ["corpus-270m"] + + def test_selects_first_corpus_when_no_param(self, tmp_path): + # legacy behaviour — without an explicit `corpora` param we fall back to all matching corpora + d_a = tmp_path / "a" + d_b = tmp_path / "b" + d_a.mkdir() + d_b.mkdir() + corpus_a = self._build_corpus(d_a, num_records=3, corpus_name="corpus-60m") + corpus_b = self._build_corpus(d_b, num_records=5, corpus_name="corpus-270m") + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus_a, corpus_b]), + params={}, + ) + # both corpora are visible to prepare-track… + assert [c.name for c in source.corpora] == ["corpus-60m", "corpus-270m"] + # …and we pick the first one as the active document set + assert source._doc.number_of_documents == 3 + + def test_raises_when_corpora_param_doesnt_match(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=3, corpus_name="corpus-60m") + with pytest.raises(exceptions.InvalidSyntax) as exc: + params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"corpora": "corpus-270m"}, + ) + assert "corpus-270m" in exc.value.args[0] + assert "corpus-60m" in exc.value.args[0] + + def test_raises_when_no_otlp_corpus(self): + bulk_corpus = track.DocumentCorpus( + name="bulk-only", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_BULK, + number_of_documents=10, + ) + ], + ) + with pytest.raises(exceptions.InvalidSyntax) as exc: + params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[bulk_corpus]), + params={}, + ) + assert "No OTLP corpus" in exc.value.args[0] + + def test_exposes_corpora_for_prepare_track(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=3) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + # used_corpora() in loader.py checks for this attribute + assert source.corpora == [corpus] + + def test_deduplicates_corpora_by_name(self, tmp_path): + # one corpus with two OTLP document sets — should appear only once + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text(self._SAMPLE_OTLP_JSON_LINE + "\n") + io.OtlpProtobufFile.for_source_file(str(json_path)).create() + corpus = track.DocumentCorpus( + name="otlp-corpus", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=1, + document_file=str(json_path), + ), + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=1, + document_file=str(json_path), + ), + ], + ) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + assert source.corpora == [corpus] + + def test_size_uses_actual_pb_count_not_document_count(self, tmp_path): + # Critical correctness test: if the track's document-count doesn't match the actual .pb, + # we MUST use the actual count or partitioning silently breaks (most workers seek past EOF + # or get empty partitions, and only one client ends up doing any work). + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text("\n".join([self._SAMPLE_OTLP_JSON_LINE] * 100) + "\n") + io.OtlpProtobufFile.for_source_file(str(json_path)).create() + # track claims 10 documents but the .pb actually has 100 + corpus = track.DocumentCorpus( + name="otlp-corpus", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=10, + document_file=str(json_path), + ) + ], + ) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 4) + # Partition size should be derived from the ACTUAL 100 records, not the (wrong) 10 + assert p.size() == 25 + # And all 100 records should actually be reachable across 4 partitions + sizes = [source.partition(i, 4).size() for i in range(4)] + assert sum(sizes) == 100 + + def test_size_returns_finite_value_not_none(self, tmp_path): + # critical: size() must NOT be None, otherwise Rally treats us as infinite and defaults iterations=1 + corpus = self._build_corpus(tmp_path, num_records=10) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + assert source.size() == 10 + assert source.infinite is False + + def test_size_accounts_for_partition(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=100) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + # 100 records / 8 partitions = 12 or 13 per partition (depending on rounding) + sizes = [source.partition(i, 8).size() for i in range(8)] + assert sum(sizes) == 100 + # each worker should have at least 12, at most 13 + assert all(12 <= s <= 13 for s in sizes) + + def test_partition_returns_separate_instances(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=8) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p0 = source.partition(0, 4) + p1 = source.partition(1, 4) + # different instances with different state + assert p0 is not p1 + assert p0._partition_index == 0 + assert p1._partition_index == 1 + # but shared (deduplicated) reference to the underlying document set + assert p0._doc is p1._doc + + def test_params_yields_full_corpus_across_partitions(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=8) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + all_bodies = [] + for i in range(4): + p = source.partition(i, 4) + while True: + try: + all_bodies.append(p.params()["body"]) + except StopIteration: + break + # 8 records total across 4 partitions + assert len(all_bodies) == 8 + + def test_params_raises_stop_iteration_when_exhausted(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=2) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 1) + p.params() + p.params() + with pytest.raises(StopIteration): + p.params() + + def test_params_loops_when_looped_true(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=2) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"looped": True}, + ) + p = source.partition(0, 1) + # take 5 records from a 2-record corpus + bodies = [p.params()["body"] for _ in range(5)] + # should cycle through the 2 records + assert bodies[0] == bodies[2] == bodies[4] + assert bodies[1] == bodies[3] + + def test_percent_completed_progresses_with_cursor(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=10) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 1) + assert p.percent_completed == 0.0 # before any params() call + for i in range(1, 11): + p.params() + assert p.percent_completed == i / 10 + + def test_percent_completed_is_none_when_looped(self, tmp_path): + # in looped mode the cursor cycles back to 0 — Rally must fall back to time/iteration + # based progress, so we return None to avoid misleading numbers. + corpus = self._build_corpus(tmp_path, num_records=3) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"looped": True}, + ) + p = source.partition(0, 1) + assert p.percent_completed is None + for _ in range(7): + p.params() + assert p.percent_completed is None + + def test_params_propagates_request_timeout(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=1) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"request-timeout": 30}, + ) + p = source.partition(0, 1) + result = p.params() + assert result["request-timeout"] == 30 + assert "body" in result + + def test_params_body_is_non_empty_bytes(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=1) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 1) + result = p.params() + assert isinstance(result["body"], bytes) + assert len(result["body"]) > 0 + + def test_registered_for_otlp_ingest_operation(self, tmp_path): + # ensure Rally picks up our class for the otlp-ingest operation type + corpus = self._build_corpus(tmp_path, num_records=1) + source = params.param_source_for_operation( + track.OperationType.OtlpIngest.to_hyphenated_string(), + track.Track(name="unit-test", corpora=[corpus]), + params={}, + task_name="unit-test-task", + ) + assert isinstance(source, params.OtlpParamSource) diff --git a/tests/utils/io_test.py b/tests/utils/io_test.py index d5cdd4557..904a4ba52 100644 --- a/tests/utils/io_test.py +++ b/tests/utils/io_test.py @@ -292,3 +292,223 @@ def test_returns_none_when_valid_offset_already_exists(self, tmp_path): mock_dl.assert_not_called() assert result is None + + +class TestOtlpProtobufFile: # pylint: disable=too-many-public-methods + """Tests for OTLP JSON → length-prefixed binary protobuf conversion + read-back.""" + + SAMPLE_OTLP_JSON_LINE = ( + '{"resourceMetrics":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"host-0"}}]},' + '"scopeMetrics":[{"scope":{"name":"hostmetrics"},"metrics":[' + '{"name":"system.cpu.utilization","gauge":{"dataPoints":[' + '{"timeUnixNano":"1700000000000000000","asDouble":0.42,' + '"attributes":[{"key":"cpu","value":{"stringValue":"0"}}]}]}}]}]}]}' + ) + + def _write_json_lines(self, tmp_path, lines): + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text("\n".join(lines) + "\n") + return str(json_path) + + def test_for_source_file_derives_pb_path(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.source_json_path == json_path + assert pb.pb_path == json_path + ".pb" + + def test_exists_false_when_pb_missing(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.exists() is False + + def test_exists_false_when_pb_empty(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + with open(pb.pb_path, "wb"): + pass + assert pb.exists() is False + + def test_is_valid_rejects_pb_older_than_source(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + # write a non-empty .pb but with an older mtime than the source + with open(pb.pb_path, "wb") as f: + f.write(b"\x00\x00\x00\x01x") + json_mtime = os.path.getmtime(json_path) + os.utime(pb.pb_path, (json_mtime - 10, json_mtime - 10)) + assert pb.is_valid() is False + + def test_create_then_read_round_trip(self, tmp_path): + # write 3 identical lines so we get 3 distinct records back + lines = [self.SAMPLE_OTLP_JSON_LINE] * 3 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + record_count = pb.create() + + assert record_count == 3 + assert pb.is_valid() is True + # offset index is created alongside + assert os.path.exists(pb.pb_path + ".offset") + # no .count file (we rely on the track's document-count) + assert not os.path.exists(pb.pb_path + ".count") + + records = list(pb.read_records(0, None)) + assert len(records) == 3 + # every record should round-trip identically + assert all(len(r) > 0 for r in records) + assert records[0] == records[1] == records[2] + + def test_create_blank_lines_are_skipped(self, tmp_path): + json_path = self._write_json_lines(tmp_path, ["", self.SAMPLE_OTLP_JSON_LINE, "", self.SAMPLE_OTLP_JSON_LINE, ""]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.create() == 2 + + def test_count_records_returns_none_when_pb_missing(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.count_records() is None + + def test_count_records_uses_offset_index(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * (io.OtlpProtobufFile.OFFSET_SAMPLING_INTERVAL * 2 + 137) + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + assert pb.count_records() == len(lines) + + def test_count_records_generates_offset_index_when_missing(self, tmp_path): + # If the .pb was downloaded without its offset, count_records should regenerate the offset + # on the fly so subsequent runs/partitions can seek efficiently. + lines = [self.SAMPLE_OTLP_JSON_LINE] * (io.OtlpProtobufFile.OFFSET_SAMPLING_INTERVAL + 17) + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + # delete the offset file as if only the .pb was downloaded + os.remove(pb.pb_path + ".offset") + + # call count_records — it should produce an offset file as a side effect + assert pb.count_records() == len(lines) + assert os.path.exists(pb.pb_path + ".offset") + + # confirm the generated offset file is valid: subsequent reads partition correctly + records_via_offset = list(pb.read_records(io.OtlpProtobufFile.OFFSET_SAMPLING_INTERVAL, None)) + assert len(records_via_offset) == 17 + + def test_count_records_works_without_offset_index(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 50 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + os.remove(pb.pb_path + ".offset") + assert pb.count_records() == 50 + + def test_read_records_respects_partition_range(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 8 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + + # 4 partitions across 8 records + slice0 = list(pb.read_records(0, 2)) + slice1 = list(pb.read_records(2, 4)) + slice2 = list(pb.read_records(4, 6)) + slice3 = list(pb.read_records(6, 8)) + assert [len(s) for s in (slice0, slice1, slice2, slice3)] == [2, 2, 2, 2] + # rejoined slices should equal the full read + full = list(pb.read_records(0, None)) + assert slice0 + slice1 + slice2 + slice3 == full + + def test_read_records_handles_start_past_end(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 3 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + # seeking past the end returns no records (does not error) + assert list(pb.read_records(100, 200)) == [] + + def test_read_records_falls_back_without_offset_index(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 5 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + + # remove the offset index — read should still work by scanning from the start + os.remove(pb.pb_path + ".offset") + records = list(pb.read_records(2, 4)) + assert len(records) == 2 + + def test_try_download_returns_false_without_base_url(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.try_download_from_corpus_location(None) is False + + def test_try_download_pb_and_offset(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + downloaded = [] + + def fake_download(url, dest, **kwargs): + downloaded.append(url) + with open(dest, "wb") as f: + f.write(b"\x00") + + with mock.patch("esrally.utils.net.download", side_effect=fake_download): + assert pb.try_download_from_corpus_location("http://example.com/corpus/") is True + + # both .pb and .pb.offset should have been attempted, with trailing slash stripped + assert downloaded == [ + "http://example.com/corpus/metrics.otlp.json.pb", + "http://example.com/corpus/metrics.otlp.json.pb.offset", + ] + + def test_try_download_offset_failure_is_non_fatal(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + def fake_download(url, dest, **kwargs): + if url.endswith(".offset"): + raise RuntimeError("not found") + with open(dest, "wb") as f: + f.write(b"\x00") + + with mock.patch("esrally.utils.net.download", side_effect=fake_download): + # .pb succeeds, .offset fails → overall result is still True + assert pb.try_download_from_corpus_location("http://example.com/corpus") is True + assert os.path.exists(pb.pb_path) + assert not os.path.exists(pb.pb_path + ".offset") + + def test_try_download_pb_failure_returns_false(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + with mock.patch("esrally.utils.net.download", side_effect=Exception("404")): + assert pb.try_download_from_corpus_location("http://example.com/corpus") is False + + def test_prepare_skips_when_pb_already_valid(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE] * 3) + # pre-create the .pb so it's already valid + io.OtlpProtobufFile.for_source_file(json_path).create() + + with mock.patch("esrally.utils.net.download") as mock_dl: + result = io.prepare_otlp_protobuf_file(json_path, "http://example.com/corpus") + + mock_dl.assert_not_called() + assert result is None + + def test_prepare_falls_back_to_local_when_download_fails(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE] * 2) + + with mock.patch("esrally.utils.net.download", side_effect=Exception("404")): + result = io.prepare_otlp_protobuf_file(json_path, "http://example.com/corpus") + + assert result == 2 + + def test_prepare_returns_none_when_no_local_json_and_download_fails(self, tmp_path): + # source JSON does not exist + json_path = str(tmp_path / "missing.otlp.json") + + with mock.patch("esrally.utils.net.download", side_effect=Exception("404")): + result = io.prepare_otlp_protobuf_file(json_path, "http://example.com/corpus") + + assert result is None diff --git a/uv.lock b/uv.lock index 737913cae..cf726871d 100644 --- a/uv.lock +++ b/uv.lock @@ -652,7 +652,9 @@ develop = [ { name = "github3-py" }, { name = "gitpython" }, { name = "mypy" }, + { name = "opentelemetry-proto" }, { name = "pre-commit" }, + { name = "protobuf" }, { name = "pylint" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -662,11 +664,16 @@ develop = [ { name = "standard-imghdr" }, { name = "trustme" }, { name = "types-jsonschema" }, + { name = "types-protobuf" }, { name = "types-psutil" }, { name = "types-requests" }, { name = "types-tabulate" }, { name = "ujson" }, ] +otlp = [ + { name = "opentelemetry-proto" }, + { name = "protobuf" }, +] s3 = [ { name = "boto3" }, ] @@ -702,8 +709,12 @@ requires-dist = [ { name = "jsonschema", specifier = "==3.1.1" }, { name = "markupsafe", specifier = "==2.0.1" }, { name = "mypy", marker = "extra == 'develop'", specifier = "==1.15.0" }, + { name = "opentelemetry-proto", marker = "extra == 'develop'", specifier = "==1.34.0" }, + { name = "opentelemetry-proto", marker = "extra == 'otlp'", specifier = "==1.34.0" }, { name = "pip", specifier = "==26.1" }, { name = "pre-commit", marker = "extra == 'develop'", specifier = "==2.20.0" }, + { name = "protobuf", marker = "extra == 'develop'", specifier = ">=5.29.0" }, + { name = "protobuf", marker = "extra == 'otlp'", specifier = ">=5.29.0" }, { name = "psutil", specifier = "==5.9.4" }, { name = "py-cpuinfo", specifier = "==7.0.0" }, { name = "pylint", marker = "extra == 'develop'", specifier = "==3.3.8" }, @@ -719,6 +730,7 @@ requires-dist = [ { name = "thespian", specifier = "==4.0.1" }, { name = "trustme", marker = "extra == 'develop'", specifier = "==0.9.0" }, { name = "types-jsonschema", marker = "extra == 'develop'", specifier = "==3.2.0" }, + { name = "types-protobuf", marker = "extra == 'develop'", specifier = ">=4.24,<8" }, { name = "types-psutil", marker = "extra == 'develop'", specifier = "==5.9.4" }, { name = "types-requests", marker = "extra == 'develop'", specifier = ">=2.31.0.7,<3" }, { name = "types-tabulate", marker = "extra == 'develop'", specifier = "==0.8.9" }, @@ -729,7 +741,7 @@ requires-dist = [ { name = "yappi", specifier = "==1.6.10" }, { name = "zstandard", specifier = "==0.25.0" }, ] -provides-extras = ["develop", "s3"] +provides-extras = ["develop", "otlp", "s3"] [package.metadata.requires-dev] dev = [{ name = "esrally", extras = ["develop"], editable = "." }] @@ -1493,6 +1505,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/1d/1b658dbd2b9fa9c4c9f32accbfc0205d532c8c6194dc0f2a4c0428e7128a/nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9", size = 22314, upload-time = "2024-06-04T18:44:08.352Z" }, ] +[[package]] +name = "opentelemetry-proto" +version = "1.34.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/19/45adb533d0a34990942d12eefb2077d59b22958940c71484a45e694f5dd7/opentelemetry_proto-1.34.0.tar.gz", hash = "sha256:73e40509b692630a47192888424f7e0b8fb19d9ecf2f04e6f708170cd3346dfe", size = 34343, upload-time = "2025-06-04T13:31:35.695Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/db/58/708881f5ad3c72954caa61ac970d3c01209dbebf5e534fb840dfb777bad2/opentelemetry_proto-1.34.0-py3-none-any.whl", hash = "sha256:ffb1f1b27552fda5a1cd581e34243cc0b6f134fb14c1c2a33cc3b4b208c9bf97", size = 55691, upload-time = "2025-06-04T13:31:20.333Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -1656,6 +1680,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/35/cc0aaecf278bb4575b8555f2b137de5ab821595ddae9da9d3cd1da4072c7/propcache-0.3.2-py3-none-any.whl", hash = "sha256:98f1ec44fb675f5052cccc8e609c46ed23a35a1cfd18545ad4e29002d858a43f", size = 12663, upload-time = "2025-06-09T22:56:04.484Z" }, ] +[[package]] +name = "protobuf" +version = "5.29.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/57/394a763c103e0edf87f0938dafcd918d53b4c011dfc5c8ae80f3b0452dbb/protobuf-5.29.6.tar.gz", hash = "sha256:da9ee6a5424b6b30fd5e45c5ea663aef540ca95f9ad99d1e887e819cdf9b8723", size = 425623, upload-time = "2026-02-04T22:54:40.584Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/88/9ee58ff7863c479d6f8346686d4636dd4c415b0cbeed7a6a7d0617639c2a/protobuf-5.29.6-cp310-abi3-win32.whl", hash = "sha256:62e8a3114992c7c647bce37dcc93647575fc52d50e48de30c6fcb28a6a291eb1", size = 423357, upload-time = "2026-02-04T22:54:25.805Z" }, + { url = "https://files.pythonhosted.org/packages/1c/66/2dc736a4d576847134fb6d80bd995c569b13cdc7b815d669050bf0ce2d2c/protobuf-5.29.6-cp310-abi3-win_amd64.whl", hash = "sha256:7e6ad413275be172f67fdee0f43484b6de5a904cc1c3ea9804cb6fe2ff366eda", size = 435175, upload-time = "2026-02-04T22:54:28.592Z" }, + { url = "https://files.pythonhosted.org/packages/06/db/49b05966fd208ae3f44dcd33837b6243b4915c57561d730a43f881f24dea/protobuf-5.29.6-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:b5a169e664b4057183a34bdc424540e86eea47560f3c123a0d64de4e137f9269", size = 418619, upload-time = "2026-02-04T22:54:30.266Z" }, + { url = "https://files.pythonhosted.org/packages/b7/d7/48cbf6b0c3c39761e47a99cb483405f0fde2be22cf00d71ef316ce52b458/protobuf-5.29.6-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:a8866b2cff111f0f863c1b3b9e7572dc7eaea23a7fae27f6fc613304046483e6", size = 320284, upload-time = "2026-02-04T22:54:31.782Z" }, + { url = "https://files.pythonhosted.org/packages/e3/dd/cadd6ec43069247d91f6345fa7a0d2858bef6af366dbd7ba8f05d2c77d3b/protobuf-5.29.6-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:e3387f44798ac1106af0233c04fb8abf543772ff241169946f698b3a9a3d3ab9", size = 320478, upload-time = "2026-02-04T22:54:32.909Z" }, + { url = "https://files.pythonhosted.org/packages/5a/cb/e3065b447186cb70aa65acc70c86baf482d82bf75625bf5a2c4f6919c6a3/protobuf-5.29.6-py3-none-any.whl", hash = "sha256:6b9edb641441b2da9fa8f428760fc136a49cf97a52076010cf22a2ff73438a86", size = 173126, upload-time = "2026-02-04T22:54:39.462Z" }, +] + [[package]] name = "psutil" version = "5.9.4" @@ -2312,6 +2350,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/f0/7558cd331217021e37975f4f0fb4f58548971d002c6401d5012194fd781b/types_jsonschema-3.2.0-py3-none-any.whl", hash = "sha256:4a8f2e87aa7001361b4c3666565f8684f0e016517228396ac1bffd397d8c3fd0", size = 6619, upload-time = "2021-07-15T18:19:34.203Z" }, ] +[[package]] +name = "types-protobuf" +version = "7.34.1.20260518" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/29/59/e2b13b499d15e6720150c4b1a8d91e31fcacf716b432397475b3151ff7e4/types_protobuf-7.34.1.20260518.tar.gz", hash = "sha256:28cfaded25889cb83ebfb63cfb0a43628f0b6f3785767bec17287dc6468795f2", size = 68936, upload-time = "2026-05-18T06:01:47.332Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/1f/ec5caf72c2e3b688ca3927e0979a04ddad19e1afc4bf1c199bd743e0f419/types_protobuf-7.34.1.20260518-py3-none-any.whl", hash = "sha256:a0a5337413347166439c0e07cbc26c6164d091401c6f01b1dfd8cdb966c4dd8f", size = 85992, upload-time = "2026-05-18T06:01:45.696Z" }, +] + [[package]] name = "types-psutil" version = "5.9.4"