Commit 848eaa5c authored by Darius Buntinas's avatar Darius Buntinas
Browse files

[svn-r8031] Unified send queue implementations to use the same macros. This...

[svn-r8031] Unified send queue implementations to use the same macros.  This also fixes a bug that accessed the dev.next pointer in the request after it had been freed.
parent 0a6af971
......@@ -10,6 +10,7 @@
#include "mpidi_ch3_conf.h"
#include "mpidimpl.h"
#include "mpiu_os_wrappers.h"
#include "mpid_nem_generic_queue.h"
#if defined(HAVE_ASSERT_H)
#include <assert.h>
......@@ -18,66 +19,33 @@
extern void *MPIDI_CH3_packet_buffer;
extern int MPIDI_CH3I_my_rank;
#define CH3_NORMAL_QUEUE 0
#define CH3_RNDV_QUEUE 1
#define CH3_NUM_QUEUES 2
extern struct MPID_Request *MPIDI_CH3I_sendq_head[CH3_NUM_QUEUES];
extern struct MPID_Request *MPIDI_CH3I_sendq_tail[CH3_NUM_QUEUES];
extern struct MPID_Request *MPIDI_CH3I_active_send[CH3_NUM_QUEUES];
#define MPIDI_CH3I_SendQ_enqueue(req, queue) \
do { \
MPIU_Assert(req != NULL); \
/* MT - not thread safe! */ \
MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_enqueue req=0x%08x", req->handle)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPIDI_CH3I_SendQ_enqueue(req=%p (handle=0x%x), queue=%s (%d))", \
(req), \
(req)->handle, \
#queue, queue)); \
/* because an OnDataAvail function might complete this request and cause */ \
/* it to be freed before it is dequeued, we have to add a reference */ \
/* whenever a req is added to a queue */ \
MPIR_Request_add_ref(req); \
req->dev.next = NULL; \
if (MPIDI_CH3I_sendq_tail[queue] != NULL) \
{ \
MPIDI_CH3I_sendq_tail[queue]->dev.next = req; \
} \
else \
{ \
MPIDI_CH3I_sendq_head[queue] = req; \
} \
MPIDI_CH3I_sendq_tail[queue] = req; \
} while (0)
/* NOTE: this macro may result in the dequeued request being freed (via
* MPID_Request_release) */
#define MPIDI_CH3I_SendQ_dequeue(queue) \
do { \
MPID_Request *req_; \
/* MT - not thread safe! */ \
MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_dequeue req=0x%08x", \
MPIDI_CH3I_sendq_head[queue]->handle)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPIDI_CH3I_SendQ_dequeue(queue=%s (%d)), head req=%p (handle=0x%x)", \
#queue, queue, \
MPIDI_CH3I_sendq_head[queue], \
((MPIDI_CH3I_sendq_head[queue]) ? MPIDI_CH3I_sendq_head[queue]->handle : -1))); \
/* see the comment in _enqueue above about refcounts */ \
req_ = MPIDI_CH3I_sendq_head[queue]; \
MPIDI_CH3I_sendq_head[queue] = MPIDI_CH3I_sendq_head[queue]->dev.next; \
MPID_Request_release(req_); \
if (MPIDI_CH3I_sendq_head[queue] == NULL) \
{ \
MPIDI_CH3I_sendq_tail[queue] = NULL; \
} \
} while (0)
#define MPIDI_CH3I_SendQ_head(queue) (MPIDI_CH3I_sendq_head[queue])
#define MPIDI_CH3I_SendQ_empty(queue) (MPIDI_CH3I_sendq_head[queue] == NULL)
typedef GENERIC_Q_DECL(struct MPID_Request) MPIDI_CH3I_shm_sendq_t;
extern MPIDI_CH3I_shm_sendq_t MPIDI_CH3I_shm_sendq;
extern struct MPID_Request *MPIDI_CH3I_shm_active_send;
/* Send queue macros */
/* MT - not thread safe! */
#define MPIDI_CH3I_Sendq_empty(q) GENERIC_Q_EMPTY (q)
#define MPIDI_CH3I_Sendq_head(q) GENERIC_Q_HEAD (q)
#define MPIDI_CH3I_Sendq_enqueue(qp, ep) do { \
/* add refcount so req doesn't get freed before it's dequeued */ \
MPIR_Request_add_ref(ep); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPIDI_CH3I_Sendq_enqueue req=%p (handle=%#x), queue=%p", \
ep, (ep)->handle, qp)); \
GENERIC_Q_ENQUEUE (qp, ep, dev.next); \
} while (0)
#define MPIDI_CH3I_Sendq_dequeue(qp, ep) do { \
GENERIC_Q_DEQUEUE (qp, ep, dev.next); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPIDI_CH3I_Sendq_dequeuereq=%p (handle=%#x), queue=%p", \
*(ep), *(ep) ? (*(ep))->handle : -1, qp)); \
MPID_Request_release(*(ep)); \
} while (0)
#define MPIDI_CH3I_Sendq_enqueue_multiple_no_refcount(qp, ep0, ep1) \
/* used to move reqs from one queue to another, so we don't update */ \
/* the refcounts */ \
GENERIC_Q_ENQUEUE_MULTIPLE(qp, ep0, ep1, dev.next)
int MPIDI_CH3I_Progress_init(void);
int MPIDI_CH3I_Progress_finalize(void);
......
......@@ -11,10 +11,11 @@
the next pointer field in the element (e.g., "ch.tcp_sendq_next") */
#define PRINT_QUEUE(qp, next_field) do { \
} while(0)
#define PRINTM_QUEUE(qp, next_field_macro, next_field) do { \
} while(0)
} while(0)
#define PRINTM_QUEUE(qp, next_field_macro, next_field) do { \
} while(0)
#define GENERIC_Q_DECL(type) struct { type *head, *tail; }
#define GENERIC_Q_EMPTY(q) ((q).head == NULL)
......@@ -78,6 +79,7 @@
(qp)->head = (*(epp))->next_field; \
if ((qp)->head == NULL) \
(qp)->tail = NULL; \
PRINT_QUEUE (qp, next_field); \
} while (0)
/* remove the elements from the top of the queue starting with ep0 through ep1 */
......@@ -147,6 +149,8 @@
/* queue macros that use another macro to find the 'next' field, e.g.,
when the next field is in the channel private area of a request.
The macro is of the form "macro_name(element_ptr, next_field)"*/
#define GENERICM_Q_DECL(type, q_name) typedef struct { type *head, *tail; } q_name;
#define GENERICM_Q_EMPTY(q) ((q).head == NULL)
#define GENERICM_Q_HEAD(q) ((q).head)
......
......@@ -16,10 +16,6 @@
#include "mpid_nem_fbox.h"
#include "mpid_nem_nets.h"
#include "mpid_nem_queue.h"
#include "mpid_nem_generic_queue.h"
#include "mpiu_os_wrappers.h"
#define MPID_NEM__BYPASS_Q_MAX_VAL ((MPID_NEM_MPICH2_DATA_LEN) - (sizeof(MPIDI_CH3_Pkt_t)))
......
......@@ -34,10 +34,10 @@ static inline void MPID_nem_mpich2_send_seg (MPID_Segment *segment, MPIDI_msg_sz
/* evaluates to TRUE if it is safe to block on recv operations in the progress
* loop, FALSE otherwise */
#define MPID_nem_safe_to_block_recv() \
(!MPID_nem_local_lmt_pending && \
!MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] && \
!MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE) && \
#define MPID_nem_safe_to_block_recv() \
(!MPID_nem_local_lmt_pending && \
!MPIDI_CH3I_shm_active_send && \
!MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq) && \
!MPIDU_Sched_are_pending())
#undef FUNCNAME
......
......@@ -11,12 +11,6 @@ MPIU_SUPPRESS_OSX_HAS_NO_SYMBOLS_WARNING;
#ifdef ENABLE_CHECKPOINTING
#define SENDQ_EMPTY(q) GENERIC_Q_EMPTY (q)
#define SENDQ_HEAD(q) GENERIC_Q_HEAD (q)
#define SENDQ_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE (qp, ep, dev.next)
#define SENDQ_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, dev.next)
#define SENDQ_ENQUEUE_MULTIPLE(qp, ep0, ep1) GENERIC_Q_ENQUEUE_MULTIPLE(qp, ep0, ep1, dev.next)
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_ckpt_pause_send_vc
#undef FCNAME
......@@ -54,13 +48,13 @@ int MPID_nem_tcp_pkt_unpause_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI
vc_tcp->send_paused = FALSE;
/* There may be a unpause message in the send queue. If so, just enqueue everything on the send queue. */
if (SENDQ_EMPTY(vc_tcp->send_queue))
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->paused_send_queue);
/* if anything is left on the paused queue, put it on the send queue and wait for the reconnect */
if (!SENDQ_EMPTY(vc_tcp->paused_send_queue)) {
if (!MPIDI_CH3I_Sendq_empty(vc_tcp->paused_send_queue)) {
SENDQ_ENQUEUE_MULTIPLE(&vc_tcp->send_queue, vc_tcp->paused_send_queue.head, vc_tcp->paused_send_queue.tail);
MPIDI_CH3I_Sendq_enqueue_multiple_no_refcount(&vc_tcp->send_queue, vc_tcp->paused_send_queue.head, vc_tcp->paused_send_queue.tail);
vc_tcp->paused_send_queue.head = vc_tcp->paused_send_queue.tail = NULL;
}
......
......@@ -28,11 +28,7 @@ typedef enum{MPID_NEM_TCP_VC_STATE_DISCONNECTED,
#define MPIDI_NEM_TCP_MAX_CONNECT_RETRIES 100
typedef struct MPIDI_nem_tcp_request_queue
{
struct MPID_Request *head;
struct MPID_Request *tail;
} MPIDI_nem_tcp_request_queue_t;
typedef GENERIC_Q_DECL(struct MPID_Request) MPIDI_nem_tcp_request_queue_t;
/* The vc provides a generic buffer in which network modules can store
private fields This removes all dependencies from the VC struction
......@@ -147,13 +143,6 @@ int MPID_nem_tcp_pkt_unpause_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI
#define Q_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, next)
#define Q_REMOVE_ELEMENTS(qp, ep0, ep1) GENERIC_Q_REMOVE_ELEMENTS (qp, ep0, ep1, next)
/* Send queue macros */
#define SENDQ_EMPTY(q) GENERIC_Q_EMPTY (q)
#define SENDQ_HEAD(q) GENERIC_Q_HEAD (q)
#define SENDQ_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE (qp, ep, dev.next)
#define SENDQ_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, dev.next)
#define SENDQ_ENQUEUE_MULTIPLE(qp, ep0, ep1) GENERIC_Q_ENQUEUE_MULTIPLE(qp, ep0, ep1, dev.next)
/* VC list macros */
#define VC_L_EMPTY(q) GENERIC_L_EMPTY (q)
......
......@@ -9,12 +9,6 @@
#define NUM_PREALLOC_SENDQ 10
#define MAX_SEND_IOV 10
#define SENDQ_EMPTY(q) GENERIC_Q_EMPTY (q)
#define SENDQ_HEAD(q) GENERIC_Q_HEAD (q)
#define SENDQ_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE (qp, ep, dev.next)
#define SENDQ_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, dev.next)
typedef struct MPID_nem_tcp_send_q_element
{
struct MPID_nem_tcp_send_q_element *next;
......@@ -87,13 +81,14 @@ int MPID_nem_tcp_send_queued(MPIDI_VC_t *vc, MPIDI_nem_tcp_request_queue_t *send
MPIU_Assert(vc != NULL);
if (SENDQ_EMPTY(*send_queue))
if (MPIDI_CH3I_Sendq_empty(*send_queue))
goto fn_exit;
while (!SENDQ_EMPTY(*send_queue))
while (!MPIDI_CH3I_Sendq_empty(*send_queue))
{
sreq = SENDQ_HEAD(*send_queue);
sreq = MPIDI_CH3I_Sendq_head(*send_queue);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "Sending %p", sreq);
iov = &sreq->dev.iov[sreq->dev.iov_offset];
CHECK_EINTR(offset, writev(vc_tcp->sc->fd, iov, sreq->dev.iov_count));
......@@ -155,7 +150,7 @@ int MPID_nem_tcp_send_queued(MPIDI_VC_t *vc, MPIDI_nem_tcp_request_queue_t *send
MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
MPIDI_CH3U_Request_complete(sreq);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
SENDQ_DEQUEUE(send_queue, &sreq);
MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
continue;
}
......@@ -166,14 +161,14 @@ int MPID_nem_tcp_send_queued(MPIDI_VC_t *vc, MPIDI_nem_tcp_request_queue_t *send
if (complete)
{
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
SENDQ_DEQUEUE(send_queue, &sreq);
MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
continue;
}
sreq->dev.iov_offset = 0;
}
}
if (SENDQ_EMPTY(*send_queue))
if (MPIDI_CH3I_Sendq_empty(*send_queue))
UNSET_PLFD(vc_tcp);
fn_exit:
......@@ -216,7 +211,7 @@ int MPID_nem_tcp_conn_est (MPIDI_VC_t *vc)
MPIDI_CHANGE_VC_STATE(vc, ACTIVE);
if (!SENDQ_EMPTY (vc_tcp->send_queue))
if (!MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
SET_PLFD(vc_tcp);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
......@@ -252,7 +247,7 @@ int MPID_nem_tcp_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_s
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
if (SENDQ_EMPTY(vc_tcp->send_queue))
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
......@@ -340,21 +335,21 @@ int MPID_nem_tcp_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_s
MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPID_IOV_LEN > 0);
if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
SENDQ_ENQUEUE(&vc_tcp->paused_send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
} else {
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
if (SENDQ_EMPTY(vc_tcp->send_queue)) {
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
}
......@@ -391,7 +386,7 @@ int MPID_nem_tcp_iStartContigMsg_paused(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
if (SENDQ_EMPTY(vc_tcp->send_queue))
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
......@@ -479,18 +474,18 @@ int MPID_nem_tcp_iStartContigMsg_paused(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_
MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPID_IOV_LEN > 0);
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
if (SENDQ_EMPTY(vc_tcp->send_queue)) {
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
*sreq_ptr = sreq;
......@@ -526,7 +521,7 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
if (SENDQ_EMPTY(vc_tcp->send_queue))
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
......@@ -633,21 +628,21 @@ enqueue_request:
sreq->dev.iov_offset = 0;
if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
SENDQ_ENQUEUE(&vc_tcp->paused_send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
} else {
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
if (SENDQ_EMPTY(vc_tcp->send_queue)) {
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
}
......@@ -690,7 +685,7 @@ int MPID_nem_tcp_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
if (SENDQ_EMPTY(vc_tcp->send_queue))
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
CHECK_EINTR(offset, writev(vc_tcp->sc->fd, iov, iov_n));
if (offset == 0) {
......@@ -785,21 +780,21 @@ int MPID_nem_tcp_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
sreq->dev.iov_offset = 0;
if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
SENDQ_ENQUEUE(&vc_tcp->paused_send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
} else {
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
if (SENDQ_EMPTY(vc_tcp->send_queue)) {
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
}
......@@ -828,16 +823,16 @@ int MPID_nem_tcp_error_out_send_queue(struct MPIDI_VC *const vc, int req_errno)
an error condition and we just want to mark them as complete */
/* send queue */
while (!SENDQ_EMPTY(vc_tcp->send_queue)) {
SENDQ_DEQUEUE(&vc_tcp->send_queue, &req);
while (!MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
MPIDI_CH3I_Sendq_dequeue(&vc_tcp->send_queue, &req);
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
}
/* paused send queue */
while (!SENDQ_EMPTY(vc_tcp->paused_send_queue)) {
SENDQ_DEQUEUE(&vc_tcp->paused_send_queue, &req);
while (!MPIDI_CH3I_Sendq_empty(vc_tcp->paused_send_queue)) {
MPIDI_CH3I_Sendq_dequeue(&vc_tcp->paused_send_queue, &req);
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
......
......@@ -65,8 +65,8 @@ void MPID_nem_dbg_print_vc_sendq(FILE *stream, MPIDI_VC_t *vc)
fprintf(stream, "..VC ptr=%p pg_rank=%d state=%s:\n", vc, vc->pg_rank, vc_state_to_str(vc->state));
if (vc_ch->is_local) {
fprintf(stream, "....CH3_NORMAL_QUEUE active_send\n");
sreq = MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE];
fprintf(stream, "....shm_active_send\n");
sreq = MPIDI_CH3I_shm_active_send;
if (sreq) {
fprintf(stream, ".... sreq=%p ctx=%#x rank=%d tag=%d\n", sreq,
sreq->dev.match.parts.context_id,
......@@ -74,8 +74,8 @@ void MPID_nem_dbg_print_vc_sendq(FILE *stream, MPIDI_VC_t *vc)
sreq->dev.match.parts.tag);
}
fprintf(stream, "....CH3_NORMAL_QUEUE queue (head-to-tail)\n");
sreq = MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE);
fprintf(stream, "....shm send queue (head-to-tail)\n");
sreq = MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq);
i = 0;
while (sreq != NULL) {
fprintf(stream, "....[%d] sreq=%p ctx=%#x rank=%d tag=%d\n", i, sreq,
......
......@@ -49,7 +49,7 @@ int MPIDI_CH3_iSend (MPIDI_VC_t *vc, MPID_Request *sreq, void * hdr, MPIDI_msg_s
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = TRUE;
if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
{
MPIU_DBG_MSG_D (CH3_CHANNEL, VERBOSE, "iSend %d", (int) hdr_sz);
mpi_errno = MPID_nem_mpich2_send_header (hdr, hdr_sz, vc, &again);
......@@ -103,12 +103,12 @@ int MPIDI_CH3_iSend (MPIDI_VC_t *vc, MPID_Request *sreq, void * hdr, MPIDI_msg_s
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
if (MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) {
MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
} else {
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
......
......@@ -65,7 +65,7 @@ int MPIDI_CH3_iSendv (MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV *iov, int n_i
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = TRUE;
if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
{
MPID_IOV *remaining_iov = iov;
int remaining_n_iov = n_iov;
......@@ -105,9 +105,9 @@ int MPIDI_CH3_iSendv (MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV *iov, int n_i
sreq->dev.iov_count = remaining_n_iov;
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
MPIDI_CH3I_SendQ_enqueue (sreq, CH3_NORMAL_QUEUE);
MPIU_Assert (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
MPIU_Assert (MPIDI_CH3I_shm_active_send == NULL);
MPIDI_CH3I_shm_active_send = sreq;
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, " enqueued");
}
else
......@@ -133,9 +133,9 @@ int MPIDI_CH3_iSendv (MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV *iov, int n_i
sreq->dev.iov_offset = 0;
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
MPIDI_CH3I_SendQ_enqueue (sreq, CH3_NORMAL_QUEUE);
MPIU_Assert (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
MPIU_Assert (MPIDI_CH3I_shm_active_send == NULL);
MPIDI_CH3I_shm_active_send = sreq;
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, ".... reloaded and enqueued");
}
else
......@@ -167,7 +167,7 @@ int MPIDI_CH3_iSendv (MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV *iov, int n_i
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
......
......@@ -52,7 +52,7 @@ int MPIDI_CH3_iStartMsg (MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, MPID_
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = 1;
if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
/* MT */
{
MPIU_DBG_MSG_D (CH3_CHANNEL, VERBOSE, "iStartMsg %d", (int) hdr_sz);
......@@ -102,12 +102,12 @@ int MPIDI_CH3_iStartMsg (MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, MPID_
sreq->ch.vc = vc;
sreq->dev.OnDataAvail = 0;
if (MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) {
MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
} else {
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
/* FIXME we are sometimes called from within the progress engine, we
* shouldn't be calling the progress engine again */
mpi_errno = MPIDI_CH3I_Shm_send_progress();
......
......@@ -74,7 +74,7 @@ int MPIDI_CH3_iStartMsgv (MPIDI_VC_t *vc, MPID_IOV *iov, int n_iov, MPID_Request
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = TRUE;
if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
/* MT */
{
MPID_IOV *remaining_iov = iov;
......@@ -136,9 +136,9 @@ int MPIDI_CH3_iStartMsgv (MPIDI_VC_t *vc, MPID_IOV *iov, int n_iov, MPID_Request
sreq->dev.iov[0].MPID_IOV_BUF = (char *) &sreq->dev.pending_pkt;
sreq->dev.iov[0].MPID_IOV_LEN = iov[0].MPID_IOV_LEN;
}
MPIDI_CH3I_SendQ_enqueue (sreq, CH3_NORMAL_QUEUE);
MPIU_Assert (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
MPIU_Assert (MPIDI_CH3I_shm_active_send == NULL);
MPIDI_CH3I_shm_active_send = sreq;
}
}
else
......@@ -168,12 +168,12 @@ int MPIDI_CH3_iStartMsgv (MPIDI_VC_t *vc, MPID_IOV *iov, int n_iov, MPID_Request
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
if (MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) {
MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
} else {
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */