From 9ce3eaa94da65026bf64b2ca82b0c1e5649d788b Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Sat, 8 Dec 2018 09:19:53 -0600 Subject: [PATCH] improved API for bulk pools --- include/margo-bulk-pool.h | 163 +++++++++++++++++++++++++++++++------- src/margo-bulk-pool.c | 147 +++++++++++++++++++++------------- 2 files changed, 227 insertions(+), 83 deletions(-) diff --git a/include/margo-bulk-pool.h b/include/margo-bulk-pool.h index a785be2..932b749 100644 --- a/include/margo-bulk-pool.h +++ b/include/margo-bulk-pool.h @@ -19,32 +19,100 @@ extern "C" { /* A collection of fixed-size, fixed-permission reusable bulk buffers */ struct margo_bulk_pool; -typedef struct margo_bulk_pool margo_bulk_pool_t; +typedef struct margo_bulk_pool* margo_bulk_pool_t; + +#define MARGO_BULK_POOL_NULL ((margo_bulk_pool_t)NULL) /* A collection of margo_bulk_pool's, each of varying sizes */ struct margo_bulk_poolset; -typedef struct margo_bulk_poolset margo_bulk_poolset_t; - +typedef struct margo_bulk_poolset* margo_bulk_poolset_t; + +#define MARGO_BULK_POOLSET_NULL ((margo_bulk_poolset_t)NULL) + +/** + * @brief Creates a pool of buffers and bulk handles of a given size. + * + * @param[in] mid Margo instance with which to create the bulk handles + * @param[in] count Number of bulk handles contained in the pool + * @param[in] size Size of each bulk buffer + * @param[in] flag HG_BULK_READ_ONLY, HG_BULK_WRITE_ONLY, or HG_BULK_READWRITE + * @param[out] pool Resulting pool of bulk handles + * + * @return HG_SUCCESS in case of success, or HG error codes in case of failure. + */ hg_return_t margo_bulk_pool_create( margo_instance_id mid, hg_size_t count, hg_size_t size, hg_uint8_t flag, - margo_bulk_pool_t **pool); - -void margo_bulk_pool_destroy( - margo_bulk_pool_t *pool); - -hg_bulk_t margo_bulk_pool_get( margo_bulk_pool_t *pool); -hg_bulk_t margo_bulk_pool_tryget( - margo_bulk_pool_t *pool); - -void margo_bulk_pool_release( - margo_bulk_pool_t *pool, +/** + * @brief Destroys a margo_bulk_pool_t object, clearing all bulk handles and + * and freeing the buffers. The pool should not be in use (i.e. none of the + * internal buffers should be in use) when this call happens. + * + * @param pool margo_bulk_pool_t object to destroy. + * + * @return 0 in case of success, -1 in case of failure. + */ +int margo_bulk_pool_destroy( + margo_bulk_pool_t pool); + +/** + * @brief Gets a free hg_bulk_t object from the pool. This function will block + * until a bulk object is available. + * + * @param[in] pool margo_bulk_pool_t object from which to take the hg_bulk_t. + * @param[out] bulk hg_bulk_t object (guaranteed not to be HG_BULK_NULL if the returned value is 0). + * + * @return 0 in case of success, -1 in case of failure. + */ +int margo_bulk_pool_get( + margo_bulk_pool_t pool, + hg_bulk_t* bulk); + +/** + * @brief Gets a free hg_bulk_t object from the pool. This function will return + * HG_BULK_NULL if no hg_bulk_t object is free at the moment of the call. + * + * @param pool margo_bulk_pool_t object from which to take the hg_bulk_t. + * @param bulk resulting bulk handle (can be HG_BULK_NULL). + * + * @return 0 in case of success, -1 in case of failure.. + */ +int margo_bulk_pool_tryget( + margo_bulk_pool_t pool, + hg_bulk_t* bulk); + +/** + * @brief Puts a bulk handle back in the pool. Note that the function is expecting + * the bulk handle to have been taken from the pool in the first place. The function + * will return -1 if the bulk was not associated with this pool to begin with. + * + * @param pool margo_bulk_pool_t object to which to return the bulk handle. + * @param bulk Bulk handle to release. + * + * @return 0 in case of success, -1 in case of failure. + */ +int margo_bulk_pool_release( + margo_bulk_pool_t pool, hg_bulk_t bulk); +/** + * @brief Creates a poolset. A poolset is a set of pools with the same number of + * buffers in each pool, and buffer sizes increasing exponentially with the pool number. + * + * @param[in] mid Margo instance + * @param[in] npools Number of pools in the poolset. + * @param[in] nbufs Number of buffers in each pool. + * @param[in] first_size Size of buffers in the first pool. + * @param[in] size_multiple Factor by which to multiply the size of the previous pool to get the size of the next. + * @param[in] flag HG_BULK_READ_ONLY, HG_BULK_WRITE_ONLY, or HG_BULK_READWRITE. + * @param[out] poolset Resulting poolset. + * + * @return HG_SUCCESS of other HG error codes. + */ hg_return_t margo_bulk_poolset_create( margo_instance_id mid, hg_size_t npools, @@ -52,22 +120,63 @@ hg_return_t margo_bulk_poolset_create( hg_size_t first_size, hg_size_t size_multiple, hg_uint8_t flag, - margo_bulk_poolset_t **poolset); - -void margo_bulk_poolset_destroy( margo_bulk_poolset_t *poolset); -hg_bulk_t margo_bulk_poolset_get( - margo_bulk_poolset_t *poolset, - hg_size_t size); - -hg_bulk_t margo_bulk_poolset_tryget( - margo_bulk_poolset_t *poolset, +/** + * @brief Destroy a poolset. The poolset must not be in use when this function is called. + * + * @param poolset Poolset to destroy. + * + * @return 0 in case of success, -1 in case of failure. + */ +int margo_bulk_poolset_destroy( + margo_bulk_poolset_t poolset); + +/** + * @brief Gets a bulk handle from the pool with the minimum size required to satisfy + * the provided size. May block until the pool has a bulk handle available. + * + * @param poolset Poolset from which to get the bulk handle. + * @param size Size of the buffer needed. + * @param bulk Resulting bulk handle. + * + * @return 0 in case of success, -1 in case of failure. + */ +int margo_bulk_poolset_get( + margo_bulk_poolset_t poolset, hg_size_t size, - hg_bool_t any_flag); - -void margo_bulk_poolset_release( - margo_bulk_poolset_t *poolset, + hg_bulk_t* bulk); + +/** + * @brief Try getting a bulk handle from the poolset. If any_flag is HG_TRUE, + * this function will search in pools of increasingly larger buffers until it + * finds one (or return HG_BULK_NULL if it doesn't). If any_flag is HG_FALSE, + * this function will only search in the pool with the minimum required size. + * + * @param poolset Poolset in which to get a handle. + * @param size Size required. + * @param any_flag Whether to look in increasingly larger pools or not. + * @param bulk Resulting bulk handle. + * + * @return 0 in case of success (bulk = HG_BULK_NULL is also considered success), + * -1 in case of failure. + */ +int margo_bulk_poolset_tryget( + margo_bulk_poolset_t poolset, + hg_size_t size, + hg_bool_t any_flag, + hg_bulk_t* bulk); + +/** + * @brief Puts a bulk handle back in its pool. + * + * @param poolset Poolset. + * @param bulk Bulk to release. + * + * @return 0 in case of success, -1 in case of success. + */ +int margo_bulk_poolset_release( + margo_bulk_poolset_t poolset, hg_bulk_t bulk); #ifdef __cplusplus diff --git a/src/margo-bulk-pool.c b/src/margo-bulk-pool.c index 910688a..837b4c7 100644 --- a/src/margo-bulk-pool.c +++ b/src/margo-bulk-pool.c @@ -27,7 +27,7 @@ struct margo_bulk_pool struct margo_bulk_poolset { - margo_bulk_pool_t **pools; + margo_bulk_pool_t *pools; hg_size_t npools; hg_size_t nbufs; hg_size_t first_size; @@ -39,11 +39,11 @@ hg_return_t margo_bulk_pool_create( hg_size_t count, hg_size_t size, hg_uint8_t flag, - margo_bulk_pool_t **pool) + margo_bulk_pool_t *pool) { int ret; hg_return_t hret; - margo_bulk_pool_t *p; + margo_bulk_pool_t p; hg_size_t i; p = malloc(sizeof(*p)); @@ -53,7 +53,7 @@ hg_return_t margo_bulk_pool_create( goto err; } - p->buf = malloc(count * size); + p->buf = calloc(count, size); if (p->buf == NULL) { hret = HG_NOMEM_ERROR; @@ -65,7 +65,7 @@ hg_return_t margo_bulk_pool_create( p->size = size; p->num_free = count; p->flag = flag; - p->bulks = malloc(count * sizeof(*p->bulks)); + p->bulks = calloc(count, sizeof(*p->bulks)); if (p->bulks == NULL) { hret = HG_NOMEM_ERROR; @@ -117,79 +117,101 @@ err: return hret; } -void margo_bulk_pool_destroy( - margo_bulk_pool_t *pool) +int margo_bulk_pool_destroy( + margo_bulk_pool_t pool) { hg_size_t i; + if(pool->size != pool->count) + return -1; + ABT_cond_free(&pool->cond); ABT_mutex_free(&pool->mutex); - for (i = 0; i < pool->count; i++) margo_bulk_free(pool->bulks[i]); + for (i = 0; i < pool->count; i++) { + margo_bulk_free(pool->bulks[i]); + } free(pool->bulks); free(pool->buf); free(pool); - return; + return 0; } -static inline hg_bulk_t margo_bp_get(margo_bulk_pool_t *pool) +static inline hg_bulk_t margo_bp_get(margo_bulk_pool_t pool) { return pool->bulks[pool->count - pool->num_free--]; } -static inline hg_bulk_t margo_bp_tryget(margo_bulk_pool_t *pool) +static inline hg_bulk_t margo_bp_tryget(margo_bulk_pool_t pool) { return pool->num_free == 0 ? HG_BULK_NULL : margo_bp_get(pool); } -hg_bulk_t margo_bulk_pool_get( - margo_bulk_pool_t *pool) +int margo_bulk_pool_get( + margo_bulk_pool_t pool, + hg_bulk_t* bulk) { - hg_bulk_t bulk; + if(pool == MARGO_BULK_POOL_NULL) + return -1; + + hg_bulk_t b; ABT_mutex_lock(pool->mutex); while (pool->num_free == 0) ABT_cond_wait(pool->cond, pool->mutex); assert(pool->num_free > 0); - bulk = margo_bp_get(pool); + b = margo_bp_get(pool); ABT_mutex_unlock(pool->mutex); - return bulk; + *bulk = b; + return 0; } -hg_bulk_t margo_bulk_pool_tryget( - margo_bulk_pool_t *pool) +int margo_bulk_pool_tryget( + margo_bulk_pool_t pool, + hg_bulk_t* bulk) { - hg_bulk_t bulk; + if(pool == MARGO_BULK_POOL_NULL) + return -1; + + hg_bulk_t b; ABT_mutex_lock(pool->mutex); - bulk = margo_bp_tryget(pool); + b = margo_bp_tryget(pool); ABT_mutex_unlock(pool->mutex); - return bulk; + *bulk = b; + return 0; } -static inline void margo_bp_release(margo_bulk_pool_t *pool, hg_bulk_t bulk) +static inline void margo_bp_release(margo_bulk_pool_t pool, hg_bulk_t bulk) { pool->bulks[pool->count - ++pool->num_free] = bulk; } -void margo_bulk_pool_release( - margo_bulk_pool_t *pool, +int margo_bulk_pool_release( + margo_bulk_pool_t pool, hg_bulk_t bulk) { - if (bulk == HG_BULK_NULL) return; - - assert(pool->size == margo_bulk_get_size(bulk)); + if (pool == MARGO_BULK_POOL_NULL) return -1; + if (bulk == HG_BULK_NULL) return -1; + if (pool->size != margo_bulk_get_size(bulk)) return -1; + + void* buf_ptr = NULL; + hg_size_t buf_size = 0; + hg_uint32_t actual_count = 0; + hg_return_t hret = margo_bulk_access(bulk, 0, pool->size, pool->flag, 1, + &buf_ptr, &buf_size, &actual_count); + if(hret != HG_SUCCESS) return -1; + if(buf_ptr < pool->buf || buf_ptr + buf_size > pool->buf + pool->size*pool->count) return -1; ABT_mutex_lock(pool->mutex); - assert(pool->num_free != pool->count); margo_bp_release(pool, bulk); ABT_cond_signal(pool->cond); ABT_mutex_unlock(pool->mutex); - return; + return 0; } hg_return_t margo_bulk_poolset_create( @@ -199,21 +221,21 @@ hg_return_t margo_bulk_poolset_create( hg_size_t first_size, hg_size_t size_multiple, hg_uint8_t flag, - margo_bulk_poolset_t **poolset) + margo_bulk_poolset_t *poolset) { - margo_bulk_poolset_t *s; + margo_bulk_poolset_t s; hg_size_t i = 0, j, size; hg_return_t hret; assert(npools > 0 && nbufs > 0 && first_size > 0 && size_multiple > 1); - s = malloc(sizeof(*s)); + s = calloc(1, sizeof(*s)); if (s == NULL) { hret = HG_NOMEM_ERROR; goto err; } - s->pools = malloc(npools * sizeof(*s->pools)); + s->pools = calloc(npools, sizeof(*s->pools)); if (s->pools == NULL) { hret = HG_NOMEM_ERROR; @@ -251,23 +273,30 @@ err: return hret; } -void margo_bulk_poolset_destroy( - margo_bulk_poolset_t *poolset) +int margo_bulk_poolset_destroy( + margo_bulk_poolset_t poolset) { hg_size_t i; - for (i = 0; i < poolset->npools; i++) - margo_bulk_pool_destroy(poolset->pools[i]); + int ret = 0; + + for (i = 0; i < poolset->npools; i++) { + int r = margo_bulk_pool_destroy(poolset->pools[i]); + if(ret == 0 && r != 0) ret = r; + } free(poolset->pools); free(poolset); - return; + return ret; } -hg_bulk_t margo_bulk_poolset_get( - margo_bulk_poolset_t *poolset, - hg_size_t size) +int margo_bulk_poolset_get( + margo_bulk_poolset_t poolset, + hg_size_t size, + hg_bulk_t* bulk) { + if(poolset == MARGO_BULK_POOLSET_NULL) return -1; + hg_size_t i; hg_size_t this_size = poolset->first_size; hg_size_t size_mult = poolset->size_multiple; @@ -275,19 +304,22 @@ hg_bulk_t margo_bulk_poolset_get( for (i = 0; i < poolset->npools; i++) { if (size <= this_size) - return margo_bulk_pool_get(poolset->pools[i]); + return margo_bulk_pool_get(poolset->pools[i], bulk); this_size *= size_mult; } - return HG_BULK_NULL; + return -1; } -hg_bulk_t margo_bulk_poolset_tryget( - margo_bulk_poolset_t *poolset, +int margo_bulk_poolset_tryget( + margo_bulk_poolset_t poolset, hg_size_t size, - hg_bool_t any_flag) + hg_bool_t any_flag, + hg_bulk_t* bulk) { - hg_bulk_t bulk; + if(poolset == MARGO_BULK_POOLSET_NULL) return -1; + + hg_bulk_t b = HG_BULK_NULL; hg_size_t i; hg_size_t this_size = poolset->first_size; hg_size_t size_mult = poolset->size_multiple; @@ -296,20 +328,24 @@ hg_bulk_t margo_bulk_poolset_tryget( { if (size <= this_size) { - bulk = margo_bulk_pool_tryget(poolset->pools[i]); - if (bulk != HG_BULK_NULL || any_flag == HG_FALSE) return bulk; + margo_bulk_pool_tryget(poolset->pools[i], &b); + if (b != HG_BULK_NULL || any_flag == HG_FALSE) { + *bulk = b; + return 0; + } } this_size *= size_mult; } - return HG_BULK_NULL; + return 0; } -void margo_bulk_poolset_release( - margo_bulk_poolset_t *poolset, +int margo_bulk_poolset_release( + margo_bulk_poolset_t poolset, hg_bulk_t bulk) { - if (bulk == HG_BULK_NULL) return; + if (poolset == MARGO_BULK_POOLSET_NULL) return -1; + if (bulk == HG_BULK_NULL) return -1; hg_size_t bulk_size = HG_Bulk_get_size(bulk); hg_size_t i; @@ -320,10 +356,9 @@ void margo_bulk_poolset_release( { if (bulk_size == size) { - margo_bulk_pool_release(poolset->pools[i], bulk); - return; + return margo_bulk_pool_release(poolset->pools[i], bulk); } else size *= size_mult; } - assert(0); /* should not reach this ... */ + return -1; } -- 2.22.0