mobject-server-daemon.c 6.89 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include <unistd.h>
8
#include <getopt.h>
9 10 11
#include <mpi.h>
#include <margo.h>
#include <ssg.h>
12
#include <ssg-mpi.h>
13 14
#include <bake-client.h>
#include <bake-server.h>
15 16
#include <sdskv-client.h>
#include <sdskv-server.h>
17 18 19

#include "mobject-server.h"

20 21
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }

22 23 24 25 26
typedef struct {
    bake_client_t          client;
    bake_provider_handle_t provider_handle;
} bake_client_data;

27 28 29 30 31
typedef struct {
    sdskv_client_t         client;
    sdskv_provider_handle_t provider_handle;
} sdskv_client_data;

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
typedef struct {
    char*   listen_addr;
    char*   cluster_file;
    int     pool_size;
} mobject_server_options;

static void usage(void)
{
    fprintf(stderr, "Usage: mobject-server-daemon <listen_addr> <cluster_file>\n");
    fprintf(stderr, "  <listen_addr>    the Mercury address to listen on\n");
    fprintf(stderr, "  <cluster_file>   the file to write mobject cluster connect info to\n");
    exit(-1);
}

static void parse_args(int argc, char **argv, mobject_server_options *opts)
{
    int c;
    char *short_options = "p:";
    struct option long_options[] = {
        {"pool-size",  required_argument, 0, 'p'},
    };
    char *check = NULL;

    while ((c = getopt_long(argc, argv, short_options, long_options, NULL)) != -1)
    {
        switch (c)
        {
            case 'p':
                opts->pool_size = atoi(optarg);
                break;
            default:
                usage();
        }
    }

    if ((argc - optind) != 2)
        usage();
    opts->listen_addr = argv[optind++];
    opts->cluster_file = argv[optind++];

    return;
}

75
static void finalize_ssg_cb(void* data);
76
static void finalize_bake_client_cb(void* data);
77
static void finalize_sdskv_client_cb(void* data);
78
static void finalized_ssg_group_cb(void* data);
79

80 81
int main(int argc, char *argv[])
{
82 83 84
    mobject_server_options server_opts = {
        .pool_size = 10485760, /* 10 MiB default */
    }; 
85 86 87
    margo_instance_id mid;
    int ret;

88
    parse_args(argc, argv, &server_opts);
89

90
    /* MPI required for SSG bootstrapping */
91
    MPI_Init(&argc, &argv);
92 93
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
94

95
    /* Margo initialization */
96
    mid = margo_init(server_opts.listen_addr, MARGO_SERVER_MODE, 0, -1);
97 98 99 100 101
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        return -1;
    }
102
    margo_enable_remote_shutdown(mid);
103

104 105
    /* SSG initialization */
    ret = ssg_init(mid);
106
    ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
107 108 109 110 111 112 113

    hg_addr_t self_addr;
    margo_addr_self(mid, &self_addr);

    /* Bake provider initialization */
    /* XXX mplex id and target name should be taken from config file */
    uint8_t bake_mplex_id = 1;
114 115 116 117 118
    char bake_target_name[128];
    sprintf(bake_target_name, "/dev/shm/mobject.%d.dat", rank);
    /* create the bake target if it does not exist */
    if(-1 == access(bake_target_name, F_OK)) {
        // XXX creating a pool of 10MB - this should come from a config file
119
        ret = bake_makepool(bake_target_name, server_opts.pool_size, 0664);
Rob Latham's avatar
Rob Latham committed
120
        if (ret != 0) bake_perror("bake_makepool", ret);
121 122
        ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
    }
123 124
    bake_provider_t bake_prov;
    bake_target_id_t bake_tid;
125
    ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
Rob Latham's avatar
Rob Latham committed
126
    if (ret != 0) bake_perror("bake_provider_register", ret);
127 128
    ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
    ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
Rob Latham's avatar
Rob Latham committed
129
    if (ret != 0) bake_perror("bake_provider_add_storage_target", ret);
130 131
    ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
            bake_target_name, ret);
132 133 134

    /* Bake provider handle initialization from self addr */
    bake_client_data bake_clt_data;
135
    ret = bake_client_init(mid, &(bake_clt_data.client));
Rob Latham's avatar
Rob Latham committed
136
    if (ret != 0) bake_perror("bake_client_init", ret);
137 138
    ASSERT(ret == 0, "bake_client_init() failed (ret = %d)\n", ret);
    ret = bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
Rob Latham's avatar
Rob Latham committed
139
    if (ret != 0) bake_perror("bake_provider_handle_create", ret);
140
    ASSERT(ret == 0, "bake_provider_handle_create() failed (ret = %d)\n", ret);
141 142
    margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);

143
    /* SDSKV provider initialization */
144
    uint8_t sdskv_mplex_id = 2;
145
    sdskv_provider_t sdskv_prov;
146 147 148
    ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
    ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);

149
    ret = mobject_sdskv_provider_setup(sdskv_prov);
150 151 152

    /* SDSKV provider handle initialization from self addr */
    sdskv_client_data sdskv_clt_data;
153 154 155 156
    ret = sdskv_client_init(mid, &(sdskv_clt_data.client));
    ASSERT(ret == 0, "sdskv_client_init() failed (ret = %d)\n", ret);
    ret = sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
    ASSERT(ret == 0, "sdskv_provider_handle_create() failed (ret = %d)\n", ret);
157 158
    margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);

159 160 161 162 163
    /* SSG group creation */
    ssg_group_id_t gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
    ASSERT(gid != SSG_GROUP_ID_NULL, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
    margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);

164 165
    /* Mobject provider initialization */
    mobject_provider_t mobject_prov;
166 167 168
    ret = mobject_provider_register(mid, 1, 
            MOBJECT_ABT_POOL_DEFAULT, 
            bake_clt_data.provider_handle, 
169
            sdskv_clt_data.provider_handle,
170
            gid, server_opts.cluster_file, &mobject_prov);
171
    if (ret != 0)
172
    {
173
        fprintf(stderr, "Error: Unable to initialize mobject provider\n");
174 175 176 177
        margo_finalize(mid);
        return -1;
    }

178
    margo_addr_free(mid, self_addr);
179

180 181
    margo_wait_for_finalize(mid);

182 183 184 185
    MPI_Finalize();

    return 0;
}
186

187 188 189 190 191 192
static void finalize_ssg_cb(void* data)
{
    ssg_group_id_t* gid = (ssg_group_id_t*)data;
    ssg_group_destroy(*gid);
}

193 194 195 196 197 198
static void finalize_bake_client_cb(void* data)
{
    bake_client_data* clt_data = (bake_client_data*)data;
    bake_provider_handle_release(clt_data->provider_handle);
    bake_client_finalize(clt_data->client);
}
199 200 201 202 203 204 205

static void finalize_sdskv_client_cb(void* data)
{
    sdskv_client_data* clt_data = (sdskv_client_data*)data;
    sdskv_provider_handle_release(clt_data->provider_handle);
    sdskv_client_finalize(clt_data->client);
}
206 207 208 209 210 211

static void finalized_ssg_group_cb(void* data)
{
    ssg_group_id_t gid = *((ssg_group_id_t*)data);
    ssg_group_destroy(gid);
}