Commit af1caee0 authored by Huihuo Zheng

reorder

parent edeae228
/*
This is a prototype design for using node-local storage to improve
parallel I/O performance. We modify the H5Dwrite function so that the
data is first written to the local SSD, and a background thread then
takes care of migrating the data from the local SSD to the file system.
We create a pthread that performs the I/O work in a first-in, first-out
fashion.
Huihuo Zheng <huihuo.zheng@anl.gov>
1/24/2020
*/
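/*
Usage sketch (illustrative only, not part of this file): an application
replaces the corresponding HDF5 calls with the *_cache wrappers defined
below. The handles passed to H5Dwrite_cache are used later by the I/O
thread, so they should be released through the matching *_cache close
functions, which call H5Fwait() first. The file name and variable names
here are placeholders.

  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
  hid_t file = H5Fcreate_cache("out.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
  // ... create dataset, dataspaces and transfer property list ...
  H5Dwrite_cache(dset, H5T_NATIVE_INT, memspace, filespace, dxpl, buf);
  H5Dclose_cache(dset);  // waits for the I/O thread, then closes
  H5Fclose_cache(file);  // drains remaining requests and removes the SSD file
*/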
#include <stdlib.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include "mpi.h"
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/statvfs.h>
#include <stdlib.h>
/***********/
/* Headers */
/***********/
#include "hdf5.h"
#include "H5FDmpio.h"
#include "H5SSD.h"
/*
Global variable to define information related to the local storage
*/
SSD_CACHE_IO
H5SSD = {
.mspace_total = 137438953472,
.mspace_left = 137438953472,
.num_request = 0, // number of pending I/O requests
.offset = 0, // current offset in the mmap file
.ppn = 1, // number of processes per node
.rank = 0, // rank id in the file communicator
.local_rank = 0, // local rank id within the node
.master_cond = PTHREAD_COND_INITIALIZER, // condition variable for the main thread
.io_cond = PTHREAD_COND_INITIALIZER,
.request_lock = PTHREAD_MUTEX_INITIALIZER,
.write_lock = PTHREAD_MUTEX_INITIALIZER,
.request_list = NULL,
.current_request = NULL,
.first_request = NULL
};
hsize_t PAGESIZE = sysconf(_SC_PAGE_SIZE);
#ifdef THETA
char MEM_BUFFER[100000000]; // staging buffer for Theta, where H5Dwrite from an mmap-ed region does not work
#endif
/*
Function to set up the local storage path, taken from the SSD_CACHE_PATH
environment variable if set.
*/
int setH5SSD() {
if (getenv("SSD_CACHE_PATH")) {
strcpy(H5SSD.path, getenv("SSD_CACHE_PATH"));
} else {
strcpy(H5SSD.path, "/local/scratch/");
}
return 0;
}
/*
Function to print debug information about the current I/O task.
*/
void check_pthread_data(thread_data_t *pt) {
printf("********************************************\n");
printf("***task id: %d\n", pt->id);
printf("***values: dset-%lld, mtype-%lld, mspace-%lld, fspace-%lld, xfer-%lld\n",
pt->dataset_id,
pt->mem_type_id,
pt->mem_space_id,
pt->file_space_id,
pt->xfer_plist_id);
printf("*** type: dset-%d, mtype-%d, mspace-%d, fspace-%d, xfer-%d\n",
H5Iget_type(pt->dataset_id),
H5Iget_type(pt->mem_type_id),
H5Iget_type(pt->mem_space_id),
H5Iget_type(pt->file_space_id),
H5Iget_type(pt->xfer_plist_id));
printf("********************************************\n");
}
/*
Obtain the size of the buffer (in bytes) from the memory dataspace and datatype.
*/
hsize_t get_buf_size(hid_t mspace, hid_t tid) {
int n= H5Sget_simple_extent_ndims(mspace);
hsize_t *dim = (hsize_t *) malloc(sizeof(hsize_t)*n);
hsize_t *mdim = (hsize_t *) malloc(sizeof(hsize_t)*n);
H5Sget_simple_extent_dims(mspace, dim, mdim);
hsize_t s = 1;
for(int i=0; i<n; i++) {
s=s*dim[i];
}
s = s*H5Tget_size(tid);
free(dim);
free(mdim);
return s;
}
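/*
Illustrative example (not compiled here): for a 2048 x 2048 dataspace of
4-byte H5T_NATIVE_INT elements, get_buf_size returns 2048*2048*4 bytes,
i.e. 16 MiB.

  hsize_t dims[2] = {2048, 2048};
  hid_t space = H5Screate_simple(2, dims, NULL);
  hsize_t nbytes = get_buf_size(space, H5T_NATIVE_INT); // 16777216
  H5Sclose(space);
*/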
/*
Test the current memory buffer by printing out the
first and last element of each I/O request.
*/
void test_mmap_buf() {
thread_data_t *data = H5SSD.first_request;
while((data->next!=NULL) && (data->buf!=NULL)) {
int *p = (int*) data->buf;
printf("Test buffer: %d, %d\n", p[0], p[data->size/sizeof(int)-1]);
data = data->next;
}
}
/*
Thread function for performing H5Dwrite. It memory-maps the region of the
local-storage file that holds the data for the current request and writes
it to the file system.
On Theta, passing the memory-mapped buffer to H5Dwrite currently does not
work, so we instead copy the data into a buffer allocated in memory.
*/
void *H5Dwrite_pthread_func(void *arg) {
pthread_mutex_lock(&H5SSD.request_lock);
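// Serve I/O requests until H5TerminatePthread() sets num_request to -1.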
while (H5SSD.num_request>=0) {
if (H5SSD.num_request >0) {
thread_data_t *data = H5SSD.current_request;
data->buf = mmap(NULL, data->size, PROT_READ, MAP_SHARED, H5SSD.fd, data->offset);
msync(data->buf, data->size, MS_SYNC);
#ifdef THETA
memcpy(MEM_BUFFER, data->buf, data->size);
H5Dwrite(data->dataset_id, data->mem_type_id,
data->mem_space_id, data->file_space_id,
data->xfer_plist_id, MEM_BUFFER);
#else
H5Dwrite(data->dataset_id, data->mem_type_id,
data->mem_space_id, data->file_space_id,
data->xfer_plist_id, data->buf);
#endif
munmap(data->buf, data->size);
H5SSD.current_request=H5SSD.current_request->next;
H5SSD.num_request--;
}
if (H5SSD.num_request == 0) {
pthread_cond_signal(&H5SSD.master_cond);
pthread_cond_wait(&H5SSD.io_cond, &H5SSD.request_lock);
}
}
pthread_mutex_unlock(&H5SSD.request_lock);
return NULL;
}
/*
Create an HDF5 file. Each rank creates a file on the local storage to
temporarily store the data to be written to the file system.
We also create a local communicator containing all the processes on the node.
A pthread is created for migrating data from the local storage to the
file system asynchronously.
The function returns directly without waiting for the I/O thread to finish
its I/O tasks. However, if the space left on the local storage is not
enough to store the buffer of the current task, it waits for the
I/O thread to finish all the previous tasks.
*/
hid_t H5Fcreate_cache( const char *name, unsigned flags, hid_t fcpl_id, hid_t fapl_id ) {
int rc = pthread_create(&H5SSD.pthread, NULL, H5Dwrite_pthread_func, NULL);
srand(time(NULL)); // Initialization, should only be called once.
setH5SSD();
MPI_Comm comm, comm_dup;
MPI_Info info;
H5Pget_fapl_mpio(fapl_id, &comm, &info);
MPI_Comm_dup(comm, &comm_dup);
MPI_Comm_rank(comm, &H5SSD.rank);
MPI_Comm_split_type(comm_dup, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &H5SSD.comm);
MPI_Comm_rank(H5SSD.comm, &H5SSD.local_rank);
MPI_Comm_size(H5SSD.comm, &H5SSD.ppn);
strcpy(H5SSD.fname, H5SSD.path);
char rnd[255];
sprintf(rnd, "%d", rand());
strcat(H5SSD.fname, rnd);
strcat(H5SSD.fname, "-");
sprintf(rnd, "%d", H5SSD.rank);
strcat(H5SSD.fname, rnd);
H5SSD.request_list = (thread_data_t*) malloc(sizeof(thread_data_t));
pthread_mutex_lock(&H5SSD.request_lock);
H5SSD.request_list->id = 0;
H5SSD.request_list->next = NULL;
H5SSD.request_list->buf = NULL;
H5SSD.current_request = H5SSD.request_list;
H5SSD.first_request = H5SSD.request_list;
pthread_mutex_unlock(&H5SSD.request_lock);
H5SSD.mspace_per_rank_total = H5SSD.mspace_total / H5SSD.ppn;
H5SSD.mspace_per_rank_left = H5SSD.mspace_per_rank_total;
H5SSD.fd = open(H5SSD.fname, O_RDWR | O_CREAT | O_TRUNC, 0600);
return H5Fcreate(name, flags, fcpl_id, fapl_id);
}
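/*
Illustrative call (not compiled here): the fapl passed to H5Fcreate_cache is
expected to be an MPI-IO file access property list, since the function
retrieves the MPI communicator from it via H5Pget_fapl_mpio.

  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
  hid_t file_id = H5Fcreate_cache("parallel_file.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
*/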
/*
This is the write function exposed to the user.
Its arguments are the same as those of H5Dwrite.
The function first writes the buffer to the local storage,
then creates an I/O task, appends it to the task list,
and wakes up the I/O thread to execute the actual
H5Dwrite call.
*/
herr_t
H5Dwrite_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id,
hid_t file_space_id, hid_t dxpl_id, const void *buf) {
// H5Fwait();
hsize_t size = get_buf_size(mem_space_id, mem_type_id);
if (H5SSD.mspace_per_rank_left < size) {
H5Fwait();
H5SSD.offset=0;
H5SSD.mspace_per_rank_left = H5SSD.mspace_per_rank_total;
}
int err = pwrite(H5SSD.fd, (char*)buf, size, H5SSD.offset);
#ifdef __APPLE__
fcntl(H5SSD.fd, F_NOCACHE, 1);
#else
fsync(H5SSD.fd);
#endif
H5SSD.request_list->dataset_id = dataset_id;
H5SSD.request_list->mem_type_id = mem_type_id;
H5SSD.request_list->mem_space_id = mem_space_id;
H5SSD.request_list->file_space_id = file_space_id;
H5SSD.request_list->xfer_plist_id = dxpl_id;
H5SSD.request_list->size = size;
H5SSD.request_list->offset = H5SSD.offset;
H5SSD.request_list->next = (thread_data_t*) malloc(sizeof(thread_data_t));
H5SSD.request_list->next->id = H5SSD.request_list->id + 1;
H5SSD.request_list->next->next = NULL;
H5SSD.request_list->next->buf = NULL;
thread_data_t *data = H5SSD.request_list;
H5SSD.request_list = H5SSD.request_list->next;
pthread_mutex_lock(&H5SSD.request_lock);
H5SSD.num_request++;
pthread_cond_signal(&H5SSD.io_cond); // wake up the I/O thread right away
pthread_mutex_unlock(&H5SSD.request_lock);
H5SSD.offset += (size/PAGESIZE+1)*PAGESIZE;
H5SSD.mspace_per_rank_left = H5SSD.mspace_per_rank_total - H5SSD.offset*H5SSD.ppn;
return err;
}
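/*
Note on lifetimes (based on the implementation above): the user buffer is
copied to the local-storage file with pwrite before this function returns,
so it may be reused immediately; the dataset, dataspace, and transfer
property handles, however, are used later by the I/O thread and should be
closed through the *_cache wrappers (or only after H5Fwait()).
*/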
/*
Wait for the I/O thread to finish all the pending I/O requests.
*/
void H5Fwait() {
pthread_mutex_lock(&H5SSD.request_lock);
while(H5SSD.num_request>0) {
pthread_cond_signal(&H5SSD.io_cond);
pthread_cond_wait(&H5SSD.master_cond, &H5SSD.request_lock);
}
pthread_mutex_unlock(&H5SSD.request_lock);
}
/*
Terminate the I/O pthread by joining it.
*/
void H5TerminatePthread() {
pthread_mutex_lock(&H5SSD.request_lock);
H5SSD.num_request=-1;
pthread_cond_signal(&H5SSD.io_cond);
pthread_mutex_unlock(&H5SSD.request_lock);
pthread_join(H5SSD.pthread, NULL);
}
/*
Wait for the pthread to finish its work, terminate the pthread,
close and remove the cache file on the SSD, and then close the HDF5 file.
*/
herr_t H5Fclose_cache( hid_t file_id ) {
H5Fwait();
H5TerminatePthread();
close(H5SSD.fd);
remove(H5SSD.fname);
H5SSD.mspace_left = H5SSD.mspace_total;
return H5Fclose(file_id);
}
/*
Wait for the pthread to finish its work and close the property list.
*/
herr_t H5Pclose_cache(hid_t dxf_id) {
H5Fwait();
return H5Pclose(dxf_id);
}
/*
Wait for the pthread to finish its work and close the dataset.
*/
herr_t H5Dclose_cache(hid_t dset_id) {
H5Fwait();
return H5Dclose(dset_id);
}
/*
Wait for the pthread to finish its work and close the dataspace.
*/
herr_t H5Sclose_cache(hid_t filespace) {
H5Fwait();
return H5Sclose(filespace);
}
#ifndef H5SSD_H_
#define H5SSD_H_
#include "hdf5.h"
hsize_t get_buf_size(hid_t mspace, hid_t tid);
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
herr_t H5Fclose_cache( hid_t file_id );
herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t dxpl_id, const void *buf);
herr_t H5Dclose_cache( hid_t id);
herr_t H5Pclose_cache( hid_t id);
herr_t H5Sclose_cache( hid_t id);
void H5Fwait();
void test_mmap_buf();
typedef struct _thread_data_t {
// we use a linked-list structure in C to build the list of I/O tasks
char fname[255];
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
hid_t file_space_id;
hid_t xfer_plist_id;
int id;
hsize_t offset;
hsize_t size;
void *buf;
struct _thread_data_t *next;
} thread_data_t;
void check_pthread_data(thread_data_t *pt);
typedef struct _SSD_CACHE_IO {
int fd;
char fname[255];
char path[255];
double mspace_total;
double mspace_left;
double mspace_per_rank_left;
double mspace_per_rank_total;
int num_request;
hsize_t offset;
int ppn;
int rank;
int local_rank;
void *mmap_ptr;
MPI_Comm comm;
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock, write_lock;
pthread_t pthread;
thread_data_t *request_list, *current_request, *first_request;
} SSD_CACHE_IO;
#endif //H5SSD_H_
#Makefile
#!/bin/sh
include make.inc
all: parallel_hdf5
parallel_hdf5: H5SSD.o parallel_hdf5.o
$(CXX) $(CFLAGS) -I./ -o parallel_hdf5.x parallel_hdf5.o H5SSD.o $(LIBS)
parallel_hdf5.o : parallel_hdf5.cpp
$(CXX) $(CFLAGS) -o parallel_hdf5.o -c parallel_hdf5.cpp
H5SSD.o : H5SSD.c
$(CXX) $(CFLAGS) -o H5SSD.o -c H5SSD.c
clean:
rm -rf *.x *.h5 *.hdf5 *.o
# Incorporating the local storage in HDF5
Huihuo Zheng @ Argonne Leadership Computing Facility
This folder contains the source code for system-aware
optimization of the HDF5 custom collective VFD.
* H5SSD.c, H5SSD.h - HDF5 wrapper functions incorporating
local storage.
* parallel_hdf5.cpp, run.py - benchmark scripts for
performance evaluation.
* Makefile - makefile. A make.inc needs to be created
with information about the compiler and libraries.
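
A minimal build-and-run sketch (the job launcher, rank count, and paths below are placeholders; the `--dim`, `--niter`, `--scratch`, and `--sleep` options and the `SSD_CACHE`/`SSD_CACHE_PATH` environment variables are the ones read by parallel_hdf5.cpp and H5SSD.c):

```
# create make.inc for your system (see the examples in this folder)
make
export SSD_CACHE=yes                  # route writes through the SSD cache
export SSD_CACHE_PATH=/local/scratch  # location of the per-rank cache files
mpirun -np 8 ./parallel_hdf5.x --dim 2048 2048 --niter 10 --scratch /path/to/parallel/fs
```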
#Makefile
#!/bin/sh
# Makefile inclusion for Cooley
# Huihuo Zheng @ ALCF
# ------------------------------
HDF5_ROOT=/soft/libraries/hdf5-1.10.5-parallel/
CC=mpicc
CXX=mpicxx
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
#CFLAGS+= -DTHETA
LIBS=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#Makefile
#!/bin/sh
# Makefile inclusion for Theta
# Please load HDF5 modules
# module load cray-hdf5-parallel
# -------------------------------
CC=mpicc
CXX=mpicxx
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
#CFLAGS+= -DTHETA
LIBS=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#Makefile
#!/bin/sh
# Makefile inclusion for Theta
# Please load HDF5 modules
# module load cray-hdf5-parallel
# -------------------------------
CC=cc
CXX=CC -dynamic
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
CFLAGS+= -DTHETA
LIBS=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "stdio.h"
#include <sys/time.h>
#include <string.h>
#include "timing.h"
#include "H5SSD.h"
#include <pthread.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/statvfs.h>
#include <time.h> // for nanosleep(), used in msleep()
#include "stat.h"
int msleep(long milliseconds)
{
struct timespec req, rem;
if(milliseconds > 999)
{
req.tv_sec = (int)(milliseconds / 1000); /* Must be non-negative */
req.tv_nsec = (milliseconds - ((long)req.tv_sec * 1000)) * 1000000; /* Must be in range of 0 to 999999999 */
}
else
{
req.tv_sec = 0; /* Must be non-negative */
req.tv_nsec = milliseconds * 1000000; /* Must be in range of 0 to 999999999 */
}
return nanosleep(&req, &rem);
}
int main(int argc, char **argv) {
char ssd_cache [255] = "no";
if (getenv("SSD_CACHE")) {
strcpy(ssd_cache, getenv("SSD_CACHE"));
}
bool cache = false;
if (strcmp(ssd_cache, "yes")==0) {
cache=true;
}
Timing tt;
// The local buffer is a two-dimensional array of size d1 x d2 (default 2048 x 2048);
int d1 = 2048;
int d2 = 2048;
int niter = 10;
char scratch[255] = "/tmp/";
double sleep=0.0;
for(int i=1; i<argc; i++) {
if (strcmp(argv[i], "--dim")==0) {
d1 = int(atoi(argv[i+1]));
d2 = int(atoi(argv[i+2]));
i+=2;
} else if (strcmp(argv[i], "--niter")==0) {
niter = int(atoi(argv[i+1]));
i+=1;
} else if (strcmp(argv[i], "--scratch")==0) {
strcpy(scratch, argv[i+1]);
i+=1;
} else if (strcmp(argv[i],"--sleep")==0) {
sleep = atof(argv[i+1]);
i+=1;
}
}
hsize_t ldims[2] = {d1, d2};
hsize_t oned = d1*d2;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Info info = MPI_INFO_NULL;
int rank, nproc;
MPI_Init(&argc, &argv);
MPI_Comm_size(comm, &nproc);
MPI_Comm_rank(comm, &rank);
//printf(" MPI: I am rank %d of %d \n", rank, nproc);
// find local array dimension and offset;
hsize_t gdims[2] = {d1*nproc, d2};
if (rank==0) {
printf("=============================================\n");
printf(" Buf dim: %llu x %llu\n", ldims[0], ldims[1]);
printf("Buf size: %f MB\n", float(d1*d2)/1024/1024*sizeof(int));
printf(" Scratch: %s\n", scratch);
printf(" nproc: %d\n", nproc);
printf("=============================================\n");
if (cache) printf("** using SSD as a cache **\n");
}
hsize_t offset[2] = {0, 0};
// setup file access property list for mpio
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, comm, info);
char f[255];
strcpy(f, scratch);
strcat(f, "/parallel_file.h5");
tt.start_clock("H5Fcreate");
hid_t file_id;
if (cache)
file_id = H5Fcreate_cache(f, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
else
file_id = H5Fcreate(f, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
tt.stop_clock("H5Fcreate");
// create memory space
hid_t memspace = H5Screate_simple(2, ldims, NULL);
// define local data
int* data = new int[ldims[0]*ldims[1]];
// set up dataset access property list
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
// define local memory space
// create file space and dataset
hsize_t ggdims[2] = {gdims[0]*niter, gdims[1]};
hid_t filespace = H5Screate_simple(2, ggdims, NULL);
hid_t dt = H5Tcopy(H5T_NATIVE_INT);
tt.start_clock("H5Dcreate");
hid_t dset_id = H5Dcreate(file_id, "dset", dt, filespace, H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
tt.stop_clock("H5Dcreate");
hsize_t size;
size = get_buf_size(memspace, dt);
if (rank==0)
printf(" mspace size: %5.5f MB | sizeof (element) %lu\n", float(size)/1024/1024, H5Tget_size(H5T_NATIVE_INT));
size = get_buf_size(filespace, dt);
if (rank==0)
printf(" fspace size: %5.5f MB \n", float(size)/1024/1024);
hsize_t count[2] = {1, 1};
tt.start_clock("Init_array");
for(int j=0; j<ldims[0]*ldims[1]; j++)
data[j] = 0;
tt.stop_clock("Init_array");
for (int i=0; i<niter; i++) {
offset[0]= i*gdims[0] + rank*ldims[0];
// select hyperslab
H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, ldims, count);
tt.start_clock("H5Dwrite");
if (cache)
herr_t status = H5Dwrite_cache(dset_id, H5T_NATIVE_INT, memspace, filespace, dxf_id, data); // write memory to file
else {
herr_t status = H5Dwrite(dset_id, H5T_NATIVE_INT, memspace, filespace, dxf_id, data); // write memory to file
H5Fflush(file_id, H5F_SCOPE_LOCAL);
}
tt.stop_clock("H5Dwrite");
tt.start_clock("compute");
msleep(int(sleep*1000));
tt.stop_clock("compute");