Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions create-notice.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ function main {
add_license "wheel" "https://raw.githubusercontent.com/pypa/wheel/main/LICENSE.txt"
add_license "pip" "https://raw.githubusercontent.com/pypa/pip/main/LICENSE.txt"
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
# OTLP corpus preparation dependencies (otlp extra)
add_license "opentelemetry-proto" "https://raw.githubusercontent.com/open-telemetry/opentelemetry-python/main/LICENSE"
add_license "protobuf" "https://raw.githubusercontent.com/protocolbuffers/protobuf/main/LICENSE"

# transitive dependencies
# Jinja2 dependencies
Expand Down
15 changes: 9 additions & 6 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2129,34 +2129,37 @@ def _parse_headers(e):
# Some runners return a raw response, causing the 'error' property to be a string literal of the bytes/BytesIO object,
# we should avoid bubbling that up
# e.g. ApiError(413, '<_io.BytesIO object at 0xffffaf146a70>')
# errors="replace" so binary response bodies (e.g. OTLP ingest returns binary protobuf
# error frames) don't crash the driver with UnicodeDecodeError. Undecodable bytes become
# U+FFFD in the logged/recorded message.
if isinstance(e.body, bytes):
# could be an empty body
if error_body := e.body.decode("utf-8"):
if error_body := e.body.decode("utf-8", errors="replace"):
error_message = error_body
else:
# to be consistent with an empty 'e.error'
error_message = str(None)
elif isinstance(e.body, BytesIO):
# could be an empty body
if error_body := e.body.read().decode("utf-8"):
if error_body := e.body.read().decode("utf-8", errors="replace"):
error_message = error_body
else:
# to be consistent with an empty 'e.error'
error_message = str(None)
# fallback to 'error' property if the body isn't bytes/BytesIO
else:
if isinstance(e.error, bytes):
error_message = e.error.decode("utf-8")
error_message = e.error.decode("utf-8", errors="replace")
elif isinstance(e.error, BytesIO):
error_message = e.error.read().decode("utf-8")
error_message = e.error.read().decode("utf-8", errors="replace")
else:
# if the 'error' is empty, we get back str(None)
error_message = e.error

if isinstance(e.info, bytes):
error_info = e.info.decode("utf-8")
error_info = e.info.decode("utf-8", errors="replace")
elif isinstance(e.info, BytesIO):
error_info = e.info.read().decode("utf-8")
error_info = e.info.read().decode("utf-8", errors="replace")
else:
error_info = e.info

Expand Down
154 changes: 153 additions & 1 deletion esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,28 @@ def first_existing(root_dirs, f):
return p
return None

def first_existing_with_suffix(root_dirs, f, suffix):
    """
    Finds the first root directory in which ``f`` with ``suffix`` appended exists.

    :param root_dirs: Directories to probe, in order of precedence.
    :param f: File name relative to each root directory, without the suffix.
    :param suffix: Suffix appended to the joined path for the existence check only.
    :return: The joined path *without* the suffix for the first match, or ``None`` if nothing matches.
    """
    # Lazily build the un-suffixed candidates so probing stops at the first hit.
    unsuffixed_paths = (os.path.join(root_dir, f) for root_dir in root_dirs)
    return next((path for path in unsuffixed_paths if os.path.exists(path + suffix)), None)

for corpus in t.corpora:
data_root = data_dir(cfg, t.name, corpus.name)
for document_set in corpus.documents:
# At this point we can assume that the file is available locally. Check which path exists and set it.
if document_set.document_archive:
document_set.document_archive = first_existing(data_root, document_set.document_archive)
if document_set.document_file:
document_set.document_file = first_existing(data_root, document_set.document_file)
resolved = first_existing(data_root, document_set.document_file)
# For OTLP corpora, the hot path reads the .pb (and .pb.offset) — the source JSON is
# only needed during prepare-track when generating the .pb locally. If only the .pb
# has been downloaded, the JSON path won't exist; resolve document_file to the path
# the JSON *would* have so the derived ``.pb`` path is correct.
if resolved is None and document_set.is_otlp:
resolved = first_existing_with_suffix(data_root, document_set.document_file, ".pb")
document_set.document_file = resolved


def is_simple_track_mode(cfg: types.Config):
Expand Down Expand Up @@ -507,6 +521,14 @@ def on_prepare_track(self, track, data_root_dir) -> Generator[tuple[Callable, di
"preparator": prep,
"document_set": document_set,
}
elif document_set.is_otlp:
yield prepare_otlp_document, {
"cfg": self.cfg,
"track": track,
"corpus": corpus,
"preparator": prep,
"document_set": document_set,
}


def prepare_document(cfg: types.Config, track, corpus, preparator, document_set):
Expand All @@ -519,6 +541,15 @@ def prepare_document(cfg: types.Config, track, corpus, preparator, document_set)
preparator.prepare_document_set(document_set, data_root[1])


def prepare_otlp_document(cfg: types.Config, track, corpus, preparator, document_set):
    """
    Prepares one OTLP document set by delegating to the given preparator.

    ``data_dir`` yields the candidate data root directories. With a single candidate the
    document set is prepared there directly. Otherwise the first candidate is tried as a
    bundled location, and only if that reports "not ready" is the second candidate used.

    :param cfg: The config object passed on to ``data_dir``.
    :param track: The track that owns the corpus; only its ``name`` is read here.
    :param corpus: The corpus that owns the document set; only its ``name`` is read here.
    :param preparator: Object providing ``prepare_otlp_document_set`` and ``prepare_bundled_otlp_document_set``.
    :param document_set: The document set to prepare.
    """
    candidate_roots = data_dir(cfg, track.name, corpus.name)
    LOG.info("Resolved data root directory for OTLP corpus [%s] in track [%s] to [%s].", corpus.name, track.name, candidate_roots)

    if len(candidate_roots) == 1:
        preparator.prepare_otlp_document_set(document_set, candidate_roots[0])
        return

    bundled_ready = preparator.prepare_bundled_otlp_document_set(document_set, candidate_roots[0])
    if not bundled_ready:
        preparator.prepare_otlp_document_set(document_set, candidate_roots[1])


class Decompressor:

def decompress(self, archive_path, documents_path, uncompressed_size):
Expand Down Expand Up @@ -751,6 +782,104 @@ def prepare_bundled_document_set(self, document_set, data_root):
else:
return False

def prepare_otlp_document_set(self, document_set, data_root):
    """
    Ensures that the OTLP binary protobuf (``.pb``) file of a document set is available locally.

    The steps are tried in this order and the method stops at the first one that succeeds:

    1. A valid ``.pb`` file is already present.
    2. The ``.pb`` file can be downloaded from the corpus base URL (skips the JSON source entirely).
    3. The JSON source is made available (downloaded and / or decompressed) and converted to ``.pb`` locally.

    :param document_set: A document set with source_format == SOURCE_FORMAT_OTLP_PROTOBUF.
    :param data_root: The data root directory for this document set.
    """
    source_path = os.path.join(data_root, document_set.document_file)
    if document_set.has_compressed_corpus():
        compressed_path = os.path.join(data_root, document_set.document_archive)
    else:
        compressed_path = None
    protobuf_file = io.OtlpProtobufFile.for_source_file(source_path)

    # Step 1: nothing to do if a valid .pb is already on disk.
    if protobuf_file.is_valid():
        return

    # Step 2: prefer fetching the ready-made .pb over the larger JSON source.
    if document_set.base_url:
        try:
            self.downloader.download(document_set.base_url, source_path + ".pb")
            if protobuf_file.is_valid():
                return
        except exceptions.DataError:
            # Best effort: a DataError means the .pb could not be obtained remotely, so fall
            # through to the JSON source. Any other exception type propagates.
            pass

    # Step 3: loop until the JSON source is present with the expected size.
    while not (self.is_locally_available(source_path) and self.has_expected_size(source_path, document_set.uncompressed_size_in_bytes)):
        archive_usable = (
            compressed_path
            and self.is_locally_available(compressed_path)
            and self.has_expected_size(compressed_path, document_set.compressed_size_in_bytes)
        )
        if archive_usable:
            self.decompressor.decompress(compressed_path, source_path, document_set.uncompressed_size_in_bytes)
            continue

        # Neither a usable source nor a usable archive: decide what to download.
        if document_set.has_compressed_corpus():
            download_target = compressed_path
            download_size = document_set.compressed_size_in_bytes
        elif document_set.has_uncompressed_corpus():
            download_target = source_path
            download_size = document_set.uncompressed_size_in_bytes
        else:
            raise exceptions.RallyAssertionError(f"Track {self.track_name} specifies documents but no corpus")

        try:
            self.downloader.download(document_set.base_url, download_target, download_size)
        except exceptions.DataError as e:
            no_base_url = e.message == "Cannot download data because no base URL is provided."
            if no_base_url and self.is_locally_available(download_target):
                # The file exists but has the wrong size and cannot be re-fetched; say so explicitly.
                raise exceptions.DataError(
                    f"[{download_target}] is present but does not have the expected "
                    f"size of [{download_size}] bytes and it cannot be downloaded "
                    f"because no base URL is provided."
                ) from None
            raise

    # Step 4: the JSON source is in place, convert it to .pb.
    protobuf_file.create()

def prepare_bundled_otlp_document_set(self, document_set, data_root):
    """
    Prepares a bundled OTLP document set (files in the same directory as the track).

    Nothing is downloaded here: the ``.pb`` file is either already valid, or it is generated
    from a locally present JSON source or a locally present archive of it.

    :param document_set: The OTLP document set to prepare.
    :param data_root: The directory in which the bundled files are expected.
    :return: True if the .pb file is ready, False if required files were not found locally.
    :raises exceptions.DataError: If the source file or the archive is present but does not have the expected size.
    """
    source_path = os.path.join(data_root, document_set.document_file)
    if document_set.has_compressed_corpus():
        compressed_path = os.path.join(data_root, document_set.document_archive)
    else:
        compressed_path = None
    protobuf_file = io.OtlpProtobufFile.for_source_file(source_path)

    if protobuf_file.is_valid():
        return True

    # An uncompressed source takes precedence over the archive.
    if self.is_locally_available(source_path):
        if not self.has_expected_size(source_path, document_set.uncompressed_size_in_bytes):
            raise exceptions.DataError(
                f"[{source_path}] is present but does not have the expected size " f"of [{document_set.uncompressed_size_in_bytes}] bytes."
            )
        protobuf_file.create()
        return True

    # No source and no archive: the caller has to obtain the files some other way.
    if not compressed_path or not self.is_locally_available(compressed_path):
        return False

    if not self.has_expected_size(compressed_path, document_set.compressed_size_in_bytes):
        raise exceptions.DataError(
            f"[{compressed_path}] is present but does not have "
            f"the expected size of [{document_set.compressed_size_in_bytes}] bytes."
        )

    self.decompressor.decompress(compressed_path, source_path, document_set.uncompressed_size_in_bytes)
    protobuf_file.create()
    return True


class TemplateSource:
"""
Expand Down Expand Up @@ -1585,6 +1714,29 @@ def _create_corpora(self, corpora_specs, indices, data_streams):
meta_data=doc_meta_data,
)
corpus.documents.append(docs)
elif source_format == track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF:
source_file = self._r(doc_spec, "source-file")
if io.is_archive(source_file):
document_archive = source_file
document_file = io.splitext(source_file)[0]
else:
document_archive = None
document_file = source_file
num_docs = self._r(doc_spec, "document-count")
compressed_bytes = self._r(doc_spec, "compressed-bytes", mandatory=False)
uncompressed_bytes = self._r(doc_spec, "uncompressed-bytes", mandatory=False)
doc_meta_data = self._r(doc_spec, "meta", error_ctx=name, mandatory=False)
docs = track.Documents(
source_format=source_format,
document_file=document_file,
document_archive=document_archive,
base_url=base_url,
number_of_documents=num_docs,
compressed_size_in_bytes=compressed_bytes,
uncompressed_size_in_bytes=uncompressed_bytes,
meta_data=doc_meta_data,
)
corpus.documents.append(docs)
else:
self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name))
document_corpora.append(corpus)
Expand Down
Loading
Loading