Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions src/specify_cli/workflows/steps/gate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import sys
from pathlib import Path
from typing import Any

from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus
Expand All @@ -23,6 +24,10 @@ class GateStep(StepBase):

type_key = "gate"

#: Maximum number of ``show_file`` lines rendered at the prompt, so a
#: large file cannot flood the terminal before the choice.
MAX_SHOW_FILE_LINES = 200

def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
message = config.get("message", "Review required.")
if isinstance(message, str) and "{{" in message:
Expand All @@ -32,8 +37,14 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
on_reject = config.get("on_reject", "abort")

show_file = config.get("show_file")
if show_file and isinstance(show_file, str) and "{{" in show_file:
if isinstance(show_file, str) and "{{" in show_file:
show_file = evaluate_expression(show_file, context)
# ``evaluate_expression`` can return a non-string for a single
# expression (e.g. a number from a prior step), and a literal
# non-string is also possible; coerce so it is rendered rather
# than silently skipped at the prompt.
if show_file is not None:
show_file = str(show_file)

output = {
"message": message,
Expand All @@ -48,7 +59,7 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
return StepResult(status=StepStatus.PAUSED, output=output)

# Interactive: prompt the user
choice = self._prompt(message, options)
choice = self._prompt(message, options, show_file)
output["choice"] = choice

if choice in ("reject", "abort"):
Expand All @@ -68,10 +79,20 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
return StepResult(status=StepStatus.COMPLETED, output=output)

@staticmethod
def _prompt(message: str, options: list[str]) -> str:
"""Display gate message and prompt for a choice."""
def _prompt(message: str, options: list[str], show_file: str | None = None) -> str:
"""Display gate message and prompt for a choice.

When ``show_file`` names a readable file, its contents are shown
before the options so the operator can review the material the
gate refers to.
"""
print("\n ┌─ Gate ─────────────────────────────────────")
print(f" │ {message}")
if show_file:
print(" │")
print(f" │ {show_file}:")
for line in GateStep._read_show_file(show_file):
print(f" │ {line}")
print(" │")
for i, opt in enumerate(options, 1):
print(f" │ [{i}] {opt}")
Expand All @@ -90,6 +111,34 @@ def _prompt(message: str, options: list[str]) -> str:
return next(o for o in options if o.lower() == raw.lower())
print(f" Invalid choice. Enter 1-{len(options)} or an option name.")

@staticmethod
def _read_show_file(show_file: str) -> list[str]:
"""Return the lines of ``show_file`` for display.

Reads at most ``MAX_SHOW_FILE_LINES`` lines so a large file cannot
flood the prompt, and returns a short notice instead of raising
when the file is missing or cannot be decoded, so a misconfigured
path never breaks the interactive prompt.
"""
lines: list[str] = []
truncated = False
try:
with Path(show_file).open(encoding="utf-8") as handle:
for line in handle:
if len(lines) >= GateStep.MAX_SHOW_FILE_LINES:
truncated = True
break
lines.append(line.rstrip("\n"))
except (OSError, UnicodeDecodeError) as exc:
return [f"(could not read file: {exc})"]
Comment thread
mnriem marked this conversation as resolved.
Outdated
if not lines and not truncated:
return ["(file is empty)"]
if truncated:
lines.append(
f"… (output truncated at {GateStep.MAX_SHOW_FILE_LINES} lines)"
)
return lines

def validate(self, config: dict[str, Any]) -> list[str]:
errors = super().validate(config)
if "message" not in config:
Expand Down
109 changes: 109 additions & 0 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,115 @@ def test_validate_invalid_on_reject(self):
})
assert any("on_reject" in e for e in errors)

def test_interactive_prompt_renders_show_file(self, tmp_path, monkeypatch, capsys):
Comment thread
mnriem marked this conversation as resolved.
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

review = tmp_path / "spec.md"
review.write_text("LINE-ONE\nLINE-TWO\n", encoding="utf-8")

monkeypatch.setattr("sys.stdin.isatty", lambda: True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")
Comment thread
mnriem marked this conversation as resolved.
Outdated

step = GateStep()
config = {
"id": "review",
"message": "Review the spec.",
"show_file": str(review),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
out = capsys.readouterr().out

assert "LINE-ONE" in out and "LINE-TWO" in out
assert str(review) in out
assert result.status == StepStatus.COMPLETED
assert result.output["choice"] == "approve"

def test_interactive_prompt_missing_show_file_does_not_crash(
self, tmp_path, monkeypatch, capsys
):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

missing = tmp_path / "does-not-exist.md"

monkeypatch.setattr("sys.stdin.isatty", lambda: True)
monkeypatch.setattr("builtins.input", lambda _prompt="": "1")
Comment thread
mnriem marked this conversation as resolved.
Outdated

step = GateStep()
config = {
"id": "review",
"message": "Review.",
"show_file": str(missing),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
out = capsys.readouterr().out

assert "could not read file" in out
assert result.status == StepStatus.COMPLETED

def test_non_interactive_show_file_still_pauses_without_reading(
self, tmp_path, monkeypatch
):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

review = tmp_path / "spec.md"
review.write_text("CONTENT\n", encoding="utf-8")

monkeypatch.setattr("sys.stdin.isatty", lambda: False)

step = GateStep()
config = {
"id": "review",
"message": "Review.",
"show_file": str(review),
"options": ["approve", "reject"],
}
result = step.execute(config, StepContext())
assert result.status == StepStatus.PAUSED
assert result.output["show_file"] == str(review)

Comment thread
mnriem marked this conversation as resolved.
def test_read_show_file_empty(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

empty = tmp_path / "empty.md"
empty.write_text("", encoding="utf-8")
assert GateStep._read_show_file(str(empty)) == ["(file is empty)"]

def test_read_show_file_truncates_large_file(self, tmp_path):
from specify_cli.workflows.steps.gate import GateStep

big = tmp_path / "big.md"
big.write_text(
"\n".join(f"line{i}" for i in range(GateStep.MAX_SHOW_FILE_LINES + 50)),
encoding="utf-8",
)
rendered = GateStep._read_show_file(str(big))
# MAX_SHOW_FILE_LINES content lines + one truncation notice line.
assert len(rendered) == GateStep.MAX_SHOW_FILE_LINES + 1
assert "truncated" in rendered[-1]

def test_templated_show_file_resolving_to_non_string_is_coerced(self):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus

# A single-expression template can resolve to a non-string (e.g. a
# number from a prior step); it must be coerced to str, not skipped.
step = GateStep()
ctx = StepContext(steps={"prev": {"output": {"ref": 123}}})
config = {
"id": "review",
"message": "Review.",
"show_file": "{{ steps.prev.output.ref }}",
"options": ["approve", "reject"],
}
result = step.execute(config, ctx) # non-interactive -> PAUSED
assert result.status == StepStatus.PAUSED
assert result.output["show_file"] == "123"
Comment thread
mnriem marked this conversation as resolved.


class TestIfThenStep:
"""Test the if/then/else step type."""
Expand Down
Loading