mobject-server-daemon.c 8.43 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include <unistd.h>
8
#include <getopt.h>
9 10 11
#include <mpi.h>
#include <margo.h>
#include <ssg.h>
12
#include <ssg-mpi.h>
13 14
#include <bake-client.h>
#include <bake-server.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
15 16
#include <sdskv-client.h>
#include <sdskv-server.h>
17 18 19

#include "mobject-server.h"

20 21
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }

22 23 24 25 26
typedef struct {
    bake_client_t          client;
    bake_provider_handle_t provider_handle;
} bake_client_data;

Matthieu Dorier's avatar
Matthieu Dorier committed
27 28 29 30 31
typedef struct {
    sdskv_client_t         client;
    sdskv_provider_handle_t provider_handle;
} sdskv_client_data;

32
typedef struct {
33 34
    char*           listen_addr;
    char*           cluster_file;
35
    char *          pool_path;
36
    size_t          pool_size;
37 38
    char *          kv_path;
    sdskv_db_type_t kv_backend;
39 40 41 42
} mobject_server_options;

static void usage(void)
{
43
    fprintf(stderr, "Usage: mobject-server-daemon [OPTIONS] <listen_addr> <cluster_file>\n");
44 45
    fprintf(stderr, "  <listen_addr>    the Mercury address to listen on\n");
    fprintf(stderr, "  <cluster_file>   the file to write mobject cluster connect info to\n");
46
    fprintf(stderr, "  OPTIONS:\n");
47
    fprintf(stderr, "    --pool-path    Bake pool location [default: /dev/shm]\n");
48 49
    fprintf(stderr, "    --pool-size    Bake pool size for each server [default: 1GiB]\n");
    fprintf(stderr, "    --kv-backend   SDSKV backend to use (mapdb, leveldb, berkeleydb) [default: stdmap]\n");
50
    fprintf(stderr, "    --kv-path      SDSKV storage location [default: /dev/shm]\n");
51 52 53 54 55 56
    exit(-1);
}

static void parse_args(int argc, char **argv, mobject_server_options *opts)
{
    int c;
57
    char *short_options = "p:s:a:k:";
58
    struct option long_options[] = {
59 60 61
        {"pool-path", required_argument, 0, 'p'},
        {"pool-size", required_argument, 0, 's'},
        {"kv-path", required_argument, 0, 'a'},
62
        {"kv-backend", required_argument, 0, 'k'},
63 64 65 66 67 68
    };

    while ((c = getopt_long(argc, argv, short_options, long_options, NULL)) != -1)
    {
        switch (c)
        {
69 70 71 72 73 74 75 76 77
            case 'p':
                opts->pool_path = optarg;
                break;
            case 's':
                opts->pool_size = strtoul(optarg, NULL, 0);
                break;
            case 'a':
                opts->kv_path = optarg;
                break;
78 79 80 81 82 83 84 85 86 87
            case 'k':
                if(strcmp(optarg, "mapdb") == 0)
                    opts->kv_backend = KVDB_MAP;
                else if(strcmp(optarg, "leveldb") == 0)
                    opts->kv_backend = KVDB_LEVELDB;
                else if(strcmp(optarg, "berkeleydb") == 0)
                    opts->kv_backend = KVDB_BERKELEYDB;
                else
                    usage();
                break;
88 89 90 91 92 93 94 95 96 97 98 99 100
            default:
                usage();
        }
    }

    if ((argc - optind) != 2)
        usage();
    opts->listen_addr = argv[optind++];
    opts->cluster_file = argv[optind++];

    return;
}

101
static void finalize_ssg_cb(void* data);
102
static void finalize_bake_client_cb(void* data);
Matthieu Dorier's avatar
Matthieu Dorier committed
103
static void finalize_sdskv_client_cb(void* data);
104
static void finalized_ssg_group_cb(void* data);
Matthieu Dorier's avatar
Matthieu Dorier committed
105

106 107
int main(int argc, char *argv[])
{
108
    mobject_server_options server_opts = {
109
        .pool_path = "/dev/shm", /* default bake pool path */
110
        .pool_size = 1*1024*1024*1024, /* 1 GiB default */
111 112
        .kv_path = "/dev/shm", /* default sdskv path */
        .kv_backend = KVDB_MAP, /* in-memory map default */
113
    }; 
114 115 116
    margo_instance_id mid;
    int ret;

117
    parse_args(argc, argv, &server_opts);
118

119
    /* MPI required for SSG bootstrapping */
120
    MPI_Init(&argc, &argv);
121 122
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
123

124
    /* Margo initialization */
125
    mid = margo_init(server_opts.listen_addr, MARGO_SERVER_MODE, 0, 4);
126 127 128 129 130
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        return -1;
    }
131
    margo_enable_remote_shutdown(mid);
132

133 134
    /* SSG initialization */
    ret = ssg_init(mid);
135
    ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
136 137 138 139 140 141 142

    hg_addr_t self_addr;
    margo_addr_self(mid, &self_addr);

    /* Bake provider initialization */
    /* XXX mplex id and target name should be taken from config file */
    uint8_t bake_mplex_id = 1;
143
    char bake_target_name[128];
144
    sprintf(bake_target_name, "%s/mobject.%d.dat", server_opts.pool_path, rank);
145 146 147
    /* create the bake target if it does not exist */
    if(-1 == access(bake_target_name, F_OK)) {
        // XXX creating a pool of 10MB - this should come from a config file
148
        ret = bake_makepool(bake_target_name, server_opts.pool_size, 0664);
Rob Latham's avatar
Rob Latham committed
149
        if (ret != 0) bake_perror("bake_makepool", ret);
150 151
        ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
    }
152 153
    bake_provider_t bake_prov;
    bake_target_id_t bake_tid;
154
    ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
Rob Latham's avatar
Rob Latham committed
155
    if (ret != 0) bake_perror("bake_provider_register", ret);
156 157
    ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
    ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
Rob Latham's avatar
Rob Latham committed
158
    if (ret != 0) bake_perror("bake_provider_add_storage_target", ret);
159 160
    ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
            bake_target_name, ret);
Shane Snyder's avatar
Shane Snyder committed
161
    bake_provider_set_conf(bake_prov, "pipeline_enabled", "1");
162 163 164

    /* Bake provider handle initialization from self addr */
    bake_client_data bake_clt_data;
165
    ret = bake_client_init(mid, &(bake_clt_data.client));
Rob Latham's avatar
Rob Latham committed
166
    if (ret != 0) bake_perror("bake_client_init", ret);
167 168
    ASSERT(ret == 0, "bake_client_init() failed (ret = %d)\n", ret);
    ret = bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
Rob Latham's avatar
Rob Latham committed
169
    if (ret != 0) bake_perror("bake_provider_handle_create", ret);
170
    ASSERT(ret == 0, "bake_provider_handle_create() failed (ret = %d)\n", ret);
171 172
    margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);

Matthieu Dorier's avatar
Matthieu Dorier committed
173
    /* SDSKV provider initialization */
174
    uint8_t sdskv_mplex_id = 2;
Matthieu Dorier's avatar
Matthieu Dorier committed
175
    sdskv_provider_t sdskv_prov;
176 177 178
    ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
    ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);

179
    ret = mobject_sdskv_provider_setup(sdskv_prov, server_opts.kv_path, server_opts.kv_backend);
Matthieu Dorier's avatar
Matthieu Dorier committed
180 181 182

    /* SDSKV provider handle initialization from self addr */
    sdskv_client_data sdskv_clt_data;
183 184 185 186
    ret = sdskv_client_init(mid, &(sdskv_clt_data.client));
    ASSERT(ret == 0, "sdskv_client_init() failed (ret = %d)\n", ret);
    ret = sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
    ASSERT(ret == 0, "sdskv_provider_handle_create() failed (ret = %d)\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
187 188
    margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);

189 190 191 192 193
    /* SSG group creation */
    ssg_group_id_t gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
    ASSERT(gid != SSG_GROUP_ID_NULL, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
    margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);

194 195
    /* Mobject provider initialization */
    mobject_provider_t mobject_prov;
Matthieu Dorier's avatar
Matthieu Dorier committed
196 197 198
    ret = mobject_provider_register(mid, 1, 
            MOBJECT_ABT_POOL_DEFAULT, 
            bake_clt_data.provider_handle, 
199
            sdskv_clt_data.provider_handle,
200
            gid, server_opts.cluster_file, &mobject_prov);
201
    if (ret != 0)
202
    {
203
        fprintf(stderr, "Error: Unable to initialize mobject provider\n");
204 205 206 207
        margo_finalize(mid);
        return -1;
    }

208
    margo_addr_free(mid, self_addr);
209

210 211
    margo_wait_for_finalize(mid);

212 213 214 215
    MPI_Finalize();

    return 0;
}
216

217 218 219 220 221 222
static void finalize_ssg_cb(void* data)
{
    ssg_group_id_t* gid = (ssg_group_id_t*)data;
    ssg_group_destroy(*gid);
}

223 224 225 226 227 228
static void finalize_bake_client_cb(void* data)
{
    bake_client_data* clt_data = (bake_client_data*)data;
    bake_provider_handle_release(clt_data->provider_handle);
    bake_client_finalize(clt_data->client);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
229 230 231 232 233 234 235

static void finalize_sdskv_client_cb(void* data)
{
    sdskv_client_data* clt_data = (sdskv_client_data*)data;
    sdskv_provider_handle_release(clt_data->provider_handle);
    sdskv_client_finalize(clt_data->client);
}
236 237 238 239 240 241

static void finalized_ssg_group_cb(void* data)
{
    ssg_group_id_t gid = *((ssg_group_id_t*)data);
    ssg_group_destroy(gid);
}