Commit 34bbe5b8 authored by Philip Carns's avatar Philip Carns
Browse files

add delay operation example

parent 9497a9c7
...@@ -52,14 +52,33 @@ static int test_workload_load(const char* params, int rank) ...@@ -52,14 +52,33 @@ static int test_workload_load(const char* params, int rank)
new->rank = rank; new->rank = rank;
/* arbitrary synthetic workload for testing purposes */ /* arbitrary synthetic workload for testing purposes */
new->op_array_len = 2; /* rank 0 sleeps 43 seconds, then does open and barrier, while all other
new->op_array_index = 0; * ranks immediately do open and barrier
new->op_array[0].op_type = CODES_WK_OPEN; */
new->op_array[0].u.open.file_id = 3; if(rank == 0)
new->op_array[0].u.open.create_flag = 1; {
new->op_array[1].op_type = CODES_WK_BARRIER; new->op_array_len = 3;
new->op_array[1].u.barrier.root = 0; new->op_array_index = 0;
new->op_array[1].u.barrier.count = -1; /* all ranks */ new->op_array[0].op_type = CODES_WK_DELAY;
new->op_array[0].u.delay.seconds = 43;
new->op_array[1].op_type = CODES_WK_OPEN;
new->op_array[1].u.open.file_id = 3;
new->op_array[1].u.open.create_flag = 1;
new->op_array[2].op_type = CODES_WK_BARRIER;
new->op_array[2].u.barrier.root = 0;
new->op_array[2].u.barrier.count = -1; /* all ranks */
}
else
{
new->op_array_len = 2;
new->op_array_index = 0;
new->op_array[0].op_type = CODES_WK_OPEN;
new->op_array[0].u.open.file_id = 3;
new->op_array[0].u.open.create_flag = 1;
new->op_array[1].op_type = CODES_WK_BARRIER;
new->op_array[1].u.barrier.root = 0;
new->op_array[1].u.barrier.count = -1; /* all ranks */
}
/* add to front of list of streams that we are tracking */ /* add to front of list of streams that we are tracking */
new->next = wkload_streams; new->next = wkload_streams;
...@@ -82,7 +101,12 @@ static void test_workload_get_next(int rank, struct codes_workload_op *op) ...@@ -82,7 +101,12 @@ static void test_workload_get_next(int rank, struct codes_workload_op *op)
tmp = tmp->next; tmp = tmp->next;
} }
assert(tmp); if(!tmp)
{
op->op_type = CODES_WK_END;
return;
}
assert(tmp->rank == rank); assert(tmp->rank == rank);
if(tmp->op_array_index < tmp->op_array_len) if(tmp->op_array_index < tmp->op_array_len)
......
...@@ -68,6 +68,7 @@ static void handle_client_op_barrier_event( ...@@ -68,6 +68,7 @@ static void handle_client_op_barrier_event(
tw_lp * lp); tw_lp * lp);
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count); static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count);
static void cn_enter_barrier_rc(tw_lp *lp); static void cn_enter_barrier_rc(tw_lp *lp);
static void cn_delay(tw_lp *lp, double seconds);
static void client_init( static void client_init(
client_state * ns, client_state * ns,
...@@ -285,6 +286,7 @@ static void handle_client_op_loop_event( ...@@ -285,6 +286,7 @@ static void handle_client_op_loop_event(
if(m->event_type == CLIENT_KICKOFF) if(m->event_type == CLIENT_KICKOFF)
{ {
/* first operation; initialize the desired workload generator */ /* first operation; initialize the desired workload generator */
printf("codes_workload_load on gid: %ld\n", lp->gid);
ns->wkld_id = codes_workload_load("test", NULL, ns->my_rank); ns->wkld_id = codes_workload_load("test", NULL, ns->my_rank);
assert(ns->wkld_id > -1); assert(ns->wkld_id > -1);
} }
...@@ -302,6 +304,9 @@ static void handle_client_op_loop_event( ...@@ -302,6 +304,9 @@ static void handle_client_op_loop_event(
switch(m->op_rc.op_type) switch(m->op_rc.op_type)
{ {
/* this first set of operation types are handled exclusively by the
* client
*/
case CODES_WK_END: case CODES_WK_END:
ns->completion_time = tw_now(lp); ns->completion_time = tw_now(lp);
printf("Client rank %d completed workload.\n", ns->my_rank); printf("Client rank %d completed workload.\n", ns->my_rank);
...@@ -313,6 +318,13 @@ static void handle_client_op_loop_event( ...@@ -313,6 +318,13 @@ static void handle_client_op_loop_event(
cn_enter_barrier(lp, m->op_rc.u.barrier.root, m->op_rc.u.barrier.count); cn_enter_barrier(lp, m->op_rc.u.barrier.root, m->op_rc.u.barrier.count);
return; return;
break; break;
case CODES_WK_DELAY:
printf("Client rank %d will delay for %f seconds.\n", ns->my_rank,
m->op_rc.u.delay.seconds);
cn_delay(lp, m->op_rc.u.delay.seconds);
return;
break;
/* "normal" io operations: we just calculate the destination and /* "normal" io operations: we just calculate the destination and
* then continue after the switch block to send the specified * then continue after the switch block to send the specified
* operation to a server. * operation to a server.
...@@ -337,6 +349,20 @@ static void cn_enter_barrier_rc(tw_lp *lp) ...@@ -337,6 +349,20 @@ static void cn_enter_barrier_rc(tw_lp *lp)
return; return;
} }
static void cn_delay(tw_lp *lp, double seconds)
{
tw_event *e;
client_msg *m_out;
/* message to self */
e = codes_event_new(lp->gid, seconds, lp);
m_out = tw_event_data(e);
m_out->event_type = CLIENT_OP_COMPLETE;
tw_event_send(e);
return;
}
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count) static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count)
{ {
tw_event *e; tw_event *e;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment