bake-server-daemon.c 5.71 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9 10 11 12 13
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <libpmemobj.h>
14
#include <bake-server.h>
15

16 17 18 19 20
typedef enum {
    MODE_TARGETS   = 0,
    MODE_PROVIDERS = 1
} mplex_mode_t;

21 22
struct options
{
23
    char *listen_addr_str;
24 25
    unsigned num_pools;
    char **bake_pools;
26
    char *host_file;
27
    mplex_mode_t mplex_mode;
28 29 30 31
};

static void usage(int argc, char **argv)
{
32
    fprintf(stderr, "Usage: bake-server-daemon [OPTIONS] <listen_addr> <bake_pool1> <bake_pool2> ...\n");
33
    fprintf(stderr, "       listen_addr is the Mercury address to listen on\n");
34
    fprintf(stderr, "       bake_pool is the path to the BAKE pool\n");
35
    fprintf(stderr, "       [-f filename] to write the server address to a file\n");
36
    fprintf(stderr, "       [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n"); 
37
    fprintf(stderr, "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n");
38 39 40 41 42 43 44 45 46 47
    return;
}

static void parse_args(int argc, char **argv, struct options *opts)
{
    int opt;

    memset(opts, 0, sizeof(*opts));

    /* get options */
48
    while((opt = getopt(argc, argv, "f:m:")) != -1)
49 50 51 52 53 54
    {
        switch(opt)
        {
            case 'f':
                opts->host_file = optarg;
                break;
55 56 57 58 59 60 61 62 63 64
            case 'm':
                if(0 == strcmp(optarg, "targets"))
                    opts->mplex_mode = MODE_TARGETS;
                else if(0 == strcmp(optarg, "providers"))
                    opts->mplex_mode = MODE_PROVIDERS;
                else {
                    fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n", optarg);
                    exit(EXIT_FAILURE);
                }
                break;
65 66 67 68 69 70 71
            default:
                usage(argc, argv);
                exit(EXIT_FAILURE);
        }
    }

    /* get required arguments after options */
72
    if((argc - optind) < 2)
73 74 75 76
    {
        usage(argc, argv);
        exit(EXIT_FAILURE);
    }
77
    opts->num_pools = argc - optind - 1;
78
    opts->listen_addr_str = argv[optind++];
79 80 81 82 83
    opts->bake_pools = calloc(opts->num_pools, sizeof(char*));
    int i;
    for(i=0; i < opts->num_pools; i++) {
        opts->bake_pools[i] = argv[optind++];
    }
84 85 86 87

    return;
}

88 89
int main(int argc, char **argv) 
{
90 91
    struct options opts;
    margo_instance_id mid;
92
    int ret;
93

94 95 96 97
    parse_args(argc, argv, &opts);

    /* start margo */
    /* use the main xstream for driving progress and executing rpc handlers */
98
    mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
99
    if(mid == MARGO_INSTANCE_NULL)
100
    {
101
        fprintf(stderr, "Error: margo_init()\n");
102 103 104
        return(-1);
    }

105 106 107 108 109 110 111 112
    if(opts.host_file)
    {
        /* write the server address to file if requested */
        FILE *fp;
        hg_addr_t self_addr;
        char self_addr_str[128];
        hg_size_t self_addr_str_sz = 128;
        hg_return_t hret;
113

114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
        /* figure out what address this server is listening on */
        hret = margo_addr_self(mid, &self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_self()\n");
            margo_finalize(mid);
            return(-1);
        }
        hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_to_string()\n");
            margo_addr_free(mid, self_addr);
            margo_finalize(mid);
            return(-1);
        }
        margo_addr_free(mid, self_addr);

        fp = fopen(opts.host_file, "w");
        if(!fp)
        {
            perror("fopen");
            margo_finalize(mid);
            return(-1);
        }

        fprintf(fp, "%s", self_addr_str);
        fclose(fp);
    }
143

144
    /* initialize the BAKE server */
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
    if(opts.mplex_mode == MODE_PROVIDERS) {
        int i;
        for(i=0; i< opts.num_pools; i++) {
            bake_provider_t provider;
            bake_target_id_t tid;
            ret = bake_provider_register(mid, i+1,
                    BAKE_ABT_POOL_DEFAULT,
                    &provider);

            if(ret != 0)
            {
                fprintf(stderr, "Error: bake_provider_register()\n");
                margo_finalize(mid);
                return(-1);
            }

            ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);

            if(ret != 0)
            {
                fprintf(stderr, "Error: bake_provider_add_storage_target()\n");
                margo_finalize(mid);
                return(-1);
            }

            printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
        }
172

173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
    } else {

        int i;
        bake_provider_t provider;
        ret = bake_provider_register(mid, 1,
                BAKE_ABT_POOL_DEFAULT,
                &provider);

        if(ret != 0)
        {
            fprintf(stderr, "Error: bake_provider_register()\n");
            margo_finalize(mid);                                    
            return(-1);
        }

        for(i=0; i < opts.num_pools; i++) {
            bake_target_id_t tid;
            ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);

            if(ret != 0)
            {
                fprintf(stderr, "Error: bake_provider_add_storage_target()\n");
                margo_finalize(mid);                                    
                return(-1);
            }

199
            printf("Provider 0 managing new target at multiplex id %d\n", 1);
200
        }
201
    }
202

203
    /* suspend until the BAKE server gets a shutdown signal from the client */
204
    margo_wait_for_finalize(mid);
205

206 207
    free(opts.bake_pools);

208 209
    return(0);
}