Commit 76aa5537 authored by Huihuo Zheng

fixed bugs

parent e31ec441
......@@ -49,16 +49,15 @@
// Debug
#include "debug.h"
/*
Global variables holding information related to the local storage
*/
#define MAXDIM 32
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
// initialize H5DWMM data
SSD_INFO SSD;
H5Dwrite_cache_metadata H5DWMM;
-#define PAGESIZE sysconf(_SC_PAGE_SIZE)
/*
Function to set up the local storage path and capacity.
*/
......@@ -79,11 +78,12 @@ void setH5SSD(SSD_INFO *ssd) {
}
ssd->offset = 0;
}
+void int2char(int a, char str[255]) {
+sprintf(str, "%d", a);
+}
/*
-Purpose: get the size of the buffer from the memory space
-Input: memory space id, data type id;
+Purpose: get the size of the buffer from the memory space and type id
Output: the size of the memory space in bytes.
*/
hsize_t get_buf_size(hid_t mspace, hid_t tid) {
......@@ -99,16 +99,15 @@ hsize_t get_buf_size(hid_t mspace, hid_t tid) {
return s;
}
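The body of get_buf_size is elided by the hunk. A hedged sketch of one standard way to compute it with HDF5 calls, assuming the size is the number of selected elements times the datatype size (the real body may instead walk the extent dims, given the MAXDIM define above):

/* Hedged sketch, not the elided body. */
hsize_t get_buf_size_sketch(hid_t mspace, hid_t tid) {
    hssize_t npoints = H5Sget_select_npoints(mspace); /* elements selected */
    size_t tsize = H5Tget_size(tid);                  /* bytes per element */
    return (hsize_t)npoints * (hsize_t)tsize;
}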
/*
Thread function for performing H5Dwrite. This function creates
a memory-mapped buffer to the file on the local storage that
contains the data to be written to the file system.
-On Theta, the memory mapped buffer currently does not work with H5Dwrite,
-we instead allocate a buffer directly to the memory.
+On Theta, the memory-mapped buffer currently does not work with H5Dwrite; we instead allocate a buffer directly in memory.
*/
void *H5Dwrite_pthread_func(void *arg) {
// cast the void* argument so we can use the H5DWMM metadata as input
H5Dwrite_cache_metadata *wmm = (H5Dwrite_cache_metadata*) arg;
pthread_mutex_lock(&wmm->io.request_lock);
while (wmm->io.num_request>=0) {
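The loop body is elided by the hunk. Judging from the fields visible elsewhere in this diff (num_request, current_request, io_cond, master_cond), the worker presumably runs a standard producer-consumer loop; a hedged sketch of the elided body, with the H5Dwrite replay details assumed:

/* Hedged sketch of the elided body; the control flow is an assumption. */
if (wmm->io.num_request > 0) {
    thread_data_t *task = wmm->io.current_request;
    pthread_mutex_unlock(&wmm->io.request_lock);
    /* ... replay the staged write: read task->size bytes at task->offset
       from the local file, then call H5Dwrite with the task's ids ... */
    pthread_mutex_lock(&wmm->io.request_lock);
    wmm->io.current_request = task->next;
    wmm->io.num_request--;
    pthread_cond_signal(&wmm->io.master_cond); /* unblock H5WPthreadWait */
} else if (wmm->io.num_request == 0) {
    /* idle: sleep until H5Dwrite_cache signals io_cond with new work */
    pthread_cond_wait(&wmm->io.io_cond, &wmm->io.request_lock);
}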
......@@ -152,15 +151,11 @@ void *H5Dwrite_pthread_func(void *arg) {
enough for storing the buffer of the current task, it will wait for the
I/O thread to finish all the previous tasks.
*/
-void int2char(int a, char str[255]) {
-sprintf(str, "%d", a);
-}
hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fapl_id ) {
H5DWMM.io.num_request = 0;
pthread_cond_init(&H5DWMM.io.io_cond, NULL);
pthread_cond_init(&H5DWMM.io.master_cond, NULL);
pthread_mutex_init(&H5DWMM.io.request_lock, NULL);
-int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, &H5DWMM);
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD(&SSD);
H5DWMM.ssd = &SSD;
......@@ -184,11 +179,12 @@ hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fa
pthread_mutex_lock(&H5DWMM.io.request_lock);
H5DWMM.io.request_list->id = 0;
H5DWMM.io.current_request = H5DWMM.io.request_list;
H5DWMM.io.first_request = H5DWMM.io.request_list;
pthread_mutex_unlock(&H5DWMM.io.request_lock);
H5DWMM.ssd->mspace_per_rank_total = H5DWMM.ssd->mspace_total / H5DWMM.mpi.ppn;
H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total;
H5DWMM.mmap.fd = open(H5DWMM.mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
+int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, &H5DWMM);
return H5Fcreate(name, flags, fcpl_id, fapl_id);
}
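For orientation, the cache API is designed as a drop-in replacement for the native calls. A hypothetical write path (fapl, fspace, mspace, dxpl, and buf are placeholders, not names from this commit):

hid_t fd = H5Fcreate_cache("out.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
hid_t dset = H5Dcreate(fd, "data", H5T_NATIVE_FLOAT, fspace,
                       H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
/* returns as soon as the data is staged on the local storage;
   the pthread created above migrates it to the file system */
H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl, buf);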
......@@ -204,7 +200,6 @@ herr_t
H5Dwrite_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id,
hid_t file_space_id, hid_t dxpl_id, const void *buf) {
hsize_t size = get_buf_size(mem_space_id, mem_type_id);
if (H5DWMM.ssd->mspace_per_rank_left < size) {
H5WPthreadWait();
H5DWMM.ssd->offset = 0;
......@@ -212,6 +207,10 @@ H5Dwrite_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id,
H5DWMM.ssd->mspace_left = H5DWMM.ssd->mspace_total;
}
int err = pwrite(H5DWMM.mmap.fd, (char*)buf, size, H5DWMM.ssd->offset);
+H5DWMM.io.request_list->offset = H5DWMM.ssd->offset;
+H5DWMM.ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
+H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total
+- H5DWMM.ssd->offset*H5DWMM.mpi.ppn;
#ifdef __APPLE__
fcntl(H5DWMM.mmap.fd, F_NOCACHE, 1);
#else
......@@ -223,18 +222,13 @@ H5Dwrite_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id,
H5DWMM.io.request_list->file_space_id = file_space_id;
H5DWMM.io.request_list->xfer_plist_id = dxpl_id;
H5DWMM.io.request_list->size = size;
-H5DWMM.io.request_list->offset = H5DWMM.ssd->offset;
H5DWMM.io.request_list->next = (thread_data_t*) malloc(sizeof(thread_data_t));
H5DWMM.io.request_list->next->id = H5DWMM.io.request_list->id + 1;
thread_data_t *data = H5DWMM.io.request_list;
H5DWMM.io.request_list = H5DWMM.io.request_list->next;
pthread_mutex_lock(&H5DWMM.io.request_lock);
H5DWMM.io.num_request++;
pthread_cond_signal(&H5DWMM.io.io_cond); // wake up the I/O thread right away
pthread_mutex_unlock(&H5DWMM.io.request_lock);
-H5DWMM.ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
-H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total
-- H5DWMM.ssd->offset*H5DWMM.mpi.ppn;
return err;
}
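The staging offset always advances in whole pages: (size/PAGESIZE+1)*PAGESIZE rounds size up to the next page boundary, and adds one full extra page when size is already page-aligned. A worked example with an assumed 4096-byte page:

/* Illustration only, with hypothetical values. */
size_t page = 4096;
size_t size = 10000;                    /* bytes staged by pwrite   */
size_t step = (size / page + 1) * page; /* (2+1)*4096 = 12288 bytes */
/* size = 8192 would also advance by (2+1)*4096, i.e. one spare page */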
......@@ -410,7 +404,6 @@ void *H5Dread_pthread_func(void *args) {
MPI_Win_fence(MPI_MODE_NOSUCCEED, dmm->mpi.win);
if (io_node()==dmm->mpi.rank && debug_level()>2) printf("PTHREAD DONE\n");
dmm->io.batch_cached = true;
-dmm->dset.ns_cached += dmm->dset.batch.size;
if (dmm->dset.ns_cached>=dmm->dset.ns_loc)
dmm->io.dset_cached=true;
} else {
......@@ -510,7 +503,7 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
double t0 = MPI_Wtime();
create_mmap_win(name);
double t1 = MPI_Wtime() - t0;
-int rc = pthread_create(&H5DRMM.io.pthread, NULL, H5Dread_pthread_func, (void *) &H5DRMM);
+int rc = pthread_create(&H5DRMM.io.pthread, NULL, H5Dread_pthread_func, &H5DRMM);
if (io_node() == H5DRMM.mpi.rank && debug_level() > 1)
printf("Time for creating memory map files: %f seconds\n", t1);
free(gdims);
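create_mmap_win is not shown in this diff. A heavily hedged sketch of what it plausibly does, given the mmap fields and the RMA window used elsewhere (the buffer variable, the fname field on H5DRMM, and the communicator below are assumptions):

/* Hedged sketch: map the node-local cache file and expose it as an
   MPI RMA window so remote ranks can fetch cached samples. */
void create_mmap_win_sketch(size_t nbytes) {
    int fd = open(H5DRMM.mmap.fname, O_RDWR); /* fname field assumed, as in H5DWMM */
    void *buf = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    MPI_Win_create(buf, (MPI_Aint)nbytes, 1 /* disp_unit */,
                   MPI_INFO_NULL, MPI_COMM_WORLD /* communicator assumed */,
                   &H5DRMM.mpi.win);
}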
......@@ -541,9 +534,9 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
if (io_node()==H5DRMM.mpi.rank && debug_level() > 1) {
printf("H5Dread_cache memcpy: %f\n", t1-t0);
}
+H5DRMM.dset.ns_cached += H5DRMM.dset.batch.size;
pthread_mutex_lock(&H5DRMM.io.request_lock);
H5DRMM.io.batch_cached = false;
// wake up I/O thread
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_mutex_unlock(&H5DRMM.io.request_lock);
return err;
......@@ -555,10 +548,10 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
herr_t H5Dread_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
-if (H5DRMM.dset.ns_cached>=H5DRMM.dset.ns_loc) {
-return H5Dread_from_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
-} else {
+if (H5DRMM.dset.ns_cached<H5DRMM.dset.ns_loc) {
return H5Dread_to_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
+} else {
+return H5Dread_from_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
}
}
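The dispatch above means the first pass over a dataset is read from the file system and cached, while later passes are served from the cache. A hypothetical training-style read loop (fd, mspace, fspace, dxpl, buf, and nepochs are placeholders):

hid_t dset = H5Dopen_cache(fd, "data", H5P_DEFAULT);
for (int epoch = 0; epoch < nepochs; epoch++)
    H5Dread_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxpl, buf);
H5Dclose_cache_read(dset); /* shuts the read I/O thread down, as in main() below */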
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
......@@ -567,12 +560,10 @@ herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
if (io_node()==H5DRMM.mpi.rank && debug_level()>1) {
printf("Reading data from cache (H5Dread_from_cache)\n");
}
bool contig = false;
BATCH b;
H5RPthreadWait();
get_samples_from_filespace(file_space_id, &b, &contig);
MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOPRECEDE, H5DRMM.mpi.win);
char *p_mem = (char *) dat;
int batch_size = b.size;
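The remainder of the copy loop is elided. Given the MPI_Win_fence above, the missing part presumably fetches each selected sample with one-sided MPI; a hedged sketch in which the sample-geometry fields (b.list, dset.ns_loc, dset.sample.nel, dset.sample.size, dset.mpi_datatype) are assumptions rather than names confirmed by this diff:

/* Hedged sketch: pull each cached sample from the rank that owns it. */
for (int i = 0; i < batch_size; i++) {
    int dest = b.list[i];                 /* global sample id (assumed) */
    int src = dest / H5DRMM.dset.ns_loc;  /* owning rank (assumed)      */
    MPI_Aint off = (MPI_Aint)(dest % H5DRMM.dset.ns_loc) * H5DRMM.dset.sample.nel;
    MPI_Get(p_mem + (size_t)i * H5DRMM.dset.sample.size,
            H5DRMM.dset.sample.nel, H5DRMM.dset.mpi_datatype,
            src, off, H5DRMM.dset.sample.nel, H5DRMM.dset.mpi_datatype,
            H5DRMM.mpi.win);
}
MPI_Win_fence(MPI_MODE_NOSUCCEED, H5DRMM.mpi.win); /* complete the RMA epoch */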
......@@ -646,7 +637,7 @@ void H5RPthreadTerminate() {
*/
void H5RPthreadWait() {
pthread_mutex_lock(&H5DRMM.io.request_lock);
-while((!H5DRMM.io.batch_cached) && (!H5DRMM.io.dset_cached)) {
+while(!H5DRMM.io.batch_cached) {
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_cond_wait(&H5DRMM.io.master_cond, &H5DRMM.io.request_lock);
}
......
......@@ -251,14 +251,14 @@ int main(int argc, char **argv) {
}
}
-H5Pclose(plist_id);
-H5Sclose(mspace);
-H5Sclose(fspace);
if (cache) {
H5Dclose_cache_read(dset);
} else {
H5Dclose(dset);
}
+H5Pclose(plist_id);
+H5Sclose(mspace);
+H5Sclose(fspace);
H5Fclose(fd);
delete [] dat;
delete [] ldims;
......