Commit ecbdaa69 authored by Matthieu Dorier's avatar Matthieu Dorier

big reorganization

parent c8cbf938
noinst_HEADERS += \
src/args-read-actions.h \
src/args-write-actions.h \
src/buffer-union.h \
src/completion.h \
src/log.h \
src/omap-iter-impl.h \
src/prepare-read-op.h \
src/prepare-write-op.h \
src/proc-omap-iter.h \
src/proc-read-actions.h \
src/proc-read-responses.h \
src/proc-write-actions.h \
src/read-actions.h \
src/read-op-impl.h \
src/read-op-visitor.h \
src/read-resp-impl.h \
src/utlist.h \
src/write-actions.h \
src/write-op-impl.h \
src/write-op-visitor.h
src/io-chain/args-read-actions.h \
src/io-chain/args-write-actions.h \
src/util/buffer-union.h \
src/aio/completion.h \
src/util/log.h \
src/omap-iter/omap-iter-impl.h \
src/io-chain/prepare-read-op.h \
src/io-chain/prepare-write-op.h \
src/omap-iter/proc-omap-iter.h \
src/io-chain/proc-read-actions.h \
src/io-chain/proc-read-responses.h \
src/io-chain/proc-write-actions.h \
src/io-chain/read-actions.h \
src/io-chain/read-op-impl.h \
src/io-chain/read-op-visitor.h \
src/io-chain/read-resp-impl.h \
src/util/utlist.h \
src/io-chain/write-actions.h \
src/io-chain/write-op-impl.h \
src/io-chain/write-op-visitor.h \
src/server/exec-write-op.h \
src/server/exec-read-op.h
src_libmobject_store_la_SOURCES = \
src/completion.c \
src/libmobject-store.c \
src/omap-iter-impl.c \
src/prepare-read-op.c \
src/prepare-write-op.c \
src/proc-omap-iter.c \
src/proc-read-actions.c \
src/proc-read-responses.c \
src/proc-write-actions.c \
src/read-op-impl.c \
src/read-op-visitor.c \
src/read-resp-impl.c \
src/write-op-impl.c \
src/write-op-visitor.c
src/aio/completion.c \
src/client/libmobject-store.c \
src/omap-iter/omap-iter-impl.c \
src/io-chain/prepare-read-op.c \
src/io-chain/prepare-write-op.c \
src/omap-iter/proc-omap-iter.c \
src/io-chain/proc-read-actions.c \
src/io-chain/proc-read-responses.c \
src/io-chain/proc-write-actions.c \
src/io-chain/read-op-impl.c \
src/io-chain/read-op-visitor.c \
src/io-chain/read-resp-impl.c \
src/io-chain/write-op-impl.c \
src/io-chain/write-op-visitor.c
src_libmobject_server_la_SOURCES = \
src/mobject-server.c
src/aio/completion.c \
src/server/mobject-server.c \
src/omap-iter/omap-iter-impl.c \
src/omap-iter/proc-omap-iter.c \
src/io-chain/proc-read-actions.c \
src/io-chain/proc-read-responses.c \
src/io-chain/proc-write-actions.c \
src/io-chain/read-op-impl.c \
src/io-chain/read-op-visitor.c \
src/io-chain/read-resp-impl.c \
src/io-chain/write-op-impl.c \
src/io-chain/write-op-visitor.c \
src/server/exec-write-op.c \
src/server/exec-read-op.c
src_mobject_server_daemon_SOURCES = \
src/mobject-server-daemon.c
src/server/mobject-server-daemon.c
src_mobject_server_daemon_LDADD = \
src/libmobject-server.la
src/libmobject-server.la
bin_PROGRAMS += \
src/mobject-server-daemon
......
......@@ -7,8 +7,8 @@
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "completion.h"
#include "log.h"
#include "src/aio/completion.h"
#include "src/util/log.h"
int mobject_store_aio_create_completion(void *cb_arg,
mobject_store_callback_t cb_complete,
......
......@@ -32,10 +32,9 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
(void)id; /* XXX: id unused in mobject */
/* allocate a new cluster handle and set some fields */
cluster_handle = malloc(sizeof(*cluster_handle));
cluster_handle = (mobject_store_handle_t*)calloc(1,sizeof(*cluster_handle));
if (!cluster_handle)
return -1;
memset(cluster_handle, 0, sizeof(cluster_handle));
/* use env variable to determine how to connect to the cluster */
/* NOTE: this is the _only_ method for specifying a cluster for now... */
......
......@@ -7,7 +7,7 @@
#define __MOBJECT_ARGS_READ_ACTION_H
#include "mobject-store-config.h"
#include "read-actions.h"
#include "src/io-chain/read-actions.h"
/**
* This file contains a set of structures meant
......
......@@ -7,7 +7,7 @@
#define __MOBJECT_PI_WRITE_ACTION_H
#include "mobject-store-config.h"
#include "write-actions.h"
#include "src/io-chain/write-actions.h"
/**
* This file contains a set of structures meant
......
......@@ -3,10 +3,10 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "prepare-read-op.h"
#include "read-op-impl.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/io-chain/read-op-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
static void prepare_read(uint64_t* cur_offset,
rd_action_read_t action,
......
......@@ -3,10 +3,10 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "prepare-write-op.h"
#include "write-op-impl.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/write-op-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
static void convert_write(uint64_t* cur_offset,
wr_action_write_t action,
......
......@@ -3,11 +3,11 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "proc-read-actions.h"
#include "args-read-actions.h"
#include "read-op-impl.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/proc-read-actions.h"
#include "src/io-chain/args-read-actions.h"
#include "src/io-chain/read-op-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
/**
* This file contains the main hg_proc_mobject_store_read_op_t
......
......@@ -4,12 +4,12 @@
* See COPYRIGHT in top-level directory.
*/
#include <mercury_proc.h>
#include "proc-read-responses.h"
#include "read-responses.h"
#include "read-resp-impl.h"
#include "proc-omap-iter.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/proc-read-responses.h"
#include "src/io-chain/read-responses.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/proc-omap-iter.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
typedef hg_return_t (*encode_fn)(hg_proc_t, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, void*);
......
......@@ -7,7 +7,7 @@
#define __MOBJECT_PROC_READ_RESPONSE_H
#include <margo.h>
#include "read-resp-impl.h"
#include "src/io-chain/read-resp-impl.h"
hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response);
......
......@@ -3,11 +3,11 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "proc-write-actions.h"
#include "args-write-actions.h"
#include "write-op-impl.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/proc-write-actions.h"
#include "src/io-chain/args-write-actions.h"
#include "src/io-chain/write-op-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
/**
* This file contains the main hg_proc_mobject_store_write_op_t
......
......@@ -8,7 +8,7 @@
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "buffer-union.h"
#include "src/util/buffer-union.h"
typedef enum {
READ_OPCODE_BASE = 0,
......
......@@ -7,11 +7,11 @@
#include <stdlib.h>
#include <string.h>
#include "mobject-store-config.h"
#include "utlist.h"
#include "libmobject-store.h"
#include "log.h"
#include "read-op-impl.h"
#include "completion.h"
#include "src/io-chain/read-op-impl.h"
#include "src/aio/completion.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
mobject_store_read_op_t mobject_store_create_read_op(void)
{
......
......@@ -9,7 +9,7 @@
#include <margo.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "read-actions.h"
#include "src/io-chain/read-actions.h"
/**
* This object represents a handler for a list of actions
......
......@@ -4,9 +4,9 @@
* See COPYRIGHT in top-level directory.
*/
#include "libmobject-store.h"
#include "read-op-visitor.h"
#include "read-op-impl.h"
#include "utlist.h"
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-op-impl.h"
#include "src/util/utlist.h"
static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a, void* uargs);
static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a, void* uargs);
......
......@@ -7,7 +7,7 @@
#define __MOBJECT_READ_OP_VISITOR_H
#include "libmobject-store.h"
#include "buffer-union.h"
#include "src/util/buffer-union.h"
typedef struct read_op_visitor {
void (*visit_begin)(void*);
......
......@@ -4,13 +4,13 @@
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include "read-op-impl.h"
#include "read-responses.h"
#include "read-resp-impl.h"
#include "read-actions.h"
#include "omap-iter-impl.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/read-op-impl.h"
#include "src/io-chain/read-responses.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/io-chain/read-actions.h"
#include "src/omap-iter/omap-iter-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
/**
......
......@@ -7,7 +7,7 @@
#define __MOBJECT_WRITE_OPCODES_H
#include "mobject-store-config.h"
#include "buffer-union.h"
#include "src/util/buffer-union.h"
typedef enum {
WRITE_OPCODE_BASE = 0,
......
......@@ -7,11 +7,11 @@
#include <stdlib.h>
#include <string.h>
#include "mobject-store-config.h"
#include "utlist.h"
#include "libmobject-store.h"
#include "log.h"
#include "write-op-impl.h"
#include "completion.h"
#include "src/io-chain/write-op-impl.h"
#include "src/aio/completion.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
mobject_store_write_op_t mobject_store_create_write_op(void)
{
......
......@@ -9,7 +9,7 @@
#include <margo.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "write-actions.h"
#include "src/io-chain/write-actions.h"
/**
* The mobject_store_write_op structure is what a
......
......@@ -4,10 +4,10 @@
* See COPYRIGHT in top-level directory.
*/
#include "libmobject-store.h"
#include "write-op-visitor.h"
#include "write-op-impl.h"
#include "utlist.h"
#include "log.h"
#include "src/io-chain/write-op-visitor.h"
#include "src/io-chain/write-op-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a, void* uargs);
static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a, void* uargs);
......
......@@ -7,7 +7,7 @@
#define __MOBJECT_WRITE_OP_VISITOR_H
#include "libmobject-store.h"
#include "buffer-union.h"
#include "src/util/buffer-union.h"
typedef struct write_op_visitor {
void (*visit_begin)(void*);
......
......@@ -5,10 +5,10 @@
*/
#include <stdlib.h>
#include <string.h>
#include "utlist.h"
#include "libmobject-store.h"
#include "omap-iter-impl.h"
#include "log.h"
#include "src/omap-iter/omap-iter-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
void omap_iter_create(mobject_store_omap_iter_t* iter)
{
......
......@@ -4,8 +4,8 @@
* See COPYRIGHT in top-level directory.
*/
#include <margo.h>
#include "utlist.h"
#include "proc-omap-iter.h"
#include "src/omap-iter/proc-omap-iter.h"
#include "src/util/utlist.h"
hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap_iter_t* iter)
{
......
#ifndef PARAM_H
#define PARAM_H
#ifndef __RPC_TYPE_READ_OP_H
#define __RPC_TYPE_READ_OP_H
#include <mercury.h>
#include <mercury_macros.h>
#include <mercury_proc_string.h>
#include <libmobject-store.h>
#include "src/proc-write-actions.h"
#include "src/proc-read-actions.h"
#include "src/proc-read-responses.h"
MERCURY_GEN_PROC(write_op_in_t,
((hg_string_t)(object_name))\
((mobject_store_write_op_t)(write_op)))
MERCURY_GEN_PROC(write_op_out_t, ((int32_t)(ret)))
#include "src/io-chain/proc-write-actions.h"
#include "src/io-chain/proc-read-actions.h"
#include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC(read_op_in_t,
((hg_string_t)(object_name))\
......
#ifndef __RPC_TYPE_WRITE_OP_H
#define __RPC_TYPE_WRITE_OP_H
#include <mercury.h>
#include <mercury_macros.h>
#include <mercury_proc_string.h>
#include <libmobject-store.h>
#include "src/io-chain/proc-write-actions.h"
#include "src/io-chain/proc-read-actions.h"
#include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC(write_op_in_t,
((hg_string_t)(object_name))\
((mobject_store_write_op_t)(write_op)))
MERCURY_GEN_PROC(write_op_out_t, ((int32_t)(ret)))
#endif
#include <stdio.h>
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
#include "src/server/exec-read-op.h"
static void read_op_printer_begin(void*);
static void read_op_printer_stat(void*, uint64_t*, time_t*, int*);
static void read_op_printer_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
static void read_op_printer_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_printer_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_printer_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_printer_end(void*);
struct read_op_visitor read_op_printer = {
.visit_begin = read_op_printer_begin,
.visit_end = read_op_printer_end,
.visit_stat = read_op_printer_stat,
.visit_read = read_op_printer_read,
.visit_omap_get_keys = read_op_printer_omap_get_keys,
.visit_omap_get_vals = read_op_printer_omap_get_vals,
.visit_omap_get_vals_by_keys = read_op_printer_omap_get_vals_by_keys
};
void execute_read_op(mobject_store_read_op_t read_op, const char* object_name)
{
execute_read_op_visitor(&read_op_printer, read_op, (void*)object_name);
}
void read_op_printer_begin(void* u)
{
printf("<mobject_read_operation object_name=\"%s\">\n", (char*)u);
}
void read_op_printer_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
printf("\t<stat/>\n");
*psize = 128;
time(pmtime);
*prval = 1234;
}
void read_op_printer_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
printf("\t<read offset=%ld length=%ld to=%ld/>\n",offset, len, buf.as_offset);
*bytes_read = len;
*prval = 1235;
}
void read_op_printer_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_keys start_after=\"%s\" max_return=%ld />\n", start_after, max_return);
omap_iter_create(iter);
// use omap_iter_append to add things to the iterator
*prval = 1236;
}
void read_op_printer_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_vals start_after=\"%s\" filter_prefix=\"%s\" max_return=%ld />\n",
start_after, filter_prefix, max_return);
omap_iter_create(iter);
*prval = 1237;
}
void read_op_printer_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_vals_by_keys num=%ld>\n", num_keys);
unsigned i;
for(i=0; i<num_keys; i++)
printf("\t\t<record key=\"%s\" />\n", keys[i]);
printf("\t<omap_get_vals_by_keys/>\n");
omap_iter_create(iter);
*prval = 1238;
}
void read_op_printer_end(void* u)
{
printf("</mobject_read_operation>\n");
}
#ifndef __EXEC_READ_OP_H
#define __EXEC_READ_OP_H
#include <margo.h>
#include "libmobject-store.h"
void execute_read_op(mobject_store_read_op_t read_op, const char* object_name);
#endif
#include <stdio.h>
#include "src/server/exec-write-op.h"
#include "src/io-chain/write-op-visitor.h"
static void write_op_printer_begin(void*);
static void write_op_printer_end(void*);
static void write_op_printer_create(void*, int);
static void write_op_printer_write(void*, buffer_u, size_t, uint64_t);
static void write_op_printer_write_full(void*, buffer_u, size_t);
static void write_op_printer_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_printer_append(void*, buffer_u, size_t);
static void write_op_printer_remove(void*);
static void write_op_printer_truncate(void*, uint64_t);
static void write_op_printer_zero(void*, uint64_t, uint64_t);
static void write_op_printer_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_printer_omap_rm_keys(void*, char const* const*, size_t);
struct write_op_visitor write_op_printer = {
.visit_begin = write_op_printer_begin,
.visit_create = write_op_printer_create,
.visit_write = write_op_printer_write,
.visit_write_full = write_op_printer_write_full,
.visit_writesame = write_op_printer_writesame,
.visit_append = write_op_printer_append,
.visit_remove = write_op_printer_remove,
.visit_truncate = write_op_printer_truncate,
.visit_zero = write_op_printer_zero,
.visit_omap_set = write_op_printer_omap_set,
.visit_omap_rm_keys = write_op_printer_omap_rm_keys,
.visit_end = write_op_printer_end
};
void execute_write_op(mobject_store_write_op_t write_op, const char* object_name)
{
/* Execute the operation chain */
execute_write_op_visitor(&write_op_printer, write_op, (void*)object_name);
}
void write_op_printer_begin(void* object_name)
{
printf("<mobject_write_operation on=\"%s\">\n",(char*)object_name);
}
void write_op_printer_end(void* unused)
{
printf("</mobject_write_operation>\n");
}
void write_op_printer_create(void* unused, int exclusive)
{
printf("\t<create exclusive=%d />\n", exclusive);
}
void write_op_printer_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
printf("\t<write from=%ld length=%ld offset=%ld />\n", buf.as_offset, len, offset);
}
void write_op_printer_write_full(void* u, buffer_u buf, size_t len)
{
printf("\t<write_full from=%ld length=%ld />\n", buf.as_offset, len);
}
void write_op_printer_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
printf("\t<writesame from=%ld data_len=%ld write_len=%ld offset=%ld />\n",
buf.as_offset, data_len, write_len, offset);
}
void write_op_printer_append(void* u, buffer_u buf, size_t len)
{
printf("\t<append from=%ld length=%ld />\n", buf.as_offset, len);
}
void write_op_printer_remove(void* u)
{
printf("\t<remove />\n");
}
void write_op_printer_truncate(void* u, uint64_t offset)
{
printf("\t<truncate offset=%ld />\n", offset);
}
void write_op_printer_zero(void* u, uint64_t offset, uint64_t len)
{
printf("\t<zero offset=%ld len=%ld />\n", offset, len);
}
void write_op_printer_omap_set(void* u, char const* const* keys,
char const* const* vals,
const size_t *lens,
size_t num)
{
printf("\t<omap_set num=%ld>\n", num);
unsigned i;
for(i=0; i<num; i++) {
printf("\t\t<record key=\"%s\"\t lens=%ld>%s</record>\n",
keys[i], lens[i], vals[i]);
}
printf("\t</omap_set>\n");
}
void write_op_printer_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
printf("\t<omap_rm_keys num=%ld>\n", num_keys);
unsigned i;
for(i=0; i<num_keys; i++) {
printf("\t\t<record key=\"%s\" />\n", keys[i]);
}
printf("\t</omap_rm_keys>\n");
}
#ifndef __EXEC_WRITE_OP_H
#define __EXEC_WRITE_OP_H
#include <margo.h>
#include "libmobject-store.h"
void execute_write_op(mobject_store_write_op_t write_op, const char* object_name);
#endif
......@@ -13,7 +13,10 @@
#include <ssg-mpi.h>
#include "mobject-server.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/server/exec-write-op.h"
#include "src/server/exec-read-op.h"
typedef struct mobject_server_context
{
......@@ -25,9 +28,13 @@ typedef struct mobject_server_context
static int mobject_server_register(mobject_server_context_t *srv_ctx);
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
/* mobject RPC IDs */
static hg_id_t mobject_shutdown_rpc_id;
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
/* XXX one global mobject server state struct */
static mobject_server_context_t *g_srv_ctx = NULL;
......@@ -118,9 +125,17 @@ static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
int ret=0;
mobject_shutdown_rpc_id = MARGO_REGISTER(srv_ctx->mid, "mobject_shutdown",
margo_instance_id mid = srv_ctx->mid;
mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
void, void, mobject_shutdown_ult);
mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
write_op_in_t, write_op_out_t, mobject_write_op_ult);
mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
read_op_in_t, read_op_out_t, mobject_read_op_ult)
#if 0