Commit 881d3b85 authored by Shane Snyder's avatar Shane Snyder
Browse files

cleanup mercury input/output inside of swim

parent d369c043
...@@ -82,6 +82,7 @@ void swim_register_ping_rpcs( ...@@ -82,6 +82,7 @@ void swim_register_ping_rpcs(
swim_iping_resp_t, swim_iping_recv_ult_handler); swim_iping_resp_t, swim_iping_recv_ult_handler);
/* register swim context data structure with each RPC type */ /* register swim context data structure with each RPC type */
/* TODO: this won't work */
HG_Register_data(hg_cls, dping_rpc_id, g, NULL); HG_Register_data(hg_cls, dping_rpc_id, g, NULL);
HG_Register_data(hg_cls, iping_rpc_id, g, NULL); HG_Register_data(hg_cls, iping_rpc_id, g, NULL);
...@@ -153,8 +154,7 @@ static int swim_send_dping( ...@@ -153,8 +154,7 @@ static int swim_send_dping(
if (hret == HG_SUCCESS) if (hret == HG_SUCCESS)
{ {
hret = HG_Get_output(handle, &dping_resp); hret = HG_Get_output(handle, &dping_resp);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS) goto fini;
return(ret);
SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id); SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id);
assert(dping_resp.msg.source_id == target); assert(dping_resp.msg.source_id == target);
...@@ -162,6 +162,7 @@ static int swim_send_dping( ...@@ -162,6 +162,7 @@ static int swim_send_dping(
/* extract target's membership state from response */ /* extract target's membership state from response */
swim_unpack_message(g, &(dping_resp.msg)); swim_unpack_message(g, &(dping_resp.msg));
HG_Free_output(handle, &dping_resp);
ret = 0; ret = 0;
} }
else if(hret != HG_TIMEOUT) else if(hret != HG_TIMEOUT)
...@@ -169,6 +170,7 @@ static int swim_send_dping( ...@@ -169,6 +170,7 @@ static int swim_send_dping(
SSG_DEBUG(g, "SWIM: dping req error from %d (err=%d)\n", (int)target, hret); SSG_DEBUG(g, "SWIM: dping req error from %d (err=%d)\n", (int)target, hret);
} }
fini:
HG_Destroy(handle); HG_Destroy(handle);
return(ret); return(ret);
} }
...@@ -184,16 +186,14 @@ static void swim_dping_recv_ult(hg_handle_t handle) ...@@ -184,16 +186,14 @@ static void swim_dping_recv_ult(hg_handle_t handle)
/* get ssg & swim state */ /* get ssg & swim state */
info = HG_Get_info(handle); info = HG_Get_info(handle);
if(info == NULL) if(info == NULL) goto fini;
return;
g = (ssg_group_t *)HG_Registered_data(info->hg_class, dping_rpc_id); g = (ssg_group_t *)HG_Registered_data(info->hg_class, dping_rpc_id);
assert(g != NULL); assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx; swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL); assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &dping_req); hret = HG_Get_input(handle, &dping_req);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS) goto fini;
return;
SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)dping_req.msg.source_id); SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)dping_req.msg.source_id);
...@@ -208,6 +208,8 @@ static void swim_dping_recv_ult(hg_handle_t handle) ...@@ -208,6 +208,8 @@ static void swim_dping_recv_ult(hg_handle_t handle)
/* respond to sender of the dping req */ /* respond to sender of the dping req */
margo_respond(ssg_mid, handle, &dping_resp); margo_respond(ssg_mid, handle, &dping_resp);
HG_Free_input(handle, &dping_req);
fini:
HG_Destroy(handle); HG_Destroy(handle);
return; return;
} }
...@@ -273,8 +275,7 @@ void swim_iping_send_ult( ...@@ -273,8 +275,7 @@ void swim_iping_send_ult(
if (hret == HG_SUCCESS) if (hret == HG_SUCCESS)
{ {
hret = HG_Get_output(handle, &iping_resp); hret = HG_Get_output(handle, &iping_resp);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS) goto fini;
return;
SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n", SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
(int)iping_resp.msg.source_id, (int)swim_ctx->ping_target); (int)iping_resp.msg.source_id, (int)swim_ctx->ping_target);
...@@ -288,6 +289,8 @@ void swim_iping_send_ult( ...@@ -288,6 +289,8 @@ void swim_iping_send_ult(
*/ */
if(swim_ctx->ping_target == iping_req.target_id) if(swim_ctx->ping_target == iping_req.target_id)
swim_ctx->ping_target_acked = 1; swim_ctx->ping_target_acked = 1;
HG_Free_output(handle, &iping_resp);
} }
else if(hret != HG_TIMEOUT) else if(hret != HG_TIMEOUT)
{ {
...@@ -295,6 +298,7 @@ void swim_iping_send_ult( ...@@ -295,6 +298,7 @@ void swim_iping_send_ult(
(int)my_subgroup_member, hret, (int)swim_ctx->ping_target); (int)my_subgroup_member, hret, (int)swim_ctx->ping_target);
} }
fini:
HG_Destroy(handle); HG_Destroy(handle);
return; return;
} }
...@@ -311,16 +315,14 @@ static void swim_iping_recv_ult(hg_handle_t handle) ...@@ -311,16 +315,14 @@ static void swim_iping_recv_ult(hg_handle_t handle)
/* get the swim state */ /* get the swim state */
info = HG_Get_info(handle); info = HG_Get_info(handle);
if(info == NULL) if(info == NULL) goto fini;
return;
g = (ssg_group_t *)HG_Registered_data(info->hg_class, dping_rpc_id); g = (ssg_group_t *)HG_Registered_data(info->hg_class, dping_rpc_id);
assert(g != NULL); assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx; swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL); assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &iping_req); hret = HG_Get_input(handle, &iping_req);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS) goto fini;
return;
SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n", SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
(int)iping_req.msg.source_id, (int)iping_req.target_id); (int)iping_req.msg.source_id, (int)iping_req.target_id);
...@@ -344,6 +346,8 @@ static void swim_iping_recv_ult(hg_handle_t handle) ...@@ -344,6 +346,8 @@ static void swim_iping_recv_ult(hg_handle_t handle)
margo_respond(ssg_mid, handle, &iping_resp); margo_respond(ssg_mid, handle, &iping_resp);
} }
HG_Free_input(handle, &iping_req);
fini:
HG_Destroy(handle); HG_Destroy(handle);
return; return;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment