diff --git a/include/bake-client.h b/include/bake-client.h index 8557ed54f6b29165363e12ca2ac65b93ea0209da..a66513c54a03af0bc7a02126d0ff1602c4b46a54 100644 --- a/include/bake-client.h +++ b/include/bake-client.h @@ -101,6 +101,26 @@ int bake_persist( bake_target_id_t bti, bake_region_id_t rid); +/** + * Creates a bounded-size BAKE region, writes data into it, and persists + * the reason all in one call/RPC (and thus 1 RTT). + * + * @param [in] bti BAKE target identifier + * @param [in] region_size size of region to be created + * @param [in] region_offset offset into the target region to write + * @param [in] buf local memory buffer to write + * @param [in] buf_size size of local memory buffer to write + * @param [out] rid identifier for new region + * @returns 0 on success, -1 on failure + */ +int bake_create_write_persist( + bake_target_id_t bti, + uint64_t region_size, + uint64_t region_offset, + void const *buf, + uint64_t buf_size, + bake_region_id_t *rid); + /** * Checks the size of an existing BAKE region. * diff --git a/src/bake-client.c b/src/bake-client.c index 0a48aae12578721cec324203dac44db11bb05922..f15068e7d69a30725d50d4182e4ea35db254eebc 100644 --- a/src/bake-client.c +++ b/src/bake-client.c @@ -29,6 +29,7 @@ struct bake_margo_instance hg_id_t bake_eager_read_id; hg_id_t bake_write_id; hg_id_t bake_persist_id; + hg_id_t bake_create_write_persist_id; hg_id_t bake_get_size_id; hg_id_t bake_read_id; hg_id_t bake_noop_id; @@ -59,45 +60,38 @@ static int bake_margo_instance_init(margo_instance_id mid) /* register RPCs */ g_margo_inst.bake_probe_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_probe_rpc", void, bake_probe_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_probe_rpc", + void, bake_probe_out_t, NULL); g_margo_inst.bake_shutdown_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_shutdown_rpc", void, void, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_shutdown_rpc", + void, void, NULL); g_margo_inst.bake_create_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_create_rpc", bake_create_in_t, bake_create_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_create_rpc", + bake_create_in_t, bake_create_out_t, NULL); g_margo_inst.bake_write_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_write_rpc", bake_write_in_t, bake_write_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_write_rpc", + bake_write_in_t, bake_write_out_t, NULL); g_margo_inst.bake_eager_write_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_eager_write_rpc", bake_eager_write_in_t, bake_eager_write_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_eager_write_rpc", + bake_eager_write_in_t, bake_eager_write_out_t, NULL); g_margo_inst.bake_eager_read_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_eager_read_rpc", bake_eager_read_in_t, bake_eager_read_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_eager_read_rpc", + bake_eager_read_in_t, bake_eager_read_out_t, NULL); g_margo_inst.bake_persist_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_persist_rpc", bake_persist_in_t, bake_persist_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_persist_rpc", + bake_persist_in_t, bake_persist_out_t, NULL); + g_margo_inst.bake_create_write_persist_id = + MARGO_REGISTER(g_margo_inst.mid, "bake_create_write_persist_rpc", + bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL); g_margo_inst.bake_get_size_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_get_size_rpc", + bake_get_size_in_t, bake_get_size_out_t, NULL); g_margo_inst.bake_read_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_read_rpc", bake_read_in_t, bake_read_out_t, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_read_rpc", + bake_read_in_t, bake_read_out_t, NULL); g_margo_inst.bake_noop_id = - MARGO_REGISTER(g_margo_inst.mid, - "bake_noop_rpc", void, void, - NULL); + MARGO_REGISTER(g_margo_inst.mid, "bake_noop_rpc", + void, void, NULL); return(0); } @@ -480,6 +474,73 @@ int bake_persist( return(ret); } +int bake_create_write_persist( + bake_target_id_t bti, + uint64_t region_size, + uint64_t region_offset, + void const *buf, + uint64_t buf_size, + bake_region_id_t *rid) +{ + hg_return_t hret; + hg_handle_t handle; + bake_create_write_persist_in_t in; + bake_create_write_persist_out_t out; + int ret; + struct bake_instance *instance = NULL; + + /* XXX eager path? */ + + HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance); + if(!instance) + return(-1); + + in.bti = bti; + in.region_size = region_size; + in.region_offset = region_offset; + in.bulk_offset = 0; + in.bulk_size = buf_size; + in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */ + + hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size, + HG_BULK_READ_ONLY, &in.bulk_handle); + if(hret != HG_SUCCESS) + return(-1); + + hret = margo_create(g_margo_inst.mid, instance->dest, + g_margo_inst.bake_create_write_persist_id, &handle); + if(hret != HG_SUCCESS) + { + margo_bulk_free(in.bulk_handle); + return(-1); + } + + hret = margo_forward(handle, &in); + if(hret != HG_SUCCESS) + { + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); + return(-1); + } + + hret = margo_get_output(handle, &out); + if(hret != HG_SUCCESS) + { + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); + return(-1); + } + + ret = out.ret; + if(ret == 0) + *rid = out.rid; + + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); + return(ret); +} + int bake_get_size( bake_target_id_t bti, bake_region_id_t rid, diff --git a/src/bake-rpc.h b/src/bake-rpc.h index 6d09f4e286206b98fd755c83f948c924a95ad58d..9fe67a2a307515f82b5fecbdc696bedc743c3f4c 100644 --- a/src/bake-rpc.h +++ b/src/bake-rpc.h @@ -62,6 +62,20 @@ MERCURY_GEN_PROC(bake_persist_out_t, ((int32_t)(ret))) DECLARE_MARGO_RPC_HANDLER(bake_persist_ult) +/* BAKE create/write/persist */ +MERCURY_GEN_PROC(bake_create_write_persist_in_t, + ((bake_target_id_t)(bti))\ + ((uint64_t)(region_size))\ + ((uint64_t)(region_offset))\ + ((hg_bulk_t)(bulk_handle))\ + ((uint64_t)(bulk_offset))\ + ((uint64_t)(bulk_size))\ + ((hg_string_t)(remote_addr_str))) +MERCURY_GEN_PROC(bake_create_write_persist_out_t, + ((int32_t)(ret))\ + ((bake_region_id_t)(rid))) +DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult) + /* BAKE get size */ MERCURY_GEN_PROC(bake_get_size_in_t, ((bake_target_id_t)(bti))\ diff --git a/src/bake-server.c b/src/bake-server.c index 3286b5f83fcf8d83598ef4f4a72e8512acf54644..b849439b6e4d60e862d89ae44a26d4de327bab00 100644 --- a/src/bake-server.c +++ b/src/bake-server.c @@ -134,6 +134,9 @@ int bake_server_init( bake_eager_read_in_t, bake_eager_read_out_t, bake_eager_read_ult); MARGO_REGISTER(mid, "bake_persist_rpc", bake_persist_in_t, bake_persist_out_t, bake_persist_ult); + MARGO_REGISTER(mid, "bake_create_write_persist_rpc", + bake_create_write_persist_in_t, bake_create_write_persist_out_t, + bake_create_write_persist_ult); MARGO_REGISTER(mid, "bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t, bake_get_size_ult); MARGO_REGISTER(mid, "bake_read_rpc", @@ -286,6 +289,7 @@ static void bake_write_ult(hg_handle_t handle) hgi = margo_get_info(handle); assert(hgi); mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); hret = margo_get_input(handle, &in); if(hret != HG_SUCCESS) @@ -461,6 +465,125 @@ static void bake_persist_ult(hg_handle_t handle) } DEFINE_MARGO_RPC_HANDLER(bake_persist_ult) +static void bake_create_write_persist_ult(hg_handle_t handle) +{ + bake_create_write_persist_out_t out; + bake_create_write_persist_in_t in; + hg_addr_t src_addr; + char* buffer; + hg_bulk_t bulk_handle; + const struct hg_info *hgi; + margo_instance_id mid; + hg_return_t hret; + int ret; + pmemobj_region_id_t* prid; + + assert(g_svr_ctx); + + /* TODO: this check needs to be somewhere else */ + assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE); + + memset(&out, 0, sizeof(out)); + + hgi = margo_get_info(handle); + assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); + + hret = margo_get_input(handle, &in); + if(hret != HG_SUCCESS) + { + out.ret = -1; + margo_respond(handle, &out); + margo_destroy(handle); + return; + } + + prid = (pmemobj_region_id_t*)out.rid.data; + prid->size = in.region_size; + ret = pmemobj_alloc(g_svr_ctx->pmem_pool, &prid->oid, + in.region_size, 0, NULL, NULL); + if(ret != 0) + { + out.ret = -1; + margo_free_input(handle, &in); + margo_respond(handle, &out); + margo_destroy(handle); + return; + } + + /* find memory address for target object */ + buffer = pmemobj_direct(prid->oid); + if(!buffer) + { + out.ret = -1; + margo_free_input(handle, &in); + margo_respond(handle, &out); + margo_destroy(handle); + return; + } + + /* create bulk handle for local side of transfer */ + hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size, + HG_BULK_WRITE_ONLY, &bulk_handle); + if(hret != HG_SUCCESS) + { + out.ret = -1; + margo_free_input(handle, &in); + margo_respond(handle, &out); + margo_destroy(handle); + return; + } + + if(in.remote_addr_str) + { + /* a proxy address was provided to pull write data from */ + hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr); + if(hret != HG_SUCCESS) + { + out.ret = -1; + margo_bulk_free(bulk_handle); + margo_free_input(handle, &in); + margo_respond(handle, &out); + margo_destroy(handle); + return; + } + } + else + { + /* no proxy write, use the source of this request */ + src_addr = hgi->addr; + } + + hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle, + in.bulk_offset, bulk_handle, 0, in.bulk_size); + if(hret != HG_SUCCESS) + { + out.ret = -1; + if(in.remote_addr_str) + margo_addr_free(mid, src_addr); + margo_bulk_free(bulk_handle); + margo_free_input(handle, &in); + margo_respond(handle, &out); + margo_destroy(handle); + return; + } + + /* TODO: should this have an abt shim in case it blocks? */ + pmemobj_persist(g_svr_ctx->pmem_pool, buffer, prid->size); + + out.ret = 0; + + if(in.remote_addr_str) + margo_addr_free(mid, src_addr); + margo_bulk_free(bulk_handle); + margo_free_input(handle, &in); + margo_respond(handle, &out); + margo_destroy(handle); + return; +} +DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult) + /* service a remote RPC that retrieves the size of a BAKE region */ static void bake_get_size_ult(hg_handle_t handle) {