/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidi_ch3_impl.h"

/*#include "mpidpre.h"*/
#include "mpid_nem_impl.h"
#if defined (MPID_NEM_INLINE) && MPID_NEM_INLINE
#include "mpid_nem_inline.h"
#endif
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif

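/* VC termination queue: each entry pairs a VC being shut down with
   the send request that must complete before the VC may be
   terminated (see MPIDI_CH3_Connection_terminate below, which
   records the tail of the shared send queue for this purpose). */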
typedef struct vc_term_element
{
    struct vc_term_element *next;
    MPIDI_VC_t *vc;
    MPID_Request *req;
} vc_term_element_t;

static struct { vc_term_element_t *head, *tail; } vc_term_queue;
#define TERMQ_EMPTY() GENERIC_Q_EMPTY(vc_term_queue)
#define TERMQ_HEAD() GENERIC_Q_HEAD(vc_term_queue)
#define TERMQ_ENQUEUE(ep) GENERIC_Q_ENQUEUE(&vc_term_queue, ep, next)
#define TERMQ_DEQUEUE(epp) GENERIC_Q_DEQUEUE(&vc_term_queue, epp, next)

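/* Table of packet handlers, indexed by packet type.  It is filled in
   by MPIDI_CH3I_Progress_init() with the CH3 and LMT handler sets
   (plus checkpointing handlers when ENABLE_CHECKPOINTING is set) and
   the netmod handler defined at the bottom of this file. */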
#define PKTARRAY_SIZE (MPIDI_NEM_PKT_END+1)
static MPIDI_CH3_PktHandler_Fcn *pktArray[PKTARRAY_SIZE];

#ifndef MPIDI_POSTED_RECV_ENQUEUE_HOOK
#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(x) do{}while(0)
#endif
#ifndef MPIDI_POSTED_RECV_DEQUEUE_HOOK
#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(x) 0
#endif

#ifdef BY_PASS_PROGRESS
extern MPID_Request ** const MPID_Recvq_posted_head_ptr;
extern MPID_Request ** const MPID_Recvq_unexpected_head_ptr;
extern MPID_Request ** const MPID_Recvq_posted_tail_ptr;
extern MPID_Request ** const MPID_Recvq_unexpected_tail_ptr;
#endif

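/* Completion counter for blocking progress: completed operations
   increment it via MPIDI_CH3_Progress_signal_completion(), and
   MPIDI_CH3I_Progress() compares it against the snapshot stored in
   the caller's progress_state to decide when a blocking wait may
   return (see the CC-1 comments below). */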
OPA_int_t MPIDI_CH3I_progress_completion_count = OPA_INT_T_INITIALIZER(0);

/* NEMESIS MULTITHREADING: Extra Data Structures Added */
#ifdef MPICH_IS_THREADED
volatile int MPIDI_CH3I_progress_blocked = FALSE;
volatile int MPIDI_CH3I_progress_wakeup_signalled = FALSE;
static MPID_Thread_cond_t MPIDI_CH3I_progress_completion_cond;
static int MPIDI_CH3I_Progress_delay(unsigned int completion_count);
static int MPIDI_CH3I_Progress_continue(unsigned int completion_count);
#endif /* MPICH_IS_THREADED */
/* NEMESIS MULTITHREADING - End block */

#ifdef HAVE_SIGNAL
static void (*prev_sighandler) (int);
#endif
static volatile int sigusr1_count = 0;
static int my_sigusr1_count = 0;

MPIDI_CH3I_shm_sendq_t MPIDI_CH3I_shm_sendq = {NULL, NULL};
struct MPID_Request *MPIDI_CH3I_shm_active_send = NULL;

static int pkt_NETMOD_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp);
static int shm_connection_terminated(MPIDI_VC_t * vc);
static int check_terminating_vcs(void);

int (*MPID_nem_local_lmt_progress)(void) = NULL;
int MPID_nem_local_lmt_pending = FALSE;

/* qn_ent and friends are used to keep a list of notification
   callbacks for posted and matched anysources */
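/* Entries are pushed onto the head of the list (see
   MPIDI_CH3I_Register_anysource_notification), so the callbacks run
   in reverse order of registration. */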
typedef struct qn_ent
{
    struct qn_ent *next;
    void (*enqueue_fn)(MPID_Request *rreq);
    int (*dequeue_fn)(MPID_Request *rreq);
} qn_ent_t;

static qn_ent_t *qn_head = NULL;

#ifdef HAVE_SIGNAL
static void sigusr1_handler(int sig)
{
    ++sigusr1_count;
    /* poke the progress engine in case we're waiting in a blocking recv */
    MPIDI_CH3_Progress_signal_completion();
    if (prev_sighandler)
        prev_sighandler(sig);
}
#endif

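/* check_terminating_vcs() drains the VC termination queue in FIFO
   order: while the send request recorded for the head entry has
   completed, the associated VC is terminated and the entry freed. */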
#undef FUNCNAME
#define FUNCNAME check_terminating_vcs
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int check_terminating_vcs(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_CHECK_TERMINATING_VCS);

    MPIDI_FUNC_ENTER(MPID_STATE_CHECK_TERMINATING_VCS);

    while (!TERMQ_EMPTY() && MPID_Request_is_complete(TERMQ_HEAD()->req)) {
        vc_term_element_t *ep;
        TERMQ_DEQUEUE(&ep);
        MPID_Request_release(ep->req);
        mpi_errno = shm_connection_terminated(ep->vc);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        MPIU_Free(ep);
    }
    
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_CHECK_TERMINATING_VCS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


/* MPIDI_CH3I_Shm_send_progress() makes progress sending queued
   messages on the shared memory queues.  This function is nonblocking
   and does not call netmod functions. */
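/* Two send paths are used below: contiguous data is described by the
   request's iov and pushed with MPID_nem_mpich_sendv(), while
   noncontiguous data is packed straight from the request's segment
   with MPID_nem_mpich_send_seg().  In both paths "again" is set when
   the data could not all be queued (e.g., no free cells) and the
   send must be resumed on a later call.  Note: callers are assumed
   to invoke this function only when there is an active send or the
   send queue is non-empty, so the queue head dereferenced below is
   non-NULL. */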
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Shm_send_progress
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDI_CH3I_Shm_send_progress(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_IOV *iov;
    int n_iov;
    MPID_Request *sreq;
    int again = 0;

    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_SEND_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_SEND_PROGRESS);

    sreq = MPIDI_CH3I_shm_active_send;
    MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: cont sreq");});
    if (sreq)
    {
        if (!sreq->ch.noncontig)
        {
            MPIU_Assert(sreq->dev.iov_count > 0 && sreq->dev.iov[sreq->dev.iov_offset].MPID_IOV_LEN > 0);

            iov = &sreq->dev.iov[sreq->dev.iov_offset];
            n_iov = sreq->dev.iov_count;

            do
            {
                mpi_errno = MPID_nem_mpich_sendv(&iov, &n_iov, sreq->ch.vc, &again);
                if (mpi_errno) MPIU_ERR_POP (mpi_errno);
            }
            while (!again && n_iov > 0);

            if (again) /* not finished sending */
            {
                sreq->dev.iov_offset = iov - sreq->dev.iov;
                sreq->dev.iov_count = n_iov;
                goto fn_exit;
            }
            else
                sreq->dev.iov_offset = 0;
        }
        else
        {
            do
            {
                MPID_nem_mpich_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
                                         sreq->ch.vc, &again);
            }
            while (!again && sreq->dev.segment_first < sreq->dev.segment_size);

            if (again) /* not finished sending */
                goto fn_exit;
        }
    }
    else
    {
        sreq = MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq);
        MPIU_DBG_STMT (CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: new sreq ");});

        if (!sreq->ch.noncontig)
        {
            MPIU_Assert(sreq->dev.iov_count > 0 && sreq->dev.iov[sreq->dev.iov_offset].MPID_IOV_LEN > 0);

            iov = &sreq->dev.iov[sreq->dev.iov_offset];
            n_iov = sreq->dev.iov_count;

            mpi_errno = MPID_nem_mpich_sendv_header(&iov, &n_iov, sreq->ch.vc, &again);
            if (mpi_errno) MPIU_ERR_POP (mpi_errno);
            if (!again)
            {
                MPIDI_CH3I_shm_active_send = sreq;
                while (!again && n_iov > 0)
                {
                    mpi_errno = MPID_nem_mpich_sendv(&iov, &n_iov, sreq->ch.vc, &again);
                    if (mpi_errno) MPIU_ERR_POP (mpi_errno);
                }
            }

            if (again) /* not finished sending */
            {
                sreq->dev.iov_offset = iov - sreq->dev.iov;
                sreq->dev.iov_count = n_iov;
                goto fn_exit;
            }
            else
                sreq->dev.iov_offset = 0;
        }
        else
        {
            MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
                                            &sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->ch.vc, &again);
            if (!again)
            {
                MPIDI_CH3I_shm_active_send = sreq;
                while (!again && sreq->dev.segment_first < sreq->dev.segment_size)
                {
                    MPID_nem_mpich_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
                                             sreq->ch.vc, &again);
                }
            }

            if (again) /* not finished sending */
                goto fn_exit;
        }
    }

    /* finished sending sreq */
    MPIU_Assert(!again);

    if (!sreq->dev.OnDataAvail)
    {
        /* MT FIXME-N1 race under per-object, harmless to disable here but
         * it's a symptom of a bigger problem... */
#if !(defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_PER_OBJECT))
        MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
#endif

        MPIDI_CH3U_Request_complete(sreq);

        /* MT - clear the current active send before dequeuing/destroying the current request */
        MPIDI_CH3I_shm_active_send = NULL;
        MPIDI_CH3I_Sendq_dequeue(&MPIDI_CH3I_shm_sendq, &sreq);
        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
        mpi_errno = check_terminating_vcs();
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    else
    {
        int complete = 0;
        mpi_errno = sreq->dev.OnDataAvail(sreq->ch.vc, sreq, &complete);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        if (complete)
        {
            MPIDI_CH3I_shm_active_send = NULL;
            MPIDI_CH3I_Sendq_dequeue(&MPIDI_CH3I_shm_sendq, &sreq);
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
            mpi_errno = check_terminating_vcs();
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
    }
        
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_SEND_PROGRESS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


/* NOTE: it appears that this function is sometimes (inadvertently?) recursive.
 * Some packet handlers, such as MPIDI_CH3_PktHandler_Close, call iStartMsg,
 * which calls MPID_Progress_test. */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
{
    int mpi_errno = MPI_SUCCESS;
#ifdef MPICH_IS_THREADED
    int pollcount = 0;
#endif
    int made_progress = FALSE;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS);

    MPIU_THREAD_CS_ENTER(MPIDCOMM,);

    /* sanity: if this doesn't hold, we can't track our local view of completion safely */
    if (is_blocking) {
        MPIU_Assert(progress_state != NULL);
    }

    if (sigusr1_count > my_sigusr1_count) {
        my_sigusr1_count = sigusr1_count;
        mpi_errno = MPIDI_CH3U_Check_for_failed_procs();
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    
#ifdef ENABLE_CHECKPOINTING
    if (MPIR_CVAR_NEMESIS_ENABLE_CKPOINT) {
        if (MPIDI_nem_ckpt_start_checkpoint) {
            MPIDI_nem_ckpt_start_checkpoint = FALSE;
            mpi_errno = MPIDI_nem_ckpt_start();
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
        if (MPIDI_nem_ckpt_finish_checkpoint) {
            MPIDI_nem_ckpt_finish_checkpoint = FALSE;
            mpi_errno = MPIDI_nem_ckpt_finish();
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }
    }
#endif

    do
    {
	MPID_Request        *rreq;
	MPID_nem_cell_ptr_t  cell;
	int                  in_fbox = 0;
	MPIDI_VC_t          *vc;

        do /* receive progress */
        {

#ifdef MPICH_IS_THREADED
            MPIU_THREAD_CHECK_BEGIN;
            {
                if (MPIDI_CH3I_progress_blocked == TRUE)
                {
                    /* another thread is already blocking in the progress engine. */
                    break; /* break out of receive block */
                }
            }
            MPIU_THREAD_CHECK_END;
#endif

            /* make progress receiving */
            /* check queue */
            if (MPID_nem_safe_to_block_recv() && is_blocking
#ifdef MPICH_IS_THREADED
#ifdef HAVE_RUNTIME_THREADCHECK
                && !MPIR_ThreadInfo.isThreaded
#else
                && 0
#endif
#endif
                )
            {
                mpi_errno = MPID_nem_mpich_blocking_recv(&cell, &in_fbox, progress_state->ch.completion_count);
            }
            else
            {
                mpi_errno = MPID_nem_mpich_test_recv(&cell, &in_fbox, is_blocking);
            }
            if (mpi_errno) MPIU_ERR_POP (mpi_errno);

            if (cell)
            {
                char            *cell_buf    = (char *)cell->pkt.mpich.p.payload;
                MPIDI_msg_sz_t   payload_len = cell->pkt.mpich.datalen;
                MPIDI_CH3_Pkt_t *pkt         = (MPIDI_CH3_Pkt_t *)cell_buf;

                /* Empty packets are not allowed */
                MPIU_Assert(payload_len >= 0);

                if (in_fbox)
                {
                    MPIDI_CH3I_VC *vc_ch;
                    MPIDI_msg_sz_t buflen = payload_len;

                    /* This packet must be the first packet of a new message */
                    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Recv pkt from fbox");
                    MPIU_Assert(payload_len >= sizeof (MPIDI_CH3_Pkt_t));

                    MPIDI_PG_Get_vc_set_active(MPIDI_Process.my_pg, MPID_NEM_FBOX_SOURCE(cell), &vc);
		    MPIU_Assert(vc->ch.recv_active == NULL &&
                                vc->ch.pending_pkt_len == 0);
                    vc_ch = &vc->ch;

                    /* invalid pkt data will result in unpredictable behavior */
                    MPIU_Assert(pkt->type >= 0 && pkt->type < MPIDI_NEM_PKT_END);

                    mpi_errno = pktArray[pkt->type](vc, pkt, &buflen, &rreq);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

                    if (!rreq)
                    {
                        MPID_nem_mpich_release_fbox(cell);
                        break; /* break out of recv progress block */
                    }

                    /* we received a truncated packet, handle it with handle_pkt */
                    vc_ch->recv_active = rreq;
                    cell_buf    += buflen;
                    payload_len -= buflen;

                    mpi_errno = MPID_nem_handle_pkt(vc, cell_buf, payload_len);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                    MPID_nem_mpich_release_fbox(cell);

                    /* the whole message should have been handled */
                    MPIU_Assert(!vc_ch->recv_active);

                    break; /* break out of recv progress block */
                }


                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Recv pkt from queue");

                MPIDI_PG_Get_vc_set_active(MPIDI_Process.my_pg, MPID_NEM_CELL_SOURCE(cell), &vc);

                mpi_errno = MPID_nem_handle_pkt(vc, cell_buf, payload_len);
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                MPID_nem_mpich_release_cell(cell, vc);

                break; /* break out of recv progress block */

            }
        }
        while(0);  /* do the loop exactly once.  Used so we can jump out of recv progress using break. */


	/* make progress sending */
        if (MPIDI_CH3I_shm_active_send || MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq)) {
            mpi_errno = MPIDI_CH3I_Shm_send_progress();
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        } else {
            /* there are no pending sends */
#ifdef MPICH_IS_THREADED
            MPIU_THREAD_CHECK_BEGIN;
            {
                if (MPIDI_CH3I_progress_blocked == TRUE && is_blocking && !MPID_nem_local_lmt_pending)
                {
                    /* There's nothing to send and there's another thread already blocking in the progress engine. */
                    MPIDI_CH3I_Progress_delay(progress_state->ch.completion_count);
                    /* the progress_state count will be updated below at the
                     * bottom of the outermost loop (see CC-1) */
                }
            }
            MPIU_THREAD_CHECK_END;
#endif
        }
        
        /* make progress on LMTs */
        if (MPID_nem_local_lmt_pending)
        {
            mpi_errno = MPID_nem_local_lmt_progress();
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        }

        /* make progress on NBC schedules */
        mpi_errno = MPIDU_Sched_progress(&made_progress);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        if (made_progress) {
            MPIDI_CH3_Progress_signal_completion();
        }

        /* in the case of progress_wait, bail out if anything completed (CC-1) */
        if (is_blocking) {
            int completion_count = OPA_load_int(&MPIDI_CH3I_progress_completion_count);
            if (progress_state->ch.completion_count != completion_count) {
                /* Read barrier to make sure no reads get values before the
                   completion counter was incremented  */
                OPA_read_barrier();
                /* reset for the next iteration */
                progress_state->ch.completion_count = completion_count;
                break;
            }
        }

#ifdef MPICH_IS_THREADED
        MPIU_THREAD_CHECK_BEGIN;
        {
	    /* In the case of threads, we poll for lesser number of
	     * iterations than the case with only processes, as
	     * threads contend for CPU and the lock, while processes
	     * only contend for the CPU. */
            if (pollcount >= MPID_NEM_THREAD_POLLS_BEFORE_YIELD)
            {
                pollcount = 0;
                MPIDI_CH3I_progress_blocked = TRUE;
                MPIU_THREAD_CS_YIELD(ALLFUNC,);
                /* MPIDCOMM yield is needed because at least the send functions
                 * acquire MPIDCOMM to put things into the send queues.  Failure
                 * to yield could result in a deadlock.  This thread needs the
                 * send from another thread to be posted, but the other thread
                 * can't post it while this CS is held. */
                /* assertion: we currently do not hold any other critical
                 * sections besides the MPIDCOMM CS at this point.  Violating
                 * this will probably lead to lock-ordering deadlocks. */
                MPIU_THREAD_CS_YIELD(MPIDCOMM,);
                MPIDI_CH3I_progress_blocked = FALSE;
                MPIDI_CH3I_progress_wakeup_signalled = FALSE;
            }
            ++pollcount;
        }
        MPIU_THREAD_CHECK_END;
#else
        MPIU_Busy_wait();
#endif
    }
    while (is_blocking);

    
#ifdef MPICH_IS_THREADED
    MPIU_THREAD_CHECK_BEGIN;
    {
        if (is_blocking)
        {
            MPIDI_CH3I_Progress_continue(0/*unused*/);
        }
    }
    MPIU_THREAD_CHECK_END;
#endif

 fn_exit:
    MPIU_THREAD_CS_EXIT(MPIDCOMM,);
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#ifdef MPICH_IS_THREADED

/* Note that this routine is only called if threads are enabled;
   it does not need to check whether runtime threads are enabled */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_delay
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Progress_delay(unsigned int completion_count)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_DELAY);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_DELAY);
    /* FIXME should be appropriately abstracted somehow */
#   if defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL)
    {
	while (1)
	{
            if (completion_count != OPA_load_int(&MPIDI_CH3I_progress_completion_count) ||
                MPIDI_CH3I_progress_blocked != TRUE)
                break;
	    MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_ThreadInfo.global_mutex/*MPIDCOMM*/);
	}
    }
#   endif

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_DELAY);
    return mpi_errno;
}
/* end MPIDI_CH3I_Progress_delay() */


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_continue
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Progress_continue(unsigned int completion_count/*unused*/)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_CONTINUE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_CONTINUE);
    /* FIXME should be appropriately abstracted somehow */
#   if defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL)
    {
        /* we currently hold the MPIDCOMM CS */
	MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond);
    }
#   endif

    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_CONTINUE);
    return mpi_errno;
}
/* end MPIDI_CH3I_Progress_continue() */


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_wakeup
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3I_Progress_wakeup(void)
{
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_WAKEUP);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_WAKEUP);

    /* no processes sleep in nemesis progress */
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_WAKEUP);
    return;
}
#endif /* MPICH_IS_THREADED */

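/* MPID_nem_handle_pkt() consumes a buffer containing one or more
   (possibly partial) packets received on a VC.  Three cases are
   handled below: (1) no receive is active and a whole header is
   available: dispatch packets through pktArray until one leaves a
   request needing more data; (2) a receive is active on this VC:
   keep copying payload into the request's iov; (3) only a header
   fragment is available: accumulate bytes in the VC's pending_pkt
   until a complete MPIDI_CH3_Pkt_t has arrived, then dispatch it. */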
#undef FUNCNAME
#define FUNCNAME MPID_nem_handle_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_handle_pkt(MPIDI_VC_t *vc, char *buf, MPIDI_msg_sz_t buflen)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *rreq = NULL;
    int complete;
    MPIDI_CH3I_VC *vc_ch = &vc->ch;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_HANDLE_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_HANDLE_PKT);

    do
    {
        if (!vc_ch->recv_active && vc_ch->pending_pkt_len == 0 && buflen >= sizeof(MPIDI_CH3_Pkt_t))
        {
            /* handle fast-path first: received a new whole message */
            do
            {
                MPIDI_msg_sz_t len = buflen;
                MPIDI_CH3_Pkt_t *pkt = (MPIDI_CH3_Pkt_t *)buf;

                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "received new message");

                /* invalid pkt data will result in unpredictable behavior */
                MPIU_Assert(pkt->type >= 0 && pkt->type < MPIDI_NEM_PKT_END);

                mpi_errno = pktArray[pkt->type](vc, pkt, &len, &rreq);
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                buflen -= len;
                buf    += len;
                MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, if (!rreq) MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...completed immediately"));
            }
            while (!rreq && buflen >= sizeof(MPIDI_CH3_Pkt_t));

            if (!rreq)
                continue;

            /* Channel fields don't get initialized on request creation, init them here */
            rreq->dev.iov_offset = 0;
        }
        else if (vc_ch->recv_active)
        {
            MPIU_Assert(vc_ch->pending_pkt_len == 0);
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "continuing recv");
            rreq = vc_ch->recv_active;
        }
        else
        {
            /* collect header fragments in vc's pending_pkt */
            MPIDI_msg_sz_t copylen;
            MPIDI_msg_sz_t pktlen;
            MPIDI_CH3_Pkt_t *pkt = (MPIDI_CH3_Pkt_t *)vc_ch->pending_pkt;

            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "received header fragment");

            copylen = ((vc_ch->pending_pkt_len + buflen <= sizeof(MPIDI_CH3_Pkt_t))
                       ? buflen
                       : sizeof(MPIDI_CH3_Pkt_t) - vc_ch->pending_pkt_len);
            MPIU_Memcpy((char *)vc_ch->pending_pkt + vc_ch->pending_pkt_len, buf, copylen);
            vc_ch->pending_pkt_len += copylen;
            if (vc_ch->pending_pkt_len < sizeof(MPIDI_CH3_Pkt_t))
                goto fn_exit;

            /* we have a whole header */
            MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "    completed header");
            MPIU_Assert(vc_ch->pending_pkt_len == sizeof(MPIDI_CH3_Pkt_t));

            buflen -= copylen;
            buf    += copylen;

            /* invalid pkt data will result in unpredictable behavior */
            MPIU_Assert(pkt->type >= 0 && pkt->type < MPIDI_NEM_PKT_END);

            pktlen = sizeof(MPIDI_CH3_Pkt_t);
            mpi_errno = pktArray[pkt->type](vc, pkt, &pktlen, &rreq);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            MPIU_Assert(pktlen == sizeof(MPIDI_CH3_Pkt_t));

            vc_ch->pending_pkt_len = 0;

            if (!rreq)
            {
                MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...completed immediately");
                continue;
            }
            /* Channel fields don't get initialized on request creation, init them here */
            rreq->dev.iov_offset = 0;
        }

        /* copy data into user buffer described by iov in rreq */
        MPIU_Assert(rreq);
        MPIU_Assert(rreq->dev.iov_count > 0 && rreq->dev.iov[rreq->dev.iov_offset].MPID_IOV_LEN > 0);

        MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "    copying into user buffer from IOV");

        if (buflen == 0)
        {
            vc_ch->recv_active = rreq;
            goto fn_exit;
        }

        complete = 0;

        while (buflen && !complete)
        {
            MPID_IOV *iov;
            int n_iov;

            iov = &rreq->dev.iov[rreq->dev.iov_offset];
            n_iov = rreq->dev.iov_count;
		
            while (n_iov && buflen >= iov->MPID_IOV_LEN)
            {
                size_t iov_len = iov->MPID_IOV_LEN;
		MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "        %d", (int)iov_len);
                MPIU_Memcpy (iov->MPID_IOV_BUF, buf, iov_len);

                buflen -= iov_len;
                buf    += iov_len;
                --n_iov;
                ++iov;
            }
		
            if (n_iov)
            {
                if (buflen > 0)
                {
		    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "        " MPIDI_MSG_SZ_FMT, buflen);
                    MPIU_Memcpy (iov->MPID_IOV_BUF, buf, buflen);
                    iov->MPID_IOV_BUF = (void *)((char *)iov->MPID_IOV_BUF + buflen);
                    iov->MPID_IOV_LEN -= buflen;
                    buflen = 0;
                }

                rreq->dev.iov_offset = iov - rreq->dev.iov;
                rreq->dev.iov_count = n_iov;
                vc_ch->recv_active = rreq;
		MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "        remaining: " MPIDI_MSG_SZ_FMT " bytes + %d iov entries", iov->MPID_IOV_LEN, n_iov));
            }
            else
            {
                int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);

                reqFn = rreq->dev.OnDataAvail;
                if (!reqFn)
                {
                    /* MT FIXME-N1 */
#if !(defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_PER_OBJECT))
                    MPIU_Assert(MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_GET_RESP);
#endif
                    MPIDI_CH3U_Request_complete(rreq);
                    complete = TRUE;
                }
                else
                {
                    mpi_errno = reqFn(vc, rreq, &complete);
                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                }

                if (!complete)
                {
                    rreq->dev.iov_offset = 0;
                    MPIU_Assert(rreq->dev.iov_count > 0 && rreq->dev.iov[rreq->dev.iov_offset].MPID_IOV_LEN > 0);
                    vc_ch->recv_active = rreq;
                    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...not complete");
                }
                else
                {
                    MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "...complete");
                    vc_ch->recv_active = NULL;
                }
            }
        }
    }
    while (buflen);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_HANDLE_PKT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


#define set_request_info(rreq_, pkt_, msg_type_)                \
{                                                               \
    (rreq_)->status.MPI_SOURCE = (pkt_)->match.rank;            \
    (rreq_)->status.MPI_TAG = (pkt_)->match.tag;                \
    MPIR_STATUS_SET_COUNT((rreq_)->status, (pkt_)->data_sz);		\
    (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id;         \
    (rreq_)->dev.recv_data_sz = (pkt_)->data_sz;                \
    MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum);          \
    MPIDI_Request_set_msg_type((rreq_), (msg_type_));           \
}
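/* A sketch of intended use by a packet handler (hypothetical
   variables; eager_pkt is assumed to carry a match/envelope field):

       set_request_info(rreq, eager_pkt, MPIDI_REQUEST_EAGER_MSG);

   which fills in the request's status (source/tag/count) and dev
   fields (sender request id, data size, sequence number). */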

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_init(void)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);

    MPIU_THREAD_CHECK_BEGIN
    /* FIXME should be appropriately abstracted somehow */
#   if defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL)
    {
        int err;
	MPID_Thread_cond_create(&MPIDI_CH3I_progress_completion_cond, &err);
        MPIU_Assert(err == 0);
    }
#   endif
    MPIU_THREAD_CHECK_END

    MPIDI_CH3I_shm_sendq.head = NULL;
    MPIDI_CH3I_shm_sendq.tail = NULL;
    MPIDI_CH3I_shm_active_send = NULL;
    
    /* Initialize the code to handle incoming packets */
    if (PKTARRAY_SIZE <= MPIDI_NEM_PKT_END) {
        MPIU_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_INTERN, "**ch3|pktarraytoosmall");
    }
    /* pkt handlers from CH3 */
    mpi_errno = MPIDI_CH3_PktHandler_Init(pktArray, PKTARRAY_SIZE);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    /* pkt handlers for LMT */
    mpi_errno = MPID_nem_lmt_pkthandler_init(pktArray, PKTARRAY_SIZE);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    
#ifdef ENABLE_CHECKPOINTING
    mpi_errno = MPIDI_nem_ckpt_pkthandler_init(pktArray, PKTARRAY_SIZE);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
#endif

    /* other pkt handlers */
    pktArray[MPIDI_NEM_PKT_NETMOD] = pkt_NETMOD_handler;
   
#ifdef HAVE_SIGNAL
    /* install signal handler for process failure notifications from hydra */
    prev_sighandler = signal(SIGUSR1, sigusr1_handler);
    MPIU_ERR_CHKANDJUMP1(prev_sighandler == SIG_ERR, mpi_errno, MPI_ERR_OTHER, "**signal", "**signal %s", MPIU_Strerror(errno));
    if (prev_sighandler == SIG_IGN || prev_sighandler == SIG_DFL)
        prev_sighandler = NULL;
#endif

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_INIT);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_finalize(void)
{
    int mpi_errno = MPI_SUCCESS;
    qn_ent_t *ent;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);

    while(qn_head) {
        ent = qn_head->next;
        MPIU_Free(qn_head);
        qn_head = ent;
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_FINALIZE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME shm_connection_terminated
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int shm_connection_terminated(MPIDI_VC_t * vc)
{
    /* This function is called after all sends have completed */
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_SHM_CONNECTION_TERMINATED);

    MPIDI_FUNC_ENTER(MPID_STATE_SHM_CONNECTION_TERMINATED);

    if (vc->ch.lmt_vc_terminated) {
        mpi_errno = vc->ch.lmt_vc_terminated(vc);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    
    mpi_errno = MPIU_SHMW_Hnd_finalize(&(vc->ch.lmt_copy_buf_handle));
    if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
    mpi_errno = MPIU_SHMW_Hnd_finalize(&(vc->ch.lmt_recv_copy_buf_handle));
    if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
    
    mpi_errno = MPIDI_CH3U_Handle_connection(vc, MPIDI_VC_EVENT_TERMINATED);
    if(mpi_errno) MPIU_ERR_POP(mpi_errno);

    MPIU_DBG_MSG_D(CH3_DISCONNECT, TYPICAL, "Terminated VC %d", vc->pg_rank);
 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_SHM_CONNECTION_TERMINATED);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Connection_terminate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_Connection_terminate(MPIDI_VC_t * vc)
{
    int mpi_errno = MPI_SUCCESS;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_CONNECTION_TERMINATE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_CONNECTION_TERMINATE);

    MPIU_DBG_MSG_D(CH3_DISCONNECT, TYPICAL, "Terminating VC %d", vc->pg_rank);

    /* if this is already closed, exit */
    if (vc->state == MPIDI_VC_STATE_MORIBUND ||
        vc->state == MPIDI_VC_STATE_INACTIVE_CLOSED)
        goto fn_exit;

    if (vc->ch.is_local) {
        MPIU_DBG_MSG(CH3_DISCONNECT, TYPICAL, "VC is local");

        if (vc->state != MPIDI_VC_STATE_CLOSED) {
            /* VC is terminated as a result of a fault.  Complete
               outstanding sends with an error and terminate
               connection immediately. */
            MPIU_DBG_MSG(CH3_DISCONNECT, TYPICAL, "VC terminated due to fault");
            mpi_errno = MPIDI_CH3I_Complete_sendq_with_error(vc);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);

            mpi_errno = shm_connection_terminated(vc);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
        } else {
            /* VC is terminated as a result of the close protocol.
               Wait for sends to complete, then terminate. */

            if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
                /* The sendq is empty, so we can immediately terminate
                   the connection. */
                MPIU_DBG_MSG(CH3_DISCONNECT, TYPICAL, "Shm send queue empty, terminating immediately");
                mpi_errno = shm_connection_terminated(vc);
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            } else {
                /* There may be sends from this VC on the send queue.
                   Since there is one send queue, we don't want to
                   search the queue to find the last send from this
                   VC.  Instead, we use the last send in the queue,
                   regardless of which VC it's from.  When that send
                   completes, (since no new messages are sent on this
                   VC anymore) we know that all sends on this VC must
                   have completed.  */
                vc_term_element_t *ep;
                MPIU_DBG_MSG(CH3_DISCONNECT, TYPICAL, "Shm send queue not empty, waiting to terminate");
                MPIU_CHKPMEM_MALLOC(ep, vc_term_element_t *, sizeof(vc_term_element_t), mpi_errno, "vc_term_element");
                ep->vc = vc;
                ep->req = MPIDI_CH3I_shm_sendq.tail;
                MPIR_Request_add_ref(ep->req); /* make sure this doesn't get released before we can check it */
                TERMQ_ENQUEUE(ep);
            }
        }
    
    } else {
        MPIU_DBG_MSG(CH3_DISCONNECT, TYPICAL, "VC is remote");
        mpi_errno = MPID_nem_netmod_func->vc_terminate(vc);
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    }
    
 fn_exit:
    MPIU_CHKPMEM_COMMIT();
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_CONNECTION_TERMINATE);
    return mpi_errno;
 fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}
/* end MPIDI_CH3_Connection_terminate() */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Complete_sendq_with_error
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDI_CH3I_Complete_sendq_with_error(MPIDI_VC_t * vc)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *req, *prev;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);

    req = MPIDI_CH3I_shm_sendq.head;
    prev = NULL;
    while (req) {
        if (req->ch.vc == vc) {
            MPID_Request *next = req->dev.next;
            if (prev)
                prev->dev.next = next;
            else
                MPIDI_CH3I_shm_sendq.head = next;
            if (MPIDI_CH3I_shm_sendq.tail == req)
                MPIDI_CH3I_shm_sendq.tail = prev;

            req->status.MPI_ERROR = MPI_SUCCESS;
            MPIU_ERR_SET1(req->status.MPI_ERROR, MPIX_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", vc->pg_rank);
            
            MPID_Request_release(req); /* ref count was incremented when added to queue */
            MPIDI_CH3U_Request_complete(req);
            req = next;
        } else {
            prev = req;
            req = req->dev.next;
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}



#undef FUNCNAME
#define FUNCNAME pkt_NETMOD_handler
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int pkt_NETMOD_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_nem_pkt_netmod_t * const netmod_pkt = (MPID_nem_pkt_netmod_t *)pkt;
    MPIDI_CH3I_VC *vc_ch = &vc->ch;
    MPIDI_STATE_DECL(MPID_STATE_PKT_NETMOD_HANDLER);

    MPIDI_FUNC_ENTER(MPID_STATE_PKT_NETMOD_HANDLER);

    MPIU_Assert_fmt_msg(vc_ch->pkt_handler && netmod_pkt->subtype < vc_ch->num_pkt_handlers, ("no handler defined for netmod-local packet"));

    mpi_errno = vc_ch->pkt_handler[netmod_pkt->subtype](vc, pkt, buflen, rreqp);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_PKT_NETMOD_HANDLER);
    return mpi_errno;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Register_anysource_notification
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Register_anysource_notification(void (*enqueue_fn)(MPID_Request *rreq), int (*dequeue_fn)(MPID_Request *rreq))
{
    int mpi_errno = MPI_SUCCESS;
    qn_ent_t *ent;
    MPIU_CHKPMEM_DECL(1);

    MPIU_CHKPMEM_MALLOC(ent, qn_ent_t *, sizeof(qn_ent_t), mpi_errno, "queue entry");

    ent->enqueue_fn = enqueue_fn;
    ent->dequeue_fn = dequeue_fn;
    ent->next = qn_head;
    qn_head = ent;

 fn_exit:
    MPIU_CHKPMEM_COMMIT();
    return mpi_errno;
 fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}
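/* A minimal usage sketch (the callback names here are illustrative,
   not defined in this file): a netmod that wants to observe
   MPI_ANY_SOURCE receives registers a pair of callbacks,

       mpi_errno = MPIDI_CH3I_Register_anysource_notification(my_posted_cb, my_matched_cb);

   my_posted_cb() is invoked for every posted anysource receive (see
   anysource_posted below), and my_matched_cb() must return nonzero
   iff that netmod matched the request (see anysource_matched). */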

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Anysource_posted
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static void anysource_posted(MPID_Request *rreq)
{
    qn_ent_t *ent = qn_head;

    /* call all of the registered handlers */
    while (ent)
    {
        if (ent->enqueue_fn)
        {
            ent->enqueue_fn(rreq);
        }
        ent = ent->next;
    }
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Anysource_matched
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int anysource_matched(MPID_Request *rreq)
{
    int matched = FALSE;
    qn_ent_t *ent = qn_head;

    /* call all of the registered handlers */
    while(ent) {
        if (ent->dequeue_fn)
        {
            int m;
            
            m = ent->dequeue_fn(rreq);
            
            /* this is a crude check to check if the req has been
               matched by more than one netmod.  When MPIU_Assert() is
               defined to empty, the extra matched=m is optimized
               away. */
            MPIU_Assert(!m || !matched);
            matched = m;
        }
        ent = ent->next;
    }

    return matched;
}

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Posted_recv_enqueued
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPIDI_CH3I_Posted_recv_enqueued(MPID_Request *rreq)
{
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_POSTED_RECV_ENQUEUED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_POSTED_RECV_ENQUEUED);

    /* MT FIXME acquiring MPIDCOMM here violates lock ordering rules (see
     * mpiimplthread.h comments), easily causes deadlock */

    if ((rreq)->dev.match.parts.rank == MPI_ANY_SOURCE)
        /* call anysource handler */
	anysource_posted(rreq);
    else
    {
        int local_rank = -1;
	MPIDI_VC_t *vc;

        /* MT FIXME does this macro need some sort of synchronization too? */
	MPIDI_Comm_get_vc((rreq)->comm, (rreq)->dev.match.parts.rank, &vc);

#ifdef ENABLE_COMM_OVERRIDES
        /* MT FIXME causes deadlock b/c of the MSGQUEUE/CH3COMM ordering (acquired
         * in reverse in some pkt handlers?) */
        MPIU_THREAD_CS_ENTER(CH3COMM,vc);
        /* call vc-specific handler */
	if (vc->comm_ops && vc->comm_ops->recv_posted)
            vc->comm_ops->recv_posted(vc, rreq);
        MPIU_THREAD_CS_EXIT(CH3COMM,vc);
#endif

        /* MT FIXME we unfortunately must disable this optimization for now in
         * per_object mode. There are possibly other ways to synchronize the
         * fboxes that won't cause lock-ordering deadlocks.  There might also be
         * ways to do this that don't require a hook on every request post, but
         * instead do some sort of caching or something analogous to branch
         * prediction. */
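        /* Fastbox bookkeeping: when a receive is posted for a specific
         * local source, the fastbox for that source is added to the
         * progress engine's polling set (MPID_nem_mpich_enqueue_fastbox);
         * it is removed again in MPIDI_CH3I_Posted_recv_dequeued() once
         * the receive is matched. */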
#if !(defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_PER_OBJECT))
        /* enqueue fastbox */

        /* don't enqueue a fastbox for yourself */
        MPIU_Assert(rreq->comm != NULL);
        if (rreq->dev.match.parts.rank == rreq->comm->rank)
            goto fn_exit;

        /* don't enqueue non-local processes */
        if (!vc->ch.is_local)
            goto fn_exit;

        /* Translate the communicator rank to a local rank.  Note that there is an
           implicit assumption here that because is_local is true above, that these
           processes are in the same PG. */
        local_rank = MPID_NEM_LOCAL_RANK(vc->pg_rank);

        MPID_nem_mpich_enqueue_fastbox(local_rank);
#endif
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_POSTED_RECV_ENQUEUED);
}

/* returns non-zero when req has been matched by channel */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Posted_recv_dequeued
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Posted_recv_dequeued(MPID_Request *rreq)
{
    int local_rank = -1;
    MPIDI_VC_t *vc;
    int matched = FALSE;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_POSTED_RECV_DEQUEUED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_POSTED_RECV_DEQUEUED);

    if (rreq->dev.match.parts.rank == MPI_ANY_SOURCE)
    {
	matched = anysource_matched(rreq);
    }
    /* MT FIXME we unfortunately must disable this optimization for now in
     * per_object mode. There are possibly other ways to synchronize the
     * fboxes that won't cause lock-ordering deadlocks */
#if !(defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_PER_OBJECT))
    else
    {
        if (rreq->dev.match.parts.rank == rreq->comm->rank)
            goto fn_exit;

        /* don't use MPID_NEM_IS_LOCAL, it doesn't handle dynamic processes */
        MPIDI_Comm_get_vc(rreq->comm, rreq->dev.match.parts.rank, &vc);
        MPIU_Assert(vc != NULL);
        if (!vc->ch.is_local)
            goto fn_exit;

        /* Translate the communicator rank to a local rank.  Note that there is an
           implicit assumption here that because is_local is true above, that these
           processes are in the same PG. */
        local_rank = MPID_NEM_LOCAL_RANK(vc->pg_rank);

        MPID_nem_mpich_dequeue_fastbox(local_rank);
    }
#endif

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_POSTED_RECV_DEQUEUED);
    return matched;
}