Commit 057664bc authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

cleaning up and commenting

parent 22efaa79
...@@ -6,6 +6,11 @@ ...@@ -6,6 +6,11 @@
#ifndef __MOBJECT_BUFFER_UNION_H #ifndef __MOBJECT_BUFFER_UNION_H
#define __MOBJECT_BUFFER_UNION_H #define __MOBJECT_BUFFER_UNION_H
/**
* This union is defined to be used in read and write actions
* involving either a local pointer (const char*) or an offset
* within a hg_bulk_t object (unt64_t).
*/
typedef union { typedef union {
const char* as_pointer; const char* as_pointer;
uint64_t as_offset; uint64_t as_offset;
......
...@@ -9,6 +9,15 @@ ...@@ -9,6 +9,15 @@
#include <abt.h> #include <abt.h>
#include "mobject-store-config.h" #include "mobject-store-config.h"
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
* safe and when the operation has completed, as well as potential
* user data and required mechanism to be able to block on the
* completion object.
* mobject_store_completion* is typedef-ed as mobject_store_completion_t
* in libmobject-store.h.
*/
struct mobject_store_completion { struct mobject_store_completion {
mobject_store_callback_t cb_complete; // completion callback mobject_store_callback_t cb_complete; // completion callback
mobject_store_callback_t cb_safe; // safe callback mobject_store_callback_t cb_safe; // safe callback
......
...@@ -11,18 +11,11 @@ ...@@ -11,18 +11,11 @@
static void prepare_read(uint64_t* cur_offset, static void prepare_read(uint64_t* cur_offset,
rd_action_read_t action, rd_action_read_t action,
void** ptr, void** ptr,
size_t* len) size_t* len);
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op) void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op)
{ {
if(read_op->use_local_pointers == 0) return; if(read_op->ready == 1) return;
if(read_op->num_actions == 0) return; if(read_op->num_actions == 0) return;
rd_action_base_t action; rd_action_base_t action;
...@@ -52,9 +45,24 @@ void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op) ...@@ -52,9 +45,24 @@ void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op)
} }
read_op->use_local_pointers = 0; read_op->ready = 1;
free(pointers); free(pointers);
free(lengths); free(lengths);
} }
////////////////////////////////////////////////////////////////////////////////
// STATIC FUNCTIONS BELOW //
////////////////////////////////////////////////////////////////////////////////
static void prepare_read(uint64_t* cur_offset,
rd_action_read_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
...@@ -9,6 +9,13 @@ ...@@ -9,6 +9,13 @@
#include <margo.h> #include <margo.h>
#include "libmobject-store.h" #include "libmobject-store.h"
/**
* This function takes a read_op that was created by the client
* and prepares it to be sent to a server. This means creating a bulk
* handle that stiches together all the buffers that the user wants to use
* as a destination, and replacing all pointers in the chain of actions
* by offsets within the resulting hg_bultk_t object.
*/
void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op); void prepare_read_op(margo_instance_id mid, mobject_store_read_op_t read_op);
#endif #endif
...@@ -8,57 +8,29 @@ ...@@ -8,57 +8,29 @@
#include "utlist.h" #include "utlist.h"
#include "log.h" #include "log.h"
static void prepare_write(uint64_t* cur_offset, static void convert_write(uint64_t* cur_offset,
wr_action_write_t action, wr_action_write_t action,
void** ptr, void** ptr,
size_t* len) size_t* len);
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
static void prepare_write_full(uint64_t* cur_offset, static void convert_write_full(uint64_t* cur_offset,
wr_action_write_full_t action, wr_action_write_full_t action,
void** ptr, void** ptr,
size_t* len) size_t* len);
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
static void prepare_write_same(uint64_t* cur_offset, static void convert_write_same(uint64_t* cur_offset,
wr_action_write_same_t action, wr_action_write_same_t action,
void** ptr, void** ptr,
size_t* len) size_t* len);
{
uint64_t pos = *cur_offset;
*cur_offset += action->data_len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->data_len;
action->buffer.as_offset = pos;
}
static void prepare_append(uint64_t* cur_offset, static void convert_append(uint64_t* cur_offset,
wr_action_append_t action, wr_action_append_t action,
void** ptr, void** ptr,
size_t* len) size_t* len);
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op) void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op)
{ {
if(write_op->use_local_pointers == 0) return; if(write_op->ready == 1) return;
if(write_op->num_actions == 0) return; if(write_op->num_actions == 0) return;
wr_action_base_t action; wr_action_base_t action;
...@@ -72,22 +44,22 @@ void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op) ...@@ -72,22 +44,22 @@ void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op)
switch(action->type) { switch(action->type) {
case WRITE_OPCODE_WRITE: case WRITE_OPCODE_WRITE:
prepare_write(&current_offset, convert_write(&current_offset,
(wr_action_write_t)action, pointers+i, lengths+i); (wr_action_write_t)action, pointers+i, lengths+i);
i += 1; i += 1;
break; break;
case WRITE_OPCODE_WRITE_FULL: case WRITE_OPCODE_WRITE_FULL:
prepare_write_full(&current_offset, convert_write_full(&current_offset,
(wr_action_write_full_t)action, pointers+i, lengths+i); (wr_action_write_full_t)action, pointers+i, lengths+i);
i += 1; i += 1;
break; break;
case WRITE_OPCODE_WRITE_SAME: case WRITE_OPCODE_WRITE_SAME:
prepare_write_same(&current_offset, convert_write_same(&current_offset,
(wr_action_write_same_t)action, pointers+i, lengths+i); (wr_action_write_same_t)action, pointers+i, lengths+i);
i += 1; i += 1;
break; break;
case WRITE_OPCODE_APPEND: case WRITE_OPCODE_APPEND:
prepare_append(&current_offset, convert_append(&current_offset,
(wr_action_append_t)action, pointers+i, lengths+i); (wr_action_append_t)action, pointers+i, lengths+i);
i += 1; i += 1;
break; break;
...@@ -103,6 +75,57 @@ void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op) ...@@ -103,6 +75,57 @@ void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op)
} }
write_op->use_local_pointers = 0; write_op->ready = 1;
}
////////////////////////////////////////////////////////////////////////////////
// STATIC FUNCTIONS BELOW //
////////////////////////////////////////////////////////////////////////////////
static void convert_write(uint64_t* cur_offset,
wr_action_write_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
static void convert_write_full(uint64_t* cur_offset,
wr_action_write_full_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
static void convert_write_same(uint64_t* cur_offset,
wr_action_write_same_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->data_len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->data_len;
action->buffer.as_offset = pos;
} }
static void convert_append(uint64_t* cur_offset,
wr_action_append_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
...@@ -9,6 +9,13 @@ ...@@ -9,6 +9,13 @@
#include <margo.h> #include <margo.h>
#include "libmobject-store.h" #include "libmobject-store.h"
/**
* This function takes a read_op that was created by the client
* and prepares it to be sent to a server. This means creating a bulk
* handle that stiches together all the buffers that the user wants to use
* as a destination, and replacing all pointers in the chain of actions
* by offsets within the resulting hg_bultk_t object.
*/
void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op); void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op);
#endif #endif
...@@ -21,6 +21,151 @@ ...@@ -21,6 +21,151 @@
typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*); typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*); typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*);
static hg_return_t encode_read_action_stat(hg_proc_t proc,
uint64_t* pos,
rd_action_stat_t action);
static hg_return_t decode_read_action_stat(hg_proc_t proc,
uint64_t* pos,
rd_action_stat_t* action);
static hg_return_t encode_read_action_read(hg_proc_t proc,
uint64_t* pos,
rd_action_read_t action);
static hg_return_t decode_read_action_read(hg_proc_t proc,
uint64_t* pos,
rd_action_read_t* action);
static hg_return_t encode_read_action_omap_get_keys(hg_proc_t proc,
uint64_t* pos,
rd_action_omap_get_keys_t action);
static hg_return_t decode_read_action_omap_get_keys(hg_proc_t proc,
uint64_t* pos,
rd_action_omap_get_keys_t* action);
static hg_return_t encode_read_action_omap_get_vals(hg_proc_t proc,
uint64_t* pos,
rd_action_omap_get_vals_t action);
static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
uint64_t* pos,
rd_action_omap_get_vals_t* action);
static hg_return_t encode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
uint64_t* pos,
rd_action_omap_get_vals_by_keys_t action);
static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
uint64_t* pos,
rd_action_omap_get_vals_by_keys_t* action);
/**
* The following two arrays are here to avoid a big switch.
*/
/* encoding functions */
static encode_fn encode_read_action[_READ_OPCODE_END_ENUM_] = {
NULL,
(encode_fn)encode_read_action_stat,
(encode_fn)encode_read_action_read,
(encode_fn)encode_read_action_omap_get_keys,
(encode_fn)encode_read_action_omap_get_vals,
(encode_fn)encode_read_action_omap_get_vals_by_keys
};
/* decoding functions */
static decode_fn decode_read_action[_READ_OPCODE_END_ENUM_] = {
NULL,
(decode_fn)decode_read_action_stat,
(decode_fn)decode_read_action_read,
(decode_fn)decode_read_action_omap_get_keys,
(decode_fn)decode_read_action_omap_get_vals,
(decode_fn)decode_read_action_omap_get_vals_by_keys
};
/**
* Serialization function for mobject_store_read_op_t objects.
* For encoding, the object should be prepared first (that is, the union fields
* pointing to either a buffer or an offset in a bulk should be an offset in a bulk).
*/
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op)
{
rd_action_base_t elem, tmp;
hg_return_t ret = HG_SUCCESS;
uintptr_t position = 0;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
MOBJECT_ASSERT((*read_op)->ready,
"Cannot encode a read_op before it has been prepared");
// encode the bulk handle associated with the series of operations
ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
if(ret != HG_SUCCESS) return ret;
// encode the number of actions
ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
sizeof((*read_op)->num_actions));
if(ret != HG_SUCCESS) return ret;
// for each action ...
DL_FOREACH((*read_op)->actions,elem) {
read_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
"Invalid read_op opcode");
// encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
// encode the action's arguments
ret = encode_read_action[opcode](proc, &position, elem);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
*read_op = mobject_store_create_read_op();
(*read_op)->ready = 1;
// decode the bulk handle
ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
if(ret != HG_SUCCESS) return ret;
// decode the number of actions
ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
sizeof((*read_op)->num_actions));
if(ret != HG_SUCCESS) return ret;
rd_action_base_t next_action;
size_t i;
for(i = 0; i < (*read_op)->num_actions; i++) {
// decode the current action's type
read_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
"Invalid write_op opcode");
// decode the action's arguments
ret = decode_read_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret;
// append to the list
DL_APPEND((*read_op)->actions, next_action);
}
break;
case HG_FREE:
mobject_store_release_read_op(*read_op);
return HG_SUCCESS;
}
return ret;
}
////////////////////////////////////////////////////////////////////////////////
// STATIC FUNCTIONS BELOW //
////////////////////////////////////////////////////////////////////////////////
static hg_return_t encode_read_action_stat(hg_proc_t proc, static hg_return_t encode_read_action_stat(hg_proc_t proc,
uint64_t* pos, uint64_t* pos,
rd_action_stat_t action) rd_action_stat_t action)
...@@ -173,104 +318,3 @@ static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc, ...@@ -173,104 +318,3 @@ static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
return ret; return ret;
} }
/**
* The following two arrays are here to avoid a big switch.
*/
/* encoding functions */
static encode_fn encode_read_action[_READ_OPCODE_END_ENUM_] = {
NULL,
(encode_fn)encode_read_action_stat,
(encode_fn)encode_read_action_read,
(encode_fn)encode_read_action_omap_get_keys,
(encode_fn)encode_read_action_omap_get_vals,
(encode_fn)encode_read_action_omap_get_vals_by_keys
};
/* decoding functions */
static decode_fn decode_read_action[_READ_OPCODE_END_ENUM_] = {
NULL,
(decode_fn)decode_read_action_stat,
(decode_fn)decode_read_action_read,
(decode_fn)decode_read_action_omap_get_keys,
(decode_fn)decode_read_action_omap_get_vals,
(decode_fn)decode_read_action_omap_get_vals_by_keys
};
/**
* Serialization function for mobject_store_read_op_t objects.
* For encoding, the object should be prepared first (that is, the union fields
* pointing to either a buffer or an offset in a bulk should be an offset in a bulk).
*/
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op)
{
rd_action_base_t elem, tmp;
hg_return_t ret = HG_SUCCESS;
uintptr_t position = 0;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
MOBJECT_ASSERT((*read_op)->use_local_pointers == 0,
"Cannot encode a read_op before it has been prepared");
// encode the bulk handle associated with the series of operations
ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
if(ret != HG_SUCCESS) return ret;
// encode the number of actions
ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
sizeof((*read_op)->num_actions));
if(ret != HG_SUCCESS) return ret;
// for each action ...
DL_FOREACH((*read_op)->actions,elem) {
read_op_code_t opcode = elem->type;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
"Invalid read_op opcode");
// encode the type of action
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
// encode the action's arguments
ret = encode_read_action[opcode](proc, &position, elem);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
*read_op = mobject_store_create_read_op();
(*read_op)->use_local_pointers = 0;
// decode the bulk handle
ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
if(ret != HG_SUCCESS) return ret;
// decode the number of actions
ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
sizeof((*read_op)->num_actions));
if(ret != HG_SUCCESS) return ret;
rd_action_base_t next_action;
size_t i;
for(i = 0; i < (*read_op)->num_actions; i++) {
// decode the current action's type
read_op_code_t opcode;
ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
if(ret != HG_SUCCESS) return ret;
MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
"Invalid write_op opcode");
// decode the action's arguments
ret = decode_read_action[opcode](proc, &position, &next_action);
if(ret != HG_SUCCESS) return ret;
// append to the list
DL_APPEND((*read_op)->actions, next_action);
}
break;
case HG_FREE:
mobject_store_release_read_op(*read_op);
return HG_SUCCESS;
}
return ret;
}
...@@ -9,6 +9,10 @@ ...@@ -9,6 +9,10 @@
#include <margo.h> #include <margo.h>
#include "libmobject-store.h" #include "libmobject-store.h"
/**
* This function is the traditional hg_proc_* function meant to serialize
* a mobject_store_read_op_t object to send it through RPC.
*/
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op); hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op);
#endif #endif
......
...@@ -21,6 +21,201 @@ ...@@ -21,6 +21,201 @@
typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*); typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*); typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*);
static hg_return_t encode_write_action_create(hg_proc_t proc,
uint64_t* pos,
wr_action_create_t action);
static hg_return_t decode_write_action_create(hg_proc_t proc,
uint64_t* pos,
wr_action_create_t* action);
static hg_return_t encode_write_action_write(hg_proc_t proc,
uint64_t* pos,
wr_action_write_t action);