Commit 7c890ab2 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Modify SHM ACC/GACC to avoid allocate large buffer.



The original implementation of ACC/GACC on SHM first
allocates a temporary buffer which has the same data
layout as the target data, copies the entire origin
data to that temporary buffer, and then performs the
ACC computation between the temporary buffer and the
target buffer. The temporary buffer can use potentially
large amount of memory.

This patch fixes this issue as follows: (1) SHM ACC/GACC
routines directly call do_accumulate_op() function, which
requires the origin data to be in a 'packed manner';
(2) if the origin data is basic type, we directly perform
do_accumulate_op() between origin buffer and target buffer;
if the origin data is derived, we stream the origin data
by copying partial of origin data into a packed streaming
buffer and performing do_accumulate_op() between the
streaming buffer and target buffer each time.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 002ce8c8
......@@ -10,6 +10,13 @@
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
/* define ACC stream size as the SRBuf size */
#define MPIDI_CH3U_Acc_stream_size MPIDI_CH3U_SRBuf_size
static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp,
void *target_buf, int target_count, MPI_Datatype target_dtp,
MPI_Aint stream_offset, MPI_Op acc_op);
#define ASSIGN_COPY(src, dest, count, type) \
{ \
type *src_ = (type *) src; \
......@@ -297,10 +304,14 @@ static inline int MPIDI_CH3I_Shm_acc_op(const void *origin_addr, int origin_coun
{
void *base = NULL;
int disp_unit, shm_op = 0;
MPI_User_function *uop = NULL;
MPID_Datatype *dtp;
int mpi_errno = MPI_SUCCESS;
MPIU_CHKLMEM_DECL(2);
int i;
MPI_Datatype predefined_type;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_extent, predefined_dtp_count;
MPI_Aint total_len, rest_len;
MPI_Aint origin_dtp_size;
MPID_Datatype *origin_dtp_ptr = NULL;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_ACC_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_ACC_OP);
......@@ -317,156 +328,95 @@ static inline int MPIDI_CH3I_Shm_acc_op(const void *origin_addr, int origin_coun
disp_unit = win_ptr->disp_unit;
}
if (op == MPI_REPLACE) {
if (shm_op)
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
if (shm_op) {
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
mpi_errno = shm_copy(origin_addr, origin_count, origin_datatype,
(char *) base + disp_unit * target_disp, target_count,
target_datatype);
if (shm_op)
}
mpi_errno = do_accumulate_op((void*)origin_addr, origin_count, origin_datatype,
(void*)((char *)base+disp_unit*target_disp), target_count, target_datatype,
0, op);
if (shm_op) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
goto fn_exit;
}
MPIU_ERR_CHKANDJUMP1((HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN),
mpi_errno, MPI_ERR_OP, "**opnotpredefined", "**opnotpredefined %d", op);
/* Get total length of origin data */
MPID_Datatype_get_size_macro(origin_datatype, origin_dtp_size);
total_len = origin_dtp_size * origin_count;
/* get the function by indexing into the op table */
uop = MPIR_OP_HDL_TO_FN(op);
MPID_Datatype_get_ptr(origin_datatype, origin_dtp_ptr);
MPIU_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->eltype != MPI_DATATYPE_NULL);
predefined_type = origin_dtp_ptr->eltype;
MPID_Datatype_get_size_macro(predefined_type, predefined_dtp_size);
predefined_dtp_count = total_len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(predefined_type, predefined_dtp_extent);
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype) &&
MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
/* Cast away const'ness for origin_address in order to
* avoid changing the prototype for MPI_User_function */
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop) ((void *) origin_addr, (char *) base + disp_unit * target_disp,
&target_count, &target_datatype);
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
else {
/* derived datatype */
stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
MPID_Segment *segp;
DLOOP_VECTOR *dloop_vec;
rest_len = total_len;
for (i = 0; i < stream_unit_count; i++) {
MPID_Segment *seg = NULL;
void *packed_buf = NULL;
MPI_Aint first, last;
int vec_len, i, type_size, count;
MPI_Aint type_extent;
MPI_Datatype type;
MPI_Aint true_lb, true_extent, extent;
void *tmp_buf = NULL, *target_buf;
const void *source_buf;
MPI_Aint curr_len;
void *curr_loc;
if (origin_datatype != target_datatype) {
/* first copy the data into a temporary buffer with
* the same datatype as the target. Then do the
* accumulate operation. */
MPIR_Type_get_true_extent_impl(target_datatype, &true_lb, &true_extent);
MPID_Datatype_get_extent_macro(target_datatype, extent);
MPIU_CHKLMEM_MALLOC(tmp_buf, void *,
target_count * (MPIR_MAX(extent, true_extent)),
mpi_errno, "temporary buffer");
/* adjust for potential negative lower bound in datatype */
tmp_buf = (void *) ((char *) tmp_buf - true_lb);
mpi_errno = MPIR_Localcopy(origin_addr, origin_count,
origin_datatype, tmp_buf, target_count, target_datatype);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
int is_predef_contig;
MPI_Aint stream_offset, stream_size, stream_count;
stream_offset = i * stream_elem_count * predefined_dtp_size;
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
stream_count = stream_size / predefined_dtp_size;
rest_len -= stream_size;
first = stream_offset;
last = stream_offset + stream_size;
packed_buf = MPIU_Malloc(stream_size);
seg = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment");
MPID_Segment_init(origin_addr, origin_count, origin_datatype, seg, 0);
MPID_Segment_pack(seg, first, &last, packed_buf);
MPID_Segment_free(seg);
MPID_Datatype_is_contig(predefined_type, &is_predef_contig);
if (!is_predef_contig) {
void *tmpbuf = MPIU_Malloc(stream_count * predefined_dtp_extent);
mpi_errno = MPIR_Localcopy(tmpbuf, stream_count, predefined_type,
packed_buf, stream_size, MPI_BYTE);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Free(packed_buf);
packed_buf = tmpbuf;
}
if (MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
/* target predefined type, origin derived datatype */
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop) (tmp_buf, (char *) base + disp_unit * target_disp,
&target_count, &target_datatype);
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (shm_op) {
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
}
else {
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1((!segp), mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(NULL, target_count, target_datatype, segp, 0);
first = 0;
last = SEGMENT_IGNORE_LAST;
MPID_Datatype_get_ptr(target_datatype, dtp);
vec_len = dtp->max_contig_blocks * target_count + 1;
/* +1 needed because Rob says so */
MPIU_CHKLMEM_MALLOC(dloop_vec, DLOOP_VECTOR *,
vec_len * sizeof(DLOOP_VECTOR), mpi_errno, "dloop vector");
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
source_buf = (tmp_buf != NULL) ? (const void *) tmp_buf : origin_addr;
target_buf = (char *) base + disp_unit * target_disp;
type = dtp->eltype;
MPIU_Assert(type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(type, type_size);
MPID_Datatype_get_extent_macro(type, type_extent);
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
i = 0;
curr_loc = dloop_vec[0].DLOOP_VECTOR_BUF;
curr_len = dloop_vec[0].DLOOP_VECTOR_LEN;
while (i != vec_len) {
if (curr_len < type_size) {
MPIU_Assert(i != vec_len);
i++;
curr_len += dloop_vec[i].DLOOP_VECTOR_LEN;
continue;
}
MPIU_Assign_trunc(count, curr_len/type_size, int);
(*uop)((char *)source_buf + MPIU_PtrToAint(curr_loc),
(char *)target_buf + MPIU_PtrToAint(curr_loc),
&count, &type);
if (curr_len % type_size == 0) {
i++;
if (i != vec_len) {
curr_loc = dloop_vec[i].DLOOP_VECTOR_BUF;
curr_len = dloop_vec[i].DLOOP_VECTOR_LEN;
}
}
else {
curr_loc = (void *)((char *)curr_loc + type_extent * count);
curr_len -= type_size * count;
}
}
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPID_Segment_free(segp);
mpi_errno = do_accumulate_op((void*)packed_buf, stream_count, predefined_type,
(void*)((char*)base+disp_unit*target_disp), target_count, target_datatype,
stream_offset, op);
if (shm_op) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Free(packed_buf);
}
fn_exit:
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_ACC_OP);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (shm_op)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -485,10 +435,14 @@ static inline int MPIDI_CH3I_Shm_get_acc_op(const void *origin_addr, int origin_
{
int disp_unit, shm_locked = 0;
void *base = NULL;
MPI_User_function *uop = NULL;
MPID_Datatype *dtp;
int i;
MPI_Datatype predefined_type;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_extent, predefined_dtp_count;
MPI_Aint total_len, rest_len;
MPI_Aint origin_dtp_size;
MPID_Datatype *origin_dtp_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
MPIU_CHKLMEM_DECL(2);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_GET_ACC_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_GET_ACC_OP);
......@@ -523,135 +477,78 @@ static inline int MPIDI_CH3I_Shm_get_acc_op(const void *origin_addr, int origin_
goto fn_exit;
}
if (op == MPI_REPLACE) {
mpi_errno = shm_copy(origin_addr, origin_count, origin_datatype,
(char *) base + disp_unit * target_disp, target_count,
target_datatype);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
mpi_errno = do_accumulate_op((void*)origin_addr, origin_count, origin_datatype,
(void*)((char *)base+disp_unit*target_disp), target_count, target_datatype,
0, op);
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
shm_locked = 0;
}
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
goto fn_exit;
}
MPIU_ERR_CHKANDJUMP1((HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN),
mpi_errno, MPI_ERR_OP, "**opnotpredefined", "**opnotpredefined %d", op);
/* Get total length of origin data */
MPID_Datatype_get_size_macro(origin_datatype, origin_dtp_size);
total_len = origin_dtp_size * origin_count;
/* get the function by indexing into the op table */
uop = MPIR_OP_HDL_TO_FN(op);
MPID_Datatype_get_ptr(origin_datatype, origin_dtp_ptr);
MPIU_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->eltype != MPI_DATATYPE_NULL);
predefined_type = origin_dtp_ptr->eltype;
MPID_Datatype_get_size_macro(predefined_type, predefined_dtp_size);
predefined_dtp_count = total_len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(predefined_type, predefined_dtp_extent);
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
if ((op == MPI_NO_OP || MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) &&
MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
/* Cast away const'ness for origin_address in order to
* avoid changing the prototype for MPI_User_function */
(*uop) ((void *) origin_addr, (char *) base + disp_unit * target_disp,
&target_count, &target_datatype);
}
else {
/* derived datatype */
stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
MPID_Segment *segp;
DLOOP_VECTOR *dloop_vec;
rest_len = total_len;
for (i = 0; i < stream_unit_count; i++) {
MPID_Segment *seg = NULL;
void *packed_buf = NULL;
MPI_Aint first, last;
int vec_len, i, type_size, count;
MPI_Datatype type;
MPI_Aint true_lb, true_extent, extent;
void *tmp_buf = NULL, *target_buf;
const void *source_buf;
MPI_Aint type_extent;
MPI_Aint curr_len;
void *curr_loc;
if (origin_datatype != target_datatype) {
/* first copy the data into a temporary buffer with
* the same datatype as the target. Then do the
* accumulate operation. */
MPIR_Type_get_true_extent_impl(target_datatype, &true_lb, &true_extent);
MPID_Datatype_get_extent_macro(target_datatype, extent);
MPIU_CHKLMEM_MALLOC(tmp_buf, void *,
target_count * (MPIR_MAX(extent, true_extent)),
mpi_errno, "temporary buffer");
/* adjust for potential negative lower bound in datatype */
tmp_buf = (void *) ((char *) tmp_buf - true_lb);
mpi_errno = MPIR_Localcopy(origin_addr, origin_count,
origin_datatype, tmp_buf, target_count, target_datatype);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
int is_predef_contig;
MPI_Aint stream_offset, stream_size, stream_count;
stream_offset = i * stream_elem_count * predefined_dtp_size;
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
stream_count = stream_size / predefined_dtp_size;
rest_len -= stream_size;
first = stream_offset;
last = stream_offset + stream_size;
packed_buf = MPIU_Malloc(stream_size);
seg = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment");
MPID_Segment_init(origin_addr, origin_count, origin_datatype, seg, 0);
MPID_Segment_pack(seg, first, &last, packed_buf);
MPID_Segment_free(seg);
MPID_Datatype_is_contig(predefined_type, &is_predef_contig);
if (!is_predef_contig) {
void *tmpbuf = MPIU_Malloc(stream_count * predefined_dtp_extent);
mpi_errno = MPIR_Localcopy(tmpbuf, stream_count, predefined_type,
packed_buf, stream_size, MPI_BYTE);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Free(packed_buf);
packed_buf = tmpbuf;
}
if (MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
/* target predefined type, origin derived datatype */
mpi_errno = do_accumulate_op((void*)packed_buf, stream_count, predefined_type,
(void*)((char*)base+disp_unit*target_disp), target_count, target_datatype,
stream_offset, op);
(*uop) (tmp_buf, (char *) base + disp_unit * target_disp,
&target_count, &target_datatype);
}
else {
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1((!segp), mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(NULL, target_count, target_datatype, segp, 0);
first = 0;
last = SEGMENT_IGNORE_LAST;
MPID_Datatype_get_ptr(target_datatype, dtp);
vec_len = dtp->max_contig_blocks * target_count + 1;
/* +1 needed because Rob says so */
MPIU_CHKLMEM_MALLOC(dloop_vec, DLOOP_VECTOR *,
vec_len * sizeof(DLOOP_VECTOR), mpi_errno, "dloop vector");
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
source_buf = (tmp_buf != NULL) ? (const void *) tmp_buf : origin_addr;
target_buf = (char *) base + disp_unit * target_disp;
type = dtp->eltype;
MPIU_Assert(type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(type, type_size);
MPID_Datatype_get_extent_macro(type, type_extent);
i = 0;
curr_loc = dloop_vec[0].DLOOP_VECTOR_BUF;
curr_len = dloop_vec[0].DLOOP_VECTOR_LEN;
while (i != vec_len) {
if (curr_len < type_size) {
MPIU_Assert(i != vec_len);
i++;
curr_len += dloop_vec[i].DLOOP_VECTOR_LEN;
continue;
}
MPIU_Assign_trunc(count, curr_len/type_size, int);
(*uop)((char *)source_buf + MPIU_PtrToAint(curr_loc),
(char *)target_buf + MPIU_PtrToAint(curr_loc),
&count, &type);
if (curr_len % type_size == 0) {
i++;
if (i != vec_len) {
curr_loc = dloop_vec[i].DLOOP_VECTOR_BUF;
curr_len = dloop_vec[i].DLOOP_VECTOR_LEN;
}
}
else {
curr_loc = (void *)((char *)curr_loc + type_extent * count);
curr_len -= type_size * count;
}
}
MPID_Segment_free(segp);
}
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Free(packed_buf);
}
if (shm_locked) {
......@@ -660,7 +557,6 @@ static inline int MPIDI_CH3I_Shm_get_acc_op(const void *origin_addr, int origin_
}
fn_exit:
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_GET_ACC_OP);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment