Commit e50bec3a authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Issue extended packet header in nemesis/netmod.



There are two APIs that are used to issue data with a request
created and passed:

iSendv() --- issue contiguous data;
sendNoncontig_fn() --- issue non-contiguous data;

In this patch, we modify the implementation of those two functions
in nemesis and netmod (tcp/mxm/ptl) to make them issue the extended
packet header stored in the request.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 25e40e43
......@@ -19,15 +19,17 @@ static inline int MPID_nem_mpich_send_header (void* buf, int size, MPIDI_VC_t *v
static inline int MPID_nem_mpich_sendv (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again);
static inline void MPID_nem_mpich_dequeue_fastbox (int local_rank);
static inline void MPID_nem_mpich_enqueue_fastbox (int local_rank);
static inline int MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again);
static inline int MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov,
void *ext_header, MPIDI_msg_sz_t ext_header_sz,
MPIDI_VC_t *vc, int *again);
static inline int MPID_nem_recv_seqno_matches (MPID_nem_queue_ptr_t qhead);
static inline int MPID_nem_mpich_test_recv (MPID_nem_cell_ptr_t *cell, int *in_fbox, int in_blocking_progress);
static inline int MPID_nem_mpich_blocking_recv (MPID_nem_cell_ptr_t *cell, int *in_fbox, int completions);
static inline int MPID_nem_mpich_test_recv_wait (MPID_nem_cell_ptr_t *cell, int *in_fbox, int timeout);
static inline int MPID_nem_mpich_release_cell (MPID_nem_cell_ptr_t cell, MPIDI_VC_t *vc);
static inline void MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_first,
MPIDI_msg_sz_t segment_size, void *header, MPIDI_msg_sz_t header_sz,
MPIDI_VC_t *vc, int *again);
MPIDI_msg_sz_t segment_size, void *header, MPIDI_msg_sz_t header_sz,
void *ext_header, MPIDI_msg_sz_t ext_header_sz, MPIDI_VC_t *vc, int *again);
static inline void MPID_nem_mpich_send_seg (MPID_Segment *segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size,
MPIDI_VC_t *vc, int *again);
......@@ -271,7 +273,9 @@ MPID_nem_mpich_sendv (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again)
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int
MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again)
MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov,
void *ext_hdr_ptr, MPIDI_msg_sz_t ext_hdr_sz,
MPIDI_VC_t *vc, int *again)
{
int mpi_errno = MPI_SUCCESS;
MPID_nem_cell_ptr_t el;
......@@ -279,6 +283,7 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
MPIDI_msg_sz_t payload_len;
int my_rank;
MPIDI_CH3I_VC *vc_ch = &vc->ch;
MPI_Aint buf_offset = 0;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_MPICH_SENDV_HEADER);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_MPICH_SENDV_HEADER);
......@@ -291,7 +296,8 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
my_rank = MPID_nem_mem_region.rank;
#ifdef USE_FASTBOX
if (*n_iov == 2 && (*iov)[1].MPID_IOV_LEN + sizeof(MPIDI_CH3_Pkt_t) <= MPID_NEM_FBOX_DATALEN)
/* Note: use fastbox only when there is no streaming optimization. */
if (ext_hdr_sz == 0 && *n_iov == 2 && (*iov)[1].MPID_IOV_LEN + sizeof(MPIDI_CH3_Pkt_t) <= MPID_NEM_FBOX_DATALEN)
{
MPID_nem_fbox_mpich_t *pbox = vc_ch->fbox_out;
......@@ -343,12 +349,19 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
#endif /*PREFETCH_CELL */
MPIU_Memcpy((void *)el->pkt.mpich.p.payload, (*iov)->MPID_IOV_BUF, sizeof(MPIDI_CH3_Pkt_t));
buf_offset += sizeof(MPIDI_CH3_Pkt_t);
cell_buf = (char *)(el->pkt.mpich.p.payload) + sizeof(MPIDI_CH3_Pkt_t);
if (ext_hdr_sz > 0) {
/* when extended packet header exists, copy it */
MPIU_Memcpy((void *)((char *)(el->pkt.mpich.p.payload) + buf_offset), ext_hdr_ptr, ext_hdr_sz);
buf_offset += ext_hdr_sz;
}
cell_buf = (char *)(el->pkt.mpich.p.payload) + buf_offset;
++(*iov);
--(*n_iov);
payload_len = MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t);
payload_len = MPID_NEM_MPICH_DATA_LEN - buf_offset;
while (*n_iov && payload_len >= (*iov)->MPID_IOV_LEN)
{
size_t _iov_len = (*iov)->MPID_IOV_LEN;
......@@ -419,13 +432,15 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline void
MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size,
void *header, MPIDI_msg_sz_t header_sz, MPIDI_VC_t *vc, int *again)
void *header, MPIDI_msg_sz_t header_sz, void *ext_header, MPIDI_msg_sz_t ext_header_sz,
MPIDI_VC_t *vc, int *again)
{
MPID_nem_cell_ptr_t el;
MPIDI_msg_sz_t datalen;
int my_rank;
MPIDI_msg_sz_t last;
MPIDI_CH3I_VC *vc_ch = &vc->ch;
MPI_Aint buf_offset = 0;
MPIU_Assert(vc_ch->is_local); /* netmods will have their own implementation */
MPIU_Assert(header_sz <= sizeof(MPIDI_CH3_Pkt_t));
......@@ -436,7 +451,7 @@ MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_f
my_rank = MPID_nem_mem_region.rank;
#ifdef USE_FASTBOX
if (sizeof(MPIDI_CH3_Pkt_t) + segment_size <= MPID_NEM_FBOX_DATALEN)
if (ext_header_sz == 0 && sizeof(MPIDI_CH3_Pkt_t) + segment_size <= MPID_NEM_FBOX_DATALEN)
{
MPID_nem_fbox_mpich_t *pbox = vc_ch->fbox_out;
......@@ -508,14 +523,22 @@ MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_f
/* copy header */
MPIU_Memcpy((void *)el->pkt.mpich.p.payload, header, header_sz);
buf_offset += sizeof(MPIDI_CH3_Pkt_t);
if (ext_header_sz > 0) {
/* when extended packet header exists, copy it */
MPIU_Memcpy((void *)((char *)(el->pkt.mpich.p.payload) + buf_offset), ext_header, ext_header_sz);
buf_offset += ext_header_sz;
}
/* copy data */
if (segment_size - *segment_first <= MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t))
if (segment_size - *segment_first <= MPID_NEM_MPICH_DATA_LEN - buf_offset)
last = segment_size;
else
last = *segment_first + MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t);
MPID_Segment_pack(segment, *segment_first, &last, (char *)el->pkt.mpich.p.payload + sizeof(MPIDI_CH3_Pkt_t));
datalen = sizeof(MPIDI_CH3_Pkt_t) + last - *segment_first;
last = *segment_first + MPID_NEM_MPICH_DATA_LEN - buf_offset;
MPID_Segment_pack(segment, *segment_first, &last, (char *)el->pkt.mpich.p.payload + buf_offset);
datalen = buf_offset + last - *segment_first;
*segment_first = last;
el->pkt.mpich.source = my_rank;
......
......@@ -95,10 +95,12 @@ int MPID_nem_mpich_getv (MPID_IOV **s_iov, int *s_niov, MPID_IOV **d_iov, int *d
#if !defined (MPID_NEM_INLINE) || !MPID_NEM_INLINE
int MPID_nem_mpich_send_header(void* buf, int size, struct MPIDI_VC *vc, int *again);
int MPID_nem_mpich_sendv(MPID_IOV **iov, int *n_iov, struct MPIDI_VC *vc, int *again);
int MPID_nem_mpich_sendv_header(MPID_IOV **iov, int *n_iov, struct MPIDI_VC *vc, int *again);
int MPID_nem_mpich_sendv_header(MPID_IOV **iov, int *n_iov, void *ext_header,
MPIDI_msg_sz_t ext_header_sz, struct MPIDI_VC *vc, int *again);
void MPID_nem_mpich_send_seg(MPID_Segment segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_sz, struct MPIDI_VC *vc, int *again);
void MPID_nem_mpich_send_seg_header(MPID_Segment segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size,
void *header, MPIDI_msg_sz_t header_sz, struct MPIDI_VC *vc, int *again);
void *header, MPIDI_msg_sz_t header_sz, void *ext_header,
MPIDI_msg_sz_t ext_header_sz, struct MPIDI_VC *vc, int *again);
int MPID_nem_mpich_test_recv(MPID_nem_cell_ptr_t *cell, int *in_fbox, int in_blocking_progress);
int MPID_nem_mpich_test_recv_wait(MPID_nem_cell_ptr_t *cell, int *in_fbox, int timeout);
int MPID_nem_recv_seqno_matches(MPID_nem_queue_ptr_t qhead) ;
......
......@@ -132,7 +132,7 @@ static inline list_item_t *list_dequeue(list_head_t * list_head)
#define MXM_MPICH_MAX_ADDR_SIZE 512
#define MXM_MPICH_ENDPOINT_KEY "endpoint_id"
#define MXM_MPICH_MAX_REQ 100
#define MXM_MPICH_MAX_IOV 2
#define MXM_MPICH_MAX_IOV 3
/* The vc provides a generic buffer in which network modules can store
......
......@@ -54,13 +54,22 @@ int MPID_nem_mxm_iSendContig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr, MP
req_area->ctx = sreq;
req_area->iov_buf = req_area->tmp_buf;
req_area->iov_count = 1;
req_area->iov_buf[0].ptr = (void *) &(sreq->dev.pending_pkt);
req_area->iov_buf[0].length = sizeof(MPIDI_CH3_Pkt_t);
req_area->iov_count = 0;
req_area->iov_buf[req_area->iov_count].ptr = (void *) &(sreq->dev.pending_pkt);
req_area->iov_buf[req_area->iov_count].length = sizeof(MPIDI_CH3_Pkt_t);
(req_area->iov_count)++;
if (sreq->dev.ext_hdr_sz != 0) {
req_area->iov_buf[req_area->iov_count].ptr = (void *) (sreq->dev.ext_hdr_ptr);
req_area->iov_buf[req_area->iov_count].length = sreq->dev.ext_hdr_sz;
(req_area->iov_count)++;
}
if (data_sz) {
req_area->iov_count = 2;
req_area->iov_buf[1].ptr = (void *) data;
req_area->iov_buf[1].length = data_sz;
req_area->iov_buf[req_area->iov_count].ptr = (void *) data;
req_area->iov_buf[req_area->iov_count].length = data_sz;
(req_area->iov_count)++;
}
vc_area->pending_sends += 1;
......@@ -175,9 +184,17 @@ int MPID_nem_mxm_SendNoncontig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr,
req_area->ctx = sreq;
req_area->iov_buf = req_area->tmp_buf;
req_area->iov_count = 1;
req_area->iov_buf[0].ptr = (void *) &(sreq->dev.pending_pkt);
req_area->iov_buf[0].length = sizeof(MPIDI_CH3_Pkt_t);
req_area->iov_count = 0;
req_area->iov_buf[req_area->iov_count].ptr = (void *) &(sreq->dev.pending_pkt);
req_area->iov_buf[req_area->iov_count].length = sizeof(MPIDI_CH3_Pkt_t);
(req_area->iov_count)++;
if (sreq->dev.ext_hdr_ptr != NULL) {
req_area->iov_buf[req_area->iov_count].ptr = (void *) (sreq->dev.ext_hdr_ptr);
req_area->iov_buf[req_area->iov_count].length = sreq->dev.ext_hdr_sz;
(req_area->iov_count)++;
}
last = sreq->dev.segment_size;
......@@ -193,9 +210,9 @@ int MPID_nem_mxm_SendNoncontig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr,
MPID_Segment_pack(sreq->dev.segment_ptr, sreq->dev.segment_first, &last, sreq->dev.tmpbuf);
MPIU_Assert(last == sreq->dev.segment_size);
req_area->iov_count = 2;
req_area->iov_buf[1].ptr = sreq->dev.tmpbuf;
req_area->iov_buf[1].length = last - sreq->dev.segment_first;
req_area->iov_buf[req_area->iov_count].ptr = sreq->dev.tmpbuf;
req_area->iov_buf[req_area->iov_count].length = last - sreq->dev.segment_first;
(req_area->iov_count)++;
}
vc_area->pending_sends += 1;
......
......@@ -96,6 +96,8 @@ static inline MPID_nem_ptl_req_area * REQ_PTL(MPID_Request *req) {
(sreq_)->dev.datatype_ptr = NULL; \
(sreq_)->dev.segment_ptr = NULL; \
(sreq_)->dev.tmpbuf = NULL; \
(sreq_)->dev.ext_hdr_ptr = NULL; \
(sreq_)->dev.ext_hdr_sz = 0; \
\
MPID_nem_ptl_init_req(sreq_); \
} while (0)
......
......@@ -152,8 +152,8 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
char *sendbuf;
const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz);
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz);
const size_t remaining = data_sz - sent_sz;
ptl_match_bits_t match_bits = NPTL_MATCH(CTL_TAG, 0, MPIDI_Process.my_pg_rank);
MPIDI_STATE_DECL(MPID_STATE_SEND_PKT);
......@@ -163,12 +163,19 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
if (sreq->dev.ext_hdr_sz > 0) {
/* copy extended packet header to send buf */
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t),
sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
}
TMPBUF(sreq) = NULL;
REQ_PTL(sreq)->num_gets = 0;
REQ_PTL(sreq)->put_done = 0;
if (data_sz) {
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t), data_p, sent_sz);
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz, data_p, sent_sz);
if (remaining) /* Post MEs for the remote gets */
mpi_errno = meappend_large(vc_ptl->id, sreq, NPTL_MATCH(GET_TAG, 0, MPIDI_Process.my_pg_rank),
(char *)data_p + sent_sz, remaining);
......@@ -208,8 +215,8 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
int ret;
char *sendbuf;
const size_t data_sz = sreq->dev.segment_size - sreq->dev.segment_first;
const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz);
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz);
const size_t remaining = data_sz - sent_sz;
ptl_match_bits_t match_bits = NPTL_MATCH(CTL_TAG, 0, MPIDI_Process.my_pg_rank);
MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
......@@ -218,6 +225,13 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
if (sreq->dev.ext_hdr_sz > 0) {
/* copy extended packet header to send buf */
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t),
sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
}
TMPBUF(sreq) = NULL;
REQ_PTL(sreq)->num_gets = 0;
REQ_PTL(sreq)->put_done = 0;
......@@ -225,7 +239,7 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
if (data_sz) {
MPIDI_msg_sz_t first = sreq->dev.segment_first;
MPIDI_msg_sz_t last = sreq->dev.segment_first + sent_sz;
MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, sendbuf + sizeof(MPIDI_CH3_Pkt_t));
MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, sendbuf + sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz);
if (remaining) { /* Post MEs for the remote gets */
TMPBUF(sreq) = MPIU_Malloc(remaining);
......
......@@ -524,14 +524,24 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
{
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
iov[0].MPID_IOV_BUF = hdr;
iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
iov[1].MPID_IOV_BUF = data;
iov[1].MPID_IOV_LEN = data_sz;
MPID_IOV iov[3];
int iov_n = 0;
iov[iov_n].MPID_IOV_BUF = hdr;
iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
iov_n++;
if (sreq->dev.ext_hdr_sz != 0) {
iov[iov_n].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
iov[iov_n].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
iov_n++;
}
iov[iov_n].MPID_IOV_BUF = data;
iov[iov_n].MPID_IOV_LEN = data_sz;
iov_n++;
offset = MPL_large_writev(sc->fd, iov, 2);
offset = MPL_large_writev(sc->fd, iov, iov_n);
if (offset == 0) {
int req_errno = MPI_SUCCESS;
......@@ -556,7 +566,7 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
}
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);
if (offset == sizeof(MPIDI_CH3_Pkt_t) + data_sz)
if (offset == sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz + data_sz)
{
/* sent whole message */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
......@@ -599,25 +609,45 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
/* save iov */
sreq->dev.iov_count = 0;
if (offset < sizeof(MPIDI_CH3_Pkt_t))
{
sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
sreq->dev.iov[0].MPID_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
sreq->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset;
sreq->dev.iov_count++;
if (sreq->dev.ext_hdr_sz > 0) {
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
sreq->dev.iov_count++;
}
if (data_sz)
{
sreq->dev.iov[1].MPID_IOV_BUF = data;
sreq->dev.iov[1].MPID_IOV_LEN = data_sz;
sreq->dev.iov_count = 2;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = data;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = data_sz;
sreq->dev.iov_count++;
}
}
else if (offset < sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz) {
MPIU_Assert(sreq->dev.ext_hdr_sz > 0);
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
sreq->dev.iov_count++;
if (data_sz) {
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = data;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = data_sz;
sreq->dev.iov_count++;
}
else
sreq->dev.iov_count = 1;
}
else
{
sreq->dev.iov[0].MPID_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t));
sreq->dev.iov[0].MPID_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
sreq->dev.iov_count = 1;
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
sreq->dev.iov_count++;
}
enqueue_request:
......@@ -668,21 +698,31 @@ int MPID_nem_tcp_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
MPIDI_msg_sz_t offset;
int complete;
MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
int iov_offset;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_SendNoncontig");
MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
iov[0].MPID_IOV_BUF = header;
iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
iov_n = MPID_IOV_LIMIT - 1;
iov_n = 0;
iov[iov_n].MPID_IOV_BUF = header;
iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
iov_n++;
if (sreq->dev.ext_hdr_ptr != NULL) {
iov[iov_n].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
iov[iov_n].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
iov_n++;
}
iov_offset = iov_n;
mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n);
mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[iov_offset], &iov_n);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|loadsendiov");
iov_n += 1;
iov_n += iov_offset;
offset = 0;
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
......
......@@ -72,7 +72,9 @@ int MPIDI_CH3_iSendv (MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV *iov, int n_i
int remaining_n_iov = n_iov;
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "iSendv");
mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov, vc, &again);
mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov,
sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz,
vc, &again);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
while (!again && remaining_n_iov > 0)
{
......
......@@ -89,7 +89,8 @@ int MPIDI_CH3_iStartMsgv (MPIDI_VC_t *vc, MPID_IOV *iov, int n_iov, MPID_Request
MPIU_DBG_MSG_D (CH3_CHANNEL, VERBOSE, " + len=%d ", total);
});
mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov, vc, &again);
mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov,
NULL, 0, vc, &again);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
while (!again && (remaining_n_iov > 0))
{
......
......@@ -195,7 +195,8 @@ int MPIDI_CH3I_Shm_send_progress(void)
iov = &sreq->dev.iov[sreq->dev.iov_offset];
n_iov = sreq->dev.iov_count;
mpi_errno = MPID_nem_mpich_sendv_header(&iov, &n_iov, sreq->ch.vc, &again);
mpi_errno = MPID_nem_mpich_sendv_header(&iov, &n_iov, sreq->dev.ext_hdr_ptr,
sreq->dev.ext_hdr_sz, sreq->ch.vc, &again);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
if (!again)
{
......@@ -219,7 +220,8 @@ int MPIDI_CH3I_Shm_send_progress(void)
else
{
MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
&sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->ch.vc, &again);
&sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->dev.ext_hdr_ptr,
sreq->dev.ext_hdr_sz, sreq->ch.vc, &again);
if (!again)
{
MPIDI_CH3I_shm_active_send = sreq;
......
......@@ -50,7 +50,8 @@ int MPIDI_CH3I_SendNoncontig( MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
}
/* send as many cells of data as you can */
MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, header, hdr_sz, vc, &again);
MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
header, hdr_sz, sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz, vc, &again);
while(!again && sreq->dev.segment_first < sreq->dev.segment_size)
MPID_nem_mpich_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, vc, &again);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment