Commit 38fa8f63 authored by Matthieu Dorier's avatar Matthieu Dorier

done adding bulk write

parent 5d3c8d70
......@@ -62,6 +62,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
free(cluster_handle);
return -1;
}
(*cluster)->my_address = NULL;
/* set the returned cluster handle */
*cluster = cluster_handle;
......@@ -99,6 +100,7 @@ int mobject_store_connect(mobject_store_t cluster)
proto[i] = svr_addr_str[i];
/* intialize margo */
fprintf(stderr,"Client initialized with proto = %s\n",proto);
/* XXX: probably want to expose some way of tweaking threading parameters */
cluster_handle->mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
if (cluster_handle->mid == MARGO_INSTANCE_NULL)
......@@ -140,6 +142,17 @@ int mobject_store_connect(mobject_store_t cluster)
}
cluster_handle->connected = 1;
/* get client's address */
{
hg_addr_t my_addr;
margo_addr_self(cluster_handle->mid, &my_addr);
hg_size_t addr_str_size;
margo_addr_to_string(cluster_handle->mid, NULL, &addr_str_size, my_addr);
cluster_handle->my_address = calloc(1,addr_str_size+1);
margo_addr_to_string(cluster_handle->mid, (char*)(cluster_handle->my_address), &addr_str_size, my_addr);
margo_addr_free(cluster_handle->mid, my_addr);
}
free(svr_addr_str);
return 0;
......@@ -173,6 +186,7 @@ void mobject_store_shutdown(mobject_store_t cluster)
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_group_id_free(cluster_handle->gid);
free((char*)cluster_handle->my_address);
free(cluster_handle);
return;
......@@ -223,6 +237,7 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
in.object_name = oid;
in.pool_name = io->pool_name;
in.write_op = write_op;
in.client_addr = io->cluster->my_address;
// TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op);
......@@ -260,6 +275,7 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
in.object_name = oid;
in.pool_name = ioctx->pool_name;
in.read_op = read_op;
in.client_addr = ioctx->cluster->my_address;
prepare_read_op(ioctx->cluster->mid, read_op);
......
......@@ -17,6 +17,7 @@ struct mobject_store_handle
{
margo_instance_id mid;
ssg_group_id_t gid;
const char* my_address;
int connected;
};
......
......@@ -10,7 +10,8 @@
#include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC(read_op_in_t,
((hg_const_string_t)(pool_name))\
((hg_const_string_t)(client_addr))\
((hg_const_string_t)(pool_name))\
((hg_const_string_t)(object_name))\
((mobject_store_read_op_t)(read_op)))
......
......@@ -10,7 +10,8 @@
#include "src/io-chain/proc-read-responses.h"
MERCURY_GEN_PROC(write_op_in_t,
((hg_const_string_t)(pool_name))\
((hg_const_string_t)(client_addr))\
((hg_const_string_t)(pool_name))\
((hg_const_string_t)(object_name))\
((mobject_store_write_op_t)(write_op)))
......
......@@ -60,7 +60,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr;
const char* remote_addr = vargs->client_addr.as_string;
int ret;
// TODO: check return values of those calls
......@@ -77,7 +77,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr;
const char* remote_addr = vargs->client_addr.as_string;
int ret;
// TODO: check return values of those calls
......@@ -94,7 +94,7 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr;
const char* remote_addr = vargs->client_addr.as_string;
int ret;
// TODO: check return values of those calls
......@@ -111,7 +111,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
hg_addr_t remote_addr = vargs->client_addr;
const char* remote_addr = vargs->client_addr.as_string;
int ret;
// TODO: check return values of those calls
......
......@@ -60,7 +60,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
return;
}
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].read(mid, vargs->client_addr, vargs->bulk_handle,
fake_db[name].read(mid, vargs->client_addr.as_handle, vargs->bulk_handle,
buf.as_offset, offset, len, bytes_read);
*prval = 0;
}
......
......@@ -70,7 +70,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
std::cerr << "[FAKE-BACKEND-WARNING] (write) Object " << name << " does not exist, it will be created" << std::endl;
}
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].write(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, len);
fake_db[name].write(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, offset, len);
}
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
......@@ -81,7 +81,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
std::cerr << "[FAKE-BACKEND-WARNING] (write_full) Object " << name << " does not exist, it will be created" << std::endl;
}
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].write_full(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
fake_db[name].write_full(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, len);
}
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
......@@ -92,7 +92,7 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
std::cerr << "[FAKE-BACKEND-WARNING] (writesame) Object " << name << " does not exist, it will be created" << std::endl;
}
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].writesame(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len);
fake_db[name].writesame(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len);
}
void write_op_exec_append(void* u, buffer_u buf, size_t len)
......@@ -103,7 +103,7 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
std::cerr << "[FAKE-BACKEND-WARNING] (append) Object " << name << " does not exist, it will be created" << std::endl;
}
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].append(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
fake_db[name].append(mid, vargs->client_addr.as_handle, vargs->bulk_handle, buf.as_offset, len);
}
void write_op_exec_remove(void* u)
......
......@@ -214,12 +214,15 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
vargs.object_name = in.object_name;
vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data(mid, info->id);
vargs.client_addr = info->addr;
vargs.client_addr_type = MOBJECT_ADDR_STRING;
vargs.client_addr.as_string = in.client_addr;
vargs.bulk_handle = in.write_op->bulk_handle;
/* Execute the operation chain */
//print_write_op(in.write_op, in.object_name);
#ifdef FAKE_CPP_SERVER
vargs.client_addr_type = MOBJECT_ADDR_HANDLE;
vargs.client_addr.as_handle = info->addr;
fake_write_op(in.write_op, &vargs);
#else
core_write_op(in.write_op, &vargs);
......@@ -264,12 +267,15 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
vargs.object_name = in.object_name;
vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data(mid,info->id);
vargs.client_addr = info->addr;
vargs.client_addr_type = MOBJECT_ADDR_STRING;
vargs.client_addr.as_string = in.client_addr;
vargs.bulk_handle = in.read_op->bulk_handle;
/* Compute the result. */
//print_read_op(in.read_op, in.object_name);
#ifdef FAKE_CPP_SERVER
vargs.client_addr_type = MOBJECT_ADDR_HANDLE;
vargs.client_addr.as_handle = info->addr;
fake_read_op(in.read_op, &vargs);
#else
core_read_op(in.read_op, &vargs);
......
......@@ -9,12 +9,21 @@
extern "C" {
#endif
typedef enum {
MOBJECT_ADDR_STRING,
MOBJECT_ADDR_HANDLE
} addr_str_type_t ;
typedef struct {
const char* object_name;
const char* pool_name;
struct mobject_server_context* srv_ctx;
hg_addr_t client_addr;
union {
const char* as_string;
hg_addr_t as_handle;
} client_addr;
hg_bulk_t bulk_handle;
addr_str_type_t client_addr_type;
} server_visitor_args;
typedef server_visitor_args* server_visitor_args_t;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment