Commit 6cd75051 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented object migration function

parent acdce7fb
......@@ -30,7 +30,7 @@ typedef struct bake_provider_handle* bake_provider_handle_t;
* @param[in] mid margo instance
* @param[out] client resulting bake client object
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_client_init(margo_instance_id mid, bake_client_t* client);
......@@ -42,7 +42,7 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client);
*
* @param client BAKE client to destroy
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_client_finalize(bake_client_t client);
......@@ -54,7 +54,7 @@ int bake_client_finalize(bake_client_t client);
* @param provider_id id of the provider
* @param handle resulting handle
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_provider_handle_create(
bake_client_t client,
......@@ -67,7 +67,7 @@ int bake_provider_handle_create(
*
* @param handle provider handle
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_provider_handle_ref_incr(bake_provider_handle_t handle);
......@@ -78,7 +78,7 @@ int bake_provider_handle_ref_incr(bake_provider_handle_t handle);
* @param[in] handle provider handle
* @param[out] limit limit
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_provider_handle_get_eager_limit(bake_provider_handle_t handle, uint64_t* limit);
......@@ -89,7 +89,7 @@ int bake_provider_handle_get_eager_limit(bake_provider_handle_t handle, uint64_t
* @param[in] handle provider handle
* @param[in] limit limit
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_provider_handle_set_eager_limit(bake_provider_handle_t handle, uint64_t limit);
......@@ -100,7 +100,7 @@ int bake_provider_handle_set_eager_limit(bake_provider_handle_t handle, uint64_t
*
* @param handle provider handle
*
* @return 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_provider_handle_release(bake_provider_handle_t handle);
......@@ -113,7 +113,7 @@ int bake_provider_handle_release(bake_provider_handle_t handle);
* @param [in] max_targets maximum number of targets to retrieve
* @param [out] bti array of BAKE target identifiers with enough space for max_targets
* @param [out] num_targets number of targets returned (at most max_targets)
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_probe(
bake_provider_handle_t provider,
......@@ -131,7 +131,7 @@ int bake_probe(
* @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created
* @param [out] rid identifier for new region
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_create(
bake_provider_handle_t provider,
......@@ -152,7 +152,7 @@ int bake_create(
* @param [in] region_offset offset into the target region to write
* @param [in] buf local memory buffer to write
* @param [in] buf_size size of local memory buffer to write
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_write(
bake_provider_handle_t provider,
......@@ -172,7 +172,7 @@ int bake_write(
* @param [in] remote_offset offset in the remote bulk handle to write from
* @param [in] remote_addr address string of the remote target to write from
* @param [in] size size to write from remote bulk handle
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_proxy_write(
bake_provider_handle_t provider,
......@@ -189,7 +189,7 @@ int bake_proxy_write(
*
* @param [in] provider provider handle
* @param [in] rid identifier for region
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_persist(
bake_provider_handle_t provider,
......@@ -204,7 +204,7 @@ int bake_persist(
* @param [in] buf local memory buffer to write
* @param [in] buf_size size of local memory buffer to write
* @param [out] rid identifier for new region
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_create_write_persist(
bake_provider_handle_t provider,
......@@ -222,7 +222,7 @@ int bake_create_write_persist(
* @param [in] remote_addr address string of the remote target to write from
* @param [in] size size to write from remote bulk handle
* @param [out] rid identifier for new region
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_create_write_persist_proxy(
bake_provider_handle_t provider,
......@@ -239,7 +239,7 @@ int bake_create_write_persist_proxy(
* @param [in] provider provider handle
* @param [in] rid identifier for region
* @param [out] size size of region
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_get_size(
bake_provider_handle_t provider,
......@@ -264,7 +264,7 @@ int bake_get_size(
* @param [in] provider provider handle
* @param [in] rid identifier for region
* @param [out] ptr pointer to the address of the data
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_get_data(
bake_provider_handle_t provider,
......@@ -283,7 +283,7 @@ int bake_get_data(
* @param [in] buf local memory buffer read into
* @param [in] buf_size size of local memory buffer to read into
* @param [out] bytes_read number of bytes effectively read into the buffer
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_read(
bake_provider_handle_t provider,
......@@ -305,7 +305,7 @@ int bake_read(
* @param [in] remote_addr address string of the remote target to read to
* @param [in] size size to read to remote bulk handle
* @param [out] bytes_read number of bytes effectively read
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_proxy_read(
bake_provider_handle_t provider,
......@@ -317,13 +317,39 @@ int bake_proxy_read(
uint64_t size,
uint64_t* bytes_read);
/**
* @brief Requests the source provider to migrate a particular
* region (source_rid) to a destination provider. After the call,
* the designated region will have been removed from the source
* and the dest_rid parameter will be set to the new region id
* in the destination provider.
*
* @param source Source provider.
* @param source_rid Region to migrate.
* @param remove_source Whether the source region should be removed.
* @param dest_addr Address of the destination provider.
* @param dest_provider_id Id of the destination provider.
* @param dest_target_id Destination target.
* @param dest_rid Resulting region id in the destination target.
*
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_migrate(
bake_provider_handle_t source,
bake_region_id_t source_rid,
int remove_source,
const char* dest_addr,
uint16_t dest_provider_id,
bake_target_id_t dest_target_id,
bake_region_id_t* dest_rid);
/**
* Shuts down a remote BAKE service (given an address).
* This will shutdown all the providers on the target address.
*
* @param [in] client BAKE client
* @param [in] addr address of the server
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_shutdown_service(
bake_client_t client,
......@@ -333,12 +359,17 @@ int bake_shutdown_service(
* Issues a BAKE no-op operation.
*
* @param [in] provider provider handle
* @returns 0 on success, -1 on failure
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_noop(bake_provider_handle_t provider);
/**
* Removes a previously persisted BAKE region and frees its associated memory.
*
* @param provider Provider in which to remove the region.
* @param rid Region to remove.
*
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_remove(
bake_provider_handle_t provider,
......
......@@ -34,6 +34,7 @@ struct bake_client
hg_id_t bake_read_id;
hg_id_t bake_noop_id;
hg_id_t bake_remove_id;
hg_id_t bake_migrate_id;
uint64_t num_provider_handles;
};
......@@ -69,6 +70,7 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag);
margo_registered_name(mid, "bake_migrate_rpc", &client->bake_migrate_id, &flag);
} else { /* RPCs not already registered */
......@@ -108,6 +110,9 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
client->bake_remove_id =
MARGO_REGISTER(mid, "bake_remove_rpc",
bake_remove_in_t, bake_remove_out_t, NULL);
client->bake_migrate_id =
MARGO_REGISTER(mid, "bake_migrate_rpc",
bake_migrate_in_t, bake_migrate_out_t, NULL);
}
return BAKE_SUCCESS;
......@@ -716,6 +721,56 @@ int bake_get_data(
return(ret);
}
int bake_migrate(
bake_provider_handle_t source,
bake_region_id_t source_rid,
int remove_source,
const char* dest_addr,
uint16_t dest_provider_id,
bake_target_id_t dest_target_id,
bake_region_id_t* dest_rid)
{
hg_return_t hret;
hg_handle_t handle;
bake_migrate_in_t in;
bake_migrate_out_t out;
int ret;
in.source_rid = source_rid;
in.remove_src = remove_source;
in.dest_addr = dest_addr;
in.dest_provider_id = dest_provider_id;
in.dest_target_id = dest_target_id;
hret = margo_create(source->client->mid, source->addr,
source->client->bake_migrate_id, &handle);
if(hret != HG_SUCCESS)
return BAKE_ERR_MERCURY;
hret = margo_provider_forward(source->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return BAKE_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return BAKE_ERR_MERCURY;
}
ret = out.ret;
if(ret == BAKE_SUCCESS)
*dest_rid = out.dest_rid;
margo_free_output(handle, &out);
margo_destroy(handle);
return(ret);
}
int bake_noop(bake_provider_handle_t provider)
{
hg_return_t hret;
......
......@@ -16,8 +16,6 @@
static inline hg_return_t hg_proc_bake_target_id_t(hg_proc_t proc, bake_target_id_t *bti);
static inline hg_return_t hg_proc_bake_region_id_t(hg_proc_t proc, bake_region_id_t *rid);
static inline hg_return_t hg_proc_bake_probe_out_t(hg_proc_t proc, void* out);
/* BAKE shutdown */
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
/* BAKE create */
MERCURY_GEN_PROC(bake_create_in_t,
......@@ -26,7 +24,6 @@ MERCURY_GEN_PROC(bake_create_in_t,
MERCURY_GEN_PROC(bake_create_out_t,
((int32_t)(ret))\
((bake_region_id_t)(rid)))
DECLARE_MARGO_RPC_HANDLER(bake_create_ult)
/* BAKE write */
MERCURY_GEN_PROC(bake_write_in_t,
......@@ -38,7 +35,6 @@ MERCURY_GEN_PROC(bake_write_in_t,
((hg_string_t)(remote_addr_str)))
MERCURY_GEN_PROC(bake_write_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
/* BAKE eager write */
typedef struct
......@@ -51,14 +47,12 @@ typedef struct
static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_out_p);
MERCURY_GEN_PROC(bake_eager_write_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
/* BAKE persist */
MERCURY_GEN_PROC(bake_persist_in_t,
((bake_region_id_t)(rid)))
MERCURY_GEN_PROC(bake_persist_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
/* BAKE create/write/persist */
MERCURY_GEN_PROC(bake_create_write_persist_in_t,
......@@ -72,7 +66,6 @@ MERCURY_GEN_PROC(bake_create_write_persist_in_t,
MERCURY_GEN_PROC(bake_create_write_persist_out_t,
((int32_t)(ret))\
((bake_region_id_t)(rid)))
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
/* BAKE get size */
MERCURY_GEN_PROC(bake_get_size_in_t,
......@@ -80,7 +73,6 @@ MERCURY_GEN_PROC(bake_get_size_in_t,
MERCURY_GEN_PROC(bake_get_size_out_t,
((int32_t)(ret))\
((uint64_t)(size)))
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
/* BAKE get data */
MERCURY_GEN_PROC(bake_get_data_in_t,
......@@ -88,7 +80,6 @@ MERCURY_GEN_PROC(bake_get_data_in_t,
MERCURY_GEN_PROC(bake_get_data_out_t,
((int32_t)(ret))\
((uint64_t)(ptr)))
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
/* BAKE read */
MERCURY_GEN_PROC(bake_read_in_t,
......@@ -101,7 +92,6 @@ MERCURY_GEN_PROC(bake_read_in_t,
MERCURY_GEN_PROC(bake_read_out_t,
((hg_size_t)(size))\
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
/* BAKE eager read */
MERCURY_GEN_PROC(bake_eager_read_in_t,
......@@ -115,7 +105,6 @@ typedef struct
char * buffer;
} bake_eager_read_out_t;
static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_out_p);
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
/* BAKE probe */
MERCURY_GEN_PROC(bake_probe_in_t,
......@@ -126,17 +115,23 @@ typedef struct
uint64_t num_targets;
bake_target_id_t* targets;
} bake_probe_out_t;
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
/* BAKE noop */
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
/* BAKE remove */
MERCURY_GEN_PROC(bake_remove_in_t,
((bake_region_id_t)(rid)))
MERCURY_GEN_PROC(bake_remove_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
/* BAKE migrate */
MERCURY_GEN_PROC(bake_migrate_in_t,
((bake_region_id_t)(source_rid))\
((int32_t)(remove_src))\
((hg_const_string_t)(dest_addr))\
((uint16_t)(dest_provider_id))\
((bake_target_id_t)(dest_target_id)))
MERCURY_GEN_PROC(bake_migrate_out_t,
((int32_t)(ret))\
((bake_region_id_t)(dest_rid)))
static inline hg_return_t hg_proc_bake_region_id_t(hg_proc_t proc, bake_region_id_t *rid)
{
......
......@@ -12,6 +12,21 @@
#include "uthash.h"
#include "bake-rpc.h"
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_ult)
DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_ult)
/* definition of BAKE root data structure (just a uuid for now) */
typedef struct
{
......@@ -41,6 +56,7 @@ typedef struct bake_server_context_t
{
uint64_t num_targets;
bake_pmem_entry_t* targets;
hg_id_t bake_create_write_persist_id;
} bake_server_context_t;
static void bake_server_finalize_cb(void *data);
......@@ -124,6 +140,7 @@ int bake_provider_register(
bake_create_write_persist_in_t, bake_create_write_persist_out_t,
bake_create_write_persist_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
tmp_svr_ctx->bake_create_write_persist_id = rpc_id;
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t,
bake_get_size_ult, provider_id, abt_pool);
......@@ -147,6 +164,10 @@ int bake_provider_register(
bake_remove_in_t, bake_remove_out_t, bake_remove_ult,
provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_rpc",
bake_migrate_in_t, bake_migrate_out_t, bake_migrate_ult,
provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
......@@ -1103,6 +1124,149 @@ static void bake_remove_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_remove_ult)
static void bake_migrate_ult(hg_handle_t handle)
{
bake_migrate_in_t in;
bake_migrate_out_t out;
hg_return_t hret;
pmemobj_region_id_t* prid;
memset(&out, 0, sizeof(out));
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.source_rid.data;
/* find memory address for target object */
region_content_t* region = pmemobj_direct(prid->oid);
if(!region)
{
out.ret = BAKE_ERR_UNKNOWN_REGION;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
/* get the size of the region */
size_t region_size = region->size;
char* region_data = region->data;
/* lookup the address of the destination provider */
hg_addr_t dest_addr;
hret = margo_addr_lookup(mid, in.dest_addr, &dest_addr);
if(hret != HG_SUCCESS) {
out.ret = BAKE_ERR_MERCURY;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
{ /* in this block we issue a create_write_persist to the destination */
hg_handle_t cwp_handle;
bake_create_write_persist_in_t cwp_in;
bake_create_write_persist_out_t cwp_out;
cwp_in.bti = in.dest_target_id;
cwp_in.bulk_offset = 0;
cwp_in.bulk_size = region_size;
cwp_in.remote_addr_str = NULL;
hret = margo_bulk_create(mid, 1, (void**)(&region_data), &region_size,
HG_BULK_READ_ONLY, &cwp_in.bulk_handle);
if(hret != HG_SUCCESS) {
out.ret = BAKE_ERR_MERCURY;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_create(mid, dest_addr,
svr_ctx->bake_create_write_persist_id, &cwp_handle);
if(hret != HG_SUCCESS) {
out.ret = BAKE_ERR_MERCURY;
margo_bulk_free(cwp_in.bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_provider_forward(in.dest_provider_id, cwp_handle, &cwp_in);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
margo_bulk_free(cwp_in.bulk_handle);
margo_destroy(cwp_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_output(handle, &cwp_out);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
margo_bulk_free(cwp_in.bulk_handle);
margo_destroy(cwp_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
if(cwp_out.ret != BAKE_SUCCESS)
{
out.ret = cwp_out.ret;
margo_bulk_free(cwp_in.bulk_handle);
margo_destroy(cwp_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
out.dest_rid = cwp_out.rid;
margo_free_output(cwp_handle, &cwp_out);
margo_bulk_free(cwp_in.bulk_handle);
margo_destroy(cwp_handle);
} /* end of create-write-persist block */
if(in.remove_src) {
pmemobj_free(&prid->oid);
}
out.ret = BAKE_SUCCESS;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_migrate_ult)
static void bake_server_finalize_cb(void *data)
{
bake_server_context_t *svr_ctx = (bake_server_context_t *)data;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment