mobject-server-daemon.c 7.75 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
8
#include <getopt.h>
9 10 11
#include <mpi.h>
#include <margo.h>
#include <ssg.h>
12
#include <ssg-mpi.h>
13 14
#include <bake-client.h>
#include <bake-server.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
15 16
#include <sdskv-client.h>
#include <sdskv-server.h>
17 18 19

#include "mobject-server.h"

20 21
/*
 * ASSERT(cond, msg, ...): if `cond` is false, print `msg` (a printf-style
 * format given as a string literal) to stderr, prefixed with the current
 * file and line, then terminate the process with exit(-1).
 *
 * At least one argument must follow `msg`, since __VA_ARGS__ is expanded
 * after a comma. The do { } while (0) wrapper makes the expansion a single
 * statement, so `if (x) ASSERT(...); else ...` parses as intended.
 */
#define ASSERT(cond, msg, ...)                                                \
    do {                                                                      \
        if (!(cond)) {                                                        \
            fprintf(stderr, "[%s:%d] " msg, __FILE__, __LINE__, __VA_ARGS__); \
            exit(-1);                                                         \
        }                                                                     \
    } while (0)

22 23 24 25 26
/*
 * BAKE client state kept by the daemon: the client object together with a
 * provider handle created from it. A pointer to this struct is registered
 * with finalize_bake_client_cb(), which releases the handle and then
 * finalizes the client.
 */
typedef struct {
    bake_client_t          client;
    bake_provider_handle_t provider_handle;
} bake_client_data;

Matthieu Dorier's avatar
Matthieu Dorier committed
27 28 29 30 31
/*
 * SDSKV client state kept by the daemon: the client object together with a
 * provider handle created from it. A pointer to this struct is registered
 * with finalize_sdskv_client_cb(), which releases the handle and then
 * finalizes the client.
 */
typedef struct {
    sdskv_client_t         client;
    sdskv_provider_handle_t provider_handle;
} sdskv_client_data;

32
typedef struct {
33 34 35 36
    char*           listen_addr;
    char*           cluster_file;
    sdskv_db_type_t kv_backend;
    size_t          pool_size;
37 38 39 40
} mobject_server_options;

/*
 * Print the command-line synopsis to stderr and terminate the process with
 * exit(-1). This function does not return.
 */
static void usage(void)
{
    fprintf(stderr, "Usage: mobject-server-daemon [OPTIONS] <listen_addr> <cluster_file>\n");
    fprintf(stderr, "  <listen_addr>    the Mercury address to listen on\n");
    fprintf(stderr, "  <cluster_file>   the file to write mobject cluster connect info to\n");
    fprintf(stderr, "  OPTIONS:\n");
    fprintf(stderr, "    --pool-size    Bake pool size for each server [default: 1GiB]\n");
    /* The default backend is KVDB_MAP, whose command-line name is "mapdb". */
    fprintf(stderr, "    --kv-backend   SDSKV backend to use (mapdb, leveldb, berkeleydb) [default: mapdb]\n");
    exit(-1);
}

/*
 * Parse the daemon's command line into *opts.
 *
 * Options:
 *   --kv-backend <mapdb|leveldb|berkeleydb>   sets opts->kv_backend
 *   --pool-size <n>  (or -p <n>)              sets opts->pool_size; <n> is
 *                                             parsed with base auto-detection
 * Exactly two positional arguments must follow; they are stored (not copied)
 * in opts->listen_addr and opts->cluster_file.
 *
 * Fields whose option is absent are left untouched, so the caller is
 * expected to pre-load defaults. Any malformed input ends the process
 * through usage().
 */
static void parse_args(int argc, char **argv, mobject_server_options *opts)
{
    int c;
    char *short_options = "p:";
    struct option long_options[] = {
        {"kv-backend", required_argument, 0, 'k'},
        {"pool-size", required_argument, 0, 'p'},
        /* getopt_long() scans until it finds an all-zero element */
        {0, 0, 0, 0}
    };

    while ((c = getopt_long(argc, argv, short_options, long_options, NULL)) != -1)
    {
        switch (c)
        {
            case 'k':
                if(strcmp(optarg, "mapdb") == 0)
                    opts->kv_backend = KVDB_MAP;
                else if(strcmp(optarg, "leveldb") == 0)
                    opts->kv_backend = KVDB_LEVELDB;
                else if(strcmp(optarg, "berkeleydb") == 0)
                    opts->kv_backend = KVDB_BERKELEYDB;
                else
                    usage();
                break;
            case 'p':
            {
                char *end = NULL;
                unsigned long size;

                /* reject empty, partially numeric, or out-of-range values
                 * instead of silently using whatever prefix converted */
                errno = 0;
                size = strtoul(optarg, &end, 0);
                if (end == optarg || *end != '\0' || errno == ERANGE)
                    usage();
                opts->pool_size = size;
                break;
            }
            default:
                usage();
        }
    }

    if ((argc - optind) != 2)
        usage();
    opts->listen_addr = argv[optind++];
    opts->cluster_file = argv[optind++];

    return;
}

89
/* Finalize callbacks; their definitions follow main(). */
static void finalize_ssg_cb(void *data);
static void finalize_bake_client_cb(void *data);
static void finalize_sdskv_client_cb(void *data);
static void finalized_ssg_group_cb(void *data);
Matthieu Dorier's avatar
Matthieu Dorier committed
93

94 95
int main(int argc, char *argv[])
{
96
    mobject_server_options server_opts = {
97 98
        .kv_backend = KVDB_MAP,
        .pool_size = 1*1024*1024*1024, /* 1 GiB default */
99
    }; 
100 101 102
    margo_instance_id mid;
    int ret;

103
    parse_args(argc, argv, &server_opts);
104

105
    /* MPI required for SSG bootstrapping */
106
    MPI_Init(&argc, &argv);
107 108
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
109

110
    /* Margo initialization */
111
    mid = margo_init(server_opts.listen_addr, MARGO_SERVER_MODE, 0, 4);
112 113 114 115 116
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        return -1;
    }
117
    margo_enable_remote_shutdown(mid);
118

119 120
    /* SSG initialization */
    ret = ssg_init(mid);
121
    ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
122 123 124 125 126 127 128

    hg_addr_t self_addr;
    margo_addr_self(mid, &self_addr);

    /* Bake provider initialization */
    /* XXX mplex id and target name should be taken from config file */
    uint8_t bake_mplex_id = 1;
129 130 131 132 133
    char bake_target_name[128];
    sprintf(bake_target_name, "/dev/shm/mobject.%d.dat", rank);
    /* create the bake target if it does not exist */
    if(-1 == access(bake_target_name, F_OK)) {
        // XXX creating a pool of 10MB - this should come from a config file
134
        ret = bake_makepool(bake_target_name, server_opts.pool_size, 0664);
Rob Latham's avatar
Rob Latham committed
135
        if (ret != 0) bake_perror("bake_makepool", ret);
136 137
        ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
    }
138 139
    bake_provider_t bake_prov;
    bake_target_id_t bake_tid;
140
    ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
Rob Latham's avatar
Rob Latham committed
141
    if (ret != 0) bake_perror("bake_provider_register", ret);
142 143
    ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
    ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
Rob Latham's avatar
Rob Latham committed
144
    if (ret != 0) bake_perror("bake_provider_add_storage_target", ret);
145 146
    ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
            bake_target_name, ret);
Shane Snyder's avatar
Shane Snyder committed
147
    bake_provider_set_conf(bake_prov, "pipeline_enabled", "1");
148 149 150

    /* Bake provider handle initialization from self addr */
    bake_client_data bake_clt_data;
151
    ret = bake_client_init(mid, &(bake_clt_data.client));
Rob Latham's avatar
Rob Latham committed
152
    if (ret != 0) bake_perror("bake_client_init", ret);
153 154
    ASSERT(ret == 0, "bake_client_init() failed (ret = %d)\n", ret);
    ret = bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
Rob Latham's avatar
Rob Latham committed
155
    if (ret != 0) bake_perror("bake_provider_handle_create", ret);
156
    ASSERT(ret == 0, "bake_provider_handle_create() failed (ret = %d)\n", ret);
157 158
    margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);

Matthieu Dorier's avatar
Matthieu Dorier committed
159
    /* SDSKV provider initialization */
160
    uint8_t sdskv_mplex_id = 2;
Matthieu Dorier's avatar
Matthieu Dorier committed
161
    sdskv_provider_t sdskv_prov;
162 163 164
    ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
    ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);

165
    ret = mobject_sdskv_provider_setup(sdskv_prov, server_opts.kv_backend);
Matthieu Dorier's avatar
Matthieu Dorier committed
166 167 168

    /* SDSKV provider handle initialization from self addr */
    sdskv_client_data sdskv_clt_data;
169 170 171 172
    ret = sdskv_client_init(mid, &(sdskv_clt_data.client));
    ASSERT(ret == 0, "sdskv_client_init() failed (ret = %d)\n", ret);
    ret = sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
    ASSERT(ret == 0, "sdskv_provider_handle_create() failed (ret = %d)\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
173 174
    margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);

175 176 177 178 179
    /* SSG group creation */
    ssg_group_id_t gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
    ASSERT(gid != SSG_GROUP_ID_NULL, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
    margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);

180 181
    /* Mobject provider initialization */
    mobject_provider_t mobject_prov;
Matthieu Dorier's avatar
Matthieu Dorier committed
182 183 184
    ret = mobject_provider_register(mid, 1, 
            MOBJECT_ABT_POOL_DEFAULT, 
            bake_clt_data.provider_handle, 
185
            sdskv_clt_data.provider_handle,
186
            gid, server_opts.cluster_file, &mobject_prov);
187
    if (ret != 0)
188
    {
189
        fprintf(stderr, "Error: Unable to initialize mobject provider\n");
190 191 192 193
        margo_finalize(mid);
        return -1;
    }

194
    margo_addr_free(mid, self_addr);
195

196 197
    margo_wait_for_finalize(mid);

198 199 200 201
    MPI_Finalize();

    return 0;
}
202

203 204 205 206 207 208
/*
 * Finalize callback: `data` points to an ssg_group_id_t; the group it
 * identifies is destroyed.
 */
static void finalize_ssg_cb(void* data)
{
    ssg_group_destroy(*(ssg_group_id_t *)data);
}

209 210 211 212 213 214
/*
 * Finalize callback: `data` points to a bake_client_data. The provider
 * handle is released first, then the client it was created from is
 * finalized.
 */
static void finalize_bake_client_cb(void* data)
{
    bake_client_data *bake = data;

    bake_provider_handle_release(bake->provider_handle);
    bake_client_finalize(bake->client);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
215 216 217 218 219 220 221

/*
 * Finalize callback: `data` points to a sdskv_client_data. The provider
 * handle is released first, then the client it was created from is
 * finalized.
 */
static void finalize_sdskv_client_cb(void* data)
{
    sdskv_client_data *kv = data;

    sdskv_provider_handle_release(kv->provider_handle);
    sdskv_client_finalize(kv->client);
}
222 223 224 225 226 227

/*
 * Finalize callback registered by main() for the server group: `data`
 * points to the ssg_group_id_t of the group to destroy.
 */
static void finalized_ssg_group_cb(void* data)
{
    const ssg_group_id_t *group = data;

    ssg_group_destroy(*group);
}