/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2010 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"

/* -- Begin Profiling Symbol Block for routine MPI_Iallgatherv */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPI_Iallgatherv = PMPI_Iallgatherv
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPI_Iallgatherv  MPI_Iallgatherv
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPI_Iallgatherv as PMPI_Iallgatherv
#endif
/* -- End Profiling Symbol Block */

/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
   the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#undef MPI_Iallgatherv
#define MPI_Iallgatherv PMPI_Iallgatherv

#undef FUNCNAME
#define FUNCNAME MPIR_Iallgatherv_rec_dbl
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Iallgatherv_rec_dbl(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                             void *recvbuf, const int recvcounts[], const int displs[],
                             MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPID_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    int comm_size, rank, i, j, k;
    int curr_count, send_offset, incoming_count, recv_offset;
    int mask, dst, total_count, position, offset, my_tree_root, dst_tree_root;
    MPI_Aint recvtype_extent, recvtype_true_extent, recvtype_true_lb;
    void *tmp_buf = NULL;
    int is_homogeneous ATTRIBUTE((unused));
    MPIR_SCHED_CHKPMEM_DECL(1);

    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;

    is_homogeneous = 1;
#ifdef MPID_HAS_HETERO
    if (comm_ptr->is_hetero)
        is_homogeneous = 0;
#endif
    MPIU_Assert(is_homogeneous); /* we only handle the homogeneous case for now */

    /* need to receive contiguously into tmp_buf because
       displs could make the recvbuf noncontiguous */
    MPID_Datatype_get_extent_macro(recvtype, recvtype_extent);
    MPIR_Type_get_true_extent_impl(recvtype, &recvtype_true_lb, &recvtype_true_extent);

    total_count = 0;
    for (i=0; i<comm_size; i++)
        total_count += recvcounts[i];

    if (total_count == 0)
        goto fn_exit;

    MPID_Ensure_Aint_fits_in_pointer(total_count*(MPIR_MAX(recvtype_true_extent, recvtype_extent)));
    MPIR_SCHED_CHKPMEM_MALLOC(tmp_buf, void *, total_count*(MPIR_MAX(recvtype_true_extent,recvtype_extent)), mpi_errno, "tmp_buf");

    /* adjust for potential negative lower bound in datatype */
    tmp_buf = (void *)((char*)tmp_buf - recvtype_true_lb);

    /* copy local data into right location in tmp_buf */
    position = 0;
    for (i=0; i<rank; i++) position += recvcounts[i];
    if (sendbuf != MPI_IN_PLACE)
    {
        mpi_errno = MPID_Sched_copy(sendbuf, sendcount, sendtype,
                                   ((char *)tmp_buf + position*recvtype_extent),
                                   recvcounts[rank], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else
    {
        /* if in_place specified, local data is found in recvbuf */
        mpi_errno = MPID_Sched_copy(((char *)recvbuf + displs[rank]*recvtype_extent),
                                   recvcounts[rank], recvtype,
                                   ((char *)tmp_buf + position*recvtype_extent),
                                   recvcounts[rank], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }

    curr_count = recvcounts[rank];

    /* never used uninitialized w/o this, but compiler can't tell that */
    incoming_count = -1;

    /* [goodell@] random notes that help slightly when deciphering this code:
     * - mask is also equal to the number of blocks that we are going to recv
     *   (less if comm_size is non-pof2)
     * - FOO_tree_root is the leftmost (lowest ranked) process with whom FOO has
     *   communicated, directly or indirectly, at the beginning of the round.
     *   FOO is either "dst" or "my", where "my" means use my rank.
     * - in each round we are going to recv the blocks
     *   B[dst_tree_root],B[dst_tree_root+1],...,B[min(dst_tree_root+mask,comm_size)-1]
     */
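    /* A worked example of the above: with comm_size=4, rank 1 pairs with
     * rank 0 in round 0 (mask=1), sending B[1] and receiving B[0]; in
     * round 1 (mask=2) it pairs with rank 3, sending B[0..1] and receiving
     * B[2..3].  After lg(4)=2 rounds every rank holds all four blocks
     * contiguously in tmp_buf. */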
    mask = 0x1;
    i = 0;
    while (mask < comm_size) {
        dst = rank ^ mask;

        /* find offset into send and recv buffers. zero out
           the least significant "i" bits of rank and dst to
           find root of src and dst subtrees. Use ranks of
           roots as index to send from and recv into buffer */

        dst_tree_root = dst >> i;
        dst_tree_root <<= i;

        my_tree_root = rank >> i;
        my_tree_root <<= i;

        if (dst < comm_size) {
            send_offset = 0;
            for (j = 0; j < my_tree_root; j++)
                send_offset += recvcounts[j];

            recv_offset = 0;
            for (j = 0; j < dst_tree_root; j++)
                recv_offset += recvcounts[j];

            incoming_count = 0;
            for (j = dst_tree_root; j < (dst_tree_root + mask) && j < comm_size; ++j)
                incoming_count += recvcounts[j];

            mpi_errno = MPID_Sched_send(((char *)tmp_buf + send_offset * recvtype_extent),
                                        curr_count, recvtype, dst, comm_ptr, s);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            /* sendrecv, no barrier here */
            mpi_errno = MPID_Sched_recv(((char *)tmp_buf + recv_offset * recvtype_extent),
                                        incoming_count, recvtype, dst, comm_ptr, s);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            MPID_SCHED_BARRIER(s);

            curr_count += incoming_count;
        }

        /* if some processes in this process's subtree in this step
           did not have any destination process to communicate with
           because of non-power-of-two, we need to send them the
           data that they would normally have received from those
           processes. That is, the haves in this subtree must send to
           the have-nots. We use a logarithmic
           recursive-halving algorithm for this. */
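        /* For example, with comm_size=6 in the mask=4 round, ranks 2 and 3
           have no partners (their partners would be ranks 6 and 7, which
           don't exist), so only ranks 0 and 1 receive B[4..5], from ranks 4
           and 5; the halving loop below then has ranks 0 and 1 forward
           those blocks to ranks 2 and 3. */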

        /* This part of the code will not currently be
           executed because we are not using recursive
           doubling for non power of two. Mark it as experimental
           so that it doesn't show up as red in the coverage
           tests. */

        /* --BEGIN EXPERIMENTAL-- */
        if (dst_tree_root + mask > comm_size) {
            int tmp_mask, tree_root;
            int nprocs_completed = comm_size - my_tree_root - mask;
            /* nprocs_completed is the number of processes in this
               subtree that have all the data. Send data to others
               in a tree fashion. First find root of current tree
               that is being divided into two. k is the number of
               least-significant bits in this process's rank that
               must be zeroed out to find the rank of the root */
            /* [goodell@] it looks like (k==i) is always true, could possibly
             * skip the loop below */
            j = mask;
            k = 0;
            while (j) {
                j >>= 1;
                k++;
            }
            k--;

            tmp_mask = mask >> 1;

            while (tmp_mask) {
                dst = rank ^ tmp_mask;

                tree_root = rank >> k;
                tree_root <<= k;

                /* send only if this proc has data and destination
                   doesn't have data. at any step, multiple processes
                   can send if they have the data */
                if ((dst > rank) &&
                    (rank < tree_root + nprocs_completed) &&
                    (dst >= tree_root + nprocs_completed))
                {
                    offset = 0;
                    for (j = 0; j < (my_tree_root+mask); j++)
                        offset += recvcounts[j];
                    offset *= recvtype_extent;

                    /* incoming_count was set in the previous
                       receive. that's the amount of data to be
                       sent now. */
                    mpi_errno = MPID_Sched_send(((char *)tmp_buf + offset),
                                                incoming_count, recvtype, dst, comm_ptr, s);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                    MPID_SCHED_BARRIER(s);
                }
                /* recv only if this proc. doesn't have data and sender
                   has data */
                else if ((dst < rank) &&
                         (dst < tree_root + nprocs_completed) &&
                         (rank >= tree_root + nprocs_completed))
                {

                    offset = 0;
                    for (j = 0; j < (my_tree_root+mask); j++)
                        offset += recvcounts[j];

                    /* recalculate incoming_count, since not all processes will have
                     * this value */
                    incoming_count = 0;
                    for (j = dst_tree_root; j < (dst_tree_root + mask) && j < comm_size; ++j)
                        incoming_count += recvcounts[j];

                    mpi_errno = MPID_Sched_recv(((char *)tmp_buf + offset * recvtype_extent),
                                                incoming_count, recvtype,
                                                dst, comm_ptr, s);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                    MPID_SCHED_BARRIER(s);
                    curr_count += incoming_count;
                }
                tmp_mask >>= 1;
                k--;
            }
        }
        /* --END EXPERIMENTAL-- */

        mask <<= 1;
        i++;
    }

    /* sanity check that we got all of the data blocks */
    MPIU_Assert(curr_count == total_count);

    /* copy data from tmp_buf to recvbuf */
    position = 0;
    for (j = 0; j < comm_size; j++) {
        if ((sendbuf != MPI_IN_PLACE) || (j != rank)) {
            /* not necessary to copy if in_place and
               j==rank. otherwise copy. */
            mpi_errno = MPID_Sched_copy(((char *)tmp_buf + position*recvtype_extent),
                                       recvcounts[j], recvtype,
                                       ((char *)recvbuf + displs[j]*recvtype_extent),
                                       recvcounts[j], recvtype, s);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
        position += recvcounts[j];
    }

    MPIR_SCHED_CHKPMEM_COMMIT(s);
fn_exit:
    return mpi_errno;
fn_fail:
    MPIR_SCHED_CHKPMEM_REAP(s);
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIR_Iallgatherv_bruck
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Iallgatherv_bruck(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                           void *recvbuf, const int recvcounts[], const int displs[],
                           MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPID_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    int comm_size, rank, j, i;
    MPI_Aint recvbuf_extent, recvtype_extent, recvtype_true_extent, recvtype_true_lb;
    int send_cnt, dst, total_count, pof2, src, rem;
    int incoming_count, curr_count;
    void *tmp_buf;
    MPIR_SCHED_CHKPMEM_DECL(1);

    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;

    MPID_Datatype_get_extent_macro(recvtype, recvtype_extent);

    total_count = 0;
    for (i=0; i<comm_size; i++)
        total_count += recvcounts[i];

    if (total_count == 0)
        goto fn_exit;

    /* allocate a temporary buffer of the same size as recvbuf. */

    /* get true extent of recvtype */
    MPIR_Type_get_true_extent_impl(recvtype, &recvtype_true_lb, &recvtype_true_extent);

    MPID_Ensure_Aint_fits_in_pointer(total_count * MPIR_MAX(recvtype_true_extent, recvtype_extent));
    recvbuf_extent = total_count * (MPIR_MAX(recvtype_true_extent, recvtype_extent));

    MPIR_SCHED_CHKPMEM_MALLOC(tmp_buf, void *, recvbuf_extent, mpi_errno, "tmp_buf");

    /* adjust for potential negative lower bound in datatype */
    tmp_buf = (void *)((char*)tmp_buf - recvtype_true_lb);

    /* copy local data to the top of tmp_buf */
    if (sendbuf != MPI_IN_PLACE) {
        mpi_errno = MPID_Sched_copy(sendbuf, sendcount, sendtype,
                                    tmp_buf, recvcounts[rank], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPID_SCHED_BARRIER(s);
    }
    else {
        mpi_errno = MPID_Sched_copy(((char *)recvbuf + displs[rank]*recvtype_extent),
                                    recvcounts[rank], recvtype,
                                    tmp_buf, recvcounts[rank], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPID_SCHED_BARRIER(s);
    }

    /* \floor(\lg p) send-recv rounds */
    /* recv these blocks (actual block numbers are mod comm_size):
     * - rank+1
     * - rank+2,rank+3
     * - rank+4,rank+5,rank+6,rank+7
     * - ...
     */
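    /* For example, with comm_size=5, rank 0 receives B[1] from rank 1 when
     * pof2=1 and B[2..3] from rank 2 when pof2=2; the extra
     * non-power-of-two step below picks up the remaining B[4] from rank 4.
     * Each rank r thus accumulates B[r],B[r+1],...,B[r-1] (mod comm_size)
     * contiguously in tmp_buf, which is why the final rotation into
     * recvbuf is needed. */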
    /* curr_count is the amount of data (in counts of recvtype) that we have
     * right now, starting with the block we copied in the previous step */
    curr_count = recvcounts[rank];
    pof2 = 1;
    while (pof2 <= comm_size/2) {
        src = (rank + pof2) % comm_size;
        dst = (rank - pof2 + comm_size) % comm_size;

        incoming_count = 0;
        for (i = 0; i < pof2; ++i) {
            incoming_count += recvcounts[(src + i) % comm_size];
        }

        mpi_errno = MPID_Sched_send(tmp_buf, curr_count, recvtype, dst, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        /* sendrecv, no barrier */
        mpi_errno = MPID_Sched_recv(((char *)tmp_buf + curr_count*recvtype_extent),
                                    incoming_count, recvtype, src, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPID_SCHED_BARRIER(s);

        /* we will send everything we had plus what we just got to next round's dst */
        curr_count += incoming_count;
        pof2 *= 2;
    }

    /* if comm_size is not a power of two, one more step is needed */
    rem = comm_size - pof2;
    if (rem) {
        src = (rank + pof2) % comm_size;
        dst = (rank - pof2 + comm_size) % comm_size;

        send_cnt = 0;
        for (i=0; i<rem; i++)
            send_cnt += recvcounts[(rank+i)%comm_size];

        mpi_errno = MPID_Sched_send(tmp_buf, send_cnt, recvtype, dst, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        /* sendrecv, no barrier */
        mpi_errno = MPID_Sched_recv(((char *)tmp_buf + curr_count*recvtype_extent),
                                     (total_count - curr_count), recvtype, src, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPID_SCHED_BARRIER(s);
    }

    /* Rotate blocks in tmp_buf down by (rank) blocks and store
     * result in recvbuf. */
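    /* For example, with comm_size=4 and rank=2, tmp_buf holds
     * B[2],B[3],B[0],B[1]; the first loop below copies B[2] and B[3] to
     * their displs in recvbuf, and the second loop copies B[0] and B[1]. */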

    send_cnt = 0;
    for (i=0; i < (comm_size-rank); i++) {
        j = (rank+i)%comm_size;
        mpi_errno = MPID_Sched_copy(((char *)tmp_buf + send_cnt*recvtype_extent),
                                    recvcounts[j], recvtype,
                                    ((char *)recvbuf + displs[j]*recvtype_extent),
                                    recvcounts[j], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        send_cnt += recvcounts[j];
    }

    for (i=0; i<rank; i++) {
        mpi_errno = MPID_Sched_copy(((char *)tmp_buf + send_cnt*recvtype_extent),
                                    recvcounts[i], recvtype,
                                    ((char *)recvbuf + displs[i]*recvtype_extent),
                                    recvcounts[i], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        send_cnt += recvcounts[i];
    }

    MPIR_SCHED_CHKPMEM_COMMIT(s);
fn_exit:
    return mpi_errno;
fn_fail:
    MPIR_SCHED_CHKPMEM_REAP(s);
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIR_Iallgatherv_ring
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Iallgatherv_ring(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                          void *recvbuf, const int recvcounts[], const int displs[],
                          MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPID_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    int i, total_count;
    MPI_Aint recvtype_extent;
    int rank, comm_size;
    int left, right;
    char *sbuf = NULL;
    char *rbuf = NULL;
    int soffset, roffset;
    int torecv, tosend, min;
    int sendnow, recvnow;
    int sidx, ridx;

    comm_size = comm_ptr->local_size;
    rank = comm_ptr->rank;
    MPID_Datatype_get_extent_macro(recvtype, recvtype_extent);

    total_count = 0;
    for (i=0; i<comm_size; i++)
        total_count += recvcounts[i];

    if (total_count == 0)
        goto fn_exit;

    if (sendbuf != MPI_IN_PLACE) {
        /* First, load the "local" version in the recvbuf. */
        mpi_errno = MPID_Sched_copy(sendbuf, sendcount, sendtype,
                                    ((char *)recvbuf + displs[rank]*recvtype_extent),
                                    recvcounts[rank], recvtype, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPID_SCHED_BARRIER(s);
    }

    left  = (comm_size + rank - 1) % comm_size;
    right = (rank + 1) % comm_size;

    torecv = total_count - recvcounts[rank];
    tosend = total_count - recvcounts[right];

    min = recvcounts[0];
    for (i = 1; i < comm_size; i++)
        if (min > recvcounts[i])
            min = recvcounts[i];
    if (min * recvtype_extent < MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE)
        min = MPIR_CVAR_ALLGATHERV_PIPELINE_MSG_SIZE / recvtype_extent;
    /* Handle the case where the datatype extent is larger than
     * the pipeline size. */
    if (!min)
        min = 1;
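    /* Pipelining illustration: the ring below moves data in chunks of at
     * most "min" elements so that each send can overlap the next receive.
     * Assuming 4-byte elements and a 32 KiB pipeline threshold (the CVAR's
     * usual default, though it is configurable), min is at least 8192
     * elements, so a 1 MiB block travels as a series of 32 KiB chunks. */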

    sidx = rank;
    ridx = left;
    soffset = 0;
    roffset = 0;
    while (tosend || torecv) { /* While we have data to send or receive */
        sendnow = ((recvcounts[sidx] - soffset) > min) ? min : (recvcounts[sidx] - soffset);
        recvnow = ((recvcounts[ridx] - roffset) > min) ? min : (recvcounts[ridx] - roffset);
        sbuf = (char *)recvbuf + ((displs[sidx] + soffset) * recvtype_extent);
        rbuf = (char *)recvbuf + ((displs[ridx] + roffset) * recvtype_extent);

        /* Protect against wrap-around of indices */
        if (!tosend)
            sendnow = 0;
        if (!torecv)
            recvnow = 0;

        /* Communicate */
        if (recvnow) { /* If there's no data to send, just do a recv call */
            mpi_errno = MPID_Sched_recv(rbuf, recvnow, recvtype, left, comm_ptr, s);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            torecv -= recvnow;
        }
        if (sendnow) { /* If there's no data to receive, just do a send call */
            mpi_errno = MPID_Sched_send(sbuf, sendnow, recvtype, right, comm_ptr, s);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            tosend -= sendnow;
        }
        MPID_SCHED_BARRIER(s);

        soffset += sendnow;
        roffset += recvnow;
        if (soffset == recvcounts[sidx]) {
            soffset = 0;
            sidx = (sidx + comm_size - 1) % comm_size;
        }
        if (roffset == recvcounts[ridx]) {
            roffset = 0;
            ridx = (ridx + comm_size - 1) % comm_size;
        }
    }

fn_exit:
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

/* any non-MPI functions go here, especially non-static ones */

/* This is the default implementation of allgatherv. The algorithm is:

   Algorithm: MPI_Allgatherv

   For short messages and non-power-of-two no. of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is total size of data gathered on each process.

   For short or medium-size messages and power-of-two no. of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see Benson et al paper in Euro PVM/MPI
   2003).

   For long messages or medium-size messages and non-power-of-two
   no. of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   Possible improvements:

   End Algorithm: MPI_Allgatherv
*/
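/* A quick check of the cost model above: with p=16, Bruck and recursive
   doubling pay lg(16)=4 latency (alpha) terms versus p-1=15 for the ring,
   while all three move the same n.((p-1)/p) bytes; the ring is still
   preferred for long messages because its contiguous, nearest-neighbor
   transfers typically sustain higher bandwidth. */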
#undef FUNCNAME
#define FUNCNAME MPIR_Iallgatherv_intra
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Iallgatherv_intra(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                           void *recvbuf, const int recvcounts[], const int displs[],
                           MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPID_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    int i, comm_size, total_count, recvtype_size;

    comm_size = comm_ptr->local_size;
    MPID_Datatype_get_size_macro(recvtype, recvtype_size);

    total_count = 0;
    for (i=0; i<comm_size; i++)
        total_count += recvcounts[i];

    if (total_count == 0)
        goto fn_exit;

    if ((total_count*recvtype_size < MPIR_CVAR_ALLGATHER_LONG_MSG_SIZE) &&
        !(comm_size & (comm_size - 1)))
    {
        /* Short or medium size message and power-of-two no. of processes. Use
         * recursive doubling algorithm */
        mpi_errno = MPIR_Iallgatherv_rec_dbl(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else if (total_count*recvtype_size < MPIR_CVAR_ALLGATHER_SHORT_MSG_SIZE) {
        /* Short message and non-power-of-two no. of processes. Use
         * Bruck algorithm (see description above). */
        mpi_errno = MPIR_Iallgatherv_bruck(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else {
        /* long message or medium-size message and non-power-of-two
         * no. of processes. Use ring algorithm. */
        mpi_errno = MPIR_Iallgatherv_ring(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }

fn_exit:
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIR_Iallgatherv_inter
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Iallgatherv_inter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                           void *recvbuf, const int recvcounts[], const int displs[],
                           MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPID_Sched_t s)
{
/* Intercommunicator Allgatherv.
   This is done differently from the intercommunicator allgather
   because we don't have all the information to do a local
   intracommunicator gather (sendcount can be different on each
   process). Therefore, we do the following:
   Each group first does an intercommunicator gather to rank 0
   and then does an intracommunicator broadcast.
*/
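/* Schematically: rank 0 of each group first gathers the remote group's
   data via two intercommunicator igathervs (root=MPI_ROOT on the
   receiving side, root=0 on the sending side), then each group ibcasts
   the result locally, using an indexed datatype so each block lands at
   its displs offset in recvbuf. */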
    int mpi_errno = MPI_SUCCESS;
    int remote_size, root, rank;
    MPID_Comm *newcomm_ptr = NULL;
    MPI_Datatype newtype = MPI_DATATYPE_NULL;

    remote_size = comm_ptr->remote_size;
    rank = comm_ptr->rank;

    MPIU_Assert(comm_ptr->coll_fns && comm_ptr->coll_fns->Igatherv_sched);

    /* first do an intercommunicator gatherv from left to right group,
       then from right to left group */
    if (comm_ptr->is_low_group) {
        /* gatherv from right group */
        root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
        mpi_errno = comm_ptr->coll_fns->Igatherv_sched(sendbuf, sendcount, sendtype, recvbuf,
                                                 recvcounts, displs, recvtype, root,
                                                 comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        /* gatherv to right group */
        root = 0;
        mpi_errno = comm_ptr->coll_fns->Igatherv_sched(sendbuf, sendcount, sendtype, recvbuf,
                                                 recvcounts, displs, recvtype, root,
                                                 comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else {
        /* gatherv to left group  */
        root = 0;
        mpi_errno = comm_ptr->coll_fns->Igatherv_sched(sendbuf, sendcount, sendtype, recvbuf,
                                                 recvcounts, displs, recvtype, root,
                                                 comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        /* gatherv from left group */
        root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
        mpi_errno = comm_ptr->coll_fns->Igatherv_sched(sendbuf, sendcount, sendtype, recvbuf,
                                                 recvcounts, displs, recvtype, root,
                                                 comm_ptr, s);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }

    MPID_SCHED_BARRIER(s);

    /* now do an intracommunicator broadcast within each group. we use
       a derived datatype to handle the displacements */

    /* Get the local intracommunicator */
    if (!comm_ptr->local_comm) {
        mpi_errno = MPIR_Setup_intercomm_localcomm( comm_ptr );
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }

    newcomm_ptr = comm_ptr->local_comm;
    MPIU_Assert(newcomm_ptr->coll_fns && newcomm_ptr->coll_fns->Ibcast_sched);

    mpi_errno = MPIR_Type_indexed_impl(remote_size, recvcounts, displs, recvtype, &newtype);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    mpi_errno = MPIR_Type_commit_impl(&newtype);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    mpi_errno = newcomm_ptr->coll_fns->Ibcast_sched(recvbuf, 1, newtype, 0, newcomm_ptr, s);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    MPIR_Type_free_impl(&newtype);

fn_exit:
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIR_Iallgatherv_impl
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Iallgatherv_impl(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
                          void *recvbuf, const int recvcounts[], const int displs[],
                          MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPI_Request *request)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *reqp = NULL;
    int tag = -1;
    MPID_Sched_t s = MPID_SCHED_NULL;

    *request = MPI_REQUEST_NULL;

    MPIU_Assert(comm_ptr->coll_fns != NULL);
    if (comm_ptr->coll_fns->Iallgatherv_req != NULL) {
        /* --BEGIN USEREXTENSION-- */
        mpi_errno = comm_ptr->coll_fns->Iallgatherv_req(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, &reqp);
        if (reqp) {
            *request = reqp->handle;
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            goto fn_exit;
        }
        /* --END USEREXTENSION-- */
    }

    mpi_errno = MPID_Sched_next_tag(comm_ptr, &tag);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    mpi_errno = MPID_Sched_create(&s);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    MPIU_Assert(comm_ptr->coll_fns != NULL);
    MPIU_Assert(comm_ptr->coll_fns->Iallgatherv_sched != NULL);
    mpi_errno = comm_ptr->coll_fns->Iallgatherv_sched(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, s);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
    if (reqp)
        *request = reqp->handle;
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

fn_exit:
    return mpi_errno;
fn_fail:
    goto fn_exit;
}

#endif /* MPICH_MPI_FROM_PMPI */

#undef FUNCNAME
#define FUNCNAME MPI_Iallgatherv
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*@
MPI_Iallgatherv - Gathers data from all tasks and delivers the combined data
                  to all tasks in a nonblocking way

Input Parameters:
+ sendbuf - starting address of the send buffer (choice)
. sendcount - number of elements in send buffer (non-negative integer)
. sendtype - data type of send buffer elements (handle)
. recvcounts - non-negative integer array (of length group size) containing the number of elements that are received from each process
. displs - integer array (of length group size). Entry i specifies the displacement relative to recvbuf at which to place the incoming data from process i
. recvtype - data type of receive buffer elements (handle)
- comm - communicator (handle)

Output Parameters:
+ recvbuf - starting address of the receive buffer (choice)
- request - communication request (handle)

.N ThreadSafe

.N Fortran

.N Errors
@*/
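/* Minimal usage sketch (illustrative; "mybuf", "mycount", "allbuf",
 * "recvcounts", "displs", and do_other_work() are caller-supplied
 * placeholders, and error handling is omitted):
 *
 *     MPI_Request req;
 *     MPI_Iallgatherv(mybuf, mycount, MPI_INT,
 *                     allbuf, recvcounts, displs, MPI_INT,
 *                     MPI_COMM_WORLD, &req);
 *     do_other_work();  (overlap computation with the communication)
 *     MPI_Wait(&req, MPI_STATUS_IGNORE);
 */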
int MPI_Iallgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                    const int recvcounts[], const int displs[], MPI_Datatype recvtype,
                    MPI_Comm comm, MPI_Request *request)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Comm *comm_ptr = NULL;
    MPID_MPI_STATE_DECL(MPID_STATE_MPI_IALLGATHERV);

    MPIU_THREAD_CS_ENTER(ALLFUNC,);
    MPID_MPI_FUNC_ENTER(MPID_STATE_MPI_IALLGATHERV);

    /* Validate parameters, especially handles needing to be converted */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS
        {
            if (sendbuf != MPI_IN_PLACE)
                MPIR_ERRTEST_DATATYPE(sendtype, "sendtype", mpi_errno);
            MPIR_ERRTEST_DATATYPE(recvtype, "recvtype", mpi_errno);
            MPIR_ERRTEST_COMM(comm, mpi_errno);

            /* TODO more checks may be appropriate */
        }
        MPID_END_ERROR_CHECKS
    }
#   endif /* HAVE_ERROR_CHECKING */

    /* Convert MPI object handles to object pointers */
    MPID_Comm_get_ptr(comm, comm_ptr);

    /* Validate parameters and objects (post conversion) */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS
        {
            MPID_Comm_valid_ptr(comm_ptr, mpi_errno);
            if (mpi_errno != MPI_SUCCESS) goto fn_fail;

            if (sendbuf != MPI_IN_PLACE) {
                if (HANDLE_GET_KIND(sendtype) != HANDLE_KIND_BUILTIN) {
                    MPID_Datatype *sendtype_ptr = NULL;
                    MPID_Datatype_get_ptr(sendtype, sendtype_ptr);
                    MPID_Datatype_valid_ptr(sendtype_ptr, mpi_errno);
                    if (mpi_errno != MPI_SUCCESS) goto fn_fail;
                    MPID_Datatype_committed_ptr(sendtype_ptr, mpi_errno);
                    if (mpi_errno != MPI_SUCCESS) goto fn_fail;
                }

                /* catch common aliasing cases */
                if (comm_ptr->comm_kind == MPID_INTRACOMM &&
                        sendtype == recvtype &&
                        recvcounts[comm_ptr->rank] != 0 &&
                        sendcount != 0) {
                    int recvtype_size;
                    MPID_Datatype_get_size_macro(recvtype, recvtype_size);
                    MPIR_ERRTEST_ALIAS_COLL(sendbuf, (char*)recvbuf + displs[comm_ptr->rank]*recvtype_size, mpi_errno);
                }
            }

            MPIR_ERRTEST_ARGNULL(recvcounts,"recvcounts", mpi_errno);
            MPIR_ERRTEST_ARGNULL(displs,"displs", mpi_errno);
            if (HANDLE_GET_KIND(recvtype) != HANDLE_KIND_BUILTIN) {
                MPID_Datatype *recvtype_ptr = NULL;
                MPID_Datatype_get_ptr(recvtype, recvtype_ptr);
                MPID_Datatype_valid_ptr(recvtype_ptr, mpi_errno);
                if (mpi_errno != MPI_SUCCESS) goto fn_fail;
                MPID_Datatype_committed_ptr(recvtype_ptr, mpi_errno);
                if (mpi_errno != MPI_SUCCESS) goto fn_fail;
            }

            MPIR_ERRTEST_ARGNULL(request,"request", mpi_errno);
            /* TODO more checks may be appropriate (counts, in_place, buffer aliasing, etc) */
        }
        MPID_END_ERROR_CHECKS
    }
#   endif /* HAVE_ERROR_CHECKING */

    /* ... body of routine ...  */

    mpi_errno = MPIR_Iallgatherv_impl(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, request);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* ... end of body of routine ... */

fn_exit:
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPI_IALLGATHERV);
    MPIU_THREAD_CS_EXIT(ALLFUNC,);
    return mpi_errno;

fn_fail:
    /* --BEGIN ERROR HANDLING-- */
#   ifdef HAVE_ERROR_CHECKING
    {
        mpi_errno = MPIR_Err_create_code(
            mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER,
            "**mpi_iallgatherv", "**mpi_iallgatherv %p %d %D %p %p %p %D %C %p", sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, request);
    }
#   endif
    mpi_errno = MPIR_Err_return_comm(comm_ptr, FCNAME, mpi_errno);
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}