Commit d5a75058 authored by Huihuo Zheng's avatar Huihuo Zheng
Browse files

thread

parent ac306392
......@@ -137,7 +137,7 @@ void create_mmap(char *path, H5Dio_mmap &f) {
::pwrite(fh, &a, 1, ss);
fsync(fh);
close(fh);
f.fd = open(f.filename, O_RDWR);
f.fd = open(f.filename, O_RDWR | O_DIRECT);
f.buf = mmap(NULL, ss, PROT_READ | PROT_WRITE, MAP_SHARED, f.fd, 0);
msync(f.buf, ss, MS_SYNC);
} else {
......@@ -282,10 +282,10 @@ herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
int dest = b[i];
int src = dest/H5DMM.ns_loc;
MPI_Aint offset = (dest%H5DMM.ns_loc)*H5DMM.dim;
if (src==H5DMM.rank)
memcpy(&p_mem[i*H5DMM.dim*H5DMM.disp], &p_mmap[offset*H5DMM.disp],
H5DMM.dim*H5DMM.disp);
else
// if (src==H5DMM.rank)
//memcpy(&p_mem[i*H5DMM.dim*H5DMM.disp], &p_mmap[offset*H5DMM.disp],
//H5DMM.dim*H5DMM.disp);
//else
MPI_Get(&p_mem[i*H5DMM.dim*H5DMM.disp], H5DMM.dim,
H5DMM.buf_type, src,
offset, H5DMM.dim,
......@@ -295,10 +295,10 @@ herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
int dest = b[0];
int src = dest/H5DMM.ns_loc;
MPI_Aint offset = (dest%H5DMM.ns_loc)*H5DMM.dim;
if (src==H5DMM.rank)
memcpy(p_mem, &p_mmap[offset*H5DMM.disp], batch_size*H5DMM.dim*H5DMM.disp);
else
MPI_Get(p_mem, H5DMM.dim*batch_size,
//if (src==H5DMM.rank)
//memcpy(p_mem, &p_mmap[offset*H5DMM.disp], batch_size*H5DMM.dim*H5DMM.disp);
//else
MPI_Get(p_mem, H5DMM.dim*batch_size,
H5DMM.buf_type, src,
offset, H5DMM.dim*batch_size,
H5DMM.buf_type, H5DMM.win);
......@@ -311,6 +311,26 @@ herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
/*
Close the dataset, terminate the Pthread
*/
herr_t H5DMMF_clear_cache() {
  // Wait for any outstanding background (pthread) I/O, then evict the
  // per-rank cache file from the OS page cache by overwriting it with a
  // large stream of data, and finally re-map the cache read-only.
  H5PthreadWait();
  // Round the mapping size up to a whole number of pages.
  hsize_t ss = (H5DMM.size/PAGESIZE+1)*PAGESIZE;
  char fname[255];
  const char *cache_path = getenv("SSD_CACHE_PATH");
  // getenv() may return NULL; the original passed it straight to strcmp(),
  // which is undefined behavior when SSD_CACHE_PATH is unset.
  if (cache_path != NULL && strcmp("MEMORY", cache_path) != 0) {
    // Per-rank cache file name: <path><rank>.dat
    strcpy(fname, cache_path);
    strcat(fname, to_string(H5DMM.rank).c_str());
    strcat(fname, ".dat");
    int fd = open(fname, O_RDWR);
    if (fd < 0) return -1;  // cache file missing or unreadable
    // Stream ~80 GiB of zeroed blocks through the file to push its pages
    // out of memory. Value-initialize the buffer: the original wrote
    // uninitialized heap memory (indeterminate values).
    // NOTE(review): 1024576 doubles x 10000 iterations looks chosen to
    // exceed RAM -- confirm against the benchmark's default dim.
    double *data = new double[1024576]();
    for (int i=0; i<10000; i++)
      pwrite(fd, data, sizeof(double)*1024576, i*sizeof(double)*1024576);
    delete [] data;  // was leaked in the original
    fsync(fd);
    close(fd);       // fd was leaked in the original
    // NOTE(review): the remap intentionally(?) uses H5DMM.fd, not the fd
    // opened above -- confirm this is the descriptor of the same file.
    H5DMM.buf = mmap(NULL, ss, PROT_READ, MAP_SHARED, H5DMM.fd, 0);
    msync(H5DMM.buf, ss, MS_SYNC);
  }
  return 0;
}
herr_t H5DMMF_remap() {
H5PthreadWait();
hsize_t ss = (H5DMM.size/PAGESIZE+1)*PAGESIZE;
......@@ -324,7 +344,6 @@ herr_t H5DMMF_remap() {
#endif
H5DMM.buf = mmap(NULL, ss, PROT_READ, MAP_SHARED, H5DMM.fd, 0);
msync(H5DMM.buf, ss, MS_SYNC);
// MPI_Win_create(H5DMM.buf, ss, H5DMM.disp, MPI_INFO_NULL, MPI_COMM_WORLD, &H5DMM.win);
}
return 0;
}
......
......@@ -30,7 +30,7 @@ typedef struct _H5Dio_mmap {
void *buf; // buf mapped to the file on the local storage
void *tmp_buf; // memory copy of the data
MPI_Datatype buf_type; // we construct a MPI datatype to represent hid_t with the same size
MPI_Win win;
MPI_Win win;
MPI_Comm comm;
char filename[255]; // filename of the file on the local storage
vector<int> batch_list;
......
......@@ -11,52 +11,81 @@
#include "mpi.h"
using namespace std;
int main(int argc, char **argv) {
  // Parallel pwrite/pread bandwidth benchmark: each MPI rank optionally
  // writes, then reads back, niter files of nbatch x dim doubles under
  // `path`, reporting aggregate rates from rank 0.
  int dim = 1024576;   // doubles per batch
  int nbatch = 1024;   // batches per file
  int niter = 10;      // files / timing iterations
  char path[255] = "./";
  bool write = false;  // files are only written when --write is given
  for (int i=1; i<argc; i++) {
    if (strcmp(argv[i], "--dim")==0) {
      dim = int(atof(argv[i+1])); i=i+1;
    } else if (strcmp(argv[i], "--nbatch")==0) {
      nbatch = int(atof(argv[i+1])); i=i+1;
    } else if (strcmp(argv[i], "--niter")==0) {
      niter = int(atof(argv[i+1])); i=i+1;
    } else if (strcmp(argv[i], "--path")==0) {
      strcpy(path, argv[i+1]); i=i+1;
    } else if (strcmp(argv[i], "--write")==0) {
      write = true;
    }
  }
  // Heap-allocate the batch buffer. The original `double dat[dim]` was a
  // runtime-sized stack array (8 MiB at the default dim, near typical stack
  // limits) declared BEFORE --dim was parsed, so a larger --dim overran it.
  // Value-initialized so we never write indeterminate bytes.
  double *dat = new double[dim]();
  MPI_Init(&argc, &argv);
  int rank, nproc;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);
  Timing tt(rank==0);
  if (rank==0) {
    cout << "dim: " << dim << endl;
    cout << "nbatch: " << nbatch << endl;
    cout << "size of file [MiB]: " << dim*nbatch*sizeof(double)/1024/1024 << endl;
    cout << "niter: " << niter << endl;
    cout << "path: " << path << endl;
    cout << "nproc: " << nproc << endl;
  }
  if (write) {
    for (int it=0; it<niter; it++) {
      dat[0] = it;  // tag each file with its iteration index
      // Per-rank, per-iteration file name: <path>/cache.dat<rank>-<it>
      char fname[255];
      strcpy(fname, path);
      strcat(fname, "/cache.dat");
      strcat(fname, to_string(rank).c_str());
      strcat(fname, "-");
      strcat(fname, to_string(it).c_str());
      tt.start_clock("write");
      MPI_Barrier(MPI_COMM_WORLD);
      int fd = open(fname, O_RDWR | O_TRUNC | O_CREAT, S_IRUSR | S_IWUSR);
      for (int i=0; i<nbatch; i++) {
        pwrite(fd, dat, sizeof(double)*dim, sizeof(double)*i*dim);
      }
      MPI_Barrier(MPI_COMM_WORLD);
      tt.stop_clock("write");
      // NOTE(review): fsync runs after the clock stops, so the reported
      // rate measures page-cache writes, not storage bandwidth -- confirm
      // this is intended.
      fsync(fd);
      if (rank==0) cout << "Write rate: " << sizeof(double)*dim*nbatch/tt["write"].t_iter[it]/1024/1024*nproc << endl;
      close(fd);
    }
  }
  for (int it=0; it<niter; it++) {
    char fname[255];
    strcpy(fname, path);
    strcat(fname, "/cache.dat");
    strcat(fname, to_string(rank).c_str());
    strcat(fname, "-");
    strcat(fname, to_string(it).c_str());
    MPI_Barrier(MPI_COMM_WORLD);
    int fd = open(fname, O_RDONLY);
    tt.start_clock("read");
    for (int i=0; i<nbatch; i++) {
      pread(fd, dat, sizeof(double)*dim, sizeof(double)*i*dim);
    }
    MPI_Barrier(MPI_COMM_WORLD);
    tt.stop_clock("read");
    if (rank==0) cout << "Read rate: " << sizeof(double)*dim*nbatch/tt["read"].t_iter[it]/1024/1024*nproc << endl;
    close(fd);
    //remove(fname);
  }
  delete [] dat;  // was a stack VLA before; now heap-owned and released
  MPI_Finalize();
  return 0;
}
......@@ -74,8 +74,8 @@ int main(int argc, char **argv) {
hsize_t oned = d1*d2;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Info info = MPI_INFO_NULL;
int rank, nproc;
MPI_Init(&argc, &argv);
int rank, nproc, provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Comm_size(comm, &nproc);
MPI_Comm_rank(comm, &rank);
Timing tt(rank==io_node());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment