Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
S
sds-keyval
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
2
Issues
2
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sds
sds-keyval
Commits
6db51622
Commit
6db51622
authored
Jul 17, 2018
by
Matthieu Dorier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added multi- operations
parent
a506507d
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
1058 additions
and
211 deletions
+1058
-211
Makefile.am
Makefile.am
+7
-1
include/sdskv-client.h
include/sdskv-client.h
+75
-34
src/sdskv-client.c
src/sdskv-client.c
+329
-54
src/sdskv-rpc-types.h
src/sdskv-rpc-types.h
+29
-10
src/sdskv-server.cc
src/sdskv-server.cc
+357
-112
test/multi-test.sh
test/multi-test.sh
+30
-0
test/sdskv-multi-test.cc
test/sdskv-multi-test.cc
+231
-0
No files found.
Makefile.am
View file @
6db51622
...
...
@@ -18,6 +18,7 @@ check_PROGRAMS = test/sdskv-open-test \
test
/sdskv-list-keys-prefix-test
\
test
/sdskv-custom-cmp-test
\
test
/sdskv-migrate-test
\
test
/sdskv-multi-test
\
test
/sdskv-custom-server-daemon
bin_sdskv_server_daemon_SOURCES
=
src/sdskv-server-daemon.c
...
...
@@ -107,7 +108,8 @@ TESTS = test/basic.sh \
test
/list-keyvals-test.sh
\
test
/list-keys-prefix-test.sh
\
test
/migrate-test.sh
\
test
/custom-cmp-test.sh
test
/custom-cmp-test.sh
\
test
/multi-test.sh
TESTS_ENVIRONMENT
=
TIMEOUT
=
"
$(TIMEOUT)
"
\
MKTEMP
=
"
$(MKTEMP)
"
...
...
@@ -157,6 +159,10 @@ test_sdskv_migrate_test_SOURCES = test/sdskv-migrate-test.cc
test_sdskv_migrate_test_DEPENDENCIES
=
lib/libsdskv-client.la
test_sdskv_migrate_test_LDFLAGS
=
-Llib
-lsdskv-client
test_sdskv_multi_test_SOURCES
=
test
/sdskv-multi-test.cc
test_sdskv_multi_test_DEPENDENCIES
=
lib/libsdskv-client.la
test_sdskv_multi_test_LDFLAGS
=
-Llib
-lsdskv-client
pkgconfigdir
=
$(libdir)
/pkgconfig
pkgconfig_DATA
=
maint/sdskv-server.pc
\
maint/sdskv-client.pc
...
...
include/sdskv-client.h
View file @
6db51622
...
...
@@ -104,6 +104,26 @@ int sdskv_put(sdskv_provider_handle_t provider,
const
void
*
key
,
hg_size_t
ksize
,
const
void
*
value
,
hg_size_t
vsize
);
/**
* @brief Puts multiple key/value pairs into the database.
* This method will send all the key/value pairs in batch,
* thus optimizing transfers by avoiding many RPC round trips.
*
* @param provider provider handle managing the database
* @param db_id targeted database id
* @param num number of key/value pairs to put
* @param keys array of keys
* @param ksizes array of key sizes
* @param values array of values
* @param vsizes array of value sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int
sdskv_put_multi
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
size_t
num
,
const
void
**
keys
,
const
hg_size_t
*
ksizes
,
const
void
**
values
,
const
hg_size_t
*
vsizes
);
/**
* @brief Gets the value associated with a given key.
* vsize needs to be set to the current size of the allocated
...
...
@@ -125,6 +145,34 @@ int sdskv_get(sdskv_provider_handle_t provider,
const
void
*
key
,
hg_size_t
ksize
,
void
*
value
,
hg_size_t
*
vsize
);
/**
* @brief Gets multiple values from the database. The transfers
* will be performed in a single batch. The vsize array should
* initially contain the size of the buffer allocated to receive
* each value. After a successful call, this array will contain
* the actual sizes of the values received. Contrary to sdskv_get,
* this function will not produce an error if one of the keys
* does not exist or if an allocated buffer is too small to hold
* a given value: the corresponding value entry will simply not
* be filled and its size will be set to 0 (so users must have
* another way to distinguish a 0-sized value and a value for
* which there was an error).
*
* @param[in] provider provider handle
* @param[in] db_id database id
* @param[in] num number of keys to retrieve
* @param[in] keys array of keys to retrieve
* @param[in] ksizes size of the keys
* @param[out] values array of allocated memory segments to receive the keys
* @param[inout] vsizes sizes allocated (in) and actual sizes (out)
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int
sdskv_get_multi
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
size_t
num
,
const
void
**
keys
,
const
hg_size_t
*
ksizes
,
void
**
values
,
hg_size_t
*
vsizes
);
/**
* @brief Gets the length of a value associated with a given key.
*
...
...
@@ -140,6 +188,26 @@ int sdskv_length(sdskv_provider_handle_t handle,
sdskv_database_id_t
db_id
,
const
void
*
key
,
hg_size_t
ksize
,
hg_size_t
*
vsize
);
/**
* @brief Gets the length of values associated with multiple keys.
* If a particular key does not exists, this function will set the length
* of its value to 0 (so the user needs another way to differenciate
* between a key that does not exists and a 0-sized value).
*
* @param[in] handle provider handle
* @param[in] db_id database id
* @param[in] num number of keys
* @param[in] keys array of keys
* @param[in] ksizes array of key sizes
* @param[out] vsizes array where to put value sizes
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int
sdskv_length_multi
(
sdskv_provider_handle_t
handle
,
sdskv_database_id_t
db_id
,
size_t
num
,
const
void
**
keys
,
const
hg_size_t
*
ksizes
,
hg_size_t
*
vsizes
);
/**
* @brief Checks if the given key exists in the database.
*
...
...
@@ -304,7 +372,7 @@ int sdskv_list_keyvals_with_prefix(
* @param num_keys number of keys
* @param keys array of keys
* @param key_sizes array of key sizes
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
BEFORE, or SDSKV_REMOVE_AFTER
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
...
...
@@ -330,7 +398,7 @@ int sdskv_migrate_keys(
* @param target_db_id target database id
* @param key key to migrate
* @param key_size size of the key
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
BEFORE, or SDSKV_REMOVE_AFTER
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
...
...
@@ -362,8 +430,8 @@ inline int sdskv_migrate_key(
* expressed by the array key_range, which contains two elements.
* key_range[0] must be a lower bound lb.
* key_range[1] must be an upper bound ub.
* The set of keys migrated are within the range
[
lb, ub[ (i.e. lb
*
included, ub is
not included).
* The set of keys migrated are within the range
]
lb, ub[ (i.e. lb
*
and ub
not included).
*
* @param source_provider source provider
* @param source_db_id source database id
...
...
@@ -372,7 +440,7 @@ inline int sdskv_migrate_key(
* @param target_db_id target database id
* @param key_range range of keys to migrate
* @param key_range_sizes size of the keys provided for the range
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
BEFORE, or SDSKV_REMOVE_AFTER
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
...
...
@@ -398,7 +466,7 @@ int sdskv_migrate_key_range(
* @param target_db_id target database id
* @param key_prefix prefix of keys to migrate
* @param key_prefix_size size of the prefix provided
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
BEFORE, or SDSKV_REMOVE_AFTER
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
...
...
@@ -421,7 +489,7 @@ int sdskv_migrate_keys_prefixed(
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id target database id
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
BEFORE, or SDSKV_REMOVE_AFTER
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_
ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
...
...
@@ -433,33 +501,6 @@ int sdskv_migrate_all_keys(
sdskv_database_id_t
target_db_id
,
int
flag
);
/**
* @brief Migrates a database from a source provider
* to a target provider. The difference with sdskv_migrate_all_keys is
* that the target database does not exist yet and the id of the newly
* created database will be returned to the called.
* Contrary to sdskv_migrate_all_keys, if SDSKV_REMOVE_BEFORE or
* SDSKV_REMOVE_AFTER are used as flag, the source database is deleted
* on its provider (while sdskv_migrate_all_keys only removes all the keys,
* leaving the database present).
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id resulting target database id
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int
sdskv_migrate_database
(
sdskv_provider_handle_t
source_provider
,
sdskv_database_id_t
source_db_id
,
const
char
*
target_addr
,
uint16_t
target_provider_id
,
sdskv_database_id_t
*
target_db_id
,
int
flag
);
/**
* Shuts down a remote SDSKV service (given an address).
* This will shutdown all the providers on the target address.
...
...
src/sdskv-client.c
View file @
6db51622
...
...
@@ -7,11 +7,14 @@ struct sdskv_client {
margo_instance_id
mid
;
hg_id_t
sdskv_put_id
;
hg_id_t
sdskv_put_multi_id
;
hg_id_t
sdskv_bulk_put_id
;
hg_id_t
sdskv_get_id
;
hg_id_t
sdskv_get_multi_id
;
hg_id_t
sdskv_exists_id
;
hg_id_t
sdskv_erase_id
;
hg_id_t
sdskv_length_id
;
hg_id_t
sdskv_length_multi_id
;
hg_id_t
sdskv_bulk_get_id
;
hg_id_t
sdskv_open_id
;
hg_id_t
sdskv_list_keys_id
;
...
...
@@ -21,7 +24,6 @@ struct sdskv_client {
hg_id_t
sdskv_migrate_key_range_id
;
hg_id_t
sdskv_migrate_keys_prefixed_id
;
hg_id_t
sdskv_migrate_all_keys_id
;
hg_id_t
sdskv_migrate_database_id
;
uint64_t
num_provider_handles
;
};
...
...
@@ -45,35 +47,43 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
if
(
flag
==
HG_TRUE
)
{
/* RPCs already registered */
margo_registered_name
(
mid
,
"sdskv_put_rpc"
,
&
client
->
sdskv_put_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_put_multi_rpc"
,
&
client
->
sdskv_put_multi_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_bulk_put_rpc"
,
&
client
->
sdskv_bulk_put_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_get_rpc"
,
&
client
->
sdskv_get_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_get_multi_rpc"
,
&
client
->
sdskv_get_multi_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_erase_rpc"
,
&
client
->
sdskv_erase_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_exists_rpc"
,
&
client
->
sdskv_exists_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_length_rpc"
,
&
client
->
sdskv_length_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_length_multi_rpc"
,
&
client
->
sdskv_length_multi_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_bulk_get_rpc"
,
&
client
->
sdskv_bulk_get_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_open_rpc"
,
&
client
->
sdskv_open_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_list_keys_rpc"
,
&
client
->
sdskv_list_keys_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_list_keyvals_rpc"
,
&
client
->
sdskv_list_keyvals_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_migrate_keys_rpc"
,
&
client
->
sdskv_migrate_keys_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_migrate_key_range_rpc"
,
&
client
->
sdskv_migrate_key_range_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_migrate_key_range_rpc"
,
&
client
->
sdskv_migrate_key_range_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_migrate_keys_prefixed_rpc"
,
&
client
->
sdskv_migrate_keys_prefixed_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_migrate_all_keys_rpc"
,
&
client
->
sdskv_migrate_all_keys_id
,
&
flag
);
margo_registered_name
(
mid
,
"sdskv_migrate_database_rpc"
,
&
client
->
sdskv_migrate_database_id
,
&
flag
);
}
else
{
client
->
sdskv_put_id
=
MARGO_REGISTER
(
mid
,
"sdskv_put_rpc"
,
put_in_t
,
put_out_t
,
NULL
);
client
->
sdskv_put_multi_id
=
MARGO_REGISTER
(
mid
,
"sdskv_put_multi_rpc"
,
put_multi_in_t
,
put_multi_out_t
,
NULL
);
client
->
sdskv_bulk_put_id
=
MARGO_REGISTER
(
mid
,
"sdskv_bulk_put_rpc"
,
bulk_put_in_t
,
bulk_put_out_t
,
NULL
);
client
->
sdskv_get_id
=
MARGO_REGISTER
(
mid
,
"sdskv_get_rpc"
,
get_in_t
,
get_out_t
,
NULL
);
client
->
sdskv_get_multi_id
=
MARGO_REGISTER
(
mid
,
"sdskv_get_multi_rpc"
,
get_multi_in_t
,
get_multi_out_t
,
NULL
);
client
->
sdskv_erase_id
=
MARGO_REGISTER
(
mid
,
"sdskv_erase_rpc"
,
erase_in_t
,
erase_out_t
,
NULL
);
client
->
sdskv_exists_id
=
MARGO_REGISTER
(
mid
,
"sdskv_exists_rpc"
,
exists_in_t
,
exists_out_t
,
NULL
);
client
->
sdskv_length_id
=
MARGO_REGISTER
(
mid
,
"sdskv_length_rpc"
,
length_in_t
,
length_out_t
,
NULL
);
client
->
sdskv_length_multi_id
=
MARGO_REGISTER
(
mid
,
"sdskv_length_multi_rpc"
,
length_multi_in_t
,
length_multi_out_t
,
NULL
);
client
->
sdskv_bulk_get_id
=
MARGO_REGISTER
(
mid
,
"sdskv_bulk_get_rpc"
,
bulk_get_in_t
,
bulk_get_out_t
,
NULL
);
client
->
sdskv_open_id
=
...
...
@@ -90,8 +100,6 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER
(
mid
,
"sdskv_migrate_keys_prefixed_rpc"
,
migrate_keys_prefixed_in_t
,
migrate_keys_out_t
,
NULL
);
client
->
sdskv_migrate_all_keys_id
=
MARGO_REGISTER
(
mid
,
"sdskv_migrate_all_keys_rpc"
,
migrate_all_keys_in_t
,
migrate_keys_out_t
,
NULL
);
client
->
sdskv_migrate_database_id
=
MARGO_REGISTER
(
mid
,
"sdskv_migrate_database_rpc"
,
migrate_database_in_t
,
migrate_database_out_t
,
NULL
);
}
return
SDSKV_SUCCESS
;
...
...
@@ -220,7 +228,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
const
void
*
value
,
hg_size_t
vsize
)
{
hg_return_t
hret
;
int
ret
;
int
ret
=
SDSKV_SUCCESS
;
hg_handle_t
handle
;
hg_size_t
msize
=
ksize
+
vsize
+
2
*
sizeof
(
hg_size_t
);
...
...
@@ -316,7 +324,111 @@ int sdskv_put(sdskv_provider_handle_t provider,
}
margo_destroy
(
handle
);
return
SDSKV_SUCCESS
;
return
ret
;
}
int
sdskv_put_multi
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
size_t
num
,
const
void
**
keys
,
const
hg_size_t
*
ksizes
,
const
void
**
values
,
const
hg_size_t
*
vsizes
)
{
hg_return_t
hret
;
int
ret
;
hg_handle_t
handle
=
HG_HANDLE_NULL
;
put_multi_in_t
in
;
put_multi_out_t
out
;
void
**
key_seg_ptrs
=
NULL
;
hg_size_t
*
key_seg_sizes
=
NULL
;
void
**
val_seg_ptrs
=
NULL
;
hg_size_t
*
val_seg_sizes
=
NULL
;
in
.
db_id
=
db_id
;
in
.
num_keys
=
num
;
in
.
keys_bulk_handle
=
HG_BULK_NULL
;
in
.
keys_bulk_size
=
0
;
in
.
vals_bulk_handle
=
HG_BULK_NULL
;
in
.
vals_bulk_size
=
0
;
/* create an array of key sizes and key pointers */
key_seg_sizes
=
malloc
(
sizeof
(
hg_size_t
)
*
(
num
+
1
));
key_seg_sizes
[
0
]
=
num
*
sizeof
(
hg_size_t
);
memcpy
(
key_seg_sizes
+
1
,
ksizes
,
num
*
sizeof
(
hg_size_t
));
key_seg_ptrs
=
malloc
(
sizeof
(
void
*
)
*
(
num
+
1
));
key_seg_ptrs
[
0
]
=
(
void
*
)
ksizes
;
memcpy
(
key_seg_ptrs
+
1
,
keys
,
num
*
sizeof
(
void
*
));
int
i
;
for
(
i
=
0
;
i
<
num
+
1
;
i
++
)
{
in
.
keys_bulk_size
+=
key_seg_sizes
[
i
];
}
/* create an array of val sizes and val pointers */
val_seg_sizes
=
malloc
(
sizeof
(
hg_size_t
)
*
(
num
+
1
));
val_seg_sizes
[
0
]
=
num
*
sizeof
(
hg_size_t
);
memcpy
(
val_seg_sizes
+
1
,
vsizes
,
num
*
sizeof
(
hg_size_t
));
val_seg_ptrs
=
malloc
(
sizeof
(
void
*
)
*
(
num
+
1
));
val_seg_ptrs
[
0
]
=
(
void
*
)
vsizes
;
memcpy
(
val_seg_ptrs
+
1
,
values
,
num
*
sizeof
(
void
*
));
for
(
i
=
0
;
i
<
num
+
1
;
i
++
)
{
in
.
vals_bulk_size
+=
val_seg_sizes
[
i
];
}
/* create the bulk handle to access the keys */
hret
=
margo_bulk_create
(
provider
->
client
->
mid
,
num
+
1
,
key_seg_ptrs
,
key_seg_sizes
,
HG_BULK_READ_ONLY
,
&
in
.
keys_bulk_handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_bulk_create() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* create the bulk handle to access the values */
hret
=
margo_bulk_create
(
provider
->
client
->
mid
,
num
+
1
,
val_seg_ptrs
,
val_seg_sizes
,
HG_BULK_READ_ONLY
,
&
in
.
vals_bulk_handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_bulk_create() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* create a RPC handle */
hret
=
margo_create
(
provider
->
client
->
mid
,
provider
->
addr
,
provider
->
client
->
sdskv_put_multi_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_create() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* forward the RPC */
hret
=
margo_provider_forward
(
provider
->
provider_id
,
handle
,
&
in
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_forward() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* get the output */
hret
=
margo_get_output
(
handle
,
&
out
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_get_output() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
ret
=
out
.
ret
;
finish:
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
keys_bulk_handle
);
margo_bulk_free
(
in
.
vals_bulk_handle
);
free
(
key_seg_sizes
);
free
(
key_seg_ptrs
);
free
(
val_seg_sizes
);
free
(
val_seg_ptrs
);
margo_destroy
(
handle
);
return
ret
;
}
int
sdskv_get
(
sdskv_provider_handle_t
provider
,
...
...
@@ -422,6 +534,121 @@ int sdskv_get(sdskv_provider_handle_t provider,
return
ret
;
}
int
sdskv_get_multi
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
size_t
num
,
const
void
**
keys
,
const
hg_size_t
*
ksizes
,
void
**
values
,
hg_size_t
*
vsizes
)
{
hg_return_t
hret
;
int
ret
;
hg_handle_t
handle
=
HG_HANDLE_NULL
;
get_multi_in_t
in
;
get_multi_out_t
out
;
void
**
key_seg_ptrs
=
NULL
;
hg_size_t
*
key_seg_sizes
=
NULL
;
char
*
vals_buffer
=
NULL
;
in
.
db_id
=
db_id
;
in
.
num_keys
=
num
;
in
.
keys_bulk_handle
=
HG_BULK_NULL
;
in
.
keys_bulk_size
=
0
;
in
.
vals_bulk_handle
=
HG_BULK_NULL
;
in
.
vals_bulk_size
=
0
;
/* create an array of key sizes and key pointers */
key_seg_sizes
=
malloc
(
sizeof
(
hg_size_t
)
*
(
num
+
1
));
key_seg_sizes
[
0
]
=
num
*
sizeof
(
hg_size_t
);
memcpy
(
key_seg_sizes
+
1
,
ksizes
,
num
*
sizeof
(
hg_size_t
));
key_seg_ptrs
=
malloc
(
sizeof
(
void
*
)
*
(
num
+
1
));
key_seg_ptrs
[
0
]
=
(
void
*
)
ksizes
;
memcpy
(
key_seg_ptrs
+
1
,
keys
,
num
*
sizeof
(
void
*
));
int
i
;
for
(
i
=
0
;
i
<
num
+
1
;
i
++
)
{
in
.
keys_bulk_size
+=
key_seg_sizes
[
i
];
}
/* create the bulk handle to access the keys */
hret
=
margo_bulk_create
(
provider
->
client
->
mid
,
num
+
1
,
key_seg_ptrs
,
key_seg_sizes
,
HG_BULK_READ_ONLY
,
&
in
.
keys_bulk_handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* allocate memory to send max value sizes and receive values */
for
(
i
=
0
;
i
<
num
;
i
++
)
{
in
.
vals_bulk_size
+=
vsizes
[
i
];
}
in
.
vals_bulk_size
+=
sizeof
(
hg_size_t
)
*
num
;
vals_buffer
=
malloc
(
in
.
vals_bulk_size
);
hg_size_t
*
value_sizes
=
(
hg_size_t
*
)
vals_buffer
;
// beginning of the buffer used to hold sizes
for
(
i
=
0
;
i
<
num
;
i
++
)
{
value_sizes
[
i
]
=
vsizes
[
i
];
}
/* create the bulk handle to access the values */
hret
=
margo_bulk_create
(
provider
->
client
->
mid
,
num
,
(
void
**
)
&
vals_buffer
,
&
in
.
vals_bulk_size
,
HG_BULK_READWRITE
,
&
in
.
vals_bulk_handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* create a RPC handle */
hret
=
margo_create
(
provider
->
client
->
mid
,
provider
->
addr
,
provider
->
client
->
sdskv_get_multi_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_create() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* forward the RPC handle */
hret
=
margo_provider_forward
(
provider
->
provider_id
,
handle
,
&
in
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_forward() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* get the response */
hret
=
margo_get_output
(
handle
,
&
out
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_get_output() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
ret
=
out
.
ret
;
if
(
out
.
ret
!=
SDSKV_SUCCESS
)
{
goto
finish
;
}
/* copy the values from the buffer into the user-provided buffer */
char
*
value_ptr
=
vals_buffer
+
num
*
sizeof
(
hg_size_t
);
for
(
i
=
0
;
i
<
num
;
i
++
)
{
memcpy
(
values
[
i
],
value_ptr
,
value_sizes
[
i
]);
vsizes
[
i
]
=
value_sizes
[
i
];
value_ptr
+=
value_sizes
[
i
];
}
finish:
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
keys_bulk_handle
);
margo_bulk_free
(
in
.
vals_bulk_handle
);
free
(
key_seg_sizes
);
free
(
key_seg_ptrs
);
free
(
vals_buffer
);
margo_destroy
(
handle
);
return
ret
;
}
int
sdskv_exists
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
const
void
*
key
,
hg_size_t
ksize
,
int
*
flag
)
...
...
@@ -508,6 +735,99 @@ int sdskv_length(sdskv_provider_handle_t provider,
return
ret
;
}
int
sdskv_length_multi
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
size_t
num
,
const
void
**
keys
,
const
hg_size_t
*
ksizes
,
hg_size_t
*
vsizes
)
{
hg_return_t
hret
;
int
ret
;
hg_handle_t
handle
=
HG_HANDLE_NULL
;
length_multi_in_t
in
;
length_multi_out_t
out
;
void
**
key_seg_ptrs
=
NULL
;
hg_size_t
*
key_seg_sizes
=
NULL
;
in
.
db_id
=
db_id
;
in
.
num_keys
=
num
;
in
.
keys_bulk_handle
=
HG_BULK_NULL
;
in
.
keys_bulk_size
=
0
;
in
.
vals_size_bulk_handle
=
HG_BULK_NULL
;
/* create an array of key sizes and key pointers */
key_seg_sizes
=
malloc
(
sizeof
(
hg_size_t
)
*
(
num
+
1
));
key_seg_sizes
[
0
]
=
num
*
sizeof
(
hg_size_t
);
memcpy
(
key_seg_sizes
+
1
,
ksizes
,
num
*
sizeof
(
hg_size_t
));
key_seg_ptrs
=
malloc
(
sizeof
(
void
*
)
*
(
num
+
1
));
key_seg_ptrs
[
0
]
=
(
void
*
)
ksizes
;
memcpy
(
key_seg_ptrs
+
1
,
keys
,
num
*
sizeof
(
void
*
));
int
i
;
for
(
i
=
0
;
i
<
num
+
1
;
i
++
)
{
in
.
keys_bulk_size
+=
key_seg_sizes
[
i
];
}
/* create the bulk handle to access the keys */
hret
=
margo_bulk_create
(
provider
->
client
->
mid
,
num
+
1
,
key_seg_ptrs
,
key_seg_sizes
,
HG_BULK_READ_ONLY
,
&
in
.
keys_bulk_handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* create the bulk handle for the server to put the values sizes */
hg_size_t
vals_size_bulk_size
=
num
*
sizeof
(
hg_size_t
);
hret
=
margo_bulk_create
(
provider
->
client
->
mid
,
num
,
(
void
**
)
&
vsizes
,
&
vals_size_bulk_size
,
HG_BULK_WRITE_ONLY
,
&
in
.
vals_size_bulk_handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* create a RPC handle */
hret
=
margo_create
(
provider
->
client
->
mid
,
provider
->
addr
,
provider
->
client
->
sdskv_length_multi_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_create() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* forward the RPC handle */
hret
=
margo_provider_forward
(
provider
->
provider_id
,
handle
,
&
in
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_forward() failed in sdskv_get_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* get the response */
hret
=
margo_get_output
(
handle
,
&
out
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"[SDSKV] margo_get_output() failed in sdskv_put_multi()
\n
"
);
out
.
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
ret
=
out
.
ret
;
if
(
out
.
ret
!=
SDSKV_SUCCESS
)
{
goto
finish
;
}
finish:
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
keys_bulk_handle
);
margo_bulk_free
(
in
.
vals_size_bulk_handle
);
free
(
key_seg_sizes
);
free
(
key_seg_ptrs
);
margo_destroy
(
handle
);
return
ret
;
}
int
sdskv_erase
(
sdskv_provider_handle_t
provider
,
sdskv_database_id_t
db_id
,
const
void
*
key
,
hg_size_t
ksize
)
...
...
@@ -840,10 +1160,10 @@ int sdskv_migrate_keys(
/* create bulk to expose key sizes and keys */
hg_size_t
*
seg_sizes
=
(
hg_size_t
*
)
calloc
(
num_keys
+
1
,
sizeof
(
hg_size_t
));
seg_sizes
[
0
]
=
num_keys
*
sizeof
(
hg_size_t
);
memcpy
(
seg_sizes
,
key_sizes
,
num_keys
*
sizeof
(
hg_size_t
));
memcpy
(
seg_sizes
+
1
,
key_sizes
,
num_keys
*
sizeof
(
hg_size_t
));
void
**
segs
=
(
void
**
)
calloc
(
num_keys
+
1
,
sizeof
(
void
*
));
segs
[
0
]
=
(
void
*
)
key_sizes
;
memcpy
(
segs
,
keys
,
num_keys
*
sizeof
(
void
*
));
memcpy
(
segs
+
1
,
keys
,
num_keys
*
sizeof
(
void
*
));
/* compute the total size of the array */
int
i
;
in
.
bulk_size
=
0
;
...
...
@@ -1044,51 +1364,6 @@ finish:
return
ret
;
}
int
sdskv_migrate_database
(
sdskv_provider_handle_t
source_provider
,
sdskv_database_id_t
source_db_id
,
const
char
*
target_addr
,
uint16_t
target_provider_id
,
sdskv_database_id_t
*
target_db_id
,
int
flag
)
{
int
ret
=
HG_SUCCESS
;
hg_return_t
hret
=
HG_SUCCESS
;
hg_handle_t
handle
;
migrate_database_in_t
in
;
migrate_database_out_t
out
;
in
.
source_db_id
=
source_db_id
;
in
.
target_addr
=
(
hg_string_t
)
target_addr
;
in
.
target_provider_id
=
target_provider_id
;
in
.
flag
=
flag
;
/* create handle */
hret
=
margo_create
(
source_provider
->
client
->
mid
,
source_provider
->
addr
,
source_provider
->
client
->
sdskv_migrate_database_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* forward to provider */
hret
=
margo_provider_forward
(
source_provider
->
provider_id
,
handle
,
&
in
);
if
(
hret
!=
HG_SUCCESS
)
{
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
/* get the output from provider */
hret
=
margo_get_output
(
handle
,
&
out
);
if
(
hret
!=
HG_SUCCESS
)
{
ret
=
SDSKV_ERR_MERCURY
;
goto
finish
;
}
ret
=
out
.
ret
;
finish:
margo_free_output
(
handle
,
&
out
);
margo_destroy
(
handle
);
return
ret
;
}
int
sdskv_shutdown_service
(
sdskv_client_t
client
,
hg_addr_t
addr
)
{
...
...
src/sdskv-rpc-types.h
View file @
6db51622
...
...
@@ -113,6 +113,35 @@ MERCURY_GEN_PROC(bulk_get_in_t, ((uint64_t)(db_id))\
((
hg_bulk_t
)(
handle
)))
MERCURY_GEN_PROC
(
bulk_get_out_t
,
((
hg_size_t
)(
size
))
((
int32_t
)(
ret
)))
// ------------- PUT MULTI ------------- //
MERCURY_GEN_PROC
(
put_multi_in_t
,
\
((
uint64_t
)(
db_id
))
\
((
hg_size_t
)(
num_keys
))
\
((
hg_bulk_t
)(
keys_bulk_handle
))
\
((
hg_size_t
)(
keys_bulk_size
))
\
((
hg_bulk_t
)(
vals_bulk_handle
))
\
((
hg_size_t
)(
vals_bulk_size
)))
MERCURY_GEN_PROC
(
put_multi_out_t
,
((
int32_t
)(
ret
)))
// ------------- GET MULTI ------------- //
MERCURY_GEN_PROC
(
get_multi_in_t
,
\
((
uint64_t
)(
db_id
))
\
((
hg_size_t
)(
num_keys
))
\
((
hg_bulk_t
)(
keys_bulk_handle
))
\
((
hg_size_t
)(
keys_bulk_size
))
\
((
hg_bulk_t
)(
vals_bulk_handle
))
\
((
hg_size_t
)(
vals_bulk_size
)))
MERCURY_GEN_PROC
(
get_multi_out_t
,
((
int32_t
)(
ret
)))
// ------------- LENGTH MULTI ------------- //
MERCURY_GEN_PROC
(
length_multi_in_t
,
\
((
uint64_t
)(
db_id
))
\
((
hg_size_t
)(
num_keys
))
\
((
hg_bulk_t
)(
keys_bulk_handle
))
\
((
hg_size_t
)(
keys_bulk_size
))
\
((
hg_bulk_t
)(
vals_size_bulk_handle
)))
MERCURY_GEN_PROC
(
length_multi_out_t
,
((
int32_t
)(
ret
)))
// ------------- MIGRATE KEYS ----------- //
MERCURY_GEN_PROC
(
migrate_keys_in_t
,