Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 82 additions & 24 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#define UCP_WIREUP_RMA_BW_TEST_MSG_SIZE 262144
#define UCP_WIREUP_MAX_FLAGS_STRING_SIZE 50
#define UCP_WIREUP_PATH_INDEX_UNDEFINED UINT_MAX
#define UCP_WIREUP_SCORE_MAX_DIFF 0.02
#define UCP_WIREUP_NO_SCORE_TIEBREAK (-1.0)
Comment thread
shasson5 marked this conversation as resolved.

/* 6 for the string format constant length */
#define UCP_WIREUP_TLS_INFO_SIZE (UCP_WIREUP_UCT_INFO_SIZE + \
Expand Down Expand Up @@ -352,22 +354,44 @@ ucp_wireup_check_keepalive(const ucp_wireup_select_params_t *select_params,
}

static void
ucp_wireup_init_select_info(double score, unsigned addr_index,
ucp_rsc_index_t rsc_index,
uint8_t priority,
ucp_wireup_init_select_info(double score, double tiebreak, unsigned addr_index,
ucp_rsc_index_t rsc_index, uint8_t priority,
ucp_wireup_select_info_t *select_info)
{
/* score == 0.0 could be specified only when initializing a selection info
* to add CM lane (rsc_index == UCP_NULL_RESOURCE in this case) */
ucs_assert((score >= 0.0) || (rsc_index == UCP_NULL_RESOURCE));

select_info->score = score;
select_info->tiebreak = tiebreak;
select_info->addr_index = addr_index;
select_info->path_index = UCP_WIREUP_PATH_INDEX_UNDEFINED;
select_info->rsc_index = rsc_index;
select_info->priority = priority;
}

/*
* Compare a candidate transport against the currently selected one. When the
* candidate has a tiebreak (a non-negative value) and the two scores differ by
* at most UCP_WIREUP_SCORE_MAX_DIFF times the lower of the two scores, the
* tiebreak (then priority) decides; otherwise the score (then priority)
* decides. Returns >0 if the candidate is better, <0 if worse, 0 if equal.
*/
static int ucp_wireup_candidate_cmp(double cand_score, double cand_tiebreak,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

candidate, don't use abbreviations

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is now order-dependent.
Meaning, with multiple candidates inside the "window", the lane will be chosen arbitrarily based on bitmap/iteration order.
It may be better to find the best primary score first, and then apply the tiebreak.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int cand_prio,
const ucp_wireup_select_info_t *sel)
{
double ref_score = ucs_min(cand_score, sel->score);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's problematic that the ref_score keeps changing while iterating over candidates.
This makes the selection depend on ordering.

Maybe we should find the ref_score in a first pass, and then compare to it in a second pass?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it overcomplicates the code and is not worth it for a minor corner case.


if ((cand_tiebreak >= 0.0) && (fabs(cand_score - sel->score) <=
(UCP_WIREUP_SCORE_MAX_DIFF * ref_score))) {
return ucp_score_prio_cmp(cand_tiebreak, cand_prio, sel->tiebreak,
sel->priority);
}

return ucp_score_prio_cmp(cand_score, cand_prio, sel->score, sel->priority);
}

static size_t
ucp_wireup_bw_max_lanes(const ucp_wireup_select_params_t *select_params)
{
Expand Down Expand Up @@ -422,7 +446,7 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport(
uct_md_attr_v2_t *md_attr;
const uct_component_attr_t *cmpt_attr;
int is_reachable;
double score;
double score, tiebreak;
uint8_t priority;
ucp_md_index_t md_index;

Expand Down Expand Up @@ -621,19 +645,24 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport(
}

score = criteria->calc_score(wiface, md_attr, address, ae,
0, criteria->arg);
0, criteria->arg);
Comment thread
shasson5 marked this conversation as resolved.
tiebreak = (criteria->calc_tiebreak == NULL) ?
UCP_WIREUP_NO_SCORE_TIEBREAK :
criteria->calc_tiebreak(wiface, md_attr,
address, ae, 0,
criteria->tiebreak_arg);
priority = iface_attr->priority + ae->iface_attr.priority;
is_reachable = 1;

ucs_trace(UCT_TL_RESOURCE_DESC_FMT
"->addr[%u] : %s score %.2f priority %d",
UCT_TL_RESOURCE_DESC_ARG(resource),
addr_index, criteria->title, score, priority);

if (!found || (ucp_score_prio_cmp(score, priority, sinfo.score,
sinfo.priority) > 0)) {
ucp_wireup_init_select_info(score, addr_index, rsc_index,
priority, &sinfo);
"->addr[%u] : %s score %.2f tiebreak %.2f priority %d",
Comment thread
shasson5 marked this conversation as resolved.
Outdated
UCT_TL_RESOURCE_DESC_ARG(resource), addr_index,
criteria->title, score, tiebreak, priority);

if (!found || (ucp_wireup_candidate_cmp(score, tiebreak, priority,
&sinfo) > 0)) {
ucp_wireup_init_select_info(score, tiebreak, addr_index,
rsc_index, priority, &sinfo);
found = 1;
}
}
Expand Down Expand Up @@ -662,13 +691,14 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport(
}

ucs_trace("ep %p: selected for %s: " UCT_TL_RESOURCE_DESC_FMT " md[%d]"
" -> '%s' address[%d],md[%d] score %.2f",
" -> '%s' address[%d],md[%d] score %.2f tiebreak %.2f",
ep, criteria->title,
UCT_TL_RESOURCE_DESC_ARG(
&context->tl_rscs[sinfo.rsc_index].tl_rsc),
context->tl_rscs[sinfo.rsc_index].md_index, ucp_ep_peer_name(ep),
sinfo.addr_index,
address->address_list[sinfo.addr_index].md_index, sinfo.score);
address->address_list[sinfo.addr_index].md_index, sinfo.score,
sinfo.tiebreak);

*select_info = sinfo;
return UCS_OK;
Expand Down Expand Up @@ -1018,13 +1048,11 @@ static uint64_t ucp_ep_get_context_features(const ucp_ep_h ep)
return ep->worker->context->config.features;
}

static double ucp_wireup_rma_score_func(const ucp_worker_iface_t *wiface,
const uct_md_attr_v2_t *md_attr,
const ucp_unpacked_address_t *unpacked_addr,
const ucp_address_entry_t *remote_addr,
int is_prioritized_ep, void *arg)
static double
ucp_wireup_iface_score_bandwidth(const ucp_worker_iface_t *wiface,
const ucp_unpacked_address_t *unpacked_addr,
const ucp_address_entry_t *remote_addr)
{
/* best for 4k messages */
double local_bw;

if (unpacked_addr->dst_version < 17) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remote BW from addr v2 is FP8-unpacked, while local BW here remains exact. Nearby code packs/unpacks the local value for symmetry; the AM/keepalive tiebreak should do the same.

Expand All @@ -1034,12 +1062,24 @@ static double ucp_wireup_rma_score_func(const ucp_worker_iface_t *wiface,
local_bw = ucp_wireup_iface_bw_distance(wiface);
}

return ucs_min(local_bw, remote_addr->iface_attr.bandwidth);
}

static double
ucp_wireup_rma_score_func(const ucp_worker_iface_t *wiface,
const uct_md_attr_v2_t *md_attr,
const ucp_unpacked_address_t *unpacked_addr,
const ucp_address_entry_t *remote_addr,
int is_prioritized_ep, void *arg)
{
/* best for 4k messages */
return 1e-3 /
(ucp_wireup_tl_iface_latency(
wiface, unpacked_addr, &remote_addr->iface_attr,
is_prioritized_ep) +
wiface->attr.overhead +
(4096.0 / ucs_min(local_bw, remote_addr->iface_attr.bandwidth)));
(4096.0 / ucp_wireup_iface_score_bandwidth(wiface, unpacked_addr,
remote_addr)));
}

static void ucp_wireup_fill_peer_err_criteria(ucp_wireup_criteria_t *criteria,
Expand Down Expand Up @@ -1106,6 +1146,8 @@ static void ucp_wireup_criteria_init(ucp_wireup_criteria_t *criteria)
criteria->alloc_mem_types = 0;
criteria->is_keepalive = 0;
criteria->calc_score = NULL;
criteria->calc_tiebreak = NULL;
criteria->tiebreak_arg = NULL;
criteria->tl_rsc_flags = 0;
ucp_wireup_init_select_flags(&criteria->local_iface_flags, 0, 0);
ucp_wireup_init_select_flags(&criteria->remote_iface_flags, 0, 0);
Expand Down Expand Up @@ -1140,7 +1182,7 @@ ucp_wireup_add_cm_lane(const ucp_wireup_select_params_t *select_params,
return UCS_OK;
}

ucp_wireup_init_select_info(0., UINT_MAX, UCP_NULL_RESOURCE, 0,
ucp_wireup_init_select_info(0., 0., UINT_MAX, UCP_NULL_RESOURCE, 0,
&select_info);

/* server is not a proxy because it can create all lanes connected */
Expand Down Expand Up @@ -1267,14 +1309,28 @@ ucp_wireup_am_score_func(const ucp_worker_iface_t *wiface,
const ucp_address_entry_t *remote_addr,
int is_prioritized_ep, void *arg)
{
/* best end-to-end latency */
return 1e-3 /
(ucp_wireup_tl_iface_latency(
wiface, unpacked_addr, &remote_addr->iface_attr,
is_prioritized_ep) +
wiface->attr.overhead + remote_addr->iface_attr.overhead);
}

/*
* Tiebreak score used by both the AM lane and the keepalive lane criteria:
* ranks transports by bandwidth (scaled to MiB/s for a smaller, more readable
* value) to break ties between candidates with close primary scores. The
* higher the value, the better.
*
* The bandwidth is the one returned by ucp_wireup_iface_score_bandwidth() for
* the local iface and the remote address entry. md_attr, is_prioritized_ep and
* arg are not used; they are present only so that the function matches the
* ucp_wireup_calc_score_func_t signature.
*/
static double
ucp_wireup_tiebreak_func(const ucp_worker_iface_t *wiface,
const uct_md_attr_v2_t *md_attr,
const ucp_unpacked_address_t *unpacked_addr,
const ucp_address_entry_t *remote_addr,
int is_prioritized_ep, void *arg)
{
return ucp_wireup_iface_score_bandwidth(wiface, unpacked_addr,
remote_addr) / UCS_MBYTE;
}

static double ucp_tl_iface_bandwidth_ratio(ucp_context_h context,
unsigned path_index,
unsigned num_paths)
Expand Down Expand Up @@ -1480,6 +1536,7 @@ ucp_wireup_add_am_lane(const ucp_wireup_select_params_t *select_params,
ucp_wireup_criteria_init(&criteria);
criteria.title = "active messages";
criteria.calc_score = ucp_wireup_am_score_func;
criteria.calc_tiebreak = ucp_wireup_tiebreak_func;
criteria.lane_type = UCP_LANE_TYPE_AM;
criteria.tl_rsc_flags =
(ep_init_flags & UCP_EP_INIT_ALLOW_AM_AUX_TL) ?
Expand Down Expand Up @@ -2426,6 +2483,7 @@ ucp_wireup_add_keepalive_lane(const ucp_wireup_select_params_t *select_params,
criteria.local_md_flags = 0;
criteria.is_keepalive = 1;
criteria.calc_score = ucp_wireup_keepalive_score_func;
criteria.calc_tiebreak = ucp_wireup_tiebreak_func;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR says it's only for AM, should the title/description also mention keepalive?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a new tiebreak for KA, but the PR tests only AM send selection. Please add coverage for KA candidates (with max_inflight_eps, etc.).

/* Keepalive can also use auxiliary transports */
criteria.tl_rsc_flags = UCP_TL_RSC_FLAG_AUX;
criteria.lane_type = UCP_LANE_TYPE_KEEPALIVE;
Expand Down
80 changes: 46 additions & 34 deletions src/ucp/wireup/wireup.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,67 +54,78 @@ enum {
};


/**
* Calculates a score of a potential transport. Used both for the primary
* selection score and for the tiebreak score (see the calc_score and
* calc_tiebreak fields of the selection criteria).
*
* @param [in] wiface UCP worker iface.
* @param [in] md_attr Local MD attributes.
* @param [in] unpacked_addr The whole remote address unpacked.
* @param [in] remote_addr Remote transport address info and attributes.
* @param [in] is_prioritized_ep Endpoint is prioritized.
* @param [in] arg Custom argument taken from the selection criteria: its
* @a arg field when calculating the primary score, its
* @a tiebreak_arg field when calculating the tiebreak score.
*
* @return Transport score, the higher the better.
*/
typedef double (*ucp_wireup_calc_score_func_t)(
const ucp_worker_iface_t *wiface, const uct_md_attr_v2_t *md_attr,
const ucp_unpacked_address_t *unpacked_addr,
const ucp_address_entry_t *remote_addr, int is_prioritized_ep,
void *arg);


/**
* Criteria for transport selection.
*/
typedef struct {
/* Name of the criteria for debugging */
const char *title;
const char *title;
Comment thread
shasson5 marked this conversation as resolved.

/* Required local MD flags */
uint64_t local_md_flags;
uint64_t local_md_flags;

/* Required local component flags */
uint64_t local_cmpt_flags;
uint64_t local_cmpt_flags;

/* Required local interface flags */
ucp_wireup_select_flags_t local_iface_flags;
ucp_wireup_select_flags_t local_iface_flags;

/* Required remote interface flags */
ucp_wireup_select_flags_t remote_iface_flags;
ucp_wireup_select_flags_t remote_iface_flags;

/* Required local event flags */
uint64_t local_event_flags;
uint64_t local_event_flags;

/* Required remote event flags */
uint64_t remote_event_flags;
uint64_t remote_event_flags;

/* Mandatory memory types for allocation */
uint64_t alloc_mem_types;
uint64_t alloc_mem_types;

/* Required support of keepalive mechanism */
int is_keepalive;

/**
* Calculates score of a potential transport.
*
* @param [in] wiface UCP worker iface.
* @param [in] md_attr Local MD attributes.
* @param [in] unpacked_addr The whole remote address unpacked.
* @param [in] remote_addr Remote transport address info and
* attributes.
* @param [in] is_prioritized_ep Endpoint is prioritized.
* @param [in] arg Custom argument.
*
* @return Transport score, the higher the better.
*/
double (*calc_score)(const ucp_worker_iface_t *wiface,
const uct_md_attr_v2_t *md_attr,
const ucp_unpacked_address_t *unpacked_addr,
const ucp_address_entry_t *remote_addr,
int is_prioritized_ep,
void *arg);
int is_keepalive;

/* Calculates the primary selection score of a potential transport. */
ucp_wireup_calc_score_func_t calc_score;

/* Calculates the tiebreak score, used to choose between candidates whose
* @ref calc_score values are close. May be NULL, which disables
* tiebreaking (single-score selection). */
ucp_wireup_calc_score_func_t calc_tiebreak;

/* Custom argument of @a calc_score function */
void *arg;
void *arg;

/* Custom argument of @a calc_tiebreak function */
void *tiebreak_arg;

/* Flags that describe TL specifics */
uint8_t tl_rsc_flags;
uint8_t tl_rsc_flags;

ucp_tl_iface_atomic_flags_t local_atomic_flags;
ucp_tl_iface_atomic_flags_t local_atomic_flags;

ucp_tl_iface_atomic_flags_t remote_atomic_flags;
ucp_lane_type_t lane_type;
ucp_tl_iface_atomic_flags_t remote_atomic_flags;
ucp_lane_type_t lane_type;
} ucp_wireup_criteria_t;


Expand All @@ -135,6 +146,7 @@ typedef struct ucp_wireup_msg {

typedef struct {
double score;
double tiebreak;
unsigned addr_index;
unsigned path_index;
ucp_rsc_index_t rsc_index;
Expand Down
Loading
Loading