Commit 9ce3eaa9 authored by Matthieu Dorier's avatar Matthieu Dorier

improved API for bulk pools

parent 364f39b3
...@@ -19,32 +19,100 @@ extern "C" { ...@@ -19,32 +19,100 @@ extern "C" {
/* A collection of fixed-size, fixed-permission reusable bulk buffers */ /* A collection of fixed-size, fixed-permission reusable bulk buffers */
struct margo_bulk_pool; struct margo_bulk_pool;
typedef struct margo_bulk_pool margo_bulk_pool_t; typedef struct margo_bulk_pool* margo_bulk_pool_t;
#define MARGO_BULK_POOL_NULL ((margo_bulk_pool_t)NULL)
/* A collection of margo_bulk_pool's, each of varying sizes */ /* A collection of margo_bulk_pool's, each of varying sizes */
struct margo_bulk_poolset; struct margo_bulk_poolset;
typedef struct margo_bulk_poolset margo_bulk_poolset_t; typedef struct margo_bulk_poolset* margo_bulk_poolset_t;
#define MARGO_BULK_POOLSET_NULL ((margo_bulk_poolset_t)NULL)
/**
* @brief Creates a pool of buffers and bulk handles of a given size.
*
* @param[in] mid Margo instance with which to create the bulk handles
* @param[in] count Number of bulk handles contained in the pool
* @param[in] size Size of each bulk buffer
* @param[in] flag HG_BULK_READ_ONLY, HG_BULK_WRITE_ONLY, or HG_BULK_READWRITE
* @param[out] pool Resulting pool of bulk handles
*
* @return HG_SUCCESS in case of success, or HG error codes in case of failure.
*/
hg_return_t margo_bulk_pool_create( hg_return_t margo_bulk_pool_create(
margo_instance_id mid, margo_instance_id mid,
hg_size_t count, hg_size_t count,
hg_size_t size, hg_size_t size,
hg_uint8_t flag, hg_uint8_t flag,
margo_bulk_pool_t **pool);
void margo_bulk_pool_destroy(
margo_bulk_pool_t *pool);
hg_bulk_t margo_bulk_pool_get(
margo_bulk_pool_t *pool); margo_bulk_pool_t *pool);
hg_bulk_t margo_bulk_pool_tryget( /**
margo_bulk_pool_t *pool); * @brief Destroys a margo_bulk_pool_t object, clearing all bulk handles and
* and freeing the buffers. The pool should not be in use (i.e. none of the
void margo_bulk_pool_release( * internal buffers should be in use) when this call happens.
margo_bulk_pool_t *pool, *
* @param pool margo_bulk_pool_t object to destroy.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_pool_destroy(
margo_bulk_pool_t pool);
/**
* @brief Gets a free hg_bulk_t object from the pool. This function will block
* until a bulk object is available.
*
* @param[in] pool margo_bulk_pool_t object from which to take the hg_bulk_t.
* @param[out] bulk hg_bulk_t object (guaranteed not to be HG_BULK_NULL if the returned value is 0).
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_pool_get(
margo_bulk_pool_t pool,
hg_bulk_t* bulk);
/**
* @brief Gets a free hg_bulk_t object from the pool. This function will return
* HG_BULK_NULL if no hg_bulk_t object is free at the moment of the call.
*
* @param pool margo_bulk_pool_t object from which to take the hg_bulk_t.
* @param bulk resulting bulk handle (can be HG_BULK_NULL).
*
* @return 0 in case of success, -1 in case of failure..
*/
int margo_bulk_pool_tryget(
margo_bulk_pool_t pool,
hg_bulk_t* bulk);
/**
* @brief Puts a bulk handle back in the pool. Note that the function is expecting
* the bulk handle to have been taken from the pool in the first place. The function
* will return -1 if the bulk was not associated with this pool to begin with.
*
* @param pool margo_bulk_pool_t object to which to return the bulk handle.
* @param bulk Bulk handle to release.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_pool_release(
margo_bulk_pool_t pool,
hg_bulk_t bulk); hg_bulk_t bulk);
/**
* @brief Creates a poolset. A poolset is a set of pools with the same number of
* buffers in each pool, and buffer sizes increasing exponentially with the pool number.
*
* @param[in] mid Margo instance
* @param[in] npools Number of pools in the poolset.
* @param[in] nbufs Number of buffers in each pool.
* @param[in] first_size Size of buffers in the first pool.
* @param[in] size_multiple Factor by which to multiply the size of the previous pool to get the size of the next.
* @param[in] flag HG_BULK_READ_ONLY, HG_BULK_WRITE_ONLY, or HG_BULK_READWRITE.
* @param[out] poolset Resulting poolset.
*
* @return HG_SUCCESS of other HG error codes.
*/
hg_return_t margo_bulk_poolset_create( hg_return_t margo_bulk_poolset_create(
margo_instance_id mid, margo_instance_id mid,
hg_size_t npools, hg_size_t npools,
...@@ -52,22 +120,63 @@ hg_return_t margo_bulk_poolset_create( ...@@ -52,22 +120,63 @@ hg_return_t margo_bulk_poolset_create(
hg_size_t first_size, hg_size_t first_size,
hg_size_t size_multiple, hg_size_t size_multiple,
hg_uint8_t flag, hg_uint8_t flag,
margo_bulk_poolset_t **poolset);
void margo_bulk_poolset_destroy(
margo_bulk_poolset_t *poolset); margo_bulk_poolset_t *poolset);
hg_bulk_t margo_bulk_poolset_get( /**
margo_bulk_poolset_t *poolset, * @brief Destroy a poolset. The poolset must not be in use when this function is called.
hg_size_t size); *
* @param poolset Poolset to destroy.
hg_bulk_t margo_bulk_poolset_tryget( *
margo_bulk_poolset_t *poolset, * @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_poolset_destroy(
margo_bulk_poolset_t poolset);
/**
* @brief Gets a bulk handle from the pool with the minimum size required to satisfy
* the provided size. May block until the pool has a bulk handle available.
*
* @param poolset Poolset from which to get the bulk handle.
* @param size Size of the buffer needed.
* @param bulk Resulting bulk handle.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_poolset_get(
margo_bulk_poolset_t poolset,
hg_size_t size, hg_size_t size,
hg_bool_t any_flag); hg_bulk_t* bulk);
void margo_bulk_poolset_release( /**
margo_bulk_poolset_t *poolset, * @brief Try getting a bulk handle from the poolset. If any_flag is HG_TRUE,
* this function will search in pools of increasingly larger buffers until it
* finds one (or return HG_BULK_NULL if it doesn't). If any_flag is HG_FALSE,
* this function will only search in the pool with the minimum required size.
*
* @param poolset Poolset in which to get a handle.
* @param size Size required.
* @param any_flag Whether to look in increasingly larger pools or not.
* @param bulk Resulting bulk handle.
*
* @return 0 in case of success (bulk = HG_BULK_NULL is also considered success),
* -1 in case of failure.
*/
int margo_bulk_poolset_tryget(
margo_bulk_poolset_t poolset,
hg_size_t size,
hg_bool_t any_flag,
hg_bulk_t* bulk);
/**
* @brief Puts a bulk handle back in its pool.
*
* @param poolset Poolset.
* @param bulk Bulk to release.
*
* @return 0 in case of success, -1 in case of success.
*/
int margo_bulk_poolset_release(
margo_bulk_poolset_t poolset,
hg_bulk_t bulk); hg_bulk_t bulk);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -27,7 +27,7 @@ struct margo_bulk_pool ...@@ -27,7 +27,7 @@ struct margo_bulk_pool
struct margo_bulk_poolset struct margo_bulk_poolset
{ {
margo_bulk_pool_t **pools; margo_bulk_pool_t *pools;
hg_size_t npools; hg_size_t npools;
hg_size_t nbufs; hg_size_t nbufs;
hg_size_t first_size; hg_size_t first_size;
...@@ -39,11 +39,11 @@ hg_return_t margo_bulk_pool_create( ...@@ -39,11 +39,11 @@ hg_return_t margo_bulk_pool_create(
hg_size_t count, hg_size_t count,
hg_size_t size, hg_size_t size,
hg_uint8_t flag, hg_uint8_t flag,
margo_bulk_pool_t **pool) margo_bulk_pool_t *pool)
{ {
int ret; int ret;
hg_return_t hret; hg_return_t hret;
margo_bulk_pool_t *p; margo_bulk_pool_t p;
hg_size_t i; hg_size_t i;
p = malloc(sizeof(*p)); p = malloc(sizeof(*p));
...@@ -53,7 +53,7 @@ hg_return_t margo_bulk_pool_create( ...@@ -53,7 +53,7 @@ hg_return_t margo_bulk_pool_create(
goto err; goto err;
} }
p->buf = malloc(count * size); p->buf = calloc(count, size);
if (p->buf == NULL) if (p->buf == NULL)
{ {
hret = HG_NOMEM_ERROR; hret = HG_NOMEM_ERROR;
...@@ -65,7 +65,7 @@ hg_return_t margo_bulk_pool_create( ...@@ -65,7 +65,7 @@ hg_return_t margo_bulk_pool_create(
p->size = size; p->size = size;
p->num_free = count; p->num_free = count;
p->flag = flag; p->flag = flag;
p->bulks = malloc(count * sizeof(*p->bulks)); p->bulks = calloc(count, sizeof(*p->bulks));
if (p->bulks == NULL) if (p->bulks == NULL)
{ {
hret = HG_NOMEM_ERROR; hret = HG_NOMEM_ERROR;
...@@ -117,79 +117,101 @@ err: ...@@ -117,79 +117,101 @@ err:
return hret; return hret;
} }
void margo_bulk_pool_destroy( int margo_bulk_pool_destroy(
margo_bulk_pool_t *pool) margo_bulk_pool_t pool)
{ {
hg_size_t i; hg_size_t i;
if(pool->size != pool->count)
return -1;
ABT_cond_free(&pool->cond); ABT_cond_free(&pool->cond);
ABT_mutex_free(&pool->mutex); ABT_mutex_free(&pool->mutex);
for (i = 0; i < pool->count; i++) margo_bulk_free(pool->bulks[i]); for (i = 0; i < pool->count; i++) {
margo_bulk_free(pool->bulks[i]);
}
free(pool->bulks); free(pool->bulks);
free(pool->buf); free(pool->buf);
free(pool); free(pool);
return; return 0;
} }
static inline hg_bulk_t margo_bp_get(margo_bulk_pool_t *pool) static inline hg_bulk_t margo_bp_get(margo_bulk_pool_t pool)
{ {
return pool->bulks[pool->count - pool->num_free--]; return pool->bulks[pool->count - pool->num_free--];
} }
static inline hg_bulk_t margo_bp_tryget(margo_bulk_pool_t *pool) static inline hg_bulk_t margo_bp_tryget(margo_bulk_pool_t pool)
{ {
return pool->num_free == 0 ? HG_BULK_NULL : margo_bp_get(pool); return pool->num_free == 0 ? HG_BULK_NULL : margo_bp_get(pool);
} }
hg_bulk_t margo_bulk_pool_get( int margo_bulk_pool_get(
margo_bulk_pool_t *pool) margo_bulk_pool_t pool,
hg_bulk_t* bulk)
{ {
hg_bulk_t bulk; if(pool == MARGO_BULK_POOL_NULL)
return -1;
hg_bulk_t b;
ABT_mutex_lock(pool->mutex); ABT_mutex_lock(pool->mutex);
while (pool->num_free == 0) while (pool->num_free == 0)
ABT_cond_wait(pool->cond, pool->mutex); ABT_cond_wait(pool->cond, pool->mutex);
assert(pool->num_free > 0); assert(pool->num_free > 0);
bulk = margo_bp_get(pool); b = margo_bp_get(pool);
ABT_mutex_unlock(pool->mutex); ABT_mutex_unlock(pool->mutex);
return bulk; *bulk = b;
return 0;
} }
hg_bulk_t margo_bulk_pool_tryget( int margo_bulk_pool_tryget(
margo_bulk_pool_t *pool) margo_bulk_pool_t pool,
hg_bulk_t* bulk)
{ {
hg_bulk_t bulk; if(pool == MARGO_BULK_POOL_NULL)
return -1;
hg_bulk_t b;
ABT_mutex_lock(pool->mutex); ABT_mutex_lock(pool->mutex);
bulk = margo_bp_tryget(pool); b = margo_bp_tryget(pool);
ABT_mutex_unlock(pool->mutex); ABT_mutex_unlock(pool->mutex);
return bulk; *bulk = b;
return 0;
} }
static inline void margo_bp_release(margo_bulk_pool_t *pool, hg_bulk_t bulk) static inline void margo_bp_release(margo_bulk_pool_t pool, hg_bulk_t bulk)
{ {
pool->bulks[pool->count - ++pool->num_free] = bulk; pool->bulks[pool->count - ++pool->num_free] = bulk;
} }
void margo_bulk_pool_release( int margo_bulk_pool_release(
margo_bulk_pool_t *pool, margo_bulk_pool_t pool,
hg_bulk_t bulk) hg_bulk_t bulk)
{ {
if (bulk == HG_BULK_NULL) return; if (pool == MARGO_BULK_POOL_NULL) return -1;
if (bulk == HG_BULK_NULL) return -1;
assert(pool->size == margo_bulk_get_size(bulk)); if (pool->size != margo_bulk_get_size(bulk)) return -1;
void* buf_ptr = NULL;
hg_size_t buf_size = 0;
hg_uint32_t actual_count = 0;
hg_return_t hret = margo_bulk_access(bulk, 0, pool->size, pool->flag, 1,
&buf_ptr, &buf_size, &actual_count);
if(hret != HG_SUCCESS) return -1;
if(buf_ptr < pool->buf || buf_ptr + buf_size > pool->buf + pool->size*pool->count) return -1;
ABT_mutex_lock(pool->mutex); ABT_mutex_lock(pool->mutex);
assert(pool->num_free != pool->count);
margo_bp_release(pool, bulk); margo_bp_release(pool, bulk);
ABT_cond_signal(pool->cond); ABT_cond_signal(pool->cond);
ABT_mutex_unlock(pool->mutex); ABT_mutex_unlock(pool->mutex);
return; return 0;
} }
hg_return_t margo_bulk_poolset_create( hg_return_t margo_bulk_poolset_create(
...@@ -199,21 +221,21 @@ hg_return_t margo_bulk_poolset_create( ...@@ -199,21 +221,21 @@ hg_return_t margo_bulk_poolset_create(
hg_size_t first_size, hg_size_t first_size,
hg_size_t size_multiple, hg_size_t size_multiple,
hg_uint8_t flag, hg_uint8_t flag,
margo_bulk_poolset_t **poolset) margo_bulk_poolset_t *poolset)
{ {
margo_bulk_poolset_t *s; margo_bulk_poolset_t s;
hg_size_t i = 0, j, size; hg_size_t i = 0, j, size;
hg_return_t hret; hg_return_t hret;
assert(npools > 0 && nbufs > 0 && first_size > 0 && size_multiple > 1); assert(npools > 0 && nbufs > 0 && first_size > 0 && size_multiple > 1);
s = malloc(sizeof(*s)); s = calloc(1, sizeof(*s));
if (s == NULL) if (s == NULL)
{ {
hret = HG_NOMEM_ERROR; hret = HG_NOMEM_ERROR;
goto err; goto err;
} }
s->pools = malloc(npools * sizeof(*s->pools)); s->pools = calloc(npools, sizeof(*s->pools));
if (s->pools == NULL) if (s->pools == NULL)
{ {
hret = HG_NOMEM_ERROR; hret = HG_NOMEM_ERROR;
...@@ -251,23 +273,30 @@ err: ...@@ -251,23 +273,30 @@ err:
return hret; return hret;
} }
void margo_bulk_poolset_destroy( int margo_bulk_poolset_destroy(
margo_bulk_poolset_t *poolset) margo_bulk_poolset_t poolset)
{ {
hg_size_t i; hg_size_t i;
for (i = 0; i < poolset->npools; i++) int ret = 0;
margo_bulk_pool_destroy(poolset->pools[i]);
for (i = 0; i < poolset->npools; i++) {
int r = margo_bulk_pool_destroy(poolset->pools[i]);
if(ret == 0 && r != 0) ret = r;
}
free(poolset->pools); free(poolset->pools);
free(poolset); free(poolset);
return; return ret;
} }
hg_bulk_t margo_bulk_poolset_get( int margo_bulk_poolset_get(
margo_bulk_poolset_t *poolset, margo_bulk_poolset_t poolset,
hg_size_t size) hg_size_t size,
hg_bulk_t* bulk)
{ {
if(poolset == MARGO_BULK_POOLSET_NULL) return -1;
hg_size_t i; hg_size_t i;
hg_size_t this_size = poolset->first_size; hg_size_t this_size = poolset->first_size;
hg_size_t size_mult = poolset->size_multiple; hg_size_t size_mult = poolset->size_multiple;
...@@ -275,19 +304,22 @@ hg_bulk_t margo_bulk_poolset_get( ...@@ -275,19 +304,22 @@ hg_bulk_t margo_bulk_poolset_get(
for (i = 0; i < poolset->npools; i++) for (i = 0; i < poolset->npools; i++)
{ {
if (size <= this_size) if (size <= this_size)
return margo_bulk_pool_get(poolset->pools[i]); return margo_bulk_pool_get(poolset->pools[i], bulk);
this_size *= size_mult; this_size *= size_mult;
} }
return HG_BULK_NULL; return -1;
} }
hg_bulk_t margo_bulk_poolset_tryget( int margo_bulk_poolset_tryget(
margo_bulk_poolset_t *poolset, margo_bulk_poolset_t poolset,
hg_size_t size, hg_size_t size,
hg_bool_t any_flag) hg_bool_t any_flag,
hg_bulk_t* bulk)
{ {
hg_bulk_t bulk; if(poolset == MARGO_BULK_POOLSET_NULL) return -1;
hg_bulk_t b = HG_BULK_NULL;
hg_size_t i; hg_size_t i;
hg_size_t this_size = poolset->first_size; hg_size_t this_size = poolset->first_size;
hg_size_t size_mult = poolset->size_multiple; hg_size_t size_mult = poolset->size_multiple;
...@@ -296,20 +328,24 @@ hg_bulk_t margo_bulk_poolset_tryget( ...@@ -296,20 +328,24 @@ hg_bulk_t margo_bulk_poolset_tryget(
{ {
if (size <= this_size) if (size <= this_size)
{ {
bulk = margo_bulk_pool_tryget(poolset->pools[i]); margo_bulk_pool_tryget(poolset->pools[i], &b);
if (bulk != HG_BULK_NULL || any_flag == HG_FALSE) return bulk; if (b != HG_BULK_NULL || any_flag == HG_FALSE) {
*bulk = b;
return 0;
}
} }
this_size *= size_mult; this_size *= size_mult;
} }
return HG_BULK_NULL; return 0;
} }
void margo_bulk_poolset_release( int margo_bulk_poolset_release(
margo_bulk_poolset_t *poolset, margo_bulk_poolset_t poolset,
hg_bulk_t bulk) hg_bulk_t bulk)
{ {
if (bulk == HG_BULK_NULL) return; if (poolset == MARGO_BULK_POOLSET_NULL) return -1;
if (bulk == HG_BULK_NULL) return -1;
hg_size_t bulk_size = HG_Bulk_get_size(bulk); hg_size_t bulk_size = HG_Bulk_get_size(bulk);
hg_size_t i; hg_size_t i;
...@@ -320,10 +356,9 @@ void margo_bulk_poolset_release( ...@@ -320,10 +356,9 @@ void margo_bulk_poolset_release(
{ {
if (bulk_size == size) if (bulk_size == size)
{ {
margo_bulk_pool_release(poolset->pools[i], bulk); return margo_bulk_pool_release(poolset->pools[i], bulk);
return;
} }
else size *= size_mult; else size *= size_mult;
} }
assert(0); /* should not reach this ... */ return -1;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment