margo.h 31.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#ifndef __MARGO
#define __MARGO

10
11
12
13
#ifdef __cplusplus
extern "C" {
#endif

14
#include <mercury.h>
15
16
#include <mercury_types.h>
#include <mercury_bulk.h>
17
18
19
#include <mercury_macros.h>
#include <abt.h>

20
21
22
23
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)

Shane Snyder's avatar
Shane Snyder committed
24
25
26
27
28
/* This is to prevent the user from using HG_Register_data
 * and HG_Registered_data, which are replaced with
 * margo_register_data and margo_registered_data
 * respectively.
 */
29
30
#undef MERCURY_REGISTER

31
32
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
33
typedef struct margo_data* margo_data_ptr;
34
typedef struct margo_request_struct* margo_request;
35

36
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
37
#define MARGO_REQUEST_NULL ((margo_request)NULL)
Shane Snyder's avatar
Shane Snyder committed
38
39
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1
40
#define MARGO_DEFAULT_PROVIDER_ID 0
41
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
42

Shane Snyder's avatar
Shane Snyder committed
43
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
44

Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
46
/**
 * Initializes margo library.
Shane Snyder's avatar
Shane Snyder committed
47
 * @param [in] addr_str            Mercury host address with port number
Shane Snyder's avatar
Shane Snyder committed
48
49
50
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
Jonathan Jenkins's avatar
Jonathan Jenkins committed
51
52
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
53
 *                                 it will run in the caller's thread context.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
55
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
56
57
58
59
60
61
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
62
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
63
64
65
66
67
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
68
 */
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#define margo_init(_addr_str, _mode, _use_progress_thread, _rpc_thread_count)\
 margo_init_opt(_addr_str, _mode, NULL, _use_progress_thread, _rpc_thread_count)

/**
 * Initializes margo library with custom Mercury options.
 * @param [in] addr_str            Mercury host address with port number
 * @param [in] mode                Mode to run Margo in:
 *                                     - MARGO_CLIENT_MODE
 *                                     - MARGO_SERVER_MODE
 * @param [in] hg_init_info        (Optional) Hg init info, passed directly
 *                                 to Mercury
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
 *                                 it will run in the caller's thread context.
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
 */
margo_instance_id margo_init_opt(
Shane Snyder's avatar
Shane Snyder committed
99
    const char *addr_str,
Shane Snyder's avatar
Shane Snyder committed
100
    int mode,
101
    const struct hg_init_info *hg_init_info,
Shane Snyder's avatar
Shane Snyder committed
102
103
    int use_progress_thread,
    int rpc_thread_count);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
104

105

106
/**
Jonathan Jenkins's avatar
doc fix    
Jonathan Jenkins committed
107
 * Initializes margo library from given argobots and Mercury instances.
Philip Carns's avatar
Philip Carns committed
108
109
110
 * @param [in] progress_pool Argobots pool to drive communication
 * @param [in] handler_pool Argobots pool to service RPC handlers
 * @param [in] hg_context Mercury context
Jonathan Jenkins's avatar
fixupme    
Jonathan Jenkins committed
111
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
Philip Carns's avatar
Philip Carns committed
112
113
114
115
116
117
118
119
120
121
 *
 * NOTE: if you are configuring Argobots pools yourself before
 * passing them into this function, please consider setting
 * ABT_MEM_MAX_NUM_STACKS to a low value (like 8) either in your
 * environment or programmatically with putenv() in your code before
 * creating the pools to prevent excess memory consumption under
 * load from producer/consumer patterns across execution streams that
 * fail to utilize per-execution stream stack caches.  See
 * https://xgitlab.cels.anl.gov/sds/margo/issues/40 for details.
 * The margo_init() function does this automatically.
122
 */
Shane Snyder's avatar
Shane Snyder committed
123
124
125
margo_instance_id margo_init_pool(
    ABT_pool progress_pool,
    ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
126
    hg_context_t *hg_context);
127
128

/**
Philip Carns's avatar
Philip Carns committed
129
 * Shuts down margo library and its underlying abt and mercury resources
Philip Carns's avatar
Philip Carns committed
130
 * @param [in] mid Margo instance
131
 */
132
133
void margo_finalize(
    margo_instance_id mid);
134
135
136
137
138
139
140
141
142
143
144

/**
 * Suspends the caller until some other entity (e.g. an RPC, thread, or
 * signal handler) invokes margo_finalize().
 *
 * NOTE: This informs Margo that the calling thread no longer needs to be 
 * scheduled for execution if it is sharing an Argobots pool with the
 * progress engine.
 *
 * @param [in] mid Margo instance
 */
145
146
void margo_wait_for_finalize(
    margo_instance_id mid);
147

148
149
150
151
152
153
154
155
156
157
/**
 * Checks whether a Margo instance was initialized as a server.
 *
 * @param [in] mid Margo instance
 *
 * @return HG_TRUE if listening, or HG_FALSE if not listening or if mid is
 *         not a valid margo instance.
 */
hg_bool_t margo_is_listening(
    margo_instance_id mid);

158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
 * Installs a callback to be called before the margo instance is finalized.
 * Callbacks are called in the reverse of the order in which they were
 * pushed, each with its user-provided pointer as argument.
 *
 * Note that callbacks may not be called within margo_finalize. They are called
 * when the margo instance is cleaned up, which may happen in margo_wait_for_finalize.
 *
 * @param mid The margo instance
 * @param cb Callback to install
 * @param uargs User-provided argument to pass to the callback when called
 */
void margo_push_finalize_callback(
    margo_instance_id mid,
    void(*cb)(void*), 
    void* uargs);

Matthieu Dorier's avatar
Matthieu Dorier committed
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/**
 * Allows the passed Margo instance to be shut down remotely
 * using margo_shutdown_remote_instance().
 * 
 * @param mid Margo instance
 */
void margo_enable_remote_shutdown(margo_instance_id mid);

/**
 * Trigger the shutdown of the Margo instance running
 * at remote_addr.
 *
 * @param mid Local Margo instance
 * @param remote_addr Address of the Margo instance to shut down.
 *
 * @return 0 on success, -1 on failure.
 */
int margo_shutdown_remote_instance(
        margo_instance_id mid, 
        hg_addr_t remote_addr);

Matthieu Dorier's avatar
Matthieu Dorier committed
196

197
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
198
 * Registers an RPC with margo that is associated with a provider instance
199
200
201
202
203
204
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
Matthieu Dorier's avatar
Matthieu Dorier committed
205
206
 * \param [in] provider_id provider identifier
 * \param [in] pool Argobots pool the handler will execute in
207
208
 *
 * \return unique ID associated to the registered function
209
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
210
hg_id_t margo_provider_register_name(
211
212
213
214
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
215
216
217
    hg_rpc_cb_t rpc_cb,
    uint16_t provider_id,
    ABT_pool pool);
218

219
/** 
Matthieu Dorier's avatar
Matthieu Dorier committed
220
 * Registers an RPC with margo
221
222
223
224
225
226
227
228
 *
 * \param [in] mid Margo instance
 * \param [in] func_name unique function name for RPC
 * \param [in] in_proc_cb pointer to input proc callback
 * \param [in] out_proc_cb pointer to output proc callback
 * \param [in] rpc_cb RPC callback
 *
 * \return unique ID associated to the registered function
229
 */
230
static inline hg_id_t margo_register_name(
231
232
233
234
    margo_instance_id mid,
    const char *func_name,
    hg_proc_cb_t in_proc_cb,
    hg_proc_cb_t out_proc_cb,
Matthieu Dorier's avatar
Matthieu Dorier committed
235
236
237
238
239
    hg_rpc_cb_t rpc_cb)
{
    return margo_provider_register_name(mid, func_name,
            in_proc_cb, out_proc_cb, rpc_cb, 0, ABT_POOL_NULL);
}
240

Matthieu Dorier's avatar
Matthieu Dorier committed
241
242
243
244
245
246
247
248
249
250
251
252
/**
 * Deregisters an RPC with margo
 *
 * \param [in] mid Margo instance
 * \param [in] rpc_id Id of the RPC to deregister
 *
 * \return HG_SUCCESS or corresponding error code
 */
hg_return_t margo_deregister(
    margo_instance_id mid,
    hg_id_t rpc_id);

253
/**
254
 * Indicate whether margo_register_name() has been called for the RPC specified by
255
256
 * func_name.
 *
257
258
259
260
 * \param [in] mid Margo instance
 * \param [in] func_name function name
 * \param [out] id registered RPC ID
 * \param [out] flag pointer to boolean
261
262
 *
 * \return HG_SUCCESS or corresponding HG error code
263
 */
264
265
266
267
268
hg_return_t margo_registered_name(
    margo_instance_id mid,
    const char *func_name,
    hg_id_t *id,
    hg_bool_t *flag);
269

270
/**
271
 * Indicate whether the given RPC name has been registered with the given provider id.
272
273
274
 *
 * @param [in] mid Margo instance
 * @param [in] func_name function name
275
 * @param [in] provider_id provider id
276
277
278
279
280
 * @param [out] id registered RPC ID
 * @param [out] flag pointer to boolean
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
281
hg_return_t margo_provider_registered_name(
282
283
    margo_instance_id mid,
    const char *func_name,
284
    uint16_t provider_id,
285
286
287
    hg_id_t *id,
    hg_bool_t *flag);

288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/**
 * Register and associate user data to registered function.
 * When HG_Finalize() is called free_callback (if defined) is called 
 * to free the registered data.
 *
 * \param [in] mid            Margo instance
 * \param [in] id             registered function ID
 * \param [in] data           pointer to data
 * \param [in] free_callback  pointer to free function
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *));

/**
 * Indicate whether margo_register_data() has been called and return associated
 * data.
 *
 * \param [in] mid        Margo instance 
 * \param [in] id         registered function ID
 *
 * \return Pointer to data or NULL
 */
315
316
317
void* margo_registered_data(
    margo_instance_id mid,
    hg_id_t id);
318

319

320
/**
321
 * Disable response for a given RPC ID.
322
 *
323
324
325
 * \param [in] mid          Margo instance 
 * \param [in] id           registered function ID
 * \param [in] disable_flag flag to disable (1) or re-enable (0) responses
326
327
 *
 * \return HG_SUCCESS or corresponding HG error code
328
 */
329
330
331
332
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag);
333

334
335
336
337
338
339
340
341
342
343
344
345
346
347
/**
 * Checks if response is disabled for a given RPC ID.
 *
 * @param [in] mid            Margo instance
 * @param [in] id             registered function ID
 * @param [out] disabled_flag flag indicating whether response is disabled (1) or not (0)
 *
 * @return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_registered_disabled_response(
    margo_instance_id mid,
    hg_id_t id,
    int* disabled_flag);

348
349
/**
 * Lookup an addr from a peer address/name.
350
351
352
353
 * \param [in] name     lookup name
 * \param [out] addr    return address
 *
 * \return HG_SUCCESS or corresponding HG error code
354
355
356
 */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
357
358
    const char *name,
    hg_addr_t *addr);
359
360

/**
361
 * Free the given Mercury addr.
362
 *
363
364
 * \param [in] mid  Margo instance 
 * \param [in] addr Mercury address
365
366
367
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
368
369
370
hg_return_t margo_addr_free(
    margo_instance_id mid,
    hg_addr_t addr);
371
372

/**
373
 * Access self address. Address must be freed with margo_addr_free().
374
 *
375
376
 * \param [in] mid  Margo instance 
 * \param [in] addr pointer to abstract Mercury address
377
378
379
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
380
381
382
hg_return_t margo_addr_self(
    margo_instance_id mid,
    hg_addr_t *addr);
383
384

/**
385
 * Duplicate an existing Mercury address. 
386
 *
387
388
389
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address to duplicate
 * \param [in] new_addr pointer to newly allocated abstract Mercury address
390
391
392
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
393
394
395
396
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr);
397
398

/**
399
400
401
402
403
 * Convert a Mercury addr to a string (returned string includes the
 * terminating null byte '\0'). If buf is NULL, the address is not
 * converted and only the required size of the buffer is returned.
 * If the input value passed through buf_size is too small, HG_SIZE_ERROR
 * is returned and the buf_size output is set to the minimum size required.
404
 *
405
406
407
408
 * \param [in] mid          Margo instance 
 * \param [in/out] buf      pointer to destination buffer
 * \param [in/out] buf_size pointer to buffer size
 * \param [in] addr         abstract Mercury address
409
410
411
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
412
413
414
415
416
hg_return_t margo_addr_to_string(
    margo_instance_id mid,
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr);
417
418

/**
419
420
421
422
 * Initiate a new Mercury RPC using the specified function ID and the
 * local/remote target defined by addr. The handle created can be used to
 * query input and output, as well as issuing the RPC by calling
 * margo_forward(). After completion the handle must be freed using
 * margo_destroy().
423
 *
424
425
426
427
 * \param [in] mid      Margo instance 
 * \param [in] addr     abstract Mercury address of destination
 * \param [in] id       registered function ID
 * \param [out] handle  pointer to HG handle
428
429
430
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
431
432
433
434
435
hg_return_t margo_create(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_id_t id,
    hg_handle_t *handle);
436
437

/**
438
 * Destroy Mercury handle.
439
 *
Shane Snyder's avatar
Shane Snyder committed
440
 * \param [in] handle   Mercury handle
441
442
443
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
444
445
hg_return_t margo_destroy(
    hg_handle_t handle);
446
447

/**
448
 * Increment ref count on a Mercury handle.
449
 *
450
 * \param [in] handle Mercury handle
451
452
453
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
454
#define margo_ref_incr HG_Ref_incr
455
456
457
458

/**
 * Get info from handle.
 *
459
 * \param [in] handle Mercury handle
460
461
462
 *
 * \return Pointer to info or NULL in case of failure
 */
463
#define margo_get_info HG_Get_info
464
465
466

/**
 * Get input from handle (requires registration of input proc to deserialize
467
 * parameters). Input must be freed using margo_free_input().
468
 *
469
470
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
471
472
473
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
474
#define margo_get_input HG_Get_input
475
476
477
478

/**
 * Free resources allocated when deserializing the input.
 *
479
480
 * \param [in] handle           Mercury handle
 * \param [in/out] in_struct    pointer to input structure
481
482
483
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
484
#define margo_free_input HG_Free_input
485
486
487

/**
 * Get output from handle (requires registration of output proc to deserialize
488
 * parameters). Output must be freed using margo_free_output().
489
 *
490
491
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
492
493
494
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
495
#define margo_get_output HG_Get_output
496
497
498
499

/**
 * Free resources allocated when deserializing the output.
 *
500
501
 * \param [in] handle           Mercury handle
 * \param [in/out] out_struct   pointer to output structure
502
503
504
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
505
#define margo_free_output HG_Free_output
506

507
508
/**
 * Forward an RPC request to a remote host
509
 * @param [in] provider_id provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
510
511
512
513
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @returns 0 on success, hg_return_t values on error
 */
514
hg_return_t margo_provider_forward(
515
    uint16_t provider_id,
516
517
518
    hg_handle_t handle,
    void *in_struct);

519
#define margo_forward(__handle, __in_struct)\
520
    margo_provider_forward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct)
521

Matthieu Dorier's avatar
Matthieu Dorier committed
522
523
/**
 * Forward (without blocking) an RPC request to a remote host
524
 * @param [in] provider_id provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
Matthieu Dorier's avatar
Matthieu Dorier committed
525
526
527
528
529
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
530
hg_return_t margo_provider_iforward(
531
    uint16_t provider_id,
Matthieu Dorier's avatar
Matthieu Dorier committed
532
533
534
535
    hg_handle_t handle,
    void* in_struct,
    margo_request* req);

536
#define margo_iforward(__handle, __in_struct, __req)\
537
    margo_provider_iforward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __req)
538

539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
/**
 * Forward an RPC request to a remote provider with a user-defined timeout
 * @param [in] provider_id provider id
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [in] timeout_ms timeout in milliseconds
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_provider_forward_timed(
    uint16_t provider_id,
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms);

/**
 * Same as margo_provider_forward_timed(), addressed to
 * MARGO_DEFAULT_PROVIDER_ID.
 */
#define margo_forward_timed(__handle, __in_struct, __timeout)\
    margo_provider_forward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout)

/**
 * Non-blocking version of margo_provider_forward_timed.
 * @param [in] provider_id provider id
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [in] timeout_ms timeout in milliseconds
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_provider_iforward_timed(
    uint16_t provider_id,
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms,
    margo_request* req);

/**
 * Non-blocking forward with a timeout, addressed to
 * MARGO_DEFAULT_PROVIDER_ID. Expands to margo_provider_iforward_timed().
 *
 * @param [in] __handle identifier for the RPC to be sent
 * @param [in] __in_struct input argument struct for RPC
 * @param [in] __timeout timeout in milliseconds
 * @param [out] __req request to wait on using margo_wait
 */
#define margo_iforward_timed(__handle, __in_struct, __timeout, __req)\
    margo_provider_iforward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout, __req)

Matthieu Dorier's avatar
Matthieu Dorier committed
574
575
576
577
578
579
580
581
582
/**
 * Wait for an operation initiated by a non-blocking
 * margo function (margo_iforward, margo_irespond, etc.)
 * @param [in] req request to wait on
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_wait(
    margo_request req);

Matthieu Dorier's avatar
Matthieu Dorier committed
583
584
585
586
587
588
589
590
591
592
593
594
595

/**
 * Test if an operation initiated by a non-blocking
 * margo function (margo_iforward, margo_irespond, etc.)
 * has completed.
 *
 * @param [in] req request created by the non-blocking call
 * @param [out] flag 1 if request is completed, 0 otherwise
 *
 * @return 0 on success, ABT error code otherwise
 */
int margo_test(margo_request req, int* flag);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
/**
 * Send an RPC response, waiting for completion before returning
 * control to the calling ULT.
 * Note: this call is typically not needed as RPC listeners need not concern
 * themselves with what happens after an RPC finishes. However, there are cases
 * when this is useful (deferring resource cleanup, calling margo_finalize()
 * for e.g. a shutdown RPC).
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_respond(
    hg_handle_t handle,
    void *out_struct);

Matthieu Dorier's avatar
Matthieu Dorier committed
612
613
614
615
616
617
618
619
620
621
622
623
624
/**
 * Send an RPC response without blocking.
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @param [out] req request on which to wait using margo_wait
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_irespond(
    hg_handle_t handle,
    void *out_struct,
    margo_request* req);

625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
/**
 * Create an abstract bulk handle from specified memory segments.
 * Memory allocated is then freed when margo_bulk_free() is called.
 * \remark If NULL is passed to buf_ptrs, i.e.,
 * \verbatim margo_bulk_create(mid, count, NULL, buf_sizes, flags, &handle) \endverbatim
 * memory for the missing buf_ptrs array will be internally allocated.
 *
 * \param [in] mid          Margo instance 
 * \param [in] count        number of segments
 * \param [in] buf_ptrs     array of pointers
 * \param [in] buf_sizes    array of sizes
 * \param [in] flags        permission flag:
 *                             - HG_BULK_READWRITE
 *                             - HG_BULK_READ_ONLY
 *                             - HG_BULK_WRITE_ONLY
 * \param [out] handle      pointer to returned abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_create(
    margo_instance_id mid,
    hg_uint32_t count,
    void **buf_ptrs,
    const hg_size_t *buf_sizes,
    hg_uint8_t flags,
    hg_bulk_t *handle);

/**
 * Free bulk handle.
 *
 * \param [in/out] handle   abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_free(
    hg_bulk_t handle);

/**
 * Increment ref count on bulk handle.
 *
 * \param [in] handle           abstract bulk handle
666
667
668
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
669
#define margo_bulk_ref_incr HG_Bulk_ref_incr
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686

/**
 * Access bulk handle to retrieve memory segments abstracted by handle.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] offset           bulk offset
 * \param [in] size             bulk size
 * \param [in] flags            permission flag:
 *                                 - HG_BULK_READWRITE
 *                                 - HG_BULK_READ_ONLY
 * \param [in] max_count        maximum number of segments to be returned
 * \param [in/out] buf_ptrs     array of buffer pointers
 * \param [in/out] buf_sizes    array of buffer sizes
 * \param [out] actual_count    actual number of segments returned
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
687
#define margo_bulk_access HG_Bulk_access
688
689
690
691
692
693
694
695

/**
 * Get total size of data abstracted by bulk handle.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
696
#define margo_bulk_get_size HG_Bulk_get_size
697
698
699
700
701
702
703
704

/**
 * Get total number of segments abstracted by bulk handle.
 *
 * \param [in] handle   abstract bulk handle
 *
 * \return Non-negative value
 */
705
#define margo_bulk_get_segment_count HG_Bulk_get_segment_count
706
707
708
709
710
711
712
713
714
715
716

/**
 * Get size required to serialize bulk handle.
 *
 * \param [in] handle           abstract bulk handle
 * \param [in] request_eager    boolean (passing HG_TRUE adds size of encoding
 *                              actual data along the handle if handle meets
 *                              HG_BULK_READ_ONLY flag condition)
 *
 * \return Non-negative value
 */
717
#define margo_bulk_get_serialize_size HG_Bulk_get_serialize_size
718
719
720
721
722
723
724
725
726
727
728
729
730
731

/**
 * Serialize bulk handle into a buffer.
 *
 * \param [in/out] buf          pointer to buffer
 * \param [in] buf_size         buffer size
 * \param [in] request_eager    boolean (passing HG_TRUE encodes actual data
 *                              along the handle, which is more efficient for
 *                              small data, this is only valid if bulk handle
 *                              has HG_BULK_READ_ONLY permission)
 * \param [in] handle           abstract bulk handle
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
732
#define margo_bulk_serialize HG_Bulk_serialize
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748

/**
 * Deserialize bulk handle from an existing buffer.
 *
 * \param [in] mid      Margo instance 
 * \param [out] handle  abstract bulk handle
 * \param [in] buf      pointer to buffer
 * \param [in] buf_size buffer size
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_bulk_deserialize(
    margo_instance_id mid,
    hg_bulk_t *handle,
    const void *buf,
    hg_size_t buf_size);
749

750
751
/** 
 * Perform a bulk transfer
Philip Carns's avatar
Philip Carns committed
752
 * @param [in] mid Margo instance
753
754
755
756
757
758
759
760
761
762
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_transfer(
763
    margo_instance_id mid,
764
    hg_bulk_op_t op,
765
    hg_addr_t origin_addr,
766
767
768
769
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
770
    size_t size);
771

Matthieu Dorier's avatar
Matthieu Dorier committed
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
/** 
 * Asynchronously performs a bulk transfer
 * @param [in] mid Margo instance
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @param [out] req request to wait on using margo_wait
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_itransfer(
    margo_instance_id mid,
    hg_bulk_op_t op,
    hg_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size,
    margo_request* req);

796
797
798
799
800
801
802
803
/**
 * Suspends the calling ULT for a specified time duration
 * @param [in] mid Margo instance
 * @param [in] timeout_ms timeout duration in milliseconds
 */
void margo_thread_sleep(
    margo_instance_id mid,
    double timeout_ms);
804

805
/**
806
807
808
 * Retrieve the abt_handler pool that was associated with the instance at 
 *    initialization time
 * @param [in] mid Margo instance
809
810
 * @param [out] pool handler pool
 * @return 0 on success, error code on failure
811
 */
812
int margo_get_handler_pool(margo_instance_id mid, ABT_pool* pool);
813

814
815
816
817
818
819
820
/**
 * Look up the abt pool of the rpc handler associated with a handle
 * @param [in] h handle
 * @return pool
 */
ABT_pool margo_hg_handle_get_handler_pool(
    hg_handle_t h);

821
/**
822
823
 * Retrieve the Mercury context that was associated with this instance at
 *    initialization time
824
 * @param [in] mid Margo instance
825
 * @return the Mercury context used in margo_init
826
 */
827
hg_context_t* margo_get_context(margo_instance_id mid);
828

829
830
831
/**
 * Retrieve the Mercury class that was associated with this instance at
 *    initialization time
832
 * @param [in] mid Margo instance
833
 * @return the Mercury class used in margo_init
834
 */
835
hg_class_t* margo_get_class(margo_instance_id mid);
836

837
838
839
840
841
842
843
844
845
/**
 * Look up the margo_instance_id that a received RPC handle belongs to.
 *
 * \param [in] h          RPC handle
 *
 * \return Margo instance
 */
margo_instance_id margo_hg_handle_get_instance(
    hg_handle_t h);

846
847
848
849
850
851
852
853
854
/**
 * Look up the margo_instance_id that an hg_info struct belongs to.
 *
 * \param [in] info       hg_info struct
 *
 * \return Margo instance
 */
margo_instance_id margo_hg_info_get_instance(
    const struct hg_info *info);

Philip Carns's avatar
Philip Carns committed
855
856
857
858
859
860
861
862
863
864
865
866
867
868
/**
 * Turns on diagnostic collection for the given Margo instance
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_start(
    margo_instance_id mid);

/**
 * Appends diagnostic statistics (enabled via margo_diag_start()) to specified 
 * output file.
 *
 * @param [in] mid Margo instance
 * @param [in] file output file ("-" for stdout)
869
870
 * @param [in] uniquify flag indicating if file name should have additional
 *   information added to it to make output from different processes unique
Philip Carns's avatar
Philip Carns committed
871
872
 * @returns void
 */
873
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
874

875
876
877
878
879
880
881
882
/**
 * Sets configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] inout_param used to pass in values
 * @returns void
 */
883
void margo_set_param(margo_instance_id mid, int option, const void *param);
884
885
886
887
888
889
890
891
892

/**
 * Retrieves configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] param used to pass out values
 * @returns void
 */
893
void margo_get_param(margo_instance_id mid, int option, void *param);
894
895


896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
/**
 * @private
 * Internal function called from the handler function generated by
 * DEFINE_MARGO_RPC_HANDLER (see __MARGO_INTERNAL_RPC_HANDLER_BODY, which
 * returns HG_CANCELED when this function returns non-zero); not
 * supposed to be called by users!
 *
 * @param mid Margo instance
 *
 * @return whether margo_finalize() was called.
 */
int __margo_internal_finalize_requested(margo_instance_id mid);

/**
 * @private
 * Internal function called from the handler function generated by
 * DEFINE_MARGO_RPC_HANDLER (see __MARGO_INTERNAL_RPC_HANDLER_BODY, which
 * calls it just before creating the ULT that runs the RPC); not
 * supposed to be called by users!
 *
 * @param mid Margo instance
 */
void __margo_internal_incr_pending(margo_instance_id mid);

/**
 * @private
 * Internal function used by Margo's RPC registration/handler macros, not
 * supposed to be called by users!
 * NOTE(review): presumably balances __margo_internal_incr_pending();
 * confirm against the implementation.
 *
 * @param mid Margo instance
 */
void __margo_internal_decr_pending(margo_instance_id mid);

925
926
927
928
929
/**
 * @private
 * Internal function used by DEFINE_MARGO_RPC_HANDLER, not supposed to be
 * called by users!
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
930
931
932
933
934
935
936
937
void __margo_internal_pre_wrapper_hooks(margo_instance_id mid, hg_handle_t handle);

/**
 * @private
 * Internal hook run by the wrapper generated by DEFINE_MARGO_RPC_HANDLER;
 * user code is not supposed to call it!
 */
void __margo_internal_post_wrapper_hooks(
    margo_instance_id mid);
938

939
940
941
/**
 * macro that registers a function as an RPC.
 *
 * Expands to a call to margo_provider_register_name() with
 * hg_proc_<__in_t>, hg_proc_<__out_t>, <__handler>_handler,
 * MARGO_DEFAULT_PROVIDER_ID and ABT_POOL_NULL.  Note that the expansion
 * already ends with a semicolon.
 *
 * @param __mid       Margo instance
 * @param __func_name RPC name forwarded to margo_provider_register_name()
 * @param __in_t      input type name (pasted after "hg_proc_")
 * @param __out_t     output type name (pasted after "hg_proc_")
 * @param __handler   handler base name ("_handler" is appended to it)
 */
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \
    margo_provider_register_name(__mid, __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        __handler##_handler, \
        MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL);
948

949
/**
 * macro that registers a function as an RPC for a given provider ID and pool.
 *
 * Same expansion as MARGO_REGISTER, except that the caller supplies the
 * last two arguments of margo_provider_register_name() instead of
 * MARGO_DEFAULT_PROVIDER_ID and ABT_POOL_NULL.  The expansion already ends
 * with a semicolon.
 *
 * @param __mid         Margo instance
 * @param __func_name   RPC name forwarded to margo_provider_register_name()
 * @param __in_t        input type name (pasted after "hg_proc_")
 * @param __out_t       output type name (pasted after "hg_proc_")
 * @param __handler     handler base name ("_handler" is appended to it)
 * @param __provider_id provider ID forwarded to margo_provider_register_name()
 * @param __pool        pool forwarded to margo_provider_register_name()
 */
#define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
    margo_provider_register_name(__mid, __func_name, \
        BOOST_PP_CAT(hg_proc_, __in_t), \
        BOOST_PP_CAT(hg_proc_, __out_t), \
        __handler##_handler, \
        __provider_id, __pool);
955

956
957
/*
 * Lets NULL be passed as the __handler argument of MARGO_REGISTER and
 * MARGO_REGISTER_PROVIDER: those macros paste "_handler" onto the argument,
 * so NULL becomes the token NULL_handler, which this definition maps back
 * to NULL.
 */
#define NULL_handler NULL

958
/*
 * Body of the generated <__name>_wrapper function: looks up the Margo
 * instance from `handle`, then calls the user's RPC function __name(handle)
 * between the internal pre- and post-wrapper hooks.  Expects a variable
 * named `handle` (hg_handle_t) to be in scope.
 */
#define __MARGO_INTERNAL_RPC_WRAPPER_BODY(__name) \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    __margo_internal_pre_wrapper_hooks(__mid, handle); \
    __name(handle); \
    __margo_internal_post_wrapper_hooks(__mid);
964
965
966
967
968
969
970

/*
 * Defines <__name>_wrapper(), the function that
 * __MARGO_INTERNAL_RPC_HANDLER_BODY hands to ABT_thread_create(); its body
 * comes from __MARGO_INTERNAL_RPC_WRAPPER_BODY.
 */
#define __MARGO_INTERNAL_RPC_WRAPPER(__name) \
    void __name##_wrapper(hg_handle_t handle) \
    { \
        __MARGO_INTERNAL_RPC_WRAPPER_BODY(__name) \
    }

/*
 * Body of the generated <__name>_handler function.  Expects a variable
 * named `handle` (hg_handle_t) to be in scope and returns an hg_return_t:
 *   - HG_OTHER_ERROR if no Margo instance can be obtained from the handle;
 *   - HG_CANCELED    if __margo_internal_finalize_requested() reports true;
 *   - HG_NOMEM_ERROR if ABT_thread_create() fails;
 *   - HG_SUCCESS     once a ULT running <__name>_wrapper(handle) has been
 *                    created in the pool returned by
 *                    margo_hg_handle_get_handler_pool().
 *
 * The pending counter is incremented before the ULT is created.  If the
 * creation fails the wrapper will never run, so the increment is undone
 * here before returning to keep the counter balanced.
 */
#define __MARGO_INTERNAL_RPC_HANDLER_BODY(__name) \
    int __ret; \
    ABT_pool __pool; \
    margo_instance_id __mid; \
    __mid = margo_hg_handle_get_instance(handle); \
    if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
    if(__margo_internal_finalize_requested(__mid)) { return(HG_CANCELED); } \
    __pool = margo_hg_handle_get_handler_pool(handle); \
    __margo_internal_incr_pending(__mid); \
    __ret = ABT_thread_create(__pool, (void (*)(void *))__name##_wrapper, handle, ABT_THREAD_ATTR_NULL, NULL); \
    if(__ret != 0) { \
        __margo_internal_decr_pending(__mid); \
        return(HG_NOMEM_ERROR); \
    } \
    return(HG_SUCCESS);

/*
 * Defines hg_return_t <__name>_handler(hg_handle_t handle), the function
 * that MARGO_REGISTER / MARGO_REGISTER_PROVIDER pass to
 * margo_provider_register_name(); its body comes from
 * __MARGO_INTERNAL_RPC_HANDLER_BODY.
 */
#define __MARGO_INTERNAL_RPC_HANDLER(__name) \
hg_return_t __name##_handler(hg_handle_t handle) { \
    __MARGO_INTERNAL_RPC_HANDLER_BODY(__name) \
}

990
991
992
993
994
995
996
997
998
999
1000
1001
1002
/**
 * macro that defines a function to glue an RPC handler to a ult handler
 * @param [in] __name name of handler function
 *
 * Expands to two function definitions: <__name>_wrapper(), which calls
 * __name(handle) between the internal pre/post wrapper hooks, and
 * <__name>_handler(), which creates a ULT running that wrapper.
 *
 * Note: we use this opportunity to set a thread-local argobots key that stores
 * the "breadcrumb" that was set in the RPC.  It is shifted down 16 bits so that
 * if this handler in turn issues more RPCs there will be a stack showing their
 * ancestry.
 * NOTE(review): the breadcrumb handling is not visible in the macros of this
 * header; presumably it happens inside __margo_internal_pre_wrapper_hooks()
 * -- confirm against the implementation.
 */
#define DEFINE_MARGO_RPC_HANDLER(__name) \
    __MARGO_INTERNAL_RPC_WRAPPER(__name) \
    __MARGO_INTERNAL_RPC_HANDLER(__name)

Philip Carns's avatar
Philip Carns committed
1003
1004
/**
 * macro that declares the prototype for a function to glue an RPC
 * handler to a ult
 * @param [in] __name name of handler function
 *
 * Expands to the prototype of <__name>_handler(), including the trailing
 * semicolon.
 */
#define DECLARE_MARGO_RPC_HANDLER(__name) hg_return_t __name##_handler(hg_handle_t handle);
1009

1010
1011
1012
1013
#ifdef __cplusplus
}
#endif

1014
#endif /* __MARGO */