/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidi_recvq_statistics.h"

/*
 * Send an eager message.  To optimize for the important, short contiguous
 * message case, there are separate routines for the contig and non-contig
 * datatype cases.
 */
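/* Protocol sketch (a summary of what follows; see the packet handlers at
 * the bottom of this file): the sender fills in an
 * MPIDI_CH3_Pkt_eager_send_t with the match triple (rank, tag,
 * context_id) and the payload size, then pushes header and data in a
 * single iStartMsg(v)/iSendv call.  The receiver's packet handler calls
 * MPIDI_CH3U_Recvq_FDP_or_AEU to either find a posted (expected) receive
 * or allocate an unexpected request that buffers the data until a
 * matching receive is posted. */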

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_SendNoncontig_iov
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* MPIDI_CH3_SendNoncontig_iov - Sends a message by loading an
   IOV and calling iSendv.  The caller must initialize
   sreq->dev.segment_ptr as well as segment_first and segment_size. */
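/* (Typical use, as seen later in this file: a channel typically exports
   this routine through vc->sendNoncontig_fn, and
   MPIDI_CH3_EagerNoncontigSend below reaches it via that function
   pointer after preparing the segment.) */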
int MPIDI_CH3_SendNoncontig_iov( MPIDI_VC_t *vc, MPID_Request *sreq,
                                 void *header, MPIDI_msg_sz_t hdr_sz )
{
    int mpi_errno = MPI_SUCCESS;
    int iov_n;
    MPID_IOV iov[MPID_IOV_LIMIT];
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_SENDNONCONTIG_IOV);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_SENDNONCONTIG_IOV);

    iov[0].MPID_IOV_BUF = header;
    iov[0].MPID_IOV_LEN = hdr_sz;

    iov_n = MPID_IOV_LIMIT - 1;

    mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n);
    if (mpi_errno == MPI_SUCCESS)
    {
	iov_n += 1;
	
	/* Note this routine is invoked within a CH3 critical section */
	/* MPIU_THREAD_CS_ENTER(CH3COMM,vc); */
	mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iov_n);
	/* MPIU_THREAD_CS_EXIT(CH3COMM,vc); */
	/* --BEGIN ERROR HANDLING-- */
	if (mpi_errno != MPI_SUCCESS)
	{
	    MPIU_Object_set_ref(sreq, 0);
	    MPIDI_CH3_Request_destroy(sreq);
            MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|eagermsg");
	}
	/* --END ERROR HANDLING-- */

	/* Note that in the non-blocking case, we need to add a ref to the
	   datatypes */
    }
    else
    {
	/* --BEGIN ERROR HANDLING-- */
	MPIU_Object_set_ref(sreq, 0);
	MPIDI_CH3_Request_destroy(sreq);
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadsendiov");
	/* --END ERROR HANDLING-- */
    }


 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_SENDNONCONTIG_IOV);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* This function will allocate a segment.  That segment must be freed when
   it is no longer needed */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_EagerNoncontigSend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* MPIDI_CH3_EagerNoncontigSend - Eagerly send noncontiguous data */
int MPIDI_CH3_EagerNoncontigSend( MPID_Request **sreq_p, 
				  MPIDI_CH3_Pkt_type_t reqtype, 
				  const void * buf, int count, 
				  MPI_Datatype datatype, MPIDI_msg_sz_t data_sz,
				  int rank, 
				  int tag, MPID_Comm * comm, 
				  int context_offset )
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_VC_t * vc;
    MPID_Request *sreq = *sreq_p;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_eager_send_t * const eager_pkt = &upkt.eager_send;
    
    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
                     "sending non-contiguous eager message, data_sz=" MPIDI_MSG_SZ_FMT,
					data_sz));
    sreq->dev.OnDataAvail = 0;
    sreq->dev.OnFinal = 0;

    MPIDI_Pkt_init(eager_pkt, reqtype);
    eager_pkt->match.parts.rank	= comm->rank;
    eager_pkt->match.parts.tag	= tag;
    eager_pkt->match.parts.context_id	= comm->context_id + context_offset;
    eager_pkt->sender_req_id	= MPI_REQUEST_NULL;
    eager_pkt->data_sz		= data_sz;
    
    MPIDI_Comm_get_vc_set_active(comm, rank, &vc);

    MPIDI_VC_FAI_send_seqnum(vc, seqnum);
    MPIDI_Pkt_set_seqnum(eager_pkt, seqnum);
    MPIDI_Request_set_seqnum(sreq, seqnum);

    MPIU_DBG_MSGPKT(vc,tag,eager_pkt->match.parts.context_id,rank,data_sz,
                    "Eager");
	    
    sreq->dev.segment_ptr = MPID_Segment_alloc( );
    MPIU_ERR_CHKANDJUMP1((sreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");

    MPID_Segment_init(buf, count, datatype, sreq->dev.segment_ptr, 0);
    sreq->dev.segment_first = 0;
    sreq->dev.segment_size = data_sz;
	    
    MPIU_THREAD_CS_ENTER(CH3COMM,vc);
    mpi_errno = vc->sendNoncontig_fn(vc, sreq, eager_pkt, 
                                     sizeof(MPIDI_CH3_Pkt_eager_send_t));
    MPIU_THREAD_CS_EXIT(CH3COMM,vc);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

 fn_exit:
    return mpi_errno;
 fn_fail:
    *sreq_p = NULL;
    goto fn_exit;
}

/* Send a contiguous eager message.  We'll want to optimize (and possibly
   inline) this.

   Make sure that buf is at the beginning of the data to send; 
   adjust by adding dt_true_lb if necessary 
*/
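/* (Illustrative caller-side sketch of that adjustment; user_buf and
   dt_true_lb here stand for values the caller obtained, e.g. from
   MPIDI_Datatype_get_info:
       buf = (char *) user_buf + dt_true_lb;
   so that buf points at the first byte that will actually be sent.) */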
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_EagerContigSend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_EagerContigSend( MPID_Request **sreq_p, 
			       MPIDI_CH3_Pkt_type_t reqtype, 
			       const void * buf, MPIDI_msg_sz_t data_sz, int rank, 
			       int tag, MPID_Comm * comm, int context_offset )
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_VC_t * vc;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_eager_send_t * const eager_pkt = &upkt.eager_send;
    MPID_Request *sreq = *sreq_p;
    MPID_IOV iov[2];
    
    MPIDI_Pkt_init(eager_pkt, reqtype);
    eager_pkt->match.parts.rank	= comm->rank;
    eager_pkt->match.parts.tag	= tag;
    eager_pkt->match.parts.context_id	= comm->context_id + context_offset;
    eager_pkt->sender_req_id	= MPI_REQUEST_NULL;
    eager_pkt->data_sz		= data_sz;
    
    iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)eager_pkt;
    iov[0].MPID_IOV_LEN = sizeof(*eager_pkt);
    
    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
	       "sending contiguous eager message, data_sz=" MPIDI_MSG_SZ_FMT,
					data_sz));
	    
    iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) buf;
    iov[1].MPID_IOV_LEN = data_sz;
    
    MPIDI_Comm_get_vc_set_active(comm, rank, &vc);
    MPIDI_VC_FAI_send_seqnum(vc, seqnum);
    MPIDI_Pkt_set_seqnum(eager_pkt, seqnum);
    
    MPIU_DBG_MSGPKT(vc,tag,eager_pkt->match.parts.context_id,rank,data_sz,"EagerContig");
    MPIU_THREAD_CS_ENTER(CH3COMM,vc);
    mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, 2, sreq_p);
    MPIU_THREAD_CS_EXIT(CH3COMM,vc);
    if (mpi_errno != MPI_SUCCESS) {
	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**ch3|eagermsg");
    }

    sreq = *sreq_p;
    if (sreq != NULL)
    {
	MPIDI_Request_set_seqnum(sreq, seqnum);
	MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SEND);
    }

 fn_fail:
    return mpi_errno;
}

#ifdef USE_EAGER_SHORT
/* Send a short contiguous eager message.  We'll want to optimize (and possibly
   inline) this.

   Make sure that buf is at the beginning of the data to send; 
   adjust by adding dt_true_lb if necessary 

   We may need a nonblocking (cancellable) version of this, which will 
   have a smaller payload.
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_EagerContigShortSend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_EagerContigShortSend( MPID_Request **sreq_p, 
				    MPIDI_CH3_Pkt_type_t reqtype, 
				    const void * buf, MPIDI_msg_sz_t data_sz, int rank, 
				    int tag, MPID_Comm * comm, 
				    int context_offset )
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_VC_t * vc;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_eagershort_send_t * const eagershort_pkt = 
	&upkt.eagershort_send;
    MPID_Request *sreq = *sreq_p;
    
    /*    printf( "Sending short eager\n"); fflush(stdout); */
    MPIDI_Pkt_init(eagershort_pkt, reqtype);
    eagershort_pkt->match.parts.rank	     = comm->rank;
    eagershort_pkt->match.parts.tag	     = tag;
    eagershort_pkt->match.parts.context_id = comm->context_id + context_offset;
    eagershort_pkt->data_sz	     = data_sz;
    
    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
       "sending contiguous short eager message, data_sz=" MPIDI_MSG_SZ_FMT,
					data_sz));
	    
    MPIDI_Comm_get_vc_set_active(comm, rank, &vc);
    MPIDI_VC_FAI_send_seqnum(vc, seqnum);
    MPIDI_Pkt_set_seqnum(eagershort_pkt, seqnum);

    /* Copy the payload.  We could optimize this when data_sz & 0x3 == 0
       (copy (data_sz >> 2) ints), and inline that, since data size is
       currently limited to 4 ints. */
    {
	unsigned char * restrict p = 
	    (unsigned char *)eagershort_pkt->data;
	unsigned char const * restrict bufp = (unsigned char *)buf;
	int i;
	for (i=0; i<data_sz; i++) {
	    *p++ = *bufp++;
	}
    }
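    /* An illustrative sketch of the int-copy variant described above,
       assuming buf is suitably aligned and data_sz & 0x3 == 0:
           int * restrict dst = (int *) eagershort_pkt->data;
           const int * restrict src = (const int *) buf;
           for (i = 0; i < (data_sz >> 2); i++) dst[i] = src[i];
    */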

    MPIU_DBG_MSGPKT(vc,tag,eagershort_pkt->match.parts.context_id,rank,data_sz,
		    "EagerShort");
    MPIU_THREAD_CS_ENTER(CH3COMM,vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, eagershort_pkt, sizeof(*eagershort_pkt), sreq_p);
    MPIU_THREAD_CS_EXIT(CH3COMM,vc);
    if (mpi_errno != MPI_SUCCESS) {
	MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**ch3|eagermsg");
    }
    sreq = *sreq_p;
    if (sreq != NULL) {
	/*printf( "Surprise, did not complete send of eagershort (starting connection?)\n" ); 
	  fflush(stdout); */
        /* MT FIXME setting fields in the request after it has been given to the
         * progress engine is racy.  The start call above is protected by
         * CH3COMM:vc CS, but the progress engine is protected by MPIDCOMM.  So
         * we can't just extend CH3COMM below this point... what's the fix? */
	MPIDI_Request_set_seqnum(sreq, seqnum);
	MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SEND);
    }

 fn_fail:    
    return mpi_errno;
}

/* This is the matching handler for the EagerShort message defined above */

#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_EagerShortSend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_EagerShortSend( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, 
					 MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
    MPIDI_CH3_Pkt_eagershort_send_t * eagershort_pkt = &pkt->eagershort_send;
    MPID_Request * rreq;
    int found;
    int mpi_errno = MPI_SUCCESS;

    MPIU_THREAD_CS_ENTER(MSGQUEUE,);

    /* printf( "Receiving short eager!\n" ); fflush(stdout); */
    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
	"received eagershort send pkt, rank=%d, tag=%d, context=%d",
	eagershort_pkt->match.parts.rank, 
	eagershort_pkt->match.parts.tag, 
	eagershort_pkt->match.parts.context_id));
	    
    MPIU_DBG_MSGPKT(vc,eagershort_pkt->match.parts.tag,
		    eagershort_pkt->match.parts.context_id,
		    eagershort_pkt->match.parts.rank,eagershort_pkt->data_sz,
		    "ReceivedEagerShort");
    rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&eagershort_pkt->match, &found);
    MPIU_ERR_CHKANDJUMP1(!rreq, mpi_errno,MPI_ERR_OTHER, "**nomemreq", "**nomemuereq %d", MPIDI_CH3U_Recvq_count_unexp());

    (rreq)->status.MPI_SOURCE = (eagershort_pkt)->match.parts.rank;
    (rreq)->status.MPI_TAG    = (eagershort_pkt)->match.parts.tag;
    MPIR_STATUS_SET_COUNT((rreq)->status, (eagershort_pkt)->data_sz);
    (rreq)->dev.recv_data_sz  = (eagershort_pkt)->data_sz;
    MPIDI_Request_set_seqnum((rreq), (eagershort_pkt)->seqnum);
    /* FIXME: Why do we set the message type? */
    MPIDI_Request_set_msg_type((rreq), MPIDI_REQUEST_EAGER_MSG);

    /* This packet completes the reception of the indicated data.
       The packet handler returns null for a request that requires
       no further communication */
    *rreqp = NULL;
    *buflen = sizeof(MPIDI_CH3_Pkt_t);
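    /* (buflen is in/out: on entry, the number of bytes available in the
       receive buffer; on exit, the number of bytes this handler consumed.
       The eagershort payload travels inside the packet itself, so exactly
       one packet is consumed.) */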

    /* Extract the data from the packet */
    /* Note that if the data size is zero, we're already done */
    if (rreq->dev.recv_data_sz > 0) {
	if (found) {
	    int            dt_contig;
	    MPI_Aint       dt_true_lb;
	    MPIDI_msg_sz_t userbuf_sz;
	    MPID_Datatype *dt_ptr;
	    MPIDI_msg_sz_t data_sz;

	    /* Make sure that we handle the general (non-contiguous)
	       datatypes correctly while optimizing for the 
	       special case */
	    MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, 
				    dt_contig, userbuf_sz, dt_ptr, dt_true_lb);
		
	    if (rreq->dev.recv_data_sz <= userbuf_sz) {
		data_sz = rreq->dev.recv_data_sz;
	    }
	    else {
		MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
		    "receive buffer too small; message truncated, msg_sz=" 
					  MPIDI_MSG_SZ_FMT ", userbuf_sz="
				          MPIDI_MSG_SZ_FMT,
				 rreq->dev.recv_data_sz, userbuf_sz));
		rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, 
                     MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_TRUNCATE,
		     "**truncate", "**truncate %d %d %d %d", 
		     rreq->status.MPI_SOURCE, rreq->status.MPI_TAG, 
		     rreq->dev.recv_data_sz, userbuf_sz );
		MPIR_STATUS_SET_COUNT(rreq->status, userbuf_sz);
		data_sz = userbuf_sz;
	    }

	    if (dt_contig && data_sz == rreq->dev.recv_data_sz) {
		/* user buffer is contiguous and large enough to store the
		   entire message.  We can just copy the data */

		/* Copy the payload.  We could optimize this
		   when data_sz & 0x3 == 0
		   (copy (data_sz >> 2) ints), and inline that, since data
		   size is currently limited to 4 ints. */
		{
		    unsigned char const * restrict p = 
			(unsigned char *)eagershort_pkt->data;
		    unsigned char * restrict bufp = 
			(unsigned char *)(char*)(rreq->dev.user_buf) + 
			dt_true_lb;
		    int i;
		    for (i=0; i<data_sz; i++) {
			*bufp++ = *p++;
		    }
		}
		/* FIXME: We want to set the OnDataAvail to the appropriate 
		   function, which depends on whether this is an RMA 
		   request or a pt-to-pt request. */
		rreq->dev.OnDataAvail = 0;
		/* The recv_pending_count must be one here (!) because of
		   the way the pending count is queried.  We may want 
		   to fix this, but it will require a sweep of the code */
	    }
	    else {
		MPIDI_msg_sz_t recv_data_sz;
		MPI_Aint last;
		/* user buffer is not contiguous.  Use the segment
		   code to unpack it, handling various errors and 
		   exceptional cases */
		/* FIXME: The MPICH tests do not exercise this branch */
		/* printf( "Surprise!\n" ); fflush(stdout);*/
		rreq->dev.segment_ptr = MPID_Segment_alloc( );
                MPIU_ERR_CHKANDJUMP1((rreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");

		MPID_Segment_init(rreq->dev.user_buf, rreq->dev.user_count, 
				  rreq->dev.datatype, rreq->dev.segment_ptr, 0);

		recv_data_sz = rreq->dev.recv_data_sz;
		last    = recv_data_sz;
		MPID_Segment_unpack( rreq->dev.segment_ptr, 0, 
				     &last, eagershort_pkt->data );
		if (last != recv_data_sz) {
		    /* --BEGIN ERROR HANDLING-- */
		    /* There are two cases:  a datatype mismatch (could
		       not consume all data) or a too-short buffer. We
		       need to distinguish between these two types. */
		    MPIR_STATUS_SET_COUNT(rreq->status, last);
		    if (rreq->dev.recv_data_sz <= userbuf_sz) {
			MPIU_ERR_SETSIMPLE(rreq->status.MPI_ERROR,MPI_ERR_TYPE,
					   "**dtypemismatch");
		    }
		    /* --END ERROR HANDLING-- */
		}
		rreq->dev.OnDataAvail = 0;
	    }
	}
	else { /* (!found) */
            /* MT note: unexpected rreq is currently protected by MSGQUEUE CS */

            /* FIXME the AEU branch gives (rreq->cc==1) but the complete at the
             * bottom of this function will decr it.  Is everything going to be
             * cool in this case?  No upper layer has a pointer to rreq yet
             * (it's unexpected and freshly allocated) 
             */
	    MPIDI_msg_sz_t recv_data_sz;
	    /* This is easy; copy the data into a temporary buffer.
	       To begin with, we use the same temporary location as
	       is used in receiving eager unexpected data.
	     */
	    /* FIXME: When eagershort is enabled, provide a preallocated
               space for short messages (which is used even if eager short
	       is not used), since we don't want to have a separate check
	       to figure out which buffer we're using (or perhaps we should 
	       have a free-buffer-pointer, which can be null if it isn't
               a buffer that we've allocated). */
	    /* printf( "Allocating into tmp\n" ); fflush(stdout); */
	    recv_data_sz = rreq->dev.recv_data_sz;
	    MPIR_T_PVAR_LEVEL_INC(RECVQ, unexpected_recvq_buffer_size, recv_data_sz);
	    rreq->dev.tmpbuf = MPIU_Malloc(recv_data_sz);
	    if (!rreq->dev.tmpbuf) {
		MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
	    }
	    rreq->dev.tmpbuf_sz = recv_data_sz;
	    /* Copy the payload.  We could optimize this when
	       recv_data_sz & 0x3 == 0 (copy (recv_data_sz >> 2) ints), and
	       inline that, since data size is currently limited to 4 ints. */
            /* We actually could optimize this a lot of ways, including just
             * putting a memcpy here.  Modern compilers will inline fast
             * versions of the memcpy here (__builtin_memcpy, etc).  Another
             * option is a classic word-copy loop with a switch block at the end
             * for a remainder.  Alternatively a Duff's device loop could work.
             * Any replacement should be profile driven, and no matter what
             * we're likely to pick something suboptimal for at least one
             * compiler out there. [goodell@ 2012-02-10] */
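            /* For illustration, the plain-memcpy variant mentioned above
             * would simply be:
             *     memcpy(rreq->dev.tmpbuf, eagershort_pkt->data, recv_data_sz);
             * (memcpy as in <string.h>; MPICH also provides MPIU_Memcpy). */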
	    {
		unsigned char const * restrict p = 
		    (unsigned char *)eagershort_pkt->data;
		unsigned char * restrict bufp = 
		    (unsigned char *)rreq->dev.tmpbuf;
		int i;
		for (i=0; i<recv_data_sz; i++) {
		    *bufp++ = *p++;
		}
	    }
	    /* printf( "Unexpected eager short\n" ); fflush(stdout); */
	    /* These next two indicate that once matched, there is
	       one more step (the unpack into the user buffer) to perform. */
	    rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_UnpackUEBufComplete;

            /* normally starts at 2, but we are implicitly decrementing it
             * because all of the data arrived in the pkt (see mpidpre.h) */
	    rreq->dev.recv_pending_count = 1;
	}

	if (mpi_errno != MPI_SUCCESS) {
	    MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv",
		     "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGERSHORT_SEND");
	}
    }

    /* The request is still complete (in the sense of having all data), decr the
     * cc and kick the progress engine. */
    /* MT note: when multithreaded, completing a request (cc==0) also signifies
     * that an upper layer may acquire exclusive ownership of the request, so
     * all rreq field modifications must be complete at this point.  This macro
     * also kicks the progress engine, which was previously done here via
     * MPIDI_CH3_Progress_signal_completion(). */
    MPIDI_CH3U_Request_complete(rreq);

 fn_fail:
    /* MT note: it may be possible to narrow this CS after careful
     * consideration.  Note though that the (!found) case must be wholly
     * protected by this CS. */
    MPIU_THREAD_CS_EXIT(MSGQUEUE,);
    return mpi_errno;
}

#endif

/* Send a contiguous eager message that can be cancelled (e.g., 
   a nonblocking eager send).  We'll want to optimize (and possibly
   inline) this 

   Make sure that buf is at the beginning of the data to send; 
   adjust by adding dt_true_lb if necessary 
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_EagerContigIsend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_EagerContigIsend( MPID_Request **sreq_p, 
				MPIDI_CH3_Pkt_type_t reqtype, 
				const void * buf, MPIDI_msg_sz_t data_sz, int rank, 
				int tag, MPID_Comm * comm, int context_offset )
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_VC_t * vc;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_eager_send_t * const eager_pkt = &upkt.eager_send;
    MPID_Request *sreq = *sreq_p;
    MPID_IOV iov[MPID_IOV_LIMIT];

    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
	       "sending contiguous eager message, data_sz=" MPIDI_MSG_SZ_FMT,
					data_sz));
	    
    sreq->dev.OnDataAvail = 0;
    
    MPIDI_Pkt_init(eager_pkt, reqtype);
    eager_pkt->match.parts.rank	= comm->rank;
    eager_pkt->match.parts.tag	= tag;
    eager_pkt->match.parts.context_id	= comm->context_id + context_offset;
    eager_pkt->sender_req_id	= sreq->handle;
    eager_pkt->data_sz		= data_sz;
    
    iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)eager_pkt;
    iov[0].MPID_IOV_LEN = sizeof(*eager_pkt);
    
    iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) buf;
    iov[1].MPID_IOV_LEN = data_sz;
    
    MPIDI_Comm_get_vc_set_active(comm, rank, &vc);
    MPIDI_VC_FAI_send_seqnum(vc, seqnum);
    MPIDI_Pkt_set_seqnum(eager_pkt, seqnum);
    MPIDI_Request_set_seqnum(sreq, seqnum);
    
    MPIU_DBG_MSGPKT(vc,tag,eager_pkt->match.parts.context_id,rank,data_sz,"EagerIsend");
    MPIU_THREAD_CS_ENTER(CH3COMM,vc);
    mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, 2);
    MPIU_THREAD_CS_EXIT(CH3COMM,vc);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS)
    {
	MPIU_Object_set_ref(sreq, 0);
	MPIDI_CH3_Request_destroy(sreq);
	*sreq_p = NULL;
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|eagermsg");
    }
    /* --END ERROR HANDLING-- */
    
 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}

/* 
 * Here are the routines that are called by the progress engine to handle
 * the various eager message packets (cancel of sends is in 
 * mpid_cancel_send.c).
 */    
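/* The set_request_info macro below copies the envelope of an incoming
   packet into a receive request: MPI status source/tag/count, the
   sender's request id, the payload size, the sequence number, and the
   message type (here always MPIDI_REQUEST_EAGER_MSG). */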

#define set_request_info(rreq_, pkt_, msg_type_)		\
{								\
    (rreq_)->status.MPI_SOURCE = (pkt_)->match.parts.rank;	\
    (rreq_)->status.MPI_TAG = (pkt_)->match.parts.tag;		\
    MPIR_STATUS_SET_COUNT((rreq_)->status, (pkt_)->data_sz);		\
    (rreq_)->dev.sender_req_id = (pkt_)->sender_req_id;		\
    (rreq_)->dev.recv_data_sz = (pkt_)->data_sz;		\
    MPIDI_Request_set_seqnum((rreq_), (pkt_)->seqnum);		\
    MPIDI_Request_set_msg_type((rreq_), (msg_type_));		\
}

/* FIXME: This is not optimized for short messages, which 
   should have the data in the same packet when the data is
   particularly short (e.g., one 8 byte long word) */
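/* (When USE_EAGER_SHORT is enabled, MPIDI_CH3_EagerContigShortSend and
   its packet handler above implement exactly that data-in-packet
   optimization.) */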
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_EagerSend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_EagerSend( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, 
				    MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
    MPIDI_CH3_Pkt_eager_send_t * eager_pkt = &pkt->eager_send;
    MPID_Request * rreq;
    int found;
    int complete;
    char *data_buf;
    MPIDI_msg_sz_t data_len;
    int mpi_errno = MPI_SUCCESS;

    MPIU_THREAD_CS_ENTER(MSGQUEUE,);

    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
	"received eager send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
	eager_pkt->sender_req_id, eager_pkt->match.parts.rank, 
	eager_pkt->match.parts.tag, eager_pkt->match.parts.context_id));
    MPIU_DBG_MSGPKT(vc,eager_pkt->match.parts.tag,
		    eager_pkt->match.parts.context_id,
		    eager_pkt->match.parts.rank,eager_pkt->data_sz,
		    "ReceivedEager");
	    
    rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&eager_pkt->match, &found);
    MPIU_ERR_CHKANDJUMP1(!rreq, mpi_errno,MPI_ERR_OTHER, "**nomemreq", "**nomemuereq %d", MPIDI_CH3U_Recvq_count_unexp());
    
    set_request_info(rreq, eager_pkt, MPIDI_REQUEST_EAGER_MSG);
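    /* data_len is the number of payload bytes present in this buffer:
       whatever follows the packet header, capped at the message size.
       Any remainder is received later through the returned request. */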
    
    data_len = ((*buflen - sizeof(MPIDI_CH3_Pkt_t) >= rreq->dev.recv_data_sz)
                ? rreq->dev.recv_data_sz : *buflen - sizeof(MPIDI_CH3_Pkt_t));
    data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t);
    
    if (rreq->dev.recv_data_sz == 0) {
        /* return the number of bytes processed in this function */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
	MPIDI_CH3U_Request_complete(rreq);
	*rreqp = NULL;
    }
    else {
	if (found) {
	    mpi_errno = MPIDI_CH3U_Receive_data_found( rreq, data_buf,
                                                       &data_len, &complete );
	}
	else {
	    mpi_errno = MPIDI_CH3U_Receive_data_unexpected( rreq, data_buf,
                                                            &data_len, &complete );
	}

	if (mpi_errno != MPI_SUCCESS) {
	    MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, "**ch3|postrecv",
			     "**ch3|postrecv %s", "MPIDI_CH3_PKT_EAGER_SEND");
	}

        /* return the number of bytes processed in this function */
        *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;

        if (complete) 
        {
            MPIDI_CH3U_Request_complete(rreq);
            *rreqp = NULL;
        }
        else
        {
            *rreqp = rreq;
        }
    }

 fn_fail:
    MPIU_THREAD_CS_EXIT(MSGQUEUE,);
    return mpi_errno;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_ReadySend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_ReadySend( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
				    MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
    MPIDI_CH3_Pkt_ready_send_t * ready_pkt = &pkt->ready_send;
    MPID_Request * rreq;
    int found;
    int complete;
    char *data_buf;
    MPIDI_msg_sz_t data_len;
    int mpi_errno = MPI_SUCCESS;
    
    MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
	"received ready send pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
			ready_pkt->sender_req_id, 
			ready_pkt->match.parts.rank, 
                        ready_pkt->match.parts.tag, 
			ready_pkt->match.parts.context_id));
    MPIU_DBG_MSGPKT(vc,ready_pkt->match.parts.tag,
		    ready_pkt->match.parts.context_id,
		    ready_pkt->match.parts.rank,ready_pkt->data_sz,
		    "ReceivedReady");
	    
    rreq = MPIDI_CH3U_Recvq_FDP_or_AEU(&ready_pkt->match, &found);
    MPIU_ERR_CHKANDJUMP1(!rreq, mpi_errno,MPI_ERR_OTHER, "**nomemreq", "**nomemuereq %d", MPIDI_CH3U_Recvq_count_unexp());
    
    set_request_info(rreq, ready_pkt, MPIDI_REQUEST_EAGER_MSG);
    
    data_len = ((*buflen - sizeof(MPIDI_CH3_Pkt_t) >= rreq->dev.recv_data_sz)
                ? rreq->dev.recv_data_sz : *buflen - sizeof(MPIDI_CH3_Pkt_t));
    data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t);
    
    if (found) {
	if (rreq->dev.recv_data_sz == 0) {
            /* return the number of bytes processed in this function */
            *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;
	    MPIDI_CH3U_Request_complete(rreq);
	    *rreqp = NULL;
	}
	else {
	    mpi_errno = MPIDI_CH3U_Receive_data_found(rreq, data_buf, &data_len,
                                                      &complete);
	    if (mpi_errno != MPI_SUCCESS) {
		MPIU_ERR_SETANDJUMP1(mpi_errno,MPI_ERR_OTHER, 
				     "**ch3|postrecv",
				     "**ch3|postrecv %s", 
				     "MPIDI_CH3_PKT_READY_SEND");
	    }

            /* return the number of bytes processed in this function */
            *buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;

            if (complete) 
            {
                MPIDI_CH3U_Request_complete(rreq);
                *rreqp = NULL;
            }
            else
            {
                *rreqp = rreq;
            }
	}
    }
    else
    {
	/* FIXME: an error packet should be sent back to the sender 
	   indicating that the ready-send failed.  On the send
	   side, the error handler for the communicator can be invoked
	   even if the ready-send request has already
	   completed. */
	
	/* We need to consume any outstanding associated data and 
	   mark the request with an error. */
	
	rreq->status.MPI_ERROR = MPIR_Err_create_code(MPI_SUCCESS, 
				      MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, 
				      MPI_ERR_OTHER, "**rsendnomatch", 
				      "**rsendnomatch %d %d", 
				      ready_pkt->match.parts.rank,
				      ready_pkt->match.parts.tag);
	MPIR_STATUS_SET_COUNT(rreq->status, 0);
	if (rreq->dev.recv_data_sz > 0)
	{
	    /* force read of extra data */
	    *rreqp = rreq;
	    rreq->dev.segment_first = 0;
	    rreq->dev.segment_size = 0;
	    mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
	    if (mpi_errno != MPI_SUCCESS) {
		MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,
				    "**ch3|loadrecviov");
	    }
	}
	else
	{
	    /* mark data transfer as complete and decrement CC */
	    MPIDI_CH3U_Request_complete(rreq);
	    *rreqp = NULL;
	}
        /* we didn't process anything but the header in this case */
        *buflen = sizeof(MPIDI_CH3_Pkt_t);
    }
 fn_fail:
    return mpi_errno;
}


/*
 * Define the routines that can print out the eager and ready-send packets if 
 * debugging is enabled.
 */
#ifdef MPICH_DBG_OUTPUT
int MPIDI_CH3_PktPrint_EagerSend( FILE *fp, MPIDI_CH3_Pkt_t *pkt )
{
    MPIU_DBG_PRINTF((" type ......... EAGER_SEND\n"));
    MPIU_DBG_PRINTF((" sender_reqid . 0x%08X\n", pkt->eager_send.sender_req_id));
    MPIU_DBG_PRINTF((" context_id ... %d\n", pkt->eager_send.match.parts.context_id));
    MPIU_DBG_PRINTF((" tag .......... %d\n", pkt->eager_send.match.parts.tag));
    MPIU_DBG_PRINTF((" rank ......... %d\n", pkt->eager_send.match.parts.rank));
    MPIU_DBG_PRINTF((" data_sz ...... %d\n", pkt->eager_send.data_sz));
#ifdef MPID_USE_SEQUENCE_NUMBERS
    MPIU_DBG_PRINTF((" seqnum ....... %d\n", pkt->eager_send.seqnum));
#endif
    return MPI_SUCCESS;
}

#if defined(USE_EAGER_SHORT)
int MPIDI_CH3_PktPrint_EagerShortSend( FILE *fp, MPIDI_CH3_Pkt_t *pkt )
{
    int datalen;
    unsigned char *p = (unsigned char *)pkt->eagershort_send.data;
    MPIU_DBG_PRINTF((" type ......... EAGERSHORT_SEND\n"));
    MPIU_DBG_PRINTF((" context_id ... %d\n", pkt->eagershort_send.match.parts.context_id));
    MPIU_DBG_PRINTF((" tag .......... %d\n", pkt->eagershort_send.match.parts.tag));
    MPIU_DBG_PRINTF((" rank ......... %d\n", pkt->eagershort_send.match.parts.rank));
    MPIU_DBG_PRINTF((" data_sz ...... %d\n", pkt->eagershort_send.data_sz));
#ifdef MPID_USE_SEQUENCE_NUMBERS
    MPIU_DBG_PRINTF((" seqnum ....... %d\n", pkt->eagershort_send.seqnum));
#endif
    datalen = pkt->eagershort_send.data_sz;
    if (datalen > 0) {
	char databytes[64+1];
	int i;
	if (datalen > 32) datalen = 32;
	for (i=0; i<datalen; i++) {
	    MPIU_Snprintf( &databytes[2*i], sizeof(databytes) - 2*i, "%02x", p[i] );
	}
	MPIU_DBG_PRINTF((" data ......... %s\n", databytes));
    }
    return MPI_SUCCESS;
}
#endif /* defined(USE_EAGER_SHORT) */

int MPIDI_CH3_PktPrint_ReadySend( FILE *fp, MPIDI_CH3_Pkt_t *pkt )
{
    MPIU_DBG_PRINTF((" type ......... READY_SEND\n"));
    MPIU_DBG_PRINTF((" sender_reqid . 0x%08X\n", pkt->ready_send.sender_req_id));
    MPIU_DBG_PRINTF((" context_id ... %d\n", pkt->ready_send.match.parts.context_id));
    MPIU_DBG_PRINTF((" tag .......... %d\n", pkt->ready_send.match.parts.tag));
    MPIU_DBG_PRINTF((" rank ......... %d\n", pkt->ready_send.match.parts.rank));
    MPIU_DBG_PRINTF((" data_sz ...... %d\n", pkt->ready_send.data_sz));
#ifdef MPID_USE_SEQUENCE_NUMBERS
    MPIU_DBG_PRINTF((" seqnum ....... %d\n", pkt->ready_send.seqnum));
#endif
    return MPI_SUCCESS;
}

#endif /* MPICH_DBG_OUTPUT */