Commit c25ca482 authored by Rob Latham's avatar Rob Latham
Browse files

try non-blocking abt-io routines

parent e0d8c5f3
......@@ -17,6 +17,7 @@
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/margo_exception.hpp>
#include <list>
#include "bv-provider.h"
......@@ -213,14 +214,17 @@ static void write_ult(void *_args)
// - what if the client has a really long file descripton but for some reason only a small amount of memory?
// - what if the client has a really large amount of memory but a short file description?
// -- write returns the smaller of the two
std::list<abt_io_op_t *> ops;
std::list<ssize_t> rets;
io_time = ABT_get_wtime();
while (file_idx < args->file_starts.size() && file_xfer < local_bufsize) {
// we might be able to only write a partial block
rets.push_back(-1);
nbytes = MIN(args->file_sizes[file_idx]-fileblk_cursor, client_xfer-buf_cursor);
io_time = ABT_get_wtime();
file_xfer += abt_io_pwrite(args->abt_id, args->fd, (char*)local_buffer+buf_cursor, nbytes, args->file_starts[file_idx]+fileblk_cursor);
io_time = ABT_get_wtime() - io_time;
total_io_time += io_time;
abt_io_op_t * write_op = abt_io_pwrite_nb(args->abt_id, args->fd, (char*)local_buffer+buf_cursor, nbytes, args->file_starts[file_idx]+fileblk_cursor, &(rets.back()) );
ops.push_back(write_op);
write_count++;
if (nbytes + fileblk_cursor >= args->file_sizes[file_idx]) {
......@@ -237,6 +241,19 @@ static void write_ult(void *_args)
xfered += nbytes;
}
for (auto x : ops) {
abt_io_op_wait(x);
abt_io_op_free(x);
}
io_time = ABT_get_wtime() - io_time;
total_io_time += io_time;
for (auto x: rets)
file_xfer += x;
ops.clear();
rets.clear();
//fprintf(stderr, " SERVER: ABT-IO POOL: %ld items\n", abt_io_get_pool_size(args->abt_id));
client_cursor += client_xfer;
margo_bulk_pool_release(args->mr_pool, local_bulk);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment