Commit 8ef60288 authored by Rob Latham's avatar Rob Latham
Browse files

put the timings back in

parent eb44ec60
......@@ -52,6 +52,8 @@ struct io_args {
ABT_mutex mutex; /* guards the 'args' state shared across all ULTs */
ABT_eventual eventual;
int ret;
/* statistics collection */
io_stats stats;
io_args(tl::engine *eng, tl::endpoint e, abt_io_instance_id id, int f, tl::bulk &b, margo_bulk_pool_t p, size_t x,
const std::vector<off_t> & start_vec,
......@@ -126,6 +128,7 @@ static void write_ult(void *_args)
* across every loop */
unsigned int file_idx=0;
size_t fileblk_cursor;
double mutex_time;
while (args->client_cursor < args->client_bulk.size() && file_idx < args->file_starts.size() )
{
......@@ -135,6 +138,9 @@ static void write_ult(void *_args)
size_t xfered=0, nbytes, file_xfer=0;
hg_bulk_t local_bulk;
hg_uint32_t actual_count;
double bulk_time, io_time, total_io_time;
int write_count=0;
/* Adopting same approach as 'bake-server.c' : we will create lots of ULTs,
* some of which might not end up doing anything */
......@@ -146,7 +152,10 @@ static void write_ult(void *_args)
//margo_bulk_free(local_bulk); // thallium wrap() increments refcount
//but triggers segfault
mutex_time = ABT_get_wtime();
ABT_mutex_lock(args->mutex);
mutex_time = ABT_get_wtime() - mutex_time;
args->stats.mutex_time += mutex_time;
/* save these three for when we actually do I/O */
auto first_file_index = args->file_idx;
auto first_file_cursor = args->fileblk_cursor;
......@@ -176,6 +185,7 @@ static void write_ult(void *_args)
file_idx = first_file_index;
fileblk_cursor = first_file_cursor;
bulk_time = ABT_get_wtime();
// the '>>' operator moves bytes from one bulk descriptor to the
// other, moving the smaller of the two
// operator overloading might make this a little hard to parse at
......@@ -197,6 +207,7 @@ static void write_ult(void *_args)
} catch (...) {
std::cerr <<" Bulk get error. Ignoring " << std::endl;
}
bulk_time = ABT_get_wtime() - bulk_time;
// when are we done?
// - what if the client has a really long file description but for some reason only a small amount of memory?
......@@ -206,7 +217,11 @@ static void write_ult(void *_args)
// we might be able to only write a partial block
nbytes = MIN(args->file_sizes[file_idx]-fileblk_cursor, client_xfer-buf_cursor);
io_time = ABT_get_wtime();
file_xfer += abt_io_pwrite(args->abt_id, args->fd, (char*)local_buffer+buf_cursor, nbytes, args->file_starts[file_idx]+fileblk_cursor);
io_time = ABT_get_wtime() - io_time;
total_io_time += io_time;
write_count++;
if (nbytes + fileblk_cursor >= args->file_sizes[file_idx]) {
file_idx++;
......@@ -225,6 +240,13 @@ static void write_ult(void *_args)
client_cursor += client_xfer;
margo_bulk_pool_release(args->mr_pool, local_bulk);
ABT_mutex_lock(args->mutex);
args->stats.write_bulk_time += bulk_time;
args->stats.write_bulk_xfers++;
args->stats.server_write_time += total_io_time;
args->stats.server_write_calls += write_count;
args->stats.bytes_written += file_xfer;
ABT_mutex_unlock(args->mutex);
}
ABT_mutex_lock(args->mutex);
......@@ -258,6 +280,9 @@ static void read_ult(void *_args)
size_t xfered=0, nbytes, file_xfer=0;
hg_bulk_t local_bulk;
hg_uint32_t actual_count;
double bulk_time, io_time, total_io_time;
int read_count = 0;
/* Adopting same approach as 'bake-server.c' : we will create lots of ULTs,
* some of which might not end up doing anything */
......@@ -269,7 +294,10 @@ static void read_ult(void *_args)
//margo_bulk_free(local_bulk); // thallium wrap() increments refcount,
//but triggers segfault
double mutex_time = ABT_get_wtime();
ABT_mutex_lock(args->mutex);
mutex_time = ABT_get_wtime() - mutex_time;
args->stats.mutex_time += mutex_time;
/* save these three for when we actually do I/O */
auto first_file_index = args->file_idx;
auto first_file_cursor = args->fileblk_cursor;
......@@ -297,7 +325,6 @@ static void read_ult(void *_args)
file_idx = first_file_index;
fileblk_cursor = first_file_cursor;
// see write_ult for more discussion of when we stop processing
while (file_idx < args->file_starts.size() && file_xfer < local_bufsize) {
......@@ -305,7 +332,12 @@ static void read_ult(void *_args)
// 'local_bufsize' here instead of 'client_xfer' because we are
// filling the local memory buffer first and then doing the rma put
nbytes = MIN(args->file_sizes[file_idx]-fileblk_cursor, local_bufsize-buf_cursor);
io_time = ABT_get_wtime();
file_xfer += abt_io_pread(args->abt_id, args->fd, (char*)local_buffer+buf_cursor, nbytes, args->file_starts[file_idx]+fileblk_cursor);
io_time = ABT_get_wtime()-io_time;
total_io_time += io_time;
read_count++;
if (nbytes + fileblk_cursor >= args->file_sizes[file_idx]) {
file_idx++;
......@@ -321,9 +353,6 @@ static void read_ult(void *_args)
xfered += nbytes;
}
ABT_mutex_lock(args->mutex);
args->xfered += xfered;
ABT_mutex_unlock(args->mutex);
// the '<<' operator moves bytes from one bulk descriptor to the
// other, moving the smaller of the two.
......@@ -336,6 +365,7 @@ static void read_ult(void *_args)
// - select a subset on the client-side bulk descriptor before
// associating it with a connection.
bulk_time = ABT_get_wtime();
try {
client_xfer = args->client_bulk(client_cursor, args->client_bulk.size()-client_cursor).on(args->ep) << local;
} catch (const tl::margo_exception &err) {
......@@ -347,6 +377,16 @@ static void read_ult(void *_args)
} catch (...) {
std::cerr << "Bulk put problem. Ignoring. " << std::endl;
}
bulk_time = ABT_get_wtime() - bulk_time;
ABT_mutex_lock(args->mutex);
args->xfered += xfered;
args->stats.read_bulk_time += bulk_time;
args->stats.read_bulk_xfers++;
args->stats.server_read_time += total_io_time;
args->stats.server_read_calls += read_count;
args->stats.bytes_read += file_xfer;
ABT_mutex_unlock(args->mutex);
client_cursor += client_xfer;
......@@ -453,7 +493,20 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
ABT_eventual_wait(args.eventual, NULL);
ABT_eventual_free(&args.eventual);
local_stats.write_response = ABT_get_wtime();
req.respond(args.client_cursor);
local_stats.write_response = ABT_get_wtime() - local_stats.write_response;
local_stats += args.stats;
local_stats.write_rpc_calls++;
local_stats.write_rpc_time += ABT_get_wtime() - write_time;
{
std::lock_guard<tl::mutex> guard(stats_mutex);
stats += local_stats;
}
return 0;
}
......@@ -501,7 +554,20 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
}
ABT_eventual_wait(args.eventual, NULL);
ABT_eventual_free(&args.eventual);
local_stats.read_response = ABT_get_wtime();
req.respond(args.client_cursor);
local_stats.read_response = ABT_get_wtime() - local_stats.read_response;
local_stats += args.stats;
local_stats.read_rpc_calls++;
local_stats.read_rpc_time += ABT_get_wtime() - read_time;
{
std::lock_guard<tl::mutex> guard(stats_mutex);
stats += local_stats;
}
return 0;
}
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment