/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2014 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "ptl_impl.h"
#include "stddef.h"  /* C99; for offsetof */
#include <mpl_utlist.h>
#include "rptl.h"

/* Number of receive buffers (and matching list entries) kept posted on the
 * control portal. */
#define NUM_RECV_BUFS 50
/* Match bits used for incoming control/eager packets.  NEW_TAG never hands
 * this value out. */
#define CTL_TAG 0
/* Largest number of data bytes that fits in a buf_t after the buf_t header
 * fields and one MPIDI_CH3_Pkt_t. */
#define PAYLOAD_SIZE  (PTL_MAX_EAGER - offsetof(buf_t, packet) - sizeof(MPIDI_CH3_Pkt_t))
/* Number of bytes of a buf_t that are actually used when sent_sz_ data bytes
 * follow the packet header. */
#define SENDBUF_SIZE(sent_sz_) (offsetof(buf_t, packet) + sizeof(MPIDI_CH3_Pkt_t) + (sent_sz_))
/* Per-request buffer slots: slot 0 holds the buf_t being sent, slot 1 holds a
 * temporary pack/unpack buffer.  Both expand to lvalues. */
#define SENDBUF(req_) REQ_PTL(req_)->chunk_buffer[0]
#define TMPBUF(req_) REQ_PTL(req_)->chunk_buffer[1]
/* Produce a fresh tag.  global_tag starts at 0 and advances in steps of 2, so
 * the lowest bit of every tag is clear; a value equal to CTL_TAG (reached on
 * wrap-around) is skipped. */
#define NEW_TAG(tag_) do {     \
    global_tag += 2;           \
    if (global_tag == CTL_TAG) \
        global_tag += 2;       \
    (tag_) = global_tag;       \
} while(0)
/* GET_TAG clears the lowest bit of a tag, DONE_TAG sets it; the two variants
 * of one tag therefore never match each other. */
#define GET_TAG(tag_)  (((tag_) >> 1) << 1)
#define DONE_TAG(tag_) ((tag_) | 0x1)

/* Wire layout of a control message. */
typedef struct {
    size_t remaining;            /* data bytes NOT included in this message */
    ptl_match_bits_t tag;        /* tag generated by the sender with NEW_TAG */
    char packet[PTL_MAX_EAGER];  /* MPIDI_CH3_Pkt_t header followed by eager data */
} buf_t;

/* Receive side: buffer i is described by mes[i] and linked as me_handles[i]. */
static buf_t recvbufs[NUM_RECV_BUFS];
static ptl_me_t mes[NUM_RECV_BUFS];
static ptl_handle_me_t me_handles[NUM_RECV_BUFS];
/* Count of DONE entries posted by meappend_done() whose DONE message has not
 * been handled yet; finalize polls until it drops to zero so that it does not
 * tear down too early. */
static unsigned long long put_cnt = 0;
/* Request passed as user pointer on the zero-length DONE puts. */
static MPID_Request *done_req;
/* Last tag handed out by NEW_TAG. */
static ptl_match_bits_t global_tag = 0;
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_nm_init
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_nm_init(void)
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    int ret;
    ptl_process_t id_any;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_NM_INIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_INIT);

    /* init recv */
    id_any.phys.pid = PTL_PID_ANY;
    id_any.phys.nid = PTL_NID_ANY;
    
    for (i = 0; i < NUM_RECV_BUFS; ++i) {
Antonio J. Pena's avatar
Antonio J. Pena committed
60
61
62
63
64
65
66
67
68
69
70
71
72
73
        mes[i].start = &recvbufs[i];
        mes[i].length = sizeof(buf_t);
        mes[i].ct_handle = PTL_CT_NONE;
        mes[i].uid = PTL_UID_ANY;
        mes[i].options = (PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_EVENT_UNLINK_DISABLE |
                         PTL_ME_EVENT_LINK_DISABLE | PTL_ME_IS_ACCESSIBLE);
        mes[i].match_id = id_any;
        mes[i].match_bits = CTL_TAG;
        mes[i].ignore_bits = 0;

        ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &mes[i],
                          PTL_PRIORITY_LIST, (void *)(uint64_t)i, &me_handles[i]);
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
                             MPID_nem_ptl_strerror(ret));
74
75
    }

Antonio J. Pena's avatar
Antonio J. Pena committed
76
77
78
79
80
81
    done_req = MPID_Request_create();
    MPIU_Assert(done_req != NULL);
    done_req->dev.OnDataAvail = NULL;
    SENDBUF(done_req) = NULL;
    REQ_PTL(done_req)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_INIT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_nm_finalize
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_nm_finalize(void)
{
    int mpi_errno = MPI_SUCCESS;
    int ret;
    int i;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);

Antonio J. Pena's avatar
Antonio J. Pena committed
102
103
    while (put_cnt) MPID_nem_ptl_poll(1);  /* Wait for puts to finish */

104
    for (i = 0; i < NUM_RECV_BUFS; ++i) {
Antonio J. Pena's avatar
Antonio J. Pena committed
105
106
107
        ret = PtlMEUnlink(me_handles[i]);
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s",
                             MPID_nem_ptl_strerror(ret));
108
109
    }

Antonio J. Pena's avatar
Antonio J. Pena committed
110
111
    MPIDI_CH3_Request_destroy(done_req);

112
113
114
115
116
117
118
119
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
Antonio J. Pena's avatar
Antonio J. Pena committed
120
#define FUNCNAME meappend_done
121
122
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
123
static inline int meappend_done(ptl_process_t id, MPID_Request *req, ptl_match_bits_t tag)
124
125
{
    int mpi_errno = MPI_SUCCESS;
Antonio J. Pena's avatar
Antonio J. Pena committed
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
    int ret;
    ptl_me_t me;
    ptl_handle_me_t me_handle;
    MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_DONE);

    MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_DONE);

    me.start = NULL;
    me.length = 0;
    me.ct_handle = PTL_CT_NONE;
    me.uid = PTL_UID_ANY;
    me.options = ( PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
                   PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
    me.match_id = id;
    me.match_bits = DONE_TAG(tag);
    me.ignore_bits = 0;
    me.min_free = 0;
    ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
                      &me_handle);
    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, DONE_TAG(tag)));
    MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
                         MPID_nem_ptl_strerror(ret));
    ++put_cnt;
149
150

 fn_exit:
Antonio J. Pena's avatar
Antonio J. Pena committed
151
    MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_DONE);
152
153
154
155
156
157
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
Antonio J. Pena's avatar
Antonio J. Pena committed
158
#define FUNCNAME meappend_large
159
160
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
161
static inline int meappend_large(ptl_process_t id, MPID_Request *req, ptl_match_bits_t tag, void *buf, size_t remaining)
162
{
Antonio J. Pena's avatar
Antonio J. Pena committed
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
    int mpi_errno = MPI_SUCCESS;
    int ret;
    ptl_me_t me;
    MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_LARGE);

    MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_LARGE);

    me.start = buf;
    me.length = remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ?
                    remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
    me.ct_handle = PTL_CT_NONE;
    me.uid = PTL_UID_ANY;
    me.options = ( PTL_ME_OP_GET | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
                   PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
    me.match_id = id;
    me.match_bits = GET_TAG(tag);
    me.ignore_bits = 0;
    me.min_free = 0;

    while (remaining) {
        int incomplete;
        ptl_handle_me_t foo_me_handle;

        MPIDI_CH3U_Request_increment_cc(req, &incomplete);  /* Cannot avoid GET events from poll infrastructure */

        ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
                          &foo_me_handle);
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
                             MPID_nem_ptl_strerror(ret));
        MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, GET_TAG(tag)));

        me.start = (char *)me.start + me.length;
        remaining -= me.length;
        if (remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
            me.length = remaining;
198
    }
Pavan Balaji's avatar
Pavan Balaji committed
199

Antonio J. Pena's avatar
Antonio J. Pena committed
200
201
202
203
204
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_LARGE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
205
206
207
208
209
210
}

#undef FUNCNAME
#define FUNCNAME send_pkt
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
211
212
static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_sz_t data_sz,
                           MPID_Request *sreq)
213
214
215
216
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
    int ret;
Antonio J. Pena's avatar
Antonio J. Pena committed
217
218
219
    buf_t *sendbuf;
    const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
    const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
220
221
222
223
    MPIDI_STATE_DECL(MPID_STATE_SEND_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_SEND_PKT);
    
Antonio J. Pena's avatar
Antonio J. Pena committed
224
225
226
227
228
229
    sendbuf = MPIU_Malloc(sendbuf_sz);
    MPIU_Assert(sendbuf != NULL);
    MPIU_Memcpy(sendbuf->packet, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
    sendbuf->remaining = data_sz - sent_sz;
    NEW_TAG(sendbuf->tag);
    TMPBUF(sreq) = NULL;
230

Antonio J. Pena's avatar
Antonio J. Pena committed
231
232
233
234
235
236
    if (data_sz) {
        MPIU_Memcpy(sendbuf->packet + sizeof(MPIDI_CH3_Pkt_t), data_p, sent_sz);
        if (sendbuf->remaining)  /* Post MEs for the remote gets */
            mpi_errno = meappend_large(vc_ptl->id, sreq, sendbuf->tag, (char *)data_p + sent_sz, sendbuf->remaining);
            if (mpi_errno)
                goto fn_fail;
237
238
    }

Antonio J. Pena's avatar
Antonio J. Pena committed
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
    SENDBUF(sreq) = sendbuf;
    REQ_PTL(sreq)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;

    /* Post ME for the DONE message */
    mpi_errno = meappend_done(vc_ptl->id, sreq, sendbuf->tag);
    if (mpi_errno)
        goto fn_fail;

    ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
                                vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
    MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
                         MPID_nem_ptl_strerror(ret));
    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
                                            sendbuf_sz, vc_ptl->id.phys.nid,
                                            vc_ptl->id.phys.pid, vc_ptl->ptc));
254
255
256
257
258
259
260
261
262
263
264
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_SEND_PKT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME send_noncontig_pkt
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
265
static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
266
267
268
269
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
    int ret;
Antonio J. Pena's avatar
Antonio J. Pena committed
270
271
272
    buf_t *sendbuf;
    const size_t sent_sz = sreq->dev.segment_size < PAYLOAD_SIZE ? sreq->dev.segment_size : PAYLOAD_SIZE;
    size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
273
274
275
    MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
    MPIDI_FUNC_ENTER(MPID_STATE_SEND_NONCONTIG_PKT);

Antonio J. Pena's avatar
Antonio J. Pena committed
276
    MPIU_Assert(sreq->dev.segment_first == 0);
277

Antonio J. Pena's avatar
Antonio J. Pena committed
278
279
280
281
282
283
    sendbuf = MPIU_Malloc(sendbuf_sz);
    MPIU_Assert(sendbuf != NULL);
    MPIU_Memcpy(sendbuf->packet, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
    sendbuf->remaining = sreq->dev.segment_size - sent_sz;
    NEW_TAG(sendbuf->tag);
    TMPBUF(sreq) = NULL;
284

Antonio J. Pena's avatar
Antonio J. Pena committed
285
286
287
    if (sreq->dev.segment_size) {
        MPIDI_msg_sz_t last = sent_sz;
        MPID_Segment_pack(sreq->dev.segment_ptr, 0, &last, sendbuf->packet + sizeof(MPIDI_CH3_Pkt_t));
288

Antonio J. Pena's avatar
Antonio J. Pena committed
289
290
291
        if (sendbuf->remaining) {  /* Post MEs for the remote gets */
            TMPBUF(sreq) = MPIU_Malloc(sendbuf->remaining);
            sreq->dev.segment_first = last;
292
            last = sreq->dev.segment_size;
Antonio J. Pena's avatar
Antonio J. Pena committed
293
294
            MPID_Segment_pack(sreq->dev.segment_ptr, sreq->dev.segment_first, &last, TMPBUF(sreq));
            MPIU_Assert(last == sreq->dev.segment_size);
295

Antonio J. Pena's avatar
Antonio J. Pena committed
296
297
298
            mpi_errno = meappend_large(vc_ptl->id, sreq, sendbuf->tag, TMPBUF(sreq), sendbuf->remaining);
            if (mpi_errno)
                goto fn_fail;
299
300
301
        }
    }

Antonio J. Pena's avatar
Antonio J. Pena committed
302
303
    SENDBUF(sreq) = sendbuf;
    REQ_PTL(sreq)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
304

Antonio J. Pena's avatar
Antonio J. Pena committed
305
306
307
308
    /* Post ME for the DONE message */
    mpi_errno = meappend_done(vc_ptl->id, sreq, sendbuf->tag);
    if (mpi_errno)
        goto fn_fail;
309

Antonio J. Pena's avatar
Antonio J. Pena committed
310
311
312
313
314
315
316
    ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
                                vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
    MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
                         MPID_nem_ptl_strerror(ret));
    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
                                            sendbuf_sz, vc_ptl->id.phys.nid,
                                            vc_ptl->id.phys.pid, vc_ptl->ptc));
317
318

 fn_exit:
Antonio J. Pena's avatar
Antonio J. Pena committed
319
    MPIDI_FUNC_EXIT(MPID_STATE_SEND_NONCONTIG_PKT);
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_SendNoncontig
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*
 * Netmod entry point for sending a header plus non-contiguous data.
 *
 * Checks that the header is no larger than an MPIDI_CH3_Pkt_t and forwards
 * the work to send_noncontig_pkt().
 *
 * Returns MPI_SUCCESS or the error reported by send_noncontig_pkt().
 */
int MPID_nem_ptl_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_SENDNONCONTIG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_SENDNONCONTIG);

    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    mpi_errno = send_noncontig_pkt(vc, sreq, hdr);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_SENDNONCONTIG);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_iStartContigMsg
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
352
353
int MPID_nem_ptl_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data,
                                 MPIDI_msg_sz_t data_sz, MPID_Request **sreq_ptr)
354
355
356
357
358
359
360
361
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ISTARTCONTIGMSG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ISTARTCONTIGMSG);
    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    /* create a request */
Antonio J. Pena's avatar
Antonio J. Pena committed
362
363
364
365
366
367
368
369
    *sreq_ptr = MPID_Request_create();
    MPIU_Assert(*sreq_ptr != NULL);
    MPIU_Object_set_ref(*sreq_ptr, 2);
    (*sreq_ptr)->kind = MPID_REQUEST_SEND;
    (*sreq_ptr)->dev.OnDataAvail = NULL;
    (*sreq_ptr)->dev.user_buf = NULL;

    mpi_errno = send_pkt(vc, hdr, data, data_sz, *sreq_ptr);
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ISTARTCONTIGMSG);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_iSendContig
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*
 * Netmod entry point for sending a header plus contiguous data on an
 * existing request.
 *
 * Checks that the header is no larger than an MPIDI_CH3_Pkt_t and forwards
 * the work to send_pkt().
 *
 * Returns MPI_SUCCESS or the error reported by send_pkt().
 */
int MPID_nem_ptl_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz,
                               void *data, MPIDI_msg_sz_t data_sz)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ISENDCONTIG);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ISENDCONTIG);
    MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));

    mpi_errno = send_pkt(vc, hdr, data, data_sz, sreq);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ISENDCONTIG);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
Antonio J. Pena's avatar
Antonio J. Pena committed
403
#define FUNCNAME on_data_avail
404
405
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
406
static inline void on_data_avail(MPID_Request * req)
407
{
Antonio J. Pena's avatar
Antonio J. Pena committed
408
409
    MPIDI_STATE_DECL(MPID_STATE_ON_DATA_AVAIL);
    MPIDI_FUNC_ENTER(MPID_STATE_ON_DATA_AVAIL);
410

Antonio J. Pena's avatar
Antonio J. Pena committed
411
412
413
414
    int (*reqFn) (MPIDI_VC_t *, MPID_Request *, int *);
    reqFn = req->dev.OnDataAvail;
    if (!reqFn) {
        MPIDI_CH3U_Request_complete(req);
415
416
        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
    }
Antonio J. Pena's avatar
Antonio J. Pena committed
417
418
419
420
421
    else {
        int complete;
        MPIDI_VC_t *vc = req->ch.vc;
        reqFn(vc, req, &complete);
        MPIU_Assert(complete == TRUE);
422
    }
Antonio J. Pena's avatar
Antonio J. Pena committed
423
    MPIDI_FUNC_EXIT(MPID_STATE_ON_DATA_AVAIL);
424
425
426
}

#undef FUNCNAME
Antonio J. Pena's avatar
Antonio J. Pena committed
427
#define FUNCNAME MPID_nem_ptl_nm_ctl_event_handler
428
429
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
Antonio J. Pena's avatar
Antonio J. Pena committed
430
int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
431
432
{
    int mpi_errno = MPI_SUCCESS;
Antonio J. Pena's avatar
Antonio J. Pena committed
433
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_NM_CTL_EVENT_HANDLER);
434

Antonio J. Pena's avatar
Antonio J. Pena committed
435
436
437
    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_CTL_EVENT_HANDLER);

    switch(e->type) {
438
439

    case PTL_EVENT_PUT:
Antonio J. Pena's avatar
Antonio J. Pena committed
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
        if (e->match_bits != CTL_TAG) {
            MPIU_Free(SENDBUF((MPID_Request *)e->user_ptr));
            MPIU_Free(TMPBUF((MPID_Request *)e->user_ptr));
            on_data_avail((MPID_Request *)e->user_ptr);
            --put_cnt;
        }
        else {
            int ret;
            const uint64_t buf_idx = (uint64_t) e->user_ptr;
            const size_t packet_sz = e->mlength - offsetof(buf_t, packet);
            MPIDI_VC_t *vc;
            MPID_nem_ptl_vc_area * vc_ptl;

            MPIU_Assert(e->start == &recvbufs[buf_idx]);

            MPIDI_PG_Get_vc(MPIDI_Process.my_pg, (uint64_t)e->hdr_data, &vc);
            vc_ptl = VC_PTL(vc);

            if (recvbufs[buf_idx].remaining == 0) {
                mpi_errno = MPID_nem_handle_pkt(vc, recvbufs[buf_idx].packet, packet_sz);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                /* Notify we're done */
                ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
                                            DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
                MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
                                     MPID_nem_ptl_strerror(ret));
                MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
                                                        "PtlPut(size=0 id=(%#x,%#x) pt=%#x tag=%#lx)",
                                                        vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
                                                        vc_ptl->ptc, DONE_TAG(recvbufs[buf_idx].tag)));
            }
            else {
                int incomplete;
                size_t size;
                char *buf_ptr;

                MPID_Request *req = MPID_Request_create();
                MPIU_Assert(req != NULL);
                MPIDI_CH3U_Request_decrement_cc(req, &incomplete);  /* We'll increment it below */
                REQ_PTL(req)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
                REQ_PTL(req)->bytes_put = packet_sz + recvbufs[buf_idx].remaining;
                TMPBUF(req) = MPIU_Malloc(REQ_PTL(req)->bytes_put);
                MPIU_Assert(TMPBUF(req) != NULL);
                MPIU_Memcpy(TMPBUF(req), recvbufs[buf_idx].packet, packet_sz);

                req->ch.vc = vc;

                req->dev.match.parts.tag = recvbufs[buf_idx].tag;

                size = recvbufs[buf_idx].remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ?
                           recvbufs[buf_idx].remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
                buf_ptr = (char *)TMPBUF(req) + packet_sz;
                while (recvbufs[buf_idx].remaining) {
                    MPIDI_CH3U_Request_increment_cc(req, &incomplete);  /* Will be decremented - and eventually freed in REPLY */
                    ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)buf_ptr,
                                                size, vc_ptl->id, vc_ptl->ptc, GET_TAG(recvbufs[buf_idx].tag), 0, req);
                    MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
                                         MPID_nem_ptl_strerror(ret));
                    MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
                                                            "PtlGet(size=%lu id=(%#x,%#x) pt=%#x tag=%#lx)", size,
                                                            vc_ptl->id.phys.nid,
                                                            vc_ptl->id.phys.pid, vc_ptl->ptc, GET_TAG(recvbufs[buf_idx].tag)));
                    buf_ptr += size;
                    recvbufs[buf_idx].remaining -= size;
                    if (recvbufs[buf_idx].remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
                        size = recvbufs[buf_idx].remaining;
                }
            }

            /* Repost the recv buffer */
            ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &mes[buf_idx],
                              PTL_PRIORITY_LIST, e->user_ptr /* buf_idx */, &me_handles[buf_idx]);
            MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend",
                                 "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
        }
516
        break;
Antonio J. Pena's avatar
Antonio J. Pena committed
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546

    case PTL_EVENT_REPLY:
        {
            int incomplete;
            MPID_Request *const rreq = e->user_ptr;

            MPIDI_CH3U_Request_decrement_cc(rreq, &incomplete);
            if (!incomplete) {
                int ret;
                MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(rreq->ch.vc);

                mpi_errno = MPID_nem_handle_pkt(rreq->ch.vc, TMPBUF(rreq), REQ_PTL(rreq)->bytes_put);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);

                /* Notify we're done */
                ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
                                            DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
                MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
                                     MPID_nem_ptl_strerror(ret));
                MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
                                                        "PtlPut(size=0 id=(%#x,%#x) pt=%#x tag=%#lx)",
                                                        vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
                                                        vc_ptl->ptc, DONE_TAG((ptl_match_bits_t)SENDBUF(rreq))));

                /* Free resources */
                MPIU_Free(TMPBUF(rreq));
                MPID_Request_release(rreq);
            }
        }
547
        break;
Antonio J. Pena's avatar
Antonio J. Pena committed
548
549
550

    case PTL_EVENT_GET:
        MPIDI_CH3U_Request_complete((MPID_Request *)e->user_ptr);
551
        break;
Antonio J. Pena's avatar
Antonio J. Pena committed
552

553
554
555
556
557
558
559
    default:
        MPIU_Error_printf("Received unexpected event type: %d %s", e->type, MPID_nem_ptl_strevent(e));
        MPIU_ERR_INTERNALANDJUMP(mpi_errno, "Unexpected event type");
        break;
    }

 fn_exit:
Antonio J. Pena's avatar
Antonio J. Pena committed
560
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_CTL_EVENT_HANDLER);
561
562
563
564
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}