bake-server.c 35.7 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1
2
/*
 * (C) 2015 The University of Chicago
Philip Carns's avatar
Philip Carns committed
3
 *
Philip Carns's avatar
Philip Carns committed
4
5
6
 * See COPYRIGHT in top-level directory.
 */

7
8
#include "bake-config.h"

Philip Carns's avatar
Philip Carns committed
9
#include <assert.h>
10
#include <libpmemobj.h>
11
12
#include <unistd.h>
#include <fcntl.h>
13
14
#include <margo.h>
#include <margo-bulk-pool.h>
15
#include <json-c/json.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#ifdef USE_REMI
Philip Carns's avatar
Philip Carns committed
17
18
    #include <remi/remi-client.h>
    #include <remi/remi-server.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
19
#endif
20
#include "bake-server.h"
21
#include "uthash.h"
22
#include "bake-rpc.h"
23
#include "bake-timing.h"
24
#include "bake-provider.h"
25
#include "bake-macros.h"
26
27

extern bake_backend g_bake_pmem_backend;
Philip Carns's avatar
Philip Carns committed
28
extern bake_backend g_bake_file_backend;
Philip Carns's avatar
Philip Carns committed
29

30
31
32
33
34
35
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_ult)
DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
36
DECLARE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)
37
38
39
40
41
42
43
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
44
DECLARE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
45
DECLARE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
46

Philip Carns's avatar
Philip Carns committed
47
48
49
50
51
52
/**
 * Validates the format of the configuration and fills default values
 * if they are not provided
 */
static int validate_and_complete_config(struct json_object* _config,
                                        ABT_pool            _progress_pool);
53
static int setup_poolset(bake_provider_t provider);
Philip Carns's avatar
Philip Carns committed
54

Philip Carns's avatar
Philip Carns committed
55
56
/**
 * Look up a storage target in a provider's target hash table.
 *
 * This helper does not take provider->lock itself.
 *
 * @param provider   provider whose `targets` table is searched
 * @param target_id  id of the target to look for
 *
 * @return the matching entry, or NULL when no target has this id
 */
static bake_target_t* find_target_entry(bake_provider_t  provider,
                                        bake_target_id_t target_id)
{
    bake_target_t* match = NULL;

    HASH_FIND(hh, provider->targets, &target_id, sizeof(target_id), match);

    return match;
}

Philip Carns's avatar
Philip Carns committed
64
static void bake_server_finalize_cb(void* data);
Philip Carns's avatar
Philip Carns committed
65

Matthieu Dorier's avatar
Matthieu Dorier committed
66
#ifdef USE_REMI
Philip Carns's avatar
Philip Carns committed
67
68
static int bake_target_post_migration_callback(remi_fileset_t fileset,
                                               void*          provider);
Matthieu Dorier's avatar
Matthieu Dorier committed
69
#endif
70

71
72
73
74
/**
 * Register a BAKE provider with a margo instance.
 *
 * Parses the JSON configuration given in uargs (or creates an empty one),
 * validates/completes it, allocates the provider structure, sets up the
 * bulk poolset and the provider rwlock, registers every BAKE RPC under
 * provider_id, optionally hooks into REMI, and finally installs the
 * finalize callback.
 *
 * @param mid          margo instance the RPCs are registered with
 * @param provider_id  provider id used for every registered RPC
 * @param uargs        initialization arguments; dereferenced on entry, so
 *                     it must not be NULL
 * @param provider     receives the new provider handle, unless it is
 *                     BAKE_PROVIDER_IGNORE
 *
 * @return BAKE_SUCCESS on success;
 *         BAKE_ERR_MERCURY if a provider with this id is already registered;
 *         BAKE_ERR_INVALID_ARG if the configuration cannot be parsed or
 *         validated;
 *         BAKE_ERR_ALLOCATION if the provider structure cannot be allocated;
 *         BAKE_ERR_ARGOBOTS if the rwlock cannot be created;
 *         BAKE_ERR_REMI on REMI failures (USE_REMI builds only);
 *         or the error returned by setup_poolset().
 */
int bake_provider_register(margo_instance_id                     mid,
                           uint16_t                              provider_id,
                           const struct bake_provider_init_info* uargs,
                           bake_provider_t*                      provider)
{
    struct bake_provider_init_info args = *uargs;
    bake_provider*                 tmp_provider;
    int                            ret;
    struct json_object*            config = NULL;

    /* check if a provider with the same provider id already exists */
    {
        hg_id_t id;
        /* pre-set: the lookup's return value is not checked, so make sure
         * a failed lookup cannot leave the flag indeterminate */
        hg_bool_t flag = HG_FALSE;
        margo_provider_registered_name(mid, "bake_probe_rpc", provider_id, &id,
                                       &flag);
        if (flag == HG_TRUE) {
            fprintf(stderr,
                    "bake_provider_register(): a BAKE provider with the same "
                    "id (%d) already exists\n",
                    provider_id);
            return BAKE_ERR_MERCURY;
        }
    }

    if (args.json_config) {
        /* read JSON config from provided string argument */
        struct json_tokener*    tokener = json_tokener_new();
        enum json_tokener_error jerr;

        config = json_tokener_parse_ex(tokener, args.json_config,
                                       strlen(args.json_config));
        if (!config) {
            jerr = json_tokener_get_error(tokener);
            fprintf(stderr, "JSON parse error: %s",
                    json_tokener_error_desc(jerr));
            json_tokener_free(tokener);
            return BAKE_ERR_INVALID_ARG;
        }
        json_tokener_free(tokener);
    } else {
        /* create default JSON config */
        config = json_object_new_object();
    }

    /* validate and complete configuration */
    ret = validate_and_complete_config(config, args.rpc_pool);
    if (ret != 0) {
        fprintf(stderr, "Could not validate and complete configuration");
        json_object_put(config);
        return BAKE_ERR_INVALID_ARG;
    }

    /* allocate the resulting structure */
    tmp_provider = calloc(1, sizeof(*tmp_provider));
    if (!tmp_provider) {
        /* the provider never took ownership of the config: release it */
        json_object_put(config);
        return BAKE_ERR_ALLOCATION;
    }

    /* from here on the provider owns the configuration object */
    tmp_provider->json_cfg = config;

    tmp_provider->mid = mid;
    if (args.rpc_pool != NULL)
        tmp_provider->handler_pool = args.rpc_pool;
    else {
        margo_get_handler_pool(mid, &(tmp_provider->handler_pool));
    }

    /* create buffer poolset if needed for config */
    ret = setup_poolset(tmp_provider);
    if (ret != 0) {
        fprintf(stderr, "Could not create poolset for pipelining");
        json_object_put(config);
        free(tmp_provider);
        return ret;
    }

    /* Create rwlock */
    ret = ABT_rwlock_create(&(tmp_provider->lock));
    if (ret != ABT_SUCCESS) {
        /* NOTE(review): whatever setup_poolset() created is not released
         * here; confirm whether tmp_provider->poolset needs destroying on
         * this path. */
        json_object_put(config);
        free(tmp_provider);
        return BAKE_ERR_ARGOBOTS;
    }

    /* register RPCs; each one carries tmp_provider as its registered data so
     * that the handlers can find the provider again */
    hg_id_t rpc_id;
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_rpc", bake_create_in_t,
                                     bake_create_out_t, bake_create_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_create_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_write_rpc", bake_write_in_t,
                                     bake_write_out_t, bake_write_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_write_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_eager_write_rpc", bake_eager_write_in_t,
        bake_eager_write_out_t, bake_eager_write_ult, provider_id,
        tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_eager_write_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_eager_read_rpc", bake_eager_read_in_t, bake_eager_read_out_t,
        bake_eager_read_ult, provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_eager_read_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_persist_rpc", bake_persist_in_t,
                                     bake_persist_out_t, bake_persist_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_persist_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_create_write_persist_rpc", bake_create_write_persist_in_t,
        bake_create_write_persist_out_t, bake_create_write_persist_ult,
        provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_create_write_persist_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_create_write_persist_rpc",
                                     bake_eager_create_write_persist_in_t,
                                     bake_eager_create_write_persist_out_t,
                                     bake_eager_create_write_persist_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_eager_create_write_persist_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t,
        bake_get_size_ult, provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_get_size_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_get_data_rpc", bake_get_data_in_t, bake_get_data_out_t,
        bake_get_data_ult, provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_get_data_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc", bake_read_in_t,
                                     bake_read_out_t, bake_read_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_read_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_probe_rpc", bake_probe_in_t,
                                     bake_probe_out_t, bake_probe_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_probe_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc", void, void,
                                     bake_noop_ult, provider_id,
                                     tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_noop_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_remove_rpc", bake_remove_in_t,
                                     bake_remove_out_t, bake_remove_ult,
                                     provider_id, tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_remove_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_migrate_region_rpc", bake_migrate_region_in_t,
        bake_migrate_region_out_t, bake_migrate_region_ult, provider_id,
        tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_migrate_region_id = rpc_id;

    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_migrate_target_rpc", bake_migrate_target_in_t,
        bake_migrate_target_out_t, bake_migrate_target_ult, provider_id,
        tmp_provider->handler_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_migrate_target_id = rpc_id;

    /* get a client-side version of the bake_create_write_persist RPC */
    hg_bool_t flag = HG_FALSE; /* pre-set: lookup result is not checked */
    margo_registered_name(mid, "bake_create_write_persist_rpc", &rpc_id, &flag);
    if (flag) {
        tmp_provider->bake_create_write_persist_id = rpc_id;
    } else {
        tmp_provider->bake_create_write_persist_id
            = MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                             bake_create_write_persist_in_t,
                             bake_create_write_persist_out_t, NULL);
    }

#ifdef USE_REMI
    /* register a REMI client */
    // TODO actually use an ABT-IO instance
    ret = remi_client_init(mid, ABT_IO_INSTANCE_NULL,
                           &(tmp_provider->remi_client));
    if (ret != REMI_SUCCESS) {
        // XXX unregister RPCs, cleanup tmp_provider before returning
        return BAKE_ERR_REMI;
    }

    /* register a REMI provider */
    {
        int             remi_flag = 0;
        remi_provider_t remi_provider;
        /* check if a REMI provider exists with the same provider id */
        remi_provider_registered(mid, provider_id, &remi_flag, NULL, NULL,
                                 &remi_provider);
        if (remi_flag) { /* REMI provider exists */
            tmp_provider->remi_provider      = remi_provider;
            tmp_provider->owns_remi_provider = 0;
        } else { /* REMI provider does not exist */
            // TODO actually use an ABT-IO instance
            ret = remi_provider_register(mid, ABT_IO_INSTANCE_NULL, provider_id,
                                         tmp_provider->handler_pool,
                                         &(tmp_provider->remi_provider));
            if (ret != REMI_SUCCESS) {
                // XXX unregister RPCs, cleanup tmp_provider before returning
                return BAKE_ERR_REMI;
            }
            tmp_provider->owns_remi_provider = 1;
        }
        ret = remi_provider_register_migration_class(
            tmp_provider->remi_provider, "bake", NULL,
            bake_target_post_migration_callback, NULL, tmp_provider);
        if (ret != REMI_SUCCESS) {
            // XXX unregister RPCs, cleanup tmp_provider before returning
            return BAKE_ERR_REMI;
        }
    }
#endif

    /* install the bake server finalize callback */
    margo_provider_push_finalize_callback(
        mid, tmp_provider, &bake_server_finalize_cb, tmp_provider);

    if (provider != BAKE_PROVIDER_IGNORE) *provider = tmp_provider;

    return BAKE_SUCCESS;
}

316
317
318
319
320
321
322
/**
 * Tear down a provider.
 *
 * The finalize callback registered for this provider is first removed from
 * the margo instance and then invoked directly.
 *
 * @param provider  provider to destroy
 *
 * @return always BAKE_SUCCESS
 */
int bake_provider_destroy(bake_provider_t provider)
{
    margo_instance_id mid = provider->mid;

    /* detach the callback from margo, then run it by hand */
    margo_provider_pop_finalize_callback(mid, provider);
    bake_server_finalize_cb(provider);

    return BAKE_SUCCESS;
}

Philip Carns's avatar
Philip Carns committed
323
324
325
/**
 * Attach a storage target to a provider.
 *
 * target_name has the form "<backend>:<name>"; when there is no ':' the
 * backend defaults to "pmem". The backends recognised here are "pmem" and
 * "file". The selected backend's _initialize hook is run on the name that
 * follows the prefix, and the resulting target is inserted into the
 * provider's hash table under the provider write lock.
 *
 * @param provider     provider receiving the target
 * @param target_name  target specification, optionally backend-prefixed
 * @param target_id    receives the id of the newly added target
 *
 * @return BAKE_SUCCESS on success;
 *         BAKE_ERR_BACKEND_TYPE for an unrecognised backend prefix;
 *         BAKE_ERR_ALLOCATION if memory cannot be allocated or the target
 *         cannot be inserted (including when its id is already present);
 *         or the error returned by the backend's _initialize hook.
 */
int bake_provider_add_storage_target(bake_provider_t   provider,
                                     const char*       target_name,
                                     bake_target_id_t* target_id)
{
    int               ret = BAKE_SUCCESS;
    bake_target_id_t  tid;
    backend_context_t ctx          = NULL;
    char*             backend_type = NULL;
    bake_target_t*    new_entry    = NULL;
    bake_target_t*    check_entry  = NULL;

    // figure out the backend by searching until the ":" in the target name
    const char* sep = strchr(target_name, ':');
    if (sep != NULL) {
        backend_type = strdup(target_name);
        if (!backend_type) return BAKE_ERR_ALLOCATION;
        /* truncate the copy at the separator to keep only the prefix */
        backend_type[(size_t)(sep - target_name)] = '\0';
        target_name                               = sep + 1;
    } else {
        backend_type = strdup("pmem");
        if (!backend_type) return BAKE_ERR_ALLOCATION;
    }

    new_entry = calloc(1, sizeof(*new_entry));
    if (!new_entry) {
        free(backend_type);
        return BAKE_ERR_ALLOCATION;
    }

    if (strcmp(backend_type, "pmem") == 0) {
        new_entry->backend = &g_bake_pmem_backend;
    } else if (strcmp(backend_type, "file") == 0) {
        new_entry->backend = &g_bake_file_backend;
    } else {
        fprintf(stderr, "ERROR: unknown backend type \"%s\"\n", backend_type);
        free(new_entry);
        free(backend_type);
        return BAKE_ERR_BACKEND_TYPE;
    }

    ret = new_entry->backend->_initialize(provider, target_name, &tid, &ctx);
    if (ret != 0) {
        free(backend_type);
        free(new_entry);
        return ret;
    }
    new_entry->context   = ctx;
    new_entry->target_id = tid;

    /* write-lock the provider */
    ABT_rwlock_wrlock(provider->lock);

    /* Only insert when the id is not already present: adding a duplicate key
     * and then freeing the new entry would leave a dangling node linked in
     * the table. */
    HASH_FIND(hh, provider->targets, &tid, sizeof(bake_target_id_t),
              check_entry);
    if (check_entry == NULL) {
        /* insert in the provider's hash (third argument is the key field) */
        HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t),
                 new_entry);
        /* check that it was inserted */
        HASH_FIND(hh, provider->targets, &tid, sizeof(bake_target_id_t),
                  check_entry);
    }

    if (check_entry != new_entry) {
        /* new_entry is not linked in the table on this path, so it can be
         * released safely */
        fprintf(stderr,
                "Error: BAKE could not insert new pmem pool into the hash\n");
        new_entry->backend->_finalize(ctx);
        free(new_entry);
        ret = BAKE_ERR_ALLOCATION;
    } else {
        provider->num_targets += 1;
        *target_id = new_entry->target_id;
        ret        = BAKE_SUCCESS;
    }

    /* unlock provider */
    ABT_rwlock_unlock(provider->lock);
    free(backend_type);
    return ret;
}

Philip Carns's avatar
Philip Carns committed
389
390
/**
 * Detach a single storage target from a provider.
 *
 * Under the provider write lock the target is unlinked from the hash table,
 * its backend's _finalize hook is run on its context, the entry is freed,
 * and the provider's target count is decremented.
 *
 * @param provider   provider owning the target
 * @param target_id  id of the target to remove
 *
 * @return BAKE_SUCCESS on success, BAKE_ERR_UNKNOWN_TARGET when the
 *         provider has no target with this id
 */
int bake_provider_remove_storage_target(bake_provider_t  provider,
                                        bake_target_id_t target_id)
{
    int            ret;
    bake_target_t* entry = NULL;

    ABT_rwlock_wrlock(provider->lock);
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t),
              entry);
    if (!entry) {
        ret = BAKE_ERR_UNKNOWN_TARGET;
    } else {
        HASH_DEL(provider->targets, entry);
        entry->backend->_finalize(entry->context);
        free(entry);
        /* keep the count reported by bake_provider_count_storage_targets()
         * in step with the table contents */
        provider->num_targets -= 1;
        ret = BAKE_SUCCESS;
    }
    ABT_rwlock_unlock(provider->lock);
    return ret;
}

Philip Carns's avatar
Philip Carns committed
409
/**
 * Detach every storage target from a provider.
 *
 * Under the provider write lock each target is unlinked from the hash table,
 * finalized through its backend and freed; the target count is then reset to
 * zero and the provider's bulk poolset is destroyed.
 *
 * NOTE(review): the poolset is destroyed here rather than in a dedicated
 * teardown path -- confirm callers do not keep using the provider's poolset
 * after this call.
 *
 * @param provider  provider to empty
 *
 * @return always BAKE_SUCCESS
 */
int bake_provider_remove_all_storage_targets(bake_provider_t provider)
{
    bake_target_t* current;
    bake_target_t* next;

    ABT_rwlock_wrlock(provider->lock);

    HASH_ITER(hh, provider->targets, current, next)
    {
        HASH_DEL(provider->targets, current);
        current->backend->_finalize(current->context);
        free(current);
    }
    provider->num_targets = 0;

    margo_bulk_poolset_destroy(provider->poolset);

    ABT_rwlock_unlock(provider->lock);

    return BAKE_SUCCESS;
}

Philip Carns's avatar
Philip Carns committed
425
426
/**
 * Report how many storage targets a provider currently tracks.
 *
 * The counter is read under the provider read lock.
 *
 * @param provider     provider to query
 * @param num_targets  receives the value of provider->num_targets
 *
 * @return always BAKE_SUCCESS
 */
int bake_provider_count_storage_targets(bake_provider_t provider,
                                        uint64_t*       num_targets)
{
    ABT_rwlock_rdlock(provider->lock);
    *num_targets = provider->num_targets;
    ABT_rwlock_unlock(provider->lock);

    return BAKE_SUCCESS;
}

Philip Carns's avatar
Philip Carns committed
434
435
/**
 * Copy the ids of all storage targets of a provider into an array.
 *
 * The table is walked under the provider read lock and one id is written per
 * target, starting at targets[0]. No bound is checked here: the caller must
 * supply an array large enough for every target in the table.
 *
 * @param provider  provider to query
 * @param targets   output array receiving the target ids
 *
 * @return always BAKE_SUCCESS
 */
int bake_provider_list_storage_targets(bake_provider_t   provider,
                                       bake_target_id_t* targets)
{
    bake_target_t* current;
    bake_target_t* next;
    uint64_t       count = 0;

    ABT_rwlock_rdlock(provider->lock);
    HASH_ITER(hh, provider->targets, current, next)
    {
        targets[count++] = current->target_id;
    }
    ABT_rwlock_unlock(provider->lock);

    return BAKE_SUCCESS;
}

Philip Carns's avatar
Philip Carns committed
449
450
451
452
453
454
455
456
457
/* Declares the local variables shared by the RPC handlers below, for the RPC
 * whose input/output types are bake_<rpc_name>_in_t / bake_<rpc_name>_out_t.
 *
 * `in` is zero-initialized because RESPOND_AND_CLEANUP hands it to
 * margo_free_input() on every path, including the ones that jump to `finish`
 * before GET_RPC_INPUT has filled it in. `lock` starts as ABT_RWLOCK_NULL so
 * that UNLOCK_PROVIDER can tell whether LOCK_PROVIDER ran. */
#define DECLARE_LOCAL_VARS(rpc_name)                    \
    margo_instance_id       mid = MARGO_INSTANCE_NULL;  \
    bake_##rpc_name##_out_t out = {0};                  \
    bake_##rpc_name##_in_t  in  = {0};                  \
    hg_return_t             hret;                       \
    ABT_rwlock              lock     = ABT_RWLOCK_NULL; \
    const struct hg_info*   info     = NULL;            \
    bake_provider_t         provider = NULL;            \
    bake_target_t*          target   = NULL
458
459
460
461
462

/* Resolves the provider that an RPC handle is addressed to: fetches the margo
 * instance and handle info, then reads the data registered for the RPC id.
 * When no data is registered, sets out.ret to BAKE_ERR_UNKNOWN_PROVIDER and
 * jumps to the handler's `finish` label. Relies on the variables declared by
 * DECLARE_LOCAL_VARS and on a local named `handle`. */
#define FIND_PROVIDER                                    \
    do {                                                 \
        mid = margo_hg_handle_get_instance(handle);      \
        assert(mid);                                     \
        info     = margo_get_info(handle);               \
        provider = margo_registered_data(mid, info->id); \
        if (provider == NULL) {                          \
            out.ret = BAKE_ERR_UNKNOWN_PROVIDER;         \
            goto finish;                                 \
        }                                                \
    } while (0)
470
471
472
473

/* Decodes the RPC input into `in`. On failure, sets out.ret to
 * BAKE_ERR_MERCURY and jumps to the handler's `finish` label. */
#define GET_RPC_INPUT                        \
    do {                                     \
        hret = margo_get_input(handle, &in); \
        if (HG_SUCCESS != hret) {            \
            out.ret = BAKE_ERR_MERCURY;      \
            goto finish;                     \
        }                                    \
    } while (0)
479
480
481
482
483

/* Takes the provider's rwlock in read mode and remembers it in `lock`, which
 * is what UNLOCK_PROVIDER later releases. */
#define LOCK_PROVIDER            \
    do {                         \
        lock = provider->lock;   \
        ABT_rwlock_rdlock(lock); \
    } while (0)
485
486
487
488

/* Looks up the target named by in.bti in the provider's table. When it is
 * not found, sets out.ret to BAKE_ERR_UNKNOWN_TARGET and jumps to the
 * handler's `finish` label. */
#define FIND_TARGET                                   \
    do {                                              \
        target = find_target_entry(provider, in.bti); \
        if (!target) {                                \
            out.ret = BAKE_ERR_UNKNOWN_TARGET;        \
            goto finish;                              \
        }                                             \
    } while (0)
494

Philip Carns's avatar
Philip Carns committed
495
496
497
498
/* Releases the lock taken by LOCK_PROVIDER. `lock` is still ABT_RWLOCK_NULL
 * when the handler jumped to `finish` before LOCK_PROVIDER ran, in which case
 * nothing is unlocked. */
#define UNLOCK_PROVIDER                                       \
    do {                                                      \
        if (lock != ABT_RWLOCK_NULL) ABT_rwlock_unlock(lock); \
    } while (0)
499

Philip Carns's avatar
Philip Carns committed
500
501
502
503
504
505
/* Common handler epilogue: sends `out` back to the caller, releases the
 * decoded input and destroys the handle.
 * NOTE(review): margo_free_input() is called here even when the handler
 * reached `finish` before GET_RPC_INPUT succeeded -- confirm that this is
 * acceptable for the input types involved. */
#define RESPOND_AND_CLEANUP            \
    do {                               \
        margo_respond(handle, &out);   \
        margo_free_input(handle, &in); \
        margo_destroy(handle);         \
    } while (0)
506

507
/* service a remote RPC that creates a BAKE region */
508
static void bake_create_ult(hg_handle_t handle)
509
{
510
511
512
513
514
    DECLARE_LOCAL_VARS(create);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
515

516
    memset(&out, 0, sizeof(out));
Philip Carns's avatar
Philip Carns committed
517
518
    out.ret
        = target->backend->_create(target->context, in.region_size, &out.rid);
519

520
521
522
523
524
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_ult)
Matthieu Dorier's avatar
Matthieu Dorier committed
525

526
527
528
529
530
531
532
533
/* service a remote RPC that writes to a BAKE region */
static void bake_write_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(write);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
534

535
536
    memset(&out, 0, sizeof(out));
    hg_addr_t src_addr = HG_ADDR_NULL;
Philip Carns's avatar
Philip Carns committed
537
    if (in.remote_addr_str && strlen(in.remote_addr_str)) {
538
539
540
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
    } else {
        hret = margo_addr_dup(mid, info->addr, &src_addr);
541
    }
Philip Carns's avatar
Philip Carns committed
542
    if (hret != HG_SUCCESS) {
543
        out.ret = BAKE_ERR_MERCURY;
544
        goto finish;
545
    }
546

Philip Carns's avatar
Philip Carns committed
547
548
549
    out.ret = target->backend->_write_bulk(
        target->context, in.rid, in.region_offset, in.bulk_size, in.bulk_handle,
        src_addr, in.bulk_offset);
550
551

finish:
552
553
554
    UNLOCK_PROVIDER;
    margo_addr_free(mid, src_addr);
    RESPOND_AND_CLEANUP;
555
}
556
DEFINE_MARGO_RPC_HANDLER(bake_write_ult)
557

Philip Carns's avatar
Philip Carns committed
558
/* service a remote RPC that writes to a BAKE region in eager mode */
559
static void bake_eager_write_ult(hg_handle_t handle)
560
{
561
562
    DECLARE_LOCAL_VARS(eager_write);
    in.buffer = NULL;
Philip Carns's avatar
Philip Carns committed
563
    in.size   = 0;
564
565
566
567
568
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;

Philip Carns's avatar
Philip Carns committed
569
570
    out.ret = target->backend->_write_raw(target->context, in.rid,
                                          in.region_offset, in.size, in.buffer);
Matthieu Dorier's avatar
Matthieu Dorier committed
571

572
573
574
575
576
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_eager_write_ult)
577

Philip Carns's avatar
Philip Carns committed
578
/* service a remote RPC that persists to a BAKE region */
579
580
581
582
583
584
585
static void bake_persist_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(persist);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
586

Philip Carns's avatar
Philip Carns committed
587
588
    out.ret = target->backend->_persist(target->context, in.rid, in.offset,
                                        in.size);
Philip Carns's avatar
Philip Carns committed
589
590

finish:
591
592
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
Philip Carns's avatar
Philip Carns committed
593
}
594
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
Philip Carns's avatar
Philip Carns committed
595

596
static void bake_create_write_persist_ult(hg_handle_t handle)
Philip Carns's avatar
Philip Carns committed
597
{
598
599
600
601
602
    DECLARE_LOCAL_VARS(create_write_persist);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
Philip Carns's avatar
Philip Carns committed
603
604
    memset(&out, 0, sizeof(out));

605
    hg_addr_t src_addr = HG_ADDR_NULL;
Philip Carns's avatar
Philip Carns committed
606
    if (in.remote_addr_str && strlen(in.remote_addr_str)) {
607
608
609
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
    } else {
        hret = margo_addr_dup(mid, info->addr, &src_addr);
Philip Carns's avatar
Philip Carns committed
610
    }
Philip Carns's avatar
Philip Carns committed
611
    if (hret != HG_SUCCESS) {
Philip Carns's avatar
Philip Carns committed
612
613
614
        out.ret = BAKE_ERR_MERCURY;
        goto finish;
    }
615

Philip Carns's avatar
Philip Carns committed
616
    if (!target->backend->_create_write_persist_bulk) {
Philip Carns's avatar
Philip Carns committed
617
618
619
620
        /* If the backend does not provide a combination
         * create_write_persist function, then issue constituent backend
         * calls instead.
         */
Philip Carns's avatar
Philip Carns committed
621
622
623
624
625
626
627
628
629
630
        out.ret = target->backend->_create(target->context, in.region_size,
                                           &out.rid);
        if (out.ret != BAKE_SUCCESS) goto finish;
        out.ret = target->backend->_write_bulk(target->context, out.rid, 0,
                                               in.bulk_size, in.bulk_handle,
                                               src_addr, in.bulk_offset);
        if (out.ret != BAKE_SUCCESS) goto finish;
        out.ret = target->backend->_persist(target->context, out.rid, 0,
                                            in.region_size);
    } else {
Philip Carns's avatar
Philip Carns committed
631
        out.ret = target->backend->_create_write_persist_bulk(
Philip Carns's avatar
Philip Carns committed
632
633
            target->context, in.bulk_handle, src_addr, in.bulk_offset,
            in.bulk_size, &out.rid);
Philip Carns's avatar
Philip Carns committed
634
635
    }

636
finish:
637
    UNLOCK_PROVIDER;
638
    margo_addr_free(mid, src_addr);
639
    RESPOND_AND_CLEANUP;
640
641
    return;
}
642
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
643

644
/* service a remote RPC that creates a region, fills it from a payload
 * carried inline in the request, and persists it */
static void bake_eager_create_write_persist_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(eager_create_write_persist);
    /* reset the payload description ahead of the helper macros below */
    in.size   = 0;
    in.buffer = NULL;
    /* NOTE(review): the helper macros are defined in another file; they
     * presumably jump to `finish` when a step fails -- confirm there. */
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;

    memset(&out, 0, sizeof(out));

    if (target->backend->_create_write_persist_raw) {
        /* the backend supplies a combined create/write/persist call */
        out.ret = target->backend->_create_write_persist_raw(
            target->context, in.buffer, in.size, &out.rid);
    } else {
        /* No combined call is available: run the three constituent
         * backend calls in order, stopping at the first one that does
         * not report BAKE_SUCCESS.
         */
        out.ret = target->backend->_create(target->context, in.size, &out.rid);
        if (out.ret == BAKE_SUCCESS) {
            out.ret = target->backend->_write_raw(target->context, out.rid, 0,
                                                  in.size, in.buffer);
        }
        if (out.ret == BAKE_SUCCESS) {
            out.ret = target->backend->_persist(target->context, out.rid, 0,
                                                in.size);
        }
    }

finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)
678

679
680
681
682
683
684
685
686
/* service a remote RPC that retrieves the size of a BAKE region:
 * the backend of the requested target is asked for the size of in.rid
 * and the result is returned in out.size together with the status */
static void bake_get_size_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(get_size);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;

    /* start from an all-zero response before querying the backend */
    memset(&out, 0, sizeof out);
    out.ret = target->backend->_get_region_size(target->context, in.rid,
                                                &out.size);

finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_get_size_ult)
697

Philip Carns's avatar
Philip Carns committed
698
/* Get the raw pointer of a region */
699
700
701
702
703
704
705
706
static void bake_get_data_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(get_data);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
    out.ptr = 0;
707

Philip Carns's avatar
Philip Carns committed
708
709
    out.ret = target->backend->_get_region_data(target->context, in.rid,
                                                (void**)&out.ptr);
710

711
712
713
714
715
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_get_data_ult)
716

Philip Carns's avatar
Philip Carns committed
717
/* service a remote RPC for a BAKE no-op */
718
719
720
721
static void bake_noop_ult(hg_handle_t handle)
{
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
722

723
    margo_respond(handle, NULL);
724
    margo_destroy(handle);
725
726
    return;
}
727
DEFINE_MARGO_RPC_HANDLER(bake_noop_ult)
728

729
730
731
/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads from a BAKE region.
 *
 * The data is handed to the backend's bulk read, using the bulk handle
 * supplied in the request. The address used for the bulk transfer is either
 * the one named by in.remote_addr_str (when non-empty) or a duplicate of the
 * address the request came from.
 */
static void bake_read_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(read);
    /* Must be initialized before the helper macros run: the cleanup code
     * at `finish` frees src_addr unconditionally, and an early jump to
     * `finish` would otherwise skip the initializer and hand
     * margo_addr_free() an indeterminate value.
     * NOTE(review): the macros are defined in another file; they
     * presumably jump to `finish` on failure -- confirm there.
     */
    hg_addr_t src_addr = HG_ADDR_NULL;
    in.remote_addr_str = NULL;
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;

    memset(&out, 0, sizeof(out));

    if (in.remote_addr_str && in.remote_addr_str[0] != '\0') {
        /* the request names the address to exchange the data with */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
    } else {
        /* otherwise use the address of the process that sent the RPC */
        hret = margo_addr_dup(mid, info->addr, &src_addr);
    }
    if (hret != HG_SUCCESS) {
        out.ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    out.ret = target->backend->_read_bulk(
        target->context, in.rid, in.region_offset, in.bulk_size, in.bulk_handle,
        src_addr, in.bulk_offset, &out.size);

finish:
    UNLOCK_PROVIDER;
    margo_addr_free(mid, src_addr);
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_read_ult)
762

Philip Carns's avatar
Philip Carns committed
763
764
/* service a remote RPC that reads from a BAKE region and eagerly sends
 * response */
765
766
767
768
769
770
771
static void bake_eager_read_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(eager_read);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
772

773
    free_fn free_data = NULL;
Philip Carns's avatar
Philip Carns committed
774
775
776
    out.ret           = target->backend->_read_raw(
        target->context, in.rid, in.region_offset, in.size, (void**)&out.buffer,
        &out.size, &free_data);
777

778
finish:
779
780
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
Philip Carns's avatar
Philip Carns committed
781
    if (free_data) free_data(out.buffer);
782
}
783
DEFINE_MARGO_RPC_HANDLER(bake_eager_read_ult)
784

Philip Carns's avatar
Philip Carns committed
785
/* service a remote RPC that probes for a BAKE target id */
786
static void bake_probe_ult(hg_handle_t handle)
787
{
788
    bake_probe_out_t out;
789
790
791

    memset(&out, 0, sizeof(out));

792
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
793
    assert(mid);
Philip Carns's avatar
Philip Carns committed
794
795
796
    const struct hg_info* hgi      = margo_get_info(handle);
    bake_provider_t       provider = margo_registered_data(mid, hgi->id);
    if (!provider) {
Matthieu Dorier's avatar
Matthieu Dorier committed
797
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
798
799
800
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
801
802
    }

803
804
805
806
    uint64_t targets_count;
    bake_provider_count_storage_targets(provider, &targets_count);
    bake_target_id_t targets[targets_count];
    bake_provider_list_storage_targets(provider, targets);
807

Philip Carns's avatar
Philip Carns committed
808
809
    out.ret         = BAKE_SUCCESS;
    out.targets     = targets;
810
    out.num_targets = targets_count;
Philip Carns's avatar
Philip Carns committed
811

812
    margo_respond(handle, &out);
813

814
    margo_destroy(handle);
815
}
816
DEFINE_MARGO_RPC_HANDLER(bake_probe_ult)
817

Shane Snyder's avatar
Shane Snyder committed
818
819
/* service a remote RPC that removes a BAKE region: the backend of the
 * requested target is asked to remove in.rid and its status is returned
 * in out.ret */
static void bake_remove_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(remove);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;

    /* the backend's status is passed back to the caller unchanged */
    out.ret = target->backend->_remove(target->context, in.rid);

finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_remove_ult)

833
static void bake_migrate_region_ult(hg_handle_t handle)
834
{
835
    DECLARE_LOCAL_VARS(migrate_region);
836
    in.dest_addr = NULL;
837
838
839
840
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
841

842
    memset(&out, 0, sizeof(out));
843

844
    out.ret = target->backend->_migrate_region(
Philip Carns's avatar
Philip Carns committed
845
846
        target->context, in.source_rid, in.region_size, in.remove_src,
        in.dest_addr, in.dest_provider_id, in.dest_target_id, &out.dest_rid);
Matthieu Dorier's avatar
Matthieu Dorier committed
847

848
finish:
849
850
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
851
}
852
DEFINE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
853

854
855
static void bake_migrate_target_ult(hg_handle_t handle)
{
856
#ifdef USE_REMI
857
858
    DECLARE_LOCAL_VARS(migrate_target);
    int ret;
859
    in.dest_remi_addr = NULL;
Philip Carns's avatar
Philip Carns committed
860
    in.dest_root      = NULL;
861
862
    FIND_PROVIDER;
    GET_RPC_INPUT;
863
864
865
866
    hg_addr_t dest_addr = HG_ADDR_NULL;

    memset(&out, 0, sizeof(out));

Philip Carns's avatar
Philip Carns committed
867
868
    remi_provider_handle_t remi_ph       = REMI_PROVIDER_HANDLE_NULL;
    remi_fileset_t         local_fileset = REMI_FILESET_NULL;
869
    /* lock provider */
870
    lock = provider->lock;
871
    ABT_rwlock_wrlock(lock);
872

873
    FIND_TARGET;
874

875
876
    /* lookup the address of the destination REMI provider */
    hret = margo_addr_lookup(mid, in.dest_remi_addr, &dest_addr);
Philip Carns's avatar
Philip Carns committed
877
    if (hret != HG_SUCCESS) {
878
879
880
881
882
        out.ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    /* use the REMI client to create a REMI provider handle */
Philip Carns's avatar
Philip Carns committed
883
884
885
    ret = remi_provider_handle_create(provider->remi_client, dest_addr,
                                      in.dest_remi_provider_id, &remi_ph);
    if (ret != REMI_SUCCESS) {
886
887
888
889
        out.ret = BAKE_ERR_REMI;
        goto finish;
    }

890
    /* ask the backend to fill the fileset */
Philip Carns's avatar
Philip Carns committed
891
892
893
    out.ret = target->backend->_create_fileset(target->context, &local_fileset);
    if (out.ret != BAKE_SUCCESS) { goto finish; }
    if (local_fileset == NULL) {
894
        out.ret = BAKE_ERR_OP_UNSUPPORTED;
895
896
        goto finish;
    }
897

Philip Carns's avatar
Philip Carns committed
898
899
    remi_fileset_register_metadata(local_fileset, "backend",
                                   target->backend->name);