Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 128 additions & 84 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

#include "cuda_ipc_cache.h"
#include "cuda_ipc_iface.h"
#include "cuda_ipc.inl"

#include <ucs/datastruct/khash.h>
#include <ucs/debug/log.h>
#include <ucs/debug/memtrack_int.h>
#include <ucs/profile/profile.h>
#include <ucs/sys/sys.h>
#include <ucs/sys/string.h>
#include <ucs/sys/ptr_arith.h>
#include <ucs/datastruct/khash.h>
#include <ucs/type/rwlock.h>
#include <uct/cuda/base/cuda_ctx.inl>
#include "cuda_ipc.inl"

typedef struct uct_cuda_ipc_cache_hash_key {
pid_t pid;
ucs_sys_ns_t pid_ns;
Expand Down Expand Up @@ -59,7 +60,7 @@ KHASH_INIT(cuda_ipc_rem_cache, uct_cuda_ipc_cache_hash_key_t,
*/
typedef struct uct_cuda_ipc_remote_cache {
khash_t(cuda_ipc_rem_cache) hash;
ucs_recursive_spinlock_t lock;
ucs_rw_spinlock_t lock;
unsigned long max_regions; /**< Global max regions limit */
size_t max_size; /**< Global max total size limit */
} uct_cuda_ipc_remote_cache_t;
Expand Down Expand Up @@ -513,67 +514,70 @@ static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
cache->name, from, to);
}

static ucs_status_t
uct_cuda_ipc_get_remote_cache(const uct_cuda_ipc_cache_hash_key_t *key,
uct_cuda_ipc_cache_t **cache)
static uct_cuda_ipc_cache_t *
uct_cuda_ipc_remote_cache_get(uct_cuda_ipc_cache_hash_key_t key)
{
ucs_status_t status = UCS_OK;
char target_name[64];
khiter_t khiter;
int khret;

ucs_recursive_spin_lock(&uct_cuda_ipc_remote_cache.lock);

khiter = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, *key,
&khret);
if ((khret == UCS_KH_PUT_BUCKET_EMPTY) ||
(khret == UCS_KH_PUT_BUCKET_CLEAR)) {
ucs_snprintf_safe(target_name, sizeof(target_name), "dest:%d:%u:%d",
key->pid, key->pid_ns, key->cu_device);
status = uct_cuda_ipc_create_cache(cache, target_name);
if (status != UCS_OK) {
kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, khiter);
ucs_error("could not create create cuda ipc cache: %s",
ucs_status_string(status));
goto err_unlock;
}
const khint_t it = kh_get(cuda_ipc_rem_cache,
&uct_cuda_ipc_remote_cache.hash, key);

kh_val(&uct_cuda_ipc_remote_cache.hash, khiter) = *cache;
} else if (khret == UCS_KH_PUT_KEY_PRESENT) {
*cache = kh_val(&uct_cuda_ipc_remote_cache.hash, khiter);
} else {
ucs_error("unable to use cuda_ipc remote_cache hash");
status = UCS_ERR_NO_RESOURCE;
if (it != kh_end(&uct_cuda_ipc_remote_cache.hash)) {
return kh_value(&uct_cuda_ipc_remote_cache.hash, it);
}
err_unlock:
ucs_recursive_spin_unlock(&uct_cuda_ipc_remote_cache.lock);
return status;

return NULL;
}

ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr,
const void *mapped_addr,
CUdevice cu_dev, int cache_enabled)
static ucs_status_t
uct_cuda_ipc_remote_cache_put(uct_cuda_ipc_cache_hash_key_t key,
uct_cuda_ipc_cache_t **cache_p)
{
ucs_status_t status = UCS_OK;
const uct_cuda_ipc_cache_hash_key_t key = {pid, pid_ns, cu_dev};
uct_cuda_ipc_cache_t *cache;
ucs_pgt_region_t *pgt_region;
uct_cuda_ipc_cache_region_t *region;
int ret;
khint_t it;
char target_name[64];
ucs_status_t status;

/* checking if the mapped address is the same as the d_bptr
* this is true for the case of single process memory mapping
* see uct_cuda_ipc_map_memhandle for more details */
if ((d_bptr == (uintptr_t)mapped_addr) &&
uct_cuda_ipc_is_rkey_local(pid, pid_ns)) {
return UCS_OK;
cache = uct_cuda_ipc_remote_cache_get(key);
if (cache != NULL) {
goto out;
}

status = uct_cuda_ipc_get_remote_cache(&key, &cache);
it = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, key, &ret);
if (ret == UCS_KH_PUT_FAILED) {
ucs_error("failed to allocate cuda_ipc remote_cache hash entry");
return UCS_ERR_NO_MEMORY;
}

ucs_assertv_always(ret != UCS_KH_PUT_KEY_PRESENT, "key %d:%u:%d is present",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the case when 2 threads miss on read (line 548), then go here. The second kh_put will return UCS_KH_PUT_KEY_PRESENT -> assert fails.

key.pid, key.pid_ns, key.cu_device);
ucs_assertv_always((ret == UCS_KH_PUT_BUCKET_EMPTY) ||
Comment thread
rakhmets marked this conversation as resolved.
(ret == UCS_KH_PUT_BUCKET_CLEAR),
"invalid return value: %d", ret);

ucs_snprintf_safe(target_name, sizeof(target_name), "dest:%d:%u:%d",
key.pid, key.pid_ns, key.cu_device);
status = uct_cuda_ipc_create_cache(&cache, target_name);
if (status != UCS_OK) {
kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, it);
ucs_error("failed to create cuda ipc cache: %s",
ucs_status_string(status));
return status;
}

kh_val(&uct_cuda_ipc_remote_cache.hash, it) = cache;

out:
*cache_p = cache;
return UCS_OK;
}

static void
uct_cuda_ipc_cache_destroy_region(uct_cuda_ipc_cache_t *cache, uintptr_t d_bptr,
const void *mapped_addr, int cache_enabled)
{
ucs_pgt_region_t *pgt_region;
uct_cuda_ipc_cache_region_t *region;

/* use write lock because cache maybe modified */
pthread_rwlock_wrlock(&cache->lock);
pgt_region = UCS_PROFILE_CALL(ucs_pgtable_lookup, &cache->pgtable, d_bptr);
Expand All @@ -589,44 +593,45 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
}

pthread_rwlock_unlock(&cache->lock);
return status;
}

UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
(ext_key, cu_dev, mapped_addr, log_level),
uct_cuda_ipc_extended_rkey_t *ext_key, CUdevice cu_dev,
void **mapped_addr, ucs_log_level_t log_level)
void uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr, const void *mapped_addr,
CUdevice cu_dev, int cache_enabled)
{
uct_cuda_ipc_rkey_t *key = &ext_key->super;
const uct_cuda_ipc_cache_hash_key_t hash_key = {key->pid, ext_key->pid_ns,
cu_dev};
const uct_cuda_ipc_cache_hash_key_t key = {pid, pid_ns, cu_dev};
uct_cuda_ipc_cache_t *cache;
ucs_status_t status;
ucs_pgt_region_t *pgt_region;
uct_cuda_ipc_cache_region_t *region;
CUuuid uuid;
int ret;

status = UCT_CUDADRV_FUNC_LOG_ERR(cuDeviceGetUuid(&uuid, cu_dev));
if (status != UCS_OK) {
return status;
/* checking if the mapped address is the same as the d_bptr
* this is true for the case of single process memory mapping
* see uct_cuda_ipc_map_memhandle for more details */
if ((d_bptr == (uintptr_t)mapped_addr) &&
uct_cuda_ipc_is_rkey_local(pid, pid_ns)) {
return;
}

if (uct_cuda_ipc_is_rkey_local(key->pid, ext_key->pid_ns) &&
(memcmp(uuid.bytes, key->uuid.bytes, sizeof(uuid.bytes)) == 0)) {
/* TODO: added for test purpose to enable cuda_ipc tests in gtest
* mapped addrr is set to be same as d_bptr avoiding any calls to
* uct_cuda_ipc_open_memhandle which would fail with invalid argument
* error
*/
*mapped_addr = (CUdeviceptr*)key->d_bptr;
return UCS_OK;
ucs_rw_spinlock_read_lock(&uct_cuda_ipc_remote_cache.lock);
Comment thread
rakhmets marked this conversation as resolved.
cache = uct_cuda_ipc_remote_cache_get(key);
if (cache != NULL) {
uct_cuda_ipc_cache_destroy_region(cache, d_bptr, mapped_addr,
cache_enabled);
} else {
ucs_debug("no remote cache found for key: %d:%u:%d", pid, pid_ns,
cu_dev);
Comment thread
rakhmets marked this conversation as resolved.
}

status = uct_cuda_ipc_get_remote_cache(&hash_key, &cache);
if (status != UCS_OK) {
return status;
}
ucs_rw_spinlock_read_unlock(&uct_cuda_ipc_remote_cache.lock);
}

static ucs_status_t
uct_cuda_ipc_cache_put_region(uct_cuda_ipc_cache_t *cache,
uct_cuda_ipc_rkey_t *key, CUdevice cu_dev,
void **mapped_addr, ucs_log_level_t log_level)
{
ucs_pgt_region_t *pgt_region;
uct_cuda_ipc_cache_region_t *region;
ucs_status_t status;
int ret;

pthread_rwlock_wrlock(&cache->lock);
pgt_region = UCS_PROFILE_CALL(ucs_pgtable_lookup,
Expand Down Expand Up @@ -753,6 +758,45 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
return status;
}

UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
(ext_key, cu_dev, mapped_addr, log_level),
uct_cuda_ipc_extended_rkey_t *ext_key, CUdevice cu_dev,
void **mapped_addr, ucs_log_level_t log_level)
{
uct_cuda_ipc_rkey_t *key = &ext_key->super;
const uct_cuda_ipc_cache_hash_key_t hash_key = {key->pid, ext_key->pid_ns,
cu_dev};
uct_cuda_ipc_cache_t *cache;
ucs_status_t status;
CUuuid uuid;

status = UCT_CUDADRV_FUNC_LOG_ERR(cuDeviceGetUuid(&uuid, cu_dev));
if (status != UCS_OK) {
return status;
}

if (uct_cuda_ipc_is_rkey_local(key->pid, ext_key->pid_ns) &&
(memcmp(uuid.bytes, key->uuid.bytes, sizeof(uuid.bytes)) == 0)) {
/* TODO: added for test purpose to enable cuda_ipc tests in gtest
* mapped addrr is set to be same as d_bptr avoiding any calls to
* uct_cuda_ipc_open_memhandle which would fail with invalid argument
* error
*/
*mapped_addr = (CUdeviceptr*)key->d_bptr;
return UCS_OK;
}

ucs_rw_spinlock_write_lock(&uct_cuda_ipc_remote_cache.lock);
status = uct_cuda_ipc_remote_cache_put(hash_key, &cache);
if (status == UCS_OK) {
status = uct_cuda_ipc_cache_put_region(cache, key, cu_dev, mapped_addr,
log_level);
}

ucs_rw_spinlock_write_unlock(&uct_cuda_ipc_remote_cache.lock);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 BLOCKER

this serializes all map_memhandle calls as writers — the global write lock is held across cuIpcOpenMemHandle, pgtable insert, eviction, etc. The previous code released the outer lock before doing those heavy operations. Defeats the purpose of switching to rw_spinlock. Can we mirror the read-lock-first / upgrade-to-write-only-on-hash-insert pattern used in uct_cuda_ipc_open_memhandle_mempool?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we cannot mirror the pattern used in uct_cuda_ipc_open_memhandle_mempool, as we can remove a cache entry in uct_cuda_ipc_destroy_cache_by_iface_address.

return status;
}

ucs_status_t uct_cuda_ipc_create_cache(uct_cuda_ipc_cache_t **cache,
const char *name)
{
Expand Down Expand Up @@ -837,7 +881,7 @@ void uct_cuda_ipc_destroy_cache_by_iface_address(
return;
}

ucs_recursive_spin_lock(&uct_cuda_ipc_remote_cache.lock);
ucs_rw_spinlock_write_lock(&uct_cuda_ipc_remote_cache.lock);

for (device_index = 0; device_index < num_devices; ++device_index) {
status = UCT_CUDADRV_FUNC_LOG_WARN(
Expand All @@ -859,11 +903,11 @@ void uct_cuda_ipc_destroy_cache_by_iface_address(
kh_del(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, khiter);
}

ucs_recursive_spin_unlock(&uct_cuda_ipc_remote_cache.lock);
ucs_rw_spinlock_write_unlock(&uct_cuda_ipc_remote_cache.lock);
}

UCS_STATIC_INIT {
ucs_recursive_spinlock_init(&uct_cuda_ipc_remote_cache.lock, 0);
ucs_rw_spinlock_init(&uct_cuda_ipc_remote_cache.lock);
kh_init_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
uct_cuda_ipc_remote_cache.max_regions = ULONG_MAX;
uct_cuda_ipc_remote_cache.max_size = SIZE_MAX;
Expand Down Expand Up @@ -896,5 +940,5 @@ UCS_STATIC_CLEANUP {
uct_cuda_ipc_destroy_cache(rem_cache);
})
kh_destroy_inplace(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash);
ucs_recursive_spinlock_destroy(&uct_cuda_ipc_remote_cache.lock);
ucs_rw_spinlock_cleanup(&uct_cuda_ipc_remote_cache.lock);
}
7 changes: 3 additions & 4 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ ucs_status_t uct_cuda_ipc_map_memhandle(uct_cuda_ipc_extended_rkey_t *key,
ucs_log_level_t log_level);


ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr,
const void *mapped_addr,
CUdevice cu_dev, int cache_enabled);
void uct_cuda_ipc_unmap_memhandle(pid_t pid, ucs_sys_ns_t pid_ns,
uintptr_t d_bptr, const void *mapped_addr,
CUdevice cu_dev, int cache_enabled);


/**
Expand Down
15 changes: 5 additions & 10 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,17 +312,12 @@ static void uct_cuda_ipc_complete_event(uct_iface_h tl_iface,
uct_cuda_ipc_iface_t);
uct_cuda_ipc_event_desc_t *cuda_ipc_event = ucs_derived_of(cuda_event,
uct_cuda_ipc_event_desc_t);
ucs_status_t status;

status = uct_cuda_ipc_unmap_memhandle(cuda_ipc_event->pid,
cuda_ipc_event->pid_ns,
cuda_ipc_event->d_bptr,
cuda_ipc_event->mapped_addr,
cuda_ipc_event->cuda_device,
iface->config.enable_cache);
if (status != UCS_OK) {
ucs_fatal("failed to unmap addr:%p", cuda_ipc_event->mapped_addr);
}
uct_cuda_ipc_unmap_memhandle(cuda_ipc_event->pid, cuda_ipc_event->pid_ns,
cuda_ipc_event->d_bptr,
cuda_ipc_event->mapped_addr,
cuda_ipc_event->cuda_device,
iface->config.enable_cache);
}

static uct_iface_ops_t uct_cuda_ipc_iface_ops = {
Expand Down
Loading