bake-server-daemon.c 7.51 KB
Newer Older
1
2
/*
 * (C) 2015 The University of Chicago
Philip Carns's avatar
Philip Carns committed
3
 *
4
5
6
 * See COPYRIGHT in top-level directory.
 */

7
8
#include "bake-config.h"

9
10
11
12
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
13
#include <bake-server.h>
14

Philip Carns's avatar
Philip Carns committed
15
16
typedef enum
{
17
18
19
20
    MODE_TARGETS   = 0,
    MODE_PROVIDERS = 1
} mplex_mode_t;

Philip Carns's avatar
Philip Carns committed
21
22
23
24
25
26
struct options {
    char*        listen_addr_str;
    unsigned     num_pools;
    char**       bake_pools;
    char*        host_file;
    int          pipeline_enabled;
27
    mplex_mode_t mplex_mode;
28
29
};

Philip Carns's avatar
Philip Carns committed
30
static void usage(int argc, char** argv)
31
{
Philip Carns's avatar
Philip Carns committed
32
33
34
    fprintf(stderr,
            "Usage: bake-server-daemon [OPTIONS] <listen_addr> <bake_pool1> "
            "<bake_pool2> ...\n");
Philip Carns's avatar
Philip Carns committed
35
    fprintf(stderr, "       listen_addr is the Mercury address to listen on\n");
36
    fprintf(stderr, "       bake_pool is the path to the BAKE pool\n");
Philip Carns's avatar
Philip Carns committed
37
38
39
40
41
42
43
    fprintf(stderr,
            "           (prepend pmem: or file: to specify backend format)\n");
    fprintf(stderr,
            "       [-f filename] to write the server address to a file\n");
    fprintf(stderr,
            "       [-m mode] multiplexing mode (providers or targets) for "
            "managing multiple pools (default is targets)\n");
44
    fprintf(stderr, "       [-p] enable pipelining\n");
Philip Carns's avatar
Philip Carns committed
45
46
47
    fprintf(stderr,
            "Example: ./bake-server-daemon tcp://localhost:1234 "
            "/dev/shm/foo.dat /dev/shm/bar.dat\n");
48
49
50
    return;
}

Philip Carns's avatar
Philip Carns committed
51
static void parse_args(int argc, char** argv, struct options* opts)
52
53
54
55
56
57
{
    int opt;

    memset(opts, 0, sizeof(*opts));

    /* get options */
Philip Carns's avatar
Philip Carns committed
58
59
60
61
62
63
64
65
66
67
68
69
70
    while ((opt = getopt(argc, argv, "f:m:p")) != -1) {
        switch (opt) {
        case 'f':
            opts->host_file = optarg;
            break;
        case 'm':
            if (0 == strcmp(optarg, "targets"))
                opts->mplex_mode = MODE_TARGETS;
            else if (0 == strcmp(optarg, "providers"))
                opts->mplex_mode = MODE_PROVIDERS;
            else {
                fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n",
                        optarg);
71
                exit(EXIT_FAILURE);
Philip Carns's avatar
Philip Carns committed
72
73
74
75
76
77
78
79
            }
            break;
        case 'p':
            opts->pipeline_enabled = 1;
            break;
        default:
            usage(argc, argv);
            exit(EXIT_FAILURE);
80
81
82
83
        }
    }

    /* get required arguments after options */
Philip Carns's avatar
Philip Carns committed
84
    if ((argc - optind) < 2) {
85
86
87
        usage(argc, argv);
        exit(EXIT_FAILURE);
    }
Philip Carns's avatar
Philip Carns committed
88
    opts->num_pools       = argc - optind - 1;
Philip Carns's avatar
Philip Carns committed
89
    opts->listen_addr_str = argv[optind++];
Philip Carns's avatar
Philip Carns committed
90
    opts->bake_pools      = calloc(opts->num_pools, sizeof(char*));
91
    int i;
Philip Carns's avatar
Philip Carns committed
92
    for (i = 0; i < opts->num_pools; i++) {
93
94
        opts->bake_pools[i] = argv[optind++];
    }
95
96
97
98

    return;
}

Philip Carns's avatar
Philip Carns committed
99
int main(int argc, char** argv)
100
{
Philip Carns's avatar
Philip Carns committed
101
    struct options    opts;
102
    margo_instance_id mid;
Philip Carns's avatar
Philip Carns committed
103
    int               ret;
104

105
106
107
    parse_args(argc, argv, &opts);

    /* start margo */
Philip Carns's avatar
Philip Carns committed
108
109
    /* use the main xstream for driving progress and executing rpc handlers */
    mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
Philip Carns's avatar
Philip Carns committed
110
    if (mid == MARGO_INSTANCE_NULL) {
111
        fprintf(stderr, "Error: margo_init()\n");
112
        free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
113
        return (-1);
114
115
    }

116
117
    margo_enable_remote_shutdown(mid);

Philip Carns's avatar
Philip Carns committed
118
    if (opts.host_file) {
119
        /* write the server address to file if requested */
Philip Carns's avatar
Philip Carns committed
120
121
122
123
        FILE*       fp;
        hg_addr_t   self_addr;
        char        self_addr_str[128];
        hg_size_t   self_addr_str_sz = 128;
124
        hg_return_t hret;
125

126
127
        /* figure out what address this server is listening on */
        hret = margo_addr_self(mid, &self_addr);
Philip Carns's avatar
Philip Carns committed
128
        if (hret != HG_SUCCESS) {
129
            fprintf(stderr, "Error: margo_addr_self()\n");
130
            free(opts.bake_pools);
131
            margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
132
            return (-1);
133
        }
Philip Carns's avatar
Philip Carns committed
134
135
136
        hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz,
                                    self_addr);
        if (hret != HG_SUCCESS) {
137
            fprintf(stderr, "Error: margo_addr_to_string()\n");
138
            free(opts.bake_pools);
139
140
            margo_addr_free(mid, self_addr);
            margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
141
            return (-1);
142
143
144
        }
        margo_addr_free(mid, self_addr);

Philip Carns's avatar
Philip Carns committed
145
        fp = fopen(opts.host_file, "w");
Philip Carns's avatar
Philip Carns committed
146
        if (!fp) {
147
            free(opts.bake_pools);
148
149
            perror("fopen");
            margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
150
            return (-1);
151
152
153
154
155
        }

        fprintf(fp, "%s", self_addr_str);
        fclose(fp);
    }
156

Philip Carns's avatar
Philip Carns committed
157
    /* initialize the BAKE server */
Philip Carns's avatar
Philip Carns committed
158
    if (opts.mplex_mode == MODE_PROVIDERS) {
Philip Carns's avatar
Philip Carns committed
159
        int i;
Philip Carns's avatar
Philip Carns committed
160
        for (i = 0; i < opts.num_pools; i++) {
161
162
            bake_provider_t                provider;
            bake_target_id_t               tid;
163
164
            struct bake_provider_init_info bpargs           = {0};
            char                           json_config[256] = {0};
165
            char*                          show_conf        = NULL;
166
167
168
169
170
171

            if (opts.pipeline_enabled) {
                sprintf(json_config, "{\"pipeline_enable\": true}");
                bpargs.json_config = json_config;
            }

172
            ret = bake_provider_register(mid, i + 1, &bpargs, &provider);
Philip Carns's avatar
Philip Carns committed
173

Philip Carns's avatar
Philip Carns committed
174
175
            if (ret != 0) {
                bake_perror("Error: bake_provider_register()", ret);
176
                free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
177
                margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
178
                return (-1);
Philip Carns's avatar
Philip Carns committed
179
180
            }

Philip Carns's avatar
Philip Carns committed
181
182
            ret = bake_provider_add_storage_target(provider, opts.bake_pools[i],
                                                   &tid);
183

Philip Carns's avatar
Philip Carns committed
184
            if (ret != 0) {
185
                bake_perror("Error: bake_provider_add_storage_target()", ret);
186
                free(opts.bake_pools);
187
                margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
188
                return (-1);
189
190
            }

Philip Carns's avatar
Philip Carns committed
191
192
            printf("Provider %d managing new target at multiplex id %d\n", i,
                   i + 1);
193
194
195
196
197
            printf("Bake provider config:\n");
            printf("=====================\n");
            show_conf = bake_provider_get_config(provider);
            printf("%s\n", show_conf);
            free(show_conf);
198
        }
199

Philip Carns's avatar
Philip Carns committed
200
    } else {
201

202
203
        int                            i;
        bake_provider_t                provider;
204
205
        struct bake_provider_init_info bpargs           = {0};
        char                           json_config[256] = {0};
206
        char*                          show_conf        = NULL;
207
208
209
210
211
212

        if (opts.pipeline_enabled) {
            sprintf(json_config, "{\"pipeline_enable\": true}");
            bpargs.json_config = json_config;
        }

213
        ret = bake_provider_register(mid, 1, &bpargs, &provider);
214

Philip Carns's avatar
Philip Carns committed
215
        if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
216
            bake_perror("Error: bake_provider_register()", ret);
217
            free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
218
219
            margo_finalize(mid);
            return (-1);
220
        }
221

Philip Carns's avatar
Philip Carns committed
222
        for (i = 0; i < opts.num_pools; i++) {
Philip Carns's avatar
Philip Carns committed
223
            bake_target_id_t tid;
Philip Carns's avatar
Philip Carns committed
224
225
            ret = bake_provider_add_storage_target(provider, opts.bake_pools[i],
                                                   &tid);
Philip Carns's avatar
Philip Carns committed
226

Philip Carns's avatar
Philip Carns committed
227
            if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
228
                bake_perror("Error: bake_provider_add_storage_target()", ret);
229
                free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
230
231
                margo_finalize(mid);
                return (-1);
Philip Carns's avatar
Philip Carns committed
232
233
234
235
            }

            printf("Provider 0 managing new target at multiplex id %d\n", 1);
        }
236
237
238
239
240
        printf("Bake provider config:\n");
        printf("=====================\n");
        show_conf = bake_provider_get_config(provider);
        printf("%s\n", show_conf);
        free(show_conf);
Philip Carns's avatar
Philip Carns committed
241
    }
Philip Carns's avatar
Philip Carns committed
242

243
    /* suspend until the BAKE server gets a shutdown signal from the client */
244
    margo_wait_for_finalize(mid);
245

246
247
    free(opts.bake_pools);

Philip Carns's avatar
Philip Carns committed
248
    return (0);
249
}