Commit 4e67237a authored by Huihuo Zheng
parents da44605c 2bdeeb79
......@@ -49,53 +49,41 @@
// Debug
#include "debug.h"
/*
Global variables to define information related to the local storage
*/
#define MAXDIM 32
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
// initialize H5DWMM data
SSD_INFO
SSD = {
.mspace_total = 137438953472,
.mspace_left = 137438953472,
.offset = 0
};
H5Dwrite_cache_metadata
H5DWMM = {
.io.num_request = 0,//number of I/O request
.io.master_cond = PTHREAD_COND_INITIALIZER, // condition variable
.io.io_cond = PTHREAD_COND_INITIALIZER,
.io.request_lock = PTHREAD_MUTEX_INITIALIZER,
.io.request_list = NULL,
.io.current_request = NULL,
.io.first_request = NULL,
.ssd = &SSD,
};
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
SSD_INFO SSD;
H5Dwrite_cache_metadata H5DWMM;
/*
Function to set up the local storage path and capacity.
*/
int setH5SSD() {
// set SSD_PATH
void setH5SSD(SSD_INFO *ssd) {
// set ssd->path from SSD_PATH
if (getenv("SSD_PATH")) {
strcpy(SSD.path, getenv("SSD_PATH"));
strcpy(ssd->path, getenv("SSD_PATH"));
} else {
strcpy(SSD.path, "/local/scratch/");
strcpy(ssd->path, "/local/scratch/");
}
// set SSD_SIZE;
if (getenv("SSD_SIZE")) {
SSD.mspace_total = atof(getenv("SSD_SIZE"))*1024*1024*1024;
SSD.mspace_left = SSD.mspace_total;
ssd->mspace_total = atof(getenv("SSD_SIZE"))*1024*1024*1024;
ssd->mspace_left = ssd->mspace_total;
} else {
ssd->mspace_total = 137438953472;
ssd->mspace_left = 137438953472;
}
return 0;
ssd->offset = 0;
}
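/*
  Illustrative usage sketch (not part of this commit): setH5SSD reads the
  SSD_PATH and SSD_SIZE environment variables; the helper and variable names
  below are hypothetical and assume H5Dio_cache.h is included.
*/
static void example_configure_local_storage(void) {
  /* e.g. launched with: SSD_PATH=/local/scratch/ SSD_SIZE=128 ./app */
  SSD_INFO ssd_example;
  setH5SSD(&ssd_example);
  /* ssd_example.path         -> SSD_PATH, or "/local/scratch/" by default   */
  /* ssd_example.mspace_total -> SSD_SIZE GiB in bytes, or 128 GiB by default */
  /* ssd_example.offset       -> 0                                            */
}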
void int2char(int a, char str[255]) {
sprintf(str, "%d", a);
}
/*
Purpose: get the size of the buffer from the memory space
Input: memory space id, data type id;
Purpose: get the size of the buffer from the memory space and type id
Output: the size of the memory space in bytes.
*/
hsize_t get_buf_size(hid_t mspace, hid_t tid) {
......@@ -111,43 +99,46 @@ hsize_t get_buf_size(hid_t mspace, hid_t tid) {
return s;
}
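/*
  Minimal sketch (the body of get_buf_size is collapsed in this diff): one
  common way to obtain the buffer size is the number of selected elements in
  the memory space times the element size of the datatype. This is an
  assumption about the intent, not a copy of the original implementation.
*/
static hsize_t example_buf_size(hid_t mspace, hid_t tid) {
  hssize_t npoints = H5Sget_select_npoints(mspace); // elements in the selection
  size_t esize = H5Tget_size(tid);                  // bytes per element
  return (hsize_t) npoints * esize;
}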
/*
Thread function for performing H5Dwrite. This function creates
a memory mapped buffer backed by the file on the local storage that
contains the data to be written to the file system.
On Theta, the memory mapped buffer currently does not work with H5Dwrite,
we instead allocate a buffer directly to the memory.
On Theta, the memory mapped buffer currently does not work with H5Dwrite, so we instead allocate a buffer directly in memory.
*/
void *H5Dwrite_pthread_func(void *arg) {
pthread_mutex_lock(&H5DWMM.io.request_lock);
while (H5DWMM.io.num_request>=0) {
if (H5DWMM.io.num_request >0) {
thread_data_t *data = H5DWMM.io.current_request;
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, H5DWMM.mmap.fd, data->offset);
// use the H5DWMM passed in as the thread argument
H5Dwrite_cache_metadata *wmm = (H5Dwrite_cache_metadata*) arg;
pthread_mutex_lock(&wmm->io.request_lock);
while (wmm->io.num_request>=0) {
if (wmm->io.num_request >0) {
thread_data_t *data = wmm->io.current_request;
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, wmm->mmap.fd, data->offset);
msync(data->buf, data->size, MS_SYNC);
#ifdef THETA
H5DWMM.mmap.tmp_buf = malloc(data->size);
memcpy(H5DWMM.mmap.tmp_buf, data->buf, data->size);
wmm->mmap.tmp_buf = malloc(data->size);
memcpy(wmm->mmap.tmp_buf, data->buf, data->size);
H5Dwrite(data->dataset_id, data->mem_type_id,
data->mem_space_id, data->file_space_id,
data->xfer_plist_id, H5DWMM.mmap.tmp_buf);
free(H5DWMM.mmap.tmp_buf);
data->xfer_plist_id, wmm->mmap.tmp_buf);
free(wmm->mmap.tmp_buf);
#else
H5Dwrite(data->dataset_id, data->mem_type_id,
data->mem_space_id, data->file_space_id,
data->xfer_plist_id, data->buf);
#endif
H5Sclose(data->mem_space_id);
H5Sclose(data->file_space_id);
H5Tclose(data->mem_type_id);
munmap(data->buf, data->size);
H5DWMM.io.current_request=H5DWMM.io.current_request->next;
H5DWMM.io.num_request--;
} if (H5DWMM.io.num_request == 0) {
pthread_cond_signal(&H5DWMM.io.master_cond);
pthread_cond_wait(&H5DWMM.io.io_cond, &H5DWMM.io.request_lock);
wmm->io.current_request=wmm->io.current_request->next;
wmm->io.num_request--;
} if (wmm->io.num_request == 0) {
pthread_cond_signal(&wmm->io.master_cond);
pthread_cond_wait(&wmm->io.io_cond, &wmm->io.request_lock);
}
}
pthread_mutex_unlock(&H5DWMM.io.request_lock);
pthread_mutex_unlock(&wmm->io.request_lock);
return NULL;
}
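/*
  Distilled sketch of the producer side of this hand-shake (mirrors the
  enqueue code in H5Dwrite_cache below; the helper name is illustrative): the
  main thread publishes a pending request and wakes the worker, which drains
  the queue, signals master_cond when it is empty, and exits once num_request
  goes negative.
*/
static void example_enqueue_request(H5Dwrite_cache_metadata *wmm) {
  pthread_mutex_lock(&wmm->io.request_lock);
  wmm->io.num_request++;                 // publish one more pending task
  pthread_cond_signal(&wmm->io.io_cond); // wake up the I/O thread
  pthread_mutex_unlock(&wmm->io.request_lock);
}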
......@@ -163,13 +154,14 @@ void *H5Dwrite_pthread_func(void *arg) {
enough for storing the buffer of the current task, it will wait for the
I/O thread to finish all the previous tasks.
*/
void int2char(int a, char str[255]) {
sprintf(str, "%d", a);
}
hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fapl_id ) {
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, NULL);
H5DWMM.io.num_request = 0;
pthread_cond_init(&H5DWMM.io.io_cond, NULL);
pthread_cond_init(&H5DWMM.io.master_cond, NULL);
pthread_mutex_init(&H5DWMM.io.request_lock, NULL);
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
setH5SSD(&SSD);
H5DWMM.ssd = &SSD;
MPI_Comm comm, comm_dup;
MPI_Info info;
H5Pget_fapl_mpio(fapl_id, &comm, &info);
......@@ -190,11 +182,12 @@ hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fa
pthread_mutex_lock(&H5DWMM.io.request_lock);
H5DWMM.io.request_list->id = 0;
H5DWMM.io.current_request = H5DWMM.io.request_list;
H5DWMM.io.first_request = H5DWMM.io.request_list;
H5DWMM.io.first_request = H5DWMM.io.request_list;
pthread_mutex_unlock(&H5DWMM.io.request_lock);
H5DWMM.ssd->mspace_per_rank_total = H5DWMM.ssd->mspace_total / H5DWMM.mpi.ppn;
H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total;
H5DWMM.mmap.fd = open(H5DWMM.mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, &H5DWMM);
return H5Fcreate(name, flags, fcpl_id, fapl_id);
}
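/*
  Hedged usage sketch (not part of this commit): H5Fcreate_cache is intended
  as a drop-in replacement for H5Fcreate when the file access property list
  uses the MPI-IO driver, which the function queries with H5Pget_fapl_mpio.
  The helper name below is illustrative.
*/
static hid_t example_create_cached_file(const char *fname) {
  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL); // required by the cache layer
  hid_t fd = H5Fcreate_cache(fname, H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
  H5Pclose(fapl);
  return fd;
}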
......@@ -210,7 +203,6 @@ herr_t
H5Dwrite_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id,
hid_t file_space_id, hid_t dxpl_id, const void *buf) {
hsize_t size = get_buf_size(mem_space_id, mem_type_id);
if (H5DWMM.ssd->mspace_per_rank_left < size) {
H5WPthreadWait();
H5DWMM.ssd->offset = 0;
......@@ -218,29 +210,28 @@ H5Dwrite_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id,
H5DWMM.ssd->mspace_left = H5DWMM.ssd->mspace_total;
}
int err = pwrite(H5DWMM.mmap.fd, (char*)buf, size, H5DWMM.ssd->offset);
H5DWMM.io.request_list->offset = H5DWMM.ssd->offset;
H5DWMM.ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total
- H5DWMM.ssd->offset*H5DWMM.mpi.ppn;
#ifdef __APPLE__
fcntl(H5DWMM.mmap.fd, F_NOCACHE, 1);
#else
fsync(H5DWMM.mmap.fd);
#endif
H5DWMM.io.request_list->dataset_id = dataset_id;
H5DWMM.io.request_list->mem_type_id = mem_type_id;
H5DWMM.io.request_list->mem_space_id = mem_space_id;
H5DWMM.io.request_list->file_space_id =file_space_id;
H5DWMM.io.request_list->xfer_plist_id = dxpl_id;
H5DWMM.io.request_list->mem_type_id = H5Tcopy(mem_type_id);
H5DWMM.io.request_list->mem_space_id = H5Scopy(mem_space_id);
H5DWMM.io.request_list->file_space_id = H5Scopy(file_space_id);
H5DWMM.io.request_list->xfer_plist_id = H5Pcopy(dxpl_id);
H5DWMM.io.request_list->size = size;
H5DWMM.io.request_list->offset = H5DWMM.ssd->offset;
H5DWMM.io.request_list->next = (thread_data_t*) malloc(sizeof(thread_data_t));
H5DWMM.io.request_list->next->id = H5DWMM.io.request_list->id + 1;
thread_data_t *data = H5DWMM.io.request_list;
H5DWMM.io.request_list = H5DWMM.io.request_list->next;
pthread_mutex_lock(&H5DWMM.io.request_lock);
H5DWMM.io.num_request++;
pthread_cond_signal(&H5DWMM.io.io_cond); // wake up the I/O thread right away
pthread_mutex_unlock(&H5DWMM.io.request_lock);
H5DWMM.ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total
- H5DWMM.ssd->offset*H5DWMM.mpi.ppn;
return err;
}
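/*
  Hedged sketch of the write path pieced together from this file (the datatype
  and handle names are illustrative): H5Dwrite_cache returns as soon as the
  buffer has been staged to the memory mapped file on the local storage; the
  background thread issues the actual H5Dwrite, and the *_cache close calls
  wait for it before closing the underlying objects.
*/
static void example_cached_write(hid_t fd, hid_t dset, hid_t mspace,
                                 hid_t fspace, hid_t dxpl, const void *buf) {
  H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl, buf); // staged to SSD, returns early
  H5Dclose_cache(dset); // waits for the I/O thread, then calls H5Dclose
  H5Fclose_cache(fd);   // waits for the I/O thread, then calls H5Fclose
}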
......@@ -281,14 +272,6 @@ herr_t H5Fclose_cache( hid_t file_id ) {
return H5Fclose(file_id);
}
/*
Wait for pthread to finish the work and close the property
*/
herr_t H5Pclose_cache(hid_t dxf_id) {
H5WPthreadWait();
return H5Pclose(dxf_id);
}
/*
Wait for pthread to finish the work and close the dataset
*/
......@@ -296,29 +279,12 @@ herr_t H5Dclose_cache(hid_t dset_id) {
H5WPthreadWait();
return H5Dclose(dset_id);
}
/*
Wait for the pthread to finish the work and close the memory space
*/
herr_t H5Sclose_cache(hid_t filespace) {
H5WPthreadWait();
return H5Sclose(filespace);
}
/*
The following functions are for parallel read.
*/
H5Dread_cache_metadata
H5DRMM = {
.io.master_cond = PTHREAD_COND_INITIALIZER,
.io.io_cond = PTHREAD_COND_INITIALIZER,
.io.request_lock = PTHREAD_MUTEX_INITIALIZER,
.io.batch_cached = true,
.io.dset_cached = false,
.ssd = &SSD
};
H5Dread_cache_metadata H5DRMM;
/*
Helper function to compute the local number of samples and the offset.
*/
......@@ -393,44 +359,44 @@ void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contig) {
mapped files on the local storage using MPI_Put
*/
void *H5Dread_pthread_func(void *args) {
pthread_mutex_lock(&H5DRMM.io.request_lock);
while (!H5DRMM.io.dset_cached) {
if (!H5DRMM.io.batch_cached) {
char *p_mem = (char *) H5DRMM.mmap.tmp_buf;
MPI_Win_fence(MPI_MODE_NOPRECEDE, H5DRMM.mpi.win);
int batch_size = H5DRMM.dset.batch.size;
if (H5DRMM.dset.contig_read) {
int dest = H5DRMM.dset.batch.list[0];
int src = dest/H5DRMM.dset.ns_loc;
MPI_Aint offset = (dest%H5DRMM.dset.ns_loc)*H5DRMM.dset.sample.nel;
MPI_Put(p_mem, H5DRMM.dset.sample.nel*batch_size,
H5DRMM.dset.mpi_datatype, src,
offset, H5DRMM.dset.sample.nel*batch_size,
H5DRMM.dset.mpi_datatype, H5DRMM.mpi.win);
H5Dread_cache_metadata *dmm = (H5Dread_cache_metadata*) args;
pthread_mutex_lock(&dmm->io.request_lock);
while (!dmm->io.dset_cached) {
if (!dmm->io.batch_cached) {
char *p_mem = (char *) dmm->mmap.tmp_buf;
MPI_Win_fence(MPI_MODE_NOPRECEDE, dmm->mpi.win);
int batch_size = dmm->dset.batch.size;
if (dmm->dset.contig_read) {
int dest = dmm->dset.batch.list[0];
int src = dest/dmm->dset.ns_loc;
MPI_Aint offset = (dest%dmm->dset.ns_loc)*dmm->dset.sample.nel;
MPI_Put(p_mem, dmm->dset.sample.nel*batch_size,
dmm->dset.mpi_datatype, src,
offset, dmm->dset.sample.nel*batch_size,
dmm->dset.mpi_datatype, dmm->mpi.win);
} else {
for(int i=0; i<batch_size; i++) {
int dest = H5DRMM.dset.batch.list[i];
int src = dest/H5DRMM.dset.ns_loc;
MPI_Aint offset = (dest%H5DRMM.dset.ns_loc)*H5DRMM.dset.sample.nel;
MPI_Put(&p_mem[i*H5DRMM.dset.sample.size],
H5DRMM.dset.sample.nel,
H5DRMM.dset.mpi_datatype, src,
offset, H5DRMM.dset.sample.nel,
H5DRMM.dset.mpi_datatype, H5DRMM.mpi.win);
int dest = dmm->dset.batch.list[i];
int src = dest/dmm->dset.ns_loc;
MPI_Aint offset = (dest%dmm->dset.ns_loc)*dmm->dset.sample.nel;
MPI_Put(&p_mem[i*dmm->dset.sample.size],
dmm->dset.sample.nel,
dmm->dset.mpi_datatype, src,
offset, dmm->dset.sample.nel,
dmm->dset.mpi_datatype, dmm->mpi.win);
}
}
MPI_Win_fence(MPI_MODE_NOSUCCEED, H5DRMM.mpi.win);
if (io_node()==H5DRMM.mpi.rank && debug_level()>2) printf("PTHREAD DONE\n");
H5DRMM.io.batch_cached = true;
H5DRMM.dset.ns_cached += H5DRMM.dset.batch.size;
if (H5DRMM.dset.ns_cached>=H5DRMM.dset.ns_loc)
H5DRMM.io.dset_cached=true;
MPI_Win_fence(MPI_MODE_NOSUCCEED, dmm->mpi.win);
if (io_node()==dmm->mpi.rank && debug_level()>2) printf("PTHREAD DONE\n");
dmm->io.batch_cached = true;
if (dmm->dset.ns_cached>=dmm->dset.ns_loc)
dmm->io.dset_cached=true;
} else {
pthread_cond_signal(&H5DRMM.io.master_cond);
pthread_cond_wait(&H5DRMM.io.io_cond, &H5DRMM.io.request_lock);
pthread_cond_signal(&dmm->io.master_cond);
pthread_cond_wait(&dmm->io.io_cond, &dmm->io.request_lock);
}
}
pthread_mutex_unlock(&H5DRMM.io.request_lock);
pthread_mutex_unlock(&dmm->io.request_lock);
return NULL;
}
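/*
  Sketch of the addressing used in the MPI_Put calls above (for illustration
  only): a global sample index maps to the owner rank and to the element
  offset inside that rank's window, assuming each rank exposes ns_loc
  contiguous samples of nel elements each.
*/
static void example_sample_to_window(size_t dest, size_t ns_loc, size_t nel,
                                     int *owner, MPI_Aint *offset) {
  *owner = (int)(dest / ns_loc);               // rank whose window holds the sample
  *offset = (MPI_Aint)((dest % ns_loc) * nel); // element offset within that window
}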
......@@ -438,7 +404,6 @@ void *H5Dread_pthread_func(void *args) {
Create memory mapped files on the local storage and attach them to an MPI_Win
*/
void create_mmap_win(const char *prefix) {
setH5SSD();
hsize_t ss = (H5DRMM.dset.size/PAGESIZE+1)*PAGESIZE;
if (strcmp("MEMORY", getenv("SSD_PATH"))!=0) {
strcpy(H5DRMM.mmap.fname, H5DRMM.ssd->path);
......@@ -456,7 +421,6 @@ void create_mmap_win(const char *prefix) {
fsync(fh);
close(fh);
H5DRMM.mmap.fd = open(H5DRMM.mmap.fname, O_RDWR);
H5DRMM.mmap.buf = mmap(NULL, ss, PROT_READ | PROT_WRITE, MAP_SHARED, H5DRMM.mmap.fd, 0);
msync(H5DRMM.mmap.buf, ss, MS_SYNC);
} else {
......@@ -474,7 +438,8 @@ void create_mmap_win(const char *prefix) {
hid_t H5Fopen_cache( const char *name, hid_t fcpl_id, hid_t fapl_id ) {
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
setH5SSD(&SSD);
H5DRMM.ssd = &SSD;
MPI_Comm comm;
MPI_Info info;
H5Pget_fapl_mpio(fapl_id, &comm, &info);
......@@ -494,7 +459,14 @@ hid_t H5Fopen_cache( const char *name, hid_t fcpl_id, hid_t fapl_id ) {
*/
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
hid_t dset = H5Dopen(loc_id, name, dapl_id);
int rc = pthread_create(&H5DRMM.io.pthread, NULL, H5Dread_pthread_func, NULL);
pthread_cond_init(&H5DRMM.io.io_cond, NULL);
pthread_cond_init(&H5DRMM.io.master_cond, NULL);
pthread_mutex_init(&H5DRMM.io.request_lock, NULL);
pthread_mutex_lock(&H5DRMM.io.request_lock);
H5DRMM.io.batch_cached = true;
H5DRMM.io.dset_cached = false;
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_mutex_unlock(&H5DRMM.io.request_lock);
srand(time(NULL)); // Initialization, should only be called once.
H5DRMM.dset.h5_datatype = H5Dget_type(dset);
H5DRMM.dset.esize = H5Tget_size(H5DRMM.dset.h5_datatype);
......@@ -516,6 +488,7 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
double t0 = MPI_Wtime();
create_mmap_win(name);
double t1 = MPI_Wtime() - t0;
int rc = pthread_create(&H5DRMM.io.pthread, NULL, H5Dread_pthread_func, &H5DRMM);
if (io_node() == H5DRMM.mpi.rank && debug_level() > 1)
printf("Time for creating memory map files: %f seconds\n", t1);
free(gdims);
......@@ -546,9 +519,9 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
if (io_node()==H5DRMM.mpi.rank && debug_level() > 1) {
printf("H5Dread_cache memcpy: %f\n", t1-t0);
}
H5DRMM.dset.ns_cached += H5DRMM.dset.batch.size;
pthread_mutex_lock(&H5DRMM.io.request_lock);
H5DRMM.io.batch_cached = false;
// wake up I/O thread
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_mutex_unlock(&H5DRMM.io.request_lock);
return err;
......@@ -560,17 +533,18 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
herr_t H5Dread_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
if (H5DRMM.dset.ns_cached>=H5DRMM.dset.ns_loc)
return H5Dread_from_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
else
if (H5DRMM.dset.ns_cached<H5DRMM.dset.ns_loc) {
return H5Dread_to_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
} else {
return H5Dread_from_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
}
}
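/*
  Hedged usage sketch of the read path (the loop and handle names are
  illustrative): during the first pass over the dataset H5Dread_cache stages
  the data to the node-local storage via H5Dread_to_cache; once all local
  samples are cached, later passes are served from the cache through
  H5Dread_from_cache.
*/
static void example_cached_epochs(hid_t dset, hid_t mtype, hid_t mspace,
                                  hid_t fspace, hid_t dxpl, void *buf, int nepochs) {
  for (int e = 0; e < nepochs; e++) {
    // the same call every epoch; dispatch between to_cache/from_cache is internal
    H5Dread_cache(dset, mtype, mspace, fspace, dxpl, buf);
  }
}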
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
if (io_node()==H5DRMM.mpi.rank && debug_level()>1) {
printf("Reading data from cache (H5Dread_from_cache)\n");
}
}
bool contig = false;
BATCH b;
H5RPthreadWait();
......@@ -648,7 +622,7 @@ void H5RPthreadTerminate() {
*/
void H5RPthreadWait() {
pthread_mutex_lock(&H5DRMM.io.request_lock);
while((!H5DRMM.io.batch_cached) && (!H5DRMM.io.dset_cached)) {
while(!H5DRMM.io.batch_cached) {
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_cond_wait(&H5DRMM.io.master_cond, &H5DRMM.io.request_lock);
}
......
......@@ -12,6 +12,7 @@
typedef struct _thread_data_t {
// we use a linked structure in C to build the list of I/O tasks
char fname[255];
void *dataset_obj;
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
......@@ -126,8 +127,6 @@ herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
// close the dataset, property list, etc: check whether all the tasks have been finished or not.
herr_t H5Dclose_cache( hid_t id);
herr_t H5Sclose_cache( hid_t id);
herr_t H5Pclose_cache( hid_t id);
/****************************************
* Function APIs for Parallel read *
****************************************/
......@@ -167,6 +166,7 @@ void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
void setH5SSD(SSD_INFO *);
#ifdef __cplusplus
}
#endif
......
......@@ -133,7 +133,6 @@ int main(int argc, char **argv) {
dset = H5Dopen(fd, dataset, H5P_DEFAULT);
}
tt.stop_clock("H5Dopen");
hid_t fspace = H5Dget_space(dset);
int ndims = H5Sget_simple_extent_ndims(fspace);
......@@ -252,14 +251,14 @@ int main(int argc, char **argv) {
}
}
H5Pclose(plist_id);
H5Sclose(mspace);
H5Sclose(fspace);
if (cache) {
H5Dclose_cache_read(dset);
} else {
H5Dclose(dset);
}
H5Pclose(plist_id);
H5Sclose(mspace);
H5Sclose(fspace);
H5Fclose(fd);
delete [] dat;
delete [] ldims;
......
......@@ -160,11 +160,11 @@ int main(int argc, char **argv) {
}
tt.start_clock("H5close");
if (cache) {
H5Pclose_cache(dxf_id);
H5Pclose_cache(plist_id);
H5Pclose(dxf_id);
H5Pclose(plist_id);
H5Sclose(filespace);
H5Sclose(memspace);
H5Dclose_cache(dset_id);
H5Sclose_cache(filespace);
H5Sclose_cache(memspace);
H5Fclose_cache(file_id);
} else {
H5Pclose(dxf_id);
......
../hdf5/H5Dio_cache.c
\ No newline at end of file
/*
This is the header file for node local storage incorporated HDF5
*/
#ifndef H5DIO_CACHE_H_
#define H5DIO_CACHE_H_
#include "hdf5.h"
#include "mpi.h"
#ifndef MAXDIM
#define MAXDIM 32
#endif
// The metadata for the I/O thread to perform parallel writes
typedef struct _thread_data_t {
// we use a linked structure in C to build the list of I/O tasks
char fname[255];
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
hid_t file_space_id;
hid_t xfer_plist_id;
int id;
hid_t offset; // offset in memory mapped file on SSD
hsize_t size;
void *buf;
struct _thread_data_t *next;
} thread_data_t;
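/*
  Sketch of how these nodes are used (mirrors the append logic in
  H5Dwrite_cache; assumes <stdlib.h>, and the helper name is illustrative):
  thread_data_t forms a singly linked task queue, with the writer appending at
  the tail and the I/O thread consuming from current_request.
*/
static thread_data_t *example_append_request(thread_data_t *tail) {
  tail->next = (thread_data_t *) malloc(sizeof(thread_data_t));
  tail->next->id = tail->id + 1; // monotonically increasing task id
  return tail->next;             // new tail of the queue
}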
// SSD related meta data
typedef struct _SSD_INFO {
double mspace_total;
double mspace_left;
double mspace_per_rank_total;
double mspace_per_rank_left;
char path[255];
hsize_t offset;
} SSD_INFO;
// MPI info
typedef struct _MPI_INFO {
int rank;
int nproc;
int local_rank; // rank id in the local communicator
int ppn; // number of processes per node
MPI_Comm comm; // global communicator
MPI_Comm node_comm; // node local communicator
MPI_Win win;
} MPI_INFO;
// I/O threads
typedef struct _IO_THREAD {
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock;
int num_request; // for parallel write
thread_data_t *request_list, *current_request, *first_request; // task queue
bool batch_cached; // for parallel read: whether the current batch of data has been cached to the SSD or not
bool dset_cached; // whether the entire dataset is cached to SSD or not.
pthread_t pthread;
} IO_THREAD;
// Memory mapped files
typedef struct _MMAP {
// for write
int fd; // file handle for write
char fname[255]; // full path of the memory mapped file
void *buf; // pointer that maps the file into memory
void *tmp_buf; // temporary buffer used for parallel read: the read buffer is copied here so that H5Dread_to_cache can return while the background thread writes the data to the SSD.
} MMAP;
// Dataset
typedef struct _SAMPLE {
size_t dim; // the number of dimensions
size_t size; // the size of the sample in bytes.
size_t nel; // the number of elements per sample
} SAMPLE;
typedef struct _BATCH {
int *list;
int size;
} BATCH;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc; // number of samples per rank
size_t ns_glob; // total number of samples
size_t s_offset; // global sample offset of this rank
hsize_t size; // the size of the entire dataset in bytes.
BATCH batch; // batch data to read
int ns_cached; // number of samples that have been cached locally
bool contig_read; // whether the batch of data to read is contiguous or not.
MPI_Datatype mpi_datatype; // the constructed MPI datatype
hid_t h5_datatype; // the HDF5 datatype
size_t esize; // the size of an element in bytes.
} DSET;
typedef struct _H5Dwrite_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
SSD_INFO *ssd;
} H5Dwrite_cache_metadata;
typedef struct _H5Dread_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
DSET dset;
SSD_INFO *ssd;