/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidrma.h"

/* MPI_T performance-variable timers for the RMA packet handlers, one per
 * packet kind.  Every timer declared here is registered in
 * MPIDI_CH3_RMA_Init_pkthandler_pvars(). */
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_put);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_acc);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get_accum);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_cas);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_fop);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_get_accum_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_cas_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_fop_resp);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_lock);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_lock_ack);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_unlock);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_flush);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_flush_ack);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_rmapkt_decr_at_cnt);

void MPIDI_CH3_RMA_Init_pkthandler_pvars(void)
{
    /* rma_rmapkt_put */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_put,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Put (in seconds)");

    /* rma_rmapkt_get */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Get (in seconds)");

    /* rma_rmapkt_acc */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_acc,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Accumulate (in seconds)");

    /* rma_rmapkt_get_accum */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get_accum,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Get-Accumulate (in seconds)");

    /* rma_rmapkt_cas */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_cas,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Compare-and-swap (in seconds)");

    /* rma_rmapkt_fop */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_fop,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Fetch-and-op (in seconds)");

    /* rma_rmapkt_get_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Get response (in seconds)");

    /* rma_rmapkt_get_accum_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_get_accum_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
99
100
                                      "RMA",
                                      "RMA:PKTHANDLER for Get-Accumulate response (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
101
102
103
104
105
106
107
108

    /* rma_rmapkt_cas_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_cas_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
109
110
                                      "RMA",
                                      "RMA:PKTHANDLER for Compare-and-Swap response (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
111
112
113
114
115
116
117
118

    /* rma_rmapkt_fop_resp */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_fop_resp,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
119
120
                                      "RMA",
                                      "RMA:PKTHANDLER for Fetch-and-op response (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
121
122
123
124
125
126
127
128
129
130
131
132
133

    /* rma_rmapkt_lock */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_lock,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Lock (in seconds)");

    /* rma_rmapkt_lock_granted */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
134
                                      rma_rmapkt_lock_ack,
Xin Zhao's avatar
Xin Zhao committed
135
136
137
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
138
                                      "RMA", "RMA:PKTHANDLER for Lock-Ack (in seconds)");
Xin Zhao's avatar
Xin Zhao committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175

    /* rma_rmapkt_unlock */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_unlock,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Unlock (in seconds)");

    /* rma_rmapkt_flush */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_flush,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Flush (in seconds)");

    /* rma_rmapkt_flush_ack */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_flush_ack,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Flush-Ack (in seconds)");

    /* rma_rmapkt_decr_at_cnt */
    MPIR_T_PVAR_TIMER_REGISTER_STATIC(RMA,
                                      MPI_DOUBLE,
                                      rma_rmapkt_decr_at_cnt,
                                      MPI_T_VERBOSITY_MPIDEV_DETAIL,
                                      MPI_T_BIND_NO_OBJECT,
                                      MPIR_T_PVAR_FLAG_READONLY,
                                      "RMA", "RMA:PKTHANDLER for Decr-At-Cnt (in seconds)");
}

/* ------------------------------------------------------------------------ */
/*
 * The following routines are the packet handlers for the packet types
 * used above in the implementation of the RMA operations in terms
 * of messages.
 */
/* ------------------------------------------------------------------------ */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Put
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_Put - target-side handler for PUT and PUT_IMMED packets.
 *
 * Parameters:
 *   vc     - connection the packet was received on; forwarded to the helper
 *            and request-handler routines called below.
 *   pkt    - the received packet; viewed as pkt->put.  Any payload is read
 *            starting sizeof(MPIDI_CH3_Pkt_t) bytes past this address.
 *   buflen - in:  number of bytes available at pkt (header plus payload).
 *            out: number of bytes consumed by this call.
 *   rreqp  - out: NULL when the operation was finished here, otherwise the
 *            request that still has to be completed.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * Flow:
 *   1. check_piggyback_lock() is consulted first; when it reports
 *      acquire_lock_fail, the request it produced is handed back and nothing
 *      else is done here.
 *   2. PUT_IMMED: the data carried in the packet header is copied directly
 *      to the target address and finish_op_on_target() is invoked.
 *   3. PUT with a predefined datatype: a receive request is created and
 *      whatever payload is already present is consumed.
 *   4. PUT with a derived datatype: the dtype_info and dataloop are either
 *      copied from the buffer (if fully present) or scheduled for receipt
 *      through the request's iov.
 */
int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                             MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_put_t *put_pkt = &pkt->put;
    MPID_Request *req = NULL;
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
    int acquire_lock_fail = 0;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_PUT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_PUT);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received put pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_put);

    MPIU_Assert(put_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);

    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    if (acquire_lock_fail) {
        /* hand back whatever request check_piggyback_lock() produced */
        (*rreqp) = req;
        goto fn_exit;
    }

    if (pkt->type == MPIDI_CH3_PKT_PUT_IMMED) {
        MPI_Aint type_size;

        /* Immed packet type is used when target datatype is predefined datatype. */
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype));

        MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);

        /* copy data from packet header to target buffer */
        MPIU_Memcpy(put_pkt->addr, put_pkt->info.data, put_pkt->count * type_size);

        /* trigger final action */
        mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
                                        put_pkt->flags, put_pkt->source_win_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

        /* only the header was consumed; nothing is left to receive */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;
    }
    else {
        MPIU_Assert(pkt->type == MPIDI_CH3_PKT_PUT);

        /* get start location of data and length of data */
        data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
        data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);

        req = MPID_Request_create();
        /* the request is dereferenced immediately below, so fail cleanly
         * instead of crashing if none could be allocated */
        MPIU_ERR_CHKANDJUMP1(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem",
                             "**nomem %s", "MPID_Request_create");
        MPIU_Object_set_ref(req, 1);

        req->dev.user_buf = put_pkt->addr;
        req->dev.user_count = put_pkt->count;
        req->dev.target_win_handle = put_pkt->target_win_handle;
        req->dev.source_win_handle = put_pkt->source_win_handle;
        req->dev.flags = put_pkt->flags;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_PutRecvComplete;

        if (MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype)) {
            MPI_Aint type_size;

            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RECV);
            req->dev.datatype = put_pkt->datatype;

            MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);

            req->dev.recv_data_sz = type_size * put_pkt->count;
            MPIU_Assert(req->dev.recv_data_sz > 0);

            mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");

            /* return the number of bytes processed in this function */
            *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;

            if (complete) {
                /* all payload was already in the buffer: run the completion
                 * handler now rather than waiting for the channel */
                mpi_errno = MPIDI_CH3_ReqHandler_PutRecvComplete(vc, req, &complete);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
        }
        else {
            /* derived datatype */
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RECV_DERIVED_DT);
            req->dev.datatype = MPI_DATATYPE_NULL;

            req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
                MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
            if (!req->dev.dtype_info) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                     "MPIDI_RMA_dtype_info");
            }

            req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
            if (!req->dev.dataloop) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
                                     put_pkt->info.dataloop_size);
            }

            /* if we received all of the dtype_info and dataloop, copy it
             * now and call the handler, otherwise set the iov and let the
             * channel copy it */
            if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
                /* copy all of dtype_info and dataloop */
                MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
                MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
                            put_pkt->info.dataloop_size);

                *buflen =
                    sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
                    put_pkt->info.dataloop_size;

                /* All dtype data has been received, call req handler */
                mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
                MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
            else {
                /* datatype description is incomplete: have it received into
                 * the two freshly allocated buffers via the request's iov */
                req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
                req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
                req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
                req->dev.iov_count = 2;

                *buflen = sizeof(MPIDI_CH3_Pkt_t);

                req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete;
            }

        }

        /* the request is still in flight; return it to the caller */
        *rreqp = req;

        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SET1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                          "**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
        }
    }


  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_put);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_PUT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Get
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_Get - target-side handler for GET packets.
 *
 * Parameters:
 *   vc     - connection the packet was received on; also used to send the
 *            response.
 *   pkt    - the received packet; viewed as pkt->get.  Any trailing datatype
 *            description is read starting sizeof(MPIDI_CH3_Pkt_t) bytes past
 *            this address.
 *   buflen - in:  number of bytes available at pkt (header plus payload).
 *            out: number of bytes consumed by this call.
 *   rreqp  - out: NULL when nothing more has to be received for this packet,
 *            otherwise the request that still expects data.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * Flow:
 *   1. check_piggyback_lock() is consulted first; when it reports
 *      acquire_lock_fail, the request it produced is handed back.
 *   2. Predefined datatype: a GET_RESP (or GET_RESP_IMMED) packet is built
 *      and sent.  The data is either copied into the response header
 *      (IMMED_RESP flag), sent as a second iov entry (contiguous), or sent
 *      through vc->sendNoncontig_fn using a segment (non-contiguous).
 *   3. Derived datatype: the dtype_info and dataloop are either copied from
 *      the buffer (if fully present) and the derived-datatype handler is
 *      called, or scheduled for receipt through the request's iov.
 */
int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                             MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_get_t *get_pkt = &pkt->get;
    MPID_Request *req = NULL;
    MPID_IOV iov[MPID_IOV_LIMIT];
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
    int mpi_errno = MPI_SUCCESS;
    MPI_Aint type_size;
    int acquire_lock_fail = 0;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received get pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_get);

    MPIU_Assert(get_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(get_pkt->target_win_handle, win_ptr);

    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    if (acquire_lock_fail) {
        /* hand back whatever request check_piggyback_lock() produced */
        (*rreqp) = req;
        goto fn_exit;
    }

    req = MPID_Request_create();
    /* the request is dereferenced immediately below; bail out before the
     * completion counter is touched if none could be allocated */
    MPIU_ERR_CHKANDJUMP1(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem",
                         "**nomem %s", "MPID_Request_create");
    req->dev.target_win_handle = get_pkt->target_win_handle;
    req->dev.flags = get_pkt->flags;

    /* get start location of data and length of data */
    data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
    data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_pkt->datatype));
    }

    if (MPIR_DATATYPE_IS_PREDEFINED(get_pkt->datatype)) {
        /* basic datatype. send the data. */
        MPIDI_CH3_Pkt_t upkt;
        MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
        size_t len;
        int iovcnt;
        int is_contig;

        MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RESP);
        req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendComplete;
        req->kind = MPID_REQUEST_SEND;

        if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
            MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP_IMMED);
        }
        else {
            MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
        }
        get_resp_pkt->request_handle = get_pkt->request_handle;

        /* translate the request's lock / flush / unlock flags into the
         * acknowledgement flags carried by the response */
        get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
            get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
        if ((get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
            (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
            get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
        get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

        /* length of target data */
        MPID_Datatype_get_size_macro(get_pkt->datatype, type_size);

        MPID_Datatype_is_contig(get_pkt->datatype, &is_contig);

        if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
            /* the data travels inside the response header itself */
            void *src = (void *) (get_pkt->addr);
            void *dest = (void *) (get_resp_pkt->info.data);

            MPIU_Assign_trunc(len, get_pkt->count * type_size, size_t);
            mpi_errno = immed_copy(src, dest, len);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);

            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
            iovcnt = 1;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_Object_set_ref(req, 0);
                MPIDI_CH3_Request_destroy(req);
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
            }
            /* --END ERROR HANDLING-- */
        }
        else if (is_contig) {
            /* header followed by the target data sent straight from the
             * target address */
            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
            iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) get_pkt->addr);
            iov[1].MPID_IOV_LEN = get_pkt->count * type_size;
            iovcnt = 2;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_Object_set_ref(req, 0);
                MPIDI_CH3_Request_destroy(req);
                MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
            }
            /* --END ERROR HANDLING-- */
        }
        else {
            /* non-contiguous data: describe it with a segment and let the
             * connection's sendNoncontig_fn send it after the header */
            iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
            iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);

            req->dev.segment_ptr = MPID_Segment_alloc();
            MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
                                 MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");

            MPID_Segment_init(get_pkt->addr, get_pkt->count,
                              get_pkt->datatype, req->dev.segment_ptr, 0);
            req->dev.segment_first = 0;
            req->dev.segment_size = get_pkt->count * type_size;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }

        /* only the header was consumed; nothing is left to receive */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;
    }
    else {
        /* derived datatype. first get the dtype_info and dataloop. */

        MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RECV_DERIVED_DT);
        req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete;
        req->dev.OnFinal = 0;
        req->dev.user_buf = get_pkt->addr;
        req->dev.user_count = get_pkt->count;
        req->dev.datatype = MPI_DATATYPE_NULL;
        req->dev.request_handle = get_pkt->request_handle;

        req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
            MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
        if (!req->dev.dtype_info) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPIDI_RMA_dtype_info");
        }

        req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
        if (!req->dev.dataloop) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
                                 get_pkt->info.dataloop_size);
        }

        /* if we received all of the dtype_info and dataloop, copy it
         * now and call the handler, otherwise set the iov and let the
         * channel copy it */
        if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
            /* copy all of dtype_info and dataloop */
            MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
            MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
                        get_pkt->info.dataloop_size);

            *buflen =
                sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
                get_pkt->info.dataloop_size;

            /* All dtype data has been received, call req handler */
            mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET");

            /* Always assign *rreqp on this path (as the Put handler does):
             * NULL once the handler reports completion, otherwise the
             * request that is still outstanding. */
            *rreqp = complete ? NULL : req;
        }
        else {
            /* datatype description is incomplete: have it received into
             * the two freshly allocated buffers via the request's iov */
            req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
            req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
            req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
            req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
            req->dev.iov_count = 2;

            *buflen = sizeof(MPIDI_CH3_Pkt_t);
            *rreqp = req;
        }

    }
  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_get);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Accumulate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                    MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
    MPID_Request *req = NULL;
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
581
    int acquire_lock_fail = 0;
582
583
584
585
586
587
588
589
    int mpi_errno = MPI_SUCCESS;
    MPI_Aint type_size;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received accumulate pkt");

Xin Zhao's avatar
Xin Zhao committed
590
591
    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_acc);

592
593
594
    MPIU_Assert(accum_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(accum_pkt->target_win_handle, win_ptr);

595
596
597
    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
598
599

    if (acquire_lock_fail) {
600
        (*rreqp) = req;
601
602
603
        goto fn_exit;
    }

604
605
606
607
608
609
    if (pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
        /* Immed packet type is used when target datatype is predefined datatype. */
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype));

        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
610
611
612
        mpi_errno = do_accumulate_op(accum_pkt->info.data, accum_pkt->count, accum_pkt->datatype,
                                     accum_pkt->addr, accum_pkt->count, accum_pkt->datatype,
                                     0, accum_pkt->op);
613
614
615
616
617
618
619
        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
        if (mpi_errno) {
            MPIU_ERR_POP(mpi_errno);
        }

        /* trigger final action */
620
        mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
621
                                        accum_pkt->flags, accum_pkt->source_win_handle);
622
623
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
624
625
626
627
628

        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;
    }
    else {
629
        MPIU_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
630

631
632
633
        req = MPID_Request_create();
        MPIU_Object_set_ref(req, 1);
        *rreqp = req;
634

635
636
637
638
639
640
        req->dev.user_count = accum_pkt->count;
        req->dev.op = accum_pkt->op;
        req->dev.real_user_buf = accum_pkt->addr;
        req->dev.target_win_handle = accum_pkt->target_win_handle;
        req->dev.source_win_handle = accum_pkt->source_win_handle;
        req->dev.flags = accum_pkt->flags;
641

642
643
        req->dev.resp_request_handle = MPI_REQUEST_NULL;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
644

645
646
647
        /* get start location of data and length of data */
        data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
        data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
648

649
        if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
650
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
651
            req->dev.datatype = accum_pkt->datatype;
652

653
654
655
656
657
658
659
660
661
662
663
            MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
            /* allocate a SRBuf for receiving stream unit */
            MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
            /* --BEGIN ERROR HANDLING-- */
            if (req->dev.tmpbuf_sz == 0) {
                MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                                 FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
                                                 "**nomem %d", MPIDI_CH3U_SRBuf_size);
                req->status.MPI_ERROR = mpi_errno;
                goto fn_fail;
664
            }
665
            /* --END ERROR HANDLING-- */
666

667
            req->dev.user_buf = req->dev.tmpbuf;
668

669
            MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
670

671
            req->dev.recv_data_sz = type_size * accum_pkt->count;
672
            MPIU_Assert(req->dev.recv_data_sz > 0);
Xin Zhao's avatar
Xin Zhao committed
673

674
675
676
            mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
677

678
679
            /* return the number of bytes processed in this function */
            *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
680
681

            if (complete) {
682
683
684
685
686
687
688
                mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, req, &complete);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
689
690
            }
        }
691
        else {
692
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT);
693
694
695
696
697
698
699
700
701
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
            req->dev.datatype = MPI_DATATYPE_NULL;

            req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
                MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
            if (!req->dev.dtype_info) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                     "MPIDI_RMA_dtype_info");
            }
702

703
            req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
704
705
            if (!req->dev.dataloop) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
706
                                     accum_pkt->info.dataloop_size);
707
            }
708

709
710
            if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size +
                sizeof(req->dev.stream_offset)) {
711
712
713
                /* copy all of dtype_info and dataloop */
                MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
                MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
714
715
716
717
                            accum_pkt->info.dataloop_size);
                MPIU_Memcpy(&(req->dev.stream_offset),
                            data_buf + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size,
                            sizeof(req->dev.stream_offset));
718
719
720

                *buflen =
                    sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
721
                    accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
722
723
724
725
726
727
728
729
730
731
732
733
734
735

                /* All dtype data has been received, call req handler */
                mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
                MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
            else {
                req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
                req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
736
737
738
739
                req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
                req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
                req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
                req->dev.iov_count = 3;
740
                *buflen = sizeof(MPIDI_CH3_Pkt_t);
741
742
            }

743
        }
744
    }
745
746
747
748
749
750
751

    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                             "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
    }

  fn_exit:
Xin Zhao's avatar
Xin Zhao committed
752
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_acc);
753
754
755
756
757
758
759
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;

}

760
761
762
763
764
765
766
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_GetAccumulate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_GetAccumulate - target-side handler for
 * MPIDI_CH3_PKT_GET_ACCUM_IMMED and MPIDI_CH3_PKT_GET_ACCUM packets.
 *
 * Parameters:
 *   vc     - connection the packet arrived on; also used to send the response
 *   pkt    - received packet header, interpreted as pkt->get_accum
 *   buflen - in: number of bytes available starting at pkt;
 *            out: number of bytes consumed by this handler
 *   rreqp  - out: request that still has data to receive, or NULL when
 *            nothing more needs to be received for this packet
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * Flow:
 *   - check_piggyback_lock() runs first; if it reports that the lock could
 *     not be acquired, the request it produced is handed back through rreqp
 *     and the handler returns without touching the window.
 *   - IMMED packets carry the origin data inside the packet header: the old
 *     target data is copied into the response header, the accumulate is
 *     applied, and the response is sent with MPIDI_CH3_iSendv().
 *   - Non-IMMED packets set up a receive request. For predefined datatypes
 *     the payload is received into an SRBuf; for derived datatypes the
 *     dtype_info, dataloop and stream_offset are obtained first, either by
 *     copying them from the bytes already in the buffer or by posting an IOV.
 */
int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                                       MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
    MPID_Request *req = NULL;
    int complete = 0;
    char *data_buf = NULL;
    MPIDI_msg_sz_t data_len;
    MPID_Win *win_ptr;
    int acquire_lock_fail = 0;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received get accumulate pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_get_accum);

    MPIU_Assert(get_accum_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(get_accum_pkt->target_win_handle, win_ptr);

    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &req);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    if (acquire_lock_fail) {
        /* lock not acquired: hand back whatever request check_piggyback_lock
         * produced and do not perform the operation now */
        (*rreqp) = req;
        goto fn_exit;
    }

    if (pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
        size_t len;
        void *src = NULL, *dest = NULL;
        MPID_Request *resp_req = NULL;
        MPIDI_CH3_Pkt_t upkt;
        MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
        MPID_IOV iov[MPID_IOV_LIMIT];
        int iovcnt;
        MPI_Aint type_size;

        /* all origin data is inside the packet header */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
        *rreqp = NULL;

        /* Immed packet type is used when target datatype is predefined datatype. */
        MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));

        resp_req = MPID_Request_create();
        /* --BEGIN ERROR HANDLING-- */
        if (resp_req == NULL) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPID_Request");
        }
        /* --END ERROR HANDLING-- */
        resp_req->dev.target_win_handle = get_accum_pkt->target_win_handle;
        resp_req->dev.flags = get_accum_pkt->flags;

        MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
        resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
        resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
        resp_req->kind = MPID_REQUEST_SEND;

        /* here we increment the Active Target counter to guarantee the GET-like
         * operation are completed when counter reaches zero. */
        win_ptr->at_completion_counter++;

        /* Calculate the length of response data, ensure that it fits into immed packet. */
        MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
        MPIU_Assign_trunc(len, get_accum_pkt->count * type_size, size_t);

        MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED);
        get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
        get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
        get_accum_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        /* piggyback the lock-granted / flush-ack notifications on the response */
        if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
            get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
        if ((get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
            (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
            get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;

        /* NOTE: 'copy data + ACC' needs to be atomic */

        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);

        /* copy data from target buffer to response packet header */
        src = (void *) (get_accum_pkt->addr);
        dest = (void *) (get_accum_resp_pkt->info.data);
        mpi_errno = immed_copy(src, dest, len);
        if (mpi_errno != MPI_SUCCESS) {
            /* NOTE(review): resp_req is not released and at_completion_counter
             * stays incremented on this path and on the accumulate failure
             * below -- confirm whether cleanup is required here. */
            if (win_ptr->shm_allocated == TRUE)
                MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
            MPIU_ERR_POP(mpi_errno);
        }

        /* perform accumulate operation. */
        mpi_errno =
            do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
                             get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
                             get_accum_pkt->datatype, 0, get_accum_pkt->op);

        /* release the mutex before checking the result so it is never left held */
        if (win_ptr->shm_allocated == TRUE)
            MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);

        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);

        /* the response consists of the packet header only */
        iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
        iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
        iovcnt = 1;

        MPIU_THREAD_CS_ENTER(CH3COMM, vc);
        mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
        MPIU_THREAD_CS_EXIT(CH3COMM, vc);
        /* --BEGIN ERROR HANDLING-- */
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_Object_set_ref(resp_req, 0);
            MPIDI_CH3_Request_destroy(resp_req);
            MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
        /* --END ERROR HANDLING-- */
    }
    else {
        MPIU_Assert(pkt->type == MPIDI_CH3_PKT_GET_ACCUM);

        req = MPID_Request_create();
        /* --BEGIN ERROR HANDLING-- */
        if (req == NULL) {
            *rreqp = NULL;
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPID_Request");
        }
        /* --END ERROR HANDLING-- */
        MPIU_Object_set_ref(req, 1);
        *rreqp = req;

        req->dev.user_count = get_accum_pkt->count;
        req->dev.op = get_accum_pkt->op;
        req->dev.real_user_buf = get_accum_pkt->addr;
        req->dev.target_win_handle = get_accum_pkt->target_win_handle;
        req->dev.flags = get_accum_pkt->flags;

        req->dev.resp_request_handle = get_accum_pkt->request_handle;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;

        /* get start location of data and length of data */
        data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
        data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);

        if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
            MPI_Aint type_size;

            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
            req->dev.datatype = get_accum_pkt->datatype;

            MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
            /* allocate a SRBuf for receiving stream unit */
            MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
            /* --BEGIN ERROR HANDLING-- */
            if (req->dev.tmpbuf_sz == 0) {
                MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
                mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                                 FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
                                                 "**nomem %d", MPIDI_CH3U_SRBuf_size);
                req->status.MPI_ERROR = mpi_errno;
                goto fn_fail;
            }
            /* --END ERROR HANDLING-- */

            /* incoming data lands in the SRBuf, not directly in the window */
            req->dev.user_buf = req->dev.tmpbuf;

            MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
            req->dev.recv_data_sz = type_size * get_accum_pkt->count;
            MPIU_Assert(req->dev.recv_data_sz > 0);

            mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
            MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM");

            /* return the number of bytes processed in this function */
            *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);

            if (complete) {
                mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, req, &complete);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
        }
        else {
            MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT);
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
            req->dev.datatype = MPI_DATATYPE_NULL;

            req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
                MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
            if (!req->dev.dtype_info) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                     "MPIDI_RMA_dtype_info");
            }

            req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
            if (!req->dev.dataloop) {
                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
                                     get_accum_pkt->info.dataloop_size);
            }

            if (data_len >=
                sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.dataloop_size +
                sizeof(req->dev.stream_offset)) {
                /* copy all of dtype_info and dataloop */
                MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
                MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
                            get_accum_pkt->info.dataloop_size);
                MPIU_Memcpy(&(req->dev.stream_offset),
                            data_buf + sizeof(MPIDI_RMA_dtype_info) +
                            get_accum_pkt->info.dataloop_size, sizeof(req->dev.stream_offset));

                *buflen =
                    sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
                    get_accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);

                /* All dtype data has been received, call req handler */
                mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
                MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                     "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM");
                if (complete) {
                    *rreqp = NULL;
                    goto fn_exit;
                }
            }
            else {
                /* not everything is in the buffer yet: post an IOV for the
                 * three pieces and consume only the packet header now */
                req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
                req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
                req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
                req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.dataloop_size;
                req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
                req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
                req->dev.iov_count = 3;
                *buflen = sizeof(MPIDI_CH3_Pkt_t);
            }

        }

        /* defensive check: every failing call above already jumps to fn_fail */
        if (mpi_errno != MPI_SUCCESS) {
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
                                 "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM");
        }
    }

  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_get_accum);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;

}

1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_CAS
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_CAS - target-side handler for a compare-and-swap packet.
 *
 * Parameters:
 *   vc     - connection the packet arrived on; also used to send the response
 *   pkt    - received packet header, interpreted as pkt->cas
 *   buflen - out: number of bytes consumed (the packet header only, since all
 *            CAS data travels inside the header)
 *   rreqp  - out: always set to NULL on the normal path; on lock-acquisition
 *            failure it receives the request from check_piggyback_lock(),
 *            which is asserted to be NULL for CAS
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * The old target value is copied into the response packet, the target is
 * overwritten with origin_data when it equals compare_data, and the response
 * is sent with MPIDI_CH3_iStartMsg(). If that send is still in flight, the
 * remaining work is deferred to MPIDI_CH3_ReqHandler_CASSendComplete;
 * otherwise finish_op_on_target() is called directly.
 */
int MPIDI_CH3_PktHandler_CAS(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                             MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_cas_resp_t *cas_resp_pkt = &upkt.cas_resp;
    MPIDI_CH3_Pkt_cas_t *cas_pkt = &pkt->cas;
    MPID_Win *win_ptr;
    MPID_Request *req = NULL;
    MPID_Request *rreq = NULL;
    MPI_Aint len;
    int acquire_lock_fail = 0;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_CAS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_CAS);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received CAS pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_cas);

    MPIU_Assert(cas_pkt->target_win_handle != MPI_WIN_NULL);
    MPID_Win_get_ptr(cas_pkt->target_win_handle, win_ptr);

    mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen, &acquire_lock_fail, &rreq);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(rreq == NULL);  /* CAS should not have request because all data
                                 * can fit in packet header */

    if (acquire_lock_fail) {
        (*rreqp) = rreq;
        goto fn_exit;
    }

    /* return the number of bytes processed in this function */
    /* data_len == 0 (all within packet) */
    *buflen = sizeof(MPIDI_CH3_Pkt_t);
    *rreqp = NULL;

    MPIDI_Pkt_init(cas_resp_pkt, MPIDI_CH3_PKT_CAS_RESP_IMMED);
    cas_resp_pkt->request_handle = cas_pkt->request_handle;
    cas_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    cas_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* piggyback the lock-granted / flush-ack notifications on the response */
    if (cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        cas_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        cas_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;

    /* Copy old value into the response packet */
    MPID_Datatype_get_size_macro(cas_pkt->datatype, len);
    MPIU_Assert(len <= sizeof(MPIDI_CH3_CAS_Immed_u));

    /* the read of the old value and the conditional write happen under the
     * same mutex when the window is shm-allocated */
    if (win_ptr->shm_allocated == TRUE)
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);

    MPIU_Memcpy((void *) &cas_resp_pkt->info.data, cas_pkt->addr, len);

    /* Compare and replace if equal */
    if (MPIR_Compare_equal(&cas_pkt->compare_data, cas_pkt->addr, cas_pkt->datatype)) {
        MPIU_Memcpy(cas_pkt->addr, &cas_pkt->origin_data, len);
    }

    if (win_ptr->shm_allocated == TRUE)
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);

    /* Send the response packet */
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);

    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    if (req != NULL) {
        if (!MPID_Request_is_complete(req)) {
            /* sending process is not completed, set proper OnDataAvail
             * (it is initialized to NULL by lower layer) */
            req->dev.target_win_handle = cas_pkt->target_win_handle;
            req->dev.flags = cas_pkt->flags;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_CASSendComplete;

            /* here we increment the Active Target counter to guarantee the GET-like
             * operation are completed when counter reaches zero. */
            win_ptr->at_completion_counter++;

            MPID_Request_release(req);
            goto fn_exit;
        }
        else
            MPID_Request_release(req);
    }

    /* the response has already been sent completely: finish the operation now */
    mpi_errno = finish_op_on_target(win_ptr, vc, TRUE /* has response data */ ,
                                    cas_pkt->flags, MPI_WIN_NULL);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_cas);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_CAS);
    return mpi_errno;
  fn_fail:
    goto fn_exit;

}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_CASResp
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * MPIDI_CH3_PktHandler_CASResp - origin-side handler for a compare-and-swap
 * response packet.
 *
 * Parameters:
 *   vc     - unused
 *   pkt    - received packet header, interpreted as pkt->cas_resp
 *   buflen - out: number of bytes consumed (the packet header only)
 *   rreqp  - out: set to NULL on success; no further data is expected
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * The request is looked up from the handle carried in the packet and the
 * window from that request's source_win_handle. Piggybacked LOCK_GRANTED and
 * FLUSH_ACK flags are processed first, then the returned value is copied into
 * the request's user buffer and the request is completed.
 */
int MPIDI_CH3_PktHandler_CASResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                 MPIDI_CH3_Pkt_t * pkt,
                                 MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_cas_resp_t *cas_resp_pkt = &pkt->cas_resp;
    MPID_Request *req;
    MPI_Aint len;
    MPID_Win *win_ptr;
    int target_rank = cas_resp_pkt->target_rank;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_CASRESP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_CASRESP);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received CAS response pkt");

    MPIR_T_PVAR_TIMER_START(RMA, rma_rmapkt_cas_resp);

    MPID_Request_get_ptr(cas_resp_pkt->request_handle, req);
    MPID_Win_get_ptr(req->dev.source_win_handle, win_ptr);

    /* decrement ack_counter on this target */
    if (cas_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
        mpi_errno = handle_lock_ack(win_ptr, target_rank, cas_resp_pkt->flags);
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);

        mpi_errno = adjust_op_piggybacked_with_lock(win_ptr, target_rank, cas_resp_pkt->flags);
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);
    }
    if (cas_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
        mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
    }

    MPID_Datatype_get_size_macro(req->dev.datatype, len);
    /* the value is read out of the packet header, so it must not be larger
     * than the field that carries it */
    MPIU_Assert(len <= (MPI_Aint) sizeof(cas_resp_pkt->info.data));

    MPIU_Memcpy(req->dev.user_buf, (void *) &cas_resp_pkt->info.data, len);

    MPIDI_CH3U_Request_complete(req);
    *buflen = sizeof(MPIDI_CH3_Pkt_t);
    *rreqp = NULL;

  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_cas_resp);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_CASRESP);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_FOP
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
                             MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_fop_t *fop_pkt = &pkt->fop;
1193
1194
1195
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
    MPID_Request *resp_req = NULL;
1196
    MPID_Request *rreq = NULL;
1197
    int acquire_lock_fail = 0;
1198
    MPID_Win *win_ptr = NULL;
1199
    MPI_Aint type_size;
1200
1201
1202
1203
1204
1205
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);

    MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received FOP pkt");