Commit 21ada1ab authored by Matthieu Dorier's avatar Matthieu Dorier

started read response lists

parent f9f4d76e
......@@ -10,7 +10,8 @@ src_libmobject_store_la_SOURCES = \
src/write-op-visitor.c \
src/read-op-visitor.c \
src/omap-iter-impl.c \
src/proc-omap-iter.c
src/proc-omap-iter.c \
src/read-responses.c
noinst_HEADERS += \
src/log.h \
......@@ -28,4 +29,5 @@ noinst_HEADERS += \
src/read-op-visitor.h \
src/buffer-union.h \
src/omap-iter-impl.h \
src/proc-omap-iter.h
src/proc-omap-iter.h \
src/read-responses.h
......@@ -11,8 +11,8 @@
/**
* This file contains a set of structures meant
* to store arguments for write_op operations in
* a buffer. Some of these structures are self-sufficiant.
* to store arguments for read_op operations in
* a buffer. Some of these structures are self-sufficient.
* Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional
* data after them.
......
......@@ -12,7 +12,7 @@
/**
* This file contains a set of structures meant
* to store arguments for write_op operations in
* a buffer. Some of these structures are self-sufficiant.
* a buffer. Some of these structures are self-sufficient.
* Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional
* data after them.
......
......@@ -6,6 +6,7 @@
#ifndef __MOBJECT_BUFFER_UNION_H
#define __MOBJECT_BUFFER_UNION_H
#include <stdint.h>
/**
* This union is defined to be used in read and write actions
* involving either a local pointer (const char*) or an offset
......
......@@ -26,7 +26,7 @@ void omap_iter_free(mobject_store_omap_iter_t iter)
free((void*)(elt->key));
free((void*)(elt->value));
free((void*)(elt));
}
}
free(iter);
}
......
......@@ -7,6 +7,7 @@
#define __MOBJECT_READ_OPCODES_H
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "buffer-union.h"
typedef enum {
......@@ -65,9 +66,9 @@ typedef struct rd_action_OMAP_GET_VALS {
struct rd_action_BASE base;
const char* start_after;
const char* filter_prefix;
uint64_t max_return;
mobject_store_omap_iter_t* iter;
int* prval;
uint64_t max_return;
mobject_store_omap_iter_t* iter;
int* prval;
size_t data_size;
char data[1];
}* rd_action_omap_get_vals_t;
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include "read-responses.h"
#include "omap-iter-impl.h"
#include "utlist.h"
typedef rd_response_base_t (*build_matching_fn)(rd_action_base_t);
static rd_response_base_t build_matching_stat(rd_action_stat_t a);
static rd_response_base_t build_matching_read(rd_action_read_t a);
static rd_response_base_t build_matching_omap_get_keys(rd_action_omap_get_keys_t a);
static rd_response_base_t build_matching_omap_get_vals(rd_action_omap_get_vals_t a);
static rd_response_base_t build_matching_omap_get_vals_by_keys(rd_action_omap_get_vals_by_keys_t a);
typedef void (*free_response_fn)(rd_response_base_t);
static void free_resp_omap(rd_response_omap_t a) {
omap_iter_free(a->iter);
free(a);
};
static build_matching_fn match_fn[] = {
NULL,
(build_matching_fn)build_matching_stat,
(build_matching_fn)build_matching_read,
(build_matching_fn)build_matching_omap_get_keys,
(build_matching_fn)build_matching_omap_get_vals,
(build_matching_fn)build_matching_omap_get_vals_by_keys
};
static free_response_fn free_fn[] = {
NULL,
(free_response_fn)free,
(free_response_fn)free,
(free_response_fn)free_resp_omap
};
rd_response_base_t build_matching_read_responses(rd_action_base_t actions)
{
rd_action_base_t a;
rd_response_base_t resp = NULL;
rd_response_base_t tmp = NULL;
DL_FOREACH(actions, a) {
tmp = match_fn[a->type](a);
DL_APPEND(resp, tmp);
}
return resp;
}
void free_read_responses(rd_response_base_t responses)
{
rd_response_base_t r, tmp;
DL_FOREACH_SAFE(responses, r, tmp) {
DL_DELETE(responses, r);
free_fn[r->type](r);
}
}
rd_response_base_t build_matching_stat(rd_action_stat_t a)
{
rd_response_stat_t resp = (rd_response_stat_t)calloc(1, sizeof(*resp));
resp->base.type = READ_RESPCODE_STAT;
a->psize = &(resp->psize);
a->pmtime = &(resp->pmtime);
a->prval = &(resp->prval);
return (rd_response_base_t)resp;
}
rd_response_base_t build_matching_read(rd_action_read_t a)
{
rd_response_read_t resp = (rd_response_read_t)calloc(1, sizeof(*resp));
resp->base.type = READ_RESPCODE_READ;
a->bytes_read = &(resp->bytes_read);
a->prval = &(resp->prval);
return (rd_response_base_t)resp;
}
rd_response_base_t build_matching_omap_get_keys(rd_action_omap_get_keys_t a)
{
rd_response_omap_t resp = (rd_response_omap_t)calloc(1, sizeof(*resp));
resp->base.type = READ_RESPCODE_OMAP;
a->prval = &(resp->prval);
a->iter = &(resp->iter);
return (rd_response_base_t)resp;
}
rd_response_base_t build_matching_omap_get_vals(rd_action_omap_get_vals_t a)
{
rd_response_omap_t resp = (rd_response_omap_t)calloc(1, sizeof(*resp));
resp->base.type = READ_RESPCODE_OMAP;
a->prval = &(resp->prval);
a->iter = &(resp->iter);
return (rd_response_base_t)resp;
}
rd_response_base_t build_matching_omap_get_vals_by_keys(rd_action_omap_get_vals_by_keys_t a)
{
rd_response_omap_t resp = (rd_response_omap_t)calloc(1, sizeof(*resp));
resp->base.type = READ_RESPCODE_OMAP;
a->prval = &(resp->prval);
a->iter = &(resp->iter);
return (rd_response_base_t)resp;
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_READ_RESPONSE_H
#define __MOBJECT_READ_RESPONSE_H
#include "mobject-store-config.h"
#include "read-actions.h"
/**
* This file contains a set of structures meant
* to store responses for read_op operations in
* a buffer. Some of these structures are self-sufficiant.
* Some other are meant to be used as headers, and the
* the serialized buffer actually contains additional
* data after them.
*/
typedef enum {
READ_RESPCODE_BASE = 0,
READ_RESPCODE_STAT,
READ_RESPCODE_READ,
READ_RESPCODE_OMAP,
_READ_RESPCODE_END_ENUM_
} read_resp_code_t;
typedef struct rd_response_BASE {
read_resp_code_t type;
struct rd_response_BASE* prev;
struct rd_response_BASE* next;
}* rd_response_base_t;
/**
* stat response
*/
typedef struct rd_response_STAT {
struct rd_response_BASE base;
uint64_t psize;
time_t pmtime;
int prval;
}* rd_response_stat_t;
/**
* read response
*/
typedef struct rd_response_READ {
struct rd_response_BASE base;
size_t bytes_read;
int prval;
}* rd_response_read_t;
/**
* omap_* responses
*/
typedef struct rd_response_OMAP {
struct rd_response_BASE base;
int prval;
mobject_store_omap_iter_t iter;
}* rd_response_omap_t;
rd_response_base_t build_matching_read_responses(rd_action_base_t actions);
void free_read_responses(rd_response_base_t responses);
#endif
......@@ -7,6 +7,9 @@
#include "types.h"
#include "src/write-op-visitor.h"
#include "src/read-op-visitor.h"
#include "src/read-op-impl.h"
#include "src/omap-iter-impl.h"
#include "src/read-responses.h"
/* after serving this number of rpcs, the server will shut down. */
static const int TOTAL_RPCS = 16;
......@@ -156,12 +159,17 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h)
ret = margo_get_input(h, &in);
assert(ret == HG_SUCCESS);
/* Create a response list matching the input actions */
rd_response_base_t resp_chain = build_matching_read_responses(in.chain->actions);
/* Compute the result. */
execute_read_op_visitor(&read_op_printer, in.chain, in.object_name);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
free_read_responses(resp_chain);
/* Free the input data. */
ret = margo_free_input(h, &in);
assert(ret == HG_SUCCESS);
......@@ -265,23 +273,34 @@ void read_op_printer_begin(void* u)
void read_op_printer_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
printf("\t<stat/>\n");
*psize = 128;
time(pmtime);
*prval = 1234;
}
void read_op_printer_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
printf("\t<read offset=%ld length=%ld to=%ld/>\n",offset, len, buf.as_offset);
*bytes_read = len;
*prval = 1235;
}
void read_op_printer_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_keys start_after=\"%s\" max_return=%ld />\n", start_after, max_return);
omap_iter_create(iter);
// use omap_iter_append to add things to the iterator
*prval = 1236;
}
void read_op_printer_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_vals start_after=\"%s\" filter_prefix=\"%s\" max_return=%ld />\n",
start_after, filter_prefix, max_return);
omap_iter_create(iter);
*prval = 1237;
}
void read_op_printer_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
......@@ -291,6 +310,9 @@ void read_op_printer_omap_get_vals_by_keys(void* u, char const* const* keys, siz
for(i=0; i<num_keys; i++)
printf("\t\t<record key=\"%s\" />\n", keys[i]);
printf("\t<omap_get_vals_by_keys/>\n");
omap_iter_create(iter);
*prval = 1238;
}
void read_op_printer_end(void* u)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment