Commit 8fe95a35 authored by Huihuo Zheng's avatar Huihuo Zheng

combined read and write

parent 49e8ab89
@@ -16,11 +16,19 @@ For read workflow, the main program reads data from the filesystem and then the
## ./hdf5_read
* H5Dio_cache.c, H5Dio_cache.h -- source code for incorporating node-local storage into parallel HDF5 read.
* read_dataset_cache.cpp -- benchmark code for evaluating the read performance (deep-learning based).
* prepare_dataset.cpp -- code for preparing the HDF5 dataset for the benchmark.
## ./hdf5
* H5Dio_cache.c, H5Dio_cache.h -- source code for incorporating node-local storage into parallel HDF5 read and write (see the usage sketch below).
* test_read_cache.cpp -- testing code for read
* test_write_cache.cpp -- testing code for write
## ./mpiio
* mpiio_cache.c, mpiio_cache.h -- source code for incorporating node-local storage into MPI I/O.
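
A minimal read-path sketch using the cache API declared in `H5Dio_cache.h` is shown below (assumptions, not confirmed by this commit: the `*_cache` calls mirror their plain HDF5 counterparts, `SSD_PATH` selects the node-local staging directory as in `test_read_cache.cpp`, the dataset has the 4-D layout produced by `prepare_dataset.cpp`, and background-thread cleanup is omitted; file, dataset, and path names are illustrative):

```c++
#include <stdlib.h>
#include <vector>
#include "hdf5.h"
#include "mpi.h"
#include "H5Dio_cache.h"

int main(int argc, char **argv) {
  int provided, rank, nproc;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);
  setenv("SSD_PATH", "/local/scratch", 1);            // node-local staging dir (illustrative)

  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
  hid_t fd = H5Fopen("images.h5", H5F_ACC_RDONLY, fapl);
  // Assumed to behave like H5Dopen while also setting up the node-local cache.
  hid_t dset = H5Dopen_cache(fd, "dataset", H5P_DEFAULT);

  hid_t fspace = H5Dget_space(dset);
  hsize_t gdims[4];
  H5Sget_simple_extent_dims(fspace, gdims, NULL);
  hsize_t ldims[4]  = {1, gdims[1], gdims[2], gdims[3]};   // one sample per rank
  hsize_t offset[4] = {(hsize_t)rank % gdims[0], 0, 0, 0};
  hid_t mspace = H5Screate_simple(4, ldims, NULL);
  H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, ldims, NULL);

  std::vector<float> buf(ldims[1] * ldims[2] * ldims[3]);
  // First epoch: read from the file system and stage to node-local storage.
  H5Dread_to_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, buf.data());
  // Subsequent epochs: the same selection is assumed to be served from the cache.
  H5Dread_from_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, buf.data());

  H5Dclose_cache_read(dset);
  H5Sclose(mspace); H5Sclose(fspace); H5Pclose(fapl); H5Fclose(fd);
  MPI_Finalize();
  return 0;
}
```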
/*
This is the header file
*/
#ifndef H5DIO_CACHE_H_
#define H5DIO_CACHE_H_
#include "hdf5.h"
#include "mpi.h"
#include <vector>
#ifndef MAXDIM
#define MAXDIM 32
#endif
// Metadata for the I/O thread to perform writes
typedef struct _thread_data_t {
// a linked-list structure in C is used to build the list of I/O tasks
char fname[255];
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
hid_t file_space_id;
hid_t xfer_plist_id;
int id;
hid_t offset; // offset in the files on SSD
hsize_t size;
void *buf;
struct _thread_data_t *next;
} thread_data_t;
using namespace std;
typedef struct _SSD_INFO {
double mspace_total;
double mspace_left;
double mspace_per_rank_total;
double mspace_per_rank_left;
char path[255];
hsize_t offset;
} SSD_INFO;
typedef struct _MPI_INFO {
int ppn;
int rank;
int local_rank;
int nproc;
MPI_Comm comm;
MPI_Win win;
} MPI_INFO;
typedef struct _IO_THREAD {
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock;
bool batch_cached;
bool dset_cached;
int num_request;
pthread_t pthread;
thread_data_t *request_list, *current_request, *first_request;
} IO_THREAD;
typedef struct _MMAP {
// for write
int fd; // file handle for write
char fname[255];// full path of the memory mapped file
void *buf; // pointer that map the file to the memory
void *tmp_buf;
// for read
} MMAP;
typedef struct _SAMPLE {
size_t dim;
size_t size;
size_t nel;
} SAMPLE;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc;
size_t ns_glob;
size_t s_offset;
hsize_t size;
vector<int> batch;
bool contig_read;
MPI_Datatype mpi_datatype;
hid_t h5_datatype;
size_t esize;
} DSET;
typedef struct _H5Dwrite_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
SSD_INFO *ssd;
} H5Dwrite_cache_metadata;
typedef struct _H5Dread_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
DSET dset;
SSD_INFO *ssd;
} H5Dread_cache_metadata;
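// The H5Dwrite_cache_metadata / H5Dread_cache_metadata structs above bundle,
// per opened file/dataset, everything the cache layer needs: the memory-mapped
// file on node-local storage (MMAP), the MPI layout (MPI_INFO), the background
// I/O thread state (IO_THREAD), the dataset geometry used for reads (DSET),
// and the shared SSD bookkeeping (SSD_INFO).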
void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace);
void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contiguous);
//void create_mmap(char *path, H5Dio_mmap &f);
void parallel_dist(size_t dim, int nproc, int rank, size_t &ldim, size_t &start);
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dclose_cache_read(hid_t dset);
herr_t H5DRMMF_remap();
void H5PthreadWait();
void H5PthreadTerminate();
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
herr_t H5Fclose_cache( hid_t file_id );
herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t dxpl_id, const void *buf);
herr_t H5Dclose_cache( hid_t id);
herr_t H5Sclose_cache( hid_t id);
herr_t H5Pclose_cache( hid_t id);
void H5WPthreadWait();
void H5WPthreadTerminate();
void H5RPthreadWait();
void H5RPthreadTerminate();
void test_mmap_buf();
void check_pthread_data(thread_data_t *pt);
hsize_t get_buf_size(hid_t mspace, hid_t tid);
#endif // H5DIO_CACHE_H_
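
A minimal write-path sketch using the API declared above (assumptions, not confirmed by this commit: the *_cache calls mirror H5Fcreate/H5Dwrite/H5Fclose, H5WPthreadWait blocks until the background write thread has drained its task list, and SSD_PATH selects the node-local staging directory; file name, dataset name, path, and sizes are illustrative):

#include <stdlib.h>
#include "hdf5.h"
#include "mpi.h"
#include "H5Dio_cache.h"

int main(int argc, char **argv) {
  int provided, rank, nproc;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);
  setenv("SSD_PATH", "/local/scratch", 1);            // node-local staging dir (illustrative)

  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
  // Assumed to mirror H5Fcreate while also creating the memory-mapped file on
  // the node-local storage and starting the background write thread.
  hid_t fd = H5Fcreate_cache("out.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);

  hsize_t gdims[2] = {(hsize_t)nproc, 1024};
  hid_t fspace = H5Screate_simple(2, gdims, NULL);
  hid_t dset = H5Dcreate(fd, "x", H5T_NATIVE_FLOAT, fspace,
                         H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);

  hsize_t ldims[2]  = {1, 1024};
  hsize_t offset[2] = {(hsize_t)rank, 0};
  hid_t mspace = H5Screate_simple(2, ldims, NULL);
  H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, ldims, NULL);

  float buf[1024];
  for (int i = 0; i < 1024; i++) buf[i] = (float)rank;
  // Assumed semantics: returns once the data is staged on node-local storage;
  // the I/O thread migrates it to the parallel file system asynchronously.
  H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, buf);

  H5WPthreadWait();                 // wait for the background write thread to drain
  H5Dclose_cache(dset);
  H5Sclose(mspace); H5Sclose(fspace); H5Pclose(fapl);
  H5Fclose_cache(fd);
  MPI_Finalize();
  return 0;
}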
#Makefile
include make.inc
%.o: %.cpp
	$(CXX) $(CFLAGS) $(CPPFLAGS) -o $@ -c $<
%.o: %.c
	$(CXX) $(CFLAGS) $(CPPFLAGS) -o $@ -c $<
all: test_read_cache test_write_cache prepare_dataset
test_read_cache: test_read_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
	$(CXX) $(CFLAGS) -o $@ test_read_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB)
test_write_cache: test_write_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
	$(CXX) $(CFLAGS) -o $@ test_write_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB)
prepare_dataset: prepare_dataset.o
	$(CXX) $(CFLAGS) -o $@ $< $(HDF5_LIB)
clean:
	rm -rf *.o test_read_cache test_write_cache mpi_profile.*
#Makefile
#!/bin/sh
# Makefile inclusion for Cooley
# Huihuo Zheng @ ALCF
# ------------------------------
HDF5_ROOT=/soft/libraries/hdf5-1.10.5-parallel/
CC=mpicc
CXX=mpicxx
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
#CFLAGS+= -DTHETA
HDF5_LIB=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#Makefile
#!/bin/sh
# Makefile inclusion for Theta
# Please load HDF5 modules
# module load cray-hdf5-parallel
# -------------------------------
CC=mpicc
CXX=mpicxx
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
#CFLAGS+= -DTHETA
HDF5_LIB=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
#Makefile
#!/bin/sh
# Makefile inclusion for Theta
# Please load HDF5 modules
# module load cray-hdf5-parallel
# -------------------------------
CC=mpicc
CXX=mpicxx
#CC=cc
#CXX=CC
CFLAGS:=-O3 -I$(HDF5_ROOT)/include -I../utils
CPATH=
#CFLAGS+= -DSSD_CACHE_DEBUG -DEBUG
CFLAGS+= -DTHETA
HDF5_LIB=-Wl,-rpath,$(HDF5_ROOT)/lib -L$(HDF5_ROOT)/lib -lhdf5 $(HPCTW_LIBS) -pthread
#HPCTW=$(HOME)/opt/hpctw
#HPCTW=/home/morozov/lib/
#HPCTW_LIBS=-Wl,-rpath,$(HPCTW) -L$(HPCTW) -lhpmprof_c -liberty -lbfd -lz -lintl
/*
This file prepares the HDF5 dataset used for testing parallel reads in
data-parallel training. We assume that the dataset is stored in a single HDF5
file, with each dataset laid out in the following way:
(nsample, d1, d2, ..., dn)
Each sample is an n-dimensional array.
When we read the data, each rank reads a batch of samples, randomly or
contiguously, from the HDF5 file. Each sample has a unique id associated with
it. At the beginning of each epoch, we manually partition the entire dataset
into nproc pieces, where nproc is the number of workers.
*/
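/*
Example (based on the default arguments below, which are only illustrative):
with --num_images 1024 and --sz 224, the dataset "dataset" in images.h5 has
shape (1024, 224, 224, 3), and each rank writes a contiguous block of samples
selected via a hyperslab on the file dataspace.
*/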
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#include <random>
#include <algorithm>
#include <vector>
#include "timing.h"
using namespace std;
#define MAXDIM 1024
void dim_dist(hsize_t gdim, int nproc, int rank, hsize_t *ldim, hsize_t *start) {
*ldim = gdim/nproc;
*start = *ldim*rank;
if (rank < gdim%nproc) {
*ldim += 1;
*start += rank;
} else {
*start += gdim%nproc;
}
}
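// Worked example: with gdim = 10 and nproc = 4, dim_dist gives
// ldim = {3, 3, 2, 2} and start = {0, 3, 6, 8} for ranks 0..3;
// the first gdim % nproc ranks receive one extra sample.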
int main(int argc, char **argv) {
int rank, nproc;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
char fname[255] = "images.h5";
char dataset[255] = "dataset";
size_t num_images = 1024;
size_t sz = 224;
int i=0;
// Timing tt(rank==0);
while (i<argc) {
if (strcmp(argv[i], "--output")==0) {
strcpy(fname, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--dataset")==0) {
strcpy(dataset, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--num_images")==0) {
num_images =size_t(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--sz")==0) {
sz = size_t(atof(argv[i+1])); i+=2;
} else {
i=i+1;
}
}
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
hsize_t gdims[4] = {hsize_t(num_images), sz, sz, 3};
hid_t fspace = H5Screate_simple(4, gdims, NULL);
hsize_t ns_loc, fs_loc;
dim_dist(gdims[0], nproc, rank, &ns_loc, &fs_loc);
hsize_t ldims[4] = {ns_loc, sz, sz, 3};
hid_t mspace = H5Screate_simple(4, ldims, NULL);
hsize_t offset[4] = {fs_loc, 0, 0, 0};
hsize_t count[4] = {1, 1, 1, 1};
if (rank==0) {
cout << "\n====== dataset info ======" << endl;
cout << "Dataset file: " << fname << endl;
cout << "Dataset: " << dataset << endl;
cout << "Number of samples in the dataset: " << gdims[0] << endl;
}
float *dat = new float[ns_loc*sz*sz*3];
for(int i=0; i<ns_loc; i++) {
for(int j=0; j<sz*sz*3; j++)
dat[i*sz*sz*3+j] = fs_loc + i;
}
H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, ldims, count);
hid_t dset = H5Dcreate(fd, dataset, H5T_NATIVE_FLOAT, fspace, H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
H5Dwrite(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxf_id, dat);
H5Pclose(dxf_id);
H5Pclose(plist_id);
H5Sclose(mspace);
H5Sclose(fspace);
H5Dclose(dset);
H5Fclose(fd);
delete [] dat;
MPI_Finalize();
return 0;
}
/*
This code prototypes the idea of incorporating node-local storage
into a repeated-read workflow. We assume that the application reads
the same dataset periodically from the file system. Our idea is to bring
the data to the node-local storage in the first iteration, and to read
directly from the node-local storage in subsequent iterations.
To start with, we assume that the entire dataset fits into the node-local
storage. We also assume that the dataset is stored in the following format:
(nsample, d1, d2, ..., dn), where each sample is an n-dimensional array.
When reading the data, each rank gets a batch of samples, randomly or
contiguously, from the HDF5 file through hyperslab selection.
Huihuo Zheng @ ALCF
Revision history:
Mar 8, 2020: added MPI_Put and MPI_Get
Mar 1, 2020: added MPI-IO support
Feb 29, 2020: added debug info support
Feb 28, 2020: created with simple information
*/
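/*
Example run (illustrative; the flags match the options parsed in main below,
and the local-storage path is a placeholder):
mpirun -np 8 ./test_read_cache --input images.h5 --dataset dataset \
    --num_batches 16 --batch_size 32 --epochs 4 --shuffle \
    --cache --local_storage /local/scratch
With --cache, the first epoch stages data to node-local storage through
H5Dopen_cache / H5Dread_to_cache; without it, the benchmark falls back to
plain H5Dopen / H5Dread.
*/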
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#include <random>
#include <algorithm>
#include <vector>
#include "timing.h"
#include <assert.h>
#include "debug.h"
#include <unistd.h>
// POSIX I/O
#include <sys/stat.h>
#include <fcntl.h>
// Memory map
#include <sys/mman.h>
#include "H5Dio_cache.h"
#include "profiling.h"
using namespace std;
// Sleep for the given number of milliseconds (thin wrapper around nanosleep).
int msleep(long milliseconds)
{
struct timespec req, rem;
if(milliseconds > 999)
{
req.tv_sec = (int)(milliseconds / 1000); /* Must be Non-Negative */
req.tv_nsec = (milliseconds - ((long)req.tv_sec * 1000)) * 1000000; /* Must be in range of 0 to 999999999 */
}
else
{
req.tv_sec = 0; /* Must be Non-Negative */
req.tv_nsec = milliseconds * 1000000; /* Must be in range of 0 to 999999999 */
}
return nanosleep(&req, &rem);
}
int main(int argc, char **argv) {
int rank, nproc;
int provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
double compute = 0.0;
char fname[255] = "images.h5";
char dataset[255] = "dataset";
char local_storage[255] = "./";
bool shuffle = false;
bool mpio_collective = false;
bool mpio_independent = false;
bool cache = false;
int epochs = 4;
int num_batches = 16;
int batch_size = 32;
int rank_shift = 0;
int num_images = 1;
bool barrier = false; // always keep this false; it exists only for debugging
bool remap = false;
int i=0;
Timing tt(io_node()==rank);
// Input
while (i<argc) {
if (strcmp(argv[i], "--input")==0) {
strcpy(fname, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--dataset")==0) {
strcpy(dataset, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--num_batches")==0) {
num_batches = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--batch_size")==0) {
batch_size = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--shuffle")==0) {
shuffle = true; i=i+1;
} else if (strcmp(argv[i], "--mpio_independent")==0) {
mpio_independent = true; i=i+1;
} else if (strcmp(argv[i], "--mpio_collective")==0) {
mpio_collective = true; i=i+1;
} else if (strcmp(argv[i], "--epochs") == 0) {
epochs = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--rank_shift")==0) {
rank_shift = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--cache") ==0){
cache = true; i=i+1;
} else if (strcmp(argv[i], "--remap") == 0) {
remap = true; i=i+1;
} else if (strcmp(argv[i], "--local_storage")==0) {
strcpy(local_storage, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--compute")==0) {
compute = atof(argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--barrier")==0) {
barrier = true; i = i+1;
} else {
i=i+1;
}
}
setenv("SSD_PATH", local_storage, 1);
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
hid_t dset;
tt.start_clock("H5Dopen");
if (cache) {
dset = H5Dopen_cache(fd, dataset, H5P_DEFAULT);
} else {
dset = H5Dopen(fd, dataset, H5P_DEFAULT);
}
tt.stop_clock("H5Dopen");
hid_t fspace = H5Dget_space(dset);
int ndims = H5Sget_simple_extent_ndims(fspace);
hsize_t *gdims = new hsize_t [ndims];
H5Sget_simple_extent_dims(fspace, gdims, NULL);
hsize_t dim = 1; // compute the size of a single sample
hsize_t *ldims = new hsize_t [ndims]; // for one batch of data
for(int i=0; i<ndims; i++) {
dim = dim*gdims[i];
ldims[i] = gdims[i];
}
dim = dim/gdims[0];
num_images = batch_size * num_batches * nproc;
if (num_images > gdims[0]) num_batches = gdims[0]/batch_size/nproc;
if (io_node()==rank) {
cout << "\n====== dataset info ======" << endl;
cout << "Dataset file: " << fname << endl;
cout << "Dataset name: " << dataset << endl;
cout << "Number of samples in the dataset: " << gdims[0] << endl;
cout << "Number of images selected: " << num_images << endl;
cout << "Dimension of the sample: " << ndims - 1 << endl;
cout << "Size in each dimension: ";
for (int i=1; i<ndims; i++) {
cout << " " << gdims[i];
}
cout << endl;
cout << "\n====== I/O & MPI info ======" << endl;
cout << "MPIO_COLLECTIVE: " << mpio_collective << endl;
cout << "MPIO_INDEPENDENT: " << mpio_independent << endl;
cout << "\n====== training info ======" << endl;
cout << "Batch size: " << batch_size << endl;
cout << "Number of batches per epoch: " << num_batches << endl;
cout << "Number of epochs: " << epochs << endl;
cout << "Shuffling the samples: " << shuffle << endl;
cout << "Number of workers: " << nproc << endl;
cout << "Training time per batch: " << compute << endl;
cout << "\n======= Local storage path =====" << endl;
cout << "Path (MEMORY mains read everything to memory directly): " << local_storage << endl;
cout << endl;
}
// sample indices
vector<int> id;
id.resize(num_images);
for(int i=0; i<num_images; i++) id[i] = i;
mt19937 g(100);
size_t ns_loc, fs_loc; // number of samples per worker, index of the first sample
parallel_dist(num_images, nproc, rank, ns_loc, fs_loc);
// buffer for loading one batch of data
float *dat = new float[dim*batch_size];// buffer to store one batch of data
ldims[0] = batch_size;
hid_t mspace = H5Screate_simple(ndims, ldims, NULL); // memory space for one batch of data
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
if (mpio_collective) {
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
} else if (mpio_independent) {
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_INDEPENDENT);
}
// First epoch -- reading the data from the file system and cache it to local storage
if (shuffle) ::shuffle(id.begin(), id.end(), g);
for(int nb = 0; nb < num_batches; nb++) {
//// reading from file system to memory using H5Dread.
tt.start_clock("Select");
set_hyperslab_from_samples(&id[fs_loc + nb*batch_size], batch_size, fspace);
tt.stop_clock("Select");
tt.start_clock("H5Dread");
if (cache) {
tt.start_clock("H5Dread_to_cache");
H5Dread_to_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxf_id, dat);
tt.stop_clock("H5Dread_to_cache");
} else {
H5Dread(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxf_id, dat);
}
tt.stop_clock("H5Dread");
double vm, rss;
process_mem_usage(vm, rss);
if (io_node()==rank and debug_level()>1)
cout << "VM: " << vm << "; RSS:" << rss << endl;
msleep(int(compute*1000));
}
if (io_node()==rank)
printf("Epoch: %d --- time: %6.2f (sec) --- throughput: %6.2f (imgs/sec) --- rate: %6.2f (MB/sec)\n",
0, tt["H5Dread"].t, nproc*num_batches*batch_size/tt["H5Dread"].t,
num_batches*batch_size*dim*sizeof(float)/tt["H5Dread"].t/1024/1024*nproc);
// Epochs 1 - ... reading data directly from local storage
if (barrier)
MPI_Barrier(MPI_COMM_WORLD);
for(int e =1; e < epochs; e++) {
if (remap) {
tt.start_clock("REMAP");
H5DRMMF_remap();
tt.stop_clock("REMAP");
}
double vm, rss;
process_mem_usage(vm, rss);
if (io_node()==rank and debug_level()>1)
cout << "VM: " << vm << "; RSS:" << rss << endl;
if (shuffle) ::shuffle(id.begin(), id.end(), g);
parallel_dist(num_images, nproc, (rank+e*rank_shift)%nproc, ns_loc, fs_loc);
double t1 = 0.0;
for (int nb = 0; nb < num_batches; nb++) {
vector<int> b = vector<int> (id.begin() + fs_loc+nb*batch_size, id.begin() + fs_loc+(nb+1)*batch_size);
sort(b.begin(), b.end());
if (io_node()==rank and debug_level() > 1) cout << "Batch: " << nb << endl;
double t0 = MPI_Wtime();
tt.start_clock("Select");
set_hyperslab_from_samples(&b[0], batch_size, fspace);
tt.stop_clock("Select");
tt.start_clock("H5Dread");
if (cache) {