Commit f7bb134b authored by Huihuo Zheng

ssd_vol

parent 7538cb2c
@@ -5,7 +5,6 @@
#define H5DIO_CACHE_H_
#include "hdf5.h"
#include "mpi.h"
#include <vector>
#ifndef MAXDIM
#define MAXDIM 32
#endif
@@ -26,7 +25,6 @@ typedef struct _thread_data_t {
} thread_data_t;
using namespace std;
// SSD related meta data
typedef struct _SSD_INFO {
double mspace_total;
@@ -76,13 +74,17 @@ typedef struct _SAMPLE {
size_t nel; // the number of elements per sample,
} SAMPLE;
typedef struct _BATCH {
int *list;
int size;
} BATCH;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc; // number of samples per rank
size_t ns_glob; // total number of samples
size_t s_offset; // offset
hsize_t size; // the size of the entire dataset in bytes.
vector<int> batch; // batch data to read
BATCH batch; // batch data to read
bool contig_read; // whether the batch of data to read is contiguous or not.
MPI_Datatype mpi_datatype; // the constructed MPI datatype
hid_t h5_datatype; // hdf5 datatype
@@ -155,10 +157,12 @@ void H5RPthreadTerminate();
***************************************/
herr_t H5DRMMF_remap(); // remap the memory mapped file to remove the cache effect
// set hyperslab selection given the sample list
void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace);
void set_hyperslab_from_samples(int *samples, int nsample, hid_t *fspace);
// get the list of the samples from the filespace
void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contiguous);
void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
void parallel_dist(size_t dim, int nproc, int rank, size_t &ldim, size_t &start);
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
extern H5Dread_cache_metadata H5DRMM;
extern H5Dwrite_cache_metadata H5DWMM;
#endif //H5Dio_cache.h
@@ -11,7 +11,11 @@ include make.inc
all: test_read_cache test_write_cache prepare_dataset
test_read_cache: test_read_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ test_read_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB)
$(CXX) $(CFLAGS) -o $@ test_read_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB) -L$(HDF5_ROOT)/../vol/ -lh5passthrough_vol
test_vol: test_vol.o ../utils/debug.o
$(CXX) $(CFLAGS) -o $@ test_vol.o ../utils/debug.o $(HDF5_LIB) -Wl,-rpath,$(HDF5_ROOT)/../vol -L$(HDF5_ROOT)/../vol/ -lh5passthrough_vol
test_write_cache: test_write_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ test_write_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB)
@@ -65,7 +65,7 @@ int main(int argc, char **argv) {
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
printf("\ndone\n");
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
@@ -40,7 +40,7 @@
#include <sys/mman.h>
#include "H5Dio_cache.h"
#include "profiling.h"
extern H5Dread_cache_metadata H5DRMM;
using namespace std;
int msleep(long miliseconds)
{
@@ -120,11 +120,8 @@ int main(int argc, char **argv) {
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd;
if (cache)
fd = H5Fopen_cache(fname, H5F_ACC_RDONLY, plist_id);
else
fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
cout << H5DRMM.mpi.rank << " " << H5DRMM.mpi.nproc << endl;
hid_t dset;
tt.start_clock("H5Dopen");
if (cache) {
@@ -184,7 +181,7 @@ int main(int argc, char **argv) {
mt19937 g(100);
size_t ns_loc, fs_loc; // number of samples per worker, index of the first sample
parallel_dist(num_images, nproc, rank, ns_loc, fs_loc);
parallel_dist(num_images, nproc, rank, &ns_loc, &fs_loc);
// buffer for loading one batch of data
float *dat = new float[dim*batch_size];// buffer to store one batch of data
@@ -204,7 +201,7 @@ int main(int argc, char **argv) {
for(int nb = 0; nb < num_batches; nb++) {
//// reading from file system to memory using H5Dread.
tt.start_clock("Select");
set_hyperslab_from_samples(&id[fs_loc + nb*batch_size], batch_size, fspace);
set_hyperslab_from_samples(&id[fs_loc + nb*batch_size], batch_size, &fspace);
tt.stop_clock("Select");
tt.start_clock("H5Dread");
if (cache) {
@@ -242,7 +239,7 @@ int main(int argc, char **argv) {
cout << "VM: " << vm << "; RSS:" << rss << endl;
if (shuffle) ::shuffle(id.begin(), id.end(), g);
parallel_dist(num_images, nproc, (rank+e*rank_shift)%nproc, ns_loc, fs_loc);
parallel_dist(num_images, nproc, (rank+e*rank_shift)%nproc, &ns_loc, &fs_loc);
double t1 = 0.0;
for (int nb = 0; nb < num_batches; nb++) {
vector<int> b = vector<int> (id.begin() + fs_loc+nb*batch_size, id.begin() + fs_loc+(nb+1)*batch_size);
@@ -250,7 +247,7 @@ int main(int argc, char **argv) {
if (io_node()==rank and debug_level() > 1) cout << "Batch: " << nb << endl;
double t0 = MPI_Wtime();
tt.start_clock("Select");
set_hyperslab_from_samples(&b[0], batch_size, fspace);
set_hyperslab_from_samples(&b[0], batch_size, &fspace);
tt.stop_clock("Select");
tt.start_clock("H5Dread");
if (cache) {
@@ -23,15 +23,12 @@
*/
#include <pthread.h>
#include "mpi.h"
#include <iostream>
#include "stdlib.h"
#include "string.h"
#include "unistd.h"
// POSIX I/O
#include "sys/stat.h"
#include <fcntl.h>
#include <vector>
#include "H5Dio_cache.h"
// Memory map
@@ -51,20 +48,13 @@
// Debug
#include "profiling.h"
#include "debug.h"
using namespace std;
/*
Global variables to define information related to the local storage
*/
#define MAXDIM 32
hsize_t PAGESIZE = sysconf(_SC_PAGE_SIZE);
#ifdef THETA
char *MEM_BUFFER[100000000];
#endif
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
// initialize H5DWMM data
SSD_INFO
SSD = {
@@ -176,8 +166,11 @@ void *H5Dwrite_pthread_func(void *arg) {
enough for storing the buffer of the current task, it will wait for the
I/O thread to finish all the previous tasks.
*/
void int2char(int a, char str[255]) {
sprintf(str, "%d", a);
}
hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fapl_id ) {
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, NULL);
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
@@ -333,14 +326,14 @@ H5DRMM = {
/*
Helper function to compute the local number of samples and the offset.
*/
void parallel_dist(size_t gdim, int nproc, int rank, size_t &ldim, size_t &start) {
ldim = gdim/nproc;
start = ldim*rank;
void parallel_dist(size_t gdim, int nproc, int rank, size_t *ldim, size_t *start) {
*ldim = gdim/nproc;
*start = *ldim*rank;
if (rank < gdim%nproc) {
ldim += 1;
start += rank;
*ldim += 1;
*start += rank;
} else {
start += gdim%nproc;
*start += gdim%nproc;
}
}
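/*
  Worked example of the distribution above: gdim = 10, nproc = 4 gives
    rank 0: *ldim = 3, *start = 0
    rank 1: *ldim = 3, *start = 3
    rank 2: *ldim = 2, *start = 6
    rank 3: *ldim = 2, *start = 8
  i.e. the remainder gdim % nproc is spread, one extra sample each, over the first ranks.
*/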
@@ -348,9 +341,9 @@ void parallel_dist(size_t gdim, int nproc, int rank, size_t &ldim, size_t &start
/*
Given a sample list, perform hyperslab selection for filespace;
*/
void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace) {
void set_hyperslab_from_samples(int *samples, int nsample, hid_t *fspace) {
static hsize_t gdims[MAXDIM], count[MAXDIM], sample[MAXDIM], offset[MAXDIM];
int ndims = H5Sget_simple_extent_dims(fspace, gdims, NULL);
int ndims = H5Sget_simple_extent_dims(*fspace, gdims, NULL);
sample[0] = 1;
count[0] = 1;
offset[0] = samples[0]; // set the offset
@@ -359,34 +352,43 @@ void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace) {
sample[i] = gdims[i];
count[i] = 1;
}
H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, sample, count);
H5Sselect_hyperslab(*fspace, H5S_SELECT_SET, offset, NULL, sample, count);
for(int i=1; i<nsample; i++) {
offset[0] = samples[i];
H5Sselect_hyperslab(fspace, H5S_SELECT_OR, offset, NULL, sample, count);
H5Sselect_hyperslab(*fspace, H5S_SELECT_OR, offset, NULL, sample, count);
}
}
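/*
  Illustrative example (values are hypothetical): for a file space with
  gdims = {N, 16, 16} and samples = {2, 5, 6}, the selection built above is
  the union of three 1 x 16 x 16 blocks at offsets (2,0,0), (5,0,0) and (6,0,0),
  i.e. whole samples 2, 5 and 6 along the first (sample) dimension.
*/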
/*
Get the indices of the samples that have been selected from filespace, and check
whether it is contiguous or not.
*/
void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contig) {
void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contig) {
hssize_t numblocks = H5Sget_select_hyper_nblocks(fspace);
hsize_t gdims[MAXDIM];
int ndims = H5Sget_simple_extent_dims(fspace, gdims, NULL);
hsize_t *block_buf = new hsize_t [numblocks*2*ndims];
hsize_t *block_buf = (hsize_t*)malloc(numblocks*2*ndims*sizeof(hsize_t));
H5Sget_select_hyper_blocklist(fspace, 0, numblocks, block_buf);
samples.resize(0);
int n=0;
samples->size = 0;
for(int i=0; i<numblocks; i++) {
int start = block_buf[2*i*ndims];
int end = block_buf[2*i*ndims+ndims];
for(int j=start; j<end+1; j++) {
samples.push_back(j);
samples->size=samples->size+1;
}
}
samples->list = (int*) malloc(sizeof(int)*samples->size);
int n=0;
for(int i=0; i<numblocks; i++) {
int start = block_buf[2*i*ndims];
int end = block_buf[2*i*ndims+ndims];
for(int j=start; j<end+1; j++) {
samples->list[n] = j;
n=n+1;
}
}
contig = H5Sis_regular_hyperslab(fspace);
delete [] block_buf;
*contig = H5Sis_regular_hyperslab(fspace);
free(block_buf);
}
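/*
  This is the inverse of set_hyperslab_from_samples: for the selection in the
  example above it returns samples->list = {2, 5, 6} and samples->size = 3,
  with *contig reporting whether the selection forms a single regular hyperslab.
  samples->list is allocated with malloc here, so the caller is responsible
  for freeing it.
*/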
@@ -396,13 +398,13 @@ void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contig
*/
void *H5Dread_pthread_func(void *args) {
pthread_mutex_lock(&H5DRMM.io.request_lock);
while (not H5DRMM.io.dset_cached) {
if (not H5DRMM.io.batch_cached) {
while (!H5DRMM.io.dset_cached) {
if (!H5DRMM.io.batch_cached) {
char *p_mem = (char *) H5DRMM.mmap.tmp_buf;
MPI_Win_fence(MPI_MODE_NOPRECEDE, H5DRMM.mpi.win);
int batch_size = H5DRMM.dset.batch.size();
int batch_size = H5DRMM.dset.batch.size;
if (H5DRMM.dset.contig_read) {
int dest = H5DRMM.dset.batch[0];
int dest = H5DRMM.dset.batch.list[0];
int src = dest/H5DRMM.dset.ns_loc;
MPI_Aint offset = (dest%H5DRMM.dset.ns_loc)*H5DRMM.dset.sample.nel;
MPI_Put(p_mem, H5DRMM.dset.sample.nel*batch_size,
@@ -411,7 +413,7 @@ void *H5Dread_pthread_func(void *args) {
H5DRMM.dset.mpi_datatype, H5DRMM.mpi.win);
} else {
for(int i=0; i<batch_size; i++) {
int dest = H5DRMM.dset.batch[i];
int dest = H5DRMM.dset.batch.list[i];
int src = dest/H5DRMM.dset.ns_loc;
MPI_Aint offset = (dest%H5DRMM.dset.ns_loc)*H5DRMM.dset.sample.nel;
MPI_Put(&p_mem[i*H5DRMM.dset.sample.size],
@@ -422,7 +424,7 @@ void *H5Dread_pthread_func(void *args) {
}
}
MPI_Win_fence(MPI_MODE_NOSUCCEED, H5DRMM.mpi.win);
if (io_node()==H5DRMM.mpi.rank and debug_level()>2) cout << "PTHREAD DONE" << endl;
if (io_node()==H5DRMM.mpi.rank && debug_level()>2) printf("PTHREAD DONE\n");
H5DRMM.io.batch_cached = true;
} else {
pthread_cond_signal(&H5DRMM.io.master_cond);
@@ -443,13 +445,15 @@ void create_mmap_win(const char *prefix) {
strcpy(H5DRMM.mmap.fname, H5DRMM.ssd->path);
strcpy(H5DRMM.mmap.fname, prefix);
strcat(H5DRMM.mmap.fname, "-");
strcat(H5DRMM.mmap.fname, to_string(H5DRMM.mpi.rank).c_str());
char cc[255];
int2char(H5DRMM.mpi.rank, cc);
strcat(H5DRMM.mmap.fname, cc);
strcat(H5DRMM.mmap.fname, ".dat");
if (io_node()==H5DRMM.mpi.rank and debug_level() > 1)
cout << " Creating memory mapped files on local storage: " << H5DRMM.mmap.fname << endl;
if (io_node()==H5DRMM.mpi.rank && debug_level() > 1)
printf(" Creating memory mapped files on local storage: %s\n", H5DRMM.mmap.fname);
int fh = open(H5DRMM.mmap.fname, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
char a = 'A';
::pwrite(fh, &a, 1, ss);
pwrite(fh, &a, 1, ss);
fsync(fh);
close(fh);
#ifdef __APPLE__
@@ -460,8 +464,8 @@ void create_mmap_win(const char *prefix) {
H5DRMM.mmap.buf = mmap(NULL, ss, PROT_READ | PROT_WRITE, MAP_SHARED, H5DRMM.mmap.fd, 0);
msync(H5DRMM.mmap.buf, ss, MS_SYNC);
} else {
if (io_node()==H5DRMM.mpi.rank and debug_level()>1)
cout << " Allocate the buffer in memory and attach it to an MPI_Win" << endl;
if (io_node()==H5DRMM.mpi.rank && debug_level()>1)
printf(" Allocate the buffer in memory and attach it to an MPI_Win\n");
H5DRMM.mmap.buf = malloc(ss);
}
MPI_Datatype type[1] = {MPI_BYTE};
@@ -500,7 +504,7 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
H5DRMM.dset.esize = H5Tget_size(H5DRMM.dset.h5_datatype);
hid_t fspace = H5Dget_space(dset);
int ndims = H5Sget_simple_extent_ndims(fspace);
hsize_t *gdims = new hsize_t [ndims];
hsize_t *gdims = (hsize_t*) malloc(ndims*sizeof(hsize_t));
H5Sget_simple_extent_dims(fspace, gdims, NULL);
hsize_t dim = 1; // compute the size of a single sample
for(int i=1; i<ndims; i++) {
@@ -509,18 +513,15 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
H5DRMM.dset.sample.nel = dim;
H5DRMM.dset.sample.dim = ndims-1;
H5DRMM.dset.ns_glob = gdims[0];
parallel_dist(gdims[0], H5DRMM.mpi.nproc, H5DRMM.mpi.rank, H5DRMM.dset.ns_loc, H5DRMM.dset.s_offset);
parallel_dist(gdims[0], H5DRMM.mpi.nproc, H5DRMM.mpi.rank, &H5DRMM.dset.ns_loc, &H5DRMM.dset.s_offset);
H5DRMM.dset.sample.size = H5DRMM.dset.esize*H5DRMM.dset.sample.nel;
H5DRMM.dset.size = H5DRMM.dset.sample.size*H5DRMM.dset.ns_loc;
if (debug_level()>1) {
cout << gdims[0] << " " << H5DRMM.mpi.rank << "/" << H5DRMM.mpi.nproc << " " << H5DRMM.dset.ns_loc << " " << H5DRMM.dset.s_offset << " " << H5DRMM.dset.size << endl;
}
double t0 = MPI_Wtime();
create_mmap_win(name);
double t1 = MPI_Wtime() - t0;
if (io_node() == H5DRMM.mpi.rank and debug_level() > 1)
cout << "Time for creating memory map files: " << t1 << " seconds" << endl;
delete [] gdims;
if (io_node() == H5DRMM.mpi.rank && debug_level() > 1)
printf("Time for creating memory map files: %f seconds\n", t1);
free(gdims);
return dset;
};
@@ -531,15 +532,13 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
herr_t err = H5Dread(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
if (io_node()==H5DRMM.mpi.rank and debug_level()>1)
cout << " H5Dread from file system done.." << endl;
hsize_t bytes = get_buf_size(mem_space_id, mem_type_id);
double t0 = MPI_Wtime();
H5RPthreadWait(); // note that for the first batch this will not wait
get_samples_from_filespace(file_space_id, H5DRMM.dset.batch, H5DRMM.dset.contig_read);
get_samples_from_filespace(file_space_id, &H5DRMM.dset.batch, &H5DRMM.dset.contig_read);
double t1 = MPI_Wtime() - t0;
if (io_node()==H5DRMM.mpi.rank and debug_level() > 1) {
cout << "H5PthreadWait time: " << t1 << endl;
if (io_node()==H5DRMM.mpi.rank && debug_level() > 1) {
printf("H5PthreadWait time: %f seconds\n", t1);
}
free(H5DRMM.mmap.tmp_buf);
// copy the buffer
@@ -547,8 +546,8 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
t0 = MPI_Wtime();
memcpy(H5DRMM.mmap.tmp_buf, dat, bytes);
t1 = MPI_Wtime();
if (io_node()==H5DRMM.mpi.rank and debug_level() > 1) {
cout << "H5Dread_cache memcpy: " << t1 - t0 << endl;
if (io_node()==H5DRMM.mpi.rank && debug_level() > 1) {
printf("H5Dread_cache memcpy: %f\n", t1-t0);
}
pthread_mutex_lock(&H5DRMM.io.request_lock);
H5DRMM.io.batch_cached = false;
@@ -564,19 +563,19 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
if (io_node()==H5DRMM.mpi.rank and debug_level()>1) {
cout << "Reading data from cache (H5Dread_from_cache)" << endl;
if (io_node()==H5DRMM.mpi.rank && debug_level()>1) {
printf("Reading data from cache (H5Dread_from_cache)\n");
}
bool contig = false;
vector<int> b;
BATCH b;
H5RPthreadWait();
get_samples_from_filespace(file_space_id, b, contig);
get_samples_from_filespace(file_space_id, &b, &contig);
MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOPRECEDE, H5DRMM.mpi.win);
char *p_mem = (char *) dat;
int batch_size = b.size();
if (not contig) {
int batch_size = b.size;
if (!contig) {
for(int i=0; i< batch_size; i++) {
int dest = b[i];
int dest = b.list[i];
int src = dest/H5DRMM.dset.ns_loc;
MPI_Aint offset = (dest%H5DRMM.dset.ns_loc)*H5DRMM.dset.sample.nel;
MPI_Get(&p_mem[i*H5DRMM.dset.sample.size],
@@ -586,7 +585,7 @@ herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
H5DRMM.dset.mpi_datatype, H5DRMM.mpi.win);
}
} else {
int dest = b[0];
int dest = b.list[0];
int src = dest/H5DRMM.dset.ns_loc;
MPI_Aint offset = (dest%H5DRMM.dset.ns_loc)*H5DRMM.dset.sample.nel;
MPI_Get(p_mem, H5DRMM.dset.sample.nel*batch_size,
@@ -648,7 +647,7 @@ void H5RPthreadTerminate() {
*/
void H5RPthreadWait() {
pthread_mutex_lock(&H5DRMM.io.request_lock);
while(not H5DRMM.io.batch_cached and not H5DRMM.io.dset_cached) {
while((!H5DRMM.io.batch_cached) && (!H5DRMM.io.dset_cached)) {
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_cond_wait(&H5DRMM.io.master_cond, &H5DRMM.io.request_lock);
}
/*
This is the header file for HDF5 with node-local storage incorporated
*/
#ifndef H5DIO_CACHE_H_
#define H5DIO_CACHE_H_
#include "hdf5.h"
#include "mpi.h"
#ifndef MAXDIM
#define MAXDIM 32
#endif
// The metadata for the I/O thread to perform parallel write
typedef struct _thread_data_t {
// we use a linked-list structure in C to build the list of I/O tasks
char fname[255];
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
hid_t file_space_id;
hid_t xfer_plist_id;
int id;
hid_t offset; // offset in memory mapped file on SSD
hsize_t size;
void *buf;
struct _thread_data_t *next;
} thread_data_t;
// SSD related meta data
typedef struct _SSD_INFO {
double mspace_total;
double mspace_left;
double mspace_per_rank_total;
double mspace_per_rank_left;
char path[255];
hsize_t offset;
} SSD_INFO;
// MPI infos
typedef struct _MPI_INFO {
int rank;
int nproc;
int local_rank; // rank id in the local communicator
int ppn; // number of processes in the node
MPI_Comm comm; // global communicator
MPI_Comm node_comm; // node local communicator
MPI_Win win;
} MPI_INFO;
// I/O threads
typedef struct _IO_THREAD {
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock;
int num_request; // for parallel write
thread_data_t *request_list, *current_request, *first_request; // task queue
bool batch_cached; // for parallel read: whether the current batch has been cached to SSD or not
bool dset_cached; // whether the entire dataset is cached to SSD or not.
pthread_t pthread;
} IO_THREAD;
// Memory mapped files
typedef struct _MMAP {
// for write
int fd; // file handle for write
char fname[255];// full path of the memory mapped file
void *buf; // pointer that map the file to the memory
void *tmp_buf; // temporary buffer for parallel read: the read buffer is copied here so that H5Dread_to_cache can return while the background thread writes the data to the SSD.
} MMAP;
// Dataset
typedef struct _SAMPLE {
size_t dim; // the number of dimensions
size_t size; // the size of the sample in bytes.
size_t nel; // the number of elements per sample,
} SAMPLE;
typedef struct _BATCH {
int *list;
int size;
} BATCH;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc; // number of samples per rank
size_t ns_glob; // total number of samples
size_t s_offset; // offset
hsize_t size; // the size of the entire dataset in bytes.
BATCH batch; // batch data to read
bool contig_read; // whether the batch of data to read is contiguous or not.
MPI_Datatype mpi_datatype; // the constructed MPI datatype
hid_t h5_datatype; // hdf5 datatype
size_t esize; // the size of an element in bytes.
} DSET;
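/*
  Example with hypothetical values: a float dataset with global dims
  {8192, 224, 224} read by 4 ranks gives
    sample.dim = 2, sample.nel = 224*224 = 50176, esize = 4,
    sample.size = 200704 bytes, ns_glob = 8192, ns_loc = 2048,
    size = ns_loc * sample.size bytes of node-local cache per rank;
  batch.list holds the global indices of the samples to read next and
  batch.size their count.
*/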
typedef struct _H5Dwrite_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
SSD_INFO *ssd;
} H5Dwrite_cache_metadata;
typedef struct _H5Dread_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
DSET dset;
SSD_INFO *ssd;
} H5Dread_cache_metadata;
/**************************************
* Function APIs for parallel write *
**************************************/
// Create HDF5 file: create memory mapped file on the SSD
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
// Close HDF5 file: clean up the memory mapped file
herr_t H5Fclose_cache( hid_t file_id );
// The main program writes the dataset, and the I/O thread performs the data migration
herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t dxpl_id, const void *buf);
// close the dataset, property list, etc: check whether all the tasks have been finished or not.
herr_t H5Dclose_cache( hid_t id);
herr_t H5Sclose_cache( hid_t id);
herr_t H5Pclose_cache( hid_t id);
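/*
  A minimal write-path sketch based on the declarations above (the file and
  dataset names, dataspaces and property lists are placeholders, and the exact
  call order is an assumption; error checks omitted):

    hid_t fd   = H5Fcreate_cache("output.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id);
    hid_t dset = H5Dcreate(fd, "dset", H5T_NATIVE_FLOAT, fspace,
                           H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
    H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl_id, buf);
    // the call queues the write; the I/O thread performs the data migration
    // in the background (see the comments above)
    H5WPthreadWait();        // optionally block until the pending tasks finish
    H5Dclose_cache(dset);
    H5WPthreadTerminate();   // join the write I/O thread
    H5Fclose_cache(fd);      // clean up the memory mapped file
*/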
/****************************************
* Function APIs for Parallel read *
****************************************/
// Open an existing HDF5 file for cached read
hid_t H5Fopen_cache(const char *name, hid_t fcpl_id, hid_t fapl_id);
// Open the dataset, create memory mapped file
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
// Reading dataset (one batch); the I/O thread then writes it to the SSDs
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// Reading dataset (one batch) from the SSDs
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// close the dataset
herr_t H5Dclose_cache_read(hid_t dset);
/****************************************
* I/O thread sync functions *
****************************************/
// waiting for the write (or read) I/O thread to finish the work
void H5WPthreadWait();
void H5RPthreadWait();
// terminate (join) the write (or read) I/O thread after finishing all the tasks
void H5WPthreadTerminate();
void H5RPthreadTerminate();
/***************************************
* Other utils functions *
***************************************/
herr_t H5DRMMF_remap(); // remap the memory mapped file to remove the cache effect
// set hyperslab selection given the sample list
void set_hyperslab_from_samples(int *samples, int nsample, hid_t *fspace);
// get the list of the samples from the filespace
void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
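/*
  A minimal read-path sketch following the pattern of the test_read_cache
  program above (names, spaces and loop bounds are placeholders; error
  checks omitted):

    hid_t fd   = H5Fopen_cache("images.h5", H5F_ACC_RDONLY, fapl_id);
    hid_t dset = H5Dopen_cache(fd, "dataset", H5P_DEFAULT); // also creates the per-rank memory mapped file
    hid_t fs   = H5Dget_space(dset);
    for (int nb = 0; nb < num_batches; nb++) {
      set_hyperslab_from_samples(&batch_ids[nb*batch_size], batch_size, &fs);
      if (first_epoch)
        H5Dread_to_cache(dset, H5T_NATIVE_FLOAT, ms, fs, dxpl_id, buf);   // read from the file system, cache to SSD
      else
        H5Dread_from_cache(dset, H5T_NATIVE_FLOAT, ms, fs, dxpl_id, buf); // read from the SSD cache
    }
    H5Dclose_cache_read(dset);
    H5RPthreadTerminate();   // join the read I/O thread
    H5Fclose(fd);
*/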