diff --git a/include/margo.h b/include/margo.h index 20341ff85ed4a7a6f52d70b632223d7c48c6e251..b332b282f9204fb028bc9d685ad4767636deca4a 100644 --- a/include/margo.h +++ b/include/margo.h @@ -76,7 +76,8 @@ margo_instance_id margo_init_pool( * Shuts down margo library and its underlying abt and mercury resources * @param [in] mid Margo instance */ -void margo_finalize(margo_instance_id mid); +void margo_finalize( + margo_instance_id mid); /** * Suspends the caller until some other entity (e.g. an RPC, thread, or @@ -88,44 +89,65 @@ void margo_finalize(margo_instance_id mid); * * @param [in] mid Margo instance */ -void margo_wait_for_finalize(margo_instance_id mid); +void margo_wait_for_finalize( + margo_instance_id mid); /** * Registers an RPC with margo - * @param [in] mid Margo instance - * @param [in] id Mercury RPC identifier + * + * \param [in] mid Margo instance + * \param [in] func_name unique function name for RPC + * \param [in] in_proc_cb pointer to input proc callback + * \param [in] out_proc_cb pointer to output proc callback + * \param [in] rpc_cb RPC callback + * + * \return unique ID associated to the registered function */ -int margo_register(margo_instance_id mid, hg_id_t id); +hg_id_t margo_register_name( + margo_instance_id mid, + const char *func_name, + hg_proc_cb_t in_proc_cb, + hg_proc_cb_t out_proc_cb, + hg_rpc_cb_t rpc_cb); /** * Registers an RPC with margo that is associated with a multiplexed service - * @param [in] mid Margo instance - * @param [in] id Mercury RPC identifier - * @param [in] mplex_id multiplexing identifier - * @param [in] pool Argobots pool the handler will execute in + * + * \param [in] mid Margo instance + * \param [in] func_name unique function name for RPC + * \param [in] in_proc_cb pointer to input proc callback + * \param [in] out_proc_cb pointer to output proc callback + * \param [in] rpc_cb RPC callback + * \param [in] mplex_id multiplexing identifier + * \param [in] pool Argobots pool the handler will execute in + 
* + * \return unique ID associated to the registered function */ -int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool); +hg_id_t margo_register_name_mplex( + margo_instance_id mid, + const char *func_name, + hg_proc_cb_t in_proc_cb, + hg_proc_cb_t out_proc_cb, + hg_rpc_cb_t rpc_cb, + uint32_t mplex_id, + ABT_pool pool); /* - * Indicate whether HG_Register_name() has been called for the RPC specified by + * Indicate whether margo_register_name() has been called for the RPC specified by * func_name. * - * \param hg_class [IN] pointer to HG class - * \param func_name [IN] function name - * \param id [OUT] registered RPC ID - * \param flag [OUT] pointer to boolean + * \param [in] mid Margo instance + * \param [in] func_name function name + * \param [out] id registered RPC ID + * \param [out] flag pointer to boolean * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Registered_name( - hg_class_t *hg_class, - const char *func_name, - hg_id_t *id, - hg_bool_t *flag - ); -*/ +hg_return_t margo_registered_name( + margo_instance_id mid, + const char *func_name, + hg_id_t *id, + hg_bool_t *flag); /** * Register and associate user data to registered function. @@ -154,264 +176,192 @@ hg_return_t margo_register_data( * * \return Pointer to data or NULL */ -void* margo_registered_data(margo_instance_id mid, hg_id_t id); +void* margo_registered_data( + margo_instance_id mid, + hg_id_t id); /** - * Disable response for a given RPC ID. This allows an origin process to send an - * RPC to a target without waiting for a response. The RPC completes locally and - * the callback on the origin is therefore pushed to the completion queue once - * the RPC send is completed. By default, all RPCs expect a response to - * be sent back. + * Disable response for a given RPC ID. 
* - * \param hg_class [IN] pointer to HG class - * \param id [IN] registered function ID - * \param disable [IN] boolean (HG_TRUE to disable - * HG_FALSE to re-enable) + * \param [in] mid Margo instance + * \param [in] id registered function ID + * \param [in] disable_flag flag to disable (1) or re-enable (0) responses * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Registered_disable_response( - hg_class_t *hg_class, - hg_id_t id, - hg_bool_t disable - ); -*/ +hg_return_t margo_registered_disable_response( + margo_instance_id mid, + hg_id_t id, + int disable_flag); /** * Lookup an addr from a peer address/name. - * @param [in] name lookup name - * @param [out] addr return address - * @returns HG_SUCCESS on success + * \param [in] name lookup name + * \param [out] addr return address + * + * \return HG_SUCCESS or corresponding HG error code */ hg_return_t margo_addr_lookup( margo_instance_id mid, - const char *name, - hg_addr_t *addr); + const char *name, + hg_addr_t *addr); /** - * Free the addr from the list of peers. + * Free the given Mercury addr. * - * \param hg_class [IN] pointer to HG class - * \param addr [IN] abstract address + * \param [in] mid Margo instance + * \param [in] addr Mercury address * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Addr_free( - hg_class_t *hg_class, - hg_addr_t addr - ); -*/ +hg_return_t margo_addr_free( + margo_instance_id mid, + hg_addr_t addr); /** - * Access self address. Address must be freed with HG_Addr_free(). + * Access self address. Address must be freed with margo_addr_free(). 
* - * \param hg_class [IN] pointer to HG class - * \param addr [OUT] pointer to abstract address + * \param [in] mid Margo instance + * \param [out] addr pointer to abstract Mercury address * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Addr_self( - hg_class_t *hg_class, - hg_addr_t *addr - ); -*/ +hg_return_t margo_addr_self( + margo_instance_id mid, + hg_addr_t *addr); /** - * Duplicate an existing HG abstract address. The duplicated address can be - * stored for later use and the origin address be freed safely. The duplicated - * address must be freed with HG_Addr_free(). + * Duplicate an existing Mercury address. The duplicate must be freed with margo_addr_free(). * - * \param hg_class [IN] pointer to HG class - * \param addr [IN] abstract address - * \param new_addr [OUT] pointer to abstract address + * \param [in] mid Margo instance + * \param [in] addr abstract Mercury address to duplicate + * \param [out] new_addr pointer to newly allocated abstract Mercury address * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Addr_dup( - hg_class_t *hg_class, - hg_addr_t addr, - hg_addr_t *new_addr - ); -*/ +hg_return_t margo_addr_dup( + margo_instance_id mid, + hg_addr_t addr, + hg_addr_t *new_addr); /** - * Convert an addr to a string (returned string includes the terminating - * null byte '\0'). If buf is NULL, the address is not converted and only - * the required size of the buffer is returned. If the input value passed - * through buf_size is too small, HG_SIZE_ERROR is returned and the buf_size - * output is set to the minimum size required. + * Convert a Mercury addr to a string (returned string includes the + * terminating null byte '\0'). If buf is NULL, the address is not + * converted and only the required size of the buffer is returned. + * If the input value passed through buf_size is too small, HG_SIZE_ERROR + * is returned and the buf_size output is set to the minimum size required. 
* - * \param hg_class [IN] pointer to HG class - * \param buf [IN/OUT] pointer to destination buffer - * \param buf_size [IN/OUT] pointer to buffer size - * \param addr [IN] abstract address + * \param [in] mid Margo instance + * \param [in/out] buf pointer to destination buffer + * \param [in/out] buf_size pointer to buffer size + * \param [in] addr abstract Mercury address * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Addr_to_string( - hg_class_t *hg_class, - char *buf, - hg_size_t *buf_size, - hg_addr_t addr - ); -*/ +hg_return_t margo_addr_to_string( + margo_instance_id mid, + char *buf, + hg_size_t *buf_size, + hg_addr_t addr); /** - * Initiate a new HG RPC using the specified function ID and the local/remote - * target defined by addr. The HG handle created can be used to query input - * and output, as well as issuing the RPC by calling HG_Forward(). - * After completion the handle must be freed using HG_Destroy(). + * Initiate a new Mercury RPC using the specified function ID and the + * local/remote target defined by addr. The handle created can be used to + * query input and output, as well as issuing the RPC by calling + * margo_forward(). After completion the handle must be freed using margo_destroy(). * - * \param context [IN] pointer to HG context - * \param addr [IN] abstract network address of destination - * \param id [IN] registered function ID - * \param handle [OUT] pointer to HG handle + * \param [in] mid Margo instance + * \param [in] addr abstract Mercury address of destination + * \param [in] id registered function ID + * \param [out] handle pointer to HG handle * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Create( - hg_context_t *context, - hg_addr_t addr, - hg_id_t id, - hg_handle_t *handle - ); -*/ +hg_return_t margo_create( + margo_instance_id mid, + hg_addr_t addr, + hg_id_t id, + hg_handle_t *handle); /** - * Destroy HG handle. 
Decrement reference count, resources associated to the - * handle are freed when the reference count is null. + * Destroy Mercury handle. * - * \param handle [IN] HG handle + * \param [in] handle Mercury handle * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Destroy( - hg_handle_t handle - ); -*/ +hg_return_t margo_destroy( + hg_handle_t handle); /** - * Increment ref count on handle. + * Increment ref count on a Mercury handle. * - * \param handle [IN] HG handle + * \param [in] handle Mercury handle * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Ref_incr( - hg_handle_t hg_handle - ); -*/ +hg_return_t margo_ref_incr( + hg_handle_t handle); /** * Get info from handle. * - * \remark Users must call HG_Addr_dup() to safely re-use the addr field. - * - * \param handle [IN] HG handle + * \param [in] handle Mercury handle * * \return Pointer to info or NULL in case of failure */ -/* XXX -HG_EXPORT const struct hg_info * -HG_Get_info( - hg_handle_t handle - ); -*/ +const struct hg_info *margo_get_info( + hg_handle_t handle); /** * Get input from handle (requires registration of input proc to deserialize - * parameters). Input must be freed using HG_Free_input(). + * parameters). Input must be freed using margo_free_input(). * - * \remark This is equivalent to: - * - HG_Core_get_input() - * - Call hg_proc to deserialize parameters - * - * \param handle [IN] HG handle - * \param in_struct [IN/OUT] pointer to input structure + * \param [in] handle Mercury handle + * \param [in/out] in_struct pointer to input structure * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Get_input( - hg_handle_t handle, - void *in_struct - ); -*/ +hg_return_t margo_get_input( + hg_handle_t handle, + void *in_struct); /** * Free resources allocated when deserializing the input. 
- * User may copy parameters contained in the input structure before calling - * HG_Free_input(). * - * \param handle [IN] HG handle - * \param in_struct [IN/OUT] pointer to input structure + * \param [in] handle Mercury handle + * \param [in/out] in_struct pointer to input structure * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Free_input( +hg_return_t margo_free_input( hg_handle_t handle, - void *in_struct - ); -*/ + void *in_struct); /** * Get output from handle (requires registration of output proc to deserialize - * parameters). Output must be freed using HG_Free_output(). - * - * \remark This is equivalent to: - * - HG_Core_get_output() - * - Call hg_proc to deserialize parameters + * parameters). Output must be freed using margo_free_output(). * - * - * \param handle [IN] HG handle - * \param out_struct [IN/OUT] pointer to output structure + * \param [in] handle Mercury handle + * \param [in/out] out_struct pointer to output structure * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Get_output( - hg_handle_t handle, - void *out_struct - ); -*/ +hg_return_t margo_get_output( + hg_handle_t handle, + void *out_struct); /** * Free resources allocated when deserializing the output. - * User may copy parameters contained in the output structure before calling - * HG_Free_output(). * - * \param handle [IN] HG handle - * \param out_struct [IN/OUT] pointer to input structure + * \param [in] handle Mercury handle + * \param [in/out] out_struct pointer to output structure * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Free_output( - hg_handle_t handle, - void *out_struct - ); -*/ +hg_return_t margo_free_output( + hg_handle_t handle, + void *out_struct); /** * Forward an RPC request to a remote host @@ -460,16 +410,154 @@ hg_return_t margo_respond( /** * Cancel an ongoing operation. 
* - * \param handle [IN] HG handle + * \param [in] handle Mercury handle * * \return HG_SUCCESS or corresponding HG error code */ -/* XXX -HG_EXPORT hg_return_t -HG_Cancel( - hg_handle_t handle - ); -*/ +hg_return_t margo_cancel( + hg_handle_t handle); + +/** + * Create an abstract bulk handle from specified memory segments. + * Memory allocated is then freed when margo_bulk_free() is called. + * \remark If NULL is passed to buf_ptrs, i.e., + * \verbatim margo_bulk_create(mid, count, NULL, buf_sizes, flags, &handle) \endverbatim + * memory for the missing buf_ptrs array will be internally allocated. + * + * \param [in] mid Margo instance + * \param [in] count number of segments + * \param [in] buf_ptrs array of pointers + * \param [in] buf_sizes array of sizes + * \param [in] flags permission flag: + * - HG_BULK_READWRITE + * - HG_BULK_READ_ONLY + * - HG_BULK_WRITE_ONLY + * \param [out] handle pointer to returned abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_create( + margo_instance_id mid, + hg_uint32_t count, + void **buf_ptrs, + const hg_size_t *buf_sizes, + hg_uint8_t flags, + hg_bulk_t *handle); + +/** + * Free bulk handle. + * + * \param [in/out] handle abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_free( + hg_bulk_t handle); + +/** + * Increment ref count on bulk handle. + * + * \param [in] handle abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_ref_incr( + hg_bulk_t handle); + +/** + * Access bulk handle to retrieve memory segments abstracted by handle. 
+ * + * \param [in] handle abstract bulk handle + * \param [in] offset bulk offset + * \param [in] size bulk size + * \param [in] flags permission flag: + * - HG_BULK_READWRITE + * - HG_BULK_READ_ONLY + * \param [in] max_count maximum number of segments to be returned + * \param [in/out] buf_ptrs array of buffer pointers + * \param [in/out] buf_sizes array of buffer sizes + * \param [out] actual_count actual number of segments returned + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_access( + hg_bulk_t handle, + hg_size_t offset, + hg_size_t size, + hg_uint8_t flags, + hg_uint32_t max_count, + void **buf_ptrs, + hg_size_t *buf_sizes, + hg_uint32_t *actual_count); + +/** + * Get total size of data abstracted by bulk handle. + * + * \param [in] handle abstract bulk handle + * + * \return Non-negative value + */ +hg_size_t margo_bulk_get_size( + hg_bulk_t handle); + +/** + * Get total number of segments abstracted by bulk handle. + * + * \param [in] handle abstract bulk handle + * + * \return Non-negative value + */ +hg_uint32_t margo_bulk_get_segment_count( + hg_bulk_t handle); + +/** + * Get size required to serialize bulk handle. + * + * \param [in] handle abstract bulk handle + * \param [in] request_eager boolean (passing HG_TRUE adds size of encoding + * actual data along the handle if handle meets + * HG_BULK_READ_ONLY flag condition) + * + * \return Non-negative value + */ +hg_size_t margo_bulk_get_serialize_size( + hg_bulk_t handle, + hg_bool_t request_eager); + +/** + * Serialize bulk handle into a buffer. 
+ * + * \param [in/out] buf pointer to buffer + * \param [in] buf_size buffer size + * \param [in] request_eager boolean (passing HG_TRUE encodes actual data + * along the handle, which is more efficient for + * small data, this is only valid if bulk handle + * has HG_BULK_READ_ONLY permission) + * \param [in] handle abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_serialize( + void *buf, + hg_size_t buf_size, + hg_bool_t request_eager, + hg_bulk_t handle); + +/** + * Deserialize bulk handle from an existing buffer. + * + * \param [in] mid Margo instance + * \param [out] handle abstract bulk handle + * \param [in] buf pointer to buffer + * \param [in] buf_size buffer size + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_deserialize( + margo_instance_id mid, + hg_bulk_t *handle, + const void *buf, + hg_size_t buf_size); /** * Perform a bulk transfer @@ -481,6 +569,7 @@ HG_Cancel( * @param [in] local_handle local bulk memory handle * @param [in] local_offset offset into local bulk memory to access * @param [in] size size (in bytes) of transfer + * @param [out] op_id pointer to returned operation ID * @returns 0 on success, hg_return_t values on error */ hg_return_t margo_bulk_transfer( @@ -491,9 +580,27 @@ hg_return_t margo_bulk_transfer( size_t origin_offset, hg_bulk_t local_handle, size_t local_offset, - size_t size); + size_t size, + hg_op_id_t *op_id); + +/** + * Cancel an ongoing operation. 
+ * + * \param [in] op_id operation ID + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_cancel( + hg_op_id_t op_id); -/* XXX BULK */ +/** + * Suspends the calling ULT for a specified time duration + * @param [in] mid Margo instance + * @param [in] timeout_ms timeout duration in milliseconds + */ +void margo_thread_sleep( + margo_instance_id mid, + double timeout_ms); /** * Retrieve the abt_handler pool that was associated with the instance at @@ -527,15 +634,6 @@ hg_class_t* margo_get_class(margo_instance_id mid); */ margo_instance_id margo_hg_handle_get_instance(hg_handle_t h); -/** - * Suspends the calling ULT for a specified time duration - * @param [in] mid Margo instance - * @param [in] timeout_ms timeout duration in milliseconds - */ -void margo_thread_sleep( - margo_instance_id mid, - double timeout_ms); - /** * Maps an RPC id and mplex id to the pool that it should execute on * @param [in] mid Margo instance @@ -550,38 +648,23 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT * macro that registers a function as an RPC. 
*/ #define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __rpc_id_ptr) do { \ - hg_return_t __hret; \ - hg_id_t __id; \ - hg_bool_t __flag; \ - int __ret; \ - __hret = HG_Registered_name(margo_get_class(__mid), __func_name, &__id, &__flag); \ - assert(__hret == HG_SUCCESS); \ - if(!__flag) \ - __id = HG_Register_name(margo_get_class(__mid), __func_name,\ - BOOST_PP_CAT(hg_proc_, __in_t),\ - BOOST_PP_CAT(hg_proc_, __out_t),\ - __handler##_handler); \ - __ret = margo_register(__mid, __id); \ - assert(__ret == 0); \ + hg_id_t __id = margo_register_name( \ + __mid, __func_name, \ + BOOST_PP_CAT(hg_proc_, __in_t), \ + BOOST_PP_CAT(hg_proc_, __out_t), \ + __handler##_handler); \ if(__rpc_id_ptr != MARGO_RPC_ID_IGNORE) { \ *(__rpc_id_ptr) = __id; \ } \ } while(0) #define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool, __rpc_id_ptr) do { \ - hg_return_t __hret; \ - hg_id_t __id; \ - hg_bool_t __flag; \ - int __ret; \ - __hret = HG_Registered_name(margo_get_class(__mid), __func_name, &__id, &__flag); \ - assert(__hret == HG_SUCCESS); \ - if(!__flag) \ - __id = HG_Register_name(margo_get_class(__mid), __func_name,\ - BOOST_PP_CAT(hg_proc_, __in_t),\ - BOOST_PP_CAT(hg_proc_, __out_t),\ - __handler##_handler); \ - __ret = margo_register_mplex(__mid, __id, __mplex_id, __pool); \ - assert(__ret == 0); \ + hg_id_t __id = margo_register_name_mplex( \ + __mid, __func_name, \ + BOOST_PP_CAT(hg_proc_, __in_t), \ + BOOST_PP_CAT(hg_proc_, __out_t), \ + __handler##_handler, \ + __mplex_id, __pool); \ if(__rpc_id_ptr != MARGO_RPC_ID_IGNORE) { \ *(__rpc_id_ptr) = __id; \ } \ diff --git a/src/margo.c b/src/margo.c index 171fb47ed33a11173c1c5deb7ece4d06cc7529d8..59aaf62b8799b80bc51b5a201729f9f653285bb5 100644 --- a/src/margo.c +++ b/src/margo.c @@ -317,112 +317,77 @@ void margo_wait_for_finalize(margo_instance_id mid) return; } -/* dedicated thread function to drive Mercury progress */ -static void hg_progress_fn(void* foo) +hg_id_t 
margo_register_name(margo_instance_id mid, const char *func_name, + hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb) { - int ret; - unsigned int actual_count; - struct margo_instance *mid = (struct margo_instance *)foo; - size_t size; - unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; - double next_timer_exp; - int trigger_happened; - - while(!mid->hg_progress_shutdown_flag) - { - trigger_happened = 0; - do { - ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count); - if(ret == HG_SUCCESS && actual_count > 0) - trigger_happened = 1; - } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); + struct margo_rpc_data* margo_data; + hg_return_t hret; + hg_id_t id; - if(trigger_happened) - ABT_thread_yield(); + id = HG_Register_name(mid->hg_class, func_name, in_proc_cb, out_proc_cb, rpc_cb); + if(id <= 0) + return(0); - ABT_pool_get_total_size(mid->progress_pool, &size); - /* Are there any other threads executing in this pool that are *not* - * blocked on margo_wait_for_finalize()? If so then, we can't - * sleep here or else those threads will not get a chance to - * execute. - */ - if(size > mid->waiters_in_progress_pool) + /* register the margo data with the RPC */ + margo_data = (struct margo_rpc_data*)HG_Registered_data(mid->hg_class, id); + if(!margo_data) + { + margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data)); + if(!margo_data) + return(0); + margo_data->mid = mid; + margo_data->user_data = NULL; + margo_data->user_free_callback = NULL; + hret = HG_Register_data(mid->hg_class, id, margo_data, margo_rpc_data_free); + if(hret != HG_SUCCESS) { - //printf("DEBUG: Margo progress function running while other ULTs are eligible for execution (size: %d, waiters: %d.\n", size, mid->waiters_in_progress_pool); - - /* TODO: this is being executed more than is necessary (i.e. 
- * in cases where there are other legitimate ULTs eligible - * for execution that are not blocking on any events, Margo - * or otherwise). Maybe we need an abt scheduling tweak here - * to make sure that this ULT is the lowest priority in that - * scenario. - */ - ret = HG_Progress(mid->hg_context, 0); - if(ret == HG_SUCCESS) - { - /* Mercury completed something; loop around to trigger - * callbacks - */ - } - else if(ret == HG_TIMEOUT) - { - /* No completion; yield here to allow other ULTs to run */ - ABT_thread_yield(); - } - else - { - /* TODO: error handling */ - fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); - } + free(margo_data); + return(0); } - else - { - hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; - ret = margo_timer_get_next_expiration(mid, &next_timer_exp); - if(ret == 0) - { - /* there is a queued timer, don't block long enough - * to keep this timer waiting - */ - if(next_timer_exp >= 0.0) - { - next_timer_exp *= 1000; /* convert to milliseconds */ - if(next_timer_exp < MERCURY_PROGRESS_TIMEOUT_UB) - hg_progress_timeout = (unsigned int)next_timer_exp; - } - else - { - hg_progress_timeout = 0; - } - } - ret = HG_Progress(mid->hg_context, hg_progress_timeout); - if(ret != HG_SUCCESS && ret != HG_TIMEOUT) - { - /* TODO: error handling */ - fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); - } - } - - /* check for any expired timers */ - margo_check_timers(mid); } - return; + return(id); } -ABT_pool* margo_get_handler_pool(margo_instance_id mid) +hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name, + hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb, + uint32_t mplex_id, ABT_pool pool) { - return(&mid->handler_pool); -} + struct mplex_key key; + struct mplex_element *element; + hg_id_t id; -hg_context_t* margo_get_context(margo_instance_id mid) -{ - return(mid->hg_context); + id = margo_register_name(mid, func_name, in_proc_cb, 
out_proc_cb, rpc_cb); + if(id <= 0) + return(0); + + /* nothing to do, we'll let the handler pool take this directly */ + if(mplex_id == MARGO_DEFAULT_MPLEX_ID) + return(id); + + memset(&key, 0, sizeof(key)); + key.id = id; + key.mplex_id = mplex_id; + + HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element); + if(element) + return(id); + + element = malloc(sizeof(*element)); + if(!element) + return(0); + element->key = key; + element->pool = pool; + + HASH_ADD(hh, mid->mplex_table, key, sizeof(key), element); + + return(id); } -hg_class_t* margo_get_class(margo_instance_id mid) +hg_return_t margo_registered_name(margo_instance_id mid, const char *func_name, + hg_id_t *id, hg_bool_t *flag) { - return(mid->hg_class); + return(HG_Registered_name(mid->hg_class, func_name, id, flag)); } hg_return_t margo_register_data( @@ -432,7 +397,7 @@ hg_return_t margo_register_data( void (*free_callback)(void *)) { struct margo_rpc_data* margo_data - = (struct margo_rpc_data*) HG_Registered_data(margo_get_class(mid), id); + = (struct margo_rpc_data*) HG_Registered_data(mid->hg_class, id); if(!margo_data) return HG_OTHER_ERROR; margo_data->user_data = data; margo_data->user_free_callback = free_callback; @@ -447,59 +412,192 @@ void* margo_registered_data(margo_instance_id mid, hg_id_t id) else return data->user_data; } -margo_instance_id margo_hg_handle_get_instance(hg_handle_t h) +hg_return_t margo_registered_disable_response( + margo_instance_id mid, + hg_id_t id, + int disable_flag) { - const struct hg_info* info = HG_Get_info(h); - if(!info) return MARGO_INSTANCE_NULL; - struct margo_rpc_data* data = - (struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id); - if(!data) return MARGO_INSTANCE_NULL; - return data->mid; + return(HG_Registered_disable_response(mid->hg_class, id, disable_flag)); } -static hg_return_t margo_cb(const struct hg_cb_info *info) +struct lookup_cb_evt { - hg_return_t hret = info->ret; + hg_return_t nret; + hg_addr_t addr; +}; + +static 
hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) +{ + struct lookup_cb_evt evt; + evt.nret = info->ret; + evt.addr = info->info.lookup.addr; struct margo_cb_arg* arg = info->arg; /* propagate return code out through eventual */ - ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); - + ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); + #if 0 if(arg->in_pool) arg->mid->waiters_in_progress_pool--; #endif - + return(HG_SUCCESS); } -typedef struct +hg_return_t margo_addr_lookup( + margo_instance_id mid, + const char *name, + hg_addr_t *addr) { - hg_handle_t handle; -} margo_forward_timeout_cb_dat; + hg_return_t nret; + struct lookup_cb_evt *evt; + ABT_eventual eventual; + int ret; + struct margo_cb_arg arg; -static void margo_forward_timeout_cb(void *arg) + ret = ABT_eventual_create(sizeof(*evt), &eventual); + if(ret != 0) + { + return(HG_NOMEM_ERROR); + } + + arg.eventual = &eventual; + arg.mid = mid; +#if 0 + if(margo_xstream_is_in_progress_pool(mid)) + { + arg.in_pool = 1; + mid->waiters_in_progress_pool++; + } + else + arg.in_pool = 0; +#endif + nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, + &arg, name, HG_OP_ID_IGNORE); + if(nret == 0) + { + ABT_eventual_wait(eventual, (void**)&evt); + *addr = evt->addr; + nret = evt->nret; + } + + ABT_eventual_free(&eventual); + + return(nret); +} + +hg_return_t margo_addr_free( + margo_instance_id mid, + hg_addr_t addr) { - margo_forward_timeout_cb_dat *timeout_cb_dat = - (margo_forward_timeout_cb_dat *)arg; + return(HG_Addr_free(mid->hg_class, addr)); +} - /* cancel the Mercury op if the forward timed out */ - HG_Cancel(timeout_cb_dat->handle); - return; +hg_return_t margo_addr_self( + margo_instance_id mid, + hg_addr_t *addr) +{ + return(HG_Addr_self(mid->hg_class, addr)); } -hg_return_t margo_forward_timed( +hg_return_t margo_addr_dup( + margo_instance_id mid, + hg_addr_t addr, + hg_addr_t *new_addr) +{ + return(HG_Addr_dup(mid->hg_class, addr, new_addr)); +} + +hg_return_t 
margo_addr_to_string( margo_instance_id mid, + char *buf, + hg_size_t *buf_size, + hg_addr_t addr) +{ + return(HG_Addr_to_string(mid->hg_class, buf, buf_size, addr)); +} + +hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr, + hg_id_t id, hg_handle_t *handle) +{ + /* TODO: handle caching logic? */ + + return(HG_Create(mid->hg_context, addr, id, handle)); +} + +hg_return_t margo_destroy(hg_handle_t handle) +{ + /* TODO handle caching logic? */ + + return(HG_Destroy(handle)); +} + +hg_return_t margo_ref_incr( + hg_handle_t handle) +{ + return(HG_Ref_incr(handle)); +} + +const struct hg_info *margo_get_info( + hg_handle_t handle) +{ + return(HG_Get_info(handle)); +} + +hg_return_t margo_get_input( hg_handle_t handle, - void *in_struct, - double timeout_ms) + void *in_struct) { - int ret; - hg_return_t hret; + return(HG_Get_input(handle, in_struct)); +} + +hg_return_t margo_free_input( + hg_handle_t handle, + void *in_struct) +{ + return(HG_Free_input(handle, in_struct)); +} + +hg_return_t margo_get_output( + hg_handle_t handle, + void *out_struct) +{ + return(HG_Get_output(handle, out_struct)); +} + +hg_return_t margo_free_output( + hg_handle_t handle, + void *out_struct) +{ + return(HG_Free_output(handle, out_struct)); +} + + +static hg_return_t margo_cb(const struct hg_cb_info *info) +{ + hg_return_t hret = info->ret; + struct margo_cb_arg* arg = info->arg; + + /* propagate return code out through eventual */ + ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); + +#if 0 + if(arg->in_pool) + arg->mid->waiters_in_progress_pool--; +#endif + + return(HG_SUCCESS); +} + +hg_return_t margo_forward( + margo_instance_id mid, + hg_handle_t handle, + void *in_struct) +{ + hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; + int ret; hg_return_t* waited_hret; - margo_timer_t forward_timer; - margo_forward_timeout_cb_dat timeout_cb_dat; struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); @@ -508,11 +606,6 @@ hg_return_t 
margo_forward_timed( return(HG_NOMEM_ERROR); } - /* set a timer object to expire when this forward times out */ - timeout_cb_dat.handle = handle; - margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, - &timeout_cb_dat, timeout_ms); - arg.eventual = &eventual; arg.mid = mid; #if 0 @@ -531,29 +624,38 @@ hg_return_t margo_forward_timed( hret = *waited_hret; } - /* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */ - if(hret == HG_CANCELED) - hret = HG_TIMEOUT; - - /* remove timer if it is still in place (i.e., not timed out) */ - if(hret != HG_TIMEOUT) - margo_timer_destroy(mid, &forward_timer); - ABT_eventual_free(&eventual); return(hret); } +typedef struct +{ + hg_handle_t handle; +} margo_forward_timeout_cb_dat; -hg_return_t margo_forward( +static void margo_forward_timeout_cb(void *arg) +{ + margo_forward_timeout_cb_dat *timeout_cb_dat = + (margo_forward_timeout_cb_dat *)arg; + + /* cancel the Mercury op if the forward timed out */ + HG_Cancel(timeout_cb_dat->handle); + return; +} + +hg_return_t margo_forward_timed( margo_instance_id mid, hg_handle_t handle, - void *in_struct) + void *in_struct, + double timeout_ms) { - hg_return_t hret = HG_TIMEOUT; - ABT_eventual eventual; int ret; + hg_return_t hret; + ABT_eventual eventual; hg_return_t* waited_hret; + margo_timer_t forward_timer; + margo_forward_timeout_cb_dat timeout_cb_dat; struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); @@ -562,6 +664,11 @@ hg_return_t margo_forward( return(HG_NOMEM_ERROR); } + /* set a timer object to expire when this forward times out */ + timeout_cb_dat.handle = handle; + margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, + &timeout_cb_dat, timeout_ms); + arg.eventual = &eventual; arg.mid = mid; #if 0 @@ -580,6 +687,14 @@ hg_return_t margo_forward( hret = *waited_hret; } + /* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */ + if(hret == HG_CANCELED) + hret = HG_TIMEOUT; + + /* remove timer if it is still in 
place (i.e., not timed out) */ + if(hret != HG_TIMEOUT) + margo_timer_destroy(mid, &forward_timer); + ABT_eventual_free(&eventual); return(hret); @@ -625,86 +740,103 @@ hg_return_t margo_respond( return(hret); } +hg_return_t margo_cancel( + hg_handle_t handle) +{ + return(HG_Cancel(handle)); +} -static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) +hg_return_t margo_bulk_create( + margo_instance_id mid, + hg_uint32_t count, + void **buf_ptrs, + const hg_size_t *buf_sizes, + hg_uint8_t flags, + hg_bulk_t *handle) { - hg_return_t hret = info->ret; - struct margo_cb_arg* arg = info->arg; + /* XXX: handle caching logic? */ - /* propagate return code out through eventual */ - ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); - - if(arg->in_pool) - arg->mid->waiters_in_progress_pool--; + return(HG_Bulk_create(mid->hg_class, count, + buf_ptrs, buf_sizes, flags, handle)); +} - return(HG_SUCCESS); +hg_return_t margo_bulk_free( + hg_bulk_t handle) +{ + /* XXX: handle caching logic? 
*/ + + return(HG_Bulk_free(handle)); } -struct lookup_cb_evt +hg_return_t margo_bulk_ref_incr( + hg_bulk_t handle) { - hg_return_t nret; - hg_addr_t addr; -}; + return(HG_Bulk_ref_incr(handle)); +} -static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) +hg_return_t margo_bulk_access( + hg_bulk_t handle, + hg_size_t offset, + hg_size_t size, + hg_uint8_t flags, + hg_uint32_t max_count, + void **buf_ptrs, + hg_size_t *buf_sizes, + hg_uint32_t *actual_count) { - struct lookup_cb_evt evt; - evt.nret = info->ret; - evt.addr = info->info.lookup.addr; - struct margo_cb_arg* arg = info->arg; + return(HG_Bulk_access(handle, offset, size, flags, max_count, + buf_ptrs, buf_sizes, actual_count)); +} - /* propagate return code out through eventual */ - ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); +hg_size_t margo_bulk_get_size( + hg_bulk_t handle) +{ + return(HG_Bulk_get_size(handle)); +} -#if 0 - if(arg->in_pool) - arg->mid->waiters_in_progress_pool--; -#endif - - return(HG_SUCCESS); +hg_uint32_t margo_bulk_get_segment_count( + hg_bulk_t handle) +{ + return(HG_Bulk_get_segment_count(handle)); } +hg_size_t margo_bulk_get_serialize_size( + hg_bulk_t handle, + hg_bool_t request_eager) +{ + return(HG_Bulk_get_serialize_size(handle, request_eager)); +} -hg_return_t margo_addr_lookup( - margo_instance_id mid, - const char *name, - hg_addr_t *addr) +hg_return_t margo_bulk_serialize( + void *buf, + hg_size_t buf_size, + hg_bool_t request_eager, + hg_bulk_t handle) { - hg_return_t nret; - struct lookup_cb_evt *evt; - ABT_eventual eventual; - int ret; - struct margo_cb_arg arg; + return(HG_Bulk_serialize(buf, buf_size, request_eager, handle)); +} - ret = ABT_eventual_create(sizeof(*evt), &eventual); - if(ret != 0) - { - return(HG_NOMEM_ERROR); - } +hg_return_t margo_bulk_deserialize( + margo_instance_id mid, + hg_bulk_t *handle, + const void *buf, + hg_size_t buf_size) +{ + return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size)); +} - arg.eventual = 
&eventual; - arg.mid = mid; -#if 0 - if(margo_xstream_is_in_progress_pool(mid)) - { - arg.in_pool = 1; - mid->waiters_in_progress_pool++; - } - else - arg.in_pool = 0; -#endif - nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, - &arg, name, HG_OP_ID_IGNORE); - if(nret == 0) - { - ABT_eventual_wait(eventual, (void**)&evt); - *addr = evt->addr; - nret = evt->nret; - } +static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) +{ + hg_return_t hret = info->ret; + struct margo_cb_arg* arg = info->arg; - ABT_eventual_free(&eventual); + /* propagate return code out through eventual */ + ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); + + if(arg->in_pool) + arg->mid->waiters_in_progress_pool--; - return(nret); + return(HG_SUCCESS); } hg_return_t margo_bulk_transfer( @@ -715,7 +847,8 @@ hg_return_t margo_bulk_transfer( size_t origin_offset, hg_bulk_t local_handle, size_t local_offset, - size_t size) + size_t size, + hg_op_id_t *op_id) { hg_return_t hret = HG_TIMEOUT; hg_return_t *waited_hret; @@ -740,7 +873,7 @@ hg_return_t margo_bulk_transfer( arg.in_pool = 0; hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, &arg, op, origin_addr, origin_handle, origin_offset, local_handle, - local_offset, size, HG_OP_ID_IGNORE); + local_offset, size, op_id); if(hret == 0) { ABT_eventual_wait(eventual, (void**)&waited_hret); @@ -752,6 +885,13 @@ hg_return_t margo_bulk_transfer( return(hret); } +hg_return_t margo_bulk_cancel( + hg_op_id_t op_id) +{ + return(HG_Bulk_cancel(op_id)); +} + +/* returns 1 if current xstream is in the progress pool, 0 if not */ typedef struct { margo_instance_id mid; @@ -817,31 +957,29 @@ void margo_thread_sleep( return; } -/* returns 1 if current xstream is in the progress pool, 0 if not */ -static int margo_xstream_is_in_progress_pool(margo_instance_id mid) +ABT_pool* margo_get_handler_pool(margo_instance_id mid) { - int ret; - ABT_xstream xstream; - ABT_pool pool; + return(&mid->handler_pool); +} - ret = 
ABT_xstream_self(&xstream); - assert(ret == ABT_SUCCESS); - ret = ABT_xstream_get_main_pools(xstream, 1, &pool); - assert(ret == ABT_SUCCESS); +hg_context_t* margo_get_context(margo_instance_id mid) +{ + return(mid->hg_context); +} - if(pool == mid->progress_pool) - return(1); - else - return(0); +hg_class_t* margo_get_class(margo_instance_id mid) +{ + return(mid->hg_class); } -static void margo_rpc_data_free(void* ptr) +margo_instance_id margo_hg_handle_get_instance(hg_handle_t h) { - struct margo_rpc_data* data = (struct margo_rpc_data*) ptr; - if(data->user_data && data->user_free_callback) { - data->user_free_callback(data->user_data); - } - free(ptr); + const struct hg_info* info = HG_Get_info(h); + if(!info) return MARGO_INSTANCE_NULL; + struct margo_rpc_data* data = + (struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id); + if(!data) return MARGO_INSTANCE_NULL; + return data->mid; } int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool) @@ -870,50 +1008,121 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT return(0); } -int margo_register(margo_instance_id mid, hg_id_t id) +static int margo_xstream_is_in_progress_pool(margo_instance_id mid) { - /* register the margo data with the RPC */ - struct margo_rpc_data* margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data)); - margo_data->mid = mid; - margo_data->user_data = NULL; - margo_data->user_free_callback = NULL; - hg_return_t ret = HG_Register_data(margo_get_class(mid), id, margo_data, margo_rpc_data_free); - return ret; + int ret; + ABT_xstream xstream; + ABT_pool pool; + + ret = ABT_xstream_self(&xstream); + assert(ret == ABT_SUCCESS); + ret = ABT_xstream_get_main_pools(xstream, 1, &pool); + assert(ret == ABT_SUCCESS); + + if(pool == mid->progress_pool) + return(1); + else + return(0); } -int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool) +static void 
margo_rpc_data_free(void* ptr) { - struct mplex_key key; - struct mplex_element *element; + struct margo_rpc_data* data = (struct margo_rpc_data*) ptr; + if(data->user_data && data->user_free_callback) { + data->user_free_callback(data->user_data); + } + free(ptr); +} - /* register the margo data with the RPC */ - struct margo_rpc_data* margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data)); - margo_data->mid = mid; - margo_data->user_data = NULL; - margo_data->user_free_callback = NULL; - hg_return_t ret = HG_Register_data(margo_get_class(mid), id, margo_data, margo_rpc_data_free); - if(ret != HG_SUCCESS) - return ret; +/* dedicated thread function to drive Mercury progress */ +static void hg_progress_fn(void* foo) +{ + int ret; + unsigned int actual_count; + struct margo_instance *mid = (struct margo_instance *)foo; + size_t size; + unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; + double next_timer_exp; + int trigger_happened; - /* nothing to do, we'll let the handler pool take this directly */ - if(mplex_id == MARGO_DEFAULT_MPLEX_ID) - return(0); + while(!mid->hg_progress_shutdown_flag) + { + trigger_happened = 0; + do { + ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count); + if(ret == HG_SUCCESS && actual_count > 0) + trigger_happened = 1; + } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); - memset(&key, 0, sizeof(key)); - key.id = id; - key.mplex_id = mplex_id; + if(trigger_happened) + ABT_thread_yield(); - HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element); - if(element) - return(0); + ABT_pool_get_total_size(mid->progress_pool, &size); + /* Are there any other threads executing in this pool that are *not* + * blocked on margo_wait_for_finalize()? If so then, we can't + * sleep here or else those threads will not get a chance to + * execute. 
+ */ + if(size > mid->waiters_in_progress_pool) + { + //printf("DEBUG: Margo progress function running while other ULTs are eligible for execution (size: %d, waiters: %d.\n", size, mid->waiters_in_progress_pool); - element = malloc(sizeof(*element)); - if(!element) - return(-1); - element->key = key; - element->pool = pool; + /* TODO: this is being executed more than is necessary (i.e. + * in cases where there are other legitimate ULTs eligible + * for execution that are not blocking on any events, Margo + * or otherwise). Maybe we need an abt scheduling tweak here + * to make sure that this ULT is the lowest priority in that + * scenario. + */ + ret = HG_Progress(mid->hg_context, 0); + if(ret == HG_SUCCESS) + { + /* Mercury completed something; loop around to trigger + * callbacks + */ + } + else if(ret == HG_TIMEOUT) + { + /* No completion; yield here to allow other ULTs to run */ + ABT_thread_yield(); + } + else + { + /* TODO: error handling */ + fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); + } + } + else + { + hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; + ret = margo_timer_get_next_expiration(mid, &next_timer_exp); + if(ret == 0) + { + /* there is a queued timer, don't block long enough + * to keep this timer waiting + */ + if(next_timer_exp >= 0.0) + { + next_timer_exp *= 1000; /* convert to milliseconds */ + if(next_timer_exp < MERCURY_PROGRESS_TIMEOUT_UB) + hg_progress_timeout = (unsigned int)next_timer_exp; + } + else + { + hg_progress_timeout = 0; + } + } + ret = HG_Progress(mid->hg_context, hg_progress_timeout); + if(ret != HG_SUCCESS && ret != HG_TIMEOUT) + { + /* TODO: error handling */ + fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); + } + } - HASH_ADD(hh, mid->mplex_table, key, sizeof(key), element); + /* check for any expired timers */ + margo_check_timers(mid); + } - return(0); + return; }