Commit 06074a8c authored by Philip Carns's avatar Philip Carns

Merge branch 'carns/dev-client-buffer-pool' into carns/dev-experimental

parents 1c773c9f fc76e458
......@@ -8,6 +8,7 @@
#include <assert.h>
#include <margo.h>
#include <margo-bulk-pool.h>
#include <bake-client.h>
#include "uthash.h"
#include "bake-rpc.h"
......@@ -15,6 +16,24 @@
#define BAKE_DEFAULT_EAGER_LIMIT 2048
struct bake_client_conf
{
unsigned intermed_enable; /* intermediate buffering, yes or no */
unsigned intermed_npools; /* number of preallocated buffer pools */
unsigned intermed_nbuffers_per_pool; /* buffers per buffer pool */
unsigned intermed_first_buffer_size; /* size of buffers in smallest pool */
unsigned intermed_multiplier; /* factor size increase per pool */
};
struct bake_client_conf g_default_bake_client_conf =
{
.intermed_enable = 1,
.intermed_npools = 4,
.intermed_nbuffers_per_pool = 32,
.intermed_first_buffer_size = 65536,
.intermed_multiplier = 4
};
/* Refers to a single Margo initialization, for now this is shared by
* all remote BAKE targets. In the future we probably need to support
* multiple in case we run atop more than one transport at a time.
......@@ -40,6 +59,9 @@ struct bake_client
hg_id_t bake_migrate_target_id;
uint64_t num_provider_handles;
struct bake_client_conf config;
margo_bulk_poolset_t poolset; /* intermediate buffers, if used */
};
struct bake_provider_handle {
......@@ -131,14 +153,32 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
hg_return_t hret;
bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
if(!c) return BAKE_ERR_ALLOCATION;
c->num_provider_handles = 0;
c->config = g_default_bake_client_conf;
int ret = bake_client_register(c, mid);
if(ret != BAKE_SUCCESS) return ret;
if(c->config.intermed_enable)
{
hret = margo_bulk_poolset_create(
c->mid,
c->config.intermed_npools,
c->config.intermed_nbuffers_per_pool,
c->config.intermed_first_buffer_size,
c->config.intermed_multiplier,
HG_BULK_READWRITE,
&(c->poolset));
if(hret != HG_SUCCESS)
{
return(BAKE_ERR_MERCURY);
}
}
*client = c;
return BAKE_SUCCESS;
}
......@@ -150,6 +190,7 @@ int bake_client_finalize(bake_client_t client)
"[BAKE] Warning: %llu provider handles not released before bake_client_finalize was called\n",
(long long unsigned int)client->num_provider_handles);
}
margo_bulk_poolset_destroy(client->poolset);
free(client);
return BAKE_SUCCESS;
}
......@@ -690,9 +731,14 @@ int bake_create_write_persist(
in.bulk_handle = HG_BULK_NULL;
bake_create_write_persist_out_t out;
int ret;
hg_size_t poolset_max_size = 0;
void *local_bulk_ptr;
size_t tmp_buf_size;
hg_uint32_t tmp_count;
if(buf_size <= provider->eager_limit)
return(bake_eager_create_write_persist(provider, bti, buf, buf_size, rid));
margo_bulk_poolset_get_max(provider->client->poolset, &poolset_max_size);
TIMERS_INITIALIZE("bulk_create","forward","end");
......@@ -702,11 +748,32 @@ int bake_create_write_persist(
in.region_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS) {
ret = BAKE_ERR_MERCURY;
goto finish;
/* will this fit in an intermediate buffer (and is intermediate
* buffering enabled)?
*/
if(buf_size <= poolset_max_size)
{
ret = margo_bulk_poolset_get(provider->client->poolset, buf_size, &in.bulk_handle);
if(ret != 0)
{
ret = BAKE_ERR_MERCURY;
goto finish;
}
ret = margo_bulk_access(in.bulk_handle, 0,
buf_size, HG_BULK_READWRITE, 1,
&local_bulk_ptr, &tmp_buf_size, &tmp_count);
/* the above should never fail */
assert(ret == 0);
memcpy(local_bulk_ptr, buf, buf_size);
}
else
{
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS) {
ret = BAKE_ERR_MERCURY;
goto finish;
}
}
TIMERS_END_STEP(0);
......@@ -744,7 +811,10 @@ finish:
*rid = out.rid;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
if(buf_size <= poolset_max_size)
margo_bulk_poolset_release(provider->client->poolset, in.bulk_handle);
else
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
TIMERS_END_STEP(2);
TIMERS_FINALIZE();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment