Commit 754d29c6 authored by Huihuo Zheng's avatar Huihuo Zheng

fixed some issues

parent ace31cbd
Subproject commit ec76e94e062b5b38a1bfe1b1d6ac955dfa3185bc
Subproject commit 78bb427be57f97f0de6d7bdd3bb78bdb109e96e0
/*
This is the header file for node-local storage incorporated into HDF5
*/
#ifndef H5DIO_CACHE_H_
#define H5DIO_CACHE_H_
#include "hdf5.h"
#include "mpi.h"
#ifndef MAXDIM
#define MAXDIM 32
#endif
// Metadata for the I/O thread to perform parallel writes
typedef struct _thread_data_t {
// a linked-list structure in C is used to build the list of I/O tasks
char fname[255];
void *dataset_obj;
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
hid_t file_space_id;
hid_t xfer_plist_id;
void *h5_state;
int id;
hid_t offset; // offset in memory mapped file on SSD
hsize_t size;
void *buf;
struct _thread_data_t *next;
} thread_data_t;
// SSD related meta data
typedef struct _SSD_INFO {
double mspace_total;
double mspace_left;
double mspace_per_rank_total;
double mspace_per_rank_left;
char path[255];
hsize_t offset;
} SSD_INFO;
// MPI infos
typedef struct _MPI_INFO {
int rank;
int nproc;
int local_rank; // rank id in the local communicator
int ppn; // number of processes per node
MPI_Comm comm; // global communicator
MPI_Comm node_comm; // node local communicator
MPI_Win win;
} MPI_INFO;
// I/O threads
typedef struct _IO_THREAD {
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock;
int num_request; // for parallel write
thread_data_t *request_list, *current_request, *first_request; // task queue
bool batch_cached; // for parallel read: whether the current batch of data is cached on the SSD
bool dset_cached; // whether the entire dataset is cached on the SSD
pthread_t pthread;
hsize_t offset_current;
int round;
} IO_THREAD;
// Memory mapped files
typedef struct _MMAP {
// for write
int fd; // file handle for write
char fname[255];// full path of the memory mapped file
void *buf; // pointer that map the file to the memory
void *tmp_buf; // temporary buffer for parallel read: the read buffer is copied here so that H5Dread_to_cache can return while the background thread writes the data to the SSD
} MMAP;
// Dataset
typedef struct _SAMPLE {
size_t dim; // the number of dimensions
size_t size; // the size of the sample in bytes
size_t nel; // the number of elements per sample
} SAMPLE;
typedef struct _BATCH {
int *list;
int size;
} BATCH;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc; // number of samples per rank
size_t ns_glob; // total number of samples
size_t s_offset; // offset
hsize_t size; // the size of the entire dataset in bytes.
BATCH batch; // batch data to read
int ns_cached;
bool contig_read; // whether the batch of data to read is contiguous or not
MPI_Datatype mpi_datatype; // the constructed MPI datatype
hid_t h5_datatype; // the HDF5 datatype
size_t esize; // the size of an element in bytes.
} DSET;
typedef struct _H5Dwrite_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
SSD_INFO *ssd;
} H5Dwrite_cache_metadata;
typedef struct _H5Dread_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
DSET dset;
SSD_INFO *ssd;
} H5Dread_cache_metadata;
/**************************************
* Function APIs for parallel write *
**************************************/
// Create HDF5 file: create memory mapped file on the SSD
#ifdef __cplusplus
extern "C" {
#endif
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
// Close HDF5 file: clean up the memory mapped file
herr_t H5Fclose_cache( hid_t file_id );
// The main program writes the dataset to node-local storage, and the I/O thread performs the data migration to the parallel file system
herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t dxpl_id, const void *buf);
// close the dataset, property list, etc.; check whether all the tasks have finished
herr_t H5Dclose_cache( hid_t id);
/****************************************
* Function APIs for Parallel read *
****************************************/
// Open the HDF5 file (cached-read version of H5Fopen)
hid_t H5Fopen_cache(const char *name, hid_t fcpl_id, hid_t fapl_id);
// Open the dataset, create memory mapped file
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
// Read one batch of the dataset; the I/O thread then writes it to the SSD
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dread_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// Read one batch of the dataset directly from the SSD
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
// close the dataset
herr_t H5Dclose_cache_read(hid_t dset);
/****************************************
* I/O thread sync functions *
****************************************/
// wait for the write (or read) I/O thread to finish its work
void H5WPthreadWait();
void H5RPthreadWait();
// terminate (join) the write (or read) I/O thread after all the tasks have finished
void H5WPthreadTerminate();
void H5RPthreadTerminate();
/***************************************
* Other utils functions *
***************************************/
herr_t H5DRMMF_remap(); // remap the memory mapped file to remove the cache effect
// set hyperslab selection given the sample list
void set_hyperslab_from_samples(int *samples, int nsample, hid_t *fspace);
// get the list of the samples from the filespace
void get_samples_from_filespace(hid_t fspace, BATCH *samples, bool *contiguous);
// get the buffer size from the mspace and type ids.
hsize_t get_buf_size(hid_t mspace, hid_t tid);
void parallel_dist(size_t dim, int nproc, int rank, size_t *ldim, size_t *start);
void setH5SSD(SSD_INFO *);
#ifdef __cplusplus
}
#endif
#endif // H5DIO_CACHE_H_
#Makefile
include make.inc
%.o: %.cpp
$(CXX) $(CFLAGS) $(CPPFLAGS) -o $@ -c $<
%.o: %.c
$(CC) $(CFLAGS) $(CPPFLAGS) -o $@ -c $<
all: test_read_cache test_write_cache prepare_dataset
test_read_cache: test_read_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ test_read_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB) H5Dio_cache.o #-L$(HDF5_ROOT)/../vol/ -lh5passthrough_vol
test_vol: test_vol.o ../utils/debug.o
$(CXX) $(CFLAGS) -o $@ test_vol.o ../utils/debug.o $(HDF5_LIB) -Wl,-rpath,$(HDF5_ROOT)/../vol -L$(HDF5_ROOT)/../vol/ -lh5passthrough_vol
test_write_cache: test_write_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ test_write_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB)
prepare_dataset: prepare_dataset.o
$(CXX) $(CFLAGS) -o $@ $< $(HDF5_LIB)
clean:
rm -rf *.o test_read_cache test_write_cache mpi_profile.* parallel_file.*
# Incorporating node-local storage in HDF5
Authors: Huihuo Zheng <huihuo.zheng@anl.gov>, Venkatram Vishwanath <venkat@anl.gov>
This folder contains a prototype of system-aware HDF5 that incorporates node-local
storage. This is part of the ExaHDF5 ECP project led by Suren Byna <sbyna@lbl.gov>.
## Source files
* H5Dio_cache.cpp, H5Dio_cache.h -- source code incorporating node-local storage into HDF5 parallel read and write.
* test_read_cache.cpp -- testing code for read
* test_write_cache.cpp -- testing code for write
* prepare_dataset.cpp -- prepares the dataset for the read tests.
## Function APIs
* H5Dwrite_cache -- writes data to node-local storage; a pthread then moves the data to the parallel file system (see the sketch below)
* H5Dread_to_cache -- reads data from the parallel file system and writes the buffer to local storage
* H5Dread_from_cache -- reads data directly from local storage
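Below is a minimal write-path sketch assembled from the declarations in H5Dio_cache.h. It is illustrative only: the 1-D dataset layout, the dataset name "dat", the single write call, and the cleanup order are assumptions, not taken from test_write_cache.cpp.
```c
#include "hdf5.h"
#include "mpi.h"
#include "H5Dio_cache.h"

/* Write-path sketch: one contiguous 1-D block of floats per rank (assumed layout). */
int write_with_cache(MPI_Comm comm, const char *fname,
                     const float *local_buf, hsize_t nlocal)
{
  int rank, nproc;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &nproc);

  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, comm, MPI_INFO_NULL);

  /* H5Fcreate_cache also creates the memory-mapped file on the SSD */
  hid_t fd = H5Fcreate_cache(fname, H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

  hsize_t gdim = nlocal * (hsize_t)nproc;
  hsize_t offset = nlocal * (hsize_t)rank;
  hid_t fspace = H5Screate_simple(1, &gdim, NULL);
  hid_t mspace = H5Screate_simple(1, &nlocal, NULL);
  H5Sselect_hyperslab(fspace, H5S_SELECT_SET, &offset, NULL, &nlocal, NULL);

  hid_t dset = H5Dcreate(fd, "dat", H5T_NATIVE_FLOAT, fspace,
                         H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);

  /* Returns once the data is staged to node-local storage; the background
     I/O thread migrates it to the parallel file system. */
  H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, local_buf);

  H5WPthreadWait();      /* wait for pending migration tasks to finish */
  H5Dclose_cache(dset);
  H5Sclose(mspace);
  H5Sclose(fspace);
  H5Pclose(fapl);
  H5Fclose_cache(fd);    /* cleans up the memory-mapped file */
  return 0;
}
```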
## Parallel HDF5 Write incorporating node-local storage
test_write_cache.cpp is the benchmark code for evaluating the write performance. In this test, each MPI rank has a local
buffer Bi that is written to an HDF5 file organized as [B0|B1|B2|B3]|[B0|B1|B2|B3]|...|[B0|B1|B2|B3], where the number of repetitions of [B0|B1|B2|B3] equals the number of iterations.
* --dim: dimension of the 2D array Bi (this is the local buffer size)
* --niter: number of iterations. Note that the data is written to the file cumulatively across iterations.
* --scratch: the location of the raw data
* --sleep: sleep time between iterations
In this benchmark, the SSD cache can be turned on by setting the environment variable SSD_CACH=yes.
SSD_PATH -- environment variable setting the path of the SSD.
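For example (the MPI launcher, rank count, and paths below are illustrative only):
SSD_CACH=yes SSD_PATH=/local/scratch mpirun -np 8 ./test_write_cache --niter 4 --scratch /path/to/scratch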
## Parallel HDF5 Read incorporating node-local storage
**Preparing the dataset**
The benchmark relies on a dataset stored in an HDF5 file. One can generate the
dataset using prepare_dataset.py or prepare_dataset.cpp. For example,
python prepare_dataset.py --num_images 8192 --sz 224 --output images.h5
will generate an HDF5 file, images.h5, containing 8192 samples, each of size 224x224x3 (an image-like dataset).
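The compiled prepare_dataset program accepts the same options; for instance (the MPI launcher is illustrative),
mpirun -np 8 ./prepare_dataset --num_images 8192 --sz 224 --output images.h5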
**Benchmarks**
test_read_cache.cpp is the benchmark code for evaluating the read performance; a sketch of the read path follows the option list.
* --input: HDF5 file
* --dataset: the name of the dataset in the HDF5 file
* --epochs [Default: 4]: Number of epochs (at each epoch/iteration, we sweep through the dataset)
* --num_batches [Default: 16]: Number of batches to read per epoch
* --batch_size [Default: 32]: Number of samples per batch
* --shuffle [Default: False]: Whether to shuffle the samples at the beginning of each epoch.
* --cache [Default: False]: Whether the local-storage cache is enabled. If False, every epoch reads from the file system.
* --local_storage [Default: ./]: The path of the local storage.
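For orientation, here is a minimal read-path sketch based on the declarations in H5Dio_cache.h; the fixed hyperslab selection, the float datatype, and the cleanup sequence are simplifying assumptions (see test_read_cache.cpp for the actual benchmark).
```c
#include "hdf5.h"
#include "mpi.h"
#include "H5Dio_cache.h"

/* Read-path sketch: epoch 0 stages the data to the SSD, later epochs read it back. */
void read_epochs(const char *fname, const char *dset_name,
                 hid_t mspace, hid_t fspace, void *buf, int epochs)
{
  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);

  hid_t fd   = H5Fopen_cache(fname, H5F_ACC_RDONLY, fapl); /* sets up the SSD cache */
  hid_t dset = H5Dopen_cache(fd, dset_name, H5P_DEFAULT);  /* creates the memory-mapped file */

  for (int ep = 0; ep < epochs; ep++) {
    if (ep == 0)   /* read from the parallel file system and cache to the SSD */
      H5Dread_to_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, buf);
    else           /* read the cached data directly from the SSD */
      H5Dread_from_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, buf);
  }
  H5RPthreadWait();            /* make sure background caching has finished */
  H5Dclose_cache_read(dset);
  H5Pclose(fapl);
  H5Fclose(fd);
}
```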
#Makefile
#!/bin/sh
# Makefile inclusion for Cooley
# Huihuo Zheng @ ALCF Theta
# ------------------------------
HDF5_ROOT=/soft/libraries/hdf5-1.10.5-parallel/
CC=mpicc
CXX=mpicxx
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
#CFLAGS+= -DTHETA
HDF5_LIB=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#Makefile
#!/bin/sh
# Makefile inclusion for Theta
# Please load HDF5 modules
# module load cray-hdf5-parallel
# -------------------------------
CC=mpicc
CXX=mpicxx
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
#CFLAGS+= -DTHETA
HDF5_LIB=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#Makefile
#!/bin/sh
# Makefile inclusion for Theta
# Please load HDF5 modules
# module load cray-hdf5-parallel
# -------------------------------
CC=mpicc
CXX=mpicxx
#CC=cc
#CXX=CC
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
CPATH=
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
CFLAGS+= -DTHETA
HDF5_LIB=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
/*
This file prepares the dataset used for testing parallel reads in data-parallel
training. We assume that the dataset is stored in a single HDF5 file in the
following layout:
(nsample, d1, d2 ..., dn)
i.e., each sample is an n-dimensional array.
When reading the data, each rank reads a batch of samples, randomly or contiguously,
from the HDF5 file. Each sample has a unique id associated with it. At the beginning of
an epoch, we manually partition the entire dataset into nproc pieces, where nproc is
the number of workers.
*/
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#include <random>
#include <algorithm>
#include <vector>
#include "timing.h"
using namespace std;
#define MAXDIM 1024
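// Distribute gdim elements across nproc ranks as evenly as possible: each rank
// gets gdim/nproc elements, and the first gdim%nproc ranks get one extra.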
void dim_dist(hsize_t gdim, int nproc, int rank, hsize_t *ldim, hsize_t *start) {
*ldim = gdim/nproc;
*start = *ldim*rank;
if (rank < gdim%nproc) {
*ldim += 1;
*start += rank;
} else {
*start += gdim%nproc;
}
}
int main(int argc, char **argv) {
int rank, nproc;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
char fname[255] = "images.h5";
char dataset[255] = "dataset";
size_t num_images = 1024;
size_t sz = 224;
int i=0;
// Timing tt(rank==0);
while (i<argc) {
if (strcmp(argv[i], "--output")==0) {
strcpy(fname, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--dataset")==0) {
strcpy(dataset, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--num_images")==0) {
num_images =size_t(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--sz")==0) {
sz = size_t(atof(argv[i+1])); i+=2;
} else {
i=i+1;
}
}
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
printf("\ndone\n");
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
hsize_t gdims[4] = {hsize_t(num_images), sz, sz, 3};
hid_t fspace = H5Screate_simple(4, gdims, NULL);
hsize_t ns_loc, fs_loc;
dim_dist(gdims[0], nproc, rank, &ns_loc, &fs_loc);
hsize_t ldims[4] = {ns_loc, sz, sz, 3};
hid_t mspace = H5Screate_simple(4, ldims, NULL);
hsize_t offset[4] = {fs_loc, 0, 0, 0};
hsize_t count[4] = {1, 1, 1, 1};
if (rank==0) {
cout << "\n====== dataset info ======" << endl;
cout << "Dataset file: " << fname << endl;
cout << "Dataset: " << dataset << endl;
cout << "Number of samples in the dataset: " << gdims[0] << endl;
}
float *dat = new float[ns_loc*sz*sz*3];
for(int i=0; i<ns_loc; i++) {
for(int j=0; j<sz*sz*3; j++)
dat[i*sz*sz*3+j] = fs_loc + i;
}
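// Each rank selects its contiguous block of ns_loc samples starting at row fs_loc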
H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, ldims, count);
hid_t dset = H5Dcreate(fd, dataset, H5T_NATIVE_FLOAT, fspace, H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
H5Dwrite(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, dat);
H5Pclose(plist_id);
H5Sclose(mspace);
H5Sclose(fspace);
H5Dclose(dset);
H5Fclose(fd);
delete [] dat;
MPI_Finalize();
return 0;
}
/*
This code prototypes the idea of incorporating node-local storage
into a repeated-read workflow. We assume that the application reads
the same dataset periodically from the file system. Our idea is to bring
the data to the node-local storage in the first iteration, and to read from
the node-local storage directly in subsequent iterations.
To start with, we assume that the entire dataset fits into the node-local
storage. We also assume that the dataset is stored in the following format:
(nsample, d1, d2 ..., dn), where each sample is an n-dimensional array.
When reading the data, each rank gets a batch of samples, randomly or contiguously,
from the HDF5 file through hyperslab selection.
Huihuo Zheng @ ALCF
Revision history:
Mar 8, 2020, added MPI_Put and MPI_Get
Mar 1, 2020: added MPIIO support
Feb 29, 2020: Added debug info support.
Feb 28, 2020: Created with simple information.
*/
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#include <random>
#include <algorithm>
#include <vector>
#include "timing.h"
#include <assert.h>
#include "debug.h"
#include <unistd.h>
#include <time.h> // nanosleep, struct timespec
// POSIX I/O
#include <sys/stat.h>
#include <fcntl.h>
// Memory map
#include <sys/mman.h>
#include "H5Dio_cache.h"
#include "profiling.h"
extern H5Dread_cache_metadata H5DRMM;
using namespace std;
int msleep(long milliseconds)
{
struct timespec req, rem;
if(milliseconds > 999)
{
req.tv_sec = (int)(milliseconds / 1000); /* Must be Non-Negative */
req.tv_nsec = (milliseconds - ((long)req.tv_sec * 1000)) * 1000000; /* Must be in range of 0 to 999999999 */
}
else
{
req.tv_sec = 0; /* Must be Non-Negative */
req.tv_nsec = milliseconds * 1000000; /* Must be in range of 0 to 999999999 */
}
return nanosleep(&req , &rem);
}
int main(int argc, char **argv) {
int rank, nproc;
int provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
double compute = 0.0;
char fname[255] = "images.h5";
char dataset[255] = "dataset";
char local_storage[255] = "./";
bool shuffle = false;
bool mpio_collective = false;
bool mpio_independent = false;
bool cache = false;
int epochs = 4;
int num_batches = 16;
int batch_size = 32;
int rank_shift = 0;
int num_images = 1;
bool barrier = false; // always false; used only for debugging
bool remap = false;
int i=0;
Timing tt(io_node()==rank);
// Input
while (i<argc) {
if (strcmp(argv[i], "--input")==0) {
strcpy(fname, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--dataset")==0) {
strcpy(dataset, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--num_batches")==0) {
num_batches = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--batch_size")==0) {
batch_size = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--shuffle")==0) {
shuffle = true; i=i+1;
} else if (strcmp(argv[i], "--mpio_independent")==0) {
mpio_independent = true; i=i+1;
} else if (strcmp(argv[i], "--mpio_collective")==0) {
mpio_collective = true; i=i+1;
} else if (strcmp(argv[i], "--epochs") == 0) {
epochs = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--rank_shift")==0) {
rank_shift = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--cache") ==0){
cache = true; i=i+1;
} else if (strcmp(argv[i], "--remap") == 0) {
remap = true; i=i+1;
} else if (strcmp(argv[i], "--local_storage")==0) {
strcpy(local_storage, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--compute")==0) {
compute = atof(argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--barrier")==0) {
barrier = true; i = i+1;
} else {
i=i+1;
}
}
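// Pass the --local_storage path to the cache layer through the SSD_PATH environment variable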
setenv("SSD_PATH", local_storage, 1);
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd;
if (cache)
fd = H5Fopen_cache(fname, H5F_ACC_RDONLY, plist_id);
else
fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
hid_t dset;
tt.start_clock("H5Dopen");
if (cache) {
dset = H5Dopen_cache(fd, dataset, H5P_DEFAULT);
} else {
dset = H5Dopen(fd, dataset, H5P_DEFAULT);
}
tt.stop_clock("H5Dopen");
hid_t fspace = H5Dget_space(dset);
int ndims = H5Sget_simple_extent_ndims(fspace);
hsize_t *gdims = new hsize_t [ndims];
H5Sget_simple_extent_dims(fspace, gdims, NULL);
hsize_t dim = 1; // compute the size of a single sample
hsize_t *ldims = new hsize_t [ndims]; // for one batch of data
for(int i=0; i<ndims; i++) {
dim = dim*gdims[i];
ldims[i] = gdims[i];
}
dim = dim/gdims[0];
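// Total number of samples read per epoch across all ranks; shrink num_batches
// if the selection would exceed the dataset size.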
num_images = batch_size * num_batches * nproc;
if (num_images > gdims[0]) num_batches = gdims[0]/batch_size/nproc;
if (io_node()==rank) {
cout << "\n====== dataset info ======" << endl;
cout << "Dataset file: " << fname << endl;
cout << "Dataset name: " << dataset << endl;
cout << "Number of samples in the dataset: " << gdims[0] << endl;
cout << "Number of images selected: " << num_images << endl;
cout << "Dimension of the sample: " << ndims - 1 << endl;
cout << "Size in each dimension: ";
for (int i=1; i<ndims; i++) {
cout << " " << gdims[i];
}
cout << endl;
cout << "\n====== I/O & MPI info ======" << endl;
cout << "MPIO_COLLECTIVE: " << mpio_collective << endl;
cout << "MPIO_INDEPENDENT: " << mpio_independent << endl;
cout << "\n====== training info ======" << endl;
cout << "Batch size: " << batch_size << endl;
cout << "Number of batches per epoch: " << num_batches << endl;
cout << "Number of epochs: " << epochs << endl;
cout << "Shuffling the samples: " << shuffle << endl;
cout << "Number of workers: " << nproc << endl;
cout << "Training time per batch: " << compute << endl;
cout << "\n======= Local storage path =====" << endl;
cout << "Path (MEMORY mains read everything to memory directly): " << local_storage << endl;
cout << endl;
}
// sample indices
vector<int> id;
id.resize(num_images);
for(int i=0; i<num_images; i++) id[i] = i;
mt19937 g(100);