bake-server.c 42.9 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1
2
/*
 * (C) 2015 The University of Chicago
Philip Carns's avatar
Philip Carns committed
3
 *
Philip Carns's avatar
Philip Carns committed
4
5
6
 * See COPYRIGHT in top-level directory.
 */

7
8
#include "bake-config.h"

Philip Carns's avatar
Philip Carns committed
9
#include <assert.h>
10
#include <libpmemobj.h>
11
12
#include <unistd.h>
#include <fcntl.h>
13
14
#include <margo.h>
#include <margo-bulk-pool.h>
15
#include <json-c/json.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#ifdef USE_REMI
Philip Carns's avatar
Philip Carns committed
17
18
    #include <remi/remi-client.h>
    #include <remi/remi-server.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
19
#endif
20
#include "bake-server.h"
21
#include "uthash.h"
22
#include "bake-rpc.h"
23
#include "bake-timing.h"
24
#include "bake-provider.h"
25
#include "bake-macros.h"
26
27

extern bake_backend g_bake_pmem_backend;
Philip Carns's avatar
Philip Carns committed
28
extern bake_backend g_bake_file_backend;
Philip Carns's avatar
Philip Carns committed
29

30
31
32
33
34
35
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_ult)
DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
36
DECLARE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)
37
38
39
40
41
42
43
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
44
DECLARE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
45
DECLARE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
46

Philip Carns's avatar
Philip Carns committed
47
48
49
50
51
52
/**
 * Validates the format of the configuration and fills default values
 * if they are not provided
 */
static int validate_and_complete_config(struct json_object* _config,
                                        ABT_pool            _progress_pool);
53
54
static int configure_targets(bake_provider_t     provider,
                             struct json_object* _config);
55
static int setup_poolset(bake_provider_t provider);
Philip Carns's avatar
Philip Carns committed
56

Philip Carns's avatar
Philip Carns committed
57
58
static bake_target_t* find_target_entry(bake_provider_t  provider,
                                        bake_target_id_t target_id)
59
60
{
    bake_target_t* entry = NULL;
Philip Carns's avatar
Philip Carns committed
61
62
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t),
              entry);
63
64
65
    return entry;
}

Philip Carns's avatar
Philip Carns committed
66
static void bake_server_finalize_cb(void* data);
Philip Carns's avatar
Philip Carns committed
67

Matthieu Dorier's avatar
Matthieu Dorier committed
68
#ifdef USE_REMI
Philip Carns's avatar
Philip Carns committed
69
70
static int bake_target_post_migration_callback(remi_fileset_t fileset,
                                               void*          provider);
Matthieu Dorier's avatar
Matthieu Dorier committed
71
#endif
72

73
74
75
76
int bake_provider_register(margo_instance_id                     mid,
                           uint16_t                              provider_id,
                           const struct bake_provider_init_info* uargs,
                           bake_provider_t*                      provider)
77
{
78
79
    struct bake_provider_init_info args         = *uargs;
    bake_provider*                 tmp_provider = NULL;
80
    int                            ret;
81
82
    struct json_object*            config                   = NULL;
    int                            configuring_targets_flag = 0;
83

84
    /* check if a provider with the same provider id already exists */
85
    {
Philip Carns's avatar
Philip Carns committed
86
        hg_id_t   id;
87
        hg_bool_t flag;
Philip Carns's avatar
Philip Carns committed
88
89
90
        margo_provider_registered_name(mid, "bake_probe_rpc", provider_id, &id,
                                       &flag);
        if (flag == HG_TRUE) {
Philip Carns's avatar
Philip Carns committed
91
92
93
94
95
            BAKE_ERROR(
                mid,
                "bake_provider_register(): a bake provider with the same "
                "id (%d) already exists",
                provider_id);
96
97
            ret = BAKE_ERR_MERCURY;
            goto error;
98
        }
99
    }
100

Philip Carns's avatar
Philip Carns committed
101
102
103
104
105
106
107
108
109
    if (args.json_config) {
        /* read JSON config from provided string argument */
        struct json_tokener*    tokener = json_tokener_new();
        enum json_tokener_error jerr;

        config = json_tokener_parse_ex(tokener, args.json_config,
                                       strlen(args.json_config));
        if (!config) {
            jerr = json_tokener_get_error(tokener);
Philip Carns's avatar
Philip Carns committed
110
111
            BAKE_ERROR(mid, "JSON parse error: %s",
                       json_tokener_error_desc(jerr));
Philip Carns's avatar
Philip Carns committed
112
            json_tokener_free(tokener);
113
114
            ret = BAKE_ERR_INVALID_ARG;
            goto error;
Philip Carns's avatar
Philip Carns committed
115
116
117
118
119
120
121
122
123
124
        }
        json_tokener_free(tokener);
    } else {
        /* create default JSON config */
        config = json_object_new_object();
    }

    /* validate and complete configuration */
    ret = validate_and_complete_config(config, args.rpc_pool);
    if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
125
        BAKE_ERROR(mid, "could not validate and complete configuration");
126
127
        ret = BAKE_ERR_INVALID_ARG;
        goto error;
Philip Carns's avatar
Philip Carns committed
128
129
    }

Philip Carns's avatar
Philip Carns committed
130
131
    /* allocate the resulting structure */
    tmp_provider = calloc(1, sizeof(*tmp_provider));
132
133
134
135
    if (!tmp_provider) {
        ret = BAKE_ERR_NOMEM;
        goto error;
    }
Philip Carns's avatar
Philip Carns committed
136

137
138
    tmp_provider->json_cfg = config;

139
    tmp_provider->mid = mid;
140
141
    if (args.rpc_pool != NULL)
        tmp_provider->handler_pool = args.rpc_pool;
142
143
144
145
    else {
        margo_get_handler_pool(mid, &(tmp_provider->handler_pool));
    }

Philip Carns's avatar
Philip Carns committed
146
    /* create buffer poolset if needed for config */
147
148
    ret = setup_poolset(tmp_provider);
    if (ret != 0) {
Philip Carns's avatar
Philip Carns committed
149
        BAKE_ERROR(mid, "could not create poolset for pipelining");
150
        goto error;
151
152
    }

153
    /* Create rwlock */
154
    ret = ABT_rwlock_create(&(tmp_provider->lock));
Philip Carns's avatar
Philip Carns committed
155
    if (ret != ABT_SUCCESS) {
156
157
        ret = BAKE_ERR_ARGOBOTS;
        goto error;
158
159
    }

160
    /* register RPCs */
161
    hg_id_t rpc_id;
Philip Carns's avatar
Philip Carns committed
162
163
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_rpc", bake_create_in_t,
                                     bake_create_out_t, bake_create_ult,
164
                                     provider_id, tmp_provider->handler_pool);
165
166
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_create_id = rpc_id;
167

Philip Carns's avatar
Philip Carns committed
168
169
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_write_rpc", bake_write_in_t,
                                     bake_write_out_t, bake_write_ult,
170
                                     provider_id, tmp_provider->handler_pool);
171
172
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_write_id = rpc_id;
173

Philip Carns's avatar
Philip Carns committed
174
175
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_eager_write_rpc", bake_eager_write_in_t,
176
177
        bake_eager_write_out_t, bake_eager_write_ult, provider_id,
        tmp_provider->handler_pool);
178
179
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_eager_write_id = rpc_id;
180

Philip Carns's avatar
Philip Carns committed
181
182
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_eager_read_rpc", bake_eager_read_in_t, bake_eager_read_out_t,
183
        bake_eager_read_ult, provider_id, tmp_provider->handler_pool);
184
185
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_eager_read_id = rpc_id;
186

Philip Carns's avatar
Philip Carns committed
187
188
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_persist_rpc", bake_persist_in_t,
                                     bake_persist_out_t, bake_persist_ult,
189
                                     provider_id, tmp_provider->handler_pool);
190
191
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_persist_id = rpc_id;
192

Philip Carns's avatar
Philip Carns committed
193
194
195
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_create_write_persist_rpc", bake_create_write_persist_in_t,
        bake_create_write_persist_out_t, bake_create_write_persist_ult,
196
        provider_id, tmp_provider->handler_pool);
197
198
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_create_write_persist_id = rpc_id;
199

200
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_create_write_persist_rpc",
Philip Carns's avatar
Philip Carns committed
201
202
203
                                     bake_eager_create_write_persist_in_t,
                                     bake_eager_create_write_persist_out_t,
                                     bake_eager_create_write_persist_ult,
204
                                     provider_id, tmp_provider->handler_pool);
205
206
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_eager_create_write_persist_id = rpc_id;
207

208
209
210
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t,
        bake_get_size_ult, provider_id, tmp_provider->handler_pool);
211
212
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_get_size_id = rpc_id;
213

214
215
216
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_get_data_rpc", bake_get_data_in_t, bake_get_data_out_t,
        bake_get_data_ult, provider_id, tmp_provider->handler_pool);
217
218
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_get_data_id = rpc_id;
219

Philip Carns's avatar
Philip Carns committed
220
221
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc", bake_read_in_t,
                                     bake_read_out_t, bake_read_ult,
222
                                     provider_id, tmp_provider->handler_pool);
223
224
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_read_id = rpc_id;
225

Philip Carns's avatar
Philip Carns committed
226
227
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_probe_rpc", bake_probe_in_t,
                                     bake_probe_out_t, bake_probe_ult,
228
                                     provider_id, tmp_provider->handler_pool);
229
230
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_probe_id = rpc_id;
231

Philip Carns's avatar
Philip Carns committed
232
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc", void, void,
233
234
                                     bake_noop_ult, provider_id,
                                     tmp_provider->handler_pool);
235
236
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_noop_id = rpc_id;
237

Philip Carns's avatar
Philip Carns committed
238
239
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_remove_rpc", bake_remove_in_t,
                                     bake_remove_out_t, bake_remove_ult,
240
                                     provider_id, tmp_provider->handler_pool);
241
242
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_remove_id = rpc_id;
243

Philip Carns's avatar
Philip Carns committed
244
245
246
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_migrate_region_rpc", bake_migrate_region_in_t,
        bake_migrate_region_out_t, bake_migrate_region_ult, provider_id,
247
        tmp_provider->handler_pool);
248
249
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_migrate_region_id = rpc_id;
250

Philip Carns's avatar
Philip Carns committed
251
252
253
    rpc_id = MARGO_REGISTER_PROVIDER(
        mid, "bake_migrate_target_rpc", bake_migrate_target_in_t,
        bake_migrate_target_out_t, bake_migrate_target_ult, provider_id,
254
        tmp_provider->handler_pool);
255
256
    margo_register_data(mid, rpc_id, (void*)tmp_provider, NULL);
    tmp_provider->rpc_migrate_target_id = rpc_id;
257

Matthieu Dorier's avatar
Matthieu Dorier committed
258
259
260
    /* get a client-side version of the bake_create_write_persist RPC */
    hg_bool_t flag;
    margo_registered_name(mid, "bake_create_write_persist_rpc", &rpc_id, &flag);
Philip Carns's avatar
Philip Carns committed
261
    if (flag) {
262
        tmp_provider->bake_create_write_persist_id = rpc_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
263
    } else {
Philip Carns's avatar
Philip Carns committed
264
265
266
267
        tmp_provider->bake_create_write_persist_id
            = MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                             bake_create_write_persist_in_t,
                             bake_create_write_persist_out_t, NULL);
Matthieu Dorier's avatar
Matthieu Dorier committed
268
269
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
270
#ifdef USE_REMI
271
    /* register a REMI client */
272
    // TODO actually use an ABT-IO instance
Philip Carns's avatar
Philip Carns committed
273
274
275
    ret = remi_client_init(mid, ABT_IO_INSTANCE_NULL,
                           &(tmp_provider->remi_client));
    if (ret != REMI_SUCCESS) {
276
277
        ret = BAKE_ERR_REMI;
        goto error;
278
    }
279
280

    /* register a REMI provider */
Matthieu Dorier's avatar
Matthieu Dorier committed
281
    {
Philip Carns's avatar
Philip Carns committed
282
        int             flag;
Matthieu Dorier's avatar
Matthieu Dorier committed
283
284
        remi_provider_t remi_provider;
        /* check if a REMI provider exists with the same provider id */
Philip Carns's avatar
Philip Carns committed
285
286
287
288
        remi_provider_registered(mid, provider_id, &flag, NULL, NULL,
                                 &remi_provider);
        if (flag) { /* REMI provider exists */
            tmp_provider->remi_provider      = remi_provider;
289
            tmp_provider->owns_remi_provider = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
290
291
        } else { /* REMI provider does not exist */
            // TODO actually use an ABT-IO instance
Philip Carns's avatar
Philip Carns committed
292
            ret = remi_provider_register(mid, ABT_IO_INSTANCE_NULL, provider_id,
293
                                         tmp_provider->handler_pool,
Philip Carns's avatar
Philip Carns committed
294
295
                                         &(tmp_provider->remi_provider));
            if (ret != REMI_SUCCESS) {
296
297
                ret = BAKE_ERR_REMI;
                goto error;
Matthieu Dorier's avatar
Matthieu Dorier committed
298
            }
299
            tmp_provider->owns_remi_provider = 1;
Matthieu Dorier's avatar
Matthieu Dorier committed
300
        }
Philip Carns's avatar
Philip Carns committed
301
302
303
304
        ret = remi_provider_register_migration_class(
            tmp_provider->remi_provider, "bake", NULL,
            bake_target_post_migration_callback, NULL, tmp_provider);
        if (ret != REMI_SUCCESS) {
305
306
            ret = BAKE_ERR_REMI;
            goto error;
Matthieu Dorier's avatar
Matthieu Dorier committed
307
        }
308
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
309
#endif
310

311
    /* Did the config include targets that we need to attach or create? */
312
313
    configuring_targets_flag = 1;
    ret                      = configure_targets(tmp_provider, config);
314
315
316
317
318
    if (ret < 0) {
        ret = BAKE_ERR_INVALID_ARG;
        goto error;
    }

Philip Carns's avatar
Philip Carns committed
319
320
    tmp_provider->json_cfg = config;

321
    /* install the bake server finalize callback */
Philip Carns's avatar
Philip Carns committed
322
323
    margo_provider_push_finalize_callback(
        mid, tmp_provider, &bake_server_finalize_cb, tmp_provider);
324

Philip Carns's avatar
Philip Carns committed
325
    if (provider != BAKE_PROVIDER_IGNORE) *provider = tmp_provider;
326

Matthieu Dorier's avatar
Matthieu Dorier committed
327
    return BAKE_SUCCESS;
328
329
330

error:

331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
    if (configuring_targets_flag) {
        /* we might have auto attached targets that need to be detached now */
        bake_provider_detach_all_targets(tmp_provider);
    }

    if (tmp_provider && tmp_provider->rpc_create_id) {
        margo_deregister(mid, tmp_provider->rpc_create_id);
        margo_deregister(mid, tmp_provider->rpc_write_id);
        margo_deregister(mid, tmp_provider->rpc_eager_write_id);
        margo_deregister(mid, tmp_provider->rpc_persist_id);
        margo_deregister(mid, tmp_provider->rpc_create_write_persist_id);
        margo_deregister(mid, tmp_provider->rpc_eager_create_write_persist_id);
        margo_deregister(mid, tmp_provider->rpc_get_size_id);
        margo_deregister(mid, tmp_provider->rpc_get_data_id);
        margo_deregister(mid, tmp_provider->rpc_read_id);
        margo_deregister(mid, tmp_provider->rpc_eager_read_id);
        margo_deregister(mid, tmp_provider->rpc_probe_id);
        margo_deregister(mid, tmp_provider->rpc_noop_id);
        margo_deregister(mid, tmp_provider->rpc_remove_id);
        margo_deregister(mid, tmp_provider->rpc_migrate_region_id);
        margo_deregister(mid, tmp_provider->rpc_migrate_target_id);
    }

#ifdef USE_REMI
    if (tmp_provider && tmp_provider->remi_client) {
        remi_client_finalize(tmp_provider->remi_client);
        if (tmp_provider->owns_remi_provider) {
            remi_provider_destroy(tmp_provider->remi_provider);
        }
    }
#endif

    if (config) json_object_put(config);

    if (tmp_provider) {
        if (tmp_provider->poolset)
            margo_bulk_poolset_destroy(tmp_provider->poolset);
        if (tmp_provider->lock) ABT_rwlock_free(&(tmp_provider->lock));
        free(tmp_provider);
    }

372
    return (ret);
373
374
}

375
int bake_provider_deregister(bake_provider_t provider)
376
377
378
379
380
381
{
    margo_provider_pop_finalize_callback(provider->mid, provider);
    bake_server_finalize_cb(provider);
    return BAKE_SUCCESS;
}

Philip Carns's avatar
Philip Carns committed
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
int bake_provider_create_target(bake_provider_t   provider,
                                const char*       target_name,
                                size_t            size,
                                bake_target_id_t* target_id)
{
    int ret;

    /* create the actual target */
    ret = bake_create_raw_target(target_name, size);
    if (ret < 0) return (ret);

    /* begin managing it */
    ret = bake_provider_attach_target(provider, target_name, target_id);

    return (ret);
}

Philip Carns's avatar
Philip Carns committed
399
400
401
int bake_provider_attach_target(bake_provider_t   provider,
                                const char*       target_name,
                                bake_target_id_t* target_id)
402
{
Philip Carns's avatar
Philip Carns committed
403
404
    int               ret = BAKE_SUCCESS;
    bake_target_id_t  tid;
405
406
407
408
    backend_context_t ctx = NULL;

    char* backend_type = NULL;
    // figure out the backend by searching until the ":" in the target name
Philip Carns's avatar
Philip Carns committed
409
410
411
412
413
    const char* tmp = strchr(target_name, ':');
    if (tmp != NULL) {
        backend_type                                     = strdup(target_name);
        backend_type[(unsigned long)(tmp - target_name)] = '\0';
        target_name                                      = tmp + 1;
414
415
    } else {
        backend_type = strdup("pmem");
416
417
    }

418
    bake_target_t* new_entry = calloc(1, sizeof(*new_entry));
Philip Carns's avatar
Philip Carns committed
419
420

    if (strcmp(backend_type, "pmem") == 0) {
421
        new_entry->backend = &g_bake_pmem_backend;
Philip Carns's avatar
Philip Carns committed
422
    } else if (strcmp(backend_type, "file") == 0) {
Philip Carns's avatar
Philip Carns committed
423
        new_entry->backend = &g_bake_file_backend;
424
    } else {
Philip Carns's avatar
Philip Carns committed
425
        BAKE_ERROR(provider->mid, "unknown backend type \"%s\"", backend_type);
426
427
428
        free(backend_type);
        return BAKE_ERR_BACKEND_TYPE;
    }
429

430
    ret = new_entry->backend->_initialize(provider, target_name, &tid, &ctx);
Philip Carns's avatar
Philip Carns committed
431
    if (ret != 0) {
432
        free(backend_type);
433
        free(new_entry);
434
        return ret;
435
    }
Philip Carns's avatar
Philip Carns committed
436
    new_entry->context   = ctx;
437
    new_entry->target_id = tid;
438

439
440
    /* write-lock the provider */
    ABT_rwlock_wrlock(provider->lock);
441
    /* insert in the provider's hash */
Philip Carns's avatar
Philip Carns committed
442
443
    HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t),
             new_entry);
444
    /* check that it was inserted */
445
    bake_target_t* check_entry = NULL;
Philip Carns's avatar
Philip Carns committed
446
447
448
    HASH_FIND(hh, provider->targets, &tid, sizeof(bake_target_id_t),
              check_entry);
    if (check_entry != new_entry) {
Philip Carns's avatar
Philip Carns committed
449
450
        BAKE_ERROR(provider->mid,
                   "could not insert new pmem pool into the hash");
451
        new_entry->backend->_finalize(ctx);
452
        free(new_entry);
453
454
455
        ret = BAKE_ERR_ALLOCATION;
    } else {
        provider->num_targets += 1;
456
        *target_id = new_entry->target_id;
Philip Carns's avatar
Philip Carns committed
457
        ret        = BAKE_SUCCESS;
458
    }
459
460
    /* unlock provider */
    ABT_rwlock_unlock(provider->lock);
461
    free(backend_type);
462
    return ret;
463
464
}

Philip Carns's avatar
Philip Carns committed
465
466
int bake_provider_detach_target(bake_provider_t  provider,
                                bake_target_id_t target_id)
467
{
468
469
    int ret;
    ABT_rwlock_wrlock(provider->lock);
470
    bake_target_t* entry = NULL;
Philip Carns's avatar
Philip Carns committed
471
472
473
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t),
              entry);
    if (!entry) {
474
475
476
        ret = BAKE_ERR_UNKNOWN_TARGET;
    } else {
        HASH_DEL(provider->targets, entry);
477
        entry->backend->_finalize(entry->context);
478
479
480
481
482
        free(entry);
        ret = BAKE_SUCCESS;
    }
    ABT_rwlock_unlock(provider->lock);
    return ret;
483
484
}

Philip Carns's avatar
Philip Carns committed
485
int bake_provider_detach_all_targets(bake_provider_t provider)
486
{
487
    ABT_rwlock_wrlock(provider->lock);
488
    bake_target_t *p, *tmp;
Philip Carns's avatar
Philip Carns committed
489
490
    HASH_ITER(hh, provider->targets, p, tmp)
    {
491
        HASH_DEL(provider->targets, p);
492
        p->backend->_finalize(p->context);
493
494
        free(p);
    }
495
    provider->num_targets = 0;
496
    ABT_rwlock_unlock(provider->lock);
Matthieu Dorier's avatar
Matthieu Dorier committed
497
    return BAKE_SUCCESS;
498
499
}

Philip Carns's avatar
Philip Carns committed
500
int bake_provider_count_targets(bake_provider_t provider, uint64_t* num_targets)
501
{
502
    ABT_rwlock_rdlock(provider->lock);
503
    *num_targets = provider->num_targets;
504
    ABT_rwlock_unlock(provider->lock);
Matthieu Dorier's avatar
Matthieu Dorier committed
505
    return BAKE_SUCCESS;
506
507
}

Philip Carns's avatar
Philip Carns committed
508
509
int bake_provider_list_targets(bake_provider_t   provider,
                               bake_target_id_t* targets)
510
{
511
    ABT_rwlock_rdlock(provider->lock);
512
    bake_target_t *p, *tmp;
Philip Carns's avatar
Philip Carns committed
513
514
515
    uint64_t       i = 0;
    HASH_ITER(hh, provider->targets, p, tmp)
    {
516
517
518
        targets[i] = p->target_id;
        i += 1;
    }
519
    ABT_rwlock_unlock(provider->lock);
Matthieu Dorier's avatar
Matthieu Dorier committed
520
    return BAKE_SUCCESS;
521
522
}

Philip Carns's avatar
Philip Carns committed
523
524
525
526
527
528
529
530
531
#define DECLARE_LOCAL_VARS(rpc_name)                    \
    margo_instance_id       mid = MARGO_INSTANCE_NULL;  \
    bake_##rpc_name##_out_t out = {0};                  \
    bake_##rpc_name##_in_t  in;                         \
    hg_return_t             hret;                       \
    ABT_rwlock              lock     = ABT_RWLOCK_NULL; \
    const struct hg_info*   info     = NULL;            \
    bake_provider_t         provider = NULL;            \
    bake_target_t*          target   = NULL
532
533
534
535
536

#define FIND_PROVIDER                                    \
    do {                                                 \
        mid = margo_hg_handle_get_instance(handle);      \
        assert(mid);                                     \
Philip Carns's avatar
Philip Carns committed
537
        info     = margo_get_info(handle);               \
538
        provider = margo_registered_data(mid, info->id); \
Philip Carns's avatar
Philip Carns committed
539
        if (!provider) {                                 \
540
541
            out.ret = BAKE_ERR_UNKNOWN_PROVIDER;         \
            goto finish;                                 \
Philip Carns's avatar
Philip Carns committed
542
543
        }                                                \
    } while (0)
544
545
546
547

#define GET_RPC_INPUT                        \
    do {                                     \
        hret = margo_get_input(handle, &in); \
Philip Carns's avatar
Philip Carns committed
548
        if (hret != HG_SUCCESS) {            \
549
550
551
            out.ret = BAKE_ERR_MERCURY;      \
            goto finish;                     \
        }                                    \
Philip Carns's avatar
Philip Carns committed
552
    } while (0)
553
554
555
556
557

#define LOCK_PROVIDER            \
    do {                         \
        lock = provider->lock;   \
        ABT_rwlock_rdlock(lock); \
Philip Carns's avatar
Philip Carns committed
558
    } while (0)
559
560
561
562

#define FIND_TARGET                                   \
    do {                                              \
        target = find_target_entry(provider, in.bti); \
Philip Carns's avatar
Philip Carns committed
563
        if (target == NULL) {                         \
564
565
566
            out.ret = BAKE_ERR_UNKNOWN_TARGET;        \
            goto finish;                              \
        }                                             \
Philip Carns's avatar
Philip Carns committed
567
    } while (0)
568

Philip Carns's avatar
Philip Carns committed
569
570
571
572
#define UNLOCK_PROVIDER                                       \
    do {                                                      \
        if (lock != ABT_RWLOCK_NULL) ABT_rwlock_unlock(lock); \
    } while (0)
573

Philip Carns's avatar
Philip Carns committed
574
575
576
577
578
579
#define RESPOND_AND_CLEANUP            \
    do {                               \
        margo_respond(handle, &out);   \
        margo_free_input(handle, &in); \
        margo_destroy(handle);         \
    } while (0)
580

581
/* service a remote RPC that creates a BAKE region */
582
static void bake_create_ult(hg_handle_t handle)
583
{
584
585
586
587
588
    DECLARE_LOCAL_VARS(create);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
589

590
    memset(&out, 0, sizeof(out));
Philip Carns's avatar
Philip Carns committed
591
592
    out.ret
        = target->backend->_create(target->context, in.region_size, &out.rid);
593

594
595
596
597
598
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_ult)
Matthieu Dorier's avatar
Matthieu Dorier committed
599

600
601
602
603
604
605
606
607
/* service a remote RPC that writes to a BAKE region */
static void bake_write_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(write);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
608

609
610
    memset(&out, 0, sizeof(out));
    hg_addr_t src_addr = HG_ADDR_NULL;
Philip Carns's avatar
Philip Carns committed
611
    if (in.remote_addr_str && strlen(in.remote_addr_str)) {
612
613
614
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
    } else {
        hret = margo_addr_dup(mid, info->addr, &src_addr);
615
    }
Philip Carns's avatar
Philip Carns committed
616
    if (hret != HG_SUCCESS) {
617
        out.ret = BAKE_ERR_MERCURY;
618
        goto finish;
619
    }
620

Philip Carns's avatar
Philip Carns committed
621
622
623
    out.ret = target->backend->_write_bulk(
        target->context, in.rid, in.region_offset, in.bulk_size, in.bulk_handle,
        src_addr, in.bulk_offset);
624
625

finish:
626
627
628
    UNLOCK_PROVIDER;
    margo_addr_free(mid, src_addr);
    RESPOND_AND_CLEANUP;
629
}
630
DEFINE_MARGO_RPC_HANDLER(bake_write_ult)
631

Philip Carns's avatar
Philip Carns committed
632
/* service a remote RPC that writes to a BAKE region in eager mode */
633
static void bake_eager_write_ult(hg_handle_t handle)
634
{
635
636
    DECLARE_LOCAL_VARS(eager_write);
    in.buffer = NULL;
Philip Carns's avatar
Philip Carns committed
637
    in.size   = 0;
638
639
640
641
642
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;

Philip Carns's avatar
Philip Carns committed
643
644
    out.ret = target->backend->_write_raw(target->context, in.rid,
                                          in.region_offset, in.size, in.buffer);
Matthieu Dorier's avatar
Matthieu Dorier committed
645

646
647
648
649
650
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_eager_write_ult)
651

Philip Carns's avatar
Philip Carns committed
652
/* service a remote RPC that persists to a BAKE region */
653
654
655
656
657
658
659
static void bake_persist_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(persist);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
660

Philip Carns's avatar
Philip Carns committed
661
662
    out.ret = target->backend->_persist(target->context, in.rid, in.offset,
                                        in.size);
Philip Carns's avatar
Philip Carns committed
663
664

finish:
665
666
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
Philip Carns's avatar
Philip Carns committed
667
}
668
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
Philip Carns's avatar
Philip Carns committed
669

670
static void bake_create_write_persist_ult(hg_handle_t handle)
Philip Carns's avatar
Philip Carns committed
671
{
672
673
674
675
676
    DECLARE_LOCAL_VARS(create_write_persist);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
Philip Carns's avatar
Philip Carns committed
677
678
    memset(&out, 0, sizeof(out));

679
    hg_addr_t src_addr = HG_ADDR_NULL;
Philip Carns's avatar
Philip Carns committed
680
    if (in.remote_addr_str && strlen(in.remote_addr_str)) {
681
682
683
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
    } else {
        hret = margo_addr_dup(mid, info->addr, &src_addr);
Philip Carns's avatar
Philip Carns committed
684
    }
Philip Carns's avatar
Philip Carns committed
685
    if (hret != HG_SUCCESS) {
Philip Carns's avatar
Philip Carns committed
686
687
688
        out.ret = BAKE_ERR_MERCURY;
        goto finish;
    }
689

Philip Carns's avatar
Philip Carns committed
690
    if (!target->backend->_create_write_persist_bulk) {
Philip Carns's avatar
Philip Carns committed
691
692
693
694
        /* If the backend does not provide a combination
         * create_write_persist function, then issue constituent backend
         * calls instead.
         */
Philip Carns's avatar
Philip Carns committed
695
696
697
698
699
700
701
702
703
704
        out.ret = target->backend->_create(target->context, in.region_size,
                                           &out.rid);
        if (out.ret != BAKE_SUCCESS) goto finish;
        out.ret = target->backend->_write_bulk(target->context, out.rid, 0,
                                               in.bulk_size, in.bulk_handle,
                                               src_addr, in.bulk_offset);
        if (out.ret != BAKE_SUCCESS) goto finish;
        out.ret = target->backend->_persist(target->context, out.rid, 0,
                                            in.region_size);
    } else {
Philip Carns's avatar
Philip Carns committed
705
        out.ret = target->backend->_create_write_persist_bulk(
Philip Carns's avatar
Philip Carns committed
706
707
            target->context, in.bulk_handle, src_addr, in.bulk_offset,
            in.bulk_size, &out.rid);
Philip Carns's avatar
Philip Carns committed
708
709
    }

710
finish:
711
    UNLOCK_PROVIDER;
712
    margo_addr_free(mid, src_addr);
713
    RESPOND_AND_CLEANUP;
714
715
    return;
}
716
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
717

718
static void bake_eager_create_write_persist_ult(hg_handle_t handle)
719
{
720
    DECLARE_LOCAL_VARS(eager_create_write_persist);
721
    in.buffer = NULL;
Philip Carns's avatar
Philip Carns committed
722
    in.size   = 0;
723
724
725
726
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
727
728
729

    memset(&out, 0, sizeof(out));

Philip Carns's avatar
Philip Carns committed
730
    if (!target->backend->_create_write_persist_raw) {
Philip Carns's avatar
Philip Carns committed
731
732
733
734
735
        /* If the backend does not provide a combination
         * create_write_persist function, then issue constituent backend
         * calls instead.
         */
        out.ret = target->backend->_create(target->context, in.size, &out.rid);
Philip Carns's avatar
Philip Carns committed
736
737
738
739
740
741
742
        if (out.ret != BAKE_SUCCESS) goto finish;
        out.ret = target->backend->_write_raw(target->context, out.rid, 0,
                                              in.size, in.buffer);
        if (out.ret != BAKE_SUCCESS) goto finish;
        out.ret
            = target->backend->_persist(target->context, out.rid, 0, in.size);
    } else {
Philip Carns's avatar
Philip Carns committed
743
744
745
        out.ret = target->backend->_create_write_persist_raw(
            target->context, in.buffer, in.size, &out.rid);
    }
746

747
748
749
750
751
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)
752

753
754
755
756
757
758
759
760
/* service a remote RPC that retrieves the size of a BAKE region */
static void bake_get_size_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(get_size);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
761

762
    memset(&out, 0, sizeof(out));
Philip Carns's avatar
Philip Carns committed
763
764
    out.ret
        = target->backend->_get_region_size(target->context, in.rid, &out.size);
765

766
767
768
769
770
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_get_size_ult)
771

Philip Carns's avatar
Philip Carns committed
772
/* Get the raw pointer of a region */
773
774
775
776
777
778
779
780
static void bake_get_data_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(get_data);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
    out.ptr = 0;
781

Philip Carns's avatar
Philip Carns committed
782
783
    out.ret = target->backend->_get_region_data(target->context, in.rid,
                                                (void**)&out.ptr);
784

785
786
787
788
789
finish:
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_get_data_ult)
790

Philip Carns's avatar
Philip Carns committed
791
/* service a remote RPC for a BAKE no-op */
792
793
794
795
static void bake_noop_ult(hg_handle_t handle)
{
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
796

797
    margo_respond(handle, NULL);
798
    margo_destroy(handle);
799
800
    return;
}
801
DEFINE_MARGO_RPC_HANDLER(bake_noop_ult)
802

803
804
805
/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads from a BAKE region */
static void bake_read_ult(hg_handle_t handle)
806
{
807
808
809
810
811
812
    DECLARE_LOCAL_VARS(read);
    in.remote_addr_str = NULL;
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
813

814
    memset(&out, 0, sizeof(out));
815
    hg_addr_t src_addr = HG_ADDR_NULL;
Philip Carns's avatar
Philip Carns committed
816
    if (in.remote_addr_str && strlen(in.remote_addr_str)) {
817
818
819
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
    } else {
        hret = margo_addr_dup(mid, info->addr, &src_addr);
820
    }
Philip Carns's avatar
Philip Carns committed
821
    if (hret != HG_SUCCESS) {
Matthieu Dorier's avatar
Matthieu Dorier committed
822
        out.ret = BAKE_ERR_MERCURY;
823
        goto finish;
824
    }
825

Philip Carns's avatar
Philip Carns committed
826
827
828
    out.ret = target->backend->_read_bulk(
        target->context, in.rid, in.region_offset, in.bulk_size, in.bulk_handle,
        src_addr, in.bulk_offset, &out.size);
829

830
831
832
833
834
835
finish:
    UNLOCK_PROVIDER;
    margo_addr_free(mid, src_addr);
    RESPOND_AND_CLEANUP;
}
DEFINE_MARGO_RPC_HANDLER(bake_read_ult)
836

Philip Carns's avatar
Philip Carns committed
837
838
/* service a remote RPC that reads from a BAKE region and eagerly sends
 * response */
839
840
841
842
843
844
845
static void bake_eager_read_ult(hg_handle_t handle)
{
    DECLARE_LOCAL_VARS(eager_read);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
846

847
    free_fn free_data = NULL;
Philip Carns's avatar
Philip Carns committed
848
849
850
    out.ret           = target->backend->_read_raw(
        target->context, in.rid, in.region_offset, in.size, (void**)&out.buffer,
        &out.size, &free_data);
851

852
finish:
853
854
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
855
    if (free_data) free_data(target->context, out.buffer);
856
}
857
DEFINE_MARGO_RPC_HANDLER(bake_eager_read_ult)
858

Philip Carns's avatar
Philip Carns committed
859
/* service a remote RPC that probes for a BAKE target id */
860
static void bake_probe_ult(hg_handle_t handle)
861
{
862
    bake_probe_out_t out;
863
864
865

    memset(&out, 0, sizeof(out));

866
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
867
    assert(mid);
Philip Carns's avatar
Philip Carns committed
868
869
870
    const struct hg_info* hgi      = margo_get_info(handle);
    bake_provider_t       provider = margo_registered_data(mid, hgi->id);
    if (!provider) {
Matthieu Dorier's avatar
Matthieu Dorier committed
871
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
872
873
874
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
875
876
    }

877
    uint64_t targets_count;
Philip Carns's avatar
Philip Carns committed
878
    bake_provider_count_targets(provider, &targets_count);
879
    bake_target_id_t targets[targets_count];
Philip Carns's avatar
Philip Carns committed
880
    bake_provider_list_targets(provider, targets);
881

Philip Carns's avatar
Philip Carns committed
882
883
    out.ret         = BAKE_SUCCESS;
    out.targets     = targets;
884
    out.num_targets = targets_count;
Philip Carns's avatar
Philip Carns committed
885

886
    margo_respond(handle, &out);
887

888
    margo_destroy(handle);
889
}
890
DEFINE_MARGO_RPC_HANDLER(bake_probe_ult)
891

Shane Snyder's avatar
Shane Snyder committed
892
893
static void bake_remove_ult(hg_handle_t handle)
{
894
895
896
897
898
    DECLARE_LOCAL_VARS(remove);
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
Shane Snyder's avatar
Shane Snyder committed
899

Philip Carns's avatar
Philip Carns committed
900
    out.ret = target->backend->_remove(target->context, in.rid);
901
finish:
902
903
    UNLOCK_PROVIDER;
    RESPOND_AND_CLEANUP;
Shane Snyder's avatar
Shane Snyder committed
904
905
906
}
DEFINE_MARGO_RPC_HANDLER(bake_remove_ult)

907
static void bake_migrate_region_ult(hg_handle_t handle)
908
{
909
    DECLARE_LOCAL_VARS(migrate_region);
910
    in.dest_addr = NULL;
911
912
913
914
    FIND_PROVIDER;
    GET_RPC_INPUT;
    LOCK_PROVIDER;
    FIND_TARGET;
915

916
    memset(&out, 0, sizeof(out));
917

918
    out.ret = target->backend->_migrate_region(
Philip Carns's avatar
Philip Carns committed
919
920
        target->context, in.source_rid, in.region_size, in.remove_src,
        in.dest_addr, in.dest_provider_id,