mona.c 67 KB
Newer Older
1
2
/*
 * (C) 2020 The University of Chicago
Matthieu Dorier's avatar
Matthieu Dorier committed
3
 *
4
5
 * See COPYRIGHT in top-level directory.
 */
6
7
8
9
10
11
#include "mona-types.h"
#include <string.h>

// ------------------------------------------------------------------------------------
// Mona progress loop logic
// ------------------------------------------------------------------------------------
12

Matthieu Dorier's avatar
Matthieu Dorier committed
13
14
static void mona_progress_loop(void* uarg)
{
15
    mona_instance_t mona = (mona_instance_t)uarg;
Matthieu Dorier's avatar
Matthieu Dorier committed
16
17
18
    na_return_t     trigger_ret, na_ret;
    unsigned int    actual_count = 0;
    size_t          size;
19

Matthieu Dorier's avatar
Matthieu Dorier committed
20
    while (!mona->finalize_flag) {
21
22

        do {
Matthieu Dorier's avatar
Matthieu Dorier committed
23
24
25
26
            trigger_ret
                = NA_Trigger(mona->na_context, 0, 1, NULL, &actual_count);
        } while ((trigger_ret == NA_SUCCESS) && actual_count
                 && !mona->finalize_flag);
27

28
        ABT_pool_get_size(mona->progress_pool, &size);
Matthieu Dorier's avatar
Matthieu Dorier committed
29
        if (size) ABT_thread_yield();
30
31
32
33

        // TODO put a high timeout value to avoid busy-spinning
        // if there is no other ULT in the pool that could run
        na_ret = NA_Progress(mona->na_class, mona->na_context, 0);
Matthieu Dorier's avatar
Matthieu Dorier committed
34
        if (na_ret != NA_SUCCESS && na_ret != NA_TIMEOUT) {
Matthieu Dorier's avatar
Matthieu Dorier committed
35
36
37
            fprintf(stderr,
                    "WARNING: unexpected return value from NA_Progress (%d)\n",
                    na_ret);
38
39
40
41
        }
    }
}

42
43
44
45
// ------------------------------------------------------------------------------------
// Mona initialization logic
// ------------------------------------------------------------------------------------

Matthieu Dorier's avatar
Matthieu Dorier committed
46
47
48
mona_instance_t mona_init(const char*                info_string,
                          na_bool_t                  listen,
                          const struct na_init_info* na_init_info)
49
{
Matthieu Dorier's avatar
Matthieu Dorier committed
50
    return mona_init_thread(info_string, listen, na_init_info, NA_FALSE);
51
52
}

Matthieu Dorier's avatar
Matthieu Dorier committed
53
54
55
56
mona_instance_t mona_init_thread(const char*                info_string,
                                 na_bool_t                  listen,
                                 const struct na_init_info* na_init_info,
                                 na_bool_t                  use_progress_es)
57
{
Matthieu Dorier's avatar
Matthieu Dorier committed
58
59
60
61
    int             ret;
    ABT_xstream     xstream       = ABT_XSTREAM_NULL;
    ABT_pool        progress_pool = ABT_POOL_NULL;
    mona_instance_t mona          = MONA_INSTANCE_NULL;
62

Matthieu Dorier's avatar
Matthieu Dorier committed
63
    if (use_progress_es == NA_TRUE) {
64

Matthieu Dorier's avatar
Matthieu Dorier committed
65
66
67
        ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPSC,
                                    ABT_FALSE, &progress_pool);
        if (ret != ABT_SUCCESS) goto error;
68

Matthieu Dorier's avatar
Matthieu Dorier committed
69
70
71
        ret = ABT_xstream_create_basic(ABT_SCHED_DEFAULT, 1, &progress_pool,
                                       ABT_SCHED_CONFIG_NULL, &xstream);
        if (ret != ABT_SUCCESS) goto error;
72
73
74
75

    } else {

        ret = ABT_xstream_self(&xstream);
Matthieu Dorier's avatar
Matthieu Dorier committed
76
        if (ret != ABT_SUCCESS) goto error;
77
78

        ret = ABT_xstream_get_main_pools(xstream, 1, &progress_pool);
Matthieu Dorier's avatar
Matthieu Dorier committed
79
        if (ret != ABT_SUCCESS) goto error;
80
81
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
82
83
    mona = mona_init_pool(info_string, listen, na_init_info, progress_pool);
    if (!mona) goto error;
84

Matthieu Dorier's avatar
Matthieu Dorier committed
85
86
    if (use_progress_es == NA_TRUE) {
        mona->owns_progress_pool    = NA_TRUE;
87
88
89
90
91
92
93
94
95
        mona->owns_progress_xstream = NA_TRUE;
    }

    mona->progress_xstream = xstream;

finish:
    return mona;

error:
Matthieu Dorier's avatar
Matthieu Dorier committed
96
    if (progress_pool != ABT_POOL_NULL && use_progress_es == NA_TRUE)
97
        ABT_pool_free(&progress_pool);
Matthieu Dorier's avatar
Matthieu Dorier committed
98
    if (xstream != ABT_XSTREAM_NULL && use_progress_es == NA_TRUE)
99
100
101
102
103
        ABT_xstream_free(&xstream);
    mona = MONA_INSTANCE_NULL;
    goto finish;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
104
105
106
107
mona_instance_t mona_init_pool(const char*                info_string,
                               na_bool_t                  listen,
                               const struct na_init_info* na_init_info,
                               ABT_pool                   progress_pool)
108
{
Matthieu Dorier's avatar
Matthieu Dorier committed
109
110
111
    na_class_t*     na_class   = NULL;
    na_context_t*   na_context = NULL;
    mona_instance_t mona       = MONA_INSTANCE_NULL;
112
113

    na_class = NA_Initialize_opt(info_string, listen, na_init_info);
Matthieu Dorier's avatar
Matthieu Dorier committed
114
    if (!na_class) goto error;
115
116

    na_context = NA_Context_create(na_class);
Matthieu Dorier's avatar
Matthieu Dorier committed
117
    if (!na_context) goto error;
118
119

    mona = mona_init_na_pool(na_class, na_context, progress_pool);
Matthieu Dorier's avatar
Matthieu Dorier committed
120
    if (!mona) goto error;
121
122
123
124
125
126
127

    mona->owns_na_class_and_context = NA_TRUE;

finish:
    return mona;

error:
Matthieu Dorier's avatar
Matthieu Dorier committed
128
129
    if (na_context) NA_Context_destroy(na_class, na_context);
    if (na_class) NA_Finalize(na_class);
130
131
132
133
    mona = MONA_INSTANCE_NULL;
    goto finish;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
134
135
136
mona_instance_t mona_init_na_pool(na_class_t*   na_class,
                                  na_context_t* na_context,
                                  ABT_pool      progress_pool)
137
{
Matthieu Dorier's avatar
Matthieu Dorier committed
138
    int             ret, i;
139
    mona_instance_t mona = (mona_instance_t)calloc(1, sizeof(*mona));
Matthieu Dorier's avatar
Matthieu Dorier committed
140
    if (!mona) return MONA_INSTANCE_NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
141
142
143
    mona->na_class         = na_class;
    mona->na_context       = na_context;
    mona->progress_pool    = progress_pool;
144
    mona->progress_xstream = ABT_XSTREAM_NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
145
146
147
    mona->progress_thread  = ABT_THREAD_NULL;
    mona->op_id_cache_mtx  = ABT_MUTEX_NULL;
    mona->req_cache_mtx    = ABT_MUTEX_NULL;
148
    mona->msg_cache_mtx    = ABT_MUTEX_NULL;
149
150
    mona->pending_msg_mtx  = ABT_MUTEX_NULL;
    mona->pending_msg_cv   = ABT_COND_NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
151
152
    ret                    = ABT_mutex_create(&(mona->op_id_cache_mtx));
    if (ret != ABT_SUCCESS) goto error;
Matthieu Dorier's avatar
Matthieu Dorier committed
153
    ret = ABT_mutex_create(&(mona->req_cache_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
154
    if (ret != ABT_SUCCESS) goto error;
155
    ret = ABT_mutex_create(&(mona->msg_cache_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
156
    if (ret != ABT_SUCCESS) goto error;
157
    ret = ABT_mutex_create(&(mona->pending_msg_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
158
    if (ret != ABT_SUCCESS) goto error;
159
    ret = ABT_cond_create(&(mona->pending_msg_cv));
Matthieu Dorier's avatar
Matthieu Dorier committed
160
    if (ret != ABT_SUCCESS) goto error;
161
162
163
164
165

    mona->op_id_cache = (cached_op_id_t)calloc(1, sizeof(*(mona->op_id_cache)));
    mona->op_id_cache->op_id = NA_Op_create(na_class);

    cached_op_id_t current = mona->op_id_cache;
Matthieu Dorier's avatar
Matthieu Dorier committed
166
167
168
    for (i = 0; i < 15; i++) {
        current->next  = (cached_op_id_t)calloc(1, sizeof(*current));
        current        = current->next;
169
170
        current->op_id = NA_Op_create(na_class);
    }
171

Matthieu Dorier's avatar
Matthieu Dorier committed
172
173
174
175
    ret = ABT_thread_create(mona->progress_pool, mona_progress_loop,
                            (void*)mona, ABT_THREAD_ATTR_NULL,
                            &(mona->progress_thread));
    if (ret != ABT_SUCCESS) goto error;
176
177
178
179
180

finish:
    return mona;

error:
Matthieu Dorier's avatar
Matthieu Dorier committed
181
    if (mona->op_id_cache_mtx != ABT_MUTEX_NULL)
182
        ABT_mutex_free(&(mona->op_id_cache_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
183
    if (mona->req_cache_mtx != ABT_MUTEX_NULL)
Matthieu Dorier's avatar
Matthieu Dorier committed
184
        ABT_mutex_free(&(mona->req_cache_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
185
    if (mona->msg_cache_mtx != ABT_MUTEX_NULL)
186
        ABT_mutex_free(&(mona->msg_cache_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
187
    if (mona->pending_msg_mtx != ABT_MUTEX_NULL)
188
        ABT_mutex_free(&(mona->pending_msg_mtx));
Matthieu Dorier's avatar
Matthieu Dorier committed
189
    if (mona->pending_msg_cv != ABT_COND_NULL)
190
        ABT_cond_free(&(mona->pending_msg_cv));
191
192
193
194
195
196
197
198
199
200
    free(mona);
    mona = MONA_INSTANCE_NULL;
    goto finish;
}

na_return_t mona_finalize(mona_instance_t mona)
{
    mona->finalize_flag = NA_TRUE;
    ABT_thread_join(mona->progress_thread);

Matthieu Dorier's avatar
Matthieu Dorier committed
201
    if (mona->owns_progress_xstream) {
202
203
204
        ABT_xstream_join(mona->progress_xstream);
        ABT_xstream_free(&(mona->progress_xstream));
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
205
    if (mona->owns_progress_pool) ABT_pool_free(&(mona->progress_pool));
206

207
    clear_op_id_cache(mona);
208
209
    ABT_mutex_free(&(mona->op_id_cache_mtx));

210
    clear_req_cache(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
211
212
    ABT_mutex_free(&(mona->req_cache_mtx));

213
214
215
    clear_msg_cache(mona);
    ABT_mutex_free(&(mona->msg_cache_mtx));

216
217
218
    ABT_mutex_free(&(mona->pending_msg_mtx));
    ABT_cond_free(&(mona->pending_msg_cv));

Matthieu Dorier's avatar
Matthieu Dorier committed
219
220
    if (mona->owns_na_class_and_context) {
        NA_Context_destroy(mona->na_class, mona->na_context);
221
222
223
224
225
226
227
        NA_Finalize(mona->na_class);
    }
    free(mona);

    return NA_SUCCESS;
}

228
229
230
231
// ------------------------------------------------------------------------------------
// Mona info access logic
// ------------------------------------------------------------------------------------

232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
const char* mona_get_class_name(mona_instance_t mona)
{
    return NA_Get_class_name(mona->na_class);
}

const char* mona_get_class_protocol(mona_instance_t mona)
{
    return NA_Get_class_protocol(mona->na_class);
}

na_bool_t mona_is_listening(mona_instance_t mona)
{
    return NA_Is_listening(mona->na_class);
}

247
248
249
250
// ------------------------------------------------------------------------------------
// Mona addresses logic
// ------------------------------------------------------------------------------------

Matthieu Dorier's avatar
Matthieu Dorier committed
251
252
na_return_t
mona_addr_lookup(mona_instance_t mona, const char* name, na_addr_t* addr)
253
254
255
256
{
    return NA_Addr_lookup(mona->na_class, name, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
257
na_return_t mona_addr_free(mona_instance_t mona, na_addr_t addr)
258
259
260
261
{
    return NA_Addr_free(mona->na_class, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
262
na_return_t mona_addr_set_remove(mona_instance_t mona, na_addr_t addr)
263
264
265
266
{
    return NA_Addr_set_remove(mona->na_class, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
267
na_return_t mona_addr_self(mona_instance_t mona, na_addr_t* addr)
268
269
270
271
{
    return NA_Addr_self(mona->na_class, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
272
273
na_return_t
mona_addr_dup(mona_instance_t mona, na_addr_t addr, na_addr_t* dup_addr)
274
275
276
277
{
    return NA_Addr_dup(mona->na_class, addr, dup_addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
278
na_bool_t mona_addr_cmp(mona_instance_t mona, na_addr_t addr1, na_addr_t addr2)
279
{
Matthieu Dorier's avatar
Matthieu Dorier committed
280
281
282
    if (addr1 == NA_ADDR_NULL && addr2 == NA_ADDR_NULL) return NA_TRUE;
    if (addr1 == NA_ADDR_NULL || addr2 == NA_ADDR_NULL) return NA_FALSE;
    char      str1[256], str2[256];
283
284
285
286
287
    na_size_t s = 256;
    mona_addr_to_string(mona, str1, &s, addr1);
    s = 256;
    mona_addr_to_string(mona, str2, &s, addr2);
    return strcmp(str1, str2) == 0 ? NA_TRUE : NA_FALSE;
288
289
}

Matthieu Dorier's avatar
Matthieu Dorier committed
290
na_bool_t mona_addr_is_self(mona_instance_t mona, na_addr_t addr)
291
292
293
294
{
    return NA_Addr_is_self(mona->na_class, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
295
296
297
298
na_return_t mona_addr_to_string(mona_instance_t mona,
                                char*           buf,
                                na_size_t*      buf_size,
                                na_addr_t       addr)
299
300
301
302
{
    return NA_Addr_to_string(mona->na_class, buf, buf_size, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
303
na_size_t mona_addr_get_serialize_size(mona_instance_t mona, na_addr_t addr)
304
305
306
307
{
    return NA_Addr_get_serialize_size(mona->na_class, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
308
309
310
311
na_return_t mona_addr_serialize(mona_instance_t mona,
                                void*           buf,
                                na_size_t       buf_size,
                                na_addr_t       addr)
312
313
314
315
{
    return NA_Addr_serialize(mona->na_class, buf, buf_size, addr);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
316
317
318
319
na_return_t mona_addr_deserialize(mona_instance_t mona,
                                  na_addr_t*      addr,
                                  const void*     buf,
                                  na_size_t       buf_size)
320
321
322
323
{
    return NA_Addr_deserialize(mona->na_class, addr, buf, buf_size);
}

324
325
326
327
// ------------------------------------------------------------------------------------
// Mona message information logic
// ------------------------------------------------------------------------------------

Matthieu Dorier's avatar
Matthieu Dorier committed
328
na_size_t mona_msg_get_max_unexpected_size(mona_instance_t mona)
329
330
331
332
{
    return NA_Msg_get_max_unexpected_size(mona->na_class);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
333
na_size_t mona_msg_get_max_expected_size(mona_instance_t mona)
334
335
336
337
{
    return NA_Msg_get_max_expected_size(mona->na_class);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
338
na_size_t mona_msg_get_unexpected_header_size(mona_instance_t mona)
339
340
341
342
{
    return NA_Msg_get_unexpected_header_size(mona->na_class);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
343
na_size_t mona_msg_get_expected_header_size(mona_instance_t mona)
344
345
346
347
348
349
350
351
352
{
    return NA_Msg_get_expected_header_size(mona->na_class);
}

na_tag_t mona_msg_get_max_tag(mona_instance_t mona)
{
    return NA_Msg_get_max_tag(mona->na_class);
}

353
354
355
356
// ------------------------------------------------------------------------------------
// Mona operation logic
// ------------------------------------------------------------------------------------

357
na_op_id_t* mona_op_create(mona_instance_t mona)
358
359
360
361
{
    return NA_Op_create(mona->na_class);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
362
na_return_t mona_op_destroy(mona_instance_t mona, na_op_id_t* op_id)
363
364
365
366
{
    return NA_Op_destroy(mona->na_class, op_id);
}

367
368
369
// ------------------------------------------------------------------------------------
// Mona message buffer logic
// ------------------------------------------------------------------------------------
Matthieu Dorier's avatar
Matthieu Dorier committed
370

Matthieu Dorier's avatar
Matthieu Dorier committed
371
372
373
void* mona_msg_buf_alloc(mona_instance_t mona,
                         na_size_t       buf_size,
                         void**          plugin_data)
374
375
376
377
{
    return NA_Msg_buf_alloc(mona->na_class, buf_size, plugin_data);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
378
379
na_return_t
mona_msg_buf_free(mona_instance_t mona, void* buf, void* plugin_data)
380
381
382
383
{
    return NA_Msg_buf_free(mona->na_class, buf, plugin_data);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
384
385
na_return_t
mona_msg_init_unexpected(mona_instance_t mona, void* buf, na_size_t buf_size)
386
387
388
389
{
    return NA_Msg_init_unexpected(mona->na_class, buf, buf_size);
}

390
391
392
// ------------------------------------------------------------------------------------
// Mona request logic
// ------------------------------------------------------------------------------------
393
394
395
396

na_return_t mona_wait(mona_request_t req)
{
    na_return_t na_ret = mona_wait_internal(req);
397
398
    if(na_ret == NA_SUCCESS)
        free(req);
399
400
401
402
403
    return na_ret;
}

int mona_test(mona_request_t req, int* flag)
{
404
    if(req == MONA_REQUEST_NULL) return ABT_ERR_OTHER;
405
406
407
    return ABT_eventual_test(req->eventual, NULL, flag);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
408
409
410
411
412
na_return_t mona_wait_any(size_t count, mona_request_t* reqs, size_t* index)
{
    // XXX this is an active loop, we should change it
    // when Argobots provide an ABT_eventual_wait_any
    size_t i;
Matthieu Dorier's avatar
Matthieu Dorier committed
413
414
415
    int    ret;
    int    flag = 0;
    int    has_pending_requests;
Matthieu Dorier's avatar
Matthieu Dorier committed
416
417
try_again:
    has_pending_requests = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
418
419
    for (i = 0; i < count; i++) {
        if (reqs[i] == MONA_REQUEST_NULL)
Matthieu Dorier's avatar
Matthieu Dorier committed
420
421
422
423
            continue;
        else
            has_pending_requests = 1;
        ret = mona_test(reqs[i], &flag);
Matthieu Dorier's avatar
Matthieu Dorier committed
424
        if (ret != ABT_SUCCESS) {
Matthieu Dorier's avatar
Matthieu Dorier committed
425
426
427
            *index = i;
            return NA_RETURN_MAX;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
428
429
        if (flag) {
            *index             = i;
Matthieu Dorier's avatar
Matthieu Dorier committed
430
            mona_request_t req = reqs[i];
Matthieu Dorier's avatar
Matthieu Dorier committed
431
            reqs[i]            = MONA_REQUEST_NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
432
433
434
435
            return mona_wait(req);
        }
    }
    ABT_thread_yield();
Matthieu Dorier's avatar
Matthieu Dorier committed
436
    if (has_pending_requests) goto try_again;
Matthieu Dorier's avatar
Matthieu Dorier committed
437
438
439
440
    *index = count;
    return NA_SUCCESS;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
441
static int mona_callback(const struct na_cb_info* info)
442
{
Matthieu Dorier's avatar
Matthieu Dorier committed
443
444
    na_return_t    na_ret = info->ret;
    mona_request_t req    = (mona_request_t)(info->arg);
445

Matthieu Dorier's avatar
Matthieu Dorier committed
446
    if (na_ret == NA_SUCCESS && info->type == NA_CB_RECV_UNEXPECTED) {
447
        na_addr_t source = info->info.recv_unexpected.source;
Matthieu Dorier's avatar
Matthieu Dorier committed
448
        na_tag_t  tag    = info->info.recv_unexpected.tag;
449
        na_size_t size   = info->info.recv_unexpected.actual_buf_size;
Matthieu Dorier's avatar
Matthieu Dorier committed
450
        if (req->source_addr) {
451
452
            mona_addr_dup(req->mona, source, req->source_addr);
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
453
454
        if (req->tag) { *(req->tag) = tag; }
        if (req->size) { *(req->size) = size; }
455
    }
456
457
458
459
    ABT_eventual_set(req->eventual, &na_ret, sizeof(na_ret));
    return NA_SUCCESS;
}

460
461
462
463
// ------------------------------------------------------------------------------------
// Mona high-level send/recv logic
// ------------------------------------------------------------------------------------

Matthieu Dorier's avatar
Matthieu Dorier committed
464
465
466
467
468
469
na_return_t mona_send(mona_instance_t mona,
                      const void*     buf,
                      na_size_t       buf_size,
                      na_addr_t       dest,
                      na_uint8_t      dest_id,
                      na_tag_t        tag)
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
{
    return mona_send_nc(mona, 1, &buf, &buf_size, dest, dest_id, tag);
}

struct isend_args {
    mona_instance_t mona;
    const void*     buf;
    na_size_t       buf_size;
    na_addr_t       dest;
    na_uint8_t      dest_id;
    na_tag_t        tag;
    mona_request_t  req;
};

static void isend_thread(void* x)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
486
487
488
    struct isend_args* args   = (struct isend_args*)x;
    na_return_t        na_ret = mona_send(args->mona, args->buf, args->buf_size,
                                   args->dest, args->dest_id, args->tag);
489
490
491
492
    ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
    free(args);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
493
494
495
496
497
498
499
na_return_t mona_isend(mona_instance_t mona,
                       const void*     buf,
                       na_size_t       buf_size,
                       na_addr_t       dest,
                       na_uint8_t      dest_id,
                       na_tag_t        tag,
                       mona_request_t* req)
500
501
{
    ABT_eventual eventual;
Matthieu Dorier's avatar
Matthieu Dorier committed
502
503
    int          ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
    if (ret != 0) return NA_NOMEM;
504
505

    struct isend_args* args = (struct isend_args*)malloc(sizeof(*args));
Matthieu Dorier's avatar
Matthieu Dorier committed
506
507
508
509
510
511
    args->mona              = mona;
    args->buf               = buf;
    args->buf_size          = buf_size;
    args->dest              = dest;
    args->dest_id           = dest_id;
    args->tag               = tag;
512
513

    mona_request_t tmp_req = get_req_from_cache(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
514
515
    tmp_req->eventual      = eventual;
    args->req              = tmp_req;
516

Matthieu Dorier's avatar
Matthieu Dorier committed
517
518
519
    ret = ABT_thread_create(mona->progress_pool, isend_thread, args,
                            ABT_THREAD_ATTR_NULL, NULL);
    if (ret != ABT_SUCCESS) {
520
521
522
523
524
525
526
527
528
        return_req_to_cache(mona, tmp_req);
        return NA_NOMEM;
    } else {
        *req = tmp_req;
        ABT_thread_yield();
    }
    return NA_SUCCESS;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
529
530
531
532
533
534
535
na_return_t mona_send_nc(mona_instance_t    mona,
                         na_size_t          count,
                         const void* const* buffers,
                         const na_size_t*   buf_sizes,
                         na_addr_t          dest,
                         na_uint8_t         dest_id,
                         na_tag_t           tag)
536
{
Matthieu Dorier's avatar
Matthieu Dorier committed
537
    na_return_t     na_ret     = NA_SUCCESS;
538
    na_mem_handle_t mem_handle = NA_MEM_HANDLE_NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
539
540
541
542
    na_size_t       msg_size   = mona_msg_get_unexpected_header_size(mona) + 1;
    na_size_t       data_size  = 0;
    cached_msg_t    msg        = get_msg_from_cache(mona);
    unsigned        i;
543

Matthieu Dorier's avatar
Matthieu Dorier committed
544
    for (i = 0; i < count; i++) { data_size += buf_sizes[i]; }
545
    msg_size += data_size;
546

Matthieu Dorier's avatar
Matthieu Dorier committed
547
    if (msg_size <= mona_msg_get_max_unexpected_size(mona)) {
548
549

        na_ret = mona_msg_init_unexpected(mona, msg->buffer, msg_size);
Matthieu Dorier's avatar
Matthieu Dorier committed
550
        if (na_ret != NA_SUCCESS) goto finish;
551
552

        char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
553
        *p      = HL_MSG_SMALL;
554
        p += 1;
555

Matthieu Dorier's avatar
Matthieu Dorier committed
556
        for (i = 0; i < count; i++) {
557
558
559
            memcpy(p, buffers[i], buf_sizes[i]);
            p += buf_sizes[i];
        }
560

Matthieu Dorier's avatar
Matthieu Dorier committed
561
562
        na_ret = mona_msg_send_unexpected(mona, msg->buffer, msg_size,
                                          msg->plugin_data, dest, dest_id, tag);
563
564
565
566

    } else {

        // Expose user memory for RDMA
Matthieu Dorier's avatar
Matthieu Dorier committed
567
568
569
570
        if (count == 1) {
            na_ret
                = mona_mem_handle_create(mona, (void*)buffers[0], buf_sizes[0],
                                         NA_MEM_READ_ONLY, &mem_handle);
571
        } else {
Matthieu Dorier's avatar
Matthieu Dorier committed
572
573
            struct na_segment* segments = alloca(sizeof(*segments) * count);
            for (i = 0; i < count; i++) {
574
                segments[i].base = (na_ptr_t)buffers[i];
Matthieu Dorier's avatar
Matthieu Dorier committed
575
                segments[i].len  = buf_sizes[i];
576
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
577
578
            na_ret = mona_mem_handle_create_segments(
                mona, segments, count, NA_MEM_READ_ONLY, &mem_handle);
579
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
580
        if (na_ret != NA_SUCCESS) goto finish;
581
582

        na_ret = mona_mem_register(mona, mem_handle);
Matthieu Dorier's avatar
Matthieu Dorier committed
583
        if (na_ret != NA_SUCCESS) goto finish;
584

Matthieu Dorier's avatar
Matthieu Dorier committed
585
586
        na_ret
            = mona_send_mem(mona, mem_handle, data_size, 0, dest, dest_id, tag);
587
        mona_mem_deregister(mona, mem_handle);
588
589
590
    }

finish:
Matthieu Dorier's avatar
Matthieu Dorier committed
591
    if (mem_handle != NA_MEM_HANDLE_NULL) {
592
        mona_mem_handle_free(mona, mem_handle);
593
    }
594
595
596
597
    return_msg_to_cache(mona, msg);
    return na_ret;
}

598
599
600
601
602
603
604
605
606
struct isend_nc_args {
    mona_instance_t    mona;
    na_size_t          count;
    const void* const* buffers;
    const na_size_t*   buf_sizes;
    na_addr_t          dest;
    na_uint8_t         dest_id;
    na_tag_t           tag;
    mona_request_t     req;
607
608
};

609
static void isend_nc_thread(void* x)
610
{
611
    struct isend_nc_args* args = (struct isend_nc_args*)x;
Matthieu Dorier's avatar
Matthieu Dorier committed
612
613
614
    na_return_t           na_ret
        = mona_send_nc(args->mona, args->count, args->buffers, args->buf_sizes,
                       args->dest, args->dest_id, args->tag);
615
616
617
618
    ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
    free(args);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
619
620
621
622
623
624
625
626
na_return_t mona_isend_nc(mona_instance_t    mona,
                          na_size_t          count,
                          const void* const* buffers,
                          const na_size_t*   buf_sizes,
                          na_addr_t          dest,
                          na_uint8_t         dest_id,
                          na_tag_t           tag,
                          mona_request_t*    req)
627
{
628
    ABT_eventual eventual;
Matthieu Dorier's avatar
Matthieu Dorier committed
629
630
    int          ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
    if (ret != 0) return NA_NOMEM;
631

632
    struct isend_nc_args* args = (struct isend_nc_args*)malloc(sizeof(*args));
Matthieu Dorier's avatar
Matthieu Dorier committed
633
634
635
636
637
638
639
    args->mona                 = mona;
    args->count                = count;
    args->buffers              = buffers;
    args->buf_sizes            = buf_sizes;
    args->dest                 = dest;
    args->dest_id              = dest_id;
    args->tag                  = tag;
640
641

    mona_request_t tmp_req = get_req_from_cache(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
642
643
    tmp_req->eventual      = eventual;
    args->req              = tmp_req;
644

Matthieu Dorier's avatar
Matthieu Dorier committed
645
646
647
    ret = ABT_thread_create(mona->progress_pool, isend_nc_thread, args,
                            ABT_THREAD_ATTR_NULL, NULL);
    if (ret != ABT_SUCCESS) {
648
649
650
651
652
653
654
        return_req_to_cache(mona, tmp_req);
        return NA_NOMEM;
    } else {
        *req = tmp_req;
        ABT_thread_yield();
    }
    return NA_SUCCESS;
655
656
}

Matthieu Dorier's avatar
Matthieu Dorier committed
657
658
659
660
661
662
663
na_return_t mona_send_mem(mona_instance_t mona,
                          na_mem_handle_t mem,
                          na_size_t       size,
                          na_size_t       offset,
                          na_addr_t       dest,
                          na_uint8_t      dest_id,
                          na_tag_t        tag)
664
{
Matthieu Dorier's avatar
Matthieu Dorier committed
665
666
667
    na_return_t  na_ret   = NA_SUCCESS;
    na_size_t    msg_size = 0;
    cached_msg_t msg      = get_msg_from_cache(mona);
668
669
670
671
672

    na_size_t mem_handle_size = mona_mem_handle_get_serialize_size(mona, mem);

    // Initialize message to send
    msg_size = mona_msg_get_unexpected_header_size(mona) // NA header
Matthieu Dorier's avatar
Matthieu Dorier committed
673
674
675
676
677
             + 1                 // type of message (HL_MSG_*)
             + sizeof(na_size_t) // size of the serialized handle
             + sizeof(na_size_t) // size of the data
             + sizeof(na_size_t) // offset in handle
             + mem_handle_size;
678
679

    na_ret = mona_msg_init_unexpected(mona, msg->buffer, msg_size);
Matthieu Dorier's avatar
Matthieu Dorier committed
680
    if (na_ret != NA_SUCCESS) goto finish;
681
682
683

    // Fill in the message
    char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
684
    *p      = HL_MSG_LARGE;
685
686
687
688
689
690
691
692
693
694
695
696
    p += 1;
    // size of serialized handle
    memcpy(p, &mem_handle_size, sizeof(mem_handle_size));
    p += sizeof(mem_handle_size);
    // size of the data
    memcpy(p, &size, sizeof(size));
    p += sizeof(size);
    // offset in the handle
    memcpy(p, &offset, sizeof(offset));
    p += sizeof(offset);
    // serialized handle
    na_ret = mona_mem_handle_serialize(mona, p, mem_handle_size, mem);
Matthieu Dorier's avatar
Matthieu Dorier committed
697
    if (na_ret != NA_SUCCESS) goto finish;
698
699
700
701
702
703

    // Initialize ack message to receive
    cached_msg_t   ack_msg      = get_msg_from_cache(mona);
    na_size_t      ack_msg_size = mona_msg_get_unexpected_header_size(mona) + 1;
    mona_request_t ack_req      = MONA_REQUEST_NULL;
    cached_op_id_t ack_cache_id = get_op_id_from_cache(mona);
704
    na_op_id_t*    ack_op_id    = ack_cache_id->op_id;
705
706
707

    // Issue non-blocking receive for ACK
    na_ret = mona_msg_irecv_expected(mona, ack_msg->buffer, ack_msg_size,
Matthieu Dorier's avatar
Matthieu Dorier committed
708
709
710
                                     ack_msg->plugin_data, dest, dest_id, tag,
                                     ack_op_id, &ack_req);
    if (na_ret != NA_SUCCESS) {
711
712
713
714
715
        return_op_id_to_cache(mona, ack_cache_id);
        goto finish;
    }

    // Issue send of message with mem handle
Matthieu Dorier's avatar
Matthieu Dorier committed
716
717
718
    na_ret = mona_msg_send_unexpected(mona, msg->buffer, msg_size,
                                      msg->plugin_data, dest, dest_id, tag);
    if (na_ret != NA_SUCCESS) {
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
        mona_cancel(mona, ack_op_id);
        return_op_id_to_cache(mona, ack_cache_id);
        goto finish;
    }

    // Wait for acknowledgement
    na_ret = mona_wait(ack_req);
    return_op_id_to_cache(mona, ack_cache_id);

finish:
    return_msg_to_cache(mona, msg);
    return na_ret;
}

struct isend_mem_args {
    mona_instance_t mona;
    na_mem_handle_t mem;
    na_size_t       size;
    na_size_t       offset;
    na_addr_t       dest;
    na_uint8_t      dest_id;
    na_tag_t        tag;
    mona_request_t  req;
};

static void isend_mem_thread(void* x)
{
    struct isend_mem_args* args = (struct isend_mem_args*)x;
Matthieu Dorier's avatar
Matthieu Dorier committed
747
748
749
    na_return_t            na_ret
        = mona_send_mem(args->mona, args->mem, args->size, args->offset,
                        args->dest, args->dest_id, args->tag);
750
751
752
753
    ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
    free(args);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
754
755
756
757
758
759
760
761
na_return_t mona_isend_mem(mona_instance_t mona,
                           na_mem_handle_t mem,
                           na_size_t       size,
                           na_size_t       offset,
                           na_addr_t       dest,
                           na_uint8_t      dest_id,
                           na_tag_t        tag,
                           mona_request_t* req)
762
763
{
    ABT_eventual eventual;
Matthieu Dorier's avatar
Matthieu Dorier committed
764
765
    int          ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
    if (ret != 0) return NA_NOMEM;
766
767

    struct isend_mem_args* args = (struct isend_mem_args*)malloc(sizeof(*args));
Matthieu Dorier's avatar
Matthieu Dorier committed
768
769
770
771
772
773
774
    args->mona                  = mona;
    args->mem                   = mem;
    args->size                  = size;
    args->offset                = offset;
    args->dest                  = dest;
    args->dest_id               = dest_id;
    args->tag                   = tag;
775
776

    mona_request_t tmp_req = get_req_from_cache(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
777
778
    tmp_req->eventual      = eventual;
    args->req              = tmp_req;
779

Matthieu Dorier's avatar
Matthieu Dorier committed
780
781
782
    ret = ABT_thread_create(mona->progress_pool, isend_mem_thread, args,
                            ABT_THREAD_ATTR_NULL, NULL);
    if (ret != ABT_SUCCESS) {
783
784
785
786
787
788
789
790
791
        return_req_to_cache(mona, tmp_req);
        return NA_NOMEM;
    } else {
        *req = tmp_req;
        ABT_thread_yield();
    }
    return NA_SUCCESS;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
792
793
794
795
796
797
static cached_msg_t wait_for_matching_unexpected_message(mona_instance_t mona,
                                                         na_addr_t       src,
                                                         na_tag_t        tag,
                                                         na_size_t* actual_size,
                                                         na_addr_t* actual_src,
                                                         na_tag_t*  actual_tag)
798
{
Matthieu Dorier's avatar
Matthieu Dorier committed
799
800
801
    cached_msg_t msg      = NULL; /* result */
    na_size_t    msg_size = mona_msg_get_max_unexpected_size(mona);
    na_return_t  na_ret   = NA_SUCCESS;
802
803
804
805
806

    // lock the queue of pending messages
    ABT_mutex_lock(mona->pending_msg_mtx);

    // search in the queue of pending messages for one matching
Matthieu Dorier's avatar
Matthieu Dorier committed
807
808
809
810
811
812
813
814
815
816
817
search_in_queue : {
    pending_msg_t p_msg      = mona->pending_msg_oldest;
    pending_msg_t p_prev_msg = NULL;
    while (p_msg) {
        if ((tag == MONA_ANY_TAG || p_msg->recv_tag == tag)
            && (src == MONA_ANY_SOURCE
                || mona_addr_cmp(mona, src, p_msg->recv_addr))) {
            break;
        } else {
            p_prev_msg = p_msg;
            p_msg      = p_msg->cached_msg->next;
818
819
        }
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
    if (p_msg) { // matching message was found
        msg = p_msg->cached_msg;
        // remove it from the queue of pending messages
        if (p_prev_msg) p_prev_msg->cached_msg->next = p_msg->cached_msg->next;
        if (p_msg == mona->pending_msg_oldest)
            mona->pending_msg_oldest = p_msg->cached_msg->next;
        if (p_msg == mona->pending_msg_newest)
            mona->pending_msg_newest = p_prev_msg;
        // unlock the queue
        ABT_mutex_unlock(mona->pending_msg_mtx);
        // copy size, source, and tag
        if (actual_size) *actual_size = p_msg->recv_size;
        if (actual_src) mona_addr_dup(mona, p_msg->recv_addr, actual_src);
        if (actual_tag) *actual_tag = p_msg->recv_tag;
        // free the pending message object
        mona_addr_free(mona, p_msg->recv_addr);
        free(p_msg);
        // return the message
        return msg;
    }
}
841
842
    // here the matching message wasn't found in the queue
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
843
844
845
        // if another thread is actively issuing unexpected recv, wait for the
        // queue to update
        if (mona->pending_msg_queue_active) {
846
            ABT_cond_wait(mona->pending_msg_cv, mona->pending_msg_mtx);
Matthieu Dorier's avatar
Matthieu Dorier committed
847
            if (mona->pending_msg_queue_active) goto search_in_queue;
848
849
        }
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
850
851
852
    // here no matching message was found and there isn't any other threads
    // updating the queue so this thread will take the responsibility for
    // actively listening for messages
853
854
    mona->pending_msg_queue_active = NA_TRUE;
    ABT_mutex_unlock(mona->pending_msg_mtx);
Matthieu Dorier's avatar
Matthieu Dorier committed
855
856
857
858
859
860
861
862
863
864
865
866
867
recv_new_message : {
    na_size_t recv_size = 0;
    na_addr_t recv_addr = NA_ADDR_NULL;
    na_tag_t  recv_tag  = 0;
    // get message from cache
    msg = get_msg_from_cache(mona);
    // issue unexpected recv
    na_ret = mona_msg_recv_unexpected(mona, msg->buffer, msg_size,
                                      msg->plugin_data, &recv_addr, &recv_tag,
                                      &recv_size);
    if (na_ret != NA_SUCCESS) goto error;
    // check is received message is matching
    if ((tag == MONA_ANY_TAG || recv_tag == tag)
868
        && (src == MONA_ANY_SOURCE || mona_addr_cmp(mona, src, recv_addr))) {
Matthieu Dorier's avatar
Matthieu Dorier committed
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
        // received message matches
        // notify other threads that this thread won't be updating the queue
        // anymore
        ABT_mutex_lock(mona->pending_msg_mtx);
        mona->pending_msg_queue_active = NA_FALSE;
        ABT_mutex_unlock(mona->pending_msg_mtx);
        ABT_cond_broadcast(mona->pending_msg_cv);
        // copy size, source, and tag
        if (actual_size) *actual_size = recv_size;
        if (actual_src)
            *actual_src = recv_addr;
        else
            mona_addr_free(mona, recv_addr);
        if (actual_tag) *actual_tag = recv_tag;
        // return the message
        return msg;
885

Matthieu Dorier's avatar
Matthieu Dorier committed
886
887
888
889
890
891
892
893
894
895
896
897
898
    } else {
        // received message doesn't match, create a pending message...
        pending_msg_t p_msg = (pending_msg_t)malloc(sizeof(*p_msg));
        p_msg->cached_msg   = msg;
        p_msg->recv_size    = recv_size;
        p_msg->recv_addr    = recv_addr;
        p_msg->recv_tag     = recv_tag;
        msg->next           = NULL;
        // ... and put it in the queue
        ABT_mutex_lock(mona->pending_msg_mtx);
        if (mona->pending_msg_oldest == NULL) {
            mona->pending_msg_oldest = p_msg;
            mona->pending_msg_newest = p_msg;
899
        } else {
Matthieu Dorier's avatar
Matthieu Dorier committed
900
901
            mona->pending_msg_newest->cached_msg->next = p_msg;
            mona->pending_msg_newest                   = p_msg;
902
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
903
904
905
906
        // notify other threads that the queue has been updated
        ABT_mutex_unlock(mona->pending_msg_mtx);
        ABT_cond_broadcast(mona->pending_msg_cv);
        goto recv_new_message;
907
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
908
}
909
910
    // error handling
error:
Matthieu Dorier's avatar
Matthieu Dorier committed
911
    if (msg) return_msg_to_cache(mona, msg);
912
913
914
915
916
    ABT_mutex_unlock(mona->pending_msg_mtx);
    ABT_cond_broadcast(mona->pending_msg_cv);
    return NULL;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
917
918
919
920
921
922
923
924
na_return_t mona_recv(mona_instance_t mona,
                      void*           buf,
                      na_size_t       size,
                      na_addr_t       src,
                      na_tag_t        tag,
                      na_size_t*      actual_size,
                      na_addr_t*      actual_src,
                      na_tag_t*       actual_tag)
925
{
Matthieu Dorier's avatar
Matthieu Dorier committed
926
927
    return mona_recv_nc(mona, 1, &buf, &size, src, tag, actual_size, actual_src,
                        actual_tag);
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
}

struct irecv_args {
    mona_instance_t mona;
    void*           buf;
    na_size_t       size;
    na_addr_t       src;
    na_tag_t        tag;
    na_size_t*      actual_size;
    na_addr_t*      actual_src;
    na_tag_t*       actual_tag;
    mona_request_t  req;
};

static void irecv_thread(void* x)
{
    struct irecv_args* args = (struct irecv_args*)x;
Matthieu Dorier's avatar
Matthieu Dorier committed
945
946
947
    na_return_t        na_ret
        = mona_recv(args->mona, args->buf, args->size, args->src, args->tag,
                    args->actual_size, args->actual_src, args->actual_tag);
948
949
950
951
    ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
    free(args);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
952
953
954
955
956
957
958
959
960
na_return_t mona_irecv(mona_instance_t mona,
                       void*           buf,
                       na_size_t       size,
                       na_addr_t       src,
                       na_tag_t        tag,
                       na_size_t*      actual_size,
                       na_addr_t*      actual_src,
                       na_tag_t*       actual_tag,
                       mona_request_t* req)
961
962
{
    ABT_eventual eventual;
Matthieu Dorier's avatar
Matthieu Dorier committed
963
964
    int          ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
    if (ret != 0) return NA_NOMEM;
965
966

    struct irecv_args* args = (struct irecv_args*)malloc(sizeof(*args));
Matthieu Dorier's avatar
Matthieu Dorier committed
967
968
969
970
971
972
973
974
    args->mona              = mona;
    args->buf               = buf;
    args->size              = size;
    args->src               = src;
    args->actual_size       = actual_size;
    args->actual_src        = actual_src;
    args->actual_tag        = actual_tag;
    args->tag               = tag;
975
976

    mona_request_t tmp_req = get_req_from_cache(mona);
Matthieu Dorier's avatar
Matthieu Dorier committed
977
978
    args->req              = tmp_req;
    tmp_req->eventual      = eventual;
979

Matthieu Dorier's avatar
Matthieu Dorier committed
980
981
982
    ret = ABT_thread_create(mona->progress_pool, irecv_thread, args,
                            ABT_THREAD_ATTR_NULL, NULL);
    if (ret != ABT_SUCCESS) {
983
984
985
986
987
988
989
990
991
        return_req_to_cache(mona, tmp_req);
        return NA_NOMEM;
    } else {
        *req = tmp_req;
        ABT_thread_yield();
    }
    return NA_SUCCESS;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
992
993
994
995
996
997
998
999
1000
na_return_t mona_recv_nc(mona_instance_t  mona,
                         na_size_t        count,
                         void**           buffers,
                         const na_size_t* buf_sizes,
                         na_addr_t        src,
                         na_tag_t         tag,
                         na_size_t*       actual_size,
                         na_addr_t*       actual_src,
                         na_tag_t*        actual_tag)
1001
{
1002

Matthieu Dorier's avatar
Matthieu Dorier committed
1003
    na_return_t     na_ret        = NA_SUCCESS;
1004
1005
    na_mem_handle_t mem_handle    = NA_MEM_HANDLE_NULL;
    na_mem_handle_t remote_handle = NA_MEM_HANDLE_NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
1006
1007
1008
1009
1010
1011
1012
1013
1014
    na_size_t       header_size   = mona_msg_get_unexpected_header_size(mona);
    cached_msg_t    msg           = NULL;
    na_size_t       recv_size     = 0;
    na_addr_t       recv_addr     = NA_ADDR_NULL;
    na_tag_t        recv_tag      = 0;
    na_size_t       max_data_size = 0;
    unsigned        i;

    for (i = 0; i < count; i++) { max_data_size += buf_sizes[i]; }
1015
1016

    // wait for a matching unexpected message to come around
Matthieu Dorier's avatar
Matthieu Dorier committed
1017
1018
1019
    msg = wait_for_matching_unexpected_message(mona, src, tag, &recv_size,
                                               &recv_addr, &recv_tag);
    if (!msg) return NA_PROTOCOL_ERROR;
1020
1021
1022
1023
1024

    // At this point, we know msg is the message we are looking for
    // and the attributes are recv_size, recv_tag, and recv_addr

    char* p = msg->buffer + header_size;
1025

Matthieu Dorier's avatar
Matthieu Dorier committed
1026
    if (*p == HL_MSG_SMALL) { // small message, embedded data
Matthieu Dorier's avatar
Matthieu Dorier committed
1027

1028
        p += 1;
1029
        recv_size -= header_size + 1;
1030
        na_size_t remaining_size = recv_size;
Matthieu Dorier's avatar
Matthieu Dorier committed
1031
1032
1033
        for (i = 0; i < count && remaining_size != 0; i++) {
            na_size_t s
                = remaining_size < buf_sizes[i] ? remaining_size : buf_sizes[i];
1034
1035
1036
1037
            memcpy(buffers[i], p, s);
            remaining_size -= s;
        }
        recv_size = recv_size < max_data_size ? recv_size : max_data_size;
1038

Matthieu Dorier's avatar
Matthieu Dorier committed
1039
    } else if (*p == HL_MSG_LARGE) { // large message, using RDMA transfer
1040
1041
1042
1043

        p += 1;
        na_size_t mem_handle_size;
        na_size_t data_size;
1044
        na_size_t remote_offset;
1045
        // read the size of the serialize mem handle
1046
1047
        memcpy(&mem_handle_size, p, sizeof(mem_handle_size));
        p += sizeof(mem_handle_size);
1048
        // read the size of the data associated with the mem handle
1049
1050
        memcpy(&data_size, p, sizeof(data_size));
        p += sizeof(data_size);
1051
1052
1053
        // read the offset
        memcpy(&remote_offset, p, sizeof(remote_offset));
        p += sizeof(remote_offset);
1054

1055
        // expose user memory for RDMA
Matthieu Dorier's avatar
Matthieu Dorier committed
1056
1057
1058
1059
        if (count == 1) {
            na_ret
                = mona_mem_handle_create(mona, (void*)buffers[0], buf_sizes[0],
                                         NA_MEM_WRITE_ONLY, &mem_handle);
1060
        } else {
Matthieu Dorier's avatar
Matthieu Dorier committed
1061
1062
            struct na_segment* segments = alloca(sizeof(*segments) * count);
            for (i = 0; i < count; i++) {
1063
                segments[i].base = (na_ptr_t)buffers[i];
Matthieu Dorier's avatar
Matthieu Dorier committed
1064
                segments[i].len  = buf_sizes[i];
1065
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
1066
1067
            na_ret = mona_mem_handle_create_segments(
                mona, segments, count, NA_MEM_WRITE_ONLY, &mem_handle);
1068
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
1069
        if (na_ret != NA_SUCCESS) goto finish;
1070
1071

        na_ret = mona_mem_register(mona, mem_handle);
Matthieu Dorier's avatar
Matthieu Dorier committed
1072
        if (na_ret != NA_SUCCESS) goto finish;
1073

1074
        // Deserialize remote memory handle
Matthieu Dorier's avatar
Matthieu Dorier committed
1075
1076
1077
        na_ret = mona_mem_handle_deserialize(mona, &remote_handle, p,
                                             mem_handle_size);
        if (na_ret != NA_SUCCESS) goto finish;
1078
1079
1080

        // Issue RDMA operation
        // XXX how do we support a source id different from 0 ?
1081
        data_size = data_size < max_data_size ? data_size : max_data_size;
Matthieu Dorier's avatar
Matthieu Dorier committed
1082
1083
1084
1085
        if (data_size) {
            na_ret = mona_get(mona, mem_handle, 0, remote_handle, remote_offset,
                              data_size, recv_addr, 0);
            if (na_ret != NA_SUCCESS) goto finish;
1086
        }
1087
        recv_size = data_size;
1088
1089

        // Send ACK
Matthieu Dorier's avatar
Matthieu Dorier committed
1090
1091
        na_size_t msg_size        = header_size + 1;
        msg->buffer[msg_size - 1] = 0;
1092
        na_ret = mona_msg_init_expected(mona, msg->buffer, msg_size);
Matthieu Dorier's avatar
Matthieu Dorier committed
1093
        if (na_ret != NA_SUCCESS) goto finish;
1094
1095

        // XXX how do we support a source id different from 0 ?
Matthieu Dorier's avatar
Matthieu Dorier committed
1096
1097
1098
1099
        na_ret
            = mona_msg_send_expected(mona, msg->buffer, msg_size,
                                     msg->plugin_data, recv_addr, 0, recv_tag);
        if (na_ret != NA_SUCCESS) goto finish;
1100
1101
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
1102
1103
1104
    if (actual_size) *actual_size = recv_size;
    if (actual_tag) *actual_tag = recv_tag;
    if (actual_src)
1105
1106
1107
1108
1109
        *actual_src = recv_addr;
    else
        mona_addr_free(mona, recv_addr);
    recv_addr = NA_ADDR_NULL;

1110
finish:
Matthieu Dorier's avatar
Matthieu Dorier committed
1111
1112
    if (recv_addr != NA_ADDR_NULL) mona_addr_free(mona, recv_addr);
    if (mem_handle != NA_MEM_HANDLE_NULL)
1113
        mona_mem_handle_free(mona, mem_handle);
Matthieu Dorier's avatar
Matthieu Dorier committed
1114
    if (remote_handle != NA_MEM_HANDLE_NULL)
1115
1116
1117
1118
1119
        mona_mem_handle_free(mona, remote_handle);
    return_msg_to_cache(mona, msg);
    return na_ret;
}

1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
struct irecv_nc_args {
    mona_instance_t  mona;
    na_size_t        count;
    void**           buffers;
    const na_size_t* buf_sizes;
    na_addr_t        src;
    na_tag_t         tag;
    na_size_t*       actual_size;
    na_addr_t*       actual_src;
    na_tag_t*        actual_tag;
    mona_request_t   req;
1131
1132
};

1133
static void irecv_nc_thread(void* x)
1134
{
Matthieu Dorier's avatar
Matthieu Dorier committed
1135
1136
1137
1138
    struct irecv_nc_args* args   = (struct irecv_nc_args*)x;
    na_return_t           na_ret = mona_recv_nc(
        args->mona, args->count, args->buffers, args->buf_sizes, args->src,
        args->tag, args->actual_size, args->actual_src, args->actual_tag);
1139
1140
1141
1142
    ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
    free(args);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
na_return_t mona_irecv_nc(mona_instance_t  mona,
                          na_size_t        count,
                          void**           buffers,
                          const na_size_t* buf_sizes,
                          na_addr_t        src,
                          na_tag_t         tag,
                          na_size_t*       actual_size,
                          na_addr_t*       actual_src,
                          na_tag_t*        actual_tag,
                          mona_request_t*  req)
1153
{
1154
    ABT_eventual eventual;
Matthieu Dorier's avatar
Matthieu Dorier committed
1155
1156
    int          ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
    if (ret != 0) return NA_NOMEM;
1157

1158
    struct irecv_nc_args* args = (struct irecv_nc_args*)malloc(sizeof(*args));