bake-server-daemon.c 7.09 KB
Newer Older
1
2
/*
 * (C) 2015 The University of Chicago
Philip Carns's avatar
Philip Carns committed
3
 *
4
5
6
 * See COPYRIGHT in top-level directory.
 */

7
8
#include "bake-config.h"

9
10
11
12
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
13
#include <bake-server.h>
14

Philip Carns's avatar
Philip Carns committed
15
16
typedef enum
{
17
18
19
20
    MODE_TARGETS   = 0,
    MODE_PROVIDERS = 1
} mplex_mode_t;

Philip Carns's avatar
Philip Carns committed
21
22
23
24
25
26
struct options {
    char*        listen_addr_str;
    unsigned     num_pools;
    char**       bake_pools;
    char*        host_file;
    int          pipeline_enabled;
27
    mplex_mode_t mplex_mode;
28
29
};

Philip Carns's avatar
Philip Carns committed
30
static void usage(int argc, char** argv)
31
{
Philip Carns's avatar
Philip Carns committed
32
33
34
    fprintf(stderr,
            "Usage: bake-server-daemon [OPTIONS] <listen_addr> <bake_pool1> "
            "<bake_pool2> ...\n");
Philip Carns's avatar
Philip Carns committed
35
    fprintf(stderr, "       listen_addr is the Mercury address to listen on\n");
36
    fprintf(stderr, "       bake_pool is the path to the BAKE pool\n");
Philip Carns's avatar
Philip Carns committed
37
38
39
40
41
42
43
    fprintf(stderr,
            "           (prepend pmem: or file: to specify backend format)\n");
    fprintf(stderr,
            "       [-f filename] to write the server address to a file\n");
    fprintf(stderr,
            "       [-m mode] multiplexing mode (providers or targets) for "
            "managing multiple pools (default is targets)\n");
44
    fprintf(stderr, "       [-p] enable pipelining\n");
Philip Carns's avatar
Philip Carns committed
45
46
47
    fprintf(stderr,
            "Example: ./bake-server-daemon tcp://localhost:1234 "
            "/dev/shm/foo.dat /dev/shm/bar.dat\n");
48
49
50
    return;
}

Philip Carns's avatar
Philip Carns committed
51
static void parse_args(int argc, char** argv, struct options* opts)
52
53
54
55
56
57
{
    int opt;

    memset(opts, 0, sizeof(*opts));

    /* get options */
Philip Carns's avatar
Philip Carns committed
58
59
60
61
62
63
64
65
66
67
68
69
70
    while ((opt = getopt(argc, argv, "f:m:p")) != -1) {
        switch (opt) {
        case 'f':
            opts->host_file = optarg;
            break;
        case 'm':
            if (0 == strcmp(optarg, "targets"))
                opts->mplex_mode = MODE_TARGETS;
            else if (0 == strcmp(optarg, "providers"))
                opts->mplex_mode = MODE_PROVIDERS;
            else {
                fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n",
                        optarg);
71
                exit(EXIT_FAILURE);
Philip Carns's avatar
Philip Carns committed
72
73
74
75
76
77
78
79
            }
            break;
        case 'p':
            opts->pipeline_enabled = 1;
            break;
        default:
            usage(argc, argv);
            exit(EXIT_FAILURE);
80
81
82
83
        }
    }

    /* get required arguments after options */
Philip Carns's avatar
Philip Carns committed
84
    if ((argc - optind) < 2) {
85
86
87
        usage(argc, argv);
        exit(EXIT_FAILURE);
    }
Philip Carns's avatar
Philip Carns committed
88
    opts->num_pools       = argc - optind - 1;
Philip Carns's avatar
Philip Carns committed
89
    opts->listen_addr_str = argv[optind++];
Philip Carns's avatar
Philip Carns committed
90
    opts->bake_pools      = calloc(opts->num_pools, sizeof(char*));
91
    int i;
Philip Carns's avatar
Philip Carns committed
92
    for (i = 0; i < opts->num_pools; i++) {
93
94
        opts->bake_pools[i] = argv[optind++];
    }
95
96
97
98

    return;
}

Philip Carns's avatar
Philip Carns committed
99
int main(int argc, char** argv)
100
{
Philip Carns's avatar
Philip Carns committed
101
    struct options    opts;
102
    margo_instance_id mid;
Philip Carns's avatar
Philip Carns committed
103
    int               ret;
104
105
    char*             show_conf = NULL;
    bake_provider_t   provider;
106

107
108
109
    parse_args(argc, argv, &opts);

    /* start margo */
Philip Carns's avatar
Philip Carns committed
110
111
    /* use the main xstream for driving progress and executing rpc handlers */
    mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
Philip Carns's avatar
Philip Carns committed
112
    if (mid == MARGO_INSTANCE_NULL) {
113
        fprintf(stderr, "Error: margo_init()\n");
114
        free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
115
        return (-1);
116
117
    }

118
119
    margo_enable_remote_shutdown(mid);

Philip Carns's avatar
Philip Carns committed
120
    if (opts.host_file) {
121
        /* write the server address to file if requested */
Philip Carns's avatar
Philip Carns committed
122
123
124
125
        FILE*       fp;
        hg_addr_t   self_addr;
        char        self_addr_str[128];
        hg_size_t   self_addr_str_sz = 128;
126
        hg_return_t hret;
127

128
129
        /* figure out what address this server is listening on */
        hret = margo_addr_self(mid, &self_addr);
Philip Carns's avatar
Philip Carns committed
130
        if (hret != HG_SUCCESS) {
131
            fprintf(stderr, "Error: margo_addr_self()\n");
132
            free(opts.bake_pools);
133
            margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
134
            return (-1);
135
        }
Philip Carns's avatar
Philip Carns committed
136
137
138
        hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz,
                                    self_addr);
        if (hret != HG_SUCCESS) {
139
            fprintf(stderr, "Error: margo_addr_to_string()\n");
140
            free(opts.bake_pools);
141
142
            margo_addr_free(mid, self_addr);
            margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
143
            return (-1);
144
145
146
        }
        margo_addr_free(mid, self_addr);

Philip Carns's avatar
Philip Carns committed
147
        fp = fopen(opts.host_file, "w");
Philip Carns's avatar
Philip Carns committed
148
        if (!fp) {
149
            free(opts.bake_pools);
150
151
            perror("fopen");
            margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
152
            return (-1);
153
154
155
156
157
        }

        fprintf(fp, "%s", self_addr_str);
        fclose(fp);
    }
158

Philip Carns's avatar
Philip Carns committed
159
    /* initialize the BAKE server */
Philip Carns's avatar
Philip Carns committed
160
    if (opts.mplex_mode == MODE_PROVIDERS) {
Philip Carns's avatar
Philip Carns committed
161
        int i;
Philip Carns's avatar
Philip Carns committed
162
        for (i = 0; i < opts.num_pools; i++) {
163
            bake_target_id_t               tid;
164
165
166
167
168
169
170
171
            struct bake_provider_init_info bpargs           = {0};
            char                           json_config[256] = {0};

            if (opts.pipeline_enabled) {
                sprintf(json_config, "{\"pipeline_enable\": true}");
                bpargs.json_config = json_config;
            }

172
            ret = bake_provider_register(mid, i + 1, &bpargs, &provider);
Philip Carns's avatar
Philip Carns committed
173

Philip Carns's avatar
Philip Carns committed
174
175
            if (ret != 0) {
                bake_perror("Error: bake_provider_register()", ret);
176
                free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
177
                margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
178
                return (-1);
Philip Carns's avatar
Philip Carns committed
179
180
            }

Philip Carns's avatar
Philip Carns committed
181
182
            ret = bake_provider_attach_target(provider, opts.bake_pools[i],
                                              &tid);
183

Philip Carns's avatar
Philip Carns committed
184
            if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
185
                bake_perror("Error: bake_provider_attach_target()", ret);
186
                free(opts.bake_pools);
187
                margo_finalize(mid);
Philip Carns's avatar
Philip Carns committed
188
                return (-1);
189
190
            }

Philip Carns's avatar
Philip Carns committed
191
192
            printf("Provider %d managing new target at multiplex id %d\n", i,
                   i + 1);
193
        }
194

Philip Carns's avatar
Philip Carns committed
195
    } else {
196

197
        int                            i;
198
199
200
201
202
203
204
205
        struct bake_provider_init_info bpargs           = {0};
        char                           json_config[256] = {0};

        if (opts.pipeline_enabled) {
            sprintf(json_config, "{\"pipeline_enable\": true}");
            bpargs.json_config = json_config;
        }

206
        ret = bake_provider_register(mid, 1, &bpargs, &provider);
207

Philip Carns's avatar
Philip Carns committed
208
        if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
209
            bake_perror("Error: bake_provider_register()", ret);
210
            free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
211
212
            margo_finalize(mid);
            return (-1);
213
        }
214

Philip Carns's avatar
Philip Carns committed
215
        for (i = 0; i < opts.num_pools; i++) {
Philip Carns's avatar
Philip Carns committed
216
            bake_target_id_t tid;
Philip Carns's avatar
Philip Carns committed
217
218
            ret = bake_provider_attach_target(provider, opts.bake_pools[i],
                                              &tid);
Philip Carns's avatar
Philip Carns committed
219

Philip Carns's avatar
Philip Carns committed
220
            if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
221
                bake_perror("Error: bake_provider_attach_target()", ret);
222
                free(opts.bake_pools);
Philip Carns's avatar
Philip Carns committed
223
224
                margo_finalize(mid);
                return (-1);
Philip Carns's avatar
Philip Carns committed
225
226
227
228
229
            }

            printf("Provider 0 managing new target at multiplex id %d\n", 1);
        }
    }
Philip Carns's avatar
Philip Carns committed
230

231
232
233
234
235
236
    printf("Bake provider config:\n");
    printf("=====================\n");
    show_conf = bake_provider_get_config(provider);
    printf("%s\n", show_conf);
    free(show_conf);

237
    /* suspend until the BAKE server gets a shutdown signal from the client */
238
    margo_wait_for_finalize(mid);
239

240
241
    free(opts.bake_pools);

Philip Carns's avatar
Philip Carns committed
242
    return (0);
243
}