Commit c1d1362c authored by Matthieu Dorier's avatar Matthieu Dorier

added implementation of fake C++ backend

parent a4deb170
......@@ -21,8 +21,8 @@ noinst_HEADERS += \
src/omap-iter/proc-omap-iter.h \
src/rpc-types/read-op.h \
src/rpc-types/write-op.h \
src/server/exec-read-op.h\
src/server/exec-write-op.h \
src/server/print-read-op.h\
src/server/print-write-op.h \
src/util/buffer-union.h \
src/util/log.h \
src/util/utlist.h
......@@ -58,8 +58,11 @@ src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \
src_server_libmobject_server_la_SOURCES = \
src/server/mobject-server.c \
src/server/exec-write-op.c \
src/server/exec-read-op.c
src/server/fake/fake-write-op.cpp \
src/server/fake/fake-read-op.cpp \
src/server/fake/fake-db.cpp \
src/server/print-write-op.c \
src/server/print-read-op.c
src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \
src/io-chain/libio-chain.la ${SERVER_LIBS}
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
}
......@@ -4,6 +4,7 @@
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include <stdio.h>
#include "libmobject-store.h"
#include "src/omap-iter/omap-iter-impl.h"
......@@ -12,13 +13,18 @@ int mobject_store_omap_get_next(mobject_store_omap_iter_t iter,
char **val,
size_t *len)
{
if(iter->current == NULL) return -1;
if(iter->current == NULL) {
*key = NULL;
*val = NULL;
*len = 0;
return -1;
}
*key = iter->current->key;
*val = iter->current->value;
*len = iter->current->value_size;
iter->current = iter->current->next;
if(iter->current == iter->head) iter->current = NULL;
return 0;
}
......
......@@ -9,6 +9,10 @@
#include "libmobject-store.h"
#include "src/util/buffer-union.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct read_op_visitor {
void (*visit_begin)(void*);
void (*visit_stat)(void*, uint64_t*, time_t*, int*);
......@@ -21,4 +25,8 @@ typedef struct read_op_visitor {
void execute_read_op_visitor(read_op_visitor_t visitor, mobject_store_read_op_t read_op, void* uarg);
#ifdef __cplusplus
}
#endif
#endif
......@@ -9,6 +9,10 @@
#include "libmobject-store.h"
#include "src/util/buffer-union.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct write_op_visitor {
void (*visit_begin)(void*);
void (*visit_create)(void*, int);
......@@ -26,4 +30,8 @@ typedef struct write_op_visitor {
void execute_write_op_visitor(write_op_visitor_t visitor, mobject_store_write_op_t write_op, void* uargs);
#ifdef __cplusplus
}
#endif
#endif
......@@ -26,8 +26,8 @@ void omap_iter_free(mobject_store_omap_iter_t iter)
DL_FOREACH_SAFE(iter->head, elt, tmp) {
DL_DELETE(iter->head, elt);
free((void*)(elt->key));
free((void*)(elt->value));
if(elt->key) free((void*)(elt->key));
if(elt->value) free((void*)(elt->value));
free((void*)(elt));
}
......@@ -47,11 +47,11 @@ void omap_iter_append(mobject_store_omap_iter_t iter,
MOBJECT_ASSERT(iter, "trying to append to a NULL iterator");
omap_iter_node_t item = (omap_iter_node_t)calloc(1, sizeof(*item));
item->key = strdup(key);
item->key_size = strlen(key)+1;
item->key = key ? strdup(key) : NULL;
item->key_size = key ? strlen(key)+1 : 0;
item->value_size = val_size;
item->value = (char*)malloc(val_size);
memcpy(item->value, val, val_size);
item->value = val_size ? (char*)malloc(val_size) : NULL;
if(val) memcpy(item->value, val, val_size);
DL_APPEND(iter->head, item);
......
......@@ -8,6 +8,10 @@
#include "libmobject-store.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct omap_iter_node* omap_iter_node_t;
struct omap_iter_node {
......@@ -35,4 +39,8 @@ void omap_iter_append(mobject_store_omap_iter_t iter,
const char* key, const char* val,
size_t val_size);
#ifdef __cplusplus
}
#endif
#endif
......@@ -3,6 +3,7 @@
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <margo.h>
#include "src/omap-iter/proc-omap-iter.h"
#include "src/util/utlist.h"
......@@ -22,12 +23,16 @@ hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap
DL_FOREACH((*iter)->head, node) {
ret = hg_proc_hg_size_t(proc, &(node->key_size));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, &(node->key), node->key_size);
if(ret != HG_SUCCESS) return ret;
if(node->key_size) {
ret = hg_proc_memcpy(proc, node->key, node->key_size);
if(ret != HG_SUCCESS) return ret;
}
ret = hg_proc_hg_size_t(proc, &(node->value_size));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_memcpy(proc, node->value, node->value_size);
if(ret != HG_SUCCESS) return ret;
if(node->value_size) {
ret = hg_proc_memcpy(proc, node->value, node->value_size);
if(ret != HG_SUCCESS) return ret;
}
}
break;
......@@ -35,23 +40,34 @@ hg_return_t hg_proc_mobject_store_omap_iter_t(hg_proc_t proc, mobject_store_omap
case HG_DECODE:
omap_iter_create(iter);
ret = hg_proc_hg_size_t(proc, &((*iter)->num_items));
size_t num_items = 0;
ret = hg_proc_hg_size_t(proc, &num_items);
if(ret != HG_SUCCESS) return ret;
unsigned i;
size_t key_size, val_size;
char* key;
char* val;
for(i = 0; i < (*iter)->num_items; i++) {
for(i = 0; i < num_items; i++) {
ret = hg_proc_hg_size_t(proc, &key_size);
key = (char*)malloc(key_size);
ret = hg_proc_memcpy(proc, key, key_size);
if(ret != HG_SUCCESS) return ret;
if(key_size != 0) {
key = (char*)calloc(1,key_size);
ret = hg_proc_memcpy(proc, key, key_size);
if(ret != HG_SUCCESS) return ret;
} else {
key = NULL;
}
ret = hg_proc_hg_size_t(proc, &val_size);
if(ret != HG_SUCCESS) return ret;
val = (char*)malloc(val_size);
ret = hg_proc_memcpy(proc, val, val_size);
omap_iter_append(*iter, key, val, val_size);
if(val_size != 0) {
val = (char*)calloc(1,val_size);
ret = hg_proc_memcpy(proc, val, val_size);
} else {
val = NULL;
}
omap_iter_append(*iter, key, val, val_size);
if(key) free(key);
if(val) free(val);
}
break;
......
#include <map>
#include <string>
#include "src/server/fake/fake-object.hpp"
std::map< std::string, fake_object > fake_db;
#ifndef __FAKE_OBJECT_HPP
#define __FAKE_OBJECT_HPP
#include <iostream>
#include <cstring>
#include <vector>
#include <string>
#include <margo.h>
#include <libmobject-store.h>
#include "src/omap-iter/omap-iter-impl.h"
class fake_object {
private:
std::vector<char> m_data; // data associated with the object
std::map< std::string, std::vector<char> > m_omap; // omap
time_t m_creation_time; // creation time
time_t m_modification_time; // modification time
public:
fake_object()
{
time(&m_creation_time);
m_modification_time = m_creation_time;
}
void omap_set(const std::string& key, size_t len, const char* val) {
m_omap[key] = std::vector<char>(val, val+len);
}
void omap_rm(const std::string& key) {
m_omap.erase(key);
}
void omap_get_keys(const std::string& start_after, size_t max, mobject_store_omap_iter_t iter) const {
auto it = m_omap.lower_bound(start_after);
for(size_t i = 0; (it != m_omap.end()) && (i < max); it++, i++) {
auto& key = it->first;
omap_iter_append(iter, &key[0], (const char*)0 , 0);
}
}
void omap_get_vals(const std::string& start_after, const std::string& filter_prefix,
size_t max, mobject_store_omap_iter_t iter) const {
auto it = m_omap.lower_bound(start_after);
for(size_t i = 0; (it != m_omap.end()) && (i < max); it++, i++) {
auto& key = it->first;
auto& val = it->second;
if(key.substr(0, filter_prefix.size()) != filter_prefix) continue;
omap_iter_append(iter, &key[0], &val[0], val.size());
}
}
void omap_get_vals_by_keys(const std::vector<std::string>& keys, mobject_store_omap_iter_t iter) const {
for(auto& k : keys) {
if(m_omap.count(k) != 0) {
auto it = m_omap.find(k);
const auto& val = it->second;
omap_iter_append(iter, &k[0], &val[0], val.size());
}
}
}
void write(margo_instance_id mid, hg_addr_t client_addr, hg_bulk_t bulk_handle,
uint64_t remote_offset, uint64_t local_offset, size_t len) {
if(local_offset + len > m_data.size()) m_data.resize(local_offset + len);
std::vector<void*> buf_ptrs(1);
std::vector<hg_size_t> buf_sizes(1);
buf_ptrs[0] = (void*)(&m_data[local_offset]);
buf_sizes[0] = len;
hg_bulk_t local_bulk;
hg_return_t ret =
margo_bulk_create(mid, 1, buf_ptrs.data(), buf_sizes.data(),
HG_BULK_WRITE_ONLY, &local_bulk);
// TODO error checking
ret =
margo_bulk_transfer(mid, HG_BULK_PULL, client_addr, bulk_handle,
remote_offset, local_bulk, 0, len);
ret =
margo_bulk_free(local_bulk);
time(&m_modification_time);
}
void write_full(margo_instance_id mid,
hg_addr_t client_addr,
hg_bulk_t bulk_handle,
uint64_t remote_offset, size_t len) {
m_data.resize(len);
write(mid, client_addr, bulk_handle, remote_offset, 0, len);
}
void writesame(margo_instance_id mid,
hg_addr_t client_addr,
hg_bulk_t bulk_handle,
uint64_t remote_offset,
uint64_t local_offset,
size_t data_len,
size_t write_len) {
if(write_len < data_len) {
data_len = write_len;
}
uint64_t base_offset = local_offset;
write(mid, client_addr, bulk_handle, remote_offset, local_offset, data_len);
write_len -= data_len;
local_offset += data_len;
if(local_offset + write_len > m_data.size()) {
m_data.resize(local_offset + write_len);
}
while(write_len > 0) {
if(write_len < data_len) data_len = write_len;
std::memcpy((void*)(&m_data[local_offset]),(void*)(&m_data[base_offset]),data_len);
local_offset += data_len;
write_len -= data_len;
}
}
void append(margo_instance_id mid,
hg_addr_t client_addr,
hg_bulk_t bulk_handle,
uint64_t remote_offset,
size_t len) {
uint64_t local_offset = m_data.size();
write(mid, client_addr, bulk_handle, remote_offset, local_offset, len);
}
void truncate(uint64_t offset) {
m_data.resize(offset);
time(&m_modification_time);
}
void zero(uint64_t offset, size_t size) {
if(offset+size > m_data.size()) {
m_data.resize(offset+size);
}
std::memset((void*)(&m_data[offset]), 0, size);
time(&m_modification_time);
}
void stat(size_t* psize, time_t* pmtime) const {
*psize = m_data.size();
*pmtime = m_modification_time;
}
void read(margo_instance_id mid, hg_addr_t client_addr, hg_bulk_t remote_handle,
uint64_t remote_offset,
uint64_t local_offset, size_t len, size_t* bytes_read) const {
if(local_offset > m_data.size()) {
*bytes_read = 0;
return;
}
// Figure out what we can read
if(local_offset + len > m_data.size()) len = m_data.size() - local_offset;
std::vector<void*> buf_ptrs(1);
std::vector<hg_size_t> buf_sizes(1);
buf_ptrs[0] = (void*)(&m_data[local_offset]);
buf_sizes[0] = len;
hg_bulk_t local_bulk;
hg_return_t ret =
margo_bulk_create(mid, 1, buf_ptrs.data(), buf_sizes.data(),
HG_BULK_READ_ONLY, &local_bulk);
// TODO error checking
ret =
margo_bulk_transfer(mid, HG_BULK_PUSH, client_addr, remote_handle,
remote_offset, local_bulk, 0, len);
ret =
margo_bulk_free(local_bulk);
*bytes_read = len;
}
};
#endif
#include <map>
#include <string>
#include <iostream>
#include "src/server/fake/fake-object.hpp"
#include "src/server/visitor-args.h"
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
extern std::map< std::string, fake_object > fake_db;
static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
static void read_op_exec_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin,
.visit_stat = read_op_exec_stat,
.visit_read = read_op_exec_read,
.visit_omap_get_keys = read_op_exec_omap_get_keys,
.visit_omap_get_vals = read_op_exec_omap_get_vals,
.visit_omap_get_vals_by_keys = read_op_exec_omap_get_vals_by_keys,
.visit_end = read_op_exec_end
};
extern "C" void fake_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs)
{
execute_read_op_visitor(&read_op_exec, read_op, (void*)vargs);
}
void read_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (stat) Object " << name << " does not exist" << std::endl;
*prval = -1;
return;
}
fake_db[name].stat(psize, pmtime);
*prval = 0;
}
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (read) Object " << name << " does not exist" << std::endl;
*prval = -1;
return;
}
fake_db[name].read(vargs->mid, vargs->client_addr, vargs->bulk_handle,
buf.as_offset, offset, len, bytes_read);
*prval = 0;
}
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (omap_get_keys) Object " << name << " does not exist" << std::endl;
*prval = -1;
*iter = NULL;
return;
}
omap_iter_create(iter);
fake_db[name].omap_get_keys(start_after, max_return, *iter);
*prval = 0;
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (omap_get_vals) Object " << name << " does not exist" << std::endl;
*prval = -1;
*iter = NULL;
return;
}
omap_iter_create(iter);
fake_db[name].omap_get_vals(start_after, filter_prefix, max_return, *iter);
*prval = 0;
}
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (omap_get_vals_by_keys) Object " << name << " does not exist" << std::endl;
*prval = -1;
*iter = NULL;
return;
}
omap_iter_create(iter);
std::vector<std::string> keys_vec(keys, keys+num_keys);
fake_db[name].omap_get_vals_by_keys(keys_vec, *iter);
*prval = 0;
}
void read_op_exec_end(void* u)
{
}
#ifndef __FAKE_READ_OP_H
#define __FAKE_READ_OP_H
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/visitor-args.h"
#ifdef __cplusplus
extern "C" {
#endif
void fake_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs);
#ifdef __cplusplus
}
#endif
#endif
#include <map>
#include <string>
#include <iostream>
#include "src/server/visitor-args.h"
#include "src/server/fake/fake-object.hpp"
#include "src/io-chain/write-op-visitor.h"
extern std::map< std::string, fake_object > fake_db;
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
.visit_create = write_op_exec_create,
.visit_write = write_op_exec_write,
.visit_write_full = write_op_exec_write_full,
.visit_writesame = write_op_exec_writesame,
.visit_append = write_op_exec_append,
.visit_remove = write_op_exec_remove,
.visit_truncate = write_op_exec_truncate,
.visit_zero = write_op_exec_zero,
.visit_omap_set = write_op_exec_omap_set,
.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
.visit_end = write_op_exec_end
};
extern "C" void fake_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
/* Execute the operation chain */
execute_write_op_visitor(&write_op_exec, write_op, (void*)vargs);
}
void write_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_end(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_create(void* u, int exclusive)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) != 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (create) Object " << name << " already exists" << std::endl;
return;
}
fake_db[name] = fake_object();
}
void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (write) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].write(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, len);
}
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (write_full) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].write_full(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
}
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (writesame) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].writesame(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len);
}
void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (append) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].append(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
}
void write_op_exec_remove(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (remove) Object " << name << " did not exist" << std::endl;
return;
}
fake_db.erase(name);
}
void write_op_exec_truncate(void* u, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (truncate) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].truncate(offset);
}
void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
std::string name(vargs->object_name);
if(fake_db.count(