diff --git a/codes/codes-workload.h b/codes/codes-workload.h index 04d5db993efb6fe4fc1ca01920b17c24a2b93398..5c29ce54447a939ac6dea6bd32d72ae3f8f2ddca 100644 --- a/codes/codes-workload.h +++ b/codes/codes-workload.h @@ -45,7 +45,7 @@ struct codes_workload_op double seconds; } delay; struct { - int count; /* num ranks in barrier */ + int count; /* num ranks in barrier, -1 means "all" */ int root; /* root rank */ } barrier; struct { diff --git a/src/workload/test-workload-method.c b/src/workload/test-workload-method.c index 49d33bad12c2c1b97ba583075c05ec4f0aaa7153..22becc74ec12a30cef0aae0e1ab42a419a2869cb 100644 --- a/src/workload/test-workload-method.c +++ b/src/workload/test-workload-method.c @@ -50,12 +50,15 @@ int test_workload_load(const char* params, int rank) new->rank = rank; - /* synthetic workload, just open file for now */ - new->op_array_len = 1; + /* synthetic workload for testing */ + new->op_array_len = 2; new->op_array_index = 0; new->op_array[0].op_type = CODES_WK_OPEN; new->op_array[0].u.open.file_id = 3; new->op_array[0].u.open.create_flag = 1; + new->op_array[1].op_type = CODES_WK_BARRIER; + new->op_array[1].u.barrier.root = 0; + new->op_array[1].u.barrier.count = -1; /* all ranks */ /* add to front of list of streams that we are tracking */ new->next = wkload_streams; diff --git a/tests/workload/codes-workload-test-cn-lp.c b/tests/workload/codes-workload-test-cn-lp.c index 0e76a4763891035fe953e1a94d38764512609fd5..65e586cc5e1f71d955dc8b759385dfa47995d470 100644 --- a/tests/workload/codes-workload-test-cn-lp.c +++ b/tests/workload/codes-workload-test-cn-lp.c @@ -23,19 +23,23 @@ typedef struct client_state client_state; enum client_event_type { - CLIENT_KICKOFF, /* initial event */ + CLIENT_KICKOFF = 64, /* initial event */ CLIENT_OP_COMPLETE, /* finished previous I/O operation */ + CLIENT_OP_BARRIER, /* event received at root to indicate barrier entry */ }; struct client_state { int my_rank; int wkld_id; + int target_barrier_count; + int current_barrier_count; }; struct client_msg { enum client_event_type event_type; + int barrier_count; }; static void handle_client_op_loop_rev_event( @@ -48,6 +52,17 @@ static void handle_client_op_loop_event( tw_bf * b, client_msg * m, tw_lp * lp); +static void handle_client_op_barrier_rev_event( + client_state * ns, + tw_bf * b, + client_msg * m, + tw_lp * lp); +static void handle_client_op_barrier_event( + client_state * ns, + tw_bf * b, + client_msg * m, + tw_lp * lp); +static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count); static void client_init( client_state * ns, @@ -117,6 +132,9 @@ static void client_event( case CLIENT_OP_COMPLETE: handle_client_op_loop_event(ns, b, m, lp); break; + case CLIENT_OP_BARRIER: + handle_client_op_barrier_event(ns, b, m, lp); + break; default: assert(0); break; @@ -132,8 +150,12 @@ static void client_rev_event( switch (m->event_type) { case CLIENT_KICKOFF: + case CLIENT_OP_COMPLETE: handle_client_op_loop_rev_event(ns, b, m, lp); break; + case CLIENT_OP_BARRIER: + handle_client_op_barrier_rev_event(ns, b, m, lp); + break; default: assert(0); break; @@ -155,6 +177,18 @@ static tw_peid node_mapping( return (tw_peid) gid / g_tw_nlp; } +static void handle_client_op_barrier_rev_event( + client_state * ns, + tw_bf * b, + client_msg * m, + tw_lp * lp) +{ + /* TODO: fill this in */ + assert(0); + + return; +} + static void handle_client_op_loop_rev_event( client_state * ns, tw_bf * b, @@ -167,7 +201,44 @@ static void handle_client_op_loop_rev_event( return; } -/* handle initial event */ +/* handle barrier */ +static void handle_client_op_barrier_event( + client_state * ns, + tw_bf * b, + client_msg * m, + tw_lp * lp) +{ + tw_event *e; + client_msg *m_out; + int i; + + assert(ns->target_barrier_count == 0 || ns->target_barrier_count == m->barrier_count); + if(ns->target_barrier_count == 0) + { + ns->target_barrier_count = m->barrier_count; + ns->current_barrier_count = 0; + } + + ns->current_barrier_count++; + + if(ns->current_barrier_count == ns->target_barrier_count) + { + /* release all clients, including self */ + for(i=0; icurrent_barrier_count; i++) + { + e = codes_event_new(lp->gid+i, codes_local_latency(lp), lp); + m_out = tw_event_data(e); + m_out->event_type = CLIENT_OP_COMPLETE; + tw_event_send(e); + } + ns->current_barrier_count=0; + ns->target_barrier_count=0; + } + + return; +} + +/* event indicates that we can issue the next operation */ static void handle_client_op_loop_event( client_state * ns, tw_bf * b, @@ -197,8 +268,18 @@ static void handle_client_op_loop_event( { case CODES_WK_END: printf("Client rank %d completed workload.\n", ns->my_rank); + /* stop issuing events; we are done */ return; break; + case CODES_WK_BARRIER: + printf("Client rank %d hit barrier.\n", ns->my_rank); + cn_enter_barrier(lp, op.u.barrier.root, op.u.barrier.count); + return; + break; + /* "normal" io operations: we just calculate the destination and + * then continue after the switch block to send the specified + * operation to a server. + */ case CODES_WK_OPEN: printf("Client rank %d will issue an open request.\n", ns->my_rank); dest_svr_id = g_num_clients + op.u.open.file_id % g_num_servers; @@ -213,6 +294,24 @@ static void handle_client_op_loop_event( return; } +static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count) +{ + tw_event *e; + client_msg *m_out; + + e = codes_event_new(gid, codes_local_latency(lp), lp); + m_out = tw_event_data(e); + m_out->event_type = CLIENT_OP_BARRIER; + if(count == -1) + m_out->barrier_count = g_num_clients; + else + m_out->barrier_count = count; + tw_event_send(e); + + return; +} + + void cn_op_complete(tw_lp *lp, tw_lpid gid) { tw_event *e; diff --git a/tests/workload/codes-workload-test-svr-lp.c b/tests/workload/codes-workload-test-svr-lp.c index 240fe1f81f9f53c19c63e0dd9097aeb893ad836a..38517757419503f2d9b10af04b2d53dae548d4da 100644 --- a/tests/workload/codes-workload-test-svr-lp.c +++ b/tests/workload/codes-workload-test-svr-lp.c @@ -21,7 +21,7 @@ typedef struct svr_state svr_state; enum svr_event_type { - SVR_OP, + SVR_OP = 128, }; struct svr_state