Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
mobject-store
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
13
Issues
13
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sds
mobject-store
Commits
ba37c79a
Commit
ba37c79a
authored
Oct 27, 2017
by
Matthieu Dorier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
done with operate functions
parent
24c138b7
Changes
14
Hide whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
423 additions
and
61 deletions
+423
-61
include/libmobject-store.h
include/libmobject-store.h
+4
-4
src/Makefile.subdir
src/Makefile.subdir
+11
-9
src/aio/completion.c
src/aio/completion.c
+32
-18
src/aio/completion.h
src/aio/completion.h
+12
-2
src/client/io-context.c
src/client/io-context.c
+23
-0
src/client/io-context.h
src/client/io-context.h
+19
-0
src/client/libmobject-store.c
src/client/libmobject-store.c
+80
-0
src/io-chain/read-op-impl.c
src/io-chain/read-op-impl.c
+64
-22
src/io-chain/write-op-impl.c
src/io-chain/write-op-impl.c
+2
-1
src/rpc-types/read-op.h
src/rpc-types/read-op.h
+2
-1
src/rpc-types/write-op.h
src/rpc-types/write-op.h
+2
-1
tests/Makefile.subdir
tests/Makefile.subdir
+6
-1
tests/io-chain/io-chain-server.c
tests/io-chain/io-chain-server.c
+2
-2
tests/mobject-client-test.c
tests/mobject-client-test.c
+164
-0
No files found.
include/libmobject-store.h
View file @
ba37c79a
...
...
@@ -102,7 +102,7 @@ enum {
* libmobject_store users must synchronize any of these changes on their own,
* or use separate io contexts for each thread
*/
typedef
void
*
mobject_store_ioctx_t
;
typedef
struct
mobject_ioctx
*
mobject_store_ioctx_t
;
/**
* @typedef mobject_store_omap_iter_t
...
...
@@ -370,9 +370,9 @@ void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
*/
int
mobject_store_write_op_operate
(
mobject_store_write_op_t
write_op
,
mobject_store_ioctx_t
io
,
const
char
*
oid
,
time_t
*
mtime
,
int
flags
);
const
char
*
oid
,
time_t
*
mtime
,
int
flags
);
/**
* Perform a write operation asynchronously
...
...
src/Makefile.subdir
View file @
ba37c79a
noinst_HEADERS
+=
\
src/aio/completion.h
\
src/client/io-context.h
\
src/io-chain/args-read-actions.h
\
src/io-chain/args-write-actions.h
\
src/util/buffer-union.h
\
src/aio/completion.h
\
src/util/log.h
\
src/omap-iter/omap-iter-impl.h
\
src/io-chain/prepare-read-op.h
\
src/io-chain/prepare-write-op.h
\
src/omap-iter/proc-omap-iter.h
\
src/io-chain/proc-read-actions.h
\
src/io-chain/proc-read-responses.h
\
src/io-chain/proc-write-actions.h
\
...
...
@@ -16,14 +13,18 @@ noinst_HEADERS += \
src/io-chain/read-op-visitor.h
\
src/io-chain/read-resp-impl.h
\
src/io-chain/read-responses.h
\
src/util/utlist.h
\
src/io-chain/write-actions.h
\
src/io-chain/write-op-impl.h
\
src/io-chain/write-op-visitor.h
\
src/server/exec-write-op.h
\
src/server/exec-read-op.h
\
src/omap-iter/omap-iter-impl.h
\
src/omap-iter/proc-omap-iter.h
\
src/rpc-types/read-op.h
\
src/rpc-types/write-op.h
\
src/rpc-types/read-op.h
src/server/exec-read-op.h
\
src/server/exec-write-op.h
\
src/util/buffer-union.h
\
src/util/log.h
\
src/util/utlist.h
noinst_LTLIBRARIES
+=
src/libomap-iter.la
\
src/libiochain.la
...
...
@@ -44,6 +45,7 @@ src_libiochain_la_SOURCES = src/io-chain/prepare-read-op.c \
src_libmobject_store_la_SOURCES
=
\
src/client/io-context.c
\
src/aio/completion.c
\
src/client/libmobject-store.c
...
...
src/aio/completion.c
View file @
ba37c79a
...
...
@@ -19,15 +19,16 @@ int mobject_store_aio_create_completion(void *cb_arg,
mobject_store_completion_t
completion
=
(
mobject_store_completion_t
)
calloc
(
1
,
sizeof
(
struct
mobject_store_completion
));
MOBJECT_ASSERT
(
completion
!=
0
,
"Could not allocate mobject_store_completion_t object"
);
completion
->
state
=
COMPLETION_CREATED
;
completion
->
cb_complete
=
cb_complete
;
completion
->
cb_safe
=
cb_safe
;
completion
->
cb_arg
=
cb_arg
;
r
=
ABT_eventual_create
(
sizeof
(
int
),
(
void
**
)(
&
(
completion
->
eventual
)));
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"Could not create ABT_eventual"
);
completion
->
ret_value_ptr
=
(
int
*
)
0
;
//
r = ABT_eventual_create(sizeof(int), (void**)(&(completion->eventual)));
//
MOBJECT_ASSERT(r == ABT_SUCCESS, "Could not create ABT_eventual");
//
completion->ret_value_ptr = (int*)0;
r
=
ABT_rwlock_create
(
&
(
completion
->
lock
));
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"Could not create ABT_rwlock"
);
completion
->
ult
=
ABT_THREAD_NULL
;
*
pc
=
completion
;
return
0
;
}
...
...
@@ -40,14 +41,21 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
return
-
1
;
}
int
*
val_ptr
=
(
int
*
)
0
;
r
=
ABT_eventual_wait
(
c
->
eventual
,
(
void
**
)(
&
val_ptr
));
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_eventual_wait failed"
);
r
=
ABT_rwlock_wrlock
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_wrlock failed"
);
c
->
ret_value_ptr
=
val_ptr
;
r
=
ABT_rwlock_unlock
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_unlock failed"
);
if
(
c
->
state
==
COMPLETION_IN_PROGRESS
||
c
->
state
==
COMPLETION_TERMINATED
)
{
ABT_thread_join
(
c
->
ult
);
ABT_thread_free
(
c
->
ult
);
c
->
ult
=
ABT_THREAD_NULL
;
c
->
state
=
COMPLETION_JOINED
;
}
// int* val_ptr = (int*)0;
// r = ABT_eventual_wait(c->eventual, (void**)(&val_ptr));
// MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_eventual_wait failed");
// r = ABT_rwlock_wrlock(c->lock);
// MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_wrlock failed");
// c->ret_value_ptr = val_ptr;
// r = ABT_rwlock_unlock(c->lock);
// MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_unlock failed");
return
0
;
}
...
...
@@ -59,13 +67,13 @@ int mobject_store_aio_is_complete(mobject_store_completion_t c)
MOBJECT_LOG
(
"Warning: passing NULL to mobject_store_aio_is_complete"
);
return
0
;
}
int
result
=
0
;
return
(
c
->
state
==
COMPLETION_TERMINATED
)
||
(
c
->
state
==
COMPLETION_JOINED
);
int
result
;
r
=
ABT_rwlock_rdlock
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_rdlock failed"
);
result
=
(
c
->
ret_value_ptr
!=
(
int
*
)
0
);
result
=
(
c
->
state
==
COMPLETION_TERMINATED
)
||
(
c
->
state
==
COMPLETION_JOINED
);
r
=
ABT_rwlock_unlock
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_unlock failed"
);
return
result
;
}
...
...
@@ -76,20 +84,26 @@ int mobject_store_aio_get_return_value(mobject_store_completion_t c)
MOBJECT_LOG
(
"Warning: passing NULL to mobject_store_aio_get_return_value"
);
return
0
;
}
if
(
c
->
state
==
COMPLETION_TERMINATED
)
{
mobject_store_aio_wait_for_complete
(
c
);
}
MOBJECT_ASSERT
((
c
->
state
!=
COMPLETION_JOINED
),
"calling mobject_store_aio_get_return_value on a non-terminated completion"
);
int
result
=
0
;
r
=
ABT_rwlock_rdlock
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_rdlock failed"
);
if
(
c
->
ret_value_ptr
!=
(
int
*
)
0
)
result
=
*
(
c
->
ret_value_ptr
);
result
=
c
->
ret_value
;
//if(c->ret_value_ptr != (int*)0) result = *(c->ret_value_ptr);
r
=
ABT_rwlock_unlock
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_unlock failed"
);
return
0
;
return
result
;
}
void
mobject_store_aio_release
(
mobject_store_completion_t
c
)
{
int
r
;
if
(
c
==
MOBJECT_COMPLETION_NULL
)
return
;
r
=
ABT_eventual_free
(
c
->
eventual
);
//
r = ABT_eventual_free(c->eventual);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_eventual_free failed"
);
r
=
ABT_rwlock_free
(
c
->
lock
);
MOBJECT_ASSERT
(
r
==
ABT_SUCCESS
,
"ABT_rwlock_free failed"
);
...
...
src/aio/completion.h
View file @
ba37c79a
...
...
@@ -9,6 +9,13 @@
#include <abt.h>
#include "mobject-store-config.h"
typedef
enum
{
COMPLETION_CREATED
=
1
,
COMPLETION_IN_PROGRESS
,
COMPLETION_TERMINATED
,
COMPLETION_JOINED
}
completion_state_t
;
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
...
...
@@ -19,12 +26,15 @@
* in libmobject-store.h.
*/
struct
mobject_store_completion
{
completion_state_t
state
;
// state of the completion
mobject_store_callback_t
cb_complete
;
// completion callback
mobject_store_callback_t
cb_safe
;
// safe callback
void
*
cb_arg
;
// arguments for callbacks
ABT_eventual
eventual
;
// eventual used to notify completion
int
*
ret_value_ptr
;
// pointer to eventual's internal value
// ABT_eventual eventual; // eventual used to notify completion
// int* ret_value_ptr; // pointer to eventual's internal value
int
ret_value
;
// return value of the operation
ABT_rwlock
lock
;
// lock protecting access to this structure
ABT_thread
ult
;
// thread running the operation
};
#endif
...
...
src/client/io-context.c
0 → 100644
View file @
ba37c79a
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "libmobject-store.h"
#include "src/client/io-context.h"
int
mobject_store_ioctx_create
(
mobject_store_t
cluster
,
const
char
*
pool_name
,
mobject_store_ioctx_t
*
ioctx
)
{
// TODO take mid from cluster parameter
*
ioctx
=
(
mobject_store_ioctx_t
)
calloc
(
1
,
sizeof
(
**
ioctx
));
(
*
ioctx
)
->
pool_name
=
strdup
(
pool_name
);
}
void
mobject_store_ioctx_destroy
(
mobject_store_ioctx_t
ioctx
)
{
if
(
ioctx
)
free
(
ioctx
->
pool_name
);
free
(
ioctx
);
}
src/client/io-context.h
0 → 100644
View file @
ba37c79a
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_IOCTX_H
#define __MOBJECT_IOCTX_H
#include <margo.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
typedef
struct
mobject_ioctx
{
margo_instance_id
mid
;
hg_addr_t
svr_addr
;
// TODO change this to an SSG thingy
char
*
pool_name
;
}
*
mobject_store_ioctx_t
;
#endif
src/client/libmobject-store.c
View file @
ba37c79a
...
...
@@ -14,9 +14,21 @@
#include <ssg.h>
#include "libmobject-store.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/io-context.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/io-chain/prepare-write-op.h"
#define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"
// global variables for RPC ids
hg_id_t
mobject_write_op_rpc_id
;
hg_id_t
mobject_read_op_rpc_id
;
hg_id_t
mobject_shutdown_rpc_id
;
typedef
struct
mobject_store_handle
{
ssg_group_id_t
gid
;
...
...
@@ -79,3 +91,71 @@ void mobject_store_shutdown(mobject_store_t cluster)
return
;
}
void
mobject_store_register
(
margo_instance_id
mid
)
{
static
int
registered
=
0
;
if
(
!
registered
)
{
mobject_write_op_rpc_id
=
MARGO_REGISTER
(
mid
,
"mobject_write_op"
,
write_op_in_t
,
write_op_out_t
,
NULL
);
mobject_read_op_rpc_id
=
MARGO_REGISTER
(
mid
,
"mobject_read_op"
,
read_op_in_t
,
read_op_out_t
,
NULL
);
mobject_shutdown_rpc_id
=
MARGO_REGISTER
(
mid
,
"mobject_shutdown"
,
void
,
void
,
NULL
);
registered
=
1
;
}
}
int
mobject_store_write_op_operate
(
mobject_store_write_op_t
write_op
,
mobject_store_ioctx_t
io
,
const
char
*
oid
,
time_t
*
mtime
,
int
flags
)
{
write_op_in_t
in
;
in
.
object_name
=
oid
;
in
.
pool_name
=
io
->
pool_name
;
in
.
write_op
=
write_op
;
prepare_write_op
(
io
->
mid
,
write_op
);
hg_handle_t
h
;
margo_create
(
io
->
mid
,
io
->
svr_addr
,
mobject_write_op_rpc_id
,
&
h
);
margo_forward
(
h
,
&
in
);
write_op_out_t
resp
;
margo_get_output
(
h
,
&
resp
);
margo_free_output
(
h
,
&
resp
);
margo_destroy
(
h
);
}
int
mobject_store_read_op_operate
(
mobject_store_read_op_t
read_op
,
mobject_store_ioctx_t
ioctx
,
const
char
*
oid
,
int
flags
)
{
read_op_in_t
in
;
in
.
object_name
=
oid
;
in
.
pool_name
=
ioctx
->
pool_name
;
in
.
read_op
=
read_op
;
prepare_read_op
(
ioctx
->
mid
,
read_op
);
// TODO: svr_addr should be computed based on the pool name, object name,
// and SSG structures accessible via the io context
hg_handle_t
h
;
margo_create
(
ioctx
->
mid
,
ioctx
->
svr_addr
,
mobject_read_op_rpc_id
,
&
h
);
margo_forward
(
h
,
&
in
);
read_op_out_t
resp
;
margo_get_output
(
h
,
&
resp
);
feed_read_op_pointers_from_response
(
read_op
,
resp
.
responses
);
margo_free_output
(
h
,
&
resp
);
margo_destroy
(
h
);
return
0
;
}
src/io-chain/read-op-impl.c
View file @
ba37c79a
...
...
@@ -9,7 +9,6 @@
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/io-chain/read-op-impl.h"
#include "src/aio/completion.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
...
...
@@ -175,32 +174,75 @@ void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op
read_op
->
num_actions
+=
1
;
}
int
mobject_store_read_op_operate
(
mobject_store_read_op_t
read_op
,
mobject_store_ioctx_t
io
,
const
char
*
oid
,
int
flags
)
{
int
r
;
MOBJECT_ASSERT
(
read_op
!=
MOBJECT_READ_OP_NULL
,
"invalid mobject_store_read_op_t obect"
);
mobject_store_completion_t
completion
=
MOBJECT_COMPLETION_NULL
;
r
=
mobject_store_aio_create_completion
(
NULL
,
NULL
,
NULL
,
&
completion
);
MOBJECT_ASSERT
(
0
==
r
,
"Could not create completion object"
);
r
=
mobject_store_aio_read_op_operate
(
read_op
,
io
,
completion
,
oid
,
flags
);
MOBJECT_ASSERT
(
0
==
r
,
"Call to mobject_store_aio_read_op_operate failed"
);
r
=
mobject_store_aio_wait_for_complete
(
completion
);
MOBJECT_ASSERT
(
0
==
r
,
"Could not wait for completion"
);
int
ret
=
mobject_store_aio_get_return_value
(
completion
);
mobject_store_aio_release
(
completion
);
return
ret
;
}
/*
typedef struct read_op_ult_args {
mobject_store_read_op_t read_op;
mobject_store_ioctx_t ioctx;
mobject_store_completion_t completion
char* oid;
int flags;
} read_op_ult_args;
static void aio_read_op_operate_ult(read_op_ult_args* args) {
read_op_in_t in;
in.object_name = args->oid;
in.pool_name = args->ioctx->pool_name;
in.read_op = args->read_op;
prepare_read_op(io->mid, read_op);
// TODO: svr_addr should be computed based on the pool name, object name,
// and SSG structures accessible via the io context
hg_handle_t h;
margo_create(io->mid, io->svr_addr, mobject_read_op_rpc_id, &h);
margo_forward(h, &in);
read_op_out_t resp;
margo_get_output(h, &resp);
feed_read_op_pointers_from_response(read_op, resp.responses);
margo_free_output(h,&resp);
margo_destroy(h);
free(args->oid);
ABT_rwlock_wrlock(args->completion->lock);
int ret = 0; // TODO change that depending on results of the read_op
ABT_eventual_set (args->completion->eventual, &ret, sizeof(int));
mobject_store_callback_t cb_complete = args->completion->cb_complete;
void* cb_arg = args->completion->cb_arg;
ABT_rwlock_unlock(args->completion->lock);
if(complete_cb)
complete_cb(args->completion, cb_arg);
free(args);
return 0;
}
*/
int
mobject_store_aio_read_op_operate
(
mobject_store_read_op_t
read_op
,
mobject_store_ioctx_t
io
,
mobject_store_completion_t
completion
,
const
char
*
oid
,
int
flags
)
{
MOBJECT_ASSERT
(
read_op
!=
MOBJECT_READ_OP_NULL
,
"invalid mobject_store_read_op_t obect"
);
// TODO
/* MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t object");
// TODO this is not great, we should use the margo non-blocking API instead
ABT_xstream self_es;
ABT_xstream_self(&self_es);
ABT_pool pool;
ABT_xstream_get_main_pools(self_es, 1, &pool);
ABT_thread ult;
read_op_ult_args* args = (read_op_ult_args*)calloc(1, sizeof(*args);
args->read_op = read_op;
args->ioctx = io;
args->completion = completion;
args->oid = strdup(oid);
args->flags = flags;
ABT_thread_create(pool, aio_read_op_operate_ult, args, ABT_THREAD_ATTR_NULL, &ult);
completion->ult = ult;
*/
}
src/io-chain/write-op-impl.c
View file @
ba37c79a
...
...
@@ -293,7 +293,7 @@ void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
write_op
->
num_actions
+=
1
;
}
/*
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
const char *oid,
...
...
@@ -324,3 +324,4 @@ int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
// TODO
}
*/
src/rpc-types/read-op.h
View file @
ba37c79a
...
...
@@ -10,7 +10,8 @@
#include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC
(
read_op_in_t
,
((
hg_string_t
)(
object_name
))
\
((
hg_const_string_t
)(
pool_name
))
\
((
hg_const_string_t
)(
object_name
))
\
((
mobject_store_read_op_t
)(
read_op
)))
MERCURY_GEN_PROC
(
read_op_out_t
,
((
read_response_t
)(
responses
)))
...
...
src/rpc-types/write-op.h
View file @
ba37c79a
...
...
@@ -10,7 +10,8 @@
#include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC
(
write_op_in_t
,
((
hg_string_t
)(
object_name
))
\
((
hg_const_string_t
)(
pool_name
))
\
((
hg_const_string_t
)(
object_name
))
\
((
mobject_store_write_op_t
)(
write_op
)))
MERCURY_GEN_PROC
(
write_op_out_t
,
((
int32_t
)(
ret
)))
...
...
tests/Makefile.subdir
View file @
ba37c79a
check_PROGRAMS
+=
\
tests/mobject-connect-test
\
tests/mobject-server
\
tests/test-sds-keyval-client
tests/test-sds-keyval-client
\
tests/mobject-client-test
if
HAVE_RADOS
check_PROGRAMS
+=
\
...
...
@@ -15,6 +16,10 @@ tests_mobject_connect_test_SOURCES = tests/mobject-connect-test.c
tests_mobject_connect_test_CPPFLAGS
=
-I
${srcdir}
/include
tests_mobject_connect_test_LDADD
=
src/libmobject-store.la
${LIBS}
tests_mobject_client_test_SOURCES
=
tests/mobject-client-test.c
tests_mobject_client_test_CPPFLAGS
=
-I
${srcdir}
/include
tests_mobject_client_test_LDADD
=
src/libmobject-store.la
${LIBS}
tests_mobject_server_CPPFLAGS
=
-I
${srcdir}
/include
tests_mobject_server_LDADD
=
src/libmobject-server.la
${LIBS}
...
...
tests/io-chain/io-chain-server.c
View file @
ba37c79a
...
...
@@ -97,7 +97,7 @@ hg_return_t mobject_write_op_rpc(hg_handle_t h)
assert
(
ret
==
HG_SUCCESS
);
/* Execute the operation chain */
execute_write_op_visitor
(
&
write_op_printer
,
in
.
write_op
,
in
.
object_name
);
execute_write_op_visitor
(
&
write_op_printer
,
in
.
write_op
,
(
void
*
)
in
.
object_name
);
// set the return value of the RPC
out
.
ret
=
0
;
...
...
@@ -163,7 +163,7 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h)
read_response_t
resp
=
build_matching_read_responses
(
in
.
read_op
);
/* Compute the result. */
execute_read_op_visitor
(
&
read_op_printer
,
in
.
read_op
,
in
.
object_name
);
execute_read_op_visitor
(
&
read_op_printer
,
in
.
read_op
,
(
void
*
)
in
.
object_name
);
out
.
responses
=
resp
;
...
...
tests/mobject-client-test.c
0 → 100644
View file @
ba37c79a
#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include <libmobject-store.h>
#include "src/client/io-context.h" // XXX included because we modify manually the ioctx
void
mobject_store_register
(
margo_instance_id
mid
);
/* Main function. */
int
main
(
int
argc
,
char
**
argv
)
{
if
(
argc
!=
2
)
{
fprintf
(
stderr
,
"Usage: %s <server address>
\n
"
,
argv
[
0
]);
exit
(
0
);
}
/* Start Margo */
margo_instance_id
mid
=
margo_init
(
"bmi+tcp"
,
MARGO_CLIENT_MODE
,
0
,
0
);
mobject_store_register
(
mid
);
/* Lookup the address of the server */
hg_addr_t
svr_addr
;
margo_addr_lookup
(
mid
,
argv
[
1
],
&
svr_addr
);
mobject_store_ioctx_t
ioctx
;
mobject_store_ioctx_create
(
NULL
,
"my-object-pool"
,
&
ioctx
);
// XXX the bellow modifications of ioctx should be done inside mobject_store_ioctx_create
// once we have the rest of the API
ioctx
->
svr_addr
=
svr_addr
;
ioctx
->
mid
=
mid
;
char
buffer
[
256
];
unsigned
i
;
for
(
i
=
0
;
i
<
256
;
i
++
)
buffer
[
i
]
=
'A'
+
(
i
%
26
);
{
// WRITE OP TEST
mobject_store_write_op_t
write_op
=
mobject_store_create_write_op
();
// Add a "create" operation
mobject_store_write_op_create
(
write_op
,
LIBMOBJECT_CREATE_EXCLUSIVE
,
NULL
);
// Add a "write" operation
mobject_store_write_op_write
(
write_op
,
buffer
,
128
,
32
);
// Add a "write_full" operation
mobject_store_write_op_write_full
(
write_op
,
buffer
,
256
);
// Add a "writesame" operation
mobject_store_write_op_writesame
(
write_op
,
buffer
,
32
,
64
,
256
);
// Add a "append" operation
mobject_store_write_op_append
(
write_op
,
buffer
,
64
);
// Add a "remove" operation
mobject_store_write_op_remove
(
write_op
);
// Add a "truncate" operation
mobject_store_write_op_truncate
(
write_op
,
32
);
// Add a "zero" operation
mobject_store_write_op_zero
(
write_op
,
16
,
48
);
// Add a "omap_set" operation
const
char
*
keys
[]
=
{
"matthieu"
,
"rob"
,
"shane"
,
"phil"
,
"robl"
};
const
char
*
values
[]
=
{
"mdorier@anl.gov"
,
"rross@anl.gov"
,
"ssnyder@anl.gov"
,
"carns@anl.gov"
,
"robl@anl.gov"
};
size_t
val_sizes
[]
=
{
16
,
14
,
16
,
14
,
13
};
mobject_store_write_op_omap_set
(
write_op
,
keys
,
values
,
val_sizes
,
5
);
// Add a omap_rm_keys" operation
mobject_store_write_op_omap_rm_keys
(
write_op
,
keys
,
5
);
mobject_store_write_op_operate
(
write_op
,
ioctx
,
"test-object"
,
NULL
,
LIBMOBJECT_OPERATION_NOFLAG
);
// BEGIN this is what write_op_operate should contain
/*
write_op_in_t in;
in.object_name = "test-object";
in.write_op = write_op;
prepare_write_op(mid, write_op);
hg_handle_t h;
margo_create(mid, svr_addr, write_op_rpc_id, &h);
margo_forward(h, &in);
write_op_out_t resp;
margo_get_output(h, &resp);
margo_free_output(h,&resp);
margo_destroy(h);
*/
// END this is what write_op_operate should contain
mobject_store_release_write_op
(
write_op
);
}
{
// READ OP TEST
mobject_store_read_op_t
read_op
=
mobject_store_create_read_op
();
// Add "stat" operation
uint64_t
psize
;
time_t
pmtime
;
int
prval1
;
mobject_store_read_op_stat
(
read_op
,
&
psize
,
&
pmtime
,
&
prval1
);
// Add "read" operation
char
read_buf
[
512
];
size_t
bytes_read
;
int
prval2
;
mobject_store_read_op_read
(
read_op
,
2
,
32
,
buffer
,
&
bytes_read
,
&
prval2
);
// Add "omap_get_keys" operation
const
char
*
start_after
=
"shane"
;
mobject_store_omap_iter_t
iter3
;
int
prval3
;
mobject_store_read_op_omap_get_keys
(
read_op
,
start_after
,
7
,
&
iter3
,
&
prval3
);
// Add "omap_get_vals" operation
const
char
*
filter_prefix
=
"p"
;
mobject_store_omap_iter_t
iter4
;
int
prval4
;
mobject_store_read_op_omap_get_vals
(
read_op
,
start_after
,
filter_prefix
,
3
,
&
iter4
,
&
prval4
);
// Add "omap_get_vals_by_keys" operation
const
char
*
keys
[]
=
{
"matthieu"
,
"shane"
};
int
prval5
;
mobject_store_read_op_omap_get_vals_by_keys
(
read_op
,
keys
,
2
,
&
iter4
,
&
prval5
);
mobject_store_read_op_operate
(
read_op
,
ioctx
,
"test-object"
,
LIBMOBJECT_OPERATION_NOFLAG
);
/*
read_op_in_t in;
in.object_name = "test-object";
in.read_op = read_op;
prepare_read_op(mid, read_op);
hg_handle_t h;
margo_create(mid, svr_addr, read_op_rpc_id, &h);
margo_forward(h, &in);
read_op_out_t resp;
margo_get_output(h, &resp);
feed_read_op_pointers_from_response(read_op, resp.responses);
margo_free_output(h,&resp);
margo_destroy(h);
*/
// END this is what read_op_operate should contain
mobject_store_release_read_op
(
read_op
);
// print the results of the read operations
printf
(
"Client received the following results:
\n
"
);
printf
(
"stat: psize=%ld pmtime=%lld prval=%d
\n
"
,
psize
,
pmtime
,
prval1
);
printf
(
"read: bytes_read=%ld prval=%d
\n
"
,
bytes_read
,
prval2
);
printf
(
"omap_get_keys: prval=%d
\n
"
,
prval3
);
printf
(
"omap_get_vals: prval=%d
\n
"
,
prval4
);
printf
(
"omap_get_vals_by_keys: prval=%d
\n
"
,
prval5
);
}
mobject_store_ioctx_destroy
(
ioctx
);
/* free the address */
margo_addr_free
(
mid
,
svr_addr
);
/* shut down Margo */
margo_finalize
(
mid
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment