Commit 301f13b1 authored by Matthieu Dorier's avatar Matthieu Dorier

done implementing and testing read_op responses

parent 21ada1ab
......@@ -11,7 +11,8 @@ src_libmobject_store_la_SOURCES = \
src/read-op-visitor.c \
src/omap-iter-impl.c \
src/proc-omap-iter.c \
src/read-responses.c
src/read-resp-impl.c \
src/proc-read-responses.c
noinst_HEADERS += \
src/log.h \
......@@ -30,4 +31,6 @@ noinst_HEADERS += \
src/buffer-union.h \
src/omap-iter-impl.h \
src/proc-omap-iter.h \
src/read-responses.h
src/read-responses.h \
src/read-resp-impl.h \
src/proc-read-responses.h
......@@ -13,11 +13,14 @@
void omap_iter_create(mobject_store_omap_iter_t* iter)
{
*iter = (mobject_store_omap_iter_t)calloc(1, sizeof(**iter));
(*iter)->ref_count = 1;
}
void omap_iter_free(mobject_store_omap_iter_t iter)
{
if(!iter) return;
iter->ref_count -= 1;
if(iter->ref_count > 0) return;
omap_iter_node_t elt, tmp;
......@@ -31,6 +34,12 @@ void omap_iter_free(mobject_store_omap_iter_t iter)
free(iter);
}
void omap_iter_incr_ref(mobject_store_omap_iter_t iter)
{
if(!iter) return;
iter->ref_count += 1;
}
void omap_iter_append(mobject_store_omap_iter_t iter,
const char* key, const char* val,
size_t val_size)
......
......@@ -20,12 +20,15 @@ struct omap_iter_node {
struct mobject_store_omap_iter {
size_t num_items;
size_t ref_count;
omap_iter_node_t head;
omap_iter_node_t current;
};
void omap_iter_create(mobject_store_omap_iter_t* iter);
void omap_iter_incr_ref(mobject_store_omap_iter_t iter);
void omap_iter_free(mobject_store_omap_iter_t iter);
void omap_iter_append(mobject_store_omap_iter_t iter,
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <mercury_proc.h>
#include "proc-read-responses.h"
#include "read-responses.h"
#include "read-resp-impl.h"
#include "proc-omap-iter.h"
#include "utlist.h"
#include "log.h"
typedef hg_return_t (*encode_fn)(hg_proc_t, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, void*);
static hg_return_t encode_stat_response(hg_proc_t proc, rd_response_stat_t r);
static hg_return_t decode_stat_response(hg_proc_t proc, rd_response_stat_t* r);
static hg_return_t encode_read_response(hg_proc_t proc, rd_response_read_t r);
static hg_return_t decode_read_response(hg_proc_t proc, rd_response_read_t* r);
static hg_return_t encode_omap_response(hg_proc_t proc, rd_response_omap_t r);
static hg_return_t decode_omap_response(hg_proc_t proc, rd_response_omap_t* r);
static encode_fn encode[] = {
NULL,
(encode_fn)encode_stat_response,
(encode_fn)encode_read_response,
(encode_fn)encode_omap_response
};
static decode_fn decode[] = {
NULL,
(decode_fn)decode_stat_response,
(decode_fn)decode_read_response,
(decode_fn)decode_omap_response
};
hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response)
{
hg_return_t ret = HG_SUCCESS;
rd_response_base_t elem;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
// encode the number of actions
ret = hg_proc_memcpy(proc, &((*response)->num_responses),
sizeof((*response)->num_responses));
if(ret != HG_SUCCESS) return ret;
DL_FOREACH((*response)->responses,elem) {
read_resp_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode > 0 || opcode < _READ_RESPCODE_END_ENUM_),
"Invalid response code");
// encode the type of response
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
// encode the response's arguments
ret = encode[opcode](proc, elem);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
// allocate the response chain
*response = (read_response_t)calloc(1, sizeof(**response));
// decode the number of responses in the chain
ret = hg_proc_memcpy(proc, &((*response)->num_responses), sizeof((*response)->num_responses));
if(ret != HG_SUCCESS) return ret;
size_t i;
rd_response_base_t next_resp;
for(i = 0; i < (*response)->num_responses; i++) {
// decode the current response's type
read_resp_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode > 0 || opcode < _READ_RESPCODE_END_ENUM_),
"Invalid write_op opcode");
// decode the response's arguments
ret = decode[opcode](proc, &next_resp);
if(ret != HG_SUCCESS) return ret;
next_resp->type = opcode;
// append to the list
DL_APPEND((*response)->responses, next_resp);
}
break;
case HG_FREE:
free_read_responses(*response);
break;
}
return ret;
}
hg_return_t encode_stat_response(hg_proc_t proc, rd_response_stat_t r)
{
hg_return_t ret;
ret = hg_proc_uint64_t(proc, &(r->psize));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(r->pmtime), sizeof(r->pmtime));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval));
return ret;
}
hg_return_t decode_stat_response(hg_proc_t proc, rd_response_stat_t* r)
{
*r = (rd_response_stat_t)calloc(1, sizeof(**r));
hg_return_t ret;
ret = hg_proc_uint64_t(proc, &((*r)->psize));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &((*r)->pmtime), sizeof((*r)->pmtime));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval));
return ret;
}
hg_return_t encode_read_response(hg_proc_t proc, rd_response_read_t r)
{
hg_return_t ret;
ret = hg_proc_hg_size_t(proc, &(r->bytes_read));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval));
return ret;
}
hg_return_t decode_read_response(hg_proc_t proc, rd_response_read_t* r)
{
*r = (rd_response_read_t)calloc(1, sizeof(**r));
hg_return_t ret;
ret = hg_proc_hg_size_t(proc, &((*r)->bytes_read));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval));
return ret;
}
hg_return_t encode_omap_response(hg_proc_t proc, rd_response_omap_t r)
{
hg_return_t ret;
ret = hg_proc_memcpy(proc, &(r->prval), sizeof(r->prval));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_mobject_store_omap_iter_t(proc, &(r->iter));
if(ret != HG_SUCCESS) return ret;
}
hg_return_t decode_omap_response(hg_proc_t proc, rd_response_omap_t* r)
{
*r = (rd_response_omap_t)calloc(1, sizeof(**r));
hg_return_t ret;
ret = hg_proc_memcpy(proc, &((*r)->prval), sizeof((*r)->prval));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_mobject_store_omap_iter_t(proc, &((*r)->iter));
if(ret != HG_SUCCESS) return ret;
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_PROC_READ_RESPONSE_H
#define __MOBJECT_PROC_READ_RESPONSE_H
#include <margo.h>
#include "read-resp-impl.h"
hg_return_t hg_proc_read_response_t(hg_proc_t proc, read_response_t* response);
#endif
......@@ -4,10 +4,19 @@
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include "read-op-impl.h"
#include "read-responses.h"
#include "read-resp-impl.h"
#include "read-actions.h"
#include "omap-iter-impl.h"
#include "utlist.h"
#include "log.h"
/**
* "build" functions
* Taking an action and building a response object for it.
*/
typedef rd_response_base_t (*build_matching_fn)(rd_action_base_t);
static rd_response_base_t build_matching_stat(rd_action_stat_t a);
......@@ -16,6 +25,23 @@ static rd_response_base_t build_matching_omap_get_keys(rd_action_omap_get_keys_t
static rd_response_base_t build_matching_omap_get_vals(rd_action_omap_get_vals_t a);
static rd_response_base_t build_matching_omap_get_vals_by_keys(rd_action_omap_get_vals_by_keys_t a);
/**
* "feed" functions
* Taking an action and a response, feeding the response's fields
* into the actions's pointers.
*/
typedef void (*feed_action_fn)(rd_action_base_t, rd_response_base_t);
static void feed_stat_action(rd_action_stat_t a, rd_response_stat_t r);
static void feed_read_action(rd_action_read_t a, rd_response_read_t r);
static void feed_omap_get_keys_action(rd_action_omap_get_keys_t a, rd_response_omap_t r);
static void feed_omap_get_vals_action(rd_action_omap_get_vals_t a, rd_response_omap_t r);
static void feed_omap_get_vals_by_keys_action(rd_action_omap_get_vals_by_keys_t a, rd_response_omap_t r);
/**
* "free" functions
* Frees a response object.
*/
typedef void (*free_response_fn)(rd_response_base_t);
static void free_resp_omap(rd_response_omap_t a) {
......@@ -32,6 +58,15 @@ static build_matching_fn match_fn[] = {
(build_matching_fn)build_matching_omap_get_vals_by_keys
};
static feed_action_fn feed_fn[] = {
NULL,
(feed_action_fn)feed_stat_action,
(feed_action_fn)feed_read_action,
(feed_action_fn)feed_omap_get_keys_action,
(feed_action_fn)feed_omap_get_vals_action,
(feed_action_fn)feed_omap_get_vals_by_keys_action
};
static free_response_fn free_fn[] = {
NULL,
(free_response_fn)free,
......@@ -39,27 +74,44 @@ static free_response_fn free_fn[] = {
(free_response_fn)free_resp_omap
};
rd_response_base_t build_matching_read_responses(rd_action_base_t actions)
read_response_t build_matching_read_responses(mobject_store_read_op_t read_op)
{
rd_action_base_t a;
rd_response_base_t resp = NULL;
rd_response_base_t tmp = NULL;
rd_response_base_t r = NULL;
DL_FOREACH(actions, a) {
tmp = match_fn[a->type](a);
DL_APPEND(resp, tmp);
read_response_t result = (read_response_t)calloc(1,sizeof(*result));
DL_FOREACH(read_op->actions, a) {
r = match_fn[a->type](a);
DL_APPEND(result->responses, r);
result->num_responses += 1;
}
return resp;
return result;
}
void free_read_responses(rd_response_base_t responses)
void free_read_responses(read_response_t resp)
{
rd_response_base_t r, tmp;
DL_FOREACH_SAFE(responses, r, tmp) {
DL_DELETE(responses, r);
DL_FOREACH_SAFE(resp->responses, r, tmp) {
DL_DELETE(resp->responses, r);
free_fn[r->type](r);
}
free(resp);
}
void feed_read_op_pointers_from_response(mobject_store_read_op_t read_op, read_response_t response)
{
MOBJECT_ASSERT((read_op->num_actions == response->num_responses),
"Number of responses received doesn't match number of operations in read_op");
rd_action_base_t a;
rd_response_base_t r = response->responses;
DL_FOREACH(read_op->actions, a) {
feed_fn[a->type](a, r);
r = r->next;
}
}
rd_response_base_t build_matching_stat(rd_action_stat_t a)
......@@ -107,3 +159,53 @@ rd_response_base_t build_matching_omap_get_vals_by_keys(rd_action_omap_get_vals_
a->iter = &(resp->iter);
return (rd_response_base_t)resp;
}
void feed_stat_action(rd_action_stat_t a, rd_response_stat_t r)
{
MOBJECT_ASSERT(r->base.type == READ_RESPCODE_STAT,
"Response type does not match the input action");
if(a->psize) *(a->psize) = r->psize;
if(a->pmtime) *(a->pmtime) = r->pmtime;
if(a->prval) *(a->prval) = r->prval;
}
void feed_read_action(rd_action_read_t a, rd_response_read_t r)
{
MOBJECT_ASSERT(r->base.type == READ_RESPCODE_READ,
"Response type does not match the input action");
if(a->bytes_read) *(a->bytes_read) = r->bytes_read;
if(a->prval) *(a->prval) = r->prval;
}
void feed_omap_get_keys_action(rd_action_omap_get_keys_t a, rd_response_omap_t r)
{
MOBJECT_ASSERT(r->base.type == READ_RESPCODE_OMAP,
"Response type does not match the input action");
if(a->prval) *(a->prval) = r->prval;
if(a->iter) {
*(a->iter) = r->iter;
omap_iter_incr_ref(r->iter);
}
}
void feed_omap_get_vals_action(rd_action_omap_get_vals_t a, rd_response_omap_t r)
{
MOBJECT_ASSERT(r->base.type == READ_RESPCODE_OMAP,
"Response type does not match the input action");
if(a->prval) *(a->prval) = r->prval;
if(a->iter) {
*(a->iter) = r->iter;
omap_iter_incr_ref(r->iter);
}
}
void feed_omap_get_vals_by_keys_action(rd_action_omap_get_vals_by_keys_t a, rd_response_omap_t r)
{
MOBJECT_ASSERT(r->base.type == READ_RESPCODE_OMAP,
"Response type does not match the input action");
if(a->prval) *(a->prval) = r->prval;
if(a->iter) {
*(a->iter) = r->iter;
omap_iter_incr_ref(r->iter);
}
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_READ_RESP_IMPL_H
#define __MOBJECT_READ_RESP_IMPL_H
#include <stdint.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
typedef struct rd_response_BASE* rd_response_base_t;
typedef struct read_response {
size_t num_responses;
rd_response_base_t responses;
}* read_response_t;
read_response_t build_matching_read_responses(mobject_store_read_op_t actions);
void free_read_responses(read_response_t response);
void feed_read_op_pointers_from_response(mobject_store_read_op_t actions, read_response_t response);
#endif
......@@ -6,17 +6,10 @@
#ifndef __MOBJECT_READ_RESPONSE_H
#define __MOBJECT_READ_RESPONSE_H
#include <stdint.h>
#include "mobject-store-config.h"
#include "read-actions.h"
#include "libmobject-store.h"
/**
* This file contains a set of structures meant
* to store responses for read_op operations in
* a buffer. Some of these structures are self-sufficiant.
* Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional
* data after them.
*/
typedef enum {
READ_RESPCODE_BASE = 0,
READ_RESPCODE_STAT,
......@@ -59,8 +52,5 @@ typedef struct rd_response_OMAP {
mobject_store_omap_iter_t iter;
}* rd_response_omap_t;
rd_response_base_t build_matching_read_responses(rd_action_base_t actions);
void free_read_responses(rd_response_base_t responses);
#endif
......@@ -61,7 +61,7 @@ int main(int argc, char** argv)
write_op_in_t in;
in.object_name = "test-object";
in.chain = write_op;
in.write_op = write_op;
prepare_write_op(mid, write_op);
......@@ -78,6 +78,7 @@ int main(int argc, char** argv)
// END this is what write_op_operate should contain
mobject_store_release_write_op(write_op);
}
{ // READ OP TEST
......@@ -106,13 +107,14 @@ int main(int argc, char** argv)
mobject_store_read_op_omap_get_vals(read_op, start_after, filter_prefix, 3, &iter4, &prval4);
// Add "omap_get_vals_by_keys" operation
const char* keys[] = {"matthieu", "shane"};
mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter4, &prval4);
int prval5;
mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter4, &prval5);
// BEGIN this is what read_op_operate should contain
read_op_in_t in;
in.object_name = "test-object";
in.chain = read_op;
in.read_op = read_op;
prepare_read_op(mid, read_op);
......@@ -123,7 +125,7 @@ int main(int argc, char** argv)
read_op_out_t resp;
margo_get_output(h, &resp);
// print something
feed_read_op_pointers_from_response(read_op, resp.responses);
margo_free_output(h,&resp);
margo_destroy(h);
......@@ -131,6 +133,14 @@ int main(int argc, char** argv)
// END this is what read_op_operate should contain
mobject_store_release_read_op(read_op);
// print the results of the read operations
printf("Client received the following results:\n");
printf("stat: psize=%ld pmtime=%lld prval=%d\n", psize, pmtime, prval1);
printf("read: bytes_read=%ld prval=%d\n", bytes_read, prval2);
printf("omap_get_keys: prval=%d\n", prval3);
printf("omap_get_vals: prval=%d\n", prval4);
printf("omap_get_vals_by_keys: prval=%d\n", prval5);
}
/* free the address */
......
......@@ -7,9 +7,8 @@
#include "types.h"
#include "src/write-op-visitor.h"
#include "src/read-op-visitor.h"
#include "src/read-op-impl.h"
#include "src/omap-iter-impl.h"
#include "src/read-responses.h"
#include "src/read-resp-impl.h"
/* after serving this number of rpcs, the server will shut down. */
static const int TOTAL_RPCS = 16;
......@@ -97,7 +96,7 @@ hg_return_t mobject_write_op_rpc(hg_handle_t h)
assert(ret == HG_SUCCESS);
/* Execute the operation chain */
execute_write_op_visitor(&write_op_printer, in.chain, in.object_name);
execute_write_op_visitor(&write_op_printer, in.write_op, in.object_name);
// set the return value of the RPC
out.ret = 0;
......@@ -160,15 +159,17 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h)
assert(ret == HG_SUCCESS);
/* Create a response list matching the input actions */
rd_response_base_t resp_chain = build_matching_read_responses(in.chain->actions);
read_response_t resp = build_matching_read_responses(in.read_op);
/* Compute the result. */
execute_read_op_visitor(&read_op_printer, in.chain, in.object_name);
execute_read_op_visitor(&read_op_printer, in.read_op, in.object_name);
out.responses = resp;
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
free_read_responses(resp_chain);
free_read_responses(resp);
/* Free the input data. */
ret = margo_free_input(h, &in);
......
......@@ -7,17 +7,18 @@
#include <libmobject-store.h>
#include "src/proc-write-actions.h"
#include "src/proc-read-actions.h"
#include "src/proc-read-responses.h"
MERCURY_GEN_PROC(write_op_in_t,
((hg_string_t)(object_name))\
((mobject_store_write_op_t)(chain)))
((mobject_store_write_op_t)(write_op)))
MERCURY_GEN_PROC(write_op_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(read_op_in_t,
((hg_string_t)(object_name))\
((mobject_store_read_op_t)(chain)))
((mobject_store_read_op_t)(read_op)))
MERCURY_GEN_PROC(read_op_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(read_op_out_t, ((read_response_t)(responses)))
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment