Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
mobject-store
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
12
Issues
12
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sds
mobject-store
Commits
28d89e85
Commit
28d89e85
authored
Dec 08, 2017
by
Matthieu Dorier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
async api implemented and tested
parent
59fd13a9
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
280 additions
and
133 deletions
+280
-133
src/Makefile.subdir
src/Makefile.subdir
+2
-1
src/client/aio/aio-operate.c
src/client/aio/aio-operate.c
+71
-12
src/client/aio/completion.c
src/client/aio/completion.c
+43
-12
src/client/aio/completion.h
src/client/aio/completion.h
+14
-2
src/client/read-op.c
src/client/read-op.c
+0
-73
src/client/write-op.c
src/client/write-op.c
+0
-32
tests/Makefile.subdir
tests/Makefile.subdir
+4
-1
tests/mobject-aio-test.c
tests/mobject-aio-test.c
+146
-0
No files found.
src/Makefile.subdir
View file @
28d89e85
...
...
@@ -49,7 +49,8 @@ src_client_libmobject_store_la_SOURCES = \
src/client/read-op.c
\
src/client/write-op.c
\
src/client/omap-iter.c
\
src/client/aio/completion.c
src/client/aio/completion.c
\
src/client/aio/aio-operate.c
src_client_libmobject_store_la_CPPFLAGS
=
${AM_CPPFLAGS}
${CLIENT_CPPFLAGS}
src_client_libmobject_store_la_LIBADD
=
src/omap-iter/libomap-iter.la
\
src/io-chain/libio-chain.la
${CLIENT_LIBS}
...
...
src/client/aio/aio-operate.c
View file @
28d89e85
...
...
@@ -7,25 +7,84 @@
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
// global variables for RPC ids, defined in client/cluster.c
extern
hg_id_t
mobject_write_op_rpc_id
;
extern
hg_id_t
mobject_read_op_rpc_id
;
int
mobject_store_aio_write_op_operate
(
mobject_store_write_op_t
write_op
,
mobject_store_ioctx_t
io
,
mobject_store_completion_t
completion
,
const
char
*
oid
,
time_t
*
mtime
,
int
flags
)
{
mobject_store_ioctx_t
io
,
mobject_store_completion_t
completion
,
const
char
*
oid
,
time_t
*
mtime
,
int
flags
)
{
hg_return_t
ret
;
}
write_op_in_t
in
;
in
.
object_name
=
oid
;
in
.
pool_name
=
io
->
pool_name
;
in
.
write_op
=
write_op
;
// TODO take mtime into account
prepare_write_op
(
io
->
cluster
->
mid
,
write_op
);
hg_addr_t
svr_addr
=
ssg_get_addr
(
io
->
cluster
->
gid
,
0
);
// XXX pick other servers using ch-placement
MOBJECT_ASSERT
(
svr_addr
!=
HG_ADDR_NULL
,
"NULL server address"
);
hg_handle_t
h
;
ret
=
margo_create
(
io
->
cluster
->
mid
,
svr_addr
,
mobject_write_op_rpc_id
,
&
h
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not create RPC handle"
);
margo_request
req
;
ret
=
margo_iforward
(
h
,
&
in
,
&
req
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not forward RPC"
);
completion
->
request
=
req
;
completion
->
handle
=
h
;
completion
->
type
=
AIO_WRITE_COMPLETION
;
completion
->
op
.
write_op
=
write_op
;
return
0
;
}
int
mobject_store_aio_read_op_operate
(
mobject_store_read_op_t
read_op
,
mobject_store_ioctx_t
io
,
mobject_store_completion_t
completion
,
const
char
*
oid
,
int
flags
)
{
mobject_store_ioctx_t
io
,
mobject_store_completion_t
completion
,
const
char
*
oid
,
int
flags
)
{
hg_return_t
ret
;
read_op_in_t
in
;
in
.
object_name
=
oid
;
in
.
pool_name
=
io
->
pool_name
;
in
.
read_op
=
read_op
;
prepare_read_op
(
io
->
cluster
->
mid
,
read_op
);
hg_addr_t
svr_addr
=
ssg_get_addr
(
io
->
cluster
->
gid
,
0
);
// XXX pick other servers using ch-placement
MOBJECT_ASSERT
(
svr_addr
!=
HG_ADDR_NULL
,
"NULL server address"
);
hg_handle_t
h
;
ret
=
margo_create
(
io
->
cluster
->
mid
,
svr_addr
,
mobject_read_op_rpc_id
,
&
h
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not create RPC handle"
);
margo_request
req
;
ret
=
margo_iforward
(
h
,
&
in
,
&
req
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not forward RPC"
);
completion
->
request
=
req
;
completion
->
handle
=
h
;
completion
->
type
=
AIO_READ_COMPLETION
;
completion
->
op
.
read_op
=
read_op
;
return
0
;
}
src/client/aio/completion.c
View file @
28d89e85
...
...
@@ -8,8 +8,10 @@
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/aio/completion.h"
#include "src/rpc-types/read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/util/log.h"
#if 0
int
mobject_store_aio_create_completion
(
void
*
cb_arg
,
mobject_store_callback_t
cb_complete
,
mobject_store_callback_t
cb_safe
,
...
...
@@ -19,9 +21,12 @@ int mobject_store_aio_create_completion(void *cb_arg,
mobject_store_completion_t
completion
=
(
mobject_store_completion_t
)
calloc
(
1
,
sizeof
(
struct
mobject_store_completion
));
MOBJECT_ASSERT
(
completion
!=
0
,
"Could not allocate mobject_store_completion_t object"
);
completion->cb_complete = cb_complete;
completion
->
type
=
AIO_NULL_COMPLETION
;
completion
->
op
.
read_op
=
NULL
;
completion
->
cb_complete
=
cb_complete
;
completion
->
cb_safe
=
cb_safe
;
completion
->
cb_arg
=
cb_arg
;
completion
->
handle
=
HG_HANDLE_NULL
;
completion
->
request
=
MARGO_REQUEST_NULL
;
*
pc
=
completion
;
return
0
;
...
...
@@ -35,12 +40,33 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
}
MOBJECT_ASSERT
(
c
->
request
!=
MARGO_REQUEST_NULL
,
"Invalid completion handle"
);
c->ret_value = margo_wait(c->request);
if(c->ret_value != HG_SUCCESS) {
int
ret
=
margo_wait
(
c
->
request
);
// TODO check the return value of margo_wait
if
(
ret
!=
HG_SUCCESS
)
{
MOBJECT_LOG
(
"Warning: margo_wait returned something different from HG_SUCCESS"
);
}
c
->
request
=
MARGO_REQUEST_NULL
;
switch
(
c
->
type
)
{
case
AIO_WRITE_COMPLETION
:
{
write_op_out_t
resp
;
ret
=
margo_get_output
(
c
->
handle
,
&
resp
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not get RPC output"
);
c
->
ret_value
=
resp
.
ret
;
ret
=
margo_free_output
(
c
->
handle
,
&
resp
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not free RPC output"
);
}
break
;
case
AIO_READ_COMPLETION
:
{
read_op_out_t
resp
;
ret
=
margo_get_output
(
c
->
handle
,
&
resp
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not get RPC output"
);
feed_read_op_pointers_from_response
(
c
->
op
.
read_op
,
resp
.
responses
);
ret
=
margo_free_output
(
c
->
handle
,
&
resp
);
MOBJECT_ASSERT
(
ret
==
HG_SUCCESS
,
"Could not free RPC output"
);
}
}
if
(
c
->
cb_safe
)
(
c
->
cb_safe
)(
c
,
c
->
cb_arg
);
...
...
@@ -52,8 +78,15 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
int
mobject_store_aio_is_complete
(
mobject_store_completion_t
c
)
{
MOBJECT_ASSERT(0,"mobject_store_aio_is_complete is not yet implemented");
return 0;
if
(
c
==
MOBJECT_COMPLETION_NULL
)
{
MOBJECT_LOG
(
"Warning: passing NULL to mobject_store_aio_wait_for_complete"
);
return
1
;
}
int
flag
;
margo_test
(
c
->
request
,
&
flag
);
return
flag
;
}
int
mobject_store_aio_get_return_value
(
mobject_store_completion_t
c
)
...
...
@@ -61,18 +94,16 @@ int mobject_store_aio_get_return_value(mobject_store_completion_t c)
int
r
;
if
(
c
==
MOBJECT_COMPLETION_NULL
)
{
MOBJECT_LOG
(
"Warning: passing NULL to mobject_store_aio_get_return_value"
);
return
0
;
return
-
1
;
}
MOBJECT_ASSERT((c->request == MARGO_REQUEST_NULL),
"calling mobject_store_aio_get_return_value on a non-terminated completion");
return
c
->
ret_value
;
}
void
mobject_store_aio_release
(
mobject_store_completion_t
c
)
{
if
(
c
==
MOBJECT_COMPLETION_NULL
)
return
;
MOBJECT_ASSERT(c->request
!
= MARGO_REQUEST_NULL,
MOBJECT_ASSERT
(
c
->
request
=
=
MARGO_REQUEST_NULL
,
"Trying to release a completion handle before operation completed"
);
free(c);
margo_destroy
(
c
->
handle
);
free
(
c
);
}
#endif
src/client/aio/completion.h
View file @
28d89e85
...
...
@@ -8,7 +8,14 @@
#include <margo.h>
#include "mobject-store-config.h"
#if 0
#include "libmobject-store.h"
typedef
enum
completion_op_type
{
AIO_NULL_COMPLETION
,
AIO_WRITE_COMPLETION
,
AIO_READ_COMPLETION
}
completion_op_type
;
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
...
...
@@ -19,13 +26,18 @@
* in libmobject-store.h.
*/
struct
mobject_store_completion
{
completion_op_type
type
;
// completion for write or for reads
union
{
mobject_store_read_op_t
read_op
;
mobject_store_write_op_t
write_op
;
}
op
;
// operation that initiated the completion
mobject_store_callback_t
cb_complete
;
// completion callback
mobject_store_callback_t
cb_safe
;
// safe callback
void
*
cb_arg
;
// arguments for callbacks
margo_request
request
;
// margo request to wait on
hg_handle_t
handle
;
// handle of the RPC sent for this operation
int
ret_value
;
// return value of the operation
};
#endif
#endif
src/client/read-op.c
View file @
28d89e85
...
...
@@ -159,76 +159,3 @@ void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op
read_op
->
num_actions
+=
1
;
}
/*
typedef struct read_op_ult_args {
mobject_store_read_op_t read_op;
mobject_store_ioctx_t ioctx;
mobject_store_completion_t completion
char* oid;
int flags;
} read_op_ult_args;
static void aio_read_op_operate_ult(read_op_ult_args* args) {
read_op_in_t in;
in.object_name = args->oid;
in.pool_name = args->ioctx->pool_name;
in.read_op = args->read_op;
prepare_read_op(io->mid, read_op);
// TODO: svr_addr should be computed based on the pool name, object name,
// and SSG structures accessible via the io context
hg_handle_t h;
margo_create(io->mid, io->svr_addr, mobject_read_op_rpc_id, &h);
margo_forward(h, &in);
read_op_out_t resp;
margo_get_output(h, &resp);
feed_read_op_pointers_from_response(read_op, resp.responses);
margo_free_output(h,&resp);
margo_destroy(h);
free(args->oid);
ABT_rwlock_wrlock(args->completion->lock);
int ret = 0; // TODO change that depending on results of the read_op
ABT_eventual_set (args->completion->eventual, &ret, sizeof(int));
mobject_store_callback_t cb_complete = args->completion->cb_complete;
void* cb_arg = args->completion->cb_arg;
ABT_rwlock_unlock(args->completion->lock);
if(complete_cb)
complete_cb(args->completion, cb_arg);
free(args);
return 0;
}
*/
int
mobject_store_aio_read_op_operate
(
mobject_store_read_op_t
read_op
,
mobject_store_ioctx_t
io
,
mobject_store_completion_t
completion
,
const
char
*
oid
,
int
flags
)
{
/* MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t object");
// TODO this is not great, we should use the margo non-blocking API instead
ABT_xstream self_es;
ABT_xstream_self(&self_es);
ABT_pool pool;
ABT_xstream_get_main_pools(self_es, 1, &pool);
ABT_thread ult;
read_op_ult_args* args = (read_op_ult_args*)calloc(1, sizeof(*args);
args->read_op = read_op;
args->ioctx = io;
args->completion = completion;
args->oid = strdup(oid);
args->flags = flags;
ABT_thread_create(pool, aio_read_op_operate_ult, args, ABT_THREAD_ATTR_NULL, &ult);
completion->ult = ult;
*/
return
0
;
}
src/client/write-op.c
View file @
28d89e85
...
...
@@ -274,35 +274,3 @@ void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
write_op
->
num_actions
+=
1
;
}
/*
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
const char *oid,
time_t *mtime,
int flags)
{
int r;
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
r = mobject_store_aio_create_completion(NULL, NULL, NULL, &completion);
MOBJECT_ASSERT(0 == r, "Could not create completion object");
r = mobject_store_aio_write_op_operate(write_op, io, completion, oid, mtime, flags);
MOBJECT_ASSERT(0 == r, "Call to mobject_store_aio_write_op_operate failed");
r = mobject_store_aio_wait_for_complete(completion);
MOBJECT_ASSERT(0 == r, "Could not wait for completion");
int ret = mobject_store_aio_get_return_value(completion);
mobject_store_aio_release(completion);
return ret;
}
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
// TODO
}
*/
tests/Makefile.subdir
View file @
28d89e85
...
...
@@ -4,7 +4,8 @@ TESTS_ENVIRONMENT += \
check_PROGRAMS
+=
\
tests/mobject-connect-test
\
tests/mobject-client-test
tests/mobject-client-test
\
tests/mobject-aio-test
# don't include rados programs in make check
if
HAVE_RADOS
...
...
@@ -30,3 +31,5 @@ endif
tests_mobject_client_test_LDADD
=
src/client/libmobject-store.la
${CLIENT_LIBS}
tests_mobject_aio_test_LDADD
=
src/client/libmobject-store.la
${CLIENT_LIBS}
tests/mobject-aio-test.c
0 → 100644
View file @
28d89e85
#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include <libmobject-store.h>
const
char
*
content
=
"AAAABBBBCCCCDDDDEEEEFFFF"
;
/* Main function. */
int
main
(
int
argc
,
char
**
argv
)
{
mobject_store_t
cluster
;
mobject_store_create
(
&
cluster
,
"admin"
);
mobject_store_connect
(
cluster
);
mobject_store_ioctx_t
ioctx
;
mobject_store_ioctx_create
(
cluster
,
"my-object-pool"
,
&
ioctx
);
{
// WRITE OP TEST
mobject_store_write_op_t
write_op
=
mobject_store_create_write_op
();
// Add a "create" operation
mobject_store_write_op_create
(
write_op
,
LIBMOBJECT_CREATE_EXCLUSIVE
,
NULL
);
// Add a "write_full" operation to write "AAAABBBB"
mobject_store_write_op_write_full
(
write_op
,
content
,
8
);
// Add a "write" operation to write "CCCC"
mobject_store_write_op_write
(
write_op
,
content
+
8
,
4
,
8
);
// Add a "writesame" operation to write "DDDD" in two "DD"
mobject_store_write_op_writesame
(
write_op
,
content
+
12
,
2
,
4
,
12
);
// Add a "append" operation to append "EEEEFFFF"
mobject_store_write_op_append
(
write_op
,
content
+
16
,
8
);
// Add a "remove" operation
// mobject_store_write_op_remove(write_op);
// Add a "truncate" operation to remove the "FFFF" part
mobject_store_write_op_truncate
(
write_op
,
20
);
// Add a "zero" operation zero-ing the "BBBBCCCC"
mobject_store_write_op_zero
(
write_op
,
4
,
8
);
// Add a "omap_set" operation
const
char
*
keys
[]
=
{
"matthieu"
,
"rob"
,
"shane"
,
"phil"
,
"robl"
};
const
char
*
values
[]
=
{
"mdorier@anl.gov"
,
"rross@anl.gov"
,
"ssnyder@anl.gov"
,
"carns@anl.gov"
,
"robl@anl.gov"
};
size_t
val_sizes
[]
=
{
16
,
14
,
16
,
14
,
13
};
mobject_store_write_op_omap_set
(
write_op
,
keys
,
values
,
val_sizes
,
5
);
// Add a omap_rm_keys" operation
// mobject_store_write_op_omap_rm_keys(write_op, keys, 5);
mobject_store_completion_t
completion
=
MOBJECT_COMPLETION_NULL
;
mobject_store_aio_create_completion
(
NULL
,
NULL
,
NULL
,
&
completion
);
mobject_store_aio_write_op_operate
(
write_op
,
ioctx
,
completion
,
"test-object"
,
NULL
,
LIBMOBJECT_OPERATION_NOFLAG
);
mobject_store_aio_wait_for_complete
(
completion
);
mobject_store_release_write_op
(
write_op
);
mobject_store_aio_release
(
completion
);
}
{
// READ OP TEST
mobject_store_read_op_t
read_op
=
mobject_store_create_read_op
();
// Add "stat" operation
uint64_t
psize
;
time_t
pmtime
;
int
prval1
;
mobject_store_read_op_stat
(
read_op
,
&
psize
,
&
pmtime
,
&
prval1
);
// Add "read" operation
char
read_buf
[
512
];
size_t
bytes_read
;
int
prval2
;
mobject_store_read_op_read
(
read_op
,
0
,
512
,
read_buf
,
&
bytes_read
,
&
prval2
);
// Add "omap_get_keys" operation
const
char
*
start_after1
=
"shane"
;
mobject_store_omap_iter_t
iter3
;
int
prval3
;
mobject_store_read_op_omap_get_keys
(
read_op
,
start_after1
,
7
,
&
iter3
,
&
prval3
);
// Add "omap_get_vals" operation
const
char
*
start_after2
=
"matthieu"
;
const
char
*
filter_prefix2
=
"p"
;
mobject_store_omap_iter_t
iter4
;
int
prval4
;
mobject_store_read_op_omap_get_vals
(
read_op
,
start_after2
,
filter_prefix2
,
3
,
&
iter4
,
&
prval4
);
// Add "omap_get_vals_by_keys" operation
const
char
*
keys
[]
=
{
"matthieu"
,
"robl"
};
mobject_store_omap_iter_t
iter5
;
int
prval5
;
mobject_store_read_op_omap_get_vals_by_keys
(
read_op
,
keys
,
2
,
&
iter5
,
&
prval5
);
mobject_store_completion_t
completion
=
MOBJECT_COMPLETION_NULL
;
mobject_store_aio_create_completion
(
NULL
,
NULL
,
NULL
,
&
completion
);
mobject_store_aio_read_op_operate
(
read_op
,
ioctx
,
completion
,
"test-object"
,
LIBMOBJECT_OPERATION_NOFLAG
);
mobject_store_aio_wait_for_complete
(
completion
);
mobject_store_release_read_op
(
read_op
);
mobject_store_aio_release
(
completion
);
// print the results of the read operations
printf
(
"Client received the following results:
\n
"
);
printf
(
"stat: psize=%ld pmtime=%lld prval=%d
\n
"
,
psize
,
(
long
long
)
pmtime
,
prval1
);
{
printf
(
"read: bytes_read = %ld, prval=%d content: "
,
bytes_read
,
prval2
);
unsigned
i
;
for
(
i
=
0
;
i
<
bytes_read
;
i
++
)
printf
(
"%c"
,
read_buf
[
i
]
?
read_buf
[
i
]
:
'*'
);
printf
(
"
\n
"
);
}
printf
(
"omap_get_keys: prval=%d
\n
"
,
prval3
);
{
char
*
key
=
NULL
;
char
*
val
=
NULL
;
size_t
size
;
do
{
mobject_store_omap_get_next
(
iter3
,
&
key
,
&
val
,
&
size
);
if
(
key
)
printf
(
"===> key:
\"
%s
\"\n
"
,
key
);
}
while
(
key
);
}
printf
(
"omap_get_vals: prval=%d
\n
"
,
prval4
);
{
char
*
key
=
NULL
;
char
*
val
=
NULL
;
size_t
size
;
do
{
mobject_store_omap_get_next
(
iter4
,
&
key
,
&
val
,
&
size
);
if
(
key
)
printf
(
"===> key:
\"
%s
\"
, val: %s
\n
"
,
key
,
val
);
}
while
(
key
);
}
printf
(
"omap_get_vals_by_keys: prval=%d
\n
"
,
prval5
);
{
char
*
key
=
NULL
;
char
*
val
=
NULL
;
size_t
size
;
do
{
mobject_store_omap_get_next
(
iter5
,
&
key
,
&
val
,
&
size
);
if
(
key
)
printf
(
"===> key:
\"
%s
\"
, val: %s
\n
"
,
key
,
val
);
}
while
(
key
);
}
}
mobject_store_ioctx_destroy
(
ioctx
);
mobject_store_shutdown
(
cluster
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment