diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index e04446e1b4..fd95654c13 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -117,6 +117,7 @@ class CrewAIEventsBus: _executor_initialized: bool _has_pending_events: bool _runtime_state: RuntimeState | None + _recording_enabled: bool def __new__(cls) -> Self: """Create or return the singleton instance. @@ -153,6 +154,14 @@ def _initialize(self) -> None: self._has_pending_events = False self._runtime_state: RuntimeState | None = None self._registered_entity_ids: set[int] = set() + # The RuntimeState recorder (entity root + event_record) exists solely + # to serialize a run for checkpoint/replay. It is only ever read back + # by the checkpoint/fork/resume machinery, so we leave it disabled + # until something actually needs it. Recording it unconditionally on a + # process-global singleton is an unbounded leak in long-lived processes + # that run many kickoffs. Armed by ``enable_recording()`` (called when + # a checkpoint config is resolved) and by ``set_runtime_state()``. + self._recording_enabled: bool = False def _ensure_executor_initialized(self) -> None: """Lazily initialize the thread pool executor and event loop. @@ -270,11 +279,46 @@ def decorator(handler: Callable[P, R]) -> Callable[P, R]: return decorator + def enable_recording(self) -> None: + """Arm RuntimeState recording for this process. + + Until armed, the bus does not register entities or record events into a + ``RuntimeState`` — that recorder feeds checkpoint/replay only and is + wasted work (and an unbounded leak on a long-lived singleton) when no + checkpointing is configured. Called when a ``CheckpointConfig`` is + resolved on a Crew/Flow/Agent (see + ``crewai.state.checkpoint_listener``) and by :meth:`set_runtime_state`. + Idempotent. + """ + with self._instance_lock: + self._recording_enabled = True + def set_runtime_state(self, state: RuntimeState) -> None: """Set the RuntimeState that will be passed to event handlers.""" with self._instance_lock: self._runtime_state = state self._registered_entity_ids = {id(e) for e in state.root} + self._recording_enabled = True + + def reset_runtime_state(self) -> None: + """Drop the recorded ``RuntimeState`` and registered entity ids. + + When recording is armed (checkpointing configured, or a state was + restored via :meth:`set_runtime_state`), the bus records every emitted + event into a process-global ``RuntimeState`` — the entity ``root`` list + plus the ``event_record`` — so checkpoint/replay can reconstruct a run. + Because the bus is a process-global singleton, that record grows + without bound across successive ``kickoff`` calls in a long-lived + process (worker, request handler, scheduler). Embedders that checkpoint + but never replay in-process can call this between runs to bound memory. + + This clears the recorded data but leaves recording armed, so subsequent + runs still record. Safe to call when no state is attached. Do not call + mid-run or while a pending checkpoint/replay depends on the record. + """ + with self._instance_lock: + self._runtime_state = None + self._registered_entity_ids = set() @property def runtime_state(self) -> RuntimeState | None: @@ -464,17 +508,31 @@ async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None: await self._acall_handlers(source, event, level_async) def _register_source(self, source: Any) -> None: - """Register the source entity in RuntimeState if applicable.""" + """Register the source entity in RuntimeState if applicable. + + No-op unless recording is armed (see :meth:`enable_recording`): the + RuntimeState entity list is only read by checkpoint/replay. + """ if ( - getattr(source, "entity_type", None) in ("flow", "crew", "agent") + self._recording_enabled + and getattr(source, "entity_type", None) in ("flow", "crew", "agent") and id(source) not in self._registered_entity_ids ): self.register_entity(source) def _record_event(self, event: BaseEvent) -> None: - """Add an event to the RuntimeState event record.""" - if self._runtime_state is not None: - self._runtime_state.event_record.add(event) + """Add an event to the RuntimeState event record. + + No-op unless recording is armed (see :meth:`enable_recording`): the + event record is only read by checkpoint/replay. + """ + if not self._recording_enabled: + return + # Read once: a concurrent reset_runtime_state() can null _runtime_state + # between the check and the deref. + state = self._runtime_state + if state is not None: + state.event_record.add(event) def _prepare_event(self, source: Any, event: BaseEvent) -> None: """Register source, set scope/sequence metadata, and record the event. diff --git a/lib/crewai/src/crewai/state/checkpoint_listener.py b/lib/crewai/src/crewai/state/checkpoint_listener.py index 53ae0b4947..4ec0b99764 100644 --- a/lib/crewai/src/crewai/state/checkpoint_listener.py +++ b/lib/crewai/src/crewai/state/checkpoint_listener.py @@ -49,6 +49,11 @@ def _ensure_handlers_registered() -> None: if _handlers_registered: return _register_all_handlers(crewai_event_bus) + # Arm RuntimeState recording: from here on the bus must capture the + # entity tree + event record so a checkpoint can serialize the run. + # Until a checkpoint config is resolved, recording stays off to avoid + # an unbounded leak on the process-global bus. + crewai_event_bus.enable_recording() _handlers_registered = True diff --git a/lib/crewai/tests/events/test_event_replay.py b/lib/crewai/tests/events/test_event_replay.py index d141385ca7..13bdc34074 100644 --- a/lib/crewai/tests/events/test_event_replay.py +++ b/lib/crewai/tests/events/test_event_replay.py @@ -2,6 +2,7 @@ from __future__ import annotations +from contextlib import contextmanager from typing import Any from unittest.mock import patch @@ -132,6 +133,12 @@ def step_b(self) -> str: def step_c(self) -> str: return "c" + # The event record (which replay reads from) is only populated when + # recording is armed. In production a resume arms it by restoring the + # RuntimeState from a checkpoint (``from_checkpoint`` -> + # ``set_runtime_state``); here we arm it directly and let flow1 record + # live, then resume flow2 against that record. + crewai_event_bus.enable_recording() if crewai_event_bus.runtime_state is not None: crewai_event_bus.runtime_state.event_record.clear() @@ -163,3 +170,75 @@ def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None: assert captured_finished.count("step_a") == 1 assert captured_finished.count("step_b") == 1 assert captured_finished.count("step_c") == 1 + + +@contextmanager +def _isolated_recording_state() -> Any: + """Snapshot and fully restore the singleton bus recording state. + + Restores all three coupled fields (``_recording_enabled``, + ``_runtime_state``, ``_registered_entity_ids``) so a test that pokes the + process-global bus can't leak an inconsistent state — e.g. a populated + ``_runtime_state`` with an emptied id set — into later tests. + """ + prev_enabled = crewai_event_bus._recording_enabled + prev_state = crewai_event_bus._runtime_state + prev_ids = crewai_event_bus._registered_entity_ids + try: + yield + finally: + crewai_event_bus._recording_enabled = prev_enabled + crewai_event_bus._runtime_state = prev_state + crewai_event_bus._registered_entity_ids = prev_ids + + +class TestRecordingGate: + """RuntimeState recording is armed only when checkpoint/replay needs it. + + The bus is a process-global singleton; recording every event into its + ``RuntimeState`` unconditionally leaks linearly across kickoffs in a + long-lived process. Recording stays off until a checkpoint config is + resolved (or a state is restored), so the common "construct, kickoff, + discard" loop allocates nothing. + """ + + def test_plain_flow_does_not_record_when_recording_disarmed(self) -> None: + from crewai.flow.flow import Flow, listen, start + + class EchoFlow(Flow): + @start() + def begin(self) -> str: + return "begin" + + @listen(begin) + def finish(self, _: Any) -> str: + return "finish" + + with _isolated_recording_state(): + # Force a clean, never-checkpointed process state. (Another test in + # the same process may have armed recording via a checkpoint config.) + crewai_event_bus.reset_runtime_state() + crewai_event_bus._recording_enabled = False + + for _ in range(5): + EchoFlow().kickoff(inputs={"payload": "x" * 1000}) + # No checkpointing configured -> nothing recorded -> no leak. + assert crewai_event_bus.runtime_state is None + + def test_armed_flow_records_into_runtime_state(self) -> None: + from crewai.flow.flow import Flow, start + + class OneStep(Flow): + @start() + def begin(self) -> str: + return "begin" + + with _isolated_recording_state(): + crewai_event_bus.reset_runtime_state() + crewai_event_bus.enable_recording() + + OneStep().kickoff() + state = crewai_event_bus.runtime_state + assert state is not None + assert len(state.root) == 1 + assert len(state.event_record.nodes) > 0