Commit 50461978 authored by Antonio J. Pena's avatar Antonio J. Pena Committed by Kenneth Raffenetti
Browse files

Fix Portals4 RMA



Full redesign, mainly of the functions in ptl_nm.c and the
communications involving the "control" portal. Still some
problems with flow control.
Signed-off-by: Kenneth Raffenetti's avatarKen Raffenetti <raffenet@mcs.anl.gov>
parent 1ea97753
......@@ -99,6 +99,7 @@ typedef struct {
ptl_pt_index_t ptg;
ptl_pt_index_t ptc;
ptl_pt_index_t ptr;
ptl_pt_index_t ptrc;
int id_initialized; /* TRUE iff id and pt have been initialized */
MPIDI_msg_sz_t num_queued_sends; /* number of reqs for this vc in sendq */
} MPID_nem_ptl_vc_area;
......@@ -153,7 +154,7 @@ typedef struct {
int MPID_nem_ptl_nm_init(void);
int MPID_nem_ptl_nm_finalize(void);
int MPID_nem_ptl_nm_event_handler(const ptl_event_t *e);
int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e);
int MPID_nem_ptl_sendq_complete_with_error(MPIDI_VC_t *vc, int req_errno);
int MPID_nem_ptl_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz);
int MPID_nem_ptl_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
......@@ -165,7 +166,7 @@ int MPID_nem_ptl_poll_finalize(void);
int MPID_nem_ptl_poll(int is_blocking_poll);
int MPID_nem_ptl_vc_terminated(MPIDI_VC_t *vc);
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg,
ptl_pt_index_t *ptc, ptl_pt_index_t *ptr);
ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrc);
void MPI_nem_ptl_pack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
MPID_nem_ptl_pack_overflow_t *overflow);
int MPID_nem_ptl_unpack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
......
......@@ -20,6 +20,7 @@
#define PTIG_KEY "PTIG"
#define PTIC_KEY "PTIC"
#define PTIR_KEY "PTIR"
#define PTIRC_KEY "PTIRC"
ptl_handle_ni_t MPIDI_nem_ptl_ni;
ptl_pt_index_t MPIDI_nem_ptl_pt;
......@@ -28,6 +29,7 @@ ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages *
ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
ptl_handle_eq_t MPIDI_nem_ptl_target_eq;
ptl_handle_eq_t MPIDI_nem_ptl_origin_eq;
ptl_pt_index_t MPIDI_nem_ptl_control_rpt_pt; /* portal for rportals control messages */
ptl_handle_md_t MPIDI_nem_ptl_global_md;
ptl_ni_limits_t MPIDI_nem_ptl_ni_limits;
......@@ -114,7 +116,7 @@ static int get_target_info(int rank, ptl_process_t *id, ptl_pt_index_t local_dat
}
else if (local_data_pt == MPIDI_nem_ptl_control_pt) {
*target_data_pt = vc_ptl->ptc;
*target_control_pt = PTL_PT_ANY;
*target_control_pt = vc_ptl->ptrc;
}
fn_exit:
......@@ -208,6 +210,11 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
PTL_PT_ANY, &MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for MPICH control messages */
ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* create an MD that covers all of memory */
md.start = 0;
md.length = (ptl_size_t)-1;
......@@ -226,14 +233,14 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allow rportal to manage the get and control portals, but we
* don't expect retransmission to be needed on these portals, so
* don't expect retransmission to be needed on the get portal, so
* we pass PTL_PT_ANY as the dummy portal. unfortunately, portals
* does not have an "invalid" PT constant, which would have been
* more appropriate to pass over here. */
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_get_pt, PTL_PT_ANY);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_control_pt, PTL_PT_ANY);
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_control_pt, MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* create business card */
......@@ -300,6 +307,9 @@ static int ptl_finalize(void)
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlNIFini(MPIDI_nem_ptl_ni);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlnifini", "**ptlnifini %s", MPID_nem_ptl_strerror(ret));
......@@ -367,6 +377,12 @@ static int get_business_card(int my_rank, char **bc_val_p, int *val_max_sz_p)
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
str_errno = MPIU_Str_add_binary_arg(bc_val_p, val_max_sz_p, PTIRC_KEY, (char *)&MPIDI_nem_ptl_control_rpt_pt,
sizeof(MPIDI_nem_ptl_control_rpt_pt));
if (str_errno) {
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_GET_BUSINESS_CARD);
......@@ -435,6 +451,8 @@ static int vc_init(MPIDI_VC_t *vc)
vc_ptl->id_initialized = FALSE;
vc_ptl->num_queued_sends = 0;
mpi_errno = MPID_nem_ptl_init_id(vc);
MPIDI_FUNC_EXIT(MPID_STATE_VC_INIT);
return mpi_errno;
}
......@@ -457,7 +475,7 @@ static int vc_destroy(MPIDI_VC_t *vc)
#define FUNCNAME MPID_nem_ptl_get_id_from_bc
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr)
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrc)
{
int mpi_errno = MPI_SUCCESS;
int ret;
......@@ -484,6 +502,9 @@ int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, pt
ret = MPIU_Str_get_binary_arg(business_card, PTIR_KEY, (char *)ptr, sizeof(ptr), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptr), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
ret = MPIU_Str_get_binary_arg(business_card, PTIRC_KEY, (char *)ptrc, sizeof(ptr), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptrc), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_GET_ID_FROM_BC);
return mpi_errno;
......@@ -509,8 +530,6 @@ int vc_terminate(MPIDI_VC_t *vc)
outstanding sends with an error and terminate
connection immediately. */
MPIU_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
mpi_errno = MPID_nem_ptl_sendq_complete_with_error(vc, req_errno);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPID_nem_ptl_vc_terminated(vc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} else if (vc_ptl->num_queued_sends == 0) {
......@@ -576,7 +595,7 @@ int MPID_nem_ptl_init_id(MPIDI_VC_t *vc)
mpi_errno = vc->pg->getConnInfo(vc->pg_rank, bc, val_max_sz, vc->pg);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr);
mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr, &vc_ptl->ptrc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
vc_ptl->id_initialized = TRUE;
......
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2012 by Argonne National Laboratory.
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "ptl_impl.h"
#include "stddef.h" /* C99; for offsetof */
#include <mpl_utlist.h>
#include "rptl.h"
#define NUM_SEND_BUFS 100
#define NUM_RECV_BUFS 100
#define BUFLEN (sizeof(MPIDI_CH3_Pkt_t) + PTL_MAX_EAGER)
#define NUM_RECV_BUFS 50
#define CTL_TAG 0
#define PAYLOAD_SIZE (PTL_MAX_EAGER - offsetof(buf_t, packet) - sizeof(MPIDI_CH3_Pkt_t))
#define SENDBUF_SIZE(sent_sz_) (offsetof(buf_t, packet) + sizeof(MPIDI_CH3_Pkt_t) + (sent_sz_))
#define SENDBUF(req_) REQ_PTL(req_)->chunk_buffer[0]
#define TMPBUF(req_) REQ_PTL(req_)->chunk_buffer[1]
#define NEW_TAG(tag_) do { \
global_tag += 2; \
if (global_tag == CTL_TAG) \
global_tag += 2; \
(tag_) = global_tag; \
} while(0)
#define GET_TAG(tag_) (((tag_) >> 1) << 1)
#define DONE_TAG(tag_) ((tag_) | 0x1)
typedef struct {
size_t remaining;
ptl_match_bits_t tag;
char packet[PTL_MAX_EAGER];
} buf_t;
static buf_t recvbufs[NUM_RECV_BUFS];
static ptl_me_t mes[NUM_RECV_BUFS];
static ptl_handle_me_t me_handles[NUM_RECV_BUFS];
static unsigned long long put_cnt = 0; /* required to not finalizing too early */
static MPID_Request *done_req;
static ptl_match_bits_t global_tag = 0;
typedef struct MPID_nem_ptl_sendbuf {
struct MPID_nem_ptl_sendbuf *next;
union {
struct {
MPIDI_CH3_Pkt_t hdr;
char payload[PTL_MAX_EAGER];
} hp; /* header+payload */
char p[BUFLEN]; /* just payload */
} buf;
} MPID_nem_ptl_sendbuf_t;
static MPID_nem_ptl_sendbuf_t sendbuf[NUM_SEND_BUFS];
static MPID_nem_ptl_sendbuf_t *free_head = NULL;
static MPID_nem_ptl_sendbuf_t *free_tail = NULL;
static char recvbuf[BUFLEN][NUM_RECV_BUFS];
static ptl_me_t recvbuf_me[NUM_RECV_BUFS];
static ptl_handle_me_t recvbuf_me_handle[NUM_RECV_BUFS];
#define FREE_EMPTY() (free_head == NULL)
#define FREE_HEAD() free_head
#define FREE_PUSH(buf_p) MPL_LL_PREPEND(free_head, free_tail, buf_p)
#define FREE_POP(buf_pp) do { *(buf_pp) = free_head; MPL_LL_DELETE(free_head, free_tail, free_head); } while (0)
static struct {MPID_Request *head, *tail;} send_queue;
static int send_queued(void);
static void vc_dbg_print_sendq(FILE *stream, MPIDI_VC_t *vc) {/* FIXME: write real function */ return;}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_nm_init
......@@ -56,36 +52,33 @@ int MPID_nem_ptl_nm_init(void)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_INIT);
MPIU_Assert(BUFLEN == sizeof(sendbuf->buf));
/* init send */
for (i = 0; i < NUM_SEND_BUFS; ++i)
FREE_PUSH(&sendbuf[i]);
send_queue.head = send_queue.tail = NULL;
MPID_nem_net_module_vc_dbg_print_sendq = vc_dbg_print_sendq;
/* init recv */
id_any.phys.pid = PTL_PID_ANY;
id_any.phys.nid = PTL_NID_ANY;
for (i = 0; i < NUM_RECV_BUFS; ++i) {
recvbuf_me[i].start = recvbuf[i];
recvbuf_me[i].length = BUFLEN;
recvbuf_me[i].ct_handle = PTL_CT_NONE;
recvbuf_me[i].uid = PTL_UID_ANY;
recvbuf_me[i].options = (PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE | PTL_ME_IS_ACCESSIBLE);
recvbuf_me[i].match_id = id_any;
recvbuf_me[i].match_bits = 0;
recvbuf_me[i].ignore_bits = (ptl_match_bits_t)~0;
ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &recvbuf_me[i], PTL_PRIORITY_LIST, (void *)(uint64_t)i,
&recvbuf_me_handle[i]);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
mes[i].start = &recvbufs[i];
mes[i].length = sizeof(buf_t);
mes[i].ct_handle = PTL_CT_NONE;
mes[i].uid = PTL_UID_ANY;
mes[i].options = (PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE | PTL_ME_IS_ACCESSIBLE);
mes[i].match_id = id_any;
mes[i].match_bits = CTL_TAG;
mes[i].ignore_bits = 0;
ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &mes[i],
PTL_PRIORITY_LIST, (void *)(uint64_t)i, &me_handles[i]);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
MPID_nem_ptl_strerror(ret));
}
done_req = MPID_Request_create();
MPIU_Assert(done_req != NULL);
done_req->dev.OnDataAvail = NULL;
SENDBUF(done_req) = NULL;
REQ_PTL(done_req)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_INIT);
return mpi_errno;
......@@ -106,11 +99,16 @@ int MPID_nem_ptl_nm_finalize(void)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);
while (put_cnt) MPID_nem_ptl_poll(1); /* Wait for puts to finish */
for (i = 0; i < NUM_RECV_BUFS; ++i) {
ret = PtlMEUnlink(recvbuf_me_handle[i]);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s", MPID_nem_ptl_strerror(ret));
ret = PtlMEUnlink(me_handles[i]);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s",
MPID_nem_ptl_strerror(ret));
}
MPIDI_CH3_Request_destroy(done_req);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);
return mpi_errno;
......@@ -119,111 +117,140 @@ int MPID_nem_ptl_nm_finalize(void)
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_sendq_complete_with_error
#define FUNCNAME meappend_done
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_sendq_complete_with_error(MPIDI_VC_t *vc, int req_errno)
static inline int meappend_done(ptl_process_t id, MPID_Request *req, ptl_match_bits_t tag)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_SENDQ_COMPLETE_WITH_ERROR);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_SENDQ_COMPLETE_WITH_ERROR);
int ret;
ptl_me_t me;
ptl_handle_me_t me_handle;
MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_DONE);
MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_DONE);
me.start = NULL;
me.length = 0;
me.ct_handle = PTL_CT_NONE;
me.uid = PTL_UID_ANY;
me.options = ( PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
me.match_id = id;
me.match_bits = DONE_TAG(tag);
me.ignore_bits = 0;
me.min_free = 0;
ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
&me_handle);
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, DONE_TAG(tag)));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
MPID_nem_ptl_strerror(ret));
++put_cnt;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_SENDQ_COMPLETE_WITH_ERROR);
MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_DONE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME save_iov
#define FUNCNAME meappend_large
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static inline void save_iov(MPID_Request *sreq, void *hdr, void *data, MPIDI_msg_sz_t data_sz)
static inline int meappend_large(ptl_process_t id, MPID_Request *req, ptl_match_bits_t tag, void *buf, size_t remaining)
{
int index = 0;
MPIDI_STATE_DECL(MPID_STATE_SAVE_IOV);
MPIDI_FUNC_ENTER(MPID_STATE_SAVE_IOV);
MPIU_Assert(hdr || data_sz);
if (hdr) {
sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
sreq->dev.iov[index].MPID_IOV_BUF = &sreq->dev.pending_pkt;
sreq->dev.iov[index].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
++index;
}
if (data_sz) {
sreq->dev.iov[index].MPID_IOV_BUF = data;
sreq->dev.iov[index].MPID_IOV_LEN = data_sz;
++index;
int mpi_errno = MPI_SUCCESS;
int ret;
ptl_me_t me;
MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_LARGE);
MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_LARGE);
me.start = buf;
me.length = remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ?
remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
me.ct_handle = PTL_CT_NONE;
me.uid = PTL_UID_ANY;
me.options = ( PTL_ME_OP_GET | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
me.match_id = id;
me.match_bits = GET_TAG(tag);
me.ignore_bits = 0;
me.min_free = 0;
while (remaining) {
int incomplete;
ptl_handle_me_t foo_me_handle;
MPIDI_CH3U_Request_increment_cc(req, &incomplete); /* Cannot avoid GET events from poll infrastructure */
ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
&foo_me_handle);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, GET_TAG(tag)));
me.start = (char *)me.start + me.length;
remaining -= me.length;
if (remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
me.length = remaining;
}
sreq->dev.iov_count = index;
MPIDI_FUNC_EXIT(MPID_STATE_SAVE_IOV);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_LARGE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME send_pkt
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_msg_sz_t *data_sz_p)
static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_sz_t data_sz,
MPID_Request *sreq)
{
int mpi_errno = MPI_SUCCESS;
MPID_nem_ptl_sendbuf_t *sb;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
MPIDI_CH3_Pkt_t **hdr_p = (MPIDI_CH3_Pkt_t **)vhdr_p;
char **data_p = (char **)vdata_p;
buf_t *sendbuf;
const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
MPIDI_STATE_DECL(MPID_STATE_SEND_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_SEND_PKT);
if (!vc_ptl->id_initialized) {
mpi_errno = MPID_nem_ptl_init_id(vc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf->packet, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
sendbuf->remaining = data_sz - sent_sz;
NEW_TAG(sendbuf->tag);
TMPBUF(sreq) = NULL;
if (MPIDI_CH3I_Sendq_empty(send_queue) && !FREE_EMPTY()) {
MPIDI_msg_sz_t len;
/* send header and first chunk of data */
FREE_POP(&sb);
sb->buf.hp.hdr = **hdr_p;
len = *data_sz_p;
if (len > PTL_MAX_EAGER)
len = PTL_MAX_EAGER;
MPIU_Memcpy(sb->buf.hp.payload, *data_p, len);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
sizeof(sb->buf.hp.hdr) + len, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
*hdr_p = NULL;
*data_p += len;
*data_sz_p -= len;
/* send additional data chunks if necessary */
while (*data_sz_p && !FREE_EMPTY()) {
FREE_POP(&sb);
len = *data_sz_p;
if (len > BUFLEN)
len = BUFLEN;
MPIU_Memcpy(sb->buf.p, *data_p, len);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p", len,
vc_ptl->id.phys.nid, vc_ptl->id.phys.pid, vc_ptl->ptc, sb));
*data_p += len;
*data_sz_p -= len;
}
if (data_sz) {
MPIU_Memcpy(sendbuf->packet + sizeof(MPIDI_CH3_Pkt_t), data_p, sent_sz);
if (sendbuf->remaining) /* Post MEs for the remote gets */
mpi_errno = meappend_large(vc_ptl->id, sreq, sendbuf->tag, (char *)data_p + sent_sz, sendbuf->remaining);
if (mpi_errno)
goto fn_fail;
}
SENDBUF(sreq) = sendbuf;
REQ_PTL(sreq)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
/* Post ME for the DONE message */
mpi_errno = meappend_done(vc_ptl->id, sreq, sendbuf->tag);
if (mpi_errno)
goto fn_fail;
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
sendbuf_sz, vc_ptl->id.phys.nid,
vc_ptl->id.phys.pid, vc_ptl->ptc));
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_SEND_PKT);
return mpi_errno;
......@@ -235,116 +262,61 @@ static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_
#define FUNCNAME send_noncontig_pkt
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void **vhdr_p, int *complete)
static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
{
int mpi_errno = MPI_SUCCESS;
MPID_nem_ptl_sendbuf_t *sb;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
MPIDI_msg_sz_t last;
MPIDI_CH3_Pkt_t **hdr_p = (MPIDI_CH3_Pkt_t **)vhdr_p;
buf_t *sendbuf;
const size_t sent_sz = sreq->dev.segment_size < PAYLOAD_SIZE ? sreq->dev.segment_size : PAYLOAD_SIZE;
size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_SEND_NONCONTIG_PKT);
*complete = 0;
MPID_nem_ptl_init_req(sreq);
MPIU_Assert(sreq->dev.segment_first == 0);
if (!vc_ptl->id_initialized) {
mpi_errno = MPID_nem_ptl_init_id(vc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf->packet, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
sendbuf->remaining = sreq->dev.segment_size - sent_sz;
NEW_TAG(sendbuf->tag);
TMPBUF(sreq) = NULL;
if (MPIDI_CH3I_Sendq_empty(send_queue) && !FREE_EMPTY()) {
/* send header and first chunk of data */
FREE_POP(&sb);
sb->buf.hp.hdr = **hdr_p;
MPIU_Assert(sreq->dev.segment_first == 0);
last = sreq->dev.segment_size;
if (last > PTL_MAX_EAGER)
last = PTL_MAX_EAGER;
MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, 0, last, sb->buf.hp.payload, &REQ_PTL(sreq)->overflow[0]);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + last, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
sizeof(sb->buf.hp.hdr) + last, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
*vhdr_p = NULL;
if (last == sreq->dev.segment_size) {
*complete = 1;
goto fn_exit;
}
/* send additional data chunks */
sreq->dev.segment_first = last;
if (sreq->dev.segment_size) {
MPIDI_msg_sz_t last = sent_sz;
MPID_Segment_pack(sreq->dev.segment_ptr, 0, &last, sendbuf->packet + sizeof(MPIDI_CH3_Pkt_t));
while (!FREE_EMPTY()) {
FREE_POP(&sb);
if (sendbuf->remaining) { /* Post MEs for the remote gets */
TMPBUF(sreq) = MPIU_Malloc(sendbuf->remaining);
sreq->dev.segment_first = last;
last = sreq->dev.segment_size;
if (last > sreq->dev.segment_first+BUFLEN)
last = sreq->dev.segment_first+BUFLEN;