Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
margo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
17
Issues
17
List
Boards
Labels
Milestones
Merge Requests
2
Merge Requests
2
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
sds
margo
Commits
5a1d56d8
Commit
5a1d56d8
authored
Aug 25, 2017
by
Shane Snyder
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add handle caching code
parent
070b55a7
Changes
18
Hide whitespace changes
Inline
Side-by-side
Showing
18 changed files
with
178 additions
and
36 deletions
+178
-36
examples/composition/composed-benchmark.c
examples/composition/composed-benchmark.c
+2
-2
examples/composition/composed-client-lib.c
examples/composition/composed-client-lib.c
+2
-2
examples/composition/composed-svc-daemon.c
examples/composition/composed-svc-daemon.c
+1
-1
examples/composition/data-xfer-service.c
examples/composition/data-xfer-service.c
+1
-1
examples/composition/delegator-service.c
examples/composition/delegator-service.c
+2
-2
examples/margo-example-client.c
examples/margo-example-client.c
+2
-2
examples/multiplex/margo-example-mp-client.c
examples/multiplex/margo-example-mp-client.c
+1
-1
examples/multiplex/margo-example-mp-server.c
examples/multiplex/margo-example-mp-server.c
+1
-1
examples/multiplex/svc1-client.c
examples/multiplex/svc1-client.c
+2
-2
examples/multiplex/svc1-server.c
examples/multiplex/svc1-server.c
+2
-2
examples/multiplex/svc2-client.c
examples/multiplex/svc2-client.c
+2
-2
examples/multiplex/svc2-server.c
examples/multiplex/svc2-server.c
+2
-2
examples/my-rpc.c
examples/my-rpc.c
+2
-2
include/margo.h
include/margo.h
+3
-1
src/margo.c
src/margo.c
+146
-6
tests/margo-test-client-timeout.c
tests/margo-test-client-timeout.c
+2
-2
tests/margo-test-client.c
tests/margo-test-client.c
+2
-2
tests/my-rpc.c
tests/my-rpc.c
+3
-3
No files found.
examples/composition/composed-benchmark.c
View file @
5a1d56d8
...
...
@@ -124,7 +124,7 @@ int main(int argc, char **argv)
assert
(
hret
==
HG_SUCCESS
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
if
(
strcmp
(
argv
[
1
],
argv
[
2
]))
{
sleep
(
3
);
...
...
@@ -133,7 +133,7 @@ int main(int argc, char **argv)
assert
(
hret
==
HG_SUCCESS
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
}
margo_addr_free
(
mid
,
delegator_svr_addr
);
...
...
examples/composition/composed-client-lib.c
View file @
5a1d56d8
...
...
@@ -72,7 +72,7 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s
/* clean up resources consumed by this rpc */
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
return
;
}
...
...
@@ -122,7 +122,7 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
/* clean up resources consumed by this rpc */
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
margo_addr_free
(
mid
,
addr_self
);
return
;
...
...
examples/composition/composed-svc-daemon.c
View file @
5a1d56d8
...
...
@@ -36,7 +36,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
...
...
examples/composition/data-xfer-service.c
View file @
5a1d56d8
...
...
@@ -66,7 +66,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
hret
=
margo_respond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
return
;
}
...
...
examples/composition/delegator-service.c
View file @
5a1d56d8
...
...
@@ -74,8 +74,8 @@ static void delegator_read_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_addr_free
(
mid
,
data_xfer_svc_addr
);
margo_destroy
(
handle
);
margo_destroy
(
handle_relay
);
margo_destroy
(
mid
,
handle
);
margo_destroy
(
mid
,
handle_relay
);
return
;
}
...
...
examples/margo-example-client.c
View file @
5a1d56d8
...
...
@@ -143,7 +143,7 @@ int main(int argc, char **argv)
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
margo_addr_free
(
mid
,
svr_addr
);
/* shut down everything */
...
...
@@ -196,7 +196,7 @@ static void run_my_rpc(void *_arg)
/* clean up resources consumed by this rpc */
margo_bulk_free
(
in
.
bulk_handle
);
margo_free_output
(
handle
,
&
out
);
margo_destroy
(
handle
);
margo_destroy
(
arg
->
mid
,
handle
);
free
(
buffer
);
printf
(
"ULT [%d] done.
\n
"
,
arg
->
val
);
...
...
examples/multiplex/margo-example-mp-client.c
View file @
5a1d56d8
...
...
@@ -76,7 +76,7 @@ int main(int argc, char **argv)
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
margo_addr_free
(
mid
,
svr_addr
);
/* shut down everything */
...
...
examples/multiplex/margo-example-mp-server.c
View file @
5a1d56d8
...
...
@@ -34,7 +34,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
...
...
examples/multiplex/svc1-client.c
View file @
5a1d56d8
...
...
@@ -70,7 +70,7 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
/* clean up resources consumed by this rpc */
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
@@ -116,7 +116,7 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
/* clean up resources consumed by this rpc */
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
examples/multiplex/svc1-server.c
View file @
5a1d56d8
...
...
@@ -60,7 +60,7 @@ static void svc1_do_thing_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
@@ -118,7 +118,7 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
examples/multiplex/svc2-client.c
View file @
5a1d56d8
...
...
@@ -69,7 +69,7 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
/* clean up resources consumed by this rpc */
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
@@ -115,7 +115,7 @@ void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
/* clean up resources consumed by this rpc */
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
examples/multiplex/svc2-server.c
View file @
5a1d56d8
...
...
@@ -60,7 +60,7 @@ static void svc2_do_thing_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
@@ -118,7 +118,7 @@ static void svc2_do_other_thing_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
examples/my-rpc.c
View file @
5a1d56d8
...
...
@@ -80,7 +80,7 @@ static void my_rpc_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
@@ -104,7 +104,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
margo_diag_dump
(
mid
,
"-"
,
0
);
...
...
include/margo.h
View file @
5a1d56d8
...
...
@@ -292,11 +292,13 @@ hg_return_t margo_create(
/**
* Destroy Mercury handle.
*
* \param [in] handle Mercury handle
* \param [in] mid Margo instance
* \param [in] handle Mercury handle
*
* \return HG_SUCCESS or corresponding HG error code
*/
hg_return_t
margo_destroy
(
margo_instance_id
mid
,
hg_handle_t
handle
);
/**
...
...
src/margo.c
View file @
5a1d56d8
...
...
@@ -19,6 +19,7 @@
#include "uthash.h"
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100
/* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
struct
mplex_key
{
...
...
@@ -49,9 +50,16 @@ do {\
if((__time) < __data.min) __data.min = (__time); \
} while(0)
struct
margo_handle_cache_el
{
hg_handle_t
handle
;
UT_hash_handle
hh
;
/* in-use hash link */
struct
margo_handle_cache_el
*
next
;
/* free list link */
};
struct
margo_instance
{
/*
provided by caller
*/
/*
mercury/argobots state
*/
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
ABT_pool
handler_pool
;
...
...
@@ -76,6 +84,10 @@ struct margo_instance
/* hash table to track multiplexed rpcs registered with margo */
struct
mplex_element
*
mplex_table
;
/* linked list of free hg handles and a hash of in-use handles */
struct
margo_handle_cache_el
*
free_handle_list
;
struct
margo_handle_cache_el
*
used_handle_hash
;
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
...
...
@@ -105,6 +117,13 @@ struct margo_rpc_data
static
void
hg_progress_fn
(
void
*
foo
);
static
void
margo_rpc_data_free
(
void
*
ptr
);
static
hg_return_t
margo_handle_cache_init
(
margo_instance_id
mid
);
static
void
margo_handle_cache_destroy
(
margo_instance_id
mid
);
static
hg_return_t
margo_handle_cache_get
(
margo_instance_id
mid
,
hg_addr_t
addr
,
hg_id_t
id
,
hg_handle_t
*
handle
);
static
hg_return_t
margo_handle_cache_put
(
margo_instance_id
mid
,
hg_handle_t
handle
);
margo_instance_id
margo_init
(
const
char
*
addr_str
,
int
mode
,
int
use_progress_thread
,
int
rpc_thread_count
)
{
...
...
@@ -216,6 +235,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
hg_context_t
*
hg_context
)
{
int
ret
;
hg_return_t
hret
;
struct
margo_instance
*
mid
;
mid
=
malloc
(
sizeof
(
*
mid
));
...
...
@@ -235,6 +255,10 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
ret
=
margo_timer_instance_init
(
mid
);
if
(
ret
!=
0
)
goto
err
;
/* initialize the handle cache */
hret
=
margo_handle_cache_init
(
mid
);
if
(
hret
!=
HG_SUCCESS
)
goto
err
;
ret
=
ABT_thread_create
(
mid
->
progress_pool
,
hg_progress_fn
,
mid
,
ABT_THREAD_ATTR_NULL
,
&
mid
->
hg_progress_tid
);
if
(
ret
!=
0
)
goto
err
;
...
...
@@ -244,6 +268,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
err:
if
(
mid
)
{
margo_handle_cache_destroy
(
mid
);
margo_timer_instance_finalize
(
mid
);
ABT_mutex_free
(
&
mid
->
finalize_mutex
);
ABT_cond_free
(
&
mid
->
finalize_cond
);
...
...
@@ -277,6 +302,8 @@ static void margo_cleanup(margo_instance_id mid)
free
(
mid
->
rpc_xstreams
);
}
margo_handle_cache_destroy
(
mid
);
if
(
mid
->
margo_init
)
{
if
(
mid
->
hg_context
)
...
...
@@ -530,16 +557,32 @@ hg_return_t margo_addr_to_string(
hg_return_t
margo_create
(
margo_instance_id
mid
,
hg_addr_t
addr
,
hg_id_t
id
,
hg_handle_t
*
handle
)
{
/* TODO: handle caching logic? */
hg_return_t
hret
;
/* look for a handle to reuse */
hret
=
margo_handle_cache_get
(
mid
,
addr
,
id
,
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
/* else try creating a new handle */
hret
=
HG_Create
(
mid
->
hg_context
,
addr
,
id
,
handle
);
}
return
(
HG_Create
(
mid
->
hg_context
,
addr
,
id
,
handle
))
;
return
hret
;
}
hg_return_t
margo_destroy
(
hg_handle_t
handle
)
hg_return_t
margo_destroy
(
margo_instance_id
mid
,
hg_handle_t
handle
)
{
/* TODO handle caching logic? */
hg_return_t
hret
;
/* recycle this handle if it came from the handle cache */
hret
=
margo_handle_cache_put
(
mid
,
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
/* else destroy the handle manually */
hret
=
HG_Destroy
(
handle
);
}
return
(
HG_Destroy
(
handle
))
;
return
hret
;
}
hg_return_t
margo_ref_incr
(
...
...
@@ -1216,3 +1259,100 @@ void margo_get_param(margo_instance_id mid, int option, void *param)
return
;
}
static
hg_return_t
margo_handle_cache_init
(
margo_instance_id
mid
)
{
int
i
;
struct
margo_handle_cache_el
*
el
;
hg_return_t
hret
=
HG_SUCCESS
;
for
(
i
=
0
;
i
<
DEFAULT_MERCURY_HANDLE_CACHE_SIZE
;
i
++
)
{
el
=
malloc
(
sizeof
(
*
el
));
if
(
!
el
)
{
hret
=
HG_NOMEM_ERROR
;
margo_handle_cache_destroy
(
mid
);
break
;
}
/* create handle with NULL_ADDRs, we will reset later to valid addrs */
hret
=
HG_Create
(
mid
->
hg_context
,
HG_ADDR_NULL
,
0
,
&
el
->
handle
);
if
(
hret
!=
HG_SUCCESS
)
{
free
(
el
);
margo_handle_cache_destroy
(
mid
);
break
;
}
/* add to the free list */
LL_PREPEND
(
mid
->
free_handle_list
,
el
);
}
return
hret
;
}
static
void
margo_handle_cache_destroy
(
margo_instance_id
mid
)
{
struct
margo_handle_cache_el
*
el
,
*
tmp
;
/* only free handle list elements -- handles in hash are still in use */
LL_FOREACH_SAFE
(
mid
->
free_handle_list
,
el
,
tmp
)
{
LL_DELETE
(
mid
->
free_handle_list
,
el
);
HG_Destroy
(
el
->
handle
);
free
(
el
);
}
return
;
}
static
hg_return_t
margo_handle_cache_get
(
margo_instance_id
mid
,
hg_addr_t
addr
,
hg_id_t
id
,
hg_handle_t
*
handle
)
{
struct
margo_handle_cache_el
*
el
;
hg_return_t
hret
;
if
(
!
mid
->
free_handle_list
)
{
/* if no available handles, just fall through */
return
HG_OTHER_ERROR
;
}
/* pop first element from the free handle list */
el
=
mid
->
free_handle_list
;
LL_DELETE
(
mid
->
free_handle_list
,
el
);
/* reset handle */
hret
=
HG_Reset
(
el
->
handle
,
addr
,
id
);
if
(
hret
==
HG_SUCCESS
)
{
/* put on in-use list and pass back handle */
HASH_ADD
(
hh
,
mid
->
used_handle_hash
,
handle
,
sizeof
(
hg_handle_t
),
el
);
*
handle
=
el
->
handle
;
}
return
hret
;
}
static
hg_return_t
margo_handle_cache_put
(
margo_instance_id
mid
,
hg_handle_t
handle
)
{
struct
margo_handle_cache_el
*
el
;
/* look for handle in the in-use hash */
HASH_FIND
(
hh
,
mid
->
used_handle_hash
,
&
handle
,
sizeof
(
hg_handle_t
),
el
);
if
(
!
el
)
{
/* this handle was manually allocated -- just fall through */
return
HG_OTHER_ERROR
;
}
/* remove from the in-use hash */
HASH_DELETE
(
hh
,
mid
->
used_handle_hash
,
el
);
/* add to the tail of the free handle list */
LL_APPEND
(
mid
->
free_handle_list
,
el
);
return
HG_SUCCESS
;
}
tests/margo-test-client-timeout.c
View file @
5a1d56d8
...
...
@@ -144,7 +144,7 @@ int main(int argc, char **argv)
hret
=
margo_forward_timed
(
mid
,
handle
,
NULL
,
2000
.
0
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
margo_addr_free
(
mid
,
svr_addr
);
/* shut down everything */
...
...
@@ -202,7 +202,7 @@ static void run_my_rpc(void *_arg)
/* clean up resources consumed by this rpc */
margo_bulk_free
(
in
.
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
arg
->
mid
,
handle
);
free
(
buffer
);
printf
(
"ULT [%d] done.
\n
"
,
arg
->
val
);
...
...
tests/margo-test-client.c
View file @
5a1d56d8
...
...
@@ -142,7 +142,7 @@ int main(int argc, char **argv)
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
margo_addr_free
(
mid
,
svr_addr
);
/* shut down everything */
...
...
@@ -194,7 +194,7 @@ static void run_my_rpc(void *_arg)
/* clean up resources consumed by this rpc */
margo_bulk_free
(
in
.
bulk_handle
);
margo_free_output
(
handle
,
&
out
);
margo_destroy
(
handle
);
margo_destroy
(
arg
->
mid
,
handle
);
free
(
buffer
);
printf
(
"ULT [%d] done.
\n
"
,
arg
->
val
);
...
...
tests/my-rpc.c
View file @
5a1d56d8
...
...
@@ -80,7 +80,7 @@ static void my_rpc_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
@@ -104,7 +104,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
...
...
@@ -164,7 +164,7 @@ static void my_rpc_hang_ult(hg_handle_t handle)
assert
(
hret
==
HG_SUCCESS
);
margo_bulk_free
(
bulk_handle
);
margo_destroy
(
handle
);
margo_destroy
(
mid
,
handle
);
free
(
buffer
);
return
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment