/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2012 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "ptl_impl.h"
#include "rptl.h"

#undef FUNCNAME
#define FUNCNAME dequeue_req
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static void dequeue_req(const ptl_event_t *e)
{
    int found;
    MPID_Request *const rreq = e->user_ptr;
18
19
    int s_len, r_len;

20
21
    /* At this point we know the ME is unlinked. Invalidate the handle to
       prevent further accesses, e.g. an attempted cancel. */
22
    REQ_PTL(rreq)->put_me = PTL_INVALID_HANDLE;
23

24
    found = MPIDI_CH3U_Recvq_DP(rreq);
25
26
27
28
    /* an MPI_ANY_SOURCE request may have been previously removed from the
       CH3 queue by an FDP (find and dequeue posted) operation */
    if (rreq->dev.match.parts.rank != MPI_ANY_SOURCE)
        MPIU_Assert(found);
29

30
    rreq->status.MPI_ERROR = MPI_SUCCESS;
31
    rreq->status.MPI_SOURCE = NPTL_MATCH_GET_RANK(e->match_bits);
32
33
34
35
36
37
38
39
40
    rreq->status.MPI_TAG = NPTL_MATCH_GET_TAG(e->match_bits);

    MPID_Datatype_get_size_macro(rreq->dev.datatype, r_len);
    r_len *= rreq->dev.user_count;

    s_len = NPTL_HEADER_GET_LENGTH(e->hdr_data);

    if (s_len > r_len) {
        /* truncated data */
Pavan Balaji's avatar
Pavan Balaji committed
41
        MPIR_STATUS_SET_COUNT(rreq->status, r_len);
42
        MPIU_ERR_SET2(rreq->status.MPI_ERROR, MPI_ERR_TRUNCATE, "**truncate", "**truncate %d %d", s_len, r_len);
43
44
    } else {
        MPIR_STATUS_SET_COUNT(rreq->status, s_len);
45
    }
46
47
48
49
50
51
52
53
54
55
56
}

#undef FUNCNAME
#define FUNCNAME handler_recv_complete
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int handler_recv_complete(const ptl_event_t *e)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *const rreq = e->user_ptr;
    int ret;
57
    int i;
58
59
60
    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_COMPLETE);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_COMPLETE);
61
    
62
    MPIU_Assert(e->type == PTL_EVENT_REPLY || e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);
63

64
65
    if (REQ_PTL(rreq)->md != PTL_INVALID_HANDLE) {
        ret = PtlMDRelease(REQ_PTL(rreq)->md);
66
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdrelease", "**ptlmdrelease %s", MPID_nem_ptl_strerror(ret));
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    }

    for (i = 0; i < MPID_NEM_PTL_NUM_CHUNK_BUFFERS; ++i)
        if (REQ_PTL(rreq)->chunk_buffer[i])
            MPIU_Free(REQ_PTL(rreq)->chunk_buffer[i]);
    
    MPIDI_CH3U_Request_complete(rreq);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_COMPLETE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME handler_recv_dequeue_complete
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int handler_recv_dequeue_complete(const ptl_event_t *e)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *const rreq = e->user_ptr;
90
91
92
93
94
95
    int is_contig;
    MPI_Aint last;
    MPI_Aint dt_true_lb;
    MPIDI_msg_sz_t data_sz;
    MPID_Datatype *dt_ptr ATTRIBUTE((unused));

96
97
98
    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);
99

100
    MPIU_Assert(e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);
101
102

    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, is_contig, data_sz, dt_ptr, dt_true_lb);
103
    
104
    dequeue_req(e);
105
106
107

    if (e->type == PTL_EVENT_PUT_OVERFLOW) {
        /* unpack the data from unexpected buffer */
108
109
        MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "is_contig = %d", is_contig);

110
        if (is_contig) {
111
            MPIU_Memcpy((char *)rreq->dev.user_buf + dt_true_lb, e->start, e->mlength);
112
113
        } else {
            last = e->mlength;
114
            MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, e->start);
115
116
            if (last != e->mlength)
                MPIU_ERR_SET(rreq->status.MPI_ERROR, MPI_ERR_TYPE, "**dtypemismatch");
117
        }
118
119
120
    } else {
        if (!is_contig && data_sz != e->mlength)
            MPIU_ERR_SET(rreq->status.MPI_ERROR, MPI_ERR_TYPE, "**dtypemismatch");
121
122
    }
    
123
124
125
126
127
128
129
130
131
    mpi_errno = handler_recv_complete(e);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
133
#define FUNCNAME handler_recv_big_get
134
135
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
136
static int handler_recv_big_get(const ptl_event_t *e)
137
138
139
140
141
142
143
144
145
146
147
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *const rreq = e->user_ptr;
    MPI_Aint last;

    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_UNPACK);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_UNPACK);

    MPIU_Assert(e->type == PTL_EVENT_REPLY);

148
149
150
151
152
153
154
155
156
    /* decrement the number of remaining gets */
    REQ_PTL(rreq)->num_gets--;
    if (REQ_PTL(rreq)->num_gets == 0) {
        /* if we used a temporary buffer, unpack the data */
        if (REQ_PTL(rreq)->chunk_buffer[0]) {
            last = rreq->dev.segment_size;
            MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, REQ_PTL(rreq)->chunk_buffer[0]);
            MPIU_Assert(last == rreq->dev.segment_size);
        }
157
        mpi_errno = handler_recv_complete(e);
158
    }
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181

    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_UNPACK);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME big_get
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Fetch left_to_get bytes from the sender into buf by issuing one or more
   rptl GETs, each no larger than the NI's max message size.  Each GET
   increments num_gets; handler_recv_big_get() finishes the request once
   every reply has arrived. */
static void big_get(void *buf, ptl_size_t left_to_get, MPIDI_VC_t *vc, ptl_match_bits_t match_bits, MPID_Request *rreq)
{
    int ret;
    MPID_nem_ptl_vc_area *vc_ptl = VC_PTL(vc);
    ptl_size_t offset = (ptl_size_t)buf;
    ptl_size_t chunk_sz;

    /* we need to handle all events */
    REQ_PTL(rreq)->event_handler = handler_recv_big_get;

    while (left_to_get > 0) {
        /* get up to the maximum allowed by the portals interface */
        chunk_sz = left_to_get;
        if (chunk_sz > MPIDI_nem_ptl_ni_limits.max_msg_size)
            chunk_sz = MPIDI_nem_ptl_ni_limits.max_msg_size;

        ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, offset, chunk_sz, vc_ptl->id, vc_ptl->ptg, match_bits, 0, rreq);
        DBG_MSG_GET("global", chunk_sz, vc->pg_rank, match_bits);
        MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "   buf=%p", (char *)offset);
        MPIU_Assert(ret == 0);

        /* account for what has been sent */
        offset += chunk_sz;
        left_to_get -= chunk_sz;
        REQ_PTL(rreq)->num_gets++;
    }
}

#undef FUNCNAME
#define FUNCNAME handler_recv_unpack_complete
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int handler_recv_unpack_complete(const ptl_event_t *e)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *const rreq = e->user_ptr;
212
    void *buf;
213
214
    MPI_Aint last;

215
216
217
    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_UNPACK_COMPLETE);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_UNPACK_COMPLETE);
218
    
219
    MPIU_Assert(e->type == PTL_EVENT_REPLY || e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);
220

221
222
223
224
    if (e->type == PTL_EVENT_PUT_OVERFLOW)
        buf = e->start;
    else
        buf = REQ_PTL(rreq)->chunk_buffer[0];
225

226
227
228
    last = rreq->dev.segment_first + e->mlength;
    MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, buf);
    MPIU_Assert(last == rreq->dev.segment_first + e->mlength);
229
    
230
    mpi_errno = handler_recv_complete(e);
231
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_UNPACK_COMPLETE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME handler_recv_dequeue_unpack_complete
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int handler_recv_dequeue_unpack_complete(const ptl_event_t *e)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_COMPLETE);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_COMPLETE);
250
    
251
    MPIU_Assert(e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270

    dequeue_req(e);
    mpi_errno = handler_recv_unpack_complete(e);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_COMPLETE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME handler_recv_dequeue_large
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int handler_recv_dequeue_large(const ptl_event_t *e)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *const rreq = e->user_ptr;
271
272
    MPIDI_VC_t *vc;
    MPID_nem_ptl_vc_area *vc_ptl;
273
274
275
276
277
278
    int ret;
    int dt_contig;
    MPIDI_msg_sz_t data_sz;
    MPID_Datatype *dt_ptr;
    MPI_Aint dt_true_lb;
    MPI_Aint last;
279
    MPIU_CHKPMEM_DECL(1);
280
281
282
    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_LARGE);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_LARGE);
283
    
284
    MPIU_Assert(e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);
285

286
287
288
    MPIDI_Comm_get_vc(rreq->comm, NPTL_MATCH_GET_RANK(e->match_bits), &vc);
    vc_ptl = VC_PTL(vc);
    
289
290
    dequeue_req(e);

291
292
293
294
295
    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);

    /* unpack data from unexpected buffer first */
    if (e->type == PTL_EVENT_PUT_OVERFLOW) {
        if (dt_contig) {
296
            MPIU_Memcpy((char *)rreq->dev.user_buf + dt_true_lb, e->start, e->mlength);
297
298
        } else {
            last = e->mlength;
299
            MPID_Segment_unpack(rreq->dev.segment_ptr, 0, &last, e->start);
300
301
302
303
304
            MPIU_Assert(last == e->mlength);
            rreq->dev.segment_first = e->mlength;
        }
    }
    
305
306
307
308
309
310
    if (!(e->hdr_data & NPTL_LARGE)) {
        /* all data has already been received; we're done */
        mpi_errno = handler_recv_complete(e);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        goto fn_exit;
    }
311
        
312
313
    MPIU_Assert (e->mlength == PTL_LARGE_THRESHOLD);

314
315
    /* we need to GET the rest of the data from the sender's buffer */
    if (dt_contig) {
316
317
        big_get((char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD, data_sz - PTL_LARGE_THRESHOLD,
                vc, e->match_bits, rreq);
318
319
320
321
        goto fn_exit;
    }

    /* noncontig recv buffer */
322
    
323
324
325
326
    last = rreq->dev.segment_size;
    rreq->dev.iov_count = MPID_IOV_LIMIT;
    MPID_Segment_pack_vector(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, rreq->dev.iov, &rreq->dev.iov_count);

327
    if (last == rreq->dev.segment_size && rreq->dev.segment_size <= MPIDI_nem_ptl_ni_limits.max_msg_size + PTL_LARGE_THRESHOLD) {
328
329
330
331
332
333
        /* Rest of message fits in one IOV */
        ptl_md_t md;

        md.start = rreq->dev.iov;
        md.length = rreq->dev.iov_count;
        md.options = PTL_IOVEC;
334
        md.eq_handle = MPIDI_nem_ptl_origin_eq;
335
336
        md.ct_handle = PTL_CT_NONE;
        ret = PtlMDBind(MPIDI_nem_ptl_ni, &md, &REQ_PTL(rreq)->md);
337
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
338

339
        REQ_PTL(rreq)->event_handler = handler_recv_complete;
340
        ret = MPID_nem_ptl_rptl_get(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg,
341
                     e->match_bits, 0, rreq);
342
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
343
344
345
346
347
        goto fn_exit;
    }
        
    /* message won't fit in a single IOV, allocate buffer and unpack when received */
    /* FIXME: For now, allocate a single large buffer to hold entire message */
348
    MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, data_sz - PTL_LARGE_THRESHOLD,
349
                        mpi_errno, "chunk_buffer");
350
    big_get(REQ_PTL(rreq)->chunk_buffer[0], data_sz - PTL_LARGE_THRESHOLD, vc, e->match_bits, rreq);
351
352

 fn_exit:
353
354
    MPIU_CHKPMEM_COMMIT();
 fn_exit2:
355
356
357
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_DEQUEUE_LARGE);
    return mpi_errno;
 fn_fail:
358
359
    MPIU_CHKPMEM_REAP();
    goto fn_exit2;
360
361
362
363
364
365
366
367
368
369
}


#undef FUNCNAME
#define FUNCNAME handler_recv_dequeue_unpack_large
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int handler_recv_dequeue_unpack_large(const ptl_event_t *e)
{
    int mpi_errno = MPI_SUCCESS;
370
371
    MPID_Request *const rreq = e->user_ptr;
    MPIDI_VC_t *vc;
372
    MPI_Aint last;
373
374
    void *buf;
    MPIU_CHKPMEM_DECL(1);
375
376
377
    MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_LARGE);

    MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_LARGE);
378
379
380
    MPIU_Assert(e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);

    MPIDI_Comm_get_vc(rreq->comm, NPTL_MATCH_GET_RANK(e->match_bits), &vc);
381
382
383
384
385
386
387
388
389
390

    dequeue_req(e);

    if (!(e->hdr_data & NPTL_LARGE)) {
        /* all data has already been received; we're done */
        mpi_errno = handler_recv_unpack_complete(e);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        goto fn_exit;
    }

391
392
393
394
    if (e->type == PTL_EVENT_PUT_OVERFLOW)
        buf = e->start;
    else
        buf = REQ_PTL(rreq)->chunk_buffer[0];
395
396

    MPIU_Assert(e->mlength == PTL_LARGE_THRESHOLD);
397
398
399
    last = PTL_LARGE_THRESHOLD;
    MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, buf);
    MPIU_Assert(last == PTL_LARGE_THRESHOLD);
400
401
402
    rreq->dev.segment_first += PTL_LARGE_THRESHOLD;
    MPIU_Free(REQ_PTL(rreq)->chunk_buffer[0]);

403
404
405
    MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first,
                        mpi_errno, "chunk_buffer");
    big_get(REQ_PTL(rreq)->chunk_buffer[0], rreq->dev.segment_size - rreq->dev.segment_first, vc, e->match_bits, rreq);
406
407

 fn_exit:
408
409
410
    MPIU_CHKPMEM_COMMIT();
 fn_exit2:
    MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_LARGE);
411
412
    return mpi_errno;
 fn_fail:
413
414
    MPIU_CHKPMEM_REAP();
    goto fn_exit2;
415
416
417
418
419
420
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_recv_posted
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
421
int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request *rreq)
422
423
424
425
426
427
428
429
430
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
    ptl_me_t me;
    int dt_contig;
    MPIDI_msg_sz_t data_sz;
    MPID_Datatype *dt_ptr;
    MPI_Aint dt_true_lb;
    MPI_Aint last;
431
    ptl_process_t id_any;
432
433
    int ret;
    MPIU_CHKPMEM_DECL(1);
434
435
436
437
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RECV_POSTED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RECV_POSTED);

438
439
440
    id_any.phys.nid = PTL_NID_ANY;
    id_any.phys.pid = PTL_PID_ANY;

441
442
443
444
445
446
    MPID_nem_ptl_init_req(rreq);
    
    me.ct_handle = PTL_CT_NONE;
    me.uid = PTL_UID_ANY;
    me.options = ( PTL_ME_OP_PUT | PTL_ME_IS_ACCESSIBLE | PTL_ME_EVENT_LINK_DISABLE |
                   PTL_ME_EVENT_UNLINK_DISABLE | PTL_ME_USE_ONCE );
447
    if (vc == NULL) {
448
449
        /* MPI_ANY_SOURCE receive */
        me.match_id = id_any;
450
451
452
453
454
    } else {
        if (!vc_ptl->id_initialized) {
            mpi_errno = MPID_nem_ptl_init_id(vc);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
455
        me.match_id = vc_ptl->id;
456
    }
457
458
459
460

    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "tag=%#x ctx=%#x rank=%#x", rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id, rreq->dev.match.parts.rank));
    me.match_bits = NPTL_MATCH(rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id,
                               rreq->dev.match.parts.rank);
461
462
463
464
465
    if (rreq->dev.match.parts.tag == MPI_ANY_TAG)
        me.ignore_bits = NPTL_MATCH_IGNORE_ANY_TAG;
    else
        me.ignore_bits = NPTL_MATCH_IGNORE;

466
467
468
    me.min_free = 0;

    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);
469
    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "count=%d datatype=%#x contig=%d data_sz=%lu", rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz));
470

471
    if (data_sz <= PTL_LARGE_THRESHOLD) {
472
473
        if (dt_contig) {
            /* small contig message */
474
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Small contig message");
475
            me.start = (char *)rreq->dev.user_buf + dt_true_lb;
476
            me.length = data_sz;
477
            REQ_PTL(rreq)->event_handler = handler_recv_dequeue_complete;
478
479
        } else {
            /* small noncontig */
480
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Small noncontig message");
481
482
            rreq->dev.segment_ptr = MPID_Segment_alloc();
            MPIU_ERR_CHKANDJUMP1(rreq->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
483
484
            MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, rreq->dev.segment_ptr, 0);
            rreq->dev.segment_first = 0;
485
486
487
488
            rreq->dev.segment_size = data_sz;

            last = rreq->dev.segment_size;
            rreq->dev.iov_count = MPID_IOV_LIMIT;
489
            MPID_Segment_pack_vector(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, rreq->dev.iov, &rreq->dev.iov_count);
490
491
492

            if (last == rreq->dev.segment_size) {
                /* entire message fits in IOV */
493
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "    entire message fits in IOV");
494
495
496
                me.start = rreq->dev.iov;
                me.length = rreq->dev.iov_count;
                me.options |= PTL_IOVEC;
497
                REQ_PTL(rreq)->event_handler = handler_recv_dequeue_complete;
498
499
500
            } else {
                /* IOV is not long enough to describe entire message: recv into
                   buffer and unpack later */
501
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "    IOV too long: using bounce buffer");
502
                MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, data_sz, mpi_errno, "chunk_buffer");
503
504
                me.start = REQ_PTL(rreq)->chunk_buffer[0];
                me.length = data_sz;
505
                REQ_PTL(rreq)->event_handler = handler_recv_dequeue_unpack_complete;
506
507
508
509
510
511
            }
        }
    } else {
        /* Large message: Create an ME for the first chunk of data, then do a GET for the rest */
        if (dt_contig) {
            /* large contig message */
512
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Large contig message");
513
            me.start = (char *)rreq->dev.user_buf + dt_true_lb;
514
            me.length = PTL_LARGE_THRESHOLD;
515
            REQ_PTL(rreq)->event_handler = handler_recv_dequeue_large;
516
517
        } else {
            /* large noncontig */
518
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Large noncontig message");
519
520
521
            rreq->dev.segment_ptr = MPID_Segment_alloc();
            MPIU_ERR_CHKANDJUMP1(rreq->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
            MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype, rreq->dev.segment_ptr, 0);
522
            rreq->dev.segment_first = 0;
523
524
525
526
            rreq->dev.segment_size = data_sz;

            last = PTL_LARGE_THRESHOLD;
            rreq->dev.iov_count = MPID_IOV_LIMIT;
527
            MPID_Segment_pack_vector(rreq->dev.segment_ptr, rreq->dev.segment_first, &last, rreq->dev.iov, &rreq->dev.iov_count);
528
529
530

            if (last == PTL_LARGE_THRESHOLD) {
                /* first chunk fits in IOV */
531
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "    first chunk fits in IOV");
532
533
534
535
                rreq->dev.segment_first = last;
                me.start = rreq->dev.iov;
                me.length = rreq->dev.iov_count;
                me.options |= PTL_IOVEC;
536
                REQ_PTL(rreq)->event_handler = handler_recv_dequeue_large;
537
538
539
            } else {
                /* IOV is not long enough to describe the first chunk: recv into
                   buffer and unpack later */
540
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "    IOV too long: using bounce buffer for first chunk");
541
542
                MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, PTL_LARGE_THRESHOLD, mpi_errno, "chunk_buffer");
                me.start = REQ_PTL(rreq)->chunk_buffer[0];
543
                me.length = PTL_LARGE_THRESHOLD;
544
                REQ_PTL(rreq)->event_handler = handler_recv_dequeue_unpack_large;
545
546
547
548
549
            }
        }
        
    }

550
    ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_pt, &me, PTL_PRIORITY_LIST, rreq, &REQ_PTL(rreq)->put_me);
551
    MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
552
    DBG_MSG_MEAPPEND("REG", vc ? vc->pg_rank : MPI_ANY_SOURCE, me, rreq);
553
554
    MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "    buf=%p", me.start);
    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "MPIDI_nem_ptl_pt = %d", MPIDI_nem_ptl_pt);
555
556

 fn_exit:
557
558
    MPIU_CHKPMEM_COMMIT();
 fn_exit2:
559
560
561
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RECV_POSTED);
    return mpi_errno;
 fn_fail:
562
563
    MPIU_CHKPMEM_REAP();
    goto fn_exit2;
564
565
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_anysource_posted
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Hook called when an MPI_ANY_SOURCE receive is posted; forwards to
   MPID_nem_ptl_recv_posted() with a NULL vc (wildcard initiator). */
void MPID_nem_ptl_anysource_posted(MPID_Request *rreq)
{
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_POSTED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_POSTED);

    mpi_errno = MPID_nem_ptl_recv_posted(NULL, rreq);

    /* FIXME: This function is void, so we can't return an error.  This function
       cannot return an error because the queue functions (where the posted_recv
       hooks are called) return no error code. */
    MPIU_Assertp(mpi_errno == MPI_SUCCESS);

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_POSTED);
}

#undef FUNCNAME
#define FUNCNAME cancel_recv
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Attempt to cancel a posted receive by unlinking its match-list entry.
   Sets *cancelled to TRUE only when the unlink succeeded (i.e. the entry
   had not already matched).  NOTE(review): PtlMEUnlink may presumably also
   return PTL_IN_USE when an event is pending; that is treated the same as
   "not cancelled" here -- confirm against the Portals spec. */
static int cancel_recv(MPID_Request *rreq, int *cancelled)
{
    int mpi_errno = MPI_SUCCESS;
    int ptl_err   = PTL_OK;
    MPIDI_STATE_DECL(MPID_STATE_CANCEL_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_CANCEL_RECV);

    *cancelled = FALSE;

    /* An invalid handle indicates the operation has been completed
       and the matching list entry unlinked. At that point, the operation
       cannot be cancelled. */
    if (REQ_PTL(rreq)->put_me == PTL_INVALID_HANDLE)
        goto fn_exit;

    ptl_err = PtlMEUnlink(REQ_PTL(rreq)->put_me);
    if (ptl_err == PTL_OK)
        *cancelled = TRUE;

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_CANCEL_RECV);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_anysource_matched
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Called when an MPI_ANY_SOURCE request was matched by another path; tries
   to cancel the netmod-side receive.  Returns TRUE if the netmod had
   already matched the message (cancel failed), FALSE otherwise. */
int MPID_nem_ptl_anysource_matched(MPID_Request *rreq)
{
    int mpi_errno, cancelled;

    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_MATCHED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_MATCHED);

    mpi_errno = cancel_recv(rreq, &cancelled);
    /* FIXME: This function is does not return an error because the queue
       functions (where the posted_recv hooks are called) return no error
       code. See also comment on cancel_recv. */
    MPIU_Assertp(mpi_errno == MPI_SUCCESS);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_MATCHED);
    return !cancelled;
 fn_fail:
    goto fn_exit;
}



#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_cancel_recv
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Netmod cancel-receive entry point.  Returns TRUE if the request could
   NOT be cancelled (it had already matched), FALSE on successful cancel.
   The vc argument is unused but part of the netmod interface. */
int MPID_nem_ptl_cancel_recv(MPIDI_VC_t *vc,  MPID_Request *rreq)
{
    int mpi_errno, cancelled;

    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);

    mpi_errno = cancel_recv(rreq, &cancelled);
    /* FIXME: This function is does not return an error because the queue
       functions (where the posted_recv hooks are called) return no error
       code. */
    MPIU_Assertp(mpi_errno == MPI_SUCCESS);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);
    return !cancelled;
 fn_fail:
    goto fn_exit;
}



#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_lmt_start_recv
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc,  MPID_Request *rreq, MPID_IOV s_cookie)
{
    /* This function should only be called as a result of an Mrecv because of the CH3 protocol for
       Rendezvous Mrecvs. The regular CH3 protocol is not optimal for portals, since we don't need
       to exchange CTS/RTS. We need this code here because at the time of the Mprobe we don't know
       the target buffer, but we dequeue (and lose) the portals entry. This doesn't happen on
       regular large transfers because we handle them directly on the netmod. */
    int mpi_errno = MPI_SUCCESS;
    int dt_contig;
    MPIDI_msg_sz_t data_sz;
    MPID_Datatype *dt_ptr;
    MPI_Aint dt_true_lb;
    ptl_match_bits_t match_bits;
    int was_incomplete;
    int ret;
    MPID_nem_ptl_vc_area *vc_ptl = VC_PTL(vc);
    MPIU_CHKPMEM_DECL(1);

    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_LMT_START_RECV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_LMT_START_RECV);

    /* This Rendezvous protocol does not do RTS-CTS. Since we have all the data, we limit to get it */
    /* The following code is inspired on handler_recv_dqueue_large */

    match_bits = NPTL_MATCH(rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id,
                            rreq->dev.match.parts.rank);

    /* re-arm the completion counter and take a reference for the pending gets */
    MPIDI_CH3U_Request_increment_cc(rreq, &was_incomplete);
    MPIU_Assert(was_incomplete == 0);
    MPIR_Request_add_ref(rreq);

    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz, dt_ptr,
                            dt_true_lb);
    if (dt_contig) {
        void * real_user_buf = (char *)rreq->dev.user_buf + dt_true_lb;

        /* GET the tail of the message straight into the user buffer */
        big_get((char *)real_user_buf + PTL_LARGE_THRESHOLD, data_sz - PTL_LARGE_THRESHOLD, vc, match_bits, rreq);

        /* The memcpy is done after the get purposely for overlapping */
        MPIU_Memcpy(real_user_buf, rreq->dev.tmpbuf, PTL_LARGE_THRESHOLD);
    }
    else {
        MPI_Aint bytes_handled;

        /* noncontig: unpack the eager first chunk from tmpbuf, then fetch
           the remainder via IOV (if it fits) or a bounce buffer */
        rreq->dev.segment_ptr = MPID_Segment_alloc();
        MPIU_ERR_CHKANDJUMP1(rreq->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem",
                             "**nomem %s", "MPID_Segment_alloc");
        MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, rreq->dev.datatype,
                          rreq->dev.segment_ptr, 0);
        rreq->dev.segment_first = 0;
        rreq->dev.segment_size = data_sz;

        bytes_handled = PTL_LARGE_THRESHOLD;
        MPID_Segment_unpack(rreq->dev.segment_ptr, rreq->dev.segment_first, &bytes_handled, rreq->dev.tmpbuf);
        MPIU_Assert(bytes_handled == PTL_LARGE_THRESHOLD);
        rreq->dev.segment_first = PTL_LARGE_THRESHOLD;

        bytes_handled = rreq->dev.segment_size;
        rreq->dev.iov_count = MPID_IOV_LIMIT;
        MPID_Segment_pack_vector(rreq->dev.segment_ptr, rreq->dev.segment_first, &bytes_handled, rreq->dev.iov,
                                 &rreq->dev.iov_count);

        if (bytes_handled == rreq->dev.segment_size && bytes_handled <= MPIDI_nem_ptl_ni_limits.max_msg_size + PTL_LARGE_THRESHOLD) {
            /* Rest of message fits in one IOV */
            ptl_md_t md;

            md.start = rreq->dev.iov;
            md.length = rreq->dev.iov_count;
            md.options = PTL_IOVEC;
            md.eq_handle = MPIDI_nem_ptl_origin_eq;
            md.ct_handle = PTL_CT_NONE;
            ret = PtlMDBind(MPIDI_nem_ptl_ni, &md, &REQ_PTL(rreq)->md);
            MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s",
                                 MPID_nem_ptl_strerror(ret));

            REQ_PTL(rreq)->event_handler = handler_recv_complete;
            ret = MPID_nem_ptl_rptl_get(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first,
                                        vc_ptl->id, vc_ptl->ptg, match_bits, 0, rreq);
            MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
                                 MPID_nem_ptl_strerror(ret));
        }
        else {
            /* message won't fit in a single IOV, allocate buffer and unpack when received */
            /* FIXME: For now, allocate a single large buffer to hold entire message */
            MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first,
                                mpi_errno, "chunk_buffer");
            big_get(REQ_PTL(rreq)->chunk_buffer[0], rreq->dev.segment_size - rreq->dev.segment_first, vc, match_bits, rreq);
        }
    }
    MPIU_Free(rreq->dev.tmpbuf);
    rreq->ch.lmt_tmp_cookie.MPID_IOV_LEN = 0;  /* Required for do_cts in mpid_nem_lmt.c */

 fn_exit:
    MPIU_CHKPMEM_COMMIT();
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_LMT_START_RECV);
    return mpi_errno;
 fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}