ch3u_rma_pkthandler.c 79.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidrma.h"

Xin Zhao's avatar
Xin Zhao committed
10
11
12
13
14
15
16
17
18
19
20
/* MPI_T performance variables: one double timer per RMA packet handler.
 * Every timer declared here is registered with the MPI_T interface in
 * MPIDI_CH3_RMA_Init_pkthandler_pvars(). */
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_put);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_acc);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get_accum);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_cas);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_fop);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get_accum_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_cas_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_fop_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_lock);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_lock_ack);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_unlock);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_flush);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_flush_ack);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_decr_at_cnt);

void MPIDI_CH3_RMA_Init_pkthandler_pvars(void)
{
    /* rma_rmapkt_put */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_put,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Put (in seconds)");

    /* rma_rmapkt_get */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Get (in seconds)");

    /* rma_rmapkt_acc */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_acc,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Accumulate (in seconds)");

    /* rma_rmapkt_get_accum */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get_accum,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Get-Accumulate (in seconds)");

    /* rma_rmapkt_cas */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_cas,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Compare-and-swap (in seconds)");

    /* rma_rmapkt_fop */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_fop,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Fetch-and-op (in seconds)");

    /* rma_rmapkt_get_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Get response (in seconds)");

    /* rma_rmapkt_get_accum_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get_accum_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
99
100
                                      "RMA",
                                      "RMA:PKTHANDLER for Get-Accumulate response (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
101
102
103
104
105
106
107
108

    /* rma_rmapkt_cas_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_cas_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
109
110
                                      "RMA",
                                      "RMA:PKTHANDLER for Compare-and-Swap response (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
111
112
113
114
115
116
117
118

    /* rma_rmapkt_fop_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_fop_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
119
120
                                      "RMA",
                                      "RMA:PKTHANDLER for Fetch-and-op response (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
121
122
123
124
125
126
127
128
129
130
131
132
133

    /* rma_rmapkt_lock */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_lock,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Lock (in seconds)");

    /* rma_rmapkt_lock_granted */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
134
                                      rma_rmapkt_lock_ack,
Xin Zhao's avatar
Xin Zhao committed
135
136
137
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
138
                                      "RMA", "RMA:PKTHANDLER for Lock-Ack (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175

    /* rma_rmapkt_unlock */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_unlock,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Unlock (in seconds)");

    /* rma_rmapkt_flush */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_flush,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Flush (in seconds)");

    /* rma_rmapkt_flush_ack */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_flush_ack,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Flush-Ack (in seconds)");

    /* rma_rmapkt_decr_at_cnt */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_decr_at_cnt,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Decr-At-Cnt (in seconds)");
}
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196

/* ------------------------------------------------------------------------ */
/*
 * The following routines are the packet handlers for the packet types
 * used above in the implementation of the RMA operations in terms
 * of messages.
 */
/* ------------------------------------------------------------------------ */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Put
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_Put - target-side handler for PUT / PUT_IMMED packets.
 *
 *   vc      - virtual connection the packet arrived on
 *   pkt     - received packet; any payload follows the header at
 *             (char *) pkt + sizeof(MPIDI_CH3_Pkt_t)
 *   buflen  - in: number of bytes available starting at pkt (header included);
 *             out: number of bytes consumed by this handler
 *   rreqp   - out: NULL when the operation finished here, otherwise the
 *             request the caller should keep receiving into
 *
 * Returns MPI_SUCCESS or an MPI error code.  The rma_rmapkt_put timer covers
 * the whole handler, including the error path (fn_fail falls into fn_exit).
 */
int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                             MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_put_t *put_pkt = &pkt->put;
    MPID_Request *req = NULL;
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
    int acquire_lock_fail = 0;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_PUT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_PUT);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received put pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_put);

    MPIU_Assert(put_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);

    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    if (acquire_lock_fail) {
        /* The lock could not be acquired: hand back whatever request
         * check_piggyback_lock produced and do not touch the target buffer. */
        (*rreqp) = req;
        goto fn_exit;
    }

    if (pkt->type == MPIDI_CH3_PKT_PUT_IMMED) {
        MPI_Aint type_size;

        /* Immed packet type is used when target datatype is predefined datatype. */
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype));

        MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);

        /* copy data from packet header to target buffer */
        MPIU_Memcpy(put_pkt->addr, put_pkt->info.data, put_pkt->count * type_size);

        /* trigger final action */
        mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
                                        put_pkt->flags, put_pkt->source_win_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

        /* only the header was consumed; nothing is left to receive */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;
    }
    else {
        MPIU_Assert(pkt->type == MPIDI_CH3_PKT_PUT);

        /* get start location of data and length of data */
        data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
        data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);

        req = MPID_Request_create();
        /* bail out before the first dereference if no request was allocated */
        MPIU_ERR_CHKANDJUMP1(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem",
                             "**nomem %s", "MPID_Request");
        MPIU_Object_set_ref(req, 1);

        req->dev.user_buf = put_pkt->addr;
        req->dev.user_count = put_pkt->count;
        req->dev.target_win_handle = put_pkt->target_win_handle;
        req->dev.source_win_handle = put_pkt->source_win_handle;
        req->dev.flags = put_pkt->flags;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_PutRecvComplete;

        if (MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype)) {
            MPI_Aint type_size;

            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RECV);
            req->dev.datatype = put_pkt->datatype;

            MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);

            req->dev.recv_data_sz = type_size * put_pkt->count;
            MPIU_Assert(req->dev.recv_data_sz > 0);

            mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");

            /* return the number of bytes processed in this function */
            *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;

            if (complete) {
                /* all payload was already in the buffer: run the final
                 * handler right away */
                mpi_errno = MPIDI_CH3_ReqHandler_PutRecvComplete(vc, req, &complete);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
        }
        else {
            /* derived datatype */
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RECV_DERIVED_DT);
            req->dev.datatype = MPI_DATATYPE_NULL;

            req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
                MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
            if (!req->dev.dtype_info) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                     "MPIDI_RMA_dtype_info");
            }

            req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
            if (!req->dev.dataloop) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
                                     put_pkt->info.dataloop_size);
            }

            /* if we received all of the dtype_info and dataloop, copy it
             * now and call the handler, otherwise set the iov and let the
             * channel copy it */
            if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
                /* copy all of dtype_info and dataloop */
                MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
                MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
                            put_pkt->info.dataloop_size);

                *buflen =
                    sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
                    put_pkt->info.dataloop_size;

                /* All dtype data has been received, call req handler */
                mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
                MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
            else {
                /* datatype description still incomplete: receive it in two
                 * pieces (dtype_info, then dataloop) and resume in the
                 * derived-datatype handler once both have arrived */
                req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
                req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
                req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
                req->dev.iov_count = 2;

                *buflen = sizeof(MPIDI_CH3_Pkt_t);

                req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete;
            }

        }

        /* more data is expected: the caller continues with this request */
        *rreqp = req;

        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SET1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                          "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
        }
    }


  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_put);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_PUT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Get
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_Get - target-side handler for GET packets.
 *
 *   vc      - virtual connection the packet arrived on; the response is sent
 *             back over it
 *   pkt     - received packet; for a derived target datatype the dtype_info
 *             and dataloop follow the header
 *   buflen  - in: number of bytes available starting at pkt (header included);
 *             out: number of bytes consumed by this handler
 *   rreqp   - out: NULL when nothing more has to be received, otherwise the
 *             request that will receive the rest of the datatype description
 *
 * For a predefined target datatype the response is sent from here, in one of
 * three ways: data embedded in the response packet (IMMED_RESP flag), a
 * two-entry iov for contiguous data, or the VC's sendNoncontig_fn otherwise.
 *
 * Returns MPI_SUCCESS or an MPI error code.  The rma_rmapkt_get timer covers
 * the whole handler, including the error path (fn_fail falls into fn_exit).
 */
int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                             MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_get_t *get_pkt = &pkt->get;
    MPID_Request *req = NULL;
    MPID_IOV iov[MPID_IOV_LIMIT];
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
    int mpi_errno = MPI_SUCCESS;
    MPI_Aint type_size;
    int acquire_lock_fail = 0;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received get pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_get);

    MPIU_Assert(get_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(get_pkt->target_win_handle, win_ptr);

    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    if (acquire_lock_fail) {
        /* The lock could not be acquired: hand back whatever request
         * check_piggyback_lock produced and send nothing. */
        (*rreqp) = req;
        goto fn_exit;
    }

    req = MPID_Request_create();
    /* bail out before the first dereference (and before the completion
     * counter is touched) if no request was allocated */
    MPIU_ERR_CHKANDJUMP1(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem",
                         "**nomem %s", "MPID_Request");
    req->dev.target_win_handle = get_pkt->target_win_handle;
    req->dev.flags = get_pkt->flags;

    /* get start location of data and length of data */
    data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
    data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_pkt->datatype));
    }

    if (MPIR_DATATYPE_IS_PREDEFINED(get_pkt->datatype)) {
        /* basic datatype. send the data. */
        MPIDI_CH3_Pkt_t upkt;
        MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
        size_t len;
        int iovcnt;
        int is_contig;

        MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RESP);
        req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendComplete;
        req->kind = MPID_REQUEST_SEND;

        if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
            MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP_IMMED);
        }
        else {
            MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
        }
        get_resp_pkt->request_handle = get_pkt->request_handle;

        /* translate the request's lock / flush / unlock flags into the
         * acknowledgement flags carried by the response */
        get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
            get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
        if ((get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
            (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
            get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
        get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

        /* length of target data */
        MPID_Datatype_get_size_macro(get_pkt->datatype, type_size);

        MPID_Datatype_is_contig(get_pkt->datatype, &is_contig);

        if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
            /* data travels inside the response packet itself */
            void *src = (void *) (get_pkt->addr);
            void *dest = (void *) (get_resp_pkt->info.data);

            MPIU_Assign_trunc(len, get_pkt->count * type_size, size_t);
            mpi_errno = immed_copy(src, dest, len);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);

            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
            iovcnt = 1;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_Object_set_ref(req, 0);
                MPIDI_CH3_Request_destroy(req);
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
            }
            /* --END ERROR HANDLING-- */
        }
        else if (is_contig) {
            /* header followed by the target data, sent directly from the
             * target buffer */
            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
            iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) get_pkt->addr);
            iov[1].MPID_IOV_LEN = get_pkt->count * type_size;
            iovcnt = 2;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_Object_set_ref(req, 0);
                MPIDI_CH3_Request_destroy(req);
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
            }
            /* --END ERROR HANDLING-- */
        }
        else {
            /* non-contiguous data: describe it with a segment and let the
             * VC's non-contiguous send routine handle it */
            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);

            req->dev.segment_ptr = MPID_Segment_alloc();
            MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
                                 MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");

            MPID_Segment_init(get_pkt->addr, get_pkt->count,
                              get_pkt->datatype, req->dev.segment_ptr, 0);
            req->dev.segment_first = 0;
            req->dev.segment_size = get_pkt->count * type_size;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }

        /* a GET with a predefined datatype carries no payload: only the
         * header was consumed and nothing is left to receive */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;
    }
    else {
        /* derived datatype. first get the dtype_info and dataloop. */

        MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RECV_DERIVED_DT);
        req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete;
        req->dev.OnFinal = 0;
        req->dev.user_buf = get_pkt->addr;
        req->dev.user_count = get_pkt->count;
        req->dev.datatype = MPI_DATATYPE_NULL;
        req->dev.request_handle = get_pkt->request_handle;

        req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
            MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
        if (!req->dev.dtype_info) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPIDI_RMA_dtype_info");
        }

        req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
        if (!req->dev.dataloop) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
                                 get_pkt->info.dataloop_size);
        }

        /* if we received all of the dtype_info and dataloop, copy it
         * now and call the handler, otherwise set the iov and let the
         * channel copy it */
        if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
            /* copy all of dtype_info and dataloop */
            MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
            MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
                        get_pkt->info.dataloop_size);

            *buflen =
                sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
                get_pkt->info.dataloop_size;

            /* All dtype data has been received, call req handler */
            mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET");
            /* NOTE(review): *rreqp is left unassigned when the handler does
             * not report completion; presumably it always completes here --
             * confirm against MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete. */
            if (complete)
                *rreqp = NULL;
        }
        else {
            /* datatype description still incomplete: receive it in two
             * pieces (dtype_info, then dataloop) */
            req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
            req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
            req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
            req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
            req->dev.iov_count = 2;

            *buflen = sizeof(MPIDI_CH3_Pkt_t);
            *rreqp = req;
        }

    }
  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_get);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Accumulate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                    MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
    MPID_Request *req = NULL;
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
581
    int acquire_lock_fail = 0;
582
583
584
585
586
587
588
589
    int mpi_errno = MPI_SUCCESS;
    MPI_Aint type_size;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received accumulate pkt");

Xin Zhao's avatar
Xin Zhao committed
590
591
    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_acc);

592
593
594
    MPIU_Assert(accum_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(accum_pkt->target_win_handle, win_ptr);

595
596
597
    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
598
599

    if (acquire_lock_fail) {
600
        (*rreqp) = req;
601
602
603
        goto fn_exit;
    }

604
605
606
607
608
609
    if (pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
        /* Immed packet type is used when target datatype is predefined datatype. */
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype));

        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
610
611
612
        mpi_errno = do_accumulate_op(accum_pkt->info.data, accum_pkt->count, accum_pkt->datatype,
                                     accum_pkt->addr, accum_pkt->count, accum_pkt->datatype,
                                     0, accum_pkt->op);
613
614
615
616
617
618
619
        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
        if (mpi_errno) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* trigger final action */
620
        mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
621
                                        accum_pkt->flags, accum_pkt->source_win_handle);
622
623
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
624
625
626
627
628

        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;
    }
    else {
629
        MPIU_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
630

631
632
633
        req = MPID_Request_create();
        MPIU_Object_set_ref(req, 1);
        *rreqp = req;
634

635
636
637
638
639
640
        req->dev.user_count = accum_pkt->count;
        req->dev.op = accum_pkt->op;
        req->dev.real_user_buf = accum_pkt->addr;
        req->dev.target_win_handle = accum_pkt->target_win_handle;
        req->dev.source_win_handle = accum_pkt->source_win_handle;
        req->dev.flags = accum_pkt->flags;
641

642
643
        req->dev.resp_request_handle = MPI_REQUEST_NULL;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
644

645
646
647
        /* get start location of data and length of data */
        data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
        data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
648

649
        if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
650
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
651
            req->dev.datatype = accum_pkt->datatype;
652

653
654
655
656
657
658
659
660
661
662
663
            MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
            /* allocate a SRBuf for receiving stream unit */
            MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
            /* --BEGIN ERROR HANDLING-- */
            if (req->dev.tmpbuf_sz == 0) {
                MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                                 FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
                                                 "**nomem %d", MPIDI_CH3U_SRBuf_size);
                req->status.MPI_ERROR = mpi_errno;
                goto fn_fail;
664
            }
665
            /* --END ERROR HANDLING-- */
666

667
            req->dev.user_buf = req->dev.tmpbuf;
668

669
            MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
670

671
            req->dev.recv_data_sz = type_size * accum_pkt->count;
672
            MPIU_Assert(req->dev.recv_data_sz > 0);
Xin Zhao's avatar
Xin Zhao committed
673

674
675
676
            mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
677

678
679
            /* return the number of bytes processed in this function */
            *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
680
681

            if (complete) {
682
683
684
685
686
687
688
                mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, req, &complete);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
689
690
            }
        }
691
        else {
692
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT);
693
694
695
696
697
698
699
700
701
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
            req->dev.datatype = MPI_DATATYPE_NULL;

            req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
                MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
            if (!req->dev.dtype_info) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                     "MPIDI_RMA_dtype_info");
            }
702

703
            req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
704
705
            if (!req->dev.dataloop) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
706
                                     accum_pkt->info.dataloop_size);
707
            }
708

709
710
            if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size +
                sizeof(req->dev.stream_offset)) {
711
712
713
                /* copy all of dtype_info and dataloop */
                MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
                MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
714
715
716
717
                            accum_pkt->info.dataloop_size);
                MPIU_Memcpy(&(req->dev.stream_offset),
                            data_buf + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size,
                            sizeof(req->dev.stream_offset));
718
719
720

                *buflen =
                    sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
721
                    accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
722
723
724
725
726
727
728
729
730
731
732
733
734
735

                /* All dtype data has been received, call req handler */
                mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
                MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
            else {
                req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
                req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
736
737
738
739
                req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
                req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
                req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
                req->dev.iov_count = 3;
740
                *buflen = sizeof(MPIDI_CH3_Pkt_t);
741
742
            }

743
        }
744
    }
745
746
747
748
749
750
751

    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                             "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
    }

  fn_exit:
Xin Zhao's avatar
Xin Zhao committed
752
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_acc);
753
754
755
756
757
758
759
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;

}

760
761
762
763
764
765
766
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_GetAccumulate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                       MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
Xin Zhao's avatar
Xin Zhao committed
767
    MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
768
769
770
771
772
    MPID_Request *req = NULL;
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
773
    int acquire_lock_fail = 0;
774
775
776
777
778
779
780
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received accumulate pkt");

Xin Zhao's avatar
Xin Zhao committed
781
782
    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_get_accum);

Xin Zhao's avatar
Xin Zhao committed
783
784
    MPIU_Assert(get_accum_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(get_accum_pkt->target_win_handle, win_ptr);
785

786
787
788
    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
789
790

    if (acquire_lock_fail) {
791
        (*rreqp) = req;
792
793
794
        goto fn_exit;
    }

795
796
797
798
799
800
801
802
    if (pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
        size_t len;
        void *src = NULL, *dest = NULL;
        MPID_Request *resp_req = NULL;
        MPIDI_CH3_Pkt_t upkt;
        MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
        MPID_IOV iov[MPID_IOV_LIMIT];
        int iovcnt;
803
        MPI_Aint type_size;
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820

        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;

        /* Immed packet type is used when target datatype is predefined datatype. */
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));

        resp_req = MPID_Request_create();
        resp_req->dev.target_win_handle = get_accum_pkt->target_win_handle;
        resp_req->dev.flags = get_accum_pkt->flags;

        MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
        resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
        resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
        resp_req->kind = MPID_REQUEST_SEND;

        /* here we increment the Active Target counter to guarantee the GET-like
821
         * operations are completed when the counter reaches zero. */
822
823
824
        win_ptr->at_completion_counter++;

        /* Calculate the length of response data, ensure that it fits into immed packet. */
825
        MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
826
827
        MPIU_Assign_trunc(len, get_accum_pkt->count * type_size, size_t);

828
        MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED);
829
830
831
832
833
834
835
836
837
838
839
840
841
        get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
        get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
        get_accum_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
            get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
        if ((get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
            (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
            get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;

        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);

842
843
844
845
846
847
848
        /* copy data from target buffer to response packet header */
        src = (void *) (get_accum_pkt->addr), dest = (void *) (get_accum_resp_pkt->info.data);
        mpi_errno = immed_copy(src, dest, len);
        if (mpi_errno != MPI_SUCCESS) {
            if (win_ptr->shm_allocated == TRUE)
                MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
            MPIU_ERR_POP(mpi_errno);
849
850
851
        }

        /* perform accumulate operation. */
852
853
854
855
        mpi_errno =
            do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
                             get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
                             get_accum_pkt->datatype, 0, get_accum_pkt->op);
856
857
858
859

        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);

860
861
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);
862

863
864
865
        iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
        iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
        iovcnt = 1;
866
867
868
869
870
871
872
873
874
875
876
877
878

        MPIU_THREAD_CS_ENTER(CH3COMM, vc);
        mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
        MPIU_THREAD_CS_EXIT(CH3COMM, vc);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_Object_set_ref(resp_req, 0);
            MPIDI_CH3_Request_destroy(resp_req);
            MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
        /* --END ERROR HANDLING-- */
    }
    else {
879
        MPIU_Assert(pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
880

881
882
883
        req = MPID_Request_create();
        MPIU_Object_set_ref(req, 1);
        *rreqp = req;
884

885
886
887
888
889
        req->dev.user_count = get_accum_pkt->count;
        req->dev.op = get_accum_pkt->op;
        req->dev.real_user_buf = get_accum_pkt->addr;
        req->dev.target_win_handle = get_accum_pkt->target_win_handle;
        req->dev.flags = get_accum_pkt->flags;
890

891
892
        req->dev.resp_request_handle = get_accum_pkt->request_handle;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
893

894
895
896
        /* get start location of data and length of data */
        data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
        data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
897

898
        if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
899
900
            MPI_Aint type_size;

901
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
902
            req->dev.datatype = get_accum_pkt->datatype;
903

904
905
906
907
908
909
910
911
912
913
914
            MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
            /* allocate a SRBuf for receiving stream unit */
            MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
            /* --BEGIN ERROR HANDLING-- */
            if (req->dev.tmpbuf_sz == 0) {
                MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                                 FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
                                                 "**nomem %d", MPIDI_CH3U_SRBuf_size);
                req->status.MPI_ERROR = mpi_errno;
                goto fn_fail;
915
            }
916
            /* --END ERROR HANDLING-- */
917

918
            req->dev.user_buf = req->dev.tmpbuf;
919

920
            MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
921
            req->dev.recv_data_sz = type_size * get_accum_pkt->count;
922
            MPIU_Assert(req->dev.recv_data_sz > 0);
Xin Zhao's avatar
Xin Zhao committed
923

924
925
926
            mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
927

928
929
            /* return the number of bytes processed in this function */
            *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
930
931

            if (complete) {
932
933
934
935
936
937
938
                mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, req, &complete);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
939
940
            }
        }
941
        else {
942
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT);
943
944
945
946
947
948
949
950
951
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
            req->dev.datatype = MPI_DATATYPE_NULL;

            req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
                MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
            if (!req->dev.dtype_info) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                     "MPIDI_RMA_dtype_info");
            }
952

953
            req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
954
955
            if (!req->dev.dataloop) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
956
                                     get_accum_pkt->info.dataloop_size);
957
            }
958

959
            if (data_len >=
960
961
                sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.dataloop_size +
                sizeof(req->dev.stream_offset)) {
962
963
964
                /* copy all of dtype_info and dataloop */
                MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
                MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
965
966
967
968
                            get_accum_pkt->info.dataloop_size);
                MPIU_Memcpy(&(req->dev.stream_offset),
                            data_buf + sizeof(MPIDI_RMA_dtype_info) +
                            get_accum_pkt->info.dataloop_size, sizeof(req->dev.stream_offset));
969
970
971

                *buflen =
                    sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
972
                    get_accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
973
974
975
976
977
978
979
980
981
982
983
984
985
986

                /* All dtype data has been received, call req handler */
                mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
                MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
            else {
                req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
                req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
987
988
989
990
                req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.dataloop_size;
                req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
                req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
                req->dev.iov_count = 3;
991
                *buflen = sizeof(MPIDI_CH3_Pkt_t);
992
993
            }

994
        }
995

996
997
998
999
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
        }
1000
    }
For faster browsing, not all history is shown. View entire blame