Commit 26798857 authored by Huihuo Zheng's avatar Huihuo Zheng
Browse files

SSD

parent a6fe5a6f
......@@ -115,9 +115,6 @@ void *H5Dwrite_pthread_func(void *arg) {
bool done = (wmm->io.num_request==0);
pthread_mutex_unlock(&wmm->io.request_lock);
while (loop) {
pthread_mutex_lock(&wmm->io.request_lock);
pthread_mutex_unlock(&wmm->io.request_lock);
if (working) {
thread_data_t *data = wmm->io.current_request;
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, wmm->mmap.fd, data->offset);
......
......@@ -59,7 +59,8 @@
#include <unistd.h>
#include <stdio.h>
#include <sys/statvfs.h>
//debug
#include "debug.h"
#define PAGESIZE sysconf(_SC_PAGE_SIZE)
#ifndef SUCCEED
#define SUCCEED 0
......@@ -1231,71 +1232,74 @@ void *H5Dwrite_pthread_func_vol(void *arg) {
H5Dwrite_cache_metadata *wmm = (H5Dwrite_cache_metadata*) arg;
pthread_mutex_lock(&wmm->io.request_lock);
bool loop = (wmm->io.num_request>=0);
bool working = (wmm->io.num_request>0);
bool done = (wmm->io.num_request==0);
bool empty = (wmm->io.num_request==0);
pthread_mutex_unlock(&wmm->io.request_lock);
while (loop) {
if (working) {
thread_data_t *data = wmm->io.current_request;
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, wmm->mmap.fd, data->offset);
msync(data->buf, data->size, MS_SYNC);
H5VL_pass_through_ext_t *o = (H5VL_pass_through_ext_t *)data->dataset_obj;
hbool_t acq=false;
while(acq==false)
H5TSmutex_acquire(&acq);
H5VLrestore_lib_state(data->h5_state);
sleep(1);
if (!empty) {
thread_data_t *task = wmm->io.current_request;
msync(task->buf, task->size, MS_SYNC);
if (wmm->mpi.rank== io_node() && debug_level()>1) {
printf("\n===================================\n");
printf("pthread: Executing I/O task %d\n", task->id);
}
task->buf = mmap(NULL, task->size, PROT_READ, MAP_SHARED, wmm->mmap.fd, task->offset);
H5VL_pass_through_ext_t *o = (H5VL_pass_through_ext_t *)task->dataset_obj;
hbool_t acquired=false;
while(!acquired)
H5TSmutex_acquire(&acquired);
if (wmm->mpi.rank== io_node() && debug_level()>1) printf("pthread: acquired global mutex\n");
H5VLrestore_lib_state(task->h5_state);
void **req;
if (wmm->mpi.rank==0) printf("pthread: writing dataset %d to filesystem ... !\n", data->id);
#ifdef THETA
wmm->mmap.tmp_buf = malloc(data->size);
memcpy(wmm->mmap.tmp_buf, data->buf, data->size);
wmm->mmap.tmp_buf = malloc(task->size);
memcpy(wmm->mmap.tmp_buf, task->buf, task->size);
H5VLdataset_write(o->under_object, o->under_vol_id,
data->mem_type_id, data->mem_space_id,
data->file_space_id, data->xfer_plist_id,
task->mem_type_id, task->mem_space_id,
task->file_space_id, task->xfer_plist_id,
wmm->mmap.tmp_buf, req);
free(wmm->mmap.tmp_buf);
#else
H5VLdataset_write(o->under_object, o->under_vol_id,
data->mem_type_id, data->mem_space_id,
data->file_space_id, data->xfer_plist_id,
data->buf, req);
#endif
task->mem_type_id, task->mem_space_id,
task->file_space_id, task->xfer_plist_id,
task->buf, req);
#endif
if (wmm->mpi.rank==io_node() && debug_level()>1) printf("pthread: I/O task %d is done!\n", task->id);
munmap(task->buf, task->size);
H5Sclose(task->mem_space_id);
H5Sclose(task->file_space_id);
H5Pclose(task->xfer_plist_id);
H5Tclose(task->mem_type_id);
H5VLreset_lib_state();
H5VLfree_lib_state(data->h5_state);
if (wmm->mpi.rank==0) printf("pthread: dataset %d written to filesystem, done!\n", data->id);
munmap(data->buf, data->size);
H5Sclose(data->mem_space_id);
H5Sclose(data->file_space_id);
H5Pclose(data->xfer_plist_id);
H5Tclose(data->mem_type_id);
H5VLfree_lib_state(task->h5_state);
H5TSmutex_release();
pthread_mutex_lock(&wmm->io.request_lock);
if (wmm->mpi.rank==io_node() && debug_level()>1) printf("pthread: global mutex_released\n");
if (wmm->mpi.rank== io_node() && debug_level()>1) {
printf("===================================\n");
}
pthread_mutex_lock(&wmm->io.request_lock);
wmm->io.current_request=wmm->io.current_request->next;
wmm->io.num_request--;
o->num_request_dataset--;
pthread_mutex_unlock(&wmm->io.request_lock);
}
pthread_mutex_lock(&wmm->io.request_lock);
loop = (wmm->io.num_request>=0);
working = (wmm->io.num_request>0);
done = (wmm->io.num_request==0);
empty = (wmm->io.num_request==0);
pthread_mutex_unlock(&wmm->io.request_lock);
if (done) {
if (empty) {
pthread_mutex_lock(&wmm->io.request_lock);
pthread_cond_signal(&wmm->io.master_cond);
pthread_cond_wait(&wmm->io.io_cond, &wmm->io.request_lock);
pthread_mutex_unlock(&wmm->io.request_lock);
}
pthread_mutex_lock(&wmm->io.request_lock);
loop = (wmm->io.num_request>=0);
working = (wmm->io.num_request>0);
done = (wmm->io.num_request==0);
empty = (wmm->io.num_request==0);
pthread_mutex_unlock(&wmm->io.request_lock);
}
if (wmm->mpi.rank==0) printf("Jump out of the loop\n");
return NULL;
}
......@@ -1322,7 +1326,6 @@ H5VL_pass_through_ext_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_spa
if (o->write_cache) {
hsize_t size = get_buf_size(mem_space_id, mem_type_id);
if (o->H5DWMM->ssd->mspace_per_rank_left < size) {
printf("Wait to finish? \n");
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
while(o->H5DWMM->io.num_request>0) {
pthread_cond_signal(&o->H5DWMM->io.io_cond);
......@@ -1345,7 +1348,9 @@ H5VL_pass_through_ext_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_spa
#else
fsync(o->H5DWMM->mmap.fd);
#endif
o->H5DWMM->io.request_list->dataset_obj = dset;
o->H5DWMM->io.request_list->dataset_obj = dset;
// retrieve current library state;
H5VLretrieve_lib_state(&o->H5DWMM->io.request_list->h5_state);
o->H5DWMM->io.request_list->mem_type_id = H5Tcopy(mem_type_id);
o->H5DWMM->io.request_list->mem_space_id = H5Scopy(mem_space_id);
......@@ -1353,14 +1358,16 @@ H5VL_pass_through_ext_dataset_write(void *dset, hid_t mem_type_id, hid_t mem_spa
o->H5DWMM->io.request_list->xfer_plist_id = H5Pcopy(plist_id);
o->H5DWMM->io.request_list->size = size;
o->H5DWMM->io.request_list->next = (thread_data_t*) malloc(sizeof(thread_data_t));
if (o->H5DWMM->mpi.rank==0) printf("added task %d to the list;\n", o->H5DWMM->io.request_list->id);
if (o->H5DWMM->mpi.rank==io_node() && debug_level()>1) printf("added task %d to the list;\n", o->H5DWMM->io.request_list->id);
o->H5DWMM->io.request_list->next->id = o->H5DWMM->io.request_list->id + 1;
o->H5DWMM->io.request_list = o->H5DWMM->io.request_list->next;
// waken up the Background thread
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
o->H5DWMM->io.num_request++;
o->num_request_dataset++;
pthread_cond_signal(&o->H5DWMM->io.io_cond);// wake up I/O thread rightawayx
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
//
ret_value=SUCCEED;
} else {
ret_value = H5VLdataset_write(o->under_object, o->under_vol_id, mem_type_id, mem_space_id, file_space_id, plist_id, buf, req);
......@@ -1489,19 +1496,19 @@ H5VL_pass_through_ext_dataset_close(void *dset, hid_t dxpl_id, void **req)
#ifdef ENABLE_EXT_PASSTHRU_LOGGING
printf("------- EXT PASS THROUGH VOL DATASET Close\n");
#endif
H5TSmutex_release();
if (o->write_cache) {
if (o->H5DWMM->mpi.rank==0) printf("check dataset_close\n");
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
while(o->num_request_dataset>0) {
while(o->num_request_dataset>0) {
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_cond_wait(&o->H5DWMM->io.master_cond, &o->H5DWMM->io.request_lock);
}
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
}
hbool_t acq;
H5TSmutex_acquire(&acq);
ret_value = H5VLdataset_close(o->under_object, o->under_vol_id, dxpl_id, req);
/* Check for async request */
if(req && *req)
*req = H5VL_pass_through_ext_new_obj(*req, o->under_vol_id);
......@@ -1744,7 +1751,6 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
#ifdef ENABLE_EXT_PASSTHRU_LOGGING
printf("------- EXT PASS THROUGH VOL FILE Create\n");
#endif
printf("fapl_id: %lld\n", fapl_id);
/* Get copy of our VOL info from FAPL */
H5Pget_vol_info(fapl_id, (void **)&info);
......@@ -1770,9 +1776,11 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
} /* end if */
else
file = NULL;
if (strcmp(getenv("SSD_CACHE"), "yes")==0)
file->write_cache=true;
file->write_cache = true;
if (getenv("SSD_CACHE"))
if (strcmp(getenv("SSD_CACHE"), "no")==0)
file->write_cache=false;
if (file->write_cache) {
srand(time(NULL)); // Initialization, should only be called once.
file->H5DWMM = (H5Dwrite_cache_metadata*) malloc(sizeof(H5Dwrite_cache_metadata));
......@@ -1780,7 +1788,6 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
pthread_cond_init(&file->H5DWMM->io.io_cond, NULL);
pthread_cond_init(&file->H5DWMM->io.master_cond, NULL);
pthread_mutex_init(&file->H5DWMM->io.request_lock, NULL);
setH5SSD(&SSD);
file->H5DWMM->ssd = &SSD;
MPI_Comm comm, comm_dup;
......@@ -1804,14 +1811,18 @@ H5VL_pass_through_ext_file_create(const char *name, unsigned flags, hid_t fcpl_i
file->H5DWMM->io.request_list = (thread_data_t*) malloc(sizeof(thread_data_t));
file->H5DWMM->ssd->mspace_per_rank_total = file->H5DWMM->ssd->mspace_total / file->H5DWMM->mpi.ppn;
file->H5DWMM->ssd->mspace_per_rank_left = file->H5DWMM->ssd->mspace_per_rank_total;
if (debug_level()>1 && io_node()==file->H5DWMM->mpi.rank==0) {
printf("**Using node local storage as a cache\n");
printf("**path: %s\n", file->H5DWMM->ssd->path);
}
file->H5DWMM->mmap.fd = open(file->H5DWMM->mmap.fname, O_RDWR | O_CREAT | O_TRUNC, 0644);
file->write_cache = true;
int rc = pthread_create(&file->H5DWMM->io.pthread, NULL, H5Dwrite_pthread_func_vol, file->H5DWMM);
pthread_mutex_lock(&file->H5DWMM->io.request_lock);
file->H5DWMM->io.request_list->id = 0;
file->H5DWMM->io.current_request = file->H5DWMM->io.request_list;
file->H5DWMM->io.first_request = file->H5DWMM->io.request_list;
pthread_mutex_unlock(&file->H5DWMM->io.request_lock);
}
/* Close underlying FAPL */
H5Pclose(under_fapl_id);
......@@ -2098,27 +2109,21 @@ H5VL_pass_through_ext_file_close(void *file, hid_t dxpl_id, void **req)
printf("------- EXT PASS THROUGH VOL FILE Close\n");
#endif
if (o->write_cache) {
if (o->H5DWMM->mpi.rank==0) printf("file_close\n");
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
bool loop = (o->H5DWMM->io.num_request>0);
bool empty = (o->H5DWMM->io.num_request>0);
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
if (o->H5DWMM->mpi.rank==0) printf("o->H5DWMM->io.num_request: %d\n", o->H5DWMM->io.num_request);
while(loop) {
printf("upload\n");
while(!empty) {
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_cond_wait(&o->H5DWMM->io.master_cond, &o->H5DWMM->io.request_lock);
loop = (o->H5DWMM->io.num_request==0);
empty = (o->H5DWMM->io.num_request==0);
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
}
pthread_mutex_lock(&o->H5DWMM->io.request_lock);
o->H5DWMM->io.num_request=-1;
pthread_cond_signal(&o->H5DWMM->io.io_cond);
pthread_mutex_unlock(&o->H5DWMM->io.request_lock);
if (o->H5DWMM->mpi.rank==0) printf("o->H5DWMM->io.num_request: %d\n", o->H5DWMM->io.num_request);
if (o->H5DWMM->mpi.rank==0) printf("joining\n");
pthread_join(o->H5DWMM->io.pthread, NULL);
if (o->H5DWMM->mpi.rank==0) printf("joined\n");
close(o->H5DWMM->mmap.fd);
remove(o->H5DWMM->mmap.fname);
o->H5DWMM->ssd->mspace_left = o->H5DWMM->ssd->mspace_total;
......
#Makefile
#!/bin/sh
CC=mpicc
CXX=mpicxx
......@@ -11,7 +10,7 @@ INCLUDES=-I$(HDF5_DIR)/include -I../utils/
LIBS=-L$(HDF5_DIR)/lib -lhdf5 -lz
#DEBUG=-DENABLE_EXT_PASSTHRU_LOGGING
CFLAGS=$(INCLUDES)
CFLAGS=$(INCLUDES) $(DEBUG)
##TARGET=libh5passthrough_vol.dylib
TARGET=libh5passthrough_vol.dylib
......
......@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Comm_size(comm, &nproc);
MPI_Comm_rank(comm, &rank);
if (rank==0) cout << "provided: " << provided << endl;
if (rank==0) cout << "MPI_Init_thread provided: " << provided << endl;
Timing tt(rank==io_node());
tt.start_clock("total");
//printf(" MPI: I am rank %d of %d \n", rank, nproc);
......@@ -144,8 +144,6 @@ int main(int argc, char **argv) {
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, ldims, count);
tt.start_clock("H5Dwrite");
hid_t status = H5Dwrite(dset_id, H5T_NATIVE_INT, memspace, filespace, dxf_id, data); // write memory to file
H5Sclose(filespace);
H5Sclose(memspace);
tt.stop_clock("H5Dwrite");
tt.start_clock("compute");
msleep(int(sleep*1000));
......@@ -156,7 +154,7 @@ int main(int argc, char **argv) {
H5Fclose(file_id);
H5Pclose(dxf_id);
H5Pclose(plist_id);
//H5Sclose(memspace);
H5Sclose(memspace);
tt.stop_clock("H5close");
bool master = (rank==0);
delete [] data;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment