// core-write-op.cpp
#include <algorithm>
#include <cmath>
#include <cstring>
#include <iostream>
#include <limits>
#include <map>
#include <string>
#include <vector>
#include <bake-bulk-client.h>
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"
#include "src/server/core/fake-kv.hpp"

// Forward declarations of the visitor callbacks implemented further down.
// The leading void* is the opaque user argument given to
// execute_write_op_visitor(); every definition casts it back to
// server_visitor_args_t. The remaining parameters are the arguments of the
// individual write operation being visited.
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

/* Internal helpers, defined after the visitor callbacks. */
// get_or_create_oid: maps an object name to its oid, registering the name if
// it is not known yet.
// insert_*_log_entry: add one entry to segment_map for the given object and
// byte range. A negative `ts` (the default) makes the helper stamp the entry
// with ABT_get_wtime(); a non-negative `ts` is used as given.
// compute_size: derives an object's size from the entries in segment_map,
// starting the scan at timestamp `ts`.
static oid_t get_or_create_oid(const char* name);
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_bulk_region_id_t* region, double ts = -1.0);
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts = -1.0);
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts=-1.0);
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts=-1.0);
static uint64_t compute_size(oid_t oid, double ts);

/* Dispatch table handed to execute_write_op_visitor(). */
// Binds every visit_* slot of the write-op visitor to the matching
// write_op_exec_* callback of this file. core_write_op() passes the address of
// this table to execute_write_op_visitor().
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/**
 * Runs a write operation through the write_op_exec visitor table.
 *
 * @param write_op  operation handle forwarded to execute_write_op_visitor()
 * @param vargs     per-request arguments; forwarded untyped and handed back to
 *                  every callback as its first (void*) parameter
 */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    // The visitor interface carries the user argument as an untyped pointer.
    void* const user_arg = (void*)vargs;
    execute_write_op_visitor(&write_op_exec, write_op, user_arg);
}

void write_op_exec_begin(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
}

void write_op_exec_end(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
}

void write_op_exec_create(void* u, int exclusive)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
65
    get_or_create_oid(vargs->object_name);
66 67 68 69 70
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
71 72
    oid_t oid = get_or_create_oid(vargs->object_name);

73 74 75
    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
76 77
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
78 79
    int ret;

80 81 82 83 84
    if(len > SMALL_REGION_THRESHOLD) {
        // TODO: check return values of those calls
        ret = bake_bulk_create(bti, len, &rid);
        ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
        ret = bake_bulk_persist(bti, rid);
85
   
86 87 88 89 90 91 92 93 94 95 96 97 98
        insert_region_log_entry(oid, offset, len, &rid);
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
        ret = margo_bulk_free(handle);

        insert_small_region_log_entry(oid, offset, len, data);
    }
99 100 101 102
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
103 104 105 106
    // TODO: this function will not be valid if the new object is
    // smaller than its previous version. Instead we should remove the object
    // and re-create it.

107
	auto vargs = static_cast<server_visitor_args_t>(u);
108 109
    oid_t oid = get_or_create_oid(vargs->object_name);

110 111 112
    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
113 114
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
115 116 117 118
    int ret;

    // TODO: check return values of those calls
    ret = bake_bulk_create(bti, len, &rid);
119
    ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
120
    ret = bake_bulk_persist(bti, rid);
121
    insert_region_log_entry(oid, 0, len, &rid);
122 123 124 125
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
126 127 128
    auto vargs = static_cast<server_visitor_args_t>(u);
    oid_t oid = get_or_create_oid(vargs->object_name);

129 130 131
    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
132 133
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
134 135 136 137
    int ret;

    // TODO: check return values of those calls
    ret = bake_bulk_create(bti, data_len, &rid);
138
    ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
139 140
    ret = bake_bulk_persist(bti, rid);

141 142 143 144 145 146 147 148
    size_t i;

    //double ts = ABT_get_wtime();
    for(i=0; i < write_len; i += data_len) {
        segment_key_t seg;
        // TODO normally we should have the same timestamps but right now it bugs...
        insert_region_log_entry(oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
    }
149 150 151 152 153
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
154 155
    oid_t oid = get_or_create_oid(vargs->object_name);

156 157 158
    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
159 160
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
161 162 163
    int ret;

    ret = bake_bulk_create(bti, len, &rid);
164
    ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
165 166
    ret = bake_bulk_persist(bti, rid);

167 168 169 170 171
    // find out the current length of the object
    double ts = ABT_get_wtime();
    uint64_t offset = compute_size(oid,ts);
    
    insert_region_log_entry(oid, offset, len, &rid, ts);
172 173 174 175 176
}

void write_op_exec_remove(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
177 178
    write_op_exec_truncate(u,0);
    // TODO: technically should mark the object as removed
179 180 181 182 183
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
184 185 186
    oid_t oid = get_or_create_oid(vargs->object_name);

    insert_punch_log_entry(oid, offset);
187 188 189 190 191
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
192 193 194
    oid_t oid = get_or_create_oid(vargs->object_name);

    insert_zero_log_entry(oid, offset, len);
195 196 197 198 199 200 201 202
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
203 204 205 206 207 208 209 210 211
    oid_t oid = get_or_create_oid(vargs->object_name);

    for(auto i=0; i<num; i++) {
        std::vector<char> val(vals[i], vals[i]+lens[i]);
        omap_key_t omk;
        omk.oid = oid;
        omk.key = std::string(keys[i]);
        omap_map[std::move(omk)] = std::move(val);
    }
212 213 214 215 216
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
217 218 219 220 221 222 223 224
    oid_t oid = get_or_create_oid(vargs->object_name);
    
    for(auto i=0; i < num_keys; i++) {
        omap_key_t omk;
        omk.oid = oid;
        omk.key = std::string(keys[i]);
        omap_map.erase(omk);
    }
225 226
}

/* ---- Helper implementations ---- */
/**
 * Returns the oid registered for `object_name`, registering the name first if
 * it is unknown.
 *
 * A new oid starts from the std::hash of the name and is incremented until it
 * is non-zero and not yet present in oid_map. The name and oid are then
 * recorded in both name_map and oid_map.
 *
 * @param object_name  NUL-terminated object name
 * @return the oid associated with the name
 */
oid_t get_or_create_oid(const char* object_name)
{
    const std::string name(object_name);

    // Known object: hand back the oid registered earlier.
    const auto known = name_map.find(name);
    if(known != name_map.end()) {
        return known->second;
    }

    // New object: probe upwards from the hash for a free, non-zero oid.
    oid_t candidate = std::hash<std::string>()(name);
    while(candidate == 0 || oid_map.count(candidate) != 0) {
        candidate += 1;
    }

    name_map[name]      = candidate;
    oid_map[candidate]  = name;
    return candidate;
}

/**
 * Adds a BAKE_REGION entry to segment_map: bytes [offset, offset+len) of
 * object `oid` are backed by the bake region `*region`.
 *
 * @param oid     object the entry belongs to
 * @param offset  first byte of the range
 * @param len     length of the range in bytes
 * @param region  bake region id, copied into the map
 * @param ts      entry timestamp; a negative value means "use ABT_get_wtime()"
 */
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_bulk_region_id_t* region, double ts)
{
    double stamp = ts;
    if(ts < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset + len;
    key.type        = seg_type_t::BAKE_REGION;

    segment_map[key] = *region;
}

/**
 * Adds a SMALL_REGION entry to segment_map: the `len` bytes at `data` are
 * stored inline, in the storage of the mapped value itself, for bytes
 * [offset, offset+len) of object `oid`.
 *
 * A payload that does not fit into the mapped value is rejected: it is
 * reported on std::cerr and nothing is inserted. Copying it would write past
 * the end of the map element.
 *
 * @param oid     object the entry belongs to
 * @param offset  first byte of the range
 * @param len     number of payload bytes
 * @param data    payload; `len` bytes are read from it
 * @param ts      entry timestamp; a negative value means "use ABT_get_wtime()"
 */
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts)
{
    segment_key_t seg;
    seg.oid       = oid;
    seg.start_index    = offset;
    seg.end_index      = offset+len;
    seg.type      = seg_type_t::SMALL_REGION;

    // sizeof does not evaluate its operand, so this only asks for the size of
    // the mapped type and inserts nothing.
    if(len > sizeof(segment_map[seg])) {
        std::cerr << "insert_small_region_log_entry: payload of " << len
                  << " bytes does not fit in an inline segment\n";
        return;
    }

    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    void* b = static_cast<void*>(&segment_map[seg]);
    std::memcpy(b, data, len);
}

/**
 * Adds a ZERO entry to segment_map covering bytes [offset, offset+len) of
 * object `oid`. The mapped value is a default-constructed region id.
 *
 * @param oid     object the entry belongs to
 * @param offset  first byte of the range
 * @param len     length of the range in bytes
 * @param ts      entry timestamp; a negative value means "use ABT_get_wtime()"
 */
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    double stamp = ts;
    if(ts < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset + len;
    key.type        = seg_type_t::ZERO;

    segment_map[key] = bake_bulk_region_id_t();
}

/**
 * Adds a TOMBSTONE entry to segment_map for object `oid`. The entry starts at
 * `offset` and ends at the largest uint64_t value. The mapped value is a
 * default-constructed region id.
 *
 * @param oid     object the entry belongs to
 * @param offset  first byte covered by the tombstone
 * @param ts      entry timestamp; a negative value means "use ABT_get_wtime()"
 */
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts)
{
    double stamp = ts;
    if(ts < 0) {
        stamp = ABT_get_wtime();
    }

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();
    key.type        = seg_type_t::TOMBSTONE;

    segment_map[key] = bake_bulk_region_id_t();
}

/**
 * Computes the size of object `oid` from the entries of segment_map.
 *
 * The scan starts at segment_map.lower_bound() of a key built from `oid` and
 * `ts`, and continues while entries belong to `oid`:
 *  - an entry whose type compares below TOMBSTONE raises the size to its
 *    end_index;
 *  - the first TOMBSTONE raises the size to its start_index (the length the
 *    object was cut to) and ends the scan.
 *
 * A tombstone never lowers a size already established by the entries scanned
 * before it. Its end_index is the largest uint64_t value, so only start_index
 * is meaningful for the comparison.
 *
 * NOTE(review): this assumes segment_map orders the entries of one object from
 * newest to oldest, so that entries met before a tombstone were written after
 * it - confirm against the key comparator.
 *
 * @param oid  object whose size is wanted
 * @param ts   timestamp at which the scan starts
 * @return the computed size in bytes, 0 when no entry is found
 */
uint64_t compute_size(oid_t oid, double ts)
{
    // Value-initialized so the fields not set below have a defined value if
    // the comparator looks at them.
    segment_key_t lb{};
    lb.oid = oid;
    lb.timestamp = ts;

    uint64_t size = 0;
    for(auto it = segment_map.lower_bound(lb); it != segment_map.end(); ++it) {
        const auto& seg = it->first;
        if(seg.oid != oid) break;

        if(seg.type < seg_type_t::TOMBSTONE) {
            if(size < seg.end_index) size = seg.end_index;
        } else if(seg.type == seg_type_t::TOMBSTONE) {
            if(size < seg.start_index) size = seg.start_index;
            break;
        }
    }
    return size;
}