Commit 050f9d2f authored by Matthieu Dorier's avatar Matthieu Dorier

implemented read_op functions

parent 1a1a05ac
src_libmobject_store_la_SOURCES = \
src/libmobject-store.c \
src/completion.c \
src/write-op.c
src/write-op.c \
src/read-op.c
noinst_HEADERS += src/log.h src/completion.h \
src/write-op.h src/write-actions.h\
src/write-op.h src/write-actions.h \
src/read-op.h src/read-actions.h \
src/utlist.h
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_READ_OPCODES_H
#define __MOBJECT_READ_OPCODES_H
#include "mobject-store-config.h"
typedef enum {
READ_OPCODE_BASE = 0,
READ_OPCODE_STAT,
READ_OPCODE_READ,
READ_OPCODE_OMAP_GET_VALS,
READ_OPCODE_OMAP_GET_VALS_BY_KEYS,
} read_op_code_t;
#define READ_ACTION_DOWNCAST(child_obj, base_obj, child_category) \
if(READ_OPCODE_ ## child_category != base_obj->type) {\
MOBJECT_LOG("Downcast error: " #base_obj " is not of type READ_ACTION_" #child_category);\
}\
struct rd_action_ ## child_category * child_obj = (struct rd_action_ ## child_category *) base_obj;
#define READ_ACTION_UPCAST(base_obj, child_obj) \
struct rd_action_BASE* base_obj = (struct rd_action_BASE*) child_obj;
typedef struct rd_action_BASE {
read_op_code_t type;
struct rd_action_BASE* prev;
struct rd_action_BASE* next;
}* rd_action_base_t;
typedef struct rd_action_STAT {
struct rd_action_BASE base;
uint64_t* psize;
time_t* pmtime;
int* prval;
}* rd_action_stat_t;
typedef struct rd_action_READ {
struct rd_action_BASE base;
uint64_t offset;
size_t len;
char* buffer;
size_t* bytes_read;
int* prval;
}* rd_action_read_t;
typedef struct rd_action_OMAP_GET_VALS {
struct rd_action_BASE base;
const char* start_after;
const char* filter_prefix;
uint64_t max_return;
mobject_store_omap_iter_t* iter;
int* prval;
char data[1];
}* rd_action_omap_get_vals_t;
// data field here to hold embedded data (start_after
// and filter_prefix strings)
typedef struct rd_action_OMAP_GET_VALS_BY_KEYS {
struct rd_action_BASE base;
size_t keys_len;
mobject_store_omap_iter_t* iter;
int* prval;
char keys[1];
}* rd_action_omap_get_vals_by_keys_t;
// keys is a contiguous buffer holding all
// the null-terminated keys
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include <string.h>
#include "mobject-store-config.h"
#include "utlist.h"
#include "libmobject-store.h"
#include "log.h"
#include "read-op.h"
#include "completion.h"
mobject_store_read_op_t mobject_store_create_read_op(void)
{
mobject_store_read_op_t read_op =
(mobject_store_read_op_t)calloc(1, sizeof(*read_op));
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "Could not allocate read_op");
read_op->actions = (rd_action_base_t)0;
return read_op;
}
void mobject_store_release_read_op(mobject_store_read_op_t read_op)
{
if(read_op == MOBJECT_READ_OP_NULL) return;
rd_action_base_t action, tmp;
DL_FOREACH_SAFE(read_op->actions, action, tmp) {
DL_DELETE(read_op->actions, action);
free(action);
}
free(read_op);
}
void mobject_store_read_op_stat(mobject_store_read_op_t read_op,
uint64_t *psize,
time_t *pmtime,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
rd_action_stat_t action = (rd_action_stat_t)calloc(1, sizeof(*action));
action->base.type = READ_OPCODE_STAT;
action->psize = psize;
action->pmtime = pmtime;
action->prval = prval;
READ_ACTION_UPCAST(base, action);
DL_APPEND(read_op->actions, base);
}
void mobject_store_read_op_read(mobject_store_read_op_t read_op,
uint64_t offset,
size_t len,
char *buffer,
size_t *bytes_read,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
rd_action_read_t action = (rd_action_read_t)calloc(1, sizeof(*action));
action->base.type = READ_OPCODE_READ;
action->offset = offset;
action->len = len;
action->buffer = buffer;
action->bytes_read = bytes_read;
action->prval = prval;
READ_ACTION_UPCAST(base, action);
DL_APPEND(read_op->actions, base);
}
void mobject_store_read_op_omap_get_vals(mobject_store_read_op_t read_op,
const char *start_after,
const char *filter_prefix,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
// compute required size for embedded data
size_t strl1 = strlen(start_after)+1;
size_t strl2 = strlen(filter_prefix)+1;
size_t extra_mem = strl1+strl2;
rd_action_omap_get_vals_t action = (rd_action_omap_get_vals_t)calloc(1, sizeof(*action)-1+extra_mem);
action->base.type = READ_OPCODE_OMAP_GET_VALS;
action->start_after = action->data;
action->filter_prefix = action->data + strl1;
action->max_return = max_return;
action->iter = iter;
action->prval = prval;
strcpy(action->data, start_after);
strcpy(action->data + strl1, filter_prefix);
READ_ACTION_UPCAST(base, action);
DL_APPEND(read_op->actions, base);
}
void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op,
char const* const* keys,
size_t keys_len,
mobject_store_omap_iter_t *iter,
int *prval)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
// computing extra memory required to hold keys
size_t extra_mem = 0;
size_t i;
for(i = 0; i < keys_len; i++) {
extra_mem += strlen(keys[i]) + 1;
}
rd_action_omap_get_vals_by_keys_t action =
(rd_action_omap_get_vals_by_keys_t)calloc(1, sizeof(*action) - 1 + extra_mem);
action->base.type = READ_OPCODE_OMAP_GET_VALS_BY_KEYS;
action->keys_len = keys_len;
action->iter = iter;
action->prval = prval;
char* s = action->keys;
for(i = 0; i < keys_len; i++) {
strcpy(s, keys[i]);
s += strlen(keys[i]) + 1;
}
READ_ACTION_UPCAST(base, action);
DL_APPEND(read_op->actions, base);
}
int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
const char *oid,
int flags)
{
int r;
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
r = mobject_store_aio_create_completion(NULL, NULL, NULL, &completion);
MOBJECT_ASSERT(0 == r, "Could not create completion object");
r = mobject_store_aio_read_op_operate(read_op, io, completion, oid, flags);
MOBJECT_ASSERT(0 == r, "Call to mobject_store_aio_read_op_operate failed");
r = mobject_store_aio_wait_for_complete(completion);
MOBJECT_ASSERT(0 == r, "Could not wait for completion");
int ret = mobject_store_aio_get_return_value(completion);
mobject_store_aio_release(completion);
return ret;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t obect");
// TODO
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_READ_OP_H
#define __MOBJECT_READ_OP_H
#include "mobject-store-config.h"
#include "read-actions.h"
struct mobject_store_read_op {
size_t num_actions;
rd_action_base_t actions;
};
#endif
......@@ -18,7 +18,6 @@ mobject_store_write_op_t mobject_store_create_write_op(void)
mobject_store_write_op_t write_op =
(mobject_store_write_op_t)calloc(1, sizeof(struct mobject_store_write_op));
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "Could not allocate write_op");
write_op->num_actions = 0;
write_op->actions = (wr_action_base_t)0;
return write_op;
}
......@@ -227,6 +226,7 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
int flags)
{
int r;
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
r = mobject_store_aio_create_completion(NULL, NULL, NULL, &completion);
MOBJECT_ASSERT(0 == r, "Could not create completion object");
......@@ -246,5 +246,6 @@ int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
time_t *mtime,
int flags)
{
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
// TODO
}
......@@ -10,7 +10,6 @@
#include "write-actions.h"
struct mobject_store_write_op {
size_t num_actions;
wr_action_base_t actions;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment