Commit a7c8ccb6 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

finished and testes keyval list retrieval

parent 2813e7e9
......@@ -5,16 +5,15 @@ CLIENT_LiBS=@CLIENT_LIBS@
AM_CPPFLAGS = -I${srcdir}/src -I${srcdir}/include
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
test/sdskv-erase-test \
test/sdskv-list-keys-test
#test/sdskv-list-keyvals-test
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
test/sdskv-erase-test \
test/sdskv-list-keys-test \
test/sdskv-list-keyvals-test
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
bin_sdskv_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
......@@ -96,9 +95,8 @@ TESTS = test/basic.sh \
test/length-test.sh \
test/get-test.sh \
test/erase-test.sh \
test/list-keys-test.sh
# test/list-keyvals-test.sh
test/list-keys-test.sh \
test/list-keyvals-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
MKTEMP="$(MKTEMP)"
......@@ -123,9 +121,9 @@ test_sdskv_list_keys_test_SOURCES = test/sdskv-list-keys-test.cc
test_sdskv_list_keys_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_list_keys_test_LDFLAGS = -Llib -lsdskv-client
#test_sdskv_list_keyvals_test_SOURCES = test/sdskv-list-kv-test.cc
#test_sdskv_list_keyvals_test_DEPENDENCIES = lib/libsdskv-client.la
#test_sdskv_list_keyvals_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_list_keyvals_test_SOURCES = test/sdskv-list-kv-test.cc
test_sdskv_list_keyvals_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_list_keyvals_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_erase_test_SOURCES = test/sdskv-erase-test.cc
test_sdskv_erase_test_DEPENDENCIES = lib/libsdskv-client.la
......
......@@ -65,8 +65,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_open_rpc", open_in_t, open_out_t, NULL);
client->sdskv_list_keys_id =
MARGO_REGISTER(mid, "sdskv_list_keys_rpc", list_keys_in_t, list_keys_out_t, NULL);
//client->sdskv_list_keyvals_id =
// MARGO_REGISTER(mid, "sdskv_list_keyvals_rpc", list_in_t, list_out_t, NULL);
client->sdskv_list_keyvals_id =
MARGO_REGISTER(mid, "sdskv_list_keyvals_rpc", list_keyvals_in_t, list_keyvals_out_t, NULL);
}
return 0;
......@@ -624,6 +624,130 @@ finish:
return ret;
}
int sdskv_list_keyvals(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
hg_size_t start_ksize, // size of the start_key
void **keys, // pointer to an array of void* pointers,
// this array has size *max_keys
hg_size_t* ksizes, // pointer to an array of hg_size_t sizes
// representing sizes allocated in
// keys for each key
void **values, // pointer to an array of void* pointers,
// this array has size *max_keys
hg_size_t* vsizes, // pointer to an array of hg_size_t sizes
// representing sizes allocated in
// values for each value
hg_size_t* max_keys) // maximum number of keys requested
{
list_keyvals_in_t in;
list_keyvals_out_t out;
in.keys_bulk_handle = HG_BULK_NULL;
in.ksizes_bulk_handle = HG_BULK_NULL;
in.vals_bulk_handle = HG_BULK_NULL;
in.vsizes_bulk_handle = HG_BULK_NULL;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle = HG_HANDLE_NULL;
int ret = 0;
int i;
in.db_id = db_id;
in.start_key = (kv_data_t) start_key;
in.start_ksize = start_ksize;
in.max_keys = *max_keys;
/* create bulk handle to expose the segments with key sizes */
hg_size_t ksize_bulk_size = (*max_keys)*sizeof(*ksizes);
void* ksizes_buf_ptr[1] = { ksizes };
hret = margo_bulk_create(provider->client->mid,
1, ksizes_buf_ptr, &ksize_bulk_size,
HG_BULK_READWRITE,
&in.ksizes_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* create bulk handle to expose the segments with value sizes */
hg_size_t vsize_bulk_size = (*max_keys)*sizeof(*vsizes);
void* vsizes_buf_ptr[1] = { vsizes };
hret = margo_bulk_create(provider->client->mid,
1, vsizes_buf_ptr, &ksize_bulk_size,
HG_BULK_READWRITE,
&in.vsizes_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* create bulk handle to expose where the keys should be placed */
hret = margo_bulk_create(provider->client->mid,
*max_keys, keys, ksizes,
HG_BULK_WRITE_ONLY,
&in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* create bulk handle to expose where the keys should be placed */
hret = margo_bulk_create(provider->client->mid,
*max_keys, values, vsizes,
HG_BULK_WRITE_ONLY,
&in.vals_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_list_keyvals_id,
&handle);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* set target id */
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* forward to provider */
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* get the output from provider */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
ret = -1;
goto finish;
}
/* set return values */
*max_keys = out.nkeys;
ret = out.ret;
finish:
/* free everything we created */
margo_bulk_free(in.ksizes_bulk_handle);
margo_bulk_free(in.keys_bulk_handle);
margo_bulk_free(in.vsizes_bulk_handle);
margo_bulk_free(in.vals_bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
#if 0
int sdskv_list_keyvals(
sdskv_provider_handle_t provider,
......
......@@ -296,6 +296,59 @@ static inline hg_return_t hg_proc_list_keys_in_t(hg_proc_t proc, void *data)
return ret;
}
// ------------- LIST KEYVALS ------------- //
typedef struct {
uint64_t db_id;
kv_data_t start_key;
hg_size_t start_ksize;
hg_size_t max_keys;
hg_bulk_t ksizes_bulk_handle;
hg_bulk_t keys_bulk_handle;
hg_bulk_t vsizes_bulk_handle;
hg_bulk_t vals_bulk_handle;
} list_keyvals_in_t;
MERCURY_GEN_PROC(list_keyvals_out_t, ((hg_size_t)(nkeys)) ((int32_t)(ret)))
static inline hg_return_t hg_proc_list_keyvals_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
list_keyvals_in_t *in = (list_keyvals_in_t*)data;
ret = hg_proc_uint64_t(proc, &in->db_id);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_size_t(proc, &in->start_ksize);
if(ret != HG_SUCCESS) return ret;
if (in->start_ksize) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
in->start_key = (kv_data_t)malloc(in->start_ksize);
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
free(in->start_key);
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->max_keys);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_bulk_t(proc, &in->ksizes_bulk_handle);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_bulk_t(proc, &in->keys_bulk_handle);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_bulk_t(proc, &in->vsizes_bulk_handle);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_bulk_t(proc, &in->vals_bulk_handle);
return ret;
}
// ------------- BULK ------------- //
// for handling bulk puts/gets (e.g. for ParSplice use case)
......
......@@ -85,10 +85,10 @@ extern "C" int sdskv_provider_register(
list_keys_in_t, list_keys_out_t,
sdskv_list_keys_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
// rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_keyvals_rpc",
// list_in_t, list_out_t,
// sdskv_list_keyvals_ult, mplex_id, abt_pool);
// margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_keyvals_rpc",
list_keyvals_in_t, list_keyvals_out_t,
sdskv_list_keyvals_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_erase_rpc",
erase_in_t, erase_out_t,
sdskv_erase_ult, mplex_id, abt_pool);
......@@ -671,6 +671,7 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
throw -1;
}
/* receive the key sizes from the client */
hg_addr_t origin_addr = info->addr;
hret = margo_bulk_transfer(mid, HG_BULK_PULL, origin_addr,
......@@ -681,6 +682,9 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
throw -1;
}
/* make a copy of the remote key sizes */
std::vector<hg_size_t> remote_ksizes(ksizes.begin(), ksizes.end());
/* get the keys from the underlying database */
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keys = db->list_keys(start_kdata, in.max_keys);
......@@ -724,13 +728,240 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
}
/* transfer the keys to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, 0, keys_local_bulk, 0, keys_bulk_size);
uint64_t remote_offset = 0;
uint64_t local_offset = 0;
for(unsigned i = 0; i < num_keys; i++) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, remote_offset, keys_local_bulk, local_offset, true_ksizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer (keys_local_bulk)" << std::endl;
throw -1;
}
remote_offset += remote_ksizes[i];
local_offset += true_ksizes[i];
}
out.nkeys = num_keys;
out.ret = 0;
} catch(int exc_no) {
out.ret = exc_no;
}
margo_bulk_free(ksizes_local_bulk);
margo_bulk_free(keys_local_bulk);
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
static void sdskv_list_keyvals_ult(hg_handle_t handle)
{
hg_return_t hret;
list_keyvals_in_t in;
list_keyvals_out_t out;
hg_bulk_t ksizes_local_bulk = HG_BULK_NULL;
hg_bulk_t keys_local_bulk = HG_BULK_NULL;
hg_bulk_t vsizes_local_bulk = HG_BULK_NULL;
hg_bulk_t vals_local_bulk = HG_BULK_NULL;
out.ret = -1;
out.nkeys = 0;
/* get the provider handling this request */
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
if(!svr_ctx) {
std::cerr << "Error: SDSKV list_keyvals could not find provider" << std::endl;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not get RPC input" << std::endl;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
try {
/* find the database targeted */
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
std::cerr << "Error: SDSKV list_keyvals could not get database with id " << in.db_id << std::endl;
throw -1;
}
auto db = it->second;
/* create a bulk handle to receive and send key sizes from client */
std::vector<hg_size_t> ksizes(in.max_keys);
std::vector<void*> ksizes_addr(1);
ksizes_addr[0] = (void*)ksizes.data();
hg_size_t ksizes_bulk_size = ksizes.size()*sizeof(hg_size_t);
hret = margo_bulk_create(mid, 1, ksizes_addr.data(),
&ksizes_bulk_size, HG_BULK_READWRITE, &ksizes_local_bulk);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not create bulk handle (ksizes_local_bulk)" << std::endl;
throw -1;
}
/* create a bulk handle to receive and send value sizes from client */
std::vector<hg_size_t> vsizes(in.max_keys);
std::vector<void*> vsizes_addr(1);
vsizes_addr[0] = (void*)vsizes.data();
hg_size_t vsizes_bulk_size = vsizes.size()*sizeof(hg_size_t);
hret = margo_bulk_create(mid, 1, vsizes_addr.data(),
&vsizes_bulk_size, HG_BULK_READWRITE, &vsizes_local_bulk);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keys could not issue bulk transfer (keys_local_bulk)" << std::endl;
std::cerr << "Error: SDSKV list_keyvals could not create bulk handle (vsizes_local_bulk)" << std::endl;
throw -1;
}
/* receive the key sizes from the client */
hg_addr_t origin_addr = info->addr;
hret = margo_bulk_transfer(mid, HG_BULK_PULL, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(pull from in.ksizes_bulk_handle to ksizes_local_bulk)" << std::endl;
throw -1;
}
/* receive the values sizes from the client */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, origin_addr,
in.vsizes_bulk_handle, 0, vsizes_local_bulk, 0, vsizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(pull from in.vsizes_bulk_handle to vsizes_local_bulk)" << std::endl;
throw -1;
}
/* make a copy of the remote key sizes and value sizes */
std::vector<hg_size_t> remote_ksizes(ksizes.begin(), ksizes.end());
std::vector<hg_size_t> remote_vsizes(vsizes.begin(), vsizes.end());
/* get the keys and values from the underlying database */
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keyvals = db->list_keyvals(start_kdata, in.max_keys);
hg_size_t num_keys = std::min(keyvals.size(), in.max_keys);
/* create the array of actual key sizes */
std::vector<hg_size_t> true_ksizes(num_keys);
hg_size_t keys_bulk_size = 0;
for(unsigned i = 0; i < num_keys; i++) {
true_ksizes[i] = keyvals[i].first.size();
if(true_ksizes[i] > ksizes[i]) {
// this key has a size that exceeds the allocated size on client
throw -1;
} else {
ksizes[i] = true_ksizes[i];
keys_bulk_size += ksizes[i];
}
}
/* create the array of actual value sizes */
std::vector<hg_size_t> true_vsizes(num_keys);
hg_size_t vals_bulk_size = 0;
for(unsigned i = 0; i < num_keys; i++) {
true_vsizes[i] = keyvals[i].second.size();
if(true_vsizes[i] > vsizes[i]) {
// this value has a size that exceeds the allocated size on client
throw -1;
} else {
vsizes[i] = true_vsizes[i];
vals_bulk_size += vsizes[i];
}
}
/* create an array of addresses pointing to keys */
std::vector<void*> keys_addr(num_keys);
for(unsigned i=0; i < num_keys; i++) {
keys_addr[i] = (void*)(keyvals[i].first.data());
}
/* create an array of addresses pointing to values */
std::vector<void*> vals_addr(num_keys);
for(unsigned i=0; i < num_keys; i++) {
vals_addr[i] = (void*)(keyvals[i].second.data());
}
/* expose the keys for bulk transfer */
hret = margo_bulk_create(mid, num_keys, keys_addr.data(),
true_ksizes.data(), HG_BULK_READ_ONLY, &keys_local_bulk);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not create bulk handle (keys_local_bulk)" << std::endl;
throw -1;
}
/* expose the values for bulk transfer */
hret = margo_bulk_create(mid, num_keys, vals_addr.data(),
true_vsizes.data(), HG_BULK_READ_ONLY, &vals_local_bulk);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not create bulk handle (vals_local_bulk)" << std::endl;
throw -1;
}
/* transfer the ksizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.ksizes_bulk_handle, 0, ksizes_local_bulk, 0, ksizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(push from ksizes_local_bulk to in.ksizes_bulk_handle)" << std::endl;
throw -1;
}
/* transfer the vsizes back to the client */
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.vsizes_bulk_handle, 0, vsizes_local_bulk, 0, vsizes_bulk_size);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer "
<< "(push from vsizes_local_bulk to in.vsizes_bulk_handle)" << std::endl;
throw -1;
}
uint64_t remote_offset = 0;
uint64_t local_offset = 0;
/* transfer the keys to the client */
for(unsigned i=0; i < num_keys; i++) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.keys_bulk_handle, remote_offset, keys_local_bulk, local_offset, true_ksizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer (keys_local_bulk)" << std::endl;
throw -1;
}
remote_offset += remote_ksizes[i];
local_offset += true_ksizes[i];
}
remote_offset = 0;
local_offset = 0;
/* transfer the values to the client */
for(unsigned i=0; i < num_keys; i++) {
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, origin_addr,
in.vals_bulk_handle, remote_offset, vals_local_bulk, local_offset, true_vsizes[i]);
if(hret != HG_SUCCESS) {
std::cerr << "Error: SDSKV list_keyvals could not issue bulk transfer (vals_local_bulk)" << std::endl;
throw -1;
}
remote_offset += remote_vsizes[i];
local_offset += true_vsizes[i];
}
out.nkeys = num_keys;
out.ret = 0;
......@@ -740,13 +971,15 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
margo_bulk_free(ksizes_local_bulk);
margo_bulk_free(keys_local_bulk);
margo_bulk_free(vsizes_local_bulk);
margo_bulk_free(vals_local_bulk);
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
#if 0
static void sdskv_list_keyvals_ult(hg_handle_t handle)
......
......@@ -102,10 +102,11 @@ int main(int argc, char *argv[])
/* **** put keys ***** */
std::vector<std::string> keys;
size_t max_value_size = 8000;
size_t max_key_size = 16;
size_t max_value_size = 16;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
auto k = gen_random_string((max_key_size + (rand()%max_key_size))/2);
// half of the entries will be put using bulk
auto v = gen_random_string(i*max_value_size/num_keys);
ret = sdskv_put(kvph, db_id,
......@@ -121,6 +122,7 @@ int main(int argc, char *argv[])
return -1;
}
keys.push_back(k);
std::cerr << k << "\t ===> " << v << std::endl;
}
printf("Successfuly inserted %d keys\n", num_keys);
......@@ -132,9 +134,9 @@ int main(int argc, char *argv[])
auto keys_after = keys[i1-1];
hg_size_t max_keys = i2-i1;
std::vector<std::vector<char>> result_strings(max_keys, std::vector<char>(16+1));
std::vector<std::vector<char>> result_strings(max_keys, std::vector<char>(max_key_size+1));
std::vector<void*> list_result(max_keys);
std::vector<hg_size_t> ksizes(max_keys, 16+1);
std::vector<hg_size_t> ksizes(max_keys, max_key_size+1);
for(unsigned i=0; i<max_keys; i++) {
list_result[i] = (void*)result_strings[i].data();
......
......@@ -103,10 +103,11 @@ int main(int argc, char *argv[])
/* **** put keys ***** */
std::vector<std::string> keys;
std::map<std::string, std::string> reference;
size_t max_value_size = 8000;
size_t max_value_size = 16;
size_t max_key_size = 16;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
auto k = gen_random_string((max_key_size+(rand()%max_key_size))/2);
// half of the entries will be put using bulk
auto v = gen_random_string(i*max_value_size/num_keys);
ret = sdskv_put(kvph, db_id,
......@@ -123,6 +124,7 @@ int main(int argc, char *argv[])
}
keys.push_back(k);
reference[k] = v;
std::cout << k << " ===> " << v << std::endl;
}
printf("Successfuly inserted %d keys\n", num_keys);
......@@ -131,16 +133,16 @@ int main(int argc, char *argv[])
auto i1 = keys.size()/3;
auto i2 = 2*keys.size()/3;
auto keys_after = keys[i1-1];
std::vector<std::vector<char>> key_strings(i2-i1, std::vector<char>(16+1));
std::vector<std::vector<char>> val_strings(i2-i1, std::vector<char>(max_value_size+1));
std::vector<void*> keys_addr(i2-i1);
std::vector<void*> vals_addr(i2-i1);
std::vector<hg_size_t> ksizes(i2-i1, 16+1);