From 6db5162230007632995c960c08fae6cd9403857b Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Tue, 17 Jul 2018 18:02:02 +0200 Subject: [PATCH] added multi- operations --- Makefile.am | 8 +- include/sdskv-client.h | 109 ++++++--- src/sdskv-client.c | 383 +++++++++++++++++++++++++++----- src/sdskv-rpc-types.h | 39 +++- src/sdskv-server.cc | 469 +++++++++++++++++++++++++++++---------- test/multi-test.sh | 30 +++ test/sdskv-multi-test.cc | 231 +++++++++++++++++++ 7 files changed, 1058 insertions(+), 211 deletions(-) create mode 100755 test/multi-test.sh create mode 100644 test/sdskv-multi-test.cc diff --git a/Makefile.am b/Makefile.am index add4157..af7a298 100644 --- a/Makefile.am +++ b/Makefile.am @@ -18,6 +18,7 @@ check_PROGRAMS = test/sdskv-open-test \ test/sdskv-list-keys-prefix-test \ test/sdskv-custom-cmp-test \ test/sdskv-migrate-test \ + test/sdskv-multi-test \ test/sdskv-custom-server-daemon bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c @@ -107,7 +108,8 @@ TESTS = test/basic.sh \ test/list-keyvals-test.sh \ test/list-keys-prefix-test.sh \ test/migrate-test.sh \ - test/custom-cmp-test.sh + test/custom-cmp-test.sh \ + test/multi-test.sh TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \ MKTEMP="$(MKTEMP)" @@ -157,6 +159,10 @@ test_sdskv_migrate_test_SOURCES = test/sdskv-migrate-test.cc test_sdskv_migrate_test_DEPENDENCIES = lib/libsdskv-client.la test_sdskv_migrate_test_LDFLAGS = -Llib -lsdskv-client +test_sdskv_multi_test_SOURCES = test/sdskv-multi-test.cc +test_sdskv_multi_test_DEPENDENCIES = lib/libsdskv-client.la +test_sdskv_multi_test_LDFLAGS = -Llib -lsdskv-client + pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = maint/sdskv-server.pc \ maint/sdskv-client.pc diff --git a/include/sdskv-client.h b/include/sdskv-client.h index 638c878..eda024a 100644 --- a/include/sdskv-client.h +++ b/include/sdskv-client.h @@ -104,6 +104,26 @@ int sdskv_put(sdskv_provider_handle_t provider, const void *key, hg_size_t ksize, const void *value, hg_size_t vsize); +/** + * @brief Puts multiple key/value pairs into the database. + * This method will send all the key/value pairs in batch, + * thus optimizing transfers by avoiding many RPC round trips. + * + * @param provider provider handle managing the database + * @param db_id targeted database id + * @param num number of key/value pairs to put + * @param keys array of keys + * @param ksizes array of key sizes + * @param values array of values + * @param vsizes array of value sizes + * + * @return SDSKV_SUCCESS or error code defined in sdskv-common.h + */ +int sdskv_put_multi(sdskv_provider_handle_t provider, + sdskv_database_id_t db_id, + size_t num, const void** keys, const hg_size_t* ksizes, + const void** values, const hg_size_t *vsizes); + /** * @brief Gets the value associated with a given key. * vsize needs to be set to the current size of the allocated @@ -125,6 +145,34 @@ int sdskv_get(sdskv_provider_handle_t provider, const void *key, hg_size_t ksize, void *value, hg_size_t* vsize); +/** + * @brief Gets multiple values from the database. The transfers + * will be performed in a single batch. The vsize array should + * initially contain the size of the buffer allocated to receive + * each value. After a successful call, this array will contain + * the actual sizes of the values received. Contrary to sdskv_get, + * this function will not produce an error if one of the keys + * does not exist or if an allocated buffer is too small to hold + * a given value: the corresponding value entry will simply not + * be filled and its size will be set to 0 (so users must have + * another way to distinguish a 0-sized value and a value for + * which there was an error). + * + * @param[in] provider provider handle + * @param[in] db_id database id + * @param[in] num number of keys to retrieve + * @param[in] keys array of keys to retrieve + * @param[in] ksizes size of the keys + * @param[out] values array of allocated memory segments to receive the keys + * @param[inout] vsizes sizes allocated (in) and actual sizes (out) + * + * @return SDSKV_SUCCESS or error code defined in sdskv-common.h + */ +int sdskv_get_multi(sdskv_provider_handle_t provider, + sdskv_database_id_t db_id, + size_t num, const void** keys, const hg_size_t* ksizes, + void** values, hg_size_t *vsizes); + /** * @brief Gets the length of a value associated with a given key. * @@ -140,6 +188,26 @@ int sdskv_length(sdskv_provider_handle_t handle, sdskv_database_id_t db_id, const void *key, hg_size_t ksize, hg_size_t* vsize); +/** + * @brief Gets the length of values associated with multiple keys. + * If a particular key does not exists, this function will set the length + * of its value to 0 (so the user needs another way to differenciate + * between a key that does not exists and a 0-sized value). + * + * @param[in] handle provider handle + * @param[in] db_id database id + * @param[in] num number of keys + * @param[in] keys array of keys + * @param[in] ksizes array of key sizes + * @param[out] vsizes array where to put value sizes + * + * @return SDSKV_SUCCESS or error code defined in sdskv-common.h + */ +int sdskv_length_multi(sdskv_provider_handle_t handle, + sdskv_database_id_t db_id, size_t num, + const void** keys, const hg_size_t* ksizes, + hg_size_t *vsizes); + /** * @brief Checks if the given key exists in the database. * @@ -304,7 +372,7 @@ int sdskv_list_keyvals_with_prefix( * @param num_keys number of keys * @param keys array of keys * @param key_sizes array of key sizes - * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER + * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL * * @return SDSKV_SUCCESS or error code defined in sdskv-common.h */ @@ -330,7 +398,7 @@ int sdskv_migrate_keys( * @param target_db_id target database id * @param key key to migrate * @param key_size size of the key - * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER + * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL * * @return SDSKV_SUCCESS or error code defined in sdskv-common.h */ @@ -362,8 +430,8 @@ inline int sdskv_migrate_key( * expressed by the array key_range, which contains two elements. * key_range[0] must be a lower bound lb. * key_range[1] must be an upper bound ub. - * The set of keys migrated are within the range [lb, ub[ (i.e. lb - * included, ub is not included). + * The set of keys migrated are within the range ]lb, ub[ (i.e. lb + * and ub not included). * * @param source_provider source provider * @param source_db_id source database id @@ -372,7 +440,7 @@ inline int sdskv_migrate_key( * @param target_db_id target database id * @param key_range range of keys to migrate * @param key_range_sizes size of the keys provided for the range - * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER + * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL * * @return SDSKV_SUCCESS or error code defined in sdskv-common.h */ @@ -398,7 +466,7 @@ int sdskv_migrate_key_range( * @param target_db_id target database id * @param key_prefix prefix of keys to migrate * @param key_prefix_size size of the prefix provided - * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER + * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL * * @return SDSKV_SUCCESS or error code defined in sdskv-common.h */ @@ -421,7 +489,7 @@ int sdskv_migrate_keys_prefixed( * @param target_addr target address * @param target_provider_id target provider id * @param target_db_id target database id - * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER + * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL * * @return SDSKV_SUCCESS or error code defined in sdskv-common.h */ @@ -433,33 +501,6 @@ int sdskv_migrate_all_keys( sdskv_database_id_t target_db_id, int flag); -/** - * @brief Migrates a database from a source provider - * to a target provider. The difference with sdskv_migrate_all_keys is - * that the target database does not exist yet and the id of the newly - * created database will be returned to the called. - * Contrary to sdskv_migrate_all_keys, if SDSKV_REMOVE_BEFORE or - * SDSKV_REMOVE_AFTER are used as flag, the source database is deleted - * on its provider (while sdskv_migrate_all_keys only removes all the keys, - * leaving the database present). - * - * @param source_provider source provider - * @param source_db_id source database id - * @param target_addr target address - * @param target_provider_id target provider id - * @param target_db_id resulting target database id - * @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER - * - * @return SDSKV_SUCCESS or error code defined in sdskv-common.h - */ -int sdskv_migrate_database( - sdskv_provider_handle_t source_provider, - sdskv_database_id_t source_db_id, - const char* target_addr, - uint16_t target_provider_id, - sdskv_database_id_t* target_db_id, - int flag); - /** * Shuts down a remote SDSKV service (given an address). * This will shutdown all the providers on the target address. diff --git a/src/sdskv-client.c b/src/sdskv-client.c index 2d1e8ed..c4a3054 100644 --- a/src/sdskv-client.c +++ b/src/sdskv-client.c @@ -7,11 +7,14 @@ struct sdskv_client { margo_instance_id mid; hg_id_t sdskv_put_id; + hg_id_t sdskv_put_multi_id; hg_id_t sdskv_bulk_put_id; hg_id_t sdskv_get_id; + hg_id_t sdskv_get_multi_id; hg_id_t sdskv_exists_id; hg_id_t sdskv_erase_id; hg_id_t sdskv_length_id; + hg_id_t sdskv_length_multi_id; hg_id_t sdskv_bulk_get_id; hg_id_t sdskv_open_id; hg_id_t sdskv_list_keys_id; @@ -21,7 +24,6 @@ struct sdskv_client { hg_id_t sdskv_migrate_key_range_id; hg_id_t sdskv_migrate_keys_prefixed_id; hg_id_t sdskv_migrate_all_keys_id; - hg_id_t sdskv_migrate_database_id; uint64_t num_provider_handles; }; @@ -45,35 +47,43 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid) if(flag == HG_TRUE) { /* RPCs already registered */ margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag); + margo_registered_name(mid, "sdskv_put_multi_rpc", &client->sdskv_put_multi_id, &flag); margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag); margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag); + margo_registered_name(mid, "sdskv_get_multi_rpc", &client->sdskv_get_multi_id, &flag); margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag); margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag); margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag); + margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag); margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag); margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag); margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag); margo_registered_name(mid, "sdskv_list_keyvals_rpc", &client->sdskv_list_keyvals_id, &flag); margo_registered_name(mid, "sdskv_migrate_keys_rpc", &client->sdskv_migrate_keys_id, &flag); - margo_registered_name(mid, "sdskv_migrate_key_range_rpc", &client->sdskv_migrate_key_range_id, &flag); + margo_registered_name(mid, "sdskv_migrate_key_range_rpc", &client->sdskv_migrate_key_range_id, &flag); margo_registered_name(mid, "sdskv_migrate_keys_prefixed_rpc", &client->sdskv_migrate_keys_prefixed_id, &flag); margo_registered_name(mid, "sdskv_migrate_all_keys_rpc", &client->sdskv_migrate_all_keys_id, &flag); - margo_registered_name(mid, "sdskv_migrate_database_rpc", &client->sdskv_migrate_database_id, &flag); } else { client->sdskv_put_id = MARGO_REGISTER(mid, "sdskv_put_rpc", put_in_t, put_out_t, NULL); + client->sdskv_put_multi_id = + MARGO_REGISTER(mid, "sdskv_put_multi_rpc", put_multi_in_t, put_multi_out_t, NULL); client->sdskv_bulk_put_id = MARGO_REGISTER(mid, "sdskv_bulk_put_rpc", bulk_put_in_t, bulk_put_out_t, NULL); client->sdskv_get_id = MARGO_REGISTER(mid, "sdskv_get_rpc", get_in_t, get_out_t, NULL); + client->sdskv_get_multi_id = + MARGO_REGISTER(mid, "sdskv_get_multi_rpc", get_multi_in_t, get_multi_out_t, NULL); client->sdskv_erase_id = MARGO_REGISTER(mid, "sdskv_erase_rpc", erase_in_t, erase_out_t, NULL); client->sdskv_exists_id = MARGO_REGISTER(mid, "sdskv_exists_rpc", exists_in_t, exists_out_t, NULL); client->sdskv_length_id = MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL); + client->sdskv_length_multi_id = + MARGO_REGISTER(mid, "sdskv_length_multi_rpc", length_multi_in_t, length_multi_out_t, NULL); client->sdskv_bulk_get_id = MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL); client->sdskv_open_id = @@ -90,8 +100,6 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid) MARGO_REGISTER(mid, "sdskv_migrate_keys_prefixed_rpc", migrate_keys_prefixed_in_t, migrate_keys_out_t, NULL); client->sdskv_migrate_all_keys_id = MARGO_REGISTER(mid, "sdskv_migrate_all_keys_rpc", migrate_all_keys_in_t, migrate_keys_out_t, NULL); - client->sdskv_migrate_database_id = - MARGO_REGISTER(mid, "sdskv_migrate_database_rpc", migrate_database_in_t, migrate_database_out_t, NULL); } return SDSKV_SUCCESS; @@ -220,7 +228,7 @@ int sdskv_put(sdskv_provider_handle_t provider, const void *value, hg_size_t vsize) { hg_return_t hret; - int ret; + int ret = SDSKV_SUCCESS; hg_handle_t handle; hg_size_t msize = ksize + vsize + 2*sizeof(hg_size_t); @@ -316,7 +324,111 @@ int sdskv_put(sdskv_provider_handle_t provider, } margo_destroy(handle); - return SDSKV_SUCCESS; + return ret; +} + +int sdskv_put_multi(sdskv_provider_handle_t provider, + sdskv_database_id_t db_id, + size_t num, const void** keys, const hg_size_t* ksizes, + const void** values, const hg_size_t *vsizes) +{ + hg_return_t hret; + int ret; + hg_handle_t handle = HG_HANDLE_NULL; + put_multi_in_t in; + put_multi_out_t out; + void** key_seg_ptrs = NULL; + hg_size_t* key_seg_sizes = NULL; + void** val_seg_ptrs = NULL; + hg_size_t* val_seg_sizes = NULL; + + in.db_id = db_id; + in.num_keys = num; + in.keys_bulk_handle = HG_BULK_NULL; + in.keys_bulk_size = 0; + in.vals_bulk_handle = HG_BULK_NULL; + in.vals_bulk_size = 0; + + /* create an array of key sizes and key pointers */ + key_seg_sizes = malloc(sizeof(hg_size_t)*(num+1)); + key_seg_sizes[0] = num*sizeof(hg_size_t); + memcpy(key_seg_sizes+1, ksizes, num*sizeof(hg_size_t)); + key_seg_ptrs = malloc(sizeof(void*)*(num+1)); + key_seg_ptrs[0] = (void*)ksizes; + memcpy(key_seg_ptrs+1, keys, num*sizeof(void*)); + int i; + for(i=0; i < num+1; i++) { + in.keys_bulk_size += key_seg_sizes[i]; + } + /* create an array of val sizes and val pointers */ + val_seg_sizes = malloc(sizeof(hg_size_t)*(num+1)); + val_seg_sizes[0] = num*sizeof(hg_size_t); + memcpy(val_seg_sizes+1, vsizes, num*sizeof(hg_size_t)); + val_seg_ptrs = malloc(sizeof(void*)*(num+1)); + val_seg_ptrs[0] = (void*)vsizes; + memcpy(val_seg_ptrs+1, values, num*sizeof(void*)); + for(i=0; i < num+1; i++) { + in.vals_bulk_size += val_seg_sizes[i]; + } + + /* create the bulk handle to access the keys */ + hret = margo_bulk_create(provider->client->mid, num+1, key_seg_ptrs, key_seg_sizes, + HG_BULK_READ_ONLY, &in.keys_bulk_handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* create the bulk handle to access the values */ + hret = margo_bulk_create(provider->client->mid, num+1, val_seg_ptrs, val_seg_sizes, + HG_BULK_READ_ONLY, &in.vals_bulk_handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* create a RPC handle */ + hret = margo_create( + provider->client->mid, + provider->addr, + provider->client->sdskv_put_multi_id, + &handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* forward the RPC */ + hret = margo_provider_forward(provider->provider_id, handle, &in); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* get the output */ + hret = margo_get_output(handle, &out); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + ret = out.ret; + +finish: + margo_free_output(handle, &out); + margo_bulk_free(in.keys_bulk_handle); + margo_bulk_free(in.vals_bulk_handle); + free(key_seg_sizes); + free(key_seg_ptrs); + free(val_seg_sizes); + free(val_seg_ptrs); + margo_destroy(handle); + return ret; } int sdskv_get(sdskv_provider_handle_t provider, @@ -422,6 +534,121 @@ int sdskv_get(sdskv_provider_handle_t provider, return ret; } +int sdskv_get_multi(sdskv_provider_handle_t provider, + sdskv_database_id_t db_id, + size_t num, const void** keys, const hg_size_t* ksizes, + void** values, hg_size_t *vsizes) +{ + hg_return_t hret; + int ret; + hg_handle_t handle = HG_HANDLE_NULL; + get_multi_in_t in; + get_multi_out_t out; + void** key_seg_ptrs = NULL; + hg_size_t* key_seg_sizes = NULL; + char* vals_buffer = NULL; + + in.db_id = db_id; + in.num_keys = num; + in.keys_bulk_handle = HG_BULK_NULL; + in.keys_bulk_size = 0; + in.vals_bulk_handle = HG_BULK_NULL; + in.vals_bulk_size = 0; + + /* create an array of key sizes and key pointers */ + key_seg_sizes = malloc(sizeof(hg_size_t)*(num+1)); + key_seg_sizes[0] = num*sizeof(hg_size_t); + memcpy(key_seg_sizes+1, ksizes, num*sizeof(hg_size_t)); + key_seg_ptrs = malloc(sizeof(void*)*(num+1)); + key_seg_ptrs[0] = (void*)ksizes; + memcpy(key_seg_ptrs+1, keys, num*sizeof(void*)); + + int i; + for(i=0; iclient->mid, num+1, key_seg_ptrs, key_seg_sizes, + HG_BULK_READ_ONLY, &in.keys_bulk_handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* allocate memory to send max value sizes and receive values */ + for(i=0; iclient->mid, num, (void**)&vals_buffer, &in.vals_bulk_size, + HG_BULK_READWRITE, &in.vals_bulk_handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* create a RPC handle */ + hret = margo_create( + provider->client->mid, + provider->addr, + provider->client->sdskv_get_multi_id, + &handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* forward the RPC handle */ + hret = margo_provider_forward(provider->provider_id, handle, &in); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* get the response */ + hret = margo_get_output(handle, &out); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + ret = out.ret; + if(out.ret != SDSKV_SUCCESS) { + goto finish; + } + + /* copy the values from the buffer into the user-provided buffer */ + char* value_ptr = vals_buffer + num*sizeof(hg_size_t); + for(i=0; iclient->mid, num+1, key_seg_ptrs, key_seg_sizes, + HG_BULK_READ_ONLY, &in.keys_bulk_handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* create the bulk handle for the server to put the values sizes */ + hg_size_t vals_size_bulk_size = num*sizeof(hg_size_t); + hret = margo_bulk_create(provider->client->mid, num, (void**)&vsizes, &vals_size_bulk_size, + HG_BULK_WRITE_ONLY, &in.vals_size_bulk_handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* create a RPC handle */ + hret = margo_create( + provider->client->mid, + provider->addr, + provider->client->sdskv_length_multi_id, + &handle); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* forward the RPC handle */ + hret = margo_provider_forward(provider->provider_id, handle, &in); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_get_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + /* get the response */ + hret = margo_get_output(handle, &out); + if(hret != HG_SUCCESS) { + fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put_multi()\n"); + out.ret = SDSKV_ERR_MERCURY; + goto finish; + } + + ret = out.ret; + if(out.ret != SDSKV_SUCCESS) { + goto finish; + } + +finish: + margo_free_output(handle, &out); + margo_bulk_free(in.keys_bulk_handle); + margo_bulk_free(in.vals_size_bulk_handle); + free(key_seg_sizes); + free(key_seg_ptrs); + margo_destroy(handle); + return ret; +} + int sdskv_erase(sdskv_provider_handle_t provider, sdskv_database_id_t db_id, const void *key, hg_size_t ksize) @@ -840,10 +1160,10 @@ int sdskv_migrate_keys( /* create bulk to expose key sizes and keys */ hg_size_t* seg_sizes = (hg_size_t*)calloc(num_keys+1, sizeof(hg_size_t)); seg_sizes[0] = num_keys*sizeof(hg_size_t); - memcpy(seg_sizes, key_sizes, num_keys*sizeof(hg_size_t)); + memcpy(seg_sizes+1, key_sizes, num_keys*sizeof(hg_size_t)); void** segs = (void**)calloc(num_keys+1, sizeof(void*)); segs[0] = (void*)key_sizes; - memcpy(segs, keys, num_keys*sizeof(void*)); + memcpy(segs+1, keys, num_keys*sizeof(void*)); /* compute the total size of the array */ int i; in.bulk_size = 0; @@ -1044,51 +1364,6 @@ finish: return ret; } -int sdskv_migrate_database( - sdskv_provider_handle_t source_provider, - sdskv_database_id_t source_db_id, - const char* target_addr, - uint16_t target_provider_id, - sdskv_database_id_t* target_db_id, - int flag) -{ - int ret = HG_SUCCESS; - hg_return_t hret = HG_SUCCESS; - hg_handle_t handle; - migrate_database_in_t in; - migrate_database_out_t out; - in.source_db_id = source_db_id; - in.target_addr = (hg_string_t)target_addr; - in.target_provider_id = target_provider_id; - in.flag = flag; - /* create handle */ - hret = margo_create( - source_provider->client->mid, - source_provider->addr, - source_provider->client->sdskv_migrate_database_id, - &handle); - if(hret != HG_SUCCESS) { - ret = SDSKV_ERR_MERCURY; - goto finish; - } - /* forward to provider */ - hret = margo_provider_forward(source_provider->provider_id, handle, &in); - if(hret != HG_SUCCESS) { - ret = SDSKV_ERR_MERCURY; - goto finish; - } - /* get the output from provider */ - hret = margo_get_output(handle, &out); - if(hret != HG_SUCCESS) { - ret = SDSKV_ERR_MERCURY; - goto finish; - } - ret = out.ret; -finish: - margo_free_output(handle, &out); - margo_destroy(handle); - return ret; -} int sdskv_shutdown_service(sdskv_client_t client, hg_addr_t addr) { diff --git a/src/sdskv-rpc-types.h b/src/sdskv-rpc-types.h index 39b9faf..9446874 100644 --- a/src/sdskv-rpc-types.h +++ b/src/sdskv-rpc-types.h @@ -113,6 +113,35 @@ MERCURY_GEN_PROC(bulk_get_in_t, ((uint64_t)(db_id))\ ((hg_bulk_t)(handle))) MERCURY_GEN_PROC(bulk_get_out_t, ((hg_size_t)(size)) ((int32_t)(ret))) +// ------------- PUT MULTI ------------- // +MERCURY_GEN_PROC(put_multi_in_t, \ + ((uint64_t)(db_id))\ + ((hg_size_t)(num_keys))\ + ((hg_bulk_t)(keys_bulk_handle))\ + ((hg_size_t)(keys_bulk_size))\ + ((hg_bulk_t)(vals_bulk_handle))\ + ((hg_size_t)(vals_bulk_size))) +MERCURY_GEN_PROC(put_multi_out_t, ((int32_t)(ret))) + +// ------------- GET MULTI ------------- // +MERCURY_GEN_PROC(get_multi_in_t, \ + ((uint64_t)(db_id))\ + ((hg_size_t)(num_keys))\ + ((hg_bulk_t)(keys_bulk_handle))\ + ((hg_size_t)(keys_bulk_size))\ + ((hg_bulk_t)(vals_bulk_handle))\ + ((hg_size_t)(vals_bulk_size))) +MERCURY_GEN_PROC(get_multi_out_t, ((int32_t)(ret))) + +// ------------- LENGTH MULTI ------------- // +MERCURY_GEN_PROC(length_multi_in_t, \ + ((uint64_t)(db_id))\ + ((hg_size_t)(num_keys))\ + ((hg_bulk_t)(keys_bulk_handle))\ + ((hg_size_t)(keys_bulk_size))\ + ((hg_bulk_t)(vals_size_bulk_handle))) +MERCURY_GEN_PROC(length_multi_out_t, ((int32_t)(ret))) + // ------------- MIGRATE KEYS ----------- // MERCURY_GEN_PROC(migrate_keys_in_t, ((uint64_t)(source_db_id))\ @@ -154,15 +183,5 @@ MERCURY_GEN_PROC(migrate_all_keys_in_t, ((uint64_t)(target_db_id))\ ((int32_t)(flag))) -// ------------- MIGRATE DATABASE ----------- // -MERCURY_GEN_PROC(migrate_database_in_t, - ((uint64_t)(source_db_id))\ - ((hg_string_t)(target_addr))\ - ((uint16_t)(target_provider_id))\ - ((int32_t)(flag))) - -MERCURY_GEN_PROC(migrate_database_out_t, - ((int32_t)(ret))\ - ((uint64_t)(db_id))) #endif diff --git a/src/sdskv-server.cc b/src/sdskv-server.cc index 9df4e2b..e22ba04 100644 --- a/src/sdskv-server.cc +++ b/src/sdskv-server.cc @@ -19,11 +19,14 @@ struct sdskv_server_context_t std::map id2name; hg_id_t sdskv_put_id; + hg_id_t sdskv_put_multi_id; hg_id_t sdskv_bulk_put_id; hg_id_t sdskv_get_id; + hg_id_t sdskv_get_multi_id; hg_id_t sdskv_exists_id; hg_id_t sdskv_erase_id; hg_id_t sdskv_length_id; + hg_id_t sdskv_length_multi_id; hg_id_t sdskv_bulk_get_id; hg_id_t sdskv_open_id; hg_id_t sdskv_list_keys_id; @@ -49,8 +52,11 @@ inline scoped_call at_exit(F&& f) { } DECLARE_MARGO_RPC_HANDLER(sdskv_put_ult) +DECLARE_MARGO_RPC_HANDLER(sdskv_put_multi_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_length_ult) +DECLARE_MARGO_RPC_HANDLER(sdskv_length_multi_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult) +DECLARE_MARGO_RPC_HANDLER(sdskv_get_multi_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_open_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult) @@ -62,14 +68,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_key_range_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_prefixed_ult) DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_all_keys_ult) -DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult) -/* -DECLARE_MARGO_RPC_HANDLER(__migrate_keys_ult) -DECLARE_MARGO_RPC_HANDLER(__migrate_key_range_ult) -DECLARE_MARGO_RPC_HANDLER(__migrate_keys_prefixed_ult) -DECLARE_MARGO_RPC_HANDLER(__migrate_all_keys_ult) -DECLARE_MARGO_RPC_HANDLER(__migrate_database_ult) -*/ + static void sdskv_server_finalize_cb(void *data); extern "C" int sdskv_provider_register( @@ -103,6 +102,11 @@ extern "C" int sdskv_provider_register( sdskv_put_ult, provider_id, abt_pool); tmp_svr_ctx->sdskv_put_id = rpc_id; margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); + rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_multi_rpc", + put_multi_in_t, put_multi_out_t, + sdskv_put_multi_ult, provider_id, abt_pool); + tmp_svr_ctx->sdskv_put_multi_id = rpc_id; + margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_put_rpc", bulk_put_in_t, bulk_put_out_t, sdskv_bulk_put_ult, provider_id, abt_pool); @@ -113,11 +117,21 @@ extern "C" int sdskv_provider_register( sdskv_get_ult, provider_id, abt_pool); tmp_svr_ctx->sdskv_get_id = rpc_id; margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); + rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_get_multi_rpc", + get_multi_in_t, get_multi_out_t, + sdskv_get_multi_ult, provider_id, abt_pool); + tmp_svr_ctx->sdskv_get_multi_id = rpc_id; + margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_rpc", length_in_t, length_out_t, sdskv_length_ult, provider_id, abt_pool); tmp_svr_ctx->sdskv_length_id = rpc_id; margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); + rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_multi_rpc", + length_multi_in_t, length_multi_out_t, + sdskv_length_multi_ult, provider_id, abt_pool); + tmp_svr_ctx->sdskv_length_multi_id = rpc_id; + margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_exists_rpc", exists_in_t, exists_out_t, sdskv_exists_ult, provider_id, abt_pool); @@ -169,33 +183,6 @@ extern "C" int sdskv_provider_register( sdskv_migrate_all_keys_ult, provider_id, abt_pool); tmp_svr_ctx->sdskv_migrate_all_keys_id = rpc_id; margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_database_rpc", - migrate_database_in_t, migrate_database_out_t, - sdskv_migrate_database_ult, provider_id, abt_pool); - tmp_svr_ctx->sdskv_migrate_database_id = rpc_id; - margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - /* - rpc_id = margo_register_provider(mid, "__migrate_keys_rpc", - __migrate_keys_in_t, __migrate_keys_out_t, - sdskv_migrate_keys_ult, provider_id, abt_pool); - margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - rpc_id = margo_register_provider(mid, "sdskv_migrate_key_range_rpc", - migrate_key_range_in_t, migrate_keys_out_t, - sdskv_migrate_key_range_ult, provider_id, abt_pool); - margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - rpc_id = margo_register_provider(mid, "sdskv_migrate_keys_prefixed_rpc", - migrate_prefixed_keys_in_t, migrate_keys_out_t, - sdskv_migrate_prefixed_keys_ult, provider_id, abt_pool); - margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - rpc_id = margo_register_provider(mid, "sdskv_migrate_all_keys_rpc", - migrate_all_keys_in_t, migrate_keys_out_t, - sdskv_migrate_all_keys_ult, provider_id, abt_pool); - margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - rpc_id = margo_register_provider(mid, "sdskv_migrate_database_rpc", - migrate_database_in_t, migrate_database_out_t, - sdskv_migrate_database_ult, provider_id, abt_pool); - margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); - */ /* install the bake server finalize callback */ margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx); @@ -306,7 +293,7 @@ static void sdskv_put_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - fprintf(stderr, "Error: SDSKV could not find provider\n"); + fprintf(stderr, "Error (sdskv_put_ult): SDSKV could not find provider\n"); out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -345,9 +332,105 @@ static void sdskv_put_ult(hg_handle_t handle) } DEFINE_MARGO_RPC_HANDLER(sdskv_put_ult) -static void sdskv_length_ult(hg_handle_t handle) +static void sdskv_put_multi_ult(hg_handle_t handle) { + hg_return_t hret; + put_multi_in_t in; + put_multi_out_t out; + out.ret = SDSKV_SUCCESS; + std::vector local_keys_buffer; + std::vector local_vals_buffer; + hg_bulk_t local_keys_bulk_handle; + hg_bulk_t local_vals_bulk_handle; + + auto r1 = at_exit([&handle]() { margo_destroy(handle); }); + auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); }); + + + margo_instance_id mid = margo_hg_handle_get_instance(handle); + const struct hg_info* info = margo_get_info(handle); + sdskv_provider_t svr_ctx = + (sdskv_provider_t)margo_registered_data(mid, info->id); + if(!svr_ctx) { + out.ret = SDSKV_ERR_UNKNOWN_PR; + return; + } + hret = margo_get_input(handle, &in); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); }); + + auto it = svr_ctx->databases.find(in.db_id); + if(it == svr_ctx->databases.end()) { + out.ret = SDSKV_ERR_UNKNOWN_DB; + return; + } + + // allocate a buffer to receive the keys and a buffer to receive the values + local_keys_buffer.resize(in.keys_bulk_size); + local_vals_buffer.resize(in.vals_bulk_size); + std::vector keys_addr(1); keys_addr[0] = (void*)local_keys_buffer.data(); + std::vector vals_addr(1); vals_addr[0] = (void*)local_vals_buffer.data(); + + /* create bulk handle to receive keys */ + hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.keys_bulk_size, + HG_BULK_WRITE_ONLY, &local_keys_bulk_handle); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); }); + + /* create bulk handle to receive values */ + hret = margo_bulk_create(mid, 1, vals_addr.data(), &in.vals_bulk_size, + HG_BULK_WRITE_ONLY, &local_vals_bulk_handle); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r7 = at_exit([&local_vals_bulk_handle]() { margo_bulk_free(local_vals_bulk_handle); }); + + /* transfer keys */ + hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.keys_bulk_handle, 0, + local_keys_bulk_handle, 0, in.keys_bulk_size); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + /* transfer values */ + hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.vals_bulk_handle, 0, + local_vals_bulk_handle, 0, in.vals_bulk_size); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + /* interpret beginning of the key buffer as a list of key sizes */ + hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data(); + /* interpret beginning of the value buffer as a list of value sizes */ + hg_size_t* val_sizes = (hg_size_t*)local_vals_buffer.data(); + + /* go through the key/value pairs and insert them */ + uint64_t keys_offset = sizeof(hg_size_t)*in.num_keys; + uint64_t vals_offset = sizeof(hg_size_t)*in.num_keys; + for(unsigned i=0; i < in.num_keys; i++) { + ds_bulk_t kdata(local_keys_buffer.data()+keys_offset, local_keys_buffer.data()+keys_offset+key_sizes[i]); + ds_bulk_t vdata(local_vals_buffer.data()+vals_offset, local_vals_buffer.data()+vals_offset+val_sizes[i]); + it->second->put(kdata, vdata); + keys_offset += key_sizes[i]; + vals_offset += val_sizes[i]; + } + + return; +} +DEFINE_MARGO_RPC_HANDLER(sdskv_put_multi_ult) + +static void sdskv_length_ult(hg_handle_t handle) +{ hg_return_t hret; length_in_t in; length_out_t out; @@ -469,6 +552,236 @@ static void sdskv_get_ult(hg_handle_t handle) } DEFINE_MARGO_RPC_HANDLER(sdskv_get_ult) +static void sdskv_get_multi_ult(hg_handle_t handle) +{ + + hg_return_t hret; + get_multi_in_t in; + get_multi_out_t out; + out.ret = SDSKV_SUCCESS; + std::vector local_keys_buffer; + std::vector local_vals_buffer; + hg_bulk_t local_keys_bulk_handle; + hg_bulk_t local_vals_bulk_handle; + + auto r1 = at_exit([&handle]() { margo_destroy(handle); }); + auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); }); + + /* get margo instance and provider */ + margo_instance_id mid = margo_hg_handle_get_instance(handle); + const struct hg_info* info = margo_get_info(handle); + sdskv_provider_t svr_ctx = + (sdskv_provider_t)margo_registered_data(mid, info->id); + if(!svr_ctx) { + out.ret = SDSKV_ERR_UNKNOWN_PR; + return; + } + + /* deserialize input */ + hret = margo_get_input(handle, &in); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); }); + + /* find the target database */ + auto it = svr_ctx->databases.find(in.db_id); + if(it == svr_ctx->databases.end()) { + out.ret = SDSKV_ERR_UNKNOWN_DB; + return; + } + + /* allocate buffers to receive the keys */ + local_keys_buffer.resize(in.keys_bulk_size); + std::vector keys_addr(1); + keys_addr[0] = (void*)local_keys_buffer.data(); + + /* create bulk handle to receive key sizes and packed keys */ + hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.keys_bulk_size, + HG_BULK_WRITE_ONLY, &local_keys_bulk_handle); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); }); + + /* allocate buffer to send/receive the values */ + local_vals_buffer.resize(in.vals_bulk_size); + std::vector vals_addr(1); + vals_addr[0] = (void*)local_vals_buffer.data(); + + /* create bulk handle to receive max value sizes and to send values */ + hret = margo_bulk_create(mid, 1, vals_addr.data(), &in.vals_bulk_size, + HG_BULK_READWRITE, &local_vals_bulk_handle); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r7 = at_exit([&local_vals_bulk_handle]() { margo_bulk_free(local_vals_bulk_handle); }); + + /* transfer keys */ + hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.keys_bulk_handle, 0, + local_keys_bulk_handle, 0, in.keys_bulk_size); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + /* transfer sizes allocated by user for the values (beginning of value segment) */ + hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.vals_bulk_handle, 0, + local_vals_bulk_handle, 0, in.num_keys*sizeof(hg_size_t)); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + /* interpret beginning of the key buffer as a list of key sizes */ + hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data(); + /* find beginning of packed keys */ + char* packed_keys = local_keys_buffer.data() + in.num_keys*sizeof(hg_size_t); + /* interpret beginning of the value buffer as a list of value sizes */ + hg_size_t* val_sizes = (hg_size_t*)local_vals_buffer.data(); + /* find beginning of region where to pack values */ + char* packed_values = local_vals_buffer.data() + in.num_keys*sizeof(hg_size_t); + + /* go through the key/value pairs and get the values from the database */ + for(unsigned i=0; i < in.num_keys; i++) { + ds_bulk_t kdata(packed_keys, packed_keys+key_sizes[i]); + ds_bulk_t vdata; + size_t client_allocated_value_size = val_sizes[i]; + if(it->second->get(kdata, vdata)) { + if(vdata.size() > val_sizes[i]) { + val_sizes[i] = 0; + } else { + val_sizes[i] = vdata.size(); + memcpy(packed_values, vdata.data(), val_sizes[i]); + } + } else { + val_sizes[i] = 0; + } + packed_keys += key_sizes[i]; + packed_values += client_allocated_value_size; + } + + /* do a PUSH operation to push back the values to the client */ + hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.vals_bulk_handle, 0, + local_vals_bulk_handle, 0, in.vals_bulk_size); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + return; +} +DEFINE_MARGO_RPC_HANDLER(sdskv_get_multi_ult) + +static void sdskv_length_multi_ult(hg_handle_t handle) +{ + + hg_return_t hret; + length_multi_in_t in; + length_multi_out_t out; + out.ret = SDSKV_SUCCESS; + std::vector local_keys_buffer; + std::vector local_vals_size_buffer; + hg_bulk_t local_keys_bulk_handle; + hg_bulk_t local_vals_size_bulk_handle; + + auto r1 = at_exit([&handle]() { margo_destroy(handle); }); + auto r2 = at_exit([&handle,&out]() { margo_respond(handle, &out); }); + + /* get margo instance and provider */ + margo_instance_id mid = margo_hg_handle_get_instance(handle); + const struct hg_info* info = margo_get_info(handle); + sdskv_provider_t svr_ctx = + (sdskv_provider_t)margo_registered_data(mid, info->id); + if(!svr_ctx) { + out.ret = SDSKV_ERR_UNKNOWN_PR; + return; + } + + /* deserialize input */ + hret = margo_get_input(handle, &in); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r3 = at_exit([&handle,&in]() { margo_free_input(handle, &in); }); + + /* find the target database */ + auto it = svr_ctx->databases.find(in.db_id); + if(it == svr_ctx->databases.end()) { + out.ret = SDSKV_ERR_UNKNOWN_DB; + return; + } + + /* allocate buffers to receive the keys */ + local_keys_buffer.resize(in.keys_bulk_size); + std::vector keys_addr(1); + keys_addr[0] = (void*)local_keys_buffer.data(); + + /* create bulk handle to receive key sizes and packed keys */ + hret = margo_bulk_create(mid, 1, keys_addr.data(), &in.keys_bulk_size, + HG_BULK_WRITE_ONLY, &local_keys_bulk_handle); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); }); + + /* allocate buffer to send the values */ + local_vals_size_buffer.resize(in.num_keys); + std::vector vals_sizes_addr(1); + hg_size_t local_vals_size_buffer_size = in.num_keys * sizeof(hg_size_t); + vals_sizes_addr[0] = (void*)local_vals_size_buffer.data(); + + /* create bulk handle to send values sizes */ + hret = margo_bulk_create(mid, 1, vals_sizes_addr.data(), &local_vals_size_buffer_size, + HG_BULK_READ_ONLY, &local_vals_size_bulk_handle); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + auto r7 = at_exit([&local_vals_size_bulk_handle]() { margo_bulk_free(local_vals_size_bulk_handle); }); + + /* transfer keys */ + hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.keys_bulk_handle, 0, + local_keys_bulk_handle, 0, in.keys_bulk_size); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + /* interpret beginning of the key buffer as a list of key sizes */ + hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data(); + /* find beginning of packed keys */ + char* packed_keys = local_keys_buffer.data() + in.num_keys*sizeof(hg_size_t); + + /* go through the key/value pairs and get the values from the database */ + for(unsigned i=0; i < in.num_keys; i++) { + ds_bulk_t kdata(packed_keys, packed_keys+key_sizes[i]); + ds_bulk_t vdata; + if(it->second->get(kdata, vdata)) { + local_vals_size_buffer[i] = vdata.size(); + } else { + local_vals_size_buffer[i] = 0; + } + packed_keys += key_sizes[i]; + } + + /* do a PUSH operation to push back the value sizes to the client */ + hret = margo_bulk_transfer(mid, HG_BULK_PUSH, info->addr, in.vals_size_bulk_handle, 0, + local_vals_size_bulk_handle, 0, local_vals_size_buffer_size); + if(hret != HG_SUCCESS) { + out.ret = SDSKV_ERR_MERCURY; + return; + } + + return; +} +DEFINE_MARGO_RPC_HANDLER(sdskv_length_multi_ult) + static void sdskv_open_ult(hg_handle_t handle) { @@ -482,7 +795,7 @@ static void sdskv_open_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - fprintf(stderr, "Error: SDSKV could not find provider\n"); + fprintf(stderr, "Error (sdskv_open_ult): SDSKV could not find provider with id\n"); out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -530,7 +843,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - fprintf(stderr, "Error: SDSKV could not find provider\n"); + fprintf(stderr, "Error (sdskv_bulk_put_ult): SDSKV could not find provider\n"); out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -617,7 +930,7 @@ static void sdskv_bulk_get_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - fprintf(stderr, "Error: SDSKV could not find provider\n"); + fprintf(stderr, "Error (sdskv_bulk_get_ult): SDSKV could not find provider\n"); out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -708,7 +1021,7 @@ static void sdskv_erase_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - fprintf(stderr, "Error: SDSKV could not find provider\n"); + fprintf(stderr, "Error (sdskv_erase_ult): SDSKV could not find provider\n"); out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -761,7 +1074,7 @@ static void sdskv_exists_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - fprintf(stderr, "Error: SDSKV could not find provider\n"); + fprintf(stderr, "Error (sdskv_exists_ult): SDSKV could not find provider\n"); out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -816,7 +1129,7 @@ static void sdskv_list_keys_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - std::cerr << "Error: SDSKV list_keys could not find provider" << std::endl; + std::cerr << "Error (sdskv_list_keys_ult): SDSKV list_keys could not find provider" << std::endl; out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -970,7 +1283,7 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle) sdskv_provider_t svr_ctx = (sdskv_provider_t)margo_registered_data(mid, info->id); if(!svr_ctx) { - std::cerr << "Error: SDSKV list_keyvals could not find provider" << std::endl; + std::cerr << "Error (sdskv_list_keyvals_ult): SDSKV list_keyvals could not find provider" << std::endl; out.ret = SDSKV_ERR_UNKNOWN_PR; margo_respond(handle, &out); margo_destroy(handle); @@ -1538,74 +1851,6 @@ static void sdskv_migrate_all_keys_ult(hg_handle_t handle) } DEFINE_MARGO_RPC_HANDLER(sdskv_migrate_all_keys_ult) -static void sdskv_migrate_database_ult(hg_handle_t handle) -{ - hg_return_t hret; - migrate_database_in_t in; - migrate_database_out_t out; - - /* get the provider handling this request */ - margo_instance_id mid = margo_hg_handle_get_instance(handle); - const struct hg_info* info = margo_get_info(handle); - sdskv_provider_t provider = - (sdskv_provider_t)margo_registered_data(mid, info->id); - if(!provider) { - out.ret = SDSKV_ERR_UNKNOWN_PR; - margo_respond(handle, &out); - margo_destroy(handle); - return; - } - /* get the input */ - hret = margo_get_input(handle, &in); - if(hret != HG_SUCCESS) { - out.ret = SDSKV_ERR_MERCURY; - margo_respond(handle, &out); - margo_destroy(handle); - return; - } - - // TODO implement this operation - out.ret = SDSKV_OP_NOT_IMPL; - out.db_id = 0; - margo_respond(handle, &out); - - margo_free_input(handle, &in); - margo_destroy(handle); -} -DEFINE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult) - -#if 0 -static void __migrate_keys_ult(hg_handle_t handle) -{ - // TODO -} -DEFINE_MARGO_RPC_HANDLER(__migrate_keys_ult) - -static void __migrate_key_range_ult(hg_handle_t handle) -{ - // TODO -} -DEFINE_MARGO_RPC_HANDLER(__migrate_key_range_ult) - -static void __migrate_keys_prefixed_ult(hg_handle_t handle) -{ - // TODO -} -DEFINE_MARGO_RPC_HANDLER(__migrate_keys_prefixed_ult) - -static void __migrate_all_keys_ult(hg_handle_t handle) -{ - // TODO -} -DEFINE_MARGO_RPC_HANDLER(__migrate_all_keys_ult) - -static void __migrate_database_ult(hg_handle_t handle) -{ - // TODO -} -DEFINE_MARGO_RPC_HANDLER(__migrate_database_ult) -#endif - static void sdskv_server_finalize_cb(void *data) { sdskv_provider_t svr_ctx = (sdskv_provider_t)data; diff --git a/test/multi-test.sh b/test/multi-test.sh new file mode 100755 index 0000000..d272b14 --- /dev/null +++ b/test/multi-test.sh @@ -0,0 +1,30 @@ +#!/bin/bash -x + +if [ -z $srcdir ]; then + echo srcdir variable not set. + exit 1 +fi +source $srcdir/test/test-util.sh + +find_db_name + +# start a server with 2 second wait, +# 20s timeout, and my_test_db as database +test_start_server 2 20 $test_db_full + +sleep 1 + +##################### + +run_to 20 test/sdskv-multi-test $svr_addr 1 $test_db_name 100 +if [ $? -ne 0 ]; then + wait + exit 1 +fi + +wait + +echo cleaning up $TMPBASE +rm -rf $TMPBASE + +exit 0 diff --git a/test/sdskv-multi-test.cc b/test/sdskv-multi-test.cc new file mode 100644 index 0000000..4a32c0d --- /dev/null +++ b/test/sdskv-multi-test.cc @@ -0,0 +1,231 @@ +/* + * (C) 2015 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sdskv-client.h" + +static std::string gen_random_string(size_t len); + +int main(int argc, char *argv[]) +{ + char cli_addr_prefix[64] = {0}; + char *sdskv_svr_addr_str; + char *db_name; + margo_instance_id mid; + hg_addr_t svr_addr; + uint8_t mplex_id; + uint32_t num_keys; + sdskv_client_t kvcl; + sdskv_provider_handle_t kvph; + hg_return_t hret; + int ret; + + if(argc != 5) + { + fprintf(stderr, "Usage: %s \n", argv[0]); + fprintf(stderr, " Example: %s tcp://localhost:1234 1 foo 1000\n", argv[0]); + return(-1); + } + sdskv_svr_addr_str = argv[1]; + mplex_id = atoi(argv[2]); + db_name = argv[3]; + num_keys = atoi(argv[4]); + + /* initialize Margo using the transport portion of the server + * address (i.e., the part before the first : character if present) + */ + for(unsigned i=0; (i<63 && sdskv_svr_addr_str[i] != '\0' && sdskv_svr_addr_str[i] != ':'); i++) + cli_addr_prefix[i] = sdskv_svr_addr_str[i]; + + /* start margo */ + mid = margo_init(cli_addr_prefix, MARGO_SERVER_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) + { + fprintf(stderr, "Error: margo_init()\n"); + return(-1); + } + + ret = sdskv_client_init(mid, &kvcl); + if(ret != 0) + { + fprintf(stderr, "Error: sdskv_client_init()\n"); + margo_finalize(mid); + return -1; + } + + /* look up the SDSKV server address */ + hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr); + if(hret != HG_SUCCESS) + { + fprintf(stderr, "Error: margo_addr_lookup()\n"); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return(-1); + } + + /* create a SDSKV provider handle */ + ret = sdskv_provider_handle_create(kvcl, svr_addr, mplex_id, &kvph); + if(ret != 0) + { + fprintf(stderr, "Error: sdskv_provider_handle_create()\n"); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return(-1); + } + + /* open the database */ + sdskv_database_id_t db_id; + ret = sdskv_open(kvph, db_name, &db_id); + if(ret == 0) { + printf("Successfuly open database %s, id is %ld\n", db_name, db_id); + } else { + fprintf(stderr, "Error: could not open database %s\n", db_name); + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return(-1); + } + + /* **** generate multiple key/vals ***** */ + std::vector keys; + std::vector vals; + std::map reference; + size_t max_value_size = 24; + + for(unsigned i=0; i < num_keys; i++) { + auto k = gen_random_string(16); + auto v = gen_random_string(3+i*(max_value_size-3)/num_keys); + reference[k] = v; + keys.push_back(k); + vals.push_back(v); + } + + std::vector keys_ptr(keys.size()); + std::vector vals_ptr(vals.size()); + std::vector keys_size(keys.size()); + std::vector vals_size(vals.size()); + for(unsigned i=0; i < num_keys; i++) { + keys_ptr[i] = (const void*)keys[i].data(); + vals_ptr[i] = (const void*)vals[i].data(); + keys_size[i] = keys[i].size()+1; // +1 because of the null character + vals_size[i] = vals[i].size()+1; + } + + /* **** issue a put_multi ***** */ + ret = sdskv_put_multi(kvph, db_id, num_keys, &keys_ptr[0], keys_size.data(), + &vals_ptr[0], vals_size.data()); + if(ret != 0) { + fprintf(stderr, "Error: sdskv_put_multi() failed\n"); + sdskv_shutdown_service(kvcl, svr_addr); + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return -1; + } + printf("Successfuly inserted %d keys\n", num_keys); + + /* retrieve the length of the values */ + std::vector rval_len(num_keys); + + ret = sdskv_length_multi(kvph, db_id, num_keys, + keys_ptr.data(), keys_size.data(), rval_len.data()); + if(ret != 0) { + fprintf(stderr, "Error: sdskv_length_multi() failed\n"); + sdskv_shutdown_service(kvcl, svr_addr); + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return -1; + } + + /* check if the lengths are correct */ + for(unsigned i=0; i < num_keys; i++) { + if(rval_len[i] != vals_size[i]) { + fprintf(stderr, "Error: value %d doesn't have the right length (%ld != %ld)\n", i, + rval_len[i], vals_size[i]); + sdskv_shutdown_service(kvcl, svr_addr); + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return -1; + } + } + + /* **** get keys **** */ + std::vector> read_values(num_keys); + for(unsigned i=0; i < num_keys; i++) { + read_values[i].resize(rval_len[i]); + } + std::vector read_values_ptr(num_keys); + for(unsigned i=0; i < num_keys; i++) { + read_values_ptr[i] = read_values[i].data(); + } + + ret = sdskv_get_multi(kvph, db_id, num_keys, + keys_ptr.data(), keys_size.data(), + read_values_ptr.data(), rval_len.data()); + if(ret != 0) { + fprintf(stderr, "Error: sdskv_get_multi() failed\n"); + sdskv_shutdown_service(kvcl, svr_addr); + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return -1; + } + + /* check the keys we received against reference */ + for(unsigned i=0; i < num_keys; i++) { + std::string vstring(read_values[i].data()); + auto& k = keys[i]; + std::cout << "Got " << k << " ===> " << vstring << "\t" << "expected: " << reference[k] << std::endl; + if(vstring != reference[k]) { + fprintf(stderr, "Error: sdskv_get_multi() returned a value different from the reference\n"); + sdskv_shutdown_service(kvcl, svr_addr); + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return -1; + } + } + + /* shutdown the server */ + ret = sdskv_shutdown_service(kvcl, svr_addr); + + /**** cleanup ****/ + sdskv_provider_handle_release(kvph); + margo_addr_free(mid, svr_addr); + sdskv_client_finalize(kvcl); + margo_finalize(mid); + return(ret); +} + +static std::string gen_random_string(size_t len) { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + std::string s(len, ' '); + for (unsigned i = 0; i < len; ++i) { + s[i] = alphanum[rand() % (sizeof(alphanum) - 1)]; + } + return s; +} -- 2.26.2