Commit f68228a9 authored by Pavan Balaji

Several updates to the rportals code.



We now use a target structure for each target ID that we want to send
data to.  This allows us to separate out target-specific states and
more cleanly manage operations to a single target.
Signed-off-by: Antonio Pena Monferrer <apenya@mcs.anl.gov>
parent 50873835
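
The central change is a per-target bookkeeping structure: the global op list and "paused target" list in rptl_info are replaced by an rptl_target allocated per target ID, which owns its own op pool and pending-operation queues. The structure definition itself lives in the rptl header and is not part of this diff; the sketch below is inferred from the fields initialized in find_target() further down, so the member layout and tag names are approximate, not authoritative.

/* Sketch only -- inferred from find_target(); the real definition is in
 * the rptl header, which this diff does not touch. */
struct rptl_target {
    ptl_process_t id;                     /* physical id of the target process */

    enum rptl_target_state {
        RPTL_TARGET_STATE_ACTIVE,         /* ok to issue data ops */
        RPTL_TARGET_STATE_DISABLED,       /* data ops to this target are withheld */
        RPTL_TARGET_STATE_RECEIVED_PAUSE, /* target sent us a PAUSE */
        RPTL_TARGET_STATE_PAUSE_ACKED     /* we acked the PAUSE; waiting for UNPAUSE */
    } state;

    /* rptl on which the PAUSE arrived, so the ack can be sent back
     * through the right control portal */
    struct rptl *rptl;

    /* per-target op pool and pending-operation queues */
    struct rptl_op_pool_segment *op_segment_list;
    struct rptl_op *op_pool;
    struct rptl_op *data_op_list;         /* flow-controlled data operations */
    struct rptl_op *control_op_list;      /* control messages, never flow controlled */

    struct rptl_target *next, *prev;      /* links for rptl_info.target_list */
};
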
@@ -461,7 +461,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
MPIU_ERR_POP(mpi_errno);
/* Notify we're done */
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
@@ -531,7 +531,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
/* Notify we're done */
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
@@ -60,39 +60,9 @@
#define IDS_ARE_EQUAL(t1, t2) \
(t1.phys.nid == t2.phys.nid && t1.phys.pid == t2.phys.pid)
#define RPTL_OP_POOL_SEGMENT_COUNT (1024)
static struct {
struct rptl *rptl_list;
struct rptl_op_pool_segment {
struct rptl_op op[RPTL_OP_POOL_SEGMENT_COUNT];
struct rptl_op_pool_segment *next;
struct rptl_op_pool_segment *prev;
} *op_segment_list;
struct rptl_op *op_pool;
struct rptl_op *op_list;
/* targets that we do not send messages to either because they
* sent a PAUSE message or because we received a NACK from them */
struct rptl_paused_target {
ptl_process_t id;
enum rptl_paused_target_state {
RPTL_TARGET_STATE_FLOWCONTROL,
RPTL_TARGET_STATE_DISABLED,
RPTL_TARGET_STATE_RECEIVED_PAUSE,
RPTL_TARGET_STATE_PAUSE_ACKED
} state;
/* the rptl on which the pause message came in, since we need
* to use it to send the pause ack to the right target
* portal */
struct rptl *rptl;
struct rptl_paused_target *next;
struct rptl_paused_target *prev;
} *paused_target_list;
struct rptl_target *target_list;
int world_size;
uint64_t origin_events_left;
@@ -102,82 +72,42 @@ static struct {
#undef FUNCNAME
#define FUNCNAME alloc_target
#define FUNCNAME find_target
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int alloc_target(ptl_process_t id, enum rptl_paused_target_state state, struct rptl *rptl)
static int find_target(ptl_process_t id, struct rptl_target **target)
{
int mpi_errno = MPI_SUCCESS;
int ret = PTL_OK;
struct rptl_paused_target *target;
struct rptl_target *t;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_ALLOC_TARGET);
MPIDI_STATE_DECL(MPID_STATE_FIND_TARGET);
MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_TARGET);
MPIDI_FUNC_ENTER(MPID_STATE_FIND_TARGET);
for (target = rptl_info.paused_target_list; target; target = target->next)
if (IDS_ARE_EQUAL(target->id, id))
for (t = rptl_info.target_list; t; t = t->next)
if (IDS_ARE_EQUAL(t->id, id))
break;
/* if a paused target does not already exist, create one */
if (target == NULL) {
/* create a new paused target */
MPIU_CHKPMEM_MALLOC(target, struct rptl_paused_target *, sizeof(struct rptl_paused_target),
mpi_errno, "rptl paused target");
MPL_DL_APPEND(rptl_info.paused_target_list, target);
/* if the target does not already exist, create one */
if (t == NULL) {
MPIU_CHKPMEM_MALLOC(t, struct rptl_target *, sizeof(struct rptl_target), mpi_errno, "rptl target");
MPL_DL_APPEND(rptl_info.target_list, t);
target->id = id;
target->state = state;
target->rptl = rptl;
t->id = id;
t->state = RPTL_TARGET_STATE_ACTIVE;
t->rptl = NULL;
t->op_segment_list = NULL;
t->op_pool = NULL;
t->data_op_list = NULL;
t->control_op_list = NULL;
}
else if (target->state < state) {
target->state = state;
target->rptl = rptl;
}
else {
/* target already exists and is in a higher state than the
* state we are trying to set. e.g., this is possible if we
* got a PAUSE event from a different portal and acked. */
}
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_TARGET);
return ret;
fn_fail:
if (mpi_errno)
ret = PTL_FAIL;
MPIU_CHKPMEM_REAP();
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME alloc_op_segment
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int alloc_op_segment(void)
{
struct rptl_op_pool_segment *op_segment;
int mpi_errno = MPI_SUCCESS;
int i;
int ret = PTL_OK;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_ALLOC_OP_SEGMENT);
MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_OP_SEGMENT);
MPIU_CHKPMEM_MALLOC(op_segment, struct rptl_op_pool_segment *, sizeof(struct rptl_op_pool_segment),
mpi_errno, "op pool segment");
MPL_DL_APPEND(rptl_info.op_segment_list, op_segment);
for (i = 0; i < RPTL_OP_POOL_SEGMENT_COUNT; i++)
MPL_DL_APPEND(rptl_info.op_pool, &op_segment->op[i]);
*target = t;
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_OP_SEGMENT);
MPIDI_FUNC_EXIT(MPID_STATE_FIND_TARGET);
return ret;
fn_fail:
@@ -205,14 +135,8 @@ int MPID_nem_ptl_rptl_init(int world_size, uint64_t max_origin_events,
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_INIT);
rptl_info.rptl_list = NULL;
rptl_info.target_list = NULL;
rptl_info.op_pool = NULL;
ret = alloc_op_segment();
RPTLU_ERR_POP(ret, "error allocating op segment\n");
rptl_info.op_list = NULL;
rptl_info.paused_target_list = NULL;
rptl_info.world_size = world_size;
rptl_info.origin_events_left = max_origin_events;
rptl_info.get_target_info = get_target_info;
@@ -236,11 +160,13 @@ int MPID_nem_ptl_rptl_drain_eq(int eq_count, ptl_handle_eq_t *eq)
ptl_event_t event;
struct rptl_op_pool_segment *op_segment;
int i;
struct rptl_target *target, *t;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
while (rptl_info.op_list) {
for (target = rptl_info.target_list; target; target = target->next) {
while (target->control_op_list || target->data_op_list) {
for (i = 0; i < eq_count; i++) {
/* read and ignore all events */
ret = MPID_nem_ptl_rptl_eqget(eq[i], &event);
@@ -249,13 +175,23 @@ int MPID_nem_ptl_rptl_drain_eq(int eq_count, ptl_handle_eq_t *eq)
RPTLU_ERR_POP(ret, "Error calling MPID_nem_ptl_rptl_eqget\n");
}
}
}
while (rptl_info.op_segment_list) {
op_segment = rptl_info.op_segment_list;
MPL_DL_DELETE(rptl_info.op_segment_list, op_segment);
for (target = rptl_info.target_list; target;) {
assert(target->data_op_list == NULL);
assert(target->control_op_list == NULL);
while (target->op_segment_list) {
op_segment = target->op_segment_list;
MPL_DL_DELETE(target->op_segment_list, op_segment);
MPIU_Free(op_segment);
}
t = target->next;
MPIU_Free(target);
target = t;
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
return ret;
@@ -328,7 +264,7 @@ int MPID_nem_ptl_rptl_ptinit(ptl_handle_ni_t ni_handle, ptl_handle_eq_t eq_handl
MPIU_CHKPMEM_MALLOC(rptl, struct rptl *, sizeof(struct rptl), mpi_errno, "rptl");
MPL_DL_APPEND(rptl_info.rptl_list, rptl);
rptl->local_state = RPTL_LOCAL_STATE_NORMAL;
rptl->local_state = RPTL_LOCAL_STATE_ACTIVE;
rptl->pause_ack_counter = 0;
rptl->data.ob_max_count = 0;
@@ -415,26 +351,40 @@ int MPID_nem_ptl_rptl_ptfini(ptl_pt_index_t pt_index)
#define FUNCNAME alloc_op
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int alloc_op(struct rptl_op **op)
static int alloc_op(struct rptl_op **op, struct rptl_target *target)
{
int ret = PTL_OK;
struct rptl_op_pool_segment *op_segment;
int mpi_errno = MPI_SUCCESS;
int i;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_ALLOC_OP);
MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_OP);
if (rptl_info.op_pool == NULL) {
ret = alloc_op_segment();
RPTLU_ERR_POP(ret, "error allocating op segment\n");
assert(target);
if (target->op_pool == NULL) {
MPIU_CHKPMEM_MALLOC(op_segment, struct rptl_op_pool_segment *, sizeof(struct rptl_op_pool_segment),
mpi_errno, "op pool segment");
MPL_DL_APPEND(target->op_segment_list, op_segment);
for (i = 0; i < RPTL_OP_POOL_SEGMENT_COUNT; i++)
MPL_DL_APPEND(target->op_pool, &op_segment->op[i]);
}
*op = rptl_info.op_pool;
MPL_DL_DELETE(rptl_info.op_pool, *op);
*op = target->op_pool;
MPL_DL_DELETE(target->op_pool, *op);
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_OP);
return ret;
fn_fail:
if (mpi_errno)
ret = PTL_FAIL;
MPIU_CHKPMEM_REAP();
goto fn_exit;
}
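
alloc_op() now grows the pool on a per-target basis: when target->op_pool is empty it allocates a whole segment of RPTL_OP_POOL_SEGMENT_COUNT ops, remembers the segment on target->op_segment_list so the finalize path above can free it, and chains the individual ops onto the free pool; free_op() in the next hunk returns an op to its owner's pool. Stripped of the MPL list macros and MPICH error handling, the pattern is roughly the following standalone sketch (simplified, not code from the tree):

#include <stdlib.h>

#define SEGMENT_COUNT 1024              /* stands in for RPTL_OP_POOL_SEGMENT_COUNT */

struct op { struct op *next; /* ... op payload ... */ };

struct segment {
    struct op ops[SEGMENT_COUNT];
    struct segment *next;               /* kept so teardown can free whole segments */
};

struct pool {
    struct segment *segments;           /* ~ target->op_segment_list */
    struct op *free_list;               /* ~ target->op_pool */
};

static struct op *pool_alloc(struct pool *p)
{
    if (p->free_list == NULL) {         /* pool empty: grow by one segment */
        struct segment *seg = malloc(sizeof *seg);
        if (seg == NULL)
            return NULL;
        seg->next = p->segments;
        p->segments = seg;
        for (int i = 0; i < SEGMENT_COUNT; i++) {
            seg->ops[i].next = p->free_list;
            p->free_list = &seg->ops[i];
        }
    }
    struct op *op = p->free_list;       /* pop one op off the free list */
    p->free_list = op->next;
    return op;
}

static void pool_free(struct pool *p, struct op *op)
{
    op->next = p->free_list;            /* ~ free_op(): push back onto the owner's pool */
    p->free_list = op;
}
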
@@ -449,74 +399,199 @@ void free_op(struct rptl_op *op)
MPIDI_FUNC_ENTER(MPID_STATE_FREE_OP);
MPL_DL_APPEND(rptl_info.op_pool, op);
MPL_DL_APPEND(op->target->op_pool, op);
MPIDI_FUNC_EXIT(MPID_STATE_FREE_OP);
}
#undef FUNCNAME
#define FUNCNAME issue_op
#define FUNCNAME poke_progress
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int issue_op(struct rptl_op *op)
int poke_progress(void)
{
int ret = PTL_OK;
struct rptl_paused_target *target;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_OP);
struct rptl_target *target;
struct rptl_op *op;
struct rptl *rptl;
int i;
int mpi_errno = MPI_SUCCESS;
ptl_process_t id;
ptl_pt_index_t data_pt, control_pt;
MPIDI_STATE_DECL(MPID_STATE_POKE_PROGRESS);
MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_OP);
MPIDI_FUNC_ENTER(MPID_STATE_POKE_PROGRESS);
if (op->op_type == RPTL_OP_PUT) {
for (target = rptl_info.paused_target_list; target; target = target->next)
if (IDS_ARE_EQUAL(target->id, op->u.put.target_id))
/* make progress on local RPTLs */
for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next) {
/* if the local state is active, there's nothing to do */
if (rptl->local_state == RPTL_LOCAL_STATE_ACTIVE)
continue;
/* if we are in a local AWAITING PAUSE ACKS state, see if we
* can send out the unpause message */
if (rptl->local_state == RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS &&
rptl->pause_ack_counter == rptl_info.world_size) {
/* if we are over the max count limit, do not send an
* unpause message yet */
if (rptl->data.ob_curr_count > rptl->data.ob_max_count)
continue;
ret = PtlPTEnable(rptl->ni, rptl->data.pt);
RPTLU_ERR_POP(ret, "Error returned while reenabling PT\n");
rptl->local_state = RPTL_LOCAL_STATE_ACTIVE;
for (i = 0; i < rptl_info.world_size; i++) {
mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt, &control_pt);
if (mpi_errno) {
ret = PTL_FAIL;
RPTLU_ERR_POP(ret, "Error getting target info\n");
}
/* make sure the user setup a control portal */
assert(control_pt != PTL_PT_ANY);
/* disable flow control for control messages */
ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
RPTLU_ERR_POP(ret, "Error sending unpause message\n");
}
}
}
/* make progress on targets */
for (target = rptl_info.target_list; target; target = target->next) {
if (target->state == RPTL_TARGET_STATE_RECEIVED_PAUSE) {
for (op = target->data_op_list; op; op = op->next)
if (op->state == RPTL_OP_STATE_ISSUED)
break;
if (op)
continue;
if (target && op->u.put.flow_control)
goto fn_exit;
/* send a pause ack message */
assert(target->rptl);
for (i = 0; i < rptl_info.world_size; i++) {
/* find the target that has this target id and get the
* control portal information for it */
mpi_errno = rptl_info.get_target_info(i, &id, target->rptl->data.pt, &data_pt, &control_pt);
if (mpi_errno) {
ret = PTL_FAIL;
RPTLU_ERR_POP(ret, "Error getting target info\n");
}
if (IDS_ARE_EQUAL(id, target->id))
break;
}
/* make sure the user setup a control portal */
assert(control_pt != PTL_PT_ANY);
target->state = RPTL_TARGET_STATE_PAUSE_ACKED;
/* disable flow control for control messages */
ret = MPID_nem_ptl_rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, 0);
RPTLU_ERR_POP(ret, "Error sending pause ack message\n");
continue;
}
/* issue out all the control messages first */
for (op = target->control_op_list; op; op = op->next) {
assert(op->op_type == RPTL_OP_PUT);
/* skip all the issued ops */
if (op->state == RPTL_OP_STATE_ISSUED)
continue;
/* we should not get any NACKs on the control portal */
assert(op->state != RPTL_OP_STATE_NACKED);
if (rptl_info.origin_events_left < 2) {
ret = alloc_target(op->u.put.target_id, RPTL_TARGET_STATE_FLOWCONTROL, NULL);
RPTLU_ERR_POP(ret, "error allocating paused target\n");
goto fn_exit;
/* too few origin events left. we can't issue this op
* or any following op to this target in order to
* maintain ordering */
break;
}
rptl_info.origin_events_left -= 2;
/* force request for an ACK even if the user didn't ask for
* it. replace the user pointer with the OP id. */
ret =
PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
/* force request for an ACK even if the user didn't ask
* for it. replace the user pointer with the OP id. */
ret = PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
op->u.put.match_bits, op->u.put.remote_offset, op,
op->u.put.hdr_data);
RPTLU_ERR_POP(ret, "Error issuing PUT\n");
op->state = RPTL_OP_STATE_ISSUED;
}
else {
for (target = rptl_info.paused_target_list; target; target = target->next)
if (IDS_ARE_EQUAL(target->id, op->u.get.target_id))
if (target->state == RPTL_TARGET_STATE_DISABLED || target->state == RPTL_TARGET_STATE_PAUSE_ACKED)
continue;
/* then issue out all the data messages */
for (op = target->data_op_list; op; op = op->next) {
if (op->op_type == RPTL_OP_PUT) {
/* skip all the issued ops */
if (op->state == RPTL_OP_STATE_ISSUED)
continue;
/* if an op has been nacked, don't issue anything else
* to this target */
if (op->state == RPTL_OP_STATE_NACKED)
break;
if (target)
goto fn_exit;
if (rptl_info.origin_events_left < 2) {
/* too few origin events left. we can't issue
* this op or any following op to this target in
* order to maintain ordering */
break;
}
rptl_info.origin_events_left -= 2;
/* force request for an ACK even if the user didn't
* ask for it. replace the user pointer with the OP
* id. */
ret = PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
op->u.put.match_bits, op->u.put.remote_offset, op,
op->u.put.hdr_data);
RPTLU_ERR_POP(ret, "Error issuing PUT\n");
}
else if (op->op_type == RPTL_OP_GET) {
/* skip all the issued ops */
if (op->state == RPTL_OP_STATE_ISSUED)
continue;
/* if an op has been nacked, don't issue anything else
* to this target */
if (op->state == RPTL_OP_STATE_NACKED)
break;
if (rptl_info.origin_events_left < 1) {
ret = alloc_target(op->u.get.target_id, RPTL_TARGET_STATE_FLOWCONTROL, NULL);
RPTLU_ERR_POP(ret, "error allocating paused target\n");
goto fn_exit;
/* too few origin events left. we can't issue
* this op or any following op to this target in
* order to maintain ordering */
break;
}
rptl_info.origin_events_left--;
ret =
PtlGet(op->u.get.md_handle, op->u.get.local_offset, op->u.get.length,
ret = PtlGet(op->u.get.md_handle, op->u.get.local_offset, op->u.get.length,
op->u.get.target_id, op->u.get.pt_index, op->u.get.match_bits,
op->u.get.remote_offset, op);
RPTLU_ERR_POP(ret, "Error issuing GET\n");
}
op->state = RPTL_OP_STATE_ISSUED;
}
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_OP);
MPIDI_FUNC_EXIT(MPID_STATE_POKE_PROGRESS);
return ret;
fn_fail:
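
poke_progress() is now the single place where queued operations are issued or re-issued. Each PUT reserves two entries from the shared rptl_info.origin_events_left budget (presumably one for the SEND event and one for the forced ACK), each GET reserves one (for the REPLY), and the walk over a target's queue stops at the first op that cannot go out, so per-target ordering is preserved. The standalone sketch below condenses just that credit-and-ordering rule; all names are illustrative, not from the rptl code.

#include <stdio.h>

enum op_type { OP_PUT, OP_GET };
enum op_state { OP_QUEUED, OP_ISSUED };

struct op {
    enum op_type type;
    enum op_state state;
    struct op *next;
};

static unsigned credits = 3;               /* stand-in for rptl_info.origin_events_left */

static void issue_target_queue(struct op *head)
{
    for (struct op *op = head; op; op = op->next) {
        if (op->state == OP_ISSUED)
            continue;                      /* skip ops that are already in flight */
        unsigned cost = (op->type == OP_PUT) ? 2 : 1;
        if (credits < cost)
            break;                         /* stop here to keep per-target ordering */
        credits -= cost;
        op->state = OP_ISSUED;             /* PtlPut()/PtlGet() would be called here */
    }
}

int main(void)
{
    struct op g = { OP_GET, OP_QUEUED, NULL };
    struct op p = { OP_PUT, OP_QUEUED, &g };
    issue_target_queue(&p);                /* PUT uses 2 credits, GET uses the last one */
    printf("credits left: %u\n", credits); /* prints 0 */
    return 0;
}
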
@@ -535,11 +610,15 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
{
struct rptl_op *op;
int ret = PTL_OK;
struct rptl_target *target;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
ret = alloc_op(&op);
ret = find_target(target_id, &target);
RPTLU_ERR_POP(ret, "error finding target structure\n");
ret = alloc_op(&op, target);
RPTLU_ERR_POP(ret, "error allocating op\n");
op->op_type = RPTL_OP_PUT;
@@ -562,12 +641,15 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
op->u.put.ack = NULL;
op->u.put.flow_control = flow_control;
op->events_ready = 0;
op->target = target;
MPL_DL_APPEND(rptl_info.op_list, op);
if (op->u.put.flow_control)
MPL_DL_APPEND(target->data_op_list, op);
else
MPL_DL_APPEND(target->control_op_list, op);
/* if we are not in a PAUSED state, issue the operation */
ret = issue_op(op);
RPTLU_ERR_POP(ret, "Error from issue_op\n");
ret = poke_progress();
RPTLU_ERR_POP(ret, "Error from poke_progress\n");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
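
With this change MPID_nem_ptl_rptl_put() no longer issues anything directly: it finds (or creates) the target, takes an op from that target's pool, queues it on the data or control list according to the flow_control argument, and lets poke_progress() decide what can actually go out. The two call forms below illustrate the distinction; the wrapper function and its arguments are hypothetical, but the argument order follows the calls elsewhere in this patch.

/* Hypothetical helper showing both call forms; not part of the patch. */
static int send_examples(ptl_handle_md_t md, ptl_process_t id,
                         ptl_pt_index_t data_pt, ptl_pt_index_t control_pt,
                         ptl_size_t len, ptl_match_bits_t match_bits,
                         void *req, ptl_hdr_data_t hdr)
{
    int ret;

    /* a regular data message: flow controlled (last argument 1), so it is
     * queued on target->data_op_list and subject to pause/NACK handling */
    ret = MPID_nem_ptl_rptl_put(md, 0, len, PTL_NO_ACK_REQ, id, data_pt,
                                match_bits, 0, req, hdr, 1);
    if (ret)
        return ret;

    /* a control message (here an UNPAUSE, as in poke_progress() above):
     * not flow controlled (last argument 0), queued on
     * target->control_op_list so it can bypass a paused target */
    ret = MPID_nem_ptl_rptl_put(md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
                                0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
    return ret;
}
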
@@ -588,12 +670,15 @@ int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
{
struct rptl_op *op;
int ret = PTL_OK;
struct rptl_paused_target *target;
struct rptl_target *target;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
ret = alloc_op(&op);
ret = find_target(target_id, &target);
RPTLU_ERR_POP(ret, "error finding target structure\n");
ret = alloc_op(&op, target);
RPTLU_ERR_POP(ret, "error allocating op\n");
op->op_type = RPTL_OP_GET;
@@ -610,15 +695,12 @@ int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
op->u.get.user_ptr = user_ptr;
op->events_ready = 0;
op->target = target;
MPL_DL_APPEND(rptl_info.op_list, op);
for (target = rptl_info.paused_target_list; target; target = target->next)
if (IDS_ARE_EQUAL(target->id, target_id))
break;
MPL_DL_APPEND(target->data_op_list, op);
ret = issue_op(op);
RPTLU_ERR_POP(ret, "Error from issue_op\n");
ret = poke_progress();
RPTLU_ERR_POP(ret, "Error from poke_progress\n");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
@@ -657,6 +739,9 @@ static int send_pause_messages(struct rptl *rptl)
RPTLU_ERR_POP(ret, "Error getting target info while sending pause messages\n");
}
/* make sure the user setup a control portal */
assert(control_pt != PTL_PT_ANY);
/* disable flow control for control messages */
ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
NULL, RPTL_CONTROL_MSG_PAUSE, 0);
@@ -673,167 +758,35 @@ static int send_pause_messages(struct rptl *rptl)
#undef FUNCNAME
#define FUNCNAME send_pause_ack_messages
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int send_pause_ack_messages(void)
{
struct rptl_op *op;