Commit e8d4c6d5 authored by Xin Zhao
Browse files

Add IMMED area in packet header.



We add an IMMED data area (16 bytes by default) in
the packet header, which will contain as much origin
data as possible. If the origin can put all data in
the packet header, then it no longer needs to send
a separate data packet. When the target receives the
packet header, it will first copy data out from
the IMMED data area. If there is still more
data coming, it continues to receive the following
packets; if all data is included in the header, then
receiving is done.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent 1c638a12
......@@ -23,7 +23,7 @@
#define MPIDI_EAGER_SHORT_SIZE 16
/* This is the number of ints that can be carried within an RMA packet */
#define MPIDI_RMA_IMMED_INTS 1
#define MPIDI_RMA_IMMED_BYTES 16
/* Union over all types (integer, logical, and multi-language types) that are
allowed in a CAS operation. This is used to allocate enough space in the
......@@ -45,9 +45,6 @@ typedef union {
Fetch-and-op operation. This can be too large for the packet header, so we
limit the immediate space in the header to FOP_IMMED_INTS. */
#define MPIDI_RMA_FOP_IMMED_INTS 2
#define MPIDI_RMA_FOP_RESP_IMMED_INTS 8
/* *INDENT-OFF* */
/* Indentation turned off because "indent" is getting confused with
* the lack of a semi-colon in the fields below */
......@@ -411,6 +408,8 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
size_t immed_len;
} MPIDI_CH3_Pkt_put_t;
typedef struct MPIDI_CH3_Pkt_get {
......@@ -454,6 +453,8 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
size_t immed_len;
} MPIDI_CH3_Pkt_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum {
......@@ -472,6 +473,8 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
size_t immed_len;
} MPIDI_CH3_Pkt_get_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum_resp {
......@@ -539,13 +542,15 @@ typedef struct MPIDI_CH3_Pkt_fop {
* epoch for decrementing rma op counter in
* active target rma and for unlocking window
* in passive target rma. Otherwise set to NULL*/
int origin_data[MPIDI_RMA_FOP_IMMED_INTS];
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
} MPIDI_CH3_Pkt_fop_t;
typedef struct MPIDI_CH3_Pkt_fop_resp {
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
int data[MPIDI_RMA_FOP_RESP_IMMED_INTS];
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
/* the following fields are used to decrement ack_counter at the origin */
int target_rank;
MPI_Win source_win_handle;
......
......@@ -465,6 +465,7 @@ typedef struct MPIDI_Request {
MPI_Op op;
/* For accumulate, since data is first read into a tmp_buf */
void *real_user_buf;
void *final_user_buf;
/* For derived datatypes at target */
struct MPIDI_RMA_dtype_info *dtype_info;
void *dataloop;
......
......@@ -307,10 +307,12 @@ static inline int do_accumulate_op(MPID_Request *rreq)
MPIDI_FUNC_ENTER(MPID_STATE_DO_ACCUMULATE_OP);
MPIU_Assert(rreq->dev.final_user_buf != NULL);
if (rreq->dev.op == MPI_REPLACE)
{
/* simply copy the data */
mpi_errno = MPIR_Localcopy(rreq->dev.user_buf, rreq->dev.user_count,
mpi_errno = MPIR_Localcopy(rreq->dev.final_user_buf, rreq->dev.user_count,
rreq->dev.datatype,
rreq->dev.real_user_buf,
rreq->dev.user_count,
......@@ -336,7 +338,7 @@ static inline int do_accumulate_op(MPID_Request *rreq)
if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype))
{
(*uop)(rreq->dev.user_buf, rreq->dev.real_user_buf,
(*uop)(rreq->dev.final_user_buf, rreq->dev.real_user_buf,
&(rreq->dev.user_count), &(rreq->dev.datatype));
}
else
......@@ -385,7 +387,7 @@ static inline int do_accumulate_op(MPID_Request *rreq)
for (i=0; i<vec_len; i++)
{
MPIU_Assign_trunc(count, (dloop_vec[i].DLOOP_VECTOR_LEN)/type_size, int);
(*uop)((char *)rreq->dev.user_buf + MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF),
(*uop)((char *)rreq->dev.final_user_buf + MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF),
(char *)rreq->dev.real_user_buf + MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF),
&count, &type);
}
......@@ -397,7 +399,7 @@ static inline int do_accumulate_op(MPID_Request *rreq)
fn_exit:
/* free the temporary buffer */
MPIR_Type_get_true_extent_impl(rreq->dev.datatype, &true_lb, &true_extent);
MPIU_Free((char *) rreq->dev.user_buf + true_lb);
MPIU_Free((char *) rreq->dev.final_user_buf + true_lb);
MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
......
......@@ -371,6 +371,7 @@ int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete( MPIDI_VC_t *vc ATTRIBUTE((u
tmp_buf = (void *)((char*)tmp_buf - true_lb);
rreq->dev.user_buf = tmp_buf;
rreq->dev.final_user_buf = rreq->dev.user_buf;
rreq->dev.datatype = new_dtp->handle;
rreq->dev.recv_data_sz = new_dtp->size *
rreq->dev.user_count;
......@@ -442,6 +443,7 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete( MPIDI_VC_t *vc ATTRIBUTE((
tmp_buf = (void *)((char*)tmp_buf - true_lb);
rreq->dev.user_buf = tmp_buf;
rreq->dev.final_user_buf = rreq->dev.user_buf;
rreq->dev.datatype = new_dtp->handle;
rreq->dev.recv_data_sz = new_dtp->size *
rreq->dev.user_count;
......
......@@ -92,6 +92,7 @@ MPID_Request * MPID_Request_create(void)
req->dev.OnDataAvail = NULL;
req->dev.OnFinal = NULL;
req->dev.user_buf = NULL;
req->dev.final_user_buf = NULL;
#ifdef MPIDI_CH3_REQUEST_INIT
MPIDI_CH3_REQUEST_INIT(req);
#endif
......
......@@ -109,6 +109,7 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
put_pkt->dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->all_win_handles[target_rank];
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->immed_len = 0;
/* FIXME: For contig and very short operations, use a streamlined op */
new_ptr->origin_addr = (void *) origin_addr;
......@@ -133,6 +134,28 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
new_ptr->is_dt = 1;
}
/* If both origin and target are basic datatype, try to
copy origin data to packet header as much as possible. */
if (!new_ptr->is_dt) {
size_t len;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(new_ptr->origin_datatype, origin_type_size);
/* length of origin data */
MPIU_Assign_trunc(len, new_ptr->origin_count * origin_type_size, size_t);
/* length of origin data that can fit into immed area in pkt header */
MPIU_Assign_trunc(put_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / origin_type_size) * origin_type_size),
size_t);
if (put_pkt->immed_len > 0) {
void *src = new_ptr->origin_addr, *dest = put_pkt->data;
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, put_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -418,6 +441,7 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->all_win_handles[target_rank];
accum_pkt->source_win_handle = win_ptr->handle;
accum_pkt->immed_len = 0;
new_ptr->origin_addr = (void *) origin_addr;
new_ptr->origin_count = origin_count;
......@@ -441,6 +465,28 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
new_ptr->is_dt = 1;
}
/* If both origin and target are basic datatype, try to
copy origin data to packet header as much as possible. */
if (!new_ptr->is_dt) {
size_t len;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(new_ptr->origin_datatype, origin_type_size);
/* length of origin data */
MPIU_Assign_trunc(len, new_ptr->origin_count * origin_type_size, size_t);
/* length of origin data that can fit into immed areas in packet header */
MPIU_Assign_trunc(accum_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / origin_type_size) * origin_type_size),
size_t);
if (accum_pkt->immed_len > 0) {
void *src = new_ptr->origin_addr, *dest = accum_pkt->data;
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, accum_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
issue_ops:
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -590,6 +636,7 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->all_win_handles[target_rank];
get_accum_pkt->source_win_handle = win_ptr->handle;
get_accum_pkt->immed_len = 0;
new_ptr->origin_addr = (void *) origin_addr;
new_ptr->origin_count = origin_count;
......@@ -616,6 +663,28 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPID_Datatype_add_ref(dtp);
new_ptr->is_dt = 1;
}
/* If all buffers are basic datatype, try to copy origin data to
packet header as much as possible. */
if (!new_ptr->is_dt) {
size_t len;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(new_ptr->origin_datatype, origin_type_size);
/* length of origin data */
MPIU_Assign_trunc(len, new_ptr->origin_count * origin_type_size, size_t);
/* length of origin data that can fit into immed area in packet header */
MPIU_Assign_trunc(get_accum_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / origin_type_size) * origin_type_size),
size_t);
if (get_accum_pkt->immed_len > 0) {
void *src = new_ptr->origin_addr, *dest = get_accum_pkt->data;
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, get_accum_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
}
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
......@@ -857,6 +926,9 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
}
else {
MPIDI_CH3_Pkt_fop_t *fop_pkt = &(new_ptr->pkt.fop);
size_t len;
MPI_Aint origin_type_size;
MPIDI_Pkt_init(fop_pkt, MPIDI_CH3_PKT_FOP);
fop_pkt->addr = (char *) win_ptr->base_addrs[target_rank] +
win_ptr->disp_units[target_rank] * target_disp;
......@@ -864,6 +936,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
fop_pkt->op = op;
fop_pkt->source_win_handle = win_ptr->handle;
fop_pkt->target_win_handle = win_ptr->all_win_handles[target_rank];
fop_pkt->immed_len = 0;
new_ptr->origin_addr = (void *) origin_addr;
new_ptr->origin_count = 1;
......@@ -871,6 +944,21 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
new_ptr->result_addr = result_addr;
new_ptr->result_datatype = datatype;
new_ptr->target_rank = target_rank;
MPID_Datatype_get_size_macro(new_ptr->origin_datatype, origin_type_size);
/* length of origin data */
MPIU_Assign_trunc(len, new_ptr->origin_count * origin_type_size, size_t);
/* length of origin data that can fit into immed area in pkt header */
MPIU_Assign_trunc(fop_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / origin_type_size) * origin_type_size),
size_t);
if (fop_pkt->immed_len > 0) {
void *src = new_ptr->origin_addr, *dest = fop_pkt->data;
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, fop_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
......
......@@ -59,6 +59,26 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * put_pkt->count;
if (put_pkt->immed_len > 0) {
/* See if we can receive some data from packet header. */
MPIU_Memcpy(req->dev.user_buf, put_pkt->data, put_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + put_pkt->immed_len);
req->dev.recv_data_sz -= put_pkt->immed_len;
}
if (req->dev.recv_data_sz == 0) {
/* All data received, trigger req handler. */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
mpi_errno = MPIDI_CH3_ReqHandler_PutRecvComplete(vc, req, &complete);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (complete) {
*rreqp = NULL;
goto fn_exit;
}
}
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
......@@ -349,10 +369,29 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
req->dev.user_buf = tmp_buf;
req->dev.final_user_buf = req->dev.user_buf;
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * accum_pkt->count;
if (accum_pkt->immed_len > 0) {
/* See if we can receive some data from packet header. */
MPIU_Memcpy(req->dev.user_buf, accum_pkt->data, accum_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + accum_pkt->immed_len);
req->dev.recv_data_sz -= accum_pkt->immed_len;
}
if (req->dev.recv_data_sz == 0) {
/* All data received, trigger req handler. */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, req, &complete);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (complete) {
*rreqp = NULL;
goto fn_exit;
}
}
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
......@@ -496,10 +535,31 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
req->dev.user_buf = tmp_buf;
req->dev.final_user_buf = req->dev.user_buf;
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * get_accum_pkt->count;
if (get_accum_pkt->immed_len > 0) {
/* See if we can receive some data from packet header. */
MPIU_Memcpy(req->dev.user_buf, get_accum_pkt->data, get_accum_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + get_accum_pkt->immed_len);
req->dev.recv_data_sz -= get_accum_pkt->immed_len;
}
if (req->dev.recv_data_sz == 0) {
/* All data received. */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, req, &complete);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (complete) {
*rreqp = NULL;
goto fn_exit;
}
}
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment