diff --git a/Makefile.am b/Makefile.am index 7a43138ea44ec6a8c8889b18364df7728012005b..fbcbc6b2ec20ffccd63a24e78b2c696e6bda9793 100644 --- a/Makefile.am +++ b/Makefile.am @@ -7,7 +7,7 @@ TESTS = EXTRA_DIST = prepare.sh AM_CPPFLAGS = -I$(top_srcdir)/include -AM_CFLAGS = +AM_CFLAGS = AM_CXXFLAGS = $(AM_CFLAGS) lib_LTLIBRARIES = \ @@ -26,3 +26,4 @@ include Make.rules include $(top_srcdir)/src/Makefile.subdir include $(top_srcdir)/tests/Makefile.subdir +include $(top_srcdir)/tests/io-chain/Makefile.subdir diff --git a/include/libmobject-store.h b/include/libmobject-store.h index cf1366a7a293a8b67335419fd54681211b1ac2aa..0bf317c1d9af8a970e0e3dd9392dc26f8828938d 100644 --- a/include/libmobject-store.h +++ b/include/libmobject-store.h @@ -79,6 +79,9 @@ enum { }; /** @} */ +#define LIBMOBJECT_CREATE_EXCLUSIVE 1 +#define LIBMOBJECT_CREATE_IDEMPOTENT 0 + /** * @typedef mobject_store_ioctx_t * @@ -108,7 +111,7 @@ typedef void *mobject_store_ioctx_t; * mobject_store_read_op_omap_get_vals_by_keys(), mobject_store_omap_get_next(), and * mobject_store_omap_get_end(). */ -typedef void *mobject_store_omap_iter_t; +typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t; /** * @typedef mobject_store_write_op_t @@ -294,9 +297,9 @@ void mobject_store_write_op_write_full(mobject_store_write_op_t write_op, */ void mobject_store_write_op_writesame(mobject_store_write_op_t write_op, const char *buffer, - size_t data_len, - size_t write_len, - uint64_t offset); + size_t data_len, + size_t write_len, + uint64_t offset); /** * Append to end of object. @@ -305,8 +308,8 @@ void mobject_store_write_op_writesame(mobject_store_write_op_t write_op, * @param len length of buffer */ void mobject_store_write_op_append(mobject_store_write_op_t write_op, - const char *buffer, - size_t len); + const char *buffer, + size_t len); /** * Remove object * @param write_op operation to add this action to @@ -450,8 +453,8 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op, */ void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op, const char *start_after, - uint64_t max_return, - mobject_store_omap_iter_t *iter, + uint64_t max_return, + mobject_store_omap_iter_t *iter, int *prval); /** diff --git a/src/Makefile.subdir b/src/Makefile.subdir index f9f459e222564164044c574d5ba327e33cf501f0..81244c7694ef3e5faa3b42182ae60b99b85e3bb3 100644 --- a/src/Makefile.subdir +++ b/src/Makefile.subdir @@ -38,3 +38,4 @@ src_mobject_server_daemon_LDADD = \ bin_PROGRAMS += \ src/mobject-server-daemon + diff --git a/src/args-read-actions.h b/src/args-read-actions.h index ce5b2757aa9f384360c5e177da44c36f024844ef..ce2d6aef37be0a7a13dd13d6040c78fef3e5e478 100644 --- a/src/args-read-actions.h +++ b/src/args-read-actions.h @@ -11,8 +11,8 @@ /** * This file contains a set of structures meant - * to store arguments for write_op operations in - * a buffer. Some of these structures are self-sufficiant. + * to store arguments for read_op operations in + * a buffer. Some of these structures are self-sufficient. * Some other are meant to be used as headers, and the * the serialized buffer actually contains additional * data after them. diff --git a/src/args-write-actions.h b/src/args-write-actions.h index ef2fe0a89ba0b6748778c376fb065490e885c3ac..e21dc8ed82eff5b022998a347b3ae72902bc3334 100644 --- a/src/args-write-actions.h +++ b/src/args-write-actions.h @@ -12,7 +12,7 @@ /** * This file contains a set of structures meant * to store arguments for write_op operations in - * a buffer. Some of these structures are self-sufficiant. + * a buffer. Some of these structures are self-sufficient. * Some other are meant to be used as headers, and the * the serialized buffer actually contains additional * data after them. diff --git a/src/buffer-union.h b/src/buffer-union.h index a0faf553b177f80c591c1e41dfbe7e6594813cf2..9c8d05213b8148e7905169a128e4dc1b57627ccf 100644 --- a/src/buffer-union.h +++ b/src/buffer-union.h @@ -6,6 +6,7 @@ #ifndef __MOBJECT_BUFFER_UNION_H #define __MOBJECT_BUFFER_UNION_H +#include /** * This union is defined to be used in read and write actions * involving either a local pointer (const char*) or an offset diff --git a/src/log.h b/src/log.h index be8a693a248f71c22c9530ab2d0ee00c141266bb..1140f067d3ee09ecb02be40bc76f937668dc05a5 100644 --- a/src/log.h +++ b/src/log.h @@ -6,6 +6,8 @@ #ifndef __MOBJECT_LOG_H #define __MOBJECT_LOG_H +#include + #define STRINGIZE(x) STRINGIZE2(x) #define STRINGIZE2(x) #x #define LINE_STRING STRINGIZE(__LINE__) diff --git a/src/omap-iter-impl.c b/src/omap-iter-impl.c new file mode 100644 index 0000000000000000000000000000000000000000..f06e4d630c9ec40592c576a09d745e0b18a3b9a3 --- /dev/null +++ b/src/omap-iter-impl.c @@ -0,0 +1,83 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include +#include +#include "utlist.h" +#include "libmobject-store.h" +#include "omap-iter-impl.h" +#include "log.h" + +void omap_iter_create(mobject_store_omap_iter_t* iter) +{ + *iter = (mobject_store_omap_iter_t)calloc(1, sizeof(**iter)); + (*iter)->ref_count = 1; +} + +void omap_iter_free(mobject_store_omap_iter_t iter) +{ + if(!iter) return; + iter->ref_count -= 1; + if(iter->ref_count > 0) return; + + omap_iter_node_t elt, tmp; + + DL_FOREACH_SAFE(iter->head, elt, tmp) { + DL_DELETE(iter->head, elt); + free((void*)(elt->key)); + free((void*)(elt->value)); + free((void*)(elt)); + } + + free(iter); +} + +void omap_iter_incr_ref(mobject_store_omap_iter_t iter) +{ + if(!iter) return; + iter->ref_count += 1; +} + +void omap_iter_append(mobject_store_omap_iter_t iter, + const char* key, const char* val, + size_t val_size) +{ + MOBJECT_ASSERT(iter, "trying to append to a NULL iterator"); + + omap_iter_node_t item = (omap_iter_node_t)calloc(1, sizeof(*item)); + item->key = strdup(key); + item->key_size = strlen(key)+1; + item->value_size = val_size; + item->value = (char*)malloc(val_size); + memcpy(item->value, val, val_size); + + DL_APPEND(iter->head, item); + + if(iter->current == NULL) + iter->current = iter->head; + + iter->num_items += 1; +} + +int mobject_store_omap_get_next(mobject_store_omap_iter_t iter, + char **key, + char **val, + size_t *len) +{ + if(iter->current == NULL) return -1; + + *key = iter->current->key; + *val = iter->current->value; + *len = iter->current->value_size; + + iter->current = iter->current->next; + + return 0; +} + +void mobject_store_omap_get_end(mobject_store_omap_iter_t iter) +{ + omap_iter_free(iter); +} diff --git a/src/omap-iter-impl.h b/src/omap-iter-impl.h new file mode 100644 index 0000000000000000000000000000000000000000..96f079f3299418000045c8ffee695074fc4a1569 --- /dev/null +++ b/src/omap-iter-impl.h @@ -0,0 +1,38 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __MOBJECT_OMAP_ITER_H +#define __MOBJECT_OMAP_ITER_H + +#include "libmobject-store.h" + +typedef struct omap_iter_node* omap_iter_node_t; + +struct omap_iter_node { + char* key; + char* value; + size_t key_size; + size_t value_size; + omap_iter_node_t prev, next; +}; + +struct mobject_store_omap_iter { + size_t num_items; + size_t ref_count; + omap_iter_node_t head; + omap_iter_node_t current; +}; + +void omap_iter_create(mobject_store_omap_iter_t* iter); + +void omap_iter_incr_ref(mobject_store_omap_iter_t iter); + +void omap_iter_free(mobject_store_omap_iter_t iter); + +void omap_iter_append(mobject_store_omap_iter_t iter, + const char* key, const char* val, + size_t val_size); + +#endif diff --git a/src/prepare-read-op.c b/src/prepare-read-op.c index f07070a8ade865b98315e3bba10e6e4d63583f6d..8ec293900517617808e306f7127740d4b9f13910 100644 --- a/src/prepare-read-op.c +++ b/src/prepare-read-op.c @@ -16,7 +16,10 @@ static void prepare_read(uint64_t* cur_offset, void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op) { if(read_op->ready == 1) return; - if(read_op->num_actions == 0) return; + if(read_op->num_actions == 0) { + read_op->ready = 1; + return; + } rd_action_base_t action; diff --git a/src/prepare-write-op.c b/src/prepare-write-op.c index fc618b155f4fdb81d029987e6f7dd810b8da5cad..66b1b6f4ba6a1f96141a2163f3f40667f3f55b6a 100644 --- a/src/prepare-write-op.c +++ b/src/prepare-write-op.c @@ -31,7 +31,10 @@ static void convert_append(uint64_t* cur_offset, void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op) { if(write_op->ready == 1) return; - if(write_op->num_actions == 0) return; + if(write_op->num_actions == 0) { + write_op->ready = 1; + return; + } wr_action_base_t action; diff --git a/src/proc-omap-iter.c b/src/proc-omap-iter.c new file mode 100644 index 0000000000000000000000000000000000000000..844c221b3c775c5c2d0c5e32b0fa2ca7a93c6637 --- /dev/null +++ b/src/proc-omap-iter.c @@ -0,0 +1,66 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include +#include "utlist.h" +#include "proc-omap-iter.h" + +hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap_iter_t* iter) +{ + omap_iter_node_t node; + hg_return_t ret = HG_SUCCESS; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + + ret = hg_proc_hg_size_t(proc, &((*iter)->num_items)); + if(ret != HG_SUCCESS) return ret; + + DL_FOREACH((*iter)->head, node) { + ret = hg_proc_hg_size_t(proc, &(node->key_size)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &(node->key), node->key_size); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_hg_size_t(proc, &(node->value_size)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, node->value, node->value_size); + if(ret != HG_SUCCESS) return ret; + } + + break; + + case HG_DECODE: + + omap_iter_create(iter); + ret = hg_proc_hg_size_t(proc, &((*iter)->num_items)); + if(ret != HG_SUCCESS) return ret; + + unsigned i; + size_t key_size, val_size; + char* key; + char* val; + for(i = 0; i < (*iter)->num_items; i++) { + ret = hg_proc_hg_size_t(proc, &key_size); + key = (char*)malloc(key_size); + ret = hg_proc_memcpy(proc, key, key_size); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_hg_size_t(proc, &val_size); + if(ret != HG_SUCCESS) return ret; + val - (char*)malloc(val_size); + ret = hg_proc_memcpy(proc, val, val_size); + omap_iter_append(*iter, key, val, val_size); + } + + break; + + case HG_FREE: + + omap_iter_free(*iter); + + break; + } + return HG_SUCCESS; +} diff --git a/src/proc-omap-iter.h b/src/proc-omap-iter.h new file mode 100644 index 0000000000000000000000000000000000000000..caf8a8d8e60a5f9778c61b8e43d8337cc559b267 --- /dev/null +++ b/src/proc-omap-iter.h @@ -0,0 +1,13 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __MOBJECT_PROC_OMAP_ITER_H +#define __MOBJECT_PROC_OMAP_ITER_H + +#include "omap-iter-impl.h" + +hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap_iter_t* iter); + +#endif diff --git a/src/proc-read-actions.c b/src/proc-read-actions.c index 96e25e6e1d4c66e722a0787c378125b6e8bf9d74..0e7f3d83d5f3ec79a9819aa9ba291c8f0d959206 100644 --- a/src/proc-read-actions.c +++ b/src/proc-read-actions.c @@ -113,7 +113,7 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o // for each action ... DL_FOREACH((*read_op)->actions,elem) { read_op_code_t opcode = elem->type; - MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_), + MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_), "Invalid read_op opcode"); // encode the type of action ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); @@ -143,11 +143,12 @@ hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_o read_op_code_t opcode; ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); if(ret != HG_SUCCESS) return ret; - MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_), + MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_), "Invalid write_op opcode"); // decode the action's arguments ret = decode_read_action[opcode](proc, &position, &next_action); if(ret != HG_SUCCESS) return ret; + next_action->type = opcode; // append to the list DL_APPEND((*read_op)->actions, next_action); } @@ -189,7 +190,7 @@ static hg_return_t encode_read_action_read(hg_proc_t proc, args_rd_action_read a; a.offset = action->offset; a.len = action->len; - a.bulk_offset = action->buffer.as_offset; + a.bulk_offset = action->buffer.as_offset; *pos += a.len; return hg_proc_memcpy(proc, &a, sizeof(a)); } @@ -276,10 +277,10 @@ static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc, (*action)->max_return = a.max_return; (*action)->data_size = a.data_size; (*action)->start_after = (*action)->data; - size_t s = strlen((*action)->start_after); - (*action)->filter_prefix = (*action)->data + s + 1; ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size); + size_t s = strlen((*action)->start_after); + (*action)->filter_prefix = (*action)->data + s + 1; return ret; } diff --git a/src/proc-read-responses.c b/src/proc-read-responses.c new file mode 100644 index 0000000000000000000000000000000000000000..381a7fc3d36a46fae19bc0436e8f91a6849155d5 --- /dev/null +++ b/src/proc-read-responses.c @@ -0,0 +1,161 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include +#include "proc-read-responses.h" +#include "read-responses.h" +#include "read-resp-impl.h" +#include "proc-omap-iter.h" +#include "utlist.h" +#include "log.h" + +typedef hg_return_t (*encode_fn)(hg_proc_t, void*); +typedef hg_return_t (*decode_fn)(hg_proc_t, void*); + +static hg_return_t encode_stat_response(hg_proc_t proc, rd_response_stat_t r); +static hg_return_t decode_stat_response(hg_proc_t proc, rd_response_stat_t* r); +static hg_return_t encode_read_response(hg_proc_t proc, rd_response_read_t r); +static hg_return_t decode_read_response(hg_proc_t proc, rd_response_read_t* r); +static hg_return_t encode_omap_response(hg_proc_t proc, rd_response_omap_t r); +static hg_return_t decode_omap_response(hg_proc_t proc, rd_response_omap_t* r); + +static encode_fn encode[] = { + NULL, + (encode_fn)encode_stat_response, + (encode_fn)encode_read_response, + (encode_fn)encode_omap_response +}; + +static decode_fn decode[] = { + NULL, + (decode_fn)decode_stat_response, + (decode_fn)decode_read_response, + (decode_fn)decode_omap_response +}; + +hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response) +{ + hg_return_t ret = HG_SUCCESS; + rd_response_base_t elem; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // encode the number of actions + ret = hg_proc_memcpy(proc, &((*response)->num_responses), + sizeof((*response)->num_responses)); + if(ret != HG_SUCCESS) return ret; + + DL_FOREACH((*response)->responses,elem) { + read_resp_code_t opcode = elem->type; + MOBJECT_ASSERT((opcode > 0 || opcode < _READ_RESPCODE_END_ENUM_), + "Invalid response code"); + // encode the type of response + ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); + if(ret != HG_SUCCESS) return ret; + // encode the response's arguments + ret = encode[opcode](proc, elem); + if(ret != HG_SUCCESS) return ret; + } + + break; + case HG_DECODE: + // allocate the response chain + *response = (read_response_t)calloc(1, sizeof(**response)); + // decode the number of responses in the chain + ret = hg_proc_memcpy(proc, &((*response)->num_responses), sizeof((*response)->num_responses)); + if(ret != HG_SUCCESS) return ret; + + size_t i; + rd_response_base_t next_resp; + for(i = 0; i < (*response)->num_responses; i++) { + // decode the current response's type + read_resp_code_t opcode; + ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); + if(ret != HG_SUCCESS) return ret; + MOBJECT_ASSERT((opcode > 0 || opcode < _READ_RESPCODE_END_ENUM_), + "Invalid write_op opcode"); + // decode the response's arguments + ret = decode[opcode](proc, &next_resp); + if(ret != HG_SUCCESS) return ret; + next_resp->type = opcode; + // append to the list + DL_APPEND((*response)->responses, next_resp); + } + + break; + case HG_FREE: + + free_read_responses(*response); + break; + } + + return ret; +} + +hg_return_t encode_stat_response(hg_proc_t proc, rd_response_stat_t r) +{ + hg_return_t ret; + ret = hg_proc_uint64_t(proc, &(r->psize)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &(r->pmtime), sizeof(r->pmtime)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval)); + return ret; +} + +hg_return_t decode_stat_response(hg_proc_t proc, rd_response_stat_t* r) +{ + *r = (rd_response_stat_t)calloc(1, sizeof(**r)); + + hg_return_t ret; + ret = hg_proc_uint64_t(proc, &((*r)->psize)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &((*r)->pmtime), sizeof((*r)->pmtime)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval)); + return ret; +} + +hg_return_t encode_read_response(hg_proc_t proc, rd_response_read_t r) +{ + hg_return_t ret; + ret = hg_proc_hg_size_t(proc, &(r->bytes_read)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval)); + return ret; +} + +hg_return_t decode_read_response(hg_proc_t proc, rd_response_read_t* r) +{ + *r = (rd_response_read_t)calloc(1, sizeof(**r)); + + hg_return_t ret; + ret = hg_proc_hg_size_t(proc, &((*r)->bytes_read)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval)); + return ret; + +} + +hg_return_t encode_omap_response(hg_proc_t proc, rd_response_omap_t r) +{ + hg_return_t ret; + ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_mobject_store_omap_iter_t(proc, &(r->iter)); + if(ret != HG_SUCCESS) return ret; +} + +hg_return_t decode_omap_response(hg_proc_t proc, rd_response_omap_t* r) +{ + *r = (rd_response_omap_t)calloc(1, sizeof(**r)); + + hg_return_t ret; + ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval)); + if(ret != HG_SUCCESS) return ret; + ret = hg_proc_mobject_store_omap_iter_t(proc, &((*r)->iter)); + if(ret != HG_SUCCESS) return ret; +} diff --git a/src/proc-read-responses.h b/src/proc-read-responses.h new file mode 100644 index 0000000000000000000000000000000000000000..df4a454c22ff9da3b3f229118c92dee64a463c9f --- /dev/null +++ b/src/proc-read-responses.h @@ -0,0 +1,14 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __MOBJECT_PROC_READ_RESPONSE_H +#define __MOBJECT_PROC_READ_RESPONSE_H + +#include +#include "read-resp-impl.h" + +hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response); + +#endif diff --git a/src/proc-write-actions.c b/src/proc-write-actions.c index 8d0b9ca768d5e49411c9fe61e7dc9135ddc14121..79e8f0302a120b0305f8622c2047f76d4a0c0d36 100644 --- a/src/proc-write-actions.c +++ b/src/proc-write-actions.c @@ -163,7 +163,7 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write // for each action ... DL_FOREACH((*write_op)->actions,elem) { write_op_code_t opcode = elem->type; - MOBJECT_ASSERT((opcode <= 0 || opcode >= _WRITE_OPCODE_END_ENUM_), + MOBJECT_ASSERT((opcode > 0 && opcode < _WRITE_OPCODE_END_ENUM_), "Invalid write_op opcode"); // encode the type of action ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); @@ -194,11 +194,12 @@ hg_return_t hg_proc_mobject_store_write_op_t(hg_proc_t proc, mobject_store_write write_op_code_t opcode; ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode)); if(ret != HG_SUCCESS) return ret; - MOBJECT_ASSERT((opcode <= 0 || opcode >= _WRITE_OPCODE_END_ENUM_), + MOBJECT_ASSERT((opcode > 0 && opcode < _WRITE_OPCODE_END_ENUM_), "Invalid write_op opcode"); // decode the action's arguments ret = decode_write_action[opcode](proc, &position, &next_action); if(ret != HG_SUCCESS) return ret; + next_action->type = opcode; // append to the list DL_APPEND((*write_op)->actions, next_action); } diff --git a/src/read-actions.h b/src/read-actions.h index 3c27e0bad2a080cc3de2d9edf098ab1d998aa3cf..62b065a8402c9d75341875c5d19e368cc61562ba 100644 --- a/src/read-actions.h +++ b/src/read-actions.h @@ -7,6 +7,7 @@ #define __MOBJECT_READ_OPCODES_H #include "mobject-store-config.h" +#include "libmobject-store.h" #include "buffer-union.h" typedef enum { @@ -65,9 +66,9 @@ typedef struct rd_action_OMAP_GET_VALS { struct rd_action_BASE base; const char* start_after; const char* filter_prefix; - uint64_t max_return; - mobject_store_omap_iter_t* iter; - int* prval; + uint64_t max_return; + mobject_store_omap_iter_t* iter; + int* prval; size_t data_size; char data[1]; }* rd_action_omap_get_vals_t; diff --git a/src/read-op-visitor.c b/src/read-op-visitor.c index bb3506a2f31859a2226e78030d4a579ecf7ada64..47bdfc9b9293f5c56eab9b5f784fefeb5c9c63fe 100644 --- a/src/read-op-visitor.c +++ b/src/read-op-visitor.c @@ -8,13 +8,13 @@ #include "read-op-impl.h" #include "utlist.h" -static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a); -static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a); -static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a); -static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a); -static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a); +static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a, void* uargs); +static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a, void* uargs); +static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a, void* uargs); +static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a, void* uargs); +static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a, void* uargs); -typedef void (*dispatch_fn)(read_op_visitor_t, rd_action_base_t); +typedef void (*dispatch_fn)(read_op_visitor_t, rd_action_base_t, void*); static dispatch_fn visitor_dispatch[_READ_OPCODE_END_ENUM_] = { NULL, @@ -25,15 +25,14 @@ static dispatch_fn visitor_dispatch[_READ_OPCODE_END_ENUM_] = { (dispatch_fn)execute_read_op_visitor_on_omap_get_vals_by_keys, }; -void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op) +void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op, void* uargs) { - void* uargs = visitor->uargs; rd_action_base_t a; visitor->visit_begin(uargs); DL_FOREACH((read_op->actions), a) { - visitor_dispatch[a->type](visitor, a); + visitor_dispatch[a->type](visitor, a, uargs); } visitor->visit_end(uargs); @@ -43,28 +42,34 @@ void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t // STATIC FUNCTIONS BELOW // //////////////////////////////////////////////////////////////////////////////// -static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a) +static void execute_read_op_visitor_on_stat(read_op_visitor_t visitor, rd_action_stat_t a, void* uargs) { - visitor->visit_stat(visitor->uargs, a->psize, a->pmtime, a->prval); + if(visitor->visit_stat) + visitor->visit_stat(uargs, a->psize, a->pmtime, a->prval); } -static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a) +static void execute_read_op_visitor_on_read(read_op_visitor_t visitor, rd_action_read_t a, void* uargs) { - visitor->visit_read(visitor->uargs, a->offset, a->len, a->buffer, a->bytes_read, a->prval); + if(visitor->visit_read) + visitor->visit_read(uargs, a->offset, a->len, a->buffer, a->bytes_read, a->prval); } -static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a) +static void execute_read_op_visitor_on_omap_get_keys(read_op_visitor_t visitor, rd_action_omap_get_keys_t a, void* uargs) { - visitor->visit_omap_get_keys(visitor->uargs, a->start_after, a->max_return, a->iter, a->prval); + if(visitor->visit_omap_get_keys) + visitor->visit_omap_get_keys(uargs, a->start_after, a->max_return, a->iter, a->prval); } -static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a) +static void execute_read_op_visitor_on_omap_get_vals(read_op_visitor_t visitor, rd_action_omap_get_vals_t a, void* uargs) { - visitor->visit_omap_get_vals(visitor->uargs, a->start_after, a->filter_prefix, a->max_return, a->iter, a->prval); + if(visitor->visit_omap_get_vals) + visitor->visit_omap_get_vals(uargs, a->start_after, a->filter_prefix, a->max_return, a->iter, a->prval); } -static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a) +static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t visitor, rd_action_omap_get_vals_by_keys_t a, void* uargs) { + if(visitor->visit_omap_get_vals_by_keys == NULL) return; + const char* keys[a->num_keys]; unsigned i; const char* ptr = a->data; @@ -72,6 +77,6 @@ static void execute_read_op_visitor_on_omap_get_vals_by_keys(read_op_visitor_t v keys[i] = ptr; ptr += strlen(keys[i])+1; } - visitor->visit_omap_get_vals_by_keys(visitor->uargs, keys, a->num_keys, a->iter, a->prval); + visitor->visit_omap_get_vals_by_keys(uargs, keys, a->num_keys, a->iter, a->prval); } diff --git a/src/read-op-visitor.h b/src/read-op-visitor.h index bba2a7c538f1ff47a665b7f0c969f8a455edac38..e4bfe78e71e6b7ca294ad6d204c260e64591c0e9 100644 --- a/src/read-op-visitor.h +++ b/src/read-op-visitor.h @@ -10,7 +10,6 @@ #include "buffer-union.h" typedef struct read_op_visitor { - void* uargs; void (*visit_begin)(void*); void (*visit_stat)(void*, uint64_t*, time_t*, int*); void (*visit_read)(void*, uint64_t, size_t, buffer_u, size_t*, int*); @@ -20,6 +19,6 @@ typedef struct read_op_visitor { void (*visit_end)(void*); }* read_op_visitor_t; -void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op); +void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op, void* uarg); #endif diff --git a/src/read-resp-impl.c b/src/read-resp-impl.c new file mode 100644 index 0000000000000000000000000000000000000000..2539ed5a795952c673a5db68f1e083bd9a2a049e --- /dev/null +++ b/src/read-resp-impl.c @@ -0,0 +1,211 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include +#include "read-op-impl.h" +#include "read-responses.h" +#include "read-resp-impl.h" +#include "read-actions.h" +#include "omap-iter-impl.h" +#include "utlist.h" +#include "log.h" + + +/** + * "build" functions + * Taking an action and building a response object for it. + */ +typedef rd_response_base_t (*build_matching_fn)(rd_action_base_t); + +static rd_response_base_t build_matching_stat(rd_action_stat_t a); +static rd_response_base_t build_matching_read(rd_action_read_t a); +static rd_response_base_t build_matching_omap_get_keys(rd_action_omap_get_keys_t a); +static rd_response_base_t build_matching_omap_get_vals(rd_action_omap_get_vals_t a); +static rd_response_base_t build_matching_omap_get_vals_by_keys(rd_action_omap_get_vals_by_keys_t a); + +/** + * "feed" functions + * Taking an action and a response, feeding the response's fields + * into the actions's pointers. + */ +typedef void (*feed_action_fn)(rd_action_base_t, rd_response_base_t); + +static void feed_stat_action(rd_action_stat_t a, rd_response_stat_t r); +static void feed_read_action(rd_action_read_t a, rd_response_read_t r); +static void feed_omap_get_keys_action(rd_action_omap_get_keys_t a, rd_response_omap_t r); +static void feed_omap_get_vals_action(rd_action_omap_get_vals_t a, rd_response_omap_t r); +static void feed_omap_get_vals_by_keys_action(rd_action_omap_get_vals_by_keys_t a, rd_response_omap_t r); + +/** + * "free" functions + * Frees a response object. + */ +typedef void (*free_response_fn)(rd_response_base_t); + +static void free_resp_omap(rd_response_omap_t a) { + omap_iter_free(a->iter); + free(a); +}; + +static build_matching_fn match_fn[] = { + NULL, + (build_matching_fn)build_matching_stat, + (build_matching_fn)build_matching_read, + (build_matching_fn)build_matching_omap_get_keys, + (build_matching_fn)build_matching_omap_get_vals, + (build_matching_fn)build_matching_omap_get_vals_by_keys +}; + +static feed_action_fn feed_fn[] = { + NULL, + (feed_action_fn)feed_stat_action, + (feed_action_fn)feed_read_action, + (feed_action_fn)feed_omap_get_keys_action, + (feed_action_fn)feed_omap_get_vals_action, + (feed_action_fn)feed_omap_get_vals_by_keys_action +}; + +static free_response_fn free_fn[] = { + NULL, + (free_response_fn)free, + (free_response_fn)free, + (free_response_fn)free_resp_omap +}; + +read_response_t build_matching_read_responses(mobject_store_read_op_t read_op) +{ + rd_action_base_t a; + rd_response_base_t r = NULL; + + read_response_t result = (read_response_t)calloc(1,sizeof(*result)); + + DL_FOREACH(read_op->actions, a) { + r = match_fn[a->type](a); + DL_APPEND(result->responses, r); + result->num_responses += 1; + } + + return result; +} + +void free_read_responses(read_response_t resp) +{ + rd_response_base_t r, tmp; + DL_FOREACH_SAFE(resp->responses, r, tmp) { + DL_DELETE(resp->responses, r); + free_fn[r->type](r); + } + free(resp); +} + +void feed_read_op_pointers_from_response(mobject_store_read_op_t read_op, read_response_t response) +{ + MOBJECT_ASSERT((read_op->num_actions == response->num_responses), + "Number of responses received doesn't match number of operations in read_op"); + + rd_action_base_t a; + rd_response_base_t r = response->responses; + + DL_FOREACH(read_op->actions, a) { + feed_fn[a->type](a, r); + r = r->next; + } +} + +rd_response_base_t build_matching_stat(rd_action_stat_t a) +{ + rd_response_stat_t resp = (rd_response_stat_t)calloc(1, sizeof(*resp)); + resp->base.type = READ_RESPCODE_STAT; + a->psize = &(resp->psize); + a->pmtime = &(resp->pmtime); + a->prval = &(resp->prval); + return (rd_response_base_t)resp; +} + +rd_response_base_t build_matching_read(rd_action_read_t a) +{ + rd_response_read_t resp = (rd_response_read_t)calloc(1, sizeof(*resp)); + resp->base.type = READ_RESPCODE_READ; + a->bytes_read = &(resp->bytes_read); + a->prval = &(resp->prval); + return (rd_response_base_t)resp; +} + +rd_response_base_t build_matching_omap_get_keys(rd_action_omap_get_keys_t a) +{ + rd_response_omap_t resp = (rd_response_omap_t)calloc(1, sizeof(*resp)); + resp->base.type = READ_RESPCODE_OMAP; + a->prval = &(resp->prval); + a->iter = &(resp->iter); + return (rd_response_base_t)resp; +} + +rd_response_base_t build_matching_omap_get_vals(rd_action_omap_get_vals_t a) +{ + rd_response_omap_t resp = (rd_response_omap_t)calloc(1, sizeof(*resp)); + resp->base.type = READ_RESPCODE_OMAP; + a->prval = &(resp->prval); + a->iter = &(resp->iter); + return (rd_response_base_t)resp; +} + +rd_response_base_t build_matching_omap_get_vals_by_keys(rd_action_omap_get_vals_by_keys_t a) +{ + rd_response_omap_t resp = (rd_response_omap_t)calloc(1, sizeof(*resp)); + resp->base.type = READ_RESPCODE_OMAP; + a->prval = &(resp->prval); + a->iter = &(resp->iter); + return (rd_response_base_t)resp; +} + +void feed_stat_action(rd_action_stat_t a, rd_response_stat_t r) +{ + MOBJECT_ASSERT(r->base.type == READ_RESPCODE_STAT, + "Response type does not match the input action"); + if(a->psize) *(a->psize) = r->psize; + if(a->pmtime) *(a->pmtime) = r->pmtime; + if(a->prval) *(a->prval) = r->prval; +} + +void feed_read_action(rd_action_read_t a, rd_response_read_t r) +{ + MOBJECT_ASSERT(r->base.type == READ_RESPCODE_READ, + "Response type does not match the input action"); + if(a->bytes_read) *(a->bytes_read) = r->bytes_read; + if(a->prval) *(a->prval) = r->prval; +} + +void feed_omap_get_keys_action(rd_action_omap_get_keys_t a, rd_response_omap_t r) +{ + MOBJECT_ASSERT(r->base.type == READ_RESPCODE_OMAP, + "Response type does not match the input action"); + if(a->prval) *(a->prval) = r->prval; + if(a->iter) { + *(a->iter) = r->iter; + omap_iter_incr_ref(r->iter); + } +} + +void feed_omap_get_vals_action(rd_action_omap_get_vals_t a, rd_response_omap_t r) +{ + MOBJECT_ASSERT(r->base.type == READ_RESPCODE_OMAP, + "Response type does not match the input action"); + if(a->prval) *(a->prval) = r->prval; + if(a->iter) { + *(a->iter) = r->iter; + omap_iter_incr_ref(r->iter); + } +} + +void feed_omap_get_vals_by_keys_action(rd_action_omap_get_vals_by_keys_t a, rd_response_omap_t r) +{ + MOBJECT_ASSERT(r->base.type == READ_RESPCODE_OMAP, + "Response type does not match the input action"); + if(a->prval) *(a->prval) = r->prval; + if(a->iter) { + *(a->iter) = r->iter; + omap_iter_incr_ref(r->iter); + } +} diff --git a/src/read-resp-impl.h b/src/read-resp-impl.h new file mode 100644 index 0000000000000000000000000000000000000000..5684d5e6022597ebfc67d6d60e65e57447156d60 --- /dev/null +++ b/src/read-resp-impl.h @@ -0,0 +1,25 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __MOBJECT_READ_RESP_IMPL_H +#define __MOBJECT_READ_RESP_IMPL_H + +#include +#include "mobject-store-config.h" +#include "libmobject-store.h" + +typedef struct rd_response_BASE* rd_response_base_t; + +typedef struct read_response { + size_t num_responses; + rd_response_base_t responses; +}* read_response_t; + +read_response_t build_matching_read_responses(mobject_store_read_op_t actions); +void free_read_responses(read_response_t response); +void feed_read_op_pointers_from_response(mobject_store_read_op_t actions, read_response_t response); + +#endif + diff --git a/src/read-responses.h b/src/read-responses.h new file mode 100644 index 0000000000000000000000000000000000000000..08aac5ba9ef35ddadc4535e46ce878eff2a66b8c --- /dev/null +++ b/src/read-responses.h @@ -0,0 +1,56 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __MOBJECT_READ_RESPONSE_H +#define __MOBJECT_READ_RESPONSE_H + +#include +#include "mobject-store-config.h" +#include "libmobject-store.h" + +typedef enum { + READ_RESPCODE_BASE = 0, + READ_RESPCODE_STAT, + READ_RESPCODE_READ, + READ_RESPCODE_OMAP, + _READ_RESPCODE_END_ENUM_ +} read_resp_code_t; + +typedef struct rd_response_BASE { + read_resp_code_t type; + struct rd_response_BASE* prev; + struct rd_response_BASE* next; +}* rd_response_base_t; + +/** + * stat response + */ + typedef struct rd_response_STAT { + struct rd_response_BASE base; + uint64_t psize; + time_t pmtime; + int prval; + }* rd_response_stat_t; + +/** + * read response + */ +typedef struct rd_response_READ { + struct rd_response_BASE base; + size_t bytes_read; + int prval; +}* rd_response_read_t; + +/** + * omap_* responses + */ +typedef struct rd_response_OMAP { + struct rd_response_BASE base; + int prval; + mobject_store_omap_iter_t iter; +}* rd_response_omap_t; + +#endif + diff --git a/src/write-op-impl.c b/src/write-op-impl.c index f84fb347be64ef9f569874837c48c408e54a9456..06887174585a8c2c895267731e550fca3bd35b30 100644 --- a/src/write-op-impl.c +++ b/src/write-op-impl.c @@ -195,6 +195,7 @@ void mobject_store_write_op_truncate(mobject_store_write_op_t write_op, wr_action_truncate_t action = (wr_action_truncate_t)calloc(1, sizeof(*action)); action->base.type = WRITE_OPCODE_TRUNCATE; + action->offset = offset; WRITE_ACTION_UPCAST(base, action); DL_APPEND(write_op->actions, base); diff --git a/src/write-op-visitor.c b/src/write-op-visitor.c index 50336abc8867d91fd36e25f7254b9107418a0d8a..1d6373e3ad15dff7cf27ef045ac59bec7029d928 100644 --- a/src/write-op-visitor.c +++ b/src/write-op-visitor.c @@ -7,19 +7,20 @@ #include "write-op-visitor.h" #include "write-op-impl.h" #include "utlist.h" +#include "log.h" -static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a); -static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a); -static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a); -static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a); -static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a); -static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a); -static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a); -static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a); -static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a); -static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a); +static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a, void* uargs); +static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a, void* uargs); +static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a, void* uargs); +static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a, void* uargs); +static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a, void* uargs); +static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a, void* uargs); +static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a, void* uargs); +static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a, void* uargs); +static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a, void* uargs); +static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a, void* uargs); -typedef void (*dispatch_fn)(write_op_visitor_t, wr_action_base_t); +typedef void (*dispatch_fn)(write_op_visitor_t, wr_action_base_t, void* uargs); static dispatch_fn visitor_dispatch[_WRITE_OPCODE_END_ENUM_] = { NULL, @@ -35,15 +36,16 @@ static dispatch_fn visitor_dispatch[_WRITE_OPCODE_END_ENUM_] = { (dispatch_fn)execute_write_op_visitor_on_omap_rm_keys }; -void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op) +void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op, void* uargs) { - void* uargs = visitor->uargs; wr_action_base_t a; visitor->visit_begin(uargs); DL_FOREACH((write_op->actions), a) { - visitor_dispatch[a->type](visitor, a); + MOBJECT_ASSERT(a->type > 0 && a->type < _WRITE_OPCODE_END_ENUM_, + "Invalid action type"); + visitor_dispatch[a->type](visitor, a, uargs); } visitor->visit_end(uargs); @@ -53,48 +55,58 @@ void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op // STATIC FUNCTIONS BELOW // //////////////////////////////////////////////////////////////////////////////// -static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a) +static void execute_write_op_visitor_on_create(write_op_visitor_t visitor, wr_action_create_t a, void* uargs) { - visitor->visit_create(visitor->uargs, a->exclusive); + if(visitor->visit_create) + visitor->visit_create(uargs, a->exclusive); } -static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a) +static void execute_write_op_visitor_on_write(write_op_visitor_t visitor, wr_action_write_t a, void* uargs) { - visitor->visit_write(visitor->uargs, a->buffer, a->len, a->offset); + if(visitor->visit_write) + visitor->visit_write(uargs, a->buffer, a->len, a->offset); } -static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a) +static void execute_write_op_visitor_on_write_full(write_op_visitor_t visitor, wr_action_write_full_t a, void* uargs) { - visitor->visit_write_full(visitor->uargs, a->buffer, a->len); + if(visitor->visit_write_full) + visitor->visit_write_full(uargs, a->buffer, a->len); } -static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a) +static void execute_write_op_visitor_on_write_same(write_op_visitor_t visitor, wr_action_write_same_t a, void* uargs) { - visitor->visit_writesame(visitor->uargs, a->buffer, a->data_len, a->write_len, a->offset); + if(visitor->visit_writesame) + visitor->visit_writesame(uargs, a->buffer, a->data_len, a->write_len, a->offset); } -static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a) +static void execute_write_op_visitor_on_append(write_op_visitor_t visitor, wr_action_append_t a, void* uargs) { - visitor->visit_append(visitor->uargs, a->buffer, a->len); + if(visitor->visit_append) + visitor->visit_append(uargs, a->buffer, a->len); } -static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a) +static void execute_write_op_visitor_on_remove(write_op_visitor_t visitor, wr_action_remove_t a, void* uargs) { - visitor->visit_remove(visitor->uargs); + if(visitor->visit_remove) + visitor->visit_remove(uargs); } -static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a) +static void execute_write_op_visitor_on_truncate(write_op_visitor_t visitor, wr_action_truncate_t a, void* uargs) { - visitor->visit_truncate(visitor->uargs, a->offset); + if(visitor->visit_truncate) + visitor->visit_truncate(uargs, a->offset); } -static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a) +static void execute_write_op_visitor_on_zero(write_op_visitor_t visitor, wr_action_zero_t a, void* uargs) { - visitor->visit_zero(visitor->uargs, a->offset, a->len); + if(visitor->visit_zero) + visitor->visit_zero(uargs, a->offset, a->len); } -static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a) +static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_action_omap_set_t a, void* uargs) { + if(visitor->visit_omap_set == NULL) return; + size_t num = a->num; size_t lens[num]; const char* keys[num]; @@ -116,11 +128,13 @@ static void execute_write_op_visitor_on_omap_set(write_op_visitor_t visitor, wr_ ptr += lens[i]; } - visitor->visit_omap_set(visitor->uargs, keys, vals, lens, num); + visitor->visit_omap_set(uargs, keys, vals, lens, num); } -static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a) +static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, wr_action_omap_rm_keys_t a, void* uargs) { + if(visitor->visit_omap_rm_keys == NULL) return; + size_t num_keys = a->num_keys; const char* keys[num_keys]; @@ -131,5 +145,5 @@ static void execute_write_op_visitor_on_omap_rm_keys(write_op_visitor_t visitor, ptr += strlen(keys[i])+1; } - visitor->visit_omap_rm_keys(visitor->uargs, keys, num_keys); + visitor->visit_omap_rm_keys(uargs, keys, num_keys); } diff --git a/src/write-op-visitor.h b/src/write-op-visitor.h index b7f84a8f1dbc2b893a51cc9862c34dc6fa873344..ccc6e271e0d8e31d60f9d767b65697472a71f8ea 100644 --- a/src/write-op-visitor.h +++ b/src/write-op-visitor.h @@ -10,7 +10,6 @@ #include "buffer-union.h" typedef struct write_op_visitor { - void* uargs; void (*visit_begin)(void*); void (*visit_create)(void*, int); void (*visit_write)(void*, buffer_u, size_t, uint64_t); @@ -25,6 +24,6 @@ typedef struct write_op_visitor { void (*visit_end)(void*); }* write_op_visitor_t; -void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op); +void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op, void* uargs); #endif diff --git a/tests/io-chain/Makefile.subdir b/tests/io-chain/Makefile.subdir new file mode 100644 index 0000000000000000000000000000000000000000..fcb1197707fa9e31180b7828f47a657f82065c0a --- /dev/null +++ b/tests/io-chain/Makefile.subdir @@ -0,0 +1,11 @@ +check_PROGRAMS += \ + tests/io-chain/io-chain-client \ + tests/io-chain/io-chain-server + +tests_io_chain_io_chain_client_SOURCES = tests/io-chain/io-chain-client.c +tests_io_chain_io_chain_client_CPPFLAGS = -I${srcdir}/include -I${srcdir}/test/io-chain +tests_io_chain_io_chain_client_LDADD = src/libmobject-store.la + +tests_io_chain_io_chain_server_SOURCES = tests/io-chain/io-chain-server.c +tests_io_chain_io_chain_server_CPPFLAGS = -I${srcdir}/include -I${srcdir}/test/io-chain +tests_io_chain_io_chain_server_LDADD = src/libmobject-store.la diff --git a/tests/io-chain/io-chain-client.c b/tests/io-chain/io-chain-client.c new file mode 100644 index 0000000000000000000000000000000000000000..2b155dfad4ac37f71e7fb66048829e707f297b9c --- /dev/null +++ b/tests/io-chain/io-chain-client.c @@ -0,0 +1,153 @@ +#include +#include +#include +#include +#include "types.h" +#include "src/prepare-write-op.h" +#include "src/prepare-read-op.h" + +/* Main function. */ +int main(int argc, char** argv) +{ + if(argc != 2) { + fprintf(stderr,"Usage: %s \n", argv[0]); + exit(0); + } + + /* Start Margo */ + margo_instance_id mid = margo_init("bmi+tcp", MARGO_CLIENT_MODE, 0, 0); + + /* Register a RPC function */ + hg_id_t write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL); + hg_id_t read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL); + + /* Lookup the address of the server */ + hg_addr_t svr_addr; + margo_addr_lookup(mid, argv[1], &svr_addr); + + char buffer[256]; + unsigned i; + for(i=0; i<256; i++) buffer[i] = 'A'+(i % 26); + + { // WRITE OP TEST + + mobject_store_write_op_t write_op = mobject_store_create_write_op(); + + // Add a "create" operation + mobject_store_write_op_create(write_op, LIBMOBJECT_CREATE_EXCLUSIVE, NULL); + // Add a "write" operation + mobject_store_write_op_write(write_op, buffer, 128, 32); + // Add a "write_full" operation + mobject_store_write_op_write_full(write_op, buffer, 256); + // Add a "writesame" operation + mobject_store_write_op_writesame(write_op, buffer, 32, 64, 256); + // Add a "append" operation + mobject_store_write_op_append(write_op, buffer, 64); + // Add a "remove" operation + mobject_store_write_op_remove(write_op); + // Add a "truncate" operation + mobject_store_write_op_truncate(write_op, 32); + // Add a "zero" operation + mobject_store_write_op_zero(write_op, 16, 48); + // Add a "omap_set" operation + const char* keys[] = { "matthieu", "rob", "shane", "phil", "robl" }; + const char* values[] = { "mdorier@anl.gov", "rross@anl.gov", "ssnyder@anl.gov", "carns@anl.gov", "robl@anl.gov" }; + size_t val_sizes[] = { 16, 14, 16, 14, 13 }; + mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5); + // Add a omap_rm_keys" operation + mobject_store_write_op_omap_rm_keys(write_op, keys, 5); + + // BEGIN this is what write_op_operate should contain + + write_op_in_t in; + in.object_name = "test-object"; + in.write_op = write_op; + + prepare_write_op(mid, write_op); + + hg_handle_t h; + margo_create(mid, svr_addr, write_op_rpc_id, &h); + margo_forward(h, &in); + + write_op_out_t resp; + margo_get_output(h, &resp); + + margo_free_output(h,&resp); + margo_destroy(h); + + // END this is what write_op_operate should contain + + mobject_store_release_write_op(write_op); + + } + + { // READ OP TEST + + mobject_store_read_op_t read_op = mobject_store_create_read_op(); + + // Add "stat" operation + uint64_t psize; + time_t pmtime; + int prval1; + mobject_store_read_op_stat(read_op, &psize, &pmtime, &prval1); + // Add "read" operation + char read_buf[512]; + size_t bytes_read; + int prval2; + mobject_store_read_op_read(read_op, 2, 32, buffer, &bytes_read, &prval2); + // Add "omap_get_keys" operation + const char* start_after = "shane"; + mobject_store_omap_iter_t iter3; + int prval3; + mobject_store_read_op_omap_get_keys(read_op, start_after, 7, &iter3, &prval3); + // Add "omap_get_vals" operation + const char* filter_prefix = "p"; + mobject_store_omap_iter_t iter4; + int prval4; + mobject_store_read_op_omap_get_vals(read_op, start_after, filter_prefix, 3, &iter4, &prval4); + // Add "omap_get_vals_by_keys" operation + const char* keys[] = {"matthieu", "shane"}; + int prval5; + mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter4, &prval5); + + // BEGIN this is what read_op_operate should contain + + read_op_in_t in; + in.object_name = "test-object"; + in.read_op = read_op; + + prepare_read_op(mid, read_op); + + hg_handle_t h; + margo_create(mid, svr_addr, read_op_rpc_id, &h); + margo_forward(h, &in); + + read_op_out_t resp; + margo_get_output(h, &resp); + + feed_read_op_pointers_from_response(read_op, resp.responses); + + margo_free_output(h,&resp); + margo_destroy(h); + + // END this is what read_op_operate should contain + + mobject_store_release_read_op(read_op); + + // print the results of the read operations + printf("Client received the following results:\n"); + printf("stat: psize=%ld pmtime=%lld prval=%d\n", psize, pmtime, prval1); + printf("read: bytes_read=%ld prval=%d\n", bytes_read, prval2); + printf("omap_get_keys: prval=%d\n", prval3); + printf("omap_get_vals: prval=%d\n", prval4); + printf("omap_get_vals_by_keys: prval=%d\n", prval5); + } + + /* free the address */ + margo_addr_free(mid, svr_addr); + + /* shut down Margo */ + margo_finalize(mid); + + return 0; +} diff --git a/tests/io-chain/io-chain-server.c b/tests/io-chain/io-chain-server.c new file mode 100644 index 0000000000000000000000000000000000000000..ea9455be5f4d10414210187969aca2660543ae75 --- /dev/null +++ b/tests/io-chain/io-chain-server.c @@ -0,0 +1,322 @@ +#include +#include +#include +#include +#include +#include +#include "types.h" +#include "src/write-op-visitor.h" +#include "src/read-op-visitor.h" +#include "src/omap-iter-impl.h" +#include "src/read-resp-impl.h" + +/* after serving this number of rpcs, the server will shut down. */ +static const int TOTAL_RPCS = 16; +/* number of RPCS already received. */ +static int num_rpcs = 0; + +hg_return_t mobject_write_op_rpc(hg_handle_t h); +DECLARE_MARGO_RPC_HANDLER(mobject_write_op_rpc) + +hg_return_t mobject_read_op_rpc(hg_handle_t h); +DECLARE_MARGO_RPC_HANDLER(mobject_read_op_rpc) + +/* + * main function. + */ +int main(int argc, char** argv) +{ + /* Initialize Margo */ + margo_instance_id mid = margo_init("bmi+tcp", MARGO_SERVER_MODE, 0, 0); + assert(mid); + + hg_addr_t my_address; + margo_addr_self(mid, &my_address); + char addr_str[128]; + size_t addr_str_size = 128; + margo_addr_to_string(mid, addr_str, &addr_str_size, my_address); + margo_addr_free(mid,my_address); + printf("Server running at address %s\n", addr_str); + + /* Register the RPC by its name ("sum") */ + MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, mobject_write_op_rpc); + MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, mobject_read_op_rpc) + + /* NOTE: there isn't anything else for the server to do at this point + * except wait for itself to be shut down. The + * margo_wait_for_finalize() call here yields to let Margo drive + * progress until that happens. + */ + margo_wait_for_finalize(mid); + + return 0; +} + +static void write_op_printer_begin(void*); +static void write_op_printer_end(void*); +static void write_op_printer_create(void*, int); +static void write_op_printer_write(void*, buffer_u, size_t, uint64_t); +static void write_op_printer_write_full(void*, buffer_u, size_t); +static void write_op_printer_writesame(void*, buffer_u, size_t, size_t, uint64_t); +static void write_op_printer_append(void*, buffer_u, size_t); +static void write_op_printer_remove(void*); +static void write_op_printer_truncate(void*, uint64_t); +static void write_op_printer_zero(void*, uint64_t, uint64_t); +static void write_op_printer_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t); +static void write_op_printer_omap_rm_keys(void*, char const* const*, size_t); + +struct write_op_visitor write_op_printer = { + .visit_begin = write_op_printer_begin, + .visit_create = write_op_printer_create, + .visit_write = write_op_printer_write, + .visit_write_full = write_op_printer_write_full, + .visit_writesame = write_op_printer_writesame, + .visit_append = write_op_printer_append, + .visit_remove = write_op_printer_remove, + .visit_truncate = write_op_printer_truncate, + .visit_zero = write_op_printer_zero, + .visit_omap_set = write_op_printer_omap_set, + .visit_omap_rm_keys = write_op_printer_omap_rm_keys, + .visit_end = write_op_printer_end +}; + +/* Implementation of the RPC. */ +hg_return_t mobject_write_op_rpc(hg_handle_t h) +{ + hg_return_t ret; + num_rpcs += 1; + + write_op_in_t in; + write_op_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + /* Deserialize the input from the received handle. */ + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + /* Execute the operation chain */ + execute_write_op_visitor(&write_op_printer, in.write_op, in.object_name); + + // set the return value of the RPC + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + /* Free the input data. */ + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + /* We are not going to use the handle anymore, so we should destroy it. */ + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); + + if(num_rpcs == TOTAL_RPCS) { + /* NOTE: we assume that the server daemon is using + * margo_wait_for_finalize() to suspend until this RPC executes, so there + * is no need to send any extra signal to notify it. + */ + margo_finalize(mid); + } + + return HG_SUCCESS; +} +DEFINE_MARGO_RPC_HANDLER(mobject_write_op_rpc) + + +static void read_op_printer_begin(void*); +static void read_op_printer_stat(void*, uint64_t*, time_t*, int*); +static void read_op_printer_read(void*, uint64_t, size_t, buffer_u, size_t*, int*); +static void read_op_printer_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*); +static void read_op_printer_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*); +static void read_op_printer_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*); +static void read_op_printer_end(void*); + +struct read_op_visitor read_op_printer = { + .visit_begin = read_op_printer_begin, + .visit_end = read_op_printer_end, + .visit_stat = read_op_printer_stat, + .visit_read = read_op_printer_read, + .visit_omap_get_keys = read_op_printer_omap_get_keys, + .visit_omap_get_vals = read_op_printer_omap_get_vals, + .visit_omap_get_vals_by_keys = read_op_printer_omap_get_vals_by_keys +}; + +/* Implementation of the RPC. */ +hg_return_t mobject_read_op_rpc(hg_handle_t h) +{ + hg_return_t ret; + num_rpcs += 1; + + read_op_in_t in; + read_op_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + /* Deserialize the input from the received handle. */ + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + /* Create a response list matching the input actions */ + read_response_t resp = build_matching_read_responses(in.read_op); + + /* Compute the result. */ + execute_read_op_visitor(&read_op_printer, in.read_op, in.object_name); + + out.responses = resp; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + free_read_responses(resp); + + /* Free the input data. */ + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + /* We are not going to use the handle anymore, so we should destroy it. */ + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); + + if(num_rpcs == TOTAL_RPCS) { + /* NOTE: we assume that the server daemon is using + * margo_wait_for_finalize() to suspend until this RPC executes, so there + * is no need to send any extra signal to notify it. + */ + margo_finalize(mid); + } + + return HG_SUCCESS; +} +DEFINE_MARGO_RPC_HANDLER(mobject_read_op_rpc) + +void write_op_printer_begin(void* object_name) +{ + printf("\n",(char*)object_name); +} + +void write_op_printer_end(void* unused) +{ + printf("\n"); +} + +void write_op_printer_create(void* unused, int exclusive) +{ + printf("\t\n", exclusive); +} + +void write_op_printer_write(void* u, buffer_u buf, size_t len, uint64_t offset) +{ + printf("\t\n", buf.as_offset, len, offset); +} + +void write_op_printer_write_full(void* u, buffer_u buf, size_t len) +{ + printf("\t\n", buf.as_offset, len); +} + +void write_op_printer_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset) +{ + printf("\t\n", + buf.as_offset, data_len, write_len, offset); +} + +void write_op_printer_append(void* u, buffer_u buf, size_t len) +{ + printf("\t\n", buf.as_offset, len); +} + +void write_op_printer_remove(void* u) +{ + printf("\t\n"); +} + +void write_op_printer_truncate(void* u, uint64_t offset) +{ + printf("\t\n", offset); +} + +void write_op_printer_zero(void* u, uint64_t offset, uint64_t len) +{ + printf("\t\n", offset, len); +} + +void write_op_printer_omap_set(void* u, char const* const* keys, + char const* const* vals, + const size_t *lens, + size_t num) +{ + printf("\t\n", num); + unsigned i; + for(i=0; i%s\n", + keys[i], lens[i], vals[i]); + } + printf("\t\n"); +} + +void write_op_printer_omap_rm_keys(void* u, char const* const* keys, size_t num_keys) +{ + printf("\t\n", num_keys); + unsigned i; + for(i=0; i\n", keys[i]); + } + printf("\t\n"); +} + +void read_op_printer_begin(void* u) +{ + printf("\n", (char*)u); +} + +void read_op_printer_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval) +{ + printf("\t\n"); + *psize = 128; + time(pmtime); + *prval = 1234; +} + +void read_op_printer_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval) +{ + printf("\t\n",offset, len, buf.as_offset); + *bytes_read = len; + *prval = 1235; +} + +void read_op_printer_omap_get_keys(void* u, const char* start_after, uint64_t max_return, + mobject_store_omap_iter_t* iter, int* prval) +{ + printf("\t\n", start_after, max_return); + omap_iter_create(iter); + // use omap_iter_append to add things to the iterator + *prval = 1236; +} + +void read_op_printer_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval) +{ + printf("\t\n", + start_after, filter_prefix, max_return); + + omap_iter_create(iter); + *prval = 1237; +} + +void read_op_printer_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval) +{ + printf("\t\n", num_keys); + unsigned i; + for(i=0; i\n", keys[i]); + printf("\t\n"); + + omap_iter_create(iter); + *prval = 1238; +} + +void read_op_printer_end(void* u) +{ + printf("\n"); +} diff --git a/tests/io-chain/types.h b/tests/io-chain/types.h new file mode 100644 index 0000000000000000000000000000000000000000..9bcc428bacfcf13f7ecb420d60cd67497c2995ec --- /dev/null +++ b/tests/io-chain/types.h @@ -0,0 +1,24 @@ +#ifndef PARAM_H +#define PARAM_H + +#include +#include +#include +#include +#include "src/proc-write-actions.h" +#include "src/proc-read-actions.h" +#include "src/proc-read-responses.h" + +MERCURY_GEN_PROC(write_op_in_t, + ((hg_string_t)(object_name))\ + ((mobject_store_write_op_t)(write_op))) + +MERCURY_GEN_PROC(write_op_out_t, ((int32_t)(ret))) + +MERCURY_GEN_PROC(read_op_in_t, + ((hg_string_t)(object_name))\ + ((mobject_store_read_op_t)(read_op))) + +MERCURY_GEN_PROC(read_op_out_t, ((read_response_t)(responses))) + +#endif