diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 6fe1616f52e..e9082ef542f 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -20,10 +20,13 @@ #include #include #include +#include #define UCP_WIREUP_RMA_BW_TEST_MSG_SIZE 262144 #define UCP_WIREUP_MAX_FLAGS_STRING_SIZE 50 #define UCP_WIREUP_PATH_INDEX_UNDEFINED UINT_MAX +#define UCP_WIREUP_SCORE_MAX_DIFF 0.02 +#define UCP_WIREUP_NO_SCORE_TIEBREAK (-1.0) /* 6 for the string format constant length */ #define UCP_WIREUP_TLS_INFO_SIZE (UCP_WIREUP_UCT_INFO_SIZE + \ @@ -352,9 +355,8 @@ ucp_wireup_check_keepalive(const ucp_wireup_select_params_t *select_params, } static void -ucp_wireup_init_select_info(double score, unsigned addr_index, - ucp_rsc_index_t rsc_index, - uint8_t priority, +ucp_wireup_init_select_info(double score, double tiebreak, unsigned addr_index, + ucp_rsc_index_t rsc_index, uint8_t priority, ucp_wireup_select_info_t *select_info) { /* score == 0.0 could be specified only when initializing a selection info @@ -362,12 +364,47 @@ ucp_wireup_init_select_info(double score, unsigned addr_index, ucs_assert((score >= 0.0) || (rsc_index == UCP_NULL_RESOURCE)); select_info->score = score; + select_info->tiebreak = tiebreak; select_info->addr_index = addr_index; select_info->path_index = UCP_WIREUP_PATH_INDEX_UNDEFINED; select_info->rsc_index = rsc_index; select_info->priority = priority; } +/* + * Select by tiebreak only among candidates whose score is close to the best + * primary score. Keep the reference score fixed so later updates to *sinfo + * cannot change the tiebreak window and make the result depend on iteration + * order. + */ +static void ucp_wireup_select_transport_tiebreak( + const ucp_proto_select_info_array_t *candidates_array, + ucp_wireup_select_info_t *sinfo) +{ + const double ref_score = sinfo->score; + int found = 0; + const ucp_wireup_select_info_t *candidate; + + ucs_array_for_each(candidate, candidates_array) { + if (fabs(candidate->score - ref_score) > + (UCP_WIREUP_SCORE_MAX_DIFF * ref_score)) { + continue; + } + + if (!found || + (ucp_score_prio_cmp(candidate->tiebreak, candidate->priority, + sinfo->tiebreak, sinfo->priority) > 0)) { + *sinfo = *candidate; + found = 1; + } + } + + ucs_assertv(found, "score=%f tiebreak=%f addr_index=%u path_index=%u " + "rsc_index=%d priority=%u", sinfo->score, sinfo->tiebreak, + sinfo->addr_index, sinfo->path_index, sinfo->rsc_index, + sinfo->priority); +} + static size_t ucp_wireup_bw_max_lanes(const ucp_wireup_select_params_t *select_params) { @@ -397,13 +434,17 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport( { UCS_STRING_BUFFER_ONSTACK(missing_flags_str, UCP_WIREUP_MAX_FLAGS_STRING_SIZE); - const ucp_unpacked_address_t *address = select_params->address; - ucp_ep_h ep = select_params->ep; - ucp_worker_h worker = ep->worker; - ucp_context_h context = worker->context; - ucp_wireup_select_info_t sinfo = {0}; - int found = 0; - ucp_wireup_select_flags_t local_iface_flags = criteria->local_iface_flags; + const ucp_unpacked_address_t *address = select_params->address; + ucp_ep_h ep = select_params->ep; + ucp_worker_h worker = ep->worker; + ucp_context_h context = worker->context; + ucp_proto_select_info_array_t candidates_array = + UCS_ARRAY_DYNAMIC_INITIALIZER; + ucp_wireup_select_info_t sinfo = {0}; + int found = 0; + ucp_wireup_select_flags_t local_iface_flags = + criteria->local_iface_flags; + ucp_wireup_select_info_t *candidate; int has_cm; uint64_t local_md_flags; ucp_tl_addr_bitmap_t addr_index_map, rsc_addr_index_map; @@ -421,8 +462,9 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport( uct_iface_attr_t *iface_attr; uct_md_attr_v2_t *md_attr; const uct_component_attr_t *cmpt_attr; + ucs_status_t status; int is_reachable; - double score; + double score, tiebreak; uint8_t priority; int score_cmp; ucp_md_index_t md_index; @@ -622,16 +664,40 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport( } score = criteria->calc_score(wiface, md_attr, address, ae, - 0, criteria->arg); + 0, criteria->arg); + tiebreak = (criteria->calc_tiebreak == NULL) ? + UCP_WIREUP_NO_SCORE_TIEBREAK : + criteria->calc_tiebreak(wiface, md_attr, + address, ae, 0, + criteria->tiebreak_arg); priority = iface_attr->priority + ae->iface_attr.priority; score_cmp = found ? ucp_score_prio_cmp(score, priority, sinfo.score, sinfo.priority) : 1; is_reachable = 1; + ucs_trace(UCT_TL_RESOURCE_DESC_FMT + "->addr[%u] : %s score %.2f tiebreak %.2f priority %d", + UCT_TL_RESOURCE_DESC_ARG(resource), addr_index, + criteria->title, score, tiebreak, priority); + + if (criteria->calc_tiebreak != NULL) { + /* Save every reachable candidate so the tiebreak pass can + * compare them after the best primary score is known. */ + candidate = ucs_array_append(&candidates_array, + status = UCS_ERR_NO_MEMORY; + goto out_cleanup); + ucp_wireup_init_select_info(score, tiebreak, addr_index, + rsc_index, priority, candidate); + } + if (!found || (score_cmp > 0)) { - ucp_wireup_init_select_info(score, addr_index, rsc_index, - priority, &sinfo); + if (criteria->calc_tiebreak != NULL) { + sinfo = *candidate; + } else { + ucp_wireup_init_select_info(score, tiebreak, addr_index, + rsc_index, priority, &sinfo); + } found = 1; } } @@ -656,20 +722,30 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport( address->name, tls_info); } - return UCS_ERR_UNREACHABLE; + status = UCS_ERR_UNREACHABLE; + goto out_cleanup; + } + + if (criteria->calc_tiebreak != NULL) { + ucp_wireup_select_transport_tiebreak(&candidates_array, &sinfo); } ucs_trace("ep %p: selected for %s: " UCT_TL_RESOURCE_DESC_FMT " md[%d]" - " -> '%s' address[%d],md[%d] score %.2f", + " -> '%s' address[%d],md[%d] score %.2f tiebreak %.2f", ep, criteria->title, UCT_TL_RESOURCE_DESC_ARG( &context->tl_rscs[sinfo.rsc_index].tl_rsc), context->tl_rscs[sinfo.rsc_index].md_index, ucp_ep_peer_name(ep), sinfo.addr_index, - address->address_list[sinfo.addr_index].md_index, sinfo.score); + address->address_list[sinfo.addr_index].md_index, sinfo.score, + sinfo.tiebreak); *select_info = sinfo; - return UCS_OK; + status = UCS_OK; + +out_cleanup: + ucs_array_cleanup_dynamic(&candidates_array); + return status; } static inline double @@ -1016,13 +1092,11 @@ static uint64_t ucp_ep_get_context_features(const ucp_ep_h ep) return ep->worker->context->config.features; } -static double ucp_wireup_rma_score_func(const ucp_worker_iface_t *wiface, - const uct_md_attr_v2_t *md_attr, - const ucp_unpacked_address_t *unpacked_addr, - const ucp_address_entry_t *remote_addr, - int is_prioritized_ep, void *arg) +static double +ucp_wireup_iface_score_bandwidth(const ucp_worker_iface_t *wiface, + const ucp_unpacked_address_t *unpacked_addr, + const ucp_address_entry_t *remote_addr) { - /* best for 4k messages */ double local_bw; if (unpacked_addr->dst_version < 17) { @@ -1032,12 +1106,30 @@ static double ucp_wireup_rma_score_func(const ucp_worker_iface_t *wiface, local_bw = ucp_wireup_iface_bw_distance(wiface); } + if (unpacked_addr->addr_version == UCP_OBJECT_VERSION_V2) { + /* FP8 is a lossy compression method, so in order to create a symmetric + * calculation we pack/unpack the local bandwidth as well */ + local_bw = UCS_FP8_PACK_UNPACK(BANDWIDTH, local_bw); + } + + return ucs_min(local_bw, remote_addr->iface_attr.bandwidth); +} + +static double +ucp_wireup_rma_score_func(const ucp_worker_iface_t *wiface, + const uct_md_attr_v2_t *md_attr, + const ucp_unpacked_address_t *unpacked_addr, + const ucp_address_entry_t *remote_addr, + int is_prioritized_ep, void *arg) +{ + /* best for 4k messages */ return 1e-3 / (ucp_wireup_tl_iface_latency( wiface, unpacked_addr, &remote_addr->iface_attr, is_prioritized_ep) + wiface->attr.overhead + - (4096.0 / ucs_min(local_bw, remote_addr->iface_attr.bandwidth))); + (4096.0 / ucp_wireup_iface_score_bandwidth(wiface, unpacked_addr, + remote_addr))); } static void ucp_wireup_fill_peer_err_criteria(ucp_wireup_criteria_t *criteria, @@ -1104,6 +1196,8 @@ static void ucp_wireup_criteria_init(ucp_wireup_criteria_t *criteria) criteria->alloc_mem_types = 0; criteria->is_keepalive = 0; criteria->calc_score = NULL; + criteria->calc_tiebreak = NULL; + criteria->tiebreak_arg = NULL; criteria->tl_rsc_flags = 0; ucp_wireup_init_select_flags(&criteria->local_iface_flags, 0, 0); ucp_wireup_init_select_flags(&criteria->remote_iface_flags, 0, 0); @@ -1138,7 +1232,7 @@ ucp_wireup_add_cm_lane(const ucp_wireup_select_params_t *select_params, return UCS_OK; } - ucp_wireup_init_select_info(0., UINT_MAX, UCP_NULL_RESOURCE, 0, + ucp_wireup_init_select_info(0., 0., UINT_MAX, UCP_NULL_RESOURCE, 0, &select_info); /* server is not a proxy because it can create all lanes connected */ @@ -1265,7 +1359,6 @@ ucp_wireup_am_score_func(const ucp_worker_iface_t *wiface, const ucp_address_entry_t *remote_addr, int is_prioritized_ep, void *arg) { - /* best end-to-end latency */ return 1e-3 / (ucp_wireup_tl_iface_latency( wiface, unpacked_addr, &remote_addr->iface_attr, @@ -1273,6 +1366,21 @@ ucp_wireup_am_score_func(const ucp_worker_iface_t *wiface, wiface->attr.overhead + remote_addr->iface_attr.overhead); } +/* + * AM-lane tiebreak: rank transports by bandwidth (scaled to MiB/s for a smaller, + * more readable value) to break ties between candidates with close scores. + */ +static double +ucp_wireup_tiebreak_func(const ucp_worker_iface_t *wiface, + const uct_md_attr_v2_t *md_attr, + const ucp_unpacked_address_t *unpacked_addr, + const ucp_address_entry_t *remote_addr, + int is_prioritized_ep, void *arg) +{ + return ucp_wireup_iface_score_bandwidth(wiface, unpacked_addr, + remote_addr) / UCS_MBYTE; +} + static double ucp_tl_iface_bandwidth_ratio(ucp_context_h context, unsigned path_index, unsigned num_paths) @@ -1478,6 +1586,7 @@ ucp_wireup_add_am_lane(const ucp_wireup_select_params_t *select_params, ucp_wireup_criteria_init(&criteria); criteria.title = "active messages"; criteria.calc_score = ucp_wireup_am_score_func; + criteria.calc_tiebreak = ucp_wireup_tiebreak_func; criteria.lane_type = UCP_LANE_TYPE_AM; criteria.tl_rsc_flags = (ep_init_flags & UCP_EP_INIT_ALLOW_AM_AUX_TL) ? @@ -2424,6 +2533,7 @@ ucp_wireup_add_keepalive_lane(const ucp_wireup_select_params_t *select_params, criteria.local_md_flags = 0; criteria.is_keepalive = 1; criteria.calc_score = ucp_wireup_keepalive_score_func; + criteria.calc_tiebreak = ucp_wireup_tiebreak_func; /* Keepalive can also use auxiliary transports */ criteria.tl_rsc_flags = UCP_TL_RSC_FLAG_AUX; criteria.lane_type = UCP_LANE_TYPE_KEEPALIVE; diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index f100080c3aa..be980a0412c 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -1,5 +1,5 @@ /** - * Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2001-2015. ALL RIGHTS RESERVED. + * Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2001-2026. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ @@ -54,67 +54,78 @@ enum { }; +/** + * Calculates a score of a potential transport. Used both for the primary + * selection score and for the tiebreak score. + * + * @param [in] wiface UCP worker iface. + * @param [in] md_attr Local MD attributes. + * @param [in] unpacked_addr The whole remote address unpacked. + * @param [in] remote_addr Remote transport address info and attributes. + * @param [in] is_prioritized_ep Endpoint is prioritized. + * @param [in] arg Custom argument. + * + * @return Transport score, the higher the better. + */ +typedef double (*ucp_wireup_calc_score_func_t)( + const ucp_worker_iface_t *wiface, const uct_md_attr_v2_t *md_attr, + const ucp_unpacked_address_t *unpacked_addr, + const ucp_address_entry_t *remote_addr, int is_prioritized_ep, + void *arg); + + /** * Criteria for transport selection. */ typedef struct { /* Name of the criteria for debugging */ - const char *title; + const char *title; /* Required local MD flags */ - uint64_t local_md_flags; + uint64_t local_md_flags; /* Required local component flags */ - uint64_t local_cmpt_flags; + uint64_t local_cmpt_flags; /* Required local interface flags */ - ucp_wireup_select_flags_t local_iface_flags; + ucp_wireup_select_flags_t local_iface_flags; /* Required remote interface flags */ - ucp_wireup_select_flags_t remote_iface_flags; + ucp_wireup_select_flags_t remote_iface_flags; /* Required local event flags */ - uint64_t local_event_flags; + uint64_t local_event_flags; /* Required remote event flags */ - uint64_t remote_event_flags; + uint64_t remote_event_flags; /* Mandatory memory types for allocation */ - uint64_t alloc_mem_types; + uint64_t alloc_mem_types; /* Required support of keepalive mechanism */ - int is_keepalive; - - /** - * Calculates score of a potential transport. - * - * @param [in] wiface UCP worker iface. - * @param [in] md_attr Local MD attributes. - * @param [in] unpacked_addr The whole remote address unpacked. - * @param [in] remote_addr Remote transport address info and - * attributes. - * @param [in] is_prioritized_ep Endpoint is prioritized. - * @param [in] arg Custom argument. - * - * @return Transport score, the higher the better. - */ - double (*calc_score)(const ucp_worker_iface_t *wiface, - const uct_md_attr_v2_t *md_attr, - const ucp_unpacked_address_t *unpacked_addr, - const ucp_address_entry_t *remote_addr, - int is_prioritized_ep, - void *arg); + int is_keepalive; + + /* Calculates the primary selection score of a potential transport. */ + ucp_wireup_calc_score_func_t calc_score; + + /* Calculates the tiebreak score, used to choose between candidates whose + * @ref calc_score values are close. May be NULL, which disables + * tiebreaking (single-score selection). */ + ucp_wireup_calc_score_func_t calc_tiebreak; /* Custom argument of @a calc_score function */ - void *arg; + void *arg; + + /* Custom argument of @a calc_tiebreak function */ + void *tiebreak_arg; /* Flags that describe TL specifics */ - uint8_t tl_rsc_flags; + uint8_t tl_rsc_flags; - ucp_tl_iface_atomic_flags_t local_atomic_flags; + ucp_tl_iface_atomic_flags_t local_atomic_flags; - ucp_tl_iface_atomic_flags_t remote_atomic_flags; - ucp_lane_type_t lane_type; + ucp_tl_iface_atomic_flags_t remote_atomic_flags; + ucp_lane_type_t lane_type; } ucp_wireup_criteria_t; @@ -135,6 +146,7 @@ typedef struct ucp_wireup_msg { typedef struct { double score; + double tiebreak; unsigned addr_index; unsigned path_index; ucp_rsc_index_t rsc_index; diff --git a/test/gtest/ucp/test_ucp_proto_mock.cc b/test/gtest/ucp/test_ucp_proto_mock.cc index f29e9fba0f2..6baaa6ebccd 100644 --- a/test/gtest/ucp/test_ucp_proto_mock.cc +++ b/test/gtest/ucp/test_ucp_proto_mock.cc @@ -1,5 +1,5 @@ /** - * Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2024. ALL RIGHTS RESERVED. + * Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2024-2026. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ @@ -1540,6 +1540,174 @@ UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_rcx_twins_get_inline_0, rcx, "rc_x") +class test_ucp_proto_mock_am_tiebreak : public test_ucp_proto_mock { +public: + test_ucp_proto_mock_am_tiebreak() + { + mock_transport("rc_mlx5"); + } + +protected: + void add_mock_device(const std::string &dev_name, double bandwidth, + double latency) + { + add_mock_iface(dev_name, + [bandwidth, latency](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 208; + iface_attr.bandwidth.shared = bandwidth; + iface_attr.latency.c = latency; + iface_attr.latency.m = 1e-9; + }); + } + + void check_config(const std::string &config) + { + ucp_proto_select_key_t key = any_key(); + key.param.op_id_flags = UCP_OP_ID_AM_SEND; + key.param.op_attr = 0; + + check_ep_config(sender(), {{0, 200, "short", config}}, key); + } +}; + +class test_ucp_proto_mock_am_tiebreak_equal_score : + public test_ucp_proto_mock_am_tiebreak { +public: + virtual void init() override + { + add_mock_device("mock_0:1", 10e9, 500e-9); + add_mock_device("mock_1:1", 28e9, 500e-9); + test_ucp_proto_mock::init(); + } +}; + +UCS_TEST_P(test_ucp_proto_mock_am_tiebreak_equal_score, higher_bandwidth_wins, + "IB_NUM_PATHS?=1") +{ + check_config("rc_mlx5/mock_1:1"); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_am_tiebreak_equal_score, rcx, + "rc_x") + +class test_ucp_proto_mock_am_tiebreak_score_dominates : + public test_ucp_proto_mock_am_tiebreak { +public: + virtual void init() override + { + add_mock_device("mock_0:1", 10e9, 500e-9); + add_mock_device("mock_1:1", 28e9, 2000e-9); + test_ucp_proto_mock::init(); + } +}; + +UCS_TEST_P(test_ucp_proto_mock_am_tiebreak_score_dominates, + score_beats_bandwidth, "IB_NUM_PATHS?=1") +{ + check_config("rc_mlx5/mock_0:1"); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_am_tiebreak_score_dominates, + rcx, "rc_x") + +class test_ucp_proto_mock_am_tiebreak_within_window : + public test_ucp_proto_mock_am_tiebreak { +public: + virtual void init() override + { + add_mock_device("mock_0:1", 10e9, 500e-9); + add_mock_device("mock_1:1", 28e9, 508e-9); + test_ucp_proto_mock::init(); + } +}; + +UCS_TEST_P(test_ucp_proto_mock_am_tiebreak_within_window, + bandwidth_within_window, "IB_NUM_PATHS?=1") +{ + check_config("rc_mlx5/mock_1:1"); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_am_tiebreak_within_window, + rcx, "rc_x") + +class test_ucp_proto_mock_keepalive_tiebreak : + public test_ucp_proto_mock_am_tiebreak { +public: + test_ucp_proto_mock_keepalive_tiebreak() + { + modify_config("KEEPALIVE_INTERVAL", "1s"); + } + + virtual ucp_ep_params_t get_ep_params() override + { + ucp_ep_params_t params = test_ucp_proto_mock::get_ep_params(); + + params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE | + UCP_EP_PARAM_FIELD_ERR_HANDLER; + params.err_mode = UCP_ERR_HANDLING_MODE_PEER; + params.err_handler.cb = reinterpret_cast( + ucs_empty_function); + params.err_handler.arg = reinterpret_cast(this); + + return params; + } + + virtual void init() override + { + const size_t lower_max_inflight_eps = SIZE_MAX - (SIZE_MAX / 100); + + add_keepalive_mock_device("mock_0:1", 10e9, 500e-9, SIZE_MAX); + add_keepalive_mock_device("mock_1:1", 28e9, 500e-9, + lower_max_inflight_eps); + test_ucp_proto_mock::init(); + } + +protected: + void add_keepalive_mock_device(const std::string &dev_name, + double bandwidth, double latency, + size_t max_inflight_eps) + { + add_mock_iface(dev_name, + [bandwidth, latency](uct_iface_attr_t &iface_attr) { + iface_attr.cap.flags &= ~UCT_IFACE_FLAG_EP_KEEPALIVE; + iface_attr.cap.flags |= UCT_IFACE_FLAG_EP_CHECK | + UCT_IFACE_FLAG_CONNECT_TO_EP | + UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE; + iface_attr.cap.am.max_short = 208; + iface_attr.bandwidth.shared = bandwidth; + iface_attr.latency.c = latency; + iface_attr.latency.m = 1e-9; + }, [max_inflight_eps](uct_perf_attr_t &perf_attr) { + if (perf_attr.field_mask & + UCT_PERF_ATTR_FIELD_MAX_INFLIGHT_EPS) { + perf_attr.max_inflight_eps = max_inflight_eps; + } + }); + } + + void check_keepalive_lane(const std::string &dev_name) + { + const ucp_ep_config_t *config = ucp_worker_ep_config(sender().worker(), + ep_config_index( + sender())); + const ucp_lane_index_t lane = config->key.keepalive_lane; + + ASSERT_NE(UCP_NULL_LANE, lane); + EXPECT_STREQ(dev_name.c_str(), + ucp_ep_get_tl_rsc(sender().ep(), lane)->dev_name); + } +}; + +UCS_TEST_P(test_ucp_proto_mock_keepalive_tiebreak, + higher_bandwidth_within_score_window, "IB_NUM_PATHS?=1") +{ + check_keepalive_lane("mock_1:1"); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_keepalive_tiebreak, rcx, + "rc_x") + + #if HAVE_DECL_IBV_EVENT_PORT_SPEED_CHANGE class test_ucp_proto_mock_rcx_speed_change : public test_ucp_proto_mock {