Commit d8eb8de2 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Add stream_offset to ACC-related packets and request struct.



Add stream_offset area into ACC-related packets and request struct
to remember current stream unit's starting position in the entire
target data.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent c986b927
......@@ -730,6 +730,8 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
accum_pkt->info.metadata.stream_offset = stream_offset;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
......@@ -937,6 +939,10 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
get_accum_pkt->info.metadata.stream_offset = stream_offset;
resp_req->dev.stream_offset = stream_offset;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
......
......@@ -522,16 +522,16 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
(pkt_).put.info.dataloop_size = (dataloop_size_); \
(pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET): \
(pkt_).get.info.dataloop_size = (dataloop_size_); \
(pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(pkt_).accum.info.dataloop_size = (dataloop_size_); \
(pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
(pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
......@@ -594,7 +594,12 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
int dataloop_size;
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_put_t;
......@@ -608,8 +613,10 @@ typedef struct MPIDI_CH3_Pkt_get {
struct {
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.dataloop_size". */
int dataloop_size; /* for derived datatypes */
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size; /* for derived datatypes */
} metadata;
} info;
MPI_Request request_handle;
MPI_Win target_win_handle;
......@@ -640,7 +647,10 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
int dataloop_size;
struct {
int dataloop_size;
MPI_Aint stream_offset;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_accum_t;
......@@ -655,7 +665,10 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op;
MPI_Win target_win_handle;
union {
int dataloop_size;
struct {
int dataloop_size;
MPI_Aint stream_offset;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_get_accum_t;
......
......@@ -447,6 +447,9 @@ typedef struct MPIDI_Request {
struct MPIDI_RMA_Lock_entry *lock_queue_entry;
MPI_Request resp_request_handle; /* Handle for get_accumulate response */
MPI_Aint stream_offset; /* used when streaming ACC/GACC packets, specifying the start
location of the current streaming unit. */
MPIDI_REQUEST_SEQNUM
/* Occasionally, when a message cannot be sent, we need to cache the
......
......@@ -94,6 +94,7 @@ MPID_Request * MPID_Request_create(void)
req->dev.OnFinal = NULL;
req->dev.user_buf = NULL;
req->dev.drop_data = FALSE;
req->dev.stream_offset = 0;
#ifdef MPIDI_CH3_REQUEST_INIT
MPIDI_CH3_REQUEST_INIT(req);
#endif
......
......@@ -198,7 +198,7 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
put_pkt->info.dataloop_size = 0;
put_pkt->info.metadata.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
......@@ -387,7 +387,7 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->info.dataloop_size = 0;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
......@@ -612,10 +612,11 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
accum_pkt->count = target_count;
accum_pkt->datatype = target_datatype;
accum_pkt->info.dataloop_size = 0;
accum_pkt->info.metadata.dataloop_size = 0;
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
accum_pkt->source_win_handle = win_ptr->handle;
accum_pkt->info.metadata.stream_offset = 0;
accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (accum_pkt->info.data);
......@@ -809,7 +810,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->info.dataloop_size = 0;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
......@@ -930,9 +931,10 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_accum_pkt->count = target_count;
get_accum_pkt->datatype = target_datatype;
get_accum_pkt->info.dataloop_size = 0;
get_accum_pkt->info.metadata.dataloop_size = 0;
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_accum_pkt->info.metadata.stream_offset = 0;
get_accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (get_accum_pkt->info.data);
......@@ -1344,7 +1346,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = 1;
get_pkt->datatype = datatype;
get_pkt->info.dataloop_size = 0;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
......
......@@ -289,24 +289,24 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(put_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
put_pkt->info.dataloop_size);
put_pkt->info.metadata.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
put_pkt->info.dataloop_size);
put_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
put_pkt->info.dataloop_size;
put_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
......@@ -321,7 +321,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -515,24 +515,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(get_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_pkt->info.dataloop_size);
get_pkt->info.metadata.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
get_pkt->info.dataloop_size);
get_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
get_pkt->info.dataloop_size;
get_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
......@@ -545,7 +545,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -635,6 +635,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
req->dev.stream_offset = accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
......@@ -694,21 +695,21 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(accum_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
accum_pkt->info.dataloop_size);
accum_pkt->info.metadata.dataloop_size);
}
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
accum_pkt->info.dataloop_size);
accum_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
accum_pkt->info.dataloop_size;
accum_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
......@@ -723,7 +724,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
......@@ -877,6 +878,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.real_user_buf = get_accum_pkt->addr;
req->dev.target_win_handle = get_accum_pkt->target_win_handle;
req->dev.flags = get_accum_pkt->flags;
req->dev.stream_offset = get_accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
......@@ -935,21 +937,22 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_accum_pkt->info.dataloop_size);
get_accum_pkt->info.metadata.dataloop_size);
}
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.dataloop_size) {
if (data_len >=
sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
get_accum_pkt->info.dataloop_size);
get_accum_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
get_accum_pkt->info.dataloop_size;
get_accum_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
......@@ -964,7 +967,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
......@@ -1931,7 +1934,7 @@ int MPIDI_CH3_PktPrint_Put(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->put.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->put.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->put.datatype));
MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.dataloop_size));
MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.metadata.dataloop_size));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->put.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->put.source_win_handle));
/*MPIU_DBG_PRINTF((" win_ptr ...... 0x%08X\n", pkt->put.win_ptr)); */
......@@ -1944,7 +1947,7 @@ int MPIDI_CH3_PktPrint_Get(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->get.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->get.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->get.datatype));
MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.dataloop_size));
MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.metadata.dataloop_size));
MPIU_DBG_PRINTF((" request ...... 0x%08X\n", pkt->get.request_handle));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->get.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->get.source_win_handle));
......@@ -1969,7 +1972,7 @@ int MPIDI_CH3_PktPrint_Accumulate(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->accum.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->accum.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->accum.datatype));
MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.dataloop_size));
MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.metadata.dataloop_size));
MPIU_DBG_PRINTF((" op ........... 0x%08X\n", pkt->accum.op));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->accum.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->accum.source_win_handle));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment