Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions lib/crewai/src/crewai/flow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,93 @@ def __ne__(self, other: object) -> bool:
return not self.__eq__(other)


class LockedModelProxy:
    """Lock-guarded proxy for a nested Pydantic ``BaseModel`` held in flow state.

    ``StateProxy`` wraps ``list`` and ``dict`` attributes so that mutations on
    nested containers acquire the flow state lock. Pydantic ``BaseModel``
    instances were previously returned unwrapped (the ``return value`` fall
    through), so ``flow.state.profile.name = "x"`` mutated the model attribute
    entirely outside the lock. When parallel listeners mutate nested-model
    attributes, those writes race and corrupt state silently. This proxy closes
    that gap by routing attribute reads and writes made through it via the same
    lock, recursively wrapping nested lists, dicts, and models.

    Limitations:
        * The lock is held only for the duration of a single attribute read or
          a single attribute write. A read-modify-write such as
          ``proxy.counter += 1`` is a locked read followed by a separately
          locked write, so it is not atomic as a whole.
        * Callable attributes (for example model methods) are returned
          unwrapped, so invoking them runs after the lock has been released.
    """

    # The proxy's only own state: the shared lock and the wrapped model.
    __slots__ = ("_lock", "_model")

def __init__(self, model: BaseModel, lock: threading.Lock) -> None:
    """Bind the proxy to ``model`` and to the lock that guards it.

    Both slots are stored with ``object.__setattr__`` so the assignment lands
    on the proxy itself and is never forwarded to the wrapped model.

    Args:
        model: The nested Pydantic model that lives inside the flow's state.
        lock: The flow-state lock. The very same instance is handed on to
            every proxy created for nested values, so one lock covers the
            whole subtree.
    """
    store_on_proxy = object.__setattr__
    store_on_proxy(self, "_model", model)
    store_on_proxy(self, "_lock", lock)

def __getattr__(self, name: str) -> Any:
    """Fetch ``name`` from the wrapped model under the flow-state lock.

    Only the attribute lookup itself runs while the lock is held. The value is
    then handed back through a lock-aware wrapper that shares this proxy's
    lock, so mutations deeper in the state tree keep going through it:

    * ``list`` -> :class:`LockedListProxy`
    * ``dict`` -> :class:`LockedDictProxy`
    * ``BaseModel`` -> :class:`LockedModelProxy`

    Every other value, including bound methods and other callables, is
    returned unwrapped; calling such a value happens after the lock has been
    released.

    Args:
        name: Attribute name to look up on the wrapped model.

    Returns:
        A lock-aware proxy for list, dict, and nested-model values; the raw
        value for everything else.
    """
    guard = object.__getattribute__(self, "_lock")
    target = object.__getattribute__(self, "_model")
    with guard:
        fetched = getattr(target, name)

    # Tested in this order: list, dict, nested model.
    wrappers = (
        (list, LockedListProxy),
        (dict, LockedDictProxy),
        (BaseModel, LockedModelProxy),
    )
    for kind, wrapper in wrappers:
        if isinstance(fetched, kind):
            return wrapper(fetched, guard)
    return fetched
Comment on lines +516 to +542
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Callable attributes currently execute outside the lock.

LockedModelProxy.__getattr__ fetches attributes under lock, but if the value is a bound method/callable, invocation happens after the lock is released. Any mutating model method can still race and bypass serialization.

Suggested fix
@@
     def __getattr__(self, name: str) -> Any:
@@
         with lock:
             value = getattr(model, name)

+        if callable(value):
+            def _locked_call(*args: Any, **kwargs: Any) -> Any:
+                with lock:
+                    result = value(*args, **kwargs)
+                if isinstance(result, list):
+                    return LockedListProxy(result, lock)
+                if isinstance(result, dict):
+                    return LockedDictProxy(result, lock)
+                if isinstance(result, BaseModel):
+                    return LockedModelProxy(result, lock)
+                return result
+            return _locked_call
+
         if isinstance(value, list):
             return LockedListProxy(value, lock)
         if isinstance(value, dict):
             return LockedDictProxy(value, lock)
         if isinstance(value, BaseModel):
             return LockedModelProxy(value, lock)
         return value
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __getattr__(self, name: str) -> Any:
"""Read ``name`` from the wrapped model while holding the lock.
The attribute is fetched under the flow-state lock, then re-wrapped so
deeper mutations stay synchronized: ``list`` -> :class:`LockedListProxy`,
``dict`` -> :class:`LockedDictProxy`, and ``BaseModel`` ->
:class:`LockedModelProxy` (recursively). Scalars are returned as-is.
Args:
name: Attribute name to read from the wrapped model.
Returns:
The attribute value, wrapped in the matching lock-aware proxy when
it is a list, dict, or nested ``BaseModel``; otherwise the raw value.
"""
lock = object.__getattribute__(self, "_lock")
model = object.__getattribute__(self, "_model")
with lock:
value = getattr(model, name)
if isinstance(value, list):
return LockedListProxy(value, lock)
if isinstance(value, dict):
return LockedDictProxy(value, lock)
if isinstance(value, BaseModel):
return LockedModelProxy(value, lock)
return value
def __getattr__(self, name: str) -> Any:
"""Read ``name`` from the wrapped model while holding the lock.
The attribute is fetched under the flow-state lock, then re-wrapped so
deeper mutations stay synchronized: ``list`` -> :class:`LockedListProxy`,
``dict`` -> :class:`LockedDictProxy`, and ``BaseModel`` ->
:class:`LockedModelProxy` (recursively). Scalars are returned as-is.
Args:
name: Attribute name to read from the wrapped model.
Returns:
The attribute value, wrapped in the matching lock-aware proxy when
it is a list, dict, or nested ``BaseModel``; otherwise the raw value.
"""
lock = object.__getattribute__(self, "_lock")
model = object.__getattribute__(self, "_model")
with lock:
value = getattr(model, name)
if callable(value):
def _locked_call(*args: Any, **kwargs: Any) -> Any:
with lock:
result = value(*args, **kwargs)
if isinstance(result, list):
return LockedListProxy(result, lock)
if isinstance(result, dict):
return LockedDictProxy(result, lock)
if isinstance(result, BaseModel):
return LockedModelProxy(result, lock)
return result
return _locked_call
if isinstance(value, list):
return LockedListProxy(value, lock)
if isinstance(value, dict):
return LockedDictProxy(value, lock)
if isinstance(value, BaseModel):
return LockedModelProxy(value, lock)
return value
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/flow/runtime.py` around lines 516 - 542, The current
__getattr__ returns callables outside the lock, letting method calls race;
modify __getattr__ (in the LockedModelProxy) so that when the resolved value is
callable you return a wrapper callable that acquires self._lock, re-fetches the
attribute from self._model (to preserve proper binding/descriptors) and invokes
it under the lock, then returns the result; use functools.wraps to preserve
metadata and ensure non-callable lists/dicts/BaseModel still return the existing
LockedListProxy/LockedDictProxy/LockedModelProxy wrappers and scalars are
returned as-is.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're a sharp bot, and you're right that the lock doesn't currently extend to method invocation: __getattr__ releases the lock before returning a callable, so a mutating method called through the proxy would run unsynchronized.

I'm scoping this out of this PR deliberately, for two reasons.

Scope. This PR closes the observed race, which is attribute-level mutation of nested flow state (the assignment / __setattr__ path). Serializing arbitrary method invocation on the state model is a broader guarantee that changes the proxy's contract, and I'd rather keep the blast radius of this fix tight.

Correctness of the suggested form. _state_lock is a plain, non-reentrant threading.Lock. Wrapping callables to execute under that lock would deadlock whenever the invoked method reads or writes its own attributes, since that re-enters __getattr__ / __setattr__ and tries to acquire the same lock the wrapper already holds. Pydantic model methods do this routinely, so the wrapper would hang real call paths rather than protect them.

Doing it safely means first moving _state_lock (and the proxy lock) to threading.RLock, then adding the callable wrapper with re-fetch-under-lock semantics and concurrency tests for the re-entrant paths. That's a coherent follow-up if you'd like full method-call serialization, and I'm glad to open one, but it's a distinct change from the assignment-race fix here. In current flow usage, state is mutated by assignment (now locked), not by methods called through the proxy.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ImmortalDemonGod, that's a well-reasoned scoping call, and the deadlock point is a genuine correctness flaw in my suggestion — I hadn't accounted for the re-entrant access pattern that Pydantic model methods trigger. A non-reentrant threading.Lock plus a callable wrapper would hang on the first method call that touches its own attributes, which is nearly all of them. The right prerequisite is exactly what you described: promote _state_lock (and the proxies) to threading.RLock and validate the re-entrant paths explicitly before adding the wrapper.

Happy to open a follow-up GitHub issue to track the full method-call serialization work (RLock migration + callable wrapper + re-entrant concurrency tests). Would you like me to do that?

🐇 (thoughtful nod)


✏️ Learnings added
Learnt from: ImmortalDemonGod
Repo: crewAIInc/crewAI PR: 6036
File: lib/crewai/src/crewai/flow/runtime.py:516-542
Timestamp: 2026-06-04T19:44:05.920Z
Learning: In `lib/crewai/src/crewai/flow/runtime.py`, `LockedModelProxy.__getattr__` deliberately does NOT wrap callable attributes under the lock. The `_state_lock` (and the proxy lock shared by `LockedModelProxy`, `LockedListProxy`, `LockedDictProxy`) is a plain non-reentrant `threading.Lock`. Wrapping callables to execute under that lock would deadlock whenever the invoked Pydantic model method reads or writes its own attributes, since that re-enters `__getattr__`/`__setattr__` and tries to acquire the already-held lock. The safe upgrade path is: (1) promote `_state_lock` to `threading.RLock`, (2) add the callable wrapper with re-fetch-under-lock semantics, and (3) add re-entrant concurrency tests — as a separate follow-up PR.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


def __setattr__(self, name: str, value: Any) -> None:
    """Assign ``name`` on the wrapped model under the flow-state lock.

    The proxy's own ``_model`` and ``_lock`` slots are the exception: they are
    stored directly on the proxy and the lock is not taken for them.

    For any other name, a value that is itself one of the lock-aware proxies
    (``LockedListProxy`` / ``LockedDictProxy`` / ``LockedModelProxy``) is first
    replaced by the native object it wraps, so a proxy wrapper never ends up
    stored inside the model. The assignment on the wrapped model then happens
    while the lock is held.

    Args:
        name: Attribute name to assign.
        value: Value to store; proxies are unwrapped before storing.
    """
    if name not in ("_model", "_lock"):
        if isinstance(value, LockedListProxy):
            native = value._list
        elif isinstance(value, LockedDictProxy):
            native = value._dict
        elif isinstance(value, LockedModelProxy):
            # Read the slot directly so the lookup is not forwarded to the model.
            native = object.__getattribute__(value, "_model")
        else:
            native = value

        guard = object.__getattribute__(self, "_lock")
        target = object.__getattribute__(self, "_model")
        with guard:
            setattr(target, name, native)
        return

    object.__setattr__(self, name, value)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class StateProxy(Generic[T]):
"""Proxy that provides thread-safe access to flow state.

Expand All @@ -506,6 +593,8 @@ def __getattr__(self, name: str) -> Any:
return LockedListProxy(value, lock)
if isinstance(value, dict):
return LockedDictProxy(value, lock)
if isinstance(value, BaseModel):
return LockedModelProxy(value, lock)
return value

def __setattr__(self, name: str, value: Any) -> None:
Expand All @@ -516,6 +605,8 @@ def __setattr__(self, name: str, value: Any) -> None:
value = value._list
elif isinstance(value, LockedDictProxy):
value = value._dict
elif isinstance(value, LockedModelProxy):
value = object.__getattribute__(value, "_model")
with object.__getattribute__(self, "_proxy_lock"):
setattr(object.__getattribute__(self, "_proxy_state"), name, value)

Expand Down
117 changes: 117 additions & 0 deletions lib/crewai/tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import threading
import time
from datetime import datetime
from typing import Optional

Expand All @@ -17,6 +18,7 @@
MethodExecutionStartedEvent,
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.runtime import LockedModelProxy, StateProxy


def test_simple_sequential_flow():
Expand Down Expand Up @@ -420,6 +422,121 @@ def second_method(self):
assert flow.state.message == "final"


def test_flow_state_proxy_wraps_nested_pydantic_models():
    """Structural check that a nested model in flow state comes back proxied.

    Inside a running flow, ``self.state.profile`` must be a lock-aware wrapper
    and not the raw nested model: it has to be a different object from
    ``self._state.profile`` and it has to expose ``_lock``. A write made
    through that wrapper must then be visible when the state is read again.
    If nested models are handed back unwrapped, the first two assertions fail.
    """

    class UserProfile(BaseModel):
        counter: int = 0

    class MyStructuredState(BaseModel):
        profile: UserProfile = UserProfile()

    class NestedModelFlow(Flow[MyStructuredState]):
        @start()
        def first_method(self):
            wrapped = self.state.profile
            # A wrapper is expected here, never the raw nested model.
            assert wrapped is not self._state.profile
            assert hasattr(wrapped, "_lock")

            # A write through the wrapper has to reach the underlying model.
            wrapped.counter = wrapped.counter + 1
            assert self.state.profile.counter == 1

    NestedModelFlow().kickoff()


def test_locked_model_proxy_serializes_writes_under_thread_contention():
    """Behavioral check that a nested-model write waits for the flow-state lock.

    The main thread holds the lock (standing in for a lock-protected state
    operation such as a snapshot or persist) while a second thread writes a
    nested-model attribute through the ``StateProxy``. While the lock is held
    the write must not finish; once the lock is released it must complete and
    the written value must be readable.

    If nested-model writes bypass the lock, the writer finishes while the lock
    is still held and the ``not ...is_set()`` assertion fails.
    """

    class UserProfile(BaseModel):
        value: int = 0

    class MyStructuredState(BaseModel):
        profile: UserProfile = UserProfile()

    state_lock = threading.Lock()
    state = StateProxy(MyStructuredState(), state_lock)

    attempt_began = threading.Event()
    attempt_finished = threading.Event()

    def attempt_write() -> None:
        attempt_began.set()
        # NOTE(review): ``state.profile`` is itself read through StateProxy,
        # whose ``__getattr__`` is not visible here. If that read also takes
        # the lock, the thread would block there and not in the nested
        # write — confirm.
        state.profile.value = 42
        attempt_finished.set()

    contender = threading.Thread(target=attempt_write)
    with state_lock:  # stand in for a lock-protected state operation
        contender.start()
        began = attempt_began.wait(timeout=2.0)
        assert began
        # Leave the writer time to reach the write; it is expected to be
        # parked on the lock held here, so it must not have finished.
        time.sleep(0.1)
        finished_early = attempt_finished.is_set()
        assert not finished_early, (
            "nested-model write proceeded while the flow-state lock was held — "
            "the write bypassed the lock (pre-fix race)"
        )

    # With the lock released the pending write goes through.
    completed = attempt_finished.wait(timeout=2.0)
    assert completed
    contender.join()
    assert state.profile.value == 42


def test_state_proxy_setattr_unwraps_locked_model_proxy():
    """Writing a proxied nested model back into state stores the native model.

    Reading a nested ``BaseModel`` through ``StateProxy`` yields a
    ``LockedModelProxy``. Assigning that value straight back
    (``state.profile = state.profile``) must store the underlying model, the
    same way ``LockedListProxy`` / ``LockedDictProxy`` values are unwrapped, so
    that no proxy wrapper is kept inside the state object.
    """

    class UserProfile(BaseModel):
        value: int = 0

    class MyStructuredState(BaseModel):
        profile: UserProfile = UserProfile()

    state = StateProxy(MyStructuredState(), threading.Lock())

    wrapped = state.profile
    assert isinstance(wrapped, LockedModelProxy)

    # Round-trip the wrapper through assignment; it has to be unwrapped.
    state.profile = wrapped

    # Look at the raw state object directly, bypassing the proxy's own reads.
    raw_state = object.__getattribute__(state, "_proxy_state")
    persisted = raw_state.profile
    assert isinstance(persisted, UserProfile)
    assert not isinstance(persisted, LockedModelProxy)


def test_router_with_multiple_conditions():
"""Test a router that triggers when any of multiple steps complete (OR condition),
and another router that triggers only after all specified steps complete (AND condition).
Expand Down