From 954ea4cf4a8ed1d923fbff77b1e3cbcb310c8cdf Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Wed, 20 May 2026 22:02:37 +0200 Subject: [PATCH 1/5] Add per window throughput --- esrally/driver/driver.py | 36 ++++++++++++++++++++++++++++-------- esrally/types.py | 1 + tests/driver/driver_test.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 64086c4c5..8f0d33c44 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -719,10 +719,11 @@ def prepare_benchmark(self, t): self.challenge = select_challenge(self.config, self.track) self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False) downsample_factor = int(self.config.opts("reporting", "metrics.request.downsample.factor", mandatory=False, default_value=1)) + windowed_throughput = self.config.opts("reporting", "metrics.request.throughput.window", mandatory=False, default_value=False) self.metrics_store = metrics.metrics_store(cfg=self.config, track=self.track.name, challenge=self.challenge.name, read_only=False) self.sample_post_processor = SamplePostprocessor( - self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data + self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data, windowed_throughput=windowed_throughput ) es_clients = self.create_es_clients() @@ -1044,13 +1045,14 @@ def post_process_samples(self): class SamplePostprocessor: - def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data): + def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data, windowed_throughput=False): self.logger = logging.getLogger(__name__) self.metrics_store = metrics_store self.track_meta_data = track_meta_data self.challenge_meta_data = challenge_meta_data self.throughput_calculator = ThroughputCalculator() self.downsample_factor = downsample_factor + self.windowed_throughput = windowed_throughput def __call__(self, raw_samples): if len(raw_samples) == 0: @@ -1127,7 +1129,7 @@ def __call__(self, raw_samples): end = time.perf_counter() self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start)) start = end - aggregates = self.throughput_calculator.calculate(raw_samples) + aggregates = self.throughput_calculator.calculate(raw_samples, windowed=self.windowed_throughput) end = time.perf_counter() self.logger.debug("Calculating throughput took [%f] seconds.", (end - start)) start = end @@ -1645,11 +1647,23 @@ def __init__(self, bucket_interval, sample_type, start_time): self.has_samples_in_sample_type = False # start relative to the beginning of our (calculation) time slice. self.start_time = start_time + # snapshot of total_count and interval at the last bucket boundary, used for windowed throughput + self.prev_total_count = 0 + self.prev_interval = 0 @property def throughput(self): return self.total_count / self.interval + @property + def windowed_throughput(self): + """Throughput based only on ops and time elapsed since the previous bucket boundary.""" + delta_count = self.total_count - self.prev_total_count + delta_interval = self.interval - self.prev_interval + if delta_interval <= 0: + return self.throughput + return delta_count / delta_interval + def maybe_update_sample_type(self, current_sample_type): if self.sample_type < current_sample_type: self.sample_type = current_sample_type @@ -1665,6 +1679,8 @@ def can_add_final_throughput_sample(self): return self.interval > 0 and not self.has_samples_in_sample_type def finish_bucket(self, new_total): + self.prev_total_count = self.total_count + self.prev_interval = self.interval self.unprocessed = [] self.total_count = new_total self.has_samples_in_sample_type = True @@ -1673,12 +1689,14 @@ def finish_bucket(self, new_total): def __init__(self): self.task_stats = {} - def calculate(self, samples, bucket_interval_secs=1): + def calculate(self, samples, bucket_interval_secs=1, windowed=False): """ Calculates global throughput based on samples gathered from multiple load generators. :param samples: A list containing all samples from all load generators. :param bucket_interval_secs: The bucket interval for aggregations. + :param windowed: When True, each throughput sample reflects only the ops since the previous bucket + rather than the cumulative average since task start. :return: A global view of throughput samples. """ @@ -1708,14 +1726,14 @@ def calculate(self, samples, bucket_interval_secs=1): # only transform the values into the expected structure. first_sample = current_samples[0] if first_sample.throughput is None: - task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs) + task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs, windowed=windowed) else: task_throughput = self.map_task_throughput(current_samples) global_throughput[task].extend(task_throughput) return global_throughput - def calculate_task_throughput(self, task, current_samples, bucket_interval_secs): + def calculate_task_throughput(self, task, current_samples, bucket_interval_secs, windowed=False): task_throughput = [] if task not in self.task_stats: @@ -1745,12 +1763,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) if current.can_calculate_throughput(): current.finish_bucket(count) + rate = current.windowed_throughput if windowed else current.throughput task_throughput.append( ( sample.absolute_time, sample.relative_time, current.sample_type, - current.throughput, + rate, # we calculate throughput per second f"{sample.total_ops_unit}/s", ) @@ -1762,12 +1781,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) # interval (mainly needed to ensure we show throughput data in test mode) if last_sample is not None and current.can_add_final_throughput_sample(): current.finish_bucket(count) + rate = current.windowed_throughput if windowed else current.throughput task_throughput.append( ( last_sample.absolute_time, last_sample.relative_time, current.sample_type, - current.throughput, + rate, f"{last_sample.total_ops_unit}/s", ) ) diff --git a/esrally/types.py b/esrally/types.py index e7b0b2334..bb3a1bd2c 100644 --- a/esrally/types.py +++ b/esrally/types.py @@ -108,6 +108,7 @@ "master.nodes", "metrics.log.dir", "metrics.request.downsample.factor", + "metrics.request.throughput.window", "metrics.url", "network.host", "network.http.port", diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index 47c99afbc..66af1d1c7 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -984,6 +984,37 @@ def test_use_provided_throughput(self): assert throughput[1] == (38596, 22, metrics.SampleType.Normal, 8000, "byte/s") assert throughput[2] == (38597, 23, metrics.SampleType.Normal, 8000, "byte/s") + def test_windowed_throughput(self): + # Three one-second buckets: 1000 ops/s, 1000 ops/s, then a spike to 9000 ops/s. + # Windowed mode should report 9000 for the spike bucket; cumulative mode blends it down to ~3667. + op = track.Operation("index", track.OperationType.Bulk, param_source="driver-test-param-source") + + samples = [ + driver.Sample(0, 100, 1, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 1000, "ops", 1, 1 / 3), + driver.Sample(0, 101, 2, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 1000, "ops", 2, 2 / 3), + driver.Sample(0, 102, 3, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 9000, "ops", 3, 3 / 3), + ] + + windowed = driver.ThroughputCalculator().calculate(samples, windowed=True) + cumulative = driver.ThroughputCalculator().calculate(samples, windowed=False) + + assert op in windowed + assert op in cumulative + + w = windowed[op] + c = cumulative[op] + assert len(w) == len(c) == 3 + + # first two buckets are identical in both modes (single-client, steady rate) + assert w[0] == (100, 1, metrics.SampleType.Normal, 1000, "ops/s") + assert w[1] == (101, 2, metrics.SampleType.Normal, 1000, "ops/s") + assert c[0] == (100, 1, metrics.SampleType.Normal, 1000, "ops/s") + assert c[1] == (101, 2, metrics.SampleType.Normal, 1000, "ops/s") + + # spike bucket: windowed sees only this bucket's 9000 ops over 1s; cumulative blends all 11000 ops over 3s + assert w[2] == (102, 3, metrics.SampleType.Normal, 9000, "ops/s") + assert c[2] == (102, 3, metrics.SampleType.Normal, pytest.approx(11000 / 3), "ops/s") + def calculate_global_throughput(self, samples): return driver.ThroughputCalculator().calculate(samples) From bbd87308b1ed7bc0ea140e83d10e30178136d8ff Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Wed, 20 May 2026 22:14:33 +0200 Subject: [PATCH 2/5] Update method --- esrally/driver/driver.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 8f0d33c44..5ab8661ce 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1647,9 +1647,10 @@ def __init__(self, bucket_interval, sample_type, start_time): self.has_samples_in_sample_type = False # start relative to the beginning of our (calculation) time slice. self.start_time = start_time - # snapshot of total_count and interval at the last bucket boundary, used for windowed throughput - self.prev_total_count = 0 - self.prev_interval = 0 + # used for windowed throughput: stored inside finish_bucket before snapshots are updated + self._prev_total_count = 0 + self._prev_interval = 0 + self._windowed_rate = None @property def throughput(self): @@ -1658,11 +1659,7 @@ def throughput(self): @property def windowed_throughput(self): """Throughput based only on ops and time elapsed since the previous bucket boundary.""" - delta_count = self.total_count - self.prev_total_count - delta_interval = self.interval - self.prev_interval - if delta_interval <= 0: - return self.throughput - return delta_count / delta_interval + return self._windowed_rate if self._windowed_rate is not None else self.throughput def maybe_update_sample_type(self, current_sample_type): if self.sample_type < current_sample_type: @@ -1679,8 +1676,11 @@ def can_add_final_throughput_sample(self): return self.interval > 0 and not self.has_samples_in_sample_type def finish_bucket(self, new_total): - self.prev_total_count = self.total_count - self.prev_interval = self.interval + delta_count = new_total - self.total_count + delta_interval = self.interval - self._prev_interval + self._windowed_rate = delta_count / delta_interval if delta_interval > 0 else None + self._prev_total_count = self.total_count + self._prev_interval = self.interval self.unprocessed = [] self.total_count = new_total self.has_samples_in_sample_type = True From 8db1cae5884f0c0e775ed9395f9d3cb19cfa5dc0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 20:42:43 +0000 Subject: [PATCH 3/5] Remove unused _prev_total_count from TaskStats Agent-Logs-Url: https://github.com/elastic/rally/sessions/e662b65c-f00b-4944-9e57-7dbbd48ca06b Co-authored-by: gareth-ellis <14981026+gareth-ellis@users.noreply.github.com> --- esrally/driver/driver.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 5ab8661ce..18421d5de 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -1647,8 +1647,6 @@ def __init__(self, bucket_interval, sample_type, start_time): self.has_samples_in_sample_type = False # start relative to the beginning of our (calculation) time slice. self.start_time = start_time - # used for windowed throughput: stored inside finish_bucket before snapshots are updated - self._prev_total_count = 0 self._prev_interval = 0 self._windowed_rate = None @@ -1679,7 +1677,6 @@ def finish_bucket(self, new_total): delta_count = new_total - self.total_count delta_interval = self.interval - self._prev_interval self._windowed_rate = delta_count / delta_interval if delta_interval > 0 else None - self._prev_total_count = self.total_count self._prev_interval = self.interval self.unprocessed = [] self.total_count = new_total From 6235fee4988bb6a1519be33328bf3c3ceb206804 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 20:43:44 +0000 Subject: [PATCH 4/5] Fix throughput window config parsing as boolean Agent-Logs-Url: https://github.com/elastic/rally/sessions/3c808134-d09e-4bd5-b77d-1f73c2100dac Co-authored-by: gareth-ellis <14981026+gareth-ellis@users.noreply.github.com> --- esrally/driver/driver.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 18421d5de..0ef700dfa 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -719,7 +719,9 @@ def prepare_benchmark(self, t): self.challenge = select_challenge(self.config, self.track) self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False) downsample_factor = int(self.config.opts("reporting", "metrics.request.downsample.factor", mandatory=False, default_value=1)) - windowed_throughput = self.config.opts("reporting", "metrics.request.throughput.window", mandatory=False, default_value=False) + windowed_throughput = convert.to_bool( + self.config.opts("reporting", "metrics.request.throughput.window", mandatory=False, default_value=False) + ) self.metrics_store = metrics.metrics_store(cfg=self.config, track=self.track.name, challenge=self.challenge.name, read_only=False) self.sample_post_processor = SamplePostprocessor( From 89e95737f23bf7b0f881bbb8d16441dc24a81300 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 25 May 2026 09:35:03 +0000 Subject: [PATCH 5/5] docs: document throughput window reporting setting Agent-Logs-Url: https://github.com/elastic/rally/sessions/8eb678d2-662e-4e40-9328-52c387fe5dba Co-authored-by: gareth-ellis <14981026+gareth-ellis@users.noreply.github.com> --- docs/configuration.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/configuration.rst b/docs/configuration.rst index 6edfb19cf..6fb24285b 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -77,6 +77,7 @@ This section defines how metrics are stored. * ``datastore.type`` (default: "in-memory"): If set to "in-memory" all metrics will be kept in memory while running the benchmark. If set to "elasticsearch" all metrics will instead be written to a persistent metrics store and the data are available for further analysis. * ``sample.queue.size`` (default: 2^20): The number of metrics samples that can be stored in Rally's in-memory queue. * ``metrics.request.downsample.factor`` (default: 1): Determines how many service time and latency samples should be kept in the metrics store. By default all values will be kept. To keep only e.g. every 100th sample, specify 100. This is useful to avoid overwhelming the metrics store in benchmarks with many clients (tens of thousands). +* ``metrics.request.throughput.window`` (default: false): If set to "true", Rally reports throughput per bucket window instead of averaging throughput over the full benchmark run. * ``output.processingtime`` (default: false): If set to "true", Rally will show the additional metric :ref:`processing time ` in the command line report. The following settings are applicable only if ``datastore.type`` is set to "elasticsearch":