Commit 7a6bf0a6 authored by Matthieu Dorier's avatar Matthieu Dorier

resolving conflict merging master into new branch

parents f7bdeaef 6b9ee5c8
...@@ -7,7 +7,7 @@ TESTS = ...@@ -7,7 +7,7 @@ TESTS =
EXTRA_DIST = prepare.sh EXTRA_DIST = prepare.sh
AM_CPPFLAGS = -I$(top_srcdir)/include AM_CPPFLAGS = -I$(top_srcdir)/include
AM_CFLAGS = AM_CFLAGS =
AM_CXXFLAGS = $(AM_CFLAGS) AM_CXXFLAGS = $(AM_CFLAGS)
lib_LTLIBRARIES = \ lib_LTLIBRARIES = \
...@@ -26,3 +26,4 @@ include Make.rules ...@@ -26,3 +26,4 @@ include Make.rules
include $(top_srcdir)/src/Makefile.subdir include $(top_srcdir)/src/Makefile.subdir
include $(top_srcdir)/tests/Makefile.subdir include $(top_srcdir)/tests/Makefile.subdir
include $(top_srcdir)/tests/io-chain/Makefile.subdir
...@@ -79,6 +79,9 @@ enum { ...@@ -79,6 +79,9 @@ enum {
}; };
/** @} */ /** @} */
#define LIBMOBJECT_CREATE_EXCLUSIVE 1
#define LIBMOBJECT_CREATE_IDEMPOTENT 0
/** /**
* @typedef mobject_store_ioctx_t * @typedef mobject_store_ioctx_t
* *
...@@ -108,7 +111,7 @@ typedef void *mobject_store_ioctx_t; ...@@ -108,7 +111,7 @@ typedef void *mobject_store_ioctx_t;
* mobject_store_read_op_omap_get_vals_by_keys(), mobject_store_omap_get_next(), and * mobject_store_read_op_omap_get_vals_by_keys(), mobject_store_omap_get_next(), and
* mobject_store_omap_get_end(). * mobject_store_omap_get_end().
*/ */
typedef void *mobject_store_omap_iter_t; typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t;
/** /**
* @typedef mobject_store_write_op_t * @typedef mobject_store_write_op_t
...@@ -294,9 +297,9 @@ void mobject_store_write_op_write_full(mobject_store_write_op_t write_op, ...@@ -294,9 +297,9 @@ void mobject_store_write_op_write_full(mobject_store_write_op_t write_op,
*/ */
void mobject_store_write_op_writesame(mobject_store_write_op_t write_op, void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
const char *buffer, const char *buffer,
size_t data_len, size_t data_len,
size_t write_len, size_t write_len,
uint64_t offset); uint64_t offset);
/** /**
* Append to end of object. * Append to end of object.
...@@ -305,8 +308,8 @@ void mobject_store_write_op_writesame(mobject_store_write_op_t write_op, ...@@ -305,8 +308,8 @@ void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
* @param len length of buffer * @param len length of buffer
*/ */
void mobject_store_write_op_append(mobject_store_write_op_t write_op, void mobject_store_write_op_append(mobject_store_write_op_t write_op,
const char *buffer, const char *buffer,
size_t len); size_t len);
/** /**
* Remove object * Remove object
* @param write_op operation to add this action to * @param write_op operation to add this action to
...@@ -450,8 +453,8 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op, ...@@ -450,8 +453,8 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
*/ */
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op, void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
const char *start_after, const char *start_after,
uint64_t max_return, uint64_t max_return,
mobject_store_omap_iter_t *iter, mobject_store_omap_iter_t *iter,
int *prval); int *prval);
/** /**
......
...@@ -38,3 +38,4 @@ src_mobject_server_daemon_LDADD = \ ...@@ -38,3 +38,4 @@ src_mobject_server_daemon_LDADD = \
bin_PROGRAMS += \ bin_PROGRAMS += \
src/mobject-server-daemon src/mobject-server-daemon
...@@ -11,8 +11,8 @@ ...@@ -11,8 +11,8 @@
/** /**
* This file contains a set of structures meant * This file contains a set of structures meant
* to store arguments for write_op operations in * to store arguments for read_op operations in
* a buffer. Some of these structures are self-sufficiant. * a buffer. Some of these structures are self-sufficient.
* Some other are meant to be used as headers, and the * Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional * the serialized buffer actually contains additional
* data after them. * data after them.
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
/** /**
* This file contains a set of structures meant * This file contains a set of structures meant
* to store arguments for write_op operations in * to store arguments for write_op operations in
* a buffer. Some of these structures are self-sufficiant. * a buffer. Some of these structures are self-sufficient.
* Some other are meant to be used as headers, and the * Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional * the serialized buffer actually contains additional
* data after them. * data after them.
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#ifndef __MOBJECT_BUFFER_UNION_H #ifndef __MOBJECT_BUFFER_UNION_H
#define __MOBJECT_BUFFER_UNION_H #define __MOBJECT_BUFFER_UNION_H
#include <stdint.h>
/** /**
* This union is defined to be used in read and write actions * This union is defined to be used in read and write actions
* involving either a local pointer (const char*) or an offset * involving either a local pointer (const char*) or an offset
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#ifndef __MOBJECT_LOG_H #ifndef __MOBJECT_LOG_H
#define __MOBJECT_LOG_H #define __MOBJECT_LOG_H
#include <stdio.h>
#define STRINGIZE(x) STRINGIZE2(x) #define STRINGIZE(x) STRINGIZE2(x)
#define STRINGIZE2(x) #x #define STRINGIZE2(x) #x
#define LINE_STRING STRINGIZE(__LINE__) #define LINE_STRING STRINGIZE(__LINE__)
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include <string.h>
#include "utlist.h"
#include "libmobject-store.h"
#include "omap-iter-impl.h"
#include "log.h"
void omap_iter_create(mobject_store_omap_iter_t* iter)
{
*iter = (mobject_store_omap_iter_t)calloc(1, sizeof(**iter));
(*iter)->ref_count = 1;
}
void omap_iter_free(mobject_store_omap_iter_t iter)
{
if(!iter) return;
iter->ref_count -= 1;
if(iter->ref_count > 0) return;
omap_iter_node_t elt, tmp;
DL_FOREACH_SAFE(iter->head, elt, tmp) {
DL_DELETE(iter->head, elt);
free((void*)(elt->key));
free((void*)(elt->value));
free((void*)(elt));
}
free(iter);
}
void omap_iter_incr_ref(mobject_store_omap_iter_t iter)
{
if(!iter) return;
iter->ref_count += 1;
}
void omap_iter_append(mobject_store_omap_iter_t iter,
const char* key, const char* val,
size_t val_size)
{
MOBJECT_ASSERT(iter, "trying to append to a NULL iterator");
omap_iter_node_t item = (omap_iter_node_t)calloc(1, sizeof(*item));
item->key = strdup(key);
item->key_size = strlen(key)+1;
item->value_size = val_size;
item->value = (char*)malloc(val_size);
memcpy(item->value, val, val_size);
DL_APPEND(iter->head, item);
if(iter->current == NULL)
iter->current = iter->head;
iter->num_items += 1;
}
int mobject_store_omap_get_next(mobject_store_omap_iter_t iter,
char **key,
char **val,
size_t *len)
{
if(iter->current == NULL) return -1;
*key = iter->current->key;
*val = iter->current->value;
*len = iter->current->value_size;
iter->current = iter->current->next;
return 0;
}
void mobject_store_omap_get_end(mobject_store_omap_iter_t iter)
{
omap_iter_free(iter);
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_OMAP_ITER_H
#define __MOBJECT_OMAP_ITER_H
#include "libmobject-store.h"
typedef struct omap_iter_node* omap_iter_node_t;
struct omap_iter_node {
char* key;
char* value;
size_t key_size;
size_t value_size;
omap_iter_node_t prev, next;
};
struct mobject_store_omap_iter {
size_t num_items;
size_t ref_count;
omap_iter_node_t head;
omap_iter_node_t current;
};
void omap_iter_create(mobject_store_omap_iter_t* iter);
void omap_iter_incr_ref(mobject_store_omap_iter_t iter);
void omap_iter_free(mobject_store_omap_iter_t iter);
void omap_iter_append(mobject_store_omap_iter_t iter,
const char* key, const char* val,
size_t val_size);
#endif
...@@ -16,7 +16,10 @@ static void prepare_read(uint64_t* cur_offset, ...@@ -16,7 +16,10 @@ static void prepare_read(uint64_t* cur_offset,
void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op) void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op)
{ {
if(read_op->ready == 1) return; if(read_op->ready == 1) return;
if(read_op->num_actions == 0) return; if(read_op->num_actions == 0) {
read_op->ready = 1;
return;
}
rd_action_base_t action; rd_action_base_t action;
......
...@@ -31,7 +31,10 @@ static void convert_append(uint64_t* cur_offset, ...@@ -31,7 +31,10 @@ static void convert_append(uint64_t* cur_offset,
void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op) void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op)
{ {
if(write_op->ready == 1) return; if(write_op->ready == 1) return;
if(write_op->num_actions == 0) return; if(write_op->num_actions == 0) {
write_op->ready = 1;
return;
}
wr_action_base_t action; wr_action_base_t action;
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <margo.h>
#include "utlist.h"
#include "proc-omap-iter.h"
hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap_iter_t* iter)
{
omap_iter_node_t node;
hg_return_t ret = HG_SUCCESS;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_hg_size_t(proc, &((*iter)->num_items));
if(ret != HG_SUCCESS) return ret;
DL_FOREACH((*iter)->head, node) {
ret = hg_proc_hg_size_t(proc, &(node->key_size));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(node->key), node->key_size);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_size_t(proc, &(node->value_size));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, node->value, node->value_size);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
omap_iter_create(iter);
ret = hg_proc_hg_size_t(proc, &((*iter)->num_items));
if(ret != HG_SUCCESS) return ret;
unsigned i;
size_t key_size, val_size;
char* key;
char* val;
for(i = 0; i < (*iter)->num_items; i++) {
ret = hg_proc_hg_size_t(proc, &key_size);
key = (char*)malloc(key_size);
ret = hg_proc_memcpy(proc, key, key_size);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_size_t(proc, &val_size);
if(ret != HG_SUCCESS) return ret;
val - (char*)malloc(val_size);
ret = hg_proc_memcpy(proc, val, val_size);
omap_iter_append(*iter, key, val, val_size);
}
break;
case HG_FREE:
omap_iter_free(*iter);
break;
}
return HG_SUCCESS;
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_PROC_OMAP_ITER_H
#define __MOBJECT_PROC_OMAP_ITER_H
#include "omap-iter-impl.h"
hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap_iter_t* iter);
#endif
...@@ -113,7 +113,7 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o ...@@ -113,7 +113,7 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o
// for each action ... // for each action ...
DL_FOREACH((*read_op)->actions,elem) { DL_FOREACH((*read_op)->actions,elem) {
read_op_code_t opcode = elem->type; read_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_), MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_),
"Invalid read_op opcode"); "Invalid read_op opcode");
// encode the type of action // encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
...@@ -143,11 +143,12 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o ...@@ -143,11 +143,12 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o
read_op_code_t opcode; read_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret; if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_), MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_),
"Invalid write_op opcode"); "Invalid write_op opcode");
// decode the action's arguments // decode the action's arguments
ret = decode_read_action[opcode](proc, &position, &next_action); ret = decode_read_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret; if(ret != HG_SUCCESS) return ret;
next_action->type = opcode;
// append to the list // append to the list
DL_APPEND((*read_op)->actions, next_action); DL_APPEND((*read_op)->actions, next_action);
} }
...@@ -189,7 +190,7 @@ static hg_return_t encode_read_action_read(hg_proc_t proc, ...@@ -189,7 +190,7 @@ static hg_return_t encode_read_action_read(hg_proc_t proc,
args_rd_action_read a; args_rd_action_read a;
a.offset = action->offset; a.offset = action->offset;
a.len = action->len; a.len = action->len;
a.bulk_offset = action->buffer.as_offset; a.bulk_offset = action->buffer.as_offset;
*pos += a.len; *pos += a.len;
return hg_proc_memcpy(proc, &a, sizeof(a)); return hg_proc_memcpy(proc, &a, sizeof(a));
} }
...@@ -276,10 +277,10 @@ static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc, ...@@ -276,10 +277,10 @@ static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
(*action)->max_return = a.max_return; (*action)->max_return = a.max_return;
(*action)->data_size = a.data_size; (*action)->data_size = a.data_size;
(*action)->start_after = (*action)->data; (*action)->start_after = (*action)->data;
size_t s = strlen((*action)->start_after);
(*action)->filter_prefix = (*action)->data + s + 1;
ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size); ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
size_t s = strlen((*action)->start_after);
(*action)->filter_prefix = (*action)->data + s + 1;
return ret; return ret;
} }
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <mercury_proc.h>
#include "proc-read-responses.h"
#include "read-responses.h"
#include "read-resp-impl.h"
#include "proc-omap-iter.h"
#include "utlist.h"
#include "log.h"
typedef hg_return_t (*encode_fn)(hg_proc_t, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, void*);
static hg_return_t encode_stat_response(hg_proc_t proc, rd_response_stat_t r);
static hg_return_t decode_stat_response(hg_proc_t proc, rd_response_stat_t* r);
static hg_return_t encode_read_response(hg_proc_t proc, rd_response_read_t r);
static hg_return_t decode_read_response(hg_proc_t proc, rd_response_read_t* r);
static hg_return_t encode_omap_response(hg_proc_t proc, rd_response_omap_t r);
static hg_return_t decode_omap_response(hg_proc_t proc, rd_response_omap_t* r);
static encode_fn encode[] = {
NULL,
(encode_fn)encode_stat_response,
(encode_fn)encode_read_response,
(encode_fn)encode_omap_response
};
static decode_fn decode[] = {
NULL,
(decode_fn)decode_stat_response,
(decode_fn)decode_read_response,
(decode_fn)decode_omap_response
};
hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response)
{
hg_return_t ret = HG_SUCCESS;
rd_response_base_t elem;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
// encode the number of actions
ret = hg_proc_memcpy(proc, &((*response)->num_responses),
sizeof((*response)->num_responses));
if(ret != HG_SUCCESS) return ret;
DL_FOREACH((*response)->responses,elem) {
read_resp_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode > 0 || opcode < _READ_RESPCODE_END_ENUM_),
"Invalid response code");
// encode the type of response
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
// encode the response's arguments
ret = encode[opcode](proc, elem);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
// allocate the response chain
*response = (read_response_t)calloc(1, sizeof(**response));
// decode the number of responses in the chain
ret = hg_proc_memcpy(proc, &((*response)->num_responses), sizeof((*response)->num_responses));
if(ret != HG_SUCCESS) return ret;
size_t i;
rd_response_base_t next_resp;
for(i = 0; i < (*response)->num_responses; i++) {
// decode the current response's type
read_resp_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode > 0 || opcode < _READ_RESPCODE_END_ENUM_),
"Invalid write_op opcode");
// decode the response's arguments
ret = decode[opcode](proc, &next_resp);
if(ret != HG_SUCCESS) return ret;
next_resp->type = opcode;
// append to the list
DL_APPEND((*response)->responses, next_resp);
}
break;
case HG_FREE:
free_read_responses(*response);
break;
}
return ret;
}
hg_return_t encode_stat_response(hg_proc_t proc, rd_response_stat_t r)
{
hg_return_t ret;
ret = hg_proc_uint64_t(proc, &(r->psize));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(r->pmtime), sizeof(r->pmtime));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval));
return ret;
}
hg_return_t decode_stat_response(hg_proc_t proc, rd_response_stat_t* r)
{
*r = (rd_response_stat_t)calloc(1, sizeof(**r));
hg_return_t ret;
ret = hg_proc_uint64_t(proc, &((*r)->psize));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &((*r)->pmtime), sizeof((*r)->pmtime));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval));
return ret;
}
hg_return_t encode_read_response(hg_proc_t proc, rd_response_read_t r)
{
hg_return_t ret;
ret = hg_proc_hg_size_t(proc, &(r->bytes_read));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval));
return ret;
}
hg_return_t decode_read_response(hg_proc_t proc, rd_response_read_t* r)
{
*r = (rd_response_read_t)calloc(1, sizeof(**r));
hg_return_t ret;
ret = hg_proc_hg_size_t(proc, &((*r)->bytes_read));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval));
return ret;
}
hg_return_t encode_omap_response(hg_proc_t proc, rd_response_omap_t r)
{
hg_return_t ret;
ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_mobject_store_omap_iter_t(proc, &(r->iter));
if(ret != HG_SUCCESS) return ret;
}
hg_return_t decode_omap_response(hg_proc_t proc, rd_response_omap_t* r)
{
*r = (rd_response_omap_t)calloc(1, sizeof(**r));
hg_return_t ret;
ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_mobject_store_omap_iter_t(proc, &((*r)->iter));
if(ret != HG_SUCCESS) return ret;
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_PROC_READ_RESPONSE_H
#define __MOBJECT_PROC_READ_RESPONSE_H
#include <margo.h>
#include "read-resp-impl.h"
hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response);
#endif
...@@ -163,7 +163,7 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write ...@@ -163,7 +163,7 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write
// for each action ... // for each action ...
DL_FOREACH((*write_op)->actions,elem) { DL_FOREACH((*write_op)->actions,elem) {
write_op_code_t opcode = elem->type; write_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _WRITE_OPCODE_END_ENUM_), MOBJECT_ASSERT((opcode > 0 && opcode < _WRITE_OPCODE_END_ENUM_),
"Invalid write_op opcode"); "Invalid write_op opcode");
// encode the type of action // encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
...@@ -194,11 +194,12 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write ...@@ -194,11 +194,12 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write
write_op_code_t opcode; write_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret; if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _WRITE_OPCODE_END_ENUM_), MOBJECT_ASSERT((opcode > 0 && opcode < _WRITE_OPCODE_END_ENUM_),
"Invalid write_op opcode"); "Invalid write_op opcode");
// decode the action's arguments // decode the action's arguments
ret = decode_write_action[opcode](proc, &position, &next_action); ret = decode_write_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret; if(ret != HG_SUCCESS) return ret;
next_action->type = opcode;
// append to the list // append to the list
DL_APPEND((*write_op)->actions, next_action); DL_APPEND((*write_op)->actions, next_action);
} }
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#define __MOBJECT_READ_OPCODES_H #define __MOBJECT_READ_OPCODES_H
#include "mobject-store-config.h" #include "mobject-store-config.h"
#include "libmobject-store.h"
#include "buffer-union.h" #include "buffer-union.h"
typedef enum { typedef enum {
...@@ -65,9 +66,9 @@ typedef struct rd_action_OMAP_GET_VALS { ...@@ -65,9 +66,9 @@ typedef struct rd_action_OMAP_GET_VALS {
struct rd_action_BASE base; struct rd_action_BASE base;
const char* start_after; const char* start_after;
const char* filter_prefix; const char* filter_prefix;
uint64_t max_return; uint64_t max_return;
mobject_store_omap_iter_t* iter; mobject_store_omap_iter_t* iter;
int* prval; int* prval;
size_t data_size; size_t data_size;
char data[1]; char data[1];
}* rd_action_omap_get_vals_t; }* rd_action_omap_get_vals_t;
......
...@@ -8,13 +8,13 @@ ...@@ -8,13 +8,13 @@
#include "read-op-impl.h" #include "read-op-impl.h"
#include "utlist.h" #include "utlist.h"
static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a); static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a, void* uargs);
static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a); static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a, void* uargs);
static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a); static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a, void* uargs);
static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a); static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a, void* uargs);
static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a); static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a, void* uargs);
typedef void (*dispatch_fn)(read_op_visitor_t, rd_action_base_t); typedef void (*dispatch_fn)(read_op_visitor_t, rd_action_base_t, void*);