Commit 5e17e1eb authored by Matthieu Dorier's avatar Matthieu Dorier

added preparation of write_op chain

parent 5a93ecc6
......@@ -4,10 +4,12 @@ src_libmobject_store_la_SOURCES = \
src/write-op-impl.c \
src/read-op-impl.c \
src/proc-write-actions.c \
src/proc-read-actions.c
src/proc-read-actions.c \
src/prepare-write-op.c
noinst_HEADERS += src/log.h src/completion.h \
src/write-op-impl.h src/write-actions.h \
src/read-op-impl.h src/read-actions.h \
src/utlist.h src/proc-write-actions.h \
src/proc-read-actions.h
src/proc-read-actions.h \
src/prepare-write-op.h
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "prepare-write-op.h"
#include "write-op-impl.h"
#include "utlist.h"
#include "log.h"
static void prepare_write(uint64_t* cur_offset,
wr_action_write_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
static void prepare_write_full(uint64_t* cur_offset,
wr_action_write_full_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
static void prepare_write_same(uint64_t* cur_offset,
wr_action_write_same_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->data_len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->data_len;
action->buffer.as_offset = pos;
}
static void prepare_append(uint64_t* cur_offset,
wr_action_append_t action,
void** ptr,
size_t* len)
{
uint64_t pos = *cur_offset;
*cur_offset += action->len;
*ptr = (void*)action->buffer.as_pointer;
*len = action->len;
action->buffer.as_offset = pos;
}
void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op)
{
if(write_op->use_local_pointers == 0) return;
if(write_op->num_actions == 0) return;
wr_action_base_t action;
void** pointers = (void**)calloc(write_op->num_actions, sizeof(void*));
size_t* lengths = (size_t*)calloc(write_op->num_actions, sizeof(size_t));
uint64_t current_offset = 0;
size_t i = 0;
DL_FOREACH(write_op->actions, action) {
switch(action->type) {
case WRITE_OPCODE_WRITE:
prepare_write(&current_offset,
(wr_action_write_t)action, pointers+i, lengths+i);
i += 1;
break;
case WRITE_OPCODE_WRITE_FULL:
prepare_write_full(&current_offset,
(wr_action_write_full_t)action, pointers+i, lengths+i);
i += 1;
break;
case WRITE_OPCODE_WRITE_SAME:
prepare_write_same(&current_offset,
(wr_action_write_same_t)action, pointers+i, lengths+i);
i += 1;
break;
case WRITE_OPCODE_APPEND:
prepare_append(&current_offset,
(wr_action_append_t)action, pointers+i, lengths+i);
i += 1;
break;
}
}
uint32_t count = i;
if(count != 0) {
hg_return_t ret = margo_bulk_create(mid, count,
pointers, lengths, HG_BULK_READ_ONLY,
&(write_op->bulk_handle));
// TODO handle error
}
write_op->use_local_pointers = 0;
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_PREPARE_WRITE_OP_H
#define __MOBJECT_PREPARE_WRITE_OP_H
#include <margo.h>
#include "libmobject-store.h"
void prepare_write_op(margo_instance_id mid, mobject_store_write_op_t write_op);
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment