Commit 4ab45f2c authored by Huihuo Zheng's avatar Huihuo Zheng

reorganize the data

parent 6fecfb09
/*
This is the header file for the HDF5 node-local storage cache prototype.
*/
#ifndef H5DIO_CACHE_H_
#define H5DIO_CACHE_H_
#include "hdf5.h"
#include "mpi.h"
#include <vector>
#ifndef MAXDIM
#define MAXDIM 32
#endif
// Metadata for the I/O thread to perform writes
typedef struct _thread_data_t {
// we use a C-style linked list to build the list of I/O tasks
char fname[255];
hid_t dataset_id;
hid_t mem_type_id;
hid_t mem_space_id;
hid_t file_space_id;
hid_t xfer_plist_id;
int id;
hid_t offset; // offset in the files on SSD
hsize_t size;
void *buf;
struct _thread_data_t *next;
} thread_data_t;
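/* A minimal sketch (an assumption, not code from this commit) of how the
   background I/O thread might walk this singly linked task list:
     for (thread_data_t *t = first_request; t != NULL; t = t->next) {
       // each node describes one staged write: t->size bytes from t->buf,
       // destined for t->dataset_id via t->mem_space_id / t->file_space_id
     }
*/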
using namespace std;
typedef struct _SSD_INFO {
double mspace_total;
double mspace_left;
double mspace_per_rank_total;
double mspace_per_rank_left;
char path[255];
hsize_t offset;
} SSD_INFO;
typedef struct _MPI_INFO {
int ppn;
int rank;
int local_rank;
int nproc;
MPI_Comm comm;
MPI_Win win;
} MPI_INFO;
typedef struct _IO_THREAD {
pthread_cond_t master_cond;
pthread_cond_t io_cond;
pthread_mutex_t request_lock;
bool batch_cached;
bool dset_cached;
int num_request;
pthread_t pthread;
thread_data_t *request_list, *current_request, *first_request;
} IO_THREAD;
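/* Sketch of the intended producer/consumer handshake (an assumption based on
   the fields above; the actual logic lives in the .cpp implementation):
     main thread                              I/O thread
     -----------                              ----------
     pthread_mutex_lock(&request_lock);       pthread_mutex_lock(&request_lock);
     ... append request, num_request++ ...    while (num_request == 0)
     pthread_cond_signal(&io_cond);             pthread_cond_wait(&io_cond, &request_lock);
     pthread_mutex_unlock(&request_lock);     ... perform the write, num_request-- ...
                                              pthread_cond_signal(&master_cond);
                                              pthread_mutex_unlock(&request_lock);
*/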
typedef struct _MMAP {
// for write
int fd; // file handle for write
char fname[255];// full path of the memory mapped file
void *buf; // pointer that map the file to the memory
void *tmp_buf;
// for read
} MMAP;
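/* Sketch of how the write path presumably sets up this mapping (an assumption;
   the field names follow the struct above):
     fd  = open(fname, O_RDWR | O_CREAT, 0600);   // file on node-local storage
     ftruncate(fd, total_size);
     buf = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
   H5Dwrite_cache can then copy user buffers into buf and let the I/O thread
   flush them to the parallel file system asynchronously.
*/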
typedef struct _SAMPLE {
size_t dim;
size_t size;
size_t nel;
} SAMPLE;
typedef struct _DSET {
SAMPLE sample;
size_t ns_loc;
size_t ns_glob;
size_t s_offset;
hsize_t size;
vector<int> batch;
bool contig_read;
MPI_Datatype mpi_datatype;
hid_t h5_datatype;
size_t esize;
} DSET;
typedef struct _H5Dwrite_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
SSD_INFO *ssd;
} H5Dwrite_cache_metadata;
typedef struct _H5Dread_cache_metadata {
MMAP mmap;
MPI_INFO mpi;
IO_THREAD io;
DSET dset;
SSD_INFO *ssd;
} H5Dread_cache_metadata;
void set_hyperslab_from_samples(int *samples, int nsample, hid_t &fspace);
void get_samples_from_filespace(hid_t fspace, vector<int> &samples, bool &contiguous);
//void create_mmap(char *path, H5Dio_mmap &f);
void parallel_dist(size_t dim, int nproc, int rank, size_t &ldim, size_t &start);
hid_t H5Dopen_cache(hid_t loc_id, const char *name, hid_t dapl_id);
herr_t H5Dread_to_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dread_from_cache(hid_t dataset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t xfer_plist_id, void * buf);
herr_t H5Dclose_cache_read(hid_t dset);
herr_t H5DRMMF_remap();
void H5PthreadWait();
void H5PthreadTerminate();
hid_t H5Fcreate_cache( const char *name, unsigned flags,
hid_t fcpl_id, hid_t fapl_id );
herr_t H5Fclose_cache( hid_t file_id );
herr_t H5Dwrite_cache(hid_t dset_id, hid_t mem_type_id,
hid_t mem_space_id, hid_t file_space_id,
hid_t dxpl_id, const void *buf);
herr_t H5Dclose_cache( hid_t id);
herr_t H5Sclose_cache( hid_t id);
void H5WPthreadWait();
void H5WPthreadTerminate();
void H5RPthreadWait();
void H5RPthreadTerminate();
void test_mmap_buf();
void check_pthread_data(thread_data_t *pt);
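/* Example usage of the write-cache API declared above (a sketch of the assumed
   call sequence; error checking omitted):
     hid_t fd   = H5Fcreate_cache("out.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id);
     hid_t dset = H5Dcreate(fd, "data", H5T_NATIVE_FLOAT, fspace,
                            H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
     H5Dwrite_cache(dset, H5T_NATIVE_FLOAT, mspace, fspace, H5P_DEFAULT, buf);
     H5WPthreadWait();   // wait for the background I/O thread to drain the queue
     H5Dclose_cache(dset);
     H5Fclose_cache(fd);
*/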
#endif // H5DIO_CACHE_H_
#Makefile
CXX=mpicxx -g -O3
#HDF5_ROOT=/blues/gpfs/software/centos7/spack/opt/spack/linux-centos7-x86_64/gcc-8.2.0/hdf5-1.10.5-vozfsah/
CFLAGS=-I$(HDF5_ROOT)/include -O3 -I../utils
HDF5_LIB=-L$(HDF5_ROOT)/lib -lhdf5
%.o: %.cpp
$(CXX) $(CFLAGS) $(CPPFLAGS) -o $@ -c $<
all: read_dataset_cache prepare_dataset
read_dataset_cache: read_dataset_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ read_dataset_cache.o H5Dio_cache.o ../utils/debug.o ../utils/profiling.o $(HDF5_LIB) -lpthread
prepare_dataset: prepare_dataset.o
$(CXX) $(CFLAGS) -o $@ $< $(HDF5_LIB)
test: test_mmap test_cache test_mmap_mem
test_mmap: test_mmap.o
$(CXX) $(CFLAGS) -o $@ $<
test_cache: test_cache.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ $< ../utils/profiling.o
test_mmap_mem: test_mmap_mem.o ../utils/profiling.o
$(CXX) $(CFLAGS) -o $@ $< ../utils/profiling.o
clean:
rm -rf *.o read_dataset_cache prepare_dataset test_mmap test_cache test_mmap_mem
# Parallel HDF5 Read incorporating node-local storage
This folder contains a prototype of system-aware HDF5 incorporating node-local
storage. We developed this for workflows that read the same dataset repeatedly.
**Preparing the dataset**
The benchmark relies on a dataset stored in an HDF5 file. One can generate the
dataset using prepare_dataset.py or prepare_dataset.cpp. For example:
python prepare_dataset.py --num_images 8192 --sz 224 --output images.h5
This generates an HDF5 file, images.h5, containing 8192 samples, each of size 224x224x3 (an image-like dataset).
**Benchmarks**
read_dataset_cache.cpp is the benchmark; it accepts the following options (an example invocation follows the list):
* --input: HDF5 file
* --dataset: the name of the dataset in the HDF5 file
* --num_epochs [Default: 2]: Number of epochs (in each epoch/iteration, we sweep through the dataset)
* --num_batches [Default: 16]: Number of batches to read per epoch
* --batch_size [Default: 32]: Number of samples per batch
* --shuffle [Default: False]: Whether to shuffle the samples at the beginning of each epoch
* --cache [Default: False]: Whether the local storage cache is enabled. If False, every epoch reads directly from the file system.
* --local_storage [Default: ./]: The path of the local storage
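An example run with 4 workers might look like the following (the MPI launcher and the local-storage path depend on the system, and the dataset name must match what prepare_dataset created):
mpirun -np 4 ./read_dataset_cache --input images.h5 --dataset dataset --num_epochs 4 --num_batches 16 --batch_size 32 --shuffle --cache --local_storage /local/scratch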
/*
This file prepares the dataset used by the parallel read benchmarks for
data-parallel training. The dataset is stored in a single HDF5 file in the
following layout:
(nsample, d1, d2, ..., dn)
where each sample is an n-dimensional array. When we read the data, each rank
reads a batch of samples, randomly or contiguously, from the HDF5 file. Each
sample has a unique id associated with it. At the beginning of an epoch, we
manually partition the entire dataset into nproc pieces, where nproc is the
number of workers.
*/
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#include <random>
#include <algorithm>
#include <vector>
#include "timing.h"
using namespace std;
#define MAXDIM 1024
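// Block-distribute gdim samples over nproc ranks: each rank gets either
// floor(gdim/nproc) or floor(gdim/nproc)+1 samples. For example, gdim=10 and
// nproc=4 gives local sizes {3, 3, 2, 2} starting at offsets {0, 3, 6, 8}.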
void dim_dist(hsize_t gdim, int nproc, int rank, hsize_t *ldim, hsize_t *start) {
*ldim = gdim/nproc;
*start = *ldim*rank;
if (rank < gdim%nproc) {
*ldim += 1;
*start += rank;
} else {
*start += gdim%nproc;
}
}
int main(int argc, char **argv) {
int rank, nproc;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
char fname[255] = "images.h5";
char dataset[255] = "dataset";
size_t num_images = 1024;
size_t sz = 224;
int i=0;
// Timing tt(rank==0);
while (i<argc) {
if (strcmp(argv[i], "--output")==0) {
strcpy(fname, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--dataset")==0) {
strcpy(dataset, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--num_images")==0) {
num_images =size_t(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--sz")==0) {
sz = size_t(atof(argv[i+1])); i+=2;
} else {
i=i+1;
}
}
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
hid_t fd = H5Fcreate(fname, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
hsize_t gdims[4] = {hsize_t(num_images), sz, sz, 3};
hid_t fspace = H5Screate_simple(4, gdims, NULL);
hsize_t ns_loc, fs_loc;
dim_dist(gdims[0], nproc, rank, &ns_loc, &fs_loc);
hsize_t ldims[4] = {ns_loc, sz, sz, 3};
hid_t mspace = H5Screate_simple(4, ldims, NULL);
hsize_t offset[4] = {fs_loc, 0, 0, 0};
hsize_t count[4] = {1, 1, 1, 1};
if (rank==0) {
cout << "\n====== dataset info ======" << endl;
cout << "Dataset file: " << fname << endl;
cout << "Dataset: " << dataset << endl;
cout << "Number of samples in the dataset: " << gdims[0] << endl;
}
float *dat = new float[ns_loc*sz*sz*3];
for(int i=0; i<ns_loc; i++) {
for(int j=0; j<sz*sz*3; j++)
dat[i*sz*sz*3+j] = fs_loc + i;
}
H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, ldims, count);
hid_t dset = H5Dcreate(fd, dataset, H5T_NATIVE_FLOAT, fspace, H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
H5Dwrite(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxf_id, dat);
H5Pclose(dxf_id);
H5Pclose(plist_id);
H5Sclose(mspace);
H5Sclose(fspace);
H5Dclose(dset);
H5Fclose(fd);
delete [] dat;
MPI_Finalize();
return 0;
}
#!/usr/bin/env python
# This is for preparing fake images for I/O tests
# One can select a single shared HDF5 file (default) or one file per image (--file_per_image)
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
print(comm.size, comm.rank)
except ImportError:
class Comm:
def __init__(self, ):
self.rank=0
self.size=1
comm = Comm()
import h5py
import argparse
import numpy as np
import os
from tqdm import tqdm
parser = argparse.ArgumentParser(description="preparing data set")
parser.add_argument("--num_images", type=int, default=8192)
parser.add_argument("--sz", type=int, default=224)
parser.add_argument("--format", default='channel_last', type=str)
parser.add_argument("--output", default='images.h5', type=str)
parser.add_argument("--file_per_image", action="store_true")
args = parser.parse_args()
if args.format == "channel_last":
data = np.zeros((args.num_images, args.sz, args.sz, 3))
else:
data = np.zeros((args.num_images, 3, args.sz, args.sz))
for i in range(args.num_images):
data[i] = np.ones(data[i].shape)*i
if (args.file_per_image):
for i in tqdm(range(comm.rank, args.num_images, comm.size)):
f = h5py.File("images/%d.h5"%i, 'w')
f.create_dataset("images", data=data[i], dtype=np.float32)
f.close()
else:
f = h5py.File(args.output, 'w')
dset = f.create_dataset("image", data=data, dtype=np.float32)
dset.attrs["format"] = args.format
f.close()
/*
This benchmark is for testing reading the dataset in parallel for data-parallel training.
We assume that the entire dataset is in a single HDF5 file. The dataset is stored in the
following way:
(nsample, d1, d2, ..., dn), where each sample is an n-dimensional array.
When we read the data, each rank reads a batch of samples, randomly or contiguously,
from the HDF5 file, depending on whether shuffling is enabled.
Each sample has a unique id associated with it.
At the beginning of each epoch, we manually partition the entire dataset into nproc pieces, where nproc is
the number of workers.
Huihuo Zheng @ ALCF
Revision history:
Mar 1, 2020: added MPI-IO support
Feb 29, 2020: Added debug info support.
Feb 28, 2020: Created with simple information.
*/
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#include <random>
#include <algorithm>
#include <vector>
#include "timing.h"
#include <assert.h>
#include "debug.h"
#include <iostream>
#include <unistd.h>
// POSIX I/O
#include <sys/stat.h>
#include <fcntl.h>
using namespace std;
#define MAXDIM 1024
void dim_dist(hsize_t gdim, int nproc, int rank, hsize_t *ldim, hsize_t *start) {
*ldim = gdim/nproc;
*start = *ldim*rank;
if (rank < gdim%nproc) {
*ldim += 1;
*start += rank;
} else {
*start += gdim%nproc;
}
}
int main(int argc, char **argv) {
int rank, nproc;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
char fname[255] = "images.h5";
char dataset[255] = "dataset";
bool shuffle = false;
bool mpio_collective = false;
bool mpio_independent = false;
int epochs = 1;
int num_batches = 16;
int batch_size = 32;
int rank_shift = 0;
int i=0;
Timing tt(rank==io_node());
while (i<argc) {
if (strcmp(argv[i], "--input")==0) {
strcpy(fname, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--dataset")==0) {
strcpy(dataset, argv[i+1]); i+=2;
} else if (strcmp(argv[i], "--num_batches")==0) {
num_batches = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--batch_size")==0) {
batch_size = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--shuffle")==0) {
shuffle = true; i=i+1;
} else if (strcmp(argv[i], "--mpio_independent")==0) {
mpio_independent = true; i=i+1;
} else if (strcmp(argv[i], "--mpio_collective")==0) {
mpio_collective = true; i=i+1;
} else if (strcmp(argv[i], "--epochs") == 0) {
epochs = int(atof(argv[i+1])); i+=2;
} else if (strcmp(argv[i], "--rank_shift")==0) {
rank_shift = int(atof(argv[i+1])); i+=2;
} else {
i=i+1;
}
}
hid_t plist_id = H5Pcreate(H5P_FILE_ACCESS);
if (mpio_collective or mpio_independent) {
H5Pset_fapl_mpio(plist_id, MPI_COMM_WORLD, MPI_INFO_NULL);
}
hid_t fd = H5Fopen(fname, H5F_ACC_RDONLY, plist_id);
hid_t dset = H5Dopen(fd, dataset, H5P_DEFAULT);
hid_t fspace = H5Dget_space(dset);
hsize_t gdims[MAXDIM];
int ndims = H5Sget_simple_extent_dims(fspace, gdims, NULL);
if (rank==io_node()) {
cout << "\n====== dataset info ======" << endl;
cout << "Dataset file: " << fname << endl;
cout << "Dataset: " << dataset << endl;
cout << "Number of samples in the dataset: " << gdims[0] << endl;
cout << "Dimension of the sample: " << ndims - 1 << endl;
cout << "Size in each dimension: ";
for (int i=1; i<ndims; i++) {
cout << " " << gdims[i];
}
cout << endl;
cout << "\n====== I/O & MPI info ======" << endl;
cout << "MPIO_COLLECTIVE: " << mpio_collective << endl;
cout << "MPIO_INDEPENDENT: " << mpio_independent << endl;
cout << "rank shift: " << rank_shift << endl;
cout << "\n====== training info ======" << endl;
cout << "Batch size: " << batch_size << endl;
cout << "Number of batches per epoch: " << num_batches << endl;
cout << "Number of epochs: " << epochs << endl;
cout << "Shuffling the samples: " << shuffle << endl;
cout << "Number of workers: " << nproc << endl;
}
hsize_t dim = 1; // compute the size of a single sample
hsize_t *ldims = new hsize_t [ndims]; // for one batch of data
hsize_t *offset = new hsize_t [ndims]; // the offset
hsize_t *count = new hsize_t [ndims]; // number of samples to read
hsize_t *sample = new hsize_t [ndims]; // for one sample
for(int i=0; i<ndims; i++) {
dim = dim*gdims[i];
ldims[i] = gdims[i];
offset[i] = 0;
count[i] = 1;
sample[i] = gdims[i];
}
sample[0]=1;
dim = dim/gdims[0];
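// At this point dim holds the number of elements in a single sample;
// sample[] = {1, d1, ..., dn} selects one sample in the file space, while
// ldims[] (with ldims[0] set to batch_size below) describes one batch in memory.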
vector<int> id;
id.resize(gdims[0]);
for(int i=0; i<gdims[0]; i++) id[i] = i;
mt19937 g(100);
hsize_t ns_loc; // number of samples per worker
hsize_t fs_loc; // index of the first sample for this worker
dim_dist(gdims[0], nproc, rank, &ns_loc, &fs_loc);
float *dat = new float[dim*batch_size];// buffer to store one batch of data
ldims[0] = batch_size;
hid_t mspace = H5Screate_simple(ndims, ldims, NULL); // memory space for one batch of data
hid_t dxf_id = H5Pcreate(H5P_DATASET_XFER);
if (mpio_collective) {
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_COLLECTIVE);
} else if (mpio_independent) {
H5Pset_dxpl_mpio(dxf_id, H5FD_MPIO_INDEPENDENT);
}
for (int ep = 0; ep < epochs; ep++) {
if (rank_shift > 0)
dim_dist(gdims[0], nproc, (rank+ep*rank_shift)%nproc, &ns_loc, &fs_loc);
if ((io_node()==rank) and (debug_level()>0)) cout << "Clear cache" << endl;
// shuffle the indices
if (shuffle==1) {
::shuffle(id.begin(), id.end(), g);
if (rank==io_node() and debug_level()>1) {
cout << "Shuffled index" << endl;
cout << "* ";
for (int i=0; i<gdims[0]; i++) {
cout << " " << id[i] << "("<< i<< ")";
if (i%8==7) cout << "\n* ";
}
cout << endl;
}
}
for(int nb = 0; nb < num_batches; nb++) {
// hyperslab selection for a batch of data to read for all the workers
tt.start_clock("Select");
offset[0] = id[fs_loc+(nb*batch_size)%ns_loc]; // set the offset
H5Sselect_hyperslab(fspace, H5S_SELECT_SET, offset, NULL, sample, count);
for(int i=1; i<batch_size; i++) {
offset[0] = id[fs_loc+(i+nb*batch_size)%ns_loc];
H5Sselect_hyperslab(fspace, H5S_SELECT_OR, offset, NULL, sample, count);
}
tt.stop_clock("Select");
// reading one batch of data
tt.start_clock("H5Dread");
H5Dread(dset, H5T_NATIVE_FLOAT, mspace, fspace, dxf_id, dat);
tt.stop_clock("H5Dread");
// sanity check whether this is what we want.
if (rank==io_node() and debug_level()>2) {
cout << "=== batch: "<< nb << " \n* " << endl;
vector<int> b = vector<int> (id.begin() + fs_loc+(nb*batch_size)%ns_loc, id.begin() + fs_loc+((nb+1)*batch_size)%ns_loc);
sort(b.begin(), b.end());
for(int i=0; i<batch_size; i++) {
cout << dat[i*dim] << "-" << b[i] << " ";
if (i%8==7) {
cout << "\n* " << endl;
}
}
}
}
}
H5Pclose(dxf_id);
H5Pclose(plist_id);
H5Sclose(mspace);
H5Sclose(fspace);
H5Dclose(dset);
H5Fclose(fd);
double w = num_batches*epochs*batch_size/tt["H5Dread"].t*nproc;
if (rank==io_node()) {
cout << "\n===== I/O rate =====" << endl;
cout << "# of images/sec: " << w << endl;
cout << "Read rate: " << w*sizeof(float)*dim/1024/1024 << " MB/s" << endl;
}
delete [] dat;
delete [] ldims;
delete [] offset;
delete [] count;
delete [] sample;
MPI_Finalize();
return 0;
}
/*
This code prototypes the idea of incorporating node-local storage
into repeated-read workflows. We assume that the application reads
the same dataset periodically from the file system. Our idea is to bring
the data to the node-local storage in the first iteration, and read from
the node-local storage directly in subsequent iterations.
To start with, we assume that the entire dataset fits into the node-local
storage. We also assume that the dataset is stored in the following format:
(nsample, d1, d2, ..., dn), where each sample is an n-dimensional array.
When reading the data, each rank gets a batch of samples, randomly or contiguously,
from the HDF5 file through hyperslab selection.
Huihuo Zheng @ ALCF
Revision history:
Mar 8, 2020: added MPI_Put and MPI_Get
Mar 1, 2020: added MPI-IO support
Feb 29, 2020: Added debug info support.
Feb 28, 2020: Created with simple information.
*/
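/* Sketch of the intended flow (an assumption based on the description above;
   the functions are declared in H5Dio_cache.h):
     epoch 0 :  H5Dread_to_cache(dset, type, mspace, fspace, dxpl, buf);
                // read from the parallel file system and stage the batch
                // on node-local storage
     epoch 1+:  H5Dread_from_cache(dset, type, mspace, fspace, dxpl, buf);
                // serve the same batch directly from node-local storage
*/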
#include <iostream>
#include "hdf5.h"
#include "mpi.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"