...
 
Commits (4)
......@@ -13,7 +13,10 @@ CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = include/margo.h
include_HEADERS = \
include/margo.h \
include/margo-bulk-pool.h
TESTS_ENVIRONMENT =
EXTRA_DIST += \
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_BULK_POOL
#define __MARGO_BULK_POOL
#ifdef __cplusplus
extern "C" {
#endif
#include <mercury.h>
#include <mercury_types.h>
#include <mercury_bulk.h>
#include <mercury_macros.h>
#include <abt.h>
/* A collection of fixed-size, fixed-permission reusable bulk buffers */
struct margo_bulk_pool;
typedef struct margo_bulk_pool* margo_bulk_pool_t;
#define MARGO_BULK_POOL_NULL ((margo_bulk_pool_t)NULL)
/* A collection of margo_bulk_pool's, each of varying sizes */
struct margo_bulk_poolset;
typedef struct margo_bulk_poolset* margo_bulk_poolset_t;
#define MARGO_BULK_POOLSET_NULL ((margo_bulk_poolset_t)NULL)
/**
* @brief Creates a pool of buffers and bulk handles of a given size.
*
* @param[in] mid Margo instance with which to create the bulk handles
* @param[in] count Number of bulk handles contained in the pool
* @param[in] size Size of each bulk buffer
* @param[in] flag HG_BULK_READ_ONLY, HG_BULK_WRITE_ONLY, or HG_BULK_READWRITE
* @param[out] pool Resulting pool of bulk handles
*
* @return HG_SUCCESS in case of success, or HG error codes in case of failure.
*/
hg_return_t margo_bulk_pool_create(
margo_instance_id mid,
hg_size_t count,
hg_size_t size,
hg_uint8_t flag,
margo_bulk_pool_t *pool);
/**
* @brief Destroys a margo_bulk_pool_t object, clearing all bulk handles and
* and freeing the buffers. The pool should not be in use (i.e. none of the
* internal buffers should be in use) when this call happens.
*
* @param pool margo_bulk_pool_t object to destroy.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_pool_destroy(
margo_bulk_pool_t pool);
/**
* @brief Gets a free hg_bulk_t object from the pool. This function will block
* until a bulk object is available.
*
* @param[in] pool margo_bulk_pool_t object from which to take the hg_bulk_t.
* @param[out] bulk hg_bulk_t object (guaranteed not to be HG_BULK_NULL if the returned value is 0).
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_pool_get(
margo_bulk_pool_t pool,
hg_bulk_t* bulk);
/**
* @brief Gets a free hg_bulk_t object from the pool. This function will return
* HG_BULK_NULL if no hg_bulk_t object is free at the moment of the call.
*
* @param pool margo_bulk_pool_t object from which to take the hg_bulk_t.
* @param bulk resulting bulk handle (can be HG_BULK_NULL).
*
* @return 0 in case of success, -1 in case of failure..
*/
int margo_bulk_pool_tryget(
margo_bulk_pool_t pool,
hg_bulk_t* bulk);
/**
* @brief Puts a bulk handle back in the pool. Note that the function is expecting
* the bulk handle to have been taken from the pool in the first place. The function
* will return -1 if the bulk was not associated with this pool to begin with.
*
* @param pool margo_bulk_pool_t object to which to return the bulk handle.
* @param bulk Bulk handle to release.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_pool_release(
margo_bulk_pool_t pool,
hg_bulk_t bulk);
/**
* @brief Creates a poolset. A poolset is a set of pools with the same number of
* buffers in each pool, and buffer sizes increasing exponentially with the pool number.
*
* @param[in] mid Margo instance
* @param[in] npools Number of pools in the poolset.
* @param[in] nbufs Number of buffers in each pool.
* @param[in] first_size Size of buffers in the first pool.
* @param[in] size_multiple Factor by which to multiply the size of the previous pool to get the size of the next.
* @param[in] flag HG_BULK_READ_ONLY, HG_BULK_WRITE_ONLY, or HG_BULK_READWRITE.
* @param[out] poolset Resulting poolset.
*
* @return HG_SUCCESS of other HG error codes.
*/
hg_return_t margo_bulk_poolset_create(
margo_instance_id mid,
hg_size_t npools,
hg_size_t nbufs,
hg_size_t first_size,
hg_size_t size_multiple,
hg_uint8_t flag,
margo_bulk_poolset_t *poolset);
/**
* @brief Destroy a poolset. The poolset must not be in use when this function is called.
*
* @param poolset Poolset to destroy.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_poolset_destroy(
margo_bulk_poolset_t poolset);
/**
* @brief Gets a bulk handle from the pool with the minimum size required to satisfy
* the provided size. May block until the pool has a bulk handle available.
*
* @param poolset Poolset from which to get the bulk handle.
* @param size Size of the buffer needed.
* @param bulk Resulting bulk handle.
*
* @return 0 in case of success, -1 in case of failure.
*/
int margo_bulk_poolset_get(
margo_bulk_poolset_t poolset,
hg_size_t size,
hg_bulk_t* bulk);
/**
* @brief Try getting a bulk handle from the poolset. If any_flag is HG_TRUE,
* this function will search in pools of increasingly larger buffers until it
* finds one (or return HG_BULK_NULL if it doesn't). If any_flag is HG_FALSE,
* this function will only search in the pool with the minimum required size.
*
* @param poolset Poolset in which to get a handle.
* @param size Size required.
* @param any_flag Whether to look in increasingly larger pools or not.
* @param bulk Resulting bulk handle.
*
* @return 0 in case of success (bulk = HG_BULK_NULL is also considered success),
* -1 in case of failure.
*/
int margo_bulk_poolset_tryget(
margo_bulk_poolset_t poolset,
hg_size_t size,
hg_bool_t any_flag,
hg_bulk_t* bulk);
/**
* @brief Puts a bulk handle back in its pool.
*
* @param poolset Poolset.
* @param bulk Bulk to release.
*
* @return 0 in case of success, -1 in case of success.
*/
int margo_bulk_poolset_release(
margo_bulk_poolset_t poolset,
hg_bulk_t bulk);
#ifdef __cplusplus
}
#endif
#endif /* __MARGO_BULK_POOL */
......@@ -11,12 +11,6 @@
extern "C" {
#endif
/* This is to prevent the user from usin HG_Register_data
* and HG_Registered_data, which are replaced with
* margo_register_data and margo_registered_data
* respecively.
*/
#include <mercury.h>
#include <mercury_types.h>
#include <mercury_bulk.h>
......@@ -27,6 +21,11 @@ extern "C" {
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)
/* This is to prevent the user from usin HG_Register_data
* and HG_Registered_data, which are replaced with
* margo_register_data and margo_registered_data
* respecively.
*/
#undef MERCURY_REGISTER
struct margo_instance;
......
src_libmargo_la_SOURCES += \
src/margo.c \
src/margo-timer.h \
src/margo-timer.c
src/margo-timer.c \
src/margo-bulk-pool.c
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
#include "margo.h"
#include "margo-bulk-pool.h"
struct margo_bulk_pool
{
margo_instance_id mid;
void *buf;
hg_bulk_t *bulks;
hg_size_t count;
hg_size_t size;
hg_size_t num_free;
hg_uint8_t flag;
ABT_mutex mutex;
ABT_cond cond;
};
struct margo_bulk_poolset
{
margo_bulk_pool_t *pools;
hg_size_t npools;
hg_size_t nbufs;
hg_size_t first_size;
hg_size_t size_multiple;
};
hg_return_t margo_bulk_pool_create(
margo_instance_id mid,
hg_size_t count,
hg_size_t size,
hg_uint8_t flag,
margo_bulk_pool_t *pool)
{
int ret;
hg_return_t hret;
margo_bulk_pool_t p;
hg_size_t i;
p = malloc(sizeof(*p));
if (p == NULL)
{
hret = HG_NOMEM_ERROR;
goto err;
}
p->buf = calloc(count, size);
if (p->buf == NULL)
{
hret = HG_NOMEM_ERROR;
goto err;
}
p->mid = mid;
p->count = count;
p->size = size;
p->num_free = count;
p->flag = flag;
p->bulks = calloc(count, sizeof(*p->bulks));
if (p->bulks == NULL)
{
hret = HG_NOMEM_ERROR;
goto err;
}
for (i = 0; i < count; i++)
{
unsigned char *tmp = p->buf;
void *bulk_buf = tmp + i*size;
hret = margo_bulk_create(mid, 1, &bulk_buf, &size, flag,
&p->bulks[i]);
if (hret != HG_SUCCESS)
{
p->bulks[i] = HG_BULK_NULL;
goto err;
}
}
ret = ABT_mutex_create(&p->mutex);
if (ret != ABT_SUCCESS)
{
hret = HG_OTHER_ERROR;
goto err;
}
ret = ABT_cond_create(&p->cond);
if (ret != ABT_SUCCESS)
{
ABT_mutex_free(&p->mutex);
hret = HG_OTHER_ERROR;
goto err;
}
*pool = p;
return HG_SUCCESS;
err:
if (p != NULL)
{
if (p->bulks != NULL)
{
for (i = 0; i < p->count && p->bulks[i] != HG_BULK_NULL; i++)
margo_bulk_free(p->bulks[i]);
free(p->bulks);
}
if(p->buf != NULL) free(p->buf);
free(p);
}
*pool = NULL;
return hret;
}
int margo_bulk_pool_destroy(
margo_bulk_pool_t pool)
{
hg_size_t i;
if(pool->size != pool->count)
return -1;
ABT_cond_free(&pool->cond);
ABT_mutex_free(&pool->mutex);
for (i = 0; i < pool->count; i++) {
margo_bulk_free(pool->bulks[i]);
}
free(pool->bulks);
free(pool->buf);
free(pool);
return 0;
}
static inline hg_bulk_t margo_bp_get(margo_bulk_pool_t pool)
{
return pool->bulks[pool->count - pool->num_free--];
}
static inline hg_bulk_t margo_bp_tryget(margo_bulk_pool_t pool)
{
return pool->num_free == 0 ? HG_BULK_NULL : margo_bp_get(pool);
}
int margo_bulk_pool_get(
margo_bulk_pool_t pool,
hg_bulk_t* bulk)
{
if(pool == MARGO_BULK_POOL_NULL)
return -1;
hg_bulk_t b;
ABT_mutex_lock(pool->mutex);
while (pool->num_free == 0)
ABT_cond_wait(pool->cond, pool->mutex);
assert(pool->num_free > 0);
b = margo_bp_get(pool);
ABT_mutex_unlock(pool->mutex);
*bulk = b;
return 0;
}
int margo_bulk_pool_tryget(
margo_bulk_pool_t pool,
hg_bulk_t* bulk)
{
if(pool == MARGO_BULK_POOL_NULL)
return -1;
hg_bulk_t b;
ABT_mutex_lock(pool->mutex);
b = margo_bp_tryget(pool);
ABT_mutex_unlock(pool->mutex);
*bulk = b;
return 0;
}
static inline void margo_bp_release(margo_bulk_pool_t pool, hg_bulk_t bulk)
{
pool->bulks[pool->count - ++pool->num_free] = bulk;
}
int margo_bulk_pool_release(
margo_bulk_pool_t pool,
hg_bulk_t bulk)
{
if (pool == MARGO_BULK_POOL_NULL) return -1;
if (bulk == HG_BULK_NULL) return -1;
if (pool->size != margo_bulk_get_size(bulk)) return -1;
void* buf_ptr = NULL;
hg_size_t buf_size = 0;
hg_uint32_t actual_count = 0;
hg_return_t hret = margo_bulk_access(bulk, 0, pool->size, pool->flag, 1,
&buf_ptr, &buf_size, &actual_count);
if(hret != HG_SUCCESS) return -1;
if(buf_ptr < pool->buf || buf_ptr + buf_size > pool->buf + pool->size*pool->count) return -1;
ABT_mutex_lock(pool->mutex);
margo_bp_release(pool, bulk);
ABT_cond_signal(pool->cond);
ABT_mutex_unlock(pool->mutex);
return 0;
}
hg_return_t margo_bulk_poolset_create(
margo_instance_id mid,
hg_size_t npools,
hg_size_t nbufs,
hg_size_t first_size,
hg_size_t size_multiple,
hg_uint8_t flag,
margo_bulk_poolset_t *poolset)
{
margo_bulk_poolset_t s;
hg_size_t i = 0, j, size;
hg_return_t hret;
assert(npools > 0 && nbufs > 0 && first_size > 0 && size_multiple > 1);
s = calloc(1, sizeof(*s));
if (s == NULL)
{
hret = HG_NOMEM_ERROR;
goto err;
}
s->pools = calloc(npools, sizeof(*s->pools));
if (s->pools == NULL)
{
hret = HG_NOMEM_ERROR;
goto err;
}
s->npools = npools;
s->nbufs = nbufs;
s->first_size = first_size;
s->size_multiple = size_multiple;
size = first_size;
for (i = 0; i < npools; i++)
{
hret = margo_bulk_pool_create(mid, nbufs, size, flag, &s->pools[i]);
if (hret != HG_SUCCESS) goto err;
size *= size_multiple;
}
*poolset = s;
return HG_SUCCESS;
err:
if (s)
{
if (s->pools)
{
for (j = 0; j < i; j++)
margo_bulk_pool_destroy(s->pools[j]);
free(s->pools);
}
free(s);
}
*poolset = NULL;
return hret;
}
int margo_bulk_poolset_destroy(
margo_bulk_poolset_t poolset)
{
hg_size_t i;
int ret = 0;
for (i = 0; i < poolset->npools; i++) {
int r = margo_bulk_pool_destroy(poolset->pools[i]);
if(ret == 0 && r != 0) ret = r;
}
free(poolset->pools);
free(poolset);
return ret;
}
int margo_bulk_poolset_get(
margo_bulk_poolset_t poolset,
hg_size_t size,
hg_bulk_t* bulk)
{
if(poolset == MARGO_BULK_POOLSET_NULL) return -1;
hg_size_t i;
hg_size_t this_size = poolset->first_size;
hg_size_t size_mult = poolset->size_multiple;
for (i = 0; i < poolset->npools; i++)
{
if (size <= this_size)
return margo_bulk_pool_get(poolset->pools[i], bulk);
this_size *= size_mult;
}
return -1;
}
int margo_bulk_poolset_tryget(
margo_bulk_poolset_t poolset,
hg_size_t size,
hg_bool_t any_flag,
hg_bulk_t* bulk)
{
if(poolset == MARGO_BULK_POOLSET_NULL) return -1;
hg_bulk_t b = HG_BULK_NULL;
hg_size_t i;
hg_size_t this_size = poolset->first_size;
hg_size_t size_mult = poolset->size_multiple;
for (i = 0; i < poolset->npools; i++)
{
if (size <= this_size)
{
margo_bulk_pool_tryget(poolset->pools[i], &b);
if (b != HG_BULK_NULL || any_flag == HG_FALSE) {
*bulk = b;
return 0;
}
}
this_size *= size_mult;
}
return 0;
}
int margo_bulk_poolset_release(
margo_bulk_poolset_t poolset,
hg_bulk_t bulk)
{
if (poolset == MARGO_BULK_POOLSET_NULL) return -1;
if (bulk == HG_BULK_NULL) return -1;
hg_size_t bulk_size = HG_Bulk_get_size(bulk);
hg_size_t i;
hg_size_t size = poolset->first_size;
hg_size_t size_mult = poolset->size_multiple;
for (i = 0; i < poolset->npools; i++)
{
if (bulk_size == size)
{
return margo_bulk_pool_release(poolset->pools[i], bulk);
}
else size *= size_mult;
}
return -1;
}