diff --git a/esrally/utils/git.py b/esrally/utils/git.py index 7101189fd..a5d7b1a75 100644 --- a/esrally/utils/git.py +++ b/esrally/utils/git.py @@ -17,16 +17,51 @@ import logging import os +import shutil +import time from esrally import exceptions from esrally.utils import io, process +GIT_TIMEOUT = 600 +# Number of attempts for network-bound git operations (clone, fetch) before giving up. +GIT_RETRIES = 3 +# Base delay for exponential backoff between retries, in seconds (waits ~2s, then ~4s). +GIT_RETRY_BACKOFF_SECONDS = 2 + + +def _with_retries(operation, *, on_retry=None): + """ + Runs ``operation`` and retries it on ``SupplyError`` up to ``GIT_RETRIES`` times, + sleeping with exponential backoff between attempts. This is intended for network-bound git + operations that may hang or fail transiently. + + :param operation: A zero-argument callable that performs the git operation and raises + ``exceptions.SupplyError`` on failure. + :param on_retry: An optional zero-argument callable invoked between attempts (e.g. to clean up + partial state) but not after the final, failing attempt. + """ + logger = logging.getLogger(__name__) + for attempt in range(1, GIT_RETRIES + 1): + try: + return operation() + except exceptions.SupplyError: + if attempt == GIT_RETRIES: + raise + wait = GIT_RETRY_BACKOFF_SECONDS * 2 ** (attempt - 1) + logger.warning("git operation failed (attempt [%d/%d]); retrying in [%d]s.", attempt, GIT_RETRIES, wait) + if on_retry is not None: + on_retry() + time.sleep(wait) + def probed(f): def probe(src, *args, **kwargs): # Probe for -C if not process.exit_status_as_bool( - lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG), + lambda: process.run_subprocess_with_logging( + f"git -C {io.escape_path(src)} --version", level=logging.DEBUG, timeout=GIT_TIMEOUT + ), quiet=True, ): version = process.run_subprocess_with_output("git --version") @@ -67,34 +102,40 @@ def is_branch(src_dir, identifier): def clone(src, *, remote): - io.ensure_dir(src) - # Don't swallow subprocess output, user might need to enter credentials... - if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))): - raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src)) + def _clone(): + io.ensure_dir(src) + if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src)), timeout=GIT_TIMEOUT): + raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src)) + + # A timed-out or failed clone could leave a partial directory behind, remove it so the retry starts clean. + _with_retries(_clone, on_retry=lambda: shutil.rmtree(src, ignore_errors=True)) @probed def fetch(src, *, remote): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}"): - raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote) + def _fetch(): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}", timeout=GIT_TIMEOUT): + raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote) + + _with_retries(_fetch) @probed def checkout(src_dir, *, branch): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch) @probed def checkout_branch(src_dir, remote, branch): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch) @probed def rebase(src_dir, *, remote, branch): checkout(src_dir, branch=branch) - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not rebase on branch [%s]" % branch) @@ -114,13 +155,13 @@ def pull_ts(src_dir, ts, *, remote, branch, default_branch): else: rev_list_command = f'git -C {clean_src} rev-list -n 1 --before="{ts}" --date=iso8601 {remote}/{branch}' revision = process.run_subprocess_with_output(rev_list_command)[0].strip() - if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}"): + if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout source tree for timestamped revision [%s]" % ts) @probed def checkout_revision(src_dir, *, revision): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}", timeout=GIT_TIMEOUT): raise exceptions.SupplyError("Could not checkout source tree for revision [%s]" % revision) diff --git a/esrally/utils/process.py b/esrally/utils/process.py index a91d463c0..3e2709528 100644 --- a/esrally/utils/process.py +++ b/esrally/utils/process.py @@ -18,6 +18,7 @@ import logging import os import shlex +import signal import subprocess import time from collections.abc import Iterable, Mapping @@ -79,6 +80,7 @@ def run_subprocess_with_logging( stdin: Optional[Union[FileId, IO[bytes]]] = None, env: Optional[Mapping[str, str]] = None, detach: bool = False, + timeout: Optional[float] = None, ) -> int: """ Runs the provided command line in a subprocess. All output will be captured by a logger. @@ -90,16 +92,16 @@ def run_subprocess_with_logging( (default: None). :param env: Use specific environment variables (default: None). :param detach: Whether to detach this process from its parent process (default: False). + :param timeout: Optional time in seconds to wait for the subprocess to finish. If exceeded, the child is killed + and this function returns the exit code from the killed child. ``None`` (the default) waits indefinitely. :return: The process exit code as an int. """ logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with logging.", command_line) command_line_args = shlex.split(command_line) - pre_exec = os.setpgrp if detach else None if header is not None: logger.info(header) - # pylint: disable=subprocess-popen-preexec-fn with subprocess.Popen( command_line_args, stdout=subprocess.PIPE, @@ -107,9 +109,28 @@ def run_subprocess_with_logging( universal_newlines=True, env=env, stdin=stdin if stdin else None, - preexec_fn=pre_exec, + start_new_session=detach or timeout is not None, ) as command_line_process: - stdout, _ = command_line_process.communicate() + try: + stdout, _ = command_line_process.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + try: + os.killpg(command_line_process.pid, signal.SIGKILL) + except ProcessLookupError: + # the process group already exited between the timeout firing and the kill + pass + # finish handling pipes and populate the returncode attribute + stdout, _ = command_line_process.communicate() + output = f" Output: [{stdout}]" if stdout else "" + logger.error( + "Subprocess [%s] exceeded timeout of [%s]s and was terminated with return code [%s].%s", + command_line, + timeout, + str(command_line_process.returncode), + output, + ) + return command_line_process.returncode + if stdout: logger.log(level=level, msg=stdout) @@ -140,7 +161,6 @@ def run_subprocess_with_logging_and_output( logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with logging.", command_line) command_line_args = shlex.split(command_line) - pre_exec = os.setpgrp if detach else None if header is not None: logger.info(header) @@ -152,7 +172,7 @@ def run_subprocess_with_logging_and_output( env=env, check=False, stdin=stdin if stdin else None, - preexec_fn=pre_exec, + start_new_session=detach, ) for stdout in completed.stdout.splitlines(): diff --git a/tests/utils/git_test.py b/tests/utils/git_test.py index e8be18399..0a095467b 100644 --- a/tests/utils/git_test.py +++ b/tests/utils/git_test.py @@ -148,26 +148,72 @@ def test_git_version_too_old(self, run_subprocess_with_logging, run_subprocess): with pytest.raises(exceptions.SystemSetupError) as exc: git.head_revision("/src") assert exc.value.args[0] == "Your git version is [1.0.0] but Rally requires at least git 1.9. Please update git." - run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG) + run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG, timeout=git.GIT_TIMEOUT) def test_clone_successful(self): git.clone(self.tmp_clone_dir, remote=self.remote_tmp_src_dir) assert git.is_working_copy(self.tmp_clone_dir) - def test_clone_with_error(self): + @mock.patch("time.sleep") + def test_clone_with_error(self, sleep): remote = "/this/remote/doesnt/exist" with pytest.raises(exceptions.SupplyError) as exc: git.clone(self.tmp_clone_dir, remote=remote) assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]" + @mock.patch("time.sleep") + @mock.patch("shutil.rmtree") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_clone_retries_until_exhausted(self, run_subprocess_with_logging, rmtree, sleep): + run_subprocess_with_logging.return_value = -9 + remote = "https://github.com/elastic/rally-tracks" + with pytest.raises(exceptions.SupplyError) as exc: + git.clone(self.tmp_clone_dir, remote=remote) + assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]" + run_subprocess_with_logging.assert_called_with(f"git clone {remote} {self.tmp_clone_dir}", timeout=git.GIT_TIMEOUT) + assert run_subprocess_with_logging.call_count == git.GIT_RETRIES + # the partial clone is cleaned up between attempts, but not after the final failure + assert rmtree.call_count == git.GIT_RETRIES - 1 + + @mock.patch("time.sleep") + @mock.patch("shutil.rmtree") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_clone_retries_then_succeeds(self, run_subprocess_with_logging, rmtree, sleep): + # fail once, then succeed + run_subprocess_with_logging.side_effect = [-9, 0] + remote = "https://github.com/elastic/rally-tracks" + git.clone(self.tmp_clone_dir, remote=remote) + assert run_subprocess_with_logging.call_count == 2 + assert rmtree.call_count == 1 + def test_fetch_successful(self): git.fetch(self.local_tmp_src_dir, remote=self.local_remote_name) - def test_fetch_with_error(self): + @mock.patch("time.sleep") + def test_fetch_with_error(self, sleep): with pytest.raises(exceptions.SupplyError) as exc: git.fetch(self.local_tmp_src_dir, remote="this-remote-doesnt-actually-exist") assert exc.value.args[0] == "Could not fetch source tree from [this-remote-doesnt-actually-exist]" + @mock.patch("time.sleep") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_fetch_retries_until_exhausted(self, run_subprocess_with_logging, sleep): + # the @probed decorator first runs a `git --version` probe which must succeed + run_subprocess_with_logging.side_effect = lambda cmd, **kwargs: 0 if "--version" in cmd else -9 + with pytest.raises(exceptions.SupplyError): + git.fetch(self.local_tmp_src_dir, remote="origin") + fetch_calls = [c for c in run_subprocess_with_logging.call_args_list if "fetch" in c.args[0]] + assert len(fetch_calls) == git.GIT_RETRIES + + @mock.patch("time.sleep") + @mock.patch("esrally.utils.process.run_subprocess_with_logging") + def test_fetch_retries_then_succeeds(self, run_subprocess_with_logging, sleep): + fetch_results = [-9, 0] + run_subprocess_with_logging.side_effect = lambda cmd, **kwargs: 0 if "--version" in cmd else fetch_results.pop(0) + git.fetch(self.local_tmp_src_dir, remote="origin") + fetch_calls = [c for c in run_subprocess_with_logging.call_args_list if "fetch" in c.args[0]] + assert len(fetch_calls) == 2 + def test_checkout_successful(self): git.checkout(self.local_tmp_src_dir, branch=self.local_branch) assert git.current_branch(self.local_tmp_src_dir) == self.local_branch diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index 15c350c83..61eb0c634 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -15,7 +15,11 @@ # specific language governing permissions and limitations # under the License. +import logging import os +import signal +import subprocess +import time from unittest import mock import psutil @@ -207,3 +211,42 @@ def test_run_subprocess(): assert completed_process.returncode != 0 assert completed_process.stdout != "" assert completed_process.stderr is None + + +def test_run_subprocess_with_logging_timeout_kills_process_group(caplog, tmp_path): + pid_file = tmp_path / "grandchild.pid" + # The shell backgrounds `sleep 600` in the same process group; both should die on timeout. + cmd = f"sh -c 'sleep 600 & echo $! > \"{pid_file}\"; wait'" + timeout = 1 + + with caplog.at_level(logging.ERROR, logger="esrally.utils.process"): + returncode = process.run_subprocess_with_logging(cmd, timeout=timeout) + grandchild_pid = int(pid_file.read_text().strip()) + + assert returncode == -signal.SIGKILL + # the grandchild PID is reaped asynchronously, so poll until its PID is gone + deadline = time.monotonic() + 10 + while psutil.pid_exists(grandchild_pid) and time.monotonic() < deadline: + time.sleep(0.05) + assert not psutil.pid_exists(grandchild_pid), f"grandchild PID {grandchild_pid} survived process-group kill" + expected = f"Subprocess [{cmd}] exceeded timeout of [{timeout}]s and was terminated with return code [{-signal.SIGKILL}]." + assert any( + r.levelno == logging.ERROR and r.getMessage().startswith(expected) for r in caplog.records + ), f"expected ERROR log starting with {expected!r}, got: {[r.getMessage() for r in caplog.records]}" + + +@mock.patch("esrally.utils.process.os.killpg", side_effect=ProcessLookupError) +@mock.patch("esrally.utils.process.subprocess.Popen") +def test_run_subprocess_with_logging_timeout_handles_already_exited_process(popen, killpg): + proc = popen.return_value.__enter__.return_value + proc.pid = 4242 + proc.returncode = -signal.SIGKILL + # first communicate() times out, second (after the kill) drains the pipes + proc.communicate.side_effect = [subprocess.TimeoutExpired(cmd="sleep 600", timeout=1), ("", None)] + + # the process group exiting between the timeout and the kill must not propagate as an error + returncode = process.run_subprocess_with_logging("sleep 600", timeout=1) + + assert returncode == -signal.SIGKILL + killpg.assert_called_once_with(4242, signal.SIGKILL) + assert proc.communicate.call_count == 2