Commit 22f066bb authored by Matthieu Dorier's avatar Matthieu Dorier

adapted to Bake 0.4

parent 27045528
......@@ -117,35 +117,38 @@ class BakeProviderHandle():
rid = _pybakeclient.create(self._ph, bti._tid, region_size)
return BakeRegionID(rid)
def write(self, rid, offset, data):
def write(self, tid, rid, offset, data):
"""
Writes data in a region, at a specified offset.
Args:
tid (BakeTargetID): target in which to write.
rid (BakeRegionID): region in which to write.
offset (int): offset at which to write.
data (str): data to write.
"""
if(isinstance(data,str)):
data = data.encode()
_pybakeclient.write(self._ph, rid._rid, offset, data)
_pybakeclient.write(self._ph, tid._tid, rid._rid, offset, data)
def write_numpy(self, rid, offset, array):
def write_numpy(self, tid, rid, offset, array):
"""
Writes a numpy array in a region at a specified offset.
Args:
tid (BakeTargetID): target in which to write.
rid (BakeRegionID): region in which to write.
offset (int): offset at which to write.
data (numpy.ndarray): numpy array to write.
"""
_pybakeclient.write_numpy(self._ph, rid._rid, offset, array)
_pybakeclient.write_numpy(self._ph, tid._tid, rid._rid, offset, array)
def proxy_write(self, rid, bulk, size, offset_in_region=0, offset_in_bulk=0, remote_addr=''):
def proxy_write(self, tid, rid, bulk, size, offset_in_region=0, offset_in_bulk=0, remote_addr=''):
"""
Writes data that is already exposed in a pymargo.bulk.Bulk object.
Args:
tid (BakeTargetID): target in which to write.
rid (BakeRegionID): region in which to write.
bulk (Bulk): bulk handle from which to get the data.
size (int): size to write.
......@@ -156,7 +159,7 @@ class BakeProviderHandle():
_pybakeclient.proxy_write(self._ph, rid._rid, offset_in_region,
bulk._hg_bulk, offset_in_bulk, remote_addr, size)
def persist(self, rid, offset=0, size=-1):
def persist(self, tid, rid, offset=0, size=-1):
"""
Make the changes to a given region persist.
Offset is 0 by default. If size is not provided, PyBake will
......@@ -164,13 +167,14 @@ class BakeProviderHandle():
has not been compiled with --enable-sizecheck.
Args:
tid (BakeTargetID): target in the region is.
rid (BakeRegionID): region to persist.
offset (int): offset in the region.
size (int): number of bytes to persist.
"""
if(size < 0):
size = self.get_size(rid) - offset
_pybakeclient.persist(self._ph, rid._rid, offset, size)
_pybakeclient.persist(self._ph, tid._tid, rid._rid, offset, size)
def create_write_persist(self, bti, data):
"""
......@@ -224,20 +228,21 @@ class BakeProviderHandle():
rid = _pybakeclient.create_write_persist_numpy(self._ph, bti._tid, array)
return BakeRegionID(rid)
def get_size(self, rid):
def get_size(self, tid, rid):
"""
Get the size of a given region. Note thate bake should have been compiler
with --enable-sizecheck, otherwise this function will throw an exception.
Args:
tid (BakeTargetID): target id.
rid (BakeRegionID): region id.
Returns:
The size (int) of the provided region.
"""
return _pybakeclient.get_size(self._ph, rid._rid)
return _pybakeclient.get_size(self._ph, tid._tid, rid._rid)
def read(self, rid, offset=0, size=-1):
def read(self, tid, rid, offset=0, size=-1):
"""
Reads the data contained in a given region, at a given offset
and with a given size. If the size is not provided,
......@@ -250,6 +255,7 @@ class BakeProviderHandle():
an exception.
Args:
tid (BakeTargetID): target id.
rid (BakeRegionID): region id.
offset (int): offset at which to read.
size (int): size to read.
......@@ -260,13 +266,14 @@ class BakeProviderHandle():
"""
if(size < 0):
size = self.get_size(rid) - offset
return _pybakeclient.read(self._ph, rid._rid, offset, size)
return _pybakeclient.read(self._ph, tid._tid, rid._rid, offset, size)
def proxy_read(self, rid, bulk, size, offset_in_region=0, offset_in_bulk=0, remote_addr=''):
def proxy_read(self, tid, rid, bulk, size, offset_in_region=0, offset_in_bulk=0, remote_addr=''):
"""
Reads the data contained in a given region and pushes it to a provided bulk handle.
Args:
tid (BakeTargetID): target id.
rid (BakeRegionID): region id.
bulk (Bulk): bulk handle where to push the data.
size (int): size to read.
......@@ -276,10 +283,10 @@ class BakeProviderHandle():
Returns:
the effective number of bytes read.
"""
return _pybakeclient.proxy_read(self._ph, rid._rid, offset_in_region, bulk._hg_bulk,
return _pybakeclient.proxy_read(self._ph, tid._tid, rid._rid, offset_in_region, bulk._hg_bulk,
offset_in_bulk, remote_addr, size)
def read_numpy(self, rid, offset, shape, dtype):
def read_numpy(self, tid, rid, offset, shape, dtype):
"""
Reads the data contained in a given region, at a given offset,
and interpret it as a numpy array of a given shape and datatype.
......@@ -288,6 +295,7 @@ class BakeProviderHandle():
compared with the size of the numpy that should result from the call)
Args:
tid (BakeTargetID): target id.
rid (BakeRegionID): region id.
offset (int): offset at which to read.
shape (tuple): shape of the resulting array.
......@@ -296,24 +304,26 @@ class BakeProviderHandle():
Returns:
A numpy array or None if it could not be read.
"""
return _pybakeclient.read_numpy(self._ph, rid._rid, offset, tuple(shape), dtype)
return _pybakeclient.read_numpy(self._ph, tid._tid, rid._rid, offset, tuple(shape), dtype)
def remove(self, rid):
def remove(self, tid, rid):
"""
Remove a region from its target.
Args:
tid (BakeTargetID): target id.
rid (BakeRegionID): region to remove.
"""
_pybakeclient.remove(self._ph, rid._rid)
_pybakeclient.remove(self._ph, tid._tid, rid._rid)
def migrate_region(self, source_rid, dest_addr, dest_provider_id, dest_target, remove_source=True):
def migrate_region(self, source_tid, source_rid, dest_addr, dest_provider_id, dest_target, remove_source=True):
"""
Migrates a give region from its source to a destination designated by
an address, a provider id, and a target id. This function will also remove
the original region if remove_source is set to True.
Args:
source_tid (BakeTargetID): source target id.
source_rid (BakeRegionID): region to remove.
dest_addr (str): destination address.
dest_provider_id (int): destination provider id.
......@@ -322,7 +332,7 @@ class BakeProviderHandle():
Returns:
The resulting BakeRegionID if successful, None otherwise.
"""
ret = _pybakeclient.migrate_region(self._ph, source_rid._rid, remove_source,
ret = _pybakeclient.migrate_region(self._ph, source_tid._tid, source_rid._rid, remove_source,
str(dest_addr), int(dest_provider_id), dest_target._tid)
return BakeRegionID(ret)
......
......@@ -85,7 +85,7 @@ static py11::object pybake_probe(
static py11::object pybake_create(
pybake_provider_handle_t ph,
bake_target_id_t bti,
const bake_target_id_t& bti,
size_t region_size)
{
bake_region_id_t rid;
......@@ -100,6 +100,7 @@ static py11::object pybake_create(
static void pybake_write(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
uint64_t offset,
const py11::bytes& bdata)
......@@ -107,13 +108,14 @@ static void pybake_write(
int ret;
Py_BEGIN_ALLOW_THREADS
std::string data = (std::string)bdata;
ret = bake_write(ph, rid, offset, (const void*)data.data(), data.size());
ret = bake_write(ph, tid, rid, offset, (const void*)data.data(), data.size());
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_write, ret);
}
static void pybake_proxy_write(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
uint64_t offset,
pymargo_bulk bulk,
......@@ -124,7 +126,7 @@ static void pybake_proxy_write(
int ret;
const char* addr = remote_addr.size() > 0 ? remote_addr.c_str() : NULL;
Py_BEGIN_ALLOW_THREADS
ret = bake_proxy_write(ph, rid, offset, bulk, remote_offset, addr, size);
ret = bake_proxy_write(ph, tid, rid, offset, bulk, remote_offset, addr, size);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_proxy_write, ret);
}
......@@ -132,6 +134,7 @@ static void pybake_proxy_write(
#if HAS_NUMPY
static void pybake_write_numpy(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
uint64_t offset,
const np::array& data)
......@@ -147,7 +150,7 @@ static void pybake_write_numpy(
const void* buffer = data.data();
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_write(ph, rid, offset, buffer, size);
ret = bake_write(ph, tid, rid, offset, buffer, size);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_write, ret);
}
......@@ -155,13 +158,14 @@ static void pybake_write_numpy(
static void pybake_persist(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
size_t offset,
size_t size)
{
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_persist(ph, rid, offset, size);
ret = bake_persist(ph, tid, rid, offset, size);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_persist, ret);
}
......@@ -228,12 +232,13 @@ static py11::object pybake_create_write_persist_numpy(
static py11::object pybake_get_size(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid)
{
uint64_t size;
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_get_size(ph, rid, &size);
ret = bake_get_size(ph, tid, rid, &size);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_get_size, ret);
return py11::cast(size);
......@@ -241,6 +246,7 @@ static py11::object pybake_get_size(
static py11::object pybake_read(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
uint64_t offset,
size_t size)
......@@ -249,7 +255,7 @@ static py11::object pybake_read(
uint64_t bytes_read;
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_read(ph, rid, offset, (void*)result.data(), size, &bytes_read);
ret = bake_read(ph, tid, rid, offset, (void*)result.data(), size, &bytes_read);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_read, ret);
result.resize(bytes_read);
......@@ -258,6 +264,7 @@ static py11::object pybake_read(
static size_t pybake_proxy_read(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
uint64_t region_offset,
pymargo_bulk bulk,
......@@ -269,7 +276,7 @@ static size_t pybake_proxy_read(
int ret;
const char* addr = remote_addr.size() == 0 ? NULL : remote_addr.c_str();
Py_BEGIN_ALLOW_THREADS
ret = bake_proxy_read(ph, rid, region_offset, bulk, remote_offset, addr, size, &bytes_read);
ret = bake_proxy_read(ph, tid, rid, region_offset, bulk, remote_offset, addr, size, &bytes_read);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_proxy_read, ret);
return bytes_read;
......@@ -277,6 +284,7 @@ static size_t pybake_proxy_read(
static py11::object pybake_migrate_region(
pybake_provider_handle_t source_ph,
const bake_target_id_t& tid,
const bake_region_id_t& source_rid,
size_t region_size,
bool remove_source,
......@@ -286,7 +294,7 @@ static py11::object pybake_migrate_region(
bake_region_id_t dest_rid;
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_migrate_region(source_ph, source_rid, region_size,
ret = bake_migrate_region(source_ph, tid, source_rid, region_size,
remove_source, dest_addr.c_str(), dest_provider_id,
dest_target_id, &dest_rid);
Py_END_ALLOW_THREADS
......@@ -313,6 +321,7 @@ static void pybake_migrate_target(
#if HAS_NUMPY
static py11::object pybake_read_numpy(
pybake_provider_handle_t ph,
const bake_target_id_t& tid,
const bake_region_id_t& rid,
uint64_t offset,
const py11::tuple& shape,
......@@ -327,7 +336,7 @@ static py11::object pybake_read_numpy(
uint64_t bytes_read;
int ret;
Py_BEGIN_ALLOW_THREADS
ret = bake_read(ph, rid, offset, (void*)result.data(), size, &bytes_read);
ret = bake_read(ph, tid, rid, offset, (void*)result.data(), size, &bytes_read);
Py_END_ALLOW_THREADS
HANDLE_ERROR(bake_read, ret);
if(bytes_read != size) {
......@@ -372,8 +381,8 @@ PYBIND11_MODULE(_pybakeclient, m)
m.def("get_size", &pybake_get_size);
m.def("read", &pybake_read);
m.def("proxy_read", &pybake_proxy_read);
m.def("remove", [](pybake_provider_handle_t pbph, bake_region_id_t rid) {
int ret = bake_remove(pbph, rid); HANDLE_ERROR(bake_remove, ret); } );
m.def("remove", [](pybake_provider_handle_t pbph, const bake_target_id_t& tid, const bake_region_id_t& rid) {
int ret = bake_remove(pbph, tid, rid); HANDLE_ERROR(bake_remove, ret); } );
m.def("migrate_region", &pybake_migrate_region);
m.def("migrate_target", &pybake_migrate_target);
m.def("shutdown_service", [](pybake_client_t client, pymargo_addr addr) {
......
......@@ -2,12 +2,12 @@
# See COPYRIGHT in top-level directory.
import sys
sys.path.append('.')
sys.path.append('build/lib.linux-x86_64-3.6')
from pymargo import MargoInstance
sys.path.append('build/lib.linux-x86_64-3.7')
from pymargo.core import Engine
from pybake.target import BakeRegionID
from pybake.client import *
mid = MargoInstance('tcp')
mid = Engine('ofi+tcp')
def test():
......@@ -37,20 +37,20 @@ def test():
region = BakeRegionID.from_str(regionstr)
print("reconverting region to string: "+str(region))
# write into the region
ph.write(region, 0, 'A'*16)
ph.write(region, 16, 'B'*16)
ph.write(target, region, 0, 'A'*16)
ph.write(target, region, 16, 'B'*16)
# get size of region
try:
s = ph.get_size(region)
s = ph.get_size(target, region)
print("Region size is "+str(s))
except:
print("Getting region size is not supported")
# persist region
ph.persist(region, size=32)
ph.persist(target, region, size=32)
# read region
result = ph.read(region, 8, 16)
result = ph.read(target, region, 8, 16)
print("Reading region at offset 8, size 16 gives: "+str(result))
del ph
......
# (C) 2018 The University of Chicago
# See COPYRIGHT in top-level directory.
import sys
from pymargo import MargoInstance
from pymargo.core import Engine
from pybake.target import BakeRegionID
from pybake.client import *
import numpy as np
mid = MargoInstance('tcp')
mid = Engine('ofi+tcp')
server_addr = sys.argv[1]
mplex_id = int(sys.argv[2])
......@@ -34,11 +34,11 @@ print str(arr)
region = ph.create_write_persist_numpy(target, arr)
# get size of region
s = ph.get_size(region)
s = ph.get_size(target, region)
print "Region size is "+str(s)
# read region
result = ph.read_numpy(region, 0, shape=(5,6), dtype=arr.dtype)
result = ph.read_numpy(target, region, 0, shape=(5,6), dtype=arr.dtype)
# check for equalit
print "Reading region gave the following numpy array: "
print str(result)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment