[https://nvbugs/6104831][fix] Port dataTransceiver shared_ptr<LlmRequest> lifetime fix #14979
8da74f0 to fee06b4
/bot run --disable-fail-fast
📝 Walkthrough
Cache transceiver async APIs transition from raw-pointer/reference lifetimes to
Changes
Cache Transceiver Lifetime Safety
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp (1)
774-798: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Guard async-resource initialization with a mutex.
`requestAndReceiveAsyncMultiThreads()` mutates `mInstanceToAsyncResource` and `mRequestFutures` before it takes `asyncResource->mMtxForQueue`. Two concurrent `receiveAsync()` calls for the same `processInfo` can race on that `unordered_map`/`vector`, which is undefined behavior and can start duplicate request workers.
Possible fix
```diff
+    std::mutex mAsyncResourceMutex;
+
     [[nodiscard]] std::future<void> requestAndReceiveAsyncMultiThreads(std::shared_ptr<LlmRequest> const& llmRequest)
     {
         try
         {
             auto promise = std::make_unique<std::promise<void>>();
             auto future = promise->get_future();
             TLLM_CHECK(llmRequest->getDataTransceiverState().getCommState().has_value());
             std::string processInfo = kDefaultProcessInfo;
             if (common::getEnvRequestKVCacheConcurrent())
             {
                 processInfo = llmRequest->getDataTransceiverState().getCommState()->toString();
             }
-            if (mInstanceToAsyncResource.find(processInfo) == mInstanceToAsyncResource.end())
             {
-
-                mInstanceToAsyncResource.emplace(processInfo, std::make_unique<AsyncResource>());
-                auto requestFuture = std::async(std::launch::async, &CacheReceiver::Impl::request, this,
-                    std::ref(*mInstanceToAsyncResource.at(processInfo)));
-                mRequestFutures.emplace_back(std::move(requestFuture));
+                std::scoped_lock lk(mAsyncResourceMutex);
+                if (mInstanceToAsyncResource.find(processInfo) == mInstanceToAsyncResource.end())
+                {
+                    mInstanceToAsyncResource.emplace(processInfo, std::make_unique<AsyncResource>());
+                    auto requestFuture = std::async(std::launch::async, &CacheReceiver::Impl::request, this,
+                        std::ref(*mInstanceToAsyncResource.at(processInfo)));
+                    mRequestFutures.emplace_back(std::move(requestFuture));
+                }
             }
             auto& asyncResource = mInstanceToAsyncResource.at(processInfo);
             {
                 std::unique_lock<std::mutex> lck(asyncResource->mMtxForQueue);
                 asyncResource->mRequestsQueue.emplace_back(llmRequest, std::move(promise));
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp` around lines 774 - 798, requestAndReceiveAsyncMultiThreads mutates mInstanceToAsyncResource and mRequestFutures without synchronization and can race between concurrent calls; wrap the map/vector lookup and potential insertion (the block that checks mInstanceToAsyncResource.find(processInfo), emplaces a new AsyncResource, starts the async worker via std::async, and pushes into mRequestFutures) with a dedicated mutex (e.g., add or reuse a member like mInstanceMutex) so the creation of the AsyncResource and the requestFuture is atomic, then after releasing that mutex you can lock asyncResource->mMtxForQueue to push the (llmRequest, promise) into asyncResource->mRequestsQueue; ensure all accesses to mInstanceToAsyncResource and mRequestFutures use the same mutex to avoid UB and duplicate workers.
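As an illustration of the locking pattern this comment asks for, here is a minimal standalone sketch, not the TensorRT-LLM implementation. `Registry`, `Resource`, `getOrCreate`, and `concurrentInitWorkerCount` are hypothetical names, and the counter increment merely stands in for launching the `std::async` request worker. The point is that the lookup, the insertion, and the worker start all happen under one mutex, so concurrent callers for the same key create exactly one resource.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for AsyncResource; not the real TensorRT-LLM type.
struct Resource
{
    int workersStarted = 0;
};

class Registry
{
public:
    // Returns the resource for `key`, creating it (and "starting" its worker) at most once.
    Resource& getOrCreate(std::string const& key)
    {
        std::scoped_lock lock(mMutex); // covers lookup, insertion, and worker start
        auto it = mResources.find(key);
        if (it == mResources.end())
        {
            it = mResources.emplace(key, std::make_unique<Resource>()).first;
            ++it->second->workersStarted; // stands in for launching the request worker
        }
        return *it->second; // pointee is stable: it is owned by a unique_ptr
    }

    std::size_t size()
    {
        std::scoped_lock lock(mMutex);
        return mResources.size();
    }

private:
    std::mutex mMutex;
    std::unordered_map<std::string, std::unique_ptr<Resource>> mResources;
};

// Calls getOrCreate() for one key from several threads and returns how many workers were started.
int concurrentInitWorkerCount()
{
    Registry registry;
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i)
    {
        threads.emplace_back([&registry] { registry.getOrCreate("peer-0"); });
    }
    for (auto& thread : threads)
    {
        thread.join();
    }
    assert(registry.size() == 1);
    return registry.getOrCreate("peer-0").workersStarted;
}
```

One design point worth checking against the real code: the sketch obtains the reference while the lock is still held, so no later unsynchronized lookup on the map is needed while another thread may be inserting.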
🧹 Nitpick comments (1)
cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp (1)
351-355: ⚡ Quick win
Add one regression that drops caller ownership before the future resolves.
These call sites now use the `shared_ptr` API, but the surrounding tests still keep the request alive until after `future.get()`. That means a regression back to raw/reference capture inside `dataTransceiver` would likely still pass. Please add at least one case that calls `sendAsync()`/`receiveAsync()`, immediately releases the caller-held `shared_ptr`, and then asserts the future still completes correctly.
Also applies to: 977-988
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp` around lines 351 - 355, Add a regression test that verifies caller ownership of the llmRequest is not required for the future to complete: after calling mSender->sendAsync(llmRequest) or mRequester->receiveAsync(llmRequest) store the returned future in mFutures (or a local future), immediately reset the caller's shared_ptr (e.g. llmRequest.reset()), then call future.get()/wait() and assert completion/success. Update the test block around mFutures.emplace_back(mSender->sendAsync(llmRequest)) / auto future = mRequester->receiveAsync(llmRequest) to include one scenario that drops the shared_ptr before resolving the future; apply the same pattern for the other occurrence mentioned (lines ~977-988) so the test will fail if dataTransceiver reintroduces raw/reference captures.
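The shape of such a regression can be sketched outside the test fixture. The following is a hedged, self-contained illustration rather than the actual `cacheTransceiverTest.cpp` code: `FakeRequest`, `sendAsyncSketch`, and `dropOwnershipThenGet` are hypothetical names, and the `std::shared_future<void>` gate exists only to make sure the task is still pending when the caller drops its reference.

```cpp
#include <cassert>
#include <future>
#include <memory>

// Hypothetical stand-in for LlmRequest.
struct FakeRequest
{
    int id = 0;
};

// Stand-in for sendAsync(): the lambda copies the shared_ptr, so the task co-owns the request.
std::future<int> sendAsyncSketch(std::shared_ptr<FakeRequest> const& request, std::shared_future<void> start)
{
    return std::async(std::launch::async,
        [request, start]
        {
            start.wait(); // hold the task until the caller has dropped its reference
            return request->id;
        });
}

// Drops caller ownership before the future resolves and returns what the task observed.
int dropOwnershipThenGet()
{
    std::promise<void> gate;
    auto request = std::make_shared<FakeRequest>();
    request->id = 42;
    std::weak_ptr<FakeRequest> observer = request;

    auto future = sendAsyncSketch(request, gate.get_future().share());
    request.reset();             // caller no longer owns the request
    assert(!observer.expired()); // the pending task still keeps it alive
    gate.set_value();            // let the task run
    return future.get();
}
```

If `sendAsyncSketch` captured a reference or raw pointer instead, the `observer.expired()` check would fail and the task would read freed memory, which is the regression the comment wants the tests to catch.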
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp`:
- Around line 477-482: The broken-promise occurs because promises are destroyed
when entries are erased; in CacheSender::Impl::sendResponse (and the other spots
around lines noted) ensure you resolve Response::mPromise before erasing from
mReadyResponses by calling mPromise.set_value() (or set_exception with an
appropriate exception on cancellation), and similarly in
CacheReceiver::Impl::cancelRequest resolve RequestAndPromise::mPromise before
removing from asyncResource->mRequestsQueue so the futures returned by
sendAsync() and requestAndReceiveAsyncMultiThreads() are not left broken; update
all indicated sites (including the other occurrences around 797-798 and
1089-1125) to fulfill the promise with success or a cancellation exception prior
to destroying the promise-containing object.
In `@cpp/tensorrt_llm/batch_manager/dataTransceiver.h`:
- Around line 259-262: The new shared_ptr-based APIs (e.g.,
sendAsync(std::shared_ptr<LlmRequest> const& llmRequest)) accept nullptr but
implementations immediately dereference llmRequest; reject null at entry or
enforce non-null ownership to preserve the old non-null contract. Add a
precondition check in sendAsync (and the other changed APIs at the same area)
that throws or returns a failed future when llmRequest == nullptr (or
alternatively change the API to take std::shared_ptr<LlmRequest> by value and
assert/throw on null), and document the behaviour so callers cannot pass
nullptr; reference sendAsync and the other modified functions that previously
took LlmRequest& to locate and update their entry-point null handling.
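One way to read the null-handling suggestion above is an entry-point guard that returns an already-failed future. The sketch below is an assumption-laden illustration, not the project's API: `FakeRequest`, `sendAsyncChecked`, and `nullIsRejected` are hypothetical, and the choice of `std::invalid_argument` is arbitrary. The real code may prefer an assertion macro such as the `TLLM_CHECK` that already appears in `requestAndReceiveAsyncMultiThreads`.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for LlmRequest.
struct FakeRequest
{
    int id = 0;
};

// Rejects a null request up front instead of dereferencing it inside the async task.
std::future<void> sendAsyncChecked(std::shared_ptr<FakeRequest> const& request)
{
    if (!request)
    {
        std::promise<void> failed;
        failed.set_exception(std::make_exception_ptr(std::invalid_argument("llmRequest must not be null")));
        return failed.get_future(); // shared state is already ready, so this is not a broken promise
    }
    return std::async(std::launch::async, [request] { (void) request->id; });
}

// Returns true when passing nullptr surfaces as std::invalid_argument on the future.
bool nullIsRejected()
{
    try
    {
        sendAsyncChecked(nullptr).get();
    }
    catch (std::invalid_argument const&)
    {
        return true;
    }
    return false;
}
```

Returning a failed future keeps the error on the same channel callers already observe; throwing at the call site would be the stricter alternative.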
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 47b75d87-4b8a-4b26-ac1e-fe04452ef7ab
📒 Files selected for processing (4)
- cpp/tensorrt_llm/batch_manager/cacheTransceiver.cpp
- cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp
- cpp/tensorrt_llm/batch_manager/dataTransceiver.h
- cpp/tests/unit_tests/multi_gpu/cacheTransceiverTest.cpp
fee06b4 to 3c445b2
/bot run --disable-fail-fast
PR_Github #52210 [ run ] triggered by Bot. Commit:
PR_Github #52210 [ run ] completed with state
3c445b2 to 42aa1ce
…est> lifetime fix Signed-off-by: Chien-Chun Hung <2679986+chienchunhung@users.noreply.github.com>
42aa1ce to f5ae093
/bot run --disable-fail-fast --stage-list "A10-PackageSanityCheck-PY310-UB2204"
PR_Github #52407 [ run ] triggered by Bot. Commit:
PR_Github #52407 [ run ] completed with state
/bot run --disable-fail-fast
PR_Github #52426 [ run ] triggered by Bot. Commit:
PR_Github #52426 [ run ] completed with state
Summary
Closes a UAF / Broken-promise gap in the disagg KV cache transfer that survived PR #14768 (the always-on-baseline tier-1 PR). PR #14768 ported the `shared_ptr<LlmRequest>` lifetime fix at the outer `CacheTransceiver` layer (interface + `mSenderFutures`/`mRequesterFutures`) but not at the inner `dataTransceiver` layer where the actual `std::promise<void>` lives. This PR ports just the inner-layer lifetime piece from PR #13713.
What was missed in PR #14768
- Outer layer (`cacheTransceiver.{h,cpp}`): `mSender`/`mRequesterFutures` storage switched to `shared_ptr<LlmRequest>`, which keeps the `LlmRequest` object alive.
- Inner layer (`dataTransceiver.{h,cpp}`): `Response::mRequest` and `RequestAndPromise::mRequest` still raw pointers; `sendAsync(LlmRequest&)`/`receiveAsync(LlmRequest&)` still by reference; `std::async(&Impl::requestSync, this, std::ref(llmRequest))` captures by reference. These structures hold the `std::promise<void>`; if they're destroyed during worker error cleanup or the async task races Python `_terminate_request`, the promise dies unfulfilled → `std::future_error: Broken promise` on the future side.

The outer-layer fix prevents the C++ worker from dereferencing a freed `LlmRequest`, but does nothing to prevent the inner `Response`/`RequestAndPromise` structures (which hold the promise) from being destroyed in the worker's error path. Under sustained real disagg load, peer drops and worker-side errors are routine; each destroys an inner structure and fires `Broken promise` on the corresponding future.
Observed signature (internal incident report)
Recurring ~1.9 h MTTF on a Qwen3-Coder-480B 4P2D disagg shadow running a post-PR-#14768 image. One decode worker accumulates an `std::future_error: Broken promise` storm (concentrated on a single worker; peers idle), `trtllm_num_requests_running` climbs while peers stay at 0, the dynamo canary health check fails, kubelet restarts the worker, and serving recovers. Repeat. Across ~19 h: 19 decode-worker restarts on 2 workers, 4 NVCF instance replacements.
What this PR changes
Pure lifetime fix at the inner layer, mirroring the relevant subset of PR #13713's `dataTransceiver` changes:
- `sendAsync(LlmRequest&)` → `sendAsync(std::shared_ptr<LlmRequest> const&)` (both `Impl` and public `CacheSender::sendAsync`)
- `receiveAsync(LlmRequest&)` → `receiveAsync(std::shared_ptr<LlmRequest> const&)` (both `Impl` and public `CacheReceiver::receiveAsync`)
- `std::async` now captures the `shared_ptr` by value in a lambda rather than `std::ref(llmRequest)`, which closes the UAF where Python `_terminate_request` beats the async task
- `requestAndReceiveAsyncMultiThreads(LlmRequest&)` → `shared_ptr` analog
- `struct Response`: `LlmRequest*` → `std::shared_ptr<LlmRequest>`
- `struct RequestAndPromise`: `LlmRequest*` → `std::shared_ptr<LlmRequest>` (move semantics tightened; no more null-out-raw-pointer dance)
- Call sites in `cacheTransceiver.cpp` pass the already-`shared_ptr` `llmRequest` directly instead of dereferencing
- `cacheTransceiverTest.cpp`: `WrappedLlmRequest` switched from `unique_ptr` to `shared_ptr` so the test infra compiles against the new signatures

Out of scope (intentionally, preserving PR #14768's "always-on baseline" scope)
- `mInFlightCancelFlags` map, `getOrCreateInFlightCancelFlag`, per-request flag wiring: that's the G* cancel surface, env-gated by `TRTLLM_DISAGG_ENABLE_INFLIGHT_CANCEL`
- `std::exception` catch on `response()` worker: orthogonal safety improvement, separable
- `set_exception` on cancelled-response promise destruction: cancel-path semantic improvement, lives with the cancel surface

These remain part of the broader cancellation follow-up tracked under TRTLLM-12721.
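For readers unfamiliar with the `Broken promise` signature referred to in this description, the mechanism can be reproduced in isolation. The sketch below is illustrative only: `Entry` and `erasingEntryBreaksPromise` are hypothetical names standing in for a promise-owning structure such as `Response` or `RequestAndPromise`, and the vector stands in for whatever container the worker erases from.

```cpp
#include <cassert>
#include <future>
#include <system_error>
#include <vector>

// Hypothetical stand-in for a structure that owns the promise.
struct Entry
{
    std::promise<void> promise;
};

// Destroys a queued entry without fulfilling its promise and reports
// whether the future observes std::future_errc::broken_promise.
bool erasingEntryBreaksPromise()
{
    std::vector<Entry> queue(1);
    std::future<void> future = queue.front().promise.get_future();
    queue.clear(); // entry destroyed while its promise is still unfulfilled
    try
    {
        future.get();
    }
    catch (std::future_error const& error)
    {
        return error.code() == std::future_errc::broken_promise;
    }
    return false;
}
```

The standard library stores the `broken_promise` error in the shared state when the last promise is destroyed unfulfilled, so the failure only becomes visible later, at whichever call site waits on the future.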
Summary by CodeRabbit