Commit b2e8638d authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-new-bake-api' into 'master'

Dev new bake api

See merge request !3
parents a21fb678 8f187425
......@@ -17,6 +17,10 @@ AM_CXXFLAGS = -std=c++14 $(AM_CFLAGS)
SERVER_LIBS = @SERVER_LIBS@
CLIENT_LIBS = @CLIENT_LIBS@
SERVER_CPPFLAGS = @SERVER_CPPFLAGS@
CLIENT_CPPFLAGS = @CLIENT_CPPFLAGS@
SERVER_CFLAGS = @SERVER_CFLAGS@
CLIENT_CFLAGS = @CLIENT_CFLAGS@
lib_LTLIBRARIES = \
src/client/libmobject-store.la \
......
......@@ -91,11 +91,17 @@ SERVER_LIBS="$PMEM_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$PMEM_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$PMEM_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKE],[bake-bulk-server],[],
AC_MSG_ERROR([Could not find working BAKE installation!]) )
SERVER_LIBS="$BAKE_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKE_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKE_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKESERVER],[bake-server],[],
AC_MSG_ERROR([Could not find working BAKE server installation!]) )
SERVER_LIBS="$BAKESERVER_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKESERVER_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKESERVER_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKECLIENT],[bake-client],[],
AC_MSG_ERROR([Could not find working BAKE client installation!]) )
SERVER_LIBS="$BAKECLIENT_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKECLIENT_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKECLIENT_CFLAGS $SERVER_CFLAGS"
# check that SSG was compiled with MPI support
AC_CHECK_LIB([ssg], [ssg_group_create_mpi],
......@@ -123,5 +129,9 @@ AM_CONDITIONAL(HAVE_RADOS, test x"$with_rados" == "xyes")
AC_SUBST(SERVER_LIBS)
AC_SUBST(CLIENT_LIBS)
AC_SUBST(SERVER_CPPFLAGS)
AC_SUBST(CLIENT_CPPFLAGS)
AC_SUBST(SERVER_CFLAGS)
AC_SUBST(CLIENT_CFLAGS)
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
......@@ -14,6 +14,7 @@ extern "C" {
#include <inttypes.h>
#include <time.h>
#include <mobject-client.h>
// derived from: http://docs.ceph.com/docs/master/mobject_store/api/libmobject_store/
......@@ -110,8 +111,9 @@ typedef struct mobject_store_ioctx *mobject_store_ioctx_t;
* Used with mobject_store_read_op_omap_get_keys(), mobject_store_read_op_omap_get_vals(),
* mobject_store_read_op_omap_get_vals_by_keys(), mobject_store_omap_get_next(), and
* mobject_store_omap_get_end().
*
* NOW DECLARED in mobject-client.h
*/
typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t;
/**
* @typedef mobject_store_write_op_t
......@@ -133,10 +135,9 @@ typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t;
* mobject_store_write_op_truncate(), mobject_store_write_op_zero(), mobject_store_write_op_cmpext()
* - Hints: mobject_store_write_op_set_alloc_hint()
* - Performing the operation: mobject_store_write_op_operate(), mobject_store_aio_write_op_operate()
*
* NOW DECLARED in mobject-client.h
*/
typedef struct mobject_store_write_op *mobject_store_write_op_t;
#define MOBJECT_WRITE_OP_NULL ((mobject_store_write_op_t)0)
/**
* @typedef mobject_store_read_op_t
......@@ -157,10 +158,8 @@ typedef struct mobject_store_write_op *mobject_store_write_op_t;
* - Request properties: mobject_store_read_op_set_flags()
* - Performing the operation: mobject_store_read_op_operate(),
* mobject_store_aio_read_op_operate()
* NOW DECLARED in mobject-client.h
*/
typedef struct mobject_store_read_op *mobject_store_read_op_t;
#define MOBJECT_READ_OP_NULL ((mobject_store_read_op_t)0)
/**
* @typedef mobject_store_completion_t
......@@ -479,10 +478,10 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
* @param prval where to store the return value from this action
*/
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
/**
* Start iterating over key/value pairs on an object.
......
/*
* (C) 2018 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_CLIENT_H
#define __MOBJECT_CLIENT_H
#include <stdint.h>
#include <margo.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct mobject_client* mobject_client_t;
typedef struct mobject_provider_handle* mobject_provider_handle_t;
typedef struct mobject_store_write_op *mobject_store_write_op_t;
typedef struct mobject_store_read_op *mobject_store_read_op_t;
typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t;
typedef struct mobject_store_completion* mobject_store_completion_t;
typedef struct mobject_request* mobject_request_t;
#define MOBJECT_CLIENT_NULL ((mobject_client_t)NULL)
#define MOBJECT_PROVIDER_HANDLE_NULL ((mobject_provider_handle_t)NULL)
#define MOBJECT_WRITE_OP_NULL ((mobject_store_write_op_t)NULL)
#define MOBJECT_READ_OP_NULL ((mobject_store_read_op_t)NULL)
#define MOBJECT_REQUEST_NULL ((mobject_request_t)NULL)
/**
* Creates a Mobject client attached to the given margo instance.
* This will effectively register the RPC needed by BAKE into
* the margo instance. The client must be freed with
* mobject_client_finalize.
*
* @param[in] mid margo instance
* @param[out] client resulting mobject client object
*
* @return 0 on success, -1 on failure
*/
int mobject_client_init(margo_instance_id mid, mobject_client_t* client);
/**
* Finalizes a Mobject client.
* WARNING: This function must not be called after Margo has been
* finalized. If you need to finalize a Mobject client when Margo is
* finalized, use margo_push_finalize_callback.
*
* @param client Mobject client to destroy
*
* @return 0 on success, -1 on failure
*/
int mobject_client_finalize(mobject_client_t client);
/**
* Creates a provider handle to point to a particular Mobject provider.
*
* @param client client managing the provider handle
* @param addr address of the provider
* @param mplex_id multiplex id of the provider
* @param handle resulting handle
*
* @return 0 on success, -1 on failure
*/
int mobject_provider_handle_create(
mobject_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
mobject_provider_handle_t* handle);
/**
* Increment the reference counter of the provider handle
*
* @param handle provider handle
*
* @return 0 on success, -1 on failure
*/
int mobject_provider_handle_ref_incr(mobject_provider_handle_t handle);
/**
* Decrement the reference counter of the provider handle,
* effectively freeing the provider handle when the reference count
* is down to 0.
*
* @param handle provider handle
*
* @return 0 on success, -1 on failure
*/
int mobject_provider_handle_release(mobject_provider_handle_t handle);
int mobject_shutdown(mobject_client_t client, hg_addr_t addr);
/**
* Create a new mobject_store_write_op_t write operation.
* This will store all actions to be performed atomically.
* You must call mobject_store_release_write_op when you are
* finished with it.
*
* @returns non-NULL on success, NULL on memory allocation error.
*/
mobject_store_write_op_t mobject_create_write_op(void);
/**
* Free a mobject_store_write_op_t, must be called when you're done with it.
* @param write_op operation to deallocate
*/
void mobject_release_write_op(mobject_store_write_op_t write_op);
/**
* Create the object
* @param write_op operation to add this action to
* @param exclusive set to either LIBMOBJECT_CREATE_EXCLUSIVE or
* LIBMOBJECT_CREATE_IDEMPOTENT
* will error if the object already exists.
* @param category category string (DEPRECATED, HAS NO EFFECT)
*/
void mobject_write_op_create(
mobject_store_write_op_t write_op,
int exclusive,
const char* category);
/**
* Write to offset
* @param write_op operation to add this action to
* @param offset offset to write to
* @param buffer bytes to write
* @param len length of buffer
*/
void mobject_write_op_write(
mobject_store_write_op_t write_op,
const char *buffer,
uint64_t offset,
size_t len);
/**
* Write whole object, atomically replacing it.
* @param write_op operation to add this action to
* @param buffer bytes to write
* @param len length of buffer
*/
void mobject_write_op_write_full(
mobject_store_write_op_t write_op,
const char *buffer,
size_t len);
/**
* Write the same buffer multiple times
* @param write_op operation to add this action to
* @param buffer bytes to write
* @param data_len length of buffer
* @param write_len total number of bytes to write, as a multiple of @data_len
* @param offset offset to write to
*/
void mobject_write_op_write_same(
mobject_store_write_op_t write_op,
const char *buffer,
uint64_t offset,
size_t data_len,
size_t write_len);
/**
* Append to end of object.
* @param write_op operation to add this action to
* @param buffer bytes to write
* @param len length of buffer
*/
void mobject_write_op_append(
mobject_store_write_op_t write_op,
const char *buffer,
size_t len);
/**
* Remove object
* @param write_op operation to add this action to
*/
void mobject_write_op_remove(mobject_store_write_op_t write_op);
/**
* Truncate an object
* @param write_op operation to add this action to
* @param offset Offset to truncate to
*/
void mobject_write_op_truncate(
mobject_store_write_op_t write_op,
uint64_t offset);
/**
* Zero part of an object
* @param write_op operation to add this action to
* @param offset Offset to zero
* @param len length to zero
*/
void mobject_write_op_zero(
mobject_store_write_op_t write_op,
uint64_t offset,
uint64_t len);
/**
* Set key/value pairs on an object
*
* @param write_op operation to add this action to
* @param keys array of null-terminated char arrays representing keys to set
* @param vals array of pointers to values to set
* @param lens array of lengths corresponding to each value
* @param num number of key/value pairs to set
*/
void mobject_write_op_omap_set(
mobject_store_write_op_t write_op,
char const* const* keys,
char const* const* vals,
const size_t *lens,
size_t num);
/**
* Remove key/value pairs from an object
*
* @param write_op operation to add this action to
* @param keys array of null-terminated char arrays representing keys to remove
* @param keys_len number of key/value pairs to remove
*/
void mobject_write_op_omap_rm_keys(
mobject_store_write_op_t write_op,
char const* const* keys,
size_t keys_len);
/**
* Perform a write operation synchronously
* @param write_op operation to perform
* @param pool_name the name of the pool in which to write
* @param oid the object id
* @param mtime the time to set the mtime to, NULL for the current time
* @param flags flags to apply to the entire operation (LIBMOBJECT_OPERATION_*)
*/
int mobject_write_op_operate(
mobject_provider_handle_t handle,
mobject_store_write_op_t write_op,
const char* pool_name,
const char *oid,
time_t *mtime,
int flags);
/**
* Perform a write operation asynchronously
* @param write_op operation to perform
* @param pool_name the name of the pool in which to write
* @param oid the object id
* @param mtime the time to set the mtime to, NULL for the current time
* @param flags flags to apply to the entire operation (LIBMOBJECT_OPERATION_*)
* @param req resulting request
*/
int mobject_aio_write_op_operate(
mobject_provider_handle_t handle,
mobject_store_write_op_t write_op,
const char* pool_name,
const char *oid,
time_t *mtime,
int flags,
mobject_request_t* req);
/**
* Create a new mobject_store_read_op_t write operation. This will store all
* actions to be performed atomically. You must call
* mobject_store_release_read_op when you are finished with it (after it
* completes, or you decide not to send it in the first place).
*
* @returns non-NULL on success, NULL on memory allocation error.
*/
mobject_store_read_op_t mobject_create_read_op(void);
/**
* Free a mobject_store_read_op_t, must be called when you're done with it.
* @param read_op operation to deallocate, created with mobject_create_read_op
*/
void mobject_release_read_op(mobject_store_read_op_t read_op);
/**
* Get object size and mtime
* @param read_op operation to add this action to
* @param psize where to store object size
* @param pmtime where to store modification time
* @param prval where to store the return value of this action
*/
void mobject_read_op_stat(
mobject_store_read_op_t read_op,
uint64_t *psize,
time_t *pmtime,
int *prval);
/**
* Read bytes from offset into buffer.
*
* prlen will be filled with the number of bytes read if successful.
* A short read can only occur if the read reaches the end of the
* object.
*
* @param read_op operation to add this action to
* @param offset offset to read from
* @param len length of buffer
* @param buffer where to put the data
* @param bytes_read where to store the number of bytes read by this action
* @param prval where to store the return value of this action
*/
void mobject_read_op_read(
mobject_store_read_op_t read_op,
char *buffer,
uint64_t offset,
size_t len,
size_t *bytes_read,
int *prval);
/**
* Start iterating over keys on an object.
*
* They will be returned sorted by key, and the iterator
* will fill in NULL for all values if specified.
*
* @param read_op operation to add this action to
* @param start_after list keys starting after start_after
* @param max_return list no more than max_return keys
* @param iter where to store the iterator
* @param prval where to store the return value from this action
*/
void mobject_read_op_omap_get_keys(
mobject_store_read_op_t read_op,
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
/**
* Start iterating over key/value pairs on an object.
*
* They will be returned sorted by key.
*
* @param read_op operation to add this action to
* @param start_after list keys starting after start_after
* @param filter_prefix list only keys beginning with filter_prefix
* @param max_return list no more than max_return key/value pairs
* @param iter where to store the iterator
* @param prval where to store the return value from this action
*/
void mobject_read_op_omap_get_vals(
mobject_store_read_op_t read_op,
const char *start_after,
const char *filter_prefix,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
/**
* Start iterating over specific key/value pairs
*
* They will be returned sorted by key.
*
* @param read_op operation to add this action to
* @param keys array of pointers to null-terminated keys to get
* @param keys_len the number of strings in keys
* @param iter where to store the iterator
* @param prval where to store the return value from this action
*/
void mobject_read_op_omap_get_vals_by_keys(
mobject_store_read_op_t read_op,
char const* const* keys,
size_t keys_len,
mobject_store_omap_iter_t *iter,
int *prval);
/**
* Perform a read operation synchronously
* @param read_op operation to perform
* @param pool_name the pool that the object is in
* @param oid the object id
* @param flags flags to apply to the entire operation (LIBMOBJECT_OPERATION_*)
*/
int mobject_read_op_operate(
mobject_provider_handle_t handle,
mobject_store_read_op_t read_op,
const char *pool_name,
const char *oid,
int flags);
/**
* Perform a read operation asynchronously
* @param read_op operation to perform
* @param pool_name the pool that the object is in
* @param completion what to do when operation has been attempted
* @param oid the object id
* @param flags flags to apply to the entire operation (LIBMOBJECT_OPERATION_*)
*/
int mobject_aio_read_op_operate(
mobject_provider_handle_t handle,
mobject_store_read_op_t read_op,
const char *pool_name,
const char *oid,
int flags,
mobject_request_t* req);
int mobject_aio_wait(mobject_request_t req, int* ret);
int mobject_aio_test(mobject_request_t req, int* flag);
#ifdef __cplusplus
}
#endif
#endif /* __MOBJECT_CLIENT_H */
......@@ -8,30 +8,33 @@
#define MOBJECT_SERVER_H
#include <margo.h>
#include <bake-client.h>
/* server-side utilities and routines. Clients are looking for either
* libmobject-store.h or librados-mobject-store.h */
#define MOBJECT_SERVER_GROUP_NAME "mobject-store-servers"
#define MOBJECT_ABT_POOL_DEFAULT ABT_POOL_NULL
typedef struct mobject_server_context* mobject_provider_t;
/**
* Start a mobject server instance
*
* @param[in] mid margo instance id
* @param[in] mplex_id multiplex id of the provider
* @param[in] pool Argobots pool for the provider
* @param[in] bake_ph Bake provider handle to use to write/read data
* @param[in] cluster_file file name to write cluster connect info to
* @returns 0 on success, negative error code on failure
*/
int mobject_server_init(margo_instance_id mid, const char *cluster_file);
/**
* Shutdown a mobject server instance
* @param[out] provider resulting provider
*
* @param[in] mid margo instance id
*/
void mobject_server_shutdown(margo_instance_id mid);
/**
* Wait for a mobject server instance to get a shutdown request.
* @returns 0 on success, negative error code on failure
*/
void mobject_server_wait_for_shutdown(void);
int mobject_provider_register(
margo_instance_id mid,
uint8_t mplex_id,
ABT_pool pool,
bake_provider_handle_t bake_ph,
const char *cluster_file,
mobject_provider_t* provider);
#endif
noinst_HEADERS += \
src/client/cluster.h \
src/client/mobject-client-impl.h \
src/client/aio/completion.h \
src/io-chain/args-read-actions.h \
src/io-chain/args-write-actions.h \
......@@ -20,8 +21,8 @@ noinst_HEADERS += \
src/omap-iter/proc-omap-iter.h \
src/rpc-types/read-op.h \
src/rpc-types/write-op.h \
src/server/print-read-op.h\
src/server/print-write-op.h \
src/server/printer/print-read-op.h\
src/server/printer/print-write-op.h \
src/util/buffer-union.h \
src/util/log.h \
src/util/utlist.h
......@@ -45,11 +46,13 @@ src_io_chain_libio_chain_la_SOURCES = src/io-chain/prepare-read-op.c \
src_client_libmobject_store_la_SOURCES = \
src/client/mobject-client.c \
src/client/cluster.c \
src/client/read-op.c \
src/client/write-op.c \
src/client/omap-iter.c \
src/client/aio/completion.c \
src/client/aio/aio-cluster-operate.c \
src/client/aio/aio-operate.c
src_client_libmobject_store_la_CPPFLAGS = ${AM_CPPFLAGS} ${CLIENT_CPPFLAGS}
src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \
......@@ -60,15 +63,20 @@ src_server_libmobject_server_la_SOURCES = \
src/server/fake/fake-write-op.cpp \
src/server/fake/fake-read-op.cpp \
src/server/fake/fake-db.cpp \
src/server/print-write-op.c \
src/server/print-read-op.c
src/server/core/core-write-op.cpp \
src/server/core/core-read-op.cpp \
src/server/printer/print-write-op.c \
src/server/printer/print-read-op.c \
src/server/core/fake-kv.cpp
src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_libmobject_server_la_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \
src/io-chain/libio-chain.la ${SERVER_LIBS}
src_server_mobject_server_daemon_SOURCES = \
src/server/mobject-server-daemon.c
src_server_mobject_server_daemon_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_mobject_server_daemon_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_mobject_server_daemon_LDADD = \
src/server/libmobject-server.la ${SERVER_LIBS}
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include <ssg.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
int mobject_store_aio_write_op_operate(
mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
mobject_request_t req;
mobject_aio_write_op_operate(mph, write_op, io->pool_name, oid, mtime, flags, &req);
mobject_provider_handle_release(mph);
completion->request = req;
return 0;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
mobject_request_t req;
mobject_aio_read_op_operate(mph, read_op, io->pool_name, oid, flags, &req);
mobject_provider_handle_release(mph);
completion->request = req;
return 0;
}
......@@ -7,84 +7,172 @@
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/client/mobject-client-impl.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
// global variables for RPC ids, defined in client/cluster.c
extern hg_id_t mobject_write_op_rpc_id;
extern hg_id_t mobject_read_op_rpc_id;
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
int mobject_aio_write_op_operate(
mobject_provider_handle_t mph,
mobject_store_write_op_t write_op,
const char *pool_name,
const char *oid,
time_t *mtime,
int flags)
int flags,
mobject_request_t* req)
{
hg_return_t ret;
write_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.pool_name = pool_name;
in.write_op = write_op;
// TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op);
prepare_write_op(mph->client->mid, write_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_addr_t svr_addr = mph->addr;
if(svr_addr == HG_ADDR_NULL) {
fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_write_op_operate\n");
return -1;
}
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_write_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_WRITE_COMPLETION;
completion->op.write_op = write_op;
ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_write_op_rpc_id, &h);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_aio_write_op_operate()\n");
return -1;
}
margo_set_target_id(h, mph->mplex_id);
margo_request mreq;
ret = margo_iforward(h, &in, &mreq);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");