Commit c7b518c0 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

name, oid, and omap databases integrated and tested

parent bcc74e03
...@@ -194,7 +194,7 @@ extern "C" { ...@@ -194,7 +194,7 @@ extern "C" {
void mobject_write_op_zero( void mobject_write_op_zero(
mobject_store_write_op_t write_op, mobject_store_write_op_t write_op,
uint64_t offset, uint64_t offset,
uint64_t len); size_t len);
/** /**
* Set key/value pairs on an object * Set key/value pairs on an object
......
...@@ -42,8 +42,8 @@ void mobject_write_op_create(mobject_store_write_op_t write_op, ...@@ -42,8 +42,8 @@ void mobject_write_op_create(mobject_store_write_op_t write_op,
void mobject_write_op_write(mobject_store_write_op_t write_op, void mobject_write_op_write(mobject_store_write_op_t write_op,
const char *buffer, const char *buffer,
size_t len, uint64_t offset,
uint64_t offset) size_t len)
{ {
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect"); MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed"); MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");
...@@ -53,7 +53,7 @@ void mobject_write_op_write(mobject_store_write_op_t write_op, ...@@ -53,7 +53,7 @@ void mobject_write_op_write(mobject_store_write_op_t write_op,
action->buffer.as_pointer = buffer; action->buffer.as_pointer = buffer;
action->len = len; action->len = len;
action->offset = offset; action->offset = offset;
WRITE_ACTION_UPCAST(base, action); WRITE_ACTION_UPCAST(base, action);
DL_APPEND(write_op->actions, base); DL_APPEND(write_op->actions, base);
...@@ -98,9 +98,9 @@ void mobject_write_op_write_full(mobject_store_write_op_t write_op, ...@@ -98,9 +98,9 @@ void mobject_write_op_write_full(mobject_store_write_op_t write_op,
void mobject_write_op_write_same(mobject_store_write_op_t write_op, void mobject_write_op_write_same(mobject_store_write_op_t write_op,
const char *buffer, const char *buffer,
uint64_t offset,
size_t data_len, size_t data_len,
size_t write_len, size_t write_len)
uint64_t offset)
{ {
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect"); MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed"); MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");
...@@ -186,7 +186,7 @@ void mobject_write_op_truncate(mobject_store_write_op_t write_op, ...@@ -186,7 +186,7 @@ void mobject_write_op_truncate(mobject_store_write_op_t write_op,
void mobject_write_op_zero(mobject_store_write_op_t write_op, void mobject_write_op_zero(mobject_store_write_op_t write_op,
uint64_t offset, uint64_t offset,
uint64_t len) size_t len)
{ {
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect"); MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed"); MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");
......
...@@ -14,9 +14,14 @@ ...@@ -14,9 +14,14 @@
#include "src/server/core/covermap.hpp" #include "src/server/core/covermap.hpp"
static int tabs = 0; static int tabs = 0;
/*
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;} #define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); } #define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); } #define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
*/
#define ENTERING
#define LEAVING
#define ERROR
static void read_op_exec_begin(void*); static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*); static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
...@@ -113,7 +118,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_ ...@@ -113,7 +118,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
while(!coverage.full() && it != segment_map.end() && it->first.oid == oid) { while(!coverage.full() && it != segment_map.end() && it->first.oid == oid) {
const segment_key_t& seg = it->first; const segment_key_t& seg = it->first;
const bake_region_id_t& region = it->second; const bake_region_id_t& region = it->second;
switch(seg.type) { switch(seg.type) {
case seg_type_t::ZERO: case seg_type_t::ZERO:
coverage.set(seg.start_index, seg.end_index); coverage.set(seg.start_index, seg.end_index);
......
...@@ -8,10 +8,16 @@ ...@@ -8,10 +8,16 @@
#include "src/io-chain/write-op-visitor.h" #include "src/io-chain/write-op-visitor.h"
#include "src/server/core/fake-kv.hpp" #include "src/server/core/fake-kv.hpp"
#if 0
static int tabs = 0; static int tabs = 0;
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;} #define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); } #define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); } #define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
#else
#define ENTERING
#define LEAVING
#define ERROR
#endif
static void write_op_exec_begin(void*); static void write_op_exec_begin(void*);
static void write_op_exec_end(void*); static void write_op_exec_end(void*);
...@@ -177,8 +183,23 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len) ...@@ -177,8 +183,23 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
unsigned i; unsigned i;
// TODO: check return values of those calls // TODO: check return values of those calls
ret = bake_create(bph, bti, len, &rid); ret = bake_create(bph, bti, len, &rid);
if(ret != 0) {
ERROR fprintf(stderr,"bake_create() returned %d\n", ret);
LEAVING;
return;
}
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len); ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
if(ret != 0) {
ERROR fprintf(stderr,"bake_proxy_write() returned %d\n", ret);
LEAVING;
return;
}
ret = bake_persist(bph, rid); ret = bake_persist(bph, rid);
if(ret != 0) {
ERROR fprintf(stderr, "bake_persist() returned %d\n", ret);
LEAVING;
return;
}
insert_region_log_entry(oid, 0, len, &rid); insert_region_log_entry(oid, 0, len, &rid);
LEAVING; LEAVING;
} }
......
...@@ -30,5 +30,4 @@ typedef struct omap_key_t { ...@@ -30,5 +30,4 @@ typedef struct omap_key_t {
#define MAX_OMAP_VAL_SIZE 256 #define MAX_OMAP_VAL_SIZE 256
#define SMALL_REGION_THRESHOLD (sizeof(bake_region_id_t)) #define SMALL_REGION_THRESHOLD (sizeof(bake_region_id_t))
#endif #endif
...@@ -26,23 +26,23 @@ int main(int argc, char** argv) ...@@ -26,23 +26,23 @@ int main(int argc, char** argv)
mobject_store_write_op_write(write_op, content+8, 4, 8); mobject_store_write_op_write(write_op, content+8, 4, 8);
// Add a "writesame" operation to write "DDDD" in two "DD", // Add a "writesame" operation to write "DDDD" in two "DD",
// content should then be "AAAABBBBCCCCDDDD" // content should then be "AAAABBBBCCCCDDDD"
// mobject_store_write_op_writesame(write_op, content+12, 2, 4, 12); mobject_store_write_op_writesame(write_op, content+12, 2, 4, 12);
// Add a "append" operation to append "EEEEFFFF" // Add a "append" operation to append "EEEEFFFF"
// content should then be "AAAABBBBCCCCDDDDEEEEFFFFF" // content should then be "AAAABBBBCCCCDDDDEEEEFFFFF"
// mobject_store_write_op_append(write_op, content+16, 8); mobject_store_write_op_append(write_op, content+16, 8);
// Add a "remove" operation // Add a "remove" operation
// mobject_store_write_op_remove(write_op); // mobject_store_write_op_remove(write_op);
// Add a "truncate" operation to remove the "FFFF" part // Add a "truncate" operation to remove the "FFFF" part
// content should then be "AAAABBBBCCCCDDDDEEEE" // content should then be "AAAABBBBCCCCDDDDEEEE"
// mobject_store_write_op_truncate(write_op, 20); mobject_store_write_op_truncate(write_op, 20);
// Add a "zero" operation zero-ing the "BBBBCCCC" // Add a "zero" operation zero-ing the "BBBBCCCC"
// content should then be "AAAA********DDDDEEEE" where "*" represent 0s // content should then be "AAAA********DDDDEEEE" where "*" represent 0s
// mobject_store_write_op_zero(write_op, 4, 8); mobject_store_write_op_zero(write_op, 4, 8);
// Add a "omap_set" operation // Add a "omap_set" operation
const char* keys[] = { "matthieu", "rob", "shane", "phil", "robl" }; const char* keys[] = { "matthieu", "rob", "shane", "phil", "robl" };
const char* values[] = { "mdorier@anl.gov", "rross@anl.gov", "ssnyder@anl.gov", "carns@anl.gov", "robl@anl.gov" }; const char* values[] = { "mdorier@anl.gov", "rross@anl.gov", "ssnyder@anl.gov", "carns@anl.gov", "robl@anl.gov" };
size_t val_sizes[] = { 16, 14, 16, 14, 13 }; size_t val_sizes[] = { 16, 14, 16, 14, 13 };
mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5); // mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5);
// keys will be sorted and stored as follows: // keys will be sorted and stored as follows:
/* matthieu => mdorier@anl.gov /* matthieu => mdorier@anl.gov
phil => carns@anl.gov phil => carns@anl.gov
...@@ -74,6 +74,7 @@ int main(int argc, char** argv) ...@@ -74,6 +74,7 @@ int main(int argc, char** argv)
int prval2; int prval2;
mobject_store_read_op_read(read_op, 0, 512, read_buf, &bytes_read, &prval2); mobject_store_read_op_read(read_op, 0, 512, read_buf, &bytes_read, &prval2);
// Add "omap_get_keys" operation // Add "omap_get_keys" operation
#if 0
const char* start_after1 = "rob"; const char* start_after1 = "rob";
mobject_store_omap_iter_t iter3; mobject_store_omap_iter_t iter3;
int prval3; int prval3;
...@@ -91,7 +92,7 @@ int main(int argc, char** argv) ...@@ -91,7 +92,7 @@ int main(int argc, char** argv)
mobject_store_omap_iter_t iter5; mobject_store_omap_iter_t iter5;
int prval5; int prval5;
mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter5, &prval5); mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter5, &prval5);
#endif
mobject_store_read_op_operate(read_op, ioctx, "test-object",LIBMOBJECT_OPERATION_NOFLAG); mobject_store_read_op_operate(read_op, ioctx, "test-object",LIBMOBJECT_OPERATION_NOFLAG);
mobject_store_release_read_op(read_op); mobject_store_release_read_op(read_op);
...@@ -107,7 +108,7 @@ int main(int argc, char** argv) ...@@ -107,7 +108,7 @@ int main(int argc, char** argv)
for(i=0; i<bytes_read; i++) printf("%c", read_buf[i] ? read_buf[i] : '*' ); for(i=0; i<bytes_read; i++) printf("%c", read_buf[i] ? read_buf[i] : '*' );
printf("\n"); printf("\n");
} }
#if 0
printf("omap_get_keys: prval=%d\n", prval3); printf("omap_get_keys: prval=%d\n", prval3);
{ {
char* key = NULL; char* key = NULL;
...@@ -138,8 +139,8 @@ int main(int argc, char** argv) ...@@ -138,8 +139,8 @@ int main(int argc, char** argv)
if(key) printf("===> key: \"%s\" , val: %s \n", key, val); if(key) printf("===> key: \"%s\" , val: %s \n", key, val);
} while(key); } while(key);
} }
#endif
} }
mobject_store_ioctx_destroy(ioctx); mobject_store_ioctx_destroy(ioctx);
mobject_store_shutdown(cluster); mobject_store_shutdown(cluster);
......
...@@ -25,7 +25,7 @@ function mobject_test_start_servers() ...@@ -25,7 +25,7 @@ function mobject_test_start_servers()
rm -rf ${storage} rm -rf ${storage}
bake-mkpool -s 50M /dev/shm/mobject.dat bake-mkpool -s 50M /dev/shm/mobject.dat
run_to $maxtime mpirun -np $nservers src/server/mobject-server-daemon na+sm:// $cfile & run_to $maxtime mpirun -np $nservers src/server/mobject-server-daemon tcp:// $cfile &
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
# TODO: this doesn't actually work; can't check return code of # TODO: this doesn't actually work; can't check return code of
# something executing in background. We have to rely on the # something executing in background. We have to rely on the
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment