fix: stop unbounded event-bus RuntimeState recorder leak on long-lived processes #6056

Open
mattatcha wants to merge 1 commit into main from matcha/flow-event-bus-leak-fix

@mattatcha
Collaborator

@mattatcha mattatcha commented Jun 5, 2026

Summary

The process-global event bus (crewai_event_bus) recorded every emitted event into a RuntimeState — the entity root list plus an event_record — on every kickoff, unconditionally and with no eviction. Only the checkpoint/replay machinery ever reads that recorder, so for the common "construct a Flow/Crew, kickoff(), discard" pattern it grew ~linearly with kickoff count until a long-lived process (worker, request handler, scheduler) was OOM-killed.

Key Changes

  • Gate recording behind an armed flag — the bus only registers entities / records events once recording is enabled.
  • Recording is armed when a CheckpointConfig is resolved on a Crew/Flow/Agent, or when a state is restored via set_runtime_state() (checkpoint restore / fork). Plain and @persist kickoff loops now record nothing.
  • Add a public crewai_event_bus.reset_runtime_state() so embedders that checkpoint but never replay in-process can bound memory between runs.

Checkpoint/replay behavior is unchanged — the recorder is still populated whenever checkpointing is configured.
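
For readers who want the shape of the gate without opening the diff, here is a minimal sketch. It is not the crewai implementation: `ToyEventBus`, `ToyRuntimeState`, and `emit()` are hypothetical stand-ins, and only the names `_recording_enabled`, `enable_recording()`, and `set_runtime_state()` are taken from this PR.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class ToyRuntimeState:
    """Hypothetical stand-in for RuntimeState: entity roots plus an event record."""

    root: List[Any] = field(default_factory=list)
    event_record: List[Any] = field(default_factory=list)


class ToyEventBus:
    """Illustrative bus only; the real bus has many more responsibilities."""

    def __init__(self) -> None:
        self._recording_enabled = False  # the gate defaults to off
        self._runtime_state: Optional[ToyRuntimeState] = None

    def enable_recording(self) -> None:
        # Idempotent: arming twice has no extra effect.
        self._recording_enabled = True

    def set_runtime_state(self, state: ToyRuntimeState) -> None:
        # Attaching a restored state also arms recording.
        self._runtime_state = state
        self._recording_enabled = True

    def emit(self, source: Any, event: Any) -> None:
        if not self._recording_enabled:
            return  # plain kickoffs leave nothing behind
        if self._runtime_state is None:
            self._runtime_state = ToyRuntimeState()
        if source not in self._runtime_state.root:
            self._runtime_state.root.append(source)
        self._runtime_state.event_record.append(event)


# Unarmed bus: 100 "kickoffs" record nothing.
plain = ToyEventBus()
for i in range(100):
    plain.emit(f"flow-{i}", "started")

# Armed bus: the recorder is populated as before.
armed = ToyEventBus()
armed.enable_recording()
for i in range(100):
    armed.emit(f"flow-{i}", "started")
```

In this toy model the unarmed bus never allocates a state object, which is the property the repro below checks against the real bus.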

Repro

from crewai.flow.flow import Flow, start, listen
from crewai.events.event_bus import crewai_event_bus

class EchoFlow(Flow):
    @start()
    def begin(self): return "begin"
    @listen(begin)
    def finish(self, _): return "finish"

for _ in range(100):
    EchoFlow().kickoff(inputs={"payload": "x" * 13_000})

rs = crewai_event_bus._runtime_state
# before: RuntimeState with root=100, event_record.nodes=800 (grows every kickoff)
# after:  None — nothing recorded without checkpointing
print(rs)

Tests

  • New regression tests assert a plain flow records nothing while an armed (checkpoint/restore) flow still records into RuntimeState.
  • Existing event-bus, checkpoint, replay, persistence, and conversation suites pass.

Note

Medium Risk
Touches global singleton event-bus behavior for checkpoint/replay recording; incorrect gating could break checkpoints or resume, but scoped to recording paths with new regression tests.

Overview
Fixes an unbounded memory leak on the process-global crewai_event_bus: it no longer records every kickoff’s entities and events into RuntimeState unless recording is explicitly armed.

Recording gate: A new _recording_enabled flag defaults to off. _register_source and _record_event become no-ops until armed via enable_recording() or set_runtime_state(). Checkpoint handler registration now calls enable_recording() when a CheckpointConfig is first resolved, so checkpoint/replay behavior stays the same for configured runs.

Memory control: Adds public reset_runtime_state() to clear the attached RuntimeState and entity id set between runs while leaving recording armed—intended for long-lived embedders that checkpoint but don’t replay in-process. _record_event reads _runtime_state once to tolerate concurrent resets.
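
A rough sketch of how a long-lived embedder might use the reset, and why the single read matters. `ToyRecorder` is hypothetical, and the lazy re-creation of state in `register_source()` is an assumption made for illustration, not something this PR confirms.

```python
from typing import Any, List, Optional, Set


class ToyRecorder:
    """Hypothetical recorder; method names echo the PR, bodies are illustrative."""

    def __init__(self) -> None:
        self._recording_enabled = True  # pretend checkpointing already armed it
        self._runtime_state: Optional[List[Any]] = None
        self._entity_ids: Set[str] = set()

    def register_source(self, entity_id: str) -> None:
        if not self._recording_enabled:
            return
        if self._runtime_state is None:
            # Assumption: a fresh state is created lazily after a reset.
            self._runtime_state = []
        self._entity_ids.add(entity_id)

    def record_event(self, event: Any) -> None:
        if not self._recording_enabled:
            return
        # Read the attribute once. A concurrent reset could set it to None
        # between a check and a second read of self._runtime_state.
        state = self._runtime_state
        if state is not None:
            state.append(event)

    def reset_runtime_state(self) -> None:
        # Drop recorded data but leave the gate armed for the next run.
        self._runtime_state = None
        self._entity_ids.clear()


bus = ToyRecorder()
sizes = []
for run in range(3):
    bus.register_source(f"flow-{run}")
    for step in range(8):
        bus.record_event((run, step))
    sizes.append(len(bus._runtime_state))
    bus.reset_runtime_state()  # bound memory between runs
```

Each run starts from an empty record, so the recorder's size is bounded by one run rather than by the process lifetime.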

Tests: Replay test arms recording explicitly; new TestRecordingGate and _isolated_recording_state() assert plain flows leave runtime_state as None and armed flows still populate the recorder.
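
The snapshot-and-restore idea behind a helper like `_isolated_recording_state()` can be sketched as follows. This is not the test helper from the PR: `ToyBus`, `toy_bus`, and the exact fields saved are assumptions for illustration.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional


class ToyBus:
    """Hypothetical process-global singleton with the two fields under test."""

    def __init__(self) -> None:
        self._recording_enabled: bool = False
        self._runtime_state: Optional[Any] = None


toy_bus = ToyBus()


@contextmanager
def isolated_recording_state(bus: ToyBus) -> Iterator[ToyBus]:
    # Snapshot the global fields, start from a clean slate, restore on exit.
    saved = (bus._recording_enabled, bus._runtime_state)
    bus._recording_enabled = False
    bus._runtime_state = None
    try:
        yield bus
    finally:
        # Runs even if the test body raises, so state cannot leak.
        bus._recording_enabled, bus._runtime_state = saved


# State left behind by an earlier test.
toy_bus._recording_enabled = True
toy_bus._runtime_state = ["earlier-event"]

with isolated_recording_state(toy_bus) as bus:
    inside = (bus._recording_enabled, bus._runtime_state)
    bus._runtime_state = ["test-event"]  # mutation stays inside the block
```

Restoring in `finally` keeps a failing test from changing what later tests see on the shared singleton.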

Reviewed by Cursor Bugbot for commit 654e738. Bugbot is set up for automated code reviews on this repo.

Summary by CodeRabbit

  • New Features

    • Added explicit method to enable event recording, allowing fine-grained control over when runtime state is captured.
  • Bug Fixes

    • Event recording is now disabled by default to prevent unbounded memory growth in long-running applications.
    • reset_runtime_state() improved to safely clear recorded data while maintaining recording state.
  • Tests

    • Added tests validating recording state isolation and manual recording control.

…leak

The process-global event bus recorded every emitted event into a
RuntimeState (entity `root` list + `event_record`) on every kickoff,
unconditionally and with no eviction. Only the checkpoint/replay
machinery ever reads that recorder, so for the common "construct a
Flow/Crew, kickoff, discard" pattern it grew ~linearly with kickoff
count until a long-lived process (worker, request handler, scheduler)
was OOM-killed.

Gate recording behind an armed flag: the bus only registers entities and
records events once recording is enabled, which happens when a
CheckpointConfig is resolved on a Crew/Flow/Agent or when a state is
restored via set_runtime_state(). Plain and @persist kickoff loops now
record nothing; checkpoint/replay behavior is unchanged. Also expose a
public reset_runtime_state() so embedders that checkpoint but never
replay in-process can bound memory between runs.
@coderabbitai

coderabbitai Bot commented Jun 5, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 56111bed-cf46-43a0-a5dc-9dac9c03e290

📥 Commits

Reviewing files that changed from the base of the PR and between 913a3ab and 654e738.

📒 Files selected for processing (3)
  • lib/crewai/src/crewai/events/event_bus.py
  • lib/crewai/src/crewai/state/checkpoint_listener.py
  • lib/crewai/tests/events/test_event_replay.py

📝 Walkthrough

This PR adds an explicit recording gate to the event bus singleton that prevents unbounded RuntimeState growth in long-lived processes. Recording defaults to disabled and arms only when checkpointing is configured or explicitly enabled; it can be reset while staying armed for subsequent runs.

Changes

Event Bus Recording Gate and Checkpoint Integration

| Layer / File(s) | Summary |
| --- | --- |
| Event Bus Recording Gate Mechanism: lib/crewai/src/crewai/events/event_bus.py | Event bus gains a _recording_enabled flag (defaulting to False) with initialization documentation explaining the memory-growth rationale. New enable_recording() method arms recording idempotently. set_runtime_state() auto-arms recording when a runtime state is attached. reset_runtime_state() clears recorded state and entity ids while keeping recording armed. _register_source() and _record_event() become conditional on _recording_enabled and safely check for non-None runtime state. |
| Checkpoint Listener Recording Activation: lib/crewai/src/crewai/state/checkpoint_listener.py | Lazy checkpoint handler registration now calls enable_recording() to arm the event bus for capturing the entity tree and event record required for checkpoint serialization. |
| Test Infrastructure and Recording Gate Verification: lib/crewai/tests/events/test_event_replay.py | Imports contextmanager and adds _isolated_recording_state() context manager to snapshot and restore process-global recording state between tests, preventing state leakage. Existing replay test explicitly enables recording before the first flow run. New TestRecordingGate suite verifies that recording-disabled prevents runtime_state population while recording-enabled populates it with a non-empty event record. |
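
The checkpoint-listener row above describes arming the bus during lazy handler registration. A minimal sketch of that ordering, with every name (`ToyBus`, `ToyCheckpointListener`, `resolve()`, the `"directory"` key) hypothetical apart from `enable_recording()`:

```python
from typing import Callable, List, Optional


class ToyBus:
    """Hypothetical bus exposing only what this sketch needs."""

    def __init__(self) -> None:
        self.recording_enabled = False
        self.handlers: List[Callable[[str], None]] = []

    def enable_recording(self) -> None:
        self.recording_enabled = True  # safe to call repeatedly

    def on(self, handler: Callable[[str], None]) -> None:
        self.handlers.append(handler)


class ToyCheckpointListener:
    """Registers its handler lazily, the first time a config is resolved."""

    def __init__(self, bus: ToyBus) -> None:
        self._bus = bus
        self._registered = False

    def resolve(self, checkpoint_config: Optional[dict]) -> None:
        if checkpoint_config is None or self._registered:
            return  # no checkpointing requested, or already set up
        self._bus.on(self._on_event)
        self._bus.enable_recording()  # arm the recorder alongside the handler
        self._registered = True

    def _on_event(self, event: str) -> None:
        pass  # a real handler would serialize a checkpoint here


bus = ToyBus()
listener = ToyCheckpointListener(bus)

listener.resolve(None)  # plain run: nothing registered, nothing armed
armed_without_config = bus.recording_enabled

listener.resolve({"directory": "./checkpoints"})  # first configured run
listener.resolve({"directory": "./checkpoints"})  # later runs are no-ops
```

Tying the arm call to handler registration means the recorder is only ever populated in processes that also have something reading it.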

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 A gate on the bus, now armed with care,
Memory bounded, no leaks laid bare,
Recording sleeps 'til checkpoint calls,
Then wakes to capture it all—
Events dance as they should, in rightful halls!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title directly and specifically describes the main change: preventing unbounded memory growth in the event-bus RuntimeState recorder by introducing a gating mechanism for long-lived processes. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@github-actions github-actions Bot added the size/M label Jun 5, 2026
@mattatcha mattatcha marked this pull request as ready for review June 5, 2026 22:51
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

# between the check and the deref.
state = self._runtime_state
if state is not None:
    state.event_record.add(event)
Persistence resume skips event replay

High Severity

Gating _register_source and _record_event on _recording_enabled stops the process-global bus from building RuntimeState for flows that use persistence but not checkpointing. _replay_recorded_events on resume still reads that record; when it is missing, replay returns immediately and completed-step MethodExecution* events are not dispatched.
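
To make the reported concern concrete, here is a toy model of a replay step that returns early when no record exists. It is a sketch of the claim above, not the actual `_replay_recorded_events` code, and the event names are placeholder strings.

```python
from typing import Callable, List, Optional


def replay_recorded_events(
    record: Optional[List[str]], dispatch: Callable[[str], None]
) -> int:
    """Re-dispatch recorded events and return how many were replayed."""
    if not record:
        return 0  # missing or empty record: nothing to replay
    for event in record:
        dispatch(event)
    return len(record)


seen: List[str] = []

# Persistence-only flow with the gate closed: no record was ever built.
replayed_unarmed = replay_recorded_events(None, seen.append)

# Same resume with recording armed: completed-step events are re-dispatched.
recorded = ["MethodExecutionStarted:begin", "MethodExecutionFinished:begin"]
replayed_armed = replay_recorded_events(recorded, seen.append)
```

If the real resume path behaves like this model, a persistence-only flow would need recording armed before its first run for replay to have anything to dispatch.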
