Commit a0a58558 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

implemented SWM_Allreduce and SWM_Barrier

parent 2ed8b59d
......@@ -20,6 +20,8 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#define ALLREDUCE_SHORT_MSG_SIZE 2048
#define DBG_COMM 1
using namespace std;
......@@ -30,18 +32,18 @@ static int total_rank_cnt = 0;
ABT_thread global_prod_thread = NULL;
struct shared_context {
int my_rank;
int num_ranks;
char workload_name[MAX_NAME_LENGTH_WKLD];
void * swm_obj;
ABT_thread producer;
std::deque<struct codes_workload_op*> fifo;
int my_rank;
int num_ranks;
char workload_name[MAX_NAME_LENGTH_WKLD];
void * swm_obj;
ABT_thread producer;
std::deque<struct codes_workload_op*> fifo;
};
struct rank_mpi_context {
struct qhash_head hash_link;
int app_id;
struct shared_context sctx;
struct qhash_head hash_link;
int app_id;
struct shared_context sctx;
};
typedef struct rank_mpi_compare {
......@@ -60,18 +62,18 @@ typedef struct rank_mpi_compare {
* reqrt and rsprt: routing types (to be ignored) */
void SWM_Send(SWM_PEER peer,
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_BYTES bytes,
SWM_BYTES pktrspbytes,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_BYTES bytes,
SWM_BYTES pktrspbytes,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
/* add an event in the shared queue and then yield */
// printf("\n Sending to rank %d ", comm_id);
// printf("\n Sending to rank %d ", comm_id);
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_SEND;
......@@ -114,6 +116,7 @@ void SWM_Barrier(
SWM_ROUTING_TYPE rsprt)
{
/* Add an event in the shared queue and then yield */
#if 0
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_DELAY;
......@@ -134,22 +137,50 @@ void SWM_Barrier(
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err;
int rank, size, src, dst, mask;
err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
rank = sctx->my_rank;
size = sctx->num_ranks;
mask = 0x1;
while(mask < size) {
dst = (rank + mask) % size;
src = (rank - mask + size) % size;
args.dest = dst;
args.source = src;
SWM_Sendrecv(comm_id, dest, 1234, reqvc, rspvc, NULL, 0, 0,
src, 1234, NULL, reqrt, rsprt);
mask <<= 1;
}
}
void SWM_Isend(SWM_PEER peer,
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_BYTES bytes,
SWM_BYTES pktrspbytes,
uint32_t * handle,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_BYTES bytes,
SWM_BYTES pktrspbytes,
uint32_t * handle,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
/* add an event in the shared queue and then yield */
// printf("\n Sending to rank %d ", comm_id);
// printf("\n Sending to rank %d ", comm_id);
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ISEND;
......@@ -284,7 +315,7 @@ void SWM_Wait(uint32_t req_id)
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
......@@ -314,26 +345,26 @@ void SWM_Waitall(int len, uint32_t * req_ids)
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Sendrecv(
SWM_COMM_ID comm_id,
SWM_PEER sendpeer,
SWM_TAG sendtag,
SWM_VC sendreqvc,
SWM_VC sendrspvc,
SWM_BUF sendbuf,
SWM_BYTES sendbytes,
SWM_BYTES pktrspbytes,
SWM_PEER recvpeer,
SWM_TAG recvtag,
SWM_BUF recvbuf,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
SWM_COMM_ID comm_id,
SWM_PEER sendpeer,
SWM_TAG sendtag,
SWM_VC sendreqvc,
SWM_VC sendrspvc,
SWM_BUF sendbuf,
SWM_BYTES sendbytes,
SWM_BYTES pktrspbytes,
SWM_PEER recvpeer,
SWM_TAG recvtag,
SWM_BUF recvbuf,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
// printf("\n Sending to %d receiving from %d ", sendpeer, recvpeer);
// printf("\n Sending to %d receiving from %d ", sendpeer, recvpeer);
struct codes_workload_op send_op;
send_op.op_type = CODES_WK_SEND;
......@@ -347,7 +378,7 @@ void SWM_Sendrecv(
recv_op.op_type = CODES_WK_RECV;
recv_op.u.recv.tag = recvtag;
recv_op.u.recv.source_rank = recvpeer;
#ifdef DBG_COMM
printf("\n send/recv op send-tag %d send-bytes %d recv-tag: %d recv-source: %d ", sendtag, sendbytes, recvtag, recvpeer);
#endif
......@@ -367,7 +398,7 @@ void SWM_Sendrecv(
ABT_thread_yield_to(global_prod_thread);
}
/* @param bytes: number of bytes in Allreduce
/* @param count: number of bytes in Allreduce
* @param respbytes: number of bytes to be sent in response (ignore for our
* purpose)
* $params comm_id: communicator ID (MPI_COMM_WORLD for our case)
......@@ -378,7 +409,7 @@ void SWM_Sendrecv(
* @param sendbuf and rcvbuf: buffers for send and receive calls (ignore for
* our purpose) */
void SWM_Allreduce(
SWM_BYTES bytes,
SWM_BYTES count,
SWM_BYTES respbytes,
SWM_COMM_ID comm_id,
SWM_VC sendreqvc,
......@@ -386,8 +417,9 @@ void SWM_Allreduce(
SWM_BUF sendbuf,
SWM_BUF rcvbuf)
{
#if 0
/* TODO: For now, simulate a constant delay for ALlreduce*/
// printf("\n Allreduce bytes %d ", bytes);
// printf("\n Allreduce bytes %d ", bytes);
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
......@@ -409,6 +441,161 @@ void SWM_Allreduce(
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
int comm_size, i, send_idx, recv_idx, last_idx, send_cnt, recv_cnt;
int pof2, mask, rem, newrank, newdst, dst, *cnts, *disps;
int rank = sctx->my_rank;
comm_size = sctx->num_ranks;
cnts = disps = NULL;
pof2 = 1;
while (pof2 <= comm_size) pof2 <<= 1;
pof2 >>=1;
rem = comm_size - pof2;
/* In the non-power-of-two case, all even-numbered
processes of rank < 2*rem send their data to
(rank+1). These even-numbered processes no longer
participate in the algorithm until the very end. The
remaining processes form a nice power-of-two. */
if (rank < 2*rem) {
if (rank % 2 == 0) { /* even */
SWM_Send(rank+1, comm_id, 1235, sendreqvc, sendrspvc, NULL, count, 1, 0, 0);
newrank = -1;
} else { /* odd */
SWM_Recv(rank-1, comm_id, 1235, NULL);
newrank = rank / 2;
}
} else {
newrank = rank - rem;
}
/* If op is user-defined or count is less than pof2, use
recursive doubling algorithm. Otherwise do a reduce-scatter
followed by allgather. (If op is user-defined,
derived datatypes are allowed and the user could pass basic
datatypes on one process and derived on another as long as
the type maps are the same. Breaking up derived
datatypes to do the reduce-scatter is tricky, therefore
using recursive doubling in that case.) */
if (newrank != -1) {
if ((count <= ALLREDUCE_SHORT_MSG_SIZE) || (count < pof2)) {
mask = 0x1;
while (mask < pof2) {
newdst = newrank ^ mask;
dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
SWM_Sendrecv(comm_id, dst, 1235, sendreqvc, sendrspvc, NULL,
count, 1, dst, 1235, NULL, 0, 0);
mask <<= 1;
}
} else {
/* do a reduce-scatter followed by allgather */
/* for the reduce-scatter, calculate the count that
each process receives and the displacement within
the buffer */
cnts = (int*)malloc(pof2*sizeof(int));
disps = (int*)malloc(pof2*sizeof(int));
for (i=0; i<(pof2-1); i++)
cnts[i] = count/pof2;
cnts[pof2-1] = count - (count/pof2)*(pof2-1);
disps[0] = 0;
for (i=1; i<pof2; i++)
disps[i] = disps[i-1] + cnts[i-1];
mask = 0x1;
send_idx = recv_idx = 0;
last_idx = pof2;
while (mask < pof2) {
newdst = newrank ^ mask;
dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
send_cnt = recv_cnt = 0;
if (newrank < newdst) {
send_idx = recv_idx + pof2/(mask*2);
for (i=send_idx; i<last_idx; i++)
send_cnt += cnts[i];
for (i=recv_idx; i<send_idx; i++)
recv_cnt += cnts[i];
} else {
recv_idx = send_idx + pof2/(mask*2);
for (i=send_idx; i<recv_idx; i++)
send_cnt += cnts[i];
for (i=recv_idx; i<last_idx; i++)
recv_cnt += cnts[i];
}
SWM_Sendrecv(comm_id, dst, 1235, sendreqvc, sendrspvc, NULL,
send_cnt, 1, dst, 1235, NULL, 0, 0);
send_idx = recv_idx;
mask <<= 1;
if(mask < pof2)
last_idx = recv_idx + pof2/mask;
}
/* now do the allgather */
mask >>= 1;
while (mask > 0) {
newdst = newrank ^ mask;
/* find real rank of dest */
dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem;
send_cnt = recv_cnt = 0;
if (newrank < newdst) {
if (mask != pof2/2)
last_idx = last_idx + pof2/(mask*2);
recv_idx = send_idx + pof2/(mask*2);
for (i=send_idx; i<recv_idx; i++)
send_cnt += cnts[i];
for (i=recv_idx; i<last_idx; i++)
recv_cnt += cnts[i];
} else {
recv_idx = send_idx - pof2/(mask*2);
for (i=send_idx; i<last_idx; i++)
send_cnt += cnts[i];
for (i=recv_idx; i<send_idx; i++)
recv_cnt += cnts[i];
}
SWM_Sendrecv(comm_id, dst, 1235, sendreqvc, sendrspvc, NULL,
send_cnt, 1, dst, 1235, NULL, 0, 0);
if (newrank > newdst) send_idx = recv_idx;
mask >>= 1;
}
}
}
if(rank < 2*rem) {
if(rank % 2) {/* odd */
SWM_Send(rank-1, comm_id, 1235, sendreqvc, sendrspvc, NULL, count, 1, 0, 0);
} else {
SWM_Recv(rank+1, comm_id, 1235, NULL);
}
}
if(cnts) free(cnts);
if(disps) free(disps);
}
void SWM_Allreduce(
......@@ -480,7 +667,7 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
/* LOAD parameters from JSON file*/
online_comm_params * o_params = (online_comm_params*)params;
int nprocs = o_params->nprocs;
rank_mpi_context *my_ctx;
my_ctx = (rank_mpi_context*)calloc(1, sizeof(rank_mpi_context));
assert(my_ctx);
......@@ -509,19 +696,19 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
else
tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name);
printf("\n path %s ", path.c_str());
try {
std::ifstream jsonFile(path);
// root.put("C:.Windows.System", "20 files");
// boost::property_tree::json_parser::write_json("file.json", root);
boost::property_tree::json_parser::read_json(jsonFile, root);
uint32_t process_cnt = root.get<uint32_t>("jobs.size", 1);
}
catch(std::exception & e)
{
printf("%s \n", e.what());
return -1;
}
printf("\n path %s ", path.c_str());
try {
std::ifstream jsonFile(path);
// root.put("C:.Windows.System", "20 files");
// boost::property_tree::json_parser::write_json("file.json", root);
boost::property_tree::json_parser::read_json(jsonFile, root);
uint32_t process_cnt = root.get<uint32_t>("jobs.size", 1);
}
catch(std::exception & e)
{
printf("%s \n", e.what());
return -1;
}
if(strcmp(o_params->workload_name, "lammps") == 0)
{
LAMMPS_SWM * lammps_swm = new LAMMPS_SWM(root, generic_ptrs);
......@@ -529,8 +716,8 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
}
else if(strcmp(o_params->workload_name, "nekbone") == 0)
{
NEKBONESWMUserCode * nekbone_swm = new NEKBONESWMUserCode(root, generic_ptrs);
my_ctx->sctx.swm_obj = (void*)nekbone_swm;
NEKBONESWMUserCode * nekbone_swm = new NEKBONESWMUserCode(root, generic_ptrs);
my_ctx->sctx.swm_obj = (void*)nekbone_swm;
}
ABT_xstream self_es;
......@@ -561,9 +748,9 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
static void comm_online_workload_get_next(int app_id, int rank, struct codes_workload_op * op)
{
/* At this point, we will use the "call" function. The send/receive/wait
* definitions will be replaced by our own function definitions that will do a
* yield to argobots if an event is not available. */
/* At this point, we will use the "call" function. The send/receive/wait
* definitions will be replaced by our own function definitions that will do a
* yield to argobots if an event is not available. */
/* if shared queue is empty then yield */
rank_mpi_context * temp_data;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment