Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions lib/crewai/src/crewai/events/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class CrewAIEventsBus:
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: RuntimeState | None
_recording_enabled: bool

def __new__(cls) -> Self:
"""Create or return the singleton instance.
Expand Down Expand Up @@ -153,6 +154,14 @@ def _initialize(self) -> None:
self._has_pending_events = False
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
# The RuntimeState recorder (entity root + event_record) exists solely
# to serialize a run for checkpoint/replay. It is only ever read back
# by the checkpoint/fork/resume machinery, so we leave it disabled
# until something actually needs it. Recording it unconditionally on a
# process-global singleton is an unbounded leak in long-lived processes
# that run many kickoffs. Armed by ``enable_recording()`` (called when
# a checkpoint config is resolved) and by ``set_runtime_state()``.
self._recording_enabled: bool = False

def _ensure_executor_initialized(self) -> None:
"""Lazily initialize the thread pool executor and event loop.
Expand Down Expand Up @@ -270,11 +279,46 @@ def decorator(handler: Callable[P, R]) -> Callable[P, R]:

return decorator

def enable_recording(self) -> None:
"""Arm RuntimeState recording for this process.

Until armed, the bus does not register entities or record events into a
``RuntimeState`` — that recorder feeds checkpoint/replay only and is
wasted work (and an unbounded leak on a long-lived singleton) when no
checkpointing is configured. Called when a ``CheckpointConfig`` is
resolved on a Crew/Flow/Agent (see
``crewai.state.checkpoint_listener``) and by :meth:`set_runtime_state`.
Idempotent.
"""
with self._instance_lock:
self._recording_enabled = True

def set_runtime_state(self, state: RuntimeState) -> None:
"""Set the RuntimeState that will be passed to event handlers."""
with self._instance_lock:
self._runtime_state = state
self._registered_entity_ids = {id(e) for e in state.root}
self._recording_enabled = True

def reset_runtime_state(self) -> None:
"""Drop the recorded ``RuntimeState`` and registered entity ids.

When recording is armed (checkpointing configured, or a state was
restored via :meth:`set_runtime_state`), the bus records every emitted
event into a process-global ``RuntimeState`` — the entity ``root`` list
plus the ``event_record`` — so checkpoint/replay can reconstruct a run.
Because the bus is a process-global singleton, that record grows
without bound across successive ``kickoff`` calls in a long-lived
process (worker, request handler, scheduler). Embedders that checkpoint
but never replay in-process can call this between runs to bound memory.

This clears the recorded data but leaves recording armed, so subsequent
runs still record. Safe to call when no state is attached. Do not call
mid-run or while a pending checkpoint/replay depends on the record.
"""
with self._instance_lock:
self._runtime_state = None
self._registered_entity_ids = set()

@property
def runtime_state(self) -> RuntimeState | None:
Expand Down Expand Up @@ -464,17 +508,31 @@ async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None:
await self._acall_handlers(source, event, level_async)

def _register_source(self, source: Any) -> None:
"""Register the source entity in RuntimeState if applicable."""
"""Register the source entity in RuntimeState if applicable.

No-op unless recording is armed (see :meth:`enable_recording`): the
RuntimeState entity list is only read by checkpoint/replay.
"""
if (
getattr(source, "entity_type", None) in ("flow", "crew", "agent")
self._recording_enabled
and getattr(source, "entity_type", None) in ("flow", "crew", "agent")
and id(source) not in self._registered_entity_ids
):
self.register_entity(source)

def _record_event(self, event: BaseEvent) -> None:
"""Add an event to the RuntimeState event record."""
if self._runtime_state is not None:
self._runtime_state.event_record.add(event)
"""Add an event to the RuntimeState event record.

No-op unless recording is armed (see :meth:`enable_recording`): the
event record is only read by checkpoint/replay.
"""
if not self._recording_enabled:
return
# Read once: a concurrent reset_runtime_state() can null _runtime_state
# between the check and the deref.
state = self._runtime_state
if state is not None:
state.event_record.add(event)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Persistence resume skips event replay

High Severity

Gating _register_source and _record_event on _recording_enabled stops the process-global bus from building RuntimeState for flows that use persistence but not checkpointing. _replay_recorded_events on resume still reads that record; when it is missing, replay returns immediately and completed-step MethodExecution* events are not dispatched.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 654e738. Configure here.


def _prepare_event(self, source: Any, event: BaseEvent) -> None:
"""Register source, set scope/sequence metadata, and record the event.
Expand Down
5 changes: 5 additions & 0 deletions lib/crewai/src/crewai/state/checkpoint_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def _ensure_handlers_registered() -> None:
if _handlers_registered:
return
_register_all_handlers(crewai_event_bus)
# Arm RuntimeState recording: from here on the bus must capture the
# entity tree + event record so a checkpoint can serialize the run.
# Until a checkpoint config is resolved, recording stays off to avoid
# an unbounded leak on the process-global bus.
crewai_event_bus.enable_recording()
_handlers_registered = True


Expand Down
79 changes: 79 additions & 0 deletions lib/crewai/tests/events/test_event_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from contextlib import contextmanager
from typing import Any
from unittest.mock import patch

Expand Down Expand Up @@ -132,6 +133,12 @@ def step_b(self) -> str:
def step_c(self) -> str:
return "c"

# The event record (which replay reads from) is only populated when
# recording is armed. In production a resume arms it by restoring the
# RuntimeState from a checkpoint (``from_checkpoint`` ->
# ``set_runtime_state``); here we arm it directly and let flow1 record
# live, then resume flow2 against that record.
crewai_event_bus.enable_recording()
if crewai_event_bus.runtime_state is not None:
crewai_event_bus.runtime_state.event_record.clear()

Expand Down Expand Up @@ -163,3 +170,75 @@ def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
assert captured_finished.count("step_a") == 1
assert captured_finished.count("step_b") == 1
assert captured_finished.count("step_c") == 1


@contextmanager
def _isolated_recording_state() -> Any:
"""Snapshot and fully restore the singleton bus recording state.

Restores all three coupled fields (``_recording_enabled``,
``_runtime_state``, ``_registered_entity_ids``) so a test that pokes the
process-global bus can't leak an inconsistent state — e.g. a populated
``_runtime_state`` with an emptied id set — into later tests.
"""
prev_enabled = crewai_event_bus._recording_enabled
prev_state = crewai_event_bus._runtime_state
prev_ids = crewai_event_bus._registered_entity_ids
try:
yield
finally:
crewai_event_bus._recording_enabled = prev_enabled
crewai_event_bus._runtime_state = prev_state
crewai_event_bus._registered_entity_ids = prev_ids


class TestRecordingGate:
"""RuntimeState recording is armed only when checkpoint/replay needs it.

The bus is a process-global singleton; recording every event into its
``RuntimeState`` unconditionally leaks linearly across kickoffs in a
long-lived process. Recording stays off until a checkpoint config is
resolved (or a state is restored), so the common "construct, kickoff,
discard" loop allocates nothing.
"""

def test_plain_flow_does_not_record_when_recording_disarmed(self) -> None:
from crewai.flow.flow import Flow, listen, start

class EchoFlow(Flow):
@start()
def begin(self) -> str:
return "begin"

@listen(begin)
def finish(self, _: Any) -> str:
return "finish"

with _isolated_recording_state():
# Force a clean, never-checkpointed process state. (Another test in
# the same process may have armed recording via a checkpoint config.)
crewai_event_bus.reset_runtime_state()
crewai_event_bus._recording_enabled = False

for _ in range(5):
EchoFlow().kickoff(inputs={"payload": "x" * 1000})
# No checkpointing configured -> nothing recorded -> no leak.
assert crewai_event_bus.runtime_state is None

def test_armed_flow_records_into_runtime_state(self) -> None:
from crewai.flow.flow import Flow, start

class OneStep(Flow):
@start()
def begin(self) -> str:
return "begin"

with _isolated_recording_state():
crewai_event_bus.reset_runtime_state()
crewai_event_bus.enable_recording()

OneStep().kickoff()
state = crewai_event_bus.runtime_state
assert state is not None
assert len(state.root) == 1
assert len(state.event_record.nodes) > 0
Loading