Commit 7773042e authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

done adding small-region-optimization everywhere

parent 9fb2d579
......@@ -109,108 +109,6 @@ void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
LEAVING;
}
#if 0
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
uint64_t client_start_index = offset;
uint64_t client_end_index = offset+len;
*prval = 0;
// find oid
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
segment_key_t lb;
lb.oid = oid;
lb.timestamp = std::numeric_limits<double>::max();
auto it = segment_map.lower_bound(lb);
covermap<uint64_t> coverage(offset, offset+len);
while(!coverage.full() && it != segment_map.end() && it->first.oid == oid) {
const segment_key_t& seg = it->first;
const bake_region_id_t& region = it->second;
switch(seg.type) {
case seg_type_t::ZERO:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.end_index) *bytes_read = seg.end_index;
break;
case seg_type_t::TOMBSTONE:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.start_index) *bytes_read = seg.start_index;
break;
case seg_type_t::BAKE_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
ret = bake_proxy_read(bph, region, region_offset, remote_bulk,
remote_offset, remote_addr_str, segment_size);
if(ret != 0) {
ERROR fprintf(stderr,"bake_proxy_read returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end;
}
break;
}
case seg_type_t::SMALL_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
const char* base = static_cast<const char*>((void*)(&region));
margo_instance_id mid = vargs->srv_ctx->mid;
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
void* buf_ptrs[1] = { const_cast<char*>(base + region_offset) };
hg_size_t buf_sizes[1] = { segment_size };
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_create returned %d\n", ret);
LEAVING;
return;
}
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, remote_addr, remote_bulk, buf.as_offset+remote_offset, handle, 0, segment_size);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_transfer returned %d\n", ret);
LEAVING;
return;
}
ret = margo_bulk_free(handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_free returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end;
}
}
}
it++;
}
LEAVING;
}
#endif
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
ENTERING;
......@@ -507,7 +405,6 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
memset(lb, 0, lb_size);
lb->oid = oid;
strcpy(lb->key, k);
//lb_size = strlen(k) + sizeof(omap_key_t);
} while(items_retrieved == max_items && count < max_return);
......
......@@ -136,7 +136,6 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
int ret;
if(len > SMALL_REGION_THRESHOLD) {
// TODO: check return values of those calls
ret = bake_create(bake_ph, bti, len, &rid);
if(ret != 0) {
ERROR fprintf(stderr,"bake_create returned %d\n",ret);
......@@ -179,48 +178,9 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
ENTERING;
// TODO: this function will not be valid if the new object is
// smaller than its previous version. Instead we should remove the object
// and re-create it.
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
unsigned i;
// TODO: check return values of those calls
ret = bake_create(bph, bti, len, &rid);
if(ret != 0) {
ERROR fprintf(stderr,"bake_create() returned %d\n", ret);
LEAVING;
return;
}
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
if(ret != 0) {
ERROR fprintf(stderr,"bake_proxy_write() returned %d\n", ret);
LEAVING;
return;
}
ret = bake_persist(bph, rid);
if(ret != 0) {
ERROR fprintf(stderr, "bake_persist() returned %d\n", ret);
LEAVING;
return;
}
insert_region_log_entry(sdskv_ph, seg_db_id, oid, 0, len, &rid);
// truncate to 0 then write
write_op_exec_truncate(u,0);
write_op_exec_write(u, buf, len, 0);
LEAVING;
}
......@@ -237,26 +197,70 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
return;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
// TODO: check return values of those calls
ret = bake_create(bph, bti, data_len, &rid);
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
ret = bake_persist(bph, rid);
if(data_len > SMALL_REGION_THRESHOLD) {
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
size_t i;
ret = bake_create(bph, bti, data_len, &rid);
if(ret != 0) {
ERROR fprintf(stderr, "bake_create returned %d\n", ret);
LEAVING;
return;
}
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
if(ret != 0) {
ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
LEAVING;
return;
}
ret = bake_persist(bph, rid);
if(ret != 0) {
ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
LEAVING;
return;
}
size_t i;
//double ts = ABT_get_wtime();
for(i=0; i < write_len; i += data_len) {
// TODO normally we should have the same timestamps but right now it bugs...
insert_region_log_entry(sdskv_ph, seg_db_id,
oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
}
} else {
margo_instance_id mid = vargs->srv_ctx->mid;
char data[SMALL_REGION_THRESHOLD];
void* buf_ptrs[1] = {(void*)(&data[0])};
hg_size_t buf_sizes[1] = {data_len};
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
}
ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, data_len);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
}
ret = margo_bulk_free(handle);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
//double ts = ABT_get_wtime();
for(i=0; i < write_len; i += data_len) {
// TODO normally we should have the same timestamps but right now it bugs...
insert_region_log_entry(sdskv_ph, seg_db_id,
oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
size_t i;
for(i=0; i < write_len; i+= data_len) {
insert_small_region_log_entry(sdskv_ph, seg_db_id,
oid, offset+i, std::min(data_len, write_len-i), data);
}
}
LEAVING;
}
......@@ -274,23 +278,49 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
return;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
ret = bake_create(bph, bti, len, &rid);
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
ret = bake_persist(bph, rid);
// find out the current length of the object
// find out the current length of the object
double ts = ABT_get_wtime();
uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid,ts);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);
uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);
if(len > SMALL_REGION_THRESHOLD) {
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
ret = bake_create(bph, bti, len, &rid);
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
ret = bake_persist(bph, rid);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);
} else {
margo_instance_id mid = vargs->srv_ctx->mid;
char data[SMALL_REGION_THRESHOLD];
void* buf_ptrs[1] = {(void*)(&data[0])};
hg_size_t buf_sizes[1] = {len};
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
}
ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
}
ret = margo_bulk_free(handle);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
}
LEAVING;
}
......@@ -473,9 +503,6 @@ static void insert_region_log_entry(
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::BAKE_REGION;
#if 0
segment_map[seg] = *region;
#endif
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)region, sizeof(*region));
......@@ -498,10 +525,6 @@ static void insert_small_region_log_entry(
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::SMALL_REGION;
#if 0
void* b = static_cast<void*>(&segment_map[seg]);
std::memcpy(b, data, len);
#endif
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)data, len);
......@@ -526,9 +549,6 @@ static void insert_zero_log_entry(
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)nullptr, 0);
#if 0
segment_map[seg] = bake_region_id_t();
#endif
if(ret != SDSKV_SUCCESS) {
ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
}
......@@ -547,9 +567,6 @@ static void insert_punch_log_entry(
seg.start_index = offset;
seg.end_index = std::numeric_limits<uint64_t>::max();
seg.type = seg_type_t::TOMBSTONE;
#if 0
segment_map[seg] = bake_region_id_t();
#endif
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)nullptr, 0);
......@@ -565,28 +582,13 @@ uint64_t mobject_compute_object_size(
oid_t oid, double ts)
{
ENTERING;
#if 0
segment_key_t lb;
lb.oid = oid;
lb.timestamp = ts;
uint64_t size = 0;
auto it = segment_map.lower_bound(lb);
for(; it != segment_map.end(); it++) {
if(it->first.oid != oid) break;
auto& seg = it->first;
if(seg.type < seg_type_t::TOMBSTONE) {
if(size < seg.end_index) size = seg.end_index;
} else if(seg.type == seg_type_t::TOMBSTONE) {
if(size < seg.end_index) size = seg.start_index;
break;
}
}
#else
segment_key_t lb;
lb.oid = oid;
lb.timestamp = ts;
uint64_t size = 0;
uint64_t size = 0; // current assumed size
uint64_t max_size = std::numeric_limits<uint64_t>::max();
size_t max_segments = 10;
segment_key_t segment_keys[max_segments];
void* segment_keys_addrs[max_segments];
......@@ -621,9 +623,14 @@ uint64_t mobject_compute_object_size(
}
auto& seg = segment_keys[i];
if(seg.type < seg_type_t::TOMBSTONE) {
if(size < seg.end_index) size = seg.end_index;
} else if(seg.type == seg_type_t::TOMBSTONE) {
if(size < seg.end_index) {
size = std::min(seg.end_index, max_size);
}
} else if(seg.type == seg_type_t::TOMBSTONE) {
if(max_size > seg.start_index) {
max_size = seg.start_index;
}
if(size < seg.start_index) {
size = seg.start_index;
}
done = true;
......@@ -635,7 +642,6 @@ uint64_t mobject_compute_object_size(
done = true;
}
}
#endif
LEAVING;
return size;
}
......@@ -13,6 +13,21 @@ typedef enum seg_type_t {
TOMBSTONE = 3
} seg_type_t;
/* a ZERO segment has no data attached in the kv database,
it indicates that the segment is filled with 0s */
/* a BAKE_REGION segment has a bake_region_id_t as value
in a kv database. It indicates that the data for the
[start_index, end_index[ segment of the object can be
found in this bulk region. */
/* a SMALL_REGION segment is the same as a BAKE_REGION
but the content of the value in the database is the
data itself, not a bake_region_id_t. Note that the
threshold to use small region optimizations
(SMALL_REGION_THRESHOLD) should not exceed the value
of sizeof(bake_region_id_t). */
/* a TOMSTONE segment is used to invalidate a portion
of an object. This portion if [start_index, +infinity[. */
typedef struct segment_key_t {
oid_t oid;
uint32_t type; /* seg_type */
......@@ -30,4 +45,5 @@ typedef struct omap_key_t {
#define MAX_OMAP_VAL_SIZE 256
#define SMALL_REGION_THRESHOLD (sizeof(bake_region_id_t))
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment