Commit 2c2c2ae5 authored by Matthieu Dorier's avatar Matthieu Dorier

added intermediate buffer

parent e45c249e
......@@ -119,6 +119,23 @@ int bake_provider_list_storage_targets(
bake_provider_t provider,
bake_target_id_t* targets);
/**
* @brief Sets the size of the intermediate buffer used for transfering data.
* The size is set to 0 by default. A size of 0 indicates that RDMA will be
* done all at once and target the backend device directly without using an
* intermediate buffer.
*
* @param provider Bake provider
* @param target_id Target for which to change the buffer size.
* @param size Size of the buffer.
*
* @return 0 on success, -1 on failure
*/
int bake_provider_set_target_xfer_buffer_size(
bake_provider_t provider,
bake_target_id_t target_id,
size_t size);
#ifdef __cplusplus
}
#endif
......
......@@ -24,6 +24,7 @@ struct options
unsigned num_pools;
char **bake_pools;
char *host_file;
size_t buf_size;
mplex_mode_t mplex_mode;
};
......@@ -33,7 +34,8 @@ static void usage(int argc, char **argv)
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " bake_pool is the path to the BAKE pool\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n");
fprintf(stderr, " [-b size] buffer size for writes on provider\n");
fprintf(stderr, "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n");
return;
}
......@@ -45,7 +47,7 @@ static void parse_args(int argc, char **argv, struct options *opts)
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "f:m:")) != -1)
while((opt = getopt(argc, argv, "f:m:b:")) != -1)
{
switch(opt)
{
......@@ -62,6 +64,9 @@ static void parse_args(int argc, char **argv, struct options *opts)
exit(EXIT_FAILURE);
}
break;
case 'b':
opts->buf_size = atol(optarg);
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
......@@ -169,6 +174,8 @@ int main(int argc, char **argv)
return(-1);
}
bake_provider_set_target_xfer_buffer_size(provider, tid, opts.buf_size);
printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
}
......@@ -198,6 +205,8 @@ int main(int argc, char **argv)
return(-1);
}
bake_provider_set_target_xfer_buffer_size(provider, tid, opts.buf_size);
printf("Provider 0 managing new target at multiplex id %d\n", 1);
}
}
......
......@@ -60,6 +60,7 @@ typedef struct
bake_target_id_t target_id;
char* root;
char* filename;
size_t xfer_buffer_size;
UT_hash_handle hh;
} bake_pmem_entry_t;
......@@ -263,6 +264,7 @@ int bake_provider_add_storage_target(
bake_pmem_entry_t* new_entry = calloc(1, sizeof(*new_entry));
new_entry->root = NULL;
new_entry->filename = NULL;
new_entry->xfer_buffer_size = 0;
char* tmp = strrchr(target_name, '/');
new_entry->filename = strdup(tmp);
......@@ -391,6 +393,24 @@ int bake_provider_list_storage_targets(
return BAKE_SUCCESS;
}
int bake_provider_set_target_xfer_buffer_size(
bake_provider_t provider,
bake_target_id_t target_id,
size_t size)
{
int ret = BAKE_SUCCESS;
ABT_rwlock_rdlock(provider->lock);
bake_pmem_entry_t* entry = find_pmem_entry(provider, target_id);
if(entry == NULL) {
ret = -1;
goto finish;
}
entry->xfer_buffer_size = size;
finish:
ABT_rwlock_unlock(provider->lock);
return ret;
}
/* service a remote RPC that creates a BAKE region */
static void bake_create_ult(hg_handle_t handle)
{
......@@ -489,7 +509,9 @@ static void bake_write_ult(hg_handle_t handle)
in.bulk_handle = HG_BULK_NULL;
hg_return_t hret;
hg_addr_t src_addr = HG_ADDR_NULL;
char* buffer;
char* memory;
char* buffer = NULL;
size_t xfer_buf_size = 0;
hg_bulk_t bulk_handle = HG_BULK_NULL;
const struct hg_info *hgi;
margo_instance_id mid;
......@@ -535,20 +557,20 @@ static void bake_write_ult(hg_handle_t handle)
}
#endif
TIMERS_END_STEP(0);
/* find enclosing pool and target id */
PMEMobjpool *pool = pmemobj_pool_by_oid(prid->oid);
PMEMoid root_oid = pmemobj_root(pool, 0);
bake_root_t* root = pmemobj_direct(root_oid);
buffer = region->data + in.region_offset;
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
/* find the pmem entry */
bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, root->pool_id);
if(entry == NULL) {
out.ret = BAKE_ERR_UNKNOWN_TARGET;
goto finish;
}
xfer_buf_size = entry->xfer_buffer_size;
TIMERS_END_STEP(1);
memory = region->data + in.region_offset;
if(in.remote_addr_str)
{
......@@ -566,12 +588,66 @@ static void bake_write_ult(hg_handle_t handle)
src_addr = hgi->addr;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset, bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
TIMERS_END_STEP(0);
if(xfer_buf_size == 0 || xfer_buf_size > in.bulk_size) { // direct transfer to device in one go
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
TIMERS_END_STEP(1);
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset, bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
} else { // multiple transfers using intermediate buffer
buffer = calloc(xfer_buf_size,1);
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&buffer), &xfer_buf_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
TIMERS_END_STEP(1);
size_t remaining_size = in.bulk_size;
size_t current_offset = 0;
size_t current_size = xfer_buf_size;
while(remaining_size != 0) {
current_size = remaining_size < xfer_buf_size ? remaining_size : xfer_buf_size;
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset+current_offset, bulk_handle, 0, current_size);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
memcpy(memory+current_offset, buffer, current_size);
remaining_size -= current_size;
current_offset += current_size;
}
}
TIMERS_END_STEP(2);
......@@ -584,6 +660,7 @@ finish:
margo_respond(handle, &out);
TIMERS_END_STEP(3);
TIMERS_FINALIZE();
free(buffer);
if(in.remote_addr_str)
margo_addr_free(mid, src_addr);
margo_bulk_free(bulk_handle);
......@@ -741,6 +818,8 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
in.remote_addr_str = NULL;
hg_addr_t src_addr = HG_ADDR_NULL;
char* buffer = NULL;
char* memory = NULL;
size_t xfer_buf_size = 0;
hg_bulk_t bulk_handle = HG_BULK_NULL;
const struct hg_info *hgi = NULL;
margo_instance_id mid;
......@@ -779,6 +858,9 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
out.ret = BAKE_ERR_UNKNOWN_TARGET;
goto finish;
}
xfer_buf_size = entry->xfer_buffer_size;
#ifdef USE_SIZECHECK_HEADERS
size_t content_size = in.bulk_size + sizeof(uint64_t);
#else
......@@ -809,18 +891,7 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
#ifdef USE_SIZECHECK_HEADERS
region->size = in.bulk_size;
#endif
buffer = region->data;
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
TIMERS_END_STEP(2);
memory = region->data;
if(in.remote_addr_str)
{
......@@ -837,12 +908,65 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
/* no proxy write, use the source of this request */
src_addr = hgi->addr;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset, bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
if(xfer_buf_size == 0 || xfer_buf_size > in.bulk_size) { // don't use an intermediate buffer
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
TIMERS_END_STEP(2);
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset, bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
} else {
buffer = calloc(xfer_buf_size,1);
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&buffer), &xfer_buf_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
TIMERS_END_STEP(2);
size_t remaining_size = in.bulk_size;
size_t current_offset = 0;
size_t current_size = xfer_buf_size;
while(remaining_size != 0) {
current_size = remaining_size < xfer_buf_size ? remaining_size : xfer_buf_size;
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset+current_offset, bulk_handle, 0, current_size);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
memcpy(memory+current_offset, buffer, current_size);
current_offset += current_size;
remaining_size -= current_size;
}
}
TIMERS_END_STEP(3);
......@@ -863,6 +987,7 @@ finish:
if(in.remote_addr_str) {
margo_addr_free(mid, src_addr);
}
free(buffer);
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_destroy(handle);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment