bake-server-daemon.c 6.71 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9 10 11 12 13
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <margo.h>
#include <libpmemobj.h>
14
#include <bake-server.h>
15

16 17 18 19 20
/*
 * Strategy used by the daemon when it is given more than one pool.
 * MODE_TARGETS is zero so that a zero-filled options struct selects it
 * by default.
 */
typedef enum
{
    /* a single provider owns every pool as a separate storage target */
    MODE_TARGETS = 0,
    /* one provider is registered per pool */
    MODE_PROVIDERS = 1
} mplex_mode_t;

21 22
struct options
{
23
    char *listen_addr_str;
24 25
    unsigned num_pools;
    char **bake_pools;
26
    char *host_file;
27
    size_t buf_size;
28 29
    size_t buf_count;
    uint32_t num_threads;
30
    mplex_mode_t mplex_mode;
31 32 33 34
};

/*
 * Print the command-line help text to stderr.
 *
 * argc and argv are accepted for symmetry with the callers but are not
 * consulted; the program name in the message is fixed.
 */
static void usage(int argc, char **argv)
{
    static const char *const help_lines[] = {
        "Usage: bake-server-daemon [OPTIONS] <listen_addr> <bake_pool1> <bake_pool2> ...\n",
        "       listen_addr is the Mercury address to listen on\n",
        "       bake_pool is the path to the BAKE pool\n",
        "       [-f filename] to write the server address to a file\n",
        "       [-m mode] multiplexing mode (providers or targets) for managing multiple pools (default is targets)\n",
        "       [-b size] buffer size for writes on provider\n",
        "       [-c count] count of buffers used for accesses on provider\n",
        "       [-t threads] number of threads used for concurrency\n",
        "Example: ./bake-server-daemon tcp://localhost:1234 /dev/shm/foo.dat /dev/shm/bar.dat\n"
    };
    size_t line;

    (void)argc;
    (void)argv;

    for(line = 0; line < sizeof(help_lines) / sizeof(help_lines[0]); line++)
    {
        fputs(help_lines[line], stderr);
    }

    return;
}

static void parse_args(int argc, char **argv, struct options *opts)
{
    int opt;

    memset(opts, 0, sizeof(*opts));

    /* get options */
54
    while((opt = getopt(argc, argv, "f:m:b:t:c:")) != -1)
55 56 57 58 59 60
    {
        switch(opt)
        {
            case 'f':
                opts->host_file = optarg;
                break;
61 62 63 64 65 66 67 68 69 70
            case 'm':
                if(0 == strcmp(optarg, "targets"))
                    opts->mplex_mode = MODE_TARGETS;
                else if(0 == strcmp(optarg, "providers"))
                    opts->mplex_mode = MODE_PROVIDERS;
                else {
                    fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n", optarg);
                    exit(EXIT_FAILURE);
                }
                break;
71 72 73
            case 'b':
                opts->buf_size = atol(optarg);
                break;
74 75 76 77 78 79
            case 'c':
                opts->buf_count = atol(optarg);
                break;
            case 't':
                opts->num_threads = atol(optarg);
                break;
80 81 82 83 84 85 86
            default:
                usage(argc, argv);
                exit(EXIT_FAILURE);
        }
    }

    /* get required arguments after options */
87
    if((argc - optind) < 2)
88 89 90 91
    {
        usage(argc, argv);
        exit(EXIT_FAILURE);
    }
92
    opts->num_pools = argc - optind - 1;
93
    opts->listen_addr_str = argv[optind++];
94 95 96 97 98
    opts->bake_pools = calloc(opts->num_pools, sizeof(char*));
    int i;
    for(i=0; i < opts->num_pools; i++) {
        opts->bake_pools[i] = argv[optind++];
    }
99 100 101 102

    return;
}

103 104
int main(int argc, char **argv) 
{
105 106
    struct options opts;
    margo_instance_id mid;
107
    int ret;
108

109 110 111 112
    parse_args(argc, argv, &opts);

    /* start margo */
    /* use the main xstream for driving progress and executing rpc handlers */
113
    mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
114
    if(mid == MARGO_INSTANCE_NULL)
115
    {
116
        fprintf(stderr, "Error: margo_init()\n");
117 118 119
        return(-1);
    }

120 121
    margo_enable_remote_shutdown(mid);

122 123 124 125 126 127 128 129
    if(opts.host_file)
    {
        /* write the server address to file if requested */
        FILE *fp;
        hg_addr_t self_addr;
        char self_addr_str[128];
        hg_size_t self_addr_str_sz = 128;
        hg_return_t hret;
130

131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
        /* figure out what address this server is listening on */
        hret = margo_addr_self(mid, &self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_self()\n");
            margo_finalize(mid);
            return(-1);
        }
        hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_to_string()\n");
            margo_addr_free(mid, self_addr);
            margo_finalize(mid);
            return(-1);
        }
        margo_addr_free(mid, self_addr);

        fp = fopen(opts.host_file, "w");
        if(!fp)
        {
            perror("fopen");
            margo_finalize(mid);
            return(-1);
        }

        fprintf(fp, "%s", self_addr_str);
        fclose(fp);
    }
160

161
    /* initialize the BAKE server */
162 163 164 165 166 167 168 169 170 171 172
    if(opts.mplex_mode == MODE_PROVIDERS) {
        int i;
        for(i=0; i< opts.num_pools; i++) {
            bake_provider_t provider;
            bake_target_id_t tid;
            ret = bake_provider_register(mid, i+1,
                    BAKE_ABT_POOL_DEFAULT,
                    &provider);

            if(ret != 0)
            {
173
                bake_perror( "Error: bake_provider_register()", ret);
174 175 176 177 178 179 180 181
                margo_finalize(mid);
                return(-1);
            }

            ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);

            if(ret != 0)
            {
182
                bake_perror("Error: bake_provider_add_storage_target()", ret);
183 184 185 186
                margo_finalize(mid);
                return(-1);
            }

187 188
            bake_provider_set_target_xfer_buffer(provider, tid, opts.buf_count, opts.buf_size);
            bake_provider_set_target_xfer_concurrency(provider, tid, opts.num_threads);
189

190 191
            printf("Provider %d managing new target at multiplex id %d\n", i, i+1);
        }
192

193 194 195 196 197 198 199 200 201 202
    } else {

        int i;
        bake_provider_t provider;
        ret = bake_provider_register(mid, 1,
                BAKE_ABT_POOL_DEFAULT,
                &provider);

        if(ret != 0)
        {
203
            bake_perror("Error: bake_provider_register()", ret);
204 205 206 207 208 209 210 211 212 213
            margo_finalize(mid);                                    
            return(-1);
        }

        for(i=0; i < opts.num_pools; i++) {
            bake_target_id_t tid;
            ret = bake_provider_add_storage_target(provider, opts.bake_pools[i], &tid);

            if(ret != 0)
            {
214
                bake_perror("Error: bake_provider_add_storage_target()", ret);
215 216 217 218
                margo_finalize(mid);                                    
                return(-1);
            }

219 220
            bake_provider_set_target_xfer_buffer(provider, tid, opts.buf_count, opts.buf_size);
            bake_provider_set_target_xfer_concurrency(provider, tid, opts.num_threads);
221

222
            printf("Provider 0 managing new target at multiplex id %d\n", 1);
223
        }
224
    }
225

226
    /* suspend until the BAKE server gets a shutdown signal from the client */
227
    margo_wait_for_finalize(mid);
228

229 230
    free(opts.bake_pools);

231 232
    return(0);
}