core-write-op.cpp 11.2 KB
Newer Older
1
#include <cstdio>
#include <cstring>
#include <iostream>
#include <limits>
#include <map>
#include <string>

#include <bake-client.h>

#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"
#include "src/server/core/fake-kv.hpp"
10
11
12
13
14
15
16
17
18
19
20
21
22
23

/* Forward declarations of the visitor callbacks referenced by the
 * write_op_exec table. The first argument of every callback is the opaque
 * context pointer given to execute_write_op_visitor(); the callbacks cast it
 * back to server_visitor_args_t. */
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);

24
/* Returns the oid registered for `name`, registering a new one when the name is unknown. */
static oid_t get_or_create_oid(const char* name);
25
/* Records a BAKE_REGION segment covering [offset, offset+len) of `oid`.
 * A negative `ts` (the default) means "stamp with the current time". */
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts = -1.0);
26
27
28
29
30
/* Records a SMALL_REGION segment whose `len` payload bytes are copied from `data`
 * into the log itself. A negative `ts` means "stamp with the current time". */
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts = -1.0);
/* Records a ZERO segment covering [offset, offset+len) of `oid`. */
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts=-1.0);
/* Records a TOMBSTONE segment starting at `offset` and extending to the maximum uint64_t index. */
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts=-1.0);
/* Computes the size of `oid` from the segment log, starting the scan at timestamp `ts`. */
static uint64_t compute_size(oid_t oid, double ts);

31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/* Callback table passed to execute_write_op_visitor() by core_write_op():
 * maps each kind of write operation to the function in this file that
 * executes it.
 * NOTE(review): designated initializers in C++ must follow the member
 * declaration order of struct write_op_visitor — keep this order in sync
 * with write-op-visitor.h. */
static struct write_op_visitor write_op_exec = {
	.visit_begin        = write_op_exec_begin,
	.visit_create       = write_op_exec_create,
	.visit_write        = write_op_exec_write,
	.visit_write_full   = write_op_exec_write_full,
	.visit_writesame    = write_op_exec_writesame,
	.visit_append       = write_op_exec_append,
	.visit_remove       = write_op_exec_remove,
	.visit_truncate     = write_op_exec_truncate,
	.visit_zero         = write_op_exec_zero,
	.visit_omap_set     = write_op_exec_omap_set,
	.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
	.visit_end          = write_op_exec_end
};

/**
 * C-linkage entry point: walks the operation chain stored in `write_op`,
 * dispatching each operation to the matching callback of write_op_exec.
 * `vargs` is forwarded untouched as the opaque context of every callback.
 */
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
    void* visitor_ctx = static_cast<void*>(vargs);
    execute_write_op_visitor(&write_op_exec, write_op, visitor_ctx);
}

/** visit_begin hook of write_op_exec. No work is performed. */
void write_op_exec_begin(void* u)
{
    // The context pointer is deliberately unused.
    (void)u;
}

/** visit_end hook of write_op_exec. No work is performed. */
void write_op_exec_end(void* u)
{
    // The context pointer is deliberately unused.
    (void)u;
}

void write_op_exec_create(void* u, int exclusive)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
65
    get_or_create_oid(vargs->object_name);
66
67
68
69
70
}

void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
71
72
    oid_t oid = get_or_create_oid(vargs->object_name);

73
74
    bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
75
    bake_region_id_t rid;
76
    hg_bulk_t remote_bulk = vargs->bulk_handle;
77
78
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
79
80
    int ret;

81
82
    if(len > SMALL_REGION_THRESHOLD) {
        // TODO: check return values of those calls
83
84
85
        ret = bake_create(bake_ph, bti, len, &rid);
        ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
        ret = bake_persist(bake_ph, rid);
86
   
87
88
89
90
91
92
93
94
95
96
97
98
99
        insert_region_log_entry(oid, offset, len, &rid);
    } else {
        margo_instance_id mid = vargs->srv_ctx->mid;
        char data[SMALL_REGION_THRESHOLD];
        void* buf_ptrs[1] = {(void*)(&data[0])};
        hg_size_t buf_sizes[1] = {len};
        hg_bulk_t handle;
        ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
        ret = margo_bulk_free(handle);

        insert_small_region_log_entry(oid, offset, len, data);
    }
100
101
102
103
}

void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
104
105
106
107
    // TODO: this function will not be valid if the new object is
    // smaller than its previous version. Instead we should remove the object
    // and re-create it.

108
	auto vargs = static_cast<server_visitor_args_t>(u);
109
110
    oid_t oid = get_or_create_oid(vargs->object_name);

111
112
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
113
    bake_region_id_t rid;
114
    hg_bulk_t remote_bulk = vargs->bulk_handle;
115
116
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
117
118
    int ret;

119
120
121
122
    unsigned i;
    fprintf(stderr, "In Mobject, input bti is ");
    for(i=0; i<16; i++) fprintf(stderr, "%d ", bti.id[i]);
    fprintf(stderr, "\n");
123
    // TODO: check return values of those calls
124
125
126
    ret = bake_create(bph, bti, len, &rid);
    ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
    ret = bake_persist(bph, rid);
127
    insert_region_log_entry(oid, 0, len, &rid);
128
129
130
131
}

void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
132
133
134
    auto vargs = static_cast<server_visitor_args_t>(u);
    oid_t oid = get_or_create_oid(vargs->object_name);

135
136
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
137
    bake_region_id_t rid;
138
    hg_bulk_t remote_bulk = vargs->bulk_handle;
139
140
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
141
142
    int ret;

143
144
145
146
147
    unsigned ii;
    fprintf(stderr, "In Mobject, input bti is ");
    for(ii=0; ii<16; ii++) fprintf(stderr, "%d ", bti.id[ii]);
    fprintf(stderr, "\n");

148
    // TODO: check return values of those calls
149
150
151
    ret = bake_create(bph, bti, data_len, &rid);
    ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
    ret = bake_persist(bph, rid);
152

153
154
155
156
157
158
159
160
    size_t i;

    //double ts = ABT_get_wtime();
    for(i=0; i < write_len; i += data_len) {
        segment_key_t seg;
        // TODO normally we should have the same timestamps but right now it bugs...
        insert_region_log_entry(oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
    }
161
162
163
164
165
}

void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
166
167
    oid_t oid = get_or_create_oid(vargs->object_name);

168
169
    bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
    bake_target_id_t bti = vargs->srv_ctx->bake_tid;
170
    bake_region_id_t rid;
171
    hg_bulk_t remote_bulk = vargs->bulk_handle;
172
173
    const char* remote_addr_str = vargs->client_addr_str;
    hg_addr_t   remote_addr     = vargs->client_addr;
174
175
    int ret;

176
177
178
179
180
181
182
183
    unsigned i;
    fprintf(stderr, "In Mobject, input bti is ");
    for(i=0; i<16; i++) fprintf(stderr, "%d ", bti.id[i]);
    fprintf(stderr, "\n");

    ret = bake_create(bph, bti, len, &rid);
    ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
    ret = bake_persist(bph, rid);
184

185
186
187
188
189
    // find out the current length of the object
    double ts = ABT_get_wtime();
    uint64_t offset = compute_size(oid,ts);
    
    insert_region_log_entry(oid, offset, len, &rid, ts);
190
191
192
193
194
}

void write_op_exec_remove(void* u)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
195
196
    write_op_exec_truncate(u,0);
    // TODO: technically should mark the object as removed
197
198
199
200
201
}

void write_op_exec_truncate(void* u, uint64_t offset)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
202
203
204
    oid_t oid = get_or_create_oid(vargs->object_name);

    insert_punch_log_entry(oid, offset);
205
206
207
208
209
}

void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
210
211
212
    oid_t oid = get_or_create_oid(vargs->object_name);

    insert_zero_log_entry(oid, offset, len);
213
214
215
216
217
218
219
220
}

void write_op_exec_omap_set(void* u, char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
221
222
223
224
225
226
227
228
229
    oid_t oid = get_or_create_oid(vargs->object_name);

    for(auto i=0; i<num; i++) {
        std::vector<char> val(vals[i], vals[i]+lens[i]);
        omap_key_t omk;
        omk.oid = oid;
        omk.key = std::string(keys[i]);
        omap_map[std::move(omk)] = std::move(val);
    }
230
231
232
233
234
}

void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
	auto vargs = static_cast<server_visitor_args_t>(u);
235
236
237
238
239
240
241
242
    oid_t oid = get_or_create_oid(vargs->object_name);
    
    for(auto i=0; i < num_keys; i++) {
        omap_key_t omk;
        omk.oid = oid;
        omk.key = std::string(keys[i]);
        omap_map.erase(omk);
    }
243
244
}

245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
/**
 * Looks up the oid registered for `object_name` in name_map. When the name
 * is unknown, a new oid is derived from the hash of the name, bumped by one
 * until it is non-zero and not present in oid_map, then recorded in both
 * name_map and oid_map.
 */
oid_t get_or_create_oid(const char* object_name) 
{
    const std::string name(object_name);

    // Known object: hand back the oid it was given earlier.
    const auto known = name_map.find(name);
    if(known != name_map.end()) {
        return known->second;
    }

    // Unknown object: start from the hash and probe for a free, non-zero oid.
    oid_t candidate = std::hash<std::string>()(name);
    while(candidate == 0 || oid_map.count(candidate) != 0) {
        candidate += 1;
    }

    name_map[name]      = candidate;
    oid_map[candidate]  = name;
    return candidate;
}

264
/**
 * Adds a BAKE_REGION entry to segment_map covering [offset, offset+len) of
 * `oid` and mapping to a copy of `*region`. A negative `ts` is replaced by
 * the current ABT_get_wtime() value.
 */
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts)
{
    const double stamp = (ts < 0) ? ABT_get_wtime() : ts;

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset + len;
    key.type        = seg_type_t::BAKE_REGION;

    segment_map[key] = *region;
}

/**
 * Adds a SMALL_REGION entry to segment_map covering [offset, offset+len) of
 * `oid`. The `len` payload bytes from `data` are copied directly into the
 * storage of the mapped value instead of a region id.
 *
 * Payloads that do not fit in the mapped value are rejected with a message
 * on stderr and no entry is inserted, since copying them would write past
 * the end of the slot.
 *
 * A negative `ts` is replaced by the current ABT_get_wtime() value.
 */
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts)
{
    segment_key_t seg;
    seg.oid       = oid;
    seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
    seg.start_index    = offset;
    seg.end_index      = offset+len;
    seg.type      = seg_type_t::SMALL_REGION;

    // sizeof does not evaluate its operand, so nothing is inserted by this check.
    if(len > sizeof(segment_map[seg])) {
        fprintf(stderr, "insert_small_region_log_entry: payload of %llu bytes does not fit in a log slot\n",
                static_cast<unsigned long long>(len));
        return;
    }

    void* b = static_cast<void*>(&segment_map[seg]);
    std::memcpy(b, data, len);
}

/**
 * Adds a ZERO entry to segment_map covering [offset, offset+len) of `oid`,
 * mapped to a value-initialized bake_region_id_t. A negative `ts` is
 * replaced by the current ABT_get_wtime() value.
 */
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    const double stamp = (ts < 0) ? ABT_get_wtime() : ts;

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = offset + len;
    key.type        = seg_type_t::ZERO;

    segment_map[key] = bake_region_id_t();
}

/**
 * Adds a TOMBSTONE entry to segment_map for `oid`, starting at `offset` and
 * ending at the largest uint64_t value, mapped to a value-initialized
 * bake_region_id_t. A negative `ts` is replaced by the current
 * ABT_get_wtime() value.
 */
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts)
{
    const double stamp = (ts < 0) ? ABT_get_wtime() : ts;

    segment_key_t key;
    key.oid         = oid;
    key.timestamp   = stamp;
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();
    key.type        = seg_type_t::TOMBSTONE;

    segment_map[key] = bake_region_id_t();
}

/**
 * Computes the size of object `oid` from the segment log.
 *
 * Entries are scanned from segment_map.lower_bound({oid, ts}) while they
 * still belong to `oid`. Data segments (types ordered before TOMBSTONE)
 * raise the size to their end index. The first TOMBSTONE ends the scan:
 * everything beyond it was cut at its start index, so it contributes only
 * that start index.
 *
 * NOTE(review): this assumes segment_map iterates an object's entries from
 * the newest one at or before `ts` towards older ones — confirm against the
 * comparator of segment_key_t.
 *
 * @param oid  object to measure
 * @param ts   timestamp at which the size is evaluated
 * @return     the largest index covered by the scanned entries, or 0 if none
 */
uint64_t compute_size(oid_t oid, double ts)
{
    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ts;

    uint64_t size = 0;
    for(auto it = segment_map.lower_bound(lb); it != segment_map.end(); ++it) {
        const auto& seg = it->first;
        if(seg.oid != oid) break;

        if(seg.type < seg_type_t::TOMBSTONE) {
            if(size < seg.end_index) size = seg.end_index;
        } else if(seg.type == seg_type_t::TOMBSTONE) {
            // A tombstone's end_index is always the maximum value, so the
            // comparison must use start_index. Otherwise the extent of writes
            // already scanned would be discarded.
            if(size < seg.start_index) size = seg.start_index;
            break;
        }
    }
    return size;
}