Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 84 additions & 6 deletions src/specify_cli/workflows/steps/gate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

from __future__ import annotations

import re
import sys
from pathlib import Path
from typing import Any

from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus
from specify_cli.workflows.expressions import evaluate_expression

#: C0 control characters except tab — stripped from ``show_file`` content so a
#: file containing ANSI escapes (e.g. ``\x1b[2J``) cannot clear the screen or
#: spoof the prompt/options when its lines are printed to the terminal.
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]")


class GateStep(StepBase):
"""Interactive review gate.
Expand All @@ -23,6 +30,10 @@ class GateStep(StepBase):

type_key = "gate"

#: Maximum number of ``show_file`` lines rendered at the prompt, so a
#: large file cannot flood the terminal before the choice.
MAX_SHOW_FILE_LINES = 200

def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
message = config.get("message", "Review required.")
if isinstance(message, str) and "{{" in message:
Expand All @@ -32,8 +43,14 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
on_reject = config.get("on_reject", "abort")

show_file = config.get("show_file")
if show_file and isinstance(show_file, str) and "{{" in show_file:
if isinstance(show_file, str) and "{{" in show_file:
show_file = evaluate_expression(show_file, context)
# ``evaluate_expression`` can return a non-string for a single
# expression (e.g. a number from a prior step), and a literal
# non-string is also possible; coerce so it is rendered rather
# than silently skipped at the prompt.
if show_file is not None:
show_file = str(show_file)

output = {
"message": message,
Expand All @@ -43,12 +60,16 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
"choice": None,
}

# Non-interactive: pause for later resume
# Non-interactive: pause for later resume (the file is not read here)
if not sys.stdin.isatty():
return StepResult(status=StepStatus.PAUSED, output=output)

# Interactive: prompt the user
choice = self._prompt(message, options)
# Interactive: prompt the user. ``show_file`` contents are folded
# into the displayed message so the operator can review the
# referenced material before choosing. Composing the prompt text
# here keeps ``_prompt`` to its ``(message, options)`` contract, so
# adding review material never widens the interactive seam.
Comment thread
mnriem marked this conversation as resolved.
choice = self._prompt(self._compose_prompt(message, show_file), options)
Comment thread
mnriem marked this conversation as resolved.
output["choice"] = choice

if choice in ("reject", "abort"):
Expand All @@ -67,11 +88,35 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:

return StepResult(status=StepStatus.COMPLETED, output=output)

@classmethod
def _compose_prompt(cls, message: object, show_file: str | None) -> str:
"""Build the gate's display text.

``message`` may be a non-string (e.g. a YAML numeric literal that
``execute`` does not coerce), so it is rendered through ``str``.
When ``show_file`` names a file, its contents (read safely, see
``_read_show_file``) are appended below the message so the operator
can review the referenced material before choosing. Always returns a
``str`` — possibly multi-line — for ``_prompt`` to render in the box.
"""
text = str(message)
if not show_file:
return text
body = "\n".join(
[f"{show_file}:", *(f" {line}" for line in cls._read_show_file(show_file))]
)
return f"{text}\n\n{body}"
Comment thread
mnriem marked this conversation as resolved.

@staticmethod
def _prompt(message: str, options: list[str]) -> str:
"""Display gate message and prompt for a choice."""
"""Display the gate message and prompt for a choice.

``message`` may span multiple lines (e.g. when review material has
been folded in); each line is rendered inside the gate box.
"""
print("\n ┌─ Gate ─────────────────────────────────────")
print(f" │ {message}")
for line in message.split("\n"):
print(f" │ {line}" if line else " │")
Comment thread
mnriem marked this conversation as resolved.
print(" │")
for i, opt in enumerate(options, 1):
print(f" │ [{i}] {opt}")
Expand All @@ -90,6 +135,39 @@ def _prompt(message: str, options: list[str]) -> str:
return next(o for o in options if o.lower() == raw.lower())
print(f" Invalid choice. Enter 1-{len(options)} or an option name.")

@staticmethod
def _read_show_file(show_file: str) -> list[str]:
"""Return the lines of ``show_file`` for display.

Reads at most ``MAX_SHOW_FILE_LINES`` lines so a large file cannot
flood the prompt, and returns a short notice instead of raising
when the file is missing, undecodable, or names an invalid path,
so a misconfigured ``show_file`` never breaks the interactive
prompt. ``ValueError`` covers paths the OS rejects outright (e.g.
an embedded NUL byte), which ``Path.open`` raises before any I/O.

Control characters are stripped from each line so file content
cannot inject ANSI escape sequences into the terminal.
"""
lines: list[str] = []
truncated = False
try:
with Path(show_file).open(encoding="utf-8") as handle:
for line in handle:
if len(lines) >= GateStep.MAX_SHOW_FILE_LINES:
truncated = True
break
lines.append(_CONTROL_CHARS.sub("", line.rstrip("\n")))
except (OSError, UnicodeDecodeError, ValueError) as exc:
return [f"(could not read file: {exc})"]
if not lines and not truncated:
return ["(file is empty)"]
if truncated:
lines.append(
f"… (output truncated at {GateStep.MAX_SHOW_FILE_LINES} lines)"
)
return lines

def validate(self, config: dict[str, Any]) -> list[str]:
errors = super().validate(config)
if "message" not in config:
Expand Down
188 changes: 188 additions & 0 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,9 +784,40 @@ def test_validate_missing_run(self):
assert any("missing 'run'" in e for e in errors)


class _StubStdin:
"""Stdin stub with a fixed ``isatty`` result.

Swapped in via the gate module's ``sys.stdin`` rather than
``setattr(sys.stdin, "isatty", …)`` because ``TextIOWrapper.isatty``
is not assignable under some runners (e.g. pytest with capture
disabled), and because forcing the value keeps interactive/non-
interactive tests deterministic regardless of how the suite is run.
"""

def __init__(self, tty: bool):
self._tty = tty

def isatty(self) -> bool:
return self._tty


def _force_gate_stdin(monkeypatch, *, tty: bool):
from specify_cli.workflows.steps import gate as gate_module

monkeypatch.setattr(gate_module.sys, "stdin", _StubStdin(tty=tty))
Comment thread
mnriem marked this conversation as resolved.
Outdated


class TestGateStep:
"""Test the gate step type."""

@pytest.fixture(autouse=True)
def _non_tty_stdin_by_default(self, monkeypatch):
# Default every gate test to a non-TTY stdin so none can drop into
# the interactive prompt and block on input() when the suite runs
# with a real TTY. Interactive tests opt back in with
# _force_gate_stdin(monkeypatch, tty=True).
_force_gate_stdin(monkeypatch, tty=False)

def test_execute_returns_paused(self):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus
Expand Down Expand Up @@ -822,6 +853,163 @@ def test_validate_invalid_on_reject(self):
})
assert any("on_reject" in e for e in errors)

def test_interactive_prompt_renders_show_file(self, tmp_path, monkeypatch, capsys):
Comment thread
mnriem marked this conversation as resolved.
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

review = tmp_path / "spec.md"
review.write_text("LINE-ONE\nLINE-TWO\n", encoding="utf-8")

_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")

step = GateStep()
config = {
"id": "review",
"message": "Review the spec.",
"show_file": str(review),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
out = capsys.readouterr().out

assert "LINE-ONE" in out and "LINE-TWO" in out
assert str(review) in out
assert result.status == StepStatus.COMPLETED
assert result.output["choice"] == "approve"

def test_interactive_prompt_missing_show_file_does_not_crash(
self, tmp_path, monkeypatch, capsys
):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

missing = tmp_path / "does-not-exist.md"

_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")

step = GateStep()
config = {
"id": "review",
"message": "Review.",
"show_file": str(missing),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
out = capsys.readouterr().out

assert "could not read file" in out
assert result.status == StepStatus.COMPLETED

def test_non_interactive_show_file_still_pauses_without_reading(
self, tmp_path, monkeypatch
):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

review = tmp_path / "spec.md"
review.write_text("CONTENT\n", encoding="utf-8")

# stdin defaults to non-TTY via the autouse fixture.
# The non-interactive path must not read the file; hard-fail if it does.
monkeypatch.setattr(
GateStep,
"_read_show_file",
staticmethod(
lambda _p: (_ for _ in ()).throw(
AssertionError("show_file read on the non-interactive path")
)
),
)

step = GateStep()
config = {
"id": "review",
"message": "Review.",
"show_file": str(review),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
assert result.status == StepStatus.PAUSED
assert result.output["show_file"] == str(review)

Comment thread
mnriem marked this conversation as resolved.
def test_read_show_file_empty(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

empty = tmp_path / "empty.md"
empty.write_text("", encoding="utf-8")
assert GateStep._read_show_file(str(empty)) == ["(file is empty)"]

def test_read_show_file_truncates_large_file(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

big = tmp_path / "big.md"
big.write_text(
"\n".join(f"line{i}" for i in range(GateStep.MAX_SHOW_FILE_LINES + 50)),
encoding="utf-8",
)
rendered = GateStep._read_show_file(str(big))
# MAX_SHOW_FILE_LINES content lines + one truncation notice line.
assert len(rendered) == GateStep.MAX_SHOW_FILE_LINES + 1
assert "truncated" in rendered[-1]

def test_read_show_file_invalid_path_does_not_raise(self):
from specify_cli.workflows.steps.gate import GateStep

# An embedded NUL byte makes the OS reject the path with ValueError
# before any I/O; it must degrade to a notice, not crash the prompt.
rendered = GateStep._read_show_file("bad\x00path.md")
assert len(rendered) == 1
assert rendered[0].startswith("(could not read file:")

def test_read_show_file_strips_control_chars(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

# A file with ANSI/control bytes must not inject escapes into the
# terminal; ESC and other C0 controls are stripped, tab is kept.
f = tmp_path / "ansi.md"
f.write_text("a\x1b[2Jb\tc\x07d\n", encoding="utf-8")
rendered = GateStep._read_show_file(str(f))
assert rendered == ["a[2Jb\tcd"]
assert "\x1b" not in rendered[0] and "\x07" not in rendered[0]

def test_interactive_non_string_message_renders(self, monkeypatch, capsys):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

# A YAML numeric literal reaches the prompt as a non-string; it must
# render rather than crash on the multi-line split.
_force_gate_stdin(monkeypatch, tty=True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")

step = GateStep()
config = {"id": "review", "message": 123, "options": ["approve", "reject"]}
result = step.execute(config, StepContext())
out = capsys.readouterr().out
assert "123" in out
assert result.status == StepStatus.COMPLETED

def test_templated_show_file_resolving_to_non_string_is_coerced(self):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

# A single-expression template can resolve to a non-string (e.g. a
# number from a prior step); it must be coerced to str, not skipped.
# stdin defaults to non-TTY via the autouse fixture, so the path
# stays non-interactive (-> PAUSED) and cannot block on input.
step = GateStep()
ctx = StepContext(steps={"prev": {"output": {"ref": 123}}})
config = {
"id": "review",
"message": "Review.",
"show_file": "{{ steps.prev.output.ref }}",
"options": ["approve", "reject"],
}
result = step.execute(config, ctx) # non-interactive -> PAUSED
assert result.status == StepStatus.PAUSED
assert result.output["show_file"] == "123"
Comment thread
mnriem marked this conversation as resolved.


class TestIfThenStep:
"""Test the if/then/else step type."""
Expand Down