Commit 0e6bae54 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added proxy functions and more comments

parent c2a9e75b
......@@ -80,7 +80,7 @@ class BakeProviderHandle():
Set the threshold size bellow which this provider handle
will embed data within the RPC arguments.
"""
return _pybakeclient.set_eager_limit(self._ph, limit)
_pybakeclient.set_eager_limit(self._ph, limit)
def probe(self, max_targets=0):
"""
......@@ -115,8 +115,6 @@ class BakeProviderHandle():
A BakeRegionID object representing the region. None if an error occured.
"""
rid = _pybakeclient.create(self._ph, bti._tid, region_size)
if(rid == None):
return None
return BakeRegionID(rid)
def write(self, rid, offset, data):
......@@ -127,10 +125,8 @@ class BakeProviderHandle():
rid (BakeRegionID): region in which to write.
offset (int): offset at which to write.
data (str): data to write.
Returns:
True if the data was correctly written, False otherwise.
"""
return _pybakeclient.write(self._ph, rid._rid, offset, data)
_pybakeclient.write(self._ph, rid._rid, offset, data)
def write_numpy(self, rid, offset, array):
"""
......@@ -140,21 +136,39 @@ class BakeProviderHandle():
rid (BakeRegionID): region in which to write.
offset (int): offset at which to write.
data (numpy.ndarray): numpy array to write.
Returns:
True if the data was correctly written, False otherwise.
"""
return _pybakeclient.write_numpy(self._ph, rid._rid, offset, array)
_pybakeclient.write_numpy(self._ph, rid._rid, offset, array)
def proxy_write(self, rid, bulk, size, offset_in_region=0, offset_in_bulk=0, remote_addr=''):
"""
Writes data that is already exposed in a pymargo.bulk.Bulk object.
Args:
rid (BakeRegionID): region in which to write.
bulk (Bulk): bulk handle from which to get the data.
size (int): size to write.
offset_in_region (int): offset at which to write in the region.
offset_in_bulk (int): offset from which to read in the bulk object.
remote_addr (str): address of the process that created the bulk object.
"""
_pybakeclient.proxy_write(self._ph, rid._rid, offset_in_region,
bulk._hg_bulk, offset_in_bulk, remote_addr, size)
def persist(self, rid):
def persist(self, rid, offset=0, size=-1):
"""
Make the changes to a given region persist.
Offset is 0 by default. If size is not provided, PyBake will
call get_size to get it, which will throw an exception if Bake
has not been compiled with --enable-sizecheck.
Args:
rid (BakeRegionID): region to persist.
Returns:
True if the region was correctly persisted, False otherwise.
offset (int): offset in the region.
size (int): number of bytes to persist.
"""
return _pybakeclient.persist(self._ph, rid._rid)
if(size < 0):
size = self.get_size(rid) - offset
_pybakeclient.persist(self._ph, rid._rid, offset, size)
def create_write_persist(self, bti, data):
"""
......@@ -167,11 +181,27 @@ class BakeProviderHandle():
offset (int): offset at which to write data in the region.
data (str): data to write.
Returns:
True if the region was correctly written and persisted, False otherwise.
The created BakeRegionID.
"""
rid = _pybakeclient.create_write_persist(self._ph, bti._tid, data)
if(rid is None):
return None
return BakeRegionID(rid)
def proxy_create_write_persist(self, bti, bulk, size, offset_in_bulk=0, remote_addr=''):
"""
Version of create_write_persist that takes an already created bulk handle
as argument as well as the address of the process that created it.
Args:
bti (BakeTargetID): target id in which to create the region.
bulk (Bulk): bulk handle.
size (int): size of the region to create.
offset_in_bulk (int): offset in the bulk data.
remote_addr (str): Address of the process that created the bulk.
Returns:
The created BakeRegionID.
"""
rid = _pybakeclient.create_write_persist_proxy(self._ph,
bti._tid, bulk._hg_bulk, offset_in_bulk, remote_addr, size)
return BakeRegionID(rid)
def create_write_persist_numpy(self, bti, array):
......@@ -185,22 +215,21 @@ class BakeProviderHandle():
offset (int): offset at which to write data in the region.
array (numpy.ndarray): numpy array to write.
Returns:
True if the region was correctly written and persisted, False otherwise.
The created BakeRegionID.
"""
rid = _pybakeclient.create_write_persist_numpy(self._ph, bti._tid, array)
if(rid is None):
return None
return BakeRegionID(rid)
def get_size(self, rid):
"""
Get the size of a given region.
Get the size of a given region. Note thate bake should have been compiler
with --enable-sizecheck, otherwise this function will throw an exception.
Args:
rid (BakeRegionID): region id.
Returns:
The size (int) of the provided region. None in case of error.
The size (int) of the provided region.
"""
return _pybakeclient.get_size(self._ph, rid._rid)
......@@ -212,6 +241,10 @@ class BakeProviderHandle():
and then read from the offset up to the end of the region. If the
offset is not provided, the entire region is read.
Note that if bake has not been compiled with --enable-sizecheck,
then not providing the size argument will make this function throw
an exception.
Args:
rid (BakeRegionID): region id.
offset (int): offset at which to read.
......@@ -224,7 +257,24 @@ class BakeProviderHandle():
if(size < 0):
size = self.get_size(rid) - offset
return _pybakeclient.read(self._ph, rid._rid, offset, size)
def proxy_read(self, rid, bulk, size, offset_in_region=0, offset_in_bulk=0, remote_addr=''):
"""
Reads the data contained in a given region and pushes it to a provided bulk handle.
Args:
rid (BakeRegionID): region id.
bulk (Bulk): bulk handle where to push the data.
size (int): size to read.
offset_in_region (int): offset in the region where to start reading.
offset_in_bulk (int): offset in the bulk where to start placing data.
remote_addr (str): address of the process that created the Bulk object.
Returns:
the effective number of bytes read.
"""
return _pybakeclient.proxy_read(self._ph, rid._rid, offset_in_region, bulk._hg_bulk,
offset_in_bulk, remote_addr, size)
def read_numpy(self, rid, offset, shape, dtype):
"""
Reads the data contained in a given region, at a given offset,
......@@ -251,8 +301,7 @@ class BakeProviderHandle():
Args:
rid (BakeRegionID): region to remove.
"""
ret = _pybakeclient.remove(self._ph, rid._rid)
return (ret == 0)
_pybakeclient.remove(self._ph, rid._rid)
def migrate_region(self, source_rid, dest_addr, dest_provider_id, dest_target, remove_source=True):
"""
......@@ -271,8 +320,6 @@ class BakeProviderHandle():
"""
ret = _pybakeclient.migrate_region(self._ph, source_rid._rid, remove_source,
str(dest_addr), int(dest_provider_id), dest_target._tid)
if(ret == None):
return ret
return BakeRegionID(ret)
def migrate_target(self, source_tid, dest_addr, dest_provider_id, dest_root, remove_source=True):
......@@ -290,5 +337,5 @@ class BakeProviderHandle():
Returns:
True if the target was correctly migrated.
"""
return _pybakeclient.migrate_target(self._ph, source_tid._tid, remove_source,
_pybakeclient.migrate_target(self._ph, source_tid._tid, remove_source,
str(dest_addr), int(dest_provider_id), str(dest_root))
......@@ -9,27 +9,63 @@ def make_pool(name, size, mode):
_pybakeserver.make_pool(name, size, mode)
class BakeProvider(pymargo.Provider):
"""
The BakeProvide class wraps a C-level bake_provider_t object.
"""
def __init__(self, mid, provider_id):
super(BakeProvider, self).__init__(mid, provider_id)
def __init__(self, engine, provider_id):
"""
Constructor. Initializes a provider with an Engine and provider_id.
"""
super(BakeProvider, self).__init__(engine, provider_id)
self._provider = _pybakeserver.register(mid._mid, provider_id)
def add_storage_target(self, name):
tid = _pybakeserver.add_storage_target(self._provider, name)
def add_storage_target(self, path):
"""
Adds a storage target to the provider.
Returns a BakeTargetID instance that can be used to access the storage target.
"""
tid = _pybakeserver.add_storage_target(self._provider, path)
return BakeTargetID(tid)
def remove_storage_target(self, target):
return _pybakeserver.remove_storage_target(self._provider, target._tid)
"""
Removes a storage target from the provider. This does not delete the underlying file.
The target argument must be a BakeTargetID object.
"""
_pybakeserver.remove_storage_target(self._provider, target._tid)
def remove_all_storage_targets(self):
return _pybakeserver.remove_all_storage_targets(self._provider)
"""
Removes all the storage targets managed by this provider.
"""
_pybakeserver.remove_all_storage_targets(self._provider)
def count_storage_targets(self):
"""
Returns the number of storage targets that this provider manages.
"""
return _pybakeserver.count_storage_targets(self._provider)
def list_storage_targets(self):
"""
Returns the list of storage targets (BakeTargetIDs) that this provider manages.
"""
l = _pybakeserver.list_storage_targets(self._provider)
if(l is None):
return []
else:
return [ BakeTargetID(tid) for tid in l ]
def set_target_xfer_buffer(self, target, count, size):
"""
Sets the number and size of intermediate buffers that can be used to
execute transfers to a specific target.
"""
_pybakeserver.set_target_xfer_buffer(self._provider, target._tid, count, size)
def set_target_xfer_concurrency(self, target, num_threads):
"""
Sets the number of ULTs that can be used to execute transfers concurrently.
"""
_pybakeserver.set_target_xfer_concurrency(self._provider, target._tid, num_threads)
......@@ -5,6 +5,8 @@
*/
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
#include <stdexcept>
#include <sstream>
#include <string>
#include <vector>
#include <cstring>
......@@ -18,17 +20,28 @@ namespace np = py11;
typedef py11::capsule pymargo_instance_id;
typedef py11::capsule pymargo_addr;
typedef py11::capsule pymargo_bulk;
typedef py11::capsule pybake_client_t;
typedef py11::capsule pybake_provider_handle_t;
#define HANDLE_ERROR(__func, __ret) do {\
if(__ret != BAKE_SUCCESS) {\
std::stringstream ss;\
ss << #__func << "() failed (ret = " << __ret << ")";\
throw std::runtime_error(ss.str());\
}\
} while(0)
#define MID2CAPSULE(__mid) py11::capsule((void*)(__mid), "margo_instance_id", nullptr)
#define ADDR2CAPSULE(__addr) py11::capsule((void*)(__addr), "hg_addr_t", nullptr)
#define BULK2CAPSULE(__blk) py11::capsule((void*)(__blk), "hg_bulk_t", nullptr)
#define BAKEPH2CAPSULE(__bph) py11::capsule((void*)(__bph), "bake_provider_handle_t", nullptr)
#define BAKECL2CAPSULE(__bcl) py11::capsule((void*)(__bcl), "bake_client_t", nullptr)
static pybake_client_t pybake_client_init(pymargo_instance_id mid) {
bake_client_t result = BAKE_CLIENT_NULL;
bake_client_init(mid, &result);
int ret = bake_client_init(mid, &result);
HANDLE_ERROR(bake_client_init, ret);
return BAKECL2CAPSULE(result);
}
......@@ -38,7 +51,8 @@ static pybake_provider_handle_t pybake_provider_handle_create(
uint8_t provider_id) {
bake_provider_handle_t providerHandle = BAKE_PROVIDER_HANDLE_NULL;
bake_provider_handle_create(client, addr, provider_id, &providerHandle);
int ret = bake_provider_handle_create(client, addr, provider_id, &providerHandle);
HANDLE_ERROR(bake_provider_handle_create, ret);
return BAKEPH2CAPSULE(providerHandle);
}
......@@ -47,7 +61,7 @@ static uint64_t pybake_get_eager_limit(
{
uint64_t limit;
int ret = bake_provider_handle_get_eager_limit(ph, &limit);
if(ret != 0) return 0;
HANDLE_ERROR(bake_provider_handle_get_eager_limit, ret);
return limit;
}
......@@ -62,7 +76,7 @@ static py11::object pybake_probe(
Py_BEGIN_ALLOW_THREADS
ret = bake_probe(ph, max_targets, targets.data(), &num_targets);
Py_END_ALLOW_THREADS
if(ret != 0) return py11::object();
HANDLE_ERROR(bake_probe, ret);
for(uint64_t i=0; i < num_targets; i++) {
result.append(py11::cast(targets[i]));
}
......@@ -80,11 +94,11 @@ static py11::object pybake_create(
Py_BEGIN_ALLOW_THREADS
ret = bake_create(ph, bti, region_size, &rid);
Py_END_ALLOW_THREADS
if(ret != 0) return py11::none();
else return py11::cast(rid);
HANDLE_ERROR(bake_create, ret);
return py11::cast(rid);
}
static py11::object pybake_write(
static void pybake_write(
pybake_provider_handle_t ph,
const bake_region_id_t& rid,
uint64_t offset,
......@@ -95,21 +109,36 @@ static py11::object pybake_write(
std::string data = (std::string)bdata;
ret = bake_write(ph, rid, offset, (const void*)data.data(), data.size());
Py_END_ALLOW_THREADS
if(ret == 0) return py11::cast(true);
else return py11::cast(false);
HANDLE_ERROR(bake_write, ret);
}
static void pybake_proxy_write(
pybake_provider_handle_t ph,
const bake_region_id_t& rid,
uint64_t offset,
pymargo_bulk bulk,
uint64_t remote_offset,
const std::string& remote_addr,
uint64_t size)
{
int ret;
const char* addr = remote_addr.size() > 0 ? remote_addr.c_str() : NULL;
Py_BEGIN_ALLOW_THREADS
ret = bake_proxy_write(ph, rid, offset, bulk, remote_offset, addr, size);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_proxy_write, ret);
}
#if HAS_NUMPY
static py11::object pybake_write_numpy(
static void pybake_write_numpy(
pybake_provider_handle_t ph,
const bake_region_id_t& rid,
uint64_t offset,
const np::array& data)
{
if(!(data.flags() &
(np::array::f_style | np::array::c_style))) {
std::cerr << "[pyBAKE error]: non-contiguous numpy arrays not yet supported" << std::endl;
return py11::cast(false);
if(!(data.flags() & (np::array::f_style | np::array::c_style))) {
throw std::runtime_error("Non-contiguous numpy arrays not yet supported in PyBake");
}
size_t size = data.dtype().itemsize();
for(int i = 0; i < data.ndim(); i++) {
......@@ -120,21 +149,21 @@ static py11::object pybake_write_numpy(
Py_BEGIN_ALLOW_THREADS
ret = bake_write(ph, rid, offset, buffer, size);
Py_END_ALLOW_THREADS
if(ret != 0) return py11::cast(false);
else return py11::cast(true);
HANDLE_ERROR(bake_write, ret);
}
#endif
static py11::object pybake_persist(
static void pybake_persist(
pybake_provider_handle_t ph,
const bake_region_id_t& rid)
const bake_region_id_t& rid,
size_t offset,
size_t size)
{
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_persist(ph, rid);
ret = bake_persist(ph, rid, offset, size);
Py_END_ALLOW_THREADS
if(ret == 0) return py11::cast(true);
else return py11::cast(false);
HANDLE_ERROR(bake_persist, ret);
}
static py11::object pybake_create_write_persist(
......@@ -149,8 +178,27 @@ static py11::object pybake_create_write_persist(
ret = bake_create_write_persist(ph, tid,
data.data(), data.size(), &rid);
Py_END_ALLOW_THREADS
if(ret == 0) return py11::cast(rid);
else return py11::none();
HANDLE_ERROR(bake_create_write_persist, ret);
return py11::cast(rid);
}
static py11::object pybake_create_write_persist_proxy(
pybake_provider_handle_t ph,
bake_target_id_t tid,
pymargo_bulk bulk,
uint64_t remote_offset,
const std::string& remote_addr,
uint64_t size)
{
bake_region_id_t rid;
int ret;
const char* addr = remote_addr.size() != 0 ? remote_addr.c_str() : NULL;
Py_BEGIN_ALLOW_THREADS
ret = bake_create_write_persist_proxy(ph, tid,
bulk, remote_offset, addr, size, &rid);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_create_write_persist_proxy, ret);
return py11::cast(rid);
}
#if HAS_NUMPY
......@@ -161,8 +209,7 @@ static py11::object pybake_create_write_persist_numpy(
{
bake_region_id_t rid;
if(!(data.flags() & (np::array::f_style | np::array::c_style))) {
std::cerr << "[pyBAKE error]: non-contiguous numpy arrays not yet supported" << std::endl;
return py11::none();
throw std::runtime_error("Non-contiguous numpy arrays not yet supported in PyBake");
}
size_t size = data.dtype().itemsize();
for(int i = 0; i < data.ndim(); i++) {
......@@ -174,8 +221,8 @@ static py11::object pybake_create_write_persist_numpy(
ret = bake_create_write_persist(ph, tid,
buffer, size, &rid);
Py_END_ALLOW_THREADS
if(ret == 0) return py11::cast(rid);
else return py11::none();
HANDLE_ERROR(bake_create_write_persist, ret);
return py11::cast(rid);
}
#endif
......@@ -188,8 +235,8 @@ static py11::object pybake_get_size(
Py_BEGIN_ALLOW_THREADS
ret = bake_get_size(ph, rid, &size);
Py_END_ALLOW_THREADS
if(ret == 0) return py11::cast(size);
else return py11::none();
HANDLE_ERROR(bake_get_size, ret);
return py11::cast(size);
}
static py11::object pybake_read(
......@@ -204,14 +251,34 @@ static py11::object pybake_read(
Py_BEGIN_ALLOW_THREADS
ret = bake_read(ph, rid, offset, (void*)result.data(), size, &bytes_read);
Py_END_ALLOW_THREADS
if(ret != 0) return py11::none();
HANDLE_ERROR(bake_read, ret);
result.resize(bytes_read);
return py11::bytes(result);
}
static size_t pybake_proxy_read(
pybake_provider_handle_t ph,
const bake_region_id_t& rid,
uint64_t region_offset,
pymargo_bulk bulk,
uint64_t remote_offset,
const std::string& remote_addr,
size_t size)
{
uint64_t bytes_read;
int ret;
const char* addr = remote_addr.size() == 0 ? NULL : remote_addr.c_str();
Py_BEGIN_ALLOW_THREADS
ret = bake_proxy_read(ph, rid, region_offset, bulk, remote_offset, addr, size, &bytes_read);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_proxy_read, ret);
return bytes_read;
}
static py11::object pybake_migrate_region(
pybake_provider_handle_t source_ph,
const bake_region_id_t& source_rid,
size_t region_size,
bool remove_source,
const std::string& dest_addr,
uint16_t dest_provider_id,
......@@ -219,15 +286,15 @@ static py11::object pybake_migrate_region(
bake_region_id_t dest_rid;
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_migrate_region(source_ph, source_rid,
ret = bake_migrate_region(source_ph, source_rid, region_size,
remove_source, dest_addr.c_str(), dest_provider_id,
dest_target_id, &dest_rid);
Py_END_ALLOW_THREADS
if(ret != BAKE_SUCCESS) return py11::none();
HANDLE_ERROR(bake_migrate_region, ret);
return py11::cast(dest_rid);
}
static py11::object pybake_migrate_target(
static void pybake_migrate_target(
pybake_provider_handle_t source_ph,
const bake_target_id_t& source_tid,
bool remove_source,
......@@ -240,8 +307,7 @@ static py11::object pybake_migrate_target(
remove_source, dest_addr.c_str(), dest_provider_id,
dest_root.c_str());
Py_END_ALLOW_THREADS
if(ret != BAKE_SUCCESS) return py11::cast(false);
return py11::cast(true);
HANDLE_ERROR(bake_migrate_target, ret);
}
#if HAS_NUMPY
......@@ -263,46 +329,55 @@ static py11::object pybake_read_numpy(
Py_BEGIN_ALLOW_THREADS
ret = bake_read(ph, rid, offset, (void*)result.data(), size, &bytes_read);
Py_END_ALLOW_THREADS
if(ret != 0) return py11::none();
if(bytes_read != size) return py11::none();
else return result;
HANDLE_ERROR(bake_read, ret);
if(bytes_read != size) {
std::stringstream ss;
ss << "bake_read could not read full numpy object (" << bytes_read << " bytes read";
throw std::runtime_error(ss.str());
}
return result;
}
#endif
PYBIND11_MODULE(_pybakeclient, m)
{
#if HAS_NUMPY
try { py11::module::import("numpy"); }
catch (...) {
std::cerr << "[Py-BAKE] Error: could not import numpy at C++ level" << std::endl;
exit(-1);
}
//try {
py11::module::import("numpy");
//}
//catch (...) {
// std::cerr << "[Py-BAKE] error: could not import numpy at C++ level" << std::endl;
// exit(-1);
//}
#endif
py11::module::import("_pybaketarget");
m.def("client_init", &pybake_client_init);
m.def("client_finalize", [](pybake_client_t clt) {
return bake_client_finalize(clt);} );
int ret = bake_client_finalize(clt); HANDLE_ERROR(bake_client_finalize, ret); } );
m.def("provider_handle_create", &pybake_provider_handle_create);
m.def("provider_handle_ref_incr", [](pybake_provider_handle_t pbph) {
return bake_provider_handle_ref_incr(pbph); });
int ret = bake_provider_handle_ref_incr(pbph); HANDLE_ERROR(bake_provider_handle_ref_incr, ret); });
m.def("provider_handle_release", [](pybake_provider_handle_t pbph) {
return bake_provider_handle_release(pbph); });
int ret = bake_provider_handle_release(pbph); HANDLE_ERROR(bake_provider_handle_release, ret); });
m.def("get_eager_limit", &pybake_get_eager_limit);
m.def("set_eager_limit", [](pybake_provider_handle_t pbph, uint64_t lim) {
return bake_provider_handle_set_eager_limit(pbph, lim); });
int ret = bake_provider_handle_set_eager_limit(pbph, lim); HANDLE_ERROR(bake_provider_handle_set_eager_limit, ret); });
m.def("probe", &pybake_probe);
m.def("create", &pybake_create);
m.def("write", &pybake_write);
m.def("proxy_write", &pybake_proxy_write);
m.def("persist", &pybake_persist);
m.def("create_write_persist", &pybake_create_write_persist);
m.def("create_write_persist_proxy", &pybake_create_write_persist_proxy);
m.def("get_size", &pybake_get_size);
m.def("read", &pybake_read);
m.def("proxy_read", &pybake_proxy_read);
m.def("remove", [](pybake_provider_handle_t pbph, bake_region_id_t rid) {
return bake_remove(pbph, rid);} );
int ret = bake_remove(pbph, rid); HANDLE_ERROR(bake_remove, ret); } );
m.def("migrate_region", &pybake_migrate_region);
m.def("migrate_target", &pybake_migrate_target);
m.def("shutdown_service", [](pybake_client_t client, pymargo_addr addr) {
return bake_shutdown_service(client, addr); });
int ret = bake_shutdown_service(client, addr); HANDLE_ERROR(bake_shutdown_service, ret); });
#if HAS_NUMPY
m.def("write_numpy", &pybake_write_numpy);
m.def("create_write_persist_numpy", &pybake_create_write_persist_numpy);
......
......@@ -4,6 +4,8 @@
* See COPYRIGHT in top-level directory.
*/
#include <pybind11/pybind11.h>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#include <cstring>
......@@ -14,6 +16,14 @@