Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 54 additions & 12 deletions esrally/utils/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,51 @@

import logging
import os
import shutil
import time

from esrally import exceptions
from esrally.utils import io, process

GIT_TIMEOUT = 600
# Number of attempts for network-bound git operations (clone, fetch) before giving up.
GIT_RETRIES = 3
# Base delay for exponential backoff between retries, in seconds (waits ~2s, then ~4s).
GIT_RETRY_BACKOFF_SECONDS = 2

Comment thread
b-deam marked this conversation as resolved.

def _with_retries(operation, *, on_retry=None):
"""
Runs ``operation`` and retries it on ``SupplyError`` up to ``GIT_RETRIES`` times,
sleeping with exponential backoff between attempts. This is intended for network-bound git
operations that may hang or fail transiently.

:param operation: A zero-argument callable that performs the git operation and raises
``exceptions.SupplyError`` on failure.
:param on_retry: An optional zero-argument callable invoked between attempts (e.g. to clean up
partial state) but not after the final, failing attempt.
"""
logger = logging.getLogger(__name__)
for attempt in range(1, GIT_RETRIES + 1):
try:
return operation()
except exceptions.SupplyError:
if attempt == GIT_RETRIES:
raise
wait = GIT_RETRY_BACKOFF_SECONDS * 2 ** (attempt - 1)
logger.warning("git operation failed (attempt [%d/%d]); retrying in [%d]s.", attempt, GIT_RETRIES, wait)
if on_retry is not None:
on_retry()
time.sleep(wait)


def probed(f):
def probe(src, *args, **kwargs):
# Probe for -C
if not process.exit_status_as_bool(
lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG),
lambda: process.run_subprocess_with_logging(
f"git -C {io.escape_path(src)} --version", level=logging.DEBUG, timeout=GIT_TIMEOUT
),
quiet=True,
):
version = process.run_subprocess_with_output("git --version")
Expand Down Expand Up @@ -67,34 +102,41 @@ def is_branch(src_dir, identifier):


def clone(src, *, remote):
io.ensure_dir(src)
# Don't swallow subprocess output, user might need to enter credentials...
if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))):
raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src))
def _clone():
io.ensure_dir(src)
# Don't swallow subprocess output, user might need to enter credentials...
if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src)), timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src))

# A timed-out or failed clone could leave a partial directory behind, remove it so the retry starts clean.
_with_retries(_clone, on_retry=lambda: shutil.rmtree(src, ignore_errors=True))


@probed
def fetch(src, *, remote):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}"):
raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote)
def _fetch():
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote)

_with_retries(_fetch)


@probed
def checkout(src_dir, *, branch):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch)


@probed
def checkout_branch(src_dir, remote, branch):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch)


@probed
def rebase(src_dir, *, remote, branch):
checkout(src_dir, branch=branch)
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not rebase on branch [%s]" % branch)


Expand All @@ -114,13 +156,13 @@ def pull_ts(src_dir, ts, *, remote, branch, default_branch):
else:
rev_list_command = f'git -C {clean_src} rev-list -n 1 --before="{ts}" --date=iso8601 {remote}/{branch}'
revision = process.run_subprocess_with_output(rev_list_command)[0].strip()
if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}"):
if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}", timeout=GIT_TIMEOUT):
Comment thread
b-deam marked this conversation as resolved.
raise exceptions.SupplyError("Could not checkout source tree for timestamped revision [%s]" % ts)


@probed
def checkout_revision(src_dir, *, revision):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not checkout source tree for revision [%s]" % revision)


Expand Down
29 changes: 28 additions & 1 deletion esrally/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import os
import shlex
import signal
import subprocess
import time
from collections.abc import Iterable, Mapping
Expand Down Expand Up @@ -79,6 +80,7 @@ def run_subprocess_with_logging(
stdin: Optional[Union[FileId, IO[bytes]]] = None,
env: Optional[Mapping[str, str]] = None,
detach: bool = False,
timeout: Optional[float] = None,
) -> int:
"""
Runs the provided command line in a subprocess. All output will be captured by a logger.
Expand All @@ -90,6 +92,8 @@ def run_subprocess_with_logging(
(default: None).
:param env: Use specific environment variables (default: None).
:param detach: Whether to detach this process from its parent process (default: False).
:param timeout: Optional time in seconds to wait for the subprocess to finish. If exceeded, the child is killed
and this function returns the exit code from the killed child. ``None`` (the default) waits indefinitely.
:return: The process exit code as an int.
"""
logger = logging.getLogger(__name__)
Expand All @@ -99,6 +103,9 @@ def run_subprocess_with_logging(
if header is not None:
logger.info(header)

# only start a new session when a timeout is requested
new_session = timeout is not None

# pylint: disable=subprocess-popen-preexec-fn
with subprocess.Popen(
command_line_args,
Expand All @@ -108,8 +115,28 @@ def run_subprocess_with_logging(
env=env,
stdin=stdin if stdin else None,
preexec_fn=pre_exec,
start_new_session=new_session,
) as command_line_process:
Comment thread
b-deam marked this conversation as resolved.
stdout, _ = command_line_process.communicate()
try:
stdout, _ = command_line_process.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
try:
os.killpg(command_line_process.pid, signal.SIGKILL)
except ProcessLookupError:
# the process group already exited between the timeout firing and the kill
logger.debug("Subprocess [%s] already exited before it could be killed.", command_line)
# finish handling pipes and populate the returncode attribute
stdout, _ = command_line_process.communicate()
output = f" Output: [{stdout}]" if stdout else ""
Comment thread
b-deam marked this conversation as resolved.
logger.error(
"Subprocess [%s] exceeded timeout of [%s]s and was terminated with return code [%s].%s",
command_line,
timeout,
str(command_line_process.returncode),
output,
)
Comment thread
b-deam marked this conversation as resolved.
return command_line_process.returncode

if stdout:
logger.log(level=level, msg=stdout)

Expand Down
52 changes: 49 additions & 3 deletions tests/utils/git_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,72 @@ def test_git_version_too_old(self, run_subprocess_with_logging, run_subprocess):
with pytest.raises(exceptions.SystemSetupError) as exc:
git.head_revision("/src")
assert exc.value.args[0] == "Your git version is [1.0.0] but Rally requires at least git 1.9. Please update git."
run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG)
run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG, timeout=git.GIT_TIMEOUT)

def test_clone_successful(self):
git.clone(self.tmp_clone_dir, remote=self.remote_tmp_src_dir)
assert git.is_working_copy(self.tmp_clone_dir)

def test_clone_with_error(self):
@mock.patch("time.sleep")
def test_clone_with_error(self, sleep):
remote = "/this/remote/doesnt/exist"
with pytest.raises(exceptions.SupplyError) as exc:
git.clone(self.tmp_clone_dir, remote=remote)
assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]"

@mock.patch("time.sleep")
@mock.patch("shutil.rmtree")
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_clone_retries_until_exhausted(self, run_subprocess_with_logging, rmtree, sleep):
    """Every clone attempt fails, so all attempts are used up and the error reaches the caller."""
    remote = "https://github.com/elastic/rally-tracks"
    # a non-zero return code makes each attempt fail
    run_subprocess_with_logging.return_value = -9

    with pytest.raises(exceptions.SupplyError) as exc:
        git.clone(self.tmp_clone_dir, remote=remote)

    assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]"
    assert run_subprocess_with_logging.call_count == git.GIT_RETRIES
    run_subprocess_with_logging.assert_called_with(f"git clone {remote} {self.tmp_clone_dir}", timeout=git.GIT_TIMEOUT)
    # the partial clone is cleaned up between attempts, but not after the final failure
    assert rmtree.call_count == git.GIT_RETRIES - 1

@mock.patch("time.sleep")
@mock.patch("shutil.rmtree")
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_clone_retries_then_succeeds(self, run_subprocess_with_logging, rmtree, sleep):
    """A failed first clone attempt is followed by a successful second one."""
    remote = "https://github.com/elastic/rally-tracks"
    # first attempt fails, second attempt succeeds
    run_subprocess_with_logging.side_effect = [-9, 0]

    git.clone(self.tmp_clone_dir, remote=remote)

    # one cleanup for the single failed attempt, two clone invocations overall
    assert rmtree.call_count == 1
    assert run_subprocess_with_logging.call_count == 2

def test_fetch_successful(self):
git.fetch(self.local_tmp_src_dir, remote=self.local_remote_name)

def test_fetch_with_error(self):
@mock.patch("time.sleep")
def test_fetch_with_error(self, sleep):
with pytest.raises(exceptions.SupplyError) as exc:
git.fetch(self.local_tmp_src_dir, remote="this-remote-doesnt-actually-exist")
assert exc.value.args[0] == "Could not fetch source tree from [this-remote-doesnt-actually-exist]"

@mock.patch("time.sleep")
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_fetch_retries_until_exhausted(self, run_subprocess_with_logging, sleep):
    """Every fetch attempt fails, so fetch is invoked once per allowed attempt before raising."""

    def fake_run(cmd, **kwargs):
        # the @probed decorator first runs a `git --version` probe which must succeed
        if "--version" in cmd:
            return 0
        return -9

    run_subprocess_with_logging.side_effect = fake_run

    with pytest.raises(exceptions.SupplyError):
        git.fetch(self.local_tmp_src_dir, remote="origin")

    # count only the fetch invocations, ignoring the version probe
    fetch_calls = []
    for recorded_call in run_subprocess_with_logging.call_args_list:
        if "fetch" in recorded_call.args[0]:
            fetch_calls.append(recorded_call)
    assert len(fetch_calls) == git.GIT_RETRIES

@mock.patch("time.sleep")
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_fetch_retries_then_succeeds(self, run_subprocess_with_logging, sleep):
    """A failed first fetch attempt is followed by a successful second one."""
    # outcomes handed out to successive fetch invocations: fail once, then succeed
    pending_results = [-9, 0]

    def fake_run(cmd, **kwargs):
        # the version probe issued by @probed always succeeds
        if "--version" in cmd:
            return 0
        return pending_results.pop(0)

    run_subprocess_with_logging.side_effect = fake_run

    git.fetch(self.local_tmp_src_dir, remote="origin")

    fetch_calls = []
    for recorded_call in run_subprocess_with_logging.call_args_list:
        if "fetch" in recorded_call.args[0]:
            fetch_calls.append(recorded_call)
    assert len(fetch_calls) == 2

def test_checkout_successful(self):
git.checkout(self.local_tmp_src_dir, branch=self.local_branch)
assert git.current_branch(self.local_tmp_src_dir) == self.local_branch
Expand Down
67 changes: 67 additions & 0 deletions tests/utils/process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import logging
import os
import signal
import subprocess
import time
from unittest import mock

import psutil
Expand Down Expand Up @@ -207,3 +211,66 @@ def test_run_subprocess():
assert completed_process.returncode != 0
assert completed_process.stdout != ""
assert completed_process.stderr is None


def test_run_subprocess_with_logging_timeout_kills_process_group(caplog, tmp_path):
    """A command that exceeds its timeout is killed together with the background process it spawned."""
    timeout = 1
    pid_file = tmp_path / "grandchild.pid"
    # The shell backgrounds `sleep 600` in the same process group; both should die on timeout.
    cmd = f"sh -c 'sleep 600 & echo $! > {pid_file}; wait'"

    with caplog.at_level(logging.ERROR, logger="esrally.utils.process"):
        returncode = process.run_subprocess_with_logging(cmd, timeout=timeout)
    grandchild_pid = int(pid_file.read_text().strip())

    assert returncode == -signal.SIGKILL

    # the grandchild PID is reaped asynchronously, so poll until its PID is gone
    deadline = time.monotonic() + 10
    while True:
        if not psutil.pid_exists(grandchild_pid) or time.monotonic() >= deadline:
            break
        time.sleep(0.05)
    assert not psutil.pid_exists(grandchild_pid), f"grandchild PID {grandchild_pid} survived process-group kill"

    # the timeout must be reported at ERROR level with the documented message prefix
    expected = f"Subprocess [{cmd}] exceeded timeout of [{timeout}]s and was terminated with return code [{-signal.SIGKILL}]."
    matching_errors = [r for r in caplog.records if r.levelno == logging.ERROR and r.getMessage().startswith(expected)]
    assert matching_errors, f"expected ERROR log starting with {expected!r}, got: {[r.getMessage() for r in caplog.records]}"


@mock.patch("esrally.utils.process.subprocess.Popen")
def test_run_subprocess_with_logging_without_timeout_does_not_start_new_session(popen):
    """Without a timeout, Popen must be called with ``start_new_session=False``."""
    child = popen.return_value.__enter__.return_value
    child.communicate.return_value = ("", None)
    child.returncode = 0

    process.run_subprocess_with_logging("true")

    # a new session detaches the child from the controlling terminal, so only do it when we need
    # os.killpg() to enforce a timeout
    popen_kwargs = popen.call_args[1]
    assert popen_kwargs["start_new_session"] is False


@mock.patch("esrally.utils.process.subprocess.Popen")
def test_run_subprocess_with_logging_with_timeout_starts_new_session(popen):
    """With a timeout, Popen must be called with ``start_new_session=True``."""
    child = popen.return_value.__enter__.return_value
    child.communicate.return_value = ("", None)
    child.returncode = 0

    process.run_subprocess_with_logging("true", timeout=5)

    popen_kwargs = popen.call_args[1]
    assert popen_kwargs["start_new_session"] is True


@mock.patch("esrally.utils.process.os.killpg", side_effect=ProcessLookupError)
@mock.patch("esrally.utils.process.subprocess.Popen")
def test_run_subprocess_with_logging_timeout_handles_already_exited_process(popen, killpg):
    """A ``ProcessLookupError`` from ``os.killpg`` after a timeout does not propagate to the caller."""
    child = popen.return_value.__enter__.return_value
    child.returncode = -signal.SIGKILL
    child.pid = 4242
    # first communicate() times out, second (after the kill) drains the pipes
    timed_out = subprocess.TimeoutExpired(cmd="sleep 600", timeout=1)
    child.communicate.side_effect = [timed_out, ("", None)]

    # the process group exiting between the timeout and the kill must not propagate as an error
    returncode = process.run_subprocess_with_logging("sleep 600", timeout=1)

    assert child.communicate.call_count == 2
    killpg.assert_called_once_with(4242, signal.SIGKILL)
    assert returncode == -signal.SIGKILL
Loading