Commit ff26f60f authored by Rob Latham
Browse files

SSG updates

Update Benvolio to the new SSG API. Also register a pre-finalize callback.
parent 1f37064b
......@@ -14,6 +14,19 @@
#include <assert.h>
namespace tl = thallium;
/* Argument bundle handed to the margo pre-finalize callback
 * (finalized_ssg_group_cb) through its void* parameter. */
struct finalize_args_t {
    ssg_group_id_t g_id;     /* group id the callback passes to ssg_group_unobserve() */
    margo_instance_id m_id;  /* margo instance the callback is registered on */
};
static void finalized_ssg_group_cb(void* data)
{
finalize_args_t *args = (finalize_args_t *)data;
ssg_group_unobserve(args->g_id);
ssg_finalize();
free(args);
}
#define MAX_PROTO_LEN 24
struct bv_client {
......@@ -80,32 +93,40 @@ bv_client_t bv_init(MPI_Comm comm, const char * cfg_file)
/* scalable read-and-broadcast of group information: only one process reads
* cfg from file system. These routines can all be called before ssg_init */
* cfg from file system. These routines can not be called before ssg_init,
* but that's ok because ssg_init no longer takes a margo id */
MPI_Comm_dup(comm, &dupcomm);
MPI_Comm_rank(dupcomm, &rank);
ssg_init();
uint64_t ssg_serialize_size;
ssg_group_id_t ssg_gid;
int provider_count=1;
ssg_group_id_t ssg_gids[1]; // not sure how many is "too many"
ssg_group_buf = (char *) malloc(1024); // how big do these get?
if (rank == 0) {
ret = ssg_group_id_load(cfg_file, &ssg_gid);
ret = ssg_group_id_load(cfg_file, &provider_count, ssg_gids);
if (ret != SSG_SUCCESS) fprintf(stderr, "ssg_group_id_load: %d\n", ret);
assert (ret == SSG_SUCCESS);
ssg_group_id_serialize(ssg_gid, &ssg_group_buf, &ssg_serialize_size);
ssg_group_id_serialize(ssg_gids[0], provider_count, &ssg_group_buf, &ssg_serialize_size);
}
MPI_Bcast(&ssg_serialize_size, 1, MPI_UINT64_T, 0, dupcomm);
MPI_Bcast(&ssg_serialize_size, provider_count, MPI_UINT64_T, 0, dupcomm);
MPI_Bcast(ssg_group_buf, ssg_serialize_size, MPI_CHAR, 0, dupcomm);
MPI_Comm_free(&dupcomm);
ssg_group_id_deserialize(ssg_group_buf, ssg_serialize_size, &ssg_gid);
addr_str = ssg_group_id_get_addr_str(ssg_gid);
ssg_group_id_deserialize(ssg_group_buf, ssg_serialize_size, &provider_count, ssg_gids);
addr_str = ssg_group_id_get_addr_str(ssg_gids[0], 0);
char * proto = get_proto_from_addr(addr_str);
if (proto == NULL) return NULL;
auto theEngine = new tl::engine(proto, THALLIUM_CLIENT_MODE);
bv_client *client = new bv_client(theEngine, ssg_gid);
bv_client *client = new bv_client(theEngine, ssg_gids[0]);
finalize_args_t *args = (finalize_args_t *)malloc(sizeof(finalize_args_t));
args->g_id = client->gid;
ssg_init(client->engine->get_margo_instance());
args->m_id = client->engine->get_margo_instance();
margo_push_prefinalize_callback(client->engine->get_margo_instance(), &finalized_ssg_group_cb, (void *)args);
ret = ssg_group_attach(client->gid);
ret = ssg_group_observe(client->engine->get_margo_instance(), client->gid);
if (ret != SSG_SUCCESS) {
fprintf(stderr, "ssg_group attach: (%d)\n", ret);
assert (ret == SSG_SUCCESS);
......@@ -113,7 +134,8 @@ bv_client_t bv_init(MPI_Comm comm, const char * cfg_file)
nr_targets = ssg_get_group_size(client->gid);
for (i=0; i< nr_targets; i++) {
tl::endpoint server(*(client->engine), ssg_get_addr(client->gid, i) );
tl::endpoint server(*(client->engine),
ssg_get_group_member_addr(client->gid, ssg_get_group_member_id_from_rank(client->gid, i) ));
client->targets.push_back(tl::provider_handle(server, 0xABC));
}
......@@ -139,12 +161,7 @@ int bv_finalize(bv_client_t client)
{
if (client == NULL) return 0;
ssg_group_detach(client->gid);
ssg_finalize();
/* cleaning up endpoints first because endpoints need the engine to be able
* to cleanly delete themselves */
client->targets.erase(client->targets.begin(), client->targets.end());
delete client->engine;
ssg_group_unobserve(client->gid);
delete client;
return 0;
}
......
......@@ -139,7 +139,7 @@ static void write_ult(void *_args)
hg_bulk_t local_bulk;
hg_uint32_t actual_count;
double bulk_time, io_time, total_io_time;
double bulk_time, io_time, total_io_time=0.0;
int write_count=0;
/* Adopting same approach as 'bake-server.c' : we will create lots of ULTs,
* some of which might not end up doing anything */
......@@ -281,7 +281,7 @@ static void read_ult(void *_args)
hg_bulk_t local_bulk;
hg_uint32_t actual_count;
double bulk_time, io_time, total_io_time;
double bulk_time, io_time, total_io_time=0.0;
int read_count = 0;
/* Adopting same approach as 'bake-server.c' : we will create lots of ULTs,
* some of which might not end up doing anything */
......@@ -422,6 +422,9 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
tl::mutex size_mutex;
tl::mutex fd_mutex;
/* handles to RPC objects so we can clean them up in destructor */
std::vector<tl::remote_procedure> rpcs;
// server will maintain a cache of open files
// std::map not great for LRU
// if we see a request for a file with a different 'flags' we will close and reopen
......@@ -646,14 +649,14 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
* available to this provider into pool objects of that size */
margo_bulk_pool_create(engine->get_margo_instance(), bufsize/xfersize, xfersize, HG_BULK_READWRITE, &mr_pool);
define("write", &bv_svc_provider::process_write, pool);
define("read", &bv_svc_provider::process_read, pool);
define("stat", &bv_svc_provider::getstats);
define("delete", &bv_svc_provider::del);
define("flush", &bv_svc_provider::flush);
define("statistics", &bv_svc_provider::statistics);
define("size", &bv_svc_provider::getsize);
define("declare", &bv_svc_provider::declare);
rpcs.push_back(define("write", &bv_svc_provider::process_write, pool));
rpcs.push_back(define("read", &bv_svc_provider::process_read, pool));
rpcs.push_back(define("stat", &bv_svc_provider::getstats));
rpcs.push_back(define("delete", &bv_svc_provider::del));
rpcs.push_back(define("flush", &bv_svc_provider::flush));
rpcs.push_back(define("statistics", &bv_svc_provider::statistics));
rpcs.push_back(define("size", &bv_svc_provider::getsize));
rpcs.push_back(define("declare", &bv_svc_provider::declare));
}
void dump_io_req(const std::string extra, tl::bulk &client_bulk, std::vector<off_t> &file_starts, std::vector<uint64_t> &file_sizes)
......@@ -669,6 +672,9 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
}
/* Tear down the provider: deregister every RPC handle recorded in `rpcs`,
 * then wait for finalize and release the bulk buffer pool `mr_pool`. */
~bv_svc_provider() {
    /* iterate by reference: there is no reason to copy each
     * tl::remote_procedure just to call deregister() on it */
    for (auto &rpc : rpcs)
        rpc.deregister();

    wait_for_finalize();
    margo_bulk_pool_destroy(mr_pool);
}
......
......@@ -13,10 +13,15 @@ static void finalize_abtio(void* data) {
abt_io_finalize(abtio);
}
/* Argument bundle handed to the margo pre-finalize callback
 * (finalized_ssg_group_cb) through its void* parameter. */
typedef struct finalize_args {
    ssg_group_id_t g_id;     /* SSG group id the callback destroys */
    margo_instance_id m_id;  /* margo instance the group was created on */
} finalize_args_t;
/*
 * Margo pre-finalize callback for the server: destroys the provider's SSG
 * group and then shuts SSG down.
 *
 * data: pointer to the finalize_args_t registered together with this
 *       callback. It is only read here and never freed; main() passes the
 *       address of a local variable.
 */
static void finalized_ssg_group_cb(void* data)
{
    finalize_args_t *args = (finalize_args_t *)data;

    /* `data` is a finalize_args_t, not a bare ssg_group_id_t: read the group
     * id through the struct and destroy the group exactly once */
    ssg_group_destroy(args->g_id);
    ssg_finalize();
}
void print_address(margo_instance_id mid)
......@@ -30,9 +35,9 @@ void print_address(margo_instance_id mid)
printf("Server running at address %s\n", addr_str);
}
/*
 * Persist the SSG group id so clients can later load it from `filename`.
 *
 * filename: path of the state file to write.
 * gid:      SSG group id to store.
 * count:    forwarded unchanged as the third argument of
 *           ssg_group_id_store(); main() passes the MPI job size.
 *
 * The function returns nothing, so a failed store is reported on stderr
 * instead of being silently dropped.
 */
void service_config_store(char *filename, ssg_group_id_t gid, int count)
{
    int ret = ssg_group_id_store(filename, gid, count);

    if (ret != 0)
        fprintf(stderr, "ssg_group_id_store(%s) failed (ret = %d)\n", filename, ret);
}
int main(int argc, char **argv)
{
......@@ -40,7 +45,7 @@ int main(int argc, char **argv)
abt_io_instance_id abtio;
bv_svc_provider_t bv_id;
int ret;
int rank;
int rank, nprocs;
ssg_group_id_t gid;
int c;
char *proto=NULL;
......@@ -52,6 +57,7 @@ int main(int argc, char **argv)
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
while ( (c = getopt(argc, argv, "p:b:s:t:f:x:" )) != -1) {
switch (c) {
......@@ -87,20 +93,23 @@ int main(int argc, char **argv)
abtio = abt_io_init(nthreads);
margo_push_finalize_callback(mid, finalize_abtio, (void*)abtio);
ret = ssg_init(mid);
ret = ssg_init();
ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
gid = ssg_group_create_mpi(BV_PROVIDER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
ASSERT(gid != SSG_GROUP_ID_NULL, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);
gid = ssg_group_create_mpi(mid, BV_PROVIDER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL, NULL);
ASSERT(gid != SSG_GROUP_ID_INVALID, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
finalize_args_t args = {
.g_id = gid,
.m_id = mid
};
margo_push_prefinalize_callback(mid, &finalized_ssg_group_cb, (void*)&args);
if (rank == 0)
service_config_store(statefile, gid);
service_config_store(statefile, gid, nprocs);
ret = bv_svc_provider_register(mid, abtio, ABT_POOL_NULL, gid, bufsize, xfersize, &bv_id);
free(proto);
free(statefile);
margo_wait_for_finalize(mid);
margo_finalize(mid);
MPI_Finalize();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment