margo.h 37.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#ifndef __MARGO
#define __MARGO

10
11
12
13
#ifdef __cplusplus
extern "C" {
#endif

14
#include <mercury.h>
15
16
#include <mercury_types.h>
#include <mercury_bulk.h>
17
18
19
#include <mercury_macros.h>
#include <abt.h>

20
21
#include "margo-diag.h"

22
23
24
25
/*
 * Determine how much of the Mercury ID space is used for Margo provider IDs:
 * a quarter of the bytes of an hg_id_t, with the RPC hash size set to three
 * times that amount.
 */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t) / 4)
#define __MARGO_RPC_HASH_SIZE    (__MARGO_PROVIDER_ID_SIZE * 3)

Shane Snyder's avatar
Shane Snyder committed
26
27
28
29
30
/* This is to prevent the user from using HG_Register_data
 * and HG_Registered_data, which are replaced with
 * margo_register_data and margo_registered_data
 * respectively.
 */
31
32
#undef MERCURY_REGISTER

33
34
/* Opaque handle types; the underlying structures are only referred to by
 * pointer in this header. */
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
typedef struct margo_data* margo_data_ptr;
typedef struct margo_request_struct* margo_request;

/* Null values for the handle types above. */
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_REQUEST_NULL ((margo_request)NULL)

/* Values accepted by the "mode" argument of margo_init()/margo_init_opt(). */
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1

/* Provider ID used by the margo_forward()-style convenience macros. */
#define MARGO_DEFAULT_PROVIDER_ID 0
/* Largest value that fits in __MARGO_PROVIDER_ID_SIZE bytes. */
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)

/* Parameter identifiers.
 * NOTE(review): the functions consuming these are not visible in this part
 * of the header -- confirm their semantics against the implementation. */
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
#define MARGO_PARAM_ENABLE_PROFILING 2
#define MARGO_PARAM_ENABLE_DIAGNOSTICS 3
48

Jonathan Jenkins's avatar
Jonathan Jenkins committed
49
50
/**
 * Initializes margo library.
 *
 * Expands to margo_init_opt() with a NULL hg_init_info argument.
 *
 * @param [in] addr_str            Mercury host address with port number
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
 *                                 it will run in the caller's thread context.
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not*
 *                                 service incoming RPCs) should use a value
 *                                 of 0. A value of -1 directs Margo to use
 *                                 the same execution context as that used
 *                                 for Mercury progress.
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
 */
#define margo_init(_addr_str, _mode, _use_progress_thread, _rpc_thread_count)\
 margo_init_opt(_addr_str, _mode, NULL, _use_progress_thread, _rpc_thread_count)

/**
 * Initializes margo library with custom Mercury options.
 * @param [in] addr_str            Mercury host address with port number
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
 * @param [in] hg_init_info        (Optional) Hg init info, passed directly
 *                                 to Mercury
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
 *                                 it will run in the caller's thread context.
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not*
 *                                 service incoming RPCs) should use a value
 *                                 of 0. A value of -1 directs Margo to use
 *                                 the same execution context as that used
 *                                 for Mercury progress.
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
 */
margo_instance_id margo_init_opt(
    const char *addr_str,
    int mode,
    const struct hg_init_info *hg_init_info,
    int use_progress_thread,
    int rpc_thread_count);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108

109

110
/**
Jonathan Jenkins's avatar
doc fix    
Jonathan Jenkins committed
111
 * Initializes margo library from given argobots and Mercury instances.
Philip Carns's avatar
Philip Carns committed
112
113
114
 * @param [in] progress_pool Argobots pool to drive communication
 * @param [in] handler_pool Argobots pool to service RPC handlers
 * @param [in] hg_context Mercury context
Jonathan Jenkins's avatar
fixupme    
Jonathan Jenkins committed
115
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
Philip Carns's avatar
Philip Carns committed
116
117
118
119
120
121
122
123
124
125
 *
 * NOTE: if you are configuring Argobots pools yourself before
 * passing them into this function, please consider setting
 * ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
 * environment or programmatically with putenv() in your code before
 * creating the pools to prevent excess memory consumption under
 * load from producer/consumer patterns across execution streams that
 * fail to utilize per-execution stream stack caches.  See
 * https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
 * The margo_init() function does this automatically.
126
 */
Shane Snyder's avatar
Shane Snyder committed
127
128
129
margo_instance_id margo_init_pool(
    ABT_pool progress_pool,
    ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
130
    hg_context_t *hg_context);
131
132

/**
Philip Carns's avatar
Philip Carns committed
133
 * Shuts down margo library and its underlying abt and mercury resources
Philip Carns's avatar
Philip Carns committed
134
 * @param [in] mid Margo instance
135
 */
136
137
void margo_finalize(
    margo_instance_id mid);
138
139
140
141
142
143
144
145
146
147
148

/**
 * Suspends the caller until some other entity (e.g. an RPC, thread, or
 * signal handler) invokes margo_finalize().
 *
 * NOTE: This informs Margo that the calling thread no longer needs to be 
 * scheduled for execution if it is sharing an Argobots pool with the
 * progress engine.
 *
 * @param [in] mid Margo instance
 */
149
150
void margo_wait_for_finalize(
    margo_instance_id mid);
151

152
153
154
155
156
157
158
159
160
161
/**
 * Checks whether a Margo instance was initialized as a server.
 *
 * @param [in] mid Margo instance
 *
 * @return HG_TRUE if listening; HG_FALSE if not listening or if mid is not
 *         a valid margo instance.
 */
hg_bool_t margo_is_listening(margo_instance_id mid);

Matthieu Dorier's avatar
Matthieu Dorier committed
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/**
 * Installs a callback to be called before the margo instance is finalized,
 * and before the Mercury progress loop is terminated.
 * Callbacks are called in the reverse of the order in which they were
 * pushed, with the user-provided pointer as argument.
 *
 * Note that callbacks may not be called within margo_finalize. They are called
 * when the margo instance is cleaned up, which may happen in
 * margo_wait_for_finalize.
 *
 * Callbacks installed using this function may call RPCs or margo_thread_sleep,
 * but there is no guarantee that the process will not itself receive RPCs in
 * the meantime.
 *
 * @param mid The margo instance
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_push_prefinalize_callback(
    margo_instance_id mid,
    void (*cb)(void *),
    void *uargs);

/**
 * Removes, without calling it, the last pre-finalize callback that was
 * pushed into the margo instance.
 *
 * @param mid Margo instance.
 *
 * @return 1 if a callback was removed, 0 otherwise.
 */
int margo_pop_prefinalize_callback(margo_instance_id mid);

/**
 * @brief Installs a callback to be called before the margo instance is
 * finalized, and before the Mercury progress loop is terminated.
 * The owner pointer identifies callbacks installed by a particular provider.
 * Multiple callbacks may be installed with the same owner. If popped, they
 * are popped in reverse order of installation. If they are not popped, they
 * are called in reverse order of installation by the margo cleanup procedure.
 *
 * Callbacks installed using this function may call RPCs or margo_thread_sleep,
 * but there is no guarantee that the process will not itself receive RPCs in
 * the meantime.
 *
 * @param mid The margo instance
 * @param owner Owner of the callback (to be used when popping callbacks)
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_provider_push_prefinalize_callback(
    margo_instance_id mid,
    void *owner,
    void (*cb)(void *),
    void *uargs);

/**
 * Removes the last prefinalize callback that was pushed into the margo
 * instance by the specified owner.
 *
 * @param mid Margo instance.
 * @param owner Owner of the callback.
 *
 * @return 1 if a callback was removed, 0 otherwise.
 */
int margo_provider_pop_prefinalize_callback(
    margo_instance_id mid,
    void *owner);

229
230
231
232
233
234
235
236
/**
 * Installs a callback to be called before the margo instance is finalized.
 * Callbacks are called in the reverse of the order in which they were
 * pushed, with the user-provided pointer as argument.
 *
 * Note that callbacks may not be called within margo_finalize. They are called
 * when the margo instance is cleaned up, which may happen in margo_wait_for_finalize.
 *
 * Important: callbacks cannot make RPC calls nor use margo_thread_sleep. If you
 * need to be able to make RPC calls or use margo_thread_sleep, you should use
 * margo_push_prefinalize_callback instead.
 *
 * @param mid The margo instance
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_push_finalize_callback(
    margo_instance_id mid,
    void(*cb)(void*),
    void* uargs);

250
251
/**
 * @brief Removes the last finalize callback that was pushed into the margo instance
 * without calling it. If a callback was removed, this function returns 1, otherwise
 * it returns 0.
 *
 * @param mid Margo instance.
 */
int margo_pop_finalize_callback(
    margo_instance_id mid);

/**
 * @brief Installs a callback to be called before the margo instance is finalized.
 * The owner pointer identifies callbacks installed by a particular provider.
 * Note that one can install multiple callbacks with the same owner. If popped, they
 * will be popped in reverse order of installation. If they are not popped, they will
 * be called in reverse order of installation by the margo cleanup procedure.
 *
 * Important: callbacks cannot make RPC calls nor use margo_thread_sleep. If you
 * need to be able to make RPC calls or use margo_thread_sleep, you should use
 * margo_provider_push_prefinalize_callback instead.
 *
 * @param mid The margo instance
 * @param owner Owner of the callback (to be used when popping callbacks)
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_provider_push_finalize_callback(
    margo_instance_id mid,
    void* owner,
    void(*cb)(void*),
    void* uargs);

/**
 * @brief Removes the last finalize callback that was pushed into the margo instance
 * by the specified owner. If a callback was removed, this function returns 1, otherwise
 * it returns 0.
 *
 * @param mid Margo instance.
 * @param owner Owner of the callback.
 */
int margo_provider_pop_finalize_callback(
    margo_instance_id mid,
    void* owner);

Matthieu Dorier's avatar
Matthieu Dorier committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/**
 * Makes it possible to shut the given Margo instance down remotely
 * with margo_shutdown_remote_instance().
 *
 * @param mid Margo instance
 */
void margo_enable_remote_shutdown(
    margo_instance_id mid);

/**
 * Triggers the shutdown of the Margo instance running at remote_addr.
 *
 * @param mid Local Margo instance
 * @param remote_addr Address of the Margo instance to shut down.
 *
 * @return 0 on success, -1 on failure.
 */
int margo_shutdown_remote_instance(
    margo_instance_id mid,
    hg_addr_t remote_addr);

Matthieu Dorier's avatar
Matthieu Dorier committed
315

316
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
317
 * Registers an RPC with margo that is associated with a provider instance
318
319
320
321
322
323
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
Matthieu Dorier's avatar
Matthieu Dorier committed
324
325
 * \param [in] provider_id provider identifier
 * \param [in] pool Argobots pool the handler will execute in
326
327
 *
 * \return unique ID associated to the registered function
328
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
329
hg_id_t margo_provider_register_name(
330
331
332
333
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
334
335
336
    hg_rpc_cb_t rpc_cb,
    uint16_t provider_id,
    ABT_pool pool);
337

338
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
339
 * Registers an RPC with margo
340
341
342
343
344
345
346
347
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
 *
 * \return unique ID associated to the registered function
348
 */
349
static inline hg_id_t margo_register_name(
350
351
352
353
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
354
355
356
357
358
    hg_rpc_cb_t rpc_cb)
{
    return margo_provider_register_name(mid, func_name,
            in_proc_cb, out_proc_cb, rpc_cb, 0, ABT_POOL_NULL);
}
359

Matthieu Dorier's avatar
Matthieu Dorier committed
360
361
362
363
364
365
366
367
368
369
370
371
/**
 * Deregisters an RPC that was registered with margo.
 *
 * \param [in] mid    Margo instance
 * \param [in] rpc_id Id of the RPC to deregister
 *
 * \return HG_SUCCESS or corresponding error code
 */
hg_return_t margo_deregister(margo_instance_id mid, hg_id_t rpc_id);

372
/*
373
 * Indicate whether margo_register_name() has been called for the RPC specified by
374
375
 * func_name.
 *
376
377
378
379
 * \param [in] mid Margo instance
 * \param [in] func_name function name
 * \param [out] id registered RPC ID
 * \param [out] flag pointer to boolean
380
381
 *
 * \return HG_SUCCESS or corresponding HG error code
382
 */
383
384
385
386
387
hg_return_t margo_registered_name(
    margo_instance_id mid,
    const char *func_name,
    hg_id_t *id,
    hg_bool_t *flag);
388

389
/**
390
 * Indicate whether the given RPC name has been registered with the given provider id.
391
392
393
 *
 * @param [in] mid Margo instance
 * @param [in] func_name function name
394
 * @param [in] provider_id provider id
395
396
397
398
399
 * @param [out] id registered RPC ID
 * @param [out] flag pointer to boolean
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
400
hg_return_t margo_provider_registered_name(
401
402
    margo_instance_id mid,
    const char *func_name,
403
    uint16_t provider_id,
404
405
406
    hg_id_t *id,
    hg_bool_t *flag);

407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
/**
 * Registers user data and associates it with a registered function.
 * When HG_Finalize() is called, free_callback (if defined) is called
 * to free the registered data.
 *
 * \param [in] mid            Margo instance
 * \param [in] id             registered function ID
 * \param [in] data           pointer to data
 * \param [in] free_callback  pointer to free function
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_register_data(margo_instance_id mid,
                                hg_id_t id,
                                void *data,
                                void (*free_callback)(void *));

/**
 * Indicate whether margo_register_data() has been called and return associated
 * data.
 *
 * \param [in] mid        Margo instance 
 * \param [in] id         registered function ID
 *
 * \return Pointer to data or NULL
 */
434
435
436
void* margo_registered_data(
    margo_instance_id mid,
    hg_id_t id);
437

438

439
/**
440
 * Disable response for a given RPC ID.
441
 *
442
443
444
 * \param [in] mid          Margo instance 
 * \param [in] id           registered function ID
 * \param [in] disable_flag flag to disable (1) or re-enable (0) responses
445
446
 *
 * \return HG_SUCCESS or corresponding HG error code
447
 */
448
449
450
451
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag);
452

453
454
455
456
457
458
459
460
461
462
463
464
465
466
/**
 * Reports whether the response is disabled for a given RPC ID.
 *
 * @param [in]  mid           Margo instance
 * @param [in]  id            registered function ID
 * @param [out] disabled_flag set to 1 if the response is disabled, 0 if not
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_registered_disabled_response(margo_instance_id mid,
                                               hg_id_t id,
                                               int *disabled_flag);

467
468
/**
 * Lookup an addr from a peer address/name.
 *
 * \param [in] mid      Margo instance
 * \param [in] name     lookup name
 * \param [out] addr    return address
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
    const char *name,
    hg_addr_t *addr);
478
479

/**
480
 * Free the given Mercury addr.
481
 *
482
483
 * \param [in] mid  Margo instance 
 * \param [in] addr Mercury address
484
485
486
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
487
488
489
hg_return_t margo_addr_free(
    margo_instance_id mid,
    hg_addr_t addr);
490
491

/**
492
 * Access self address. Address must be freed with margo_addr_free().
493
 *
494
495
 * \param [in] mid  Margo instance 
 * \param [in] addr pointer to abstract Mercury address
496
497
498
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
499
500
501
hg_return_t margo_addr_self(
    margo_instance_id mid,
    hg_addr_t *addr);
502
503

/**
504
 * Duplicate an existing Mercury address. 
505
 *
506
507
508
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address to duplicate
 * \param [in] new_addr pointer to newly allocated abstract Mercury address
509
510
511
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
512
513
514
515
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr);
516
517

/**
518
519
520
521
522
 * Convert a Mercury addr to a string (returned string includes the
 * terminating null byte '\0'). If buf is NULL, the address is not
 * converted and only the required size of the buffer is returned.
 * If the input value passed through buf_size is too small, HG_SIZE_ERROR
 * is returned and the buf_size output is set to the minimum size required.
523
 *
524
525
526
527
 * \param [in] mid          Margo instance 
 * \param [in/out] buf      pointer to destination buffer
 * \param [in/out] buf_size pointer to buffer size
 * \param [in] addr         abstract Mercury address
528
529
530
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
531
532
533
534
535
hg_return_t margo_addr_to_string(
    margo_instance_id mid,
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr);
536
537

/**
538
539
540
541
 * Initiate a new Mercury RPC using the specified function ID and the
 * local/remote target defined by addr. The handle created can be used to
 * query input and output, as well as issuing the RPC by calling
 * HG_Forward(). After completion the handle must be freed using HG_Destroy().
542
 *
543
544
545
546
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address of destination
 * \param [in] id       registered function ID
 * \param [out] handle  pointer to HG handle
547
548
549
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
550
551
552
553
554
hg_return_t margo_create(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_id_t id,
    hg_handle_t *handle);
555
556

/**
557
 * Destroy Mercury handle.
558
 *
Shane Snyder's avatar
Shane Snyder committed
559
 * \param [in] handle   Mercury handle
560
561
562
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
563
564
hg_return_t margo_destroy(
    hg_handle_t handle);
565
566

/**
 * Increment ref count on a Mercury handle.
 * (Alias for HG_Ref_incr.)
 *
 * \param [in] handle Mercury handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_ref_incr HG_Ref_incr
574
575
576
577

/**
 * Get info from handle.
 * (Alias for HG_Get_info.)
 *
 * \param [in] handle Mercury handle
 *
 * \return Pointer to info or NULL in case of failure
 */
#define margo_get_info HG_Get_info
583
584
585

/**
 * Get input from handle (requires registration of input proc to deserialize
 * parameters). Input must be freed using margo_free_input().
 * (Alias for HG_Get_input.)
 *
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_get_input HG_Get_input
594
595
596
597

/**
 * Free resources allocated when deserializing the input.
 * (Alias for HG_Free_input.)
 *
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_free_input HG_Free_input
604
605
606

/**
 * Get output from handle (requires registration of output proc to deserialize
 * parameters). Output must be freed using margo_free_output().
 * (Alias for HG_Get_output.)
 *
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_get_output HG_Get_output
615
616
617
618

/**
 * Free resources allocated when deserializing the output.
 * (Alias for HG_Free_output.)
 *
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_free_output HG_Free_output
625

626
627
/**
 * Forward an RPC request to a remote host
628
 * @param [in] provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
629
630
631
632
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @returns 0 on success, hg_return_t values on error
 */
633
hg_return_t margo_provider_forward(
634
    uint16_t provider_id,
635
636
637
    hg_handle_t handle,
    void *in_struct);

638
/* Forwards an RPC with MARGO_DEFAULT_PROVIDER_ID as the provider ID;
 * see margo_provider_forward(). */
#define margo_forward(__handle, __in_struct)\
    margo_provider_forward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct)
640

Matthieu Dorier's avatar
Matthieu Dorier committed
641
642
/**
 * Forward (without blocking) an RPC request to a remote host
643
 * @param [in] provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
Matthieu Dorier's avatar
Matthieu Dorier committed
644
645
646
647
648
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
649
hg_return_t margo_provider_iforward(
650
    uint16_t provider_id,
Matthieu Dorier's avatar
Matthieu Dorier committed
651
652
653
654
    hg_handle_t handle,
    void* in_struct,
    margo_request* req);

655
/* Non-blocking forward with MARGO_DEFAULT_PROVIDER_ID as the provider ID;
 * see margo_provider_iforward(). */
#define margo_iforward(__handle, __in_struct, __req)\
    margo_provider_iforward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __req)
657

658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
/**
 * Forwards an RPC request to a remote provider, with a user-defined timeout.
 *
 * @param [in] provider_id provider id
 * @param [in] handle      identifier for the RPC to be sent
 * @param [in] in_struct   input argument struct for RPC
 * @param [in] timeout_ms  timeout in milliseconds
 *
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_provider_forward_timed(uint16_t provider_id,
                                         hg_handle_t handle,
                                         void *in_struct,
                                         double timeout_ms);

/* Timed forward with MARGO_DEFAULT_PROVIDER_ID as the provider ID;
 * see margo_provider_forward_timed(). */
#define margo_forward_timed(handle_, in_struct_, timeout_) \
    margo_provider_forward_timed(MARGO_DEFAULT_PROVIDER_ID, \
                                 handle_, in_struct_, timeout_)

/**
 * Non-blocking version of margo_provider_forward_timed.
 *
 * @param [in]  provider_id provider id
 * @param [in]  handle      identifier for the RPC to be sent
 * @param [in]  in_struct   input argument struct for RPC
 * @param [in]  timeout_ms  timeout in milliseconds
 * @param [out] req         request to wait on using margo_wait
 *
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_provider_iforward_timed(uint16_t provider_id,
                                          hg_handle_t handle,
                                          void *in_struct,
                                          double timeout_ms,
                                          margo_request *req);

/*
 * Non-blocking timed forward with MARGO_DEFAULT_PROVIDER_ID as the provider
 * ID. Must expand to margo_provider_iforward_timed(): the blocking
 * margo_provider_forward_timed() takes no request argument.
 */
#define margo_iforward_timed(__handle, __in_struct, __timeout, __req)\
    margo_provider_iforward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout, __req)

Matthieu Dorier's avatar
Matthieu Dorier committed
693
694
695
696
697
698
699
700
701
/**
 * Waits for an operation started by a non-blocking margo function
 * (margo_iforward, margo_irespond, etc.).
 *
 * @param [in] req request to wait on
 *
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_wait(margo_request req);

Matthieu Dorier's avatar
Matthieu Dorier committed
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
/**
 * @brief Waits for any of the provided requests to complete.
 * Note that even if an error occurs, index will be set to
 * the index of the request for which the error happened.
 * Requests equal to MARGO_REQUEST_NULL are ignored. If all
 * the requests are equal to MARGO_REQUEST_NULL, this function
 * returns HG_SUCCESS and sets index to count.
 *
 * @param count Number of requests
 * @param req Array of requests
 * @param index index of the request that completed
 *
 * @return 0 on success, hg_return_t values on error
 */
hg_return_t margo_wait_any(
    size_t count,
    margo_request *req,
    size_t *index);
Matthieu Dorier's avatar
Matthieu Dorier committed
719
720
721
722
723
724
725
726
727
728
729
730
731

/**
 * Tests whether an operation started by a non-blocking margo function
 * (margo_iforward, margo_irespond, etc.) has completed.
 *
 * @param [in]  req  request created by the non-blocking call
 * @param [out] flag 1 if request is completed, 0 otherwise
 *
 * @return 0 on success, ABT error code otherwise
 */
int margo_test(
    margo_request req,
    int *flag);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
/**
 * Sends an RPC response and waits for its completion before returning
 * control to the calling ULT.
 *
 * Note: this call is typically not needed, as RPC listeners need not concern
 * themselves with what happens after an RPC finishes. It is useful in some
 * cases, however (deferring resource cleanup, calling margo_finalize()
 * for e.g. a shutdown RPC).
 *
 * @param [in] handle     identifier for the RPC for which a response is
 *                        being sent
 * @param [in] out_struct output argument struct for response
 *
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_respond(hg_handle_t handle, void *out_struct);

Matthieu Dorier's avatar
Matthieu Dorier committed
748
749
750
751
752
753
754
755
756
757
758
759
760
/**
 * Sends an RPC response without blocking.
 *
 * @param [in]  handle     identifier for the RPC for which a response is
 *                         being sent
 * @param [in]  out_struct output argument struct for response
 * @param [out] req        request on which to wait using margo_wait
 *
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_irespond(hg_handle_t handle,
                           void *out_struct,
                           margo_request *req);

761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
/**
 * Creates an abstract bulk handle from the specified memory segments.
 * Memory allocated is then freed when margo_bulk_free() is called.
 * \remark If NULL is passed to buf_ptrs, i.e.,
 * \verbatim margo_bulk_create(mid, count, NULL, buf_sizes, flags, &handle) \endverbatim
 * memory for the missing buf_ptrs array will be internally allocated.
 *
 * \param [in] mid          Margo instance
 * \param [in] count        number of segments
 * \param [in] buf_ptrs     array of pointers
 * \param [in] buf_sizes    array of sizes
 * \param [in] flags        permission flag:
 *                             - HG_BULK_READWRITE
 *                             - HG_BULK_READ_ONLY
 *                             - HG_BULK_WRITE_ONLY
 * \param [out] handle      pointer to returned abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_create(margo_instance_id mid,
                              hg_uint32_t count,
                              void **buf_ptrs,
                              const hg_size_t *buf_sizes,
                              hg_uint8_t flags,
                              hg_bulk_t *handle);

/**
 * Frees a bulk handle.
 *
 * \param [in/out] handle   abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_free(hg_bulk_t handle);

/**
 * Increment ref count on bulk handle.
 * (Alias for HG_Bulk_ref_incr.)
 *
 * \param [in] handle           abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_bulk_ref_incr HG_Bulk_ref_incr
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822

/**
 * Access bulk handle to retrieve memory segments abstracted by handle.
 * Plain alias for Mercury's HG_Bulk_access.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] offset           bulk offset
 * \param [in] size             bulk size
 * \param [in] flags            permission flag:
 *                                 - HG_BULK_READWRITE
 *                                 - HG_BULK_READ_ONLY
 * \param [in] max_count        maximum number of segments to be returned
 * \param [in/out] buf_ptrs     array of buffer pointers
 * \param [in/out] buf_sizes    array of buffer sizes
 * \param [out] actual_count    actual number of segments returned
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_bulk_access HG_Bulk_access
824
825
826
827
828
829
830
831

/**
 * Get total size of data abstracted by bulk handle.
 * Plain alias for Mercury's HG_Bulk_get_size.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
#define margo_bulk_get_size HG_Bulk_get_size
833
834
835
836
837
838
839
840

/**
 * Get total number of segments abstracted by bulk handle.
 * Plain alias for Mercury's HG_Bulk_get_segment_count.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
#define margo_bulk_get_segment_count HG_Bulk_get_segment_count
842
843
844
845
846
847
848
849
850
851
852

/**
 * Get size required to serialize bulk handle.
 * Plain alias for Mercury's HG_Bulk_get_serialize_size.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] request_eager    boolean (passing HG_TRUE adds size of encoding
 *                              actual data along the handle if handle meets
 *                              HG_BULK_READ_ONLY flag condition)
 *
 * \return Non-negative value
 */
#define margo_bulk_get_serialize_size HG_Bulk_get_serialize_size
854
855
856
857
858
859
860
861
862
863
864
865
866
867

/**
 * Serialize bulk handle into a buffer.
 * Plain alias for Mercury's HG_Bulk_serialize.
 *
 * \param [in/out] buf          pointer to buffer
 * \param [in] buf_size         buffer size
 * \param [in] request_eager    boolean (passing HG_TRUE encodes actual data
 *                              along the handle, which is more efficient for
 *                              small data, this is only valid if bulk handle
 *                              has HG_BULK_READ_ONLY permission)
 * \param [in] handle           abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
#define margo_bulk_serialize HG_Bulk_serialize
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884

/**
 * Rebuild a bulk handle from a buffer that holds its serialized form.
 *
 * \param [in] mid      Margo instance
 * \param [out] handle  abstract bulk handle
 * \param [in] buf      pointer to buffer
 * \param [in] buf_size buffer size
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_deserialize(margo_instance_id mid,
                                   hg_bulk_t        *handle,
                                   const void       *buf,
                                   hg_size_t         buf_size);
885

886
887
/** 
 * Perform a bulk transfer
Philip Carns's avatar
Philip Carns committed
888
 * @param [in] mid Margo instance
889
890
891
892
893
894
895
896
897
898
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_transfer(
899
    margo_instance_id mid,
900
    hg_bulk_op_t op,
901
    hg_addr_t origin_addr,
902
903
904
905
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
906
    size_t size);
907

Matthieu Dorier's avatar
Matthieu Dorier committed
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
/**
 * Asynchronous variant of the bulk transfer: the caller receives a request
 * to wait on with margo_wait.
 * @param [in] mid Margo instance
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_itransfer(margo_instance_id mid,
                                 hg_bulk_op_t      op,
                                 hg_addr_t         origin_addr,
                                 hg_bulk_t         origin_handle,
                                 size_t            origin_offset,
                                 hg_bulk_t         local_handle,
                                 size_t            local_offset,
                                 size_t            size,
                                 margo_request    *req);

932
933
934
935
936
937
938
939
/**
 * Put the calling ULT to sleep for the given duration.
 * @param [in] mid Margo instance
 * @param [in] timeout_ms how long to suspend, in milliseconds
 */
void margo_thread_sleep(margo_instance_id mid, double timeout_ms);
940

941
/**
942
943
944
 * Retrieve the abt_handler pool that was associated with the instance at 
 *    initialization time
 * @param [in] mid Margo instance
945
946
 * @param [out] pool handler pool
 * @return 0 on success, error code on failure
947
 */
948
int margo_get_handler_pool(margo_instance_id mid, ABT_pool* pool);
949

950
951
952
953
954
955
956
/**
 * Retrieve the Argobots pool that is associated with the RPC handler of
 * the given handle.
 * @param [in] h RPC handle
 * @return the handler pool
 */
ABT_pool margo_hg_handle_get_handler_pool(hg_handle_t h);

957
/**
958
959
 * Retrieve the Mercury context that was associated with this instance at
 *    initialization time
960
 * @param [in] mid Margo instance
961
 * @return the Mercury context used in margo_init
962
 */
963
hg_context_t* margo_get_context(margo_instance_id mid);
964

965
966
967
/**
 * Retrieve the Mercury class that was associated with this instance at
 *    initialization time
968
 * @param [in] mid Margo instance
969
 * @return the Mercury class used in margo_init
970
 */
971
hg_class_t* margo_get_class(margo_instance_id mid);
972

973
974
975
976
977
978
979
980
981
/**
 * Get the margo_instance_id from a received RPC handle.
 * The RPC glue macros in this header compare the result against
 * MARGO_INSTANCE_NULL before using it.
 *
 * \param [in] h          RPC handle
 *
 * \return Margo instance
 */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);

982
983
984
985
986
987
988
989
990
/**
 * Get the margo_instance_id from an hg_info struct.
 *
 * \param [in] info       hg_info struct
 *
 * \return Margo instance
 */
margo_instance_id margo_hg_info_get_instance(const struct hg_info *info);

Philip Carns's avatar
Philip Carns committed
991
992
993
994
995
996
997
998
/**
 * Enables diagnostic collection on specified Margo instance.
 * The collected statistics can be written out with margo_diag_dump().
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_start(margo_instance_id mid);

999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
/**
 * Enables profile data collection on specified Margo instance.
 * The collected statistics can be written out with margo_profile_dump().
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_profile_start(margo_instance_id mid);

/**
 * Disables diagnostic collection on specified Margo instance
 * (counterpart of margo_diag_start()).
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_stop(margo_instance_id mid);

/**
 * Disables profile data collection on specified Margo instance
 * (counterpart of margo_profile_start()).
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_profile_stop(margo_instance_id mid);

Philip Carns's avatar
Philip Carns committed
1023
1024
1025
1026
1027
1028
/**
 * Appends diagnostic statistics (enabled via margo_diag_start()) to specified 
 * output file.
 *
 * @param [in] mid Margo instance
 * @param [in] file output file ("-" for stdout)
1029
1030
 * @param [in] uniquify flag indicating if file name should have additional
 *   information added to it to make output from different processes unique
Philip Carns's avatar
Philip Carns committed
1031
1032
 * @returns void
 */
1033
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
1034

1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
/**
 * Appends profile statistics (enabled via margo_profile_start()) to the
 * specified output file.
 *
 * @param [in] mid Margo instance
 * @param [in] file output file ("-" for stdout)
 * @param [in] uniquify flag indicating if the file name should have additional
 *   information added to it to make output from different processes unique
 * @returns void
 */
void margo_profile_dump(margo_instance_id mid, const char* file, int uniquify);

/**
 * Grabs a snapshot of the current state of diagnostics within the margo
 * instance.
 *
 * @param [in] mid Margo instance
 * @param [out] snap Margo diagnostics snapshot, filled in by the call
 * @returns void
 */
void margo_breadcrumb_snapshot(margo_instance_id mid, struct margo_breadcrumb_snapshot* snap); 

1056
1057
1058
1059
1060
1061
1062
1063
/**
 * Sets configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] inout_param used to pass in values
 * @returns void
 */
1064
void margo_set_param(margo_instance_id mid, int option, const void *param);
1065
1066
1067
1068
1069
1070
1071
1072
1073

/**
 * Retrieves configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] param used to pass out values
 * @returns void
 */
1074
void margo_get_param(margo_instance_id mid, int option, void *param);
1075
1076


1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
/**
 * @private
 * Internal function used by the handler generated by
 * DEFINE_MARGO_RPC_HANDLER (see __MARGO_INTERNAL_RPC_HANDLER_BODY), not
 * supposed to be called by users! The generated handler returns
 * HG_CANCELED when this function reports a non-zero value.
 *
 * @param mid Margo instance
 *
 * @return whether margo_finalize() was called.
 */
int __margo_internal_finalize_requested(margo_instance_id mid);

/**
 * @private
 * Internal function used by the handler generated by
 * DEFINE_MARGO_RPC_HANDLER (see __MARGO_INTERNAL_RPC_HANDLER_BODY, which
 * calls it just before creating the ULT), not supposed to be called by
 * users!
 *
 * @param mid Margo instance
 */
void __margo_internal_incr_pending(margo_instance_id mid);

/**
 * @private
 * Internal function supporting the RPC glue macros, not supposed to be
 * called by users!
 * NOTE(review): presumably the inverse of __margo_internal_incr_pending;
 * confirm against the implementation.
 *
 * @param mid Margo instance
 */
void __margo_internal_decr_pending(margo_instance_id mid);

1106
1107
1108
1109
1110
/**
 * @private
 * Internal function used by DEFINE_MARGO_RPC_HANDLER, not supposed to be
 * called by users!
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
1111
1112
1113
1114
1115
1116
1117
1118
void __margo_internal_pre_wrapper_hooks(margo_instance_id mid, hg_handle_t handle);

/**
 * @private
 * Internal function used by DEFINE_MARGO_RPC_HANDLER, not supposed to be
 * called by users! The generated ULT wrapper calls it right after the
 * user's handler function returns.
 *
 * @param mid Margo instance
 */
void __margo_internal_post_wrapper_hooks(margo_instance_id mid);
1119

1120
1121
1122
/**
 * Macro that registers a function as an RPC, using
 * MARGO_DEFAULT_PROVIDER_ID as provider ID and ABT_POOL_NULL as pool.
 *
 * @param __mid Margo instance
 * @param __func_name name forwarded to margo_provider_register_name
 * @param __in_t input type; hg_proc_<__in_t> is passed as the input proc
 * @param __out_t output type; hg_proc_<__out_t> is passed as the output proc
 * @param __handler handler name; _handler_for_<__handler> is passed as the
 *        RPC callback (NULL is accepted, see _handler_for_NULL)
 *
 * NOTE: the expansion ends with a semicolon (kept for backward
 * compatibility), so the macro cannot be nested inside a larger expression.
 */
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \
    margo_provider_register_name(__mid, __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        _handler_for_##__handler, \
        MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL);
1129

1130
/**
 * Macro that registers a function as an RPC for a given provider ID and
 * pool. Same as MARGO_REGISTER except that the provider ID and the pool
 * are supplied by the caller.
 *
 * @param __mid Margo instance
 * @param __func_name name forwarded to margo_provider_register_name
 * @param __in_t input type; hg_proc_<__in_t> is passed as the input proc
 * @param __out_t output type; hg_proc_<__out_t> is passed as the output proc
 * @param __handler handler name; _handler_for_<__handler> is passed as the
 *        RPC callback (NULL is accepted, see _handler_for_NULL)
 * @param __provider_id provider ID forwarded to margo_provider_register_name
 * @param __pool pool forwarded to margo_provider_register_name
 *
 * NOTE: the expansion ends with a semicolon (kept for backward
 * compatibility), so the macro cannot be nested inside a larger expression.
 */
#define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
    margo_provider_register_name(__mid, __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        _handler_for_##__handler, \
        __provider_id, __pool);
1136

1137
/* Lets NULL be passed as __handler to MARGO_REGISTER[_PROVIDER]: the token
 * paste _handler_for_##__handler then yields _handler_for_NULL, which
 * expands back to NULL. */
#define _handler_for_NULL NULL
1138

1139
/**
 * @private
 * Body of the ULT wrapper generated for an RPC handler. Expects a variable
 * named `handle` (hg_handle_t) in scope. Looks up the Margo instance of the
 * handle, runs the pre-wrapper hooks, calls the user's function __name with
 * the handle, then runs the post-wrapper hooks.
 *
 * @param __name name of the user's handler function
 */
#define __MARGO_INTERNAL_RPC_WRAPPER_BODY(__name) \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    __margo_internal_pre_wrapper_hooks(__mid, handle); \
    __name(handle); \
    __margo_internal_post_wrapper_hooks(__mid);
1145
1146

/**
 * @private
 * Defines the function _wrapper_for_<__name>(hg_handle_t handle), whose body
 * is __MARGO_INTERNAL_RPC_WRAPPER_BODY(__name). This is the function that
 * __MARGO_INTERNAL_RPC_HANDLER_BODY hands to ABT_thread_create.
 *
 * @param __name name of the user's handler function
 */
#define __MARGO_INTERNAL_RPC_WRAPPER(__name) \
void _wrapper_for_##__name(hg_handle_t handle) { \
    __MARGO_INTERNAL_RPC_WRAPPER_BODY(__name) \
}

/**
 * @private
 * Body of the Mercury-facing RPC callback generated for a handler. Expects a
 * variable named `handle` (hg_handle_t) in scope and must be expanded inside
 * a function returning hg_return_t.
 *
 * Steps:
 *  - returns HG_OTHER_ERROR if no Margo instance is attached to the handle;
 *  - returns HG_CANCELED if finalization of the instance was requested;
 *  - otherwise increments the pending count and creates a ULT running
 *    _wrapper_for_<__name>(handle) in the handler pool of the handle;
 *  - if the ULT cannot be created, undoes the pending increment (the wrapper
 *    will never run for this handle, so nothing else would balance it) and
 *    returns HG_NOMEM_ERROR;
 *  - returns HG_SUCCESS once the ULT has been created.
 *
 * @param __name name of the user's handler function
 */
#define __MARGO_INTERNAL_RPC_HANDLER_BODY(__name) \
    int __ret; \
    ABT_pool __pool; \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
    if(__margo_internal_finalize_requested(__mid)) { return(HG_CANCELED); } \
    __pool = margo_hg_handle_get_handler_pool(handle); \
    __margo_internal_incr_pending(__mid); \
    __ret = ABT_thread_create(__pool, (void (*)(void *))_wrapper_for_##__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
    if(__ret != ABT_SUCCESS) { \
        __margo_internal_decr_pending(__mid); \
        return(HG_NOMEM_ERROR); \
    } \
    return(HG_SUCCESS);

/**
 * @private
 * Defines the function hg_return_t _handler_for_<__name>(hg_handle_t handle),
 * whose body is __MARGO_INTERNAL_RPC_HANDLER_BODY(__name). This is the
 * symbol that MARGO_REGISTER passes as the RPC callback.
 *
 * @param __name name of the user's handler function
 */
#define __MARGO_INTERNAL_RPC_HANDLER(__name) \
hg_return_t _handler_for_##__name(hg_handle_t handle) { \
    __MARGO_INTERNAL_RPC_HANDLER_BODY(__name) \
}

1171
1172
1173
1174
1175
1176
1177
1178
/**
 * Macro that defines the functions gluing an RPC handler to a ULT handler:
 * it expands to __MARGO_INTERNAL_RPC_WRAPPER(__name) (the ULT wrapper)
 * followed by __MARGO_INTERNAL_RPC_HANDLER(__name) (the RPC callback that
 * spawns the wrapper).
 * @param [in] __name name of handler function
 */
#define DEFINE_MARGO_RPC_HANDLER(__name) \
    __MARGO_INTERNAL_RPC_WRAPPER(__name) \
    __MARGO_INTERNAL_RPC_HANDLER(__name)

Philip Carns's avatar
Philip Carns committed
1179
1180
/**
 * Macro that declares the prototype for a function to glue an RPC
 * handler to a ult, i.e. hg_return_t _handler_for_<__name>(hg_handle_t).
 * The expansion already ends with a semicolon.
 * @param [in] __name name of handler function
 */
#define DECLARE_MARGO_RPC_HANDLER(__name) hg_return_t _handler_for_##__name(hg_handle_t handle);
1185

1186
1187
1188
1189
#ifdef __cplusplus
}
#endif

1190
#endif /* __MARGO */