Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions esrally/utils/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
from esrally import exceptions
from esrally.utils import io, process

GIT_TIMEOUT = 600


def probed(f):
def probe(src, *args, **kwargs):
# Probe for -C
if not process.exit_status_as_bool(
lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG),
lambda: process.run_subprocess_with_logging(
f"git -C {io.escape_path(src)} --version", level=logging.DEBUG, timeout=GIT_TIMEOUT
),
quiet=True,
):
version = process.run_subprocess_with_output("git --version")
Expand Down Expand Up @@ -69,32 +73,32 @@ def is_branch(src_dir, identifier):
def clone(src, *, remote):
io.ensure_dir(src)
# Don't swallow subprocess output, user might need to enter credentials...
if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))):
if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src)), timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src))


@probed
def fetch(src, *, remote):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote)


@probed
def checkout(src_dir, *, branch):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch)


@probed
def checkout_branch(src_dir, remote, branch):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch)


@probed
def rebase(src_dir, *, remote, branch):
checkout(src_dir, branch=branch)
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not rebase on branch [%s]" % branch)


Expand All @@ -114,13 +118,13 @@ def pull_ts(src_dir, ts, *, remote, branch, default_branch):
else:
rev_list_command = f'git -C {clean_src} rev-list -n 1 --before="{ts}" --date=iso8601 {remote}/{branch}'
revision = process.run_subprocess_with_output(rev_list_command)[0].strip()
if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}"):
if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}", timeout=GIT_TIMEOUT):
Comment thread
b-deam marked this conversation as resolved.
raise exceptions.SupplyError("Could not checkout source tree for timestamped revision [%s]" % ts)


@probed
def checkout_revision(src_dir, *, revision):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}", timeout=GIT_TIMEOUT):
raise exceptions.SupplyError("Could not checkout source tree for revision [%s]" % revision)


Expand Down
20 changes: 19 additions & 1 deletion esrally/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def run_subprocess_with_logging(
stdin: Optional[Union[FileId, IO[bytes]]] = None,
env: Optional[Mapping[str, str]] = None,
detach: bool = False,
timeout: Optional[float] = None,
) -> int:
"""
Runs the provided command line in a subprocess. All output will be captured by a logger.
Expand All @@ -90,6 +91,8 @@ def run_subprocess_with_logging(
(default: None).
:param env: Use specific environment variables (default: None).
:param detach: Whether to detach this process from its parent process (default: False).
:param timeout: Optional time in seconds to wait for the subprocess to finish. If exceeded, the child is killed
and this function returns the exit code from the killed child. ``None`` (the default) waits indefinitely.
:return: The process exit code as an int.
"""
logger = logging.getLogger(__name__)
Expand All @@ -109,7 +112,22 @@ def run_subprocess_with_logging(
stdin=stdin if stdin else None,
preexec_fn=pre_exec,
) as command_line_process:
Comment thread
b-deam marked this conversation as resolved.
stdout, _ = command_line_process.communicate()
try:
stdout, _ = command_line_process.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
command_line_process.kill()
# finish handling pipes and populate the returncode attribute
stdout, _ = command_line_process.communicate()
Comment thread
b-deam marked this conversation as resolved.
Outdated
output = f" Output: [{stdout}]" if stdout else ""
Comment thread
b-deam marked this conversation as resolved.
logger.error(
"Subprocess [%s] exceeded timeout of [%s]s and was terminated with return code [%s].%s",
command_line,
timeout,
str(command_line_process.returncode),
output,
)
Comment thread
b-deam marked this conversation as resolved.
return command_line_process.returncode

if stdout:
logger.log(level=level, msg=stdout)

Expand Down
11 changes: 10 additions & 1 deletion tests/utils/git_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_git_version_too_old(self, run_subprocess_with_logging, run_subprocess):
with pytest.raises(exceptions.SystemSetupError) as exc:
git.head_revision("/src")
assert exc.value.args[0] == "Your git version is [1.0.0] but Rally requires at least git 1.9. Please update git."
run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG)
run_subprocess_with_logging.assert_called_with("git -C /src --version", level=logging.DEBUG, timeout=git.GIT_TIMEOUT)

def test_clone_successful(self):
git.clone(self.tmp_clone_dir, remote=self.remote_tmp_src_dir)
Expand All @@ -160,6 +160,15 @@ def test_clone_with_error(self):
git.clone(self.tmp_clone_dir, remote=remote)
assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]"

@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_clone_raises_supply_error_on_timeout(self, run_subprocess_with_logging):
run_subprocess_with_logging.return_value = -9
remote = "https://github.com/elastic/rally-tracks"
with pytest.raises(exceptions.SupplyError) as exc:
git.clone(self.tmp_clone_dir, remote=remote)
assert exc.value.args[0] == f"Could not clone from [{remote}] to [{self.tmp_clone_dir}]"
run_subprocess_with_logging.assert_called_once_with(f"git clone {remote} {self.tmp_clone_dir}", timeout=git.GIT_TIMEOUT)

def test_fetch_successful(self):
git.fetch(self.local_tmp_src_dir, remote=self.local_remote_name)

Expand Down
15 changes: 15 additions & 0 deletions tests/utils/process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.

import logging
import os
import signal
from unittest import mock

import psutil
Expand Down Expand Up @@ -207,3 +209,16 @@ def test_run_subprocess():
assert completed_process.returncode != 0
assert completed_process.stdout != ""
assert completed_process.stderr is None


def test_run_subprocess_with_logging_timeout_kills_child(caplog):
cmd = "sleep 5"
timeout = 0
with caplog.at_level(logging.ERROR, logger="esrally.utils.process"):
returncode = process.run_subprocess_with_logging(cmd, timeout=timeout)

assert returncode == -signal.SIGKILL
expected = f"Subprocess [{cmd}] exceeded timeout of [{timeout}]s and was terminated with return code [{-signal.SIGKILL}]."
assert any(
r.levelno == logging.ERROR and r.getMessage().startswith(expected) for r in caplog.records
), f"expected ERROR log starting with {expected!r}, got: {[r.getMessage() for r in caplog.records]}"
Loading