From 467e024ef064a5d57b9f001f78d383dfa19cabbb Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Thu, 11 Apr 2019 20:59:48 -0500 Subject: [PATCH] add mobject-server-ctl utility send stat and shutdown commands to server groups. clean command not yet supported --- src/Makefile.subdir | 9 +- src/server/core/core-write-op.cpp | 32 ++++- src/server/mobject-server-context.h | 7 + src/server/mobject-server-ctl.c | 209 ++++++++++++++++++++++++++++ src/server/mobject-server.c | 69 +++++++++ 5 files changed, 321 insertions(+), 5 deletions(-) create mode 100644 src/server/mobject-server-ctl.c diff --git a/src/Makefile.subdir b/src/Makefile.subdir index ba4c337..061d688 100644 --- a/src/Makefile.subdir +++ b/src/Makefile.subdir @@ -81,6 +81,13 @@ src_server_mobject_server_daemon_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS} src_server_mobject_server_daemon_LDADD = \ src/server/libmobject-server.la ${SERVER_LIBS} +src_server_mobject_server_ctl_SOURCES = \ + src/server/mobject-server-ctl.c +src_server_mobject_server_ctl_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS} +src_server_mobject_server_ctl_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS} +src_server_mobject_server_ctl_LDADD = ${SERVER_LIBS} + bin_PROGRAMS += \ - src/server/mobject-server-daemon + src/server/mobject-server-daemon \ + src/server/mobject-server-ctl diff --git a/src/server/core/core-write-op.cpp b/src/server/core/core-write-op.cpp index 5a2f1e0..5dc36f4 100644 --- a/src/server/core/core-write-op.cpp +++ b/src/server/core/core-write-op.cpp @@ -126,14 +126,25 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) return; } - bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph; - bake_target_id_t bti = vargs->srv_ctx->bake_tid; + struct mobject_server_context *srv_ctx = vargs->srv_ctx; + bake_provider_handle_t bake_ph = srv_ctx->bake_ph; + bake_target_id_t bti = srv_ctx->bake_tid; bake_region_id_t rid; hg_bulk_t remote_bulk = vargs->bulk_handle; const char* remote_addr_str = vargs->client_addr_str; hg_addr_t remote_addr = vargs->client_addr; + double wr_start, wr_end; + int ret; + ABT_mutex_lock(srv_ctx->stats_mutex); + wr_start = ABT_get_wtime(); + if((srv_ctx->last_wr_start > 0) && (srv_ctx->last_wr_start >= srv_ctx->last_wr_end)) { + srv_ctx->total_seg_wr_duration += (wr_start - srv_ctx->last_wr_start); + } + srv_ctx->last_wr_start = wr_start; + ABT_mutex_unlock(srv_ctx->stats_mutex); + if(len > SMALL_REGION_THRESHOLD) { ret = bake_create(bake_ph, bti, len, &rid); if(ret != 0) { @@ -149,7 +160,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ERROR bake_perror("bake_persist", ret); } - insert_region_log_entry(vargs->srv_ctx, oid, offset, len, &rid); + insert_region_log_entry(srv_ctx, oid, offset, len, &rid); } else { margo_instance_id mid = vargs->srv_ctx->mid; char data[SMALL_REGION_THRESHOLD]; @@ -169,8 +180,21 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret); } - insert_small_region_log_entry(vargs->srv_ctx, oid, offset, len, data); + insert_small_region_log_entry(srv_ctx, oid, offset, len, data); + } + + ABT_mutex_lock(srv_ctx->stats_mutex); + wr_end = ABT_get_wtime(); + srv_ctx->segs++; + srv_ctx->total_seg_size += len; + if(srv_ctx->last_wr_start > srv_ctx->last_wr_end) { + srv_ctx->total_seg_wr_duration += (wr_end - srv_ctx->last_wr_start); + } + else { + srv_ctx->total_seg_wr_duration += (wr_end - srv_ctx->last_wr_end); } + srv_ctx->last_wr_end = wr_end; + ABT_mutex_unlock(srv_ctx->stats_mutex); LEAVING; } diff --git a/src/server/mobject-server-context.h b/src/server/mobject-server-context.h index d66903a..9af9587 100644 --- a/src/server/mobject-server-context.h +++ b/src/server/mobject-server-context.h @@ -25,6 +25,7 @@ struct mobject_server_context uint16_t provider_id; ABT_pool pool; ABT_mutex mutex; + ABT_mutex stats_mutex; /* ssg-related data */ ssg_group_id_t gid; /* bake-related data */ @@ -39,6 +40,12 @@ struct mobject_server_context /* other data */ uint32_t seq_id; int ref_count; + /* stats/counters/timers and helpers */ + uint32_t segs; + uint64_t total_seg_size; + double total_seg_wr_duration; + double last_wr_start; + double last_wr_end; }; #ifdef __cplusplus diff --git a/src/server/mobject-server-ctl.c b/src/server/mobject-server-ctl.c new file mode 100644 index 0000000..2153e11 --- /dev/null +++ b/src/server/mobject-server-ctl.c @@ -0,0 +1,209 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include +#include +#include + +static void usage(void) +{ + fprintf(stderr, "Usage: mobject-server-ctl \n"); + fprintf(stderr, " Mobject cluster ID file to issue commands to\n"); + fprintf(stderr, " Mobject server operation to perform (stat, clean, shutdown)\n"); + exit(-1); +} + +static void parse_args(int argc, char **argv, char **server_gid_file, char **server_op) +{ + if (argc != 3) + usage(); + *server_gid_file = argv[1]; + *server_op = argv[2]; + + return; +} + +#define send_mobject_server_shutdown \ + margo_shutdown_remote_instance +int send_mobject_server_clean( + margo_instance_id mid, hg_addr_t server_addr); +int send_mobject_server_stat( + margo_instance_id mid, hg_addr_t server_addr); + +hg_id_t mobject_server_clean_rpc_id; +hg_id_t mobject_server_stat_rpc_id; + +int main(int argc, char *argv[]) +{ + char *server_gid_file; + char *server_op; + char *server_addr_str; + ssg_group_id_t server_gid; + margo_instance_id mid; + char proto[24] = {0}; + int group_size; + int (*send_op_ptr)(margo_instance_id, hg_addr_t); + int i; + hg_addr_t server_addr; + int ret; + + parse_args(argc, argv, &server_gid_file, &server_op); + + if (strcmp(server_op, "shutdown") == 0) + send_op_ptr = send_mobject_server_shutdown; + else if (strcmp(server_op, "clean") == 0) + send_op_ptr = send_mobject_server_clean; + else if (strcmp(server_op, "stat") == 0) + send_op_ptr = send_mobject_server_stat; + else + { + fprintf(stderr, "Error: Invalid server control operation: %s\n", server_op); + return -1; + } + + ret = ssg_group_id_load(server_gid_file, &server_gid); + if (ret != 0) + { + fprintf(stderr, "Error: Unable to load mobject server group ID\n"); + return -1; + } + + server_addr_str = ssg_group_id_get_addr_str(server_gid); + if (!server_addr_str) + { + fprintf(stderr, "Error: Unable to get mobject server group address string\n"); + return -1; + } + + /* we only need to get the proto portion of the address to initialize margo */ + for(i=0; i<24 && server_addr_str[i] != '\0' && server_addr_str[i] != ':'; i++) + proto[i] = server_addr_str[i]; + + mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1); + if (mid == MARGO_INSTANCE_NULL) + { + fprintf(stderr, "Error: Unable to initialize margo\n"); + free(server_addr_str); + return(-1); + } + + /* register RPC handlers for controlling mobject server groups */ + mobject_server_clean_rpc_id = MARGO_REGISTER( + mid, "mobject_server_clean", void, void, NULL); + mobject_server_stat_rpc_id = MARGO_REGISTER( + mid, "mobject_server_stat", void, void, NULL); + + /* SSG initialization */ + ret = ssg_init(mid); + if (ret != SSG_SUCCESS) + { + fprintf(stderr, "Error: Unable to initialize SSG\n"); + margo_finalize(mid); + free(server_addr_str); + return(-1); + } + + /* attach to server group to get all server addresses */ + ret = ssg_group_attach(server_gid); + if (ret != SSG_SUCCESS) + { + fprintf(stderr, "Error: Unable to attach to SSG server group\n"); + ssg_finalize(); + margo_finalize(mid); + free(server_addr_str); + return(-1); + + } + + group_size = ssg_get_group_size(server_gid); + if (group_size == 0) + { + fprintf(stderr, "Error: Unable to determine SSG server group size\n"); + ssg_group_detach(server_gid); + ssg_finalize(); + margo_finalize(mid); + free(server_addr_str); + return(-1); + + } + + for (i = 0 ; i < group_size; i++) + { + server_addr = ssg_get_addr(server_gid, i); + if (server_addr == HG_ADDR_NULL) + { + fprintf(stderr, "Error: NULL address given for group member %d\n", i); + ssg_group_detach(server_gid); + ssg_finalize(); + margo_finalize(mid); + free(server_addr_str); + return(-1); + } + + ret = send_op_ptr(mid, server_addr); + } + + ssg_group_detach(server_gid); + ssg_finalize(); + margo_finalize(mid); + free(server_addr_str); + + return 0; +} + +int send_mobject_server_clean( + margo_instance_id mid, hg_addr_t server_addr) +{ + hg_handle_t handle; + hg_return_t hret; + + hret = margo_create(mid, server_addr, mobject_server_clean_rpc_id, &handle); + if (hret != HG_SUCCESS) + { + fprintf(stderr, "Error: Unable to create Mercury handle\n"); + return -1; + } + + /* XXX hard-coded to mobject server provider id of 1 */ + hret = margo_provider_forward(1, handle, NULL); + if (hret != HG_SUCCESS) + { + margo_destroy(handle); + fprintf(stderr, "Error: Unable to forward server clean RPC\n"); + return -1; + } + + margo_destroy(handle); + return 0; +} + +int send_mobject_server_stat( + margo_instance_id mid, hg_addr_t server_addr) +{ + hg_handle_t handle; + hg_return_t hret; + + hret = margo_create(mid, server_addr, mobject_server_stat_rpc_id, &handle); + if (hret != HG_SUCCESS) + { + fprintf(stderr, "Error: Unable to create Mercury handle\n"); + return -1; + } + + /* XXX hard-coded to mobject server provider id of 1 */ + hret = margo_provider_forward(1, handle, NULL); + if (hret != HG_SUCCESS) + { + margo_destroy(handle); + fprintf(stderr, "Error: Unable to forward server stat RPC\n"); + return -1; + } + + margo_destroy(handle); + return 0; +} diff --git a/src/server/mobject-server.c b/src/server/mobject-server.c index 75fd0e9..79a716d 100644 --- a/src/server/mobject-server.c +++ b/src/server/mobject-server.c @@ -7,6 +7,7 @@ //#define FAKE_CPP_SERVER #include +#include #include #include #include @@ -31,6 +32,8 @@ DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult) DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult) +DECLARE_MARGO_RPC_HANDLER(mobject_server_clean_ult) +DECLARE_MARGO_RPC_HANDLER(mobject_server_stat_ult) static void mobject_finalize_cb(void* data); @@ -68,6 +71,7 @@ int mobject_provider_register( srv_ctx->pool = pool; srv_ctx->ref_count = 1; ABT_mutex_create(&srv_ctx->mutex); + ABT_mutex_create(&srv_ctx->stats_mutex); srv_ctx->gid = gid; my_id = ssg_get_group_self_id(srv_ctx->gid); @@ -136,6 +140,7 @@ int mobject_provider_register( hg_id_t rpc_id; + /* read/write op RPCs */ rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, mobject_write_op_ult, provider_id, pool); @@ -146,6 +151,17 @@ int mobject_provider_register( provider_id, pool); margo_register_data(mid, rpc_id, srv_ctx, NULL); + /* server ctl RPCs */ + rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_server_clean", + void, void, mobject_server_clean_ult, + provider_id, pool); + margo_register_data(mid, rpc_id, srv_ctx, NULL); + + rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_server_stat", + void, void, mobject_server_stat_ult, + provider_id, pool); + margo_register_data(mid, rpc_id, srv_ctx, NULL); + margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx); *provider = srv_ctx; @@ -257,6 +273,58 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h) } DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult) +static hg_return_t mobject_server_clean_ult(hg_handle_t h) +{ + hg_return_t ret; + + /* XXX clean up mobject data */ + fprintf(stderr, "ERROR: CLEANUP NOT SUPPORTED\n"); + + ret = margo_respond(h, NULL); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); + + return ret; +} +DEFINE_MARGO_RPC_HANDLER(mobject_server_clean_ult) + +static hg_return_t mobject_server_stat_ult(hg_handle_t h) +{ + hg_return_t ret; + int my_rank; + char my_hostname[256] = {0}; + + const struct hg_info* info = margo_get_info(h); + margo_instance_id mid = margo_hg_handle_get_instance(h); + + struct mobject_server_context *srv_ctx = margo_registered_data(mid, info->id); + my_rank = ssg_get_group_self_id(srv_ctx->gid); + gethostname(my_hostname, sizeof(my_hostname)); + + ABT_mutex_lock(srv_ctx->stats_mutex); + fprintf(stderr, + "Server %d (host: %s):\n" \ + "\tSegments allocated: %u\n" \ + "\tTotal segment size: %lu bytes\n" \ + "\tTotal segment write time: %.4lf s\ns" \ + "\tTotal segment write b/w: %.4lf MiB/s\n", \ + my_rank, my_hostname, srv_ctx->segs, + srv_ctx->total_seg_size, srv_ctx->total_seg_wr_duration, + (srv_ctx->total_seg_size / (1024.0 * 1024.0 ) / srv_ctx->total_seg_wr_duration)); + ABT_mutex_unlock(srv_ctx->stats_mutex); + + ret = margo_respond(h, NULL); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); + + return ret; +} +DEFINE_MARGO_RPC_HANDLER(mobject_server_stat_ult) + static void mobject_finalize_cb(void* data) { mobject_provider_t srv_ctx = (mobject_provider_t)data; @@ -264,6 +332,7 @@ static void mobject_finalize_cb(void* data) sdskv_provider_handle_release(srv_ctx->sdskv_ph); bake_provider_handle_release(srv_ctx->bake_ph); ABT_mutex_free(&srv_ctx->mutex); + ABT_mutex_free(&srv_ctx->stats_mutex); free(srv_ctx); } -- 2.26.2