Commit 25e40e43 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Add extended packet header in CH3 layer used by RMA messages



Here we added extended packet header in CH3 layer used to
transmit attributes that are only needed in RMA and are not
needed in two-sided communication. The key implementation
details are listed as follows:

Origin side:

(1) The extended packet header is stored in the request, and
the request is passed to the issuing function (iSendv() or
sendNoncontig_fn()) in the lower layer. The issuing function
checks if the extended packet header exists in the request,
if so, it will issue that header. (The modifications in lower
layer are in the next commit.)

(2) There is a fast path used when (origin data is contiguous &&
target data is predefined && extended packet header is not used).
In such case, we do not need to create a request beforehand
but can use iStartMsgv() issuing function which try to issue
the entire message as soon as possible.

Target side:

(1) There are two req handler being used when extended packet header
is used or target datatype is derived. The first req handler is
triggered when extended packet header / target datatype info is
arrived, and the second req handler is triggered when actual data
is arrived.

(2) When target side receives a stream unit which is piggybacked with
LOCK, it will drop the stream_offset in extended packet header, since
the stream unit must be the first one and stream_offset must be 0.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent e0eaed63
...@@ -361,6 +361,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * ...@@ -361,6 +361,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL; MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
int is_origin_contig; int is_origin_contig;
MPID_IOV iov[MPID_IOV_LIMIT]; MPID_IOV iov[MPID_IOV_LIMIT];
int iovcnt = 0;
MPID_Request *req = NULL; MPID_Request *req = NULL;
int count; int count;
int *ints = NULL; int *ints = NULL;
...@@ -368,6 +369,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * ...@@ -368,6 +369,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPI_Aint *displaces = NULL; MPI_Aint *displaces = NULL;
MPI_Datatype *datatypes = NULL; MPI_Datatype *datatypes = NULL;
MPI_Aint dt_true_lb; MPI_Aint dt_true_lb;
MPIDI_CH3_Pkt_flags_t flags;
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM); MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
...@@ -398,18 +400,23 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * ...@@ -398,18 +400,23 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig); MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb); MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt); iov[iovcnt].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt); iov[iovcnt].MPID_IOV_LEN = sizeof(rma_op->pkt);
iovcnt++;
if (target_dtp == NULL) {
/* basic datatype on target */ MPIDI_CH3_PKT_RMA_GET_FLAGS(rma_op->pkt, flags, mpi_errno);
if (is_origin_contig) { if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) && target_dtp == NULL && is_origin_contig) {
/* origin data is contiguous */ /* Fast path --- use iStartMsgv() to issue the data, which does not need a request
int iovcnt = 2; * to be passed in:
* (1) non-streamed op (do not need to send extended packet header);
* (2) target datatype is predefined (do not need to send derived datatype info);
* (3) origin datatype is contiguous (do not need to pack the data and send);
*/
iov[1].MPID_IOV_BUF = iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset); (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[1].MPID_IOV_LEN = stream_size; iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
MPIU_THREAD_CS_ENTER(CH3COMM, vc); MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req); mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
...@@ -426,24 +433,71 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * ...@@ -426,24 +433,71 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
req->dev.datatype_ptr = origin_dtp; req->dev.datatype_ptr = origin_dtp;
} }
} }
goto fn_exit;
} }
else {
/* origin data is non-contiguous */ /* Normal path: use iSendv() and sendNoncontig_fn() to issue the data, which
* always need a request to be passed in. */
/* create a new request */
req = MPID_Request_create(); req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq"); MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(req, 2); MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND; req->kind = MPID_REQUEST_SEND;
req->dev.segment_ptr = MPID_Segment_alloc(); /* allocate and fill in extended packet header in the request */
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc"); MPIU_Assert(rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM);
if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_accum_t));
if (!req->dev.ext_hdr_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_CH3_Ext_pkt_accum_t");
}
req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_t);
((MPIDI_CH3_Ext_pkt_accum_t *) req->dev.ext_hdr_ptr)->stream_offset = stream_offset;
}
else {
req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
if (!req->dev.ext_hdr_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_CH3_Ext_pkt_get_accum_t");
}
req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
((MPIDI_CH3_Ext_pkt_get_accum_t *) req->dev.ext_hdr_ptr)->stream_offset = stream_offset;
}
}
if (target_dtp == NULL) {
/* basic datatype on target */
if (origin_dtp != NULL) { if (origin_dtp != NULL) {
req->dev.datatype_ptr = origin_dtp; req->dev.datatype_ptr = origin_dtp;
/* this will cause the datatype to be freed when the request /* this will cause the datatype to be freed when the request
* is freed. */ * is freed. */
} }
if (is_origin_contig) {
/* origin data is contiguous */
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
/* origin data is non-contiguous */
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
rma_op->origin_datatype, req->dev.segment_ptr, 0); rma_op->origin_datatype, req->dev.segment_ptr, 0);
req->dev.segment_first = stream_offset; req->dev.segment_first = stream_offset;
...@@ -468,14 +522,6 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * ...@@ -468,14 +522,6 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIDI_msg_sz_t first = stream_offset; MPIDI_msg_sz_t first = stream_offset;
MPIDI_msg_sz_t last = stream_offset + stream_size; MPIDI_msg_sz_t last = stream_offset + stream_size;
req = MPID_Request_create();
if (req == NULL) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
}
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND;
req->dev.segment_ptr = MPID_Segment_alloc(); req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER, MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc"); "**nomem", "**nomem %s", "MPID_Segment_alloc");
...@@ -556,9 +602,9 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * ...@@ -556,9 +602,9 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype_release(target_dtp); MPID_Datatype_release(target_dtp);
} }
fn_exit:
(*req_ptr) = req; (*req_ptr) = req;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM); MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
return mpi_errno; return mpi_errno;
fn_fail: fn_fail:
...@@ -709,6 +755,11 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr, ...@@ -709,6 +755,11 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1; stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0); MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
/* If there are more than one stream unit, mark the current packet
* as stream packet */
if (stream_unit_count > 1)
flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
rest_len = total_len; rest_len = total_len;
MPIU_Assert(rma_op->issued_stream_count >= 0); MPIU_Assert(rma_op->issued_stream_count >= 0);
for (j = 0; j < stream_unit_count; j++) { for (j = 0; j < stream_unit_count; j++) {
...@@ -734,8 +785,6 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr, ...@@ -734,8 +785,6 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len); stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size; rest_len -= stream_size;
accum_pkt->info.metadata.stream_offset = stream_offset;
mpi_errno = mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req); issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS) if (mpi_errno != MPI_SUCCESS)
...@@ -884,6 +933,11 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr, ...@@ -884,6 +933,11 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1; stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0); MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
/* If there are more than one stream unit, mark the current packet
* as stream packet */
if (stream_unit_count > 1)
flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
rest_len = total_len; rest_len = total_len;
rma_op->reqs_size = stream_unit_count; rma_op->reqs_size = stream_unit_count;
...@@ -943,9 +997,18 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr, ...@@ -943,9 +997,18 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len); stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size; rest_len -= stream_size;
get_accum_pkt->info.metadata.stream_offset = stream_offset; if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
/* allocate extended packet header in request */
resp_req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
if (!resp_req->dev.ext_hdr_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_CH3_Ext_pkt_get_accum_t");
}
resp_req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
resp_req->dev.stream_offset = stream_offset; ((MPIDI_CH3_Ext_pkt_get_accum_t *) resp_req->dev.ext_hdr_ptr)->stream_offset =
stream_offset;
}
mpi_errno = mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req); issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
......
...@@ -354,6 +354,8 @@ extern MPIDI_Process_t MPIDI_Process; ...@@ -354,6 +354,8 @@ extern MPIDI_Process_t MPIDI_Process;
(sreq_)->dev.iov_count = 0; \ (sreq_)->dev.iov_count = 0; \
(sreq_)->dev.iov_offset = 0; \ (sreq_)->dev.iov_offset = 0; \
(sreq_)->dev.tmpbuf = NULL; \ (sreq_)->dev.tmpbuf = NULL; \
(sreq_)->dev.ext_hdr_ptr = NULL; \
(sreq_)->dev.ext_hdr_sz = 0; \
MPIDI_Request_clear_dbg(sreq_); \ MPIDI_Request_clear_dbg(sreq_); \
} }
...@@ -388,6 +390,8 @@ extern MPIDI_Process_t MPIDI_Process; ...@@ -388,6 +390,8 @@ extern MPIDI_Process_t MPIDI_Process;
(rreq_)->dev.OnFinal = NULL; \ (rreq_)->dev.OnFinal = NULL; \
(rreq_)->dev.drop_data = FALSE; \ (rreq_)->dev.drop_data = FALSE; \
(rreq_)->dev.tmpbuf = NULL; \ (rreq_)->dev.tmpbuf = NULL; \
(rreq_)->dev.ext_hdr_ptr = NULL; \
(rreq_)->dev.ext_hdr_sz = 0; \
MPIDI_CH3_REQUEST_INIT(rreq_);\ MPIDI_CH3_REQUEST_INIT(rreq_);\
} }
......
...@@ -129,7 +129,8 @@ typedef enum { ...@@ -129,7 +129,8 @@ typedef enum {
MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED = 1024, MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED = 1024,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED = 2048, MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED = 2048,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 4096, MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 4096,
MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 8192 MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 8192,
MPIDI_CH3_PKT_FLAG_RMA_STREAM = 16384
} MPIDI_CH3_Pkt_flags_t; } MPIDI_CH3_Pkt_flags_t;
typedef struct MPIDI_CH3_Pkt_send { typedef struct MPIDI_CH3_Pkt_send {
...@@ -522,31 +523,16 @@ MPIDI_CH3_PKT_DEFS ...@@ -522,31 +523,16 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \ err_ = MPI_SUCCESS; \
switch((pkt_).type) { \ switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \ case (MPIDI_CH3_PKT_PUT): \
(pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \ (pkt_).put.info.dataloop_size = (dataloop_size_); \
break; \ break; \
case (MPIDI_CH3_PKT_GET): \ case (MPIDI_CH3_PKT_GET): \
(pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \ (pkt_).get.info.dataloop_size = (dataloop_size_); \
break; \ break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \ case (MPIDI_CH3_PKT_ACCUMULATE): \
(pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \ (pkt_).accum.info.dataloop_size = (dataloop_size_); \
break; \ break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \ case (MPIDI_CH3_PKT_GET_ACCUM): \
(pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \ (pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET(pkt_, stream_offset_, err_) \
{ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(stream_offset_) = (pkt_).accum.info.metadata.stream_offset; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(stream_offset_) = (pkt_).get_accum.info.metadata.stream_offset; \
break; \ break; \
default: \ default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \ MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
...@@ -609,12 +595,7 @@ typedef struct MPIDI_CH3_Pkt_put { ...@@ -609,12 +595,7 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle; MPI_Win target_win_handle;
MPI_Win source_win_handle; MPI_Win source_win_handle;
union { union {
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size; int dataloop_size;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES]; char data[MPIDI_RMA_IMMED_BYTES];
} info; } info;
} MPIDI_CH3_Pkt_put_t; } MPIDI_CH3_Pkt_put_t;
...@@ -625,13 +606,8 @@ typedef struct MPIDI_CH3_Pkt_get { ...@@ -625,13 +606,8 @@ typedef struct MPIDI_CH3_Pkt_get {
void *addr; void *addr;
int count; int count;
MPI_Datatype datatype; MPI_Datatype datatype;
struct {
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.metadata.dataloop_size". */
struct { struct {
int dataloop_size; /* for derived datatypes */ int dataloop_size; /* for derived datatypes */
} metadata;
} info; } info;
MPI_Request request_handle; MPI_Request request_handle;
MPI_Win target_win_handle; MPI_Win target_win_handle;
...@@ -662,10 +638,7 @@ typedef struct MPIDI_CH3_Pkt_accum { ...@@ -662,10 +638,7 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle; MPI_Win target_win_handle;
MPI_Win source_win_handle; MPI_Win source_win_handle;
union { union {
struct {
int dataloop_size; int dataloop_size;
MPI_Aint stream_offset;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES]; char data[MPIDI_RMA_IMMED_BYTES];
} info; } info;
} MPIDI_CH3_Pkt_accum_t; } MPIDI_CH3_Pkt_accum_t;
...@@ -680,10 +653,7 @@ typedef struct MPIDI_CH3_Pkt_get_accum { ...@@ -680,10 +653,7 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op; MPI_Op op;
MPI_Win target_win_handle; MPI_Win target_win_handle;
union { union {
struct {
int dataloop_size; int dataloop_size;
MPI_Aint stream_offset;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES]; char data[MPIDI_RMA_IMMED_BYTES];
} info; } info;
} MPIDI_CH3_Pkt_get_accum_t; } MPIDI_CH3_Pkt_get_accum_t;
...@@ -865,6 +835,14 @@ typedef union MPIDI_CH3_Pkt { ...@@ -865,6 +835,14 @@ typedef union MPIDI_CH3_Pkt {
#endif #endif
} MPIDI_CH3_Pkt_t; } MPIDI_CH3_Pkt_t;
typedef struct MPIDI_CH3_Ext_pkt_accum {
MPI_Aint stream_offset;
} MPIDI_CH3_Ext_pkt_accum_t;
typedef struct MPIDI_CH3_Ext_pkt_get_accum {
MPI_Aint stream_offset;
} MPIDI_CH3_Ext_pkt_get_accum_t;
#if defined(MPID_USE_SEQUENCE_NUMBERS) #if defined(MPID_USE_SEQUENCE_NUMBERS)
typedef struct MPIDI_CH3_Pkt_send_container { typedef struct MPIDI_CH3_Pkt_send_container {
MPIDI_CH3_Pkt_send_t pkt; MPIDI_CH3_Pkt_send_t pkt;
......
...@@ -447,8 +447,8 @@ typedef struct MPIDI_Request { ...@@ -447,8 +447,8 @@ typedef struct MPIDI_Request {
struct MPIDI_RMA_Lock_entry *lock_queue_entry; struct MPIDI_RMA_Lock_entry *lock_queue_entry;
MPI_Request resp_request_handle; /* Handle for get_accumulate response */ MPI_Request resp_request_handle; /* Handle for get_accumulate response */
MPI_Aint stream_offset; /* used when streaming ACC/GACC packets, specifying the start void *ext_hdr_ptr; /* pointer to extended packet header */
location of the current streaming unit. */ MPIDI_msg_sz_t ext_hdr_sz;
MPIDI_REQUEST_SEQNUM MPIDI_REQUEST_SEQNUM
......
...@@ -364,11 +364,13 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc, ...@@ -364,11 +364,13 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
int complete = 0; int complete = 0;
MPIDI_msg_sz_t data_len; MPIDI_msg_sz_t data_len;
char *data_buf = NULL; char *data_buf = NULL;
MPIDI_CH3_Pkt_flags_t flags;
/* This is PUT, ACC, GACC, FOP */ /* This is PUT, ACC, GACC, FOP */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno); MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno); MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
MPID_Datatype_get_extent_macro(target_dtp, type_extent); MPID_Datatype_get_extent_macro(target_dtp, type_extent);
MPID_Datatype_get_size_macro(target_dtp, type_size); MPID_Datatype_get_size_macro(target_dtp, type_size);
...@@ -378,17 +380,29 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc, ...@@ -378,17 +380,29 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
buf_size = type_extent * target_count; buf_size = type_extent * target_count;
} }
else { else {
MPI_Aint stream_offset, stream_elem_count; MPI_Aint stream_elem_count;
MPI_Aint total_len, rest_len; MPI_Aint total_len;
MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent; stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
total_len = type_size * target_count; total_len = type_size * target_count;
rest_len = total_len - stream_offset; recv_data_sz = MPIR_MIN(total_len, type_size * stream_elem_count);
recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
buf_size = type_extent * (recv_data_sz / type_size); buf_size = type_extent * (recv_data_sz / type_size);
} }
if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE ||
pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
if (pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_accum_t);
buf_size += sizeof(MPIDI_CH3_Ext_pkt_accum_t);
}
else {
recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
buf_size += sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
}
}
if (new_ptr != NULL) { if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) { if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
new_ptr->data = MPIU_Malloc(buf_size); new_ptr->data = MPIU_Malloc(buf_size);
...@@ -404,10 +418,8 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc, ...@@ -404,10 +418,8 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPIDI_CH3_Pkt_t new_pkt; MPIDI_CH3_Pkt_t new_pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock; MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
MPI_Win target_win_handle; MPI_Win target_win_handle;
MPIDI_CH3_Pkt_flags_t flags;
MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno); MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_ACCUMULATE) { if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno); MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
......
...@@ -139,6 +139,7 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, ...@@ -139,6 +139,7 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq,
MPIDI_CH3_Pkt_flags_t flags = rreq->dev.flags; MPIDI_CH3_Pkt_flags_t flags = rreq->dev.flags;
MPI_Datatype basic_type; MPI_Datatype basic_type;
MPI_Aint predef_count, predef_dtp_size; MPI_Aint predef_count, predef_dtp_size;
MPI_Aint stream_offset;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE); MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);
...@@ -176,12 +177,19 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, ...@@ -176,12 +177,19 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq,
predef_count = rreq->dev.recv_data_sz / predef_dtp_size; predef_count = rreq->dev.recv_data_sz / predef_dtp_size;
MPIU_Assert(predef_count > 0); MPIU_Assert(predef_count > 0);
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
stream_offset = ((MPIDI_CH3_Ext_pkt_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
}
else
stream_offset = 0;
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
/* accumulate data from tmp_buf into user_buf */ /* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq->dev.user_buf, predef_count, basic_type, mpi_errno = do_accumulate_op(rreq->dev.user_buf, predef_count, basic_type,
rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype, rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
rreq->dev.stream_offset, rreq->dev.op); stream_offset, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) { if (mpi_errno) {
...@@ -233,6 +241,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq ...@@ -233,6 +241,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPI_Datatype basic_type; MPI_Datatype basic_type;
MPI_Aint predef_count, predef_dtp_size; MPI_Aint predef_count, predef_dtp_size;
MPI_Aint dt_true_lb; MPI_Aint dt_true_lb;
MPI_Aint stream_offset;
MPIU_CHKPMEM_DECL(1); MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE); MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
...@@ -262,6 +271,14 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq ...@@ -262,6 +271,14 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)) (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK; get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
stream_offset = ((MPIDI_CH3_Ext_pkt_get_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
}