Commit b92afb18 authored by Huihuo Zheng's avatar Huihuo Zheng

test

parent b8a5cad6
@@ -61,6 +61,9 @@
#include <sys/statvfs.h>
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
#ifndef SUCCEED
#define SUCCEED 0
#endif
extern SSD_INFO SSD;
/**********/
/* Macros */
@@ -1238,6 +1241,7 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
msync(data->buf, data->size, MS_SYNC);
H5VL_pass_through_ext_t *o = (H5VL_pass_through_ext_t *)data->dataset_obj;
void **req;
if (wmm->mpi.rank==0) printf("pthread: writing dataset %d to filesystem ... !\n", data->id);
#ifdef THETA
wmm->mmap.tmp_buf = malloc(data->size);
memcpy(wmm->mmap.tmp_buf, data->buf, data->size);
@@ -1252,7 +1256,7 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
data->file_space_id, data->xfer_plist_id,
data->buf, req);
#endif
if (wmm->mpi.rank==0) printf("dataset %d written to filesystem, done!\n", data->id);
if (wmm->mpi.rank==0) printf("pthread: dataset %d written to filesystem, done!\n", data->id);
munmap(data->buf, data->size);
H5Sclose(data->mem_space_id);
H5Sclose(data->file_space_id);
@@ -1262,7 +1266,8 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
wmm->io.current_request=wmm->io.current_request->next;
wmm->io.num_request--;
o->num_request_dataset--;
pthread_mutex_unlock(&wmm->io.request_lock);
pthread_mutex_unlock(&wmm->io.request_lock);
}
pthread_mutex_lock(&wmm->io.request_lock);
loop = (wmm->io.num_request>=0);
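For context, this is the consumer side of the connector's producer-consumer queue: H5Dwrite_pthread_func_vol sleeps on io_cond, drains the list of pending write requests, signals master_cond as it makes progress, and treats a negative num_request as the shutdown sentinel (the loop = (wmm->io.num_request>=0) test above). The following is a minimal, self-contained sketch of that handshake; queue_t, request_t, and io_thread are simplified stand-ins, not the connector's real types.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct request {
    int id;
    struct request *next;
} request_t;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  io_cond;     /* producer -> I/O thread: work arrived  */
    pthread_cond_t  master_cond; /* I/O thread -> producer: progress made */
    request_t      *current;     /* head of the pending-request list      */
    int             num_request; /* pending count; -1 requests shutdown   */
} queue_t;

static void *io_thread(void *arg) {
    queue_t *q = (queue_t *)arg;
    int loop = 1;
    while (loop) {
        pthread_mutex_lock(&q->lock);
        while (q->num_request == 0)          /* idle: wait for a signal   */
            pthread_cond_wait(&q->io_cond, &q->lock);
        if (q->num_request > 0) {
            request_t *r = q->current;
            pthread_mutex_unlock(&q->lock);  /* do slow I/O unlocked      */
            printf("pthread: writing dataset %d to filesystem ...\n", r->id);
            pthread_mutex_lock(&q->lock);    /* dequeue under the lock    */
            q->current = r->next;
            q->num_request--;
            pthread_cond_signal(&q->master_cond);
            free(r);
        }
        loop = (q->num_request >= 0);        /* -1 ends the thread        */
        pthread_mutex_unlock(&q->lock);
    }
    return NULL;
}

int main(void) {
    queue_t q = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
                  PTHREAD_COND_INITIALIZER, NULL, 0 };
    pthread_t t;
    pthread_create(&t, NULL, io_thread, &q);

    request_t *r = (request_t *)malloc(sizeof(request_t));
    r->id = 0;
    r->next = NULL;
    pthread_mutex_lock(&q.lock);             /* enqueue one request       */
    q.current = r;
    q.num_request++;
    pthread_cond_signal(&q.io_cond);         /* wake the I/O thread       */
    pthread_mutex_unlock(&q.lock);

    pthread_mutex_lock(&q.lock);
    while (q.num_request > 0)                /* wait for the drain        */
        pthread_cond_wait(&q.master_cond, &q.lock);
    q.num_request = -1;                      /* then ask the thread to exit */
    pthread_cond_signal(&q.io_cond);
    pthread_mutex_unlock(&q.lock);
    pthread_join(t, NULL);
    return 0;
}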
@@ -1321,6 +1326,7 @@ H5VL_pass_through_ext_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_spa
o->H5DWMM->ssd->mspace_left = o->H5DWMM->ssd->mspace_total;
}
int err = pwrite(o->H5DWMM->mmap.fd, (char*)buf, size, o->H5DWMM->ssd->offset);
o->H5DWMM->io.request_list->offset = o->H5DWMM->ssd->offset;
o->H5DWMM->ssd->offset += (size/PAGESIZE+1)*PAGESIZE;
o->H5DWMM->ssd->mspace_per_rank_left = o->H5DWMM->ssd->mspace_per_rank_total
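The offset bump above rounds each staged write up to whole pages because mmap, used later to hand the staged bytes to the flush thread, requires page-aligned file offsets. Note that (size/PAGESIZE+1)*PAGESIZE always reserves one extra page when size is already a multiple of PAGESIZE; the conventional ceiling idiom, sketched below as an alternative, reserves exactly enough.

#include <unistd.h>

/* Round size up to a whole number of pages so every staged write starts on
 * a page boundary (a requirement for the later mmap of the cache file). */
static off_t round_to_page(size_t size) {
    size_t ps = (size_t)sysconf(_SC_PAGE_SIZE);
    return (off_t)((size + ps - 1) / ps * ps);
}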
@@ -1345,12 +1351,13 @@ H5VL_pass_through_ext_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_spa
o->num_request_dataset++;
pthread_cond_signal(&o->H5DWMM->io.io_cond); // wake up the I/O thread right away
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
ret_value=SUCCEED;
} else {
ret_value = H5VLdataset_write(o->under_object, o->under_vol_id, mem_type_id, mem_space_id, file_space_id, plist_id, buf, req);
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
}
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
return ret_value;
} /* end H5VL_pass_through_ext_dataset_write() */
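To summarize the write path this function implements: when caching is enabled, the data are pwrite()n to the node-local cache file at a page-aligned offset, the region is mapped back into memory, and the call returns SUCCEED immediately while the background thread performs the real H5VLdataset_write later. A minimal sketch of that staging step follows; stage_to_ssd is an illustrative helper name, not part of the connector.

#include <sys/mman.h>
#include <unistd.h>

/* Stage `size` bytes of `buf` at page-aligned `offset` in the cache file
 * `fd`, and return a shared mapping of the staged bytes. The caller hands
 * the pointer to the flush thread, which munmap()s it after the write. */
static void *stage_to_ssd(int fd, const void *buf, size_t size, off_t offset) {
    if (pwrite(fd, buf, size, offset) != (ssize_t)size)
        return MAP_FAILED;                 /* short write: report failure */
    fsync(fd);                             /* push the bytes to the SSD   */
    return mmap(NULL, size, PROT_READ, MAP_SHARED, fd, offset);
}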
@@ -1484,7 +1491,7 @@ H5VL_pass_through_ext_dataset_close(void *dset, hid_t dxpl_id, void **req)
}
ret_value = H5VLdataset_close(o->under_object, o->under_vol_id, dxpl_id, req);
if (o->H5DWMM->mpi.rank==0) printf("dataset_closed\n");
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
@@ -1754,46 +1761,48 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
else
file = NULL;
srand(time(NULL)); // Initialization, should only be called once.
file->H5DWMM = (H5Dwrite_cache_metadata*) malloc(sizeof(H5Dwrite_cache_metadata));
file->H5DWMM->io.num_request = 0;
pthread_cond_init(&file->H5DWMM->io.io_cond, NULL);
pthread_cond_init(&file->H5DWMM->io.master_cond, NULL);
pthread_mutex_init(&file->H5DWMM->io.request_lock, NULL);
setH5SSD(&SSD);
file->H5DWMM->ssd = &SSD;
MPI_Comm comm, comm_dup;
MPI_Info mpi_info;
H5Pget_fapl_mpio(fapl_id, &comm, &mpi_info);
MPI_Comm_dup(comm, &file->H5DWMM->mpi.comm);
MPI_Comm_rank(comm, &file->H5DWMM->mpi.rank);
MPI_Comm_size(comm, &file->H5DWMM->mpi.nproc);
MPI_Comm_split_type(file->H5DWMM->mpi.comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &file->H5DWMM->mpi.node_comm);
MPI_Comm_rank(file->H5DWMM->mpi.node_comm, &file->H5DWMM->mpi.local_rank);
MPI_Comm_size(file->H5DWMM->mpi.node_comm, &file->H5DWMM->mpi.ppn);
strcpy(file->H5DWMM->mmap.fname, file->H5DWMM->ssd->path);
strcpy(file->H5DWMM->mmap.fname, name);
strcat(file->H5DWMM->mmap.fname, "-");
char rnd[255];
sprintf(rnd, "%d", rand());
strcat(file->H5DWMM->mmap.fname, rnd);
strcat(file->H5DWMM->mmap.fname, "-");
sprintf(rnd, "%d", file->H5DWMM->mpi.rank);
strcat(file->H5DWMM->mmap.fname, rnd);
file->H5DWMM->io.request_list = (thread_data_t*) malloc(sizeof(thread_data_t));
file->H5DWMM->ssd->mspace_per_rank_total = file->H5DWMM->ssd->mspace_total / file->H5DWMM->mpi.ppn;
file->H5DWMM->ssd->mspace_per_rank_left = file->H5DWMM->ssd->mspace_per_rank_total;
file->H5DWMM->mmap.fd = open(file->H5DWMM->mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
file->write_cache = true;
int rc = pthread_create(&file->H5DWMM->io.pthread, NULL, H5Dwrite_pthread_func_vol, file->H5DWMM);
pthread_mutex_lock(&file->H5DWMM->io.request_lock);
file->H5DWMM->io.request_list->id = 0;
file->H5DWMM->io.current_request = file->H5DWMM->io.request_list;
file->H5DWMM->io.first_request = file->H5DWMM->io.request_list;
pthread_mutex_unlock(&file->H5DWMM->io.request_lock);
/* guard against an unset SSD_CACHE; strcmp(NULL, ...) would crash */
if (getenv("SSD_CACHE") && strcmp(getenv("SSD_CACHE"), "yes")==0)
file->write_cache=true;
if (file->write_cache) {
srand(time(NULL)); // Initialization, should only be called once.
file->H5DWMM = (H5Dwrite_cache_metadata*) malloc(sizeof(H5Dwrite_cache_metadata));
file->H5DWMM->io.num_request = 0;
pthread_cond_init(&file->H5DWMM->io.io_cond, NULL);
pthread_cond_init(&file->H5DWMM->io.master_cond, NULL);
pthread_mutex_init(&file->H5DWMM->io.request_lock, NULL);
setH5SSD(&SSD);
file->H5DWMM->ssd = &SSD;
MPI_Comm comm, comm_dup;
MPI_Info mpi_info;
H5Pget_fapl_mpio(fapl_id, &comm, &mpi_info);
MPI_Comm_dup(comm, &file->H5DWMM->mpi.comm);
MPI_Comm_rank(comm, &file->H5DWMM->mpi.rank);
MPI_Comm_size(comm, &file->H5DWMM->mpi.nproc);
MPI_Comm_split_type(file->H5DWMM->mpi.comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &file->H5DWMM->mpi.node_comm);
MPI_Comm_rank(file->H5DWMM->mpi.node_comm, &file->H5DWMM->mpi.local_rank);
MPI_Comm_size(file->H5DWMM->mpi.node_comm, &file->H5DWMM->mpi.ppn);
strcpy(file->H5DWMM->mmap.fname, file->H5DWMM->ssd->path);
strcat(file->H5DWMM->mmap.fname, name); /* append the cache file name; a second strcpy would overwrite the SSD path prefix */
strcat(file->H5DWMM->mmap.fname, "-");
char rnd[255];
sprintf(rnd, "%d", rand());
strcat(file->H5DWMM->mmap.fname, rnd);
strcat(file->H5DWMM->mmap.fname, "-");
sprintf(rnd, "%d", file->H5DWMM->mpi.rank);
strcat(file->H5DWMM->mmap.fname, rnd);
file->H5DWMM->io.request_list = (thread_data_t*) malloc(sizeof(thread_data_t));
file->H5DWMM->ssd->mspace_per_rank_total = file->H5DWMM->ssd->mspace_total / file->H5DWMM->mpi.ppn;
file->H5DWMM->ssd->mspace_per_rank_left = file->H5DWMM->ssd->mspace_per_rank_total;
file->H5DWMM->mmap.fd = open(file->H5DWMM->mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
file->write_cache = true;
int rc = pthread_create(&file->H5DWMM->io.pthread, NULL, H5Dwrite_pthread_func_vol, file->H5DWMM);
pthread_mutex_lock(&file->H5DWMM->io.request_lock);
file->H5DWMM->io.request_list->id = 0;
file->H5DWMM->io.current_request = file->H5DWMM->io.request_list;
file->H5DWMM->io.first_request = file->H5DWMM->io.request_list;
pthread_mutex_unlock(&file->H5DWMM->io.request_lock);
}
/* Close underlying FAPL */
H5Pclose(under_fapl_id);
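The file_create changes above derive the per-rank SSD budget from a node-local communicator: MPI_Comm_split_type with MPI_COMM_TYPE_SHARED groups the ranks that share a node, the size of that sub-communicator gives the processes-per-node (ppn), and mspace_total/ppn becomes each rank's share of the local SSD. A compact sketch of that bookkeeping, with illustrative names (the connector keeps node_comm and local_rank in its mpi struct rather than freeing them):

#include <mpi.h>

/* Split `comm` by shared-memory node and divide the SSD budget evenly
 * among the ranks on each node. */
static long per_rank_ssd_budget(MPI_Comm comm, long mspace_total) {
    MPI_Comm node_comm;
    int local_rank, ppn;
    MPI_Comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
                        &node_comm);
    MPI_Comm_rank(node_comm, &local_rank);   /* rank within the node      */
    (void)local_rank;                        /* stored as mpi.local_rank  */
    MPI_Comm_size(node_comm, &ppn);          /* processes per node        */
    MPI_Comm_free(&node_comm);
    return mspace_total / ppn;               /* this rank's share         */
}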
@@ -2106,7 +2115,7 @@ H5VL_pass_through_ext_file_close(void *file, hid_t dxpl_id, void **req)
}
ret_value = H5VLfile_close(o->under_object, o->under_vol_id, dxpl_id, req);
if (o->H5DWMM->mpi.rank==0) printf("file_closed\n");
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
......
CC=mpicc
#Your HDF5 install path
#Makefile
#!/bin/sh
CC=mpicc
CXX=mpicxx
HDF5_DIR=$(HDF5_ROOT)
INCLUDES=-I$(MPI_DIR)/include -I$(HDF5_DIR)/include -I../utils/
INCLUDES=-I$(HDF5_DIR)/include -I../utils/
LIBS=-L$(HDF5_DIR)/lib -lhdf5 -lz
#DEBUG=-DENABLE_EXT_PASSTHRU_LOGGING
CFLAGS=$(INCLUDES) $(LIBS)
CFLAGS=$(INCLUDES)
ifneq ("$(HOSTNAME)","zion")
TARGET=libh5passthrough_vol.dylib
else
TARGET=libh5passthrough_vol.so
CXX=mpicxx
endif
all: makeso test_write_cache
%.o : %.cpp
$(CXX) $(CFLAGS) -o $@ -c $<
%.o : %.c
$(CC) $(CFLAGS) -o $@ -c $<
test_write_cache: test_write_cache.o ../utils/debug.o H5Dio_cache.o
$(CXX) $(CFLAGS) -o $@ test_write_cache.o ../utils/debug.o H5Dio_cache.o $(HDF5_LIB)
$(CXX) $(CFLAGS) -o $@ test_write_cache.o ../utils/debug.o H5Dio_cache.o $(LIBS)
makeso:
$(CC) -shared $(CFLAGS) $(DEBUG) -o $(TARGET) -fPIC H5VLpassthru_ext.c H5Dio_cache.c ../utils/debug.c
makeso: H5VLpassthru_ext.o H5Dio_cache.o ../utils/debug.o
$(CC) -shared $(CFLAGS) $(DEBUG) -o $(TARGET) -fPIC H5VLpassthru_ext.o H5Dio_cache.o ../utils/debug.o $(LIBS)
mv $(TARGET) $(HDF5_DIR)/../vol
clean:
rm -f $(TARGET) *.o
rm -f $(TARGET) *.o parallel_file.h5*
@@ -52,7 +52,7 @@ int main(int argc, char **argv) {
int d1 = 2048;
int d2 = 2048;
int niter = 10;
char scratch[255] = "/tmp/";
char scratch[255] = "./";
double sleep=0.0;
for(int i=1; i<argc; i++) {
if (strcmp(argv[i], "--dim")==0) {
@@ -112,10 +112,8 @@ int main(int argc, char **argv) {
int* data = new int[ldims[0]*ldims[1]];
// set up dataset access property list
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_INDEPENDENT);
// define local memory space
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
// create file space and dataset
hsize_t ggdims[2] = {gdims[0]*niter, gdims[1]};
hid_t filespace = H5Screate_simple(2, ggdims, NULL);
@@ -142,11 +140,12 @@ int main(int argc, char **argv) {
offset[0]= i*gdims[0] + rank*ldims[0];
// select hyperslab
hid_t filespace = H5Screate_simple(2, ggdims, NULL);
hid_t memspace = H5Screate_simple(2, ldims, NULL);
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, ldims, count);
tt.start_clock("H5Dwrite");
hid_t status = H5Dwrite(dset_id, H5T_NATIVE_INT, memspace, filespace, dxf_id, data); // write memory to file
// H5Fflush(file_id, H5F_SCOPE_LOCAL);
H5Sclose(filespace);
H5Sclose(memspace);
tt.stop_clock("H5Dwrite");
tt.start_clock("compute");
msleep(int(sleep*1000));
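For reference, the loop above writes iteration i of rank r as an ldims[0] x ldims[1] block starting at row i*gdims[0] + r*ldims[0] of the (niter*gdims[0]) x gdims[1] dataset, which tiles the file exactly when gdims[0] = nproc*ldims[0] (a 1-D row decomposition, assumed here). The hunk also creates and closes filespace and memspace inside the loop, which is why the standalone H5Sclose(memspace) after the loop is retired below. A standalone sketch that just prints the placement:

#include <stdio.h>

int main(void) {
    /* Illustrative sizes; the benchmark defaults to 2048x2048 local blocks. */
    const long ldims0 = 2048, nproc = 4, niter = 10;
    const long gdims0 = ldims0 * nproc;      /* rows appended per iteration */
    for (long i = 0; i < niter; i++)
        for (long r = 0; r < nproc; r++)
            printf("iter %ld, rank %ld -> rows [%ld, %ld)\n",
                   i, r, i * gdims0 + r * ldims0,
                   i * gdims0 + (r + 1) * ldims0);
    return 0;
}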
@@ -157,11 +156,7 @@ int main(int argc, char **argv) {
H5Fclose(file_id);
H5Pclose(dxf_id);
H5Pclose(plist_id);
H5Sclose(memspace);
//H5Sclose(memspace);
tt.stop_clock("H5close");
bool master = (rank==0);
delete [] data;
......