client.cpp 8.92 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
8 9 10 11 12 13 14 15
#include <string>
#include <vector>
#include <cstring>
#include <iostream>
#include <margo.h>
#include <bake.h>
#include <bake-client.h>

16 17
namespace py11 = pybind11;
namespace np = py11;
Matthieu Dorier's avatar
Matthieu Dorier committed
18

19 20 21 22
typedef py11::capsule pymargo_instance_id;
typedef py11::capsule pymargo_addr;
typedef py11::capsule pybake_client_t;
typedef py11::capsule pybake_provider_handle_t;
Matthieu Dorier's avatar
Matthieu Dorier committed
23

24 25 26 27 28 29
#define MID2CAPSULE(__mid)   py11::capsule((void*)(__mid), "margo_instance_id", nullptr)
#define ADDR2CAPSULE(__addr) py11::capsule((void*)(__addr), "hg_addr_t", nullptr)
#define BAKEPH2CAPSULE(__bph) py11::capsule((void*)(__bph), "bake_provider_handle_t", nullptr)
#define BAKECL2CAPSULE(__bcl) py11::capsule((void*)(__bcl), "bake_client_t", nullptr)

static pybake_client_t pybake_client_init(pymargo_instance_id mid) {
Matthieu Dorier's avatar
Matthieu Dorier committed
30 31
    bake_client_t result = BAKE_CLIENT_NULL;
    bake_client_init(mid, &result);
32
    return BAKECL2CAPSULE(result);
Matthieu Dorier's avatar
Matthieu Dorier committed
33 34
}

35 36 37
static pybake_provider_handle_t pybake_provider_handle_create(
        pybake_client_t client,
        pymargo_addr addr,
38
        uint8_t provider_id) {
Matthieu Dorier's avatar
Matthieu Dorier committed
39 40

    bake_provider_handle_t providerHandle = BAKE_PROVIDER_HANDLE_NULL;
41
    bake_provider_handle_create(client, addr, provider_id, &providerHandle);
42
    return BAKEPH2CAPSULE(providerHandle);
Matthieu Dorier's avatar
Matthieu Dorier committed
43 44
}

45
static uint64_t pybake_get_eager_limit(
46
    pybake_provider_handle_t ph)
47 48 49 50 51 52 53
{
    uint64_t limit;
    int ret = bake_provider_handle_get_eager_limit(ph, &limit);
    if(ret != 0) return 0;
    return limit;
}

54 55
static py11::object pybake_probe(
        pybake_provider_handle_t ph,
56 57
        uint64_t max_targets)
{
58
    py11::list result;
59 60
    std::vector<bake_target_id_t> targets(max_targets);
    uint64_t num_targets;
Matthieu Dorier's avatar
Matthieu Dorier committed
61 62 63 64
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_probe(ph, max_targets, targets.data(), &num_targets);
    Py_END_ALLOW_THREADS
65
    if(ret != 0) return py11::object();
66
    for(uint64_t i=0; i < num_targets; i++) {
67
        result.append(py11::cast(targets[i]));
68 69 70 71
    }
    return result;
}

72 73
static py11::object pybake_create(
        pybake_provider_handle_t ph,
74 75 76 77 78
        bake_target_id_t bti,
        size_t region_size)
{
    bake_region_id_t rid;
    std::memset(&rid, 0, sizeof(rid));
Matthieu Dorier's avatar
Matthieu Dorier committed
79 80 81 82
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_create(ph, bti, region_size, &rid);
    Py_END_ALLOW_THREADS
83 84
    if(ret != 0) return py11::none();
    else return py11::cast(rid);
85 86
}

87 88
static py11::object pybake_write(
        pybake_provider_handle_t ph,
89 90
        const bake_region_id_t& rid,
        uint64_t offset,
91
        const py11::bytes& bdata)
92
{
Matthieu Dorier's avatar
Matthieu Dorier committed
93 94
    int ret;
    Py_BEGIN_ALLOW_THREADS
95
    std::string data = (std::string)bdata;
Matthieu Dorier's avatar
Matthieu Dorier committed
96 97
    ret = bake_write(ph, rid, offset, (const void*)data.data(), data.size());
    Py_END_ALLOW_THREADS
98 99
    if(ret == 0) return py11::cast(true);
    else return py11::cast(false);
100 101
}

Matthieu Dorier's avatar
Matthieu Dorier committed
102
#if HAS_NUMPY
103 104
static py11::object pybake_write_numpy(
        pybake_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
105 106
        const bake_region_id_t& rid,
        uint64_t offset,
107
        const np::array& data)
Matthieu Dorier's avatar
Matthieu Dorier committed
108
{
109 110
    if(!(data.flags() & 
            (np::array::f_style | np::array::c_style))) {
Matthieu Dorier's avatar
Matthieu Dorier committed
111
        std::cerr << "[pyBAKE error]: non-contiguous numpy arrays not yet supported" << std::endl;
112
        return py11::cast(false);
Matthieu Dorier's avatar
Matthieu Dorier committed
113
    }
114 115
    size_t size = data.dtype().itemsize();
    for(int i = 0; i < data.ndim(); i++) {
Matthieu Dorier's avatar
Matthieu Dorier committed
116 117
        size *= data.shape(i);
    }
118
    const void* buffer = data.data();
Matthieu Dorier's avatar
Matthieu Dorier committed
119 120 121 122
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_write(ph, rid, offset, buffer, size);
    Py_END_ALLOW_THREADS
123 124
    if(ret != 0) return py11::cast(false);
    else return py11::cast(true);
Matthieu Dorier's avatar
Matthieu Dorier committed
125 126 127
}
#endif

128 129
static py11::object pybake_persist(
        pybake_provider_handle_t ph,
130 131
        const bake_region_id_t& rid)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
132 133 134 135
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_persist(ph, rid);
    Py_END_ALLOW_THREADS
136 137
    if(ret == 0) return py11::cast(true);
    else return py11::cast(false);
138 139
}

140 141
static py11::object pybake_create_write_persist(
        pybake_provider_handle_t ph,
142
        bake_target_id_t tid,
143
        const py11::bytes& bdata)
144 145
{
    bake_region_id_t rid;
Matthieu Dorier's avatar
Matthieu Dorier committed
146 147
    int ret;
    Py_BEGIN_ALLOW_THREADS
148
    std::string data = (std::string)bdata;
Matthieu Dorier's avatar
Matthieu Dorier committed
149
    ret = bake_create_write_persist(ph, tid, 
150
            data.data(), data.size(), &rid);
Matthieu Dorier's avatar
Matthieu Dorier committed
151
    Py_END_ALLOW_THREADS
152 153
    if(ret == 0) return py11::cast(rid);
    else return py11::none();
154 155
}

Matthieu Dorier's avatar
Matthieu Dorier committed
156
#if HAS_NUMPY
157 158
static py11::object pybake_create_write_persist_numpy(
        pybake_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
159
        bake_target_id_t tid,
160
        const np::array& data)
Matthieu Dorier's avatar
Matthieu Dorier committed
161 162
{
    bake_region_id_t rid;
163
    if(!(data.flags() & (np::array::f_style | np::array::c_style))) {
Matthieu Dorier's avatar
Matthieu Dorier committed
164
        std::cerr << "[pyBAKE error]: non-contiguous numpy arrays not yet supported" << std::endl;
165
        return py11::none();
Matthieu Dorier's avatar
Matthieu Dorier committed
166
    }
167 168
    size_t size = data.dtype().itemsize();
    for(int i = 0; i < data.ndim(); i++) {
Matthieu Dorier's avatar
Matthieu Dorier committed
169 170
        size *= data.shape(i);
    }
171
    const void* buffer = data.data();
Matthieu Dorier's avatar
Matthieu Dorier committed
172 173 174
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_create_write_persist(ph, tid, 
Matthieu Dorier's avatar
Matthieu Dorier committed
175
            buffer, size, &rid);
Matthieu Dorier's avatar
Matthieu Dorier committed
176
    Py_END_ALLOW_THREADS
177 178
    if(ret == 0) return py11::cast(rid);
    else return py11::none();
Matthieu Dorier's avatar
Matthieu Dorier committed
179 180 181
}
#endif

182 183
static py11::object pybake_get_size(
        pybake_provider_handle_t ph,
184 185 186
        const bake_region_id_t& rid)
{
    uint64_t size;
Matthieu Dorier's avatar
Matthieu Dorier committed
187 188 189 190
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_get_size(ph, rid, &size);
    Py_END_ALLOW_THREADS
191 192
    if(ret == 0) return py11::cast(size);
    else return py11::none();
193 194
}

195 196
static py11::object pybake_read(
        pybake_provider_handle_t ph,
197 198 199 200 201 202
        const bake_region_id_t& rid,
        uint64_t offset,
        size_t size) 
{
    std::string result(size, '\0');
    uint64_t bytes_read;
Matthieu Dorier's avatar
Matthieu Dorier committed
203 204 205 206
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_read(ph, rid, offset, (void*)result.data(), size, &bytes_read);
    Py_END_ALLOW_THREADS
207
    if(ret != 0) return py11::none();
208
    result.resize(bytes_read);
209
    return py11::bytes(result);
210 211
}

212 213
static py11::object pybake_migrate(
        pybake_provider_handle_t source_ph,
214 215 216 217 218 219
        const bake_region_id_t& source_rid,
        bool remove_source,
        const std::string& dest_addr,
        uint16_t dest_provider_id,
        bake_target_id_t dest_target_id) {
    bake_region_id_t dest_rid;
Matthieu Dorier's avatar
Matthieu Dorier committed
220 221 222
    int ret;
    Py_BEGIN_ALLOW_THREADS
    ret = bake_migrate(source_ph, source_rid,
223 224
            remove_source, dest_addr.c_str(), dest_provider_id,
            dest_target_id, &dest_rid);
Matthieu Dorier's avatar
Matthieu Dorier committed
225
    Py_END_ALLOW_THREADS
226 227
    if(ret != BAKE_SUCCESS) return py11::none();
    return py11::cast(dest_rid);
228 229
}

Matthieu Dorier's avatar
Matthieu Dorier committed
230
#if HAS_NUMPY
231 232
static py11::object pybake_read_numpy(
        pybake_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
233 234
        const bake_region_id_t& rid,
        uint64_t offset,
235
        const py11::tuple& shape,
Matthieu Dorier's avatar
Matthieu Dorier committed
236 237
        const np::dtype& dtype)
{
238 239 240 241 242
    std::vector<ssize_t> sshape(shape.size());
    for(unsigned int i=0; i<sshape.size(); i++) sshape[i] = shape[i].cast<ssize_t>();
    np::array result(dtype, sshape);
    size_t size = dtype.itemsize();
    for(int i=0; i < result.ndim(); i++) 
Matthieu Dorier's avatar
Matthieu Dorier committed
243 244
        size *= result.shape(i);
    uint64_t bytes_read;
Matthieu Dorier's avatar
Matthieu Dorier committed
245 246
    int ret;
    Py_BEGIN_ALLOW_THREADS
247
    ret = bake_read(ph, rid, offset, (void*)result.data(), size, &bytes_read);
Matthieu Dorier's avatar
Matthieu Dorier committed
248
    Py_END_ALLOW_THREADS
249 250
    if(ret != 0) return py11::none();
    if(bytes_read != size) return py11::none();
Matthieu Dorier's avatar
Matthieu Dorier committed
251 252 253 254
    else return result;
}
#endif

255
PYBIND11_MODULE(_pybakeclient, m)
Matthieu Dorier's avatar
Matthieu Dorier committed
256
{
Matthieu Dorier's avatar
Matthieu Dorier committed
257
#if HAS_NUMPY
258 259 260 261 262
    try { py11::module::import("numpy"); }
    catch (...) {
        std::cerr << "[Py-BAKE] Error: could not import numpy at C++ level" << std::endl;
        exit(-1);
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
263
#endif
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
    py11::module::import("_pybaketarget");
    m.def("client_init", &pybake_client_init);
    m.def("client_finalize", [](pybake_client_t clt) {
            return bake_client_finalize(clt);} );
    m.def("provider_handle_create", &pybake_provider_handle_create);
    m.def("provider_handle_ref_incr", [](pybake_provider_handle_t pbph) {
            return bake_provider_handle_ref_incr(pbph); });
    m.def("provider_handle_release", [](pybake_provider_handle_t pbph) {
            return bake_provider_handle_release(pbph); });
    m.def("get_eager_limit", &pybake_get_eager_limit);
    m.def("set_eager_limit", [](pybake_provider_handle_t pbph, uint64_t lim) {
            return bake_provider_handle_set_eager_limit(pbph, lim); });
    m.def("probe", &pybake_probe);
    m.def("create", &pybake_create);
    m.def("write", &pybake_write);
    m.def("persist", &pybake_persist);
    m.def("create_write_persist", &pybake_create_write_persist);
    m.def("get_size", &pybake_get_size);
    m.def("read", &pybake_read);
    m.def("remove", [](pybake_provider_handle_t pbph, bake_region_id_t rid) {
            return bake_remove(pbph, rid);} );
    m.def("migrate", &pybake_migrate);
    m.def("shutdown_service", [](pybake_client_t client, pymargo_addr addr) {
            return bake_shutdown_service(client, addr); });
Matthieu Dorier's avatar
Matthieu Dorier committed
288
#if HAS_NUMPY
289 290 291
    m.def("write_numpy", &pybake_write_numpy);
    m.def("create_write_persist_numpy", &pybake_create_write_persist_numpy);
    m.def("read_numpy", &pybake_read_numpy);
Matthieu Dorier's avatar
Matthieu Dorier committed
292
#endif
Matthieu Dorier's avatar
Matthieu Dorier committed
293
}