Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ This section defines how metrics are stored.
* ``datastore.type`` (default: "in-memory"): If set to "in-memory" all metrics will be kept in memory while running the benchmark. If set to "elasticsearch" all metrics will instead be written to a persistent metrics store and the data are available for further analysis.
* ``sample.queue.size`` (default: 2^20): The number of metrics samples that can be stored in Rally's in-memory queue.
* ``metrics.request.downsample.factor`` (default: 1): Determines how many service time and latency samples should be kept in the metrics store. By default all values will be kept. To keep only e.g. every 100th sample, specify 100. This is useful to avoid overwhelming the metrics store in benchmarks with many clients (tens of thousands).
* ``metrics.request.throughput.window`` (default: false): If set to "true", Rally reports throughput per bucket window instead of averaging throughput over the full benchmark run.
* ``output.processingtime`` (default: false): If set to "true", Rally will show the additional metric :ref:`processing time <summary_report_processing_time>` in the command line report.

The following settings are applicable only if ``datastore.type`` is set to "elasticsearch":
Expand Down
35 changes: 27 additions & 8 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,13 @@ def prepare_benchmark(self, t):
self.challenge = select_challenge(self.config, self.track)
self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
downsample_factor = int(self.config.opts("reporting", "metrics.request.downsample.factor", mandatory=False, default_value=1))
windowed_throughput = convert.to_bool(
self.config.opts("reporting", "metrics.request.throughput.window", mandatory=False, default_value=False)
)
self.metrics_store = metrics.metrics_store(cfg=self.config, track=self.track.name, challenge=self.challenge.name, read_only=False)

self.sample_post_processor = SamplePostprocessor(
self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data
self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data, windowed_throughput=windowed_throughput
)
Comment thread
gareth-ellis marked this conversation as resolved.
Comment thread
gareth-ellis marked this conversation as resolved.

es_clients = self.create_es_clients()
Expand Down Expand Up @@ -1044,13 +1047,14 @@ def post_process_samples(self):


class SamplePostprocessor:
def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data):
def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data, windowed_throughput=False):
self.logger = logging.getLogger(__name__)
self.metrics_store = metrics_store
self.track_meta_data = track_meta_data
self.challenge_meta_data = challenge_meta_data
self.throughput_calculator = ThroughputCalculator()
self.downsample_factor = downsample_factor
self.windowed_throughput = windowed_throughput

def __call__(self, raw_samples):
if len(raw_samples) == 0:
Expand Down Expand Up @@ -1127,7 +1131,7 @@ def __call__(self, raw_samples):
end = time.perf_counter()
self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start))
start = end
aggregates = self.throughput_calculator.calculate(raw_samples)
aggregates = self.throughput_calculator.calculate(raw_samples, windowed=self.windowed_throughput)
end = time.perf_counter()
self.logger.debug("Calculating throughput took [%f] seconds.", (end - start))
start = end
Expand Down Expand Up @@ -1645,11 +1649,18 @@ def __init__(self, bucket_interval, sample_type, start_time):
self.has_samples_in_sample_type = False
# start relative to the beginning of our (calculation) time slice.
self.start_time = start_time
self._prev_interval = 0
self._windowed_rate = None

@property
def throughput(self):
return self.total_count / self.interval

@property
def windowed_throughput(self):
"""Throughput based only on ops and time elapsed since the previous bucket boundary."""
return self._windowed_rate if self._windowed_rate is not None else self.throughput

Comment on lines +1659 to +1663

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid fallback to running average (self.throughput)? I think it would be better to return None and handle it at call sites, e.g. by skipping generation of throughput sample.

def maybe_update_sample_type(self, current_sample_type):
if self.sample_type < current_sample_type:
self.sample_type = current_sample_type
Expand All @@ -1665,6 +1676,10 @@ def can_add_final_throughput_sample(self):
return self.interval > 0 and not self.has_samples_in_sample_type

def finish_bucket(self, new_total):
delta_count = new_total - self.total_count
delta_interval = self.interval - self._prev_interval
self._windowed_rate = delta_count / delta_interval if delta_interval > 0 else None
self._prev_interval = self.interval
self.unprocessed = []
self.total_count = new_total
self.has_samples_in_sample_type = True
Expand All @@ -1673,12 +1688,14 @@ def finish_bucket(self, new_total):
def __init__(self):
self.task_stats = {}

def calculate(self, samples, bucket_interval_secs=1):
def calculate(self, samples, bucket_interval_secs=1, windowed=False):
"""
Calculates global throughput based on samples gathered from multiple load generators.

:param samples: A list containing all samples from all load generators.
:param bucket_interval_secs: The bucket interval for aggregations.
:param windowed: When True, each throughput sample reflects only the ops since the previous bucket
rather than the cumulative average since task start.
:return: A global view of throughput samples.
"""

Expand Down Expand Up @@ -1708,14 +1725,14 @@ def calculate(self, samples, bucket_interval_secs=1):
# only transform the values into the expected structure.
first_sample = current_samples[0]
if first_sample.throughput is None:
task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs)
task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs, windowed=windowed)
else:
task_throughput = self.map_task_throughput(current_samples)
global_throughput[task].extend(task_throughput)

return global_throughput

def calculate_task_throughput(self, task, current_samples, bucket_interval_secs):
def calculate_task_throughput(self, task, current_samples, bucket_interval_secs, windowed=False):
task_throughput = []

if task not in self.task_stats:
Expand Down Expand Up @@ -1745,12 +1762,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs)

if current.can_calculate_throughput():
current.finish_bucket(count)
rate = current.windowed_throughput if windowed else current.throughput
task_throughput.append(
(
sample.absolute_time,
sample.relative_time,
current.sample_type,
current.throughput,
rate,
# we calculate throughput per second
f"{sample.total_ops_unit}/s",
)
Expand All @@ -1762,12 +1780,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs)
# interval (mainly needed to ensure we show throughput data in test mode)
if last_sample is not None and current.can_add_final_throughput_sample():
current.finish_bucket(count)
rate = current.windowed_throughput if windowed else current.throughput
task_throughput.append(
(
last_sample.absolute_time,
last_sample.relative_time,
current.sample_type,
current.throughput,
rate,
f"{last_sample.total_ops_unit}/s",
)
)
Expand Down
1 change: 1 addition & 0 deletions esrally/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"master.nodes",
"metrics.log.dir",
"metrics.request.downsample.factor",
"metrics.request.throughput.window",
"metrics.url",
Comment thread
gareth-ellis marked this conversation as resolved.
"network.host",
"network.http.port",
Expand Down
31 changes: 31 additions & 0 deletions tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,37 @@ def test_use_provided_throughput(self):
assert throughput[1] == (38596, 22, metrics.SampleType.Normal, 8000, "byte/s")
assert throughput[2] == (38597, 23, metrics.SampleType.Normal, 8000, "byte/s")

def test_windowed_throughput(self):
# Three one-second buckets: 1000 ops/s, 1000 ops/s, then a spike to 9000 ops/s.
# Windowed mode should report 9000 for the spike bucket; cumulative mode blends it down to ~3667.
op = track.Operation("index", track.OperationType.Bulk, param_source="driver-test-param-source")

samples = [
driver.Sample(0, 100, 1, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 1000, "ops", 1, 1 / 3),
driver.Sample(0, 101, 2, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 1000, "ops", 2, 2 / 3),
driver.Sample(0, 102, 3, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 9000, "ops", 3, 3 / 3),
]

windowed = driver.ThroughputCalculator().calculate(samples, windowed=True)
cumulative = driver.ThroughputCalculator().calculate(samples, windowed=False)

assert op in windowed
assert op in cumulative

w = windowed[op]
c = cumulative[op]
assert len(w) == len(c) == 3

# first two buckets are identical in both modes (single-client, steady rate)
assert w[0] == (100, 1, metrics.SampleType.Normal, 1000, "ops/s")
assert w[1] == (101, 2, metrics.SampleType.Normal, 1000, "ops/s")
assert c[0] == (100, 1, metrics.SampleType.Normal, 1000, "ops/s")
assert c[1] == (101, 2, metrics.SampleType.Normal, 1000, "ops/s")

# spike bucket: windowed sees only this bucket's 9000 ops over 1s; cumulative blends all 11000 ops over 3s
assert w[2] == (102, 3, metrics.SampleType.Normal, 9000, "ops/s")
assert c[2] == (102, 3, metrics.SampleType.Normal, pytest.approx(11000 / 3), "ops/s")

def calculate_global_throughput(self, samples):
return driver.ThroughputCalculator().calculate(samples)

Expand Down
Loading