Commit 8f27f837 authored by Matthieu Dorier's avatar Matthieu Dorier

correction

parent 786a2896
...@@ -163,7 +163,7 @@ int BerkeleyDBDataStore::put_multi(size_t num_items, ...@@ -163,7 +163,7 @@ int BerkeleyDBDataStore::put_multi(size_t num_items,
size_t sv = 0; size_t sv = 0;
for(unsigned i = 0; i < num_items; i++) { for(unsigned i = 0; i < num_items; i++) {
sk += ksizes[i]; sk += ksizes[i];
sv += vsizes[i]; sv += vsizes[i]+8;
} }
sk *= 2; sk *= 2;
sv *= 2; sv *= 2;
...@@ -178,12 +178,11 @@ int BerkeleyDBDataStore::put_multi(size_t num_items, ...@@ -178,12 +178,11 @@ int BerkeleyDBDataStore::put_multi(size_t num_items,
mkey.set_ulen(kbuffer.size()); mkey.set_ulen(kbuffer.size());
mkey.set_data(kbuffer.data()); mkey.set_data(kbuffer.data());
mkey.set_flags(DB_DBT_USERMEM);
mdata.set_ulen(vbuffer.size()); mdata.set_ulen(vbuffer.size());
mdata.set_data(vbuffer.data()); mdata.set_data(vbuffer.data());
mdata.set_flags(DB_DBT_USERMEM);
std::cerr << "mkey has length " << kbuffer.size() << std::endl;
std::cerr << "mdata has length " << vbuffer.size() << std::endl;
DbMultipleDataBuilder keybuilder(mkey); DbMultipleDataBuilder keybuilder(mkey);
DbMultipleDataBuilder databuilder(mdata); DbMultipleDataBuilder databuilder(mdata);
...@@ -195,9 +194,9 @@ int BerkeleyDBDataStore::put_multi(size_t num_items, ...@@ -195,9 +194,9 @@ int BerkeleyDBDataStore::put_multi(size_t num_items,
int flag = DB_MULTIPLE; int flag = DB_MULTIPLE;
if(!_no_overwrite) flag |= DB_OVERWRITE_DUP; if(!_no_overwrite) flag |= DB_OVERWRITE_DUP;
int status = _dbm->put(NULL, &mkey, &mdata, flag); int status = _dbm->put(NULL, &mkey, &mdata, flag);
if(status == 0) return SDSKV_SUCCESS;
if(status == DB_KEYEXIST) return SDSKV_ERR_KEYEXISTS; if(status == DB_KEYEXIST) return SDSKV_ERR_KEYEXISTS;
return SDSKV_ERR_PUT; if(status != 0) return SDSKV_ERR_PUT;
return SDSKV_SUCCESS;
} }
bool BerkeleyDBDataStore::exists(const void* key, size_t size) const { bool BerkeleyDBDataStore::exists(const void* key, size_t size) const {
......
...@@ -78,9 +78,15 @@ class MapDataStore : public AbstractDataStore { ...@@ -78,9 +78,15 @@ class MapDataStore : public AbstractDataStore {
} }
virtual int put(const void* key, size_t ksize, const void* value, size_t vsize) override { virtual int put(const void* key, size_t ksize, const void* value, size_t vsize) override {
ds_bulk_t k((const char*)key, ((const char*)key)+ksize); if(vsize != 0) {
ds_bulk_t v((const char*)value, ((const char*)value)+vsize); ds_bulk_t k((const char*)key, ((const char*)key)+ksize);
return put(std::move(k), std::move(v)); ds_bulk_t v((const char*)value, ((const char*)value)+vsize);
return put(std::move(k), std::move(v));
} else {
ds_bulk_t k((const char*)key, ((const char*)key)+ksize);
ds_bulk_t v;
return put(std::move(k), std::move(v));
}
} }
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override { virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override {
......
...@@ -476,6 +476,20 @@ int sdskv_put_multi(sdskv_provider_handle_t provider, ...@@ -476,6 +476,20 @@ int sdskv_put_multi(sdskv_provider_handle_t provider,
in.vals_bulk_handle = HG_BULK_NULL; in.vals_bulk_handle = HG_BULK_NULL;
in.vals_bulk_size = 0; in.vals_bulk_size = 0;
/* check that none of the keys have a size of 0 */
int i;
for(i=0; i < num; i++) {
if(ksizes[i] == 0) return SDSKV_ERR_INVALID_ARG;
}
int non_empty_values = 0;
/* check if we are trying to write some empty values */
/* XXX normally we shouldn't have to do that but Mercury
bugs if we try to expose empty segments for RDMA. */
for(i=0; i < num; i++) {
if(vsizes[i] != 0) non_empty_values += 1;
}
/* create an array of key sizes and key pointers */ /* create an array of key sizes and key pointers */
key_seg_sizes = malloc(sizeof(hg_size_t)*(num+1)); key_seg_sizes = malloc(sizeof(hg_size_t)*(num+1));
key_seg_sizes[0] = num*sizeof(hg_size_t); key_seg_sizes[0] = num*sizeof(hg_size_t);
...@@ -483,18 +497,28 @@ int sdskv_put_multi(sdskv_provider_handle_t provider, ...@@ -483,18 +497,28 @@ int sdskv_put_multi(sdskv_provider_handle_t provider,
key_seg_ptrs = malloc(sizeof(void*)*(num+1)); key_seg_ptrs = malloc(sizeof(void*)*(num+1));
key_seg_ptrs[0] = (void*)ksizes; key_seg_ptrs[0] = (void*)ksizes;
memcpy(key_seg_ptrs+1, keys, num*sizeof(void*)); memcpy(key_seg_ptrs+1, keys, num*sizeof(void*));
int i;
for(i=0; i < num+1; i++) { for(i=0; i < num+1; i++) {
in.keys_bulk_size += key_seg_sizes[i]; in.keys_bulk_size += key_seg_sizes[i];
} }
/* create an array of val sizes and val pointers */ val_seg_sizes = malloc(sizeof(hg_size_t)*(non_empty_values+1));
val_seg_sizes = malloc(sizeof(hg_size_t)*(num+1)); val_seg_sizes[0] = num*sizeof(hg_size_t);
val_seg_sizes[0] = num*sizeof(hg_size_t); int j = 1;
memcpy(val_seg_sizes+1, vsizes, num*sizeof(hg_size_t)); for(i=0; i < num; i++) {
val_seg_ptrs = malloc(sizeof(void*)*(num+1)); if(vsizes[i] != 0) {
val_seg_ptrs[0] = (void*)vsizes; val_seg_sizes[j] = vsizes[i];
memcpy(val_seg_ptrs+1, values, num*sizeof(void*)); j++;
for(i=0; i < num+1; i++) { }
}
val_seg_ptrs = malloc(sizeof(void*)*(non_empty_values+1));
val_seg_ptrs[0] = (void*)vsizes;
j = 1;
for(i=0; i < num; i++) {
if(vsizes[i] != 0) {
val_seg_ptrs[j] = (void*)values[i];
j++;
}
}
for(i=0; i < non_empty_values+1; i++) {
in.vals_bulk_size += val_seg_sizes[i]; in.vals_bulk_size += val_seg_sizes[i];
} }
...@@ -508,7 +532,7 @@ int sdskv_put_multi(sdskv_provider_handle_t provider, ...@@ -508,7 +532,7 @@ int sdskv_put_multi(sdskv_provider_handle_t provider,
} }
/* create the bulk handle to access the values */ /* create the bulk handle to access the values */
hret = margo_bulk_create(provider->client->mid, num+1, val_seg_ptrs, val_seg_sizes, hret = margo_bulk_create(provider->client->mid, non_empty_values+1, val_seg_ptrs, val_seg_sizes,
HG_BULK_READ_ONLY, &in.vals_bulk_handle); HG_BULK_READ_ONLY, &in.vals_bulk_handle);
if(hret != HG_SUCCESS) { if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put_multi()\n"); fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put_multi()\n");
......
...@@ -777,7 +777,7 @@ static void sdskv_put_multi_ult(hg_handle_t handle) ...@@ -777,7 +777,7 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
std::vector<const void*> vptrs(in.num_keys); std::vector<const void*> vptrs(in.num_keys);
for(unsigned i=0; i < in.num_keys; i++) { for(unsigned i=0; i < in.num_keys; i++) {
kptrs[i] = local_keys_buffer.data()+keys_offset; kptrs[i] = local_keys_buffer.data()+keys_offset;
vptrs[i] = local_vals_buffer.data()+vals_offset; vptrs[i] = val_sizes[i] == 0 ? nullptr : local_vals_buffer.data()+vals_offset;
keys_offset += key_sizes[i]; keys_offset += key_sizes[i];
vals_offset += val_sizes[i]; vals_offset += val_sizes[i];
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment