bsendutil.c 20.3 KB
Newer Older
1
/* -*- MODE: C; c-basic-offset:4 ; -*- */
2
3
4
5
6
7
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"
8
#include "mpibsend.h"
9
10
11
12
#include "bsendutil.h"

/*
 * Miscellaneous comments
13
14
15
16
17
 * By storing total_size along with "size available for messages", we
 * avoid any complexities associated with alignment, since we must
 * ensure that each KPIR_Bsend_data_t structure is properly aligned
 * (i.e., we can't simply do (sizeof(MPIR_Bsend_data_t) + size) to get
 * total_size).
18
19
20
21
22
23
 *
 * Function Summary
 *   MPIR_Bsend_attach - Performs the work of MPI_Buffer_attach
 *   MPIR_Bsend_detach - Performs the work of MPI_Buffer_detach
 *   MPIR_Bsend_isend  - Essentially performs an MPI_Ibsend.  Returns
 *                an MPID_Request that is also stored internally in the
24
 *                corresponding MPIR_Bsend_data_t entry
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 *   MPIR_Bsend_free_segment - Free a buffer that is no longer needed,
 *                merging with adjacent segments
 *   MPIR_Bsend_check_active - Check for completion of any pending sends
 *                for bsends (all bsends, both MPI_Ibsend and MPI_Bsend,
 *                are internally converted into Isends on the data
 *                in the Bsend buffer)
 *   MPIR_Bsend_retry_pending - Routine for future use to handle the
 *                case where an Isend cannot be initiated.
 *   MPIR_Bsend_find_buffer - Find a buffer in the bsend buffer large 
 *                enough for the message.  However, does not acquire that
 *                buffer (see MPIR_Bsend_take_buffer)
 *   MPIR_Bsend_take_buffer - Find and acquire a buffer for a message
 *   MPIR_Bsend_finalize - Finalize handler when Bsend routines are used 
 *   MPIR_Bsend_dump - Debugging routine to print the contents of the control
39
 *                information in the bsend buffer (the MPIR_Bsend_data_t entries)
40
41
42
43
44
45
 */

#ifdef USE_DBG_LOGGING
static void MPIR_Bsend_dump( void );
#endif

46
#define BSENDDATA_HEADER_TRUE_SIZE (sizeof(MPIR_Bsend_data_t) - sizeof(double))
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

/* BsendBuffer is the structure that describes the overall Bsend buffer */
/* 
 * We use separate buffer and origbuffer because we may need to align
 * the buffer (we *could* always memcopy the header to an aligned region,
 * but it is simpler to just align it internally.  This does increase the
 * BSEND_OVERHEAD, but that is already relatively large.  We could instead
 * make sure that the initial header was set at an aligned location (
 * taking advantage of the "alignpad"), but this would require more changes.
 */
static struct BsendBuffer {
    void               *buffer;        /* Pointer to the begining of the user-
					  provided buffer */
    int                buffer_size;    /* Size of the user-provided buffer */
    void               *origbuffer;    /* Pointer to the buffer provided by
					  the user */
    int                origbuffer_size; /* Size of the buffer as provided 
					    by the user */
65
    MPIR_Bsend_data_t  *avail;         /* Pointer to the first available block
66
					  of space */
67
    MPIR_Bsend_data_t  *pending;       /* Pointer to the first message that
68
69
70
					  could not be sent because of a 
					  resource limit (e.g., no requests
					  available) */
71
    MPIR_Bsend_data_t  *active;        /* Pointer to the first active (sending)
72
73
74
75
76
77
78
79
					  message */
} BsendBuffer = { 0, 0, 0, 0, 0, 0, 0 };

static int initialized = 0;   /* keep track of the first call to any
				 bsend routine */

/* Forward references */
static void MPIR_Bsend_retry_pending( void );
80
static int MPIR_Bsend_check_active ( void );
81
82
static MPIR_Bsend_data_t *MPIR_Bsend_find_buffer( int );
static void MPIR_Bsend_take_buffer( MPIR_Bsend_data_t *, int );
83
84
85
86
87
88
static int MPIR_Bsend_finalize( void * );

/*
 * Attach a buffer.  This checks for the error conditions and then
 * initialized the avail buffer.
 */    
89
90
91
92
#undef FUNCNAME
#define FUNCNAME MPIR_Bsend_attach
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
93
94
int MPIR_Bsend_attach( void *buffer, int buffer_size )
{
95
    MPIR_Bsend_data_t *p;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
    long        offset;

#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
	    if (BsendBuffer.buffer) {
		return MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                         "MPIR_Bsend_attach", __LINE__, MPI_ERR_BUFFER, 
					     "**bufexists", 0 );
	    }
	    if (buffer_size < MPI_BSEND_OVERHEAD) {
		/* MPI_ERR_OTHER is another valid choice for this error,
		 but the Intel test wants MPI_ERR_BUFFER, and it seems
		 to violate the principle of least surprise to not use
		 MPI_ERR_BUFFER for errors with the Buffer */
		return MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
		    "MPIR_Bsend_attach", __LINE__, MPI_ERR_BUFFER, 
		    "**bsendbufsmall", 
                    "**bsendbufsmall %d %d", buffer_size, MPI_BSEND_OVERHEAD );
	    }
	}
	MPID_END_ERROR_CHECKS;
    }
#   endif /* HAVE_ERROR_CHECKING */

    if (!initialized) {
	initialized = 1;
	MPIR_Add_finalize( MPIR_Bsend_finalize, (void *)0, 10 );
    }

    BsendBuffer.origbuffer	= buffer;
    BsendBuffer.origbuffer_size	= buffer_size;
    BsendBuffer.buffer		= buffer;
    BsendBuffer.buffer_size	= buffer_size;
    offset = ((long)buffer) % sizeof(void *);
    if (offset) {
	/* Make sure that the buffer that we use is aligned for pointers,
	   because the code assumes that */
	offset = sizeof(void *) - offset;
	buffer = (char *)buffer + offset;
	BsendBuffer.buffer      = buffer;
	BsendBuffer.buffer_size -= offset;
    }
    BsendBuffer.avail		= buffer;
    BsendBuffer.pending		= 0;
    BsendBuffer.active		= 0;

    /* Set the first block */
145
    p		  = (MPIR_Bsend_data_t *)buffer;
146
147
148
149
150
151
152
153
154
155
156
157
    p->size	  = buffer_size - BSENDDATA_HEADER_TRUE_SIZE;
    p->total_size = buffer_size;
    p->next	  = p->prev = 0;
    p->msg.msgbuf = (char *)p + BSENDDATA_HEADER_TRUE_SIZE;

    return MPI_SUCCESS;
}

/* 
 * Detach a buffer.  This routine must wait until any pending bsends 
 * are complete.
 */
158
159
160
161
#undef FUNCNAME
#define FUNCNAME MPIR_Bsend_detach
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
162
163
164
int MPIR_Bsend_detach( void *bufferp, int *size )
{
    if (BsendBuffer.pending) {
165
	/* FIXME: Process pending bsend requests in detach */
166
	return MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, 
167
            "MPIR_Bsend_detach", __LINE__, MPI_ERR_OTHER, "**bsendpending", 0 );
168
169
170
    }
    if (BsendBuffer.active) {
	/* Loop through each active element and wait on it */
171
	MPIR_Bsend_data_t *p = BsendBuffer.active;
172
173
174

	while (p) {
	    MPI_Request r = p->request->handle;
175
	    MPIR_Wait_impl( &r, MPI_STATUS_IGNORE );
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
	    p = p->next;
	}
    }

/* Note that this works even when the buffer does not exist */
    *(void **) bufferp  = BsendBuffer.origbuffer;
    *size = BsendBuffer.origbuffer_size;
    BsendBuffer.origbuffer = NULL;
    BsendBuffer.origbuffer_size = 0;
    BsendBuffer.buffer  = 0;
    BsendBuffer.buffer_size  = 0;
    BsendBuffer.avail   = 0;
    BsendBuffer.active  = 0;
    BsendBuffer.pending = 0;

    return MPI_SUCCESS;
}

/*
 * Initiate an ibsend.  We'll used this for Bsend as well.
 */
197
198
199
200
#undef FUNCNAME
#define FUNCNAME MPIR_Bsend_isend
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
201
202
int MPIR_Bsend_isend( void *buf, int count, MPI_Datatype dtype, 
		      int dest, int tag, MPID_Comm *comm_ptr, 
203
		      MPIR_Bsend_kind_t kind, MPID_Request **request )
204
{
205
    int mpi_errno = MPI_SUCCESS;
206
207
    MPIR_Bsend_data_t *p;
    MPIR_Bsend_msg_t *msg;
208
    int packsize, pass;
209
210
211
212
213
214
215
    MPIU_THREADPRIV_DECL;

    /* Find a free segment and copy the data into it.  If we could 
       have, we would already have used tBsend to send the message with
       no copying.

       We may want to decide here whether we need to pack at all 
216
       or if we can just use (a MPIU_Memcpy) of the buffer.
217
218
219
220
221
222
223
    */

    MPIU_THREADPRIV_GET;
    MPIR_Nest_incr();

    /* We check the active buffer first.  This helps avoid storage 
       fragmentation */
224
225
    mpi_errno = MPIR_Bsend_check_active();
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
226

227
228
229
230
231
232
233
234
    if (dtype != MPI_PACKED)
    {
        (void)NMPI_Pack_size( count, dtype, comm_ptr->handle, &packsize );
    }
    else
    {
        packsize = count;
    }
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249

    MPIU_DBG_MSG_D(BSEND,TYPICAL,"looking for buffer of size %d", packsize);
    /*
     * Use two passes.  Each pass is the same; between the two passes,
     * attempt to complete any active requests, and start any pending
     * ones.  If the message can be initiated in the first pass,
     * do not perform the second pass.
     */
    for (pass = 0; pass < 2; pass++) {
	
	p = MPIR_Bsend_find_buffer( packsize );
	if (p) {
	    MPIU_DBG_MSG_FMT(BSEND,TYPICAL,(MPIU_DBG_FDEST,
                     "found buffer of size %d with address %p",packsize,p));
	    /* Found a segment */
250
251

	    msg = &p->msg;
252
253
254
255
	    
	    /* Pack the data into the buffer */
	    /* We may want to optimize for the special case of
	       either primative or contiguous types, and just
256
	       use MPIU_Memcpy and the provided datatype */
257
	    msg->count = 0;
258
259
260
261
262
263
264
            if (dtype != MPI_PACKED)
            {
                (void)NMPI_Pack( buf, count, dtype, p->msg.msgbuf, packsize, 
                                 &p->msg.count, comm_ptr->handle );
            }
            else
            {
265
                MPIU_Memcpy(p->msg.msgbuf, buf, count);
266
267
                p->msg.count = count;
            }
268
269
	    /* Try to send the message.  We must use MPID_Isend
	       because this call must not block */
270
	    mpi_errno = MPID_Isend(msg->msgbuf, msg->count, MPI_PACKED, 
271
272
				   dest, tag, comm_ptr,
				   MPID_CONTEXT_INTRA_PT2PT, &p->request );
273
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
	    if (p->request) {
		MPIU_DBG_MSG_FMT(BSEND,TYPICAL,
		    (MPIU_DBG_FDEST,"saving request %p in %p",p->request,p));
		/* An optimization is to check to see if the 
		   data has already been sent.  The original code
		   to do this was commented out and probably did not match
		   the current request internals */
		MPIR_Bsend_take_buffer( p, p->msg.count );
		p->kind  = kind;
		*request = p->request;
	    }
	    break;
	}
	/* If we found a buffer or we're in the seccond pass, then break.
	    Note that the test on phere is redundant, as the code breaks 
	    out of the loop in the test above if a block p is found. */
	if (p || pass == 1) break;
	MPIU_DBG_MSG(BSEND,TYPICAL,"Could not find storage, checking active");
	/* Try to complete some pending bsends */
	MPIR_Bsend_check_active( );
	/* Give priority to any pending operations */
	MPIR_Bsend_retry_pending( );
    }
    MPIR_Nest_decr();
    
    if (!p) {
	/* Return error for no buffer space found */
	/* Generate a traceback of the allocated space, explaining why
	   packsize could not be found */
	MPIU_DBG_MSG(BSEND,TYPICAL,"Could not find space; dumping arena" );
	MPIU_DBG_STMT(BSEND,TYPICAL,MPIR_Bsend_dump());

306
307
308
309
	mpi_errno = MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE, "MPIR_Bsend_isend", __LINE__, MPI_ERR_BUFFER, "**bufbsend", 
                                          "**bufbsend %d %d", packsize, 
                                          BsendBuffer.buffer_size );
        MPIU_ERR_POP(mpi_errno);
310
    }
311
312
313
314
315
    
 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
316
317
318
319
320
321
322
323
324
325
326
327
}

/*
 * The following routines are used to manage the allocation of bsend segments
 * in the user buffer.  These routines handle, for example, merging segments
 * when an active segment that is adjacent to a free segment becomes free.
 *
 */

/* Add block p to the free list. Merge into adjacent blocks.  Used only 
   within the check_active */
/* begin:nested */
328
329
330
331
#undef FUNCNAME
#define FUNCNAME MPIR_Bsend_free_segment
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
332
static void MPIR_Bsend_free_segment( MPIR_Bsend_data_t *p )
333
{
334
    MPIR_Bsend_data_t *prev = p->prev, *avail = BsendBuffer.avail, *avail_prev;
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420

    MPIU_DBG_MSG_FMT(BSEND,TYPICAL,(MPIU_DBG_FDEST,
                 "Freeing bsend segment at %p of size %d, next at %p",
		 p,p->size, ((char *)p)+p->total_size));

    MPIU_DBG_MSG_D(BSEND,TYPICAL,
	     "At the begining of free_segment with size %d:", p->total_size );
    MPIU_DBG_STMT(BSEND,TYPICAL,MPIR_Bsend_dump());

    /* Remove the segment from the active list */
    if (prev) {
	MPIU_DBG_MSG(BSEND,TYPICAL,"free segment is within active list");
	prev->next = p->next;
    }
    else {
	/* p was at the head of the active list */
	MPIU_DBG_MSG(BSEND,TYPICAL,"free segment is head of active list");
	BsendBuffer.active = p->next;
	/* The next test sets the prev pointer to null */
    }
    if (p->next) {
	p->next->prev = prev;
    }

    MPIU_DBG_STMT(BSEND,VERBOSE,MPIR_Bsend_dump());

    /* Merge into the avail list */
    /* Find avail_prev, avail, such that p is between them.
       either may be null if p is at either end of the list */
    avail_prev = 0;
    while (avail) {
	if (avail > p) {
	    break;
	}
	avail_prev = avail;
	avail      = avail->next;
    }

    /* Try to merge p with the next block */
    if (avail) {
	if ((char *)p + p->total_size == (char *)avail) {
	    p->total_size += avail->total_size;
	    p->size       = p->total_size - BSENDDATA_HEADER_TRUE_SIZE;
	    p->next = avail->next;
	    if (avail->next) avail->next->prev = p;
	    avail = 0;
	}
	else {
	    p->next = avail;
	    avail->prev = p;
	}
    }
    else {
	p->next = 0;
    }
    /* Try to merge p with the previous block */
    if (avail_prev) {
	if ((char *)avail_prev + avail_prev->total_size == (char *)p) {
	    avail_prev->total_size += p->total_size;
	    avail_prev->size       = avail_prev->total_size - BSENDDATA_HEADER_TRUE_SIZE;
	    avail_prev->next = p->next;
	    if (p->next) p->next->prev = avail_prev;
	}
	else {
	    avail_prev->next = p;
	    p->prev          = avail_prev;
	}
    }
    else {
	/* p is the new head of the list */
	BsendBuffer.avail = p;
	p->prev           = 0;
    }

    MPIU_DBG_MSG(BSEND,TYPICAL,"At the end of free_segment:" );
    MPIU_DBG_STMT(BSEND,TYPICAL,MPIR_Bsend_dump());
}
/* end:nested */
/* 
 * The following routine tests for completion of active sends and 
 * frees the related storage
 *
 * To make it easier to identify the source of the request, we keep
 * track of the type of MPI routine (ibsend, bsend, or bsend_init/start)
 * that created the bsend entry.
 */
421
422
423
424
#undef FUNCNAME
#define FUNCNAME MPIR_Bsend_check_active
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
425
static int MPIR_Bsend_check_active( void )
426
{
427
    int mpi_errno = MPI_SUCCESS;
428
    MPIR_Bsend_data_t *active = BsendBuffer.active, *next_active;
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444

    MPIU_DBG_MSG_P(BSEND,TYPICAL,"Checking active starting at %p", active);
    while (active) {
	MPI_Request r = active->request->handle;
	int         flag;
	
	next_active = active->next;

	if (active->kind == IBSEND) {
	    /* We handle ibsend specially to allow for the user
	       to attempt and cancel the request. Also, to allow
	       for a cancel attempt (which must be attempted before
	       a successful test or wait), we only start
	       testing when the user has successfully released
	       the request (it is a grequest, the free call will do it) */
	    flag = 0;
445
446
            /* XXX DJG FIXME-MT should we be checking this? */
	    if (MPIU_Object_get_ref(active->request) == 1) {
447
448
449
		mpi_errno = MPIR_Test_impl(&r, &flag, MPI_STATUS_IGNORE );
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
	    } else {
450
451
452
453
		/* We need to invoke the progress engine in case we 
		 need to advance other, incomplete communication.  */
		MPID_Progress_state progress_state;
		MPID_Progress_start(&progress_state);
454
		mpi_errno = MPID_Progress_test( );
455
		MPID_Progress_end(&progress_state);
456
                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
457
	    }
458
459
460
	} else {
	    mpi_errno = MPIR_Test_impl( &r, &flag, MPI_STATUS_IGNORE );
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
461
462
463
464
465
466
467
468
469
	}
	if (flag) {
	    /* We're done.  Remove this segment */
	    MPIU_DBG_MSG_P(BSEND,TYPICAL,"Removing segment %p", active);
	    MPIR_Bsend_free_segment( active );
	}
	active = next_active;
	MPIU_DBG_MSG_P(BSEND,TYPICAL,"Next active is %p",active);
    }
470
471
472
473
474

 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
475
476
477
}

/* 
478
479
 * FIXME : For each pending item (that is, items that we couldn't even start 
 * sending), try to get them going.  
480
481
482
 */
static void MPIR_Bsend_retry_pending( void )
{
483
    MPIR_Bsend_data_t *pending = BsendBuffer.pending, *next_pending;
484
485
486
487

    while (pending) {
	next_pending = pending->next;
	/* Retry sending this item */
488
	/* FIXME: Unimplemented retry of pending bsend operations */
489
490
491
492
493
494
495
496
	pending = next_pending;
    }
}

/* 
 * Find a slot in the avail buffer that can hold size bytes.  Does *not*
 * remove the slot from the avail buffer (see MPIR_Bsend_take_buffer) 
 */
497
static MPIR_Bsend_data_t *MPIR_Bsend_find_buffer( int size )
498
{
499
    MPIR_Bsend_data_t *p = BsendBuffer.avail;
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518

    while (p) {
	if (p->size >= size) { 
	    return p;
	}
	p = p->next;
    }
    return 0;
}

/* This is the minimum number of bytes that a segment must be able to
   hold. */
#define MIN_BUFFER_BLOCK 8
/*
 * Carve off size bytes from buffer p and leave the remainder
 * on the avail list.  Handle the head/tail cases. 
 * If there isn't enough left of p, remove the entire segment from
 * the avail list.
 */
519
static void MPIR_Bsend_take_buffer( MPIR_Bsend_data_t *p, int size  )
520
{
521
    MPIR_Bsend_data_t *prev;
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
    int         alloc_size;

    /* Compute the remaining size.  This must include any padding 
       that must be added to make the new block properly aligned */
    alloc_size = size;
    if (alloc_size & 0x7) 
	alloc_size += (8 - (alloc_size & 0x7));
    /* alloc_size is the amount of space (out of size) that we will 
       allocate for this buffer. */

    MPIU_DBG_MSG_FMT(BSEND,TYPICAL,(MPIU_DBG_FDEST,
			    "Taking %d bytes from a block with %d bytes\n", 
				    alloc_size, p->total_size ));

    /* Is there enough space left to create a new block? */
    if (alloc_size + (int)BSENDDATA_HEADER_TRUE_SIZE + MIN_BUFFER_BLOCK <= p->size) {
	/* Yes, the available space (p->size) is large enough to 
	   carve out a new block */
540
	MPIR_Bsend_data_t *newp;
541
542
	
	MPIU_DBG_MSG_P(BSEND,TYPICAL,"Breaking block into used and allocated at %p", p );
543
	newp = (MPIR_Bsend_data_t *)( (char *)p + BSENDDATA_HEADER_TRUE_SIZE + 
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
				alloc_size );
	newp->total_size = p->total_size - alloc_size - 
	    BSENDDATA_HEADER_TRUE_SIZE;
	newp->size = newp->total_size - BSENDDATA_HEADER_TRUE_SIZE;
	newp->msg.msgbuf = (char *)newp + BSENDDATA_HEADER_TRUE_SIZE;

	/* Insert this new block after p (we'll remove p from the avail list
	   next) */
	newp->next = p->next;
	newp->prev = p;
	if (p->next) {
	    p->next->prev = newp;
	}
	p->next       = newp;
	p->total_size = (char *)newp - (char*)p;
	p->size       = p->total_size - BSENDDATA_HEADER_TRUE_SIZE;

	MPIU_DBG_MSG_FMT(BSEND,TYPICAL,(MPIU_DBG_FDEST,
		   "broken blocks p (%d) and new (%d)\n",
		    p->total_size, newp->total_size ));
    }

    /* Remove p from the avail list and add it to the active list */
    prev = p->prev;
    if (prev) {
	prev->next = p->next;
    }
    else {
	BsendBuffer.avail = p->next;
    }

    if (p->next) {
	p->next->prev = p->prev;
    }
	
    if (BsendBuffer.active) {
	BsendBuffer.active->prev = p;
    }
    p->next	       = BsendBuffer.active;
    p->prev	       = 0;
    BsendBuffer.active = p;

    MPIU_DBG_MSG_P(BSEND,VERBOSE,"segment %p now head of active",p); 
    MPIU_DBG_MSG(BSEND,TYPICAL,"At end of take buffer" );
    MPIU_DBG_STMT(BSEND,TYPICAL,MPIR_Bsend_dump());
}

591
static int MPIR_Bsend_finalize( void *p ATTRIBUTE((unused)) )
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
{
    void *b;
    int  s;

    MPIU_UNREFERENCED_ARG(p);

    if (BsendBuffer.buffer) {
	/* Use detach to complete any communication */
	MPIR_Bsend_detach( &b, &s );
    }
    return 0;
}

/* 
 * These routines are defined only if debug logging is enabled
 */
#ifdef USE_DBG_LOGGING
static void MPIR_Bsend_dump( void )
{
611
    MPIR_Bsend_data_t *a = BsendBuffer.avail;
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642

    MPIU_DBG_MSG_D(BSEND,TYPICAL,"Total size is %d",BsendBuffer.buffer_size );
    MPIU_DBG_MSG(BSEND,TYPICAL,"Avail list is:" );
    while (a) {
	MPIU_DBG_MSG_FMT(BSEND,TYPICAL,(MPIU_DBG_FDEST,
				"[%p] totalsize = %d(%x)", a, a->total_size, 
					a->total_size ));
	if (a == a->next) {
	    MPIU_DBG_MSG(BSEND,TYPICAL,
			 "@@@Corrupt list; avail block points at itself" );
	    break;
	}
	a = a->next;
    }
    
    MPIU_DBG_MSG(BSEND,TYPICAL,"Active list is:" );
    a = BsendBuffer.active;
    while (a) {
	MPIU_DBG_MSG_FMT(BSEND,TYPICAL,(MPIU_DBG_FDEST,
				"[%p] totalsize = %d(%x)", a, a->total_size, 
					a->total_size ));
	if (a == a->next) {
	    MPIU_DBG_MSG(BSEND,TYPICAL,
			 "@@@Corrupt list; active block points at itself" );
	    break;
	}
	a = a->next;
    }
    MPIU_DBG_MSG(BSEND,TYPICAL,"end of list" );
}
#endif