Commit 2349a18f authored by Rob Latham

code review from Matthieu

- watch out for lifetime of bulk handles
- renamed to 'io_access' to avoid collision with access() function
- no need for 'struct whatever' in C++
- calloc of a struct with C++ objects is risky.  Implement a
  constructor for bv_client so we can use 'new' like a real C++
  programmer (a short sketch follows below)
parent 1a96aeda
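
To make that last point concrete, here is a minimal standalone sketch (with a made-up 'widget' struct, not the real bv_client): calloc hands back zero-filled memory but never runs any constructor, so C++ members such as std::vector and std::string are left invalid, while 'new' constructs every member and 'delete' destroys them.

    #include <cstdlib>
    #include <string>
    #include <vector>

    // Made-up stand-in for a struct (like bv_client) that owns C++ members.
    struct widget {
        std::vector<int> targets;
        std::string name;
        widget(const char *n) : name(n) {}
    };

    int main()
    {
        // Risky: zero-filled memory, but no constructor ever runs, so
        // 'targets' and 'name' are not valid objects; touching them (or
        // deleting this pointer) would be undefined behavior.
        widget *bad = (widget *) std::calloc(1, sizeof(widget));
        std::free(bad);

        // Preferred: 'new' runs the constructor, 'delete' runs the destructor.
        widget *good = new widget("example");
        good->targets.push_back(1);
        delete good;
        return 0;
    }
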
@@ -11,7 +11,7 @@
//
// instead of a 'target' in the access, we will have a collection of accesses indexed by target
//
struct access {
struct io_access {
// thallium bulk described by a vector of (address, len) 'pair'
// build that as we process the memory and file lists so we don't have to
// loop through the description lists twice
@@ -16,7 +16,7 @@ void compute_striping_info(int stripe_size, int stripe_count, int *server_count,
* figure out where a file request goes, but also trying to match each piece of
* the memory description with each part of the file description.
* remember:
* - vector<struct access> is "number of targets" big
* - vector<io_access> is "number of targets" big
* - we generate offset, length, and pointer vectors for each target in the my_reqs vector.
*
* iovec_count: [IN] how many memory requests
@@ -34,4 +34,4 @@ void compute_striping_info(int stripe_size, int stripe_count, int *server_count,
int calc_requests(int mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
int file_count, const off_t *file_starts, const uint64_t *file_sizes,
int stripe_size, int targets_used,
std::vector<struct access> & my_reqs);
std::vector<io_access> & my_reqs);
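
For orientation only, a guess at the shape of one per-target io_access bucket, inferred from the members bv_io uses later in this diff (mem_vec, offset, len); the real definition is in the header edited above and may differ.

    #include <cstddef>
    #include <cstdint>
    #include <sys/types.h>
    #include <utility>
    #include <vector>

    // Hypothetical per-target bucket; not copied from the real header.
    struct io_access_sketch {
        // (address, length) pairs, later exposed as one thallium bulk handle
        std::vector<std::pair<void *, std::size_t>> mem_vec;
        // file offsets and lengths destined for this target
        std::vector<off_t>    offset;
        std::vector<uint64_t> len;
    };
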
@@ -17,9 +17,8 @@ namespace tl = thallium;
#define MAX_PROTO_LEN 24
struct bv_client {
tl::engine *engine;
tl::engine *engine = nullptr;
std::vector<tl::provider_handle> targets;
char proto[MAX_PROTO_LEN];
tl::remote_procedure read_op;
tl::remote_procedure write_op;
tl::remote_procedure stat_op;
@@ -30,11 +29,27 @@ struct bv_client {
ssg_group_id_t gid; // attaches to this group; not a member
io_stats statistics;
ssize_t blocksize=1024*4; // TODO: make more dynamic
int stripe_size;
int stripe_count;
int targets_used;
/* used to think the server would know something about how it wanted to
* distribute data, but now that's probably best handled on a per-file
* basis. Pick some reasonable defaults, but don't talk to server. */
ssize_t blocksize=1024*4;
int stripe_size = 4096;
int stripe_count = 1;
int targets_used = 1;
bv_client(tl::engine *e, ssg_group_id_t group) :
engine(e),
read_op(engine->define("read")),
write_op(engine->define("write")),
stat_op(engine->define("stat")),
delete_op(engine->define("delete")),
flush_op(engine->define("flush")),
statistics_op(engine->define("statistics")),
size_op(engine->define("size")),
gid(group) {}
};
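
A side note the new initializer list depends on: C++ initializes members in declaration order, and 'engine' is declared before the remote_procedure members, so engine->define(...) already has a valid pointer to work with. A reduced sketch of that pattern, using stand-in types rather than the thallium API:

    #include <string>

    struct fake_engine {                       // stand-in for tl::engine
        int define(const std::string &) { return 0; }
    };

    struct fake_client {
        fake_engine *engine = nullptr;         // declared first, so initialized first
        int read_op;                           // stand-in for tl::remote_procedure

        // Members initialize in declaration order, so 'engine' already holds
        // 'e' by the time 'read_op' is initialized from engine->define("read").
        fake_client(fake_engine *e) : engine(e), read_op(engine->define("read")) {}
    };

    int main()
    {
        fake_engine e;
        fake_client c(&e);
        return c.read_op;
    }
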
typedef enum {
@@ -42,25 +57,22 @@ typedef enum {
BV_WRITE
} io_kind;
static int set_proto_from_addr(bv_client_t client, char *addr_str)
/* note: alters addr_str */
char * get_proto_from_addr(char *addr_str)
{
int i;
for (i=0; i< MAX_PROTO_LEN; i++) {
if (addr_str[i] == ':') {
client->proto[i] = '\0';
break;
}
client->proto[i] = addr_str[i];
}
if (client->proto[i] != '\0') return -1;
return 0;
char *p = addr_str;
char *q = strchr(addr_str, ':');
if (q == NULL) return NULL;
*q = '\0';
return p;
}
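
A small usage sketch of the new helper, since the 'alters addr_str' note matters: the ':' is overwritten with a NUL, so both the returned pointer and the caller's buffer end at the protocol prefix. The address string below is invented for illustration.

    #include <cstdio>
    #include <cstring>

    /* same logic as the helper above */
    static char * get_proto_from_addr(char *addr_str)
    {
        char *q = strchr(addr_str, ':');
        if (q == NULL) return NULL;
        *q = '\0';
        return addr_str;
    }

    int main(void)
    {
        char addr[] = "ofi+tcp://10.0.0.1:1234";   /* buffer must be writable */
        char *proto = get_proto_from_addr(addr);
        if (proto != NULL)
            printf("proto: %s\n", proto);          /* prints "ofi+tcp" */
        /* 'addr' itself now also reads "ofi+tcp": it was truncated in place */
        return 0;
    }
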
bv_client_t bv_init(MPI_Comm comm, const char * cfg_file)
{
char *addr_str;
int rank;
int ret, i, nr_targets;
struct bv_client * client = (struct bv_client *)calloc(1,sizeof(*client));
char *ssg_group_buf;
double init_time = ABT_get_wtime();
MPI_Comm dupcomm;
@@ -71,21 +83,25 @@ bv_client_t bv_init(MPI_Comm comm, const char * cfg_file)
MPI_Comm_dup(comm, &dupcomm);
MPI_Comm_rank(dupcomm, &rank);
uint64_t ssg_serialize_size;
ssg_group_id_t ssg_gid;
ssg_group_buf = (char *) malloc(1024); // how big do these get?
if (rank == 0) {
ret = ssg_group_id_load(cfg_file, &(client->gid));
ret = ssg_group_id_load(cfg_file, &ssg_gid);
assert (ret == SSG_SUCCESS);
ssg_group_id_serialize(client->gid, &ssg_group_buf, &ssg_serialize_size);
ssg_group_id_serialize(ssg_gid, &ssg_group_buf, &ssg_serialize_size);
}
MPI_Bcast(&ssg_serialize_size, 1, MPI_UINT64_T, 0, dupcomm);
MPI_Bcast(ssg_group_buf, ssg_serialize_size, MPI_CHAR, 0, dupcomm);
MPI_Comm_free(&dupcomm);
ssg_group_id_deserialize(ssg_group_buf, ssg_serialize_size, &(client->gid));
addr_str = ssg_group_id_get_addr_str(client->gid);
if (set_proto_from_addr(client, addr_str) != 0) return NULL;
ssg_group_id_deserialize(ssg_group_buf, ssg_serialize_size, &ssg_gid);
addr_str = ssg_group_id_get_addr_str(ssg_gid);
char * proto = get_proto_from_addr(addr_str);
if (proto == NULL) return NULL;
auto theEngine = new tl::engine(proto, THALLIUM_CLIENT_MODE);
bv_client *client = new bv_client(theEngine, ssg_gid);
/* This is very c-like. probably needs some C++ RAII thing here */
client->engine = new tl::engine(client->proto, THALLIUM_CLIENT_MODE);
ssg_init(client->engine->get_margo_instance());
ret = ssg_group_attach(client->gid);
@@ -100,22 +116,6 @@ bv_client_t bv_init(MPI_Comm comm, const char * cfg_file)
client->targets.push_back(tl::provider_handle(server, 0xABC));
}
client->read_op = client->engine->define("read");
client->write_op = client->engine->define("write");
client->stat_op = client->engine->define("stat");
client->delete_op = client->engine->define("delete");
client->flush_op = client->engine->define("flush");
client->statistics_op = client->engine->define("statistics");
client->size_op = client->engine->define("size");
/* used to think the server would know something about how it wanted to
* distribute data, but now that's probably best handled on a per-file
* basis. Pick some reasonable defaults, but don't talk to server. */
client->blocksize = 4096;
client->stripe_size= 4096;
client->stripe_count=1;
free(addr_str);
free(ssg_group_buf);
@@ -164,7 +164,7 @@ static size_t bv_io(bv_client_t client, const char *filename, io_kind op,
const int64_t mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
const int64_t file_count, const off_t file_starts[], const uint64_t file_sizes[])
{
std::vector<struct access> my_reqs(client->targets.size());
std::vector<io_access> my_reqs(client->targets.size());
size_t bytes_moved = 0;
/* How expensive is this? do we need to move this out of the I/O path?
@@ -191,11 +191,12 @@ static size_t bv_io(bv_client_t client, const char *filename, io_kind op,
}
std::vector<tl::async_response> responses;
std::vector<tl::bulk> my_bulks;
for (unsigned int i=0; i< client->targets.size(); i++) {
if (my_reqs[i].mem_vec.size() == 0) continue; // no work for this target
myBulk = client->engine->expose(my_reqs[i].mem_vec, mode);
responses.push_back(rpc.on(client->targets[i]).async(myBulk, std::string(filename), my_reqs[i].offset, my_reqs[i].len));
my_bulks.push_back(client->engine->expose(my_reqs[i].mem_vec, mode));
responses.push_back(rpc.on(client->targets[i]).async(my_bulks.back(), std::string(filename), my_reqs[i].offset, my_reqs[i].len));
}
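
The reason the bulks are collected into my_bulks instead of reusing one local handle is the first commit-message bullet: whatever an in-flight RPC refers to must stay alive until the response is waited on. A generic sketch of that lifetime rule, with std::string and std::future standing in for tl::bulk and tl::async_response:

    #include <cstddef>
    #include <future>
    #include <string>
    #include <vector>

    int main()
    {
        std::vector<std::string> payloads;                // plays the role of my_bulks
        std::vector<std::future<std::size_t>> responses;  // plays the role of 'responses'

        payloads.reserve(3);   // no reallocation, so the captured references stay valid
        for (int i = 0; i < 3; i++) {
            payloads.push_back(std::string(1024, 'x'));
            const std::string &p = payloads.back();
            responses.push_back(std::async(std::launch::async,
                                           [&p] { return p.size(); }));
        }

        // 'payloads' stays in scope until every outstanding operation has been
        // waited on, so nothing the async work refers to is destroyed too early.
        for (auto &r : responses)
            r.get();
        return 0;
    }
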
@@ -243,7 +244,7 @@ ssize_t bv_read(bv_client_t client, const char *filename,
int bv_stat(bv_client_t client, const char *filename, struct bv_stats *stats)
{
struct file_stats response = client->stat_op.on(client->targets[0])(std::string(filename));
file_stats response = client->stat_op.on(client->targets[0])(std::string(filename));
stats->blocksize = response.blocksize;
stats->stripe_size = response.stripe_size;
@@ -128,7 +128,7 @@ int calc_aggregator(off_t off, uint64_t * len,
int calc_requests(int mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
int file_count, const off_t *file_starts, const uint64_t *file_sizes,
int stripe_size, int targets_used,
std::vector<struct access> & my_reqs)
std::vector<io_access> & my_reqs)
{
// which element of the memory/file description list we are working on now
int memblk=0, fileblk=0;