diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index f985da83cd..b012155caa 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -863,6 +863,13 @@ def _execute_single_native_tool_call( ToolUsageFinishedEvent, ToolUsageStartedEvent, ) + from crewai.tools.file_artifact import ( + artifact_scope_id, + resolve_artifact_handles, + store_if_artifact, + ) + + scope_id = artifact_scope_id(self.crew, self.task, self.agent) args_dict, parse_error = parse_tool_call_args( func_args, func_name, call_id, original_tool @@ -896,6 +903,7 @@ def _execute_single_native_tool_call( tool=func_name, input=input_str ) if cached_result is not None: + cached_result = store_if_artifact(cached_result, scope_id) result = ( str(cached_result) if not isinstance(cached_result, str) @@ -960,7 +968,8 @@ def _execute_single_native_tool_call( result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." elif not from_cache and func_name in available_functions: try: - raw_result = available_functions[func_name](**(args_dict or {})) + invoke_args = resolve_artifact_handles(args_dict) if args_dict else {} + raw_result = available_functions[func_name](**invoke_args) if self.tools_handler and self.tools_handler.cache: should_cache = True @@ -977,6 +986,7 @@ def _execute_single_native_tool_call( tool=func_name, input=input_str, output=raw_result ) + raw_result = store_if_artifact(raw_result, scope_id) result = ( str(raw_result) if not isinstance(raw_result, str) else raw_result ) @@ -1020,6 +1030,10 @@ def _execute_single_native_tool_call( color="red", ) + # An after_tool_call hook may have replaced the result with a + # FileArtifact; keep those bytes out of the message and events too. 
+ result = store_if_artifact(result, scope_id) + if not error_event_emitted: crewai_event_bus.emit( self, diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index b2cebd3ed2..874bf3f646 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -116,6 +116,7 @@ def get_supported_content_types(provider: str, api: str | None = None) -> list[s from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.tools.agent_tools.read_file_tool import ReadFileTool from crewai.tools.base_tool import BaseTool +from crewai.tools.file_artifact import clear_artifact_scope from crewai.types.callback import SerializableCallable from crewai.types.streaming import CrewStreamingOutput from crewai.types.usage_metrics import UsageMetrics @@ -1047,6 +1048,7 @@ def run_crew() -> None: if self._memory is not None and hasattr(self._memory, "drain_writes"): self._memory.drain_writes() clear_files(self.id) + clear_artifact_scope(self.id) detach(token) def _post_kickoff(self, result: CrewOutput) -> CrewOutput: @@ -1255,6 +1257,7 @@ async def run_crew() -> None: raise finally: clear_files(self.id) + clear_artifact_scope(self.id) detach(token) async def akickoff_for_each( diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 3cc9cdd7be..d3d266bec6 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -70,6 +70,11 @@ BeforeLLMCallHookType, ) from crewai.tools.base_tool import BaseTool +from crewai.tools.file_artifact import ( + artifact_scope_id, + resolve_artifact_handles, + store_if_artifact, +) from crewai.tools.structured_tool import CrewStructuredTool from crewai.utilities.agent_utils import ( _llm_stop_words_applied, @@ -1762,6 +1767,8 @@ def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]: return parse_error args_dict: dict[str, Any] = parsed_args or {} + scope_id = 
artifact_scope_id(self.crew, self.task, self.agent) + # Get agent_key for event tracking agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown" @@ -1794,6 +1801,7 @@ def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]: tool=func_name, input=input_str ) if cached_result is not None: + cached_result = store_if_artifact(cached_result, scope_id) result = ( str(cached_result) if not isinstance(cached_result, str) @@ -1859,7 +1867,10 @@ def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]: if func_name in self._available_functions: try: tool_func = self._available_functions[func_name] - raw_result = tool_func(**args_dict) + invoke_args = ( + resolve_artifact_handles(args_dict) if args_dict else {} + ) + raw_result = tool_func(**invoke_args) # Add to cache after successful execution (before string conversion) if self.tools_handler and self.tools_handler.cache: @@ -1874,6 +1885,7 @@ def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]: ) # Convert to string for message + raw_result = store_if_artifact(raw_result, scope_id) result = ( str(raw_result) if not isinstance(raw_result, str) @@ -1927,6 +1939,10 @@ def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]: color="red", ) + # An after_tool_call hook may have replaced the result with a + # FileArtifact; keep those bytes out of the message and events too. 
+ result = store_if_artifact(result, scope_id) + if not error_event_emitted: crewai_event_bus.emit( self, diff --git a/lib/crewai/src/crewai/tools/__init__.py b/lib/crewai/src/crewai/tools/__init__.py index a2415b1b2a..1001346f63 100644 --- a/lib/crewai/src/crewai/tools/__init__.py +++ b/lib/crewai/src/crewai/tools/__init__.py @@ -1,8 +1,10 @@ from crewai.tools.base_tool import BaseTool, EnvVar, tool +from crewai.tools.file_artifact import FileArtifact __all__ = [ "BaseTool", "EnvVar", + "FileArtifact", "tool", ] diff --git a/lib/crewai/src/crewai/tools/file_artifact.py b/lib/crewai/src/crewai/tools/file_artifact.py new file mode 100644 index 0000000000..8a91b65f0a --- /dev/null +++ b/lib/crewai/src/crewai/tools/file_artifact.py @@ -0,0 +1,296 @@ +"""Out-of-band binary file passing between tools. + +LLMs cannot reproduce opaque strings longer than a few kilobytes byte-perfect. +A base64-encoded binary file (PPTX, PDF, image, ...) returned by one tool and +echoed by the model as the argument to another tool drifts by a few characters, +which invalidates the base64 and corrupts the resulting file. + +To avoid routing bytes through the model, a tool returns a :class:`FileArtifact` +instead of a base64 string. The agent executor stores the bytes here and shows +the model a short, stable ``crewai+file://`` handle in place of the data. +When the model passes that handle as an argument to a later tool, the executor +expands it back to base64 *just before* the tool runs -- the bytes never enter +the model's context, so they cannot be corrupted. + +The handle is namespaced (``crewai+file://``) so resolution only ever fires on +tokens this module minted, never on arbitrary user data. Stored bytes are scoped +to a crew/task execution id and cleared when that execution finishes; a TTL prune +is the safety net for runs that never call :func:`clear_artifact_scope`. + +Limitation: handles are ephemeral and scoped to a single run. 
A handle only +resolves while its run's artifacts are live. If a placeholder's text is persisted +(conversation memory, a checkpoint) and a *later* run echoes that handle, it will +no longer resolve and the literal token is passed through unchanged -- so binary +producer->consumer chains must complete within one run. +""" + +from __future__ import annotations + +import base64 +from dataclasses import dataclass +import re +import threading +import time +from typing import Any, Final +from uuid import uuid4 + + +__all__ = [ + "FileArtifact", + "artifact_scope_id", + "clear_artifact_scope", + "resolve_artifact_handles", + "store_artifact", + "store_if_artifact", +] + +_HANDLE_SCHEME: Final[str] = "crewai+file" +# A minted handle: crewai+file://{uuid}. Matched case-insensitively because +# uuid hex may arrive upper- or lower-cased after a model round-trip. +_HANDLE_RE: Final[re.Pattern[str]] = re.compile( + r"crewai\+file://([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-" + r"[0-9a-fA-F]{4}-[0-9a-fA-F]{12})" +) + +DEFAULT_ARTIFACT_TTL: Final[int] = 3600 + + +@dataclass +class FileArtifact: + """Binary file produced or consumed by a tool, kept out of the LLM context. + + Return this from a tool's ``_run`` instead of a base64 string when the output + is binary. The executor stores the bytes and substitutes a short handle in the + text the model sees, so the model never has to reproduce the data verbatim. + + Attributes: + data: Raw file bytes. + filename: Human-readable name, surfaced to the model and useful as a + default for downstream ``file_name`` arguments. + mime_type: MIME type of the content. 
+ """ + + data: bytes + filename: str = "file" + mime_type: str = "application/octet-stream" + + @property + def size_bytes(self) -> int: + return len(self.data) + + def as_base64(self) -> str: + """Return the bytes as an ASCII base64 string (what connectors expect).""" + return base64.b64encode(self.data).decode("ascii") + + def _placeholder(self, handle: str) -> str: + """Build the model-facing text that stands in for the bytes.""" + # Neutralize characters that would break the single-line bracketed + # attribute list (quotes, the closing bracket, newlines). + filename = _sanitize_attr(self.filename) + mime_type = _sanitize_attr(self.mime_type) + return ( + f'[FileArtifact filename="{filename}" ' + f'mime_type="{mime_type}" size={_human_size(self.size_bytes)} ' + f"handle={handle}]\n" + "The binary content is stored out-of-band to keep it from being " + "corrupted in transit. To use this file, pass the handle string " + f"({handle}) as the value of the content/file argument when calling " + "another tool -- it is expanded to the real data before that tool runs." + ) + + +@dataclass +class _Entry: + artifact: FileArtifact + scope_id: str | None + expires_at: float | None + obj_id: int + + +class _ArtifactStore: + """Process-local, execution-scoped store keyed by minted handle id. + + Entries are keyed by an opaque uuid (never by user-supplied content), so + concurrent crews cannot collide. Cleanup is per-scope -- clearing one crew's + artifacts never touches another's -- with a TTL prune as a backstop. + + Storing the same :class:`FileArtifact` instance again under the same scope + reuses its handle rather than minting a duplicate. The tool-result cache + hands back the same object on every cache hit, so this keeps repeated cached + calls from stacking identical byte copies in memory. 
+ """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._entries: dict[str, _Entry] = {} + # (id(artifact), scope) -> handle, so re-storing the same instance under + # the same scope reuses its handle. Keying on the scope too means storing + # an object under a different scope gets its own handle and its own + # cleanup entry rather than overwriting the first. + self._handle_by_obj: dict[tuple[int, str | None], str] = {} + + def store( + self, + artifact: FileArtifact, + scope_id: str | None = None, + ttl: int = DEFAULT_ARTIFACT_TTL, + ) -> str: + norm_scope = str(scope_id) if scope_id is not None else None + obj_key = (id(artifact), norm_scope) + expires_at = (time.monotonic() + ttl) if ttl > 0 else None + with self._lock: + self._prune_locked() + existing = self._handle_by_obj.get(obj_key) + if existing is not None: + entry = self._entries.get(existing) + if entry is not None and entry.artifact is artifact: + entry.expires_at = expires_at + return f"{_HANDLE_SCHEME}://{existing}" + handle_id = str(uuid4()) + self._entries[handle_id] = _Entry( + artifact=artifact, + scope_id=norm_scope, + expires_at=expires_at, + obj_id=id(artifact), + ) + self._handle_by_obj[obj_key] = handle_id + return f"{_HANDLE_SCHEME}://{handle_id}" + + def resolve(self, handle_id: str) -> FileArtifact | None: + with self._lock: + entry = self._entries.get(handle_id) + if entry is None: + return None + if entry.expires_at is not None and entry.expires_at <= time.monotonic(): + self._delete_locked(handle_id) + return None + return entry.artifact + + def clear_scope(self, scope_id: str) -> None: + scope = str(scope_id) + with self._lock: + for handle_id in [ + hid for hid, entry in self._entries.items() if entry.scope_id == scope + ]: + self._delete_locked(handle_id) + + def _prune_locked(self) -> None: + """Drop entries whose per-entry TTL has elapsed. 
Caller holds the lock.""" + now = time.monotonic() + for handle_id in [ + hid + for hid, entry in self._entries.items() + if entry.expires_at is not None and entry.expires_at <= now + ]: + self._delete_locked(handle_id) + + def _delete_locked(self, handle_id: str) -> None: + """Remove an entry and its object-identity mapping. Caller holds lock.""" + entry = self._entries.pop(handle_id, None) + if entry is not None: + self._handle_by_obj.pop((entry.obj_id, entry.scope_id), None) + + +_store: Final[_ArtifactStore] = _ArtifactStore() + + +def store_artifact( + artifact: FileArtifact, + scope_id: Any | None = None, + ttl: int = DEFAULT_ARTIFACT_TTL, +) -> str: + """Store a :class:`FileArtifact` and return its model-facing placeholder text. + + Args: + artifact: The binary artifact to keep out of the model context. + scope_id: Execution id (crew or task) used to group the artifact for + cleanup. ``None`` means it is only reclaimed by the TTL prune. + ttl: Seconds until the artifact expires; ``0`` or less means never. + + Returns: + The placeholder string to surface to the model in place of the bytes. + """ + handle = _store.store(artifact, scope_id=scope_id, ttl=ttl) + return artifact._placeholder(handle) + + +def resolve_artifact_handles(value: Any) -> Any: + """Recursively replace stored handles in tool arguments with base64 data. + + Walks strings, dicts, and lists. Any ``crewai+file://`` token that + resolves to a stored artifact is replaced with that artifact's base64 string; + unknown tokens and all other values are returned unchanged. A new container is + returned so the caller's original arguments (used for events, caching, and + logs) keep the short handle. + """ + if isinstance(value, str): + if _HANDLE_SCHEME not in value: + return value + + def _sub(match: re.Match[str]) -> str: + # Store keys are lowercase uuid4 strings; the regex matches hex + # case-insensitively, so normalize before lookup in case the model + # echoed the handle with uppercase hex. 
+ artifact = _store.resolve(match.group(1).lower()) + return artifact.as_base64() if artifact is not None else match.group(0) + + return _HANDLE_RE.sub(_sub, value) + if isinstance(value, dict): + return {key: resolve_artifact_handles(val) for key, val in value.items()} + if isinstance(value, list): + return [resolve_artifact_handles(item) for item in value] + return value + + +def store_if_artifact(result: Any, scope_id: Any | None = None) -> Any: + """Store ``result`` and return its placeholder if it is a :class:`FileArtifact`. + + Any other value is returned unchanged. This is the single funnel both the + native and ReAct executor paths route tool output through, so fresh and + cached results are handled identically. + """ + if isinstance(result, FileArtifact): + return store_artifact(result, scope_id=scope_id) + return result + + +def clear_artifact_scope(scope_id: Any) -> None: + """Drop every artifact stored under ``scope_id`` (called when a run ends).""" + _store.clear_scope(scope_id) + + +def artifact_scope_id( + crew: Any | None = None, + task: Any | None = None, + agent: Any | None = None, +) -> Any | None: + """Pick the execution id used to scope a tool's file artifacts for cleanup. + + Prefer the crew id -- it matches the id ``Crew`` passes to + :func:`clear_artifact_scope` when a run ends -- falling back to the agent's + crew, then the task id, then ``None`` (TTL-only cleanup). Centralized, and + given the agent fallback, so every tool-execution path derives the scope the + same way and can't drift. 
+ """ + if crew is None: + crew = getattr(agent, "crew", None) + crew_id = getattr(crew, "id", None) + if crew_id is not None: + return crew_id + return getattr(task, "id", None) + + +def _sanitize_attr(text: str) -> str: + """Strip characters that would break the bracketed placeholder display.""" + return ( + text.replace('"', "'").replace("]", ")").replace("\n", " ").replace("\r", " ") + ) + + +def _human_size(size_bytes: int) -> str: + size = float(size_bytes) + for unit in ("B", "KB", "MB", "GB", "TB", "PB"): + if size < 1024 or unit == "PB": + return f"{int(size)} {unit}" if unit == "B" else f"{size:.1f} {unit}" + size /= 1024 + return f"{size:.1f} PB" diff --git a/lib/crewai/src/crewai/tools/tool_usage.py b/lib/crewai/src/crewai/tools/tool_usage.py index b349218396..04f287a198 100644 --- a/lib/crewai/src/crewai/tools/tool_usage.py +++ b/lib/crewai/src/crewai/tools/tool_usage.py @@ -22,6 +22,7 @@ ToolValidateInputErrorEvent, ) from crewai.telemetry.telemetry import Telemetry +from crewai.tools.file_artifact import artifact_scope_id, resolve_artifact_handles from crewai.tools.structured_tool import CrewStructuredTool from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling from crewai.utilities.agent_utils import ( @@ -327,12 +328,14 @@ async def _ause( if k in acceptable_args } result = await tool.ainvoke( - input=arguments, config=fingerprint_config + input=resolve_artifact_handles(arguments), + config=fingerprint_config, ) except Exception: arguments = calling.arguments result = await tool.ainvoke( - input=arguments, config=fingerprint_config + input=resolve_artifact_handles(arguments), + config=fingerprint_config, ) else: result = await tool.ainvoke(input={}, config=fingerprint_config) @@ -558,12 +561,14 @@ def _use( if k in acceptable_args } result = tool.invoke( - input=arguments, config=fingerprint_config + input=resolve_artifact_handles(arguments), + config=fingerprint_config, ) except Exception: arguments = calling.arguments result = 
tool.invoke( - input=arguments, config=fingerprint_config + input=resolve_artifact_handles(arguments), + config=fingerprint_config, ) else: result = tool.invoke(input={}, config=fingerprint_config) @@ -679,9 +684,17 @@ def _use( return result + @property + def _artifact_scope_id(self) -> Any | None: + """Execution id used to scope out-of-band file artifacts for cleanup.""" + return artifact_scope_id(task=self.task, agent=self.agent) + def _format_result(self, result: Any) -> str: + from crewai.tools.file_artifact import store_if_artifact + if self.task: self.task.used_tools += 1 + result = store_if_artifact(result, self._artifact_scope_id) if self._should_remember_format(): result = self._remember_format(result=result) return str(result) diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py index 399d749542..95b92080d8 100644 --- a/lib/crewai/src/crewai/utilities/agent_utils.py +++ b/lib/crewai/src/crewai/utilities/agent_utils.py @@ -27,6 +27,11 @@ from crewai.llms.base_llm import BaseLLM, call_stop_override from crewai.tools import BaseTool as CrewAITool from crewai.tools.base_tool import BaseTool +from crewai.tools.file_artifact import ( + artifact_scope_id, + resolve_artifact_handles, + store_if_artifact, +) from crewai.tools.structured_tool import CrewStructuredTool from crewai.tools.tool_types import ToolResult from crewai.utilities.errors import AgentRepositoryError @@ -1416,6 +1421,7 @@ def execute_single_native_tool_call( args_dict = func_args agent_key = getattr(agent, "key", "unknown") if agent else "unknown" + scope_id = artifact_scope_id(crew, task, agent) original_tool: BaseTool | None = None for tool in original_tools: @@ -1430,6 +1436,7 @@ def execute_single_native_tool_call( if tools_handler and tools_handler.cache: cached_result = tools_handler.cache.read(tool=func_name, input=input_str) if cached_result is not None: + cached_result = store_if_artifact(cached_result, scope_id) result = ( 
str(cached_result) if not isinstance(cached_result, str) @@ -1481,7 +1488,8 @@ def execute_single_native_tool_call( if func_name in available_functions: try: tool_func = available_functions[func_name] - raw_result = tool_func(**args_dict) + invoke_args = resolve_artifact_handles(args_dict) if args_dict else {} + raw_result = tool_func(**invoke_args) if tools_handler and tools_handler.cache: should_cache = True @@ -1494,6 +1502,7 @@ def execute_single_native_tool_call( tool=func_name, input=input_str, output=raw_result ) + raw_result = store_if_artifact(raw_result, scope_id) result = ( str(raw_result) if not isinstance(raw_result, str) else raw_result ) @@ -1532,6 +1541,10 @@ def execute_single_native_tool_call( except Exception: # noqa: S110 pass + # An after_tool_call hook may have replaced the result with a FileArtifact; + # keep those bytes out of the message and events too. + result = store_if_artifact(result, scope_id) + if not error_event_emitted: crewai_event_bus.emit( event_source, diff --git a/lib/crewai/tests/tools/test_file_artifact.py b/lib/crewai/tests/tools/test_file_artifact.py new file mode 100644 index 0000000000..0485e2d2bd --- /dev/null +++ b/lib/crewai/tests/tools/test_file_artifact.py @@ -0,0 +1,397 @@ +"""Tests for out-of-band binary file passing between tools.""" + +from __future__ import annotations + +import base64 +import re +import time + +import pytest + +from crewai.tools import FileArtifact +from crewai.tools.file_artifact import ( + _store, + artifact_scope_id, + clear_artifact_scope, + resolve_artifact_handles, + store_artifact, + store_if_artifact, +) + + +_HANDLE = re.compile(r"crewai\+file://[0-9a-fA-F-]{36}") + + +@pytest.fixture(autouse=True) +def _clear_store(): + """Keep the process-local store empty between tests.""" + _store._entries.clear() + _store._handle_by_obj.clear() + yield + _store._entries.clear() + _store._handle_by_obj.clear() + + +def _handle_in(text: str) -> str: + match = _HANDLE.search(text) + assert match is 
not None, f"no handle in: {text!r}" + return match.group(0) + + +class TestFileArtifact: + def test_as_base64_round_trips(self) -> None: + data = bytes(range(256)) + artifact = FileArtifact(data=data, filename="x.bin") + assert base64.b64decode(artifact.as_base64()) == data + + def test_size_bytes(self) -> None: + assert FileArtifact(data=b"abc").size_bytes == 3 + + def test_defaults(self) -> None: + artifact = FileArtifact(data=b"") + assert artifact.filename == "file" + assert artifact.mime_type == "application/octet-stream" + + +class TestStoreArtifact: + def test_placeholder_contains_metadata_and_handle(self) -> None: + artifact = FileArtifact( + data=b"\x00" * 30045, filename="deck.pptx", mime_type="application/pptx" + ) + placeholder = store_artifact(artifact, scope_id="crew-1") + assert 'filename="deck.pptx"' in placeholder + assert 'mime_type="application/pptx"' in placeholder + assert "29.3 KB" in placeholder + assert _HANDLE.search(placeholder) is not None + + def test_each_store_gets_a_unique_handle(self) -> None: + h1 = _handle_in(store_artifact(FileArtifact(data=b"a"))) + h2 = _handle_in(store_artifact(FileArtifact(data=b"a"))) + assert h1 != h2 + + def test_restoring_same_instance_reuses_handle(self) -> None: + # The tool-result cache hands back the same FileArtifact on every cache + # hit; re-storing it must reuse the handle, not stack duplicate copies. + artifact = FileArtifact(data=b"payload" * 1000) + h1 = _handle_in(store_artifact(artifact, scope_id="s")) + h2 = _handle_in(store_artifact(artifact, scope_id="s")) + assert h1 == h2 + assert len(_store._entries) == 1 + + def test_same_instance_different_scope_gets_own_handle_and_cleans_up(self) -> None: + # Storing one instance under two scopes must not orphan a mapping: + # each scope keeps its own handle, and clearing one leaves the other. 
+ artifact = FileArtifact(data=b"x" * 100) + h_a = _handle_in(store_artifact(artifact, scope_id="A")) + h_b = _handle_in(store_artifact(artifact, scope_id="B")) + assert h_a != h_b + clear_artifact_scope("A") + assert resolve_artifact_handles(h_a) == h_a # A cleared + assert base64.b64decode(resolve_artifact_handles(h_b)) == b"x" * 100 + # No dangling object-identity mapping for the cleared scope. + assert (id(artifact), "A") not in _store._handle_by_obj + clear_artifact_scope("B") + assert _store._handle_by_obj == {} + + def test_placeholder_escapes_quotes_in_metadata(self) -> None: + artifact = FileArtifact(data=b"x", filename='a".pptx', mime_type='m"/x') + placeholder = store_artifact(artifact) + # The bracketed attribute list must not be broken by an embedded quote, + # and the handle must still be recoverable. + assert 'filename="a\'.pptx"' in placeholder + assert _HANDLE.search(placeholder) is not None + + def test_placeholder_neutralizes_bracket_and_newlines(self) -> None: + artifact = FileArtifact(data=b"x", filename="a]b\nc.bin") + placeholder = store_artifact(artifact) + first_line = placeholder.splitlines()[0] + # The closing bracket and newline can't appear inside the attributes, + # so the bracketed segment stays a single, well-formed line. 
+ assert first_line.count("]") == 1 and first_line.endswith("]") + assert _HANDLE.search(placeholder) is not None + + +class TestArtifactScopeId: + class _Obj: + def __init__(self, id_): + self.id = id_ + + def test_prefers_crew_id(self) -> None: + assert artifact_scope_id(self._Obj("crew"), self._Obj("task")) == "crew" + + def test_falls_back_to_task_when_no_crew(self) -> None: + assert artifact_scope_id(None, self._Obj("task")) == "task" + + def test_falls_back_to_task_when_crew_id_is_none(self) -> None: + assert artifact_scope_id(self._Obj(None), self._Obj("task")) == "task" + + def test_none_when_neither_present(self) -> None: + assert artifact_scope_id(None, None) is None + + def test_falls_back_to_agent_crew(self) -> None: + # Native executors may have crew=None while the agent carries the crew; + # the helper must still resolve the crew id so cleanup scopes align. + agent = self._Obj(None) + agent.crew = self._Obj("crew-from-agent") + assert artifact_scope_id(None, self._Obj("task"), agent) == "crew-from-agent" + + def test_explicit_crew_beats_agent_crew(self) -> None: + agent = self._Obj(None) + agent.crew = self._Obj("agent-crew") + assert artifact_scope_id(self._Obj("direct-crew"), None, agent) == "direct-crew" + + +class TestResolveArtifactHandles: + def test_exact_handle_resolves_to_base64(self) -> None: + data = bytes(range(256)) * 100 + handle = _handle_in(store_artifact(FileArtifact(data=data))) + resolved = resolve_artifact_handles(handle) + assert base64.b64decode(resolved) == data + + def test_resolves_handle_with_uppercased_hex(self) -> None: + # A model may echo the handle with uppercase uuid hex; lookup must still + # hit the lowercase-keyed store. 
+ data = b"upper-case-payload" * 100 + handle = _handle_in(store_artifact(FileArtifact(data=data))) + scheme, _, hex_part = handle.rpartition("/") + upper = f"{scheme}/{hex_part.upper()}" + assert upper != handle + assert base64.b64decode(resolve_artifact_handles(upper)) == data + + def test_resolves_handle_inside_dict(self) -> None: + data = b"binary-payload" * 1000 + handle = _handle_in(store_artifact(FileArtifact(data=data))) + args = {"file_name": "a.bin", "content": handle} + resolved = resolve_artifact_handles(args) + assert base64.b64decode(resolved["content"]) == data + assert resolved["file_name"] == "a.bin" + + def test_resolves_handle_nested_in_list_and_dict(self) -> None: + handle = _handle_in(store_artifact(FileArtifact(data=b"xyz"))) + resolved = resolve_artifact_handles({"items": [{"c": handle}]}) + assert base64.b64decode(resolved["items"][0]["c"]) == b"xyz" + + def test_does_not_mutate_original_arguments(self) -> None: + handle = _handle_in(store_artifact(FileArtifact(data=b"data"))) + args = {"content": handle} + resolve_artifact_handles(args) + assert args["content"] == handle + + def test_unknown_handle_is_left_unchanged(self) -> None: + token = "crewai+file://00000000-0000-0000-0000-000000000000" + assert resolve_artifact_handles(token) == token + + def test_non_handle_strings_pass_through(self) -> None: + assert resolve_artifact_handles("just text") == "just text" + assert resolve_artifact_handles({"k": "v"}) == {"k": "v"} + + def test_non_string_values_pass_through(self) -> None: + assert resolve_artifact_handles(42) == 42 + assert resolve_artifact_handles(None) is None + assert resolve_artifact_handles([1, 2]) == [1, 2] + + +class TestStoreIfArtifact: + def test_artifact_becomes_placeholder(self) -> None: + result = store_if_artifact(FileArtifact(data=b"a" * 100), scope_id="s") + assert isinstance(result, str) + assert _HANDLE.search(result) is not None + + def test_other_values_unchanged(self) -> None: + assert store_if_artifact("hello") == 
"hello" + assert store_if_artifact(7) == 7 + + +class TestScoping: + def test_clear_scope_only_drops_its_own_artifacts(self) -> None: + h_a = _handle_in(store_artifact(FileArtifact(data=b"a"), scope_id="A")) + h_b = _handle_in(store_artifact(FileArtifact(data=b"b"), scope_id="B")) + + clear_artifact_scope("A") + + # A's handle no longer resolves; B's still does. + assert resolve_artifact_handles(h_a) == h_a + assert base64.b64decode(resolve_artifact_handles(h_b)) == b"b" + + def test_unscoped_artifact_survives_other_scope_clears(self) -> None: + handle = _handle_in(store_artifact(FileArtifact(data=b"x"))) + clear_artifact_scope("some-crew") + assert base64.b64decode(resolve_artifact_handles(handle)) == b"x" + + +def _legacy_executor_runner(tools): + """Return a `(func_name, args) -> result_dict` driver for the legacy executor.""" + from unittest.mock import Mock + + from crewai.agents.crew_agent_executor import CrewAgentExecutor + from crewai.tools.base_tool import to_langchain + from crewai.utilities.agent_utils import convert_tools_to_openai_schema + + executor = CrewAgentExecutor(tools=to_langchain(tools), original_tools=tools) + agent = Mock(key="agent", role="tester", verbose=False, fingerprint=None) + agent.tools_results = [] + executor.agent = agent + task = Mock(description="t", id="scope-legacy") + task.name = "t" # `name=` is a reserved Mock ctor kwarg, so assign explicitly + executor.task = task + _, available_functions, _ = convert_tools_to_openai_schema(tools) + + def run(func_name, args): + return executor._execute_single_native_tool_call( + call_id="c", + func_name=func_name, + func_args=args, + available_functions=available_functions, + ) + + return run + + +def _experimental_executor_runner(tools): + """Return a `(func_name, args) -> result_dict` driver for the default executor.""" + import json + from types import SimpleNamespace + from unittest.mock import Mock + + from crewai.experimental.agent_executor import AgentExecutor + + executor = 
AgentExecutor.model_construct() + for key, value in { + "original_tools": tools, + "tools": [], + "tools_handler": None, + "crew": None, + }.items(): + object.__setattr__(executor, key, value) + agent = Mock(key="agent", role="tester", verbose=False, fingerprint=None) + agent.tools_results = [] + object.__setattr__(executor, "agent", agent) + task = Mock(id="scope-exp", description="t") + task.name = "t" # `name=` is a reserved Mock ctor kwarg, so assign explicitly + object.__setattr__(executor, "task", task) + executor._setup_native_tools() + + def run(func_name, args): + tool_call = SimpleNamespace( + id="c", + function=SimpleNamespace( + name=func_name, arguments=args if isinstance(args, str) else json.dumps(args) + ), + ) + return executor._execute_single_native_tool_call(tool_call) + + return run + + +@pytest.mark.parametrize( + "make_runner", + [_experimental_executor_runner, _legacy_executor_runner], + ids=["experimental", "legacy"], +) +class TestNativeExecutorWiring: + """Guard producer/consumer wiring on both the default and legacy executors.""" + + def test_artifact_output_is_replaced_by_handle_and_resolves_downstream( + self, make_runner + ) -> None: + from crewai.tools import BaseTool, FileArtifact + + payload = bytes(range(256)) * 200 # ~51 KB, far past the LLM round-trip limit + + class Generate(BaseTool): + name: str = "generate_file" + description: str = "Generate a binary file" + + def _run(self) -> FileArtifact: + return FileArtifact( + data=payload, filename="deck.pptx", mime_type="application/pptx" + ) + + captured: dict[str, str] = {} + + class Upload(BaseTool): + name: str = "upload_file" + description: str = "Upload base64 content" + + def _run(self, content: str) -> str: + captured["content"] = content + return "uploaded" + + run = make_runner([Generate(), Upload()]) + + # Producer: the 51 KB payload must NOT appear in the model-facing result. 
+ gen_result = run("generate_file", "{}")["result"] + assert "deck.pptx" in gen_result + assert base64.b64encode(payload).decode() not in gen_result + handle = _handle_in(gen_result) + + # Consumer: the handle the model echoes is expanded to exact bytes. + up_result = run("upload_file", {"content": handle})["result"] + assert up_result == "uploaded" + assert base64.b64decode(captured["content"]) == payload + + +class TestAfterHookArtifact: + """An after_tool_call hook that returns a FileArtifact must still be stored.""" + + def test_hook_returned_artifact_is_replaced_by_handle(self) -> None: + from crewai.hooks.tool_hooks import ( + register_after_tool_call_hook, + unregister_after_tool_call_hook, + ) + from crewai.tools import BaseTool, FileArtifact + + payload = bytes(range(256)) * 50 + + class Echo(BaseTool): + name: str = "echo" + description: str = "Echo" + + def _run(self) -> str: + return "plain text" + + def hook(_context): + return FileArtifact(data=payload, filename="hook.bin") + + register_after_tool_call_hook(hook) + try: + run = _experimental_executor_runner([Echo()]) + result = run("echo", "{}")["result"] + finally: + unregister_after_tool_call_hook(hook) + + assert base64.b64encode(payload).decode() not in result + assert _HANDLE.search(result) is not None + + +class TestTtlPrune: + @staticmethod + def _expire(handle: str) -> None: + """Force a stored handle's per-entry TTL into the past.""" + entry = _store._entries[handle.rsplit("/", 1)[-1]] + entry.expires_at = time.monotonic() - 1 + + def test_expired_handle_does_not_resolve(self) -> None: + handle = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600)) + self._expire(handle) + # An expired handle is enforced on lookup, not just on the next write. 
+ assert resolve_artifact_handles(handle) == handle + + def test_short_ttl_store_does_not_evict_long_ttl_entries(self) -> None: + keep = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=3600)) + # A later short-TTL store must prune only by each entry's own expiry, + # never by the current call's ttl. + store_artifact(FileArtifact(data=b"tiny"), ttl=1) + assert base64.b64decode(resolve_artifact_handles(keep)) == b"keep" + + def test_expired_entries_are_pruned_on_next_store(self) -> None: + stale = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600)) + self._expire(stale) + store_artifact(FileArtifact(data=b"new"), ttl=3600) + assert stale.rsplit("/", 1)[-1] not in _store._entries + + def test_ttl_zero_never_expires(self) -> None: + handle = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=0)) + assert _store._entries[handle.rsplit("/", 1)[-1]].expires_at is None + store_artifact(FileArtifact(data=b"another"), ttl=0) + assert base64.b64decode(resolve_artifact_handles(handle)) == b"keep"