ptl_poll.c 8.41 KB
Newer Older
1
2
3
4
5
6
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2012 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

7
#include "ptl_impl.h"
8
9

#define NUMBUFS 20
10
#define BUFLEN  (sizeof(MPIDI_CH3_Pkt_t) + PTL_MAX_EAGER)
11

12
13
14
#define OVERFLOW_LENGTH (1024*1024)
#define NUM_OVERFLOW_ME 5

15
16
17
18
static char recvbuf[BUFLEN][NUMBUFS];
static ptl_le_t recvbuf_le[NUMBUFS];
static ptl_handle_le_t recvbuf_le_handle[NUMBUFS];

19
20
21
22
23
static ptl_handle_me_t overflow_me_handle[NUM_OVERFLOW_ME];
static void *overflow_buf[NUM_OVERFLOW_ME];

static int append_overflow(int i);

24
#undef FUNCNAME
25
#define FUNCNAME MPID_nem_ptl_poll_init
26
27
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
28
int MPID_nem_ptl_poll_init(void)
29
30
31
32
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    int ret;
33
34
    ptl_process_t id_any;
    MPIU_CHKPMEM_DECL(NUM_OVERFLOW_ME);
35
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_POLL_INIT);
36

37
    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_POLL_INIT);
38

39
#if 0
40
41
42
    id_any.phys.pid = PTL_PID_ANY;
    id_any.phys.nid = PTL_NID_ANY;
    
43
44
45
46
47
48
49
    for (i = 0; i < NUMBUFS; ++i) {
        recvbuf_le[i].start = recvbuf[i];
        recvbuf_le[i].length = BUFLEN;
        recvbuf_le[i].ct_handle = PTL_CT_NONE;
        recvbuf_le[i].uid = PTL_UID_ANY;
        recvbuf_le[i].options = (PTL_LE_OP_PUT | PTL_LE_USE_ONCE |
                                 PTL_LE_EVENT_UNLINK_DISABLE | PTL_LE_EVENT_LINK_DISABLE);
50
        ret = PtlLEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &recvbuf_le[i], PTL_PRIORITY_LIST, (void *)(uint64_t)i,
51
                          &recvbuf_le_handle[i]);
52
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlleappend", "**ptlleappend %s", MPID_nem_ptl_strerror(ret));
53
    }
54
55
#endif
    
56
57
58
59
60
61
62
    /* create overflow buffers */
    for (i = 0; i < NUM_OVERFLOW_ME; ++i) {
        MPIU_CHKPMEM_MALLOC(overflow_buf[i], void *, OVERFLOW_LENGTH, mpi_errno, "overflow buffer");
        mpi_errno = append_overflow(i);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    
63
 fn_exit:
64
65
    MPIU_CHKPMEM_COMMIT();
 fn_exit2:
66
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_POLL_INIT);
67
68
    return mpi_errno;
 fn_fail:
69
70
    MPIU_CHKPMEM_REAP();
    goto fn_exit2;
71
72
73
74
75
}



#undef FUNCNAME
76
#define FUNCNAME MPID_nem_ptl_poll_finalize
77
78
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
79
int MPID_nem_ptl_poll_finalize(void)
80
81
82
83
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    int ret;
84
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_POLL_FINALIZE);
85

86
    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_POLL_FINALIZE);
87
    
88
89
90
    for (i = 0; i < NUM_OVERFLOW_ME; ++i)
        if (overflow_me_handle[i] != PTL_INVALID_HANDLE) {
            ret = PtlMEUnlink(overflow_me_handle[i]);
91
            MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s", MPID_nem_ptl_strerror(ret));
92
93
        }
    
94
95
    for (i = 0; i < NUMBUFS; ++i) {
        ret = PtlLEUnlink(recvbuf_le_handle[i]);
96
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlleunlink", "**ptlleunlink %s", MPID_nem_ptl_strerror(ret));
97
98
99
    }

 fn_exit:
100
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_POLL_FINALIZE);
101
102
103
104
105
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#undef FUNCNAME
#define FUNCNAME append_overflow
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int append_overflow(int i)
{
    int mpi_errno = MPI_SUCCESS;
    int ret;
    ptl_me_t me;
    ptl_process_t id_any;
    MPIDI_STATE_DECL(MPID_STATE_APPEND_OVERFLOW);

    MPIDI_FUNC_ENTER(MPID_STATE_APPEND_OVERFLOW);

    MPIU_Assert(i >= 0 && i < NUM_OVERFLOW_ME);
    
    id_any.phys.pid = PTL_PID_ANY;
    id_any.phys.nid = PTL_NID_ANY;

    me.start = overflow_buf[i];
    me.length = OVERFLOW_LENGTH;
    me.ct_handle = PTL_CT_NONE;
    me.uid = PTL_UID_ANY;
    me.options = ( PTL_ME_OP_PUT | PTL_ME_MANAGE_LOCAL | PTL_ME_NO_TRUNCATE | PTL_ME_MAY_ALIGN |
                   PTL_ME_IS_ACCESSIBLE | PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
    me.match_id = id_any;
    me.match_bits = 0;
    me.ignore_bits = ~((ptl_match_bits_t)0);
    me.min_free = PTL_MAX_EAGER;
    
    ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_pt, &me, PTL_OVERFLOW_LIST, (void *)(size_t)i,
                      &overflow_me_handle[i]);
138
    MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
139
140
141
142
143
144
145
146
147

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_APPEND_OVERFLOW);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


148
#undef FUNCNAME
149
#define FUNCNAME MPID_nem_ptl_poll
150
151
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
152
int MPID_nem_ptl_poll(int is_blocking_poll)
153
154
155
156
{
    int mpi_errno = MPI_SUCCESS;
    ptl_event_t event;
    int ret;
157
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_POLL);
158

159
    /* MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_POLL); */
160
161

    while (1) {
162
        ret = PtlEQGet(MPIDI_nem_ptl_eq, &event);
163
164
        if (ret == PTL_EQ_EMPTY)
            break;
165
        MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
166
        MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqget", "**ptleqget %s", MPID_nem_ptl_strerror(ret));
167

168
169
        switch (event.type) {
        case PTL_EVENT_PUT:
170
        case PTL_EVENT_PUT_OVERFLOW:
171
172
173
        case PTL_EVENT_GET:
        case PTL_EVENT_ACK:
        case PTL_EVENT_REPLY:
174
175
176
        case PTL_EVENT_SEARCH: {
            MPID_Request * const req = event.user_ptr;
            mpi_errno = REQ_PTL(req)->event_handler(&event);
177
178
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            break;
179
180
181
182
183
184
185
186
187
188
189
        }
        case PTL_EVENT_AUTO_FREE:
            mpi_errno = append_overflow((size_t)event.user_ptr);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            break;
        case PTL_EVENT_AUTO_UNLINK:
            overflow_me_handle[(size_t)event.user_ptr] = PTL_INVALID_HANDLE;
            break;
        case PTL_EVENT_SEND:
            /* ignore */
            break;
190
        default:
191
            MPIDI_err_printf(FCNAME, "Received unexpected event type: %d", event.type);
192
193
194
195
196
197
198
            MPIU_ERR_INTERNALANDJUMP(mpi_errno, "Unexpected event type");
        }
    }
    
    
        
        
199

200
#if 0 /* used for non-matching message passing */
201
        switch (event.type) {
202
            MPIDI_VC_t *vc;
203
204
205
206
207
208
209
210
211
212
213
214
215
        case PTL_EVENT_PUT:
            if (event.ni_fail_type) {
                /* FIXME: handle comm failures */
                printf("Message received with error (%d) from process %lu\n", event.ni_fail_type, (uint64_t)event.hdr_data);
                assert(0);
            }
            /* FIXME: doesn't handle dynamic connections */
            MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "Put received(size=%lu id=(%#x,%#x) pt=%#x)", event.rlength,
                                                    event.initiator.phys.nid, event.initiator.phys.pid, event.pt_index));
            MPIDI_PG_Get_vc_set_active(MPIDI_Process.my_pg, (uint64_t)event.hdr_data, &vc);
            mpi_errno = MPID_nem_handle_pkt(vc, event.start, event.rlength);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            assert(event.start == recvbuf[(uint64_t)event.user_ptr]);
216
            ret = PtlLEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &recvbuf_le[(uint64_t)event.user_ptr],
217
                              PTL_PRIORITY_LIST, event.user_ptr, &recvbuf_le_handle[(uint64_t)event.user_ptr]);
218
            MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
219
220
221
222
223
224
225
226
            break;
        case PTL_EVENT_SEND:
            if (event.ni_fail_type) {
                /* FIXME: handle comm failures */
                printf("Message send completed with error (%d)\n", event.ni_fail_type);
                assert(0);
            }
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Send completed");
227
            mpi_errno = MPID_nem_ptl_ev_send_handler(&event);
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            break;
        case PTL_EVENT_ACK:
            if (event.ni_fail_type) {
                /* FIXME: handle comm failures */
                printf("ACK received with error (%d) sb=%p\n", event.ni_fail_type, event.user_ptr);
                assert(0);
            }
            MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "ACK received sb=%p", event.user_ptr));
            break;
        default:
            /* FIXME: figure out what other events to expect */
            printf("Got unexpected event %d\n", event.type);
            break;
        }
243
244
245
    }
#endif

246
247

 fn_exit:
248
    /* MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_POLL); */
249
250
251
252
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}