Commit 80e9fef0 authored by Misbah Mubarak's avatar Misbah Mubarak

Adding MPI rendezvous protocol, statistics recording for dragonfly validation

parent da5879aa
......@@ -29,12 +29,15 @@ int main(int argc, char **argv) {
int r = atoi(argv[2]);
int c = atoi(argv[3]);
int total_routers = g * r * c;
FILE *intra = fopen(argv[4], "wb");
FILE *inter = fopen(argv[5], "wb");
int router = 0;
int green = 0, black = 1;
int groups = 0;
printf("\n Rows %d Cols %d Groups %d ", r, c, g);
for(int rows = 0; rows < r; rows++) {
for(int cols = 0; cols < c; cols++) {
for(int cols1 = 0; cols1 < c; cols1++) {
......@@ -79,15 +82,19 @@ int main(int argc, char **argv) {
int dstB = (nsrcg % (gs/2)) * 2;
srcr = srcrB + srcB;
dstr = dstrB + dstB;
for(int r = 0; r < 2; r++) {
for(int block = 0; block < gsize; block++) {
fwrite(&srcr, sizeof(int), 1, inter);
fwrite(&dstr, sizeof(int), 1, inter);
printf("INTER %d %d\n", srcr, dstr);
if(srcr >= total_routers || dstr >= total_routers)
printf("\n connection between invalid routers src %d and dest %d ", srcr, dstr);
for(int r = 0; r < 2; r++) {
for(int block = 0; block < gsize; block++) {
fwrite(&srcr, sizeof(int), 1, inter);
fwrite(&dstr, sizeof(int), 1, inter);
//printf("INTER %d %d srcg %d destg %d srcrb %d dstrB %d \n", srcr, dstr, srcg, dstg, srcrB, dstrB);
}
}
srcr++;
dstr++;
}
}
}
}
......
......@@ -49,9 +49,9 @@ PARAMS
# number of global channels per router
num_global_channels="10";
# network config file for intra-group connections
intra-group-connections="@abs_srcdir@/intra-custom-small";
intra-group-connections="@abs_srcdir@/intra-theta";
# network config file for inter-group connections
inter-group-connections="@abs_srcdir@/inter-custom-small";
inter-group-connections="@abs_srcdir@/inter-theta";
# routing protocol to be used
routing="prog-adaptive";
}
......@@ -49,9 +49,9 @@ PARAMS
# number of global channels per router
num_global_channels="10";
# network config file for intra-group connections
intra-group-connections="../src/network-workloads/conf/dragonfly-custom/intra-custom";
intra-group-connections="../src/network-workloads/conf/dragonfly-custom/intra-custom-upd";
# network config file for inter-group connections
inter-group-connections="../src/network-workloads/conf/dragonfly-custom/inter-custom";
inter-group-connections="../src/network-workloads/conf/dragonfly-custom/inter-custom-upd";
# routing protocol to be used
routing="prog-adaptive";
}
......@@ -37,7 +37,7 @@ PARAMS
#bandwidth in GiB/s for local channels
local_bandwidth="5.25";
# bandwidth in GiB/s for global channels
global_bandwidth="18.75";
global_bandwidth="4.69";
# bandwidth in GiB/s for compute node-router channels
cn_bandwidth="16.0";
# ROSS message size
......@@ -52,5 +52,5 @@ PARAMS
# network config file for inter-group connections
inter-group-connections="../src/network-workloads/conf/dragonfly-custom/inter-theta";
# routing protocol to be used
routing="adaptive";
routing="prog-adaptive";
}
......@@ -13,26 +13,42 @@
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
#include "codes/quickhash.h"
#include "codes/codes-jobmap.h"
/* turning on track lp will generate a lot of output messages */
#define MN_LP_NM "modelnet_dragonfly_custom"
#define CONTROL_MSG_SZ 64
#define TRACK_LP -1
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 0
#define EAGER_THRESHOLD 8192
#define RANK_HASH_TABLE_SZ 2000
#define NOISE 3.0
#define NW_LP_NM "nw-lp"
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
#define MAX_STATS 65536
#define PAYLOAD_SZ 1024
static int msg_size_hash_compare(
void *key, struct qhash_head *link);
int enable_msg_tracking = 1;
int unmatched = 0;
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
static int num_dumpi_traces = 0;
static int alloc_spec = 0;
static double self_overhead = 10.0;
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;
/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
......@@ -41,15 +57,16 @@ static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;
/* variables for loading multiple applications */
/* Xu's additions start */
char workloads_conf_file[8192];
char alloc_file[8192];
int num_traces_of_job[5];
tw_stime soft_delay_mpi = 2500;
tw_stime nic_delay = 1000;
tw_stime copy_per_byte_eager = 0.55;
char file_name_of_job[5][8192];
struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;
/* Xu's additions end */
/* Variables for Cortex Support */
/* Matthieu's additions start */
......@@ -65,9 +82,12 @@ typedef int32_t dumpi_req_id;
static int net_id = 0;
static float noise = 5.0;
static int num_net_lps = 0, num_mpi_lps = 0;
static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients;
FILE * workload_log = NULL;
FILE * msg_size_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;
......@@ -76,6 +96,9 @@ static uint64_t sample_bytes_written = 0;
long long num_bytes_sent=0;
long long num_bytes_recvd=0;
long long num_syn_bytes_sent = 0;
long long num_syn_bytes_recvd = 0;
double max_time = 0, max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;
......@@ -102,6 +125,12 @@ enum MPI_NW_EVENTS
MPI_SEND_ARRIVED,
MPI_SEND_ARRIVED_CB, // for tracking message times on sender
MPI_SEND_POSTED,
MPI_REND_ARRIVED,
MPI_REND_ACK_ARRIVED,
CLI_BCKGND_FIN,
CLI_BCKGND_ARRIVE,
CLI_BCKGND_GEN,
CLI_NBR_FINISH,
};
struct mpi_workload_sample
......@@ -145,6 +174,15 @@ struct pending_waits
struct qlist_head ql;
};
struct msg_size_info
{
int64_t msg_size;
int num_msgs;
tw_stime agg_latency;
tw_stime avg_latency;
struct qhash_head * hash_link;
struct qlist_head ql;
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;
......@@ -158,6 +196,9 @@ struct nw_state
int app_id;
int local_rank;
int is_finished;
int neighbor_completed;
struct rc_stack * processed_ops;
struct rc_stack * matched_reqs;
......@@ -194,9 +235,18 @@ struct nw_state
/* Pending wait operation */
struct pending_waits * wait_op;
/* Message size latency information */
struct qhash_table * msg_sz_table;
struct qlist_head msg_sz_list;
/* quick hash for maintaining message latencies */
unsigned long num_bytes_sent;
unsigned long num_bytes_recvd;
unsigned long syn_data;
unsigned long gen_data;
/* For sampling data */
int sampling_indx;
int max_arr_size;
......@@ -242,9 +292,10 @@ struct nw_message
} rc;
};
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op);
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, int is_rend);
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
......@@ -292,6 +343,281 @@ static void update_message_time_rc(
/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);
/* Record the latency of a matched message in per-message-size statistics.
 * The table is created lazily on first use; each distinct message size gets
 * one msg_size_info entry (hash table for lookup, list for iteration).
 *
 * is_eager: eager-protocol message, so add the per-byte copy overhead.
 * is_send: measure latency from the send-side start time carried in m
 *          instead of the receive request's init time. */
static void update_message_size(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m,
        mpi_msgs_queue * qitem,
        int is_eager,
        int is_send)
{
    struct qhash_head * hash_link = NULL;
    tw_stime copy_overhead = 0;
    tw_stime msg_init_time = qitem->req_init_time;

    /* Lazily create the per-rank statistics table. */
    if(!ns->msg_sz_table)
        ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ);

    hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));

    if(is_eager)
        copy_overhead = copy_per_byte_eager * qitem->num_bytes;

    if(is_send)
        msg_init_time = m->fwd.sim_start_time;

    /* update hash table */
    if(!hash_link)
    {
        /* First message of this size: create a new entry.  The original
         * code did not check the allocation; skip recording on OOM rather
         * than dereferencing NULL. */
        struct msg_size_info * msg_info = malloc(sizeof *msg_info);
        if(!msg_info)
        {
            fprintf(stderr, "\n update_message_size: allocation failed, dropping sample ");
            return;
        }
        msg_info->msg_size = qitem->num_bytes;
        msg_info->num_msgs = 1;
        msg_info->agg_latency = tw_now(lp) + copy_overhead - msg_init_time;
        msg_info->avg_latency = msg_info->agg_latency;
        qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
        qlist_add(&msg_info->ql, &ns->msg_sz_list);
    }
    else
    {
        /* Existing entry: accumulate and refresh the running average. */
        struct msg_size_info * tmp = qhash_entry(hash_link, struct msg_size_info, hash_link);
        tmp->num_msgs++;
        tmp->agg_latency += tw_now(lp) + copy_overhead - msg_init_time;
        tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
    }
}
/* Reverse handler for notify_background_traffic.  The forward handler draws
 * exactly one exponential RNG sample per OTHER job (num_jobs - 1 draws), so
 * undo the same number of calls to keep the RNG stream aligned on rollback.
 * The original code reversed only a single draw, which is wrong whenever
 * more than two jobs are configured. */
static void notify_background_traffic_rc(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);
    for(int i = 0; i < num_jobs - 1; i++)
        tw_rand_reverse_unif(lp->rng);
}
/* Broadcast a CLI_BCKGND_FIN event to every rank of every OTHER job so their
 * background synthetic traffic can stop.  Draws exactly one exponential RNG
 * sample per other job; notify_background_traffic_rc must reverse the same
 * count on rollback. */
static void notify_background_traffic(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    struct codes_jobmap_id jid;
    jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);

    int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);

    for(int other_id = 0; other_id < num_jobs; other_id++)
    {
        /* Skip our own job -- only background jobs are notified. */
        if(other_id == jid.job)
            continue;

        struct codes_jobmap_id other_jid;
        other_jid.job = other_id;

        int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);

        /* num_other_ranks is an int: use %d (the original %ld mismatched
         * the argument type, which is undefined behavior in printf). */
        lprintf("\n Other ranks %d ", num_other_ranks);

        tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
        tw_lpid global_dest_id;

        for(int k = 0; k < num_other_ranks; k++)
        {
            other_jid.rank = k;
            /* Map (job, rank) to the global nw-lp id for event delivery. */
            int intm_dest_id = codes_jobmap_to_global_id(other_jid, jobmap_ctx);
            global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

            tw_event * e;
            struct nw_message * m_new;
            e = tw_event_new(global_dest_id, ts, lp);
            m_new = tw_event_data(e);
            m_new->msg_type = CLI_BCKGND_FIN;
            tw_event_send(e);
        }
    }
    return;
}
/* Reverse handler for notify_neighbor.  The bitfield records which forward
 * branch executed: c0 = last rank notified the background traffic jobs
 * (delegate to that RC handler); c1 = a neighbor-notification event was
 * scheduled, costing one exponential RNG draw that must be undone. */
static void notify_neighbor_rc(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
if(bf->c0)
{
notify_background_traffic_rc(ns, lp, bf, m);
return;
}
if(bf->c1)
{
tw_rand_reverse_unif(lp->rng);
}
}
/* Completion token-passing among the ranks of one trace-driven job:
 * rank 0 starts the chain when it finishes; each subsequent rank forwards a
 * CLI_NBR_FINISH event to local_rank+1 once both it and its predecessor are
 * done.  When the final rank (num_dumpi_traces - 1) completes the chain,
 * the background-traffic jobs are told to stop instead.  Sets bf->c0/c1 so
 * notify_neighbor_rc can undo exactly the branch that ran. */
static void notify_neighbor(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
/* Last rank and predecessor done: whole job finished, stop background
 * traffic rather than notifying a (nonexistent) next neighbor. */
if(ns->local_rank == num_dumpi_traces - 1
&& ns->is_finished == 1
&& ns->neighbor_completed == 1)
{
printf("\n All workloads completed, notifying background traffic ");
bf->c0 = 1;
notify_background_traffic(ns, lp, bf, m);
return;
}
struct codes_jobmap_id nbr_jid;
nbr_jid.job = ns->app_id;
tw_lpid global_dest_id;
/* Rank 0 needs no predecessor; other ranks wait for neighbor_completed. */
if(ns->is_finished == 1 && (ns->neighbor_completed == 1 || ns->local_rank == 0))
{
bf->c1 = 1;
printf("\n Local rank %d notifying neighbor %d ", ns->local_rank, ns->local_rank+1);
/* One exponential draw -- reversed by notify_neighbor_rc via bf->c1. */
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
nbr_jid.rank = ns->local_rank + 1;
/* Send a notification to the neighbor about completion */
int intm_dest_id = codes_jobmap_to_global_id(nbr_jid, jobmap_ctx);
global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);
tw_event * e;
struct nw_message * m_new;
e = tw_event_new(global_dest_id, ts, lp);
m_new = tw_event_data(e);
m_new->msg_type = CLI_NBR_FINISH;
tw_event_send(e);
}
}
/* Reverse handler for CLI_BCKGND_FIN: roll the LP back to the
 * not-yet-finished state set by finish_bckgnd_traffic. */
void finish_bckgnd_traffic_rc(
        struct nw_state * ns,
        tw_bf * b,
        struct nw_message * msg,
        tw_lp * lp)
{
    ns->is_finished = 0;
}
/* Forward handler for CLI_BCKGND_FIN: a trace-driven job has completed, so
 * mark this synthetic-traffic LP finished (gen_synthetic_tr then stops
 * rescheduling itself). */
void finish_bckgnd_traffic(
        struct nw_state * ns,
        tw_bf * b,
        struct nw_message * msg,
        tw_lp * lp)
{
    ns->is_finished = 1;
    /* gen_data is unsigned long: %lu (the original %lld mismatched the
     * argument type, which is undefined behavior).  lp->gid is cast so
     * %llu is correct regardless of how tw_lpid is typedef'd. */
    lprintf("\n LP %llu completed sending data %lu completed at time %lf ",
            (unsigned long long)lp->gid, ns->gen_data, tw_now(lp));
    return;
}
/* Reverse handler for CLI_NBR_FINISH: clear the neighbor-completed flag and
 * undo whatever branch notify_neighbor took in the forward event. */
void finish_nbr_wkld_rc(
struct nw_state * ns,
tw_bf * b,
struct nw_message * msg,
tw_lp * lp)
{
ns->neighbor_completed = 0;
notify_neighbor_rc(ns, lp, b, msg);
}
/* Forward handler for CLI_NBR_FINISH: the preceding local rank finished its
 * workload.  Record that, then (if this rank is also done) pass the
 * completion token onward via notify_neighbor. */
void finish_nbr_wkld(
struct nw_state * ns,
tw_bf * b,
struct nw_message * msg,
tw_lp * lp)
{
printf("\n Workload completed, notifying neighbor ");
ns->neighbor_completed = 1;
notify_neighbor(ns, lp, b, msg);
}
/* Reverse handler for CLI_BCKGND_GEN: undo the model-net send, the byte
 * accounting, and exactly the two RNG draws made by gen_synthetic_tr
 * (destination pick + next-interval exponential).  bf->c0 means the forward
 * handler returned early because the LP was already finished, so there is
 * nothing to undo. */
static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(bf->c0)
return;
model_net_event_rc2(lp, &m->event_rc);
s->gen_data -= PAYLOAD_SZ;
num_syn_bytes_sent -= PAYLOAD_SZ;
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
}
/* Generate one PAYLOAD_SZ-byte background (synthetic) message to a randomly
 * chosen rank of this LP's own job, then self-schedule the next CLI_BCKGND_GEN
 * after mean_interval plus exponential noise.  Uses exactly two RNG draws;
 * gen_synthetic_tr_rc reverses both.  bf->c0 marks the early-out taken once
 * the LP has been told the trace job finished. */
static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(s->is_finished == 1)
{
bf->c0 = 1;
return;
}
/* Get job information */
tw_lpid global_dest_id;
struct codes_jobmap_id jid;
jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
/* RNG draw #1: uniform destination rank within this job. */
int dest_svr = tw_rand_integer(lp->rng, 0, num_clients - 1);
if(dest_svr == s->local_rank)
{
/* Never send to self; deterministically shift to the next rank. */
dest_svr = (s->local_rank + 1) % num_clients;
}
jid.rank = dest_svr;
int intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx);
global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);
nw_message remote_m;
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = dest_svr;
remote_m.msg_type = CLI_BCKGND_ARRIVE;
remote_m.fwd.num_bytes = PAYLOAD_SZ;
remote_m.fwd.app_id = s->app_id;
remote_m.fwd.src_rank = s->local_rank;
/* event_rc is saved in the message so the rc handler can undo the send. */
m->event_rc = model_net_event(net_id, "synthetic-tr", global_dest_id, PAYLOAD_SZ, 0.0,
sizeof(nw_message), (const void*)&remote_m,
0, NULL, lp);
s->gen_data += PAYLOAD_SZ;
num_syn_bytes_sent += PAYLOAD_SZ;
/* New event after MEAN_INTERVAL */
/* RNG draw #2: exponential jitter on the next generation time. */
tw_stime ts = mean_interval + tw_rand_exponential(lp->rng, NOISE);
tw_event * e;
nw_message * m_new;
e = tw_event_new(lp->gid, ts, lp);
m_new = tw_event_data(e);
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
}
/* Reverse handler for CLI_BCKGND_ARRIVE: back out the payload bytes that
 * arrive_syn_tr added to the per-LP and global receive counters. */
void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    int nbytes = m->fwd.num_bytes;
    num_syn_bytes_recvd -= nbytes;
    s->syn_data -= nbytes;
}
/* Forward handler for CLI_BCKGND_ARRIVE: account a received synthetic
 * payload in both the per-LP counter and the global total. */
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    int nbytes = m->fwd.num_bytes;
    num_syn_bytes_recvd += nbytes;
    s->syn_data += nbytes;
}
/* Debugging functions, may generate unused function warning */
static void print_waiting_reqs(int32_t * reqs, int count)
{
......@@ -331,20 +657,32 @@ static int clear_completed_reqs(nw_state * s,
int32_t * reqs, int count)
{
int i, matched = 0;
for( i = 0; i < count; i++)
{
struct qlist_head * ent = NULL;
struct completed_requests * current = NULL;
struct completed_requests * prev = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
struct completed_requests* current =
qlist_entry(ent, completed_requests, ql);
current = qlist_entry(ent, completed_requests, ql);
if(prev)
rc_stack_push(lp, prev, free, s->matched_reqs);
if(current->req_id == reqs[i])
{
++matched;
qlist_del(&current->ql);
rc_stack_push(lp, current, free, s->matched_reqs);
prev = current;
}
else
prev = NULL;
}
if(prev)
rc_stack_push(lp, prev, free, s->matched_reqs);
}
return matched;
}
......@@ -497,6 +835,7 @@ static void codes_exec_mpi_wait_all_rc(
}
return;
}
static void codes_exec_mpi_wait_all(
nw_state* s,
tw_bf * bf,
......@@ -605,10 +944,11 @@ static int rm_matching_rcv(nw_state * ns,
qi = qlist_entry(ent, mpi_msgs_queue, ql);
if(//(qi->num_bytes == qitem->num_bytes)
//&&
((qi->tag == qitem->tag) || qi->tag == -1)
((qi->tag == qitem->tag) || qi->tag == -1)
&& ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
{
matched = 1;
//qitem->num_bytes = qi->num_bytes;
break;
}
++index;
......@@ -616,8 +956,18 @@ static int rm_matching_rcv(nw_state * ns,
if(matched)
{
if(enable_msg_tracking && qitem->num_bytes < EAGER_THRESHOLD)
{
update_message_size(ns, lp, bf, m, qitem, 1, 1);
}
if(qitem->num_bytes >= EAGER_THRESHOLD)
{
/* Matching receive found, need to notify the sender to transmit
* the data */
send_ack_back(ns, bf, m, lp, qitem);
}
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qi->req_init_time);
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
if(qi->op_type == CODES_WK_IRECV)
update_completed_queue(ns, bf, m, lp, qi->req_id);
......@@ -649,7 +999,7 @@ static int rm_matching_send(nw_state * ns,
(qi->tag == qitem->tag || qitem->tag == -1)
&& ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1))
{
qitem->num_bytes = qi->num_bytes;
//qi->num_bytes = qitem->num_bytes;
matched = 1;
break;
}
......@@ -658,6 +1008,15 @@ static int rm_matching_send(nw_state * ns,
if(matched)
{
if(enable_msg_tracking && (qi->num_bytes < EAGER_THRESHOLD))
update_message_size(ns, lp, bf, m, qi, 1, 0);
if(qitem->num_bytes >= EAGER_THRESHOLD)
{
/* Matching receive found, need to notify the sender to transmit
* the data */
send_ack_back(ns, bf, m, lp, qi);
}
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
......@@ -830,7 +1189,8 @@ static void codes_exec_mpi_send(nw_state* s,
tw_bf * bf,
nw_message * m,
tw_lp* lp,
struct codes_workload_op * mpi_op)
struct codes_workload_op * mpi_op,
int is_rend)
{
/* model-net event */
int global_dest_rank = mpi_op->u.send.dest_rank;
......@@ -840,14 +1200,16 @@ static void codes_exec_mpi_send(nw_state* s,
global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id);
}
//printf("\n Sender rank %d global dest rank %d ", s->nw_id, global_dest_rank);
//printf("\n Sender rank %d global dest rank %d dest-rank %d rend %d", s->nw_id, global_dest_rank, mpi_op->u.send.dest_rank, is_rend);
m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
/* model-net event */
tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
num_bytes_sent += mpi_op->u.send.num_bytes;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
if(!is_rend)
{
num_bytes_sent += mpi_op->u.send.num_bytes;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
}
if(enable_sampling)
{
if(tw_now(lp) >= s->cur_interval_end)
......@@ -875,7 +1237,6 @@ static void codes_exec_mpi_send(nw_state* s,
nw_message local_m;
nw_message remote_m;
local_m.fwd.sim_start_time = tw_now(lp);
local_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
local_m.fwd.src_rank = mpi_op->u.send.source_rank;
local_m.op_type = mpi_op->op_type;
......@@ -884,14 +1245,48 @@ static void codes_exec_mpi_send(nw_state* s,
local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
local_m.fwd.req_id = mpi_op->u.send.req_id;
local_m.fwd.app_id = s->app_id;
if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD)
{
/* directly issue a model-net send */
local_m.fwd.sim_start_time = tw_now(lp);
remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, self_overhead,
remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
else if (is_rend == 0)
{
/* Initiate the handshake. Issue a control message to the destination first. No local message,
* only remote message sent. */
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
remote_m.fwd.src_rank = mpi_op->u.send.source_rank;
remote_m.msg_type = MPI_SEND_ARRIVED;
remote_m.op_type = mpi_op->op_type;
remote_m.fwd.tag = mpi_op->u.send.tag;