Commit 2e9ae13d authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

done adding bulk write

parent e800fbdf
...@@ -62,6 +62,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -62,6 +62,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
free(cluster_handle); free(cluster_handle);
return -1; return -1;
} }
(*cluster)->my_address = NULL;
/* set the returned cluster handle */ /* set the returned cluster handle */
*cluster = cluster_handle; *cluster = cluster_handle;
...@@ -99,6 +100,7 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -99,6 +100,7 @@ int mobject_store_connect(mobject_store_t cluster)
proto[i] = svr_addr_str[i]; proto[i] = svr_addr_str[i];
/* intialize margo */ /* intialize margo */
fprintf(stderr,"Client initialized with proto = %s\n",proto);
/* XXX: probably want to expose some way of tweaking threading parameters */ /* XXX: probably want to expose some way of tweaking threading parameters */
cluster_handle->mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1); cluster_handle->mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
if (cluster_handle->mid == MARGO_INSTANCE_NULL) if (cluster_handle->mid == MARGO_INSTANCE_NULL)
...@@ -140,6 +142,17 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -140,6 +142,17 @@ int mobject_store_connect(mobject_store_t cluster)
} }
cluster_handle->connected = 1; cluster_handle->connected = 1;
/* get client's address */
{
hg_addr_t my_addr;
margo_addr_self(cluster_handle->mid, &my_addr);
hg_size_t addr_str_size;
margo_addr_to_string(cluster_handle->mid, NULL, &addr_str_size, my_addr);
cluster_handle->my_address = calloc(1,addr_str_size+1);
margo_addr_to_string(cluster_handle->mid, (char*)(cluster_handle->my_address), &addr_str_size, my_addr);
margo_addr_free(cluster_handle->mid, my_addr);
}
free(svr_addr_str); free(svr_addr_str);
return 0; return 0;
...@@ -173,6 +186,7 @@ void mobject_store_shutdown(mobject_store_t cluster) ...@@ -173,6 +186,7 @@ void mobject_store_shutdown(mobject_store_t cluster)
ssg_finalize(); ssg_finalize();
margo_finalize(cluster_handle->mid); margo_finalize(cluster_handle->mid);
ssg_group_id_free(cluster_handle->gid); ssg_group_id_free(cluster_handle->gid);
free((char*)cluster_handle->my_address);
free(cluster_handle); free(cluster_handle);
return; return;
...@@ -223,6 +237,7 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op, ...@@ -223,6 +237,7 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
in.object_name = oid; in.object_name = oid;
in.pool_name = io->pool_name; in.pool_name = io->pool_name;
in.write_op = write_op; in.write_op = write_op;
in.client_addr = io->cluster->my_address;
// TODO take mtime into account // TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op); prepare_write_op(io->cluster->mid, write_op);
...@@ -260,6 +275,7 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op, ...@@ -260,6 +275,7 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
in.object_name = oid; in.object_name = oid;
in.pool_name = ioctx->pool_name; in.pool_name = ioctx->pool_name;
in.read_op = read_op; in.read_op = read_op;
in.client_addr = ioctx->cluster->my_address;
prepare_read_op(ioctx->cluster->mid, read_op); prepare_read_op(ioctx->cluster->mid, read_op);
......
...@@ -17,6 +17,7 @@ struct mobject_store_handle ...@@ -17,6 +17,7 @@ struct mobject_store_handle
{ {
margo_instance_id mid; margo_instance_id mid;
ssg_group_id_t gid; ssg_group_id_t gid;
const char* my_address;
int connected; int connected;
}; };
......
...@@ -10,7 +10,8 @@ ...@@ -10,7 +10,8 @@
#include "src/io-chain/proc-read-responses.h" #include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC(read_op_in_t, MERCURY_GEN_PROC(read_op_in_t,
((hg_const_string_t)(pool_name))\ ((hg_const_string_t)(client_addr))\
((hg_const_string_t)(pool_name))\
((hg_const_string_t)(object_name))\ ((hg_const_string_t)(object_name))\
((mobject_store_read_op_t)(read_op))) ((mobject_store_read_op_t)(read_op)))
......
...@@ -10,7 +10,8 @@ ...@@ -10,7 +10,8 @@
#include "src/io-chain/proc-read-responses.h" #include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC(write_op_in_t, MERCURY_GEN_PROC(write_op_in_t,
((hg_const_string_t)(pool_name))\ ((hg_const_string_t)(client_addr))\
((hg_const_string_t)(pool_name))\
((hg_const_string_t)(object_name))\ ((hg_const_string_t)(object_name))\
((mobject_store_write_op_t)(write_op))) ((mobject_store_write_op_t)(write_op)))
......
...@@ -60,7 +60,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ...@@ -60,7 +60,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
bake_target_id_t bti = vargs->srv_ctx->bake_id; bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid; bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle; hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr; const char* remote_addr = vargs->client_addr.as_string;
int ret; int ret;
// TODO: check return values of those calls // TODO: check return values of those calls
...@@ -77,7 +77,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len) ...@@ -77,7 +77,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
bake_target_id_t bti = vargs->srv_ctx->bake_id; bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid; bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle; hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr; const char* remote_addr = vargs->client_addr.as_string;
int ret; int ret;
// TODO: check return values of those calls // TODO: check return values of those calls
...@@ -94,7 +94,7 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ ...@@ -94,7 +94,7 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
bake_target_id_t bti = vargs->srv_ctx->bake_id; bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid; bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle; hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr; const char* remote_addr = vargs->client_addr.as_string;
int ret; int ret;
// TODO: check return values of those calls // TODO: check return values of those calls
...@@ -111,7 +111,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len) ...@@ -111,7 +111,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
bake_target_id_t bti = vargs->srv_ctx->bake_id; bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid; bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle; hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr; const char* remote_addr = vargs->client_addr.as_string;
int ret; int ret;
// TODO: check return values of those calls // TODO: check return values of those calls
......
...@@ -60,7 +60,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_ ...@@ -60,7 +60,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
return; return;
} }
margo_instance_id mid = vargs->srv_ctx->mid; margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].read(mid, vargs->client_addr, vargs->bulk_handle, fake_db[name].read(mid, vargs->client_addr.as_handle, vargs->bulk_handle,
buf.as_offset, offset, len, bytes_read); buf.as_offset, offset, len, bytes_read);
*prval = 0; *prval = 0;
} }
......
...@@ -70,7 +70,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset) ...@@ -70,7 +70,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
std::cerr << "[FAKE-BACKEND-WARNING] (write) Object " << name << " does not exist, it will be created" << std::endl; std::cerr << "[FAKE-BACKEND-WARNING] (write) Object " << name << " does not exist, it will be created" << std::endl;
} }
margo_instance_id mid = vargs->srv_ctx->mid; margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].write(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, len); fake_db[name].write(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, offset, len);
} }
void write_op_exec_write_full(void* u, buffer_u buf, size_t len) void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
...@@ -81,7 +81,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len) ...@@ -81,7 +81,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
std::cerr << "[FAKE-BACKEND-WARNING] (write_full) Object " << name << " does not exist, it will be created" << std::endl; std::cerr << "[FAKE-BACKEND-WARNING] (write_full) Object " << name << " does not exist, it will be created" << std::endl;
} }
margo_instance_id mid = vargs->srv_ctx->mid; margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].write_full(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len); fake_db[name].write_full(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, len);
} }
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset) void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
...@@ -92,7 +92,7 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ ...@@ -92,7 +92,7 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
std::cerr << "[FAKE-BACKEND-WARNING] (writesame) Object " << name << " does not exist, it will be created" << std::endl; std::cerr << "[FAKE-BACKEND-WARNING] (writesame) Object " << name << " does not exist, it will be created" << std::endl;
} }
margo_instance_id mid = vargs->srv_ctx->mid; margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].writesame(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len); fake_db[name].writesame(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len);
} }
void write_op_exec_append(void* u, buffer_u buf, size_t len) void write_op_exec_append(void* u, buffer_u buf, size_t len)
...@@ -103,7 +103,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len) ...@@ -103,7 +103,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
std::cerr << "[FAKE-BACKEND-WARNING] (append) Object " << name << " does not exist, it will be created" << std::endl; std::cerr << "[FAKE-BACKEND-WARNING] (append) Object " << name << " does not exist, it will be created" << std::endl;
} }
margo_instance_id mid = vargs->srv_ctx->mid; margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].append(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len); fake_db[name].append(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, len);
} }
void write_op_exec_remove(void* u) void write_op_exec_remove(void* u)
......
...@@ -214,12 +214,15 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h) ...@@ -214,12 +214,15 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
vargs.object_name = in.object_name; vargs.object_name = in.object_name;
vargs.pool_name = in.pool_name; vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data(mid, info->id); vargs.srv_ctx = margo_registered_data(mid, info->id);
vargs.client_addr = info->addr; vargs.client_addr_type = MOBJECT_ADDR_STRING;
vargs.client_addr.as_string = in.client_addr;
vargs.bulk_handle = in.write_op->bulk_handle; vargs.bulk_handle = in.write_op->bulk_handle;
/* Execute the operation chain */ /* Execute the operation chain */
//print_write_op(in.write_op, in.object_name); //print_write_op(in.write_op, in.object_name);
#ifdef FAKE_CPP_SERVER #ifdef FAKE_CPP_SERVER
vargs.client_addr_type = MOBJECT_ADDR_HANDLE;
vargs.client_addr.as_handle = info->addr;
fake_write_op(in.write_op, &vargs); fake_write_op(in.write_op, &vargs);
#else #else
core_write_op(in.write_op, &vargs); core_write_op(in.write_op, &vargs);
...@@ -264,12 +267,15 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h) ...@@ -264,12 +267,15 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
vargs.object_name = in.object_name; vargs.object_name = in.object_name;
vargs.pool_name = in.pool_name; vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data(mid,info->id); vargs.srv_ctx = margo_registered_data(mid,info->id);
vargs.client_addr = info->addr; vargs.client_addr_type = MOBJECT_ADDR_STRING;
vargs.client_addr.as_string = in.client_addr;
vargs.bulk_handle = in.read_op->bulk_handle; vargs.bulk_handle = in.read_op->bulk_handle;
/* Compute the result. */ /* Compute the result. */
//print_read_op(in.read_op, in.object_name); //print_read_op(in.read_op, in.object_name);
#ifdef FAKE_CPP_SERVER #ifdef FAKE_CPP_SERVER
vargs.client_addr_type = MOBJECT_ADDR_HANDLE;
vargs.client_addr.as_handle = info->addr;
fake_read_op(in.read_op, &vargs); fake_read_op(in.read_op, &vargs);
#else #else
core_read_op(in.read_op, &vargs); core_read_op(in.read_op, &vargs);
......
...@@ -9,12 +9,21 @@ ...@@ -9,12 +9,21 @@
extern "C" { extern "C" {
#endif #endif
typedef enum {
MOBJECT_ADDR_STRING,
MOBJECT_ADDR_HANDLE
} addr_str_type_t ;
typedef struct { typedef struct {
const char* object_name; const char* object_name;
const char* pool_name; const char* pool_name;
struct mobject_server_context* srv_ctx; struct mobject_server_context* srv_ctx;
hg_addr_t client_addr; union {
const char* as_string;
hg_addr_t as_handle;
} client_addr;
hg_bulk_t bulk_handle; hg_bulk_t bulk_handle;
addr_str_type_t client_addr_type;
} server_visitor_args; } server_visitor_args;
typedef server_visitor_args* server_visitor_args_t; typedef server_visitor_args* server_visitor_args_t;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment