Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,11 @@ def prepare_benchmark(self, t):
self.challenge = select_challenge(self.config, self.track)
self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
downsample_factor = int(self.config.opts("reporting", "metrics.request.downsample.factor", mandatory=False, default_value=1))
windowed_throughput = self.config.opts("reporting", "metrics.request.throughput.window", mandatory=False, default_value=False)
self.metrics_store = metrics.metrics_store(cfg=self.config, track=self.track.name, challenge=self.challenge.name, read_only=False)

self.sample_post_processor = SamplePostprocessor(
self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data
self.metrics_store, downsample_factor, self.track.meta_data, self.challenge.meta_data, windowed_throughput=windowed_throughput
)
Comment thread
gareth-ellis marked this conversation as resolved.
Comment thread
gareth-ellis marked this conversation as resolved.

es_clients = self.create_es_clients()
Expand Down Expand Up @@ -1044,13 +1045,14 @@ def post_process_samples(self):


class SamplePostprocessor:
def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data):
def __init__(self, metrics_store, downsample_factor, track_meta_data, challenge_meta_data, windowed_throughput=False):
    """
    Keeps the collaborators and settings that are needed when raw samples are post-processed.

    :param metrics_store: The metrics store; kept on the instance unchanged.
    :param downsample_factor: The configured downsample factor; kept on the instance unchanged.
    :param track_meta_data: Meta-data of the track; kept on the instance unchanged.
    :param challenge_meta_data: Meta-data of the challenge; kept on the instance unchanged.
    :param windowed_throughput: Passed on as ``windowed`` when throughput is calculated. Defaults to ``False``.
    """
    self.logger = logging.getLogger(__name__)
    # settings that influence how samples are processed
    self.downsample_factor = downsample_factor
    self.windowed_throughput = windowed_throughput
    # collaborators and meta-data handed in by the caller
    self.metrics_store = metrics_store
    self.track_meta_data = track_meta_data
    self.challenge_meta_data = challenge_meta_data
    # a fresh calculator per post-processor instance
    self.throughput_calculator = ThroughputCalculator()

def __call__(self, raw_samples):
if len(raw_samples) == 0:
Expand Down Expand Up @@ -1127,7 +1129,7 @@ def __call__(self, raw_samples):
end = time.perf_counter()
self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start))
start = end
aggregates = self.throughput_calculator.calculate(raw_samples)
aggregates = self.throughput_calculator.calculate(raw_samples, windowed=self.windowed_throughput)
end = time.perf_counter()
self.logger.debug("Calculating throughput took [%f] seconds.", (end - start))
start = end
Expand Down Expand Up @@ -1645,11 +1647,23 @@ def __init__(self, bucket_interval, sample_type, start_time):
self.has_samples_in_sample_type = False
# start relative to the beginning of our (calculation) time slice.
self.start_time = start_time
# snapshot of total_count and interval at the last bucket boundary, used for windowed throughput
self.prev_total_count = 0
self.prev_interval = 0

@property
def throughput(self):
    """Throughput over the whole tracked interval: ``total_count`` divided by ``interval``."""
    ops, elapsed = self.total_count, self.interval
    return ops / elapsed

@property
def windowed_throughput(self):
    """
    Throughput that only considers the ops and the time elapsed since the previous bucket boundary.

    The window is the difference between the current ``total_count`` / ``interval`` and the
    ``prev_total_count`` / ``prev_interval`` snapshot. If no time has elapsed in the window
    (difference of zero or less), the value of ``throughput`` is returned instead.
    """
    ops_in_window = self.total_count - self.prev_total_count
    window_length = self.interval - self.prev_interval
    # an empty (or negative) window cannot be divided by, so use the running value instead
    return self.throughput if window_length <= 0 else ops_in_window / window_length

Comment on lines +1659 to +1663

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid falling back to the running average (`self.throughput`)? I think it would be better to return `None` and handle it at the call sites, e.g. by skipping generation of the throughput sample.

def maybe_update_sample_type(self, current_sample_type):
if self.sample_type < current_sample_type:
self.sample_type = current_sample_type
Expand All @@ -1665,6 +1679,8 @@ def can_add_final_throughput_sample(self):
return self.interval > 0 and not self.has_samples_in_sample_type

def finish_bucket(self, new_total):
self.prev_total_count = self.total_count
self.prev_interval = self.interval
self.unprocessed = []
self.total_count = new_total
self.has_samples_in_sample_type = True
Expand All @@ -1673,12 +1689,14 @@ def finish_bucket(self, new_total):
def __init__(self):
    """Start with an empty ``task_stats`` mapping."""
    self.task_stats = dict()

def calculate(self, samples, bucket_interval_secs=1):
def calculate(self, samples, bucket_interval_secs=1, windowed=False):
"""
Calculates global throughput based on samples gathered from multiple load generators.

:param samples: A list containing all samples from all load generators.
:param bucket_interval_secs: The bucket interval for aggregations.
:param windowed: When True, each throughput sample reflects only the ops since the previous bucket
rather than the cumulative average since task start.
:return: A global view of throughput samples.
"""

Expand Down Expand Up @@ -1708,14 +1726,14 @@ def calculate(self, samples, bucket_interval_secs=1):
# only transform the values into the expected structure.
first_sample = current_samples[0]
if first_sample.throughput is None:
task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs)
task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs, windowed=windowed)
else:
task_throughput = self.map_task_throughput(current_samples)
global_throughput[task].extend(task_throughput)

return global_throughput

def calculate_task_throughput(self, task, current_samples, bucket_interval_secs):
def calculate_task_throughput(self, task, current_samples, bucket_interval_secs, windowed=False):
task_throughput = []

if task not in self.task_stats:
Expand Down Expand Up @@ -1745,12 +1763,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs)

if current.can_calculate_throughput():
current.finish_bucket(count)
rate = current.windowed_throughput if windowed else current.throughput
task_throughput.append(
(
sample.absolute_time,
sample.relative_time,
current.sample_type,
current.throughput,
rate,
# we calculate throughput per second
f"{sample.total_ops_unit}/s",
)
Expand All @@ -1762,12 +1781,13 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs)
# interval (mainly needed to ensure we show throughput data in test mode)
if last_sample is not None and current.can_add_final_throughput_sample():
current.finish_bucket(count)
rate = current.windowed_throughput if windowed else current.throughput
task_throughput.append(
(
last_sample.absolute_time,
last_sample.relative_time,
current.sample_type,
current.throughput,
rate,
f"{last_sample.total_ops_unit}/s",
)
)
Expand Down
1 change: 1 addition & 0 deletions esrally/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"master.nodes",
"metrics.log.dir",
"metrics.request.downsample.factor",
"metrics.request.throughput.window",
"metrics.url",
Comment thread
gareth-ellis marked this conversation as resolved.
"network.host",
"network.http.port",
Expand Down
31 changes: 31 additions & 0 deletions tests/driver/driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,37 @@ def test_use_provided_throughput(self):
assert throughput[1] == (38596, 22, metrics.SampleType.Normal, 8000, "byte/s")
assert throughput[2] == (38597, 23, metrics.SampleType.Normal, 8000, "byte/s")

def test_windowed_throughput(self):
    """
    Compares windowed and cumulative throughput for the same three samples.

    The samples carry 1000, 1000 and then 9000 ops. Windowed mode is expected to report 9000 for the
    last sample, whereas cumulative mode is expected to report 11000 / 3 (roughly 3667).
    """
    op = track.Operation("index", track.OperationType.Bulk, param_source="driver-test-param-source")
    normal = metrics.SampleType.Normal

    samples = [
        driver.Sample(0, 100, 1, 0, op, normal, None, -1, -1, -1, None, 1000, "ops", 1, 1 / 3),
        driver.Sample(0, 101, 2, 0, op, normal, None, -1, -1, -1, None, 1000, "ops", 2, 2 / 3),
        driver.Sample(0, 102, 3, 0, op, normal, None, -1, -1, -1, None, 9000, "ops", 3, 3 / 3),
    ]

    # use a fresh calculator per mode so that no state is shared between the two runs
    windowed = driver.ThroughputCalculator().calculate(samples, windowed=True)
    cumulative = driver.ThroughputCalculator().calculate(samples, windowed=False)

    assert op in windowed
    assert op in cumulative

    w, c = windowed[op], cumulative[op]
    assert len(w) == len(c) == 3

    # both modes agree on the first two samples
    steady = [
        (100, 1, normal, 1000, "ops/s"),
        (101, 2, normal, 1000, "ops/s"),
    ]
    for series in (w, c):
        for position, expected in enumerate(steady):
            assert series[position] == expected

    # last sample: windowed mode reports 9000, cumulative mode reports all 11000 ops spread over 3
    assert w[2] == (102, 3, normal, 9000, "ops/s")
    assert c[2] == (102, 3, normal, pytest.approx(11000 / 3), "ops/s")

def calculate_global_throughput(self, samples):
    """Runs ``samples`` through a new ``driver.ThroughputCalculator`` with default settings and returns the result."""
    calculator = driver.ThroughputCalculator()
    return calculator.calculate(samples)

Expand Down
Loading