Commit bcc74e03 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

omap, name, and oid databases working properly

parent a29d8576
...@@ -386,7 +386,8 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op, ...@@ -386,7 +386,8 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
if(r != 0) return r; if(r != 0) return r;
r = mobject_read_op_operate(mph,read_op, ioctx->pool_name, oid, flags); r = mobject_read_op_operate(mph,read_op, ioctx->pool_name, oid, flags);
return 0; mobject_provider_handle_release(mph);
return r;
} }
// send a shutdown signal to a server cluster // send a shutdown signal to a server cluster
......
...@@ -13,6 +13,11 @@ ...@@ -13,6 +13,11 @@
#include "src/server/core/fake-kv.hpp" #include "src/server/core/fake-kv.hpp"
#include "src/server/core/covermap.hpp" #include "src/server/core/covermap.hpp"
static int tabs = 0;
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
static void read_op_exec_begin(void*); static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*); static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*); static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
...@@ -53,16 +58,22 @@ extern "C" void core_read_op(mobject_store_read_op_t read_op, server_visitor_arg ...@@ -53,16 +58,22 @@ extern "C" void core_read_op(mobject_store_read_op_t read_op, server_visitor_arg
void read_op_exec_begin(void* u) void read_op_exec_begin(void* u)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
LEAVING;
} }
void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval) void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
// TODO
LEAVING;
} }
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval) void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph; bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid; bake_target_id_t bti = vargs->srv_ctx->bake_tid;
...@@ -88,6 +99,8 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_ ...@@ -88,6 +99,8 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
} }
if(oid == 0) { if(oid == 0) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return; return;
} }
...@@ -116,8 +129,13 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_ ...@@ -116,8 +129,13 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
uint64_t segment_size = r.end - r.start; uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index; uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset; uint64_t remote_offset = r.start - offset;
bake_proxy_read(bph, region, region_offset, remote_bulk, ret = bake_proxy_read(bph, region, region_offset, remote_bulk,
remote_offset, remote_addr_str, segment_size); remote_offset, remote_addr_str, segment_size);
if(ret != 0) {
ERROR fprintf(stderr,"bake_proxy_read returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end; if(*bytes_read < r.end) *bytes_read = r.end;
} }
break; break;
...@@ -134,19 +152,36 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_ ...@@ -134,19 +152,36 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
hg_size_t buf_sizes[1] = { segment_size }; hg_size_t buf_sizes[1] = { segment_size };
hg_bulk_t handle; hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle); ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_create returned %d\n", ret);
LEAVING;
return;
}
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, remote_addr, remote_bulk, buf.as_offset+remote_offset, handle, 0, segment_size); ret = margo_bulk_transfer(mid, HG_BULK_PUSH, remote_addr, remote_bulk, buf.as_offset+remote_offset, handle, 0, segment_size);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_transfer returned %d\n", ret);
LEAVING;
return;
}
ret = margo_bulk_free(handle); ret = margo_bulk_free(handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_free returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end; if(*bytes_read < r.end) *bytes_read = r.end;
} }
} }
} }
it++; it++;
} }
LEAVING;
} }
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return, void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval) mobject_store_omap_iter_t* iter, int* prval)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name; const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph; sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
...@@ -163,6 +198,8 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r ...@@ -163,6 +198,8 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
} }
if(oid == 0) { if(oid == 0) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
LEAVING;
return; return;
} }
...@@ -173,7 +210,7 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r ...@@ -173,7 +210,7 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
strcpy(lb->key, start_after); strcpy(lb->key, start_after);
hg_size_t max_keys = 10; hg_size_t max_keys = 10;
hg_size_t key_len = 128; hg_size_t key_len = MAX_OMAP_KEY_SIZE+sizeof(omap_key_t);
std::vector<void*> keys(max_keys); std::vector<void*> keys(max_keys);
std::vector<hg_size_t> ksizes(max_keys, key_len); std::vector<hg_size_t> ksizes(max_keys, key_len);
std::vector<std::vector<char>> buffers(max_keys, std::vector<char>(key_len)); std::vector<std::vector<char>> buffers(max_keys, std::vector<char>(key_len));
...@@ -188,6 +225,7 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r ...@@ -188,6 +225,7 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
&keys_retrieved); &keys_retrieved);
if(ret != SDSKV_SUCCESS) { if(ret != SDSKV_SUCCESS) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
break; break;
} }
const char* k; const char* k;
...@@ -206,10 +244,12 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r ...@@ -206,10 +244,12 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
out: out:
free(lb); free(lb);
LEAVING;
} }
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval) void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name; const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph; sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
...@@ -226,18 +266,19 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi ...@@ -226,18 +266,19 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
} }
if(oid == 0) { if(oid == 0) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
LEAVING;
return; return;
} }
hg_size_t max_items = 10; hg_size_t max_items = std::min(max_return, (decltype(max_return))10);
// TODO make this changeable hg_size_t key_len = MAX_OMAP_KEY_SIZE + sizeof(omap_key_t);
hg_size_t key_len = 128 + sizeof(omap_key_t); hg_size_t val_len = MAX_OMAP_VAL_SIZE;
hg_size_t val_len = 256;
omap_iter_create(iter); omap_iter_create(iter);
/* omap_key_t equivalent of start_key */ /* omap_key_t equivalent of start_key */
hg_size_t lb_size = sizeof(omap_key_t)+strlen(start_after); hg_size_t lb_size = key_len;
omap_key_t* lb = (omap_key_t*)calloc(1, lb_size); omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
lb->oid = oid; lb->oid = oid;
strcpy(lb->key, start_after); strcpy(lb->key, start_after);
...@@ -247,11 +288,9 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi ...@@ -247,11 +288,9 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
omap_key_t* prefix = (omap_key_t*)calloc(1, prefix_size); omap_key_t* prefix = (omap_key_t*)calloc(1, prefix_size);
prefix->oid = oid; prefix->oid = oid;
strcpy(prefix->key, filter_prefix); strcpy(prefix->key, filter_prefix);
hg_size_t prefix_actual_size = offsetof(omap_key_t, key)+strlen(filter_prefix);
/* we need the above because the prefix in sdskv is not considered a string */
/* key entry used to navigate the returned keys */
omap_key_t* key = (omap_key_t*)calloc(1, key_len);
key->oid = oid;
/* initialize structures to pass to SDSKV functions */ /* initialize structures to pass to SDSKV functions */
std::vector<void*> keys(max_items); std::vector<void*> keys(max_items);
std::vector<void*> vals(max_items); std::vector<void*> vals(max_items);
...@@ -270,12 +309,13 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi ...@@ -270,12 +309,13 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
ret = sdskv_list_keyvals_with_prefix( ret = sdskv_list_keyvals_with_prefix(
sdskv_ph, omap_db_id, sdskv_ph, omap_db_id,
(const void*)lb, lb_size, (const void*)lb, lb_size,
(const void*)prefix, prefix_size, (const void*)prefix, prefix_actual_size,
keys.data(), ksizes.data(), keys.data(), ksizes.data(),
vals.data(), vsizes.data(), vals.data(), vsizes.data(),
&items_retrieved); &items_retrieved);
if(ret != SDSKV_SUCCESS) { if(ret != SDSKV_SUCCESS) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "sdskv_list_keyvals_with_prefix returned %d\n", ret);
break; break;
} }
const char* k; const char* k;
...@@ -287,19 +327,21 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi ...@@ -287,19 +327,21 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
omap_iter_append(*iter, k, (const char*)vals[i], vsizes[i]); omap_iter_append(*iter, k, (const char*)vals[i], vsizes[i]);
} }
memset(lb, 0, lb_size);
lb->oid = oid;
strcpy(lb->key, k); strcpy(lb->key, k);
lb_size = strlen(k) + sizeof(omap_key_t); //lb_size = strlen(k) + sizeof(omap_key_t);
} while(items_retrieved == max_items && count < max_return); } while(items_retrieved == max_items && count < max_return);
out: out:
free(lb); free(lb);
free(key); LEAVING;
} }
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval) void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name; const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph; sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
...@@ -316,6 +358,8 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t ...@@ -316,6 +358,8 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
} }
if(oid == 0) { if(oid == 0) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
LEAVING;
return; return;
} }
...@@ -325,14 +369,16 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t ...@@ -325,14 +369,16 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
std::vector<hg_size_t> ksizes(num_keys); std::vector<hg_size_t> ksizes(num_keys);
hg_size_t max_ksize = 0; hg_size_t max_ksize = 0;
for(auto i=0; i < num_keys; i++) { for(auto i=0; i < num_keys; i++) {
hg_size_t s = strlen(keys[i])+1; hg_size_t s = offsetof(omap_key_t,key)+strlen(keys[i])+1;
if(s > max_ksize) max_ksize = s; if(s > max_ksize) max_ksize = s;
ksizes[i] = s; ksizes[i] = s;
} }
max_ksize += sizeof(omap_key_t);
omap_key_t* key = (omap_key_t*)calloc(1, sizeof(omap_key_t)+max_ksize); omap_key_t* key = (omap_key_t*)calloc(1, max_ksize);
key->oid = oid;
for(size_t i=0; i < num_keys; i++) { for(size_t i=0; i < num_keys; i++) {
memset(key, 0, max_ksize);
key->oid = oid;
strcpy(key->key, keys[i]); strcpy(key->key, keys[i]);
// get length of the value // get length of the value
hg_size_t vsize; hg_size_t vsize;
...@@ -340,6 +386,7 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t ...@@ -340,6 +386,7 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
(const void*)key, ksizes[i], &vsize); (const void*)key, ksizes[i], &vsize);
if(ret != SDSKV_SUCCESS) { if(ret != SDSKV_SUCCESS) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "sdskv_length returned %d\n", ret);
break; break;
} }
std::vector<char> value(vsize); std::vector<char> value(vsize);
...@@ -348,10 +395,12 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t ...@@ -348,10 +395,12 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
(void*)value.data(), &vsize); (void*)value.data(), &vsize);
if(ret != SDSKV_SUCCESS) { if(ret != SDSKV_SUCCESS) {
*prval = -1; *prval = -1;
ERROR fprintf(stderr, "sdskv_get returned %d\n", ret);
break; break;
} }
omap_iter_append(*iter, keys[i], value.data(), vsize); omap_iter_append(*iter, keys[i], value.data(), vsize);
} }
LEAVING;
} }
void read_op_exec_end(void* u) void read_op_exec_end(void* u)
...@@ -364,9 +413,14 @@ static oid_t get_oid_from_name( ...@@ -364,9 +413,14 @@ static oid_t get_oid_from_name(
sdskv_database_id_t name_db_id, sdskv_database_id_t name_db_id,
const char* name) const char* name)
{ {
ENTERING;
oid_t result = 0; oid_t result = 0;
hg_size_t oid_size = sizeof(result); hg_size_t oid_size = sizeof(result);
int ret = sdskv_get(ph, name_db_id, (const void*)name, strlen(name+1), (void*)&result, &oid_size); int ret = sdskv_get(ph, name_db_id, (const void*)name, strlen(name)+1, (void*)&result, &oid_size);
if(ret != SDSKV_SUCCESS) return 0; if(ret != SDSKV_SUCCESS) result = 0;
if(result == 0) {
ERROR fprintf(stderr,"oid == 0\n");
}
LEAVING;
return result; return result;
} }
...@@ -8,6 +8,11 @@ ...@@ -8,6 +8,11 @@
#include "src/io-chain/write-op-visitor.h" #include "src/io-chain/write-op-visitor.h"
#include "src/server/core/fake-kv.hpp" #include "src/server/core/fake-kv.hpp"
static int tabs = 0;
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
static void write_op_exec_begin(void*); static void write_op_exec_begin(void*);
static void write_op_exec_end(void*); static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int); static void write_op_exec_create(void*, int);
...@@ -66,15 +71,21 @@ void write_op_exec_end(void* u) ...@@ -66,15 +71,21 @@ void write_op_exec_end(void* u)
void write_op_exec_create(void* u, int exclusive) void write_op_exec_create(void* u, int exclusive)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph; sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id; sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id; sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name); oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
}
LEAVING;
} }
void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid; oid_t oid = vargs->oid;
if(oid == 0) { if(oid == 0) {
...@@ -82,6 +93,10 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ...@@ -82,6 +93,10 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id; sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id; sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name); oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
return;
}
vargs->oid = oid; vargs->oid = oid;
} }
...@@ -96,8 +111,18 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ...@@ -96,8 +111,18 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
if(len > SMALL_REGION_THRESHOLD) { if(len > SMALL_REGION_THRESHOLD) {
// TODO: check return values of those calls // TODO: check return values of those calls
ret = bake_create(bake_ph, bti, len, &rid); ret = bake_create(bake_ph, bti, len, &rid);
if(ret != 0) {
ERROR fprintf(stderr,"bake_create returned %d\n",ret);
return;
}
ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len); ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
if(ret != 0) {
ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
}
ret = bake_persist(bake_ph, rid); ret = bake_persist(bake_ph, rid);
if(ret != 0) {
ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
}
insert_region_log_entry(oid, offset, len, &rid); insert_region_log_entry(oid, offset, len, &rid);
} else { } else {
...@@ -107,15 +132,26 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ...@@ -107,15 +132,26 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
hg_size_t buf_sizes[1] = {len}; hg_size_t buf_sizes[1] = {len};
hg_bulk_t handle; hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle); ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_create returned %d\n", ret);
}
ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len); ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_transfer returned %d\n", ret);
}
ret = margo_bulk_free(handle); ret = margo_bulk_free(handle);
if(ret != 0) {
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
insert_small_region_log_entry(oid, offset, len, data); insert_small_region_log_entry(oid, offset, len, data);
} }
LEAVING;
} }
void write_op_exec_write_full(void* u, buffer_u buf, size_t len) void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{ {
ENTERING;
// TODO: this function will not be valid if the new object is // TODO: this function will not be valid if the new object is
// smaller than its previous version. Instead we should remove the object // smaller than its previous version. Instead we should remove the object
// and re-create it. // and re-create it.
...@@ -139,18 +175,17 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len) ...@@ -139,18 +175,17 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
int ret; int ret;
unsigned i; unsigned i;
fprintf(stderr, "In Mobject, input bti is ");
for(i=0; i<16; i++) fprintf(stderr, "%d ", bti.id[i]);
fprintf(stderr, "\n");
// TODO: check return values of those calls // TODO: check return values of those calls
ret = bake_create(bph, bti, len, &rid); ret = bake_create(bph, bti, len, &rid);
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len); ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
ret = bake_persist(bph, rid); ret = bake_persist(bph, rid);
insert_region_log_entry(oid, 0, len, &rid); insert_region_log_entry(oid, 0, len, &rid);
LEAVING;
} }
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset) void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{ {
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u); auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid; oid_t oid = vargs->oid;
if(oid == 0) { if(oid == 0) {
...@@ -169,11 +204,6 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ ...@@ -169,11 +204,6 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
hg_addr_t remote_addr = vargs->client_addr; hg_addr_t remote_addr = vargs->client_addr;
int ret; int ret;
unsigned ii;
fprintf(stderr, "In Mobject, input bti is ");
for(ii=0; ii<16; ii++) fprintf(stderr, "%d ", bti.id[ii]);
fprintf(stderr, "\n");
// TODO: check return values of those calls // TODO: check return values of those calls