Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 71 additions & 44 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

#include "cuda_ipc_cache.h"
#include "cuda_ipc_iface.h"
#include "cuda_ipc.inl"

#include <ucs/datastruct/khash.h>
#include <ucs/debug/log.h>
#include <ucs/debug/memtrack_int.h>
#include <ucs/profile/profile.h>
#include <ucs/sys/sys.h>
#include <ucs/sys/string.h>
#include <ucs/sys/ptr_arith.h>
#include <ucs/datastruct/khash.h>
#include <ucs/type/rwlock.h>
#include <uct/cuda/base/cuda_ctx.inl>
#include "cuda_ipc.inl"

typedef struct uct_cuda_ipc_cache_hash_key {
pid_t pid;
ucs_sys_ns_t pid_ns;
Expand Down Expand Up @@ -59,7 +60,7 @@ KHASH_INIT(cuda_ipc_rem_cache, uct_cuda_ipc_cache_hash_key_t,
*/
typedef struct uct_cuda_ipc_remote_cache {
khash_t(cuda_ipc_rem_cache) hash;
ucs_recursive_spinlock_t lock;
ucs_rw_spinlock_t lock;
unsigned long max_regions; /**< Global max regions limit */
size_t max_size; /**< Global max total size limit */
} uct_cuda_ipc_remote_cache_t;
Expand Down Expand Up @@ -516,49 +517,75 @@ static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
cache->name, from, to);
}

static ucs_status_t
static int
uct_cuda_ipc_get_remote_cache(const uct_cuda_ipc_cache_hash_key_t *key,
uct_cuda_ipc_cache_t **cache)
uct_cuda_ipc_cache_t **cache_p)
{
ucs_status_t status = UCS_OK;
khint_t it;
int found;

ucs_rw_spinlock_read_lock(&uct_cuda_ipc_remote_cache.lock);

it = kh_get(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, *key);
found = (it != kh_end(&uct_cuda_ipc_remote_cache.hash));
if (found) {
*cache_p = kh_val(&uct_cuda_ipc_remote_cache.hash, it);
}

ucs_rw_spinlock_read_unlock(&uct_cuda_ipc_remote_cache.lock);
return found;
}

static ucs_status_t
uct_cuda_ipc_put_remote_cache(const uct_cuda_ipc_cache_hash_key_t *key,
uct_cuda_ipc_cache_t **cache_p)
{
int ret;
khint_t it;
ucs_status_t status;
char target_name[64];
khiter_t khiter;
int khret;

ucs_recursive_spin_lock(&uct_cuda_ipc_remote_cache.lock);
if (uct_cuda_ipc_get_remote_cache(key, cache_p)) {
return UCS_OK;
}

ucs_rw_spinlock_write_lock(&uct_cuda_ipc_remote_cache.lock);

khiter = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, *key,
&khret);
if ((khret == UCS_KH_PUT_BUCKET_EMPTY) ||
(khret == UCS_KH_PUT_BUCKET_CLEAR)) {
ucs_snprintf_safe(target_name, sizeof(target_name), "dest:%d:%u:%d",
key->pid, key->pid_ns, key->cu_device);
status = uct_cuda_ipc_create_cache(cache, target_name);
if (status != UCS_OK) {
kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, khiter);
ucs_error("could not create create cuda ipc cache: %s",
ucs_status_string(status));
goto err_unlock;
}
it = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, *key,
&ret);
if (ret == UCS_KH_PUT_FAILED) {
ucs_error("failed to allocate cuda_ipc remote_cache hash entry");
status = UCS_ERR_NO_MEMORY;
goto out_unlock;
}

kh_val(&uct_cuda_ipc_remote_cache.hash, khiter) = *cache;
} else if (khret == UCS_KH_PUT_KEY_PRESENT) {
*cache = kh_val(&uct_cuda_ipc_remote_cache.hash, khiter);
} else {
ucs_error("unable to use cuda_ipc remote_cache hash");
status = UCS_ERR_NO_RESOURCE;
ucs_assertv_always(ret != UCS_KH_PUT_KEY_PRESENT, "key %d:%u:%d is present",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the case when 2 threads miss on read (line 548), then go here. The second kh_put will return UCS_KH_PUT_KEY_PRESENT -> assert fails.

key->pid, key->pid_ns, key->cu_device);
ucs_assertv_always((ret == UCS_KH_PUT_BUCKET_EMPTY) ||
Comment thread
rakhmets marked this conversation as resolved.
(ret == UCS_KH_PUT_BUCKET_CLEAR),
"invalid return value: %d", ret);

ucs_snprintf_safe(target_name, sizeof(target_name), "dest:%d:%u:%d",
key->pid, key->pid_ns, key->cu_device);
status = uct_cuda_ipc_create_cache(cache_p, target_name);
if (status != UCS_OK) {
kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, it);
ucs_error("could not create create cuda ipc cache: %s",
ucs_status_string(status));
goto out_unlock;
}
err_unlock:
ucs_recursive_spin_unlock(&uct_cuda_ipc_remote_cache.lock);

kh_val(&uct_cuda_ipc_remote_cache.hash, it) = *cache_p;
Comment thread
rakhmets marked this conversation as resolved.
Outdated

out_unlock:
ucs_rw_spinlock_write_unlock(&uct_cuda_ipc_remote_cache.lock);
return status;
}

ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr,
const void *mapped_addr,
CUdevice cu_dev, int cache_enabled)
void uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr, const void *mapped_addr,
CUdevice cu_dev, int cache_enabled)
{
ucs_status_t status = UCS_OK;
const uct_cuda_ipc_cache_hash_key_t key = {pid, pid_ns, cu_dev};
uct_cuda_ipc_cache_t *cache;
ucs_pgt_region_t *pgt_region;
Expand All @@ -569,12 +596,13 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
* see uct_cuda_ipc_map_memhandle for more details */
if ((d_bptr == (uintptr_t)mapped_addr) &&
uct_cuda_ipc_is_rkey_local(pid, pid_ns)) {
return UCS_OK;
return;
}

status = uct_cuda_ipc_get_remote_cache(&key, &cache);
if (status != UCS_OK) {
return status;
if (!uct_cuda_ipc_get_remote_cache(&key, &cache)) {
ucs_debug("no remote cache found for key: %d:%u:%d", pid, pid_ns,
cu_dev);
return;
}

/* use write lock because cache maybe modified */
Expand All @@ -592,7 +620,6 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
}

pthread_rwlock_unlock(&cache->lock);
return status;
}

UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
Expand Down Expand Up @@ -626,7 +653,7 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
return UCS_OK;
}

status = uct_cuda_ipc_get_remote_cache(&hash_key, &cache);
status = uct_cuda_ipc_put_remote_cache(&hash_key, &cache);
if (status != UCS_OK) {
return status;
}
Expand Down Expand Up @@ -822,7 +849,7 @@ void uct_cuda_ipc_cache_set_global_limits(unsigned long max_regions,
}

UCS_STATIC_INIT {
ucs_recursive_spinlock_init(&uct_cuda_ipc_remote_cache.lock, 0);
ucs_rw_spinlock_init(&uct_cuda_ipc_remote_cache.lock);
kh_init_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
uct_cuda_ipc_remote_cache.max_regions = ULONG_MAX;
uct_cuda_ipc_remote_cache.max_size = SIZE_MAX;
Expand Down Expand Up @@ -855,5 +882,5 @@ UCS_STATIC_CLEANUP {
uct_cuda_ipc_destroy_cache(rem_cache);
})
kh_destroy_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
ucs_recursive_spinlock_destroy(&uct_cuda_ipc_remote_cache.lock);
ucs_rw_spinlock_cleanup(&uct_cuda_ipc_remote_cache.lock);
}
9 changes: 4 additions & 5 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2018. ALL RIGHTS RESERVED.
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2018-2026. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -68,10 +68,9 @@ ucs_status_t uct_cuda_ipc_map_memhandle(uct_cuda_ipc_extended_rkey_t *key,
ucs_log_level_t log_level);


ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr,
const void *mapped_addr,
CUdevice cu_dev, int cache_enabled);
void uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr, const void *mapped_addr,
CUdevice cu_dev, int cache_enabled);


/**
Expand Down
17 changes: 6 additions & 11 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2018-2019. ALL RIGHTS RESERVED.
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2018-2026. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

Expand Down Expand Up @@ -312,17 +312,12 @@ static void uct_cuda_ipc_complete_event(uct_iface_h tl_iface,
uct_cuda_ipc_iface_t);
uct_cuda_ipc_event_desc_t *cuda_ipc_event = ucs_derived_of(cuda_event,
uct_cuda_ipc_event_desc_t);
ucs_status_t status;

status = uct_cuda_ipc_unmap_memhandle(cuda_ipc_event->pid,
cuda_ipc_event->pid_ns,
cuda_ipc_event->d_bptr,
cuda_ipc_event->mapped_addr,
cuda_ipc_event->cuda_device,
iface->config.enable_cache);
if (status != UCS_OK) {
ucs_fatal("failed to unmap addr:%p", cuda_ipc_event->mapped_addr);
}
uct_cuda_ipc_unmap_memhandle(cuda_ipc_event->pid, cuda_ipc_event->pid_ns,
cuda_ipc_event->d_bptr,
cuda_ipc_event->mapped_addr,
cuda_ipc_event->cuda_device,
iface->config.enable_cache);
}

static uct_iface_ops_t uct_cuda_ipc_iface_ops = {
Expand Down