Commit 20a2951d authored by Huihuo Zheng's avatar Huihuo Zheng
Browse files

test

parent 70087891
......@@ -167,7 +167,6 @@ void int2char(int a, char str[255]) {
sprintf(str, "%d", a);
}
hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fapl_id ) {
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, NULL);
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
......
......@@ -111,7 +111,7 @@ typedef struct _H5Dread_cache_metadata {
* Function APIs for parallel write *
**************************************/
// Create HDF5 file: create memory mapped file on the SSD
#if __cplusplus
#ifdef __cplusplus
extern "C" {
#endif
hid_t H5Fcreate_cache( const char *name, unsigned flags,
......@@ -167,7 +167,7 @@ void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
#if __cplusplus
#ifdef __cplusplus
}
#endif
#endif //H5Dio_cache.h
......@@ -11,7 +11,7 @@ include make.inc
all: test_read_cache test_write_cache prepare_dataset
test_read_cache: test_read_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ test_read_cache.o ../utils/debug.o ../utils/profiling.o H5Dio_cache.o $(HDF5_LIB)
$(CXX) $(CFLAGS) -o $@ test_read_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB) -L$(HDF5_ROOT)/../vol/ -lh5passthrough_vol
test_vol: test_vol.o ../utils/debug.o
......
......@@ -128,7 +128,7 @@ int main(int argc, char **argv) {
hid_t dset;
tt.start_clock("H5Dopen");
if (cache) {
dset = H5Dopen_cache(fd, dataset, H5P_DEFAULT);
dset = H5Dopen(fd, dataset, H5P_DEFAULT);
} else {
dset = H5Dopen(fd, dataset, H5P_DEFAULT);
}
......
......@@ -54,7 +54,7 @@
Global variables to define information related to the local storage
*/
#define MAXDIM 32
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
// initialize H5DWMM data
SSD_INFO
SSD = {
......@@ -62,11 +62,8 @@ SSD = {
.mspace_left = 137438953472,
.offset = 0
};
H5Dwrite_cache_metadata
H5Dwrite_cache_metadata
H5DWMM = {
.mpi.ppn = 1, // number of proc per node
.mpi.rank = 0, // rank id in H5F comm
.mpi.local_rank = 0, // local rank id in a node
.io.num_request = 0,//number of I/O request
.io.master_cond = PTHREAD_COND_INITIALIZER, // condition variable
.io.io_cond = PTHREAD_COND_INITIALIZER,
......@@ -74,13 +71,13 @@ H5DWMM = {
.io.request_list = NULL,
.io.current_request = NULL,
.io.first_request = NULL,
.ssd = &SSD
.ssd = &SSD,
};
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
/*
Function for set up the local storage path and capacity.
*/
int setH5SSD() {
void setH5SSD() {
// set SSD_PATH
if (getenv("SSD_PATH")) {
strcpy(SSD.path, getenv("SSD_PATH"));
......@@ -92,7 +89,6 @@ int setH5SSD() {
SSD.mspace_total = atof(getenv("SSD_SIZE"))*1024*1024*1024;
SSD.mspace_left = SSD.mspace_total;
}
return 0;
}
......@@ -426,6 +422,9 @@ void *H5Dread_pthread_func(void *args) {
MPI_Win_fence(MPI_MODE_NOSUCCEED, H5DRMM.mpi.win);
if (io_node()==H5DRMM.mpi.rank && debug_level()>2) printf("PTHREAD DONE\n");
H5DRMM.io.batch_cached = true;
H5DRMM.dset.ns_cached += H5DRMM.dset.batch.size;
if (H5DRMM.dset.ns_cached>=H5DRMM.dset.ns_loc)
H5DRMM.io.dset_cached=true;
} else {
pthread_cond_signal(&H5DRMM.io.master_cond);
pthread_cond_wait(&H5DRMM.io.io_cond, &H5DRMM.io.request_lock);
......@@ -456,11 +455,8 @@ void create_mmap_win(const char *prefix) {
pwrite(fh, &a, 1, ss);
fsync(fh);
close(fh);
#ifdef __APPLE__
H5DRMM.mmap.fd = open(H5DRMM.mmap.fname, O_RDWR);
#else
H5DRMM.mmap.fd = open(H5DRMM.mmap.fname, O_RDWR | O_DIRECT);
#endif
H5DRMM.mmap.buf = mmap(NULL, ss, PROT_READ | PROT_WRITE, MAP_SHARED, H5DRMM.mmap.fd, 0);
msync(H5DRMM.mmap.buf, ss, MS_SYNC);
} else {
......@@ -510,9 +506,10 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id) {
for(int i=1; i<ndims; i++) {
dim = dim*gdims[i];
}
H5DRMM.dset.sample.nel = dim;
H5DRMM.dset.sample.nel = dim;
H5DRMM.dset.sample.dim = ndims-1;
H5DRMM.dset.ns_glob = gdims[0];
H5DRMM.dset.ns_cached=0;
parallel_dist(gdims[0], H5DRMM.mpi.nproc, H5DRMM.mpi.rank, &H5DRMM.dset.ns_loc, &H5DRMM.dset.s_offset);
H5DRMM.dset.sample.size = H5DRMM.dset.esize*H5DRMM.dset.sample.nel;
H5DRMM.dset.size = H5DRMM.dset.sample.size*H5DRMM.dset.ns_loc;
......@@ -551,7 +548,7 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
}
pthread_mutex_lock(&H5DRMM.io.request_lock);
H5DRMM.io.batch_cached = false;
// wake up I/O thread
// wake up I/O thread
pthread_cond_signal(&H5DRMM.io.io_cond);
pthread_mutex_unlock(&H5DRMM.io.request_lock);
return err;
......@@ -560,6 +557,14 @@ herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id,
/*
Reading data directly from local storage.
*/
herr_t H5Dread_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
if (H5DRMM.dset.ns_cached>=H5DRMM.dset.ns_loc)
return H5Dread_from_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
else
return H5Dread_to_cache(dataset_id, mem_type_id, mem_space_id, file_space_id, xfer_plist_id, dat);
}
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t xfer_plist_id, void * dat) {
......@@ -604,11 +609,7 @@ herr_t H5DRMMF_remap() {
if (strcmp("MEMORY", H5DRMM.ssd->path)!=0) {
munmap(H5DRMM.mmap.buf, ss);
close(H5DRMM.mmap.fd);
#ifdef __APPLE__
H5DRMM.mmap.fd = open(H5DRMM.mmap.fname, O_RDWR);
#else
H5DRMM.mmap.fd = open(H5DRMM.mmap.fname, O_RDWR | O_DIRECT);
#endif
H5DRMM.mmap.buf = mmap(NULL, ss, PROT_READ, MAP_SHARED, H5DRMM.mmap.fd, 0);
msync(H5DRMM.mmap.buf, ss, MS_SYNC);
}
......@@ -653,3 +654,4 @@ void H5RPthreadWait() {
}
pthread_mutex_unlock(&H5DRMM.io.request_lock);
}
......@@ -84,7 +84,8 @@ typedef struct _DSET {
size_t ns_glob; // total number of samples
size_t s_offset; // offset
hsize_t size; // the size of the entire dataset in bytes.
BATCH batch; // batch data to read
BATCH batch; // batch data to read
int ns_cached;
bool contig_read; // whether the batch of data to read is contigues or not.
MPI_Datatype mpi_datatype; // the constructed mpi dataset
hid_t h5_datatype; // hdf5 dataset
......@@ -109,8 +110,10 @@ typedef struct _H5Dread_cache_metadata {
/**************************************
* Function APIs for parallel write *
**************************************/
// Create HDF5 file: create memory mapped file on the SSD
#if __cplusplus
extern "C" {
#endif
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
// Close HDF5 file: clean up the memory mapped file
......@@ -135,6 +138,7 @@ hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
// Reading dataset (one batch), and then the I/O thread write them to the SSDs
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dread_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// Reading dataset (one batch) from the SSDs
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
......@@ -162,7 +166,9 @@ void set_hyperslab_from_samples(int *samples, int nsample, hid_t *fspace);
void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
extern H5Dread_cache_metadata H5DRMM;
extern H5Dwrite_cache_metadata H5DWMM;
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
void setH5SSD();
#if __cplusplus
}
#endif
#endif //H5Dio_cache.h
......@@ -1700,7 +1700,7 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
/* Release copy of our VOL info */
H5VL_pass_through_ext_info_free(info);
printf("VOL passthru file create\n");
int rc = pthread_create(&H5DWMM.io.pthread, NULL, H5Dwrite_pthread_func, NULL);
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
......
#ifndef DEBUG_H__
#define DEBUG_H__
#if __cplusplus
#ifdef __cplusplus
extern "C" {
#endif
int debug_level();
int io_node();
#if __cplusplus
#ifdef __cplusplus
}
#endif
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment