Commit b8748625 authored by Huihuo Zheng's avatar Huihuo Zheng
Browse files

h5py

parent 4ab45f2c
# Parallel HDF5 Read incorporating node-local storage
This folder contains the prototype of system-aware HDF5 incorporating node-local
storage. We developed this for workflows that read the dataset multiple times.
**Preparing the dataset**
The benchmark relies on a dataset stored in an HDF5 file. One can generate the
dataset using prepare_dataset.py or prepare_dataset.cpp. For example:
python prepare_dataset.py --num_images 8192 --sz 224 --output images.h5
This will generate an HDF5 file, images.h5, which contains 8192 samples, each of size 224*224*3 (image-based dataset)
**Benchmarks**
read_dataset_cache.cpp is the benchmark script.
* --input: HDF5 file
* --dataset: the name of the dataset in the HDF5 file
* --num_epochs [Default: 2]: Number of epochs (at each epoch/iteration, we sweep through the dataset)
* --num_batches [Default: 16]: Number of batches to read per epoch
* --batch_size [Default: 32]: Number of samples per batch
* --shuffle [Default: False]: Whether to shuffle the samples at the beginning of each epoch.
* --cache [Default: False]: Whether the local storage cache is turned on or not. If False, each epoch it will read from the file system.
* --local_storage [Default: ./]: The path of the local storage.
#!/usr/bin/env python
# This is for preparing fake images for I/O tests
# One can select the data layout via --format (channel_last or channel_first)
# Obtain the MPI communicator; fall back to a serial stub when mpi4py is not
# installed so the script still runs as a single process.
try:
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    print(comm.size, comm.rank)
except ImportError:  # narrowed from a bare `except:` that hid real errors
    class Comm:
        """Minimal stand-in exposing the rank/size attributes used below."""
        def __init__(self):
            self.rank = 0
            self.size = 1
    comm = Comm()
import h5py
import argparse
import numpy as np
import os
from tqdm import tqdm
# Command-line options for the synthetic dataset generator.
parser = argparse.ArgumentParser(description="preparing data set")
parser.add_argument("--num_images", type=int, default=8192)  # number of samples to generate
parser.add_argument("--sz", type=int, default=224)  # image height/width in pixels
parser.add_argument("--format", default='channel_last', type=str)  # 'channel_last' or anything else for channel-first
parser.add_argument("--output", default='images.h5', type=str)  # single-file output path
parser.add_argument("--file_per_image", action="store_true")  # write one .h5 per sample instead of one big file
args = parser.parse_args()
# Build the synthetic data: sample i is filled with the constant value i so
# reads can be validated cheaply.  Allocate float32 directly instead of a
# float64 intermediate (halves peak memory) and fill by broadcasting instead
# of a per-image Python loop.
if args.format == "channel_last":
    shape = (args.num_images, args.sz, args.sz, 3)
else:
    shape = (args.num_images, 3, args.sz, args.sz)
data = np.zeros(shape, dtype=np.float32)
data[:] = np.arange(args.num_images, dtype=np.float32).reshape((-1, 1, 1, 1))
if args.file_per_image:
    # One HDF5 file per sample; MPI ranks partition the samples round-robin.
    for i in tqdm(range(comm.rank, args.num_images, comm.size)):
        with h5py.File("images/%d.h5" % i, 'w') as f:
            f.create_dataset("images", data=data[i], dtype=np.float32)
else:
    # Single shared file written by every rank independently.
    # NOTE(review): the dataset here is named "image" but the per-image files
    # use "images" -- confirm which name downstream readers expect.
    with h5py.File(args.output, 'w') as f:
        dset = f.create_dataset("image", data=data, dtype=np.float32)
        dset.attrs["format"] = args.format
#!/usr/bin/env python
# I/O testing for incorporating node-local storage in deep learning applications
#
# -- Huihuo Zheng
class C:
    """Serial fallback communicator exposing rank/size like an MPI comm."""

    def __init__(self):
        # A lone process is always rank 0 in a world of size 1.
        self.rank, self.size = 0, 1
# Set up MPI when available; otherwise fall back to the serial stub above.
try:
    import mpi4py
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank = comm.rank
except ImportError:  # narrowed from a bare `except:`
    comm = C()
    # BUG FIX: `rank` is read later (seeding, data split, get_remote_image)
    # and was previously left undefined on this fallback path -> NameError.
    rank = 0
import h5py
import argparse
import numpy as np
from threading import Thread
from queue import Queue
import time
# Report the mpi4py version only if the import above actually succeeded;
# unguarded, this line raised NameError on machines without mpi4py.
if "mpi4py" in globals():
    print("mpi4py", mpi4py.__version__)
import os
from os import path
import glob
import mmap
from tqdm import tqdm
from itertools import cycle
# Benchmark command-line options.
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('--batch_size', type=int, default=32, help="batch size")
parser.add_argument('--num_batches', type=int, default=10, help='Number of batches per epoch')
parser.add_argument("--epochs", type=int, default=4, help="Number of epochs")
parser.add_argument("--verbose", type=int, default=0, help='Verbose level')
parser.add_argument("--nimages", type=int, default=-1, help="Number of image")
# Plain `float` instead of np.float32: argparse only needs a converter
# callable and the value just feeds time.sleep().
parser.add_argument("--simulate_compute", type=float, default=0.5, help="Time per training step")
parser.add_argument("--node_local_storage", type=str, default="SSD/")
# Node-local cache capacity in bytes (default: 128 GiB).
parser.add_argument("--space", type=int, default=1024*1024*1024*128)
parser.add_argument("--cache", action='store_true')
args = parser.parse_args()
# Fixed image geometry: sz x sz pixels, nc channels (matches prepare_dataset).
# (A duplicated `sz = 224` assignment was removed.)
sz = 224
nc = 3
verbose = args.verbose
node_local_storage = args.node_local_storage
space = args.space
print("============ Deep learning I/O test ===============")
print("* batch size: ", args.batch_size)
print("* number of batches: ", args.num_batches)
print("* Number of epochs: ", args.epochs)
print("* Number of images: ", args.nimages)
print("* node_local_storage: ", args.node_local_storage)
print("* time per step: ", args.simulate_compute)
print("===================================================")
def read_image(fstr):
    """Read the "images" dataset from HDF5 file *fstr* into an ndarray.

    The dataset is indexed with [:, :, :], so a single 3-D image
    (e.g. height x width x channels) is expected per file.
    """
    # Context manager releases the file handle even when the read raises;
    # the original leaked the handle on error.
    with h5py.File(fstr, 'r') as fin:
        return fin['images'][:, :, :]
def get_remote_image(fstr, target=0):
    # NOTE(review): unfinished prototype -- MPI_Send/MPI_Recv are not defined
    # anywhere in this file, and `a` is unbound on ranks other than `target`,
    # so calling this raises NameError.  Presumably the intent is for rank
    # `target` to read the image and send it to the requester; confirm
    # before wiring this into the read path.
    if (rank==target):
        a = read_image(fstr)
    MPI_Send()
    MPI_Recv()
    return a
def where_is_file(fstr):
    ''' checking where is the file '''
    # NOTE(review): stub -- `nproc` is undefined in this scope and the
    # function returns None.  It appears meant to report which rank's
    # node-local storage holds `fstr`; TODO implement or remove.
    pos = np.zeros(nproc)
def parallel_dist(n, rank=0, nproc=1):
    """Evenly split *n* items across *nproc* ranks.

    Returns (offset, count) for this *rank*; the first n % nproc ranks
    each receive one extra item so all items are covered exactly once.
    """
    base, extra = divmod(n, nproc)
    if rank < extra:
        # This rank is one of the "large" ranks with base + 1 items.
        return rank * (base + 1), base + 1
    # Remaining ranks hold `base` items after the `extra` larger chunks.
    return rank * base + extra, base
def data_migration_thread(queue, terminate, wait=0.01):
    """Background worker: copy queued files into node-local storage.

    Exits when the queue stays empty for *wait* seconds.

    NOTE(review): `terminate` is received by value (a bool), so flipping
    ImageGenerator.terminate later is never observed here -- in practice
    the thread only stops via the queue timeout.  A threading.Event would
    make shutdown cooperative.
    """
    import shutil  # local import: only this worker needs it
    fstr = queue.get()
    while not terminate:
        dst = node_local_storage + "/" + fstr.split('/')[-1]
        if verbose > 2:
            print("Copying data %s to node local cache" % fstr)
        # shutil.copy replaces `os.system("cp ...")`: portable and no shell
        # string interpolation.  The original also re-read the whole image
        # via read_image() first and discarded the result; that wasted read
        # is dropped.
        shutil.copy(fstr, dst)
        try:
            fstr = queue.get(timeout=wait)
        except Exception:  # queue.Empty -> no more work within `wait`
            break
class ImageGenerator:
    """Infinite batch iterator over per-image HDF5 files in *directory*.

    Each ``next()`` yields a (batch_size, sz, sz, nc) float array.  When
    ``args.cache`` is set, files are staged into node-local storage by a
    background thread and read from there once the copy exists.
    """

    def __init__(self, directory="./", batch_size=32, shuffle=True, split=True, nimages=-1):
        self.batch_size = batch_size
        self.directory = directory
        self.fname = glob.glob("%s/*.h5" % self.directory)
        # BUG FIX: nimages <= 0 means "use all files".  The original sliced
        # unconditionally with [:nimages], so the default -1 silently
        # dropped the last file.
        if nimages > 0:
            self.fname = self.fname[:nimages]
        self.index = np.arange(len(self.fname))
        self.file_iterator = None
        self.shuffle = shuffle
        self.split = split
        self._get_iterator()
        self.local_cache_list = []    # sample indices already queued for caching
        self.files_to_move = Queue()  # work queue feeding the copier thread
        self.terminate = False
        self.thread = Thread(target=data_migration_thread, args=(self.files_to_move, self.terminate))
        self.start_thread = False
        self.local_storage_space_left = space  # bytes still available locally

    def _get_iterator(self):
        """(Re)build the cyclic index iterator, reshuffling if requested."""
        # Same seed on every rank when splitting so all ranks agree on the
        # permutation; rank-dependent seed otherwise.
        if self.split:
            np.random.seed(100)
        else:
            np.random.seed(rank)
        if self.shuffle:
            np.random.shuffle(self.index)
        if self.split:
            offset, nloc = parallel_dist(len(self.fname), rank=comm.rank, nproc=comm.size)
            self.iterator = cycle(self.index[offset:offset + nloc])
        else:
            self.iterator = cycle(self.index)

    def _get_batch_of_files(self):
        """Return the next batch_size sample indices from the cyclic iterator."""
        return [next(self.iterator) for _ in range(self.batch_size)]

    def __next__(self):
        batch = np.zeros((self.batch_size, sz, sz, nc))
        for i, n in enumerate(self._get_batch_of_files()):
            batch[i] = self._get_one_image(n)
        return batch

    def __iter__(self):
        return self

    def _get_one_image(self, n):
        """Read sample *n*, preferring the node-local cached copy."""
        fstr = self.fname[n]
        if not args.cache:
            return read_image(fstr)
        if n in self.local_cache_list:
            # BUG FIX: the original called list.find(), which does not exist,
            # so the AttributeError sent every read down the except path and
            # the cache was never actually used.
            f = node_local_storage + "/" + fstr.split('/')[-1]
            if path.exists(f):
                # TODO(review): a partially copied file can already exist;
                # a copy-then-rename scheme in the migration thread would be
                # safer.
                return read_image(f)
            # Copy still in flight -- fall back to the parallel file system.
            return read_image(fstr)
        if self.local_storage_space_left > 4 * sz * sz * nc:
            # Queue the file for staging (4 bytes per float32 element).
            self.local_cache_list.append(n)
            self.files_to_move.put(fstr)
            if not self.start_thread:
                self.start_thread = True
                self.thread.start()
            self.local_storage_space_left -= 4 * sz * sz * nc
        return read_image(fstr)

    def _where_is_image(self, fstr):
        # Stub: locating an image on a remote rank is not implemented.
        rank = -1
        return rank

    def terminate_thread(self):
        """Signal the copier thread to stop and join it (if ever started)."""
        self.terminate = True
        if self.start_thread:
            self.thread.join()

    def on_epoch_end(self):
        # Rebuild (and reshuffle) the iterator for the next epoch.
        self._get_iterator()
def train(a, steps_per_epoch, epochs, t=5):
    """Simulated training loop.

    Times the I/O of `steps_per_epoch` batches per epoch drawn from
    generator *a*, sleeping *t* seconds per step to emulate compute, and
    prints per-epoch I/O throughput.  Shuts down *a*'s copier thread at
    the end.
    """
    for epoch in range(epochs):
        tt = 0
        print("Epoch - %s" % epoch)
        for i in tqdm(range(steps_per_epoch)):
            t0 = time.time()
            b = next(a)
            t1 = time.time()
            tt += t1 - t0
            if verbose > 1:
                print("training on batch: ", b[:, 0, 0, 0])
            time.sleep(t)
        a.on_epoch_end()
        print("Total time on I/O: %s" % tt)
        # BUG FIX: throughput previously used args.num_batches even when the
        # caller passed a different steps_per_epoch.
        print("I/O throughput: %s images/sec" % (steps_per_epoch * args.batch_size / tt))
    a.terminate_thread()
# Entry point: build the per-image generator over ./images and run the
# simulated training loop with the command-line settings.
a = ImageGenerator("images/", shuffle=True, batch_size=args.batch_size, split=True, nimages=args.nimages)
train(a, steps_per_epoch=args.num_batches, epochs=args.epochs, t=args.simulate_compute)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment