Commit 7de544b0 authored by Rob Latham's avatar Rob Latham
Browse files

command line arguments for server

parent 2349a18f
......@@ -10,7 +10,7 @@ extern "C"
typedef struct bv_svc_provider * bv_svc_provider_t;
int bv_svc_provider_register(margo_instance_id mid,
abt_io_instance_id abtio, ABT_pool pool, ssg_group_id_t gid, bv_svc_provider_t *bv_id);
abt_io_instance_id abtio, ABT_pool pool, ssg_group_id_t gid, int bufsize, bv_svc_provider_t *bv_id);
#ifdef __cplusplus
}
......
......@@ -60,7 +60,6 @@ typedef enum {
/* note: alters addr_str */
char * get_proto_from_addr(char *addr_str)
{
int i;
char *p = addr_str;
char *q = strchr(addr_str, ':');
if (q == NULL) return NULL;
......
......@@ -26,9 +26,6 @@
#include "lustre-utils.h"
namespace tl = thallium;
#define BUFSIZE 1024
struct bv_svc_provider : public tl::provider<bv_svc_provider>
{
tl::engine * engine;
......@@ -38,7 +35,8 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
ssize_t blocksize=1024*8; // todo: some kind of general distribution function perhaps
std::map<std::string, int> filetable; // filename to file id mapping
// probably needs to be larger and registered with mercury somehow
char buffer[BUFSIZE]; // intermediate buffer for read/write operations
char *buffer; // intermediate buffer for read/write operations
unsigned int bufsize;
struct io_stats stats;
tl::mutex op_mutex;
tl::mutex stats_mutex;
......@@ -97,7 +95,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
// ?? is there a way to get all of them?
std::vector<std::pair<void *, std::size_t>>segments(1);
segments[0].first = (void *)(&(buffer[0]));
segments[0].second = BUFSIZE;
segments[0].second = bufsize;
tl::endpoint ep = req.get_endpoint();
tl::bulk local = engine->expose(segments, tl::bulk_mode::read_write);
......@@ -112,7 +110,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
// ceiling division: we'll do as many rounds of I/O as necessary given
// the intermediate buffer. Note the one exception: if we run out of
// file description, we'll bail out early
size_t ntimes = 1 + (client_bulk.size() -1)/BUFSIZE;
size_t ntimes = 1 + (client_bulk.size() -1)/bufsize;
for (unsigned int i = 0; i< ntimes; i++) {
// the '>>' operator moves bytes from one bulk descriptor to the
......@@ -206,13 +204,13 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
size_t xfered = 0, file_xfer=0, client_xfer=0, nbytes;
size_t fileblk_cursor=0, buf_cursor=0, client_cursor=0;
// ceiling division. Will bail out early if we exhaust file description
size_t ntimes = 1 + (client_bulk.size() - 1)/BUFSIZE;
size_t ntimes = 1 + (client_bulk.size() - 1)/bufsize;
for (unsigned int i = 0; i< ntimes; i++) {
file_xfer = 0;
double pread_time = ABT_get_wtime();
while (file_idx < file_starts.size() && file_xfer < BUFSIZE) {
nbytes = MIN(file_sizes[file_idx]-fileblk_cursor, BUFSIZE-buf_cursor);
while (file_idx < file_starts.size() && file_xfer < bufsize) {
nbytes = MIN(file_sizes[file_idx]-fileblk_cursor, bufsize-buf_cursor);
file_xfer += abt_io_pread(abt_id, fd, buffer+buf_cursor, nbytes, file_starts[file_idx]+fileblk_cursor);
{
std::lock_guard<tl::mutex> guard(stats_mutex);
......@@ -228,7 +226,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
else
fileblk_cursor +=nbytes;
if (buf_cursor + nbytes < BUFSIZE)
if (buf_cursor + nbytes < bufsize)
buf_cursor += nbytes;
else
buf_cursor=0;
......@@ -309,9 +307,11 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
bv_svc_provider(tl::engine *e, abt_io_instance_id abtio,
ssg_group_id_t gid, uint16_t provider_id, tl::pool &pool)
ssg_group_id_t gid, uint16_t provider_id, int bufsize, tl::pool &pool)
: tl::provider<bv_svc_provider>(*e, provider_id), engine(e), gid(gid), pool(pool), abt_id(abtio) {
buffer = new char[bufsize];
this->bufsize = bufsize;
define("write", &bv_svc_provider::process_write, pool);
define("read", &bv_svc_provider::process_read, pool);
define("stat", &bv_svc_provider::getstats);
......@@ -335,6 +335,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
~bv_svc_provider() {
wait_for_finalize();
delete[] buffer;
}
};
......@@ -342,12 +343,13 @@ int bv_svc_provider_register(margo_instance_id mid,
abt_io_instance_id abtio,
ABT_pool pool,
ssg_group_id_t gid,
int bufsize,
bv_svc_provider_t *bv_id)
{
uint16_t provider_id = 0xABC;
auto thallium_engine = new tl::engine(mid);
auto thallium_pool = tl::pool(pool);
auto bv_provider = new bv_svc_provider(thallium_engine, abtio, gid, provider_id, thallium_pool);
auto bv_provider = new bv_svc_provider(thallium_engine, abtio, gid, provider_id, bufsize, thallium_pool);
*bv_id = bv_provider;
return 0;
}
......
......@@ -2,6 +2,7 @@
#include <abt-io.h>
#include <ssg.h>
#include <ssg-mpi.h>
#include <getopt.h>
#include "bv-provider.h"
......@@ -41,17 +42,45 @@ int main(int argc, char **argv)
int ret;
int rank;
ssg_group_id_t gid;
int c;
char *proto=NULL;
char *statefile=NULL;
int bufsize=1024;
int nthreads=4;
int nstreams=4;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
mid = margo_init(argv[1], MARGO_SERVER_MODE, 0, 1);
while ( (c = getopt(argc, argv, "p:b:s:t:f:" )) != -1) {
switch (c) {
case 'p':
proto = strdup(optarg);
break;
case 'b':
bufsize = atoi(optarg);
break;
case 's':
nstreams = atoi(optarg);
break;
case 't':
nthreads = atoi(optarg);
break;
case 'f':
statefile = strdup(optarg);
break;
default:
printf("usage: %s [-p address] [-b buffer_size] [-t threads] [-s statefile]\n", argv[0]);
exit(-1);
}
}
mid = margo_init(proto, MARGO_SERVER_MODE, 0, nstreams);
margo_enable_remote_shutdown(mid);
/* set this is "number of backing threads" to whatever is best for
* the underlying backing store. : TODO: should make it command line
* configurable */
abtio = abt_io_init(2);
* the underlying backing store. */
abtio = abt_io_init(nthreads);
margo_push_finalize_callback(mid, finalize_abtio, (void*)abtio);
ret = ssg_init(mid);
......@@ -61,9 +90,11 @@ int main(int argc, char **argv)
margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);
if (rank == 0)
service_config_store(argv[2], gid);
service_config_store(statefile, gid);
ret = bv_svc_provider_register(mid, abtio, ABT_POOL_NULL, gid, &bv_id);
ret = bv_svc_provider_register(mid, abtio, ABT_POOL_NULL, gid, bufsize, &bv_id);
free(proto);
free(statefile);
margo_wait_for_finalize(mid);
margo_finalize(mid);
......
......@@ -10,8 +10,8 @@ NCLIENT=1
OUT_FILE=`basename $1`
# Start server
echo "mpiexec -np ${NSERVER} src/provider/bv-server tcp ${OUT_FILE}.svc &"
mpiexec -np {NSERVER} src/provider/bv-server tcp ${OUT_FILE}.svc &
echo "mpiexec -np ${NSERVER} src/provider/bv-server -p tcp -b 2048 -f ${OUT_FILE}.svc &"
mpiexec -np {NSERVER} src/provider/bv-server -p tcp -b 2048 -f ${OUT_FILE}.svc &
echo "SERVER_PID=$!"
SERVER_PID=$!
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment