core-read-op.cpp 15.5 KB
Newer Older
1 2 3
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <limits>
#include <map>
#include <string>
#include <vector>
4 5
#include <algorithm>
#include <list>
6
#include <bake-client.h>
7 8 9 10 11
#include "src/server/core/core-read-op.h"
#include "src/server/visitor-args.h"
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
12
#include "src/server/core/key-types.h"
13 14
#include "src/server/core/fake-kv.hpp"
#include "src/server/core/covermap.hpp"
15

16
/* Call tracing helpers used by every function of this file.
 *
 * Build with -DMOBJECT_TRACE_READ_OP to get an indented
 * "[ENTERING]>> name" / "[LEAVING]<<< name" trace on stderr and an
 * "[ERROR] " prefix in front of error messages. By default all three macros
 * expand to nothing, so `ERROR fprintf(...)` still prints the bare message
 * and ENTERING/LEAVING cost nothing. */
#ifdef MOBJECT_TRACE_READ_OP
static int tabs = 0; /* current trace indentation */
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING  {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR    {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
#else
#define ENTERING
#define LEAVING
#define ERROR
#endif

static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
static void read_op_exec_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);

34 35 36 37 38
static oid_t get_oid_from_name(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        const char* name);

39 40 41 42 43 44 45
struct read_request_t {
    double timestamp;              // timestamp at which the segment was created
    uint64_t absolute_start_index; // start index within the object
    uint64_t absolute_end_index;   // end index within the object
    uint64_t region_start_index;   // where to start within the region
    uint64_t region_end_index;     // where to end within the region
    uint64_t client_offset;        // offset within the client's buffer
46
    bake_region_id_t region;  // region id
47 48
};

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
/* Visitor executing a read_op against this server's storage. core_read_op()
 * runs it with the request's server_visitor_args_t as user data, which every
 * callback receives as its first (void*) argument. */
static struct read_op_visitor read_op_exec = {
	.visit_begin                 = read_op_exec_begin,
	.visit_stat                  = read_op_exec_stat,
	.visit_read                  = read_op_exec_read,
	.visit_omap_get_keys         = read_op_exec_omap_get_keys,
	.visit_omap_get_vals         = read_op_exec_omap_get_vals,
	.visit_omap_get_vals_by_keys = read_op_exec_omap_get_vals_by_keys,
	.visit_end                   = read_op_exec_end
};

/* Executes every operation contained in read_op by walking it with the
 * read_op_exec visitor; vargs is forwarded to each callback as user data. */
extern "C" void core_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs)
{
    void* visitor_data = vargs;
    execute_read_op_visitor(&read_op_exec, read_op, visitor_data);
}

void read_op_exec_begin(void* u)
{
66
    ENTERING;
67
    auto vargs = static_cast<server_visitor_args_t>(u);
68
    LEAVING;
69 70 71 72
}

void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
73
    ENTERING;
74
    auto vargs = static_cast<server_visitor_args_t>(u);
75 76
    // TODO
    LEAVING;
77 78 79 80
}

void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
81
    ENTERING;
82
    auto vargs = static_cast<server_visitor_args_t>(u);
83 84
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
85
    bake_region_id_t rid;
86 87 88 89 90 91 92 93 94 95 96 97
    hg_bulk_t remote_bulk = vargs->bulk_handle;
    const char* object_name = vargs->object_name;
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
    int ret;

    uint64_t client_start_index = offset;
    uint64_t client_end_index = offset+len;

    *prval = 0;

    // find oid
98 99 100 101 102 103 104 105
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
        vargs->oid = oid;
    }
    if(oid == 0) {
106
        *prval = -1;
107 108
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
109 110 111 112 113 114 115 116 117 118 119
        return;
    }

    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = std::numeric_limits<double>::max();
    auto it = segment_map.lower_bound(lb);
    covermap<uint64_t> coverage(offset, offset+len);

    while(!coverage.full() && it != segment_map.end() && it->first.oid == oid) {
        const segment_key_t& seg = it->first;
120
        const bake_region_id_t& region = it->second;
121
       
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
        switch(seg.type) {
        case seg_type_t::ZERO:
            coverage.set(seg.start_index, seg.end_index);
            if(*bytes_read < seg.end_index) *bytes_read = seg.end_index;
            break;
        case seg_type_t::TOMBSTONE:
            coverage.set(seg.start_index, seg.end_index);
            if(*bytes_read < seg.start_index) *bytes_read = seg.start_index;
            break;
        case seg_type_t::BAKE_REGION: {
            auto ranges = coverage.set(seg.start_index, seg.end_index);
            for(auto r : ranges) {
                uint64_t segment_size  = r.end - r.start;
                uint64_t region_offset = r.start - seg.start_index;
                uint64_t remote_offset = r.start - offset;
137
                ret = bake_proxy_read(bph, region, region_offset, remote_bulk,
138
                        remote_offset, remote_addr_str, segment_size);
139 140 141 142 143
                if(ret != 0) {
                    ERROR fprintf(stderr,"bake_proxy_read returned %d\n", ret);
                    LEAVING;
                    return;
                }
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
                if(*bytes_read < r.end) *bytes_read = r.end;
            }
            break;
          }
        case seg_type_t::SMALL_REGION: {
            auto ranges = coverage.set(seg.start_index, seg.end_index);
            const char* base = static_cast<const char*>((void*)(&region));
            margo_instance_id mid = vargs->srv_ctx->mid;
            for(auto r : ranges) {
                uint64_t segment_size  = r.end - r.start;
                uint64_t region_offset = r.start - seg.start_index;
                uint64_t remote_offset = r.start - offset;
                void* buf_ptrs[1] = { const_cast<char*>(base + region_offset) };
                hg_size_t buf_sizes[1] = { segment_size };
                hg_bulk_t handle;
                ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
160 161 162 163 164
                if(ret != HG_SUCCESS) {
                    ERROR fprintf(stderr,"margo_bulk_create returned %d\n", ret);
                    LEAVING;
                    return;
                }
165
                ret = margo_bulk_transfer(mid, HG_BULK_PUSH, remote_addr, remote_bulk, buf.as_offset+remote_offset, handle, 0, segment_size);
166 167 168 169 170
                if(ret != HG_SUCCESS) {
                    ERROR fprintf(stderr,"margo_bulk_transfer returned %d\n", ret);
                    LEAVING;
                    return;
                }
171
                ret = margo_bulk_free(handle);
172 173 174 175 176
                if(ret != HG_SUCCESS) {
                    ERROR fprintf(stderr,"margo_bulk_free returned %d\n", ret);
                    LEAVING;
                    return;
                }
177 178 179 180 181 182
                if(*bytes_read < r.end) *bytes_read = r.end;
            }
          }
        }
        it++;
    }
183
    LEAVING;
184 185 186 187 188
}

void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return, 
				mobject_store_omap_iter_t* iter, int* prval)
{
189
    ENTERING;
190
    auto vargs = static_cast<server_visitor_args_t>(u);
191
    const char* object_name = vargs->object_name;
Matthieu Dorier's avatar
Matthieu Dorier committed
192 193 194
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
    int ret;
195 196
    *prval = 0;

197 198 199 200 201 202 203 204
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
        vargs->oid = oid;
    }
    if(oid == 0) {
205
        *prval = -1;
206 207
        ERROR fprintf(stderr, "oid == 0\n");
        LEAVING;
208 209
        return;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
210
    
211
    omap_iter_create(iter);
Matthieu Dorier's avatar
Matthieu Dorier committed
212 213 214 215
    size_t lb_size = sizeof(omap_key_t)+strlen(start_after);
    omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
    lb->oid = oid;
    strcpy(lb->key, start_after);
216

Matthieu Dorier's avatar
Matthieu Dorier committed
217
    hg_size_t max_keys = 10;
218
    hg_size_t key_len  = MAX_OMAP_KEY_SIZE+sizeof(omap_key_t); 
Matthieu Dorier's avatar
Matthieu Dorier committed
219 220 221 222
    std::vector<void*> keys(max_keys);
    std::vector<hg_size_t> ksizes(max_keys, key_len);
    std::vector<std::vector<char>> buffers(max_keys, std::vector<char>(key_len));
    for(auto i=0; i < max_keys; i++) keys[i] = (void*)buffers[i].data();
223

Matthieu Dorier's avatar
Matthieu Dorier committed
224 225 226 227 228 229 230 231 232
    hg_size_t keys_retrieved = max_keys;
    hg_size_t count = 0;
    do {
        ret = sdskv_list_keys(sdskv_ph, omap_db_id,
                (const void*)lb, lb_size,
                keys.data(), ksizes.data(),
                &keys_retrieved);
        if(ret != SDSKV_SUCCESS) {
            *prval = -1;
233
            ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
            break;
        }
        const char* k;
        for(auto i = 0; i < keys_retrieved && count < max_return; i++, count++) {
            // extract the actual key part, without the oid
            k = ((omap_key_t*)keys[i])->key;
            /* this key is not part of the same object, we should leave the loop */
            if(((omap_key_t*)keys[i])->oid != oid) goto out; /* ugly way of leaving the loop, I know ... */

            omap_iter_append(*iter, k, nullptr, 0);
        }
        strcpy(lb->key, k);
        lb_size = strlen(k) + sizeof(omap_key_t);

    } while(keys_retrieved == max_keys && count < max_return);

out:
    free(lb);
252
    LEAVING;
253 254 255 256
}

void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
257
    ENTERING;
258
    auto vargs = static_cast<server_visitor_args_t>(u);
259
    const char* object_name = vargs->object_name;
Matthieu Dorier's avatar
Matthieu Dorier committed
260 261 262
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
    int ret;
263 264
    *prval = 0;

265 266 267 268 269 270 271 272
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
        vargs->oid = oid;
    }
    if(oid == 0) {
273
        *prval = -1;
274 275
        ERROR fprintf(stderr, "oid == 0\n");
        LEAVING;
276 277 278
        return;
    }

279 280 281
    hg_size_t max_items = std::min(max_return, (decltype(max_return))10);
    hg_size_t key_len  = MAX_OMAP_KEY_SIZE + sizeof(omap_key_t);
    hg_size_t val_len  = MAX_OMAP_VAL_SIZE;
Matthieu Dorier's avatar
Matthieu Dorier committed
282

283 284
    omap_iter_create(iter);

Matthieu Dorier's avatar
Matthieu Dorier committed
285
    /* omap_key_t equivalent of start_key */
286
    hg_size_t lb_size = key_len;
Matthieu Dorier's avatar
Matthieu Dorier committed
287 288 289
    omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
    lb->oid = oid;
    strcpy(lb->key, start_after);
290

Matthieu Dorier's avatar
Matthieu Dorier committed
291 292 293 294 295
    /* omap_key_t equivalent of the filter_prefix */
    hg_size_t prefix_size = sizeof(omap_key_t)+strlen(filter_prefix);
    omap_key_t* prefix = (omap_key_t*)calloc(1, prefix_size);
    prefix->oid = oid;
    strcpy(prefix->key, filter_prefix);
296 297
    hg_size_t prefix_actual_size = offsetof(omap_key_t, key)+strlen(filter_prefix);
    /* we need the above because the prefix in sdskv is not considered a string */
Matthieu Dorier's avatar
Matthieu Dorier committed
298 299 300 301 302 303 304 305 306 307 308

    /* initialize structures to pass to SDSKV functions */
    std::vector<void*> keys(max_items);
    std::vector<void*> vals(max_items);
    std::vector<hg_size_t> ksizes(max_items, key_len);
    std::vector<hg_size_t> vsizes(max_items, val_len);
    std::vector<std::vector<char>> key_buffers(max_items, std::vector<char>(key_len));
    std::vector<std::vector<char>> val_buffers(max_items, std::vector<char>(val_len));
    for(auto i=0; i < max_items; i++) {
        keys[i] = (void*)key_buffers[i].data();
        vals[i] = (void*)val_buffers[i].data();
309
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
310 311 312 313 314 315 316

    hg_size_t items_retrieved = max_items;
    hg_size_t count = 0;
    do {
        ret = sdskv_list_keyvals_with_prefix(
                sdskv_ph, omap_db_id,
                (const void*)lb, lb_size,
317
                (const void*)prefix, prefix_actual_size,
Matthieu Dorier's avatar
Matthieu Dorier committed
318 319 320 321 322
                keys.data(), ksizes.data(),
                vals.data(), vsizes.data(),
                &items_retrieved);
        if(ret != SDSKV_SUCCESS) {
            *prval = -1;
323
            ERROR fprintf(stderr, "sdskv_list_keyvals_with_prefix returned %d\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
324 325 326 327 328 329 330 331 332 333 334
            break;
        }
        const char* k;
        for(auto i = 0; i < items_retrieved && count < max_return; i++, count++) {
            // extract the actual key part, without the oid
            k = ((omap_key_t*)keys[i])->key;
            /* this key is not part of the same object, we should leave the loop */
            if(((omap_key_t*)keys[i])->oid != oid) goto out; /* ugly way of leaving the loop, I know ... */

            omap_iter_append(*iter, k, (const char*)vals[i], vsizes[i]);
        }
335 336
        memset(lb, 0, lb_size);
        lb->oid = oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
337
        strcpy(lb->key, k);
338
        //lb_size = strlen(k) + sizeof(omap_key_t);
Matthieu Dorier's avatar
Matthieu Dorier committed
339 340 341 342 343

    } while(items_retrieved == max_items && count < max_return);

out:
    free(lb);
344
    LEAVING;
345 346 347 348
}

void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
349
    ENTERING;
350
    auto vargs = static_cast<server_visitor_args_t>(u);
351
    const char* object_name = vargs->object_name;
Matthieu Dorier's avatar
Matthieu Dorier committed
352 353 354
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
    int ret;
355 356
    *prval = 0;

357 358 359 360 361 362 363 364
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
        vargs->oid = oid;
    }
    if(oid == 0) {
365
        *prval = -1;
366 367
        ERROR fprintf(stderr, "oid == 0\n");
        LEAVING;
368 369
        return;
    }
370
    
371 372
    omap_iter_create(iter);

Matthieu Dorier's avatar
Matthieu Dorier committed
373 374 375 376
    // figure out key sizes
    std::vector<hg_size_t> ksizes(num_keys);
    hg_size_t max_ksize = 0;
    for(auto i=0; i < num_keys; i++) {
377
        hg_size_t s = offsetof(omap_key_t,key)+strlen(keys[i])+1;
Matthieu Dorier's avatar
Matthieu Dorier committed
378 379 380
        if(s > max_ksize) max_ksize = s;
        ksizes[i] = s;
    }
381
    max_ksize += sizeof(omap_key_t);
382

383
    omap_key_t* key = (omap_key_t*)calloc(1, max_ksize);
Matthieu Dorier's avatar
Matthieu Dorier committed
384
    for(size_t i=0; i < num_keys; i++) {
385 386
        memset(key, 0, max_ksize);
        key->oid = oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
387 388 389 390 391 392 393
        strcpy(key->key, keys[i]);
        // get length of the value
        hg_size_t vsize;
        ret = sdskv_length(sdskv_ph, omap_db_id, 
                (const void*)key, ksizes[i], &vsize);
        if(ret != SDSKV_SUCCESS) {
            *prval = -1;
394
            ERROR fprintf(stderr, "sdskv_length returned %d\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
395 396 397 398 399 400 401 402
            break;
        }
        std::vector<char> value(vsize);
        ret = sdskv_get(sdskv_ph, omap_db_id,
                (const void*)key, ksizes[i],
                (void*)value.data(), &vsize);
        if(ret != SDSKV_SUCCESS) {
            *prval = -1;
403
            ERROR fprintf(stderr, "sdskv_get returned %d\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
404
            break;
405
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
406
        omap_iter_append(*iter, keys[i], value.data(), vsize);
407
    }
408
    LEAVING;
409 410 411 412 413 414
}

/* Visitor hook called once every operation of the read_op has been visited.
 * There is nothing to finalize at the moment. */
void read_op_exec_end(void* u)
{
    (void)u; // visitor arguments are not needed here
}

/* Looks up `name` in the name database and returns the oid stored for it.
 * Returns 0, which callers treat as "not found", when the lookup fails or
 * the stored oid is 0. The lookup key includes the name's terminating NUL. */
static oid_t get_oid_from_name(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        const char* name)
{
    ENTERING;
    oid_t oid = 0;
    hg_size_t value_size = sizeof(oid);
    const hg_size_t name_size = strlen(name) + 1;
    if(sdskv_get(ph, name_db_id, (const void*)name, name_size,
                 (void*)&oid, &value_size) != SDSKV_SUCCESS) {
        oid = 0;
    }
    if(oid == 0) {
        ERROR fprintf(stderr,"oid == 0\n");
    }
    LEAVING;
    return oid;
}