Commit b3fc8fc6 authored by Rob Latham

tighten up critical section locks

In Thallium a class's member data is shared across threads, but variables
declared inside methods are thread-local. Moving a bunch of that state into
method-local variables lets us hold the mutex for a lot less time.
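
Concretely, each RPC handler now accumulates its counters into an io_stats
instance on its own stack and folds them into the provider-wide stats with a
single locked operator+= at the end, instead of taking stats_mutex around every
increment. A minimal sketch of the pattern (simplified stats struct, with
std::mutex and std::chrono standing in for tl::mutex and ABT_get_wtime; this is
illustrative, not the actual benvolio code):

```cpp
#include <chrono>
#include <mutex>

// Trimmed-down stand-in for benvolio's io_stats.
struct stats_t {
    int    write_calls = 0;
    double write_time  = 0.0;
    double mutex_time  = 0.0;
    stats_t& operator+=(const stats_t& rhs) {
        write_calls += rhs.write_calls;
        write_time  += rhs.write_time;
        mutex_time  += rhs.mutex_time;
        return *this;
    }
};

static double wtime() {
    using clock = std::chrono::steady_clock;
    return std::chrono::duration<double>(clock::now().time_since_epoch()).count();
}

struct provider_sketch {
    stats_t    stats;        // shared by every RPC handler thread
    std::mutex stats_mutex;  // protects 'stats'

    void handle_write_rpc() {
        stats_t local;                  // lives on this handler's stack: no locking needed
        double start = wtime();

        // ... perform the bulk transfer and pwrite calls, updating 'local' as we go ...
        local.write_calls++;
        local.write_time += wtime() - start;

        // fold the per-request counters into the shared stats with one short
        // critical section, and record how long we waited for the lock
        local.mutex_time = wtime();
        {
            std::lock_guard<std::mutex> guard(stats_mutex);
            local.mutex_time = wtime() - local.mutex_time;
            stats += local;
        }
    }
};
```

The only contended section left is the final `stats += local`, which touches a
few dozen counters, so the lock is held for a small, near-constant amount of
time regardless of how much I/O the request performed.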
parent 74ed8941
......@@ -73,6 +73,40 @@ class io_stats {
double client_read_time; // time client spent in "bv_read
double client_init_time; // how long does it take to set everything up
io_stats & operator += (const io_stats &rhs) {
write_rpc_calls += rhs.write_rpc_calls;
write_rpc_time += rhs.write_rpc_time;
read_rpc_calls += rhs.read_rpc_calls;
read_rpc_time += rhs.read_rpc_time;
getfd += rhs.getfd;
server_write_calls += rhs.server_write_calls;
server_write_time += rhs.server_write_time;
bytes_written += rhs.bytes_written;
server_read_calls += rhs.server_read_calls;
server_read_time += rhs.server_read_time;
bytes_read += rhs.bytes_read;
write_bulk_time += rhs.write_bulk_time;
write_bulk_xfers += rhs.write_bulk_xfers;
read_bulk_time += rhs.read_bulk_time;
read_bulk_xfers += rhs.read_bulk_xfers;
write_expose += rhs.write_expose;
read_expose += rhs.read_expose;
write_response += rhs.write_response;
read_response += rhs.read_response;
mutex_time += rhs.mutex_time;
client_write_calls += rhs.client_write_calls;
client_write_time += rhs.client_write_time;
client_read_calls += rhs.client_read_calls;
client_read_time += rhs.client_read_time;
client_init_time += rhs.client_init_time;
return *this;
}
void print_server() {
std::cout << "write_rpc_calls " << write_rpc_calls
<< " write_rpc_time " << write_rpc_time
......
......@@ -38,32 +38,30 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
ssg_group_id_t gid;
tl::pool pool;
abt_io_instance_id abt_id;
ssize_t blocksize=1024*8; // todo: some kind of general distribution function perhaps
std::map<std::string, file_info> filetable; // filename to file id mapping
// probably needs to be larger and registered with mercury somehow
char *buffer; // intermediate buffer for read/write operations
unsigned int bufsize;
struct io_stats stats;
static const int default_mode = 0644;
tl::mutex op_mutex;
tl::mutex stats_mutex;
tl::mutex size_mutex;
tl::mutex fd_mutex;
// server will maintain a cache of open files
// std::map not great for LRU
// if we see a request for a file with a different 'flags' we will close and reopen
int getfd(const std::string &file, int flags, int mode=default_mode) {
int fd=-1;
std::lock_guard<tl::mutex> guard(fd_mutex);
auto entry = filetable.find(file);
if (entry == filetable.end() ) {
// no 'file' in table
fd = abt_io_open(abt_id, file.c_str(), flags, mode);
if (fd > 0) filetable[file] = {fd, flags};
} else {
// found the file but we will close and reopen if different flags requested
if (entry->second.flags == flags)
// found the file but we will close and reopen if flags are different
if ( entry->second.flags == flags) {
fd = entry->second.fd;
else {
} else {
abt_io_close(abt_id, entry->second.fd);
fd = abt_io_open(abt_id, file.c_str(), flags, mode);
if (fd > 0) filetable[file] = {fd, flags};
......@@ -78,6 +76,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
ssize_t process_write(const tl::request& req, tl::bulk &client_bulk, const std::string &file,
std::vector<off_t> &file_starts, std::vector<uint64_t> &file_sizes)
{
struct io_stats local_stats;
double write_time = ABT_get_wtime();
/* What else can we do with an empty memory description or file
......@@ -93,9 +92,6 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
return 0;
}
double wr_mutex_time = ABT_get_wtime();
std::lock_guard<tl::mutex> guard(op_mutex);
wr_mutex_time = ABT_get_wtime() - wr_mutex_time;
/* cannot open read-only:
- might want to data-sieve the I/O requests
- might later read file */
......@@ -103,8 +99,9 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
double getfd_time = ABT_get_wtime();
int fd = getfd(file, flags);
getfd_time = ABT_get_wtime() - getfd_time;
stats.getfd += getfd_time;
local_stats.getfd += getfd_time;
char *buffer = new char[bufsize];
// we have a scratch buffer we can work with, which might be smaller
// than whatever the client has sent our way. We will repeatedly bulk
// transfer into this region. We'll need to keep track of how many file
......@@ -121,7 +118,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
tl::endpoint ep = req.get_endpoint();
tl::bulk local = engine->expose(segments, tl::bulk_mode::read_write);
expose_time = ABT_get_wtime() - expose_time;
stats.write_expose += expose_time;
local_stats.write_expose += expose_time;
// when are we done?
// - what if the client has a really long file descripton but for some reason only a small amount of memory?
......@@ -153,8 +150,8 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
std::cerr <<" Bulk get error. Ignoring " << std::endl;
}
bulk_time = ABT_get_wtime() - bulk_time;
stats.write_bulk_xfers++;
stats.write_bulk_time += bulk_time;
local_stats.write_bulk_xfers++;
local_stats.write_bulk_time += bulk_time;
// operator overloading might make this a little hard to parse at first.
// - >> and << do a bulk transfer between bulk endpoints, transfering
......@@ -163,17 +160,15 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
// if one wants a subset
// - select a subset on the client-side bulk descriptor before
// associating it with a connection.
double pwrite_time = ABT_get_wtime();
while (file_idx < file_starts.size() && file_xfer < client_xfer) {
// we might be able to only write a partial block
nbytes = MIN(file_sizes[file_idx]-fileblk_cursor, client_xfer-buf_cursor);
file_xfer += abt_io_pwrite(abt_id, fd, buffer+buf_cursor, nbytes, file_starts[file_idx]+fileblk_cursor);
{
std::lock_guard<tl::mutex> guard(stats_mutex);
stats.server_write_calls++;
stats.bytes_written += nbytes;
}
file_xfer += abt_io_pwrite(abt_id, fd, buffer+buf_cursor, nbytes, file_starts[file_idx]+fileblk_cursor);
local_stats.server_write_calls++;
local_stats.bytes_written += nbytes;
if (nbytes + fileblk_cursor >= file_sizes[file_idx]) {
file_idx++;
......@@ -192,19 +187,26 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
client_cursor += client_xfer;
pwrite_time = ABT_get_wtime() - pwrite_time;
stats.server_write_time += pwrite_time;
local_stats.server_write_time += pwrite_time;
}
double response_time = ABT_get_wtime();
req.respond(xfered);
response_time = ABT_get_wtime() - response_time;
stats.write_response += response_time;
{
std::lock_guard<tl::mutex> guard(stats_mutex);
stats.write_rpc_calls++;
stats.write_rpc_time += ABT_get_wtime() - write_time;
stats.mutex_time += wr_mutex_time;
}
local_stats.write_response = response_time;
local_stats.write_rpc_calls++;
local_stats.write_rpc_time += ABT_get_wtime() - write_time;
local_stats.mutex_time = ABT_get_wtime();
{
/* defer updating provider-wide statistics to the end so we
* don't have to lock the stats in the middle of i/o */
std::lock_guard<tl::mutex> guard(stats_mutex);
local_stats.mutex_time = ABT_get_wtime()-local_stats.mutex_time;
stats += local_stats;
}
delete[] buffer;
return 0;
}
......@@ -216,6 +218,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
ssize_t process_read(const tl::request &req, tl::bulk &client_bulk, const std::string &file,
std::vector<off_t> &file_starts, std::vector<uint64_t> &file_sizes)
{
struct io_stats local_stats;
double read_time = ABT_get_wtime();
if (client_bulk.size() == 0 ||
......@@ -227,9 +230,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
return 0;
}
double rd_mutex_time = ABT_get_wtime();
std::lock_guard<tl::mutex> guard(op_mutex);
rd_mutex_time = ABT_get_wtime() - rd_mutex_time;
char *buffer = new char[bufsize];
/* like with write, open for both read and write in case file opened
* first for read then written to. can omit O_CREAT here because
......@@ -240,7 +241,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
double getfd_time = ABT_get_wtime();
int fd = getfd(file, flags);
getfd_time = ABT_get_wtime() - getfd_time;
stats.getfd += getfd_time;
local_stats.getfd += getfd_time;
tl::endpoint ep = req.get_endpoint();
/* Simliar algorithm as write, but data movement goes in the opposite direction */
......@@ -256,11 +257,8 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
while (file_idx < file_starts.size() && file_xfer < bufsize) {
nbytes = MIN(file_sizes[file_idx]-fileblk_cursor, bufsize-buf_cursor);
file_xfer += abt_io_pread(abt_id, fd, buffer+buf_cursor, nbytes, file_starts[file_idx]+fileblk_cursor);
{
std::lock_guard<tl::mutex> guard(stats_mutex);
stats.server_read_calls++;
stats.bytes_read += nbytes;
}
local_stats.server_read_calls++;
local_stats.bytes_read += nbytes;
if (nbytes + fileblk_cursor >= file_sizes[file_idx]) {
file_idx++;
......@@ -277,7 +275,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
xfered += nbytes;
}
pread_time = ABT_get_wtime() - pread_time;
stats.server_read_time += pread_time;
local_stats.server_read_time += pread_time;
double expose_time = ABT_get_wtime();
std::vector<std::pair<void *, std::size_t>>segments(1);
......@@ -285,7 +283,7 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
segments[0].second = file_xfer;
tl::bulk local = engine->expose(segments, tl::bulk_mode::read_write);
expose_time = ABT_get_wtime() - expose_time;
stats.read_expose += expose_time;
local_stats.read_expose += expose_time;
double bulk_time = ABT_get_wtime();
if (client_bulk.size()-client_cursor != 0) {
......@@ -303,20 +301,24 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
client_cursor += client_xfer;
}
bulk_time = ABT_get_wtime() - bulk_time;
stats.read_bulk_xfers++;
stats.read_bulk_time += bulk_time;
local_stats.read_bulk_xfers++;
local_stats.read_bulk_time += bulk_time;
}
double response_time = ABT_get_wtime();
req.respond(xfered);
response_time = ABT_get_wtime() - response_time;
stats.read_response += response_time;
{
std::lock_guard<tl::mutex> guard(stats_mutex);
stats.read_rpc_calls++;
stats.read_rpc_time += ABT_get_wtime() - read_time;
stats.mutex_time += rd_mutex_time;
local_stats.read_response += response_time;
local_stats.read_rpc_calls++;
local_stats.read_rpc_time += ABT_get_wtime() - read_time;
local_stats.mutex_time = ABT_get_wtime();
{
std::lock_guard<tl::mutex> guard(stats_mutex);
local_stats.mutex_time = ABT_get_wtime()-local_stats.mutex_time;
stats+=local_stats;
}
delete[] buffer;
return 0;
}
......@@ -391,7 +393,6 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
ssg_group_id_t gid, uint16_t provider_id, int bufsize, tl::pool &pool)
: tl::provider<bv_svc_provider>(*e, provider_id), engine(e), gid(gid), pool(pool), abt_id(abtio) {
buffer = new char[bufsize];
this->bufsize = bufsize;
define("write", &bv_svc_provider::process_write, pool);
define("read", &bv_svc_provider::process_read, pool);
......@@ -417,7 +418,6 @@ struct bv_svc_provider : public tl::provider<bv_svc_provider>
~bv_svc_provider() {
wait_for_finalize();
delete[] buffer;
}
};
......
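
As a side note, the reworked open-file cache in getfd() is easier to follow
outside the interleaved diff view. Here is a standalone sketch of the same
logic, with plain POSIX open/close standing in for abt_io_open/abt_io_close and
std::mutex for tl::mutex (illustrative only, not the verbatim source):

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <map>
#include <mutex>
#include <string>

struct file_info { int fd; int flags; };

struct fd_cache_sketch {
    std::map<std::string, file_info> filetable;  // filename -> cached fd and open flags
    std::mutex fd_mutex;
    static constexpr int default_mode = 0644;

    int getfd(const std::string& file, int flags, int mode = default_mode) {
        std::lock_guard<std::mutex> guard(fd_mutex);
        auto entry = filetable.find(file);
        if (entry == filetable.end()) {
            // no 'file' in table: open it and cache the descriptor
            int fd = open(file.c_str(), flags, mode);
            if (fd > 0) filetable[file] = {fd, flags};
            return fd;
        }
        // found the file; reuse the fd only if the flags match
        if (entry->second.flags == flags)
            return entry->second.fd;
        // otherwise close and reopen with the requested flags
        close(entry->second.fd);
        int fd = open(file.c_str(), flags, mode);
        if (fd > 0) filetable[file] = {fd, flags};
        return fd;
    }
};
```

The cache entry is simply overwritten when a file is reopened with different
flags, which matches the behaviour in the hunk above.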