core-write-op.cpp 21 KB
Newer Older
1
#include <map>
2
#include <cstring>
3 4
#include <string>
#include <iostream>
5
#include <limits>
6
#include <bake-client.h>
7 8 9
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"

10
#if 0
11 12 13 14
static int tabs = 0;
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING  {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR    {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
15 16 17 18 19
#else
/* Tracing disabled: the three macros expand to nothing. A call site written as
   `ERROR fprintf(stderr, ...)` therefore still executes its fprintf, only
   without the "[ERROR] " prefix. */
#define ENTERING
#define LEAVING
#define ERROR
#endif
20

21 22 23 24 25 26 27 28 29 30 31 32 33
/* Forward declarations of the callbacks that implement each action of a write
   operation. They are gathered in the write_op_exec dispatch table below.
   The first argument of every callback is the opaque user pointer given to
   execute_write_op_visitor(); in this file it is always a server_visitor_args_t. */
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

34 35 36 37 38 39
/* Returns the oid recorded for object_name in the name database. When the name
   is unknown, a new oid is chosen and both mappings (name -> oid in name_db_id,
   oid -> name in oid_db_id) are stored. Returns 0 when the mappings could not
   be created. */
static oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name);

Matthieu Dorier's avatar
Matthieu Dorier committed
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
/* Helpers that append one entry to the segment database (seg_db_id). Each entry
   is keyed by a segment_key_t built from oid, a timestamp and the byte range it
   covers. For all of them, a negative ts (the default) means "use the current
   ABT_get_wtime()". */

/* Records that [offset, offset+len) of the object is stored in the given BAKE region. */
static void insert_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len, 
                bake_region_id_t* region, double ts = -1.0);

/* Records [offset, offset+len) with the len bytes of data stored inline as the value. */
static void insert_small_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len,
                const char* data, double ts = -1.0);

/* Records a ZERO segment over [offset, offset+len); the entry carries no value. */
static void insert_zero_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, 
                uint64_t len, double ts=-1.0);

/* Records a TOMBSTONE segment that starts at offset and extends to the maximum uint64_t value. */
static void insert_punch_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, double ts=-1.0);

/* Computes the size of the object by scanning its segment keys, starting from
   timestamp ts. Not static: it has external linkage. Returns 0 when the keys
   cannot be listed. */
uint64_t mobject_compute_object_size(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, double ts);
67

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/* Dispatch table binding every write-op action to its implementation in this
   file. core_write_op() hands it to execute_write_op_visitor(). */
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/* C entry point: runs every action of write_op through the write_op_exec
   callbacks, with vargs as their shared user argument. */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    void* const user_data = static_cast<void*>(vargs);
    execute_write_op_visitor(&write_op_exec, write_op, user_data);
}

void write_op_exec_begin(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
92 93 94 95 96
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
    oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
    vargs->oid = oid;
97 98 99 100 101 102 103 104 105
}

/* End of a write operation: there is nothing to finalize. */
void write_op_exec_end(void* u)
{
    (void)u; /* no per-operation state to release */
}

void write_op_exec_create(void* u, int exclusive)
{
106
    ENTERING;
107
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
108
    oid_t oid = vargs->oid; 
109 110 111
    if(oid == 0) {
        ERROR fprintf(stderr,"oid == 0\n");
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
112 113
    /* nothing to do, the object is actually created in write_op_exec_begin
       if it did not exist before */
114
    LEAVING;
115 116 117 118
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
119
    ENTERING;
120
	auto vargs = static_cast<server_visitor_args_t>(u);
121
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
122 123
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id    = vargs->srv_ctx->segment_db_id;
124
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
125 126 127
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
128
    }
129

130 131
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
132
    bake_region_id_t rid;
133
    hg_bulk_t remote_bulk = vargs->bulk_handle;
134 135
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
136 137
    int ret;

138
    if(len > SMALL_REGION_THRESHOLD) {
139
        ret = bake_create(bake_ph, bti, len, &rid);
140 141 142 143
        if(ret != 0) {
            ERROR fprintf(stderr,"bake_create returned %d\n",ret);
            return;
        }
144
        ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
145 146 147
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
        }
148
        ret = bake_persist(bake_ph, rid);
149 150 151
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
        }
152
   
Matthieu Dorier's avatar
Matthieu Dorier committed
153
        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
154 155 156 157 158 159 160
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
161 162 163
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
164
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
165 166 167
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
168
        ret = margo_bulk_free(handle);
169 170 171
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
172

Matthieu Dorier's avatar
Matthieu Dorier committed
173
        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
174
    }
175
    LEAVING;
176 177 178 179
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
180
    ENTERING;
181 182 183
    // truncate to 0 then write
    write_op_exec_truncate(u,0);
    write_op_exec_write(u, buf, len, 0);
184
    LEAVING;
185 186 187 188
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
189
    ENTERING;
190
    auto vargs = static_cast<server_visitor_args_t>(u);
191
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
192 193
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
194
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
195 196 197
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
198
    }
199

200
    hg_bulk_t remote_bulk = vargs->bulk_handle;
201 202
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
203 204
    int ret;

205 206 207 208 209
    if(data_len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
        ret = bake_create(bph, bti, data_len, &rid);
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_create returned %d\n", ret);
            LEAVING;
            return;
        }
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
            LEAVING;
            return;
        }
        ret = bake_persist(bph, rid);
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
            LEAVING;
            return;
        }

        size_t i;

        //double ts = ABT_get_wtime();
        for(i=0; i < write_len; i += data_len) {
            // TODO normally we should have the same timestamps but right now it bugs...
            insert_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
        }

    } else {
        
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {data_len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, data_len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
258

259 260 261 262 263
        size_t i;
        for(i=0; i < write_len; i+= data_len) {
            insert_small_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len-i), data);
        }
264
    }
265
    LEAVING;
266 267 268 269
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
270
    ENTERING;
271
	auto vargs = static_cast<server_visitor_args_t>(u);
272
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
273 274
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
275
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
276 277 278
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
279
    }
280

281
    hg_bulk_t remote_bulk = vargs->bulk_handle;
282 283
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
284 285
    int ret;

286
     // find out the current length of the object
287
    double ts = ABT_get_wtime();
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
    uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);

    if(len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;

        ret = bake_create(bph, bti, len, &rid);
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
        ret = bake_persist(bph, rid);

        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);

    } else {

        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }

        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
    }
324
    LEAVING;
325 326 327 328
}

void write_op_exec_remove(void* u)
{
329
    ENTERING;
330
	auto vargs = static_cast<server_visitor_args_t>(u);
331 332
    write_op_exec_truncate(u,0);
    // TODO: technically should mark the object as removed
333
    LEAVING;
334 335 336 337
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
338
    ENTERING;
339
	auto vargs = static_cast<server_visitor_args_t>(u);
340
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
341 342
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
343
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
344 345
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
346
    }
347

Matthieu Dorier's avatar
Matthieu Dorier committed
348
    insert_punch_log_entry(sdskv_ph, seg_db_id, oid, offset);
349
    LEAVING;
350 351 352 353
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
354
    ENTERING;
355
	auto vargs = static_cast<server_visitor_args_t>(u);
356
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
357 358
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
359
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
360 361 362
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
363
    }
364

Matthieu Dorier's avatar
Matthieu Dorier committed
365
    insert_zero_log_entry(sdskv_ph, seg_db_id, oid, offset, len);
366
    LEAVING;
367 368 369 370 371 372 373
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
374
    ENTERING;
Matthieu Dorier's avatar
Matthieu Dorier committed
375
    int ret;
376
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
377
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
378 379
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
380
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
381 382
    oid_t oid = vargs->oid;
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
383 384 385
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
386 387
    }

388 389 390 391 392 393 394 395 396 397
    /* find out the max key length */
    size_t max_k_len = 0;
    for(auto i=0; i<num; i++) {
        size_t s = strlen(keys[i]);
        max_k_len = max_k_len < s ? s : max_k_len;
    }

    /* create an omap key of the right size */
    omap_key_t* k = (omap_key_t*)calloc(1, max_k_len + sizeof(omap_key_t));

Matthieu Dorier's avatar
Matthieu Dorier committed
398
    for(auto i=0; i<num; i++) {
399 400 401 402 403 404
        size_t k_len = strlen(keys[i])+sizeof(omap_key_t);
        memset(k, 0, max_k_len + sizeof(omap_key_t));
        k->oid = oid;
        strcpy(k->key, keys[i]);
        ret = sdskv_put(sdskv_ph, omap_db_id,
                (const void*)k, k_len,
Matthieu Dorier's avatar
Matthieu Dorier committed
405 406 407 408 409
                (const void*)vals[i], lens[i]);
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
        }
    }
410 411
    free(k);
    LEAVING;
412 413 414 415
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
416 417
    int ret;

418
    ENTERING;
419
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
420
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
421 422
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
423
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
424 425
    oid_t oid = vargs->oid;
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
426 427 428
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
429 430
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
431 432 433 434 435 436
    for(auto i=0; i<num_keys; i++) {
        ret = sdskv_erase(sdskv_ph, omap_db_id, 
                (const void*)keys[i], strlen(keys[i])+1);
        if(ret != SDSKV_SUCCESS)
            fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
    }
437
    LEAVING;
438 439
}

440 441 442 443 444
/* Returns the oid associated with object_name, creating the association when
 * the name is not yet known.
 *
 * For a new name, the first candidate is the std::hash of the name. Candidates
 * are probed in increasing order until one is found that is non-zero (0 is the
 * "invalid oid" value) and absent from the oid database. That oid is then
 * recorded in both directions: name -> oid in name_db_id and oid -> name in
 * oid_db_id.
 *
 * Returns 0 when a database operation fails.
 *
 * NOTE(review): lookup and insertion are separate requests, so two concurrent
 * creations of the same name are not protected against each other here.
 */
oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name)
{
    ENTERING;

    oid_t oid = 0;
    hg_size_t vsize = sizeof(oid);
    const hg_size_t name_size = strlen(object_name)+1;
    int ret;

    ret = sdskv_get(ph, name_db_id, (const void*)object_name,
            name_size, &oid, &vsize);
    if(ret == SDSKV_SUCCESS) {
        /* the name is already known */
        LEAVING;
        return oid;
    }
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        /* lookup failure: do not guess, and do not create a second mapping */
        fprintf(stderr, "[ERROR] sdskv_get(name->oid) failed (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    std::hash<std::string> hash_fn;
    oid = hash_fn(std::string(object_name));
    for(;;) {
        if(oid != 0) {
            hg_size_t s = 0;
            ret = sdskv_length(ph, oid_db_id, (const void*)&oid,
                        sizeof(oid), &s);
            /* stop ON the candidate that is not in use (or on an error),
               without moving past it */
            if(ret != SDSKV_SUCCESS) break;
        }
        /* candidate is 0 or already taken: try the next one */
        oid += 1;
    }
    // we make sure we stopped at an unknown key (not another SDSKV error)
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        fprintf(stderr, "[ERROR] ret != SDSKV_ERR_UNKNOWN_KEY (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    // set name => oid
    ret = sdskv_put(ph, name_db_id, (const void*)object_name,
            name_size, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(name->oid), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    // set oid => name
    ret = sdskv_put(ph, oid_db_id, &oid, sizeof(oid),
            (const void*)object_name, name_size);
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(oid->name), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    LEAVING;
    return oid;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
493 494 495 496 497
/* Adds a BAKE_REGION entry to the segment database: bytes [offset, offset+len)
   of object oid are held by *region. A negative ts is replaced by the current
   ABT_get_wtime(). A failed put is only reported on stderr. */
static void insert_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len,
        bake_region_id_t* region, double ts)
{
    ENTERING;
    segment_key_t key;
    key.oid = oid;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    key.start_index = offset;
    key.end_index   = offset+len;
    key.type        = seg_type_t::BAKE_REGION;

    const int status = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(region), sizeof(*region));
    if(status != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", status);
    }
    LEAVING;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
515 516 517 518 519
/* Adds a SMALL_REGION entry to the segment database: bytes [offset, offset+len)
   of object oid, with the len bytes at data stored inline as the value. A
   negative ts is replaced by the current ABT_get_wtime(). A failed put is only
   reported on stderr. */
static void insert_small_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len,
        const char* data, double ts)
{
    ENTERING;
    segment_key_t key;
    key.oid = oid;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    key.start_index = offset;
    key.end_index   = offset+len;
    key.type        = seg_type_t::SMALL_REGION;

    const int status = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(data), len);
    if(status != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", status);
    }
    LEAVING;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
537 538 539 540
/* Adds a ZERO entry to the segment database covering bytes [offset, offset+len)
   of object oid; the entry has an empty value. A negative ts is replaced by the
   current ABT_get_wtime(). A failed put is only reported on stderr. */
static void insert_zero_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    ENTERING;
    segment_key_t key;
    key.oid = oid;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    key.start_index = offset;
    key.end_index   = offset+len;
    key.type        = seg_type_t::ZERO;

    const int status = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(nullptr), 0);
    if(status != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", status);
    }
    LEAVING;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
558 559 560 561
/* Adds a TOMBSTONE entry to the segment database for object oid, starting at
   offset and ending at the largest uint64_t value; the entry has an empty
   value. A negative ts is replaced by the current ABT_get_wtime(). A failed put
   is only reported on stderr. */
static void insert_punch_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, double ts)
{
    ENTERING;
    segment_key_t key;
    key.oid = oid;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();
    key.type        = seg_type_t::TOMBSTONE;

    const int status = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(nullptr), 0);
    if(status != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", status);
    }
    LEAVING;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
579 580 581 582
/* Computes the size of object oid from its entries in the segment database.
 *
 * Segment keys are listed in batches, starting from a key built from oid and
 * ts. For each key that belongs to oid:
 *   - a segment whose type is below TOMBSTONE raises the size to its end_index,
 *     capped by max_size;
 *   - a TOMBSTONE lowers max_size to its start_index, raises the size to that
 *     start_index if it is smaller, and ends the scan.
 * The scan also ends at the first key of another object, or when a batch comes
 * back with fewer keys than requested.
 *
 * Returns the computed size, or 0 when listing the keys fails.
 *
 * NOTE(review): only oid and timestamp of the starting key lb are set; its
 * other fields are left uninitialized - confirm the key comparator used by the
 * database ignores them.
 */
uint64_t mobject_compute_object_size(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, double ts)
{
    ENTERING;
    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ts;

    uint64_t size = 0; // current assumed size
    uint64_t max_size = std::numeric_limits<uint64_t>::max();

    /* batch size; a compile-time constant so the arrays below are ordinary
       fixed-size arrays rather than variable-length arrays */
    constexpr size_t max_segments = 10;
    segment_key_t segment_keys[max_segments];
    void* segment_keys_addrs[max_segments];
    hg_size_t segment_keys_size[max_segments];

    for(size_t i = 0; i < max_segments; i++) {
        segment_keys_addrs[i] = (void*)&segment_keys[i];
        segment_keys_size[i] = sizeof(segment_key_t);
    }

    bool done = false;
    while(!done) {

        size_t num_items = max_segments;

        int ret = sdskv_list_keys(ph, seg_db_id,
            (const void*)&lb, sizeof(lb),
            segment_keys_addrs, segment_keys_size,
            &num_items);

        if(ret != SDSKV_SUCCESS) {
            ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
            LEAVING;
            return 0;
        }

        for(size_t i = 0; i < num_items; i++) {
            if(segment_keys[i].oid != oid) {
                /* reached the keys of another object */
                done = true;
                break;
            }
            auto& seg = segment_keys[i];
            if(seg.type < seg_type_t::TOMBSTONE) {
                if(size < seg.end_index) {
                    size = std::min(seg.end_index, max_size);
                }
            } else if(seg.type == seg_type_t::TOMBSTONE) {
                if(max_size > seg.start_index) {
                    max_size = seg.start_index;
                }
                if(size < seg.start_index) {
                    size = seg.start_index;
                }
                done = true;
                break;
            }
            /* the next batch starts from the last key examined */
            lb.timestamp = seg.timestamp;
        }
        if(num_items != max_segments) {
            /* short batch: no more keys to list */
            done = true;
        }
    }
    LEAVING;
    return size;
}