GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit 467e024e authored by Shane Snyder's avatar Shane Snyder

add mobject-server-ctl utility

send stat and shutdown commands to server groups. clean command
not yet supported
parent ed698519
......@@ -81,6 +81,13 @@ src_server_mobject_server_daemon_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_mobject_server_daemon_LDADD = \
src/server/libmobject-server.la ${SERVER_LIBS}
src_server_mobject_server_ctl_SOURCES = \
src/server/mobject-server-ctl.c
src_server_mobject_server_ctl_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_mobject_server_ctl_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_mobject_server_ctl_LDADD = ${SERVER_LIBS}
bin_PROGRAMS += \
src/server/mobject-server-daemon
src/server/mobject-server-daemon \
src/server/mobject-server-ctl
......@@ -126,14 +126,25 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
return;
}
bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
struct mobject_server_context *srv_ctx = vargs->srv_ctx;
bake_provider_handle_t bake_ph = srv_ctx->bake_ph;
bake_target_id_t bti = srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
double wr_start, wr_end;
int ret;
ABT_mutex_lock(srv_ctx->stats_mutex);
wr_start = ABT_get_wtime();
if((srv_ctx->last_wr_start > 0) && (srv_ctx->last_wr_start >= srv_ctx->last_wr_end)) {
srv_ctx->total_seg_wr_duration += (wr_start - srv_ctx->last_wr_start);
}
srv_ctx->last_wr_start = wr_start;
ABT_mutex_unlock(srv_ctx->stats_mutex);
if(len > SMALL_REGION_THRESHOLD) {
ret = bake_create(bake_ph, bti, len, &rid);
if(ret != 0) {
......@@ -149,7 +160,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ERROR bake_perror("bake_persist", ret);
}
insert_region_log_entry(vargs->srv_ctx, oid, offset, len, &rid);
insert_region_log_entry(srv_ctx, oid, offset, len, &rid);
} else {
margo_instance_id mid = vargs->srv_ctx->mid;
char data[SMALL_REGION_THRESHOLD];
......@@ -169,8 +180,21 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
insert_small_region_log_entry(vargs->srv_ctx, oid, offset, len, data);
insert_small_region_log_entry(srv_ctx, oid, offset, len, data);
}
ABT_mutex_lock(srv_ctx->stats_mutex);
wr_end = ABT_get_wtime();
srv_ctx->segs++;
srv_ctx->total_seg_size += len;
if(srv_ctx->last_wr_start > srv_ctx->last_wr_end) {
srv_ctx->total_seg_wr_duration += (wr_end - srv_ctx->last_wr_start);
}
else {
srv_ctx->total_seg_wr_duration += (wr_end - srv_ctx->last_wr_end);
}
srv_ctx->last_wr_end = wr_end;
ABT_mutex_unlock(srv_ctx->stats_mutex);
LEAVING;
}
......
......@@ -25,6 +25,7 @@ struct mobject_server_context
uint16_t provider_id;
ABT_pool pool;
ABT_mutex mutex;
ABT_mutex stats_mutex;
/* ssg-related data */
ssg_group_id_t gid;
/* bake-related data */
......@@ -39,6 +40,12 @@ struct mobject_server_context
/* other data */
uint32_t seq_id;
int ref_count;
/* stats/counters/timers and helpers */
uint32_t segs;
uint64_t total_seg_size;
double total_seg_wr_duration;
double last_wr_start;
double last_wr_end;
};
#ifdef __cplusplus
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <unistd.h>
#include <getopt.h>
#include <margo.h>
#include <mercury_proc_string.h>
#include <ssg.h>
static void usage(void)
{
fprintf(stderr, "Usage: mobject-server-ctl <cluster_file> <operation>\n");
fprintf(stderr, " <cluster_file> Mobject cluster ID file to issue commands to\n");
fprintf(stderr, " <operation> Mobject server operation to perform (stat, clean, shutdown)\n");
exit(-1);
}
static void parse_args(int argc, char **argv, char **server_gid_file, char **server_op)
{
if (argc != 3)
usage();
*server_gid_file = argv[1];
*server_op = argv[2];
return;
}
#define send_mobject_server_shutdown \
margo_shutdown_remote_instance
int send_mobject_server_clean(
margo_instance_id mid, hg_addr_t server_addr);
int send_mobject_server_stat(
margo_instance_id mid, hg_addr_t server_addr);
hg_id_t mobject_server_clean_rpc_id;
hg_id_t mobject_server_stat_rpc_id;
int main(int argc, char *argv[])
{
char *server_gid_file;
char *server_op;
char *server_addr_str;
ssg_group_id_t server_gid;
margo_instance_id mid;
char proto[24] = {0};
int group_size;
int (*send_op_ptr)(margo_instance_id, hg_addr_t);
int i;
hg_addr_t server_addr;
int ret;
parse_args(argc, argv, &server_gid_file, &server_op);
if (strcmp(server_op, "shutdown") == 0)
send_op_ptr = send_mobject_server_shutdown;
else if (strcmp(server_op, "clean") == 0)
send_op_ptr = send_mobject_server_clean;
else if (strcmp(server_op, "stat") == 0)
send_op_ptr = send_mobject_server_stat;
else
{
fprintf(stderr, "Error: Invalid server control operation: %s\n", server_op);
return -1;
}
ret = ssg_group_id_load(server_gid_file, &server_gid);
if (ret != 0)
{
fprintf(stderr, "Error: Unable to load mobject server group ID\n");
return -1;
}
server_addr_str = ssg_group_id_get_addr_str(server_gid);
if (!server_addr_str)
{
fprintf(stderr, "Error: Unable to get mobject server group address string\n");
return -1;
}
/* we only need to get the proto portion of the address to initialize margo */
for(i=0; i<24 && server_addr_str[i] != '\0' && server_addr_str[i] != ':'; i++)
proto[i] = server_addr_str[i];
mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
if (mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
free(server_addr_str);
return(-1);
}
/* register RPC handlers for controlling mobject server groups */
mobject_server_clean_rpc_id = MARGO_REGISTER(
mid, "mobject_server_clean", void, void, NULL);
mobject_server_stat_rpc_id = MARGO_REGISTER(
mid, "mobject_server_stat", void, void, NULL);
/* SSG initialization */
ret = ssg_init(mid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(mid);
free(server_addr_str);
return(-1);
}
/* attach to server group to get all server addresses */
ret = ssg_group_attach(server_gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to attach to SSG server group\n");
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
return(-1);
}
group_size = ssg_get_group_size(server_gid);
if (group_size == 0)
{
fprintf(stderr, "Error: Unable to determine SSG server group size\n");
ssg_group_detach(server_gid);
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
return(-1);
}
for (i = 0 ; i < group_size; i++)
{
server_addr = ssg_get_addr(server_gid, i);
if (server_addr == HG_ADDR_NULL)
{
fprintf(stderr, "Error: NULL address given for group member %d\n", i);
ssg_group_detach(server_gid);
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
return(-1);
}
ret = send_op_ptr(mid, server_addr);
}
ssg_group_detach(server_gid);
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
return 0;
}
int send_mobject_server_clean(
margo_instance_id mid, hg_addr_t server_addr)
{
hg_handle_t handle;
hg_return_t hret;
hret = margo_create(mid, server_addr, mobject_server_clean_rpc_id, &handle);
if (hret != HG_SUCCESS)
{
fprintf(stderr, "Error: Unable to create Mercury handle\n");
return -1;
}
/* XXX hard-coded to mobject server provider id of 1 */
hret = margo_provider_forward(1, handle, NULL);
if (hret != HG_SUCCESS)
{
margo_destroy(handle);
fprintf(stderr, "Error: Unable to forward server clean RPC\n");
return -1;
}
margo_destroy(handle);
return 0;
}
int send_mobject_server_stat(
margo_instance_id mid, hg_addr_t server_addr)
{
hg_handle_t handle;
hg_return_t hret;
hret = margo_create(mid, server_addr, mobject_server_stat_rpc_id, &handle);
if (hret != HG_SUCCESS)
{
fprintf(stderr, "Error: Unable to create Mercury handle\n");
return -1;
}
/* XXX hard-coded to mobject server provider id of 1 */
hret = margo_provider_forward(1, handle, NULL);
if (hret != HG_SUCCESS)
{
margo_destroy(handle);
fprintf(stderr, "Error: Unable to forward server stat RPC\n");
return -1;
}
margo_destroy(handle);
return 0;
}
......@@ -7,6 +7,7 @@
//#define FAKE_CPP_SERVER
#include <assert.h>
#include <unistd.h>
#include <mpi.h>
#include <abt.h>
#include <margo.h>
......@@ -31,6 +32,8 @@
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_server_clean_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_server_stat_ult)
static void mobject_finalize_cb(void* data);
......@@ -68,6 +71,7 @@ int mobject_provider_register(
srv_ctx->pool = pool;
srv_ctx->ref_count = 1;
ABT_mutex_create(&srv_ctx->mutex);
ABT_mutex_create(&srv_ctx->stats_mutex);
srv_ctx->gid = gid;
my_id = ssg_get_group_self_id(srv_ctx->gid);
......@@ -136,6 +140,7 @@ int mobject_provider_register(
hg_id_t rpc_id;
/* read/write op RPCs */
rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_write_op",
write_op_in_t, write_op_out_t, mobject_write_op_ult,
provider_id, pool);
......@@ -146,6 +151,17 @@ int mobject_provider_register(
provider_id, pool);
margo_register_data(mid, rpc_id, srv_ctx, NULL);
/* server ctl RPCs */
rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_server_clean",
void, void, mobject_server_clean_ult,
provider_id, pool);
margo_register_data(mid, rpc_id, srv_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_server_stat",
void, void, mobject_server_stat_ult,
provider_id, pool);
margo_register_data(mid, rpc_id, srv_ctx, NULL);
margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx);
*provider = srv_ctx;
......@@ -257,6 +273,58 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
static hg_return_t mobject_server_clean_ult(hg_handle_t h)
{
hg_return_t ret;
/* XXX clean up mobject data */
fprintf(stderr, "ERROR: CLEANUP NOT SUPPORTED\n");
ret = margo_respond(h, NULL);
assert(ret == HG_SUCCESS);
ret = margo_destroy(h);
assert(ret == HG_SUCCESS);
return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_server_clean_ult)
static hg_return_t mobject_server_stat_ult(hg_handle_t h)
{
hg_return_t ret;
int my_rank;
char my_hostname[256] = {0};
const struct hg_info* info = margo_get_info(h);
margo_instance_id mid = margo_hg_handle_get_instance(h);
struct mobject_server_context *srv_ctx = margo_registered_data(mid, info->id);
my_rank = ssg_get_group_self_id(srv_ctx->gid);
gethostname(my_hostname, sizeof(my_hostname));
ABT_mutex_lock(srv_ctx->stats_mutex);
fprintf(stderr,
"Server %d (host: %s):\n" \
"\tSegments allocated: %u\n" \
"\tTotal segment size: %lu bytes\n" \
"\tTotal segment write time: %.4lf s\ns" \
"\tTotal segment write b/w: %.4lf MiB/s\n", \
my_rank, my_hostname, srv_ctx->segs,
srv_ctx->total_seg_size, srv_ctx->total_seg_wr_duration,
(srv_ctx->total_seg_size / (1024.0 * 1024.0 ) / srv_ctx->total_seg_wr_duration));
ABT_mutex_unlock(srv_ctx->stats_mutex);
ret = margo_respond(h, NULL);
assert(ret == HG_SUCCESS);
ret = margo_destroy(h);
assert(ret == HG_SUCCESS);
return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_server_stat_ult)
static void mobject_finalize_cb(void* data)
{
mobject_provider_t srv_ctx = (mobject_provider_t)data;
......@@ -264,6 +332,7 @@ static void mobject_finalize_cb(void* data)
sdskv_provider_handle_release(srv_ctx->sdskv_ph);
bake_provider_handle_release(srv_ctx->bake_ph);
ABT_mutex_free(&srv_ctx->mutex);
ABT_mutex_free(&srv_ctx->stats_mutex);
free(srv_ctx);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment