Commit 141bbf2a authored by Matthieu Dorier's avatar Matthieu Dorier

implemented serializers for write_op and read_op

parent 27db559b
......@@ -436,6 +436,24 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
size_t *bytes_read,
int *prval);
/**
* Start iterating over keys on an object.
*
* They will be returned sorted by key, and the iterator
* will fill in NULL for all values if specified.
*
* @param read_op operation to add this action to
* @param start_after list keys starting after start_after
* @param max_return list no more than max_return keys
* @param iter where to store the iterator
* @param prval where to store the return value from this action
*/
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
/**
* Start iterating over key/value pairs on an object.
*
......
src_libmobject_store_la_SOURCES = \
src/libmobject-store.c \
src/completion.c \
src/write-op.c \
src/read-op.c
src/write-op-impl.c \
src/read-op-impl.c \
src/proc-write-actions.c \
src/proc-read-actions.c
noinst_HEADERS += src/log.h src/completion.h \
src/write-op.h src/write-actions.h \
src/read-op.h src/read-actions.h \
src/utlist.h
src/write-op-impl.h src/write-actions.h \
src/read-op-impl.h src/read-actions.h \
src/utlist.h src/proc-write-actions.h \
src/proc-read-actions.h
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_ARGS_READ_ACTION_H
#define __MOBJECT_ARGS_READ_ACTION_H
#include "mobject-store-config.h"
#include "read-actions.h"
/**
* This file contains a set of structures meant
* to store arguments for write_op operations in
* a buffer. Some of these structures are self-sufficiant.
* Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional
* data after them.
*/
/**
* stat operation
* no header (so no defined structure)
* no extra data
*/
// typedef struct args_rd_action_STAT {
// } args_rd_action_stat;
/**
* read operation
* no extra data
*/
typedef struct args_rd_action_READ {
uint64_t offset;
size_t len;
uint64_t bulk_offset;
} args_rd_action_read;
/**
* omap_get_keys operation
* extra data contains the start_after string
* (see read-actions.h)
*/
typedef struct args_rd_action_OMAP_GET_KEYS {
uint64_t max_return;
size_t data_size;
} args_rd_action_omap_get_keys;
/**
* omap_get_vals operation
* extra data contains the start_after and
* filter_prefix strings
* (see read-actions.h)
*/
typedef struct args_rd_action_OMAP_GET_VALS {
uint64_t max_return;
size_t data_size;
} args_rd_action_omap_get_vals;
/**
* omap_get_vals_by_keys operation
* extra data contains the list of null-terminated
* keys; the total size of the extra data in bytes is
* in data_size
*/
typedef struct args_rd_action_OMAP_GET_VALS_BY_KEYS {
size_t num_keys;
size_t data_size;
} args_rd_action_omap_get_vals_by_keys;
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_PI_WRITE_ACTION_H
#define __MOBJECT_PI_WRITE_ACTION_H
#include "mobject-store-config.h"
#include "write-actions.h"
/**
* This file contains a set of structures meant
* to store arguments for write_op operations in
* a buffer. Some of these structures are self-sufficiant.
* Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional
* data after them.
*/
/**
* create operation
* no extra data
*/
typedef struct args_wr_action_CREATE {
int exclusive;
} args_wr_action_create;
/**
* write operation
* no extra data
*/
typedef struct args_wr_action_WRITE {
uint64_t buffer_position; // position in the received bulk handle
size_t len; // length in the received bulk handle
uint64_t offset; // offset at which to position the data in the object
} args_wr_action_write;
/**
* write_full operation
* no extra data
*/
typedef struct args_wr_action_WRITE_FULL {
uint64_t buffer_position; // position in the received bulk handle
size_t len; // length in the received bulk handle
} args_wr_action_write_full;
/**
* writesame operation
* no extra data
*/
typedef struct args_wr_action_WRITE_SAME {
uint64_t buffer_position; // position in the received bulk handle
size_t data_len; // length to take from received data
size_t write_len; // length to write in the object
uint64_t offset; // at which offset to write in the object
} args_wr_action_write_same;
/**
* append operation
* no extra data
*/
typedef struct args_wr_action_APPEND {
uint64_t buffer_position; // position in the received bulk handle
size_t len; // length to take from received data
} args_wr_action_append;
/**
* remove operation
* no header (so no definition needed)
* no extra data
*/
// typedef struct args_wr_action_REMOVE {
// } args_wr_action_remove;
/**
* truncate operation
* no extra data
*/
typedef struct args_wr_action_TRUNCATE {
uint64_t offset; // offset at which to truncate the object
} args_wr_action_truncate;
/**
* zero operation
* no extra data
*/
typedef struct args_wr_action_ZERO {
uint64_t offset; // offset at which to start zero-ing
uint64_t len; // length to set to zero
} args_wr_action_zero;
/**
* omap_set operation
* data_size represents the size of the extra data
* to be read after this header.
* extra data contains serialized keys and values
* (see write-op-impl.h for the format)
*/
typedef struct args_wr_action_OMAP_SET {
size_t num;
size_t data_size;
} args_wr_action_omap_set;
/**
* rm_keys operation
* data_size represents the size of the extra data
* to be read after this header.
* extra data conrains serialized key names
* (see write-op-impl.h for the format)
*/
typedef struct args_wr_action_RM_KEYS {
size_t num_keys;
size_t data_size;
} args_wr_action_omap_rm_keys;
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "proc-read-actions.h"
#include "args-read-actions.h"
#include "read-op-impl.h"
#include "utlist.h"
/**
* This file contains the main hg_proc_mobject_store_read_op_t
* serialization function. It relies on helper functions to encode
* and decode each possible write action. Encoding/decoding works
* by creating an args_rd_action_* object and passing it the required
* parameters, then serializing the structure along with potential
* additional data.
*/
typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*);
static hg_return_t encode_read_action_stat(hg_proc_t proc, uint64_t* pos, rd_action_stat_t action)
{
return HG_SUCCESS;
}
static hg_return_t decode_read_action_stat(hg_proc_t proc, uint64_t* pos, rd_action_stat_t* action)
{
hg_return_t ret = HG_SUCCESS;
*action = (rd_action_stat_t)calloc(1, sizeof(**action));
return ret;
}
static hg_return_t encode_read_action_read(hg_proc_t proc, uint64_t* pos, rd_action_read_t action)
{
args_rd_action_read a;
a.offset = action->offset;
a.len = action->len;
a.bulk_offset = action->u.bulk_offset;
*pos += a.len;
return hg_proc_memcpy(proc, &a, sizeof(a));
}
static hg_return_t decode_read_action_read(hg_proc_t proc, uint64_t* pos, rd_action_read_t* action)
{
hg_return_t ret = HG_SUCCESS;
args_rd_action_read a;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
*action = (rd_action_read_t)calloc(1, sizeof(**action));
(*action)->offset = a.offset;
(*action)->len = a.len;
(*action)->u.bulk_offset = a.bulk_offset;
*pos += a.len;
return ret;
}
static hg_return_t encode_read_action_omap_get_keys(hg_proc_t proc, uint64_t* pos, rd_action_omap_get_keys_t action)
{
args_rd_action_omap_get_keys a;
a.max_return = action->max_return;
a.data_size = action->data_size;
hg_return_t ret;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_string_t(proc, action->start_after);
return ret;
}
static hg_return_t decode_read_action_omap_get_keys(hg_proc_t proc, uint64_t* pos, rd_action_omap_get_keys_t* action)
{
hg_return_t ret = HG_SUCCESS;
args_rd_action_omap_get_keys a;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
*action = (rd_action_omap_get_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
(*action)->max_return = a.max_return;
(*action)->data_size = a.data_size;
(*action)->start_after = (*action)->data;
ret = hg_proc_hg_string_t(proc, (*action)->start_after);
return ret;
}
static hg_return_t encode_read_action_omap_get_vals(hg_proc_t proc, uint64_t* pos, rd_action_omap_get_vals_t action)
{
args_rd_action_omap_get_vals a;
a.max_return = action->max_return;
a.data_size = action->data_size;
hg_return_t ret;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_string_t(proc, action->start_after);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_string_t(proc, action->filter_prefix);
return ret;
}
static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc, uint64_t* pos, rd_action_omap_get_vals_t* action)
{
hg_return_t ret = HG_SUCCESS;
args_rd_action_omap_get_vals a;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
*action = (rd_action_omap_get_vals_t)calloc(1, sizeof(**action)-1+a.data_size);
(*action)->max_return = a.max_return;
(*action)->data_size = a.data_size;
(*action)->start_after = (*action)->data;
size_t s = strlen((*action)->start_after);
(*action)->filter_prefix = (*action)->data + s + 1;
ret = hg_proc_hg_string_t(proc, (*action)->start_after);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_string_t(proc, (*action)->filter_prefix);
return ret;
}
static hg_return_t encode_read_action_omap_get_vals_by_keys(hg_proc_t proc, uint64_t* pos, rd_action_omap_get_vals_by_keys_t action)
{
args_rd_action_omap_get_vals_by_keys a;
a.num_keys = action->num_keys;
a.data_size = action->data_size;
hg_return_t ret;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, action->data, action->data_size);
return ret;
}
static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc, uint64_t* pos, rd_action_omap_get_vals_by_keys_t* action)
{
hg_return_t ret = HG_SUCCESS;
args_rd_action_omap_get_vals_by_keys a;
ret = hg_proc_memcpy(proc, &a, sizeof(a));
if(ret != HG_SUCCESS) return ret;
*action = (rd_action_omap_get_vals_by_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
(*action)->num_keys = a.num_keys;
(*action)->data_size = a.data_size;
ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
return ret;
}
/**
* The following two arrays are here to avoid a big switch.
*/
/* encoding functions */
static encode_fn encode_read_action[_READ_OPCODE_END_ENUM_] = {
NULL,
(encode_fn)encode_read_action_stat,
(encode_fn)encode_read_action_read,
(encode_fn)encode_read_action_omap_get_keys,
(encode_fn)encode_read_action_omap_get_vals,
(encode_fn)encode_read_action_omap_get_vals_by_keys
};
/* decoding functions */
static decode_fn decode_read_action[_READ_OPCODE_END_ENUM_] = {
NULL,
(decode_fn)decode_read_action_stat,
(decode_fn)decode_read_action_read,
(decode_fn)decode_read_action_omap_get_keys,
(decode_fn)decode_read_action_omap_get_vals,
(decode_fn)decode_read_action_omap_get_vals_by_keys
};
/**
* Serialization function for mobject_store_read_op_t objects.
* For encoding, the object should be prepared first (that is, the union fields
* pointing to either a buffer or an offset in a bulk should be an offset in a bulk).
*/
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t read_op)
{
rd_action_base_t elem, tmp;
hg_return_t ret = HG_SUCCESS;
uintptr_t position = 0;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
// encode the bulk handle associated with the series of operations
ret = hg_proc_hg_bulk_t(proc, &(read_op->bulk_handle));
if(ret != HG_SUCCESS) return ret;
// encode the number of actions
ret = hg_proc_memcpy(proc, &(read_op->num_actions), sizeof(read_op->num_actions));
if(ret != HG_SUCCESS) return ret;
// for each action ...
DL_FOREACH(read_op->actions,elem) {
read_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_), "Invalid read_op opcode");
// encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
// encode the action's arguments
ret = encode_read_action[opcode](proc, &position, elem);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
// decode the bulk handle
ret = hg_proc_hg_bulk_t(proc, &(read_op->bulk_handle));
if(ret != HG_SUCCESS) return ret;
// decode the number of actions
ret = hg_proc_memcpy(proc, &(read_op->num_actions), sizeof(read_op->num_actions));
if(ret != HG_SUCCESS) return ret;
rd_action_base_t next_action;
size_t i;
for(i = 0; i < read_op->num_actions; i++) {
// decode the current action's type
read_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_), "Invalid write_op opcode");
// decode the action's arguments
ret = decode_read_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret;
// append to the list
DL_APPEND(read_op->actions, next_action);
}
break;
case HG_FREE:
mobject_store_release_read_op(read_op);
return HG_SUCCESS;
}
return ret;
}
......@@ -3,15 +3,13 @@
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_WRITE_OP_H
#define __MOBJECT_WRITE_OP_H
#ifndef __MOBJECT_PROC_READ_ACTION_H
#define __MOBJECT_PROC_READ_ACTION_H
#include "mobject-store-config.h"
#include "write-actions.h"
#include <margo.h>
#include "libmobject-store.h"
struct mobject_store_write_op {
wr_action_base_t actions;
};
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t list);
#endif
This diff is collapsed.
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_PROC_WRITE_ACTION_H
#define __MOBJECT_PROC_WRITE_ACTION_H
#include <margo.h>
#include "libmobject-store.h"
hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write_op_t list);
#endif
......@@ -12,8 +12,10 @@ typedef enum {
READ_OPCODE_BASE = 0,
READ_OPCODE_STAT,
READ_OPCODE_READ,
READ_OPCODE_OMAP_GET_KEYS,
READ_OPCODE_OMAP_GET_VALS,
READ_OPCODE_OMAP_GET_VALS_BY_KEYS,
_READ_OPCODE_END_ENUM_
} read_op_code_t;
#define READ_ACTION_DOWNCAST(child_obj, base_obj, child_category) \
......@@ -42,11 +44,25 @@ typedef struct rd_action_READ {
struct rd_action_BASE base;
uint64_t offset;
size_t len;
char* buffer;
union {
char* buffer;
uint64_t bulk_offset;
} u;
size_t* bytes_read;
int* prval;
}* rd_action_read_t;
typedef struct rd_action_OMAP_GET_KEYS {
struct rd_action_BASE base;
const char* start_after;
uint64_t max_return;
mobject_store_omap_iter_t* iter;
int* prval;
size_t data_size;
char data[1];
}* rd_action_omap_get_keys_t;
// data field here to hold embedded data (start_after)
typedef struct rd_action_OMAP_GET_VALS {
struct rd_action_BASE base;
const char* start_after;
......@@ -54,6 +70,7 @@ typedef struct rd_action_OMAP_GET_VALS {
uint64_t max_return;
mobject_store_omap_iter_t* iter;
int* prval;
size_t data_size;
char data[1];
}* rd_action_omap_get_vals_t;
// data field here to hold embedded data (start_after
......@@ -61,11 +78,12 @@ typedef struct rd_action_OMAP_GET_VALS {
typedef struct rd_action_OMAP_GET_VALS_BY_KEYS {
struct rd_action_BASE base;
size_t keys_len;
size_t num_keys;
mobject_store_omap_iter_t* iter;
int* prval;
char keys[1];
size_t data_size;
char data[1];
}* rd_action_omap_get_vals_by_keys_t;
// keys is a contiguous buffer holding all
// data is a contiguous buffer holding all
// the null-terminated keys
#endif
......@@ -10,7 +10,7 @@
#include "utlist.h"
#include "libmobject-store.h"
#include "log.h"
#include "read-op.h"
#include "read-op-impl.h"
#include "completion.h"
mobject_store_read_op_t mobject_store_create_read_op(void)
......@@ -19,6 +19,8 @@ mobject_store_read_op_t mobject_store_create_read_op(void)
(mobject_store_read_op_t)calloc(1, sizeof(*read_op));
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "Could not allocate read_op");
read_op->actions = (rd_action_base_t)0;
read_op->bulk_handle = HG_BULK_NULL;
read_op->use_local_pointers = 1;
return read_op;
}
......@@ -42,6 +44,7 @@ void mobject_store_read_op_stat(mobject_store_read_op_t read_op,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
MOBJECT_ASSERT(read_op->use_local_pointers, "can't modify a read_op that has been sent");
rd_action_stat_t action = (rd_action_stat_t)calloc(1, sizeof(*action));
action->base.type = READ_OPCODE_STAT;
......@@ -61,12 +64,13 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
MOBJECT_ASSERT(read_op->use_local_pointers, "can't modify a read_op that has been sent");
rd_action_read_t action = (rd_action_read_t)calloc(1, sizeof(*action));
action->base.type = READ_OPCODE_READ;
action->offset = offset;
action->len = len;
action->buffer = buffer;
action->u.buffer = buffer;
action->bytes_read = bytes_read;
action->prval = prval;
......@@ -74,6 +78,30 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
DL_APPEND(read_op->actions, base);
}
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
MOBJECT_ASSERT(read_op->use_local_pointers, "can't modify a read_op that has been sent");
size_t strl = strlen(start_after);
rd_action_omap_get_keys_t action = (rd_action_omap_get_keys_t)calloc(1, sizeof(*action)+strl);
action->base.type = READ_OPCODE_OMAP_GET_KEYS;
action->start_after = action->data;
action->max_return = max_return;
action->iter = iter;
action->prval = prval;
action->data_size = strl+1;
strcpy(action->data, start_after);
READ_ACTION_UPCAST(base, action);
DL_APPEND(read_op->actions, base);
}
void mobject_store_read_op_omap_get_vals(mobject_store_read_op_t read_op,
const char *start_after,
const char *filter_prefix,
......@@ -82,6 +110,7 @@ void mobject_store_read_op_omap_get_vals(mobject_store_read_op_t read_op,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
MOBJECT_ASSERT(read_op->use_local_pointers, "can't modify a read_op that has been sent");
// compute required size for embedded data
size_t strl1 = strlen(start_after)+1;
......@@ -95,6 +124,7 @@ void mobject_store_read_op_omap_get_vals(mobject_store_read_op_t read_op,
action->max_return = max_return;
action->iter = iter;
action->prval = prval;
action->data_size = extra_mem;
strcpy(action->data, start_after);
strcpy(action->data + strl1, filter_prefix);
......@@ -109,6 +139,7 @@ void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
MOBJECT_ASSERT(read_op->use_local_pointers, "can't modify a read_op that has been sent");
// computing extra memory required to hold keys
size_t extra_mem = 0;
......@@ -120,10 +151,11 @@ void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op
rd_action_omap_get_vals_by_keys_t action =
(rd_action_omap_get_vals_by_keys_t)calloc(1, sizeof(*action) - 1 + extra_mem);
action->base.type = READ_OPCODE_OMAP_GET_VALS_BY_KEYS;
action->keys_len = keys_len;
action->num_keys = keys_len;
action->iter = iter;
action->prval = prval;
char* s = action->keys;
action->data_size = extra_mem;
char* s = action->data;
for(i = 0; i < keys_len; i++) {
strcpy(s, keys[i]);
s += strlen(keys[i]) + 1;
......
......@@ -6,11 +6,15 @@
#ifndef __MOBJECT_READ_OP_H
#define __MOBJECT_READ_OP_H
#include <margo.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "read-actions.h"
struct mobject_store_read_op {
size_t num_actions;
int use_local_pointers;
hg_bulk_t bulk_handle;
size_t num_actions;
rd_action_base_t actions;
};
......
......@@ -19,7 +19,8 @@ typedef enum {
WRITE_OPCODE_TRUNCATE,
WRITE_OPCODE_ZERO,
WRITE_OPCODE_OMAP_SET,
WRITE_OPCODE_OMAP_RM_KEYS
WRITE_OPCODE_OMAP_RM_KEYS,
_WRITE_OPCODE_END_ENUM_
} write_op_code_t;
#define WRITE_ACTION_DOWNCAST(child_obj, base_obj, child_category) \
......@@ -44,20 +45,29 @@ typedef struct wr_action_CREATE {
typedef struct wr_action_WRITE {
struct wr_action_BASE base;
const char* buffer;
union {
const char* buffer;
uint64_t bulk_offset;
} u;
size_t len;
uint64_t offset;
}* wr_action_write_t;
typedef struct wr_action_WRITE_FULL {
struct wr_action_BASE base;
const char* buffer;
union {
const char* buffer;
uint64_t bulk_offset;
} u;
size_t len;
}* wr_action_write_full_t;
typedef struct wr_action_WRITE_SAME {
struct wr_action_BASE base;
const char* buffer;
union {
const char* buffer;