margo.c 72.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
Philip Carns's avatar
Philip Carns committed
11
#include <stdlib.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
12
13

#include <margo-config.h>
14
#include <time.h>
Philip Carns's avatar
bug fix    
Philip Carns committed
15
#include <math.h>
16
17

#include "margo.h"
18
#include "margo-timer.h"
Philip Carns's avatar
Philip Carns committed
19
#include "utlist.h"
Philip Carns's avatar
Philip Carns committed
20
#include "uthash.h"
21

22
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
Shane Snyder's avatar
Shane Snyder committed
23
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
24

25
26
#define MARGO_SPARKLINE_TIMESLICE 1

27
28
29
30
31
32
33
34
35
/* If margo is initializing ABT, we need to track how many instances of margo
 * are being created, so that the last one can call ABT_finalize.
 * If margo initializes ABT, g_num_margo_instances_mtx will be created, so
 * in later calls and in margo_cleanup we can check for g_num_margo_instances_mtx != ABT_MUTEX_NULL
 * to know if we should do something to cleanup ABT as well.
 */
static int g_num_margo_instances = 0; // how many margo instances exist
static ABT_mutex g_num_margo_instances_mtx = ABT_MUTEX_NULL; // mutex for above global variable

36
/* Structure to store timing information */
Philip Carns's avatar
Philip Carns committed
37
38
struct diag_data
{
39
40
41
42
43
    /* breadcrumb stats */
    breadcrumb_stats stats;
    
    /* origin or target */
    breadcrumb_type type;
44
45

    uint64_t rpc_breadcrumb;  /* identifier for rpc and it's ancestors */
46
47
48
49
50
51
52
53
54
    struct global_breadcrumb_key key;

    /* used to combine rpc_breadcrumb, addr_hash and provider_id to create a unique key for HASH_ADD inside margo_breadcrumb_measure */
    __uint128_t x;

    /*sparkline data for breadcrumb */
    double sparkline_time[100];
    double sparkline_count[100];

55
    UT_hash_handle hh;        /* hash table link */
Philip Carns's avatar
Philip Carns committed
56
57
};

58
59
60
61
62
/* key for Argobots thread-local storage to track RPC breadcrumbs across thread
 * execution
 */
static ABT_key rpc_breadcrumb_key = ABT_KEY_NULL;

63
64
65
/* ULT-local key to hold breadcrumb timing data on the target */
static ABT_key target_timing_key = ABT_KEY_NULL;

Philip Carns's avatar
Philip Carns committed
66
67
/* Fold one sample (__time) into the running statistics in (__data).stats:
 * increments count, adds to cumulative, and widens max/min.  A min of 0 is
 * treated as "no sample recorded yet".
 *
 * __data must be an lvalue expression with a .stats member; it is
 * parenthesized so expressions such as *ptr work.  Both arguments are
 * evaluated more than once, so neither may have side effects.
 */
#define __DIAG_UPDATE(__data, __time)\
do {\
    (__data).stats.count++; \
    (__data).stats.cumulative += (__time); \
    if((__time) > (__data).stats.max) (__data).stats.max = (__time); \
    if((__data).stats.min == 0 || (__time) < (__data).stats.min) (__data).stats.min = (__time); \
} while(0)

Shane Snyder's avatar
Shane Snyder committed
74
75
76
77
78
79
80
/* One cached Mercury handle.  The element carries both a uthash handle and
 * a plain `next` pointer, one for each of the two containers named in the
 * member comments below (in-use hash and free list).
 */
struct margo_handle_cache_el
{
    hg_handle_t handle;
    UT_hash_handle hh; /* in-use hash link */
    struct margo_handle_cache_el *next; /* free list link */
};

81
82
/* Node of a singly linked list of finalize / pre-finalize callbacks.
 * New nodes are pushed at the head of the list.
 */
struct margo_finalize_cb
{
    void* owner;                    /* tag matched when popping; NULL for callbacks pushed without an owner */
    void(*callback)(void*);         /* function to invoke */
    void* uargs;                    /* argument passed to callback */
    struct margo_finalize_cb* next; /* next (previously pushed) node, or NULL */
};

89
90
struct margo_timer_list; /* defined in margo-timer.c */

91
92
93
94
95
96
97
98
99
100
101
/* Stores the name and rpc id of a registered RPC.  We track this purely for
 * debugging and instrumentation purposes.  Entries form a singly linked list
 * whose head is margo_instance.registered_rpcs; margo_cleanup() frees every
 * node.
 */
struct margo_registered_rpc
{
    hg_id_t id;                            /* rpc id */
    uint64_t rpc_breadcrumb_fragment;      /* fragment id used in rpc tracing */
    char func_name[64];                    /* string name of rpc */
    struct margo_registered_rpc *next;     /* pointer to next in list */
};

102
103
struct margo_instance
{
Shane Snyder's avatar
Shane Snyder committed
104
    /* mercury/argobots state */
105
106
    hg_context_t *hg_context;
    hg_class_t *hg_class;
107
108
109
    ABT_pool handler_pool;
    ABT_pool progress_pool;

110
    /* internal to margo for this particular instance */
Shane Snyder's avatar
Shane Snyder committed
111
    int margo_init;
112
    ABT_thread hg_progress_tid;
113
    ABT_thread sparkline_data_collection_tid;
114
    int hg_progress_shutdown_flag;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
115
    ABT_xstream progress_xstream;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
117
118
    int owns_progress_pool;
    ABT_xstream *rpc_xstreams;
    int num_handler_pool_threads;
119
    unsigned int hg_progress_timeout_ub;
120
    uint16_t num_registered_rpcs; 	   /* number of registered rpc's by all providers on this instance */
121

122
123
124
    /* list of rpcs registered on this instance for debugging and profiling purposes */
    struct margo_registered_rpc *registered_rpcs;

125
126
    /* control logic for callers waiting on margo to be finalized */
    int finalize_flag;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
127
    int refcount;
128
129
    ABT_mutex finalize_mutex;
    ABT_cond finalize_cond;
130
    struct margo_finalize_cb* finalize_cb;
Matthieu Dorier's avatar
Matthieu Dorier committed
131
    struct margo_finalize_cb* prefinalize_cb;
132

133
134
135
136
137
138
    /* control logic to prevent margo_finalize from destroying
       the instance when some operations are pending */
    unsigned pending_operations;
    ABT_mutex pending_operations_mtx;
    int finalize_requested;

Matthieu Dorier's avatar
Matthieu Dorier committed
139
140
141
142
    /* control logic for shutting down */
    hg_id_t shutdown_rpc_id;
    int enable_remote_shutdown;

143
144
    /* timer data */
    struct margo_timer_list* timer_list;
Shane Snyder's avatar
Shane Snyder committed
145
146
147
    /* linked list of free hg handles and a hash of in-use handles */
    struct margo_handle_cache_el *free_handle_list;
    struct margo_handle_cache_el *used_handle_hash;
148
    ABT_mutex handle_cache_mtx; /* mutex protecting access to above caches */
Shane Snyder's avatar
Shane Snyder committed
149

Philip Carns's avatar
Philip Carns committed
150
151
152
153
154
155
156
    /* optional diagnostics data tracking */
    /* NOTE: technically the following fields are subject to races if they
     * are updated from more than one thread at a time.  We will be careful
     * to only update the counters from the progress_fn,
     * which will serialize access.
     */
    int diag_enabled;
157
158
159
    unsigned int profile_enabled;
    double previous_sparkline_data_collection_time;
    uint16_t sparkline_index;
Philip Carns's avatar
Philip Carns committed
160
161
162
163
    struct diag_data diag_trigger_elapsed;
    struct diag_data diag_progress_elapsed_zero_timeout;
    struct diag_data diag_progress_elapsed_nonzero_timeout;
    struct diag_data diag_progress_timeout_value;
164
165
    struct diag_data *diag_rpc;
    ABT_mutex diag_rpc_mutex;
166
167
};

168
169
170
171
/* Bookkeeping for one in-flight margo operation. */
struct margo_request_struct {
    ABT_eventual eventual;
    margo_timer_t* timer;
    hg_handle_t handle;
    double start_time;         /* timestamp of when the operation started */
    uint64_t rpc_breadcrumb;   /* statistics tracking identifier, if applicable */
    uint64_t server_addr_hash; /* hash of globally unique string addr of margo server instance */
    uint16_t provider_id;      /* id of the provider servicing the request, local to the margo server instance */
};

178
179
180
struct margo_rpc_data
{
	margo_instance_id mid;
181
    ABT_pool pool;
182
183
184
	void* user_data;
	void (*user_free_callback)(void *);
};
185

Matthieu Dorier's avatar
Matthieu Dorier committed
186
187
MERCURY_GEN_PROC(margo_shutdown_out_t, ((int32_t)(ret)))

188
static void hg_progress_fn(void* foo);
189
190
static void sparkline_data_collection_fn(void* foo);

191
static void margo_rpc_data_free(void* ptr);
192
static uint64_t margo_breadcrumb_set(hg_id_t rpc_id);
193
static void margo_breadcrumb_measure(margo_instance_id mid, uint64_t rpc_breadcrumb, double start, breadcrumb_type type, uint16_t provider_id, uint64_t hash, hg_handle_t h);
Matthieu Dorier's avatar
Matthieu Dorier committed
194
195
static void remote_shutdown_ult(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult);
196

197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/* Split a multiplexed RPC id into its two parts.
 *
 * in:          combined id
 * base_id:     receives `in` with the low __MARGO_PROVIDER_ID_SIZE*8 bits cleared
 * provider_id: receives the low __MARGO_PROVIDER_ID_SIZE*8 bits of `in`
 */
static inline void demux_id(hg_id_t in, hg_id_t* base_id, uint16_t *provider_id)
{
    /* the provider id lives in the low-order bits */
    *provider_id = (uint16_t)(in & ((1<<(__MARGO_PROVIDER_ID_SIZE*8))-1));

    /* shifting down and back up zeroes those same bits */
    in >>= (__MARGO_PROVIDER_ID_SIZE*8);
    in <<= (__MARGO_PROVIDER_ID_SIZE*8);
    *base_id = in;
}

/* Combine a base RPC id with a provider id: the low
 * __MARGO_PROVIDER_ID_SIZE*8 bits of base_id are cleared and provider_id is
 * OR-ed into their place.
 */
static inline hg_id_t mux_id(hg_id_t base_id, uint16_t provider_id)
{
    /* drop whatever currently occupies the provider-id bits */
    base_id >>= (__MARGO_PROVIDER_ID_SIZE*8);
    base_id <<= (__MARGO_PROVIDER_ID_SIZE*8);

    return base_id | provider_id;
}

/* Derive an RPC id from a function name and a provider id: the name is
 * hashed with uthash's HASH_JEN, the hash is shifted up by
 * __MARGO_PROVIDER_ID_SIZE*8 bits, and provider_id is OR-ed into the low bits.
 *
 * func_name must be a NUL-terminated string (strlen is applied to it).
 */
static inline hg_id_t gen_id(const char* func_name, uint16_t provider_id)
{
    hg_id_t id;
    unsigned hashval;

    HASH_JEN(func_name, strlen(func_name), hashval);
    /* NOTE(review): hashval is `unsigned`, so this shift is evaluated in
     * `unsigned` width before being widened to hg_id_t; if hg_id_t is wider
     * than unsigned, the top bits of the hash are shifted out here.  Changing
     * this would change every generated id, so presumably it must stay as is
     * for compatibility -- confirm before touching.
     */
    id = hashval << (__MARGO_PROVIDER_ID_SIZE*8);
    id |= provider_id;

    return id;
}

Shane Snyder's avatar
Shane Snyder committed
233
234
235
236
237
238
static hg_return_t margo_handle_cache_init(margo_instance_id mid);
static void margo_handle_cache_destroy(margo_instance_id mid);
static hg_return_t margo_handle_cache_get(margo_instance_id mid,
    hg_addr_t addr, hg_id_t id, hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(margo_instance_id mid,
    hg_handle_t handle);
239
static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id,
240
    hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb, ABT_pool pool);
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
static void set_argobots_tunables(void);

/* Set tunable parameters in Argobots to be more friendly to typical Margo
 * use cases.  No return value, this is a best-effort advisory function. It
 * also will (where possible) defer to any pre-existing explicit environment
 * variable settings.  We only override if the user has not specified yet.
 */
static void set_argobots_tunables(void)
{
    /* Rationale: Margo is very likely to create a single producer (the
     * progress function), multiple consumer usage pattern that
     * causes excess memory consumption in some versions of
     * Argobots.  See
     * https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
     * We therefore set the ABT_MEM_MAX_NUM_STACKS parameter
     * for Argobots to a low value so that RPC handler threads do not
     * queue large numbers of stacks for reuse in per-ES data
     * structures.
     */
    if(!getenv("ABT_MEM_MAX_NUM_STACKS"))
        putenv("ABT_MEM_MAX_NUM_STACKS=8");

    /* Rationale: the default stack size in Argobots (as of February 2019)
     * is 16K, but this is likely to be too small for Margo as it traverses
     * a Mercury -> communications library call stack, and the potential
     * stack corruptions are very hard to debug.  We therefore pick a much
     * higher default stack size.  See this mailing list thread for
     * discussion:
     * https://lists.argobots.org/pipermail/discuss/2019-February/000039.html
     */
    if(!getenv("ABT_THREAD_STACKSIZE"))
        putenv("ABT_THREAD_STACKSIZE=2097152");

    return;
}
Shane Snyder's avatar
Shane Snyder committed
277

278
margo_instance_id margo_init_opt(const char *addr_str, int mode, const struct hg_init_info *hg_init_info,
Shane Snyder's avatar
Shane Snyder committed
279
    int use_progress_thread, int rpc_thread_count)
280
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
281
282
    ABT_xstream progress_xstream = ABT_XSTREAM_NULL;
    ABT_pool progress_pool = ABT_POOL_NULL;
283
284
285
    ABT_sched progress_sched;
    ABT_sched self_sched;
    ABT_xstream self_xstream;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
286
    ABT_xstream *rpc_xstreams = NULL;
287
    ABT_sched *rpc_scheds = NULL;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
288
289
    ABT_xstream rpc_xstream = ABT_XSTREAM_NULL;
    ABT_pool rpc_pool = ABT_POOL_NULL;
Shane Snyder's avatar
Shane Snyder committed
290
291
    hg_class_t *hg_class = NULL;
    hg_context_t *hg_context = NULL;
Shane Snyder's avatar
Shane Snyder committed
292
    int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
293
    int i;
Shane Snyder's avatar
Shane Snyder committed
294
295
296
    int ret;
    struct margo_instance *mid = MARGO_INSTANCE_NULL;

Shane Snyder's avatar
Shane Snyder committed
297
    if(mode != MARGO_CLIENT_MODE && mode != MARGO_SERVER_MODE) goto err;
Shane Snyder's avatar
Shane Snyder committed
298

299
300
    /* adjust argobots settings to suit Margo */
    set_argobots_tunables();
Philip Carns's avatar
Philip Carns committed
301

302
303
304
305
    if (ABT_initialized() == ABT_ERR_UNINITIALIZED)
    {
        ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */
        if(ret != 0) goto err;
306
307
        ret = ABT_mutex_create(&g_num_margo_instances_mtx);
        if(ret != 0) goto err;
308
    }
Shane Snyder's avatar
Shane Snyder committed
309

310
311
312
313
314
315
316
    /* set caller (self) ES to sleep when idle by using sched_wait */
    ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 0, NULL, 
        ABT_SCHED_CONFIG_NULL, &self_sched);
    if(ret != ABT_SUCCESS) goto err;
    ret = ABT_xstream_self(&self_xstream);
    if(ret != ABT_SUCCESS) goto err;
    ret = ABT_xstream_set_main_sched(self_xstream, self_sched);
317
318
319
320
    if(ret != ABT_SUCCESS) {
        // best effort
        ABT_sched_free(&self_sched);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
321
322
323

    if (use_progress_thread)
    {
324
325
326
327
328
329
        /* create an xstream to run progress engine */
        ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 0, NULL,
           ABT_SCHED_CONFIG_NULL, &progress_sched);
        if (ret != ABT_SUCCESS) goto err;
        ret = ABT_xstream_create(progress_sched, &progress_xstream);
        if (ret != ABT_SUCCESS) goto err;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
330
331
332
333
334
335
    }
    else
    {
        ret = ABT_xstream_self(&progress_xstream);
        if (ret != ABT_SUCCESS) goto err;
    }
336
337
    ret = ABT_xstream_get_main_pools(progress_xstream, 1, &progress_pool);
    if (ret != ABT_SUCCESS) goto err;
338
    if (rpc_thread_count > 0)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
339
    {
340
        /* create a collection of xstreams to run RPCs */
341
342
        rpc_xstreams = calloc(rpc_thread_count, sizeof(*rpc_xstreams));
        if (rpc_xstreams == NULL) goto err;
343
344
345
        rpc_scheds = calloc(rpc_thread_count, sizeof(*rpc_scheds));
        if (rpc_scheds == NULL) goto err;
        ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &rpc_pool);
346
        if (ret != ABT_SUCCESS) goto err;
347
348
        for(i=0; i<rpc_thread_count; i++) 
        {
Philip Carns's avatar
Philip Carns committed
349
            ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, &rpc_pool,
350
351
352
               ABT_SCHED_CONFIG_NULL, &rpc_scheds[i]);
            if (ret != ABT_SUCCESS) goto err;
            ret = ABT_xstream_create(rpc_scheds[i], rpc_xstreams+i);
Shane Snyder's avatar
Shane Snyder committed
353
354
            if (ret != ABT_SUCCESS) goto err;
        }
355
356
357
358
359
360
361
362
363
364
365
    }
    else if (rpc_thread_count == 0)
    {
        ret = ABT_xstream_self(&rpc_xstream);
        if (ret != ABT_SUCCESS) goto err;
        ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool);
        if (ret != ABT_SUCCESS) goto err;
    }
    else
    {
        rpc_pool = progress_pool;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
366
367
    }

368
    hg_class = HG_Init_opt(addr_str, listen_flag, hg_init_info);
Shane Snyder's avatar
Shane Snyder committed
369
370
371
372
373
    if(!hg_class) goto err;

    hg_context = HG_Context_create(hg_class);
    if(!hg_context) goto err;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
374
375
376
    mid = margo_init_pool(progress_pool, rpc_pool, hg_context);
    if (mid == MARGO_INSTANCE_NULL) goto err;

Shane Snyder's avatar
Shane Snyder committed
377
    mid->margo_init = 1;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
378
379
380
    mid->owns_progress_pool = use_progress_thread;
    mid->progress_xstream = progress_xstream;
    mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
381
    mid->rpc_xstreams = rpc_xstreams;
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
    mid->num_registered_rpcs = 0;

    /* start profiling if env variable MARGO_ENABLE_PROFILING is set */
    unsigned int profile = 0;
    mid->profile_enabled = 0;
    mid->previous_sparkline_data_collection_time = ABT_get_wtime();
    mid->sparkline_index = 0;

    if(getenv("MARGO_ENABLE_PROFILING")) {
      profile = (unsigned int)atoi(getenv("MARGO_ENABLE_PROFILING"));
      margo_set_param(mid, MARGO_PARAM_ENABLE_PROFILING, &profile);
    }

    if(profile) {
       margo_profile_start(mid);

       ret = ABT_thread_create(mid->progress_pool, sparkline_data_collection_fn, mid, 
       ABT_THREAD_ATTR_NULL, &mid->sparkline_data_collection_tid);
       if(ret != 0)
         fprintf(stderr, "MARGO_PROFILE: Failed to start sparkline data collection thread. Continuing to profile without sparkline data collection.\n");

    }

    /* start diagnostics if the variable MARGO_ENABLE_DIAGNOSTICS is set */
    unsigned int diag = 0;
    mid->diag_enabled = 0;

    if(getenv("MARGO_ENABLE_DIAGNOSTICS")) {
      diag = (unsigned int)atoi(getenv("MARGO_ENABLE_DIAGNOSTICS"));
      margo_set_param(mid, MARGO_PARAM_ENABLE_DIAGNOSTICS, &diag);
    }

    if(diag)
      margo_diag_start(mid);
416

Jonathan Jenkins's avatar
Jonathan Jenkins committed
417
418
419
    return mid;

err:
Shane Snyder's avatar
Shane Snyder committed
420
421
    if(mid)
    {
422
        margo_timer_list_free(mid->timer_list);
Shane Snyder's avatar
Shane Snyder committed
423
424
425
426
        ABT_mutex_free(&mid->finalize_mutex);
        ABT_cond_free(&mid->finalize_cond);
        free(mid);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
427
428
429
430
431
432
433
434
435
436
437
438
439
    if (use_progress_thread && progress_xstream != ABT_XSTREAM_NULL)
    {
        ABT_xstream_join(progress_xstream);
        ABT_xstream_free(&progress_xstream);
    }
    if (rpc_thread_count > 0 && rpc_xstreams != NULL)
    {
        for (i = 0; i < rpc_thread_count; i++)
        {
            ABT_xstream_join(rpc_xstreams[i]);
            ABT_xstream_free(&rpc_xstreams[i]);
        }
        free(rpc_xstreams);
440
        free(rpc_scheds);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
441
    }
Shane Snyder's avatar
Shane Snyder committed
442
443
444
445
    if(hg_context)
        HG_Context_destroy(hg_context);
    if(hg_class)
        HG_Finalize(hg_class);
446
447
448
    if(g_num_margo_instances_mtx != ABT_MUTEX_NULL && g_num_margo_instances == 0) {
        ABT_mutex_free(&g_num_margo_instances_mtx);
        g_num_margo_instances_mtx = ABT_MUTEX_NULL;
449
        ABT_finalize();
450
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
451
452
453
454
    return MARGO_INSTANCE_NULL;
}

margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
455
    hg_context_t *hg_context)
456
457
{
    int ret;
Shane Snyder's avatar
Shane Snyder committed
458
    hg_return_t hret;
459
460
    struct margo_instance *mid;

461
462
463
464
465
    /* set input offset to include breadcrumb information in Mercury requests */
    hret = HG_Class_set_input_offset(HG_Context_get_class(hg_context), sizeof(uint64_t));
    /* this should not ever fail */
    assert(hret == HG_SUCCESS);

Matthieu Dorier's avatar
Matthieu Dorier committed
466
    mid = calloc(1,sizeof(*mid));
Shane Snyder's avatar
Shane Snyder committed
467
    if(!mid) goto err;
468
    memset(mid, 0, sizeof(*mid));
469

470
471
472
    ABT_mutex_create(&mid->finalize_mutex);
    ABT_cond_create(&mid->finalize_cond);

473
474
    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
475
    mid->hg_class = HG_Context_get_class(hg_context);
476
    mid->hg_context = hg_context;
477
    mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB;
478

Jonathan Jenkins's avatar
Jonathan Jenkins committed
479
    mid->refcount = 1;
480
    mid->finalize_cb = NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
481
    mid->prefinalize_cb = NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
482
    mid->enable_remote_shutdown = 0;
483

484
485
486
487
    mid->pending_operations = 0;
    ABT_mutex_create(&mid->pending_operations_mtx);
    mid->finalize_requested = 0;

488
489
    ABT_mutex_create(&mid->diag_rpc_mutex);

490
491
    mid->timer_list = margo_timer_list_create();
    if(mid->timer_list == NULL) goto err;
492

Shane Snyder's avatar
Shane Snyder committed
493
494
495
496
    /* initialize the handle cache */
    hret = margo_handle_cache_init(mid);
    if(hret != HG_SUCCESS) goto err;

497
498
499
500
501
502
503
504
505
    /* register thread local key to track RPC breadcrumbs across threads */
    /* NOTE: we are registering a global key, even though init could be called
     * multiple times for different margo instances.  As of May 2019 this doesn't
     * seem to be a problem to call ABT_key_create() multiple times.
     */
    ret = ABT_key_create(free, &rpc_breadcrumb_key);
    if(ret != 0)
        goto err;

506
507
508
509
    ret = ABT_key_create(free, &target_timing_key);
    if(ret != 0)
        goto err;

510
    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, 
511
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
Shane Snyder's avatar
Shane Snyder committed
512
513
    if(ret != 0) goto err;

Matthieu Dorier's avatar
Matthieu Dorier committed
514
515
516
    mid->shutdown_rpc_id = MARGO_REGISTER(mid, "__shutdown__", 
            void, margo_shutdown_out_t, remote_shutdown_ult);

517
518
519
520
521
522
523
    /* increment the number of margo instances */
    if(g_num_margo_instances_mtx == ABT_MUTEX_NULL)
        ABT_mutex_create(&g_num_margo_instances_mtx);
    ABT_mutex_lock(g_num_margo_instances_mtx);
    g_num_margo_instances += 1;
    ABT_mutex_unlock(g_num_margo_instances_mtx);

Shane Snyder's avatar
Shane Snyder committed
524
525
    return mid;

Shane Snyder's avatar
Shane Snyder committed
526
527
err:
    if(mid)
528
    {
Shane Snyder's avatar
Shane Snyder committed
529
        margo_handle_cache_destroy(mid);
530
        margo_timer_list_free(mid->timer_list);
Shane Snyder's avatar
Shane Snyder committed
531
532
        ABT_mutex_free(&mid->finalize_mutex);
        ABT_cond_free(&mid->finalize_cond);
533
        ABT_mutex_free(&mid->pending_operations_mtx);
534
        ABT_mutex_free(&mid->diag_rpc_mutex);
535
        free(mid);
536
    }
Shane Snyder's avatar
Shane Snyder committed
537
    return MARGO_INSTANCE_NULL;
538
539
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
540
541
542
/* Release everything owned by a margo instance and free the instance itself.
 * Called once the instance's refcount has dropped to zero.
 *
 * Finalize callbacks run first, in list order (most recently pushed first).
 * Mercury and, when this was the last counted instance, Argobots are torn
 * down only if the instance was created through margo_init_opt()
 * (mid->margo_init set).
 */
static void margo_cleanup(margo_instance_id mid)
{
    int i;
    int last_instance;
    struct margo_registered_rpc *next_rpc;
    struct margo_finalize_cb *fcb;

    /* call finalize callbacks; the head is unlinked before the call and the
     * list is re-read afterwards, so a callback may push or pop entries */
    while((fcb = mid->finalize_cb) != NULL) {
        mid->finalize_cb = fcb->next;
        (fcb->callback)(fcb->uargs);
        free(fcb);
    }

    margo_timer_list_free(mid->timer_list);

    ABT_mutex_free(&mid->finalize_mutex);
    ABT_cond_free(&mid->finalize_cond);
    ABT_mutex_free(&mid->pending_operations_mtx);
    ABT_mutex_free(&mid->diag_rpc_mutex);

    if (mid->owns_progress_pool)
    {
        ABT_xstream_join(mid->progress_xstream);
        ABT_xstream_free(&mid->progress_xstream);
    }

    if (mid->num_handler_pool_threads > 0)
    {
        for (i = 0; i < mid->num_handler_pool_threads; i++)
        {
            ABT_xstream_join(mid->rpc_xstreams[i]);
            ABT_xstream_free(&mid->rpc_xstreams[i]);
        }
        free(mid->rpc_xstreams);
    }

    margo_handle_cache_destroy(mid);

    /* TODO: technically we could/should call ABT_key_free() for
     * rpc_breadcrumb_key.  We can't do that here, though, because the key is
     * global, not local to this mid.
     */

    if (mid->margo_init)
    {
        if (mid->hg_context)
            HG_Context_destroy(mid->hg_context);
        if (mid->hg_class)
            HG_Finalize(mid->hg_class);

        if(g_num_margo_instances_mtx != ABT_MUTEX_NULL) {
            ABT_mutex_lock(g_num_margo_instances_mtx);
            g_num_margo_instances -= 1;
            last_instance = !(g_num_margo_instances > 0);
            ABT_mutex_unlock(g_num_margo_instances_mtx);

            /* the last instance out also shuts Argobots down */
            if(last_instance) {
                ABT_mutex_free(&g_num_margo_instances_mtx);
                g_num_margo_instances_mtx = ABT_MUTEX_NULL;
                ABT_finalize();
            }
        }
    }

    while(mid->registered_rpcs)
    {
        next_rpc = mid->registered_rpcs->next;
        free(mid->registered_rpcs);
        mid->registered_rpcs = next_rpc;
    }

    free(mid);
}

616
/* Shut a margo instance down.
 *
 * If operations are still pending, this only records the request in
 * mid->finalize_requested and returns.  Otherwise it runs the pre-finalize
 * callbacks, stops and joins the progress ULT, dumps profiling/diagnostic
 * data if enabled, wakes everyone blocked in margo_wait_for_finalize(), and
 * drops its reference; whoever drops the last reference calls
 * margo_cleanup().
 */
void margo_finalize(margo_instance_id mid)
{
    int do_cleanup;
    int pending;
    struct margo_finalize_cb *fcb;

    /* check if there are pending operations */
    ABT_mutex_lock(mid->pending_operations_mtx);
    pending = mid->pending_operations;
    ABT_mutex_unlock(mid->pending_operations_mtx);
    if(pending) {
        mid->finalize_requested = 1;
        return;
    }

    /* before exiting the progress loop, pre-finalize callbacks need to be called */
    while((fcb = mid->prefinalize_cb) != NULL) {
        mid->prefinalize_cb = fcb->next;
        (fcb->callback)(fcb->uargs);
        free(fcb);
    }

    /* tell progress thread to wrap things up */
    mid->hg_progress_shutdown_flag = 1;

    /* wait for it to shutdown cleanly */
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);

    if(mid->profile_enabled) {
      ABT_thread_join(mid->sparkline_data_collection_tid);
      ABT_thread_free(&mid->sparkline_data_collection_tid);
      margo_profile_dump(mid, "profile", 1);
    }

    if(mid->diag_enabled)
      margo_diag_dump(mid, "profile", 1);

    ABT_mutex_lock(mid->finalize_mutex);
    mid->finalize_flag = 1;
    ABT_cond_broadcast(mid->finalize_cond);

    mid->refcount--;
    do_cleanup = mid->refcount == 0;

    ABT_mutex_unlock(mid->finalize_mutex);

    /* if there was no one waiting on the finalize at the time of the finalize
     * broadcast, then we're safe to clean up. Otherwise, let the finalizer do
     * it */
    if (do_cleanup)
        margo_cleanup(mid);

    return;
}

/* Block the caller until margo_finalize() has set the finalize flag for
 * this instance.  The waiter holds a reference on the instance while it is
 * blocked; if it turns out to hold the last reference when it wakes, it
 * performs the cleanup itself.
 */
void margo_wait_for_finalize(margo_instance_id mid)
{
    int do_cleanup;

    ABT_mutex_lock(mid->finalize_mutex);

    mid->refcount++;

    while(!mid->finalize_flag)
        ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex);

    mid->refcount--;
    do_cleanup = mid->refcount == 0;

    ABT_mutex_unlock(mid->finalize_mutex);

    if (do_cleanup)
        margo_cleanup(mid);

    return;
}

696
697
698
699
700
701
702
/* Report whether the Mercury class behind this instance is listening.
 * A null instance yields HG_FALSE.
 */
hg_bool_t margo_is_listening(
            margo_instance_id mid)
{
    if(mid)
        return HG_Class_is_listening(mid->hg_class);

    return HG_FALSE;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
/* Register a pre-finalize callback with no owner; shorthand for
 * margo_provider_push_prefinalize_callback() with a NULL owner.
 */
void margo_push_prefinalize_callback(
            margo_instance_id mid,
            void(*cb)(void*),
            void* uargs)
{
    void* const no_owner = NULL;

    margo_provider_push_prefinalize_callback(mid, no_owner, cb, uargs);
}

/* Remove the most recently pushed owner-less pre-finalize callback.
 * Returns 1 if one was removed, 0 if none was found.
 */
int margo_pop_prefinalize_callback(
                    margo_instance_id mid)
{
    int removed = margo_provider_pop_prefinalize_callback(mid, NULL);

    return removed;
}

/* Push a pre-finalize callback onto the head of the instance's list.
 *
 * mid:   instance to attach the callback to
 * owner: tag later used by margo_provider_pop_prefinalize_callback() to find
 *        the entry (may be NULL)
 * cb:    function to call from margo_finalize(); a NULL cb is ignored
 * uargs: argument passed to cb
 *
 * If the list node cannot be allocated the callback is not registered and a
 * message is printed to stderr (the function has no way to return an error).
 */
void margo_provider_push_prefinalize_callback(
            margo_instance_id mid,
            void* owner,
            void(*cb)(void*),
            void* uargs)
{
    if(cb == NULL) return;

    struct margo_finalize_cb* fcb = malloc(sizeof(*fcb));
    if(fcb == NULL) {
        fprintf(stderr, "margo: out of memory, prefinalize callback not registered\n");
        return;
    }
    fcb->owner    = owner;
    fcb->callback = cb;
    fcb->uargs    = uargs;

    /* newest entry goes first */
    fcb->next = mid->prefinalize_cb;
    mid->prefinalize_cb = fcb;
}

/* Unlink and free the first pre-finalize callback whose owner matches.
 * The callback itself is not invoked.
 *
 * Returns 1 if an entry was removed, 0 if no entry has that owner.
 */
int margo_provider_pop_prefinalize_callback(
            margo_instance_id mid,
            void* owner)
{
    /* link always points at the pointer that refers to the current node */
    struct margo_finalize_cb** link = &mid->prefinalize_cb;
    struct margo_finalize_cb* match;

    while(*link != NULL && (*link)->owner != owner)
        link = &(*link)->next;

    match = *link;
    if(match == NULL) return 0;

    *link = match->next;
    free(match);
    return 1;
}

760
761
/* Register a finalize callback with no owner; shorthand for
 * margo_provider_push_finalize_callback() with a NULL owner.
 */
void margo_push_finalize_callback(
            margo_instance_id mid,
            void(*cb)(void*),
            void* uargs)
{
    margo_provider_push_finalize_callback(
            mid,
            NULL,
            cb,
            uargs);
}

/* Remove the most recently pushed owner-less finalize callback.
 * Returns 1 if one was removed, 0 if none was found.
 */
int margo_pop_finalize_callback(
                    margo_instance_id mid)
{
    int removed = margo_provider_pop_finalize_callback(mid, NULL);

    return removed;
}

/* Push a finalize callback onto the front of the instance's finalize
 * callback list.
 *
 * mid   - margo instance whose finalize_cb list is modified
 * owner - opaque key stored with the entry; it is the value that
 *         margo_provider_pop_finalize_callback() matches on
 * cb    - callback to store; if NULL the call does nothing
 * uargs - user pointer stored alongside cb
 *
 * This function returns void and therefore cannot report an allocation
 * failure; if the list node cannot be allocated the list is left
 * untouched.
 */
void margo_provider_push_finalize_callback(
            margo_instance_id mid,
            void* owner,
            void(*cb)(void*),
            void* uargs)
{
    if(cb == NULL) return;

    struct margo_finalize_cb* fcb = malloc(sizeof(*fcb));
    if(fcb == NULL) return; /* out of memory: nothing gets registered */

    fcb->owner    = owner;
    fcb->callback = cb;
    fcb->uargs    = uargs;

    /* the new entry becomes the head of the list */
    fcb->next = mid->finalize_cb;
    mid->finalize_cb = fcb;
}

797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
/* Remove and free the first entry of the finalize callback list whose
 * owner equals `owner`.
 *
 * Returns 1 if an entry was removed, 0 if no entry matched.
 */
int margo_provider_pop_finalize_callback(
    margo_instance_id mid,
    void* owner)
{
    /* `link` always points at the pointer that refers to the current node,
     * so unlinking is the same operation for the head and for inner nodes
     */
    struct margo_finalize_cb** link = &mid->finalize_cb;

    for(; *link != NULL; link = &(*link)->next)
    {
        struct margo_finalize_cb* match = *link;

        if(match->owner != owner)
            continue;

        *link = match->next;
        free(match);
        return 1;
    }

    return 0;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
/* Set the instance's enable_remote_shutdown flag to 1. */
void margo_enable_remote_shutdown(margo_instance_id mid)
{
    const int enabled = 1;

    mid->enable_remote_shutdown = enabled;
}

/* Send the instance's shutdown RPC (mid->shutdown_rpc_id) to remote_addr
 * and return the `ret` field of the response.
 *
 * Returns -1 if the handle cannot be created, the forward fails, or the
 * output cannot be retrieved; otherwise returns the value the remote side
 * placed in the response.
 */
int margo_shutdown_remote_instance(
        margo_instance_id mid,
        hg_addr_t remote_addr)
{
    hg_return_t hret;
    hg_handle_t handle;
    margo_shutdown_out_t out;
    int remote_ret;

    hret = margo_create(mid, remote_addr,
                        mid->shutdown_rpc_id, &handle);
    if(hret != HG_SUCCESS) return -1;

    hret = margo_forward(handle, NULL);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return -1;
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return -1;
    }

    /* copy the result out while `out` is still valid; it must not be read
     * once margo_free_output() has released it
     */
    remote_ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);

    return remote_ret;
}

854
/* Register an RPC by name for a given provider id.
 *
 * The RPC id is derived from func_name and provider_id with gen_id() and
 * registered through margo_register_internal(). When profiling is enabled
 * on the instance, a margo_registered_rpc record describing the RPC is
 * also pushed onto mid->registered_rpcs and mid->num_registered_rpcs is
 * incremented; both are rolled back if the registration fails.
 *
 * Returns the generated id, or 0 if the profiling record cannot be
 * allocated or margo_register_internal() returns 0.
 */
hg_id_t margo_provider_register_name(margo_instance_id mid, const char *func_name,
    hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb,
    uint16_t provider_id, ABT_pool pool)
{
    hg_id_t id;
    /* NOTE(review): the return type of margo_register_internal() is not
     * visible here; its result is held in hg_id_t so that the zero test
     * below cannot be affected by narrowing if it returns an id.
     */
    hg_id_t registered;
    struct margo_registered_rpc * tmp_rpc = NULL;

    assert(provider_id <= MARGO_MAX_PROVIDER_ID);

    id = gen_id(func_name, provider_id);

    if(mid->profile_enabled)
    {
        /* track information about this rpc registration for debugging and
         * profiling
         */
        tmp_rpc = calloc(1, sizeof(*tmp_rpc));
        if(!tmp_rpc)
            return(0);
        tmp_rpc->id = id;
        tmp_rpc->rpc_breadcrumb_fragment = id >> (__MARGO_PROVIDER_ID_SIZE*8);
        tmp_rpc->rpc_breadcrumb_fragment &= 0xffff;
        /* at most 63 bytes are copied into the zero-filled record;
         * presumably func_name holds at least 64 bytes so the name stays
         * NUL-terminated -- confirm against the struct definition
         */
        strncpy(tmp_rpc->func_name, func_name, 63);
        tmp_rpc->next = mid->registered_rpcs;
        mid->registered_rpcs = tmp_rpc;
        mid->num_registered_rpcs += 1;
    }

    registered = margo_register_internal(mid, id, in_proc_cb, out_proc_cb, rpc_cb, pool);
    if(registered == 0)
    {
        /* undo the bookkeeping done above: unlink the record and restore
         * the counter so it keeps matching the length of the list
         */
        if(tmp_rpc != NULL)
        {
            mid->registered_rpcs = tmp_rpc->next;
            mid->num_registered_rpcs -= 1;
            free(tmp_rpc);
        }
        return(0);
    }

    return(id);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
897
898
899
900
901
902
903
/* Deregister rpc_id from the instance's Mercury class via HG_Deregister()
 * and return its result.
 */
hg_return_t margo_deregister(margo_instance_id mid, hg_id_t rpc_id)
{
    hg_return_t hret = HG_Deregister(mid->hg_class, rpc_id);

    return hret;
}

904
905
/* Compute the id for func_name with provider id 0, store it in *id, and
 * ask Mercury (HG_Registered) whether that id is registered; the answer
 * is written to *flag.
 */
hg_return_t margo_registered_name(
    margo_instance_id mid,
    const char *func_name,
    hg_id_t *id,
    hg_bool_t *flag)
{
    hg_return_t hret;

    *id = gen_id(func_name, 0);
    hret = HG_Registered(mid->hg_class, *id, flag);

    return hret;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
911
/* Compute the id for func_name combined with provider_id, store it in
 * *id, and ask Mercury (HG_Registered) whether that id is registered; the
 * answer is written to *flag.
 */
hg_return_t margo_provider_registered_name(
    margo_instance_id mid,
    const char *func_name,
    uint16_t provider_id,
    hg_id_t *id,
    hg_bool_t *flag)
{
    hg_return_t hret;

    *id = gen_id(func_name, provider_id);
    hret = HG_Registered(mid->hg_class, *id, flag);

    return hret;
}

919
920
921
922
923
924
925
/* Attach user data and an optional free callback to a registered RPC.
 *
 * If the RPC already carries user data together with a free callback,
 * that callback is invoked on the old data before it is replaced.
 *
 * Returns HG_OTHER_ERROR when HG_Registered_data() yields no record for
 * `id`, HG_SUCCESS otherwise.
 */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *))
{
    struct margo_rpc_data* rpc_data;

    rpc_data = (struct margo_rpc_data*) HG_Registered_data(mid->hg_class, id);
    if(rpc_data == NULL)
        return HG_OTHER_ERROR;

    /* release whatever was attached before */
    if(rpc_data->user_free_callback != NULL && rpc_data->user_data != NULL)
        rpc_data->user_free_callback(rpc_data->user_data);

    rpc_data->user_free_callback = free_callback;
    rpc_data->user_data = data;

    return HG_SUCCESS;
}

/* Return the user data attached to the RPC `id`, or NULL when
 * HG_Registered_data() yields no record for it.
 */
void* margo_registered_data(margo_instance_id mid, hg_id_t id)
{
    struct margo_rpc_data* rpc_data;

    rpc_data = (struct margo_rpc_data*) HG_Registered_data(margo_get_class(mid), id);

    return (rpc_data != NULL) ? rpc_data->user_data : NULL;
}

944
945
946
947
/* Forward disable_flag for RPC `id` to HG_Registered_disable_response()
 * on the instance's Mercury class and return its result.
 */
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag)
{
    hg_return_t hret;

    hret = HG_Registered_disable_response(mid->hg_class, id, disable_flag);

    return hret;
}
951

952
953
954
955
956
957
958
959
960
961
962
963
/* Query HG_Registered_disabled_response() for RPC `id`.
 *
 * On success the result is stored in *disabled_flag and HG_SUCCESS is
 * returned; on failure *disabled_flag is left untouched and the Mercury
 * error code is returned.
 */
hg_return_t margo_registered_disabled_response(
    margo_instance_id mid,
    hg_id_t id,
    int* disabled_flag)
{
    hg_bool_t disabled;
    hg_return_t hret;

    hret = HG_Registered_disabled_response(mid->hg_class, id, &disabled);
    if(hret == HG_SUCCESS)
        *disabled_flag = disabled;

    return hret;
}

964
struct lookup_cb_evt
965
{
966
    hg_return_t hret;
967
968
969
970
971
972
    hg_addr_t addr;
};

static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
{
    struct lookup_cb_evt evt;
973
    evt.hret = info->ret;
974
    evt.addr = info->info.lookup.addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
975
    ABT_eventual eventual = (ABT_eventual)(info->arg);
976
977

    /* propagate return code out through eventual */
Matthieu Dorier's avatar
Matthieu Dorier committed
978
    ABT_eventual_set(eventual, &evt, sizeof(evt));
979

980
981
982
    return(HG_SUCCESS);
}

983
984
985
986
/* Look up `name` and store the resulting address in *addr.
 *
 * The lookup is posted with HG_Addr_lookup() and the caller waits on an
 * ABT_eventual that margo_addr_lookup_cb() sets.
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, the error
 * from HG_Addr_lookup() if posting fails (in which case *addr is not
 * written), and otherwise the return code reported by the callback.
 */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
    const char   *name,
    hg_addr_t    *addr)
{
    ABT_eventual lookup_done;
    struct lookup_cb_evt *result;
    hg_return_t hret;

    if(ABT_eventual_create(sizeof(*result), &lookup_done) != 0)
        return HG_NOMEM_ERROR;

    hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
        (void*)lookup_done, name, HG_OP_ID_IGNORE);
    if(hret != HG_SUCCESS)
    {
        ABT_eventual_free(&lookup_done);
        return hret;
    }

    /* block until the callback has published its result */
    ABT_eventual_wait(lookup_done, (void**)&result);
    *addr = result->addr;
    hret  = result->hret;

    ABT_eventual_free(&lookup_done);
    return hret;
}

/* Release `addr` through HG_Addr_free() on the instance's Mercury class
 * and return its result.
 */
hg_return_t margo_addr_free(margo_instance_id mid, hg_addr_t addr)
{
    hg_return_t hret = HG_Addr_free(mid->hg_class, addr);

    return hret;
}
1019

1020
1021
1022
1023
1024
/* Obtain the instance's own address through HG_Addr_self(), storing it
 * in *addr, and return the Mercury result code.
 */
hg_return_t margo_addr_self(margo_instance_id mid, hg_addr_t *addr)
{
    hg_return_t hret = HG_Addr_self(mid->hg_class, addr);

    return hret;
}

1027
1028
1029
1030
1031
1032
1033
1034
1035
/* Duplicate `addr` into *new_addr through HG_Addr_dup() on the instance's
 * Mercury class and return its result.
 */
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr)
{
    hg_return_t hret = HG_Addr_dup(mid->hg_class, addr, new_addr);

    return hret;
}

hg_return_t margo_addr_to_string(
1036
    margo_instance_id mid,
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr)
{
    return(HG_Addr_to_string(mid->hg_class, buf, buf_size, addr));
}

/* Obtain a handle for RPC `id` targeting `addr`.
 *
 * A handle from the instance's handle cache is reused when
 * margo_handle_cache_get() succeeds; otherwise a new one is created with
 * HG_Create() and its result is returned.
 */
hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr,
    hg_id_t id, hg_handle_t *handle)
{
    /* look for a handle to reuse */
    if(margo_handle_cache_get(mid, addr, id, handle) == HG_SUCCESS)
        return HG_SUCCESS;

    /* nothing reusable: create a new handle */
    return HG_Create(mid->hg_context, addr, id, handle);
}

1060
/* Release a handle.
 *
 * A null handle is accepted and reported as HG_SUCCESS. If the handle's
 * reference count is 1, it is first offered back to the handle cache of
 * its margo instance; in every other case, and when the cache does not
 * accept it, HG_Destroy() is called and its result returned.
 */
hg_return_t margo_destroy(hg_handle_t handle)
{
    if(handle == HG_HANDLE_NULL)
        return HG_SUCCESS;

    /* only a handle holding its last reference is a candidate for
     * recycling; otherwise HG_Destroy will simply decrease the count
     */
    if(HG_Ref_get(handle) == 1)
    {
        /* use the handle to get the associated mid */
        margo_instance_id mid = margo_hg_handle_get_instance(handle);

        /* recycle this handle if it came from the handle cache */
        if(margo_handle_cache_put(mid, handle) == HG_SUCCESS)
            return HG_SUCCESS;
    }

    return HG_Destroy(handle);
}

static hg_return_t margo_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;
1092
    margo_request req = (margo_request)(info->arg);
1093
    margo_instance_id mid;
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106

    if(hret == HG_CANCELED && req->timer) {
        hret = HG_TIMEOUT;
    }

    /* remove timer if there is one and it is still in place (i.e., not timed out) */
    if(hret != HG_TIMEOUT && req->timer && req->handle) {
        margo_instance_id mid = margo_hg_handle_get_instance(req->handle);
        margo_timer_destroy(mid, req->timer);
    }
    if(req->timer) {
        free(req->timer);
    }
1107

1108
1109
1110
1111
1112
1113
1114
    if(req->rpc_breadcrumb != 0)
    {
        /* This is the callback from an HG_Forward call.  Track RPC timing
         * information.
         */
        mid = margo_hg_handle_get_instance(req->handle);
        assert(mid);
1115
1116
1117
1118
1119

        if(mid->profile_enabled) {
          /* 0 here indicates this is a origin-side call */
          margo_breadcrumb_measure(mid, req->rpc_breadcrumb, req->start_time, 0, req->provider_id, req->server_addr_hash, req->handle);
        }
1120
1121
    }

1122
    /* propagate return code out through eventual */
1123
    ABT_eventual_set(req->eventual, &hret, sizeof(hret));
1124
1125
1126
1127
    
    return(HG_SUCCESS);
}

1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
/* Wait on the request's eventual, copy out the hg_return_t stored in it,
 * free the eventual, and return that code.
 */
static hg_return_t margo_wait_internal(margo_request req)
{
    hg_return_t *stored;
    hg_return_t result;

    ABT_eventual_wait(req->eventual, (void**)&stored);

    /* take a copy before the eventual is freed */
    result = *stored;
    ABT_eventual_free(&req->eventual);

    return result;
}

/* Record holding a single Mercury handle.
 * NOTE(review): no use of this type is visible in this part of the file
 * (margo_forward_timeout_cb below receives a margo_request instead) --
 * confirm whether it is still needed.
 */
typedef struct
{
    hg_handle_t handle;
} margo_forward_timeout_cb_dat;

static void margo_forward_timeout_cb(void *arg)
Matthieu Dorier's avatar
Matthieu Dorier committed
1146
{
1147
1148
1149
1150
    margo_request req = (margo_request)arg;
    /* cancel the Mercury op if the forward timed out */
    HG_Cancel(req->handle);
    return;
Matthieu Dorier's avatar
Matthieu Dorier committed
1151
1152
}

1153
static hg_return_t margo_provider_iforward_internal(
1154
    uint16_t provider_id,
Matthieu Dorier's avatar
Matthieu Dorier committed
1155
    hg_handle_t handle,
1156
    double timeout_ms,
Matthieu Dorier's avatar
Matthieu Dorier committed
1157
    void *in_struct,
1158
    margo_request req) /* the request should have been allocated */
1159
1160
{
    hg_return_t hret = HG_TIMEOUT;
1161
    ABT_eventual eventual;
1162
    int ret;
1163
1164
1165
1166
    const struct hg_info* hgi; 
    hg_id_t id;
    hg_proc_cb_t in_cb, out_cb;
    hg_bool_t flag;
1167
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
1168
    uint64_t *rpc_breadcrumb;
1169
1170
    char addr_string[128];
    hg_size_t addr_string_sz = 128;
1171
1172
1173
1174

    assert(provider_id <= MARGO_MAX_PROVIDER_ID);

    hgi = HG_Get_info(handle);
Philip Carns's avatar
Philip Carns committed
1175
    id = mux_id(hgi->id, provider_id);
1176

1177
1178
1179
1180
1181
    hg_bool_t is_registered;
    ret = HG_Registered(hgi->hg_class, id, &is_registered);
    if(ret != HG_SUCCESS)
        return(ret);
    if(!is_registered)
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
    {
        /* if Mercury does not recognize this ID (with provider id included)
         * then register it now
         */
        /* find encoders for base ID */
        ret = HG_Registered_proc_cb(hgi->hg_class, hgi->id, &flag, &in_cb, &out_cb);
        if(ret != HG_SUCCESS)
            return(ret);
        if(!flag)
            return(HG_NO_MATCH);

1193
1194
1195
1196
1197
1198
        /* find out if disable_response was called for this RPC */
        hg_bool_t response_disabled;
        ret = HG_Registered_disabled_response(hgi->hg_class, hgi->id, &response_disabled);
        if(ret != HG_SUCCESS)
            return(ret);

1199
        /* register new ID that includes provider id */
1200
        ret = margo_register_internal(margo_hg_info_get_instance(hgi), 
1201
            id, in_cb, out_cb, NULL, ABT_POOL_NULL);
1202
1203
        if(ret == 0)
            return(HG_OTHER_ERROR);
1204
1205
1206
        ret = HG_Registered_disable_response(hgi->hg_class, id, response_disabled);
        if(ret != HG_SUCCESS)
            return(ret);
1207
    }
1208
1209
1210
    ret = HG_Reset(handle, hgi->addr, id);
    if(ret != HG_SUCCESS)
        return(ret);
1211
1212
1213
1214
1215
1216

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
    
    req->timer = NULL;
    req->eventual = eventual;
    req->handle = handle;

    if(timeout_ms > 0) {
        /* set a timer object to expire when this forward times out */
        req->timer = calloc(1, sizeof(*(req->timer)));
        if(!(req->timer)) {
            ABT_eventual_free(&eventual);
            return(HG_NOMEM_ERROR);
        }
        margo_timer_init(mid, req->timer, margo_forward_timeout_cb,
                         req, timeout_ms);
    }
1232

1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
    /* add rpc breadcrumb to outbound request; this will be used to track
     * rpc statistics.
     */
    ret = HG_Get_input_buf(handle, (void**)&rpc_breadcrumb, NULL);
    if(ret != HG_SUCCESS)
        return(ret);
    req->rpc_breadcrumb = margo_breadcrumb_set(hgi->id);
    /* LE encoding */
    *rpc_breadcrumb = htole64(req->rpc_breadcrumb);
    req->start_time = ABT_get_wtime();
1243
1244
1245
1246
1247
1248
    
    /* add information about the server and provider servicing the request */
    req->provider_id = provider_id; /*store id of provider servicing the request */
    const struct hg_info * inf = HG_Get_info(req->handle);
    margo_addr_to_string(mid, addr_string, &addr_string_sz, inf->addr);
    HASH_JEN(addr_string, strlen(addr_string), req->server_addr_hash); /*record server address in the breadcrumb */
1249

1250
    return HG_Forward(handle, margo_cb, (void*)req, in_struct);
Matthieu Dorier's avatar
Matthieu Dorier committed
1251
}
1252

1253
1254
1255
1256
/* Forward `handle` to provider `provider_id`: delegates to
 * margo_provider_forward_timed() with a timeout argument of 0 and returns
 * its result.
 */
hg_return_t margo_provider_forward(
    uint16_t provider_id,
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret;

    hret = margo_provider_forward_timed(provider_id, handle, in_struct, 0);

    return hret;
}

1261
1262
1263
1264
1265
hg_return_t margo_provider_iforward(
    uint16_t provider_id,
    hg_handle_t handle,
    void *in_struct,
    margo_request* req)
Matthieu Dorier's avatar
Matthieu Dorier committed
1266
{
1267
    return margo_provider_iforward_timed(provider_id, handle, in_struct, 0, req);