margo.h 12 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#ifndef __MARGO
#define __MARGO

10
11
12
13
#ifdef __cplusplus
extern "C" {
#endif

14
15
16
17
18
19
/* This is to prevent the user from using HG_Register_data
 * and HG_Registered_data, which are replaced with
 * margo_register_data and margo_registered_data
 * respectively.
 */

20
21
22
23
24
#include <mercury_bulk.h>
#include <mercury.h>
#include <mercury_macros.h>
#include <abt.h>

25
26
#undef MERCURY_REGISTER

27
28
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
29
typedef struct margo_data* margo_data_ptr;
30

31
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
32
#define MARGO_DEFAULT_MPLEX_ID 0
33
#define MARGO_RPC_ID_IGNORE ((hg_id_t*)NULL)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
34

35
36
#define MARGO_INFO_PROGRESS_TIMEOUT_UB 1

Jonathan Jenkins's avatar
Jonathan Jenkins committed
37
38
39
40
/**
 * Initializes margo library.
 * @param [in] use_progress_thread Boolean flag to use a dedicated thread for
 *                                 running Mercury's progress loop. If false,
41
 *                                 it will run in the caller's thread context.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
42
43
 * @param [in] rpc_thread_count    Number of threads to use for running RPC
 *                                 calls. A value of 0 directs Margo to execute
44
45
46
47
48
49
 *                                 RPCs in the caller's thread context.
 *                                 Clients (i.e processes that will *not* 
 *                                 service incoming RPCs) should use a value 
 *                                 of 0. A value of -1 directs Margo to use 
 *                                 the same execution context as that used 
 *                                 for Mercury progress.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
50
51
 * @param [in] hg_context          Mercury context
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
52
53
54
55
56
 *
 * NOTE: Servers (processes expecting to service incoming RPC requests) must
 * specify non-zero values for use_progress_thread and rpc_thread_count *or*
 * call margo_wait_for_finalize() after margo_init() to relinquish control to
 * Margo.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
57
58
59
60
 */
margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count,
    hg_context_t *hg_context);

61
/**
Jonathan Jenkins's avatar
doc fix    
Jonathan Jenkins committed
62
 * Initializes margo library from given argobots and Mercury instances.
Philip Carns's avatar
Philip Carns committed
63
64
65
 * @param [in] progress_pool Argobots pool to drive communication
 * @param [in] handler_pool Argobots pool to service RPC handlers
 * @param [in] hg_context Mercury context
Jonathan Jenkins's avatar
fixupme    
Jonathan Jenkins committed
66
 * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error
67
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
68
margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
69
    hg_context_t *hg_context);
70
71

/**
Philip Carns's avatar
Philip Carns committed
72
 * Shuts down margo library and its underlying abt and mercury resources
Philip Carns's avatar
Philip Carns committed
73
 * @param [in] mid Margo instance
74
 */
75
void margo_finalize(margo_instance_id mid);
76
77
78
79
80
81
82
83
84
85
86
87

/**
 * Suspends the caller until some other entity (e.g. an RPC, thread, or
 * signal handler) invokes margo_finalize().
 *
 * NOTE: This informs Margo that the calling thread no longer needs to be 
 * scheduled for execution if it is sharing an Argobots pool with the
 * progress engine.
 *
 * @param [in] mid Margo instance
 */
void margo_wait_for_finalize(margo_instance_id mid);
88
89

/**
Philip Carns's avatar
Philip Carns committed
90
91
92
 * Retrieve the abt_handler pool that was associated with the instance at 
 *    initialization time
 * @param [in] mid Margo instance
93
 */
94
ABT_pool* margo_get_handler_pool(margo_instance_id mid);
95

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
 * Retrieve the Mercury context that was associated with this instance at
 *    initialization time
 * @param [in] mid Margo instance
 * @return the Mercury context used in margo_init
 */
hg_context_t* margo_get_context(margo_instance_id mid);

/**
 * Retrieve the Mercury class that was associated with this instance at
 *    initialization time
 * @param [in] mid Margo instance
 * @return the Mercury class used in margo_init
 */
hg_class_t* margo_get_class(margo_instance_id mid);

112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
 * Register and associate user data to registered function.
 * When HG_Finalize() is called free_callback (if defined) is called 
 * to free the registered data.
 *
 * \param [in] mid            Margo instance
 * \param [in] id             registered function ID
 * \param [in] data           pointer to data
 * \param [in] free_callback  pointer to free function
 *
 * \return HG_SUCCESS or corresponding HG error code
 */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *));

/**
 * Indicate whether margo_register_data() has been called and return associated
 * data.
 *
 * \param [in] mid        Margo instance 
 * \param [in] id         registered function ID
 *
 * \return Pointer to data or NULL
 */
void* margo_registered_data(margo_instance_id mid, hg_id_t id);

/**
 * Get the margo_instance_id from a received RPC handle.
 *
 * \param [in] h          RPC handle
 * 
 * \return Margo instance
 */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);

150
151
/**
 * Forward an RPC request to a remote host
Philip Carns's avatar
Philip Carns committed
152
 * @param [in] mid Margo instance
153
154
155
156
157
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_forward(
158
    margo_instance_id mid,
159
160
161
    hg_handle_t handle,
    void *in_struct);

162
163
164
165
166
167
168
169
170
171
172
173
174
175
/**
 * Forward an RPC request to a remote host with a user-defined timeout
 * @param [in] mid Margo instance
 * @param [in] handle identifier for the RPC to be sent
 * @param [in] in_struct input argument struct for RPC
 * @param [in] timeout_ms timeout in milliseconds
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_forward_timed(
    margo_instance_id mid,
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/**
 * Send an RPC response, waiting for completion before returning
 * control to the calling ULT.
 * Note: this call is typically not needed as RPC listeners need not concern
 * themselves with what happens after an RPC finishes. However, there are cases
 * when this is useful (deferring resource cleanup, calling margo_finalize()
 * for e.g. a shutdown RPC).
 * @param [in] mid Margo instance
 * @param [in] handle identifier for the RPC for which a response is being
 * sent
 * @param [in] out_struct output argument struct for response
 * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
 */
hg_return_t margo_respond(
    margo_instance_id mid,
    hg_handle_t handle,
    void *out_struct);

194
195
/** 
 * Perform a bulk transfer
Philip Carns's avatar
Philip Carns committed
196
 * @param [in] mid Margo instance
197
198
199
200
201
202
203
204
205
206
 * @param [in] op type of operation to perform
 * @param [in] origin_addr remote Mercury address
 * @param [in] origin_handle remote Mercury bulk memory handle
 * @param [in] origin_offset offset into remote bulk memory to access
 * @param [in] local_handle local bulk memory handle
 * @param [in] local_offset offset into local bulk memory to access
 * @param [in] size size (in bytes) of transfer
 * @returns 0 on success, hg_return_t values on error
 */
hg_return_t margo_bulk_transfer(
207
    margo_instance_id mid,
208
    hg_bulk_op_t op,
209
    hg_addr_t origin_addr,
210
211
212
213
214
215
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size);

216
217
218
/**
 * address lookup
 * @param [in] mid              Margo instance
 * @param [in] name             lookup name
Jonathan Jenkins's avatar
fixupme    
Jonathan Jenkins committed
219
220
 * @param [out] addr            return address
 * @returns HG_SUCCESS on success
221
 */
222
hg_return_t margo_addr_lookup(
223
    margo_instance_id mid,
Philip Carns's avatar
Philip Carns committed
224
    const char   *name,
225
    hg_addr_t    *addr);
226

227
228
/**
 * Suspends the calling ULT for a specified time duration
229
 * @param [in] mid Margo instance
230
231
232
 * @param [in] timeout_ms timeout duration in milliseconds
 */
void margo_thread_sleep(
233
    margo_instance_id mid,
234
235
    double timeout_ms);

236
237
238
239
/** 
 * Registers an RPC with margo
 * @param [in] mid Margo instance
 * @param [in] id Mercury RPC identifier
240
 */
241
int margo_register(margo_instance_id mid, hg_id_t id);
242

Philip Carns's avatar
Philip Carns committed
243
244
245
246
/** 
 * Registers an RPC with margo that is associated with a multiplexed service
 * @param [in] mid Margo instance
 * @param [in] id Mercury RPC identifier
Philip Carns's avatar
Philip Carns committed
247
 * @param [in] mplex_id multiplexing identifier
Philip Carns's avatar
Philip Carns committed
248
249
250
251
 * @param [in] pool Argobots pool the handler will execute in
 */
int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool);

Philip Carns's avatar
Philip Carns committed
252
253
254
255
256
257
258
259
260
/**
 * Maps an RPC id and mplex id to the pool that it should execute on
 * @param [in] mid Margo instance
 * @param [in] id Mercury RPC identifier
 * @param [in] mplex_id multiplexing identifier
 * @param [out] pool Argobots pool the handler will execute in
 */
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);

Philip Carns's avatar
Philip Carns committed
261
262
263
264
265
266
267
268
269
270
271
272
273
274
/**
 * Enables diagnostic collection on specified Margo instance
 *
 * @param [in] mid Margo instance
 * @returns void
 */
void margo_diag_start(margo_instance_id mid);

/**
 * Appends diagnostic statistics (enabled via margo_diag_start()) to specified 
 * output file.
 *
 * @param [in] mid Margo instance
 * @param [in] file output file ("-" for stdout)
275
276
 * @param [in] uniquify flag indicating if file name should have additional
 *   information added to it to make output from different processes unique
Philip Carns's avatar
Philip Carns committed
277
278
 * @returns void
 */
279
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
280

281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/**
 * Sets configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [in] param used to pass in values
 * @returns void
 */
void margo_set_info(margo_instance_id mid, int option, const void *param);

/**
 * Retrieves configurable parameters/hints
 *
 * @param [in] mid Margo instance
 * @param [in] option numerical option number
 * @param [out] param used to pass out values
 * @returns void
 */
void margo_get_info(margo_instance_id mid, int option, void *param);


302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/**
 * macro that registers a function as an RPC.
 *
 * The RPC name is first looked up with HG_Registered_name(); only when it is
 * not registered yet is HG_Register_name() called, with
 * hg_proc_<__in_t> and hg_proc_<__out_t> as the proc routines and
 * <__handler>_handler as the RPC callback. The resulting id is then handed
 * to margo_register().
 *
 * Failures are checked with assert() only: the including file must have
 * assert() in scope, and the checks vanish when NDEBUG is defined.
 *
 * __mid is evaluated exactly once. __rpc_id_ptr may be evaluated twice and
 * should therefore be free of side effects.
 *
 * @param [in]  __mid        Margo instance
 * @param [in]  __func_name  name under which the RPC is registered
 * @param [in]  __in_t       input type name (pasted onto hg_proc_)
 * @param [in]  __out_t      output type name (pasted onto hg_proc_)
 * @param [in]  __handler    handler base name; "_handler" is pasted onto it
 *                           (NULL therefore selects NULL_handler)
 * @param [out] __rpc_id_ptr location (hg_id_t*) receiving the RPC id, or
 *                           MARGO_RPC_ID_IGNORE to discard it
 */
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __rpc_id_ptr) do { \
    margo_instance_id __reg_mid = (__mid); \
    hg_return_t __hret; \
    hg_id_t __id; \
    hg_bool_t __flag; \
    int __ret; \
    __hret = HG_Registered_name(margo_get_class(__reg_mid), (__func_name), &__id, &__flag); \
    assert(__hret == HG_SUCCESS); \
    (void)__hret; /* silences set-but-unused warnings under NDEBUG */ \
    if(!__flag) { \
        __id = HG_Register_name(margo_get_class(__reg_mid), (__func_name), \
                   BOOST_PP_CAT(hg_proc_, __in_t), \
                   BOOST_PP_CAT(hg_proc_, __out_t), \
                   __handler##_handler); \
    } \
    __ret = margo_register(__reg_mid, __id); \
    assert(__ret == 0); \
    (void)__ret; /* silences set-but-unused warnings under NDEBUG */ \
    if((__rpc_id_ptr) != MARGO_RPC_ID_IGNORE) { \
        *(__rpc_id_ptr) = __id; \
    } \
} while(0)

/**
 * macro that registers a function as an RPC associated with a multiplexed
 * service.
 *
 * Works like MARGO_REGISTER: the RPC name is looked up with
 * HG_Registered_name() and, only if not yet registered, registered with
 * HG_Register_name() using hg_proc_<__in_t>, hg_proc_<__out_t> and
 * <__handler>_handler. The id is then handed to margo_register_mplex()
 * together with __mplex_id and __pool.
 *
 * Failures are checked with assert() only: the including file must have
 * assert() in scope, and the checks vanish when NDEBUG is defined.
 *
 * __mid is evaluated exactly once. __rpc_id_ptr may be evaluated twice and
 * should therefore be free of side effects.
 *
 * @param [in]  __mid        Margo instance
 * @param [in]  __func_name  name under which the RPC is registered
 * @param [in]  __in_t       input type name (pasted onto hg_proc_)
 * @param [in]  __out_t      output type name (pasted onto hg_proc_)
 * @param [in]  __handler    handler base name; "_handler" is pasted onto it
 *                           (NULL therefore selects NULL_handler)
 * @param [in]  __mplex_id   multiplexing identifier
 * @param [in]  __pool       Argobots pool the handler will execute in
 * @param [out] __rpc_id_ptr location (hg_id_t*) receiving the RPC id, or
 *                           MARGO_RPC_ID_IGNORE to discard it
 */
#define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool, __rpc_id_ptr) do { \
    margo_instance_id __reg_mid = (__mid); \
    hg_return_t __hret; \
    hg_id_t __id; \
    hg_bool_t __flag; \
    int __ret; \
    __hret = HG_Registered_name(margo_get_class(__reg_mid), (__func_name), &__id, &__flag); \
    assert(__hret == HG_SUCCESS); \
    (void)__hret; /* silences set-but-unused warnings under NDEBUG */ \
    if(!__flag) { \
        __id = HG_Register_name(margo_get_class(__reg_mid), (__func_name), \
                   BOOST_PP_CAT(hg_proc_, __in_t), \
                   BOOST_PP_CAT(hg_proc_, __out_t), \
                   __handler##_handler); \
    } \
    __ret = margo_register_mplex(__reg_mid, __id, (__mplex_id), (__pool)); \
    assert(__ret == 0); \
    (void)__ret; /* silences set-but-unused warnings under NDEBUG */ \
    if((__rpc_id_ptr) != MARGO_RPC_ID_IGNORE) { \
        *(__rpc_id_ptr) = __id; \
    } \
} while(0)

343
344
/* Lets NULL be passed as the __handler argument of the registration macros:
 * they paste "_handler" onto that argument, which yields NULL_handler,
 * i.e. NULL.
 */
#define NULL_handler NULL

345
/**
 * macro that defines a function to glue an RPC handler to a ult handler
 *
 * The generated function is named <__name>_handler, matching the callback
 * name that the registration macros pass to HG_Register_name(). It obtains
 * the handle's info and Margo instance, maps the RPC id and target id to an
 * Argobots pool with margo_lookup_mplex(), and creates a ULT in that pool
 * which runs __name with the handle as its argument.
 *
 * The generated function returns:
 *  - HG_SUCCESS       once the ULT has been created
 *  - HG_INVALID_PARAM if no info is available for the handle or
 *                     margo_lookup_mplex() fails
 *  - HG_OTHER_ERROR   if no Margo instance is associated with the handle
 *  - HG_NOMEM_ERROR   if ABT_thread_create() fails
 *
 * @param [in] __name name of handler function
 */
#define DEFINE_MARGO_RPC_HANDLER(__name) \
hg_return_t __name##_handler(hg_handle_t handle) { \
    int __ret; \
    ABT_pool __pool; \
    margo_instance_id __mid; \
    const struct hg_info *__hgi; \
    __hgi = HG_Get_info(handle); \
    if(__hgi == NULL) { /* guard the __hgi-> dereferences below */ \
        return(HG_INVALID_PARAM); \
    } \
    __mid = margo_hg_handle_get_instance(handle); \
    if(__mid == MARGO_INSTANCE_NULL) { \
        return(HG_OTHER_ERROR); \
    } \
    __ret = margo_lookup_mplex(__mid, __hgi->id, __hgi->target_id, (&__pool)); \
    if(__ret != 0) { \
        return(HG_INVALID_PARAM); \
    } \
    __ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
    if(__ret != 0) { \
        return(HG_NOMEM_ERROR); \
    } \
    return(HG_SUCCESS); \
}

Philip Carns's avatar
Philip Carns committed
371
372
/**
 * Declares the prototype of the <__name>_handler glue function, i.e. the
 * function that DEFINE_MARGO_RPC_HANDLER(__name) defines.
 * The expansion already ends with a semicolon.
 * @param [in] __name name of handler function
 */
#define DECLARE_MARGO_RPC_HANDLER(__name) \
    hg_return_t __name##_handler(hg_handle_t handle);
377

378
379
380
381
#ifdef __cplusplus
}
#endif

382
#endif /* __MARGO */