core-write-op.cpp 24.5 KB
Newer Older
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6
#include <map>
7
#include <cstring>
8 9
#include <string>
#include <iostream>
10
#include <limits>
11
#include <bake-client.h>
12 13 14
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"

15
#if 0
16 17 18 19
/* Current nesting depth of traced calls; used to indent the trace output. */
static int tabs = 0;
/* Prints one space per nesting level followed by "[ENTERING]>> <function>",
 * then increases the nesting depth. */
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
/* Decreases the nesting depth, then prints the indented
 * "[LEAVING]<<< <function>" line. Must be paired with ENTERING on every
 * exit path, otherwise the indentation drifts. */
#define LEAVING  {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
/* Prints an indented "[ERROR] " prefix without a newline; it is meant to be
 * written directly in front of an fprintf(stderr, ...) carrying the message. */
#define ERROR    {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
20 21 22 23 24
#else
/* Tracing disabled: the three macros expand to nothing.
 * Note that a statement written as `ERROR fprintf(stderr, ...)` then reduces
 * to the bare fprintf, so error messages are still printed (without the
 * "[ERROR] " prefix). */
#define ENTERING
#define LEAVING
#define ERROR
#endif
25

26 27 28 29 30 31 32 33 34 35 36 37 38
/* Visitor callbacks, one per write-op action; they are installed in the
 * write_op_exec table below. The first (void*) argument of each callback is
 * the opaque user pointer, which the implementations cast back to
 * server_visitor_args_t. */
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
/* (args, exclusive) */
static void write_op_exec_create(void*, int);
/* (args, buffer, len, offset) */
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
/* (args, buffer, len) */
static void write_op_exec_write_full(void*, buffer_u, size_t);
/* (args, buffer, data_len, write_len, offset) */
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
/* (args, buffer, len) */
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
/* (args, offset) */
static void write_op_exec_truncate(void*, uint64_t);
/* (args, offset, len) */
static void write_op_exec_zero(void*, uint64_t, uint64_t);
/* (args, keys, values, value lengths, number of pairs) */
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
/* (args, keys, number of keys) */
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

39 40 41 42 43 44
/* Looks up the object id stored for object_name in the name database and,
 * when the name is unknown, allocates a new id and records both the
 * name->oid and the oid->name mappings. The definition returns 0 when one
 * of the database operations fails. */
static oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name);

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
/* The four helpers below each add one entry to the segment database for
 * object `oid`. For all of them a negative `ts` (the default) means the
 * entry is stamped with ABT_get_wtime() at insertion time. */

/* Records that bytes [offset, offset+len) of the object are stored in the
 * given BAKE region (the region id is the stored value). */
static void insert_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len, 
                bake_region_id_t* region, double ts = -1.0);

/* Records bytes [offset, offset+len) of the object with the `len` bytes of
 * `data` stored inline as the value. */
static void insert_small_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len,
                const char* data, double ts = -1.0);

/* Records a ZERO segment covering [offset, offset+len); no value is stored. */
static void insert_zero_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, 
                uint64_t len, double ts=-1.0);

/* Records a TOMBSTONE segment that starts at `offset` and extends to the
 * maximum uint64_t value; no value is stored. */
static void insert_punch_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, double ts=-1.0);

/* Computes the size of object `oid` from the segment keys listed starting
 * at timestamp `ts`. Returns 0 if listing the keys fails. Not declared
 * static, so it is visible to other translation units. */
uint64_t mobject_compute_object_size(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, double ts);
72

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
/* Visitor table binding every write-op action to its executor in this file.
 * It is handed to execute_write_op_visitor() by core_write_op(). */
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/* C-linkage entry point: runs every action recorded in write_op through the
 * write_op_exec visitor table. vargs is forwarded to the callbacks as their
 * opaque user pointer. */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    void* user_args = static_cast<void*>(vargs);
    execute_write_op_visitor(&write_op_exec, write_op, user_args);
}

void write_op_exec_begin(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
97 98 99 100 101
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
    oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
    vargs->oid = oid;
102 103 104 105 106 107 108 109 110
}

/* Called once after all actions of a write op have been executed.
 * There is nothing to finalize, so the argument is ignored. */
void write_op_exec_end(void* u)
{
    (void)u;
}

void write_op_exec_create(void* u, int exclusive)
{
111
    ENTERING;
112
	auto vargs = static_cast<server_visitor_args_t>(u);
113
    oid_t oid = vargs->oid; 
114 115 116
    if(oid == 0) {
        ERROR fprintf(stderr,"oid == 0\n");
    }
117 118
    /* nothing to do, the object is actually created in write_op_exec_begin
       if it did not exist before */
119
    LEAVING;
120 121 122 123
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
124
    ENTERING;
125
	auto vargs = static_cast<server_visitor_args_t>(u);
126
    oid_t oid = vargs->oid;
127 128
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id    = vargs->srv_ctx->segment_db_id;
129
    if(oid == 0) {
130 131 132
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
133
    }
134

135 136
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
137
    bake_region_id_t rid;
138
    hg_bulk_t remote_bulk = vargs->bulk_handle;
139 140
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
141 142
    int ret;

143
    if(len > SMALL_REGION_THRESHOLD) {
144
        ret = bake_create(bake_ph, bti, len, &rid);
145 146 147 148
        if(ret != 0) {
            ERROR fprintf(stderr,"bake_create returned %d\n",ret);
            return;
        }
149
        ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
150 151 152
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
        }
153
        ret = bake_persist(bake_ph, rid);
154 155 156
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
        }
157
   
158
        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
159 160 161 162 163 164 165
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
166 167 168
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
169
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
170 171 172
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
173
        ret = margo_bulk_free(handle);
174 175 176
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
177

178
        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
179
    }
180
    LEAVING;
181 182 183 184
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
185
    ENTERING;
186 187 188
    // truncate to 0 then write
    write_op_exec_truncate(u,0);
    write_op_exec_write(u, buf, len, 0);
189
    LEAVING;
190 191 192 193
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
194
    ENTERING;
195
    auto vargs = static_cast<server_visitor_args_t>(u);
196
    oid_t oid = vargs->oid;
197 198
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
199
    if(oid == 0) {
200 201 202
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
203
    }
204

205
    hg_bulk_t remote_bulk = vargs->bulk_handle;
206 207
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
208 209
    int ret;

210 211 212 213 214
    if(data_len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;
215

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
        ret = bake_create(bph, bti, data_len, &rid);
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_create returned %d\n", ret);
            LEAVING;
            return;
        }
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
            LEAVING;
            return;
        }
        ret = bake_persist(bph, rid);
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
            LEAVING;
            return;
        }

        size_t i;

        //double ts = ABT_get_wtime();
        for(i=0; i < write_len; i += data_len) {
            // TODO normally we should have the same timestamps but right now it bugs...
            insert_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
        }

    } else {
        
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {data_len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, data_len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
263

264 265 266 267 268
        size_t i;
        for(i=0; i < write_len; i+= data_len) {
            insert_small_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len-i), data);
        }
269
    }
270
    LEAVING;
271 272 273 274
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
275
    ENTERING;
276
	auto vargs = static_cast<server_visitor_args_t>(u);
277
    oid_t oid = vargs->oid;
278 279
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
280
    if(oid == 0) {
281 282 283
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
284
    }
285

286
    hg_bulk_t remote_bulk = vargs->bulk_handle;
287 288
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
289 290
    int ret;

291
     // find out the current length of the object
292
    double ts = ABT_get_wtime();
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
    uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);

    if(len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;

        ret = bake_create(bph, bti, len, &rid);
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
        ret = bake_persist(bph, rid);

        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);

    } else {

        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }

        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
    }
329
    LEAVING;
330 331 332 333
}

void write_op_exec_remove(void* u)
{
334
    ENTERING;
335
	auto vargs = static_cast<server_visitor_args_t>(u);
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
    const char *object_name = vargs->object_name;
    oid_t oid = vargs->oid;
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
    int ret;

    /* remove name->OID entry to make object no longer visible to clients */
    ret = sdskv_erase(sdskv_ph, name_db_id, (const void *)object_name,
            strlen(object_name)+1);
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr,"write_op_exec_remove: "
            "error in name_db sdskv_erase() (ret = %d)\n", ret);
        LEAVING;
        return;
    }

    /* TODO bg thread for everything beyond this point */

    ret = sdskv_erase(sdskv_ph, oid_db_id, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr,"write_op_exec_remove: "
            "error in oid_db sdskv_erase() (ret = %d)\n", ret);
        LEAVING;
        return;
    }

    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ABT_get_wtime();

    size_t max_segments = 10; // XXX this is a pretty arbitrary number
    segment_key_t       segment_keys[max_segments];
    void*               segment_keys_addrs[max_segments];
    hg_size_t           segment_keys_size[max_segments];
    bake_region_id_t    segment_data[max_segments];
    void*               segment_data_addrs[max_segments];
    hg_size_t           segment_data_size[max_segments];
    for(auto i = 0 ; i < max_segments; i++) {
        segment_keys_addrs[i] = (void*)(&segment_keys[i]);
        segment_keys_size[i]  = sizeof(segment_key_t);
        segment_data_addrs[i] = (void*)(&segment_data[i]);
        segment_data_size[i]  = sizeof(bake_region_id_t);
    }

    /* iterate over and remove all segments for this oid */
    size_t num_segments = max_segments;
    do {
        ret = sdskv_list_keyvals(sdskv_ph, seg_db_id,
                    (const void *)&lb, sizeof(lb),
                    segment_keys_addrs, segment_keys_size,
                    segment_data_addrs, segment_data_size,
                    &num_segments);

        if(ret != SDSKV_SUCCESS) {
            /* XXX should save the error and keep removing */
            ERROR fprintf(stderr, "write_op_exec_remove: "
                "error in sdskv_list_keyvals() (ret = %d)\n", ret);
            LEAVING;
            return;
        }

        size_t i;
        for(i = 0; i < num_segments; i++) {
            const segment_key_t&    seg    = segment_keys[i];
            const bake_region_id_t& region = segment_data[i];

            if(seg.type == seg_type_t::BAKE_REGION) {
                ret = bake_remove(bake_ph, region);
                if (ret != BAKE_SUCCESS) {
                    /* XXX should save the error and keep removing */
                    ERROR fprintf(stderr, "write_op_exec_remove: "
                        "error in bake_remove() (ret = %d)\n", ret);
                    LEAVING;
                    return;
                }
            }
            ret = sdskv_erase(sdskv_ph, seg_db_id, &seg, sizeof(seg));
            if(ret != SDSKV_SUCCESS) {
                ERROR fprintf(stderr,"write_op_exec_remove: "
                    "error in seg_db sdskv_erase() (ret = %d)\n", ret);
                LEAVING;
                return;
            }
        }
    } while(num_segments == max_segments);

425
    LEAVING;
426 427 428 429
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
430
    ENTERING;
431
	auto vargs = static_cast<server_visitor_args_t>(u);
432
    oid_t oid = vargs->oid;
433 434
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
435
    if(oid == 0) {
436 437
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
438
    }
439

440
    insert_punch_log_entry(sdskv_ph, seg_db_id, oid, offset);
441
    LEAVING;
442 443 444 445
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
446
    ENTERING;
447
	auto vargs = static_cast<server_visitor_args_t>(u);
448
    oid_t oid = vargs->oid;
449 450
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
451
    if(oid == 0) {
452 453 454
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
455
    }
456

457
    insert_zero_log_entry(sdskv_ph, seg_db_id, oid, offset, len);
458
    LEAVING;
459 460 461 462 463 464 465
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
466
    ENTERING;
467
    int ret;
468
	auto vargs = static_cast<server_visitor_args_t>(u);
469
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
470 471
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
472
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
473 474
    oid_t oid = vargs->oid;
    if(oid == 0) {
475 476 477
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
478 479
    }

480 481 482 483 484 485 486 487 488 489
    /* find out the max key length */
    size_t max_k_len = 0;
    for(auto i=0; i<num; i++) {
        size_t s = strlen(keys[i]);
        max_k_len = max_k_len < s ? s : max_k_len;
    }

    /* create an omap key of the right size */
    omap_key_t* k = (omap_key_t*)calloc(1, max_k_len + sizeof(omap_key_t));

490
    for(auto i=0; i<num; i++) {
491 492 493 494 495 496
        size_t k_len = strlen(keys[i])+sizeof(omap_key_t);
        memset(k, 0, max_k_len + sizeof(omap_key_t));
        k->oid = oid;
        strcpy(k->key, keys[i]);
        ret = sdskv_put(sdskv_ph, omap_db_id,
                (const void*)k, k_len,
497 498 499 500 501
                (const void*)vals[i], lens[i]);
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
        }
    }
502 503
    free(k);
    LEAVING;
504 505 506 507
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
508 509
    int ret;

510
    ENTERING;
511
	auto vargs = static_cast<server_visitor_args_t>(u);
512
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
513 514
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
515
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
516 517
    oid_t oid = vargs->oid;
    if(oid == 0) {
518 519 520
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
521 522
    }

523 524 525 526 527 528
    for(auto i=0; i<num_keys; i++) {
        ret = sdskv_erase(sdskv_ph, omap_db_id, 
                (const void*)keys[i], strlen(keys[i])+1);
        if(ret != SDSKV_SUCCESS)
            fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
    }
529
    LEAVING;
530 531
}

532 533 534 535 536
/* Returns the object id associated with object_name, creating the
 * association when the name is not known yet.
 *
 * A new id starts from std::hash of the name and is advanced until an id
 * is found that is neither 0 (0 is used as the "invalid id" value by the
 * callers) nor present in the oid database. The id that was actually
 * checked is the one stored in both the name->oid and oid->name mappings.
 *
 * Returns 0 when a database operation fails. */
oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name) 
{
    ENTERING;

    oid_t oid = 0;
    hg_size_t vsize = sizeof(oid);
    int ret;

    ret = sdskv_get(ph, name_db_id, (const void*)object_name,
            strlen(object_name)+1, &oid, &vsize);
    if(ret == SDSKV_SUCCESS) {
        /* the object already exists */
        LEAVING;
        return oid;
    }
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        /* a real lookup failure must not be mistaken for a valid id */
        fprintf(stderr, "[ERROR] after sdskv_get(name->oid), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    /* unknown name: probe for a free id, starting at the hash of the name */
    std::hash<std::string> hash_fn;
    oid = hash_fn(std::string(object_name));
    for(;;) {
        if(oid == 0) {
            /* 0 is reserved, skip it without querying the database */
            oid += 1;
            continue;
        }
        hg_size_t s = 0;
        ret = sdskv_length(ph, oid_db_id, (const void*)&oid,
                    sizeof(oid), &s);
        if(ret != SDSKV_SUCCESS) {
            /* either the id is free (unknown key) or the query failed;
               in both cases `oid` is left at the value that was checked */
            break;
        }
        /* id already taken, try the next one */
        oid += 1;
    }
    // we make sure we stopped at an unknown key (not another SDSKV error)
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        fprintf(stderr, "[ERROR] ret != SDSKV_ERR_UNKNOWN_KEY (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    // set name => oid
    ret = sdskv_put(ph, name_db_id, (const void*)object_name,
            strlen(object_name)+1, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(name->oid), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    // set oid => name
    ret = sdskv_put(ph, oid_db_id, &oid, sizeof(oid),
            (const void*)object_name, strlen(object_name)+1);
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(oid->name), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    LEAVING;
    return oid;
}

585 586 587 588 589
/* Adds a BAKE_REGION entry to the segment database: the key describes bytes
 * [offset, offset+len) of object `oid`, the value is the id of the BAKE
 * region holding them. A negative `ts` is replaced by the current time.
 * A failing put is only reported. */
static void insert_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len, 
        bake_region_id_t* region, double ts)
{
    ENTERING;
    double stamp = ts;
    if(stamp < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.type        = seg_type_t::BAKE_REGION;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset+len;

    const int rc = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(region), sizeof(*region));
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

607 608 609 610 611
/* Adds a SMALL_REGION entry to the segment database: the key describes
 * bytes [offset, offset+len) of object `oid`, the value is the `len` bytes
 * found at `data`. A negative `ts` is replaced by the current time.
 * A failing put is only reported. */
static void insert_small_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len,
        const char* data, double ts)
{
    ENTERING;
    double stamp = ts;
    if(stamp < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.type        = seg_type_t::SMALL_REGION;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset+len;

    const int rc = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(data), len);
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

629 630 631 632
/* Adds a ZERO entry to the segment database: the key describes bytes
 * [offset, offset+len) of object `oid`, and no value is stored with it.
 * A negative `ts` is replaced by the current time.
 * A failing put is only reported. */
static void insert_zero_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    ENTERING;
    double stamp = ts;
    if(stamp < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.type        = seg_type_t::ZERO;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset+len;

    const int rc = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(nullptr), 0);
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

650 651 652 653
/* Adds a TOMBSTONE entry to the segment database: the key covers object
 * `oid` from `offset` up to the maximum uint64_t value, and no value is
 * stored with it. A negative `ts` is replaced by the current time.
 * A failing put is only reported. */
static void insert_punch_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, double ts)
{
    ENTERING;
    double stamp = ts;
    if(stamp < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.type        = seg_type_t::TOMBSTONE;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();

    const int rc = sdskv_put(ph, seg_db_id,
            static_cast<const void*>(&key), sizeof(key),
            static_cast<const void*>(nullptr), 0);
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

671 672 673 674
/* Computes the size of object `oid` from its segment log.
 *
 * Segment keys are listed in batches, starting from the key (oid, ts).
 * For every key of this object:
 *   - a segment whose type orders before TOMBSTONE raises the size to its
 *     end_index when that is larger;
 *   - a TOMBSTONE raises the size to its start_index when that is larger
 *     and ends the scan.
 * The scan also ends at the first key of another object or when a batch
 * comes back short.
 *
 * Returns the computed size, or 0 when listing the keys fails. */
uint64_t mobject_compute_object_size(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, double ts)
{
    ENTERING;
    /* NOTE(review): only oid and timestamp of the lower bound are set; this
       assumes the database orders segment keys on those fields -- confirm. */
    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ts;

    uint64_t size = 0; // current assumed size

    /* batch size; a compile-time constant so the arrays below are regular
       fixed-size arrays */
    const size_t max_segments = 10;
    segment_key_t segment_keys[max_segments];
    void* segment_keys_addrs[max_segments];
    hg_size_t segment_keys_size[max_segments];

    bool done = false;
    while(!done) {

        /* (re)arm the output buffers before every listing call */
        for(size_t i = 0; i < max_segments; i++) {
            segment_keys_addrs[i] = (void*)&segment_keys[i];
            segment_keys_size[i] = sizeof(segment_key_t);
        }
        size_t num_items = max_segments;

        int ret = sdskv_list_keys(ph, seg_db_id,
            (const void*)&lb, sizeof(lb),
            segment_keys_addrs, segment_keys_size,
            &num_items);

        if(ret != SDSKV_SUCCESS) {
            ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
            LEAVING;
            return 0;
        }

        for(size_t i = 0; i < num_items; i++) {
            const segment_key_t& seg = segment_keys[i];
            if(seg.oid != oid) {
                /* reached the keys of another object */
                done = true;
                break;
            }
            if(seg.type < seg_type_t::TOMBSTONE) {
                if(size < seg.end_index) {
                    size = seg.end_index;
                }
            } else if(seg.type == seg_type_t::TOMBSTONE) {
                if(size < seg.start_index) {
                    size = seg.start_index;
                }
                /* entries listed after a tombstone are not considered */
                done = true;
                break;
            }
            /* the next batch resumes after the last key examined */
            lb.timestamp = seg.timestamp;
        }
        if(num_items != max_segments) {
            /* short batch: nothing left to list */
            done = true;
        }
    }
    LEAVING;
    return size;
}