margo.h 32 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#ifndef __MARGO
#define __MARGO

10
11
12
13
#ifdef __cplusplus
extern "C" {
#endif

14
#include <mercury.h>
15
16
#include <mercury_types.h>
#include <mercury_bulk.h>
17
18
19
#include <mercury_macros.h>
#include <abt.h>

20
21
22
23
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)

Shane Snyder's avatar
Shane Snyder committed
24
25
26
27
28
/* This is to prevent the user from using HG_Register_data
 * and HG_Registered_data, which are replaced with
 * margo_register_data and margo_registered_data
 * respectively.
 */
29
30
#undef MERCURY_REGISTER

31
32
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
33
typedef struct margo_data* margo_data_ptr;
34
typedef struct margo_request_struct* margo_request;
35

36
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
37
#define MARGO_REQUEST_NULL ((margo_request)NULL)
Shane Snyder's avatar
Shane Snyder committed
38
39
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1
40
#define MARGO_DEFAULT_PROVIDER_ID 0
41
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
42

Shane Snyder's avatar
Shane Snyder committed
43
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
44

Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
46
/**
 * Initializes margo library.
Shane Snyder's avatar
Shane Snyder committed
47
 * @param [in] addr_str            Mercury host address with port number
Shane Snyder's avatar
Shane Snyder committed
48
49
50
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
Jonathan Jenkins's avatar
Jonathan Jenkins committed
51
52
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
53
 *                                 it will run in the caller's thread context.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
55
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
56
57
58
59
60
61
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
62
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
63
64
65
66
67
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
68
 */
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/* Convenience form of margo_init_opt(): forwards all four arguments and
 * passes NULL for the hg_init_info argument. */
#define margo_init(_addr_str, _mode, _use_progress_thread, _rpc_thread_count)\
 margo_init_opt(_addr_str, _mode, NULL, _use_progress_thread, _rpc_thread_count)

/**
 * Initializes margo library with custom Mercury options.
 * @param [in] addr_str            Mercury host address with port number
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
 * @param [in] hg_init_info        (Optional) Hg init info, passed directly
 *                                 to Mercury
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
 *                                 it will run in the caller's thread context.
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
 */
margo_instance_id margo_init_opt(
Shane Snyder's avatar
Shane Snyder committed
99
    const char *addr_str,
Shane Snyder's avatar
Shane Snyder committed
100
    int mode,
101
    const struct hg_init_info *hg_init_info,
Shane Snyder's avatar
Shane Snyder committed
102
103
    int use_progress_thread,
    int rpc_thread_count);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
104

105

106
/**
Jonathan Jenkins's avatar
doc fix    
Jonathan Jenkins committed
107
 * Initializes margo library from given argobots and Mercury instances.
Philip Carns's avatar
Philip Carns committed
108
109
110
 * @param [in] progress_pool Argobots pool to drive communication
 * @param [in] handler_pool Argobots pool to service RPC handlers
 * @param [in] hg_context Mercury context
Jonathan Jenkins's avatar
fixupme    
Jonathan Jenkins committed
111
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
Philip Carns's avatar
Philip Carns committed
112
113
114
115
116
117
118
119
120
121
 *
 * NOTE: if you are configuring Argobots pools yourself before
 * passing them into this function, please consider setting
 * ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
 * environment or programmatically with putenv() in your code before
 * creating the pools to prevent excess memory consumption under
 * load from producer/consumer patterns across execution streams that
 * fail to utilize per-execution stream stack caches.  See
 * https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
 * The margo_init() function does this automatically.
122
 */
Shane Snyder's avatar
Shane Snyder committed
123
124
125
margo_instance_id margo_init_pool(
    ABT_pool progress_pool,
    ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
126
    hg_context_t *hg_context);
127
128

/**
Philip Carns's avatar
Philip Carns committed
129
 * Shuts down margo library and its underlying abt and mercury resources
Philip Carns's avatar
Philip Carns committed
130
 * @param [in] mid Margo instance
131
 */
132
133
void margo_finalize(
    margo_instance_id mid);
134
135
136
137
138
139
140
141
142
143
144

/**
 * Suspends the caller until some other entity (e.g. an RPC, thread, or
 * signal handler) invokes margo_finalize().
 *
 * NOTE: This informs Margo that the calling thread no longer needs to be 
 * scheduled for execution if it is sharing an Argobots pool with the
 * progress engine.
 *
 * @param [in] mid Margo instance
 */
145
146
void margo_wait_for_finalize(
    margo_instance_id mid);
147

148
149
150
151
152
153
154
155
156
157
/**
 * Checks whether a Margo instance was initialized as a server.
 *
 * @param [in] mid Margo instance
 *
 * @return HG_TRUE if listening or HG_FALSE if not, or not a valid margo instance.
 */
hg_bool_t margo_is_listening(
    margo_instance_id mid);

158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
 * Installs a callback to be called before the margo instance is finalized.
 * Callbacks are called in the reverse of the order in which they were
 * pushed, with the user-provided pointer as argument.
 *
 * Note that callbacks may not be called within margo_finalize. They are called
 * when the margo instance is cleaned up, which may happen in margo_wait_for_finalize.
 *
 * @param mid The margo instance
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_push_finalize_callback(
    margo_instance_id mid,
    void(*cb)(void*), 
    void* uargs);

175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/**
 * @brief Removes the last finalize callback that was pushed into the margo instance
 * without calling it. If a callback was removed, this function returns 1, otherwise
 * it returns 0.
 *
 * @param mid Margo instance.
 */
int margo_pop_finalize_callback(
    margo_instance_id mid);

/**
 * @brief Installs a callback to be called before the margo instance is finalized.
 * The owner pointer allows to identify callbacks installed by particular providers.
 * Note that the same owner can install multiple callbacks. If they are not popped,
 * they will be called in reverse order of installation by the margo cleanup procedure.
 *
 * @param mid The margo instance
 * @param owner Owner of the callback (to be used when popping callbacks)
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_provider_push_finalize_callback(
    margo_instance_id mid,
    void* owner,
    void(*cb)(void*),
    void* uargs);

/**
 * @brief Removes the last finalize callback that was pushed into the margo instance
 * by the specified owner. If a callback was removed, this function returns 1, otherwise
 * it returns 0.
 *
 * @param mid Margo instance.
 * @param owner Owner of the callback.
 */
int margo_provider_pop_finalize_callback(
    margo_instance_id mid,
    void* owner);

Matthieu Dorier's avatar
Matthieu Dorier committed
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/**
 * Allows the passed Margo instance to be shut down remotely
 * using margo_shutdown_remote_instance().
 * 
 * @param mid Margo instance
 */
void margo_enable_remote_shutdown(margo_instance_id mid);

/**
 * Trigger the shutdown of the Margo instance running
 * at remote_addr.
 *
 * @param mid Local Margo instance
 * @param remote_addr Address of the Margo instance to shut down.
 *
 * @return 0 on success, -1 on failure.
 */
int margo_shutdown_remote_instance(
        margo_instance_id mid, 
        hg_addr_t remote_addr);

Matthieu Dorier's avatar
Matthieu Dorier committed
235

236
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
237
 * Registers an RPC with margo that is associated with a provider instance
238
239
240
241
242
243
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
Matthieu Dorier's avatar
Matthieu Dorier committed
244
245
 * \param [in] provider_id provider identifier
 * \param [in] pool Argobots pool the handler will execute in
246
247
 *
 * \return unique ID associated to the registered function
248
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
249
hg_id_t margo_provider_register_name(
250
251
252
253
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
254
255
256
    hg_rpc_cb_t rpc_cb,
    uint16_t provider_id,
    ABT_pool pool);
257

258
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
259
 * Registers an RPC with margo
260
261
262
263
264
265
266
267
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
 *
 * \return unique ID associated to the registered function
268
 */
269
static inline hg_id_t margo_register_name(
270
271
272
273
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
274
275
276
277
278
    hg_rpc_cb_t rpc_cb)
{
    return margo_provider_register_name(mid, func_name,
            in_proc_cb, out_proc_cb, rpc_cb, 0, ABT_POOL_NULL);
}
279

Matthieu Dorier's avatar
Matthieu Dorier committed
280
281
282
283
284
285
286
287
288
289
290
291
/**
 * Deregisters an RPC with margo
 *
 * \param [in] mid Margo instance
 * \param [in] rpc_id Id of the RPC to deregister
 *
 * \return HG_SUCCESS or corresponding error code
 */
hg_return_t margo_deregister(
    margo_instance_id mid,
    hg_id_t rpc_id);

292
/**
293
 * Indicate whether margo_register_name() has been called for the RPC specified by
294
295
 * func_name.
 *
296
297
298
299
 * \param [in] mid Margo instance
 * \param [in] func_name function name
 * \param [out] id registered RPC ID
 * \param [out] flag pointer to boolean
300
301
 *
 * \return HG_SUCCESS or corresponding HG error code
302
 */
303
304
305
306
307
hg_return_t margo_registered_name(
    margo_instance_id mid,
    const char *func_name,
    hg_id_t *id,
    hg_bool_t *flag);
308

309
/**
310
 * Indicate whether the given RPC name has been registered with the given provider id.
311
312
313
 *
 * @param [in] mid Margo instance
 * @param [in] func_name function name
314
 * @param [in] provider_id provider id
315
316
317
318
319
 * @param [out] id registered RPC ID
 * @param [out] flag pointer to boolean
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
320
hg_return_t margo_provider_registered_name(
321
322
    margo_instance_id mid,
    const char *func_name,
323
    uint16_t provider_id,
324
325
326
    hg_id_t *id,
    hg_bool_t *flag);

327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/**
 * Register and associate user data to registered function.
 * When HG_Finalize() is called free_callback (if defined) is called 
 * to free the registered data.
 *
 * \param [in] mid            Margo instance
 * \param [in] id             registered function ID
 * \param [in] data           pointer to data
 * \param [in] free_callback  pointer to free function
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *));

/**
 * Indicate whether margo_register_data() has been called and return associated
 * data.
 *
 * \param [in] mid        Margo instance 
 * \param [in] id         registered function ID
 *
 * \return Pointer to data or NULL
 */
354
355
356
void* margo_registered_data(
    margo_instance_id mid,
    hg_id_t id);
357

358

359
/**
360
 * Disable response for a given RPC ID.
361
 *
362
363
364
 * \param [in] mid          Margo instance 
 * \param [in] id           registered function ID
 * \param [in] disable_flag flag to disable (1) or re-enable (0) responses
365
366
 *
 * \return HG_SUCCESS or corresponding HG error code
367
 */
368
369
370
371
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag);
372

373
374
375
376
377
378
379
380
381
382
383
384
385
386
/**
 * Checks if response is disabled for a given RPC ID.
 *
 * @param [in] mid           Margo instance
 * @param [in] id            registered function ID
 * @param [out] disabled_flag flag indicating whether response is disabled (1) or not (0)
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_registered_disabled_response(
    margo_instance_id mid,
    hg_id_t id,
    int* disabled_flag);

387
388
/**
 * Lookup an addr from a peer address/name.
389
390
391
392
 * \param [in] name     lookup name
 * \param [out] addr    return address
 *
 * \return HG_SUCCESS or corresponding HG error code
393
394
395
 */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
396
397
    const char *name,
    hg_addr_t *addr);
398
399

/**
400
 * Free the given Mercury addr.
401
 *
402
403
 * \param [in] mid  Margo instance 
 * \param [in] addr Mercury address
404
405
406
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
407
408
409
hg_return_t margo_addr_free(
    margo_instance_id mid,
    hg_addr_t addr);
410
411

/**
412
 * Access self address. Address must be freed with margo_addr_free().
413
 *
414
415
 * \param [in] mid  Margo instance 
 * \param [in] addr pointer to abstract Mercury address
416
417
418
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
419
420
421
hg_return_t margo_addr_self(
    margo_instance_id mid,
    hg_addr_t *addr);
422
423

/**
424
 * Duplicate an existing Mercury address. 
425
 *
426
427
428
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address to duplicate
 * \param [in] new_addr pointer to newly allocated abstract Mercury address
429
430
431
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
432
433
434
435
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr);
436
437

/**
438
439
440
441
442
 * Convert a Mercury addr to a string (returned string includes the
 * terminating null byte '\0'). If buf is NULL, the address is not
 * converted and only the required size of the buffer is returned.
 * If the input value passed through buf_size is too small, HG_SIZE_ERROR
 * is returned and the buf_size output is set to the minimum size required.
443
 *
444
445
446
447
 * \param [in] mid          Margo instance 
 * \param [in/out] buf      pointer to destination buffer
 * \param [in/out] buf_size pointer to buffer size
 * \param [in] addr         abstract Mercury address
448
449
450
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
451
452
453
454
455
hg_return_t margo_addr_to_string(
    margo_instance_id mid,
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr);
456
457

/**
458
459
460
461
 * Initiate a new Mercury RPC using the specified function ID and the
 * local/remote target defined by addr. The handle created can be used to
 * query input and output, as well as issuing the RPC by calling
 * HG_Forward(). After completion the handle must be freed using HG_Destroy().
462
 *
463
464
465
466
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address of destination
 * \param [in] id       registered function ID
 * \param [out] handle  pointer to HG handle
467
468
469
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
470
471
472
473
474
hg_return_t margo_create(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_id_t id,
    hg_handle_t *handle);
475
476

/**
477
 * Destroy Mercury handle.
478
 *
Shane Snyder's avatar
Shane Snyder committed
479
 * \param [in] handle   Mercury handle
480
481
482
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
483
484
hg_return_t margo_destroy(
    hg_handle_t handle);
485
486

/**
487
 * Increment ref count on a Mercury handle.
488
 *
489
 * \param [in] handle Mercury handle
490
491
492
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
493
#define margo_ref_incr HG_Ref_incr
494
495
496
497

/**
 * Get info from handle.
 *
498
 * \param [in] handle Mercury handle
499
500
501
 *
 * \return Pointer to info or NULL in case of failure
 */
502
#define margo_get_info HG_Get_info
503
504
505

/**
 * Get input from handle (requires registration of input proc to deserialize
506
 * parameters). Input must be freed using margo_free_input().
507
 *
508
509
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
510
511
512
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
513
#define margo_get_input HG_Get_input
514
515
516
517

/**
 * Free resources allocated when deserializing the input.
 *
518
519
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
520
521
522
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
523
#define margo_free_input HG_Free_input
524
525
526

/**
 * Get output from handle (requires registration of output proc to deserialize
527
 * parameters). Output must be freed using margo_free_output().
528
 *
529
530
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
531
532
533
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
534
#define margo_get_output HG_Get_output
535
536
537
538

/**
 * Free resources allocated when deserializing the output.
 *
539
540
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
541
542
543
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
544
#define margo_free_output HG_Free_output
545

546
547
/**
 * Forward an RPC request to a remote host
548
 * @param [in] provider_id provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
549
550
551
552
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @returns 0 on success, hg_return_t values on error
 */
553
hg_return_t margo_provider_forward(
554
    uint16_t provider_id,
555
556
557
    hg_handle_t handle,
    void *in_struct);

558
#define margo_forward(__handle, __in_struct)\
559
    margo_provider_forward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct)
560

Matthieu Dorier's avatar
Matthieu Dorier committed
561
562
/**
 * Forward (without blocking) an RPC request to a remote host
563
 * @param [in] provider_id provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
Matthieu Dorier's avatar
Matthieu Dorier committed
564
565
566
567
568
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
569
hg_return_t margo_provider_iforward(
570
    uint16_t provider_id,
Matthieu Dorier's avatar
Matthieu Dorier committed
571
572
573
574
    hg_handle_t handle,
    void* in_struct,
    margo_request* req);

575
#define margo_iforward(__handle, __in_struct, __req)\
576
    margo_provider_iforward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __req)
577

578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
/**
 * Forward an RPC request to a remote provider with a user-defined timeout
 * @param [in] provider_id provider id
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [in] timeout_ms timeout in milliseconds
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_provider_forward_timed(
    uint16_t provider_id,
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms);

/* Shorthand for margo_provider_forward_timed() that supplies
 * MARGO_DEFAULT_PROVIDER_ID as the provider id. */
#define margo_forward_timed(__handle, __in_struct, __timeout)\
    margo_provider_forward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout)

/**
 * Non-blocking version of margo_provider_forward_timed.
 * @param [in] provider_id provider id
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [in] timeout_ms timeout in milliseconds
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_provider_iforward_timed(
    uint16_t provider_id,
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms,
    margo_request* req);

/**
 * Non-blocking timed forward that supplies MARGO_DEFAULT_PROVIDER_ID as the
 * provider id. Expands to margo_provider_iforward_timed(), the variant that
 * accepts the trailing request argument.
 */
#define margo_iforward_timed(__handle, __in_struct, __timeout, __req)\
    margo_provider_iforward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout, __req)

Matthieu Dorier's avatar
Matthieu Dorier committed
613
614
615
616
617
618
619
620
621
/**
 * Wait for an operation initiated by a non-blocking
 * margo function (margo_iforward, margo_irespond, etc.)
 * @param [in] req request to wait on
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_wait(
    margo_request req);

Matthieu Dorier's avatar
Matthieu Dorier committed
622
623
624
625
626
627
628
629
630
631
632
633
634

/**
 * Test if an operation initiated by a non-blocking
 * margo function (margo_iforward, margo_irespond, etc.)
 * has completed.
 *
 * @param [in] req request created by the non-blocking call
 * @param [out] flag 1 if request is completed, 0 otherwise
 *
 * @return 0 on success, ABT error code otherwise
 */
int margo_test(margo_request req, int* flag);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
/**
 * Send an RPC response, waiting for completion before returning
 * control to the calling ULT.
 * Note: this call is typically not needed as RPC listeners need not concern
 * themselves with what happens after an RPC finishes. However, there are cases
 * when this is useful (deferring resource cleanup, calling margo_finalize()
 * for e.g. a shutdown RPC).
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_respond(
    hg_handle_t handle,
    void *out_struct);

Matthieu Dorier's avatar
Matthieu Dorier committed
651
652
653
654
655
656
657
658
659
660
661
662
663
/**
 * Send an RPC response without blocking.
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @param [out] req request on which to wait using margo_wait
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_irespond(
    hg_handle_t handle,
    void *out_struct,
    margo_request* req);

664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
/**
 * Create an abstract bulk handle from specified memory segments.
 * Memory allocated is then freed when margo_bulk_free() is called.
 * \remark If NULL is passed to buf_ptrs, i.e.,
 * \verbatim margo_bulk_create(mid, count, NULL, buf_sizes, flags, &handle) \endverbatim
 * memory for the missing buf_ptrs array will be internally allocated.
 *
 * \param [in] mid          Margo instance 
 * \param [in] count        number of segments
 * \param [in] buf_ptrs     array of pointers
 * \param [in] buf_sizes    array of sizes
 * \param [in] flags        permission flag:
 *                             - HG_BULK_READWRITE
 *                             - HG_BULK_READ_ONLY
 *                             - HG_BULK_WRITE_ONLY
 * \param [out] handle      pointer to returned abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_create(
    margo_instance_id mid,
    hg_uint32_t count,
    void **buf_ptrs,
    const hg_size_t *buf_sizes,
    hg_uint8_t flags,
    hg_bulk_t *handle);

/**
 * Free bulk handle.
 *
 * \param [in/out] handle   abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_free(
    hg_bulk_t handle);

/**
 * Increment ref count on bulk handle.
 *
 * \param [in] handle           abstract bulk handle
705
706
707
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
708
#define margo_bulk_ref_incr HG_Bulk_ref_incr
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725

/**
 * Access bulk handle to retrieve memory segments abstracted by handle.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] offset           bulk offset
 * \param [in] size             bulk size
 * \param [in] flags            permission flag:
 *                                 - HG_BULK_READWRITE
 *                                 - HG_BULK_READ_ONLY
 * \param [in] max_count        maximum number of segments to be returned
 * \param [in/out] buf_ptrs     array of buffer pointers
 * \param [in/out] buf_sizes    array of buffer sizes
 * \param [out] actual_count    actual number of segments returned
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
726
#define margo_bulk_access HG_Bulk_access
727
728
729
730
731
732
733
734

/**
 * Get total size of data abstracted by bulk handle.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
735
#define margo_bulk_get_size HG_Bulk_get_size
736
737
738
739
740
741
742
743

/**
 * Get total number of segments abstracted by bulk handle.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
744
#define margo_bulk_get_segment_count HG_Bulk_get_segment_count
745
746
747
748
749
750
751
752
753
754
755

/**
 * Get size required to serialize bulk handle.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] request_eager    boolean (passing HG_TRUE adds size of encoding
 *                              actual data along the handle if handle meets
 *                              HG_BULK_READ_ONLY flag condition)
 *
 * \return Non-negative value
 */
756
#define margo_bulk_get_serialize_size HG_Bulk_get_serialize_size
757
758
759
760
761
762
763
764
765
766
767
768
769
770

/**
 * Serialize bulk handle into a buffer.
 *
 * \param [in/out] buf          pointer to buffer
 * \param [in] buf_size         buffer size
 * \param [in] request_eager    boolean (passing HG_TRUE encodes actual data
 *                              along the handle, which is more efficient for
 *                              small data, this is only valid if bulk handle
 *                              has HG_BULK_READ_ONLY permission)
 * \param [in] handle           abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
771
#define margo_bulk_serialize HG_Bulk_serialize
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787

/**
 * Deserialize bulk handle from an existing buffer.
 *
 * \param [in] mid      Margo instance 
 * \param [out] handle  abstract bulk handle
 * \param [in] buf      pointer to buffer
 * \param [in] buf_size buffer size
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_deserialize(
    margo_instance_id mid,
    hg_bulk_t *handle,
    const void *buf,
    hg_size_t buf_size);
788

789
790
/** 
 * Perform a bulk transfer
Philip Carns's avatar
Philip Carns committed
791
 * @param [in] mid Margo instance
792
793
794
795
796
797
798
799
800
801
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_transfer(
802
    margo_instance_id mid,
803
    hg_bulk_op_t op,
804
    hg_addr_t origin_addr,
805
806
807
808
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
809
    size_t size);
810

Matthieu Dorier's avatar
Matthieu Dorier committed
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
/** 
 * Asynchronously performs a bulk transfer
 * @param [in] mid Margo instance
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_itransfer(
    margo_instance_id mid,
    hg_bulk_op_t op,
    hg_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size,
    margo_request* req);

835
836
837
838
839
840
841
842
/**
 * Suspends the calling ULT for a specified time duration
 *
 * NOTE(review): presumably only the calling ULT is suspended and other
 * ULTs keep running in the meantime -- confirm against the implementation.
 *
 * @param [in] mid Margo instance
 * @param [in] timeout_ms timeout duration in milliseconds (a double, so
 *                        fractional milliseconds can be expressed)
 */
void margo_thread_sleep(
    margo_instance_id mid,
    double timeout_ms);
843

844
/**
845
846
847
 * Retrieve the abt_handler pool that was associated with the instance at 
 *    initialization time
 * @param [in] mid Margo instance
848
849
 * @param [out] pool handler pool
 * @return 0 on success, error code on failure
850
 */
851
int margo_get_handler_pool(margo_instance_id mid, ABT_pool* pool);
852

853
854
855
856
857
858
859
/**
 * Retrieve the rpc handler abt pool that is associated with this handle
 *
 * DEFINE_MARGO_RPC_HANDLER uses the returned pool as the target of
 * ABT_thread_create() when it spawns the ULT that runs the RPC handler.
 *
 * @param [in] h handle
 * @return pool
 */
ABT_pool margo_hg_handle_get_handler_pool(hg_handle_t h);

860
/**
861
862
 * Retrieve the Mercury context that was associated with this instance at
 *    initialization time
863
 * @param [in] mid Margo instance
864
 * @return the Mercury context used in margo_init
865
 */
866
hg_context_t* margo_get_context(margo_instance_id mid);
867

868
869
870
/**
 * Retrieve the Mercury class that was associated with this instance at
 *    initialization time
871
 * @param [in] mid Margo instance
872
 * @return the Mercury class used in margo_init
873
 */
874
hg_class_t* margo_get_class(margo_instance_id mid);
875

876
877
878
879
880
881
882
883
884
/**
 * Get the margo_instance_id from a received RPC handle.
 *
 * DEFINE_MARGO_RPC_HANDLER compares the result against MARGO_INSTANCE_NULL
 * and rejects the RPC with HG_OTHER_ERROR when they are equal.
 *
 * \param [in] h          RPC handle
 *
 * \return Margo instance
 */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);

885
886
887
888
889
890
891
892
893
/**
 * Get the margo_instance_id from an hg_info struct
 *
 * The struct is taken by pointer-to-const, so it is not modified through
 * this parameter.
 *
 * \param [in] info       hg_info struct
 *
 * \return Margo instance
 */
margo_instance_id margo_hg_info_get_instance(const struct hg_info *info);

Philip Carns's avatar
Philip Carns committed
894
895
896
897
898
899
900
901
902
903
904
905
906
907
/**
 * Enables diagnostic collection on specified Margo instance
 *
 * The collected statistics are written out with margo_diag_dump().
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_start(margo_instance_id mid);

/**
 * Appends diagnostic statistics (enabled via margo_diag_start()) to specified 
 * output file.
 *
 * @param [in] mid Margo instance
 * @param [in] file output file ("-" for stdout)
908
909
 * @param [in] uniquify flag indicating if file name should have additional
 *   information added to it to make output from different processes unique
Philip Carns's avatar
Philip Carns committed
910
911
 * @returns void
 */
912
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
913

914
915
916
917
918
919
920
921
/**
 * Sets configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] inout_param used to pass in values
 * @returns void
 */
922
void margo_set_param(margo_instance_id mid, int option, const void *param);
923
924
925
926
927
928
929
930
931

/**
 * Retrieves configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] param used to pass out values
 * @returns void
 */
932
void margo_get_param(margo_instance_id mid, int option, void *param);
933
934


935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
/**
 * @private
 * Internal function used by the code that DEFINE_MARGO_RPC_HANDLER
 * generates, not supposed to be called by users!
 *
 * The generated handler rejects an incoming RPC with HG_CANCELED when this
 * returns non-zero, and the generated wrapper calls margo_finalize() when
 * this returns non-zero after the user handler has run.
 *
 * @param mid Margo instance
 *
 * @return whether margo_finalize() was called.
 */
int __margo_internal_finalize_requested(margo_instance_id mid);

/**
 * @private
 * Internal function used by the code that DEFINE_MARGO_RPC_HANDLER
 * generates, not supposed to be called by users!
 *
 * Called by the generated handler just before it creates the ULT that runs
 * the RPC; paired with __margo_internal_decr_pending().
 *
 * @param mid Margo instance
 */
void __margo_internal_incr_pending(margo_instance_id mid);

/**
 * @private
 * Internal function used by the code that DEFINE_MARGO_RPC_HANDLER
 * generates, not supposed to be called by users!
 *
 * Called by the generated wrapper once the user's RPC handler has
 * returned; paired with __margo_internal_incr_pending().
 *
 * @param mid Margo instance
 */
void __margo_internal_decr_pending(margo_instance_id mid);

964
965
966
/**
 * macro that registers a function as an RPC.
 *
 * Expands to a call to margo_provider_register_name() with
 * MARGO_DEFAULT_PROVIDER_ID and ABT_POOL_NULL as the last two arguments.
 * The serialization routines are named by token pasting:
 * hg_proc_<__in_t> and hg_proc_<__out_t>. The RPC callback is
 * <__handler>_handler, i.e. the function that
 * DEFINE_MARGO_RPC_HANDLER(<__handler>) defines; passing NULL as __handler
 * resolves to NULL_handler.
 *
 * NOTE: the expansion already ends with ';', so the macro can only be used
 * where a statement terminator is valid (not as a function argument).
 *
 * @param [in] __mid Margo instance
 * @param [in] __func_name name forwarded to margo_provider_register_name()
 * @param [in] __in_t input type; hg_proc_<__in_t> must exist
 * @param [in] __out_t output type; hg_proc_<__out_t> must exist
 * @param [in] __handler handler name, without the _handler suffix
 */
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \
    margo_provider_register_name(__mid, __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        __handler##_handler, \
        MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL);
973

974
/**
 * macro that registers a function as an RPC for a given provider id and
 * pool.
 *
 * Same expansion as MARGO_REGISTER, except that the caller supplies the
 * last two arguments of margo_provider_register_name() instead of
 * MARGO_DEFAULT_PROVIDER_ID and ABT_POOL_NULL.
 *
 * NOTE: the expansion already ends with ';', so the macro can only be used
 * where a statement terminator is valid (not as a function argument).
 *
 * @param [in] __mid Margo instance
 * @param [in] __func_name name forwarded to margo_provider_register_name()
 * @param [in] __in_t input type; hg_proc_<__in_t> must exist
 * @param [in] __out_t output type; hg_proc_<__out_t> must exist
 * @param [in] __handler handler name, without the _handler suffix
 * @param [in] __provider_id provider id forwarded to the registration call
 * @param [in] __pool pool forwarded to the registration call
 */
#define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
    margo_provider_register_name(__mid, __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        __handler##_handler, \
        __provider_id, __pool);
980

981
982
/* Lets NULL be passed as the __handler argument of MARGO_REGISTER and
 * MARGO_REGISTER_PROVIDER: the token paste __handler##_handler then yields
 * NULL_handler, which this macro maps back to NULL. */
#define NULL_handler NULL

983
/**
 * macro that defines a function to glue an RPC handler to a ult handler
 *
 * Expands to two function definitions:
 *  - void <__name>_wrapper(hg_handle_t): the ULT body. It runs
 *    <__name>(handle), decrements the pending-operation count, and calls
 *    margo_finalize() if finalization has been requested.
 *  - hg_return_t <__name>_handler(hg_handle_t): the RPC callback. It looks
 *    up the Margo instance and handler pool for the handle, increments the
 *    pending-operation count, and creates a ULT running the wrapper in
 *    that pool.
 *
 * <__name>_handler returns HG_OTHER_ERROR if the handle has no Margo
 * instance, HG_CANCELED if finalization was already requested,
 * HG_NOMEM_ERROR if the ULT could not be created, HG_SUCCESS otherwise.
 *
 * If ULT creation fails the wrapper never runs, so the handler undoes its
 * own increment of the pending-operation count before returning.
 * NOTE(review): margo_finalize() is deliberately not called on that path;
 * confirm whether a finalize requested in that window needs extra handling.
 *
 * @param [in] __name name of handler function
 */
#define DEFINE_MARGO_RPC_HANDLER(__name) \
void __name##_wrapper(hg_handle_t handle) { \
    margo_instance_id __mid; \
    /* instance is looked up before the user handler runs */ \
    __mid = margo_hg_handle_get_instance(handle); \
    __name(handle); \
    __margo_internal_decr_pending(__mid); \
    if(__margo_internal_finalize_requested(__mid)) { \
        margo_finalize(__mid); \
    } \
} \
hg_return_t __name##_handler(hg_handle_t handle) { \
    int __ret; \
    ABT_pool __pool; \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
    if(__margo_internal_finalize_requested(__mid)) { return(HG_CANCELED); } \
    __pool = margo_hg_handle_get_handler_pool(handle); \
    __margo_internal_incr_pending(__mid); \
    __ret = ABT_thread_create(__pool, (void (*)(void *))__name##_wrapper, handle, ABT_THREAD_ATTR_NULL, NULL); \
    if(__ret != ABT_SUCCESS) { \
        /* no ULT was created, so nothing else will balance the increment */ \
        __margo_internal_decr_pending(__mid); \
        return(HG_NOMEM_ERROR); \
    } \
    return(HG_SUCCESS); \
}

Philip Carns's avatar
Philip Carns committed
1013
1014
/**
 * macro that declares the prototype for a function to glue an RPC
 * handler to a ult
 *
 * Declares hg_return_t <__name>_handler(hg_handle_t), the function that
 * DEFINE_MARGO_RPC_HANDLER(<__name>) defines. The expansion already ends
 * with ';'.
 *
 * @param [in] __name name of handler function
 */
#define DECLARE_MARGO_RPC_HANDLER(__name) hg_return_t __name##_handler(hg_handle_t handle);
1019

1020
1021
1022
1023
#ifdef __cplusplus
}
#endif

1024
#endif /* __MARGO */