Commit 372d96de authored by Kaiyuan Hou

merge master

parents 23be2f94 b7dc019b
......@@ -14,7 +14,8 @@ lib_LTLIBRARIES = lib/libmochio-client.la \
lib_libmochio_client_la_SOURCES = src/client/mochio-client.cc \
src/client/calc-request.cc
lib_libmochio_provider_la_SOURCES = src/provider/mochio-provider.cc
lib_libmochio_provider_la_SOURCES = src/provider/mochio-provider.cc \
src/provider/lustre-utils.c
include_HEADERS = include/mochio.h \
include/mochio-provider.h
......@@ -23,7 +24,11 @@ noinst_HEADERS = include/io_stats.h \
include/access.h \
include/common.h \
include/calc-request.h \
include/file_stats.h
include/file_stats.h \
include/lustre-utils.h
TESTS = tests/simple \
tests/null
SUBDIRS = tests
......@@ -36,4 +41,5 @@ SUBDIRS = tests
#tests_strided_LDADD = lib/libmochio-client.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/mochio.pc
pkgconfig_DATA = maint/mochio-provider.pc \
maint/mochio-client.pc
......@@ -68,5 +68,5 @@ AC_CHECK_HEADERS([lustre/lustreapi.h])
AM_EXTRA_RECURSIVE_TARGETS([tests])
AC_CONFIG_FILES([Makefile tests/Makefile maint/mochio.pc])
AC_CONFIG_FILES([Makefile tests/Makefile maint/mochio-client.pc maint/mochio-provider.pc])
AC_OUTPUT
# Running on Cray
Argonne's Theta system is a Cray XC40. It uses 'aprun' to launch jobs.
As long as you have requested enough nodes, you can launch as many 'aprun'
processes as you want. However, as a security precaution, those processes
cannot talk to each other without first setting up a "protection domain".
## Managing protection domains
Other projects have done a good job documenting protection domains. In
particular I found
https://github.com/ovis-hpc/ovis/wiki/Protection-Domain-Tags-(Cray-Specific)
helpful. Here are some areas where Theta differs from other sites.
One manages protection domains with utilities like `apstat` and `apmgr`. On
Theta those utilities are not available on the login nodes. They are only
available on the monitor nodes (the nodes from which your job scripts are run,
or where your interactive shell lands).
Protection domains persist until someone explicitly releases them. While
convenient in many cases, and necessary if one submission script starts a
server and another starts a client, this persistence does mean that protection
domains can end up hanging around longer than expected. You might get an error
like this:
apmgr pdomain create mochio-test failed: cannot allocate protection domain; reached limit of 10
There is no way around that except to contact the support desk and ask for help cleaning up old ones.
## Building MPICH
Like any other external library, the 'mochio' driver
(https://github.com/roblatham00/mpich/tree/mochio)
requires CPPFLAGS, LDFLAGS, and LIBS to be set. Furthermore, building on
the Cray requires a few extra steps, documented here:
https://wiki.mpich.org/mpich/index.php/Cray
# MOCHIO and MPICH
- Check out https://github.com/roblatham00/mpich/tree/mochio
- Set CPPFLAGS and LDFLAGS to point to the mochio installation.
- Add 'mochio' to the `--with-file-system` list: e.g. `--with-file-system=lustre+mochio`
- Start however many MOCHIO targets you want.
- Set the environment variable MOCHIO_STATEFILE to point to the server statefile.
- If you want extra information about mochio behavior, set the MOCHIO_SHOW_STATS
  environment variable (see the sketch after this list).
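
The mochio driver sits underneath MPI-IO, so application code keeps using the
normal MPI_File calls once the statefile variable is exported. Below is a
minimal sketch of such a program; the `mochio:` filename prefix (the usual
ROMIO way of forcing a particular file-system driver) and the file path are
assumptions for illustration, not something this commit defines.

```c++
// Minimal sketch: an MPI program run against the mochio-enabled MPICH build.
// Assumes MOCHIO_STATEFILE was exported before launching; the "mochio:"
// prefix and the file path below are stand-ins for your own setup.
#include <mpi.h>
#include <cstring>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    MPI_File fh;
    MPI_File_open(MPI_COMM_WORLD, "mochio:/projects/somewhere/testfile",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);

    char buf[1024];
    std::memset(buf, 'a' + (rank % 26), sizeof(buf));
    // each rank writes its own 1 KiB block; the mochio driver forwards the
    // I/O to whatever MOCHIO targets were started earlier
    MPI_File_write_at_all(fh, (MPI_Offset)rank * (MPI_Offset)sizeof(buf),
                          buf, (int)sizeof(buf), MPI_CHAR, MPI_STATUS_IGNORE);

    MPI_File_close(&fh);
    MPI_Finalize();
    return 0;
}
```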
......@@ -21,15 +21,15 @@ struct access {
std::vector<uint64_t> len;
void print() {
std::cout << "mem: ";
std::cout << " mem: ";
for (auto x: mem_vec)
std::cout << x.first << " " << x.second << " ";
std::cout << std::endl;
std::cout << "file offset: ";
std::cout << " file offset: ";
for (auto x: offset)
std::cout << x << " ";
std::cout << std::endl;
std::cout << "file length: ";
std::cout << " file length: ";
for (auto x: len)
std::cout << x << " ";
std::cout << std::endl;
......
......@@ -31,7 +31,7 @@ void compute_striping_info(int stripe_size, int stripe_count, int *server_count,
* returns: 0 on success, non-zero on failure
*
**/
int calc_requests(int iovec_count, const struct iovec *memvec,
int calc_requests(int mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
int file_count, const off_t *file_starts, const uint64_t *file_sizes,
int stripe_size, int targets_used,
std::vector<struct access> & my_reqs);
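
Since the prototype above replaces the iovec with parallel address/size
arrays, here is a rough sketch of the new calling convention; the buffer
sizes, file offsets, and striping values are made up for illustration, and it
leans on the project's own access.h and calc-request.h headers.

```c++
// Sketch of the new parallel-array calling convention (illustrative values).
#include <cstdint>
#include <cstdio>
#include <sys/types.h>
#include <vector>
#include "access.h"
#include "calc-request.h"

void example_split()
{
    char buf_a[4096], buf_b[8192];
    const char    *mem_addresses[] = { buf_a, buf_b };
    const uint64_t mem_sizes[]     = { sizeof(buf_a), sizeof(buf_b) };

    const off_t    file_starts[] = { 0, 1 << 20 };   // two file extents
    const uint64_t file_sizes[]  = { 4096, 8192 };

    int stripe_size  = 1 << 20;                      // illustrative striping
    int targets_used = 4;
    std::vector<struct access> my_reqs(targets_used); // one bin per target

    calc_requests(2, mem_addresses, mem_sizes,
                  2, file_starts, file_sizes,
                  stripe_size, targets_used, my_reqs);

    // each target's bin now holds matching memory pieces and file extents
    for (int t = 0; t < targets_used; t++)
        printf("target %d: %zu pieces\n", t, my_reqs[t].mem_vec.size());
}
```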
......@@ -2,7 +2,7 @@
class file_stats {
public:
file_stats() : blocksize(4096), stripe_size(4096), stripe_count(1) {};
file_stats() : stripe_size(4096), stripe_count(1), blocksize(4096) {};
template<typename A> void serialize(A &ar) {
ar & stripe_size;
......
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
int lustre_getstripe(const char * filename, int32_t *stripe_size, int32_t *stripe_count);
#ifdef __cplusplus
}
#endif
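
For reference, a caller might use this helper roughly as in the sketch below;
the path is made up, and on non-Lustre builds the function just reports the
compiled-in defaults.

```c++
// Sketch: query (or default) striping parameters for a file.
#include <cstdint>
#include <cstdio>
#include "lustre-utils.h"

void show_striping(const char *path /* e.g. "/lus/theta-fs0/somefile" */)
{
    int32_t stripe_size = 0, stripe_count = 0;
    if (lustre_getstripe(path, &stripe_size, &stripe_count) != 0)
        fprintf(stderr, "could not determine striping for %s\n", path);
    // on non-Lustre file systems the helper falls back to 4096 / 1
    printf("%s: stripe_size=%d stripe_count=%d\n",
           path, stripe_size, stripe_count);
}
```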
......@@ -3,7 +3,6 @@
#include <stdint.h>
#include <unistd.h>
#include <sys/uio.h>
#include <aio.h>
#include <mpi.h>
......@@ -27,25 +26,24 @@ mochio_client_t mochio_init(MPI_Comm comm, const char * ssg_statefile);
int mochio_finalize(mochio_client_t client);
/* stateless api: always pass in a file name? */
/* an iovec describes memory. less well suited for I/O
* - Better to have four arrays (memory offset, mem length, file offset, file length)
* - or use a 'struct iovec' for the memory parts? */
ssize_t mochio_write(mochio_client_t client,
const char *file,
int64_t iovcnt,
const struct iovec iov[],
int64_t file_count,
const int64_t mem_count,
const char *mem_addresses[],
const uint64_t mem_sizes[],
const int64_t file_count,
const off_t file_starts[],
uint64_t file_sizes[]);
const uint64_t file_sizes[]);
ssize_t mochio_read (mochio_client_t client,
const char *file,
int64_t iovcnt,
const struct iovec iov[],
int64_t file_count,
const int64_t mem_count,
const char *mem_addresses[],
const uint64_t mem_sizes[],
const int64_t file_count,
const off_t file_starts[],
uint64_t file_sizes[]);
const uint64_t file_sizes[]);
/*
......@@ -65,7 +63,12 @@ struct mochio_stats {
};
int mochio_stat(mochio_client_t client, const char *filename, struct mochio_stats *stats);
int mochio_statistics(mochio_client_t client);
/**
* if `show_server` is set, statistics will also include information from every remote target
* Use case: at the end of an MPI job, every process will want to show client statistics,
* but only one process will want to show the server information
*/
int mochio_statistics(mochio_client_t client, int show_server);
/* flush: request all cached data written to disk */
int mochio_flush(mochio_client_t client, const char *filename);
......
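
Since the memory arguments changed from a single iovec to separate
address/size arrays, a small end-to-end sketch of the updated client API may
help; the statefile path, file name, and transfer sizes are placeholders, and
error handling is kept minimal.

```c++
// Sketch of the updated client API (illustrative paths and sizes).
// Assumes MPI has already been initialized by the caller.
#include <cstdint>
#include <cstring>
#include <sys/types.h>
#include <mpi.h>
#include "mochio.h"

int write_one_block(int rank)
{
    mochio_client_t client = mochio_init(MPI_COMM_WORLD, "/path/to/ssg-statefile");
    if (client == NULL) return -1;

    char block[4096];
    std::memset(block, 'x', sizeof(block));

    // memory described by parallel address/size arrays instead of an iovec
    const char    *mem_addresses[] = { block };
    const uint64_t mem_sizes[]     = { sizeof(block) };

    // one file extent per rank, laid out contiguously
    const off_t    file_starts[] = { (off_t)rank * (off_t)sizeof(block) };
    const uint64_t file_sizes[]  = { sizeof(block) };

    ssize_t nbytes = mochio_write(client, "testfile",
                                  1, mem_addresses, mem_sizes,
                                  1, file_starts, file_sizes);
    (void)nbytes;
    mochio_flush(client, "testfile");

    // every process reports client-side statistics; only rank 0 also asks
    // the remote targets for theirs
    mochio_statistics(client, (rank == 0));

    return mochio_finalize(client);
}
```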
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: MOCHIO
Description: A mochi service for I/O: client-side library
URL: https://xgitlab.cels.anl.gov/sds/mochio
Version: 0.0.1
Requires: thallium abt-io margo
Libs: -L${libdir} -lmochio-client
Cflags: -I${includedir}
......@@ -4,8 +4,9 @@ libdir=@libdir@
includedir=@includedir@
Name: MOCHIO
Description: A mochi-service for I/O
Description: A mochi service for I/O: provider-side library
URL: https://xgitlab.cels.anl.gov/sds/mochio
Version: 0.0.1
Requires: thallium abt-io margo
Libs: -L${libdir} -lmochio-provider
Cflags: -I${includedir}
......@@ -11,7 +11,6 @@
#include "common.h"
#include "access.h"
#include "sys/uio.h"
#include "calc-request.h"
......@@ -87,7 +86,7 @@ int calc_aggregator(off_t off, uint64_t * len,
int stripe_size, int server_count)
{
int rank_index, rank;
int64_t avail_bytes;
uint64_t avail_bytes;
int avail_cb_nodes = server_count;
/* Produce the stripe-contiguous pattern for Lustre */
......@@ -126,7 +125,7 @@ int calc_aggregator(off_t off, uint64_t * len,
* returns: 0 on success, non-zero on failure
*
**/
int calc_requests(int iovec_count, const struct iovec *memvec,
int calc_requests(int mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
int file_count, const off_t *file_starts, const uint64_t *file_sizes,
int stripe_size, int targets_used,
std::vector<struct access> & my_reqs)
......@@ -136,12 +135,20 @@ int calc_requests(int iovec_count, const struct iovec *memvec,
// how much of a memory/file block is left to process
u_int64_t memblk_used=0, fileblk_used=0;
while (memblk < iovec_count && fileblk < file_count) {
while (memblk < mem_count && fileblk < file_count) {
uint64_t len;
int target;
char *addr;
off_t offset;
if (mem_sizes[memblk] == 0) {
memblk++;
continue;
}
if (file_sizes[fileblk] == 0) {
fileblk++;
continue;
}
// for each file block we might split it into smaller pieces based on
// - which target handles this offset and how much of the data goes there
// - a memory block that might be smaller still
......@@ -152,13 +159,17 @@ int calc_requests(int iovec_count, const struct iovec *memvec,
// may have to further reduce 'len' if the corresponding memory block
// is smaller
len = MIN(memvec[memblk].iov_len - memblk_used, len);
addr = (char *)memvec[memblk].iov_base + memblk_used;
len = MIN(mem_sizes[memblk] - memblk_used, len);
addr = (char *)mem_addresses[memblk] + memblk_used;
offset = file_starts[fileblk] + fileblk_used;
my_reqs[target].mem_vec.push_back(std::make_pair(addr, len));
my_reqs[target].offset.push_back(offset);
my_reqs[target].len.push_back(len);
#ifdef DUMP_REQUESTS
printf(" target: %d address: %p len %lld offset: %ld\n",
target, addr, len, offset);
#endif
/* a bunch of bookkeeping for the next time through: do we need to work
* on the next memory or file block? how much of the current block is left
......@@ -166,13 +177,14 @@ int calc_requests(int iovec_count, const struct iovec *memvec,
memblk_used += len;
fileblk_used += len;
if (memblk_used >= memvec[memblk].iov_len) {
if (memblk_used >= mem_sizes[memblk]) {
memblk++;
memblk_used = 0;
}
if ((int64_t)fileblk_used >= file_sizes[fileblk]) {
if (fileblk_used >= file_sizes[fileblk]) {
fileblk++;
fileblk_used = 0;
}
}
return 0;
}
......@@ -62,11 +62,13 @@ mochio_client_t mochio_init(MPI_Comm comm, const char * cfg_file)
struct mochio_client * client = (struct mochio_client *)calloc(1,sizeof(*client));
char *ssg_group_buf;
double init_time = ABT_get_wtime();
MPI_Comm dupcomm;
/* scalable read-and-broadcast of group information: only one process reads
* cfg from file system. These routines can all be called before ssg_init */
MPI_Comm_rank(comm, &rank);
MPI_Comm_dup(comm, &dupcomm);
MPI_Comm_rank(dupcomm, &rank);
uint64_t ssg_serialize_size;
ssg_group_buf = (char *) malloc(1024); // how big do these get?
if (rank == 0) {
......@@ -74,8 +76,9 @@ mochio_client_t mochio_init(MPI_Comm comm, const char * cfg_file)
assert (ret == SSG_SUCCESS);
ssg_group_id_serialize(client->gid, &ssg_group_buf, &ssg_serialize_size);
}
MPI_Bcast(&ssg_serialize_size, 1, MPI_UINT64_T, 0, comm);
MPI_Bcast(ssg_group_buf, ssg_serialize_size, MPI_CHAR, 0, comm);
MPI_Bcast(&ssg_serialize_size, 1, MPI_UINT64_T, 0, dupcomm);
MPI_Bcast(ssg_group_buf, ssg_serialize_size, MPI_CHAR, 0, dupcomm);
MPI_Comm_free(&dupcomm);
ssg_group_id_deserialize(ssg_group_buf, ssg_serialize_size, &(client->gid));
addr_str = ssg_group_id_get_addr_str(client->gid);
if (set_proto_from_addr(client, addr_str) != 0) return NULL;
......@@ -103,16 +106,16 @@ mochio_client_t mochio_init(MPI_Comm comm, const char * cfg_file)
client->flush_op = client->engine->define("flush");
client->statistics_op = client->engine->define("statistics");
struct mochio_stats stats;
// TODO: might want to be able to set distribution on a per-file basis
mochio_stat(client, "/dev/null", &stats);
client->blocksize = stats.blocksize;
// fake some lustre information for now until we add that to the RPC
client->stripe_size=4092;
client->stripe_count=54;
/* used to think the server would know something about how it wanted to
* distribute data, but now that's probably best handled on a per-file
* basis. Pick some reasonable defaults, but don't talk to server. */
client->blocksize = 4096;
client->stripe_size= 4096;
client->stripe_count=1;
free(addr_str);
free(ssg_group_buf);
client->statistics.client_init_time = ABT_get_wtime() - init_time;
return client;
......@@ -147,14 +150,15 @@ int mochio_finalize(mochio_client_t client)
// - are the file lists monotonically non-decreasing? Any benefit if we relax that requirement?
static size_t mochio_io(mochio_client_t client, const char *filename, io_kind op,
int64_t iovcnt, const struct iovec iovec_iov[],
int64_t file_count, const off_t file_starts[], uint64_t file_sizes[])
const int64_t mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
const int64_t file_count, const off_t file_starts[], const uint64_t file_sizes[])
{
std::vector<struct access> my_reqs(client->targets.size());
size_t bytes_moved = 0;
/* need to move this out of the I/O path. Maybe 'mochio_stat' can cache
* these values on the client struct? */
/* How expensive is this? do we need to move this out of the I/O path?
* Maybe 'mochio_stat' can cache these values on the client struct? */
client->targets_used = client->targets.size();
compute_striping_info(client->stripe_size, client->stripe_count, &client->targets_used, 1);
/* two steps:
......@@ -163,7 +167,8 @@ static size_t mochio_io(mochio_client_t client, const char *filename, io_kind op
* or file block across multiple targets.
* - second, for each target construct a bulk description for memory and
* send the file offset/length pairs in its bin. */
calc_requests(iovcnt, iovec_iov, file_count, file_starts, file_sizes, client->stripe_size, client->targets_used, my_reqs);
calc_requests(mem_count, mem_addresses, mem_sizes,
file_count, file_starts, file_sizes, client->stripe_size, client->targets_used, my_reqs);
tl::bulk myBulk;
auto mode = tl::bulk_mode::read_only;
......@@ -175,7 +180,7 @@ static size_t mochio_io(mochio_client_t client, const char *filename, io_kind op
}
std::vector<tl::async_response> responses;
for (int i=0; i< client->targets.size(); i++) {
for (unsigned int i=0; i< client->targets.size(); i++) {
if (my_reqs[i].mem_vec.size() == 0) continue; // no work for this target
myBulk = client->engine->expose(my_reqs[i].mem_vec, mode);
......@@ -191,14 +196,16 @@ static size_t mochio_io(mochio_client_t client, const char *filename, io_kind op
return bytes_moved;
}
ssize_t mochio_write(mochio_client_t client, const char *filename, int64_t iovcnt, const struct iovec iov[],
int64_t file_count, const off_t file_starts[], uint64_t file_sizes[])
ssize_t mochio_write(mochio_client_t client, const char *filename,
const int64_t mem_count, const char * mem_addresses[], const uint64_t mem_sizes[],
const int64_t file_count, const off_t file_starts[], const uint64_t file_sizes[])
{
ssize_t ret;
double write_time = ABT_get_wtime();
client->statistics.client_write_calls++;
ret = mochio_io(client, filename, MOCHIO_WRITE, iovcnt, iov, file_count, file_starts, file_sizes);
ret = mochio_io(client, filename, MOCHIO_WRITE, mem_count, mem_addresses, mem_sizes,
file_count, file_starts, file_sizes);
write_time = ABT_get_wtime() - write_time;
client->statistics.client_write_time += write_time;
......@@ -206,14 +213,16 @@ ssize_t mochio_write(mochio_client_t client, const char *filename, int64_t iovcn
return ret;
}
ssize_t mochio_read(mochio_client_t client, const char *filename, int64_t iovcnt, const struct iovec iov[],
int64_t file_count, const off_t file_starts[], uint64_t file_sizes[])
ssize_t mochio_read(mochio_client_t client, const char *filename,
const int64_t mem_count, const char *mem_addresses[], const uint64_t mem_sizes[],
const int64_t file_count, const off_t file_starts[], const uint64_t file_sizes[])
{
ssize_t ret;
double read_time = ABT_get_wtime();
client->statistics.client_read_calls++;
ret = mochio_io(client, filename, MOCHIO_READ, iovcnt, iov, file_count, file_starts, file_sizes);
ret = mochio_io(client, filename, MOCHIO_READ, mem_count, mem_addresses, mem_sizes,
file_count, file_starts, file_sizes);
read_time = ABT_get_wtime() - read_time;
client->statistics.client_read_time += read_time;
......@@ -224,20 +233,27 @@ ssize_t mochio_read(mochio_client_t client, const char *filename, int64_t iovcnt
int mochio_stat(mochio_client_t client, const char *filename, struct mochio_stats *stats)
{
struct file_stats response = client->stat_op.on(client->targets[0])(std::string(filename));
stats->blocksize = client->stat_op.on(client->targets[0])(std::string(filename) );
stats->blocksize = response.blocksize;
stats->stripe_size = response.stripe_size;
stats->stripe_count = response.stripe_count;
/* also update client information. This should probably be a 'map' keyed on file name */
client->blocksize = response.blocksize;
client->stripe_size = response.stripe_size;
client->stripe_count = response.stripe_count;
return(1);
}
int mochio_statistics(mochio_client_t client)
int mochio_statistics(mochio_client_t client, int show_server)
{
int ret =0;
for (auto target : client->targets) {
auto s = client->statistics_op.on(target)();
std::cout << "SERVER: ";
io_stats(s).print_server();
if (show_server) {
for (auto target : client->targets) {
auto s = client->statistics_op.on(target)();
std::cout << "SERVER: ";
io_stats(s).print_server();
}
}
std::cout << "CLIENT: ";
client->statistics.print_client();
......
#include "mochio-config.h"
#include <stdlib.h>
#include <errno.h>
#include <libgen.h>
#include "common.h"
#ifdef HAVE_LUSTRE_LUSTREAPI_H
#include <lustre/lustreapi.h>
#endif
#ifdef HAVE_LIBLUSTREAPI
/* there are two(!!) kinds of data the lustre library might return, so we need
* something big enough to hold either one */
static size_t get_lumsize()
{
int v1, v3;
v1 = sizeof(struct lov_user_md_v1) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data_v1);
v3 = sizeof(struct lov_user_md_v3) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data_v1);
return MAX(v1, v3);
}
#endif
int lustre_getstripe(const char * filename, int32_t *stripe_size, int32_t *stripe_count)
{
int ret = 0;
/* guess some reasonable defaults for non-lustre systems */
*stripe_size = 4096;
*stripe_count = 1;
#ifdef HAVE_LIBLUSTREAPI
struct lov_user_md *lov;
lov = alloca(get_lumsize());
/* - maybe the file wasn't there?
* - check parent directory
* - maybe the file is not lustre? */
ret = llapi_file_get_stripe(filename, lov);
switch(errno) {
char *dup, *parent;
case ENOENT:
dup = strdup(filename);
parent = dirname(dup);
ret = llapi_file_get_stripe(parent, lov);
/* if still enoent, we give up */
free(dup);
if (errno == ENOENT)
goto fn_exit;
break;
case ENOTTY:
ret = 0;
goto fn_exit;
default:
if (ret != 0) perror("Unable to get Lustre stripe info");
};
*stripe_size = lov->lmm_stripe_size;
*stripe_count = lov->lmm_stripe_count;
#endif
fn_exit:
return ret;
}
......@@ -2,6 +2,9 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <libgen.h>
#include <string.h>
#include <margo.h>
#include <thallium.hpp>
......@@ -12,7 +15,6 @@
#include <thallium/serialization/stl/string.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include "mochio-config.h"
#include "mochio-provider.h"
......@@ -20,33 +22,12 @@
#include "io_stats.h"
#include "file_stats.h"
#include "lustre-utils.h"
namespace tl = thallium;
#define BUFSIZE 1024
#ifdef HAVE_LUSTRE_LUSTREAPI_H
#include <lustre/lustreapi.h>
static inline int maxint(int a, int b)
{
return a > b ? a : b;
}
static void *alloc_lum()
{
int v1, v3;
v1 = sizeof(struct lov_user_md_v1) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data_v1);
v3 = sizeof(struct lov_user_md_v3) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data_v1);
return malloc(maxint(v1, v3));
}
#endif
struct mochio_svc_provider : public tl::provider<mochio_svc_provider>
{
......@@ -83,9 +64,11 @@ struct mochio_svc_provider : public tl::provider<mochio_svc_provider>
std::vector<off_t> &file_starts, std::vector<uint64_t> &file_sizes)
{
double write_time = ABT_get_wtime();
/* What else can we do with an empty memory description or file
description other than return immediately? */
if (client_bulk.size() == 0 ||
if (client_bulk.is_null() ||
client_bulk.size() == 0 ||
file_starts.size() == 0) {
req.respond(0);
write_time = ABT_get_wtime() - write_time;
......@@ -108,7 +91,7 @@ struct mochio_svc_provider : public tl::provider<mochio_svc_provider>
// than whatever the client has sent our way. We will repeatedly bulk
// transfer into this region. We'll need to keep track of how many file
// offset/length pairs we have processed and how far into them we are.
// Code is going to start looking a lot like mochio...
// Code is going to start looking a lot like ROMIO...
//
// TODO: configurable how many segments at a time we can process
// ?? is there a way to get all of them?
......@@ -135,7 +118,13 @@ struct mochio_svc_provider : public tl::provider<mochio_svc_provider>
// the '>>' operator moves bytes from one bulk descriptor to the
// other, moving the smaller of the two
file_xfer = 0;
try {
client_xfer = client_bulk(client_cursor, client_bulk.size()-client_cursor).on(ep) >> local;