Commit 4ab5867f authored by Matthieu Dorier's avatar Matthieu Dorier

write_op fully tested and working

parent d9dc798a
......@@ -6,7 +6,7 @@ TESTS =
EXTRA_DIST = prepare.sh
AM_CPPFLAGS = -I$(top_srcdir)/include
AM_CFLAGS =
AM_CFLAGS =
AM_CXXFLAGS = $(AM_CFLAGS)
lib_LTLIBRARIES = src/libmobject-store.la
......
......@@ -79,6 +79,9 @@ enum {
};
/** @} */
#define LIBMOBJECT_CREATE_EXCLUSIVE 1
#define LIBMOBJECT_CREATE_IDEMPOTENT 0
/**
* @typedef mobject_store_ioctx_t
*
......@@ -294,9 +297,9 @@ void mobject_store_write_op_write_full(mobject_store_write_op_t write_op,
*/
void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
const char *buffer,
size_t data_len,
size_t write_len,
uint64_t offset);
size_t data_len,
size_t write_len,
uint64_t offset);
/**
* Append to end of object.
......@@ -305,8 +308,8 @@ void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
* @param len length of buffer
*/
void mobject_store_write_op_append(mobject_store_write_op_t write_op,
const char *buffer,
size_t len);
const char *buffer,
size_t len);
/**
* Remove object
* @param write_op operation to add this action to
......
......@@ -31,7 +31,10 @@ static void convert_append(uint64_t* cur_offset,
void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op)
{
if(write_op->ready == 1) return;
if(write_op->num_actions == 0) return;
if(write_op->num_actions == 0) {
write_op->ready = 1;
return;
}
wr_action_base_t action;
......
......@@ -113,7 +113,7 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o
// for each action ...
DL_FOREACH((*read_op)->actions,elem) {
read_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_),
"Invalid read_op opcode");
// encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
......@@ -143,11 +143,12 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o
read_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_),
"Invalid write_op opcode");
// decode the action's arguments
ret = decode_read_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret;
next_action->type = opcode;
// append to the list
DL_APPEND((*read_op)->actions, next_action);
}
......
......@@ -163,7 +163,7 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write
// for each action ...
DL_FOREACH((*write_op)->actions,elem) {
write_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _WRITE_OPCODE_END_ENUM_),
MOBJECT_ASSERT((opcode > 0 && opcode < _WRITE_OPCODE_END_ENUM_),
"Invalid write_op opcode");
// encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
......@@ -194,11 +194,12 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write
write_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _WRITE_OPCODE_END_ENUM_),
MOBJECT_ASSERT((opcode > 0 && opcode < _WRITE_OPCODE_END_ENUM_),
"Invalid write_op opcode");
// decode the action's arguments
ret = decode_write_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret;
next_action->type = opcode;
// append to the list
DL_APPEND((*write_op)->actions, next_action);
}
......
......@@ -8,13 +8,13 @@
#include "read-op-impl.h"
#include "utlist.h"
static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a);
static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a);
static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a);
static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a);
static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a);
static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a, void* uargs);
static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a, void* uargs);
static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a, void* uargs);
static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a, void* uargs);
static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a, void* uargs);
typedef void (*dispatch_fn)(read_op_visitor_t, rd_action_base_t);
typedef void (*dispatch_fn)(read_op_visitor_t, rd_action_base_t, void*);
static dispatch_fn visitor_dispatch[_READ_OPCODE_END_ENUM_] = {
NULL,
......@@ -25,15 +25,14 @@ static dispatch_fn visitor_dispatch[_READ_OPCODE_END_ENUM_] = {
(dispatch_fn)execute_read_op_visitor_on_omap_get_vals_by_keys,
};
void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op)
void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op, void* uargs)
{
void* uargs = visitor->uargs;
rd_action_base_t a;
visitor->visit_begin(uargs);
DL_FOREACH((read_op->actions), a) {
visitor_dispatch[a->type](visitor, a);
visitor_dispatch[a->type](visitor, a, uargs);
}
visitor->visit_end(uargs);
......@@ -43,31 +42,31 @@ void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t
// STATIC FUNCTIONS BELOW //
////////////////////////////////////////////////////////////////////////////////
static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a)
static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a, void* uargs)
{
if(visitor->visit_stat)
visitor->visit_stat(visitor->uargs, a->psize, a->pmtime, a->prval);
visitor->visit_stat(uargs, a->psize, a->pmtime, a->prval);
}
static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a)
static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a, void* uargs)
{
if(visitor->visit_read)
visitor->visit_read(visitor->uargs, a->offset, a->len, a->buffer, a->bytes_read, a->prval);
visitor->visit_read(uargs, a->offset, a->len, a->buffer, a->bytes_read, a->prval);
}
static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a)
static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a, void* uargs)
{
if(visitor->visit_omap_get_keys)
visitor->visit_omap_get_keys(visitor->uargs, a->start_after, a->max_return, a->iter, a->prval);
visitor->visit_omap_get_keys(uargs, a->start_after, a->max_return, a->iter, a->prval);
}
static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a)
static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a, void* uargs)
{
if(visitor->visit_omap_get_vals)
visitor->visit_omap_get_vals(visitor->uargs, a->start_after, a->filter_prefix, a->max_return, a->iter, a->prval);
visitor->visit_omap_get_vals(uargs, a->start_after, a->filter_prefix, a->max_return, a->iter, a->prval);
}
static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a)
static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a, void* uargs)
{
if(visitor->visit_omap_get_vals_by_keys == NULL) return;
......@@ -78,6 +77,6 @@ static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t v
keys[i] = ptr;
ptr += strlen(keys[i])+1;
}
visitor->visit_omap_get_vals_by_keys(visitor->uargs, keys, a->num_keys, a->iter, a->prval);
visitor->visit_omap_get_vals_by_keys(uargs, keys, a->num_keys, a->iter, a->prval);
}
......@@ -10,7 +10,6 @@
#include "buffer-union.h"
typedef struct read_op_visitor {
void* uargs;
void (*visit_begin)(void*);
void (*visit_stat)(void*, uint64_t*, time_t*, int*);
void (*visit_read)(void*, uint64_t, size_t, buffer_u, size_t*, int*);
......@@ -20,6 +19,6 @@ typedef struct read_op_visitor {
void (*visit_end)(void*);
}* read_op_visitor_t;
void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op);
void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op, void* uarg);
#endif
......@@ -195,6 +195,7 @@ void mobject_store_write_op_truncate(mobject_store_write_op_t write_op,
wr_action_truncate_t action = (wr_action_truncate_t)calloc(1, sizeof(*action));
action->base.type = WRITE_OPCODE_TRUNCATE;
action->offset = offset;
WRITE_ACTION_UPCAST(base, action);
DL_APPEND(write_op->actions, base);
......
......@@ -7,19 +7,20 @@
#include "write-op-visitor.h"
#include "write-op-impl.h"
#include "utlist.h"
#include "log.h"
static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a);
static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a);
static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a);
static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a);
static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a);
static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a);
static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a);
static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a);
static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a);
static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a);
static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a, void* uargs);
static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a, void* uargs);
static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a, void* uargs);
static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a, void* uargs);
static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a, void* uargs);
static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a, void* uargs);
static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a, void* uargs);
static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a, void* uargs);
static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a, void* uargs);
static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a, void* uargs);
typedef void (*dispatch_fn)(write_op_visitor_t, wr_action_base_t);
typedef void (*dispatch_fn)(write_op_visitor_t, wr_action_base_t, void* uargs);
static dispatch_fn visitor_dispatch[_WRITE_OPCODE_END_ENUM_] = {
NULL,
......@@ -35,15 +36,16 @@ static dispatch_fn visitor_dispatch[_WRITE_OPCODE_END_ENUM_] = {
(dispatch_fn)execute_write_op_visitor_on_omap_rm_keys
};
void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op)
void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op, void* uargs)
{
void* uargs = visitor->uargs;
wr_action_base_t a;
visitor->visit_begin(uargs);
DL_FOREACH((write_op->actions), a) {
visitor_dispatch[a->type](visitor, a);
MOBJECT_ASSERT(a->type > 0 && a->type < _WRITE_OPCODE_END_ENUM_,
"Invalid action type");
visitor_dispatch[a->type](visitor, a, uargs);
}
visitor->visit_end(uargs);
......@@ -53,55 +55,55 @@ void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op
// STATIC FUNCTIONS BELOW //
////////////////////////////////////////////////////////////////////////////////
static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a)
static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a, void* uargs)
{
if(visitor->visit_create)
visitor->visit_create(visitor->uargs, a->exclusive);
visitor->visit_create(uargs, a->exclusive);
}
static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a)
static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a, void* uargs)
{
if(visitor->visit_write)
visitor->visit_write(visitor->uargs, a->buffer, a->len, a->offset);
visitor->visit_write(uargs, a->buffer, a->len, a->offset);
}
static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a)
static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a, void* uargs)
{
if(visitor->visit_write_full)
visitor->visit_write_full(visitor->uargs, a->buffer, a->len);
visitor->visit_write_full(uargs, a->buffer, a->len);
}
static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a)
static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a, void* uargs)
{
if(visitor->visit_writesame)
visitor->visit_writesame(visitor->uargs, a->buffer, a->data_len, a->write_len, a->offset);
visitor->visit_writesame(uargs, a->buffer, a->data_len, a->write_len, a->offset);
}
static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a)
static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a, void* uargs)
{
if(visitor->visit_append)
visitor->visit_append(visitor->uargs, a->buffer, a->len);
visitor->visit_append(uargs, a->buffer, a->len);
}
static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a)
static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a, void* uargs)
{
if(visitor->visit_remove)
visitor->visit_remove(visitor->uargs);
visitor->visit_remove(uargs);
}
static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a)
static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a, void* uargs)
{
if(visitor->visit_truncate)
visitor->visit_truncate(visitor->uargs, a->offset);
visitor->visit_truncate(uargs, a->offset);
}
static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a)
static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a, void* uargs)
{
if(visitor->visit_zero)
visitor->visit_zero(visitor->uargs, a->offset, a->len);
visitor->visit_zero(uargs, a->offset, a->len);
}
static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a)
static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a, void* uargs)
{
if(visitor->visit_omap_set == NULL) return;
......@@ -126,12 +128,12 @@ static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_
ptr += lens[i];
}
visitor->visit_omap_set(visitor->uargs, keys, vals, lens, num);
visitor->visit_omap_set(uargs, keys, vals, lens, num);
}
static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a)
static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a, void* uargs)
{
if(visitor->visit_omap_rm_keys != NULL) return;
if(visitor->visit_omap_rm_keys == NULL) return;
size_t num_keys = a->num_keys;
const char* keys[num_keys];
......@@ -143,5 +145,5 @@ static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor,
ptr += strlen(keys[i])+1;
}
visitor->visit_omap_rm_keys(visitor->uargs, keys, num_keys);
visitor->visit_omap_rm_keys(uargs, keys, num_keys);
}
......@@ -10,7 +10,6 @@
#include "buffer-union.h"
typedef struct write_op_visitor {
void* uargs;
void (*visit_begin)(void*);
void (*visit_create)(void*, int);
void (*visit_write)(void*, buffer_u, size_t, uint64_t);
......@@ -25,6 +24,6 @@ typedef struct write_op_visitor {
void (*visit_end)(void*);
}* write_op_visitor_t;
void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op);
void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op, void* uargs);
#endif
#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include <libmobject-store.h>
#include "types.h"
#include "src/prepare-write-op.h"
/* Main function. */
int main(int argc, char** argv)
......@@ -16,21 +18,53 @@ int main(int argc, char** argv)
/* Register a RPC function */
hg_id_t write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL);
hg_id_t read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL);
hg_id_t read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL);
/* Lookup the address of the server */
hg_addr_t svr_addr;
margo_addr_lookup(mid, argv[1], &svr_addr);
char buffer[256];
unsigned i;
for(i=0; i<256; i++) buffer[i] = 'A'+(i % 26);
{ // WRITE OP TEST
write_op_in_t in;
// TODO fill the write_op_in_t
in.object_name = "test-write-object";
mobject_store_write_op_t write_op = mobject_store_create_write_op();
in.chain = write_op;
// Add a "create" operation
mobject_store_write_op_create(write_op, LIBMOBJECT_CREATE_EXCLUSIVE, NULL);
// Add a "write" operation
mobject_store_write_op_write(write_op, buffer, 128, 32);
// Add a "write_full" operation
mobject_store_write_op_write_full(write_op, buffer, 256);
// Add a "writesame" operation
mobject_store_write_op_writesame(write_op, buffer, 32, 64, 256);
// Add a "append" operation
mobject_store_write_op_append(write_op, buffer, 64);
// Add a "remove" operation
mobject_store_write_op_remove(write_op);
// Add a "truncate" operation
mobject_store_write_op_truncate(write_op, 32);
// Add a "zero" operation
mobject_store_write_op_zero(write_op, 16, 48);
// Add a "omap_set" operation
const char* keys[] = { "matthieu", "rob", "shane", "phil", "robl" };
const char* values[] = { "mdorier@anl.gov", "rross@anl.gov", "ssnyder@anl.gov", "carns@anl.gov", "robl@anl.gov" };
size_t val_sizes[] = { 16, 14, 16, 14, 13 };
mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5);
// Add a omap_rm_keys" operation
mobject_store_write_op_omap_rm_keys(write_op, keys, 5);
// the operation chain should be prepare for sending before being serialized
prepare_write_op(mid, write_op);
hg_handle_t h;
margo_create(mid, svr_addr, write_op_rpc_id, &h);
margo_forward(h, &args);
margo_forward(h, &in);
write_op_out_t resp;
margo_get_output(h, &resp);
......@@ -41,6 +75,7 @@ int main(int argc, char** argv)
margo_destroy(h);
}
#if 0
{ // READ OP TEST
read_op_in_t in;
......@@ -49,7 +84,7 @@ int main(int argc, char** argv)
hg_handle_t h;
margo_create(mid, svr_addr, read_op_rpc_id, &h);
margo_forward(h, &args);
margo_forward(h, &in);
read_op_out_t resp;
margo_get_output(h, &resp);
......@@ -59,12 +94,12 @@ int main(int argc, char** argv)
margo_free_output(h,&resp);
margo_destroy(h);
}
#endif
/* free the address */
margo_addr_free(mid, svr_addr);
/* shut down Margo */
margo_finalize(mid);
margo_finalize(mid);
return 0;
}
......@@ -5,22 +5,18 @@
#include <margo.h>
#include <mercury.h>
#include "types.h"
#include "src/write-op-visitor.h"
/* after serving this number of rpcs, the server will shut down. */
static const int TOTAL_RPCS = 16;
/* number of RPCS already received. */
static int num_rpcs = 0;
/*
* hello_world function to expose as an RPC.
* This function just prints "Hello World"
* and increment the num_rpcs variable.
*
* All Mercury RPCs must have a signature
* hg_return_t f(hg_handle_t h)
*/
hg_return_t sum(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(sum)
hg_return_t mobject_write_op_rpc(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_rpc)
hg_return_t mobject_read_op_rpc(hg_handle_t h);
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_rpc)
/*
* main function.
......@@ -29,7 +25,7 @@ int main(int argc, char** argv)
{
/* Initialize Margo */
margo_instance_id mid = margo_init("bmi+tcp", MARGO_SERVER_MODE, 0, 0);
assert(mid);
assert(mid);
hg_addr_t my_address;
margo_addr_self(mid, &my_address);
......@@ -40,26 +36,99 @@ int main(int argc, char** argv)
printf("Server running at address %s\n", addr_str);
/* Register the RPC by its name ("sum") */
MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum);
MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, mobject_write_op_rpc);
MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, mobject_read_op_rpc)
/* NOTE: there isn't anything else for the server to do at this point
* except wait for itself to be shut down. The
* margo_wait_for_finalize() call here yields to let Margo drive
* progress until that happens.
* except wait for itself to be shut down. The
* margo_wait_for_finalize() call here yields to let Margo drive
* progress until that happens.
*/
margo_wait_for_finalize(mid);
return 0;
}
static void write_op_printer_begin(void*);
static void write_op_printer_end(void*);
static void write_op_printer_create(void*, int);
static void write_op_printer_write(void*, buffer_u, size_t, uint64_t);
static void write_op_printer_write_full(void*, buffer_u, size_t);
static void write_op_printer_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_printer_append(void*, buffer_u, size_t);
static void write_op_printer_remove(void*);
static void write_op_printer_truncate(void*, uint64_t);
static void write_op_printer_zero(void*, uint64_t, uint64_t);
static void write_op_printer_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_printer_omap_rm_keys(void*, char const* const*, size_t);
struct write_op_visitor write_op_printer = {
.visit_begin = write_op_printer_begin,
.visit_create = write_op_printer_create,
.visit_write = write_op_printer_write,
.visit_write_full = write_op_printer_write_full,
.visit_writesame = write_op_printer_writesame,
.visit_append = write_op_printer_append,
.visit_remove = write_op_printer_remove,
.visit_truncate = write_op_printer_truncate,
.visit_zero = write_op_printer_zero,
.visit_omap_set = write_op_printer_omap_set,
.visit_omap_rm_keys = write_op_printer_omap_rm_keys,
.visit_end = write_op_printer_end
};
/* Implementation of the RPC. */
hg_return_t mobject_write_op_rpc(hg_handle_t h)
{
hg_return_t ret;
num_rpcs += 1;
write_op_in_t in;
write_op_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(h);
/* Deserialize the input from the received handle. */
ret = margo_get_input(h, &in);
assert(ret == HG_SUCCESS);
/* Execute the operation chain */
execute_write_op_visitor(&write_op_printer, in.chain, NULL);
// set the return value of the RPC
out.ret = 0;
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
/* Free the input data. */
ret = margo_free_input(h, &in);
assert(ret == HG_SUCCESS);
/* We are not going to use the handle anymore, so we should destroy it. */
ret = margo_destroy(h);
assert(ret == HG_SUCCESS);
if(num_rpcs == TOTAL_RPCS) {
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
}
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_rpc)
/* Implementation of the RPC. */
hg_return_t sum(hg_handle_t h)
hg_return_t mobject_read_op_rpc(hg_handle_t h)
{
hg_return_t ret;
num_rpcs += 1;
sum_in_t in;
sum_out_t out;
read_op_in_t in;
read_op_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(h);
......@@ -68,8 +137,7 @@ hg_return_t sum(hg_handle_t h)
assert(ret == HG_SUCCESS);
/* Compute the result. */
out.ret = in.x + in.y;
printf("Computed %d + %d = %d\n",in.x,in.y,out.ret);
// TODO
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......@@ -92,4 +160,79 @@ hg_return_t sum(hg_handle_t h)
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(sum)
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_rpc)
void write_op_printer_begin(void* unused)
{
printf("<mobject_write_operation>\n");
}
void write_op_printer_end(void* unused)
{
printf("</mobject_write_operation>\n");
}
void write_op_printer_create(void* unused, int exclusive)
{
printf("\t<create exclusive=%d />\n", exclusive);
}
void write_op_printer_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
printf("\t<write from=%ld length=%ld offset=%ld />\n", buf.as_offset, len, offset);
}
void write_op_printer_write_full(void* u, buffer_u buf, size_t len)
{
printf("\t<write_full from=%ld length=%ld />\n", buf.as_offset, len);
}
void write_op_printer_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
printf("\t<writesame from=%ld data_len=%ld write_len=%ld offset=%ld />\n",
buf.as_offset, data_len, write_len, offset);
}
void write_op_printer_append(void* u, buffer_u buf, size_t len)
{
printf("\t<append from=%ld length=%ld />\n", buf.as_offset, len);
}
void write_op_printer_remove(void* u)
{
printf("\t<remove />\n");
}
void write_op_printer_truncate(void* u, uint64_t offset)
{
printf("\t<truncate offset=%ld />\n", offset);
}
void write_op_printer_zero(void* u, uint64_t offset, uint64_t len)
{
printf("\t<zero offset=%ld len=%ld />\n", offset, len);
}
void write_op_printer_omap_set(void* u, char const* const* keys,
char const* const* vals,
const size_t *lens,
size_t num)
{
printf("\t<omap_set num=%ld>\n", num);
unsigned i;
for(i=0; i<num; i++) {
printf("\t\t<record key=\"%s\"\t lens=%ld>%s</record>\n",
keys[i], lens[i], vals[i]);
}
printf("\t</omap_set>\n");
}
void write_op_printer_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
printf("\t<omap_rm_keys num=%ld>\n", num_keys);
unsigned i;
for(i=0; i<num_keys; i++) {
printf("\t\t<record key=\"%s\" />\n", keys[i]);
}
printf("\t</omap_rm_keys>\n");
}
......@@ -3,15 +3,21 @@
#include <mercury.h>
#include <mercury_macros.h>
#include <mercury_proc_string.h>
#include <libmobject-store.h>
#include "src/proc-write-actions.h"
#include "src/proc-read-actions.h"