core-write-op.cpp 24.5 KB
Newer Older
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6
#include <map>
7
#include <cstring>
8 9
#include <string>
#include <iostream>
10
#include <limits>
11
#include <bake-client.h>
12 13 14
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"

15
/*
 * Call-tracing helpers for the write-op handlers. Change the condition below
 * to 1 to get an indented enter/leave trace on stderr; with it at 0 the three
 * macros expand to nothing.
 */
#if 0
static int trace_depth = 0;
#define ENTERING { for(int d = 0; d < trace_depth; ++d) fprintf(stderr, " "); \
                   fprintf(stderr, "[ENTERING]>> %s\n", __FUNCTION__); \
                   ++trace_depth; }
#define LEAVING  { --trace_depth; \
                   for(int d = 0; d < trace_depth; ++d) fprintf(stderr, " "); \
                   fprintf(stderr, "[LEAVING]<<< %s\n", __FUNCTION__); }
#define ERROR    { for(int d = 0; d < trace_depth + 1; ++d) fprintf(stderr, " "); \
                   fprintf(stderr, "[ERROR] "); }
#else
#define ENTERING
#define LEAVING
#define ERROR
#endif
25

26 27 28 29 30 31 32 33 34 35 36 37 38
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

39 40 41 42 43 44
static oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name);

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
static void insert_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len, 
                bake_region_id_t* region, double ts = -1.0);

static void insert_small_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len,
                const char* data, double ts = -1.0);

static void insert_zero_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, 
                uint64_t len, double ts=-1.0);

static void insert_punch_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, double ts=-1.0);

uint64_t mobject_compute_object_size(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, double ts);
72

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
/*
 * Visitor table that core_write_op hands to execute_write_op_visitor: one
 * callback per write_op operation kind, plus visit_begin / visit_end hooks
 * (presumably invoked before the first and after the last operation).
 * NOTE(review): C++ designated initializers must appear in the member order
 * of struct write_op_visitor (presumably declared in
 * src/io-chain/write-op-visitor.h) — keep this list in sync with it.
 */
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/*
 * Server entry point for a write_op: walks the operation chain with the
 * write_op_exec visitor, passing vargs as the opaque context every
 * callback receives.
 */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    void* visitor_ctx = static_cast<void*>(vargs);
    execute_write_op_visitor(&write_op_exec, write_op, visitor_ctx);
}

void write_op_exec_begin(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
97 98 99 100 101
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
    oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
    vargs->oid = oid;
102 103 104 105 106 107 108 109 110
}

/*
 * visit_end: no per-chain state is held by this module, so there is nothing
 * to finalize. The context pointer is deliberately unused.
 */
void write_op_exec_end(void* u)
{
    (void)u;
}

void write_op_exec_create(void* u, int exclusive)
{
111
    ENTERING;
112
	auto vargs = static_cast<server_visitor_args_t>(u);
113
    oid_t oid = vargs->oid; 
114 115 116
    if(oid == 0) {
        ERROR fprintf(stderr,"oid == 0\n");
    }
117 118
    /* nothing to do, the object is actually created in write_op_exec_begin
       if it did not exist before */
119
    LEAVING;
120 121 122 123
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
124
    ENTERING;
125
	auto vargs = static_cast<server_visitor_args_t>(u);
126
    oid_t oid = vargs->oid;
127 128
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id    = vargs->srv_ctx->segment_db_id;
129
    if(oid == 0) {
130 131 132
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
133
    }
134

135 136
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
137
    bake_region_id_t rid;
138
    hg_bulk_t remote_bulk = vargs->bulk_handle;
139 140
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
141 142
    int ret;

143
    if(len > SMALL_REGION_THRESHOLD) {
144
        ret = bake_create(bake_ph, bti, len, &rid);
145
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
146
            ERROR bake_perror("bake_create",ret);
147 148
            return;
        }
149
        ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
150
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
151
            ERROR bake_perror( "bake_proxy_write", ret);
152
        }
Rob Latham's avatar
Rob Latham committed
153
        ret = bake_persist(bake_ph, rid, offset, len);
154
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
155
            ERROR bake_perror("bake_persist", ret);
156
        }
157
   
158
        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
159 160 161 162 163 164 165
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
166 167 168
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
169
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
170 171 172
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
173
        ret = margo_bulk_free(handle);
174 175 176
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
177

178
        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
179
    }
180
    LEAVING;
181 182 183 184
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
185
    ENTERING;
186 187 188
    // truncate to 0 then write
    write_op_exec_truncate(u,0);
    write_op_exec_write(u, buf, len, 0);
189
    LEAVING;
190 191 192 193
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
194
    ENTERING;
195
    auto vargs = static_cast<server_visitor_args_t>(u);
196
    oid_t oid = vargs->oid;
197 198
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
199
    if(oid == 0) {
200 201 202
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
203
    }
204

205
    hg_bulk_t remote_bulk = vargs->bulk_handle;
206 207
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
208 209
    int ret;

210 211 212 213 214
    if(data_len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;
215

216 217
        ret = bake_create(bph, bti, data_len, &rid);
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
218
            ERROR bake_perror("bake_create", ret);
219 220 221 222 223
            LEAVING;
            return;
        }
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
224
            ERROR bake_perror("bake_proxy_write", ret);
225 226 227
            LEAVING;
            return;
        }
Rob Latham's avatar
Rob Latham committed
228
        ret = bake_persist(bph, rid, offset, data_len);
229
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
230
            ERROR bake_perror("bake_persist", ret);
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
            LEAVING;
            return;
        }

        size_t i;

        //double ts = ABT_get_wtime();
        for(i=0; i < write_len; i += data_len) {
            // TODO normally we should have the same timestamps but right now it bugs...
            insert_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
        }

    } else {
        
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {data_len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, data_len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
263

264 265 266 267 268
        size_t i;
        for(i=0; i < write_len; i+= data_len) {
            insert_small_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len-i), data);
        }
269
    }
270
    LEAVING;
271 272 273 274
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
275
    ENTERING;
276
	auto vargs = static_cast<server_visitor_args_t>(u);
277
    oid_t oid = vargs->oid;
278 279
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
280
    if(oid == 0) {
281 282 283
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
284
    }
285

286
    hg_bulk_t remote_bulk = vargs->bulk_handle;
287 288
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
289 290
    int ret;

291
     // find out the current length of the object
292
    double ts = ABT_get_wtime();
293 294 295 296 297 298 299 300 301
    uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);

    if(len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;

        ret = bake_create(bph, bti, len, &rid);
Rob Latham's avatar
Rob Latham committed
302
        if (ret != 0) bake_perror("bake_create", ret);
303
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
Rob Latham's avatar
Rob Latham committed
304
        if (ret != 0) bake_perror("bake_proxy_write", ret);
Rob Latham's avatar
Rob Latham committed
305
        ret = bake_persist(bph, rid, offset, len);
Rob Latham's avatar
Rob Latham committed
306
        if (ret != 0) bake_perror("bake_persist", ret);
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331

        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);

    } else {

        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }

        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
    }
332
    LEAVING;
333 334 335 336
}

void write_op_exec_remove(void* u)
{
337
    ENTERING;
338
	auto vargs = static_cast<server_visitor_args_t>(u);
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
    const char *object_name = vargs->object_name;
    oid_t oid = vargs->oid;
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
    int ret;

    /* remove name->OID entry to make object no longer visible to clients */
    ret = sdskv_erase(sdskv_ph, name_db_id, (const void *)object_name,
            strlen(object_name)+1);
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr,"write_op_exec_remove: "
            "error in name_db sdskv_erase() (ret = %d)\n", ret);
        LEAVING;
        return;
    }

    /* TODO bg thread for everything beyond this point */

    ret = sdskv_erase(sdskv_ph, oid_db_id, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr,"write_op_exec_remove: "
            "error in oid_db sdskv_erase() (ret = %d)\n", ret);
        LEAVING;
        return;
    }

    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ABT_get_wtime();

    size_t max_segments = 10; // XXX this is a pretty arbitrary number
    segment_key_t       segment_keys[max_segments];
    void*               segment_keys_addrs[max_segments];
    hg_size_t           segment_keys_size[max_segments];
    bake_region_id_t    segment_data[max_segments];
    void*               segment_data_addrs[max_segments];
    hg_size_t           segment_data_size[max_segments];
    for(auto i = 0 ; i < max_segments; i++) {
        segment_keys_addrs[i] = (void*)(&segment_keys[i]);
        segment_keys_size[i]  = sizeof(segment_key_t);
        segment_data_addrs[i] = (void*)(&segment_data[i]);
        segment_data_size[i]  = sizeof(bake_region_id_t);
    }

    /* iterate over and remove all segments for this oid */
    size_t num_segments = max_segments;
    do {
        ret = sdskv_list_keyvals(sdskv_ph, seg_db_id,
                    (const void *)&lb, sizeof(lb),
                    segment_keys_addrs, segment_keys_size,
                    segment_data_addrs, segment_data_size,
                    &num_segments);

        if(ret != SDSKV_SUCCESS) {
            /* XXX should save the error and keep removing */
            ERROR fprintf(stderr, "write_op_exec_remove: "
                "error in sdskv_list_keyvals() (ret = %d)\n", ret);
            LEAVING;
            return;
        }

        size_t i;
        for(i = 0; i < num_segments; i++) {
            const segment_key_t&    seg    = segment_keys[i];
            const bake_region_id_t& region = segment_data[i];

            if(seg.type == seg_type_t::BAKE_REGION) {
                ret = bake_remove(bake_ph, region);
                if (ret != BAKE_SUCCESS) {
                    /* XXX should save the error and keep removing */
Rob Latham's avatar
Rob Latham committed
412 413
                    ERROR bake_perror("write_op_exec_remove: "
                        "error in bake_remove()", ret);
414 415 416 417 418 419 420 421 422 423 424 425 426 427
                    LEAVING;
                    return;
                }
            }
            ret = sdskv_erase(sdskv_ph, seg_db_id, &seg, sizeof(seg));
            if(ret != SDSKV_SUCCESS) {
                ERROR fprintf(stderr,"write_op_exec_remove: "
                    "error in seg_db sdskv_erase() (ret = %d)\n", ret);
                LEAVING;
                return;
            }
        }
    } while(num_segments == max_segments);

428
    LEAVING;
429 430 431 432
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
433
    ENTERING;
434
	auto vargs = static_cast<server_visitor_args_t>(u);
435
    oid_t oid = vargs->oid;
436 437
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
438
    if(oid == 0) {
439 440
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
441
    }
442

443
    insert_punch_log_entry(sdskv_ph, seg_db_id, oid, offset);
444
    LEAVING;
445 446 447 448
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
449
    ENTERING;
450
	auto vargs = static_cast<server_visitor_args_t>(u);
451
    oid_t oid = vargs->oid;
452 453
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
454
    if(oid == 0) {
455 456 457
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
458
    }
459

460
    insert_zero_log_entry(sdskv_ph, seg_db_id, oid, offset, len);
461
    LEAVING;
462 463 464 465 466 467 468
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
469
    ENTERING;
470
    int ret;
471
	auto vargs = static_cast<server_visitor_args_t>(u);
472
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
473 474
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
475
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
476 477
    oid_t oid = vargs->oid;
    if(oid == 0) {
478 479 480
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
481 482
    }

483 484 485 486 487 488 489 490 491 492
    /* find out the max key length */
    size_t max_k_len = 0;
    for(auto i=0; i<num; i++) {
        size_t s = strlen(keys[i]);
        max_k_len = max_k_len < s ? s : max_k_len;
    }

    /* create an omap key of the right size */
    omap_key_t* k = (omap_key_t*)calloc(1, max_k_len + sizeof(omap_key_t));

493
    for(auto i=0; i<num; i++) {
494 495 496 497 498 499
        size_t k_len = strlen(keys[i])+sizeof(omap_key_t);
        memset(k, 0, max_k_len + sizeof(omap_key_t));
        k->oid = oid;
        strcpy(k->key, keys[i]);
        ret = sdskv_put(sdskv_ph, omap_db_id,
                (const void*)k, k_len,
500 501 502 503 504
                (const void*)vals[i], lens[i]);
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
        }
    }
505 506
    free(k);
    LEAVING;
507 508 509 510
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
511 512
    int ret;

513
    ENTERING;
514
	auto vargs = static_cast<server_visitor_args_t>(u);
515
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
516 517
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
518
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
519 520
    oid_t oid = vargs->oid;
    if(oid == 0) {
521 522 523
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
524 525
    }

526 527 528 529 530 531
    for(auto i=0; i<num_keys; i++) {
        ret = sdskv_erase(sdskv_ph, omap_db_id, 
                (const void*)keys[i], strlen(keys[i])+1);
        if(ret != SDSKV_SUCCESS)
            fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
    }
532
    LEAVING;
533 534
}

535 536 537 538 539
/*
 * Return the OID registered for `object_name` in name_db_id, allocating and
 * registering a new one if the name is unknown.
 *
 * Allocation probes candidates upward from std::hash of the name and takes
 * the first one not present in oid_db_id. The previous loop incremented once
 * more after finding the free slot, handing out an unchecked OID. 0 is never
 * allocated because callers treat it as "no object". Both directions are
 * then recorded (name -> oid, oid -> name). If the second put fails, the
 * first is rolled back on a best-effort basis so no name points at an
 * unregistered OID.
 *
 * Returns the OID, or 0 on any database error.
 */
oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name)
{
    ENTERING;

    oid_t oid = 0;
    hg_size_t vsize = sizeof(oid);
    const size_t name_size = strlen(object_name)+1;
    int ret;

    ret = sdskv_get(ph, name_db_id, (const void*)object_name,
            name_size, &oid, &vsize);
    if(ret == SDSKV_SUCCESS) {
        LEAVING;
        return oid;
    }
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        fprintf(stderr, "[ERROR] sdskv_get(name->oid) failed (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    std::hash<std::string> hash_fn;
    oid = hash_fn(std::string(object_name));
    while(true) {
        if(oid == 0) {      // 0 is reserved as "invalid oid"
            oid += 1;
            continue;
        }
        hg_size_t s = 0;
        ret = sdskv_length(ph, oid_db_id, (const void*)&oid, sizeof(oid), &s);
        if(ret != SDSKV_SUCCESS)
            break;          // oid is free (or an error occurred, checked below)
        oid += 1;           // taken: try the next candidate
    }
    // we make sure we stopped at an unknown key (not another SDSKV error)
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        fprintf(stderr, "[ERROR] ret != SDSKV_ERR_UNKNOWN_KEY (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    // set name => oid
    ret = sdskv_put(ph, name_db_id, (const void*)object_name,
            name_size, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(name->oid), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    // set oid => name
    ret = sdskv_put(ph, oid_db_id, &oid, sizeof(oid),
            (const void*)object_name, name_size);
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(oid->name), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        // best effort: don't leave the name pointing at an oid that the
        // allocator still considers free
        sdskv_erase(ph, name_db_id, (const void*)object_name, name_size);
        LEAVING;
        return 0;
    }

    LEAVING;
    return oid;
}

588 589 590 591 592
/*
 * Log a BAKE_REGION segment: bytes [offset, offset+len) of object `oid` live
 * in BAKE region *region. A negative ts stamps the entry with
 * ABT_get_wtime(). Errors from sdskv_put are reported, not propagated.
 */
static void insert_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len,
        bake_region_id_t* region, double ts)
{
    ENTERING;
    double stamp = ts;
    if(ts < 0)
        stamp = ABT_get_wtime();

    segment_key_t key;
    key.type        = seg_type_t::BAKE_REGION;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = offset + len;
    key.timestamp   = stamp;

    const int rc = sdskv_put(ph, seg_db_id,
                             &key, sizeof(key),
                             region, sizeof(*region));
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

610 611 612 613 614
/*
 * Log a SMALL_REGION segment: the `len` bytes at `data` are stored inline as
 * the value for [offset, offset+len) of object `oid`. A negative ts stamps
 * the entry with ABT_get_wtime(). Errors from sdskv_put are reported only.
 */
static void insert_small_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len,
        const char* data, double ts)
{
    ENTERING;
    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = (ts < 0) ? ABT_get_wtime() : ts;
    key.type        = seg_type_t::SMALL_REGION;
    key.start_index = offset;
    key.end_index   = offset + len;

    const void* payload = data;
    const int rc = sdskv_put(ph, seg_db_id, &key, sizeof(key), payload, len);
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

632 633 634 635
/*
 * Log a ZERO segment covering [offset, offset+len) of object `oid`, with an
 * empty value. A negative ts stamps the entry with ABT_get_wtime(). Errors
 * from sdskv_put are reported only.
 */
static void insert_zero_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    ENTERING;
    const uint64_t end = offset + len;

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = ts < 0 ? ABT_get_wtime() : ts;
    key.start_index = offset;
    key.end_index   = end;
    key.type        = seg_type_t::ZERO;

    int rc = sdskv_put(ph, seg_db_id, &key, sizeof(key), nullptr, 0);
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

653 654 655 656
/*
 * Log a TOMBSTONE segment for object `oid` spanning from `offset` to the
 * largest representable index, with an empty value. A negative ts stamps the
 * entry with ABT_get_wtime(). Errors from sdskv_put are reported only.
 */
static void insert_punch_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, double ts)
{
    ENTERING;
    double stamp = ts;
    if(stamp < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.type        = seg_type_t::TOMBSTONE;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();

    const int rc = sdskv_put(ph, seg_db_id, &key, sizeof(key), nullptr, 0);
    if(rc != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", rc);
    }
    LEAVING;
}

674 675 676 677
/*
 * Compute the size of object `oid` as seen at time `ts` by scanning its
 * segment log in seg_db_id.
 *
 * Keys are listed page by page (max_segments at a time), starting after the
 * lower bound {oid, ts}. The scan stops at the first key of another object or
 * at the first TOMBSTONE.
 * - Entries with a type below TOMBSTONE grow the size to their end_index.
 * - A TOMBSTONE raises the size to at least its start_index.
 * Stopping at the first TOMBSTONE assumes keys come back newest first.
 * NOTE(review): confirm against the seg_db comparator.
 *
 * Returns the computed size, or 0 if listing the database fails.
 */
uint64_t mobject_compute_object_size(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, double ts)
{
    ENTERING;
    /* Value-initialized: the whole struct is sent as the start key, so the
       fields not set here must not be garbage. */
    segment_key_t lb{};
    lb.oid = oid;
    lb.timestamp = ts;

    uint64_t size = 0; // current assumed size
    uint64_t max_size = std::numeric_limits<uint64_t>::max();

    constexpr size_t max_segments = 10;
    segment_key_t segment_keys[max_segments];
    void* segment_keys_addrs[max_segments];
    hg_size_t segment_keys_size[max_segments];
    for(size_t i = 0; i < max_segments; i++) {
        segment_keys_addrs[i] = (void*)&segment_keys[i];
    }

    bool done = false;
    while(!done) {
        /* the size array may be updated by sdskv with the actual key sizes:
           restore the buffer capacities before every page */
        for(size_t i = 0; i < max_segments; i++) {
            segment_keys_size[i] = sizeof(segment_key_t);
        }
        size_t num_items = max_segments;

        int ret = sdskv_list_keys(ph, seg_db_id,
            (const void*)&lb, sizeof(lb),
            segment_keys_addrs, segment_keys_size,
            &num_items);

        if(ret != SDSKV_SUCCESS) {
            ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
            LEAVING;
            return 0;
        }

        for(size_t i = 0; i < num_items; i++) {
            const segment_key_t& seg = segment_keys[i];
            if(seg.oid != oid) {
                done = true;
                break;
            }
            if(seg.type < seg_type_t::TOMBSTONE) {
                if(size < seg.end_index) {
                    size = std::min(seg.end_index, max_size);
                }
            } else if(seg.type == seg_type_t::TOMBSTONE) {
                if(max_size > seg.start_index) {
                    max_size = seg.start_index;
                }
                if(size < seg.start_index) {
                    size = seg.start_index;
                }
                done = true;
                break;
            }
            /* resume the next page right after the last key consumed */
            lb = seg;
        }
        /* a short page means there are no more keys after lb */
        if(num_items != max_segments) {
            done = true;
        }
    }
    LEAVING;
    return size;
}