// core-write-op.cpp
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6
#include <map>
7
#include <cstring>
8 9
#include <string>
#include <iostream>
10
#include <limits>
11
#include <bake-client.h>
12 13 14
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"

/* Debug tracing helpers. Switch "#if 0" to "#if 1" to trace entry into and
 * exit from the visitor callbacks on stderr, indented by call depth. The
 * depth is kept in a plain file-static counter with no synchronization.
 * In normal builds all three macros expand to nothing. ERROR is written as a
 * statement prefix, e.g. `ERROR fprintf(stderr, ...)`, so the message itself
 * is printed in both configurations. */
#if 0
static int tabs = 0;
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING  {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR    {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
#else
#define ENTERING
#define LEAVING
#define ERROR
#endif

26 27 28 29 30 31 32 33 34 35 36 37 38
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

39
static oid_t get_or_create_oid(
40 41 42 43 44
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name);

Matthieu Dorier's avatar
Matthieu Dorier committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
static void insert_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len, 
                bake_region_id_t* region, double ts = -1.0);

static void insert_small_region_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, uint64_t len,
                const char* data, double ts = -1.0);

static void insert_zero_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, 
                uint64_t len, double ts=-1.0);

static void insert_punch_log_entry(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, uint64_t offset, double ts=-1.0);

uint64_t mobject_compute_object_size(
                sdskv_provider_handle_t ph,
                sdskv_database_id_t seg_db_id,
                oid_t oid, double ts);
72

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
/* Dispatch table handed to execute_write_op_visitor() by core_write_op():
 * one callback per kind of operation that can appear in a write-op chain.
 * visit_begin/visit_end presumably bracket the chain (begin resolves the OID
 * into vargs->oid, which the other callbacks read).
 * C++ requires designated initializers to follow the member declaration
 * order, so keep this list in the same order as struct write_op_visitor. */
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/* Server-side entry point: runs each operation of write_op through the
 * write_op_exec callbacks, handing vargs to every callback as its opaque
 * user argument. */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    void* user_arg = static_cast<void*>(vargs);
    execute_write_op_visitor(&write_op_exec, write_op, user_arg);
}

void write_op_exec_begin(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
97 98 99
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
100
    oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
Matthieu Dorier's avatar
Matthieu Dorier committed
101
    vargs->oid = oid;
102 103 104 105 106 107 108 109 110
}

void write_op_exec_end(void* u)
{
    /* Nothing to finalize: the per-operation callbacks write their log
     * entries directly. The parameter exists only to match the visitor
     * signature. */
    (void)u;
}

void write_op_exec_create(void* u, int exclusive)
{
111
    ENTERING;
112
    auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
113
    oid_t oid = vargs->oid; 
114
    if(oid == 0) {
115
        ERROR fprintf(stderr,"oid == 0\n");
116
    }
117 118
    /* nothing to do, the object is actually created in write_op_exec_begin
       if it did not exist before */
119
    LEAVING;
120 121 122 123
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
124
    ENTERING;
125
	auto vargs = static_cast<server_visitor_args_t>(u);
126
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
127 128
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id    = vargs->srv_ctx->segment_db_id;
129
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
130 131 132
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
133
    }
134

135 136
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
137
    bake_region_id_t rid;
138
    hg_bulk_t remote_bulk = vargs->bulk_handle;
139 140
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
141 142
    int ret;

143
    if(len > SMALL_REGION_THRESHOLD) {
144
        ret = bake_create(bake_ph, bti, len, &rid);
145
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
146
            ERROR bake_perror("bake_create",ret);
147 148
            return;
        }
149
        ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
150
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
151
            ERROR bake_perror( "bake_proxy_write", ret);
152
        }
153
        ret = bake_persist(bake_ph, rid, 0, len);
154
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
155
            ERROR bake_perror("bake_persist", ret);
156
        }
157
   
Matthieu Dorier's avatar
Matthieu Dorier committed
158
        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
159 160 161 162 163 164 165
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
166 167 168
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
169
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
170 171 172
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
173
        ret = margo_bulk_free(handle);
174 175 176
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
177

Matthieu Dorier's avatar
Matthieu Dorier committed
178
        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
179
    }
180
    LEAVING;
181 182 183 184
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
185
    ENTERING;
186 187 188
    // truncate to 0 then write
    write_op_exec_truncate(u,0);
    write_op_exec_write(u, buf, len, 0);
189
    LEAVING;
190 191 192 193
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
194
    ENTERING;
195
    auto vargs = static_cast<server_visitor_args_t>(u);
196
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
197 198
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
199
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
200 201 202
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
203
    }
204

205
    hg_bulk_t remote_bulk = vargs->bulk_handle;
206 207
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
208 209
    int ret;

210 211 212 213 214
    if(data_len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;
215

216 217
        ret = bake_create(bph, bti, data_len, &rid);
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
218
            ERROR bake_perror("bake_create", ret);
219 220 221 222 223
            LEAVING;
            return;
        }
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
224
            ERROR bake_perror("bake_proxy_write", ret);
225 226 227
            LEAVING;
            return;
        }
228
        ret = bake_persist(bph, rid, 0, data_len);
229
        if(ret != 0) {
Rob Latham's avatar
Rob Latham committed
230
            ERROR bake_perror("bake_persist", ret);
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
            LEAVING;
            return;
        }

        size_t i;

        //double ts = ABT_get_wtime();
        for(i=0; i < write_len; i += data_len) {
            // TODO normally we should have the same timestamps but right now it bugs...
            insert_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
        }

    } else {
        
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {data_len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, data_len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
263

264 265 266 267 268
        size_t i;
        for(i=0; i < write_len; i+= data_len) {
            insert_small_region_log_entry(sdskv_ph, seg_db_id, 
                    oid, offset+i, std::min(data_len, write_len-i), data);
        }
269
    }
270
    LEAVING;
271 272 273 274
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
275
    ENTERING;
276
	auto vargs = static_cast<server_visitor_args_t>(u);
277
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
278 279
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
280
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
281 282 283
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
284
    }
285

286
    hg_bulk_t remote_bulk = vargs->bulk_handle;
287 288
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
289 290
    int ret;

291
     // find out the current length of the object
292
    double ts = ABT_get_wtime();
293 294 295 296 297 298 299 300 301
    uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);

    if(len > SMALL_REGION_THRESHOLD) {

        bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
        bake_target_id_t bti = vargs->srv_ctx->bake_tid;
        bake_region_id_t rid;

        ret = bake_create(bph, bti, len, &rid);
Rob Latham's avatar
Rob Latham committed
302
        if (ret != 0) bake_perror("bake_create", ret);
303
        ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
Rob Latham's avatar
Rob Latham committed
304
        if (ret != 0) bake_perror("bake_proxy_write", ret);
305
        ret = bake_persist(bph, rid, 0, len);
Rob Latham's avatar
Rob Latham committed
306
        if (ret != 0) bake_perror("bake_persist", ret);
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331

        insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);

    } else {

        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
        ret = margo_bulk_free(handle);
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }

        insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
    }
332
    LEAVING;
333 334 335 336
}

void write_op_exec_remove(void* u)
{
337
    ENTERING;
338
	auto vargs = static_cast<server_visitor_args_t>(u);
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
    const char *object_name = vargs->object_name;
    oid_t oid = vargs->oid;
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
    int ret;

    /* remove name->OID entry to make object no longer visible to clients */
    ret = sdskv_erase(sdskv_ph, name_db_id, (const void *)object_name,
            strlen(object_name)+1);
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr,"write_op_exec_remove: "
            "error in name_db sdskv_erase() (ret = %d)\n", ret);
        LEAVING;
        return;
    }

    /* TODO bg thread for everything beyond this point */

    ret = sdskv_erase(sdskv_ph, oid_db_id, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr,"write_op_exec_remove: "
            "error in oid_db sdskv_erase() (ret = %d)\n", ret);
        LEAVING;
        return;
    }

    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ABT_get_wtime();

Shane Snyder's avatar
Shane Snyder committed
372
    size_t max_segments = 128; // XXX this is a pretty arbitrary number
373 374 375 376 377 378 379 380 381 382 383 384 385 386
    segment_key_t       segment_keys[max_segments];
    void*               segment_keys_addrs[max_segments];
    hg_size_t           segment_keys_size[max_segments];
    bake_region_id_t    segment_data[max_segments];
    void*               segment_data_addrs[max_segments];
    hg_size_t           segment_data_size[max_segments];
    for(auto i = 0 ; i < max_segments; i++) {
        segment_keys_addrs[i] = (void*)(&segment_keys[i]);
        segment_keys_size[i]  = sizeof(segment_key_t);
        segment_data_addrs[i] = (void*)(&segment_data[i]);
        segment_data_size[i]  = sizeof(bake_region_id_t);
    }

    /* iterate over and remove all segments for this oid */
387 388 389 390 391
    bool done = false;
    int seg_start_ndx = 0;
    while(!done) {
        size_t num_segments = max_segments;

392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
        ret = sdskv_list_keyvals(sdskv_ph, seg_db_id,
                    (const void *)&lb, sizeof(lb),
                    segment_keys_addrs, segment_keys_size,
                    segment_data_addrs, segment_data_size,
                    &num_segments);

        if(ret != SDSKV_SUCCESS) {
            /* XXX should save the error and keep removing */
            ERROR fprintf(stderr, "write_op_exec_remove: "
                "error in sdskv_list_keyvals() (ret = %d)\n", ret);
            LEAVING;
            return;
        }

        size_t i;
407
        for(i = seg_start_ndx; i < num_segments; i++) {
408 409 410
            const segment_key_t&    seg    = segment_keys[i];
            const bake_region_id_t& region = segment_data[i];

411 412 413 414 415
            if(seg.oid != oid) {
                done = true;
                break;
            }

416 417 418 419
            if(seg.type == seg_type_t::BAKE_REGION) {
                ret = bake_remove(bake_ph, region);
                if (ret != BAKE_SUCCESS) {
                    /* XXX should save the error and keep removing */
Rob Latham's avatar
Rob Latham committed
420 421
                    ERROR bake_perror("write_op_exec_remove: "
                        "error in bake_remove()", ret);
422 423 424 425 426 427 428 429 430 431 432 433
                    LEAVING;
                    return;
                }
            }
            ret = sdskv_erase(sdskv_ph, seg_db_id, &seg, sizeof(seg));
            if(ret != SDSKV_SUCCESS) {
                ERROR fprintf(stderr,"write_op_exec_remove: "
                    "error in seg_db sdskv_erase() (ret = %d)\n", ret);
                LEAVING;
                return;
            }
        }
434 435 436 437 438
        if(num_segments != max_segments) {
            done = true;
        }
        seg_start_ndx = 1;
    }
439

440
    LEAVING;
441 442 443 444
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
445
    ENTERING;
446
	auto vargs = static_cast<server_visitor_args_t>(u);
447
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
448 449
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
450
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
451 452
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
453
    }
454

Matthieu Dorier's avatar
Matthieu Dorier committed
455
    insert_punch_log_entry(sdskv_ph, seg_db_id, oid, offset);
456
    LEAVING;
457 458 459 460
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
461
    ENTERING;
462
	auto vargs = static_cast<server_visitor_args_t>(u);
463
    oid_t oid = vargs->oid;
Matthieu Dorier's avatar
Matthieu Dorier committed
464 465
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
466
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
467 468 469
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
470
    }
471

Matthieu Dorier's avatar
Matthieu Dorier committed
472
    insert_zero_log_entry(sdskv_ph, seg_db_id, oid, offset, len);
473
    LEAVING;
474 475 476 477 478 479 480
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
481
    ENTERING;
Matthieu Dorier's avatar
Matthieu Dorier committed
482
    int ret;
483
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
484
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
485 486
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
487
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
488 489
    oid_t oid = vargs->oid;
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
490 491 492
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
493 494
    }

495 496 497 498 499 500 501 502 503 504
    /* find out the max key length */
    size_t max_k_len = 0;
    for(auto i=0; i<num; i++) {
        size_t s = strlen(keys[i]);
        max_k_len = max_k_len < s ? s : max_k_len;
    }

    /* create an omap key of the right size */
    omap_key_t* k = (omap_key_t*)calloc(1, max_k_len + sizeof(omap_key_t));

Matthieu Dorier's avatar
Matthieu Dorier committed
505
    for(auto i=0; i<num; i++) {
506 507 508 509 510 511
        size_t k_len = strlen(keys[i])+sizeof(omap_key_t);
        memset(k, 0, max_k_len + sizeof(omap_key_t));
        k->oid = oid;
        strcpy(k->key, keys[i]);
        ret = sdskv_put(sdskv_ph, omap_db_id,
                (const void*)k, k_len,
Matthieu Dorier's avatar
Matthieu Dorier committed
512 513 514 515 516
                (const void*)vals[i], lens[i]);
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
        }
    }
517 518
    free(k);
    LEAVING;
519 520 521 522
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
523
    ENTERING;
524
    int ret;
525
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
526
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
527 528
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
529
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
530 531
    oid_t oid = vargs->oid;
    if(oid == 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
532 533 534
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
535 536
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
537 538 539 540 541 542
    for(auto i=0; i<num_keys; i++) {
        ret = sdskv_erase(sdskv_ph, omap_db_id, 
                (const void*)keys[i], strlen(keys[i])+1);
        if(ret != SDSKV_SUCCESS)
            fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
    }
543
    LEAVING;
544 545
}

546
static oid_t get_or_create_oid(
547 548 549 550 551 552 553
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name)
{
    ENTERING;
    oid_t oid = 0;
554
    hg_size_t s;
555
    int ret;
556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581

    s = sizeof(oid);
    ret = sdskv_get(ph, name_db_id, (const void*)object_name,
            strlen(object_name)+1, &oid, &s);
    if(SDSKV_ERR_UNKNOWN_KEY == ret) {
        std::hash<std::string> hash_fn;
        oid = hash_fn(std::string(object_name));
        s = strlen(object_name)+1;
        char *name_check = (char *)malloc(s);
        if(!name_check) {
            LEAVING;
            return 0;
        }
        while(1) {
            /* avoid hash collisions by checking this oid mapping */
            ret = sdskv_get(ph, oid_db_id, (const void*)&oid,
                sizeof(oid), (void *)name_check, &s);
            if(ret == SDSKV_SUCCESS) {
                if(strncmp(object_name, name_check, s) == 0) {
                    /* the object has been created by someone else in the meantime...  */
                    free(name_check);
                    LEAVING;
                    return oid;
                }
                oid++;
                continue;
582
            }
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
            break;
        }
        free(name_check);
        // we make sure we stopped at an unknown key (not another SDSKV error)
        if(ret != SDSKV_ERR_UNKNOWN_KEY) {
            fprintf(stderr, "[ERROR] ret != SDSKV_ERR_UNKNOWN_KEY (ret = %d)\n", ret);
            LEAVING;
            return 0;
        }
        // set name => oid
        ret = sdskv_put(ph, name_db_id, (const void*)object_name,
                strlen(object_name)+1, &oid, sizeof(oid));
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "[WARNING] after sdskv_put(name->oid), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
            LEAVING;
            return 0;
        }
        // set oid => name
        ret = sdskv_put(ph, oid_db_id, &oid, sizeof(oid),
                (const void*)object_name, strlen(object_name)+1);
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "[WARNING] after sdskv_put(oid->name), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
            LEAVING;
            return 0;
607
        }
608
    }
609
    LEAVING;
610 611 612
    return oid;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
613 614 615 616 617
static void insert_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len, 
        bake_region_id_t* region, double ts)
618
{
619
    ENTERING;
620 621 622 623 624 625
    segment_key_t seg;
    seg.oid       = oid;
    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    seg.start_index    = offset;
    seg.end_index      = offset+len;
    seg.type      = seg_type_t::BAKE_REGION;
Matthieu Dorier's avatar
Matthieu Dorier committed
626 627 628 629 630 631
    int ret = sdskv_put(ph, seg_db_id, 
            (const void*)&seg, sizeof(seg),
            (const void*)region, sizeof(*region));
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
    }
632
    LEAVING;
633 634
}

Matthieu Dorier's avatar
Matthieu Dorier committed
635 636 637 638 639
static void insert_small_region_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len,
        const char* data, double ts)
640
{
641
    ENTERING;
642 643 644 645 646 647
    segment_key_t seg;
    seg.oid       = oid;
    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    seg.start_index    = offset;
    seg.end_index      = offset+len;
    seg.type      = seg_type_t::SMALL_REGION;
Matthieu Dorier's avatar
Matthieu Dorier committed
648 649 650 651 652 653
    int ret = sdskv_put(ph, seg_db_id, 
            (const void*)&seg, sizeof(seg),
            (const void*)data, len);
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
    }
654
    LEAVING;
655 656
}

Matthieu Dorier's avatar
Matthieu Dorier committed
657 658 659 660
static void insert_zero_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, uint64_t len, double ts)
661
{
662
    ENTERING;
663 664 665 666 667 668
    segment_key_t seg;
    seg.oid       = oid;
    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    seg.start_index    = offset;
    seg.end_index      = offset+len;
    seg.type      = seg_type_t::ZERO;
Matthieu Dorier's avatar
Matthieu Dorier committed
669 670 671 672 673 674
    int ret = sdskv_put(ph, seg_db_id,
            (const void*)&seg, sizeof(seg),
            (const void*)nullptr, 0);
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
    }
675
    LEAVING;
676 677
}

Matthieu Dorier's avatar
Matthieu Dorier committed
678 679 680 681
static void insert_punch_log_entry(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, uint64_t offset, double ts)
682
{
683
    ENTERING;
684 685 686 687 688 689
    segment_key_t seg;
    seg.oid       = oid;
    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    seg.start_index = offset;
    seg.end_index  = std::numeric_limits<uint64_t>::max();
    seg.type      = seg_type_t::TOMBSTONE;
Matthieu Dorier's avatar
Matthieu Dorier committed
690 691 692 693 694 695
    int ret = sdskv_put(ph, seg_db_id,
            (const void*)&seg, sizeof(seg),
            (const void*)nullptr, 0);
    if(ret != SDSKV_SUCCESS) {
        ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
    }
696
    LEAVING;
697 698
}

Matthieu Dorier's avatar
Matthieu Dorier committed
699 700 701 702
uint64_t mobject_compute_object_size(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t seg_db_id,
        oid_t oid, double ts)
703
{
704
    ENTERING;
Matthieu Dorier's avatar
Matthieu Dorier committed
705 706 707 708
    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ts;

709 710 711
    uint64_t size = 0; // current assumed size
    uint64_t max_size = std::numeric_limits<uint64_t>::max();

Shane Snyder's avatar
Shane Snyder committed
712
    size_t max_segments = 128;
Matthieu Dorier's avatar
Matthieu Dorier committed
713 714 715 716 717 718 719 720 721 722
    segment_key_t segment_keys[max_segments];
    void* segment_keys_addrs[max_segments];
    hg_size_t segment_keys_size[max_segments];

    for(auto i = 0; i < max_segments; i++) {
        segment_keys_addrs[i] = (void*)&segment_keys[i];
        segment_keys_size[i] = sizeof(segment_key_t);
    }

    bool done = false;
723
    int seg_start_ndx =  0;
Matthieu Dorier's avatar
Matthieu Dorier committed
724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
    while(!done) {

        size_t num_items = max_segments;

        int ret = sdskv_list_keys(ph, seg_db_id,
            (const void*)&lb, sizeof(lb),
            segment_keys_addrs, segment_keys_size,
            &num_items);

        if(ret != SDSKV_SUCCESS) {
            ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
            LEAVING;
            return 0;
        }

739 740
        size_t i;
        for(i=seg_start_ndx; i < num_items; i++) {
Matthieu Dorier's avatar
Matthieu Dorier committed
741 742 743 744 745 746 747
            if(segment_keys[i].oid != oid) {
                done = true;
                break;
            }
            auto& seg = segment_keys[i];
            if(seg.type < seg_type_t::TOMBSTONE) {
                if(size < seg.end_index) {
748 749 750 751 752 753 754
                    size = std::min(seg.end_index, max_size);
                }
            } else if(seg.type == seg_type_t::TOMBSTONE) {
                if(max_size > seg.start_index) {
                    max_size = seg.start_index;
                }
                if(size < seg.start_index) {
Matthieu Dorier's avatar
Matthieu Dorier committed
755 756 757 758 759 760 761 762 763 764
                    size = seg.start_index;
                }
                done = true;
                break;
            }
            lb.timestamp = seg.timestamp;
        }
        if(num_items != max_segments) {
            done = true;
        }
765
        seg_start_ndx = 1;
Matthieu Dorier's avatar
Matthieu Dorier committed
766
    }
767
    LEAVING;
768 769
    return size;
}