Commit ed028c2c authored by Huihuo Zheng

clean up the code

parent dfb153ba
......@@ -71,7 +71,7 @@ SSD = {
.mspace_total = 137438953472,
.mspace_left = 137438953472,
.offset = 0
};
H5Dwrite_cache_metadata
H5DWMM = {
.mpi.ppn = 1, // number of processes per node
......@@ -141,10 +141,12 @@ void *H5Dwrite_pthread_func(void *arg) {
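// map this request's staged data from the memory mapped file on the SSD and sync it before issuing the write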
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, H5DWMM.mmap.fd, data->offset);
msync(data->buf, data->size, MS_SYNC);
#ifdef THETA
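// on Theta, copy the mapped data into a temporary heap buffer before calling H5Dwrite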
memcpy(MEM_BUFFER, data->buf, data->size);
H5DWMM.mmap.tmp_buf = malloc(data->size);
memcpy(H5DWMM.mmap.tmp_buf, data->buf, data->size);
H5Dwrite(data->dataset_id, data->mem_type_id,
data->mem_space_id, data->file_space_id,
data->xfer_plist_id, MEM_BUFFER);
data->xfer_plist_id, H5DWMM.mmap.tmp_buf);
free(H5DWMM.mmap.tmp_buf);
#else
H5Dwrite(data->dataset_id, data->mem_type_id,
data->mem_space_id, data->file_space_id,
......@@ -184,6 +186,7 @@ hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fa
H5Pget_fapl_mpio(fapl_id, &comm, &info);
MPI_Comm_dup(comm, &H5DWMM.mpi.comm);
MPI_Comm_rank(comm, &H5DWMM.mpi.rank);
MPI_Comm_size(comm, &H5DWMM.mpi.nproc);
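// create a node-local communicator to obtain the local rank and the number of processes per node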
MPI_Comm_split_type(H5DWMM.mpi.comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &H5DWMM.mpi.node_comm);
MPI_Comm_rank(H5DWMM.mpi.node_comm, &H5DWMM.mpi.local_rank);
MPI_Comm_size(H5DWMM.mpi.node_comm, &H5DWMM.mpi.ppn);
......@@ -395,7 +398,6 @@ void *H5Dread_pthread_func(void *args) {
pthread_mutex_lock(&H5DRMM.io.request_lock);
while (not H5DRMM.io.dset_cached) {
if (not H5DRMM.io.batch_cached) {
char *p_mmap = (char *) H5DRMM.mmap.buf;
char *p_mem = (char *) H5DRMM.mmap.tmp_buf;
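// start an RMA epoch on the window (MPI_MODE_NOPRECEDE: no preceding RMA operations to complete)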
MPI_Win_fence(MPI_MODE_NOPRECEDE, H5DRMM.mpi.win);
int batch_size = H5DRMM.dset.batch.size();
......@@ -434,7 +436,7 @@ void *H5Dread_pthread_func(void *args) {
/*
Create memory mapped files on the local storage and attach them to an MPI window
*/
void create_mmap(const char *prefix) {
void create_mmap_win(const char *prefix) {
setH5SSD();
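// round the size of the memory map up to a whole number of pages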
hsize_t ss = (H5DRMM.dset.size/PAGESIZE+1)*PAGESIZE;
if (strcmp("MEMORY", getenv("SSD_PATH"))!=0) {
......@@ -469,6 +471,7 @@ void create_mmap(const char *prefix) {
MPI_Type_commit(&H5DRMM.dset.mpi_datatype);
MPI_Win_create(H5DRMM.mmap.buf, ss, H5DRMM.dset.esize, MPI_INFO_NULL, H5DRMM.mpi.comm, &H5DRMM.mpi.win);
}
hid_t H5Fopen_cache( const char *name, hid_t fcpl_id, hid_t fapl_id ) {
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
......@@ -481,7 +484,6 @@ hid_t H5Fopen_cache( const char *name, hid_t fcpl_id, hid_t fapl_id ) {
MPI_Comm_split_type(H5DRMM.mpi.comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &H5DRMM.mpi.node_comm);
MPI_Comm_rank(H5DRMM.mpi.node_comm, &H5DRMM.mpi.local_rank);
MPI_Comm_size(H5DRMM.mpi.node_comm, &H5DRMM.mpi.ppn);
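// divide the node-local SSD space evenly among the ranks on the node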
H5DRMM.ssd->mspace_per_rank_total = H5DRMM.ssd->mspace_total / H5DRMM.mpi.ppn;
H5DRMM.ssd->mspace_per_rank_left = H5DRMM.ssd->mspace_per_rank_total;
return H5Fopen(name, fcpl_id, fapl_id);
......@@ -491,9 +493,9 @@ hid_t H5Fopen_cache( const char *name, hid_t fcpl_id, hid_t fapl_id ) {
Open a dataset and create the associated memory map.
*/
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
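// spawn the background I/O thread that caches batches read from the file system to the node-local storage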
int rc = pthread_create(&H5DRMM.io.pthread, NULL, H5Dread_pthread_func, NULL);
srand(time(NULL)); // Initialization, should only be called once.
hid_t dset = H5Dopen(loc_id, name, dapl_id);
H5DRMM.dset.h5_datatype = H5Dget_type(dset);
H5DRMM.dset.esize = H5Tget_size(H5DRMM.dset.h5_datatype);
hid_t fspace = H5Dget_space(dset);
......@@ -514,7 +516,7 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
cout << gdims[0] << " " << H5DRMM.mpi.rank << "/" << H5DRMM.mpi.nproc << " " << H5DRMM.dset.ns_loc << " " << H5DRMM.dset.s_offset << " " << H5DRMM.dset.size << endl;
}
double t0 = MPI_Wtime();
create_mmap(name);
create_mmap_win(name);
double t1 = MPI_Wtime() - t0;
if (io_node() == H5DRMM.mpi.rank and debug_level() > 1)
cout << "Time for creating memory map files: " << t1 << " seconds" << endl;
......@@ -540,6 +542,7 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
cout << "H5PthreadWait time: " << t1 << endl;
}
free(H5DRMM.mmap.tmp_buf);
// copy the read buffer into a temporary buffer so the I/O thread can stage it to the SSD
H5DRMM.mmap.tmp_buf = malloc(bytes);
t0 = MPI_Wtime();
memcpy(H5DRMM.mmap.tmp_buf, dat, bytes);
......@@ -548,9 +551,9 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
cout << "H5Dread_cache memcpy: " << t1 - t0 << endl;
}
pthread_mutex_lock(&H5DRMM.io.request_lock);
H5DRMM.io.batch_cached = false;
// wake up the I/O thread right away
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_mutex_unlock(&H5DRMM.io.request_lock);
return err;
}
......@@ -558,7 +561,6 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
/*
Reading data directly from local storage.
*/
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
......
/*
This is the header file for incorporating node-local storage into HDF5
*/
#ifndef H5DIO_CACHE_H_
#define H5DIO_CACHE_H_
......@@ -9,7 +9,7 @@
#ifndef MAXDIM
#define MAXDIM 32
#endif
// The metadata for the I/O thread to perform parallel writes
typedef struct _thread_data_t {
// we will use the link structure in C to build the list of I/O tasks
char fname[255];
......@@ -19,7 +19,7 @@ typedef struct _thread_data_t {
hid_t file_space_id;
hid_t xfer_plist_id;
int id;
hid_t offset; // offset in the memory mapped file on the SSD
hsize_t size;
void *buf;
struct _thread_data_t *next;
......@@ -27,6 +27,7 @@ typedef struct _thread_data_t {
using namespace std;
// SSD related metadata
typedef struct _SSD_INFO {
double mspace_total;
double mspace_left;
......@@ -36,58 +37,60 @@ typedef struct _SSD_INFO {
hsize_t offset;
} SSD_INFO;
// MPI infos
typedef struct _MPI_INFO {
int rank;       // rank id in the global communicator
int nproc;      // total number of processes in the global communicator
int local_rank; // rank id in the node-local communicator
int ppn;        // number of processes per node
MPI_Comm comm; // global communicator
MPI_Comm node_comm; // node local communicator
MPI_Win win;
} MPI_INFO;
// I/O threads
typedef struct _IO_THREAD {
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock;
int num_request;   // for parallel write
thread_data_t *request_list, *current_request, *first_request; // task queue for the write I/O thread
bool batch_cached; // for parallel read: whether the batch data has been cached to the SSD or not
bool dset_cached;  // whether the entire dataset has been cached to the SSD or not
pthread_t pthread;
} IO_THREAD;
// Memory mapped files
typedef struct _MMAP {
// for write
int fd; // file handle for write
char fname[255]; // full path of the memory mapped file
void *buf;       // pointer to the memory mapped region of the file
// for read
void *tmp_buf;   // temporary buffer for parallel read: the read buffer is copied here so that
                 // H5Dread_to_cache can return while the background thread writes the data to the SSD
} MMAP;
// Dataset
typedef struct _SAMPLE {
size_t dim;  // the number of dimensions of a sample
size_t size; // the size of a sample in bytes
size_t nel;  // the number of elements per sample
} SAMPLE;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc;             // number of samples per rank
size_t ns_glob;            // total number of samples
size_t s_offset;           // offset of the first sample of this rank
hsize_t size;              // the size of the entire dataset in bytes
vector<int> batch;         // the batch of samples to read
bool contig_read;          // whether the batch of samples to read is contiguous or not
MPI_Datatype mpi_datatype; // the constructed MPI datatype
hid_t h5_datatype;         // the HDF5 datatype
size_t esize;              // the size of an element in bytes
} DSET;
typedef struct _H5Dwrite_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
SSD_INFO *ssd;
......@@ -101,36 +104,61 @@ typedef struct _H5Dread_cache_metadata {
SSD_INFO *ssd;
} H5Dread_cache_metadata;
/**************************************
* Function APIs for parallel write *
**************************************/
void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace);
void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contiguous);
//void create_mmap(char *path, H5Dio_mmap &f);
void parallel_dist(size_t dim, int nproc, int rank, size_t &ldim, size_t &start);
hid_t H5Fopen_cache(const char *name, hid_t fcpl_id, hid_t fapl_id);
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dclose_cache_read(hid_t dset);
herr_t H5DRMMF_remap();
void H5PthreadWait();
void H5PthreadTerminate();
// Create HDF5 file: create memory mapped file on the SSD
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
// Close HDF5 file: clean up the memory mapped file
herr_t H5Fclose_cache( hid_t file_id );
// The main program writes the dataset, and the I/O thread performs the data migration
herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t dxpl_id, const void *buf);
// Close the dataset, property list, etc.: check whether all the tasks have finished or not.
herr_t H5Dclose_cache( hid_t id);
herr_t H5Sclose_cache( hid_t id);
herr_t H5Pclose_cache( hid_t id);
void H5WPthreadWait();
void H5WPthreadTerminate();
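/*
  Illustrative usage sketch for the parallel write path (not part of this header;
  fapl_id, mspace, fspace, dxpl_id and buf are assumed to be set up by the caller):

    hid_t fd   = H5Fcreate_cache("output.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id);
    hid_t dset = H5Dcreate(fd, "dset", H5T_NATIVE_FLOAT, fspace,
                           H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
    H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl_id, buf); // stages the data on the SSD
    H5WPthreadWait();        // block until the I/O thread has migrated all staged data
    H5Dclose_cache(dset);
    H5WPthreadTerminate();   // join the write I/O thread
    H5Fclose_cache(fd);
*/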
/****************************************
* Function APIs for Parallel read *
****************************************/
//
hid_t H5Fopen_cache(const char *name, hid_t fcpl_id, hid_t fapl_id);
// Open the dataset and create the memory mapped file
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
// Read one batch of the dataset; the I/O thread then writes it to the SSD
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// Read one batch of the dataset directly from the SSD
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// close the dataset
herr_t H5Dclose_cache_read(hid_t dset);
/****************************************
* I/O thread sync functions *
****************************************/
// Wait for the write (or read) I/O thread to finish its work
void H5WPthreadWait();
void H5RPthreadWait();
// Terminate (join) the write (or read) I/O thread after all the tasks have finished
void H5WPthreadTerminate();
void H5RPthreadTerminate();
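/*
  Illustrative usage sketch for the parallel read path (not part of this header;
  file and dataset names, spaces and buffers are assumed, mirroring the test
  program further below):

    hid_t fd   = H5Fopen_cache("input.h5", H5F_ACC_RDONLY, fapl_id);
    hid_t dset = H5Dopen_cache(fd, "dataset", H5P_DEFAULT);
    // first pass: read from the parallel file system; the I/O thread caches the batch to the SSD
    H5Dread_to_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl_id, buf);
    H5RPthreadWait();         // make sure the batch has been written to the SSD
    // later passes: read the same samples directly from the node-local SSD
    H5Dread_from_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl_id, buf);
    H5Dclose_cache_read(dset);
    H5RPthreadTerminate();    // join the read I/O thread
*/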
void test_mmap_buf();
void check_pthread_data(thread_data_t *pt);
/***************************************
* Other utils functions *
***************************************/
herr_t H5DRMMF_remap(); // remap the memory mapped file to remove the cache effect
// set hyperslab selection given the sample list
void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace);
// get the list of the samples from the filespace
void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
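// A minimal sketch of how such a helper could be implemented (illustrative, not
// necessarily the implementation in this repository): the number of selected
// elements in the memory space times the size of one element of the datatype.
//   hsize_t get_buf_size(hid_t mspace, hid_t tid) {
//     return H5Sget_select_npoints(mspace) * H5Tget_size(tid);
//   }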
void parallel_dist(size_t dim, int nproc, int rank, size_t &ldim, size_t &start);
#endif // H5DIO_CACHE_H_
......@@ -119,7 +119,12 @@ int main(int argc, char **argv) {
setenv("SSD_PATH", local_storage, 1);
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
hid_t fd;
if (cache)
  fd = H5Fopen_cache(fname, H5F_ACC_RDONLY, plist_id);
else
  fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
cout << "H5Fopen" << endl;
hid_t dset;
tt.start_clock("H5Dopen");
if (cache) {
......@@ -127,7 +132,8 @@ int main(int argc, char **argv) {
} else {
dset = H5Dopen(fd, dataset, H5P_DEFAULT);
}
tt.stop_clock("H5Dopen");
cout << "H5Dopen" << endl;
hid_t fspace = H5Dget_space(dset);
int ndims = H5Sget_simple_extent_ndims(fspace);
......