Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -3596,7 +3596,7 @@ static int ucp_worker_do_ep_keepalive(ucp_worker_h worker, ucs_time_t now)

if (status == UCS_ERR_NO_RESOURCE) {
return 0;
} else if (status != UCS_OK) {
} else if (UCS_STATUS_IS_ERR(status)) {
Comment thread
evgeny-leksikov marked this conversation as resolved.
ucs_diag("worker %p: keepalive failed on ep %p lane[%d]=%p: %s", worker,
ep, lane, uct_ep, ucs_status_string(status));
} else {
Expand Down
3 changes: 1 addition & 2 deletions src/uct/base/uct_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ enum {
/**
* In debug mode, check that keepalive params are valid
*/
#define UCT_EP_KEEPALIVE_CHECK_PARAM(_flags, _comp) \
UCT_CHECK_PARAM((_comp) == NULL, "Unsupported completion on ep_check"); \
#define UCT_EP_KEEPALIVE_CHECK_PARAM(_flags) \
UCT_CHECK_PARAM((_flags) == 0, "Unsupported flags: %x", (_flags));


Expand Down
3 changes: 2 additions & 1 deletion src/uct/ib/mlx5/rc/rc_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ uct_rc_mlx5_base_ep_atomic32_fetch(uct_ep_h ep, uct_atomic_op_t opcode,

ucs_status_t uct_rc_mlx5_base_ep_fence(uct_ep_h tl_ep, unsigned flags);

void uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep);
ucs_status_t
uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp);

void uct_rc_mlx5_base_ep_vfs_populate(uct_rc_ep_t *rc_ep);

Expand Down
21 changes: 14 additions & 7 deletions src/uct/ib/mlx5/rc/rc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -523,18 +523,25 @@ ucs_status_t uct_rc_mlx5_base_ep_fence(uct_ep_h tl_ep, unsigned flags)
return uct_rc_ep_fence(tl_ep, &ep->tx.wq.fi);
}

void uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep)
ucs_status_t
uct_rc_mlx5_base_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp)
{
UCT_RC_MLX5_BASE_EP_DECL(tl_ep, iface, ep);
uint64_t dummy = 0; /* Dummy buffer to suppress compiler warning */

uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_RDMA_WRITE, &dummy, 0,
0, 0, 0,
0, 0,
0, 0,
if (comp == NULL) {
uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
&ep->tx.wq, MLX5_OPCODE_RDMA_WRITE, &dummy,
0, 0, 0, 0, 0, 0, 0, 0, 0, INT_MAX);
return UCS_OK;
}

uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
&ep->tx.wq, MLX5_OPCODE_RDMA_WRITE, &dummy,
0, 0, 0, 0, 0, 0, 0, MLX5_WQE_CTRL_CQ_UPDATE,
0, INT_MAX);
return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super,
&ep->super.txqp, comp, ep->tx.wq.sig_pi);
}

void uct_rc_mlx5_base_ep_vfs_populate(uct_rc_ep_t *rc_ep)
Expand Down
77 changes: 53 additions & 24 deletions src/uct/ib/rc/base/rc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
return UCS_ERR_BUSY;
}

UCS_STATIC_ASSERT(sizeof(uct_pending_req_priv_arb_t) <=
UCS_STATIC_ASSERT(sizeof(uct_rc_pending_req_priv_t) <=
UCT_PENDING_REQ_PRIV_LEN);
uct_pending_req_arb_group_push(&ep->arb_group, n);
UCT_TL_EP_STAT_PEND(&ep->super);
Expand Down Expand Up @@ -372,9 +372,16 @@ uct_rc_ep_arbiter_purge_internal_cb(ucs_arbiter_t *arbiter,
uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
uct_rc_ep_t *ep = ucs_container_of(group, uct_rc_ep_t, arb_group);
uct_rc_pending_req_t *freq;
uct_completion_t *comp;

if (req->func == uct_rc_ep_check_progress) {
ep->flags &= ~UCT_RC_EP_FLAG_KEEPALIVE_PENDING;

comp = uct_rc_pending_req_priv(req)->comp;
if (comp != NULL) {
uct_invoke_completion(comp, UCS_ERR_CANCELED);
}

ucs_mpool_put(req);
} else if (req->func == uct_rc_ep_fc_grant) {
freq = ucs_derived_of(req, uct_rc_pending_req_t);
Expand Down Expand Up @@ -531,12 +538,15 @@ ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available,
return UCS_INPROGRESS;
}

static ucs_status_t uct_rc_ep_check_internal(uct_ep_h tl_ep)
static ucs_status_t
uct_rc_ep_check_internal(uct_ep_h tl_ep, uct_completion_t *comp)
{
uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t);
uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_rc_iface_t);
uct_rc_iface_ops_t *ops = ucs_derived_of(iface->super.ops, uct_rc_iface_ops_t);
uct_rc_iface_ops_t *ops = ucs_derived_of(iface->super.ops,
uct_rc_iface_ops_t);
ucs_status_t status;

/* in case if no TX resources are available then there is at least
* one signaled operation which provides actual peer status, in this case
Expand All @@ -547,52 +557,71 @@ static ucs_status_t uct_rc_ep_check_internal(uct_ep_h tl_ep)
* to add request to pending queue */
UCT_RC_CHECK_CQE_RET(iface, ep, UCS_ERR_NO_RESOURCE);

ops->ep_post_check(tl_ep);

return UCS_OK;
status = ops->ep_post_check(tl_ep, comp);
return (comp == NULL) ? UCS_OK : status;
}

static ucs_status_t uct_rc_ep_check_progress(uct_pending_req_t *self)
{
uct_rc_pending_req_t *req = ucs_derived_of(self, uct_rc_pending_req_t);
uct_rc_ep_t *ep = ucs_derived_of(req->ep, uct_rc_ep_t);
uct_completion_t *comp = uct_rc_pending_req_priv(self)->comp;
ucs_status_t status;

ucs_assert(ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING);
/* The flag may already be cleared here: when multiple uct_rc_ep_check
* calls land on the same EP under CQE pressure each queues its own
* pending req, and the first req's dispatch clears the flag before the
* rest run. With comp == NULL we have nothing to fire and the sibling's
* keepalive WQE already covers peer-liveness - skip the redundant post. */
if ((comp == NULL) && !(ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING)) {
Comment thread
evgeny-leksikov marked this conversation as resolved.
ucs_mpool_put(req);
return UCS_OK;
}

status = uct_rc_ep_check_internal(req->ep, comp);
if (status == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
}

status = uct_rc_ep_check_internal(req->ep);
ep->flags &= ~UCT_RC_EP_FLAG_KEEPALIVE_PENDING;
if (status == UCS_OK) {
ep->flags &= ~UCT_RC_EP_FLAG_KEEPALIVE_PENDING;
ucs_assert(comp == NULL);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ MINOR

ucs_assert(comp == NULL) here is true by construction (uct_rc_ep_check_internal returns UCS_INPROGRESS when comp != NULL), but the early if (status == UCS_ERR_NO_RESOURCE) + late if (status == UCS_INPROGRESS) + final error branch reads easier as a switch. Minor readability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to keep the per-case if-chain. Each arm has a different post-action (release-and-return / leave-req-and-return / invoke-completion-then-release) and the early UCS_ERR_NO_RESOURCE return is semantically distinct (the req is re-queued by the dispatcher, not consumed). A switch would either need an early return outside it or a fall-through tail — neither makes the asymmetry clearer.

ucs_mpool_put(req);
} else {
ucs_assert(status == UCS_ERR_NO_RESOURCE);
return UCS_OK;
}

return status;
if (status == UCS_INPROGRESS) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a queued ep_check posts, uct_rc_ep_check_progress() returns UCS_INPROGRESS. uct_rc_ep_process_pending() maps that to NEXT_GROUP -> the same pending request stays queued. Further progress can repost w same comp -> purge can cancel a comp that the CQE path will also invoke. Free the pending request and return UCS_OK once the async check is armed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posted operations should be removed from pending queue and enqueued to transport level outstanding ops. So, pending purge won't take any effect on it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In uct_rc_ep_check after uct_rc_ep_check_internal, there's

ucs_assert((comp == NULL) ? (status == UCS_OK) :
                   (status == UCS_INPROGRESS));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user requests check with uct_completion, we return UCS_INPROGRESS with promise return actual status in uct_completion (can be used to acknowledge new connection), otherwise UCS_OK which means EP is fine at this exact moment, but in case if check WQE completes with ERR CQE, we report error by error callback (used to detect peer/link failure). What am I missing?

return UCS_OK;
}

ucs_assert(UCS_STATUS_IS_ERR(status));
uct_invoke_completion(comp, status);
ucs_mpool_put(req);
return UCS_OK;
}

ucs_status_t
uct_rc_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)
{
uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t);
uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_rc_iface_t);
uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
uct_rc_pending_req_t *req;
ucs_status_t status;

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);
UCT_EP_KEEPALIVE_CHECK_PARAM(flags);

ucs_assert(ep->flags & UCT_RC_EP_FLAG_CONNECTED);

if (ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING) {
if ((comp == NULL) && (ep->flags & UCT_RC_EP_FLAG_KEEPALIVE_PENDING)) {
/* keepalive request is in pending queue and will be
* processed when resources are available */
return UCS_OK;
}

status = uct_rc_ep_check_internal(tl_ep);
status = uct_rc_ep_check_internal(tl_ep, comp);
if (status != UCS_ERR_NO_RESOURCE) {
ucs_assert(status == UCS_OK);
ucs_assert((comp == NULL) ? (status == UCS_OK) :
(status == UCS_INPROGRESS));
return status;
}

Expand All @@ -602,13 +631,13 @@ uct_rc_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)
return UCS_ERR_NO_MEMORY;
}

req->ep = &ep->super.super;
req->super.func = uct_rc_ep_check_progress;
status = uct_rc_ep_pending_add(tl_ep, &req->super, 0);
ep->flags |= UCT_RC_EP_FLAG_KEEPALIVE_PENDING;
req->ep = &ep->super.super;
req->super.func = uct_rc_ep_check_progress;
uct_rc_pending_req_priv(&req->super)->comp = comp;
status = uct_rc_ep_pending_add(tl_ep, &req->super, 0);
ep->flags |= UCT_RC_EP_FLAG_KEEPALIVE_PENDING;
ucs_assert_always(status == UCS_OK);

return UCS_OK;
return (comp == NULL) ? UCS_OK : UCS_INPROGRESS;
}

int uct_rc_ep_is_connected(uct_rc_ep_t *ep, struct ibv_ah_attr *ah_attr,
Expand Down
14 changes: 13 additions & 1 deletion src/uct/ib/rc/base/rc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ typedef struct uct_rc_pending_req {
} uct_rc_pending_req_t;


typedef struct {
uct_pending_req_priv_arb_t arb;
uct_completion_t *comp;
} uct_rc_pending_req_priv_t;


/**
* RC fence type.
*/
Expand Down Expand Up @@ -218,7 +224,8 @@ typedef void (*uct_rc_iface_qp_cleanup_func_t)(
uct_rc_iface_qp_cleanup_ctx_t *cleanup_ctx);


typedef void (*uct_rc_iface_ep_post_check_func_t)(uct_ep_h tl_ep);
typedef ucs_status_t (*uct_rc_iface_ep_post_check_func_t)(
uct_ep_h tl_ep, uct_completion_t *comp);


typedef void (*uct_rc_iface_ep_vfs_populate_func_t)(uct_rc_ep_t *rc_ep);
Expand Down Expand Up @@ -641,6 +648,11 @@ uct_rc_iface_send_op_set_name(uct_rc_iface_send_op_t *op, const char *name)
#endif
}

static UCS_F_ALWAYS_INLINE uct_rc_pending_req_priv_t*
uct_rc_pending_req_priv(uct_pending_req_t *req)
{
return (uct_rc_pending_req_priv_t*)&req->priv;
}

/**
* Helper function to set ECE to qp.
Expand Down
2 changes: 1 addition & 1 deletion src/uct/ib/rc/verbs/rc_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ ucs_status_t uct_rc_verbs_ep_flush(uct_ep_h tl_ep, unsigned flags,

ucs_status_t uct_rc_verbs_ep_fence(uct_ep_h tl_ep, unsigned flags);

void uct_rc_verbs_ep_post_check(uct_ep_h tl_ep);
ucs_status_t uct_rc_verbs_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp);

void uct_rc_verbs_ep_vfs_populate(uct_rc_ep_t *rc_ep);

Expand Down
15 changes: 12 additions & 3 deletions src/uct/ib/rc/verbs/rc_verbs_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,20 @@ ucs_status_t uct_rc_verbs_ep_fence(uct_ep_h tl_ep, unsigned flags)
return uct_rc_ep_fence(tl_ep, &ep->fi);
}

void uct_rc_verbs_ep_post_check(uct_ep_h tl_ep)
ucs_status_t uct_rc_verbs_ep_post_check(uct_ep_h tl_ep, uct_completion_t *comp)
{
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t);
uct_rc_verbs_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_verbs_ep_t);
uct_rc_verbs_iface_t *iface = ucs_derived_of(tl_ep->iface,
uct_rc_verbs_iface_t);

uct_rc_verbs_ep_post_flush(ep, 0);
if (comp == NULL) {
uct_rc_verbs_ep_post_flush(ep, 0);
return UCS_OK;
}

uct_rc_verbs_ep_post_flush(ep, IBV_SEND_SIGNALED);
return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super,
&ep->super.txqp, comp, ep->txcnt.pi);
}

void uct_rc_verbs_ep_vfs_populate(uct_rc_ep_t *rc_ep)
Expand Down
41 changes: 32 additions & 9 deletions src/uct/ib/ud/base/ud_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1269,20 +1269,43 @@ ucs_status_t uct_ud_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *c
uct_ud_ep_t *ep = ucs_derived_of(tl_ep, uct_ud_ep_t);
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
char dummy = 0;
ucs_status_t status;

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);
UCT_EP_KEEPALIVE_CHECK_PARAM(flags);
Comment thread
evgeny-leksikov marked this conversation as resolved.

uct_ud_enter(iface);
if (/* check that no TX resources are available (i.e. there is signaled
* operation which provides actual peer status) */
!uct_ud_ep_is_connected(ep) ||
!uct_ud_ep_is_last_ack_received(ep)) {
uct_ud_leave(iface);
return UCS_OK;

if (!uct_ud_ep_is_connected(ep)) {
/* Wireup pending - nudge the CREQ out if it isn't on the wire yet. */
if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
uct_ud_ep_do_pending_ctl(ep, iface);
if (ucs_queue_is_empty(&ep->tx.window)) {
/* CREQ couldn't be posted - caller retries. */
status = UCS_ERR_NO_RESOURCE;
goto out;
}
}
} else if (uct_ud_ep_is_last_ack_received(ep)) {
/* Connected and idle - emit a probe for the peer to ACK
* (uct_ud_enter is recursive so put_short is safe under the lock). */
status = uct_ep_put_short(tl_ep, &dummy, 0, 0, 0);
if (UCS_STATUS_IS_ERR(status) || (comp == NULL)) {
goto out;
}
}
uct_ud_leave(iface);

return uct_ep_put_short(tl_ep, &dummy, 0, 0, 0);
if (comp == NULL) {
status = UCS_OK;
} else {
/* Chain @c comp onto the last skb in tx.window (CREQ, in-flight op, or
* the probe we just posted); fires UCS_OK on peer ACK or an error from
* the reliability layer purge if peer becomes unreachable. */
status = uct_ud_ep_comp_skb_add(iface, ep, comp);
}

out:
uct_ud_leave(iface);
return status;
}

static uct_ud_send_skb_t *uct_ud_ep_prepare_crep(uct_ud_ep_t *ep)
Expand Down
2 changes: 1 addition & 1 deletion src/uct/sm/mm/base/mm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ uct_mm_ep_check(uct_ep_h tl_ep, unsigned flags, uct_completion_t *comp)
{
uct_mm_ep_t *ep = ucs_derived_of(tl_ep, uct_mm_ep_t);

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);
UCT_EP_KEEPALIVE_CHECK_PARAM(flags);
uct_ep_keepalive_check(tl_ep, &ep->keepalive, ep->fifo_ctl->pid, flags,
comp);
return UCS_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/uct/sm/scopy/cma/cma_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ ucs_status_t uct_cma_ep_check(const uct_ep_h tl_ep, unsigned flags,
{
uct_cma_ep_t *ep = ucs_derived_of(tl_ep, uct_cma_ep_t);

UCT_EP_KEEPALIVE_CHECK_PARAM(flags, comp);
UCT_EP_KEEPALIVE_CHECK_PARAM(flags);
uct_ep_keepalive_check(tl_ep, &ep->keepalive, ep->remote_pid, flags, comp);
return UCS_OK;
}
18 changes: 12 additions & 6 deletions src/uct/tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,18 @@ typedef struct uct_tcp_md_config {


typedef struct uct_tcp_ep_pending_req {
uct_pending_req_t super;
uct_tcp_ep_t *ep;
struct {
uct_tcp_cm_conn_event_t event;
uint8_t log_error;
} cm;
uct_pending_req_t super;
uct_tcp_ep_t *ep;
union {
struct {
uct_tcp_cm_conn_event_t event;
uint8_t log_error;
} cm;
struct {
/* User completion passed to uct_ep_check */
uct_completion_t *comp;
} keepalive;
};
} uct_tcp_ep_pending_req_t;


Expand Down
Loading