From 8d6b873a930adcc00c2b277fcc4f4b46320b51f9 Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Fri, 29 May 2026 16:54:36 +0930 Subject: [PATCH 1/7] add timeout to run_subprocess_with_logging and bound git operations A stalled subprocess (notably git clone of track or team repos) could wedge the calling actor indefinitely because subprocess.communicate() was invoked without a timeout. This adds an optional timeout kwarg to run_subprocess_with_logging that kills the child process on timeout expiry, drains output, and returns the (non-zero) child return code. All git operations in esrally/utils/git.py now pass a 600s timeout. --- esrally/utils/git.py | 20 ++++++++++++-------- esrally/utils/process.py | 20 +++++++++++++++++++- tests/utils/git_test.py | 11 ++++++++++- tests/utils/process_test.py | 15 +++++++++++++++ 4 files changed, 56 insertions(+), 10 deletions(-) diff --git a/esrally/utils/git.py b/esrally/utils/git.py index 7101189fd..0d2668519 100644 --- a/esrally/utils/git.py +++ b/esrally/utils/git.py @@ -21,12 +21,16 @@ from esrally import exceptions from esrally.utils import io, process +GIT_TIMEOUT = 600 + def probed(f): def probe(src, *args, **kwargs): # Probe for -C if not process.exit_status_as_bool( - lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG), + lambda: process.run_subprocess_with_logging( + f"git -C {io.escape_path(src)} --version", level=logging.DEBUG, timeout=GIT_TIMEOUT + ), quiet=True, ): version = process.run_subprocess_with_output("git --version") @@ -69,32 +73,32 @@ def is_branch(src_dir, identifier): def clone(src, *, remote): io.ensure_dir(src) # Don't swallow subprocess output, user might need to enter credentials... - if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))): + if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src)), timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src)) @probed def fetch(src, *, remote): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote) @probed def checkout(src_dir, *, branch): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch) @probed def checkout_branch(src_dir, remote, branch): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch) @probed def rebase(src_dir, *, remote, branch): checkout(src_dir, branch=branch) - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not rebase on branch [%s]" % branch) @@ -114,13 +118,13 @@ def pull_ts(src_dir, ts, *, remote, branch, default_branch): else: rev_list_command = f'git -C {clean_src} rev-list -n 1 --before="{ts}" --date=iso8601 {remote}/{branch}' revision = process.run_subprocess_with_output(rev_list_command)[0].strip() - if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}"): + if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout source tree for timestamped revision [%s]" % ts) @probed def checkout_revision(src_dir, *, revision): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout source tree for revision [%s]" % revision) diff --git a/esrally/utils/process.py b/esrally/utils/process.py index a91d463c0..4996c37e3 100644 --- a/esrally/utils/process.py +++ b/esrally/utils/process.py @@ -79,6 +79,7 @@ def run_subprocess_with_logging( stdin: Optional[Union[FileId, IO[bytes]]] = None, env: Optional[Mapping[str, str]] = None, detach: bool = False, + timeout: Optional[float] = None, ) -> int: """ Runs the provided command line in a subprocess. All output will be captured by a logger. @@ -90,6 +91,8 @@ def run_subprocess_with_logging( (default: None). :param env: Use specific environment variables (default: None). :param detach: Whether to detach this process from its parent process (default: False). + :param timeout: Optional time in seconds to wait for the subprocess to finish. If exceeded, the child is killed + and this function returns the exit code from the killed child. ``None`` (the default) waits indefinitely. :return: The process exit code as an int. """ logger = logging.getLogger(__name__) @@ -109,7 +112,22 @@ def run_subprocess_with_logging( stdin=stdin if stdin else None, preexec_fn=pre_exec, ) as command_line_process: - stdout, _ = command_line_process.communicate() + try: + stdout, _ = command_line_process.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + command_line_process.kill() + # finish handling pipes and populate the returncode attribute + stdout, _ = command_line_process.communicate() + output = f" Output: [{stdout}]" if stdout else "" + logger.error( + "Subprocess [%s] exceeded timeout of [%s]s and was terminated with return code [%s].%s", + command_line, + timeout, + str(command_line_process.returncode), + output, + ) + return command_line_process.returncode + if stdout: logger.log(level=level, msg=stdout) diff --git a/tests/utils/git_test.py b/tests/utils/git_test.py index e8be18399..8d4f974a4 100644 --- a/tests/utils/git_test.py +++ b/tests/utils/git_test.py @@ -148,7 +148,7 @@ def test_git_version_too_old(self, run_subprocess_with_logging, run_subprocess): with pytest.raises(exceptions.SystemSetupError) as exc: git.head_revision("/src") assert exc.value.args[0] == "Your git version is [1.0.0] but Rally requires at least git 1.9. Please update git." - run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG) + run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG, timeout=git.GIT_TIMEOUT) def test_clone_successful(self): git.clone(self.tmp_clone_dir, remote=self.remote_tmp_src_dir) @@ -160,6 +160,15 @@ def test_clone_with_error(self): git.clone(self.tmp_clone_dir, remote=remote) assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]" + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_clone_raises_supply_error_on_timeout(self, run_subprocess_with_logging): + run_subprocess_with_logging.return_value = -9 + remote = "https://github.com/elastic/rally-tracks" + with pytest.raises(exceptions.SupplyError) as exc: + git.clone(self.tmp_clone_dir, remote=remote) + assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]" + run_subprocess_with_logging.assert_called_once_with(f"git clone {remote} {self.tmp_clone_dir}", timeout=git.GIT_TIMEOUT) + def test_fetch_successful(self): git.fetch(self.local_tmp_src_dir, remote=self.local_remote_name) diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index 15c350c83..9f51abcbd 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -15,7 +15,9 @@ # specific language governing permissions and limitations # under the License. +import logging import os +import signal from unittest import mock import psutil @@ -207,3 +209,16 @@ def test_run_subprocess(): assert completed_process.returncode != 0 assert completed_process.stdout != "" assert completed_process.stderr is None + + +def test_run_subprocess_with_logging_timeout_kills_child(caplog): + cmd = "sleep 5" + timeout = 0 + with caplog.at_level(logging.ERROR, logger="esrally.utils.process"): + returncode = process.run_subprocess_with_logging(cmd, timeout=timeout) + + assert returncode == -signal.SIGKILL + expected = f"Subprocess [{cmd}] exceeded timeout of [{timeout}]s and was terminated with return code [{-signal.SIGKILL}]." + assert any( + r.levelno == logging.ERROR and r.getMessage().startswith(expected) for r in caplog.records + ), f"expected ERROR log starting with {expected!r}, got: {[r.getMessage() for r in caplog.records]}" From 7f74ef94fd72837054b04535392a478fc949276c Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Fri, 29 May 2026 18:10:11 +0930 Subject: [PATCH 2/7] Address feedback regarding process grouping --- esrally/utils/process.py | 4 +++- tests/utils/process_test.py | 11 ++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/esrally/utils/process.py b/esrally/utils/process.py index 4996c37e3..ce1f672b1 100644 --- a/esrally/utils/process.py +++ b/esrally/utils/process.py @@ -18,6 +18,7 @@ import logging import os import shlex +import signal import subprocess import time from collections.abc import Iterable, Mapping @@ -111,11 +112,12 @@ def run_subprocess_with_logging( env=env, stdin=stdin if stdin else None, preexec_fn=pre_exec, + start_new_session=True, ) as command_line_process: try: stdout, _ = command_line_process.communicate(timeout=timeout) except subprocess.TimeoutExpired: - command_line_process.kill() + os.killpg(command_line_process.pid, signal.SIGKILL) # finish handling pipes and populate the returncode attribute stdout, _ = command_line_process.communicate() output = f" Output: [{stdout}]" if stdout else "" diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index 9f51abcbd..ef44c02e5 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -211,13 +211,18 @@ def test_run_subprocess(): assert completed_process.stderr is None -def test_run_subprocess_with_logging_timeout_kills_child(caplog): - cmd = "sleep 5" - timeout = 0 +def test_run_subprocess_with_logging_timeout_kills_process_group(caplog, tmp_path): + pid_file = tmp_path / "grandchild.pid" + # The shell backgrounds `sleep 600` in the same process group; both should die on timeout. + cmd = f"sh -c 'sleep 600 & echo $! > {pid_file}; wait'" + timeout = 1 + with caplog.at_level(logging.ERROR, logger="esrally.utils.process"): returncode = process.run_subprocess_with_logging(cmd, timeout=timeout) + grandchild_pid = int(pid_file.read_text().strip()) assert returncode == -signal.SIGKILL + assert not psutil.pid_exists(grandchild_pid), f"grandchild PID {grandchild_pid} survived process-group kill" expected = f"Subprocess [{cmd}] exceeded timeout of [{timeout}]s and was terminated with return code [{-signal.SIGKILL}]." assert any( r.levelno == logging.ERROR and r.getMessage().startswith(expected) for r in caplog.records From 6cd531cc15afd1a8f2e023cd5b9076b0b2f03f4d Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Mon, 22 Jun 2026 10:45:02 +0930 Subject: [PATCH 3/7] Poll grandchild PID in test --- tests/utils/process_test.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index ef44c02e5..4a1a5af14 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -18,6 +18,7 @@ import logging import os import signal +import time from unittest import mock import psutil @@ -222,6 +223,10 @@ def test_run_subprocess_with_logging_timeout_kills_process_group(caplog, tmp_pat grandchild_pid = int(pid_file.read_text().strip()) assert returncode == -signal.SIGKILL + # the grandchild PID is reaped asynchronously, so poll until its PID is gone + deadline = time.monotonic() + 10 + while psutil.pid_exists(grandchild_pid) and time.monotonic() < deadline: + time.sleep(0.05) assert not psutil.pid_exists(grandchild_pid), f"grandchild PID {grandchild_pid} survived process-group kill" expected = f"Subprocess [{cmd}] exceeded timeout of [{timeout}]s and was terminated with return code [{-signal.SIGKILL}]." assert any( From eb446ae600b9aec6522ac6edae28c771400ff8ce Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Mon, 22 Jun 2026 11:25:43 +0930 Subject: [PATCH 4/7] Add retries to network git operations --- esrally/utils/git.py | 50 ++++++++++++++++++++++++++++++++++++----- tests/utils/git_test.py | 45 +++++++++++++++++++++++++++++++++---- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/esrally/utils/git.py b/esrally/utils/git.py index 0d2668519..9d630fa9f 100644 --- a/esrally/utils/git.py +++ b/esrally/utils/git.py @@ -17,11 +17,42 @@ import logging import os +import shutil +import time from esrally import exceptions from esrally.utils import io, process GIT_TIMEOUT = 600 +# Number of attempts for network-bound git operations (clone, fetch) before giving up. +GIT_RETRIES = 3 +# Base delay for exponential backoff between retries, in seconds (waits ~2s, then ~4s). +GIT_RETRY_BACKOFF_SECONDS = 2 + + +def _with_retries(operation, *, on_retry=None): + """ + Runs ``operation`` and retries it on :class:`exceptions.SupplyError` up to ``GIT_RETRIES`` times, + sleeping with exponential backoff between attempts. This is intended for network-bound git + operations that may hang or fail transiently. + + :param operation: A zero-argument callable that performs the git operation and raises + :class:`exceptions.SupplyError` on failure. + :param on_retry: An optional zero-argument callable invoked between attempts (e.g. to clean up + partial state) but not after the final, failing attempt. + """ + logger = logging.getLogger(__name__) + for attempt in range(1, GIT_RETRIES + 1): + try: + return operation() + except exceptions.SupplyError: + if attempt == GIT_RETRIES: + raise + wait = GIT_RETRY_BACKOFF_SECONDS * 2 ** (attempt - 1) + logger.warning("git operation failed (attempt [%d/%d]); retrying in [%d]s.", attempt, GIT_RETRIES, wait) + if on_retry is not None: + on_retry() + time.sleep(wait) def probed(f): @@ -71,16 +102,23 @@ def is_branch(src_dir, identifier): def clone(src, *, remote): - io.ensure_dir(src) - # Don't swallow subprocess output, user might need to enter credentials... - if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src)), timeout=GIT_TIMEOUT): - raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src)) + def _clone(): + io.ensure_dir(src) + # Don't swallow subprocess output, user might need to enter credentials... + if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src)), timeout=GIT_TIMEOUT): + raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src)) + + # A timed-out or failed clone could leave a partial directory behind, remove it so the retry starts clean. + _with_retries(_clone, on_retry=lambda: shutil.rmtree(src, ignore_errors=True)) @probed def fetch(src, *, remote): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}", timeout=GIT_TIMEOUT): - raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote) + def _fetch(): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}", timeout=GIT_TIMEOUT): + raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote) + + _with_retries(_fetch) @probed diff --git a/tests/utils/git_test.py b/tests/utils/git_test.py index 8d4f974a4..0a095467b 100644 --- a/tests/utils/git_test.py +++ b/tests/utils/git_test.py @@ -154,29 +154,66 @@ def test_clone_successful(self): git.clone(self.tmp_clone_dir, remote=self.remote_tmp_src_dir) assert git.is_working_copy(self.tmp_clone_dir) - def test_clone_with_error(self): + @mock.patch("time.sleep") + def test_clone_with_error(self, sleep): remote = "/this/remote/doesnt/exist" with pytest.raises(exceptions.SupplyError) as exc: git.clone(self.tmp_clone_dir, remote=remote) assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]" + @mock.patch("time.sleep") + @mock.patch("shutil.rmtree") @mock.patch("esrally.utils.process.run_subprocess_with_logging") - def test_clone_raises_supply_error_on_timeout(self, run_subprocess_with_logging): + def test_clone_retries_until_exhausted(self, run_subprocess_with_logging, rmtree, sleep): run_subprocess_with_logging.return_value = -9 remote = "https://github.com/elastic/rally-tracks" with pytest.raises(exceptions.SupplyError) as exc: git.clone(self.tmp_clone_dir, remote=remote) assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]" - run_subprocess_with_logging.assert_called_once_with(f"git clone {remote} {self.tmp_clone_dir}", timeout=git.GIT_TIMEOUT) + run_subprocess_with_logging.assert_called_with(f"git clone {remote} {self.tmp_clone_dir}", timeout=git.GIT_TIMEOUT) + assert run_subprocess_with_logging.call_count == git.GIT_RETRIES + # the partial clone is cleaned up between attempts, but not after the final failure + assert rmtree.call_count == git.GIT_RETRIES - 1 + + @mock.patch("time.sleep") + @mock.patch("shutil.rmtree") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_clone_retries_then_succeeds(self, run_subprocess_with_logging, rmtree, sleep): + # fail once, then succeed + run_subprocess_with_logging.side_effect = [-9, 0] + remote = "https://github.com/elastic/rally-tracks" + git.clone(self.tmp_clone_dir, remote=remote) + assert run_subprocess_with_logging.call_count == 2 + assert rmtree.call_count == 1 def test_fetch_successful(self): git.fetch(self.local_tmp_src_dir, remote=self.local_remote_name) - def test_fetch_with_error(self): + @mock.patch("time.sleep") + def test_fetch_with_error(self, sleep): with pytest.raises(exceptions.SupplyError) as exc: git.fetch(self.local_tmp_src_dir, remote="this-remote-doesnt-actually-exist") assert exc.value.args[0] == "Could not fetch source tree from [this-remote-doesnt-actually-exist]" + @mock.patch("time.sleep") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_fetch_retries_until_exhausted(self, run_subprocess_with_logging, sleep): + # the @probed decorator first runs a `git --version` probe which must succeed + run_subprocess_with_logging.side_effect = lambda cmd, **kwargs: 0 if "--version" in cmd else -9 + with pytest.raises(exceptions.SupplyError): + git.fetch(self.local_tmp_src_dir, remote="origin") + fetch_calls = [c for c in run_subprocess_with_logging.call_args_list if "fetch" in c.args[0]] + assert len(fetch_calls) == git.GIT_RETRIES + + @mock.patch("time.sleep") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_fetch_retries_then_succeeds(self, run_subprocess_with_logging, sleep): + fetch_results = [-9, 0] + run_subprocess_with_logging.side_effect = lambda cmd, **kwargs: 0 if "--version" in cmd else fetch_results.pop(0) + git.fetch(self.local_tmp_src_dir, remote="origin") + fetch_calls = [c for c in run_subprocess_with_logging.call_args_list if "fetch" in c.args[0]] + assert len(fetch_calls) == 2 + def test_checkout_successful(self): git.checkout(self.local_tmp_src_dir, branch=self.local_branch) assert git.current_branch(self.local_tmp_src_dir) == self.local_branch From 0d1067839b711da72464a4e00daa2b1642d34df6 Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Mon, 22 Jun 2026 12:44:25 +0930 Subject: [PATCH 5/7] Linting --- esrally/utils/git.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esrally/utils/git.py b/esrally/utils/git.py index 9d630fa9f..3986d62ad 100644 --- a/esrally/utils/git.py +++ b/esrally/utils/git.py @@ -32,12 +32,12 @@ def _with_retries(operation, *, on_retry=None): """ - Runs ``operation`` and retries it on :class:`exceptions.SupplyError` up to ``GIT_RETRIES`` times, + Runs ``operation`` and retries it on ``SupplyError`` up to ``GIT_RETRIES`` times, sleeping with exponential backoff between attempts. This is intended for network-bound git operations that may hang or fail transiently. :param operation: A zero-argument callable that performs the git operation and raises - :class:`exceptions.SupplyError` on failure. + ``exceptions.SupplyError`` on failure. :param on_retry: An optional zero-argument callable invoked between attempts (e.g. to clean up partial state) but not after the final, failing attempt. """ From 572c5ebaecfce8b0d079bd38b8252326bc846299 Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Mon, 22 Jun 2026 14:38:26 +0930 Subject: [PATCH 6/7] Address Copilot feedback --- esrally/utils/process.py | 11 ++++++++-- tests/utils/process_test.py | 42 +++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/esrally/utils/process.py b/esrally/utils/process.py index ce1f672b1..098393691 100644 --- a/esrally/utils/process.py +++ b/esrally/utils/process.py @@ -103,6 +103,9 @@ def run_subprocess_with_logging( if header is not None: logger.info(header) + # only start a new session when a timeout is requested + new_session = timeout is not None + # pylint: disable=subprocess-popen-preexec-fn with subprocess.Popen( command_line_args, @@ -112,12 +115,16 @@ def run_subprocess_with_logging( env=env, stdin=stdin if stdin else None, preexec_fn=pre_exec, - start_new_session=True, + start_new_session=new_session, ) as command_line_process: try: stdout, _ = command_line_process.communicate(timeout=timeout) except subprocess.TimeoutExpired: - os.killpg(command_line_process.pid, signal.SIGKILL) + try: + os.killpg(command_line_process.pid, signal.SIGKILL) + except ProcessLookupError: + # the process group already exited between the timeout firing and the kill + logger.debug("Subprocess [%s] already exited before it could be killed.", command_line) # finish handling pipes and populate the returncode attribute stdout, _ = command_line_process.communicate() output = f" Output: [{stdout}]" if stdout else "" diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index 4a1a5af14..07e99fa8f 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -18,6 +18,7 @@ import logging import os import signal +import subprocess import time from unittest import mock @@ -232,3 +233,44 @@ def test_run_subprocess_with_logging_timeout_kills_process_group(caplog, tmp_pat assert any( r.levelno == logging.ERROR and r.getMessage().startswith(expected) for r in caplog.records ), f"expected ERROR log starting with {expected!r}, got: {[r.getMessage() for r in caplog.records]}" + + +@mock.patch("esrally.utils.process.subprocess.Popen") +def test_run_subprocess_with_logging_without_timeout_does_not_start_new_session(popen): + proc = popen.return_value.__enter__.return_value + proc.returncode = 0 + proc.communicate.return_value = ("", None) + + process.run_subprocess_with_logging("true") + + # a new session detaches the child from the controlling terminal, so only do it when we need + # os.killpg() to enforce a timeout + assert popen.call_args.kwargs["start_new_session"] is False + + +@mock.patch("esrally.utils.process.subprocess.Popen") +def test_run_subprocess_with_logging_with_timeout_starts_new_session(popen): + proc = popen.return_value.__enter__.return_value + proc.returncode = 0 + proc.communicate.return_value = ("", None) + + process.run_subprocess_with_logging("true", timeout=5) + + assert popen.call_args.kwargs["start_new_session"] is True + + +@mock.patch("esrally.utils.process.os.killpg", side_effect=ProcessLookupError) +@mock.patch("esrally.utils.process.subprocess.Popen") +def test_run_subprocess_with_logging_timeout_handles_already_exited_process(popen, killpg): + proc = popen.return_value.__enter__.return_value + proc.pid = 4242 + proc.returncode = -signal.SIGKILL + # first communicate() times out, second (after the kill) drains the pipes + proc.communicate.side_effect = [subprocess.TimeoutExpired(cmd="sleep 600", timeout=1), ("", None)] + + # the process group exiting between the timeout and the kill must not propagate as an error + returncode = process.run_subprocess_with_logging("sleep 600", timeout=1) + + assert returncode == -signal.SIGKILL + killpg.assert_called_once_with(4242, signal.SIGKILL) + assert proc.communicate.call_count == 2 From c00a509766eb404615755ed266b45c8deca8cd23 Mon Sep 17 00:00:00 2001 From: Brad Deam Date: Tue, 23 Jun 2026 11:10:24 +0930 Subject: [PATCH 7/7] Quote pidfile path inside test --- tests/utils/process_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index 07e99fa8f..25a0c194d 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -216,7 +216,7 @@ def test_run_subprocess(): def test_run_subprocess_with_logging_timeout_kills_process_group(caplog, tmp_path): pid_file = tmp_path / "grandchild.pid" # The shell backgrounds `sleep 600` in the same process group; both should die on timeout. - cmd = f"sh -c 'sleep 600 & echo $! > {pid_file}; wait'" + cmd = f"sh -c 'sleep 600 & echo $! > \"{pid_file}\"; wait'" timeout = 1 with caplog.at_level(logging.ERROR, logger="esrally.utils.process"):