/* sdskv-server-daemon.c */
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <margo.h>
#include <sdskv-server.h>

/*
 * How the daemon exposes multiple databases (selected with the -m option).
 * MODE_DATABASES is 0 so that it is the default after parse_args() zeroes
 * the options structure.
 */
typedef enum {
    MODE_DATABASES = 0, /* a single provider (id 1) with every database attached to it */
    MODE_PROVIDERS = 1  /* one provider per database, registered with ids 1..num_db */
} kv_mplex_mode_t;

/*
 * Command-line configuration filled in by parse_args().
 * The string members point directly into argv (nothing is copied); only the
 * db_names and db_types arrays themselves are heap-allocated.
 */
struct options
{
    char *listen_addr_str;      /* first positional argument, passed to margo_init() */
    unsigned num_db;            /* number of database arguments; length of db_names and db_types */
    char **db_names;            /* database arguments; parse_db_type() cuts the ":<type>" suffix off in place */
    sdskv_db_type_t *db_types;  /* db_types[i] is the backend type parsed for db_names[i] */
    char *host_file;            /* argument of -f, or NULL when the option was not given */
    kv_mplex_mode_t mplex_mode; /* argument of -m; MODE_DATABASES when the option was not given */
};

/*
 * Prints the command-line synopsis of the daemon to stderr.
 *
 * argc and argv are accepted so the signature mirrors main(), but the
 * message is fixed and does not depend on them.
 */
static void usage(int argc, char **argv)
{
    /* One literal per output line; adjacent literals are concatenated. */
    static const char help_text[] =
        "Usage: sdskv-server-daemon [OPTIONS] <listen_addr> <db name 1>[:map|:bwt|:bdb|:ldb] <db name 2>[:map|:bwt|:bdb|:ldb] ...\n"
        "       listen_addr is the Mercury address to listen on\n"
        "       db name X are the names of the databases\n"
        "       [-f filename] to write the server address to a file\n"
        "       [-m mode] multiplexing mode (providers or databases) for managing multiple databases (default is databases)\n"
        "Example: ./sdskv-server-daemon tcp://localhost:1234 foo:bdb bar\n";

    /* Intentionally unused. */
    (void)argc;
    (void)argv;

    fputs(help_text, stderr);
}

/* ---- command-line parsing helpers ---- */
static sdskv_db_type_t parse_db_type(char* db_fullname) {
    char* column = strstr(db_fullname, ":");
41 42 43
    if(column == NULL) {
        return KVDB_MAP;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
44 45
    *column = '\0';
    char* db_type = column + 1;
46 47 48
    if(strcmp(db_type, "map") == 0) {
        return KVDB_MAP;
    } else if(strcmp(db_type, "bwt") == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
        return KVDB_BWTREE;
    } else if(strcmp(db_type, "bdb") == 0) {
        return KVDB_BERKELEYDB;
    } else if(strcmp(db_type, "ldb") == 0) {
        return KVDB_LEVELDB;
    }
    fprintf(stderr, "Unknown database type \"%s\"\n", db_type);
    exit(-1);
}

/*
 * Parses the command line into *opts.
 *
 * Recognized options:
 *   -f <file>  stored in opts->host_file
 *   -m <mode>  "databases" or "providers", stored in opts->mplex_mode
 * followed by at least two positional arguments: the listen address and one
 * or more "<name>[:<type>]" database arguments.
 *
 * opts is zeroed first, so options that are not given keep a zero/NULL value.
 * opts->db_names and opts->db_types are allocated with calloc() and belong to
 * the caller; the strings they refer to are the argv entries themselves.
 *
 * Any invalid option, missing argument, unknown mode or allocation failure
 * prints a message to stderr and terminates the process with EXIT_FAILURE.
 */
static void parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    unsigned i;

    memset(opts, 0, sizeof(*opts));

    /* get options */
    while((opt = getopt(argc, argv, "f:m:")) != -1)
    {
        switch(opt)
        {
            case 'f':
                opts->host_file = optarg;
                break;
            case 'm':
                if(0 == strcmp(optarg, "databases"))
                    opts->mplex_mode = MODE_DATABASES;
                else if(0 == strcmp(optarg, "providers"))
                    opts->mplex_mode = MODE_PROVIDERS;
                else {
                    fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n", optarg);
                    exit(EXIT_FAILURE);
                }
                break;
            default:
                usage(argc, argv);
                exit(EXIT_FAILURE);
        }
    }

    /* get required arguments after options: address + at least one database */
    if((argc - optind) < 2)
    {
        usage(argc, argv);
        exit(EXIT_FAILURE);
    }
    opts->listen_addr_str = argv[optind++];

    /* everything left on the command line is a database argument */
    opts->num_db = (unsigned)(argc - optind);
    opts->db_names = calloc(opts->num_db, sizeof(*opts->db_names));
    opts->db_types = calloc(opts->num_db, sizeof(*opts->db_types));
    if(opts->db_names == NULL || opts->db_types == NULL)
    {
        perror("calloc");
        exit(EXIT_FAILURE);
    }

    for(i = 0; i < opts->num_db; i++) {
        opts->db_names[i] = argv[optind++];
        /* strips the ":<type>" suffix from db_names[i] in place */
        opts->db_types[i] = parse_db_type(opts->db_names[i]);
    }
}

int main(int argc, char **argv) 
{
    struct options opts;
    margo_instance_id mid;
    int ret;

    parse_args(argc, argv, &opts);

    /* start margo */
    /* use the main xstream for driving progress and executing rpc handlers */
    mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
    if(mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: margo_init()\n");
        return(-1);
    }

    margo_enable_remote_shutdown(mid);

    if(opts.host_file)
    {
        /* write the server address to file if requested */
        FILE *fp;
        hg_addr_t self_addr;
        char self_addr_str[128];
        hg_size_t self_addr_str_sz = 128;
        hg_return_t hret;

        /* figure out what address this server is listening on */
        hret = margo_addr_self(mid, &self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_self()\n");
            margo_finalize(mid);
            return(-1);
        }
        hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_to_string()\n");
            margo_addr_free(mid, self_addr);
            margo_finalize(mid);
            return(-1);
        }
        margo_addr_free(mid, self_addr);

        fp = fopen(opts.host_file, "w");
        if(!fp)
        {
            perror("fopen");
            margo_finalize(mid);
            return(-1);
        }

        fprintf(fp, "%s", self_addr_str);
        fclose(fp);
    }

    /* initialize the SDSKV server */
    if(opts.mplex_mode == MODE_PROVIDERS) {
        int i;
        for(i=0; i< opts.num_db; i++) {
            sdskv_provider_t provider;
            ret = sdskv_provider_register(mid, i+1,
                    SDSKV_ABT_POOL_DEFAULT,
                    &provider);

            if(ret != 0)
            {
                fprintf(stderr, "Error: sdskv_provider_register()\n");
                margo_finalize(mid);
                return(-1);
            }

182 183 184 185 186 187 188 189
            char* path = opts.db_names[i];
            char* x = strrchr(path, '/');
            char* db_name = path;
            if(x != NULL) {
                db_name = x+1;
                *x = '\0';
            }

Matthieu Dorier's avatar
Matthieu Dorier committed
190
            sdskv_database_id_t db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
191
            sdskv_config_t db_config = { 
192 193
                .db_name = db_name,
                .db_path = (x == NULL ? "" : path),
Matthieu Dorier's avatar
Matthieu Dorier committed
194
                .db_type = opts.db_types[i],
195
                .db_comp_fn_name = SDSKV_COMPARE_DEFAULT,
Matthieu Dorier's avatar
Matthieu Dorier committed
196 197 198
                .db_no_overwrite = 0
            };
            ret = sdskv_provider_attach_database(provider, &db_config, &db_id);
Matthieu Dorier's avatar
Matthieu Dorier committed
199 200 201

            if(ret != 0)
            {
Matthieu Dorier's avatar
Matthieu Dorier committed
202
                fprintf(stderr, "Error: bake_provider_attach_database()\n");
Matthieu Dorier's avatar
Matthieu Dorier committed
203 204 205 206
                margo_finalize(mid);
                return(-1);
            }

Matthieu Dorier's avatar
Matthieu Dorier committed
207
            printf("Provider %d managing database \"%s\" at multiplex id %d\n", i, opts.db_names[i], i+1);
Matthieu Dorier's avatar
Matthieu Dorier committed
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
        }

    } else {

        int i;
        sdskv_provider_t provider;
        ret = sdskv_provider_register(mid, 1,
                SDSKV_ABT_POOL_DEFAULT,
                &provider);

        if(ret != 0)
        {
            fprintf(stderr, "Error: sdskv_provider_register()\n");
            margo_finalize(mid);                                    
            return(-1);
        }

        for(i=0; i < opts.num_db; i++) {
            sdskv_database_id_t db_id;
227 228 229 230 231 232 233
            char* path = opts.db_names[i];
            char* x = strrchr(path, '/');
            char* db_name = path;
            if(x != NULL) {
                db_name = x+1;
                *x = '\0';
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
234
            sdskv_config_t db_config = {
235 236
                .db_name = db_name,
                .db_path = (x == NULL ? "" : path),
Matthieu Dorier's avatar
Matthieu Dorier committed
237
                .db_type = opts.db_types[i],
238
                .db_comp_fn_name = SDSKV_COMPARE_DEFAULT,
Matthieu Dorier's avatar
Matthieu Dorier committed
239 240 241
                .db_no_overwrite = 0
            };
            ret = sdskv_provider_attach_database(provider, &db_config, &db_id);
Matthieu Dorier's avatar
Matthieu Dorier committed
242 243 244

            if(ret != 0)
            {
Matthieu Dorier's avatar
Matthieu Dorier committed
245
                fprintf(stderr, "Error: sdskv_provider_attach_database()\n");
Matthieu Dorier's avatar
Matthieu Dorier committed
246 247 248 249
                margo_finalize(mid);                                    
                return(-1);
            }

Matthieu Dorier's avatar
Matthieu Dorier committed
250
            printf("Provider 0 managing database \"%s\" at multiplex id %d\n", opts.db_names[i] , 1);
Matthieu Dorier's avatar
Matthieu Dorier committed
251 252 253 254 255 256 257 258 259 260
        }
    }

    /* suspend until the BAKE server gets a shutdown signal from the client */
    margo_wait_for_finalize(mid);

    free(opts.db_names);

    return(0);
}