Commit 09a54186 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-xfer-ults' into 'master'

Using ULTs to do transfers in parallel

See merge request !9
parents 18390075 522acb8e
......@@ -165,6 +165,11 @@ int bake_write(
* Writes data into a previously created BAKE region like bake_write(),
* except the write is performed on behalf of some remote entity.
*
* Note that by passing NULL as remote_addr, the server will assume
* that the hg_bulk_t sent comes from the caller of this function, which
* is a way to do a bake_write for a buffer for which an hg_bulk_t was
* already created.
*
* @param [in] provider provider handle
* @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write
......@@ -218,6 +223,13 @@ int bake_create_write_persist(
bake_region_id_t *rid);
/**
* Issues a bake_create_write_persist on behalf of a remote entity (remote_addr)
* that previously sent an hg_bulk_t.
*
* Note that by passing NULL as remote_addr, the server will assume
* that the hg_bulk_t sent comes from the caller of this function, which
* is a way to do a bake_create_write_persist for a buffer for which an
* hg_bulk_t was already created.
*
* @param [in] provider provider handle
* @param [in] bti BAKE target identifier
......@@ -304,6 +316,11 @@ int bake_read(
* Reads data from a previously persisted BAKE region like bake_read(),
* except the read is performed on behalf of some remote entity.
*
* Note that by passing NULL as remote_addr, the server will assume
* that the hg_bulk_t sent comes from the caller of this function, which
* is a way to do a bake_read for a buffer for which an hg_bulk_t was
* already created.
*
* @param [in] provider provider handle
* @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write
......
......@@ -120,22 +120,39 @@ int bake_provider_list_storage_targets(
bake_target_id_t* targets);
/**
* @brief Sets the size of the intermediate buffer used for transfering data.
* @brief Sets the size and number of intermediate buffers used for transfering data.
* The size is set to 0 by default. A size of 0 indicates that RDMA will be
* done all at once and target the backend device directly without using an
* intermediate buffer.
*
* @param provider Bake provider
* @param target_id Target for which to change the buffer size.
* @param count Number of buffers to initialize.
* @param size Size of the buffer.
*
* @return 0 on success, -1 on failure
*/
int bake_provider_set_target_xfer_buffer_size(
int bake_provider_set_target_xfer_buffer(
bake_provider_t provider,
bake_target_id_t target_id,
size_t count,
size_t size);
/**
* @brief Sets the maximum number of ULTs that will be used to concurrently
* transfer data.
*
* @param provider Bake provider
* @param target_id Target for which to change the number of ULTs
* @param num_threads Number of ULTs
*
* @return 0 on success, -1 on failure
*/
int bake_provider_set_target_xfer_concurrency(
bake_provider_t provider,
bake_target_id_t target_id,
uint32_t num_threads);
#ifdef __cplusplus
}
#endif
......
......@@ -25,6 +25,8 @@ struct options
char **bake_pools;
char *host_file;
size_t buf_size;
size_t buf_count;
uint32_t num_threads;
mplex_mode_t mplex_mode;
};
......@@ -36,6 +38,8 @@ static void usage(int argc, char **argv)
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n");
fprintf(stderr, " [-b size] buffer size for writes on provider\n");
fprintf(stderr, " [-c count] count of buffers used for accesses on provider\n");
fprintf(stderr, " [-t threads] number of threads used for concurrency\n");
fprintf(stderr, "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n");
return;
}
......@@ -47,7 +51,7 @@ static void parse_args(int argc, char **argv, struct options *opts)
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "f:m:b:")) != -1)
while((opt = getopt(argc, argv, "f:m:b:t:c:")) != -1)
{
switch(opt)
{
......@@ -67,6 +71,12 @@ static void parse_args(int argc, char **argv, struct options *opts)
case 'b':
opts->buf_size = atol(optarg);
break;
case 'c':
opts->buf_count = atol(optarg);
break;
case 't':
opts->num_threads = atol(optarg);
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
......@@ -174,7 +184,8 @@ int main(int argc, char **argv)
return(-1);
}
bake_provider_set_target_xfer_buffer_size(provider, tid, opts.buf_size);
bake_provider_set_target_xfer_buffer(provider, tid, opts.buf_count, opts.buf_size);
bake_provider_set_target_xfer_concurrency(provider, tid, opts.num_threads);
printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
}
......@@ -205,7 +216,8 @@ int main(int argc, char **argv)
return(-1);
}
bake_provider_set_target_xfer_buffer_size(provider, tid, opts.buf_size);
bake_provider_set_target_xfer_buffer(provider, tid, opts.buf_count, opts.buf_size);
bake_provider_set_target_xfer_concurrency(provider, tid, opts.num_threads);
printf("Provider 0 managing new target at multiplex id %d\n", 1);
}
......
This diff is collapsed.
......@@ -15,6 +15,7 @@ TESTS += \
tests/create-write-persist-remove.sh
EXTRA_DIST += \
tests/lorem.txt \
tests/basic.sh \
tests/copy-to-and-from.sh \
tests/copy-to-and-from-multi-providers.sh \
......
......@@ -86,6 +86,7 @@ int main(int argc, char *argv[])
margo_finalize(mid);
return(-1);
}
bake_provider_handle_set_eager_limit(bph, 0);
/* obtain info on the server's BAKE target */
ret = bake_probe(bph, 1, &bti, &num_targets);
......
......@@ -13,8 +13,7 @@
#include "bake-client.h"
#define ALLOC_BUF_SIZE 512
static char* read_input_file(const char* filename);
int main(int argc, char *argv[])
{
......@@ -29,20 +28,23 @@ int main(int argc, char *argv[])
uint64_t num_targets;
bake_target_id_t bti;
bake_region_id_t the_rid;
const char *test_str = "This is a test string for create-write-persist test.";
char *test_str = NULL;
char *buf;
uint64_t buf_size;
hg_return_t hret;
int ret;
if(argc != 3)
if(argc != 4)
{
fprintf(stderr, "Usage: create-write-persist-test <bake server addr> <mplex id>\n");
fprintf(stderr, "Usage: create-write-persist-test <input file> <bake server addr> <mplex id>\n");
fprintf(stderr, " Example: ./create-write-persist-test tcp://localhost:1234 1\n");
return(-1);
}
bake_svr_addr_str = argv[1];
mplex_id = atoi(argv[2]);
char* input_file = argv[1];
bake_svr_addr_str = argv[2];
mplex_id = atoi(argv[3]);
test_str = read_input_file(input_file);
/* initialize Margo using the transport portion of the server
* address (i.e., the part before the first : character if present)
......@@ -86,6 +88,7 @@ int main(int argc, char *argv[])
margo_finalize(mid);
return(-1);
}
bake_provider_handle_set_eager_limit(bph, 0);
/* obtain info on the server's BAKE target */
ret = bake_probe(bph, 1, &bti, &num_targets);
......@@ -99,27 +102,13 @@ int main(int argc, char *argv[])
return(-1);
}
buf = malloc(ALLOC_BUF_SIZE);
if(!buf)
{
bake_provider_handle_release(bph);
margo_addr_free(mid, svr_addr);
bake_client_finalize(bcl);
margo_finalize(mid);
return(-1);
}
/**** write phase ****/
/* copy the test string into a buffer and forward to the proxy server */
strcpy(buf, test_str);
buf_size = strlen(test_str) + 1;
ret = bake_create_write_persist(bph, bti, buf, buf_size, &the_rid);
ret = bake_create_write_persist(bph, bti, test_str, buf_size, &the_rid);
if(ret != 0)
{
fprintf(stderr, "Error: bake_create_write_persist()\n");
free(buf);
bake_provider_handle_release(bph);
margo_addr_free(mid, svr_addr);
bake_client_finalize(bcl);
......@@ -129,8 +118,8 @@ int main(int argc, char *argv[])
/**** read-back phase ****/
/* reset the buffer and read it back via BAKE */
memset(buf, 0, ALLOC_BUF_SIZE);
buf = malloc(buf_size);
memset(buf, 0, buf_size);
uint64_t bytes_read;
ret = bake_read(bph, the_rid, 0, buf, buf_size, &bytes_read);
......@@ -163,6 +152,7 @@ int main(int argc, char *argv[])
/**** cleanup ****/
free(buf);
free(test_str);
bake_provider_handle_release(bph);
margo_addr_free(mid, svr_addr);
bake_client_finalize(bcl);
......@@ -170,3 +160,17 @@ int main(int argc, char *argv[])
return(ret);
}
static char* read_input_file(const char* filename) {
FILE* fp = fopen(filename, "r");
if(fp == NULL) {
fprintf(stderr, "Could not open %s\n", filename);
exit(-1);
}
fseek(fp, 0, SEEK_END);
size_t sz = ftell(fp);
fseek(fp, 0, SEEK_SET);
char* buf = calloc(1,sz+1);
fread(buf, 1, sz, fp);
fclose(fp);
return buf;
}
......@@ -17,7 +17,7 @@ sleep 1
#####################
# run test
run_to 10 tests/create-write-persist-test $svr1 1
run_to 10 tests/create-write-persist-test $srcdir/tests/lorem.txt $svr1 1
if [ $? -ne 0 ]; then
wait
exit 1
......
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse ac laoreet dolor. Donec non est sed libero consectetur placerat. Suspendisse ultricies eros non nunc maximus, a fermentum diam placerat. Suspendisse quis elit luctus, accumsan enim vitae, facilisis nibh. Pellentesque et ante eget quam iaculis imperdiet id at nulla. Morbi faucibus purus nisi, id pulvinar tortor vulputate at. Suspendisse pharetra posuere velit, a varius metus. Donec hendrerit, leo sed fermentum dapibus, nisi nunc pretium sapien, nec scelerisque nulla ipsum quis nulla.
Nam lacinia nisi at erat sodales ullamcorper. Integer ullamcorper justo tincidunt tristique pharetra. Nunc pretium neque sit amet mi faucibus, a posuere sem fringilla. Phasellus dignissim, libero at tempus sagittis, quam nulla hendrerit urna, ac lacinia orci metus in justo. Nam venenatis molestie mauris vitae volutpat. Phasellus fermentum justo eu libero pellentesque, sit amet consectetur est ultricies. Proin mollis in neque ut blandit. Curabitur placerat, risus eu gravida maximus, enim mauris lobortis tortor, eu venenatis tortor nulla ac lorem. Morbi viverra vel augue et aliquet. Praesent malesuada ex ut leo rhoncus eleifend. Fusce sollicitudin ac nisl eu vestibulum. Aenean vulputate metus lacus, in lacinia massa eleifend ac. Sed neque urna, vestibulum id ultrices vitae, venenatis in lacus. Pellentesque nec scelerisque nisi, mattis condimentum velit. Integer blandit dui sed dui bibendum ornare. Vestibulum non magna molestie, euismod libero sed, pulvinar mauris.
Donec risus eros, accumsan eu elit fermentum, tempus scelerisque magna. Nam iaculis tincidunt justo. Vivamus at erat quis enim maximus lobortis. Ut eu tortor lorem. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi vulputate, est et euismod lacinia, sapien ipsum varius mi, facilisis condimentum sem sem nec nisi. Quisque vitae est tristique, mollis felis eget, rutrum lacus. Mauris porta augue vel quam eleifend, sit amet mattis erat eleifend. Nullam aliquet quis eros vitae convallis. Curabitur sed pharetra neque, in imperdiet diam. Donec urna lorem, accumsan vel urna eleifend, ullamcorper tincidunt urna. Quisque leo neque, sagittis a volutpat ac, molestie sed sapien. Vestibulum ligula nulla, hendrerit sit amet ultricies eu, ullamcorper in urna.
Ut vel ullamcorper orci. In neque ipsum, lacinia eu sapien maximus, suscipit maximus mi. Sed eu turpis lacus. Aliquam erat volutpat. Aliquam accumsan quam vel quam venenatis, vitae tempor sem luctus. Pellentesque lobortis sapien eget est pretium venenatis. Duis mattis vestibulum elit id tristique. Fusce sit amet iaculis turpis. Nulla id aliquet lectus, a tincidunt nunc. Mauris pretium ornare congue. Sed consequat augue lacus, in blandit libero tempor eu. Vestibulum consectetur ligula quis augue facilisis, vel imperdiet tellus aliquet. Aliquam feugiat scelerisque facilisis. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus.
Vestibulum massa enim, rutrum id dui eget, vehicula sagittis felis. In posuere sollicitudin sollicitudin. Duis faucibus, libero et varius pulvinar, massa nibh convallis justo, sed consectetur nisi nulla ac eros. In vitae lorem magna. Nullam ex odio, pretium vitae bibendum eu, sodales ac enim. Proin condimentum finibus vestibulum. Nunc neque tortor, tincidunt cursus faucibus nec, finibus et ante. Nullam erat massa, rutrum non velit sit amet, luctus convallis nibh. Ut vitae facilisis mauris. Duis vehicula, diam eget dignissim consequat, orci massa scelerisque nulla, sit amet porta turpis nisl venenatis nulla. Sed suscipit vulputate nibh sit amet ornare. Nam at lectus elementum enim tempus varius ac sed diam. Duis elementum vestibulum nibh, sit amet dignissim nulla efficitur sed.
......@@ -44,7 +44,7 @@ function test_start_servers ()
exit 1
fi
run_to ${maxtime} src/bake-server-daemon -f $TMPBASE/svr-$i.addr na+sm $TMPBASE/svr-$i.dat &
run_to ${maxtime} src/bake-server-daemon -b 128 -c 4 -t 4 -f $TMPBASE/svr-$i.addr na+sm $TMPBASE/svr-$i.dat &
if [ $? -ne 0 ]; then
# TODO: this doesn't actually work; can't check return code of
# something executing in background. We have to rely on the
......@@ -127,3 +127,4 @@ function test_start_servers_multi_targets ()
svr1=`cat $TMPBASE/svr-1.addr`
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment