Commit bc808eb6 authored by Shane Snyder's avatar Shane Snyder
Browse files

port swim code to new margo api

parent e435e86b
......@@ -11,7 +11,6 @@
#include <assert.h>
#include <mercury.h>
#include <abt.h>
#include <margo.h>
#include "ssg.h"
......@@ -72,18 +71,16 @@ static hg_id_t swim_iping_rpc_id;
void swim_register_ping_rpcs(
ssg_group_t *g)
{
hg_class_t *hg_cls = margo_get_class(ssg_inst->mid);
/* register RPC handlers for SWIM pings */
swim_dping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult_handler);
swim_iping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult_handler);
swim_dping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult);
swim_iping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult);
/* register swim context data structure with each RPC type */
/* XXX: this won't work for multiple groups ... */
HG_Register_data(hg_cls, swim_dping_rpc_id, g, NULL);
HG_Register_data(hg_cls, swim_iping_rpc_id, g, NULL);
margo_register_data(ssg_inst->mid, swim_dping_rpc_id, g, NULL);
margo_register_data(ssg_inst->mid, swim_iping_rpc_id, g, NULL);
return;
}
......@@ -137,8 +134,7 @@ static int swim_send_dping(
if(target_addr == HG_ADDR_NULL)
return(ret);
hret = HG_Create(margo_get_context(ssg_inst->mid), target_addr, swim_dping_rpc_id,
&handle);
hret = margo_create(ssg_inst->mid, target_addr, swim_dping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return(ret);
......@@ -152,7 +148,7 @@ static int swim_send_dping(
swim_ctx->prot_period_len);
if (hret == HG_SUCCESS)
{
hret = HG_Get_output(handle, &dping_resp);
hret = margo_get_output(handle, &dping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id);
......@@ -161,7 +157,7 @@ static int swim_send_dping(
/* extract target's membership state from response */
swim_unpack_message(g, &(dping_resp.msg));
HG_Free_output(handle, &dping_resp);
margo_free_output(handle, &dping_resp);
ret = 0;
}
else if(hret != HG_TIMEOUT)
......@@ -170,7 +166,7 @@ static int swim_send_dping(
}
fini:
HG_Destroy(handle);
margo_destroy(ssg_inst->mid, handle);
return(ret);
}
......@@ -178,20 +174,17 @@ static void swim_dping_recv_ult(hg_handle_t handle)
{
ssg_group_t *g;
swim_context_t *swim_ctx;
const struct hg_info *info;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
/* get ssg & swim state */
info = HG_Get_info(handle);
if(info == NULL) goto fini;
g = (ssg_group_t *)HG_Registered_data(info->hg_class, swim_dping_rpc_id);
g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id);
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &dping_req);
hret = margo_get_input(handle, &dping_req);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)dping_req.msg.source_id);
......@@ -207,9 +200,9 @@ static void swim_dping_recv_ult(hg_handle_t handle)
/* respond to sender of the dping req */
margo_respond(ssg_inst->mid, handle, &dping_resp);
HG_Free_input(handle, &dping_req);
margo_free_input(handle, &dping_req);
fini:
HG_Destroy(handle);
margo_destroy(ssg_inst->mid, handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
......@@ -250,8 +243,7 @@ void swim_iping_send_ult(
if(target_addr == HG_ADDR_NULL)
return;
hret = HG_Create(margo_get_context(ssg_inst->mid), target_addr, swim_iping_rpc_id,
&handle);
hret = margo_create(ssg_inst->mid, target_addr, swim_iping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return;
......@@ -273,7 +265,7 @@ void swim_iping_send_ult(
(swim_ctx->prot_period_len - swim_ctx->dping_timeout));
if (hret == HG_SUCCESS)
{
hret = HG_Get_output(handle, &iping_resp);
hret = margo_get_output(handle, &iping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
......@@ -289,7 +281,7 @@ void swim_iping_send_ult(
if(swim_ctx->ping_target == iping_req.target_id)
swim_ctx->ping_target_acked = 1;
HG_Free_output(handle, &iping_resp);
margo_free_output(handle, &iping_resp);
}
else if(hret != HG_TIMEOUT)
{
......@@ -298,7 +290,7 @@ void swim_iping_send_ult(
}
fini:
HG_Destroy(handle);
margo_destroy(ssg_inst->mid, handle);
return;
}
......@@ -306,21 +298,18 @@ static void swim_iping_recv_ult(hg_handle_t handle)
{
ssg_group_t *g;
swim_context_t *swim_ctx;
const struct hg_info *info;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_return_t hret;
int ret;
/* get the swim state */
info = HG_Get_info(handle);
if(info == NULL) goto fini;
g = (ssg_group_t *)HG_Registered_data(info->hg_class, swim_dping_rpc_id);
g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id);
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &iping_req);
hret = margo_get_input(handle, &iping_req);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
......@@ -345,9 +334,9 @@ static void swim_iping_recv_ult(hg_handle_t handle)
margo_respond(ssg_inst->mid, handle, &iping_resp);
}
HG_Free_input(handle, &iping_req);
margo_free_input(handle, &iping_req);
fini:
HG_Destroy(handle);
margo_destroy(ssg_inst->mid, handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment