Commit 8674633b authored by buntinas's avatar buntinas
Browse files

added noncontiguous send function to GM

parent 1e880001
......@@ -37,6 +37,7 @@ int MPID_nem_gm_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz
MPID_Request **sreq_ptr);;
int MPID_nem_gm_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz,
void *data, MPIDI_msg_sz_t data_sz);;
int MPID_nem_gm_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header, MPIDI_msg_sz_t hdr_sz);
int MPID_nem_gm_module_lmt_init();
int MPID_nem_gm_module_lmt_finalize();
......
......@@ -253,6 +253,7 @@ MPID_nem_gm_module_vc_init (MPIDI_VC_t *vc, const char *business_card)
vc_ch->iStartContigMsg = MPID_nem_gm_iStartContigMsg;
vc_ch->iSendContig = MPID_nem_gm_iSendContig;
vc->sendNoncontig_fn = MPID_nem_gm_SendNoncontig;
VC_FIELD(vc, source_id) = my_pg_rank; /* FIXME: this is only valid for processes in COMM_WORLD */
mpi_errno = MPID_nem_gm_module_get_port_unique_from_bc (business_card, &VC_FIELD(vc, gm_port_id), VC_FIELD(vc, gm_unique_id));
......
......@@ -175,6 +175,59 @@ static inline void send_pkt(int node_id, int port_id, int source_id, char **data
MPIDI_FUNC_EXIT(MPID_STATE_SEND_PKT);
}
/* sends a packet consisting of an optional header and noncontiguous
data described by a segment. */
#undef FUNCNAME
#define FUNCNAME send_noncontig_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline void send_noncontig_pkt(int node_id, int port_id, int source_id, void *hdr, MPIDI_msg_sz_t hdr_sz,
MPID_Segment *segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size)
{
sendbuf_t *sb;
MPIDI_msg_sz_t payload_len;
MPIDI_msg_sz_t last;
MPIDI_msg_sz_t h_len;
MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_SEND_NONCONTIG_PKT);
MPIU_Assert(MPID_nem_module_gm_num_send_tokens);
MPIU_Assert(!SENDBUF_S_EMPTY());
MPIU_Assert(SENDPKT_DATALEN > sizeof(MPIDI_CH3_Pkt_t));
SENDBUF_S_POP(&sb);
if (hdr)
{
/* copy header */
MPID_NEM_MEMCPY(&sb->pkt.buf, hdr, hdr_sz);
h_len = sizeof(MPIDI_CH3_Pkt_t);
}
else
h_len = 0;
/* copy data */
if (h_len + segment_size - *segment_first <= SENDPKT_DATALEN)
last = segment_size;
else
last = *segment_first + SENDPKT_DATALEN - h_len;
MPID_Segment_pack(segment, *segment_first, &last, ((char *)&sb->pkt.buf) + h_len);
payload_len = h_len + last - *segment_first;
*segment_first = last;
sb->node_id = node_id;
sb->port_id = port_id;
sb->datalen = PKT_HEADER_LEN + payload_len;
sb->pkt.source_id = source_id;
gm_send_with_callback(MPID_nem_module_gm_port, &sb->pkt, PACKET_SIZE, sb->datalen, GM_LOW_PRIORITY, node_id, port_id,
send_callback, (void *)sb);
--MPID_nem_module_gm_num_send_tokens;
MPIDI_FUNC_EXIT(MPID_STATE_SEND_NONCONTIG_PKT);
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_send_from_queue
......@@ -185,6 +238,7 @@ int MPID_nem_send_from_queue()
int mpi_errno = MPI_SUCCESS;
char *dataptr;
int datalen;
int complete;
while (active_send || !SEND_Q_EMPTY())
{
......@@ -196,15 +250,32 @@ int MPID_nem_send_from_queue()
if (MPID_nem_module_gm_num_send_tokens == 0)
goto fn_exit;
dataptr = active_send->dev.iov[0].MPID_IOV_BUF;
datalen = active_send->dev.iov[0].MPID_IOV_LEN;
send_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), &dataptr, &datalen);
active_send->dev.iov[0].MPID_IOV_BUF = dataptr;
active_send->dev.iov[0].MPID_IOV_LEN = datalen;
if (datalen == 0)
if (active_send->ch.noncontig)
{
/* send only if there's something left to send */
if (active_send->dev.segment_first != active_send->dev.segment_size)
send_noncontig_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), NULL, 0,
active_send->dev.segment_ptr, &active_send->dev.segment_first, active_send->dev.segment_size);
/* have we finished sending the whole message? */
complete = active_send->dev.segment_first == active_send->dev.segment_size;
}
else
{
dataptr = active_send->dev.iov[0].MPID_IOV_BUF;
datalen = active_send->dev.iov[0].MPID_IOV_LEN;
/* send only if there's something left to send */
if (datalen != 0)
send_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), &dataptr, &datalen);
active_send->dev.iov[0].MPID_IOV_BUF = dataptr;
active_send->dev.iov[0].MPID_IOV_LEN = datalen;
/* have we finished sending the whole message? */
complete = datalen == 0;
}
if (complete)
{
/* finished sending message */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
......@@ -218,8 +289,7 @@ int MPID_nem_send_from_queue()
}
else
{
int complete = 0;
complete = 0;
mpi_errno = reqFn(vc, active_send, &complete);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......@@ -244,15 +314,25 @@ int MPID_nem_send_from_queue()
SEND_Q_DEQUEUE(&sreq);
active_send = sreq;
vc = sreq->ch.vc;
dataptr = sreq->dev.iov[0].MPID_IOV_BUF;
datalen = sreq->dev.iov[0].MPID_IOV_LEN;
if (active_send->ch.noncontig)
{
send_noncontig_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), &sreq->dev.pending_pkt,
sreq->ch.header_sz, sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size);
}
else
{
dataptr = sreq->dev.iov[0].MPID_IOV_BUF;
datalen = sreq->dev.iov[0].MPID_IOV_LEN;
send_header_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), &sreq->dev.pending_pkt,
sreq->ch.header_sz, &dataptr, &datalen);
send_header_pkt(VC_FIELD(vc, gm_node_id), VC_FIELD(vc, gm_port_id), VC_FIELD(vc, source_id), &sreq->dev.pending_pkt,
sreq->ch.header_sz, &dataptr, &datalen);
sreq->dev.iov[0].MPID_IOV_BUF = dataptr;
sreq->dev.iov[0].MPID_IOV_LEN = datalen;
}
sreq->dev.iov[0].MPID_IOV_BUF = dataptr;
sreq->dev.iov[0].MPID_IOV_LEN = datalen;
}
}
}
fn_exit:
......@@ -316,6 +396,7 @@ int MPID_nem_gm_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz
sreq->dev.OnDataAvail = 0;
sreq->dev.pending_pkt = *(MPIDI_CH3_PktGeneric_t *) hdr;
sreq->ch.noncontig = FALSE;
sreq->ch.header_sz = hdr_sz;
sreq->dev.iov[0].MPID_IOV_BUF = dataptr;
sreq->dev.iov[0].MPID_IOV_LEN = dataleft;
......@@ -411,6 +492,7 @@ int MPID_nem_gm_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
sreq->dev.pending_pkt = *(MPIDI_CH3_PktGeneric_t *) hdr;
sreq->ch.noncontig = FALSE;
sreq->ch.header_sz = hdr_sz;
sreq->dev.iov[0].MPID_IOV_BUF = dataptr;
sreq->dev.iov[0].MPID_IOV_LEN = dataleft;
......@@ -431,3 +513,95 @@ int MPID_nem_gm_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_gm_SendNoncontig
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_gm_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header, MPIDI_msg_sz_t hdr_sz)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_GM_SENDNONCONTIG);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_GM_SENDNONCONTIG);
if (SEND_Q_EMPTY() && MPID_nem_module_gm_num_send_tokens)
/* MT */
{
sendbuf_t *sb;
int node_id = VC_FIELD(vc, gm_node_id);
int port_id = VC_FIELD(vc, gm_port_id);
int source_id = VC_FIELD(vc, source_id);
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "gm_SendNoncontig");
/* send first packet with header */
send_noncontig_pkt(node_id, port_id, source_id, header, hdr_sz,
sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size);
/* send additional packets of data */
while (sreq->dev.segment_first != sreq->dev.segment_size && MPID_nem_module_gm_num_send_tokens)
{
send_noncontig_pkt(node_id, port_id, source_id, NULL, 0, sreq->dev.segment_ptr,
&sreq->dev.segment_first, sreq->dev.segment_size);
}
if (sreq->dev.segment_first == sreq->dev.segment_size)
{
/* sent whole message */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
reqFn = sreq->dev.OnDataAvail;
if (!reqFn)
{
MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
MPIDI_CH3U_Request_complete(sreq);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
goto fn_exit;
}
else
{
int complete = 0;
mpi_errno = reqFn(vc, sreq, &complete);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (complete)
{
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
goto fn_exit;
}
sreq->ch.vc = vc;
active_send = sreq;
goto fn_exit;
}
}
}
/* enqueue request */
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
sreq->dev.pending_pkt = *(MPIDI_CH3_PktGeneric_t *)header;
sreq->ch.noncontig = TRUE;
sreq->ch.header_sz = hdr_sz;
sreq->ch.vc = vc;
/* if we sent anything, then the queue must have been empty */
MPIU_Assert(sreq->dev.segment_first == 0 || SEND_Q_EMPTY());
/* if we started sending this message, make it the active send */
if (sreq->dev.segment_first != 0)
active_send = sreq;
else
SEND_Q_ENQUEUE(sreq);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_GM_SENDNONCONTIG);
return mpi_errno;
fn_fail:
goto fn_exit;
}
......@@ -618,8 +618,8 @@ typedef struct MPIDI_VC
/* eager message threshold */
int eager_max_msg_sz;
/* eager noncontiguous send function pointer. Called to send a
noncontiguous eager message. Caller must initialize
/* noncontiguous send function pointer. Called to send a
noncontiguous message. Caller must initialize
sreq->dev.segment, _first and _size. Contiguous messages are
called directly from CH3 and cannot be overridden. */
int (* sendNoncontig_fn)( struct MPIDI_VC *vc, struct MPID_Request *sreq,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment