Commit 7d2a86f0 authored by Philip Carns's avatar Philip Carns

barrier example

parent 6e1a5b40
......@@ -45,7 +45,7 @@ struct codes_workload_op
double seconds;
} delay;
struct {
int count; /* num ranks in barrier */
int count; /* num ranks in barrier, -1 means "all" */
int root; /* root rank */
} barrier;
struct {
......
......@@ -50,12 +50,15 @@ int test_workload_load(const char* params, int rank)
new->rank = rank;
/* synthetic workload, just open file for now */
new->op_array_len = 1;
/* synthetic workload for testing */
new->op_array_len = 2;
new->op_array_index = 0;
new->op_array[0].op_type = CODES_WK_OPEN;
new->op_array[0].u.open.file_id = 3;
new->op_array[0].u.open.create_flag = 1;
new->op_array[1].op_type = CODES_WK_BARRIER;
new->op_array[1].u.barrier.root = 0;
new->op_array[1].u.barrier.count = -1; /* all ranks */
/* add to front of list of streams that we are tracking */
new->next = wkload_streams;
......
......@@ -23,19 +23,23 @@ typedef struct client_state client_state;
enum client_event_type
{
CLIENT_KICKOFF, /* initial event */
CLIENT_KICKOFF = 64, /* initial event */
CLIENT_OP_COMPLETE, /* finished previous I/O operation */
CLIENT_OP_BARRIER, /* event received at root to indicate barrier entry */
};
struct client_state
{
int my_rank;
int wkld_id;
int target_barrier_count;
int current_barrier_count;
};
struct client_msg
{
enum client_event_type event_type;
int barrier_count;
};
static void handle_client_op_loop_rev_event(
......@@ -48,6 +52,17 @@ static void handle_client_op_loop_event(
tw_bf * b,
client_msg * m,
tw_lp * lp);
static void handle_client_op_barrier_rev_event(
client_state * ns,
tw_bf * b,
client_msg * m,
tw_lp * lp);
static void handle_client_op_barrier_event(
client_state * ns,
tw_bf * b,
client_msg * m,
tw_lp * lp);
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count);
static void client_init(
client_state * ns,
......@@ -117,6 +132,9 @@ static void client_event(
case CLIENT_OP_COMPLETE:
handle_client_op_loop_event(ns, b, m, lp);
break;
case CLIENT_OP_BARRIER:
handle_client_op_barrier_event(ns, b, m, lp);
break;
default:
assert(0);
break;
......@@ -132,8 +150,12 @@ static void client_rev_event(
switch (m->event_type)
{
case CLIENT_KICKOFF:
case CLIENT_OP_COMPLETE:
handle_client_op_loop_rev_event(ns, b, m, lp);
break;
case CLIENT_OP_BARRIER:
handle_client_op_barrier_rev_event(ns, b, m, lp);
break;
default:
assert(0);
break;
......@@ -155,6 +177,18 @@ static tw_peid node_mapping(
return (tw_peid) gid / g_tw_nlp;
}
static void handle_client_op_barrier_rev_event(
client_state * ns,
tw_bf * b,
client_msg * m,
tw_lp * lp)
{
/* TODO: fill this in */
assert(0);
return;
}
static void handle_client_op_loop_rev_event(
client_state * ns,
tw_bf * b,
......@@ -167,7 +201,44 @@ static void handle_client_op_loop_rev_event(
return;
}
/* handle initial event */
/* handle barrier */
static void handle_client_op_barrier_event(
client_state * ns,
tw_bf * b,
client_msg * m,
tw_lp * lp)
{
tw_event *e;
client_msg *m_out;
int i;
assert(ns->target_barrier_count == 0 || ns->target_barrier_count == m->barrier_count);
if(ns->target_barrier_count == 0)
{
ns->target_barrier_count = m->barrier_count;
ns->current_barrier_count = 0;
}
ns->current_barrier_count++;
if(ns->current_barrier_count == ns->target_barrier_count)
{
/* release all clients, including self */
for(i=0; i<ns->current_barrier_count; i++)
{
e = codes_event_new(lp->gid+i, codes_local_latency(lp), lp);
m_out = tw_event_data(e);
m_out->event_type = CLIENT_OP_COMPLETE;
tw_event_send(e);
}
ns->current_barrier_count=0;
ns->target_barrier_count=0;
}
return;
}
/* event indicates that we can issue the next operation */
static void handle_client_op_loop_event(
client_state * ns,
tw_bf * b,
......@@ -197,8 +268,18 @@ static void handle_client_op_loop_event(
{
case CODES_WK_END:
printf("Client rank %d completed workload.\n", ns->my_rank);
/* stop issuing events; we are done */
return;
break;
case CODES_WK_BARRIER:
printf("Client rank %d hit barrier.\n", ns->my_rank);
cn_enter_barrier(lp, op.u.barrier.root, op.u.barrier.count);
return;
break;
/* "normal" io operations: we just calculate the destination and
* then continue after the switch block to send the specified
* operation to a server.
*/
case CODES_WK_OPEN:
printf("Client rank %d will issue an open request.\n", ns->my_rank);
dest_svr_id = g_num_clients + op.u.open.file_id % g_num_servers;
......@@ -213,6 +294,24 @@ static void handle_client_op_loop_event(
return;
}
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count)
{
tw_event *e;
client_msg *m_out;
e = codes_event_new(gid, codes_local_latency(lp), lp);
m_out = tw_event_data(e);
m_out->event_type = CLIENT_OP_BARRIER;
if(count == -1)
m_out->barrier_count = g_num_clients;
else
m_out->barrier_count = count;
tw_event_send(e);
return;
}
void cn_op_complete(tw_lp *lp, tw_lpid gid)
{
tw_event *e;
......
......@@ -21,7 +21,7 @@ typedef struct svr_state svr_state;
enum svr_event_type
{
SVR_OP,
SVR_OP = 128,
};
struct svr_state
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment