From 4f791c87542224976ae8901e87113a924e08b4fe Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Fri, 5 Jun 2026 17:44:19 +0300 Subject: [PATCH 1/4] UCT/EP: Enhance uct_ep_check to support asynchronous completion callbacks - Modified uct_ep_check to accept a completion callback, allowing for asynchronous reporting of peer status. - Updated related functions across various transport implementations to handle the new completion parameter. - Added tests to verify the correct behavior of the asynchronous completion mechanism during endpoint checks. This change improves the flexibility and responsiveness of endpoint status checks in the UCT layer. --- src/ucp/core/ucp_worker.c | 2 +- src/uct/base/uct_iface.h | 3 +- src/uct/ib/mlx5/rc/rc_mlx5.h | 3 +- src/uct/ib/mlx5/rc/rc_mlx5_ep.c | 21 ++++-- src/uct/ib/rc/base/rc_ep.c | 77 ++++++++++++++------- src/uct/ib/rc/base/rc_iface.h | 14 +++- src/uct/ib/rc/verbs/rc_verbs.h | 2 +- src/uct/ib/rc/verbs/rc_verbs_ep.c | 15 ++++- src/uct/ib/ud/base/ud_ep.c | 42 +++++++++--- src/uct/sm/mm/base/mm_ep.c | 2 +- src/uct/sm/scopy/cma/cma_ep.c | 2 +- src/uct/tcp/tcp.h | 18 +++-- src/uct/tcp/tcp_ep.c | 51 +++++++++++++- test/gtest/uct/test_peer_failure.cc | 100 ++++++++++++++++++++++++++++ 14 files changed, 294 insertions(+), 58 deletions(-) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 0bffb0b8b88..4f5606b96f3 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -3596,7 +3596,7 @@ static int ucp_worker_do_ep_keepalive(ucp_worker_h worker, ucs_time_t now) if (status == UCS_ERR_NO_RESOURCE) { return 0; - } else if (status != UCS_OK) { + } else if (UCS_STATUS_IS_ERR(status)) { ucs_diag("worker %p: keepalive failed on ep %p lane[%d]=%p: %s", worker, ep, lane, uct_ep, ucs_status_string(status)); } else { diff --git a/src/uct/base/uct_iface.h b/src/uct/base/uct_iface.h index 417977292c0..63ca158f825 100644 --- a/src/uct/base/uct_iface.h +++ b/src/uct/base/uct_iface.h @@ -244,8 +244,7 @@ enum { /** * In debug mode, check that keepalive params are valid */ -#define UCT_EP_KEEPALIVE_CHECK_PARAM(_flags, _comp) \ - UCT_CHECK_PARAM((_comp) == NULL, "Unsupported completion on ep_check"); \ +#define UCT_EP_KEEPALIVE_CHECK_PARAM(_flags) \ UCT_CHECK_PARAM((_flags) == 0, "Unsupported flags: %x", (_flags)); diff --git a/src/uct/ib/mlx5/rc/rc_mlx5.h b/src/uct/ib/mlx5/rc/rc_mlx5.h index 970d3c055c6..d346a44f86d 100644 --- a/src/uct/ib/mlx5/rc/rc_mlx5.h +++ b/src/uct/ib/mlx5/rc/rc_mlx5.h @@ -185,7 +185,8 @@ uct_rc_mlx5_base_ep_atomic32_fetch(uct_ep_h ep, uct_atomic_op_t opcode, ucs_status_t uct_rc_mlx5_base_ep_fence(uct_ep_h tl_ep, unsigned flags); -void uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep); +ucs_status_t +uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp); void uct_rc_mlx5_base_ep_vfs_populate(uct_rc_ep_t *rc_ep); diff --git a/src/uct/ib/mlx5/rc/rc_mlx5_ep.c b/src/uct/ib/mlx5/rc/rc_mlx5_ep.c index f796e15c589..53302370fea 100644 --- a/src/uct/ib/mlx5/rc/rc_mlx5_ep.c +++ b/src/uct/ib/mlx5/rc/rc_mlx5_ep.c @@ -523,18 +523,25 @@ ucs_status_t uct_rc_mlx5_base_ep_fence(uct_ep_h tl_ep, unsigned flags) return uct_rc_ep_fence(tl_ep, &ep->tx.wq.fi); } -void uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep) +ucs_status_t +uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp) { UCT_RC_MLX5_BASE_EP_DECL(tl_ep, iface, ep); uint64_t dummy = 0; /* Dummy buffer to suppress compiler warning */ - uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC, - &ep->super.txqp, &ep->tx.wq, - MLX5_OPCODE_RDMA_WRITE, &dummy, 0, - 0, 0, 0, - 0, 0, - 0, 0, + if (comp == NULL) { + uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC, &ep->super.txqp, + &ep->tx.wq, MLX5_OPCODE_RDMA_WRITE, &dummy, + 0, 0, 0, 0, 0, 0, 0, 0, 0, INT_MAX); + return UCS_OK; + } + + uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC, &ep->super.txqp, + &ep->tx.wq, MLX5_OPCODE_RDMA_WRITE, &dummy, + 0, 0, 0, 0, 0, 0, 0, MLX5_WQE_CTRL_CQ_UPDATE, 0, INT_MAX); + return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super, + &ep->super.txqp, comp, ep->tx.wq.sig_pi); } void uct_rc_mlx5_base_ep_vfs_populate(uct_rc_ep_t *rc_ep) diff --git a/src/uct/ib/rc/base/rc_ep.c b/src/uct/ib/rc/base/rc_ep.c index b7177a70671..bf98169032d 100644 --- a/src/uct/ib/rc/base/rc_ep.c +++ b/src/uct/ib/rc/base/rc_ep.c @@ -319,7 +319,7 @@ ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n, return UCS_ERR_BUSY; } - UCS_STATIC_ASSERT(sizeof(uct_pending_req_priv_arb_t) <= + UCS_STATIC_ASSERT(sizeof(uct_rc_pending_req_priv_t) <= UCT_PENDING_REQ_PRIV_LEN); uct_pending_req_arb_group_push(&ep->arb_group, n); UCT_TL_EP_STAT_PEND(&ep->super); @@ -372,9 +372,16 @@ uct_rc_ep_arbiter_purge_internal_cb(ucs_arbiter_t *arbiter, uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv); uct_rc_ep_t *ep = ucs_container_of(group, uct_rc_ep_t, arb_group); uct_rc_pending_req_t *freq; + uct_completion_t *comp; if (req->func == uct_rc_ep_check_progress) { ep->flags &= ~UCT_RC_EP_FLAG_KEEPALIVE_PENDING; + + comp = uct_rc_pending_req_priv(req)->comp; + if (comp != NULL) { + uct_invoke_completion(comp, UCS_ERR_CANCELED); + } + ucs_mpool_put(req); } else if (req->func == uct_rc_ep_fc_grant) { freq = ucs_derived_of(req, uct_rc_pending_req_t); @@ -531,12 +538,15 @@ ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available, return UCS_INPROGRESS; } -static ucs_status_t uct_rc_ep_check_internal(uct_ep_h tl_ep) +static ucs_status_t +uct_rc_ep_check_internal(uct_ep_h tl_ep, uct_completion_t *comp) { uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t); uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t); - uct_rc_iface_ops_t *ops = ucs_derived_of(iface->super.ops, uct_rc_iface_ops_t); + uct_rc_iface_ops_t *ops = ucs_derived_of(iface->super.ops, + uct_rc_iface_ops_t); + ucs_status_t status; /* in case if no TX resources are available then there is at least * one signaled operation which provides actual peer status, in this case @@ -547,52 +557,71 @@ static ucs_status_t uct_rc_ep_check_internal(uct_ep_h tl_ep) * to add request to pending queue */ UCT_RC_CHECK_CQE_RET(iface, ep, UCS_ERR_NO_RESOURCE); - ops->ep_post_check(tl_ep); - - return UCS_OK; + status = ops->ep_post_check(tl_ep, comp); + return (comp == NULL) ? UCS_OK : status; } static ucs_status_t uct_rc_ep_check_progress(uct_pending_req_t *self) { uct_rc_pending_req_t *req = ucs_derived_of(self, uct_rc_pending_req_t); uct_rc_ep_t *ep = ucs_derived_of(req->ep, uct_rc_ep_t); + uct_completion_t *comp = uct_rc_pending_req_priv(self)->comp; ucs_status_t status; - ucs_assert(ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING); + /* The flag may already be cleared here: when multiple uct_rc_ep_check + * calls land on the same EP under CQE pressure each queues its own + * pending req, and the first req's dispatch clears the flag before the + * rest run. With comp == NULL we have nothing to fire and the sibling's + * keepalive WQE already covers peer-liveness - skip the redundant post. */ + if ((comp == NULL) && !(ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING)) { + ucs_mpool_put(req); + return UCS_OK; + } + + status = uct_rc_ep_check_internal(req->ep, comp); + if (status == UCS_ERR_NO_RESOURCE) { + return UCS_ERR_NO_RESOURCE; + } - status = uct_rc_ep_check_internal(req->ep); + ep->flags &= ~UCT_RC_EP_FLAG_KEEPALIVE_PENDING; if (status == UCS_OK) { - ep->flags &= ~UCT_RC_EP_FLAG_KEEPALIVE_PENDING; + ucs_assert(comp == NULL); ucs_mpool_put(req); - } else { - ucs_assert(status == UCS_ERR_NO_RESOURCE); + return UCS_OK; } - return status; + if (status == UCS_INPROGRESS) { + return status; + } + + ucs_assert(UCS_STATUS_IS_ERR(status)); + uct_invoke_completion(comp, status); + ucs_mpool_put(req); + return UCS_OK; } ucs_status_t uct_rc_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp) { uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t); - uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, - uct_rc_iface_t); + uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t); uct_rc_pending_req_t *req; ucs_status_t status; - UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); + UCT_EP_KEEPALIVE_CHECK_PARAM(flags); ucs_assert(ep->flags & UCT_RC_EP_FLAG_CONNECTED); - if (ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING) { + if ((comp == NULL) && (ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING)) { /* keepalive request is in pending queue and will be * processed when resources are available */ return UCS_OK; } - status = uct_rc_ep_check_internal(tl_ep); + status = uct_rc_ep_check_internal(tl_ep, comp); if (status != UCS_ERR_NO_RESOURCE) { - ucs_assert(status == UCS_OK); + ucs_assert((comp == NULL) ? (status == UCS_OK) : + (status == UCS_INPROGRESS)); return status; } @@ -602,13 +631,13 @@ uct_rc_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp) return UCS_ERR_NO_MEMORY; } - req->ep = &ep->super.super; - req->super.func = uct_rc_ep_check_progress; - status = uct_rc_ep_pending_add(tl_ep, &req->super, 0); - ep->flags |= UCT_RC_EP_FLAG_KEEPALIVE_PENDING; + req->ep = &ep->super.super; + req->super.func = uct_rc_ep_check_progress; + uct_rc_pending_req_priv(&req->super)->comp = comp; + status = uct_rc_ep_pending_add(tl_ep, &req->super, 0); + ep->flags |= UCT_RC_EP_FLAG_KEEPALIVE_PENDING; ucs_assert_always(status == UCS_OK); - - return UCS_OK; + return (comp == NULL) ? UCS_OK : UCS_INPROGRESS; } int uct_rc_ep_is_connected(uct_rc_ep_t *ep, struct ibv_ah_attr *ah_attr, diff --git a/src/uct/ib/rc/base/rc_iface.h b/src/uct/ib/rc/base/rc_iface.h index bed6c7dc70a..bbc7c94eb29 100644 --- a/src/uct/ib/rc/base/rc_iface.h +++ b/src/uct/ib/rc/base/rc_iface.h @@ -141,6 +141,12 @@ typedef struct uct_rc_pending_req { } uct_rc_pending_req_t; +typedef struct { + uct_pending_req_priv_arb_t arb; + uct_completion_t *comp; +} uct_rc_pending_req_priv_t; + + /** * RC fence type. */ @@ -218,7 +224,8 @@ typedef void (*uct_rc_iface_qp_cleanup_func_t)( uct_rc_iface_qp_cleanup_ctx_t *cleanup_ctx); -typedef void (*uct_rc_iface_ep_post_check_func_t)(uct_ep_h tl_ep); +typedef ucs_status_t (*uct_rc_iface_ep_post_check_func_t)( + uct_ep_h tl_ep, uct_completion_t *comp); typedef void (*uct_rc_iface_ep_vfs_populate_func_t)(uct_rc_ep_t *rc_ep); @@ -641,6 +648,11 @@ uct_rc_iface_send_op_set_name(uct_rc_iface_send_op_t *op, const char *name) #endif } +static UCS_F_ALWAYS_INLINE uct_rc_pending_req_priv_t* +uct_rc_pending_req_priv(uct_pending_req_t *req) +{ + return (uct_rc_pending_req_priv_t*)&req->priv; +} /** * Helper function to set ECE to qp. diff --git a/src/uct/ib/rc/verbs/rc_verbs.h b/src/uct/ib/rc/verbs/rc_verbs.h index 41e26129bc2..6a49fb8ffc2 100644 --- a/src/uct/ib/rc/verbs/rc_verbs.h +++ b/src/uct/ib/rc/verbs/rc_verbs.h @@ -166,7 +166,7 @@ ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags, ucs_status_t uct_rc_verbs_ep_fence(uct_ep_h tl_ep, unsigned flags); -void uct_rc_verbs_ep_post_check(uct_ep_h tl_ep); +ucs_status_t uct_rc_verbs_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp); void uct_rc_verbs_ep_vfs_populate(uct_rc_ep_t *rc_ep); diff --git a/src/uct/ib/rc/verbs/rc_verbs_ep.c b/src/uct/ib/rc/verbs/rc_verbs_ep.c index 0552125d576..f4f471b7966 100644 --- a/src/uct/ib/rc/verbs/rc_verbs_ep.c +++ b/src/uct/ib/rc/verbs/rc_verbs_ep.c @@ -519,11 +519,20 @@ ucs_status_t uct_rc_verbs_ep_fence(uct_ep_h tl_ep, unsigned flags) return uct_rc_ep_fence(tl_ep, &ep->fi); } -void uct_rc_verbs_ep_post_check(uct_ep_h tl_ep) +ucs_status_t uct_rc_verbs_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp) { - uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t); + uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t); + uct_rc_verbs_iface_t *iface = ucs_derived_of(tl_ep->iface, + uct_rc_verbs_iface_t); - uct_rc_verbs_ep_post_flush(ep, 0); + if (comp == NULL) { + uct_rc_verbs_ep_post_flush(ep, 0); + return UCS_OK; + } + + uct_rc_verbs_ep_post_flush(ep, IBV_SEND_SIGNALED); + return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super, + &ep->super.txqp, comp, ep->txcnt.pi); } void uct_rc_verbs_ep_vfs_populate(uct_rc_ep_t *rc_ep) diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c index 009eb90c01c..afe0c867287 100644 --- a/src/uct/ib/ud/base/ud_ep.c +++ b/src/uct/ib/ud/base/ud_ep.c @@ -1269,20 +1269,44 @@ ucs_status_t uct_ud_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *c uct_ud_ep_t *ep = ucs_derived_of(tl_ep, uct_ud_ep_t); uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t); char dummy = 0; + ucs_status_t status; - UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); + UCT_EP_KEEPALIVE_CHECK_PARAM(flags); uct_ud_enter(iface); - if (/* check that no TX resources are available (i.e. there is signaled - * operation which provides actual peer status) */ - !uct_ud_ep_is_connected(ep) || - !uct_ud_ep_is_last_ack_received(ep)) { - uct_ud_leave(iface); - return UCS_OK; + + if (!uct_ud_ep_is_connected(ep)) { + /* Wireup pending - nudge the CREQ out if it isn't on the wire yet. */ + if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) { + uct_ud_ep_do_pending_ctl(ep, iface); + if (ucs_queue_is_empty(&ep->tx.window)) { + /* CREQ couldn't be posted - caller retries. */ + status = UCS_ERR_NO_RESOURCE; + goto out; + } + } + } else if (uct_ud_ep_is_last_ack_received(ep)) { + /* Connected and idle - emit a probe for the peer to ACK + * (uct_ud_enter is recursive so put_short is safe under the lock). */ + status = uct_ep_put_short(tl_ep, &dummy, 0, 0, 0); + if (UCS_STATUS_IS_ERR(status) || (comp == NULL)) { + goto out; + } + } else if (comp == NULL) { + /* Connected with in-flight signaled op - reliability layer is + * already monitoring it, status known good. */ + status = UCS_OK; + goto out; } - uct_ud_leave(iface); - return uct_ep_put_short(tl_ep, &dummy, 0, 0, 0); + /* Chain @c comp onto the last skb in tx.window (CREQ, in-flight op, or + * the probe we just posted); fires UCS_OK on peer ACK or an error from + * the reliability layer purge if peer becomes unreachable. */ + status = uct_ud_ep_comp_skb_add(iface, ep, comp); + +out: + uct_ud_leave(iface); + return status; } static uct_ud_send_skb_t *uct_ud_ep_prepare_crep(uct_ud_ep_t *ep) diff --git a/src/uct/sm/mm/base/mm_ep.c b/src/uct/sm/mm/base/mm_ep.c index 2b10048a488..db4f9cb48bc 100644 --- a/src/uct/sm/mm/base/mm_ep.c +++ b/src/uct/sm/mm/base/mm_ep.c @@ -560,7 +560,7 @@ uct_mm_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp) { uct_mm_ep_t *ep = ucs_derived_of(tl_ep, uct_mm_ep_t); - UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); + UCT_EP_KEEPALIVE_CHECK_PARAM(flags); uct_ep_keepalive_check(tl_ep, &ep->keepalive, ep->fifo_ctl->pid, flags, comp); return UCS_OK; diff --git a/src/uct/sm/scopy/cma/cma_ep.c b/src/uct/sm/scopy/cma/cma_ep.c index 12be85a49d3..d662fac59a8 100644 --- a/src/uct/sm/scopy/cma/cma_ep.c +++ b/src/uct/sm/scopy/cma/cma_ep.c @@ -143,7 +143,7 @@ ucs_status_t uct_cma_ep_check(const uct_ep_h tl_ep, unsigned flags, { uct_cma_ep_t *ep = ucs_derived_of(tl_ep, uct_cma_ep_t); - UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); + UCT_EP_KEEPALIVE_CHECK_PARAM(flags); uct_ep_keepalive_check(tl_ep, &ep->keepalive, ep->remote_pid, flags, comp); return UCS_OK; } diff --git a/src/uct/tcp/tcp.h b/src/uct/tcp/tcp.h index 68c3f394b12..f899ed43daf 100644 --- a/src/uct/tcp/tcp.h +++ b/src/uct/tcp/tcp.h @@ -488,12 +488,18 @@ typedef struct uct_tcp_md_config { typedef struct uct_tcp_ep_pending_req { - uct_pending_req_t super; - uct_tcp_ep_t *ep; - struct { - uct_tcp_cm_conn_event_t event; - uint8_t log_error; - } cm; + uct_pending_req_t super; + uct_tcp_ep_t *ep; + union { + struct { + uct_tcp_cm_conn_event_t event; + uint8_t log_error; + } cm; + struct { + /* User completion passed to uct_ep_check */ + uct_completion_t *comp; + } keepalive; + }; } uct_tcp_ep_pending_req_t; diff --git a/src/uct/tcp/tcp_ep.c b/src/uct/tcp/tcp_ep.c index 08e49966732..aafba0e8e81 100644 --- a/src/uct/tcp/tcp_ep.c +++ b/src/uct/tcp/tcp_ep.c @@ -2160,14 +2160,34 @@ ucs_status_t uct_tcp_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req, return UCS_OK; } +static ucs_status_t uct_tcp_ep_keepalive_pending_cb(uct_pending_req_t *self) +{ + uct_tcp_ep_pending_req_t *req = ucs_derived_of(self, + uct_tcp_ep_pending_req_t); + + /* Dispatch implies EP is CONNECTED: the wireup handshake completed, + * which is the peer-alive proof uct_ep_check needs. */ + uct_invoke_completion(req->keepalive.comp, UCS_OK); + ucs_free(req); + return UCS_OK; +} + static void uct_tcp_ep_pending_purge_cb(uct_pending_req_t *self, void *arg) { uct_tcp_ep_pending_purge_arg_t *purge_arg = arg; uct_tcp_ep_pending_req_t *tcp_pending_req; + ucs_status_t status; if (self->func == uct_tcp_cm_send_event_pending_cb) { tcp_pending_req = ucs_derived_of(self, uct_tcp_ep_pending_req_t); ucs_free(tcp_pending_req); + } else if (self->func == uct_tcp_ep_keepalive_pending_cb) { + tcp_pending_req = ucs_derived_of(self, uct_tcp_ep_pending_req_t); + status = (tcp_pending_req->ep->conn_state == + UCT_TCP_EP_CONN_STATE_CLOSED) ? + UCS_ERR_CONNECTION_RESET : UCS_ERR_CANCELED; + uct_invoke_completion(tcp_pending_req->keepalive.comp, status); + ucs_free(tcp_pending_req); } else { purge_arg->cb(self, purge_arg->arg); } @@ -2235,10 +2255,39 @@ uct_tcp_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp) uct_tcp_ep_t *ep = ucs_derived_of(tl_ep, uct_tcp_ep_t); uct_tcp_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_tcp_iface_t); uct_tcp_am_hdr_t *hdr = NULL; /* init to suppress build warning */ + uct_tcp_ep_pending_req_t *req; ucs_status_t status; - UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp); + UCT_EP_KEEPALIVE_CHECK_PARAM(flags); + + if (ep->conn_state == UCT_TCP_EP_CONN_STATE_CLOSED) { + return UCS_ERR_CONNECTION_RESET; + } + + if (ep->conn_state != UCT_TCP_EP_CONN_STATE_CONNECTED) { + /* Wireup in progress - defer @c comp on pending_q. It fires with + * UCS_OK on CONNECTED via uct_tcp_ep_keepalive_pending_cb, or with + * UCS_ERR_CANCELED on EP teardown via the pending-purge path. */ + if (comp == NULL) { + return UCS_INPROGRESS; + } + + req = ucs_malloc(sizeof(*req), "tcp_keepalive_pending_req"); + if (ucs_unlikely(req == NULL)) { + return UCS_ERR_NO_MEMORY; + } + + req->ep = ep; + req->keepalive.comp = comp; + req->super.func = uct_tcp_ep_keepalive_pending_cb; + uct_pending_req_queue_push(&ep->pending_q, &req->super); + UCT_TL_EP_STAT_PEND(&ep->super); + return UCS_INPROGRESS; + } + /* CONNECTED: fire-and-forget keepalive AM. TCP socket failures are picked + * up asynchronously via the existing EPOLLERR / EPOLLHUP path and + * delivered through the iface err_handler. */ status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_KEEPALIVE_AM_ID, &hdr); if (status != UCS_OK) { diff --git a/test/gtest/uct/test_peer_failure.cc b/test/gtest/uct/test_peer_failure.cc index 0e7a26b4a9e..dfff6cf024b 100644 --- a/test/gtest/uct/test_peer_failure.cc +++ b/test/gtest/uct/test_peer_failure.cc @@ -528,6 +528,106 @@ UCS_TEST_P(test_uct_keepalive, ep_check) _UCT_INSTANTIATE_TEST_CASE(test_uct_keepalive, posix) +/* Verify that uct_ep_check honors a user completion: peer status that is not + * yet known must be reported asynchronously via @c comp rather than being + * faked as UCS_OK. Instantiated on every transport so that any transport + * that advertises @c UCT_IFACE_FLAG_EP_CHECK but has not yet implemented the + * asynchronous form of @ref uct_ep_check is surfaced as a test failure. */ +class test_uct_ep_check_async : public uct_test { +public: + test_uct_ep_check_async() : m_e1(NULL), m_e2(NULL), m_comp_count(0), + m_comp_status(UCS_OK) + { + m_comp.func = comp_cb; + m_comp.count = 1; + m_comp.status = UCS_OK; + } + + void init() + { + uct_test::init(); + m_e1 = uct_test::create_entity(0, err_handler); + m_e2 = uct_test::create_entity(0, err_handler); + m_entities.push_back(m_e1); + m_entities.push_back(m_e2); + check_skip_test(); + } + +protected: + static void comp_cb(uct_completion_t *self) + { + test_uct_ep_check_async *test = + ucs_container_of(self, test_uct_ep_check_async, m_comp); + test->m_comp_status = self->status; + test->m_comp_count++; + } + + static ucs_status_t err_handler(void *arg, uct_ep_h ep, ucs_status_t status) + { + ADD_FAILURE() << "unexpected EP error: " << ucs_status_string(status); + return UCS_OK; + } + + /* Verify uct_ep_check honors @c comp: peer status known synchronously + * may be reported as UCS_OK without firing @c comp, otherwise it must be + * reported as UCS_INPROGRESS and propagated via @c comp. */ + void check_ep_check_async(uct_ep_h ep) + { + ucs_status_t status = uct_ep_check(ep, 0, &m_comp); + if (status == UCS_OK) { + EXPECT_EQ(0u, m_comp_count); + return; + } + ASSERT_EQ(UCS_INPROGRESS, status); + wait_for_flag(&m_comp_count); + EXPECT_EQ(1u, m_comp_count); + EXPECT_UCS_OK(m_comp_status); + } + + entity *m_e1; + entity *m_e2; + uct_completion_t m_comp; + volatile unsigned m_comp_count; + ucs_status_t m_comp_status; +}; + + +/* ep_check on a freshly created connected EP must report peer status via + * @c comp (asynchronously) or as a synchronous UCS_OK when the transport + * already knows the answer. */ +UCS_TEST_SKIP_COND_P(test_uct_ep_check_async, comp_during_wireup, + !check_caps(UCT_IFACE_FLAG_EP_CHECK)) +{ + m_e1->connect(0, *m_e2, 0); + check_ep_check_async(m_e1->ep(0)); +} + + +/* Same as above on a fully connected, idle EP. */ +UCS_TEST_SKIP_COND_P(test_uct_ep_check_async, comp_when_idle, + !check_caps(UCT_IFACE_FLAG_EP_CHECK)) +{ + m_e1->connect(0, *m_e2, 0); + flush(); + check_ep_check_async(m_e1->ep(0)); +} + + +/* ep_check with @c comp == NULL on a fully connected idle EP keeps the + * legacy synchronous behavior and returns UCS_OK. */ +UCS_TEST_SKIP_COND_P(test_uct_ep_check_async, no_comp_when_connected, + !check_caps(UCT_IFACE_FLAG_EP_CHECK)) +{ + m_e1->connect(0, *m_e2, 0); + flush(); + + EXPECT_EQ(UCS_OK, uct_ep_check(m_e1->ep(0), 0, NULL)); +} + + +UCT_INSTANTIATE_NO_SELF_TEST_CASE(test_uct_ep_check_async) + + class test_uct_peer_failure_keepalive : public test_uct_peer_failure { public: From 01632ed4c8ce0ff338d59a9a517ae026ae705881 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Thu, 28 May 2026 09:47:20 +0300 Subject: [PATCH 2/4] GTEST: Refactor completion handling in test_peer_failure --- test/gtest/uct/test_peer_failure.cc | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/test/gtest/uct/test_peer_failure.cc b/test/gtest/uct/test_peer_failure.cc index dfff6cf024b..c58bacd1a2a 100644 --- a/test/gtest/uct/test_peer_failure.cc +++ b/test/gtest/uct/test_peer_failure.cc @@ -538,9 +538,10 @@ class test_uct_ep_check_async : public uct_test { test_uct_ep_check_async() : m_e1(NULL), m_e2(NULL), m_comp_count(0), m_comp_status(UCS_OK) { - m_comp.func = comp_cb; - m_comp.count = 1; - m_comp.status = UCS_OK; + m_comp.self = this; + m_comp.uct.func = comp_cb; + m_comp.uct.count = 1; + m_comp.uct.status = UCS_OK; } void init() @@ -554,12 +555,16 @@ class test_uct_ep_check_async : public uct_test { } protected: + struct completion { + test_uct_ep_check_async *self; + uct_completion_t uct; + }; + static void comp_cb(uct_completion_t *self) { - test_uct_ep_check_async *test = - ucs_container_of(self, test_uct_ep_check_async, m_comp); - test->m_comp_status = self->status; - test->m_comp_count++; + completion *comp = ucs_container_of(self, completion, uct); + comp->self->m_comp_status = self->status; + comp->self->m_comp_count++; } static ucs_status_t err_handler(void *arg, uct_ep_h ep, ucs_status_t status) @@ -573,7 +578,7 @@ class test_uct_ep_check_async : public uct_test { * reported as UCS_INPROGRESS and propagated via @c comp. */ void check_ep_check_async(uct_ep_h ep) { - ucs_status_t status = uct_ep_check(ep, 0, &m_comp); + ucs_status_t status = uct_ep_check(ep, 0, &m_comp.uct); if (status == UCS_OK) { EXPECT_EQ(0u, m_comp_count); return; @@ -586,7 +591,7 @@ class test_uct_ep_check_async : public uct_test { entity *m_e1; entity *m_e2; - uct_completion_t m_comp; + completion m_comp; volatile unsigned m_comp_count; ucs_status_t m_comp_status; }; From 6d97acba8076c1f197703a43b2a74397d4ed4f22 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Fri, 5 Jun 2026 14:49:45 +0300 Subject: [PATCH 3/4] UCT/RC: fix return code of uct_rc_ep_check_progress for correct pending handling --- src/uct/ib/rc/base/rc_ep.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/uct/ib/rc/base/rc_ep.c b/src/uct/ib/rc/base/rc_ep.c index bf98169032d..d5aa86a7bdd 100644 --- a/src/uct/ib/rc/base/rc_ep.c +++ b/src/uct/ib/rc/base/rc_ep.c @@ -591,7 +591,7 @@ static ucs_status_t uct_rc_ep_check_progress(uct_pending_req_t *self) } if (status == UCS_INPROGRESS) { - return status; + return UCS_OK; } ucs_assert(UCS_STATUS_IS_ERR(status)); From 421e02a204e1ab0851cd82d598d3f8b2e090cad9 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Fri, 5 Jun 2026 16:26:12 +0300 Subject: [PATCH 4/4] UCT/UD: preserve UCS_OK return for uct_ud_ep_check(comp==NULL) Address PR #11495 review r3318024007: in the wireup branch uct_ud_ep_check() fell through to uct_ud_ep_comp_skb_add(), which returns UCS_INPROGRESS for comp==NULL and broke the previous UCS_OK contract observed by legacy NULL-comp callers. Short-circuit the comp==NULL case in a unified tail block; the now redundant "connected + in-flight + comp==NULL" else-if is folded into the same check. --- src/uct/ib/ud/base/ud_ep.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/uct/ib/ud/base/ud_ep.c b/src/uct/ib/ud/base/ud_ep.c index afe0c867287..1163149955b 100644 --- a/src/uct/ib/ud/base/ud_ep.c +++ b/src/uct/ib/ud/base/ud_ep.c @@ -1292,17 +1292,16 @@ ucs_status_t uct_ud_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *c if (UCS_STATUS_IS_ERR(status) || (comp == NULL)) { goto out; } - } else if (comp == NULL) { - /* Connected with in-flight signaled op - reliability layer is - * already monitoring it, status known good. */ - status = UCS_OK; - goto out; } - /* Chain @c comp onto the last skb in tx.window (CREQ, in-flight op, or - * the probe we just posted); fires UCS_OK on peer ACK or an error from - * the reliability layer purge if peer becomes unreachable. */ - status = uct_ud_ep_comp_skb_add(iface, ep, comp); + if (comp == NULL) { + status = UCS_OK; + } else { + /* Chain @c comp onto the last skb in tx.window (CREQ, in-flight op, or + * the probe we just posted); fires UCS_OK on peer ACK or an error from + * the reliability layer purge if peer becomes unreachable. */ + status = uct_ud_ep_comp_skb_add(iface, ep, comp); + } out: uct_ud_leave(iface);