Commit d2b92151 authored by Matthieu Dorier's avatar Matthieu Dorier

added ch-placement, need to test

parent ffacc8f2
......@@ -21,6 +21,8 @@
#include "src/rpc-types/read-op.h"
#include "src/util/log.h"
static unsigned long sdbm_hash(const char* str);
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle);
int mobject_store_create(mobject_store_t *cluster, const char * const id)
......@@ -42,7 +44,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
if (!cluster_file)
{
fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
MOBJECT_CLUSTER_FILE_ENV);
MOBJECT_CLUSTER_FILE_ENV);
free(cluster_handle);
return -1;
}
......@@ -51,12 +53,11 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
if (ret != 0)
{
fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
cluster_file);
cluster_file);
free(cluster_handle);
return -1;
}
/* set the returned cluster handle */
*cluster = cluster_handle;
......@@ -133,6 +134,34 @@ int mobject_store_connect(mobject_store_t cluster)
}
cluster_handle->connected = 1;
// get number of servers
int gsize = ssg_get_group_size(cluster_handle->gid);
if(gsize == 0)
{
fprintf(stderr, "Error: Unable to get SSG group size\n");
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
// initialize ch-placement
cluster_handle->ch_instance =
ch_placement_initialize("hash_lookup3", gsize, 4, 0);
if(!cluster_handle->ch_instance)
{
fprintf(stderr, "Error: Unable to initialize ch-placement instance\n");
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
// initialize mobject client
ret = mobject_client_init(mid, &(cluster_handle->mobject_clt));
if(ret != 0)
{
......@@ -166,7 +195,7 @@ void mobject_store_shutdown(mobject_store_t cluster)
if (ret != 0)
{
fprintf(stderr, "Warning: Unable to send shutdown signal \
to mobject server cluster\n");
to mobject server cluster\n");
}
}
......@@ -175,6 +204,7 @@ void mobject_store_shutdown(mobject_store_t cluster)
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_group_id_free(cluster_handle->gid);
ch_placement_finalize(cluster_handle->ch_instance);
free(cluster_handle);
return;
......@@ -197,9 +227,9 @@ int mobject_store_pool_delete(mobject_store_t cluster, const char * pool_name)
}
int mobject_store_ioctx_create(
mobject_store_t cluster,
const char * pool_name,
mobject_store_ioctx_t *ioctx)
mobject_store_t cluster,
const char * pool_name,
mobject_store_ioctx_t *ioctx)
{
*ioctx = (mobject_store_ioctx_t)calloc(1, sizeof(**ioctx));
(*ioctx)->pool_name = strdup(pool_name);
......@@ -298,15 +328,18 @@ void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
}
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
const char *oid,
time_t *mtime,
int flags)
mobject_store_ioctx_t io,
const char *oid,
time_t *mtime,
int flags)
{
mobject_provider_handle_t mph;
// TODO chose the target server based on ch-placement
// remember that multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph = MOBJECT_PROVIDER_HANDLE_NULL;
uint64_t oid_hash = sdbm_hash(oid);
unsigned long server_idx;
ch_placement_find_closest(io->cluster->ch_instance, oid_hash, 1, &server_idx);
// XXX multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, server_idx);
// TODO for now multiplex id is hard-coded as 1
int r = mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
if(r != 0) return r;
......@@ -373,18 +406,20 @@ void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op
}
int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t ioctx,
const char *oid,
int flags)
mobject_store_ioctx_t ioctx,
const char *oid,
int flags)
{
mobject_provider_handle_t mph;
// TODO chose the target server based on ch-placement
// remember that multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, 0);
mobject_provider_handle_t mph = MOBJECT_PROVIDER_HANDLE_NULL;
uint64_t oid_hash = sdbm_hash(oid);
unsigned long server_idx;
ch_placement_find_closest(ioctx->cluster->ch_instance, oid_hash, 1, &server_idx);
// XXX multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, server_idx);
// TODO for now multiplex id is hard-coded as 1
int r = mobject_provider_handle_create(ioctx->cluster->mobject_clt, svr_addr, 1, &mph);
if(r != 0) return r;
r = mobject_read_op_operate(mph,read_op, ioctx->pool_name, oid, flags);
mobject_provider_handle_release(mph);
return r;
......@@ -406,3 +441,13 @@ static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_h
return mobject_shutdown(cluster_handle->mobject_clt, svr_addr);
}
static unsigned long sdbm_hash(const char* str)
{
unsigned long hash = 0;
int c;
while (c = *str++)
hash = c + (hash << 6) + (hash << 16) - hash;
return hash;
}
......@@ -8,24 +8,29 @@
#include <margo.h>
#include <ssg.h>
#include <ch-placement.h>
#include "libmobject-store.h"
#include "mobject-client.h"
#define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"
#define MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV "MOBJECT_SHUTDOWN_KILL_SERVERS"
typedef struct ch_placement_instance* chi_t;
struct mobject_store_handle
{
margo_instance_id mid;
mobject_client_t mobject_clt;
ssg_group_id_t gid;
int connected;
margo_instance_id mid;
mobject_client_t mobject_clt;
ssg_group_id_t gid;
chi_t ch_instance;
int connected;
};
struct mobject_store_ioctx
{
mobject_store_t cluster;
char* pool_name;
char* pool_name;
};
#endif
......@@ -4,9 +4,11 @@
* See COPYRIGHT in top-level directory.
*/
#include <unistd.h>
#include <mpi.h>
#include <margo.h>
#include <ssg.h>
#include <ssg-mpi.h>
#include <bake-client.h>
#include <bake-server.h>
#include <sdskv-client.h>
......@@ -15,6 +17,8 @@
#include "mobject-server.h"
#include "src/server/core/key-types.h"
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }
void usage(void)
{
fprintf(stderr, "Usage: mobject-server-daemon <listen_addr> <cluster_file>\n");
......@@ -33,6 +37,7 @@ typedef struct {
sdskv_provider_handle_t provider_handle;
} sdskv_client_data;
static void finalize_ssg_cb(void* data);
static void finalize_bake_client_cb(void* data);
static void finalize_sdskv_client_cb(void* data);
......@@ -55,8 +60,10 @@ int main(int argc, char *argv[])
listen_addr = argv[1];
cluster_file = argv[2];
/* XXX: MPI required for SSG bootstrapping */
/* MPI required for SSG bootstrapping */
MPI_Init(&argc, &argv);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
/* Margo initialization */
mid = margo_init(listen_addr, MARGO_SERVER_MODE, 0, -1);
......@@ -69,48 +76,72 @@ int main(int argc, char *argv[])
/* SSG initialization */
ret = ssg_init(mid);
if(ret != 0) {
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(mid);
return -1;
}
ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
/* TODO assert the return value of all the calls below */
#if 0
// XXX group name and file should be defined in a config file
ssg_group_id_t gid = ssg_group_create_mpi("mobject-providers", MPI_COMM_WORLD, NULL, NULL);
ASSERT(gid != SSG_GROUP_ID_NULL, "%s", "ssg_group_create_mpi() failed");
ret = ssg_group_id_store("mobject.ssg", gid);
ASSERT(ret == 0, "ssg_group_id_store() failed (ret = %d)\n", ret);
margo_push_finalize_callback(mid, &finalize_ssg_cb, (void*)&gid);
/* Get self address */
ssg_member_id_t self_id = ssg_get_group_self_id(gid);
hg_addr_t self_addr = ssg_get_addr(gid, self_id);
#endif
hg_addr_t self_addr;
margo_addr_self(mid, &self_addr);
/* Bake provider initialization */
/* XXX mplex id and target name should be taken from config file */
uint8_t bake_mplex_id = 1;
const char* bake_target_name = "/dev/shm/mobject.dat";
char bake_target_name[128];
sprintf(bake_target_name, "/dev/shm/mobject.%d.dat", rank);
/* create the bake target if it does not exist */
if(-1 == access(bake_target_name, F_OK)) {
// XXX creating a pool of 10MB - this should come from a config file
ret = bake_makepool(bake_target_name, 10*1024*1024, 0664);
ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
}
bake_provider_t bake_prov;
bake_target_id_t bake_tid;
bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
bake_target_name, ret);
/* Bake provider handle initialization from self addr */
bake_client_data bake_clt_data;
bake_client_init(mid, &(bake_clt_data.client));
bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
ret = bake_client_init(mid, &(bake_clt_data.client));
ASSERT(ret == 0, "bake_client_init() failed (ret = %d)\n", ret);
ret = bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
ASSERT(ret == 0, "bake_provider_handle_create() failed (ret = %d)\n", ret);
margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);
/* SDSKV provider initialization */
uint8_t sdskv_mplex_id = 1;
sdskv_provider_t sdskv_prov;
sdskv_database_id_t oid_map_id, name_map_id, seg_map_id, omap_map_id;
sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
sdskv_provider_add_database(sdskv_prov, "oid_map", KVDB_MAP, &oid_map_compare, &oid_map_id);
sdskv_provider_add_database(sdskv_prov, "name_map", KVDB_MAP, &name_map_compare, &name_map_id);
sdskv_provider_add_database(sdskv_prov, "seg_map", KVDB_MAP, &seg_map_compare, &seg_map_id);
sdskv_provider_add_database(sdskv_prov, "omap_map", KVDB_MAP, &omap_map_compare, &omap_map_id);
ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);
ret = sdskv_provider_add_database(sdskv_prov, "oid_map", KVDB_MAP, &oid_map_compare, &oid_map_id);
ASSERT(ret == 0, "sdskv_provider_add_database() failed to add database \"oid_map\" (ret = %d)\n", ret);
ret = sdskv_provider_add_database(sdskv_prov, "name_map", KVDB_MAP, &name_map_compare, &name_map_id);
ASSERT(ret == 0, "sdskv_provider_add_database() failed to add database \"name_map\" (ret = %d)\n", ret);
ret = sdskv_provider_add_database(sdskv_prov, "seg_map", KVDB_MAP, &seg_map_compare, &seg_map_id);
ASSERT(ret == 0, "sdskv_provider_add_database() failed to add database \"seg_map\" (ret = %d)\n", ret);
ret = sdskv_provider_add_database(sdskv_prov, "omap_map", KVDB_MAP, &omap_map_compare, &omap_map_id);
ASSERT(ret == 0, "sdskv_provider_add_database() failed to add database \"omap_map\" (ret = %d)\n", ret);
/* SDSKV provider handle initialization from self addr */
sdskv_client_data sdskv_clt_data;
sdskv_client_init(mid, &(sdskv_clt_data.client));
sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
ret = sdskv_client_init(mid, &(sdskv_clt_data.client));
ASSERT(ret == 0, "sdskv_client_init() failed (ret = %d)\n", ret);
ret = sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
ASSERT(ret == 0, "sdskv_provider_handle_create() failed (ret = %d)\n", ret);
margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);
/* Mobject provider initialization */
......@@ -127,7 +158,7 @@ int main(int argc, char *argv[])
return -1;
}
margo_addr_free(mid,self_addr);
margo_addr_free(mid, self_addr);
margo_wait_for_finalize(mid);
......@@ -136,6 +167,12 @@ int main(int argc, char *argv[])
return 0;
}
static void finalize_ssg_cb(void* data)
{
ssg_group_id_t* gid = (ssg_group_id_t*)data;
ssg_group_destroy(*gid);
}
static void finalize_bake_client_cb(void* data)
{
bake_client_data* clt_data = (bake_client_data*)data;
......@@ -152,7 +189,6 @@ static void finalize_sdskv_client_cb(void* data)
static int oid_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// keys are oid_t (uint64_t)
oid_t x = *((oid_t*)k1);
oid_t y = *((oid_t*)k2);
if(x == y) return 0;
......@@ -162,7 +198,6 @@ static int oid_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk
static int name_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// names are strings (const char*)
const char* n1 = (const char*)k1;
const char* n2 = (const char*)k2;
return strcmp(n1,n2);
......@@ -170,15 +205,6 @@ static int name_map_compare(const void* k1, size_t sk1, const void* k2, size_t s
static int seg_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// segments are as follows:
/*struct segment_key_t {
oid_t oid;
seg_type_t type;
double timestamp;
uint64_t start_index;
uint64_t end_index;
};
*/
const segment_key_t* seg1 = (const segment_key_t*)k1;
const segment_key_t* seg2 = (const segment_key_t*)k2;
if(seg1->oid < seg2->oid) return -1;
......@@ -190,12 +216,6 @@ static int seg_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk
static int omap_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// omap keys are as follows:
/* struct omap_key_t {
oid_t oid;
char key[1];
};
*/
const omap_key_t* ok1 = (const omap_key_t*)k1;
const omap_key_t* ok2 = (const omap_key_t*)k2;
if(ok1->oid < ok2->oid) return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment