core-write-op.cpp 17.6 KB
Newer Older
1
#include <map>
2
#include <cstring>
3 4
#include <string>
#include <iostream>
5
#include <limits>
6
#include <bake-client.h>
7 8
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"
9
#include "src/server/core/fake-kv.hpp"
10

11
/* Call-tracing helpers.
 * Change the condition below to 1 to get an indented enter/leave trace on
 * stderr; with 0 the three macros expand to nothing.
 * ERROR only emits the "[ERROR] " prefix: call sites follow it with their
 * own fprintf (e.g. `ERROR fprintf(stderr, "...")`). */
#if 0
static int tabs = 0;
#define ENTERING { \
    for(int i=0; i<tabs; i++) fprintf(stderr," "); \
    fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); \
    tabs += 1; \
}
#define LEAVING { \
    tabs -= 1; \
    for(int i=0; i<tabs; i++) fprintf(stderr," "); \
    fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); \
}
#define ERROR { \
    for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); \
    fprintf(stderr,"[ERROR] "); \
}
#else
#define ENTERING
#define LEAVING
#define ERROR
#endif
21

22 23 24 25 26 27 28 29 30 31 32 33 34
/* Forward declarations of the visitor callbacks that execute the actions of
 * a write operation. The first argument of each callback is the opaque
 * pointer handed to execute_write_op_visitor(); the callbacks cast it back
 * to server_visitor_args_t. */
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

35 36 37 38 39 40
/* Returns the oid associated with object_name in the name database,
 * registering a new one (in both the name and oid databases) when the name
 * is not known yet. Returns 0 when an error occurs. */
static oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name);

41
/* Records in segment_map that bytes [offset, offset+len) of object oid are
 * stored in the given BAKE region. A negative ts means "use the current time". */
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts = -1.0);
42 43 44 45 46
/* Records a segment whose len bytes of data are copied directly into the
 * segment_map entry. A negative ts means "use the current time". */
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts = -1.0);
/* Records a ZERO segment covering [offset, offset+len). */
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts=-1.0);
/* Records a TOMBSTONE segment starting at offset and extending to the
 * maximum uint64_t value. */
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts=-1.0);
/* Computes the size of object oid from the segments found in segment_map,
 * starting the scan at the lower bound of (oid, ts). */
static uint64_t compute_size(oid_t oid, double ts);

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
/* Visitor table binding every write-operation action to the function of this
 * file that executes it. core_write_op() passes it to
 * execute_write_op_visitor(). */
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/**
 * Runs a write operation on the server side.
 *
 * Hands the operation to execute_write_op_visitor() together with the
 * write_op_exec visitor table; vargs is forwarded to the visitor callbacks
 * as their opaque user argument.
 *
 * @param write_op operation to execute.
 * @param vargs    per-request state passed to every callback.
 */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    void* const visitor_uarg = static_cast<void*>(vargs);
    execute_write_op_visitor(&write_op_exec, write_op, visitor_uarg);
}

/**
 * Callback registered as visit_begin in the write_op_exec table.
 * There is nothing to prepare, so this is a no-op.
 *
 * @param u opaque visitor argument (not used).
 */
void write_op_exec_begin(void* u)
{
    (void)u; // deliberately unused
}

/**
 * Callback registered as visit_end in the write_op_exec table.
 * There is nothing to finalize, so this is a no-op.
 *
 * @param u opaque visitor argument (not used).
 */
void write_op_exec_end(void* u)
{
    (void)u; // deliberately unused
}

void write_op_exec_create(void* u, int exclusive)
{
80
    ENTERING;
81
	auto vargs = static_cast<server_visitor_args_t>(u);
82 83 84
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
85 86 87 88 89
    oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
    if(oid == 0) {
        ERROR fprintf(stderr,"oid == 0\n");
    }
    LEAVING;
90 91 92 93
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
94
    ENTERING;
95
	auto vargs = static_cast<server_visitor_args_t>(u);
96 97 98 99 100 101
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
102 103 104 105
        if(oid == 0) {
            ERROR fprintf(stderr,"oid == 0\n");
            return;
        }
106 107
        vargs->oid = oid;
    }
108

109 110
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
111
    bake_region_id_t rid;
112
    hg_bulk_t remote_bulk = vargs->bulk_handle;
113 114
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
115 116
    int ret;

117 118
    if(len > SMALL_REGION_THRESHOLD) {
        // TODO: check return values of those calls
119
        ret = bake_create(bake_ph, bti, len, &rid);
120 121 122 123
        if(ret != 0) {
            ERROR fprintf(stderr,"bake_create returned %d\n",ret);
            return;
        }
124
        ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
125 126 127
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
        }
128
        ret = bake_persist(bake_ph, rid);
129 130 131
        if(ret != 0) {
            ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
        }
132
   
133 134 135 136 137 138 139 140
        insert_region_log_entry(oid, offset, len, &rid);
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
141 142 143
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
        }
144
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
145 146 147
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
        }
148
        ret = margo_bulk_free(handle);
149 150 151
        if(ret != 0) {
            ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
        }
152 153 154

        insert_small_region_log_entry(oid, offset, len, data);
    }
155
    LEAVING;
156 157 158 159
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
160
    ENTERING;
161 162 163 164
    // TODO: this function will not be valid if the new object is
    // smaller than its previous version. Instead we should remove the object
    // and re-create it.

165
	auto vargs = static_cast<server_visitor_args_t>(u);
166 167 168 169 170 171 172 173
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }
174

175 176
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
177
    bake_region_id_t rid;
178
    hg_bulk_t remote_bulk = vargs->bulk_handle;
179 180
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
181 182
    int ret;

183
    unsigned i;
184
    // TODO: check return values of those calls
185
    ret = bake_create(bph, bti, len, &rid);
186 187 188 189 190
    if(ret != 0) {
        ERROR fprintf(stderr,"bake_create() returned %d\n", ret);
        LEAVING;
        return;
    }
191
    ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
192 193 194 195 196
    if(ret != 0) {
        ERROR fprintf(stderr,"bake_proxy_write() returned %d\n", ret);
        LEAVING;
        return;
    }
197
    ret = bake_persist(bph, rid);
198 199 200 201 202
    if(ret != 0) {
        ERROR fprintf(stderr, "bake_persist() returned %d\n", ret);
        LEAVING;
        return;
    }
203
    insert_region_log_entry(oid, 0, len, &rid);
204
    LEAVING;
205 206 207 208
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
209
    ENTERING;
210
    auto vargs = static_cast<server_visitor_args_t>(u);
211 212 213 214 215 216 217 218
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }
219

220 221
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
222
    bake_region_id_t rid;
223
    hg_bulk_t remote_bulk = vargs->bulk_handle;
224 225
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
226 227 228
    int ret;

    // TODO: check return values of those calls
229 230 231
    ret = bake_create(bph, bti, data_len, &rid);
    ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
    ret = bake_persist(bph, rid);
232

233 234 235 236 237 238 239 240
    size_t i;

    //double ts = ABT_get_wtime();
    for(i=0; i < write_len; i += data_len) {
        segment_key_t seg;
        // TODO normally we should have the same timestamps but right now it bugs...
        insert_region_log_entry(oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
    }
241
    LEAVING;
242 243 244 245
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
246
    ENTERING;
247
	auto vargs = static_cast<server_visitor_args_t>(u);
248 249 250 251 252 253 254 255
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }
256

257 258
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
259
    bake_region_id_t rid;
260
    hg_bulk_t remote_bulk = vargs->bulk_handle;
261 262
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
263 264
    int ret;

265 266 267
    ret = bake_create(bph, bti, len, &rid);
    ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
    ret = bake_persist(bph, rid);
268

269 270 271 272 273
    // find out the current length of the object
    double ts = ABT_get_wtime();
    uint64_t offset = compute_size(oid,ts);
    
    insert_region_log_entry(oid, offset, len, &rid, ts);
274
    LEAVING;
275 276 277 278
}

void write_op_exec_remove(void* u)
{
279
    ENTERING;
280
	auto vargs = static_cast<server_visitor_args_t>(u);
281 282
    write_op_exec_truncate(u,0);
    // TODO: technically should mark the object as removed
283
    LEAVING;
284 285 286 287
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
288
    ENTERING;
289
	auto vargs = static_cast<server_visitor_args_t>(u);
290 291 292 293 294 295 296 297
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }
298 299

    insert_punch_log_entry(oid, offset);
300
    LEAVING;
301 302 303 304
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
305
    ENTERING;
306
	auto vargs = static_cast<server_visitor_args_t>(u);
307 308 309 310 311 312 313 314
    oid_t oid = vargs->oid;
    if(oid == 0) {
        sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
        sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
        sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }
315 316

    insert_zero_log_entry(oid, offset, len);
317
    LEAVING;
318 319 320 321 322 323 324
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
325
    ENTERING;
Matthieu Dorier's avatar
Matthieu Dorier committed
326
    int ret;
327
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
328
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
329 330
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
331
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
332 333 334 335 336 337
    oid_t oid = vargs->oid;
    if(oid == 0) {
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }

338 339 340 341 342 343 344 345 346 347
    /* find out the max key length */
    size_t max_k_len = 0;
    for(auto i=0; i<num; i++) {
        size_t s = strlen(keys[i]);
        max_k_len = max_k_len < s ? s : max_k_len;
    }

    /* create an omap key of the right size */
    omap_key_t* k = (omap_key_t*)calloc(1, max_k_len + sizeof(omap_key_t));

Matthieu Dorier's avatar
Matthieu Dorier committed
348
    for(auto i=0; i<num; i++) {
349 350 351 352 353 354
        size_t k_len = strlen(keys[i])+sizeof(omap_key_t);
        memset(k, 0, max_k_len + sizeof(omap_key_t));
        k->oid = oid;
        strcpy(k->key, keys[i]);
        ret = sdskv_put(sdskv_ph, omap_db_id,
                (const void*)k, k_len,
Matthieu Dorier's avatar
Matthieu Dorier committed
355 356 357 358 359
                (const void*)vals[i], lens[i]);
        if(ret != SDSKV_SUCCESS) {
            fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
        }
    }
360 361
    free(k);
    LEAVING;
362 363 364 365
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
366 367
    int ret;

368
    ENTERING;
369
	auto vargs = static_cast<server_visitor_args_t>(u);
Matthieu Dorier's avatar
Matthieu Dorier committed
370
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
371 372
    sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
    sdskv_database_id_t oid_db_id  = vargs->srv_ctx->oid_db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
373
    sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
374 375 376 377 378 379
    oid_t oid = vargs->oid;
    if(oid == 0) {
        oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
        vargs->oid = oid;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
380 381 382 383 384 385
    for(auto i=0; i<num_keys; i++) {
        ret = sdskv_erase(sdskv_ph, omap_db_id, 
                (const void*)keys[i], strlen(keys[i])+1);
        if(ret != SDSKV_SUCCESS)
            fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
    }
386
    LEAVING;
387 388
}

389 390 391 392 393
/**
 * Returns the oid of the object called object_name, registering a new oid
 * when the name is not in the name database yet.
 *
 * A new oid starts from the hash of the name and is incremented until a
 * value is found that is neither 0 (used as the error value) nor already
 * present in the oid database. The value that was actually probed and found
 * free is the one recorded, so two names with colliding hashes cannot end
 * up sharing an oid.
 *
 * @param ph          SDSKV provider handle.
 * @param name_db_id  database mapping object names to oids.
 * @param oid_db_id   database mapping oids to object names.
 * @param object_name NUL-terminated name of the object.
 * @return the oid of the object, or 0 if an SDSKV call failed.
 */
oid_t get_or_create_oid(
        sdskv_provider_handle_t ph,
        sdskv_database_id_t name_db_id,
        sdskv_database_id_t oid_db_id,
        const char* object_name)
{
    ENTERING;

    oid_t oid = 0;
    hg_size_t vsize = sizeof(oid);
    // names are stored with their terminating NUL
    const hg_size_t name_size = strlen(object_name)+1;
    int ret;

    ret = sdskv_get(ph, name_db_id, (const void*)object_name,
            name_size, &oid, &vsize);
    if(ret == SDSKV_SUCCESS) {
        LEAVING;
        return oid;
    }
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        // the lookup itself failed: do not guess an oid
        fprintf(stderr, "[ERROR] after sdskv_get(name), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }

    // unknown name: look for a free oid, starting from the hash of the name
    std::hash<std::string> hash_fn;
    oid = hash_fn(std::string(object_name));
    for(;;) {
        if(oid == 0) oid = 1; // 0 is reserved to report errors
        hg_size_t s = 0;
        ret = sdskv_length(ph, oid_db_id, (const void*)&oid,
                    sizeof(oid), &s);
        if(ret != SDSKV_SUCCESS) break; // free slot, or an SDSKV error
        oid += 1;                       // taken: try the next value
    }
    // we make sure we stopped at an unknown key (not another SDSKV error)
    if(ret != SDSKV_ERR_UNKNOWN_KEY) {
        fprintf(stderr, "[ERROR] ret != SDSKV_ERR_UNKNOWN_KEY (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    // set name => oid
    ret = sdskv_put(ph, name_db_id, (const void*)object_name,
            name_size, &oid, sizeof(oid));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(name->oid), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    // set oid => name
    ret = sdskv_put(ph, oid_db_id, &oid, sizeof(oid),
            (const void*)object_name, name_size);
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "[ERROR] after sdskv_put(oid->name), ret != SDSKV_SUCCESS (ret = %d)\n", ret);
        LEAVING;
        return 0;
    }
    LEAVING;
    return oid;
}

442
/**
 * Adds to segment_map an entry stating that bytes [offset, offset+len) of
 * object oid are stored in the BAKE region *region.
 *
 * @param oid    object the segment belongs to.
 * @param offset first byte of the segment in the object.
 * @param len    length of the segment in bytes.
 * @param region id of the region holding the data (copied into the map).
 * @param ts     timestamp of the entry; a negative value selects the
 *               current ABT_get_wtime().
 */
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts)
{
    ENTERING;
    segment_key_t key;
    key.type        = seg_type_t::BAKE_REGION;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = offset+len;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    segment_map[key] = *region;
    LEAVING;
}

/**
 * Adds to segment_map a SMALL_REGION entry for bytes [offset, offset+len) of
 * object oid. The data itself is copied into the storage of the map entry.
 *
 * The entry is refused (nothing is inserted) when len is larger than that
 * storage, since copying would then write past the entry.
 *
 * @param oid    object the segment belongs to.
 * @param offset first byte of the segment in the object.
 * @param len    number of bytes of data.
 * @param data   bytes to store.
 * @param ts     timestamp of the entry; a negative value selects the
 *               current ABT_get_wtime().
 */
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts)
{
    ENTERING;
    segment_key_t seg;
    seg.oid       = oid;
    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    seg.start_index    = offset;
    seg.end_index      = offset+len;
    seg.type      = seg_type_t::SMALL_REGION;

    // sizeof does not evaluate its operand: no entry is created by this test
    if(len > sizeof(segment_map[seg])) {
        ERROR fprintf(stderr, "small region of %llu bytes does not fit in a log entry\n",
                      (unsigned long long)len);
        LEAVING;
        return;
    }

    void* b = static_cast<void*>(&segment_map[seg]);
    std::memcpy(b, data, len);
    LEAVING;
}

/**
 * Adds to segment_map a ZERO entry covering bytes [offset, offset+len) of
 * object oid. The entry carries a default-constructed region id.
 *
 * @param oid    object the segment belongs to.
 * @param offset first byte of the segment in the object.
 * @param len    length of the segment in bytes.
 * @param ts     timestamp of the entry; a negative value selects the
 *               current ABT_get_wtime().
 */
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    ENTERING;
    segment_key_t key;
    key.type        = seg_type_t::ZERO;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = offset+len;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    segment_map[key] = bake_region_id_t();
    LEAVING;
}

/**
 * Adds to segment_map a TOMBSTONE entry for object oid that starts at offset
 * and ends at the largest uint64_t value. The entry carries a
 * default-constructed region id.
 *
 * @param oid    object the entry belongs to.
 * @param offset first byte covered by the tombstone.
 * @param ts     timestamp of the entry; a negative value selects the
 *               current ABT_get_wtime().
 */
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts)
{
    ENTERING;
    segment_key_t key;
    key.type        = seg_type_t::TOMBSTONE;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    segment_map[key] = bake_region_id_t();
    LEAVING;
}

/**
 * Computes the size of object oid from its log entries.
 *
 * The entries of segment_map are scanned from the lower bound of (oid, ts)
 * while they belong to oid. Every entry whose type is below TOMBSTONE raises
 * the size to its end_index. A TOMBSTONE raises the size to its start_index
 * (the truncation point) if the entries seen so far did not already reach
 * further, and ends the scan.
 * NOTE(review): this relies on the map visiting the entries of one object
 * from the most recent to the oldest - confirm against the ordering of
 * segment_key_t.
 *
 * @param oid object to measure.
 * @param ts  timestamp from which the scan starts.
 * @return the computed size in bytes (0 if no entry is found).
 */
uint64_t compute_size(oid_t oid, double ts)
{
    ENTERING;
    // value-initialized so that the fields not set below are not read
    // uninitialized by the map's comparison
    segment_key_t lb{};
    lb.oid = oid;
    lb.timestamp = ts;
    uint64_t size = 0;
    for(auto it = segment_map.lower_bound(lb); it != segment_map.end(); ++it) {
        const auto& seg = it->first;
        if(seg.oid != oid) break;
        if(seg.type < seg_type_t::TOMBSTONE) {
            if(size < seg.end_index) size = seg.end_index;
        } else if(seg.type == seg_type_t::TOMBSTONE) {
            // end_index of a tombstone is the maximum value, so it must not
            // be used here: a truncation never shrinks what the entries
            // visited before it established
            if(size < seg.start_index) size = seg.start_index;
            break;
        }
    }
    LEAVING;
    return size;
}