Commit 3f94cf2d authored by Matthieu Dorier's avatar Matthieu Dorier

implemented first prototype of Mochi backend

parent ff2ddb0f
.db/
.flamestore
**/*.pyc
......@@ -13,7 +13,7 @@ mid.enable_remote_shutdown()
worker_id = 43
bake_id = 45
bake_target_size = 2*8388608
bake_target_size = 20*8388608
bake_target_name = "/dev/shm/flamestore.dat"
# Create a Bake pool
......
import os
import sys
import numpy as np
import json
from keras.models import model_from_json
from pymargo import MargoInstance
from pysdskv.client import *
from pybake.target import BakeRegionID, BakeTargetID
from pybake.client import *
class MochiBackend():
    """FlameStore backend that stores Keras models in Mochi services.

    Model metadata (the architecture JSON) is kept in an SDSKV
    key/value database; each layer's weight arrays are written as
    Bake regions on a storage target selected by the manager.
    """

    def __init__(self, path='.'):
        """Connect to the Mochi services of a FlameStore workspace.

        Args:
            path (str): Workspace directory; must contain the
                .flamestore JSON file describing the manager and
                keyval service addresses and provider ids.
        """
        # Bug fix: the file only does 'from pymargo import MargoInstance',
        # so 'pymargo.client' below raised NameError. Import the module.
        import pymargo
        self._path = path
        with open(path+'/.flamestore') as f:
            info = json.loads(f.read())
        # Initialize Margo (client mode: this process only issues RPCs)
        self._mid = MargoInstance('tcp', mode=pymargo.client)
        # Initialize Manager client info
        self._select_storage_worker_rpc_id = self._mid.register("flame_select_storage_worker")
        self._manager_addr = self._mid.lookup(str(info['manager']['address']))
        self._manager_id = int(info['manager']['provider_id'])
        # Initialize KeyVal client info
        self._keyval_addr = self._mid.lookup(str(info['keyval']['address']))
        self._keyval_id = int(info['keyval']['provider_id'])
        self._keyval_client = SDSKVClient(self._mid)
        self._keyval_ph = self._keyval_client.create_provider_handle(self._keyval_addr, self._keyval_id)
        self._model_db = self._keyval_ph.open("models")
        self._input_db = self._keyval_ph.open("input")
        # Initialize Bake client info
        self._bake_client = BakeClient(self._mid)

    def __del__(self):
        """Tear down clients in dependency order.

        Handles and addresses must be released before their owning
        client is finalized, and every client before the Margo
        instance itself.
        """
        del self._model_db
        del self._input_db
        del self._keyval_ph
        del self._keyval_addr
        self._keyval_client.finalize()
        del self._keyval_client
        del self._manager_addr
        self._bake_client.finalize()
        del self._bake_client
        self._mid.finalize()
        del self._mid

    def store_model(self, name, model):
        """Store a Keras model under the given name.

        Args:
            name (str): Name under which to store the model.
            model (keras Model): Model to store.

        Raises:
            RuntimeError: if the manager has no storage worker to offer.
        """
        # Ask the manager for a bake target and provider to store to
        metadata = json.loads(model.to_json())
        bake_info = self.__select_bake_target(name, metadata)
        if bake_info is None:
            # The manager responds with JSON null when no storage worker
            # is connected; fail loudly instead of raising an obscure
            # AttributeError on bake_info.keys() below.
            raise RuntimeError('No storage worker available to store model "'+name+'"')
        bake_target = list(bake_info.keys())[0]
        bake_addr_str = str(bake_info[bake_target]['address'])
        bake_provider_id = int(bake_info[bake_target]['bake_provider_id'])
        # Store each layer's weights in this bake provider and record
        # their locations in the model metadata
        for i, l in enumerate(model.layers):
            layer_md = self.__store_layer_data(bake_addr_str, bake_provider_id, bake_target, l)
            metadata['config'][i]['data'] = layer_md
        self.__store_model_metadata(name, metadata)

    def load_model(self, name):
        """Load a previously stored Keras model.

        Args:
            name (str): Name the model was stored under.

        Returns:
            A Keras model with its weights restored.
        """
        metadata = self.__load_model_metadata(name)
        # Rebuild the architecture first, then hydrate layer weights
        model = model_from_json(json.dumps(metadata))
        for i, l in enumerate(model.layers):
            self.__load_layer_data(name, i, l, metadata['config'][i]['data'])
        return model

    def __store_layer_data(self, bake_addr_str, bake_provider_id, bake_target_str, layer):
        """
        Stores a Keras layer for a given model.
        Args:
            bake_addr_str (str): Address of the bake provider in which to store the data
            bake_provider_id (int): provider id of the bake provider
            bake_target_str (str): bake target id, as a string
            layer (keras Layer): Layer to store.
        Returns:
            A list of metadata dictionaries, one per weight array,
            each describing the array's dtype, shape, and Bake location.
        """
        bake_addr = self._mid.lookup(bake_addr_str)
        bake_ph = self._bake_client.create_provider_handle(bake_addr, bake_provider_id)
        bake_target = BakeTargetID.from_str(str(bake_target_str))
        weights = layer.get_weights()
        metadata = []
        for w in weights:
            # Persist the numpy array as a Bake region and remember where
            region = bake_ph.create_write_persist_numpy(bake_target, w)
            location = { 'type' : 'bake',
                         'address' : bake_addr_str,
                         'provider_id' : bake_provider_id,
                         'target' : bake_target_str,
                         'region' : str(region) }
            md = { 'location' : location,
                   'dtype' : str(w.dtype),
                   'shape' : w.shape }
            metadata.append(md)
        return metadata

    def __store_model_metadata(self, name, metadata):
        """
        Stores the metadata associated with a given model.
        Args:
            name (str): Name of the model.
            metadata (dict): Dictionary of metadata to store.
        Returns:
            True if the model was stored, False otherwise.
        """
        self._model_db.put(name, json.dumps(metadata))
        # TODO check return value of the above call
        return True

    def __load_layer_data(self, name, i, layer, config):
        """
        Loads the data of a given layer for a given model.
        Args:
            name (str): Name of the model.
            i (int): Index of the layer to load.
            layer (keras Layer): Reference to the layer to load.
            config: List of weight metadata (as produced by
                __store_layer_data).
        Returns:
            Nothing. This function sets the weights in the provided
            layer object.
        """
        weights = []
        for md in config:
            location = md['location']
            bake_addr_str = location['address']
            bake_provider_id = location['provider_id']
            bake_region_str = location['region']
            bake_addr = self._mid.lookup(str(bake_addr_str))
            bake_region = BakeRegionID.from_str(str(bake_region_str))
            bake_ph = self._bake_client.create_provider_handle(bake_addr, bake_provider_id)
            # dtype/shape were recorded at store time; read the region
            # back directly into a numpy array of that description
            dtype = np.dtype(getattr(np, md['dtype']))
            w = bake_ph.read_numpy(bake_region, 0, shape=tuple(md['shape']), dtype=dtype)
            weights.append(w)
        layer.set_weights(weights)

    def __load_model_metadata(self, name):
        """
        Loads the metadata associated with a given model.
        Args:
            name (str): Name of the model.
        Returns:
            A metadata dictionary corresponding to the model,
            or None if the model does not exist.
        """
        metadata = self._model_db.get(name)
        return json.loads(metadata)

    def __select_bake_target(self, name, metadata):
        """Ask the manager to pick a storage worker for this model.

        Returns the decoded JSON response: a one-entry dict mapping a
        bake target id to its address/provider info, or None if the
        manager has no worker connected.
        """
        handle = self._mid.create_handle(self._manager_addr, self._select_storage_worker_rpc_id)
        args = json.dumps({ 'name': name, 'metadata' : metadata })
        resp = handle.forward(self._manager_id, args)
        return json.loads(resp)
from pymargo import MargoInstance
from pymargo import Provider
import pickle
import random
import json
......@@ -27,7 +26,7 @@ class StorageManager(Provider):
f.write(json.dumps(info))
def _add_storage_worker(self, handle, args):
metadata = pickle.loads(args)
metadata = json.loads(args)
address = metadata['address']
worker_pr_id = metadata['worker']['provider_id']
bake_pr_id = metadata['bake']['provider_id']
......@@ -41,10 +40,15 @@ class StorageManager(Provider):
handle.respond('')
def _select_storage_worker(self, handle, args):
    """RPC handler: pick a random connected storage worker.

    Responds with a JSON one-entry dict {target: info} describing the
    chosen worker, or JSON null if no worker is connected.

    Args:
        handle: RPC handle used to respond to the caller.
        args (str): JSON-encoded request payload.
    """
    # Removed diff residue: leftover pickle.loads / undefined 'resp'
    # lines from the pre-JSON version of this handler.
    info = json.loads(args)
    if len(self._storage_targets) == 0:
        print("==> Request for a worker but there is none connected")
        handle.respond(json.dumps(None))
    else:
        # list() is required on Python 3, where dict.keys() is a view
        # that random.choice cannot index.
        target = random.choice(list(self._storage_targets.keys()))
        resp = { target : self._storage_targets[target] }
        print("==> Request for a worker, returning "+target)
        handle.respond(json.dumps(resp))
class StorageWorker():
......@@ -63,6 +67,6 @@ class StorageWorker():
handle = self._mid.create_handle(
self._manager_addr,
self._add_storage_worker_rpc_id)
args = pickle.dumps(info)
args = json.dumps(info)
res = handle.forward(self._manager_provider_id, args)
print("----> StorageWorker connected")
import os
import sys
from keras.models import model_from_json
import numpy as np
import json
......@@ -8,7 +9,21 @@ class FileSystemBackend():
def __init__(self, path='.'):
    """Create a file-system backend rooted at *path* (default: cwd)."""
    # Root directory under which model metadata and weights are stored.
    self._path = path
def store_model(self, name, model):
    """Store a Keras model under the given name on the file system.

    Removes a dangling 'def store_layer_data(...)' signature left over
    from the old public API (diff residue), which made this fragment
    syntactically invalid.

    Args:
        name (str): Name under which to store the model.
        model (keras Model): Model to store.
    """
    metadata = json.loads(model.to_json())
    # Store each layer's weights and record where they went
    for i, l in enumerate(model.layers):
        layer_md = self.__store_layer_data(name, i, l)
        metadata['config'][i]['data'] = layer_md
    self.__store_model_metadata(name, metadata)
def load_model(self, name):
    """Reconstruct a Keras model previously stored under *name*.

    Args:
        name (str): Name the model was stored under.

    Returns:
        A Keras model with its weights restored from disk.
    """
    md = self.__load_model_metadata(name)
    # Rebuild the architecture from its JSON description first...
    model = model_from_json(json.dumps(md))
    # ...then hydrate each layer's weights from the recorded metadata.
    for idx, layer in enumerate(model.layers):
        self.__load_layer_data(name, idx, layer, md['config'][idx]['data'])
    return model
def __store_layer_data(self, name, i, layer):
"""
Stores a Keras layer for a given model.
......@@ -36,7 +51,7 @@ class FileSystemBackend():
w.tofile(f)
return metadata
def store_model_metadata(self, name, metadata):
def __store_model_metadata(self, name, metadata):
"""
Stores the metadata associated with a given model.
......@@ -57,7 +72,7 @@ class FileSystemBackend():
f.write(json.dumps(metadata, indent=2, sort_keys=True))
return True
def load_layer_data(self, name, i, layer, config):
def __load_layer_data(self, name, i, layer, config):
"""
Loads the data of a given layer for a given model.
......@@ -81,7 +96,7 @@ class FileSystemBackend():
weights.append(w)
layer.set_weights(weights)
def load_model_metadata(self, name):
def __load_model_metadata(self, name):
"""
Loads the metadata associated with a given model.
......
from keras.models import model_from_json
from pfs import FileSystemBackend
from mochi.client import MochiBackend
import json
class WorkspaceHandle():
    """Entry point for storing/loading Keras models in a workspace.

    Dispatches to one of several backends ('fs' or 'mochi'), defaulting
    to 'mochi'. Cleaned of diff residue: the old single-backend
    attribute, the duplicate store/load definitions, and the dead
    __store_model/__load_model helpers (which referenced the removed
    self._backend) are gone.
    """

    def __init__(self, path='.'):
        """Instantiate all backends for the workspace at *path*."""
        self._backends = {}
        self._backends['fs'] = FileSystemBackend(path)
        self._backends['mochi'] = MochiBackend(path)

    def store(self, name, model, backend='mochi'):
        """Store *model* under *name* using the named backend."""
        self._backends[backend].store_model(name, model)

    def load(self, name, backend='mochi'):
        """Load and return the model stored under *name*."""
        return self._backends[backend].load_model(name)
......@@ -3,8 +3,11 @@
# It works with Keras 2 and Theano 0.9
# Import libraries and modules
import sys
sys.path.append('.')
import json
import numpy as np
import theano.ifelse
np.random.seed(7) # for reproducibility
from keras.models import Sequential
......@@ -17,8 +20,10 @@ K.set_image_dim_ordering('th')
from flamestore.workspace import WorkspaceHandle
# Initialize workspace
print("==> Instantiating WorkSpaceHandle")
# The handle reads ./.flamestore and connects to the configured backends
ws = WorkspaceHandle(".")
print("==> Loading MNIST dataset")
# Load pre-shuffled MNIST data into train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
......@@ -33,6 +38,7 @@ Y_train = np_utils.to_categorical(y_train)
# One-hot encode the test labels (training labels are encoded above)
Y_test = np_utils.to_categorical(y_test)
# Number of output classes, inferred from the one-hot width
num_classes = Y_test.shape[1]
print("==> Creating model")
# Define model architecture
model = Sequential()
# 32 5x5 conv filters over a 1-channel 28x28 input ('th' ordering set above)
model.add(Conv2D(32, (5, 5), input_shape=(1, 28, 28), activation='relu'))
......@@ -42,29 +48,36 @@ model.add(Flatten())
# Fully-connected head: 128 hidden units, then one softmax output per class
model.add(Dense(128, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

print("==> Compiling model")
# Compile model
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

# Fit model on training data
# (diff residue removed: the old uncommented model.fit duplicate and
# the old Python-2 print statements that shadowed their replacements)
print("==> Fitting model (this can take a while...)")
#model.fit(X_train, Y_train, validation_data=(X_test, Y_test), epochs=5, batch_size=200, verbose=2)

# Evaluate model on test data
print("==> Evaluating model")
score = model.evaluate(X_test, Y_test, verbose=0)
# Converted to a print() call for consistency with the rest of the file
print("----> Score of trained model: "+str(score))

# Store the model
print("==> Storing model")
ws.store("mymodel", model)

print("==> Reloading model")
# Reload the model
reloaded_model = ws.load("mymodel")

# Recompile the loaded model
print("==> Compiling reloaded model")
reloaded_model.compile(loss='categorical_crossentropy',
                       optimizer='adam',
                       metrics=['accuracy'])

# Evaluate the reloaded model
print("==> Evaluating reloaded model")
score = reloaded_model.evaluate(X_test, Y_test, verbose=0)
print("----> Score of reloaded model: "+str(score))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment