diff --git a/docs/configuration.rst b/docs/configuration.rst index 6edfb19cf..6fb24285b 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -77,6 +77,7 @@ This section defines how metrics are stored. * ``datastore.type`` (default: "in-memory"): If set to "in-memory" all metrics will be kept in memory while running the benchmark. If set to "elasticsearch" all metrics will instead be written to a persistent metrics store and the data are available for further analysis. * ``sample.queue.size`` (default: 2^20): The number of metrics samples that can be stored in Rally's in-memory queue. * ``metrics.request.downsample.factor`` (default: 1): Determines how many service time and latency samples should be kept in the metrics store. By default all values will be kept. To keep only e.g. every 100th sample, specify 100. This is useful to avoid overwhelming the metrics store in benchmarks with many clients (tens of thousands). +* ``metrics.request.throughput.window`` (default: false): If set to "true", each throughput sample reflects only the operations completed since the previous throughput sample (one bucket, one second by default) instead of the cumulative average since the start of the task. * ``output.processingtime`` (default: false): If set to "true", Rally will show the additional metric :ref:`processing time <summary_report_processing_time>` in the command line report. 
The following settings are applicable only if ``datastore.type`` is set to "elasticsearch": diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 64086c4c5..0ef700dfa 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -719,10 +719,13 @@ def prepare_benchmark(self, t): self.challenge = select_challenge(self.config, self.track) self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False) downsample_factor = int(self.config.opts("reporting", "metrics.request.downsample.factor", mandatory=False, default_value=1)) + windowed_throughput = convert.to_bool( + self.config.opts("reporting", "metrics.request.throughput.window", mandatory=False, default_value=False) + ) self.metrics_store = metrics.metrics_store(cfg=self.config, track=self.track.name, challenge=self.challenge.name, read_only=False) self.sample_post_processor = SamplePostprocessor( - self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data + self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data, windowed_throughput=windowed_throughput ) es_clients = self.create_es_clients() @@ -1044,13 +1047,14 @@ def post_process_samples(self): class SamplePostprocessor: - def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data): + def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data, windowed_throughput=False): self.logger = logging.getLogger(__name__) self.metrics_store = metrics_store self.track_meta_data = track_meta_data self.challenge_meta_data = challenge_meta_data self.throughput_calculator = ThroughputCalculator() self.downsample_factor = downsample_factor + self.windowed_throughput = windowed_throughput def __call__(self, raw_samples): if len(raw_samples) == 0: @@ -1127,7 +1131,7 @@ def __call__(self, raw_samples): end = time.perf_counter() self.logger.debug("Storing latency and service time took [%f] seconds.", (end - 
start)) start = end - aggregates = self.throughput_calculator.calculate(raw_samples) + aggregates = self.throughput_calculator.calculate(raw_samples, windowed=self.windowed_throughput) end = time.perf_counter() self.logger.debug("Calculating throughput took [%f] seconds.", (end - start)) start = end @@ -1645,11 +1649,18 @@ def __init__(self, bucket_interval, sample_type, start_time): self.has_samples_in_sample_type = False # start relative to the beginning of our (calculation) time slice. self.start_time = start_time + self._prev_interval = 0 + self._windowed_rate = None @property def throughput(self): return self.total_count / self.interval + @property + def windowed_throughput(self): + """Throughput based only on ops and time elapsed since the previous bucket boundary.""" + return self._windowed_rate if self._windowed_rate is not None else self.throughput + def maybe_update_sample_type(self, current_sample_type): if self.sample_type < current_sample_type: self.sample_type = current_sample_type @@ -1665,6 +1676,10 @@ def can_add_final_throughput_sample(self): return self.interval > 0 and not self.has_samples_in_sample_type def finish_bucket(self, new_total): + delta_count = new_total - self.total_count + delta_interval = self.interval - self._prev_interval + self._windowed_rate = delta_count / delta_interval if delta_interval > 0 else None + self._prev_interval = self.interval self.unprocessed = [] self.total_count = new_total self.has_samples_in_sample_type = True @@ -1673,12 +1688,14 @@ def finish_bucket(self, new_total): def __init__(self): self.task_stats = {} - def calculate(self, samples, bucket_interval_secs=1): + def calculate(self, samples, bucket_interval_secs=1, windowed=False): """ Calculates global throughput based on samples gathered from multiple load generators. :param samples: A list containing all samples from all load generators. :param bucket_interval_secs: The bucket interval for aggregations. 
+ :param windowed: When True, each throughput sample reflects only the ops since the previous bucket + rather than the cumulative average since task start. :return: A global view of throughput samples. """ @@ -1708,14 +1725,14 @@ def calculate(self, samples, bucket_interval_secs=1): # only transform the values into the expected structure. first_sample = current_samples[0] if first_sample.throughput is None: - task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs) + task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs, windowed=windowed) else: task_throughput = self.map_task_throughput(current_samples) global_throughput[task].extend(task_throughput) return global_throughput - def calculate_task_throughput(self, task, current_samples, bucket_interval_secs): + def calculate_task_throughput(self, task, current_samples, bucket_interval_secs, windowed=False): task_throughput = [] if task not in self.task_stats: @@ -1745,12 +1762,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) if current.can_calculate_throughput(): current.finish_bucket(count) + rate = current.windowed_throughput if windowed else current.throughput task_throughput.append( ( sample.absolute_time, sample.relative_time, current.sample_type, - current.throughput, + rate, # we calculate throughput per second f"{sample.total_ops_unit}/s", ) @@ -1762,12 +1780,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) # interval (mainly needed to ensure we show throughput data in test mode) if last_sample is not None and current.can_add_final_throughput_sample(): current.finish_bucket(count) + rate = current.windowed_throughput if windowed else current.throughput task_throughput.append( ( last_sample.absolute_time, last_sample.relative_time, current.sample_type, - current.throughput, + rate, f"{last_sample.total_ops_unit}/s", ) ) diff --git a/esrally/types.py 
b/esrally/types.py index e7b0b2334..bb3a1bd2c 100644 --- a/esrally/types.py +++ b/esrally/types.py @@ -108,6 +108,7 @@ "master.nodes", "metrics.log.dir", "metrics.request.downsample.factor", + "metrics.request.throughput.window", "metrics.url", "network.host", "network.http.port", diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index 47c99afbc..66af1d1c7 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -984,6 +984,37 @@ def test_use_provided_throughput(self): assert throughput[1] == (38596, 22, metrics.SampleType.Normal, 8000, "byte/s") assert throughput[2] == (38597, 23, metrics.SampleType.Normal, 8000, "byte/s") + def test_windowed_throughput(self): + # Three one-second buckets: 1000 ops/s, 1000 ops/s, then a spike to 9000 ops/s. + # Windowed mode should report 9000 for the spike bucket; cumulative mode blends it down to ~3667. + op = track.Operation("index", track.OperationType.Bulk, param_source="driver-test-param-source") + + samples = [ + driver.Sample(0, 100, 1, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 1000, "ops", 1, 1 / 3), + driver.Sample(0, 101, 2, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 1000, "ops", 2, 2 / 3), + driver.Sample(0, 102, 3, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 9000, "ops", 3, 3 / 3), + ] + + windowed = driver.ThroughputCalculator().calculate(samples, windowed=True) + cumulative = driver.ThroughputCalculator().calculate(samples, windowed=False) + + assert op in windowed + assert op in cumulative + + w = windowed[op] + c = cumulative[op] + assert len(w) == len(c) == 3 + + # first two buckets are identical in both modes (single-client, steady rate) + assert w[0] == (100, 1, metrics.SampleType.Normal, 1000, "ops/s") + assert w[1] == (101, 2, metrics.SampleType.Normal, 1000, "ops/s") + assert c[0] == (100, 1, metrics.SampleType.Normal, 1000, "ops/s") + assert c[1] == (101, 2, metrics.SampleType.Normal, 1000, "ops/s") + + # spike bucket: 
windowed sees only this bucket's 9000 ops over 1s; cumulative blends all 11000 ops over 3s + assert w[2] == (102, 3, metrics.SampleType.Normal, 9000, "ops/s") + assert c[2] == (102, 3, metrics.SampleType.Normal, pytest.approx(11000 / 3), "ops/s") + def calculate_global_throughput(self, samples): return driver.ThroughputCalculator().calculate(samples)