Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ noinst_HEADERS = \
proto/proto.h \
rma/rma.h \
rma/rma.inl \
rma/rma_rndv.h \
rndv/proto_rndv.h \
rndv/proto_rndv.inl \
rndv/rndv_mtype.inl \
Expand Down Expand Up @@ -143,6 +144,7 @@ libucp_la_SOURCES = \
rma/get_offload.c \
rma/put_am.c \
rma/put_offload.c \
rma/rma_rndv.c \
rma/rma_send.c \
rma/rma_sw.c \
rma/flush.c \
Expand Down
6 changes: 6 additions & 0 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -3696,6 +3696,12 @@ void ucp_ep_req_purge(ucp_ep_h ucp_ep, ucp_request_t *req,
}

ucp_request_put(req);
} else if (req->flags & UCP_REQUEST_FLAG_RNDV_SEND_INTERNAL) {
ucs_assert(req->send.ep == ucp_ep);

ucp_datatype_iter_cleanup(&req->send.state.dt_iter, 1,
UCP_DT_MASK_ALL);
ucp_request_complete_send(req, status);
} else if (req->send.uct.func == ucp_amo_sw_proto.progress_fetch) {
/* Currently we don't support UCP EP request purging for proto mode */
ucs_assert(!ucp_ep->worker->context->config.ext.proto_enable);
Expand Down
8 changes: 7 additions & 1 deletion src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ static const char *ucp_request_flag_names[] = {
[ucs_ilog2(UCP_REQUEST_FLAG_RECV_TAG)] = "rcv_tag",
[ucs_ilog2(UCP_REQUEST_FLAG_RKEY_INUSE)] = "rk_use",
[ucs_ilog2(UCP_REQUEST_FLAG_USER_HEADER_COPIED)] = "hdr_copy",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_RECV_INTERNAL)] = "rndv_rcv_int",

#if UCS_ENABLE_ASSERT
[ucs_ilog2(UCP_REQUEST_FLAG_STREAM_RECV)] = "strm_rcv",
[ucs_ilog2(UCP_REQUEST_DEBUG_FLAG_EXTERNAL)] = "extrn",
[ucs_ilog2(UCP_REQUEST_FLAG_SUPER_VALID)] = "spr_vld",
#endif
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_SEND_INTERNAL)] = "rndv_snd_int",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_RTR_REQ)] = "rndv_rtr_req",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_FLUSH)] = "rndv_flush",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_START_FLUSH)] = "rndv_start_flush",
};

static ucs_memory_type_t ucp_request_get_mem_type(ucp_request_t *req)
Expand All @@ -59,7 +64,8 @@ static ucs_memory_type_t ucp_request_get_mem_type(ucp_request_t *req)
} else if (req->flags & (UCP_REQUEST_FLAG_SEND_AM | UCP_REQUEST_FLAG_SEND_TAG)) {
return req->send.mem_type;
} else if (req->flags &
(UCP_REQUEST_FLAG_RECV_AM | UCP_REQUEST_FLAG_RECV_TAG)) {
(UCP_REQUEST_FLAG_RECV_AM | UCP_REQUEST_FLAG_RECV_TAG |
UCP_REQUEST_FLAG_RNDV_RECV_INTERNAL)) {
return req->recv.dt_iter.mem_info.type;
} else {
return UCS_MEMORY_TYPE_UNKNOWN;
Expand Down
23 changes: 19 additions & 4 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,20 @@ enum {
UCP_REQUEST_FLAG_USER_HEADER_COPIED = UCS_BIT(19),
UCP_REQUEST_FLAG_USAGE_TRACKED = UCS_BIT(20),
UCP_REQUEST_FLAG_FENCE_REQUIRED = UCS_BIT(21),
UCP_REQUEST_FLAG_RNDV_RECV_INTERNAL = UCS_BIT(22),
#if UCS_ENABLE_ASSERT
UCP_REQUEST_FLAG_STREAM_RECV = UCS_BIT(22),
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(23),
UCP_REQUEST_FLAG_SUPER_VALID = UCS_BIT(24),
UCP_REQUEST_FLAG_STREAM_RECV = UCS_BIT(23),
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(24),
UCP_REQUEST_FLAG_SUPER_VALID = UCS_BIT(25),
#else
UCP_REQUEST_FLAG_STREAM_RECV = 0,
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = 0,
UCP_REQUEST_FLAG_SUPER_VALID = 0
UCP_REQUEST_FLAG_SUPER_VALID = 0,
#endif
UCP_REQUEST_FLAG_RNDV_SEND_INTERNAL = UCS_BIT(26),
UCP_REQUEST_FLAG_RNDV_RTR_REQ = UCS_BIT(27),
UCP_REQUEST_FLAG_RNDV_FLUSH = UCS_BIT(28),
UCP_REQUEST_FLAG_RNDV_START_FLUSH = UCS_BIT(29)
};


Expand Down Expand Up @@ -261,6 +266,9 @@ struct ucp_request {
/* Remote buffer address for get/put operation */
uint64_t remote_address;

/* Remote buffer memory info for RTR_REQ */
ucp_memory_info_t remote_mem_info;

/* Key for remote buffer operation */
ucp_rkey_h rkey;

Expand Down Expand Up @@ -477,6 +485,13 @@ struct ucp_request {
size_t length; /* Completion info to fill */
} stream;

struct {
/* Remote endpoint ID used to send internal completions */
uint64_t ep_id;
/* Completion callback for internal RNDV receives */
ucp_request_callback_t complete_cb;
} rndv;

struct {
ucp_am_recv_data_nbx_callback_t cb; /* Completion callback */
ucp_recv_desc_t *desc; /* Receive desc */
Expand Down
11 changes: 11 additions & 0 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,17 @@ ucp_request_put(ucp_request_t *req)
ucs_mpool_put_inline(req);
}

/**
 * Complete the extra flush operation held by a RNDV wrapper until the RNDV
 * data path completes.
 *
 * When UCP_REQUEST_FLAG_RNDV_FLUSH is set on @a req, the flag is cleared and
 * -1 is added to the flush operations count of the worker that owns
 * req->send.ep. A request without the flag is left untouched, so calling this
 * again on the same request does nothing.
 *
 * @param [in] req  Send request that may hold the extra flush operation.
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_rndv_flush_complete(ucp_request_t *req)
{
    /* Expected case: this request holds no extra flush operation */
    if (!ucs_unlikely(req->flags & UCP_REQUEST_FLAG_RNDV_FLUSH)) {
        return;
    }

    /* Clear the marker first, then release the flush operation it stood for */
    req->flags &= ~UCP_REQUEST_FLAG_RNDV_FLUSH;
    ucp_worker_flush_ops_count_add(req->send.ep->worker, -1);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
{
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/proto/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
_macro(ucp_get_am_bcopy_proto) \
_macro(ucp_get_offload_bcopy_proto) \
_macro(ucp_get_offload_zcopy_proto) \
_macro(ucp_get_rndv_proto) \
_macro(ucp_put_am_bcopy_proto) \
_macro(ucp_put_offload_short_proto) \
_macro(ucp_put_offload_bcopy_proto) \
_macro(ucp_put_offload_zcopy_proto) \
_macro(ucp_put_rndv_proto) \
_macro(ucp_put_sgl_offload_proto) \
_macro(ucp_eager_bcopy_multi_proto) \
_macro(ucp_eager_sync_bcopy_multi_proto) \
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/proto/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


/* Maximal number of protocols in total */
#define UCP_PROTO_MAX_COUNT 64
#define UCP_PROTO_MAX_COUNT 65


/* Special value for non-existent protocol */
Expand Down
13 changes: 12 additions & 1 deletion src/ucp/proto/proto_common.inl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ ucp_proto_request_zcopy_clean(ucp_request_t *req, unsigned dt_mask)
}

static UCS_F_ALWAYS_INLINE void
ucp_proto_request_zcopy_complete(ucp_request_t *req, ucs_status_t status)
ucp_proto_request_zcopy_complete_cb(ucp_request_t *req, ucs_status_t status,
ucp_request_callback_t complete_cb)
{
ucp_datatype_iter_cleanup(&req->send.state.dt_iter, 1,
UCP_DT_MASK_CONTIG_IOV |
Expand All @@ -109,10 +110,20 @@ ucp_proto_request_zcopy_complete(ucp_request_t *req, ucs_status_t status)
!(req->send.ep->flags & UCP_EP_FLAG_FAILED)) {
ucp_proto_request_restart(req);
} else {
if (complete_cb != NULL) {
complete_cb(req);
}

ucp_request_complete_send(req, status);
}
}

/**
 * Complete a zero-copy protocol request without an extra completion hook.
 *
 * Forwards to ucp_proto_request_zcopy_complete_cb() with a NULL callback, so
 * no additional callback is invoked before the request is completed.
 *
 * @param [in] req     Request to complete.
 * @param [in] status  Completion status, passed through unchanged.
 */
static UCS_F_ALWAYS_INLINE void
ucp_proto_request_zcopy_complete(ucp_request_t *req, ucs_status_t status)
{
ucp_proto_request_zcopy_complete_cb(req, status, NULL);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_request_zcopy_complete_success(ucp_request_t *req)
{
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/proto/proto_debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ void ucp_proto_select_param_str(const ucp_proto_select_param_t *select_param,
[ucs_ilog2(UCP_OP_ATTR_FLAG_MULTI_SEND)] = "multi",
};
static const char *rndv_flag_names[] = {
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_PPLN_FRAG)] = "frag"
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_PPLN_FRAG)] = "frag",
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_RNDV_PUSH)] = "push"
};
static const char *am_flag_names[] = {
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_AM_EAGER)] = "egr",
Expand Down
13 changes: 11 additions & 2 deletions src/ucp/proto/proto_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,9 +558,18 @@ ucp_proto_select_lookup_slow(ucp_worker_h worker,
return NULL;
}

/* add to hash after initializing the temp element, since calling
* ucp_proto_select_elem_init() can recursively modify the hash
/* Add to hash after initializing the temp element, since calling
* ucp_proto_select_elem_init() can recursively modify the hash.
* Re-check the key because recursive lookup may have initialized this
* exact selection already.
*/
khiter = kh_get(ucp_proto_select_hash, proto_select->hash, key.u64);
if (khiter != kh_end(proto_select->hash)) {
ucp_proto_select_elem_cleanup(&tmp_select_elem);
select_elem = &kh_value(proto_select->hash, khiter);
goto out;
}

khiter = kh_put(ucp_proto_select_hash, proto_select->hash, key.u64,
&khret);
ucs_assert_always(khret == UCS_KH_PUT_BUCKET_EMPTY);
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/proto/proto_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
* Relevant for UCP_OP_ID_RNDV_SEND and UCP_OP_ID_RNDV_RECV. */
#define UCP_PROTO_SELECT_OP_FLAG_PPLN_FRAG (UCP_PROTO_SELECT_OP_FLAGS_BASE << 1)

/* Select only push-based rendezvous receive protocols. */
#define UCP_PROTO_SELECT_OP_FLAG_RNDV_PUSH (UCP_PROTO_SELECT_OP_FLAGS_BASE << 3)


/* Select eager/rendezvous protocol for Active Message sends.
* Relevant for UCP_OP_ID_AM_SEND and UCP_OP_ID_AM_SEND_REPLY. */
Expand Down
Loading