Commit f9f4d76e authored by Matthieu Dorier's avatar Matthieu Dorier

added tests for read_op

parent 4ab5867f
...@@ -453,8 +453,8 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op, ...@@ -453,8 +453,8 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
*/ */
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op, void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
const char *start_after, const char *start_after,
uint64_t max_return, uint64_t max_return,
mobject_store_omap_iter_t *iter, mobject_store_omap_iter_t *iter,
int *prval); int *prval);
/** /**
......
...@@ -16,7 +16,10 @@ static void prepare_read(uint64_t* cur_offset, ...@@ -16,7 +16,10 @@ static void prepare_read(uint64_t* cur_offset,
void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op) void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op)
{ {
if(read_op->ready == 1) return; if(read_op->ready == 1) return;
if(read_op->num_actions == 0) return; if(read_op->num_actions == 0) {
read_op->ready = 1;
return;
}
rd_action_base_t action; rd_action_base_t action;
......
...@@ -277,10 +277,10 @@ static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc, ...@@ -277,10 +277,10 @@ static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
(*action)->max_return = a.max_return; (*action)->max_return = a.max_return;
(*action)->data_size = a.data_size; (*action)->data_size = a.data_size;
(*action)->start_after = (*action)->data; (*action)->start_after = (*action)->data;
size_t s = strlen((*action)->start_after);
(*action)->filter_prefix = (*action)->data + s + 1;
ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size); ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
size_t s = strlen((*action)->start_after);
(*action)->filter_prefix = (*action)->data + s + 1;
return ret; return ret;
} }
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <libmobject-store.h> #include <libmobject-store.h>
#include "types.h" #include "types.h"
#include "src/prepare-write-op.h" #include "src/prepare-write-op.h"
#include "src/prepare-read-op.h"
/* Main function. */ /* Main function. */
int main(int argc, char** argv) int main(int argc, char** argv)
...@@ -30,11 +31,7 @@ int main(int argc, char** argv) ...@@ -30,11 +31,7 @@ int main(int argc, char** argv)
{ // WRITE OP TEST { // WRITE OP TEST
write_op_in_t in;
in.object_name = "test-write-object";
mobject_store_write_op_t write_op = mobject_store_create_write_op(); mobject_store_write_op_t write_op = mobject_store_create_write_op();
in.chain = write_op;
// Add a "create" operation // Add a "create" operation
mobject_store_write_op_create(write_op, LIBMOBJECT_CREATE_EXCLUSIVE, NULL); mobject_store_write_op_create(write_op, LIBMOBJECT_CREATE_EXCLUSIVE, NULL);
...@@ -59,7 +56,13 @@ int main(int argc, char** argv) ...@@ -59,7 +56,13 @@ int main(int argc, char** argv)
mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5); mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5);
// Add a omap_rm_keys" operation // Add a omap_rm_keys" operation
mobject_store_write_op_omap_rm_keys(write_op, keys, 5); mobject_store_write_op_omap_rm_keys(write_op, keys, 5);
// the operation chain should be prepare for sending before being serialized
// BEGIN this is what write_op_operate should contain
write_op_in_t in;
in.object_name = "test-object";
in.chain = write_op;
prepare_write_op(mid, write_op); prepare_write_op(mid, write_op);
hg_handle_t h; hg_handle_t h;
...@@ -69,18 +72,49 @@ int main(int argc, char** argv) ...@@ -69,18 +72,49 @@ int main(int argc, char** argv)
write_op_out_t resp; write_op_out_t resp;
margo_get_output(h, &resp); margo_get_output(h, &resp);
// printf("Got response: %d+%d = %d\n", args.x, args.y, resp.ret);
margo_free_output(h,&resp); margo_free_output(h,&resp);
margo_destroy(h); margo_destroy(h);
// END this is what write_op_operate should contain
mobject_store_release_write_op(write_op);
} }
#if 0
{ // READ OP TEST { // READ OP TEST
mobject_store_read_op_t read_op = mobject_store_create_read_op();
// Add "stat" operation
uint64_t psize;
time_t pmtime;
int prval1;
mobject_store_read_op_stat(read_op, &psize, &pmtime, &prval1);
// Add "read" operation
char read_buf[512];
size_t bytes_read;
int prval2;
mobject_store_read_op_read(read_op, 2, 32, buffer, &bytes_read, &prval2);
// Add "omap_get_keys" operation
const char* start_after = "shane";
mobject_store_omap_iter_t iter3;
int prval3;
mobject_store_read_op_omap_get_keys(read_op, start_after, 7, &iter3, &prval3);
// Add "omap_get_vals" operation
const char* filter_prefix = "p";
mobject_store_omap_iter_t iter4;
int prval4;
mobject_store_read_op_omap_get_vals(read_op, start_after, filter_prefix, 3, &iter4, &prval4);
// Add "omap_get_vals_by_keys" operation
const char* keys[] = {"matthieu", "shane"};
mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter4, &prval4);
// BEGIN this is what read_op_operate should contain
read_op_in_t in; read_op_in_t in;
in.object_name = "test-object";
in.chain = read_op;
// TODO fill the read_op_in_t prepare_read_op(mid, read_op);
hg_handle_t h; hg_handle_t h;
margo_create(mid, svr_addr, read_op_rpc_id, &h); margo_create(mid, svr_addr, read_op_rpc_id, &h);
...@@ -93,8 +127,12 @@ int main(int argc, char** argv) ...@@ -93,8 +127,12 @@ int main(int argc, char** argv)
margo_free_output(h,&resp); margo_free_output(h,&resp);
margo_destroy(h); margo_destroy(h);
// END this is what read_op_operate should contain
mobject_store_release_read_op(read_op);
} }
#endif
/* free the address */ /* free the address */
margo_addr_free(mid, svr_addr); margo_addr_free(mid, svr_addr);
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <mercury.h> #include <mercury.h>
#include "types.h" #include "types.h"
#include "src/write-op-visitor.h" #include "src/write-op-visitor.h"
#include "src/read-op-visitor.h"
/* after serving this number of rpcs, the server will shut down. */ /* after serving this number of rpcs, the server will shut down. */
static const int TOTAL_RPCS = 16; static const int TOTAL_RPCS = 16;
...@@ -93,7 +94,7 @@ hg_return_t mobject_write_op_rpc(hg_handle_t h) ...@@ -93,7 +94,7 @@ hg_return_t mobject_write_op_rpc(hg_handle_t h)
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
/* Execute the operation chain */ /* Execute the operation chain */
execute_write_op_visitor(&write_op_printer, in.chain, NULL); execute_write_op_visitor(&write_op_printer, in.chain, in.object_name);
// set the return value of the RPC // set the return value of the RPC
out.ret = 0; out.ret = 0;
...@@ -121,6 +122,25 @@ hg_return_t mobject_write_op_rpc(hg_handle_t h) ...@@ -121,6 +122,25 @@ hg_return_t mobject_write_op_rpc(hg_handle_t h)
} }
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_rpc) DEFINE_MARGO_RPC_HANDLER(mobject_write_op_rpc)
static void read_op_printer_begin(void*);
static void read_op_printer_stat(void*, uint64_t*, time_t*, int*);
static void read_op_printer_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
static void read_op_printer_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_printer_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_printer_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_printer_end(void*);
struct read_op_visitor read_op_printer = {
.visit_begin = read_op_printer_begin,
.visit_end = read_op_printer_end,
.visit_stat = read_op_printer_stat,
.visit_read = read_op_printer_read,
.visit_omap_get_keys = read_op_printer_omap_get_keys,
.visit_omap_get_vals = read_op_printer_omap_get_vals,
.visit_omap_get_vals_by_keys = read_op_printer_omap_get_vals_by_keys
};
/* Implementation of the RPC. */ /* Implementation of the RPC. */
hg_return_t mobject_read_op_rpc(hg_handle_t h) hg_return_t mobject_read_op_rpc(hg_handle_t h)
{ {
...@@ -137,7 +157,7 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h) ...@@ -137,7 +157,7 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h)
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
/* Compute the result. */ /* Compute the result. */
// TODO execute_read_op_visitor(&read_op_printer, in.chain, in.object_name);
ret = margo_respond(h, &out); ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
...@@ -162,9 +182,9 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h) ...@@ -162,9 +182,9 @@ hg_return_t mobject_read_op_rpc(hg_handle_t h)
} }
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_rpc) DEFINE_MARGO_RPC_HANDLER(mobject_read_op_rpc)
void write_op_printer_begin(void* unused) void write_op_printer_begin(void* object_name)
{ {
printf("<mobject_write_operation>\n"); printf("<mobject_write_operation on=\"%s\">\n",(char*)object_name);
} }
void write_op_printer_end(void* unused) void write_op_printer_end(void* unused)
...@@ -236,3 +256,44 @@ void write_op_printer_omap_rm_keys(void* u, char const* const* keys, size_t num_ ...@@ -236,3 +256,44 @@ void write_op_printer_omap_rm_keys(void* u, char const* const* keys, size_t num_
} }
printf("\t</omap_rm_keys>\n"); printf("\t</omap_rm_keys>\n");
} }
void read_op_printer_begin(void* u)
{
printf("<mobject_read_operation object_name=\"%s\">\n", (char*)u);
}
void read_op_printer_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
printf("\t<stat/>\n");
}
void read_op_printer_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
printf("\t<read offset=%ld length=%ld to=%ld/>\n",offset, len, buf.as_offset);
}
void read_op_printer_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_keys start_after=\"%s\" max_return=%ld />\n", start_after, max_return);
}
void read_op_printer_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_vals start_after=\"%s\" filter_prefix=\"%s\" max_return=%ld />\n",
start_after, filter_prefix, max_return);
}
void read_op_printer_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
printf("\t<omap_get_vals_by_keys num=%ld>\n", num_keys);
unsigned i;
for(i=0; i<num_keys; i++)
printf("\t\t<record key=\"%s\" />\n", keys[i]);
printf("\t<omap_get_vals_by_keys/>\n");
}
void read_op_printer_end(void* u)
{
printf("</mobject_read_operation>\n");
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment