Commit e2e4d311 authored by Shane Snyder
Browse files

add a sequence id to segment keys

parent 56a13b9a
......@@ -41,13 +41,14 @@ static void read_op_exec_end(void*);
extern uint64_t mobject_compute_object_size(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, double ts);
oid_t oid, time_t ts);
static oid_t get_oid_from_name(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
const char* name);
#if 0
struct read_request_t {
double timestamp; // timestamp at which the segment was created
uint64_t absolute_start_index; // start index within the object
......@@ -57,6 +58,7 @@ struct read_request_t {
uint64_t client_offset; // offset within the client's buffer
bake_region_id_t region; // region id
};
#endif
static struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin,
......@@ -103,7 +105,7 @@ void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
return;
}
double ts = ABT_get_wtime();
time_t ts = time(NULL);
*psize = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);
LEAVING;
......@@ -139,7 +141,8 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
segment_key_t lb;
lb.oid = oid;
lb.timestamp = ABT_get_wtime();
lb.timestamp = time(NULL);
lb.seq_id = MOBJECT_SEQ_ID_MAX;
covermap<uint64_t> coverage(offset, offset+len);
......@@ -264,6 +267,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
} // end switch
// update the start key timestamp to that of the last processed segment
lb.timestamp = seg.timestamp;
lb.seq_id = seg.seq_id;
} // end for
seg_start_ndx = 1;
......
......@@ -43,32 +43,28 @@ static oid_t get_or_create_oid(
const char* object_name);
static void insert_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
struct mobject_server_context *srv_ctx,
oid_t oid, uint64_t offset, uint64_t len,
bake_region_id_t* region, double ts = -1.0);
bake_region_id_t* region, time_t ts = 0);
static void insert_small_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
struct mobject_server_context *srv_ctx,
oid_t oid, uint64_t offset, uint64_t len,
const char* data, double ts = -1.0);
const char* data, time_t ts = 0);
static void insert_zero_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
struct mobject_server_context *srv_ctx,
oid_t oid, uint64_t offset,
uint64_t len, double ts=-1.0);
uint64_t len, time_t ts=0);
static void insert_punch_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, double ts=-1.0);
struct mobject_server_context *srv_ctx,
oid_t oid, uint64_t offset, time_t ts=0);
uint64_t mobject_compute_object_size(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, double ts);
oid_t oid, time_t ts);
static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
......@@ -124,8 +120,6 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
......@@ -155,7 +149,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ERROR bake_perror("bake_persist", ret);
}
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
insert_region_log_entry(vargs->srv_ctx, oid, offset, len, &rid);
} else {
margo_instance_id mid = vargs->srv_ctx->mid;
char data[SMALL_REGION_THRESHOLD];
......@@ -175,7 +169,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
insert_small_region_log_entry(vargs->srv_ctx, oid, offset, len, data);
}
LEAVING;
}
......@@ -194,8 +188,6 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
......@@ -234,11 +226,11 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
size_t i;
//double ts = ABT_get_wtime();
//time_t ts = time(NULL);
for(i=0; i < write_len; i += data_len) {
// TODO normally we should have the same timestamps but right now it bugs...
insert_region_log_entry(sdskv_ph, seg_db_id,
oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
insert_region_log_entry(vargs->srv_ctx, oid, offset+i,
std::min(data_len, write_len - i), &rid);//, ts);
}
} else {
......@@ -263,8 +255,8 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
size_t i;
for(i=0; i < write_len; i+= data_len) {
insert_small_region_log_entry(sdskv_ph, seg_db_id,
oid, offset+i, std::min(data_len, write_len-i), data);
insert_small_region_log_entry(vargs->srv_ctx, oid, offset+i,
std::min(data_len, write_len-i), data);
}
}
LEAVING;
......@@ -275,8 +267,6 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
......@@ -289,8 +279,9 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
int ret;
// find out the current length of the object
double ts = ABT_get_wtime();
uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);
time_t ts = time(NULL);
uint64_t offset = mobject_compute_object_size(vargs->srv_ctx->sdskv_ph,
vargs->srv_ctx->segment_db_id, oid, ts);
if(len > SMALL_REGION_THRESHOLD) {
......@@ -305,7 +296,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
ret = bake_persist(bph, rid, 0, len);
if (ret != 0) bake_perror("bake_persist", ret);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);
insert_region_log_entry(vargs->srv_ctx, oid, offset, len, &rid, ts);
} else {
......@@ -327,7 +318,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
insert_small_region_log_entry(vargs->srv_ctx, oid, offset, len, data);
}
LEAVING;
}
......@@ -367,7 +358,8 @@ void write_op_exec_remove(void* u)
segment_key_t lb;
lb.oid = oid;
lb.timestamp = ABT_get_wtime();
lb.timestamp = time(NULL);
lb.seq_id = MOBJECT_SEQ_ID_MAX;
size_t max_segments = 128; // XXX this is a pretty arbitrary number
segment_key_t segment_keys[max_segments];
......@@ -445,14 +437,12 @@ void write_op_exec_truncate(void* u, uint64_t offset)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
}
insert_punch_log_entry(sdskv_ph, seg_db_id, oid, offset);
insert_punch_log_entry(vargs->srv_ctx, oid, offset);
LEAVING;
}
......@@ -461,15 +451,13 @@ void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
insert_zero_log_entry(sdskv_ph, seg_db_id, oid, offset, len);
insert_zero_log_entry(vargs->srv_ctx, oid, offset, len);
LEAVING;
}
......@@ -611,19 +599,24 @@ static oid_t get_or_create_oid(
}
static void insert_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
struct mobject_server_context* srv_ctx,
oid_t oid, uint64_t offset, uint64_t len,
bake_region_id_t* region, double ts)
bake_region_id_t* region, time_t ts)
{
ENTERING;
sdskv_provider_handle_t sdskv_ph = srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = srv_ctx->segment_db_id;
segment_key_t seg;
seg.oid = oid;
seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
seg.timestamp = ts == 0 ? time(NULL) : ts;
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::BAKE_REGION;
int ret = sdskv_put(ph, seg_db_id,
ABT_mutex_lock(srv_ctx->mutex);
seg.seq_id = srv_ctx->seq_id++;
ABT_mutex_unlock(srv_ctx->mutex);
int ret = sdskv_put(sdskv_ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)region, sizeof(*region));
if(ret != SDSKV_SUCCESS) {
......@@ -633,19 +626,24 @@ static void insert_region_log_entry(
}
static void insert_small_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
struct mobject_server_context* srv_ctx,
oid_t oid, uint64_t offset, uint64_t len,
const char* data, double ts)
const char* data, time_t ts)
{
ENTERING;
sdskv_provider_handle_t sdskv_ph = srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = srv_ctx->segment_db_id;
segment_key_t seg;
seg.oid = oid;
seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
seg.timestamp = ts == 0 ? time(NULL) : ts;
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::SMALL_REGION;
int ret = sdskv_put(ph, seg_db_id,
ABT_mutex_lock(srv_ctx->mutex);
seg.seq_id = srv_ctx->seq_id++;
ABT_mutex_unlock(srv_ctx->mutex);
int ret = sdskv_put(sdskv_ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)data, len);
if(ret != SDSKV_SUCCESS) {
......@@ -655,18 +653,23 @@ static void insert_small_region_log_entry(
}
static void insert_zero_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, uint64_t len, double ts)
struct mobject_server_context* srv_ctx,
oid_t oid, uint64_t offset, uint64_t len, time_t ts)
{
ENTERING;
sdskv_provider_handle_t sdskv_ph = srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = srv_ctx->segment_db_id;
segment_key_t seg;
seg.oid = oid;
seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
seg.timestamp = ts == 0 ? time(NULL) : ts;
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::ZERO;
int ret = sdskv_put(ph, seg_db_id,
ABT_mutex_lock(srv_ctx->mutex);
seg.seq_id = srv_ctx->seq_id++;
ABT_mutex_unlock(srv_ctx->mutex);
int ret = sdskv_put(sdskv_ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)nullptr, 0);
if(ret != SDSKV_SUCCESS) {
......@@ -676,18 +679,23 @@ static void insert_zero_log_entry(
}
static void insert_punch_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, double ts)
struct mobject_server_context* srv_ctx,
oid_t oid, uint64_t offset, time_t ts)
{
ENTERING;
sdskv_provider_handle_t sdskv_ph = srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = srv_ctx->segment_db_id;
segment_key_t seg;
seg.oid = oid;
seg.timestamp = ts < 0 ? ABT_get_wtime() : ts;
seg.timestamp = ts == 0 ? time(NULL) : ts;
seg.start_index = offset;
seg.end_index = std::numeric_limits<uint64_t>::max();
seg.type = seg_type_t::TOMBSTONE;
int ret = sdskv_put(ph, seg_db_id,
ABT_mutex_lock(srv_ctx->mutex);
seg.seq_id = srv_ctx->seq_id++;
ABT_mutex_unlock(srv_ctx->mutex);
int ret = sdskv_put(sdskv_ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)nullptr, 0);
if(ret != SDSKV_SUCCESS) {
......@@ -699,12 +707,13 @@ static void insert_punch_log_entry(
uint64_t mobject_compute_object_size(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, double ts)
oid_t oid, time_t ts)
{
ENTERING;
segment_key_t lb;
lb.oid = oid;
lb.timestamp = ts;
lb.seq_id = MOBJECT_SEQ_ID_MAX;
uint64_t size = 0; // current assumed size
uint64_t max_size = std::numeric_limits<uint64_t>::max();
......@@ -758,6 +767,7 @@ uint64_t mobject_compute_object_size(
break;
}
lb.timestamp = seg.timestamp;
lb.seq_id = seg.seq_id;
}
if(num_items != max_segments) {
done = true;
......
......@@ -35,8 +35,10 @@ typedef enum seg_type_t {
/*
 * Key identifying one segment (log entry) of an object. The insert_*_log_entry
 * helpers fill one of these in and hand it to sdskv_put() as raw bytes
 * (pointer + sizeof), so the field layout is part of what gets stored.
 */
typedef struct segment_key_t {
oid_t oid; /* id of the object the segment belongs to */
time_t timestamp; /* time(NULL) at insertion unless the caller supplies a non-zero ts */
//double timestamp;
uint32_t seq_id; /* taken from srv_ctx->seq_id++ under srv_ctx->mutex; seg_map_compare uses it after timestamp */
uint32_t type; /* seg_type */
double timestamp; /* NOTE(review): second member named `timestamp`; looks like the pre-change line of the diff -- confirm it is gone in the real header */
uint64_t start_index; // first index, included
uint64_t end_index; // end index is not included
} segment_key_t;
......
......@@ -85,10 +85,8 @@ static int seg_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk
if(seg1->oid > seg2->oid) return 1;
if(seg1->timestamp > seg2->timestamp) return -1;
if(seg1->timestamp < seg2->timestamp) return 1;
if(seg1->start_index < seg2->start_index) return -1;
if(seg1->start_index > seg2->start_index) return 1;
if(seg1->end_index < seg2->end_index) return -1;
if(seg1->end_index > seg2->end_index) return 1;
if(seg1->seq_id > seg2->seq_id) return -1;
if(seg1->seq_id < seg2->seq_id) return 1;
return 0;
}
......
......@@ -16,12 +16,15 @@
extern "C" {
#endif
/* Largest possible segment sequence id. Lower-bound keys used when listing
 * segments set seq_id to this value; seg_map_compare orders larger seq_id
 * values first for equal oid and timestamp. */
#define MOBJECT_SEQ_ID_MAX UINT32_MAX
struct mobject_server_context
{
/* margo, bake, sds-keyval, ssg state */
/* margo/ABT state */
margo_instance_id mid;
uint16_t provider_id;
ABT_pool pool;
ABT_mutex mutex;
/* ssg-related data */
ssg_group_id_t gid;
/* bake-related data */
......@@ -34,6 +37,7 @@ struct mobject_server_context
sdskv_database_id_t segment_db_id;
sdskv_database_id_t omap_db_id;
/* other data */
uint32_t seq_id;
int ref_count;
};
......
......@@ -67,6 +67,7 @@ int mobject_provider_register(
srv_ctx->provider_id = provider_id;
srv_ctx->pool = pool;
srv_ctx->ref_count = 1;
ABT_mutex_create(&srv_ctx->mutex);
srv_ctx->gid = gid;
my_id = ssg_get_group_self_id(srv_ctx->gid);
......@@ -262,6 +263,7 @@ static void mobject_finalize_cb(void* data)
sdskv_provider_handle_release(srv_ctx->sdskv_ph);
bake_provider_handle_release(srv_ctx->bake_ph);
ABT_mutex_free(&srv_ctx->mutex);
free(srv_ctx);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment