margo.h 29.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#ifndef __MARGO
#define __MARGO

10
11
12
13
#ifdef __cplusplus
extern "C" {
#endif

14
15
16
17
18
19
/* This is to prevent the user from using HG_Register_data
 * and HG_Registered_data, which are replaced with
 * margo_register_data and margo_registered_data
 * respectively.
 */

20
#include <mercury.h>
21
22
#include <mercury_types.h>
#include <mercury_bulk.h>
23
24
25
#include <mercury_macros.h>
#include <abt.h>

26
27
28
29
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)

30
31
#undef MERCURY_REGISTER

32
33
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
34
typedef struct margo_data* margo_data_ptr;
Matthieu Dorier's avatar
Matthieu Dorier committed
35
typedef ABT_eventual margo_request;
36

37
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
Matthieu Dorier's avatar
Matthieu Dorier committed
38
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL
Shane Snyder's avatar
Shane Snyder committed
39
40
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1
41
#define MARGO_DEFAULT_PROVIDER_ID 0
42
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
43

Shane Snyder's avatar
Shane Snyder committed
44
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
45

Jonathan Jenkins's avatar
Jonathan Jenkins committed
46
47
/**
 * Initializes margo library.
Shane Snyder's avatar
Shane Snyder committed
48
 * @param [in] addr_str            Mercury host address with port number
Shane Snyder's avatar
Shane Snyder committed
49
50
51
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
Jonathan Jenkins's avatar
Jonathan Jenkins committed
52
53
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
54
 *                                 it will run in the caller's thread context.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
55
56
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
57
58
59
60
61
62
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
63
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
64
65
66
67
68
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
69
 */
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/* Shorthand for margo_init_opt() that passes NULL as the hg_init_info
 * argument; all other arguments are forwarded unchanged. */
#define margo_init(addr_str_, mode_, use_progress_thread_, rpc_thread_count_) \
    margo_init_opt((addr_str_), (mode_), NULL,                                \
                   (use_progress_thread_), (rpc_thread_count_))

/**
 * Initializes margo library with custom Mercury options.
 * @param [in] addr_str            Mercury host address with port number
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
 * @param [in] hg_init_info        (Optional) Hg init info, passed directly
 *                                 to Mercury
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
 *                                 it will run in the caller's thread context.
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
 */
margo_instance_id margo_init_opt(
Shane Snyder's avatar
Shane Snyder committed
100
    const char *addr_str,
Shane Snyder's avatar
Shane Snyder committed
101
    int mode,
102
    const struct hg_init_info *hg_init_info,
Shane Snyder's avatar
Shane Snyder committed
103
104
    int use_progress_thread,
    int rpc_thread_count);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
105

106

107
/**
Jonathan Jenkins's avatar
doc fix    
Jonathan Jenkins committed
108
 * Initializes margo library from given argobots and Mercury instances.
Philip Carns's avatar
Philip Carns committed
109
110
111
 * @param [in] progress_pool Argobots pool to drive communication
 * @param [in] handler_pool Argobots pool to service RPC handlers
 * @param [in] hg_context Mercury context
Jonathan Jenkins's avatar
fixupme    
Jonathan Jenkins committed
112
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
Philip Carns's avatar
Philip Carns committed
113
114
115
116
117
118
119
120
121
122
 *
 * NOTE: if you are configuring Argobots pools yourself before
 * passing them into this function, please consider setting
 * ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
 * environment or programmatically with putenv() in your code before
 * creating the pools to prevent excess memory consumption under
 * load from producer/consumer patterns across execution streams that
 * fail to utilize per-execution stream stack caches.  See
 * https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
 * The margo_init() function does this automatically.
123
 */
Shane Snyder's avatar
Shane Snyder committed
124
125
126
margo_instance_id margo_init_pool(
    ABT_pool progress_pool,
    ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
127
    hg_context_t *hg_context);
128
129

/**
Philip Carns's avatar
Philip Carns committed
130
 * Shuts down margo library and its underlying abt and mercury resources
Philip Carns's avatar
Philip Carns committed
131
 * @param [in] mid Margo instance
132
 */
133
134
void margo_finalize(
    margo_instance_id mid);
135
136
137
138
139
140
141
142
143
144
145

/**
 * Suspends the caller until some other entity (e.g. an RPC, thread, or
 * signal handler) invokes margo_finalize().
 *
 * NOTE: This informs Margo that the calling thread no longer needs to be 
 * scheduled for execution if it is sharing an Argobots pool with the
 * progress engine.
 *
 * @param [in] mid Margo instance
 */
146
147
void margo_wait_for_finalize(
    margo_instance_id mid);
148

149
150
151
152
153
154
155
156
157
158
/**
 * Checks whether a Margo instance was initialized as a server.
 *
 * @param [in] mid Margo instance
 *
 * @return HG_TRUE if listening, HG_FALSE if not listening or if mid is not a
 *         valid margo instance.
 */
hg_bool_t margo_is_listening(
    margo_instance_id mid);

159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/**
 * Installs a callback to be called before the margo instance is finalized.
 * Installed callbacks are called in the reverse of the order in which they
 * were pushed, with the user-provided pointer as argument.
 *
 * Note that callbacks may not be called within margo_finalize. They are called
 * when the margo instance is cleaned up, which may happen in margo_wait_for_finalize.
 *
 * @param mid The margo instance
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_push_finalize_callback(
    margo_instance_id mid,
    void(*cb)(void*), 
    void* uargs);

Matthieu Dorier's avatar
Matthieu Dorier committed
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/**
 * Allows the passed Margo instance to be shut down remotely
 * using margo_shutdown_remote_instance().
 * 
 * @param mid Margo instance
 */
void margo_enable_remote_shutdown(margo_instance_id mid);

/**
 * Trigger the shutdown of the Margo instance running
 * at remote_addr.
 *
 * @param mid Local Margo instance
 * @param remote_addr Address of the Margo instance to shut down.
 *
 * @return 0 on success, -1 on failure.
 */
int margo_shutdown_remote_instance(
        margo_instance_id mid, 
        hg_addr_t remote_addr);

Matthieu Dorier's avatar
Matthieu Dorier committed
197

198
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
199
 * Registers an RPC with margo that is associated with a provider instance
200
201
202
203
204
205
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
Matthieu Dorier's avatar
Matthieu Dorier committed
206
207
 * \param [in] provider_id provider identifier
 * \param [in] pool Argobots pool the handler will execute in
208
209
 *
 * \return unique ID associated to the registered function
210
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
211
hg_id_t margo_provider_register_name(
212
213
214
215
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
216
217
218
    hg_rpc_cb_t rpc_cb,
    uint16_t provider_id,
    ABT_pool pool);
219

220
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
221
 * Registers an RPC with margo
222
223
224
225
226
227
228
229
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
 *
 * \return unique ID associated to the registered function
230
 */
231
static inline hg_id_t margo_register_name(
232
233
234
235
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
236
237
238
239
240
    hg_rpc_cb_t rpc_cb)
{
    return margo_provider_register_name(mid, func_name,
            in_proc_cb, out_proc_cb, rpc_cb, 0, ABT_POOL_NULL);
}
241

Matthieu Dorier's avatar
Matthieu Dorier committed
242
243
244
245
246
247
248
249
250
251
252
253
/**
 * Deregisters an RPC with margo
 *
 * \param [in] mid Margo instance
 * \param [in] rpc_id Id of the RPC to deregister
 *
 * \return HG_SUCCESS or corresponding error code
 */
hg_return_t margo_deregister(
    margo_instance_id mid,
    hg_id_t rpc_id);

254
/**
255
 * Indicate whether margo_register_name() has been called for the RPC specified by
256
257
 * func_name.
 *
258
259
260
261
 * \param [in] mid Margo instance
 * \param [in] func_name function name
 * \param [out] id registered RPC ID
 * \param [out] flag pointer to boolean
262
263
 *
 * \return HG_SUCCESS or corresponding HG error code
264
 */
265
266
267
268
269
hg_return_t margo_registered_name(
    margo_instance_id mid,
    const char *func_name,
    hg_id_t *id,
    hg_bool_t *flag);
270

271
/**
272
 * Indicate whether the given RPC name has been registered with the given provider id.
273
274
275
 *
 * @param [in] mid Margo instance
 * @param [in] func_name function name
276
 * @param [in] provider_id provider id
277
278
279
280
281
 * @param [out] id registered RPC ID
 * @param [out] flag pointer to boolean
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
282
hg_return_t margo_provider_registered_name(
283
284
    margo_instance_id mid,
    const char *func_name,
285
    uint16_t provider_id,
286
287
288
    hg_id_t *id,
    hg_bool_t *flag);

289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
/**
 * Register and associate user data to registered function.
 * When HG_Finalize() is called free_callback (if defined) is called 
 * to free the registered data.
 *
 * \param [in] mid            Margo instance
 * \param [in] id             registered function ID
 * \param [in] data           pointer to data
 * \param [in] free_callback  pointer to free function
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *));

/**
 * Indicate whether margo_register_data() has been called and return associated
 * data.
 *
 * \param [in] mid        Margo instance 
 * \param [in] id         registered function ID
 *
 * \return Pointer to data or NULL
 */
316
317
318
void* margo_registered_data(
    margo_instance_id mid,
    hg_id_t id);
319

320

321
/**
322
 * Disable response for a given RPC ID.
323
 *
324
325
326
 * \param [in] mid          Margo instance 
 * \param [in] id           registered function ID
 * \param [in] disable_flag flag to disable (1) or re-enable (0) responses
327
328
 *
 * \return HG_SUCCESS or corresponding HG error code
329
 */
330
331
332
333
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag);
334

335
336
337
338
339
340
341
342
343
344
345
346
347
348
/**
 * Checks if response is disabled for a given RPC ID.
 *
 * @param [in] mid           Margo instance
 * @param [in] id            registered function ID
 * @param [out] disabled_flag flag indicating whether response is disabled (1) or not (0)
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_registered_disabled_response(
    margo_instance_id mid,
    hg_id_t id,
    int* disabled_flag);

349
350
/**
 * Lookup an addr from a peer address/name.
351
352
353
354
 * \param [in] mid      Margo instance
 * \param [in] name     lookup name
 * \param [out] addr    return address
 *
 * \return HG_SUCCESS or corresponding HG error code
355
356
357
 */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
358
359
    const char *name,
    hg_addr_t *addr);
360
361

/**
362
 * Free the given Mercury addr.
363
 *
364
365
 * \param [in] mid  Margo instance 
 * \param [in] addr Mercury address
366
367
368
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
369
370
371
hg_return_t margo_addr_free(
    margo_instance_id mid,
    hg_addr_t addr);
372
373

/**
374
 * Access self address. Address must be freed with margo_addr_free().
375
 *
376
377
 * \param [in] mid  Margo instance 
 * \param [in] addr pointer to abstract Mercury address
378
379
380
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
381
382
383
hg_return_t margo_addr_self(
    margo_instance_id mid,
    hg_addr_t *addr);
384
385

/**
386
 * Duplicate an existing Mercury address. 
387
 *
388
389
390
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address to duplicate
 * \param [in] new_addr pointer to newly allocated abstract Mercury address
391
392
393
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
394
395
396
397
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr);
398
399

/**
400
401
402
403
404
 * Convert a Mercury addr to a string (returned string includes the
 * terminating null byte '\0'). If buf is NULL, the address is not
 * converted and only the required size of the buffer is returned.
 * If the input value passed through buf_size is too small, HG_SIZE_ERROR
 * is returned and the buf_size output is set to the minimum size required.
405
 *
406
407
408
409
 * \param [in] mid          Margo instance 
 * \param [in/out] buf      pointer to destination buffer
 * \param [in/out] buf_size pointer to buffer size
 * \param [in] addr         abstract Mercury address
410
411
412
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
413
414
415
416
417
hg_return_t margo_addr_to_string(
    margo_instance_id mid,
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr);
418
419

/**
420
421
422
423
 * Initiate a new Mercury RPC using the specified function ID and the
 * local/remote target defined by addr. The handle created can be used to
 * query input and output, as well as issuing the RPC by calling
 * margo_forward(). After completion the handle must be freed using
 * margo_destroy().
424
 *
425
426
427
428
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address of destination
 * \param [in] id       registered function ID
 * \param [out] handle  pointer to HG handle
429
430
431
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
432
433
434
435
436
hg_return_t margo_create(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_id_t id,
    hg_handle_t *handle);
437
438

/**
439
 * Destroy Mercury handle.
440
 *
Shane Snyder's avatar
Shane Snyder committed
441
 * \param [in] handle   Mercury handle
442
443
444
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
445
446
hg_return_t margo_destroy(
    hg_handle_t handle);
447
448

/**
449
 * Increment ref count on a Mercury handle.
450
 *
451
 * \param [in] handle Mercury handle
452
453
454
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
455
#define margo_ref_incr HG_Ref_incr
456
457
458
459

/**
 * Get info from handle.
 *
460
 * \param [in] handle Mercury handle
461
462
463
 *
 * \return Pointer to info or NULL in case of failure
 */
464
#define margo_get_info HG_Get_info
465
466
467

/**
 * Get input from handle (requires registration of input proc to deserialize
468
 * parameters). Input must be freed using margo_free_input().
469
 *
470
471
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
472
473
474
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
475
#define margo_get_input HG_Get_input
476
477
478
479

/**
 * Free resources allocated when deserializing the input.
 *
480
481
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
482
483
484
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
485
#define margo_free_input HG_Free_input
486
487
488

/**
 * Get output from handle (requires registration of output proc to deserialize
489
 * parameters). Output must be freed using margo_free_output().
490
 *
491
492
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
493
494
495
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
496
#define margo_get_output HG_Get_output
497
498
499
500

/**
 * Free resources allocated when deserializing the output.
 *
501
502
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
503
504
505
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
506
#define margo_free_output HG_Free_output
507

508
509
/**
 * Forward an RPC request to a remote host
510
 * @param [in] provider_id provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
511
512
513
514
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @returns 0 on success, hg_return_t values on error
 */
515
hg_return_t margo_provider_forward(
516
    uint16_t provider_id,
517
518
519
    hg_handle_t handle,
    void *in_struct);

520
/**
 * Forward an RPC request to a remote host using the default provider ID.
 * Expands to margo_provider_forward() with MARGO_DEFAULT_PROVIDER_ID as the
 * provider_id argument.
 * @param [in] handle_ identifier for the RPC to be sent
 * @param [in] in_struct_ input argument struct for RPC
 * @returns the hg_return_t value returned by margo_provider_forward()
 */
#define margo_forward(handle_, in_struct_) \
    margo_provider_forward(MARGO_DEFAULT_PROVIDER_ID, (handle_), (in_struct_))
522

Matthieu Dorier's avatar
Matthieu Dorier committed
523
524
/**
 * Forward (without blocking) an RPC request to a remote host
525
 * @param [in] provider_id provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
Matthieu Dorier's avatar
Matthieu Dorier committed
526
527
528
529
530
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
531
hg_return_t margo_provider_iforward(
532
    uint16_t provider_id,
Matthieu Dorier's avatar
Matthieu Dorier committed
533
534
535
536
    hg_handle_t handle,
    void* in_struct,
    margo_request* req);

537
/**
 * Forward (without blocking) an RPC request to a remote host using the
 * default provider ID. Expands to margo_provider_iforward() with
 * MARGO_DEFAULT_PROVIDER_ID as the provider_id argument.
 * @param [in] handle_ identifier for the RPC to be sent
 * @param [in] in_struct_ input argument struct for RPC
 * @param [out] req_ request to wait on using margo_wait
 * @returns the hg_return_t value returned by margo_provider_iforward()
 */
#define margo_iforward(handle_, in_struct_, req_) \
    margo_provider_iforward(MARGO_DEFAULT_PROVIDER_ID, \
                            (handle_), (in_struct_), (req_))
539

Matthieu Dorier's avatar
Matthieu Dorier committed
540
541
542
543
544
545
546
547
548
/**
 * Wait for an operation initiated by a non-blocking
 * margo function (margo_iforward, margo_irespond, etc.)
 * @param [in] req request to wait on
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_wait(
    margo_request req);

Matthieu Dorier's avatar
Matthieu Dorier committed
549
550
551
552
553
554
555
556
557
558
559
560
561

/**
 * Test if an operation initiated by a non-blocking
 * margo function (margo_iforward, margo_irespond, etc.)
 * has completed.
 *
 * @param [in] req request created by the non-blocking call
 * @param [out] flag 1 if request is completed, 0 otherwise
 *
 * @return 0 on success, ABT error code otherwise
 */
int margo_test(margo_request req, int* flag);

562
563
564
565
566
567
568
569
570
571
572
573
/**
 * Forward an RPC request to a remote host with a user-defined timeout
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [in] timeout_ms timeout in milliseconds
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_forward_timed(
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
/**
 * Send an RPC response, waiting for completion before returning
 * control to the calling ULT.
 * Note: this call is typically not needed as RPC listeners need not concern
 * themselves with what happens after an RPC finishes. However, there are cases
 * when this is useful (deferring resource cleanup, calling margo_finalize()
 * for e.g. a shutdown RPC).
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_respond(
    hg_handle_t handle,
    void *out_struct);

Matthieu Dorier's avatar
Matthieu Dorier committed
590
591
592
593
594
595
596
597
598
599
600
601
602
/**
 * Send an RPC response without blocking.
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @param [out] req request on which to wait using margo_wait
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_irespond(
    hg_handle_t handle,
    void *out_struct,
    margo_request* req);

603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
/**
 * Create an abstract bulk handle from specified memory segments.
 * Memory allocated is then freed when margo_bulk_free() is called.
 * \remark If NULL is passed to buf_ptrs, i.e.,
 * \verbatim margo_bulk_create(mid, count, NULL, buf_sizes, flags, &handle) \endverbatim
 * memory for the missing buf_ptrs array will be internally allocated.
 *
 * \param [in] mid          Margo instance 
 * \param [in] count        number of segments
 * \param [in] buf_ptrs     array of pointers
 * \param [in] buf_sizes    array of sizes
 * \param [in] flags        permission flag:
 *                             - HG_BULK_READWRITE
 *                             - HG_BULK_READ_ONLY
 *                             - HG_BULK_WRITE_ONLY
 * \param [out] handle      pointer to returned abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_create(
    margo_instance_id mid,
    hg_uint32_t count,
    void **buf_ptrs,
    const hg_size_t *buf_sizes,
    hg_uint8_t flags,
    hg_bulk_t *handle);

/**
 * Free bulk handle.
 *
 * \param [in/out] handle   abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_free(
    hg_bulk_t handle);

/**
 * Increment ref count on bulk handle.
 *
 * \param [in] handle   abstract bulk handle
644
645
646
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
647
#define margo_bulk_ref_incr HG_Bulk_ref_incr
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664

/**
 * Access bulk handle to retrieve memory segments abstracted by handle.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] offset           bulk offset
 * \param [in] size             bulk size
 * \param [in] flags            permission flag:
 *                                 - HG_BULK_READWRITE
 *                                 - HG_BULK_READ_ONLY
 * \param [in] max_count        maximum number of segments to be returned
 * \param [in/out] buf_ptrs     array of buffer pointers
 * \param [in/out] buf_sizes    array of buffer sizes
 * \param [out] actual_count    actual number of segments returned
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
665
#define margo_bulk_access HG_Bulk_access
666
667
668
669
670
671
672
673

/**
 * Get total size of data abstracted by bulk handle.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
674
#define margo_bulk_get_size HG_Bulk_get_size
675
676
677
678
679
680
681
682

/**
 * Get total number of segments abstracted by bulk handle.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
683
#define margo_bulk_get_segment_count HG_Bulk_get_segment_count
684
685
686
687
688
689
690
691
692
693
694

/**
 * Get size required to serialize bulk handle.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] request_eager    boolean (passing HG_TRUE adds size of encoding
 *                              actual data along the handle if handle meets
 *                              HG_BULK_READ_ONLY flag condition)
 *
 * \return Non-negative value
 */
695
#define margo_bulk_get_serialize_size HG_Bulk_get_serialize_size
696
697
698
699
700
701
702
703
704
705
706
707
708
709

/**
 * Serialize bulk handle into a buffer.
 *
 * \param [in/out] buf          pointer to buffer
 * \param [in] buf_size         buffer size
 * \param [in] request_eager    boolean (passing HG_TRUE encodes actual data
 *                              along the handle, which is more efficient for
 *                              small data, this is only valid if bulk handle
 *                              has HG_BULK_READ_ONLY permission)
 * \param [in] handle           abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
710
#define margo_bulk_serialize HG_Bulk_serialize
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726

/**
 * Deserialize bulk handle from an existing buffer.
 *
 * \param [in] mid      Margo instance 
 * \param [out] handle  abstract bulk handle
 * \param [in] buf      pointer to buffer
 * \param [in] buf_size buffer size
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_deserialize(
    margo_instance_id mid,
    hg_bulk_t *handle,
    const void *buf,
    hg_size_t buf_size);
727

728
729
/** 
 * Perform a bulk transfer
Philip Carns's avatar
Philip Carns committed
730
 * @param [in] mid Margo instance
731
732
733
734
735
736
737
738
739
740
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_transfer(
741
    margo_instance_id mid,
742
    hg_bulk_op_t op,
743
    hg_addr_t origin_addr,
744
745
746
747
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
748
    size_t size);
749

Matthieu Dorier's avatar
Matthieu Dorier committed
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
/** 
 * Asynchronously performs a bulk transfer
 * @param [in] mid Margo instance
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_itransfer(
    margo_instance_id mid,
    hg_bulk_op_t op,
    hg_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size,
    margo_request* req);

774
775
776
777
778
779
780
781
/**
 * Suspends the calling ULT for a specified time duration
 * @param [in] mid Margo instance
 * @param [in] timeout_ms timeout duration in milliseconds
 */
void margo_thread_sleep(
    margo_instance_id mid,
    double timeout_ms);
782

783
/**
784
785
786
 * Retrieve the abt_handler pool that was associated with the instance at 
 *    initialization time
 * @param [in] mid Margo instance
787
788
 * @param [out] pool handler pool
 * @return 0 on success, error code on failure
789
 */
790
int margo_get_handler_pool(margo_instance_id mid, ABT_pool* pool);
791

792
793
794
795
796
797
798
/**
 * Retrieve the rpc handler abt pool that is associated with this handle
 * @param [in] h handle
 * @return pool
 */
ABT_pool margo_hg_handle_get_handler_pool(hg_handle_t h);

799
/**
800
801
 * Retrieve the Mercury context that was associated with this instance at
 *    initialization time
802
 * @param [in] mid Margo instance
803
 * @return the Mercury context used in margo_init
804
 */
805
hg_context_t* margo_get_context(margo_instance_id mid);
806

807
808
809
/**
 * Retrieve the Mercury class that was associated with this instance at
 *    initialization time
810
 * @param [in] mid Margo instance
811
 * @return the Mercury class used in margo_init
812
 */
813
hg_class_t* margo_get_class(margo_instance_id mid);
814

815
816
817
818
819
820
821
822
823
/**
 * Get the margo_instance_id from a received RPC handle.
 *
 * \param [in] h          RPC handle
 * 
 * \return Margo instance
 */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);

824
825
826
827
828
829
830
831
832
/**
 * Get the margo_instance_id from an hg_info struct 
 *
 * \param [in] info       hg_info struct 
 * 
 * \return Margo instance
 */
margo_instance_id margo_hg_info_get_instance(const struct hg_info *info);

Philip Carns's avatar
Philip Carns committed
833
834
835
836
837
838
839
840
841
842
843
844
845
846
/**
 * Enables diagnostic collection on the specified Margo instance.
 * The collected statistics can be written out with margo_diag_dump().
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_start(margo_instance_id mid);

/**
 * Appends diagnostic statistics (enabled via margo_diag_start()) to specified 
 * output file.
 *
 * @param [in] mid Margo instance
 * @param [in] file output file ("-" for stdout)
847
848
 * @param [in] uniquify flag indicating if file name should have additional
 *   information added to it to make output from different processes unique
Philip Carns's avatar
Philip Carns committed
849
850
 * @returns void
 */
851
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
852

853
854
855
856
857
858
859
860
/**
 * Sets configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] inout_param used to pass in values
 * @returns void
 */
861
void margo_set_param(margo_instance_id mid, int option, const void *param);
862
863
864
865
866
867
868
869
870

/**
 * Retrieves configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] param used to pass out values
 * @returns void
 */
871
void margo_get_param(margo_instance_id mid, int option, void *param);
872
873


874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
/**
 * @private
 * Internal function used by the functions that DEFINE_MARGO_RPC_HANDLER
 * generates; not supposed to be called by users!
 *
 * The generated Mercury callback uses it to reject new RPCs with
 * HG_CANCELED, and the generated ULT wrapper uses it to decide whether to
 * call margo_finalize() once the handler has returned.
 *
 * @param mid Margo instance
 *
 * @return whether margo_finalize() was called.
 */
int __margo_internal_finalize_requested(margo_instance_id mid);

/**
 * @private
 * Internal function used by the functions that DEFINE_MARGO_RPC_HANDLER
 * generates; not supposed to be called by users!
 *
 * Called by the generated Mercury callback just before it spawns the
 * handler ULT. Presumably increments the instance's count of in-flight
 * handlers (inferred from the name and its pairing with
 * __margo_internal_decr_pending; confirm against the implementation).
 *
 * @param mid Margo instance
 */
void __margo_internal_incr_pending(margo_instance_id mid);

/**
 * @private
 * Internal function used by the functions that DEFINE_MARGO_RPC_HANDLER
 * generates; not supposed to be called by users!
 *
 * Called by the generated ULT wrapper after the user handler returns.
 * Presumably decrements the count raised by __margo_internal_incr_pending
 * (inferred from the name; confirm against the implementation).
 *
 * @param mid Margo instance
 */
void __margo_internal_decr_pending(margo_instance_id mid);

903
904
905
/**
 * Macro that registers a function as an RPC.
 *
 * Expands to a call to margo_provider_register_name() using
 * MARGO_DEFAULT_PROVIDER_ID as the provider ID and ABT_POOL_NULL as the
 * pool. The expansion already ends with a semicolon.
 *
 * @param [in] __mid       Margo instance
 * @param [in] __func_name RPC name passed to margo_provider_register_name()
 * @param [in] __in_t      input type; hg_proc_<__in_t> is passed as its proc
 * @param [in] __out_t     output type; hg_proc_<__out_t> is passed as its proc
 * @param [in] __handler   handler base name; <__handler>_handler is passed
 *                         as the callback (NULL expands to NULL_handler,
 *                         i.e. NULL)
 */
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \
    margo_provider_register_name( \
        __mid, \
        __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        __handler##_handler, \
        MARGO_DEFAULT_PROVIDER_ID, \
        ABT_POOL_NULL);
912

913
/**
 * Macro that registers a function as an RPC for a specific provider.
 *
 * Expands to a call to margo_provider_register_name() with the given
 * provider ID and pool. The expansion already ends with a semicolon.
 *
 * @param [in] __mid         Margo instance
 * @param [in] __func_name   RPC name passed to margo_provider_register_name()
 * @param [in] __in_t        input type; hg_proc_<__in_t> is passed as its proc
 * @param [in] __out_t       output type; hg_proc_<__out_t> is passed as its proc
 * @param [in] __handler     handler base name; <__handler>_handler is passed
 *                           as the callback (NULL expands to NULL_handler,
 *                           i.e. NULL)
 * @param [in] __provider_id provider ID to register the RPC under
 * @param [in] __pool        Argobots pool passed through for this RPC
 */
#define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
    margo_provider_register_name( \
        __mid, \
        __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        __handler##_handler, \
        __provider_id, \
        __pool);
919

920
921
/**
 * MARGO_REGISTER and MARGO_REGISTER_PROVIDER paste "_handler" onto their
 * __handler argument. This definition lets callers pass NULL there: the
 * resulting token NULL_handler expands back to NULL.
 */
#define NULL_handler NULL

922
/**
 * Macro that defines the functions gluing an RPC handler to a ULT handler.
 *
 * For a handler function __name taking an hg_handle_t, this expands to:
 *
 *  - __name##_wrapper(handle): the ULT entry point. It looks up the Margo
 *    instance of the handle, runs __name(handle), calls
 *    __margo_internal_decr_pending(), and finally calls margo_finalize() if
 *    __margo_internal_finalize_requested() reports that finalization was
 *    requested. The instance is looked up BEFORE __name runs because the
 *    handler is free to release the handle (presumably the common case),
 *    after which the handle must not be queried.
 *
 *  - __name##_handler(handle): the Mercury callback. It spawns
 *    __name##_wrapper as a ULT in the pool returned by
 *    margo_hg_handle_get_handler_pool(). If the ULT cannot be created, the
 *    pending count raised just before is dropped again, since no wrapper
 *    will ever run to do it.
 *    NOTE(review): on that failure path nothing re-drives margo_finalize()
 *    if finalization was requested in the meantime; confirm whether this
 *    needs handling.
 *
 * __name##_handler returns:
 *  - HG_OTHER_ERROR if no Margo instance is attached to the handle,
 *  - HG_CANCELED    if finalization has been requested,
 *  - HG_NOMEM_ERROR if ABT_thread_create() fails,
 *  - HG_SUCCESS     otherwise.
 *
 * @param [in] __name name of handler function
 */
#define DEFINE_MARGO_RPC_HANDLER(__name) \
void __name##_wrapper(hg_handle_t handle) { \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    __name(handle); \
    __margo_internal_decr_pending(__mid); \
    if(__margo_internal_finalize_requested(__mid)) { \
        margo_finalize(__mid); \
    } \
} \
hg_return_t __name##_handler(hg_handle_t handle) { \
    int __ret; \
    ABT_pool __pool; \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
    if(__margo_internal_finalize_requested(__mid)) { return(HG_CANCELED); } \
    __pool = margo_hg_handle_get_handler_pool(handle); \
    __margo_internal_incr_pending(__mid); \
    __ret = ABT_thread_create(__pool, (void (*)(void *))__name##_wrapper, handle, ABT_THREAD_ATTR_NULL, NULL); \
    if(__ret != ABT_SUCCESS) { \
        __margo_internal_decr_pending(__mid); \
        return(HG_NOMEM_ERROR); \
    } \
    return(HG_SUCCESS); \
}

Philip Carns's avatar
Philip Carns committed
952
953
/**
 * Macro that declares the prototype for a function to glue an RPC
 * handler to a ULT, i.e. the __name##_handler function that
 * DEFINE_MARGO_RPC_HANDLER(__name) defines. The expansion already ends
 * with a semicolon.
 *
 * @param [in] __name name of handler function
 */
#define DECLARE_MARGO_RPC_HANDLER(__name) hg_return_t __name##_handler(hg_handle_t handle);
958

959
960
961
962
#ifdef __cplusplus
}
#endif

963
#endif /* __MARGO */