Commit dc4058fc authored by Huihuo Zheng's avatar Huihuo Zheng
Browse files

fixed some bugs

parent 1d36da8c
......@@ -179,15 +179,16 @@ hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fa
sprintf(rnd, "%d", H5DWMM.mpi.rank);
strcat(H5DWMM.mmap.fname, rnd);
H5DWMM.io.request_list = (thread_data_t*) malloc(sizeof(thread_data_t));
H5DWMM.ssd->mspace_per_rank_total = H5DWMM.ssd->mspace_total / H5DWMM.mpi.ppn;
H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total;
H5DWMM.mmap.fd = open(H5DWMM.mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, &H5DWMM);
pthread_mutex_lock(&H5DWMM.io.request_lock);
H5DWMM.io.request_list->id = 0;
H5DWMM.io.current_request = H5DWMM.io.request_list;
H5DWMM.io.first_request = H5DWMM.io.request_list;
pthread_mutex_unlock(&H5DWMM.io.request_lock);
H5DWMM.ssd->mspace_per_rank_total = H5DWMM.ssd->mspace_total / H5DWMM.mpi.ppn;
H5DWMM.ssd->mspace_per_rank_left = H5DWMM.ssd->mspace_per_rank_total;
H5DWMM.mmap.fd = open(H5DWMM.mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, &H5DWMM);
return H5Fcreate(name, flags, fcpl_id, fapl_id);
}
......
......@@ -75,7 +75,8 @@ int main(int argc, char **argv) {
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Info info = MPI_INFO_NULL;
int rank, nproc, provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
//MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Init(&argc, &argv);
MPI_Comm_size(comm, &nproc);
MPI_Comm_rank(comm, &rank);
......@@ -95,14 +96,16 @@ int main(int argc, char **argv) {
}
hsize_t offset[2] = {0, 0};
// setup file access property list for mpio
cout << "H5Pcreate" << endl;
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, comm, info);
cout << "H5Pcreate done" << endl;
char f[255];
strcpy(f, scratch);
strcat(f, "/parallel_file.h5");
tt.start_clock("H5Fcreate");
cout << "cccccc" << endl;
hid_t file_id;
if (cache)
file_id = H5Fcreate_cache(f, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
......@@ -160,12 +163,12 @@ int main(int argc, char **argv) {
}
tt.start_clock("H5close");
if (cache) {
H5Dclose_cache(dset_id);
H5Fclose_cache(file_id);
H5Pclose(dxf_id);
H5Pclose(plist_id);
H5Sclose(filespace);
H5Sclose(memspace);
H5Dclose_cache(dset_id);
H5Fclose_cache(file_id);
} else {
H5Pclose(dxf_id);
H5Pclose(plist_id);
......
......@@ -90,6 +90,7 @@ typedef struct H5VL_pass_through_ext_t {
H5Dwrite_cache_metadata *H5DWMM;
bool read_cache;
bool write_cache;
int num_request_dataset;
} H5VL_pass_through_ext_t;
/* The pass through VOL wrapper context */
......@@ -1147,6 +1148,7 @@ H5VL_pass_through_ext_dataset_create(void *obj, const H5VL_loc_params_t *loc_par
if (o->write_cache) {
dset->H5DWMM = o->H5DWMM;
dset->write_cache = o->write_cache;
dset->num_request_dataset = 0;
}
return (void *)dset;
} /* end H5VL_pass_through_ext_dataset_create() */
......@@ -1231,7 +1233,6 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, wmm->mmap.fd, data->offset);
msync(data->buf, data->size, MS_SYNC);
H5VL_pass_through_ext_t *o = (H5VL_pass_through_ext_t *)data->dataset_obj;
printf("o: %d of %d\n", o->H5DWMM->mpi.rank, o->H5DWMM->mpi.nproc);
void **req;
#ifdef THETA
wmm->mmap.tmp_buf = malloc(data->size);
......@@ -1247,7 +1248,7 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
data->file_space_id, data->xfer_plist_id,
data->buf, req);
#endif
if (o->H5DWMM->mpi.rank==0) printf("write dataset done using H5VLdataset_write\n");
if (wmm->mpi.rank==0) printf("dataset %d written to filesystem, done!\n", data->id);
munmap(data->buf, data->size);
H5Sclose(data->mem_space_id);
H5Sclose(data->file_space_id);
......@@ -1255,6 +1256,7 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
H5Tclose(data->mem_type_id);
wmm->io.current_request=wmm->io.current_request->next;
wmm->io.num_request--;
o->num_request_dataset--;
} if (wmm->io.num_request == 0) {
pthread_cond_signal(&wmm->io.master_cond);
pthread_cond_wait(&wmm->io.io_cond, &wmm->io.request_lock);
......@@ -1264,8 +1266,6 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
return NULL;
}
/*-------------------------------------------------------------------------
* Function: H5VL_pass_through_ext_dataset_write
......@@ -1283,51 +1283,51 @@ H5VL_pass_through_ext_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_spa
{
H5VL_pass_through_ext_t *o = (H5VL_pass_through_ext_t *)dset;
herr_t ret_value;
printf("pass through dataset_write\n");
#ifdef ENABLE_EXT_PASSTHRU_LOGGING
printf("------- EXT PASS THROUGH VOL DATASET Write\n");
#endif
if (o->write_cache) {
hsize_t size = get_buf_size(mem_space_id, mem_type_id);
if (o->H5DWMM->ssd->mspace_per_rank_left < size) {
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
while(o->H5DWMM->io.num_request>0) {
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_cond_wait(&o->H5DWMM->io.master_cond, &o->H5DWMM->io.request_lock);
}
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
o->H5DWMM->ssd->offset = 0;
o->H5DWMM->ssd->mspace_per_rank_left = o->H5DWMM->ssd->mspace_per_rank_total;
o->H5DWMM->ssd->mspace_left = o->H5DWMM->ssd->mspace_total;
}
int err = pwrite(o->H5DWMM->mmap.fd, (char*)buf, size, o->H5DWMM->ssd->offset);
o->H5DWMM->io.request_list->offset = o->H5DWMM->ssd->offset;
o->H5DWMM->ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
o->H5DWMM->ssd->mspace_per_rank_left = o->H5DWMM->ssd->mspace_per_rank_total
- o->H5DWMM->ssd->offset*o->H5DWMM->mpi.ppn;
#ifdef __APPLE__
fcntl(o->H5DWMM->mmap.fd, F_NOCACHE, 1);
#else
fsync(o->H5DWMM->mmap.fd);
#endif
o->H5DWMM->io.request_list->dataset_obj = (void*)dset;
o->H5DWMM->io.request_list->dataset_obj = dset;
o->H5DWMM->io.request_list->mem_type_id = H5Tcopy(mem_type_id);
o->H5DWMM->io.request_list->mem_space_id = H5Scopy(mem_space_id);
o->H5DWMM->io.request_list->file_space_id = H5Scopy(file_space_id);
o->H5DWMM->io.request_list->xfer_plist_id = H5Pcopy(plist_id);
o->H5DWMM->io.request_list->size = size;
o->H5DWMM->io.request_list->offset = o->H5DWMM->ssd->offset;
o->H5DWMM->io.request_list->next = (thread_data_t*) malloc(sizeof(thread_data_t));
o->H5DWMM->io.request_list->next->id = o->H5DWMM->io.request_list->id + 1;
thread_data_t *data = o->H5DWMM->io.request_list;
o->H5DWMM->io.request_list = o->H5DWMM->io.request_list->next;
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
o->H5DWMM->io.num_request++;
o->num_request_dataset++;
pthread_cond_signal(&o->H5DWMM->io.io_cond);// wake up I/O thread rightawayx
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
o->H5DWMM->ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
o->H5DWMM->ssd->mspace_per_rank_left = o->H5DWMM->ssd->mspace_per_rank_total
- o->H5DWMM->ssd->offset*o->H5DWMM->mpi.ppn;
} else {
ret_value = H5VLdataset_write(o->under_object, o->under_vol_id, mem_type_id, mem_space_id, file_space_id, plist_id, buf, req);
}
//ret_value = H5VLdataset_write(o->under_object, o->under_vol_id, mem_type_id, mem_space_id, file_space_id, plist_id, buf, req);
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
......@@ -1453,6 +1453,15 @@ H5VL_pass_through_ext_dataset_close(void *dset, hid_t dxpl_id, void **req)
#ifdef ENABLE_EXT_PASSTHRU_LOGGING
printf("------- EXT PASS THROUGH VOL DATASET Close\n");
#endif
if (o->write_cache) {
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
while(o->num_request_dataset>0) {
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_cond_wait(&o->H5DWMM->io.master_cond, &o->H5DWMM->io.request_lock);
}
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
}
ret_value = H5VLdataset_close(o->under_object, o->under_vol_id, dxpl_id, req);
......@@ -1463,14 +1472,6 @@ H5VL_pass_through_ext_dataset_close(void *dset, hid_t dxpl_id, void **req)
/* Release our wrapper, if underlying dataset was closed */
if(ret_value >= 0)
H5VL_pass_through_ext_free_obj(o);
if (o->write_cache) {
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
while(o->H5DWMM->io.num_request>0) {
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_cond_wait(&o->H5DWMM->io.master_cond, &o->H5DWMM->io.request_lock);
}
}
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
return ret_value;
} /* end H5VL_pass_through_ext_dataset_close() */
......@@ -1732,18 +1733,20 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
} /* end if */
else
file = NULL;
printf("create|\n");
srand(time(NULL)); // Initialization, should only be called once.
file->H5DWMM = (H5Dwrite_cache_metadata*) malloc(sizeof(H5Dwrite_cache_metadata));
file->H5DWMM->io.num_request = 0;
pthread_cond_init(&file->H5DWMM->io.io_cond, NULL);
pthread_cond_init(&file->H5DWMM->io.master_cond, NULL);
pthread_mutex_init(&file->H5DWMM->io.request_lock, NULL);
setH5SSD(&SSD);
file->H5DWMM->ssd = &SSD;
MPI_Comm comm, comm_dup;
MPI_Info mpi_info;
file->H5DWMM->io.num_request = 0;
H5Pget_fapl_mpio(fapl_id, &comm, &mpi_info);
pthread_cond_init(&file->H5DWMM->io.io_cond, NULL);
pthread_cond_init(&file->H5DWMM->io.master_cond, NULL);
pthread_mutex_init(&file->H5DWMM->io.request_lock, NULL);
MPI_Comm_dup(comm, &file->H5DWMM->mpi.comm);
MPI_Comm_rank(comm, &file->H5DWMM->mpi.rank);
MPI_Comm_size(comm, &file->H5DWMM->mpi.nproc);
......@@ -1760,23 +1763,22 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
sprintf(rnd, "%d", file->H5DWMM->mpi.rank);
strcat(file->H5DWMM->mmap.fname, rnd);
file->H5DWMM->io.request_list = (thread_data_t*) malloc(sizeof(thread_data_t));
pthread_mutex_lock(&file->H5DWMM->io.request_lock);
file->H5DWMM->io.request_list->id = 0;
file->H5DWMM->io.current_request = file->H5DWMM->io.request_list;
file->H5DWMM->io.first_request = file->H5DWMM->io.request_list;
pthread_mutex_unlock(&file->H5DWMM->io.request_lock);
file->H5DWMM->ssd->mspace_per_rank_total = file->H5DWMM->ssd->mspace_total / file->H5DWMM->mpi.ppn;
file->H5DWMM->ssd->mspace_per_rank_left = file->H5DWMM->ssd->mspace_per_rank_total;
file->H5DWMM->mmap.fd = open(file->H5DWMM->mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
file->write_cache = true;
int rc = pthread_create(&file->H5DWMM->io.pthread, NULL, H5Dwrite_pthread_func_vol, file->H5DWMM);
pthread_mutex_lock(&file->H5DWMM->io.request_lock);
file->H5DWMM->io.request_list->id = 0;
file->H5DWMM->io.current_request = file->H5DWMM->io.request_list;
file->H5DWMM->io.first_request = file->H5DWMM->io.request_list;
pthread_mutex_unlock(&file->H5DWMM->io.request_lock);
/* Close underlying FAPL */
H5Pclose(under_fapl_id);
/* Release copy of our VOL info */
H5VL_pass_through_ext_info_free(info);
printf("create done|\n");
return (void *)file;
} /* end H5VL_pass_through_ext_file_create() */
......@@ -2056,17 +2058,9 @@ H5VL_pass_through_ext_file_close(void *file, hid_t dxpl_id, void **req)
#ifdef ENABLE_EXT_PASSTHRU_LOGGING
printf("------- EXT PASS THROUGH VOL FILE Close\n");
#endif
ret_value = H5VLfile_close(o->under_object, o->under_vol_id, dxpl_id, req);
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
/* Release our wrapper, if underlying file was closed */
if(ret_value >= 0)
H5VL_pass_through_ext_free_obj(o);
if (o->write_cache) {
printf("file_close\n");
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
while(o->H5DWMM->io.num_request>0) {
pthread_cond_signal(&o->H5DWMM->io.io_cond);
......@@ -2077,11 +2071,23 @@ H5VL_pass_through_ext_file_close(void *file, hid_t dxpl_id, void **req)
o->H5DWMM->io.num_request=-1;
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
//pthread_join(o->H5DWMM->io.pthread, NULL);
pthread_join(o->H5DWMM->io.pthread, NULL);
close(o->H5DWMM->mmap.fd);
remove(o->H5DWMM->mmap.fname);
o->H5DWMM->ssd->mspace_left = o->H5DWMM->ssd->mspace_total;
}
ret_value = H5VLfile_close(o->under_object, o->under_vol_id, dxpl_id, req);
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
/* Release our wrapper, if underlying file was closed */
if(ret_value >= 0)
H5VL_pass_through_ext_free_obj(o);
return ret_value;
} /* end H5VL_pass_through_ext_file_close() */
......
CC=mpicc
CC=mpicc -DTHETA
#Your HDF5 install path
HDF5_DIR=$(HDF5_ROOT)
......@@ -7,8 +7,8 @@ LIBS=-L$(HDF5_DIR)/lib -lhdf5 -lz
#DEBUG=-DENABLE_EXT_PASSTHRU_LOGGING
CFLAGS=$(INCLUDES) $(LIBS)
TARGET=libh5passthrough_vol.dylib
CXX=mpicxx
TARGET=libh5passthrough_vol.so
CXX=mpicxx -DTHETA
all: makeso test_write_cache
%.o : %.cpp
......@@ -18,7 +18,7 @@ test_write_cache: test_write_cache.o ../utils/debug.o H5Dio_cache.o
makeso:
$(CC) -shared $(CFLAGS) $(DEBUG) -o $(TARGET) -fPIC H5VLpassthru_ext.c H5Dio_cache.c ../utils/debug.c
cp $(TARGET) $(HDF5_DIR)/../vol
mv $(TARGET) $(HDF5_DIR)/../vol
clean:
rm -f $(TARGET) *.o
......
......@@ -20,6 +20,7 @@
#include <stdlib.h>
#include "stat.h"
#include "debug.h"
#include <unistd.h>
int msleep(long miliseconds)
{
struct timespec req, rem;
......@@ -144,7 +145,8 @@ int main(int argc, char **argv) {
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, ldims, count);
tt.start_clock("H5Dwrite");
hid_t status = H5Dwrite(dset_id, H5T_NATIVE_INT, memspace, filespace, dxf_id, data); // write memory to file
H5Fflush(file_id, H5F_SCOPE_LOCAL);
// H5Fflush(file_id, H5F_SCOPE_LOCAL);
tt.stop_clock("H5Dwrite");
tt.start_clock("compute");
msleep(int(sleep*1000));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment