Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
B
bake
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
7
Issues
7
List
Boards
Labels
Milestones
Merge Requests
2
Merge Requests
2
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sds
bake
Commits
c9f647f2
Commit
c9f647f2
authored
Aug 14, 2018
by
Matthieu Dorier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
implementation ready for testing
parent
209d72e5
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
214 additions
and
2 deletions
+214
-2
include/bake-client.h
include/bake-client.h
+2
-1
include/bake.h
include/bake.h
+1
-0
src/bake-client.c
src/bake-client.c
+51
-0
src/bake-rpc.h
src/bake-rpc.h
+10
-0
src/bake-server.c
src/bake-server.c
+150
-1
No files found.
include/bake-client.h
View file @
c9f647f2
...
...
@@ -390,7 +390,8 @@ int bake_migrate_target(
bake_target_id_t
src_target_id
,
int
remove_source
,
const
char
*
dest_addr
,
uint16_t
dest_provider_id
);
uint16_t
dest_provider_id
,
const
char
*
dest_root
);
/**
* Shuts down a remote BAKE service (given an address).
...
...
include/bake.h
View file @
c9f647f2
...
...
@@ -36,6 +36,7 @@ typedef struct {
#define BAKE_ERR_UNKNOWN_PROVIDER (-7)
/* Provider id could not be matched with a provider */
#define BAKE_ERR_UNKNOWN_REGION (-8)
/* Region id could not be found */
#define BAKE_ERR_OUT_OF_BOUNDS (-9)
/* Attempting an out of bound access */
#define BAKE_ERR_REMI (-10)
/* Error related to REMI */
#ifdef __cplusplus
}
...
...
src/bake-client.c
View file @
c9f647f2
...
...
@@ -35,6 +35,7 @@ struct bake_client
hg_id_t
bake_noop_id
;
hg_id_t
bake_remove_id
;
hg_id_t
bake_migrate_region_id
;
hg_id_t
bake_migrate_target_id
;
uint64_t
num_provider_handles
;
};
...
...
@@ -71,6 +72,7 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
margo_registered_name
(
mid
,
"bake_noop_rpc"
,
&
client
->
bake_noop_id
,
&
flag
);
margo_registered_name
(
mid
,
"bake_remove_rpc"
,
&
client
->
bake_remove_id
,
&
flag
);
margo_registered_name
(
mid
,
"bake_migrate_region_rpc"
,
&
client
->
bake_migrate_region_id
,
&
flag
);
margo_registered_name
(
mid
,
"bake_migrate_target_rpc"
,
&
client
->
bake_migrate_target_id
,
&
flag
);
}
else
{
/* RPCs not already registered */
...
...
@@ -113,6 +115,9 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
client
->
bake_migrate_region_id
=
MARGO_REGISTER
(
mid
,
"bake_migrate_region_rpc"
,
bake_migrate_region_in_t
,
bake_migrate_region_out_t
,
NULL
);
client
->
bake_migrate_target_id
=
MARGO_REGISTER
(
mid
,
"bake_migrate_target_rpc"
,
bake_migrate_target_in_t
,
bake_migrate_target_out_t
,
NULL
);
}
return
BAKE_SUCCESS
;
...
...
@@ -771,6 +776,52 @@ int bake_migrate_region(
return
(
ret
);
}
int
bake_migrate_target
(
bake_provider_handle_t
source
,
bake_target_id_t
src_target_id
,
int
remove_source
,
const
char
*
dest_addr
,
uint16_t
dest_provider_id
,
const
char
*
dest_root
)
{
hg_return_t
hret
;
hg_handle_t
handle
;
bake_migrate_target_in_t
in
;
bake_migrate_target_out_t
out
;
int
ret
;
in
.
target_id
=
src_target_id
;
in
.
remove_src
=
remove_source
;
in
.
dest_remi_addr
=
dest_addr
;
in
.
dest_remi_provider_id
=
dest_provider_id
;
in
.
dest_root
=
dest_root
;
hret
=
margo_create
(
source
->
client
->
mid
,
source
->
addr
,
source
->
client
->
bake_migrate_target_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
return
BAKE_ERR_MERCURY
;
hret
=
margo_provider_forward
(
source
->
provider_id
,
handle
,
&
in
);
if
(
hret
!=
HG_SUCCESS
)
{
margo_destroy
(
handle
);
return
BAKE_ERR_MERCURY
;
}
hret
=
margo_get_output
(
handle
,
&
out
);
if
(
hret
!=
HG_SUCCESS
)
{
margo_destroy
(
handle
);
return
BAKE_ERR_MERCURY
;
}
ret
=
out
.
ret
;
margo_free_output
(
handle
,
&
out
);
margo_destroy
(
handle
);
return
(
ret
);
}
int
bake_noop
(
bake_provider_handle_t
provider
)
{
hg_return_t
hret
;
...
...
src/bake-rpc.h
View file @
c9f647f2
...
...
@@ -133,6 +133,16 @@ MERCURY_GEN_PROC(bake_migrate_region_out_t,
((
int32_t
)(
ret
))
\
((
bake_region_id_t
)(
dest_rid
)))
/* BAKE migrate target */
MERCURY_GEN_PROC
(
bake_migrate_target_in_t
,
((
bake_target_id_t
)(
target_id
))
\
((
int32_t
)(
remove_src
))
\
((
hg_const_string_t
)(
dest_remi_addr
))
\
((
uint16_t
)(
dest_remi_provider_id
))
\
((
hg_const_string_t
)(
dest_root
)))
MERCURY_GEN_PROC
(
bake_migrate_target_out_t
,
((
int32_t
)(
ret
)))
static
inline
hg_return_t
hg_proc_bake_region_id_t
(
hg_proc_t
proc
,
bake_region_id_t
*
rid
)
{
/* TODO: update later depending on final region_id_t type */
...
...
src/bake-server.c
View file @
c9f647f2
...
...
@@ -8,7 +8,9 @@
#include <assert.h>
#include <libpmemobj.h>
#include <bake-server.h>
#include <remi/remi-client.h>
#include <remi/remi-server.h>
#include "bake-server.h"
#include "uthash.h"
#include "bake-rpc.h"
...
...
@@ -26,6 +28,7 @@ DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER
(
bake_noop_ult
)
DECLARE_MARGO_RPC_HANDLER
(
bake_remove_ult
)
DECLARE_MARGO_RPC_HANDLER
(
bake_migrate_region_ult
)
DECLARE_MARGO_RPC_HANDLER
(
bake_migrate_target_ult
)
/* definition of BAKE root data structure (just a uuid for now) */
typedef
struct
...
...
@@ -49,6 +52,8 @@ typedef struct
PMEMobjpool
*
pmem_pool
;
bake_root_t
*
pmem_root
;
bake_target_id_t
target_id
;
char
*
root
;
char
*
filename
;
UT_hash_handle
hh
;
}
bake_pmem_entry_t
;
...
...
@@ -57,10 +62,14 @@ typedef struct bake_server_context_t
uint64_t
num_targets
;
bake_pmem_entry_t
*
targets
;
hg_id_t
bake_create_write_persist_id
;
remi_client_t
remi_client
;
remi_provider_t
remi_provider
;
}
bake_server_context_t
;
static
void
bake_server_finalize_cb
(
void
*
data
);
static
void
bake_target_migration_callback
(
remi_fileset_t
fileset
,
void
*
provider
);
int
bake_makepool
(
const
char
*
pool_name
,
size_t
pool_size
,
...
...
@@ -167,6 +176,10 @@ int bake_provider_register(
bake_migrate_region_in_t
,
bake_migrate_region_out_t
,
bake_migrate_region_ult
,
provider_id
,
abt_pool
);
margo_register_data
(
mid
,
rpc_id
,
(
void
*
)
tmp_svr_ctx
,
NULL
);
rpc_id
=
MARGO_REGISTER_PROVIDER
(
mid
,
"bake_migrate_target_rpc"
,
bake_migrate_target_in_t
,
bake_migrate_target_out_t
,
bake_migrate_target_ult
,
provider_id
,
abt_pool
);
margo_register_data
(
mid
,
rpc_id
,
(
void
*
)
tmp_svr_ctx
,
NULL
);
/* get a client-side version of the bake_create_write_persist RPC */
hg_bool_t
flag
;
...
...
@@ -179,6 +192,14 @@ int bake_provider_register(
bake_create_write_persist_in_t
,
bake_create_write_persist_out_t
,
NULL
);
}
/* register a REMI client */
remi_client_init
(
mid
,
&
(
tmp_svr_ctx
->
remi_client
));
/* register a REMI provider */
remi_provider_register
(
mid
,
provider_id
,
abt_pool
,
&
(
tmp_svr_ctx
->
remi_provider
));
remi_provider_register_migration_class
(
tmp_svr_ctx
->
remi_provider
,
"bake"
,
bake_target_migration_callback
,
NULL
,
tmp_svr_ctx
);
/* install the bake server finalize callback */
margo_push_finalize_callback
(
mid
,
&
bake_server_finalize_cb
,
tmp_svr_ctx
);
...
...
@@ -194,10 +215,19 @@ int bake_provider_add_storage_target(
bake_target_id_t
*
target_id
)
{
bake_pmem_entry_t
*
new_entry
=
calloc
(
1
,
sizeof
(
*
new_entry
));
new_entry
->
root
=
NULL
;
new_entry
->
filename
=
NULL
;
char
*
tmp
=
strrchr
(
target_name
,
'/'
);
new_entry
->
filename
=
strdup
(
tmp
);
ptrdiff_t
d
=
tmp
-
target_name
;
new_entry
->
root
=
strndup
(
target_name
,
d
);
new_entry
->
pmem_pool
=
pmemobj_open
(
target_name
,
NULL
);
if
(
!
(
new_entry
->
pmem_pool
))
{
fprintf
(
stderr
,
"pmemobj_open: %s
\n
"
,
pmemobj_errormsg
());
free
(
new_entry
->
filename
);
free
(
new_entry
->
root
);
free
(
new_entry
);
return
BAKE_ERR_PMEM
;
}
...
...
@@ -212,6 +242,8 @@ int bake_provider_add_storage_target(
{
fprintf
(
stderr
,
"Error: BAKE pool %s is not properly initialized
\n
"
,
target_name
);
pmemobj_close
(
new_entry
->
pmem_pool
);
free
(
new_entry
->
filename
);
free
(
new_entry
->
root
);
free
(
new_entry
);
return
BAKE_ERR_UNKNOWN_TARGET
;
}
...
...
@@ -224,6 +256,8 @@ int bake_provider_add_storage_target(
if
(
check_entry
!=
new_entry
)
{
fprintf
(
stderr
,
"Error: BAKE could not insert new pmem pool into the hash
\n
"
);
pmemobj_close
(
new_entry
->
pmem_pool
);
free
(
new_entry
->
filename
);
free
(
new_entry
->
root
);
free
(
new_entry
);
return
BAKE_ERR_ALLOCATION
;
}
...
...
@@ -251,6 +285,8 @@ int bake_provider_remove_storage_target(
if
(
!
entry
)
return
BAKE_ERR_UNKNOWN_TARGET
;
pmemobj_close
(
entry
->
pmem_pool
);
HASH_DEL
(
provider
->
targets
,
entry
);
free
(
entry
->
filename
);
free
(
entry
->
root
);
free
(
entry
);
return
0
;
}
...
...
@@ -262,6 +298,8 @@ int bake_provider_remove_all_storage_targets(
HASH_ITER
(
hh
,
provider
->
targets
,
p
,
tmp
)
{
HASH_DEL
(
provider
->
targets
,
p
);
pmemobj_close
(
p
->
pmem_pool
);
free
(
p
->
filename
);
free
(
p
->
root
);
free
(
p
);
}
provider
->
num_targets
=
0
;
...
...
@@ -1279,6 +1317,93 @@ static void bake_migrate_region_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER
(
bake_migrate_region_ult
)
static
void
bake_migrate_target_ult
(
hg_handle_t
handle
)
{
bake_migrate_target_in_t
in
;
in
.
dest_remi_addr
=
NULL
;
in
.
dest_root
=
NULL
;
bake_migrate_target_out_t
out
;
hg_addr_t
dest_addr
=
HG_ADDR_NULL
;
hg_return_t
hret
;
int
ret
;
remi_provider_handle_t
remi_ph
=
REMI_PROVIDER_HANDLE_NULL
;
remi_fileset_t
local_fileset
=
REMI_FILESET_NULL
;
memset
(
&
out
,
0
,
sizeof
(
out
));
margo_instance_id
mid
=
margo_hg_handle_get_instance
(
handle
);
assert
(
mid
);
const
struct
hg_info
*
info
=
margo_get_info
(
handle
);
bake_provider_t
svr_ctx
=
margo_registered_data
(
mid
,
info
->
id
);
if
(
!
svr_ctx
)
{
out
.
ret
=
BAKE_ERR_UNKNOWN_PROVIDER
;
goto
finish
;
}
hret
=
margo_get_input
(
handle
,
&
in
);
if
(
hret
!=
HG_SUCCESS
)
{
out
.
ret
=
BAKE_ERR_MERCURY
;
goto
finish
;
}
bake_pmem_entry_t
*
entry
=
find_pmem_entry
(
svr_ctx
,
in
.
target_id
);
if
(
!
entry
)
{
out
.
ret
=
BAKE_ERR_UNKNOWN_TARGET
;
goto
finish
;
}
/* lookup the address of the destination REMI provider */
hret
=
margo_addr_lookup
(
mid
,
in
.
dest_remi_addr
,
&
dest_addr
);
if
(
hret
!=
HG_SUCCESS
)
{
out
.
ret
=
BAKE_ERR_MERCURY
;
goto
finish
;
}
/* use the REMI client to create a REMI provider handle */
ret
=
remi_provider_handle_create
(
svr_ctx
->
remi_client
,
dest_addr
,
in
.
dest_remi_provider_id
,
&
remi_ph
);
if
(
ret
!=
REMI_SUCCESS
)
{
out
.
ret
=
BAKE_ERR_REMI
;
goto
finish
;
}
/* create a fileset */
ret
=
remi_fileset_create
(
"bake"
,
entry
->
root
,
&
local_fileset
);
if
(
ret
!=
REMI_SUCCESS
)
{
out
.
ret
=
BAKE_ERR_REMI
;
goto
finish
;
}
/* fill the fileset */
ret
=
remi_fileset_register_file
(
local_fileset
,
entry
->
filename
);
if
(
ret
!=
REMI_SUCCESS
)
{
out
.
ret
=
BAKE_ERR_REMI
;
goto
finish
;
}
/* issue the migration */
ret
=
remi_fileset_migrate
(
remi_ph
,
local_fileset
,
in
.
dest_root
,
in
.
remove_src
);
if
(
ret
!=
REMI_SUCCESS
)
{
out
.
ret
=
BAKE_ERR_REMI
;
goto
finish
;
}
/* remove the target from the list of managed targets */
bake_provider_remove_storage_target
(
svr_ctx
,
in
.
target_id
);
out
.
ret
=
BAKE_SUCCESS
;
finish:
remi_fileset_free
(
local_fileset
);
remi_provider_handle_release
(
remi_ph
);
margo_addr_free
(
mid
,
dest_addr
);
margo_free_input
(
handle
,
&
in
);
margo_respond
(
handle
,
&
out
);
margo_destroy
(
handle
);
return
;
}
DEFINE_MARGO_RPC_HANDLER
(
bake_migrate_target_ult
)
static
void
bake_server_finalize_cb
(
void
*
data
)
{
bake_server_context_t
*
svr_ctx
=
(
bake_server_context_t
*
)
data
;
...
...
@@ -1286,8 +1411,32 @@ static void bake_server_finalize_cb(void *data)
bake_provider_remove_all_storage_targets
(
svr_ctx
);
remi_client_finalize
(
svr_ctx
->
remi_client
);
free
(
svr_ctx
);
return
;
}
typedef
struct
migration_cb_args
{
char
root
[
1024
];
bake_server_context_t
*
provider
;
}
migration_cb_args
;
static
void
migration_fileset_cb
(
const
char
*
filename
,
void
*
arg
)
{
migration_cb_args
*
mig_args
=
(
migration_cb_args
*
)
arg
;
char
*
fullname
=
strcat
(
mig_args
->
root
,
filename
);
bake_target_id_t
tid
;
bake_provider_add_storage_target
(
mig_args
->
provider
,
fullname
,
&
tid
);
free
(
fullname
);
}
static
void
bake_target_migration_callback
(
remi_fileset_t
fileset
,
void
*
uarg
)
{
migration_cb_args
args
;
args
.
provider
=
(
bake_server_context_t
*
)
uarg
;
size_t
root_size
=
1024
;
remi_fileset_get_root
(
fileset
,
args
.
root
,
&
root_size
);
remi_fileset_foreach_file
(
fileset
,
migration_fileset_cb
,
&
args
);
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment