mobject-server-daemon.c 8.85 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include <unistd.h>
8
#include <getopt.h>
9 10 11
#include <mpi.h>
#include <margo.h>
#include <ssg.h>
12
#include <ssg-mpi.h>
13 14
#include <bake-client.h>
#include <bake-server.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
15 16
#include <sdskv-client.h>
#include <sdskv-server.h>
17 18 19

#include "mobject-server.h"

20 21
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }

22 23 24 25 26
typedef struct {
    bake_client_t          client;
    bake_provider_handle_t provider_handle;
} bake_client_data;

Matthieu Dorier's avatar
Matthieu Dorier committed
27 28 29 30 31
typedef struct {
    sdskv_client_t         client;
    sdskv_provider_handle_t provider_handle;
} sdskv_client_data;

32
typedef struct {
33 34
    char*           listen_addr;
    char*           cluster_file;
35
    int             handler_xstreams;
36
    char *          pool_path;
37
    size_t          pool_size;
38 39
    char *          kv_path;
    sdskv_db_type_t kv_backend;
40 41 42 43
} mobject_server_options;

static void usage(void)
{
44
    fprintf(stderr, "Usage: mobject-server-daemon [OPTIONS] <listen_addr> <cluster_file>\n");
45 46
    fprintf(stderr, "  <listen_addr>    the Mercury address to listen on\n");
    fprintf(stderr, "  <cluster_file>   the file to write mobject cluster connect info to\n");
47
    fprintf(stderr, "  OPTIONS:\n");
48 49 50 51 52
    fprintf(stderr, "    --handler-xstreams Number of xtreams to user for RPC handlers [default: 4]\n"); 
    fprintf(stderr, "    --pool-path        Bake pool location [default: /dev/shm]\n");
    fprintf(stderr, "    --pool-size        Bake pool size for each server [default: 1GiB]\n");
    fprintf(stderr, "    --kv-backend       SDSKV backend to use (mapdb, leveldb, berkeleydb) [default: stdmap]\n");
    fprintf(stderr, "    --kv-path          SDSKV storage location [default: /dev/shm]\n");
53 54 55 56 57 58
    exit(-1);
}

static void parse_args(int argc, char **argv, mobject_server_options *opts)
{
    int c;
59
    char *short_options = "x:p:s:a:k:";
60
    struct option long_options[] = {
61
        {"handler-xstreams", required_argument, 0, 'x'},
62 63 64
        {"pool-path", required_argument, 0, 'p'},
        {"pool-size", required_argument, 0, 's'},
        {"kv-path", required_argument, 0, 'a'},
65
        {"kv-backend", required_argument, 0, 'k'},
66 67 68 69 70 71
    };

    while ((c = getopt_long(argc, argv, short_options, long_options, NULL)) != -1)
    {
        switch (c)
        {
72 73 74
            case 'x':
                opts->handler_xstreams = atoi(optarg);
                break;
75 76 77 78 79 80 81 82 83
            case 'p':
                opts->pool_path = optarg;
                break;
            case 's':
                opts->pool_size = strtoul(optarg, NULL, 0);
                break;
            case 'a':
                opts->kv_path = optarg;
                break;
84 85 86 87 88 89 90 91 92 93
            case 'k':
                if(strcmp(optarg, "mapdb") == 0)
                    opts->kv_backend = KVDB_MAP;
                else if(strcmp(optarg, "leveldb") == 0)
                    opts->kv_backend = KVDB_LEVELDB;
                else if(strcmp(optarg, "berkeleydb") == 0)
                    opts->kv_backend = KVDB_BERKELEYDB;
                else
                    usage();
                break;
94 95 96 97 98 99 100 101 102 103 104 105 106
            default:
                usage();
        }
    }

    if ((argc - optind) != 2)
        usage();
    opts->listen_addr = argv[optind++];
    opts->cluster_file = argv[optind++];

    return;
}

107
static void finalize_ssg_cb(void* data);
108
static void finalize_bake_client_cb(void* data);
Matthieu Dorier's avatar
Matthieu Dorier committed
109
static void finalize_sdskv_client_cb(void* data);
110
static void finalized_ssg_group_cb(void* data);
Matthieu Dorier's avatar
Matthieu Dorier committed
111

112 113
int main(int argc, char *argv[])
{
114
    mobject_server_options server_opts = {
115
        .handler_xstreams = 4, /* default to 4 rpc handler xstreams */
116
        .pool_path = "/dev/shm", /* default bake pool path */
117
        .pool_size = 1*1024*1024*1024, /* 1 GiB default */
118 119
        .kv_path = "/dev/shm", /* default sdskv path */
        .kv_backend = KVDB_MAP, /* in-memory map default */
120
    }; 
121 122 123
    margo_instance_id mid;
    int ret;

124
    parse_args(argc, argv, &server_opts);
125

126
    /* MPI required for SSG bootstrapping */
127
    MPI_Init(&argc, &argv);
128 129
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
130

131
    /* Margo initialization */
132 133
    mid = margo_init(server_opts.listen_addr, MARGO_SERVER_MODE, 0, 
        server_opts.handler_xstreams);
134 135 136 137 138
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        return -1;
    }
139
    margo_enable_remote_shutdown(mid);
140

141 142
    /* SSG initialization */
    ret = ssg_init(mid);
143
    ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
144 145 146 147 148 149 150

    hg_addr_t self_addr;
    margo_addr_self(mid, &self_addr);

    /* Bake provider initialization */
    /* XXX mplex id and target name should be taken from config file */
    uint8_t bake_mplex_id = 1;
151
    char bake_target_name[128];
152
    sprintf(bake_target_name, "%s/mobject.%d.dat", server_opts.pool_path, rank);
153 154 155
    /* create the bake target if it does not exist */
    if(-1 == access(bake_target_name, F_OK)) {
        // XXX creating a pool of 10MB - this should come from a config file
156
        ret = bake_makepool(bake_target_name, server_opts.pool_size, 0664);
Rob Latham's avatar
Rob Latham committed
157
        if (ret != 0) bake_perror("bake_makepool", ret);
158 159
        ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
    }
160 161
    bake_provider_t bake_prov;
    bake_target_id_t bake_tid;
162
    ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
Rob Latham's avatar
Rob Latham committed
163
    if (ret != 0) bake_perror("bake_provider_register", ret);
164 165
    ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
    ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
Rob Latham's avatar
Rob Latham committed
166
    if (ret != 0) bake_perror("bake_provider_add_storage_target", ret);
167 168
    ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
            bake_target_name, ret);
Shane Snyder's avatar
Shane Snyder committed
169
    bake_provider_set_conf(bake_prov, "pipeline_enabled", "1");
170 171 172

    /* Bake provider handle initialization from self addr */
    bake_client_data bake_clt_data;
173
    ret = bake_client_init(mid, &(bake_clt_data.client));
Rob Latham's avatar
Rob Latham committed
174
    if (ret != 0) bake_perror("bake_client_init", ret);
175 176
    ASSERT(ret == 0, "bake_client_init() failed (ret = %d)\n", ret);
    ret = bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
Rob Latham's avatar
Rob Latham committed
177
    if (ret != 0) bake_perror("bake_provider_handle_create", ret);
178
    ASSERT(ret == 0, "bake_provider_handle_create() failed (ret = %d)\n", ret);
179 180
    margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);

Matthieu Dorier's avatar
Matthieu Dorier committed
181
    /* SDSKV provider initialization */
182
    uint8_t sdskv_mplex_id = 2;
Matthieu Dorier's avatar
Matthieu Dorier committed
183
    sdskv_provider_t sdskv_prov;
184 185 186
    ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
    ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);

187
    ret = mobject_sdskv_provider_setup(sdskv_prov, server_opts.kv_path, server_opts.kv_backend);
Matthieu Dorier's avatar
Matthieu Dorier committed
188 189 190

    /* SDSKV provider handle initialization from self addr */
    sdskv_client_data sdskv_clt_data;
191 192 193 194
    ret = sdskv_client_init(mid, &(sdskv_clt_data.client));
    ASSERT(ret == 0, "sdskv_client_init() failed (ret = %d)\n", ret);
    ret = sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
    ASSERT(ret == 0, "sdskv_provider_handle_create() failed (ret = %d)\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
195 196
    margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);

197 198 199 200 201
    /* SSG group creation */
    ssg_group_id_t gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
    ASSERT(gid != SSG_GROUP_ID_NULL, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
    margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);

202 203
    /* Mobject provider initialization */
    mobject_provider_t mobject_prov;
Matthieu Dorier's avatar
Matthieu Dorier committed
204 205 206
    ret = mobject_provider_register(mid, 1, 
            MOBJECT_ABT_POOL_DEFAULT, 
            bake_clt_data.provider_handle, 
207
            sdskv_clt_data.provider_handle,
208
            gid, server_opts.cluster_file, &mobject_prov);
209
    if (ret != 0)
210
    {
211
        fprintf(stderr, "Error: Unable to initialize mobject provider\n");
212 213 214 215
        margo_finalize(mid);
        return -1;
    }

216
    margo_addr_free(mid, self_addr);
217

218 219
    margo_wait_for_finalize(mid);

220 221 222 223
    MPI_Finalize();

    return 0;
}
224

225 226 227 228 229 230
static void finalize_ssg_cb(void* data)
{
    ssg_group_id_t* gid = (ssg_group_id_t*)data;
    ssg_group_destroy(*gid);
}

231 232 233 234 235 236
static void finalize_bake_client_cb(void* data)
{
    bake_client_data* clt_data = (bake_client_data*)data;
    bake_provider_handle_release(clt_data->provider_handle);
    bake_client_finalize(clt_data->client);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
237 238 239 240 241 242 243

static void finalize_sdskv_client_cb(void* data)
{
    sdskv_client_data* clt_data = (sdskv_client_data*)data;
    sdskv_provider_handle_release(clt_data->provider_handle);
    sdskv_client_finalize(clt_data->client);
}
244 245 246 247 248 249

static void finalized_ssg_group_cb(void* data)
{
    ssg_group_id_t gid = *((ssg_group_id_t*)data);
    ssg_group_destroy(gid);
}