Commit b886ddfe authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'master' of xgitlab.cels.anl.gov:sds/mobject-store

parents fac5ba6d d3783667
...@@ -28,6 +28,7 @@ lib_LTLIBRARIES = \ ...@@ -28,6 +28,7 @@ lib_LTLIBRARIES = \
include_HEADERS = \ include_HEADERS = \
include/libmobject-store.h \ include/libmobject-store.h \
include/librados-mobject-store.h \ include/librados-mobject-store.h \
include/mobject-client.h \
include/mobject-server.h include/mobject-server.h
include Make.rules include Make.rules
......
...@@ -15,40 +15,9 @@ extern "C" { ...@@ -15,40 +15,9 @@ extern "C" {
#include <time.h> #include <time.h>
#include <mobject-client.h> #include <mobject-client.h>
// derived from: http://docs.ceph.com/docs/master/mobject_store/api/libmobject_store/
// derived from: http://docs.ceph.com/docs/master/rados/api/librados/
/* KEY TERMS
*
* Pool:
* - an object container providing separate namespaces for objects
* - pools can have different placement & replication strategies
*
* Placement group (PG):
* - Groups of objects within a pool that share the same OSDs.
*
* CRUSH:
* - Data placement algorithm for RADOS objects, mapping OIDs to PGs.
*/
/* ASSUMPTIONS/QUESTIONS
* - Initially we enforce one global pool for each mobject store instance for simplicity.
*
* - Does ch-placement offer an algorithm similar to crush (e.g., PG concept)?
*
* - We will not implement any replication.
*
* - RADOS seems to expose object versions, but I only see them in the 'object operation' portion of their API, which is big and awkward (i.e., would love to not implement that).
*
* - Do we need any of the following from RADOS:
* - async I/O operations?
* - cursor stuff?
* - snapshots?
* - extended attributes?
*
* - RADOS API includes a watch/notify API that sounds like something we could build on top of a generic SSG pub-sub service, if we wanted.
*
*/
/* opaque type for a handle for interacting with a mobject store cluster */ /* opaque type for a handle for interacting with a mobject store cluster */
typedef struct mobject_store_handle *mobject_store_t; typedef struct mobject_store_handle *mobject_store_t;
...@@ -179,18 +148,21 @@ typedef struct mobject_store_completion* mobject_store_completion_t; ...@@ -179,18 +148,21 @@ typedef struct mobject_store_completion* mobject_store_completion_t;
* Create a handle to a mobject cluster. * Create a handle to a mobject cluster.
* *
* @param[in/out] cluster pointer to store mobject cluster handle at * @param[in/out] cluster pointer to store mobject cluster handle at
* @param[in] id the user to connect as (NOTE: ignored in mobject store) * @param[in] id the user to connect as (NOTE: currently ignored in mobject store)
* @returns 0 on success, negative error code on failure * @returns 0 on success, negative error code on failure
*
* NOTES:
* - from mobject_store_create(mobject_store_t * cluster, const char *const user_id)
* - drop user_id from API
* - may eventually need conf. file, env variable checking, etc.
*/ */
int mobject_store_create( int mobject_store_create(
mobject_store_t *cluster, mobject_store_t *cluster,
const char * const id); const char * const id);
static inline
int mobject_store_conf_read_file(
mobject_store_t cluster,
const char * path)
{
return 0;
}
/** /**
* Connect to a mobject cluster. * Connect to a mobject cluster.
* *
......
...@@ -15,6 +15,9 @@ extern "C" { ...@@ -15,6 +15,9 @@ extern "C" {
typedef mobject_store_t rados_t; typedef mobject_store_t rados_t;
#define LIBRADOS_CREATE_EXCLUSIVE LIBMOBJECT_CREATE_EXCLUSIVE
#define LIBRADOS_CREATE_IDEMPOTENT LIBMOBJECT_CREATE_IDEMPOTENT
#define LIBRADOS_OPERATION_NOFLAG LIBMOBJECT_OPERATION_NOFLAG #define LIBRADOS_OPERATION_NOFLAG LIBMOBJECT_OPERATION_NOFLAG
#define LIBRADOS_OPERATION_BALANCE_READS LIBMOBJECT_OPERATION_BALANCE_READS #define LIBRADOS_OPERATION_BALANCE_READS LIBMOBJECT_OPERATION_BALANCE_READS
#define LIBRADOS_OPERATION_LOCALIZE_READS LIBMOBJECT_OPERATION_LOCALIZE_READS #define LIBRADOS_OPERATION_LOCALIZE_READS LIBMOBJECT_OPERATION_LOCALIZE_READS
...@@ -34,6 +37,7 @@ typedef mobject_store_completion_t rados_completion_t; ...@@ -34,6 +37,7 @@ typedef mobject_store_completion_t rados_completion_t;
typedef mobject_store_callback_t rados_callback_t; typedef mobject_store_callback_t rados_callback_t;
#define rados_create mobject_store_create #define rados_create mobject_store_create
#define rados_conf_read_file mobject_store_conf_read_file
#define rados_connect mobject_store_connect #define rados_connect mobject_store_connect
#define rados_shutdown mobject_store_shutdown #define rados_shutdown mobject_store_shutdown
#define rados_pool_create mobject_store_pool_create #define rados_pool_create mobject_store_pool_create
......
...@@ -58,7 +58,7 @@ extern "C" { ...@@ -58,7 +58,7 @@ extern "C" {
* *
* @param client client managing the provider handle * @param client client managing the provider handle
* @param addr address of the provider * @param addr address of the provider
* @param mplex_id multiplex id of the provider * @param provider_id multiplex id of the provider
* @param handle resulting handle * @param handle resulting handle
* *
* @return 0 on success, -1 on failure * @return 0 on success, -1 on failure
...@@ -66,7 +66,7 @@ extern "C" { ...@@ -66,7 +66,7 @@ extern "C" {
int mobject_provider_handle_create( int mobject_provider_handle_create(
mobject_client_t client, mobject_client_t client,
hg_addr_t addr, hg_addr_t addr,
uint8_t mplex_id, uint16_t provider_id,
mobject_provider_handle_t* handle); mobject_provider_handle_t* handle);
/** /**
......
...@@ -27,7 +27,7 @@ typedef struct mobject_server_context* mobject_provider_t; ...@@ -27,7 +27,7 @@ typedef struct mobject_server_context* mobject_provider_t;
* Start a mobject server instance * Start a mobject server instance
* *
* @param[in] mid margo instance id * @param[in] mid margo instance id
* @param[in] mplex_id multiplex id of the provider * @param[in] provider_id id of the provider
* @param[in] pool Argobots pool for the provider * @param[in] pool Argobots pool for the provider
* @param[in] bake_ph Bake provider handle to use to write/read data * @param[in] bake_ph Bake provider handle to use to write/read data
* @param[in] sdskv_ph SDSKV provider handle to use to access metadata * @param[in] sdskv_ph SDSKV provider handle to use to access metadata
...@@ -39,7 +39,7 @@ typedef struct mobject_server_context* mobject_provider_t; ...@@ -39,7 +39,7 @@ typedef struct mobject_server_context* mobject_provider_t;
*/ */
int mobject_provider_register( int mobject_provider_register(
margo_instance_id mid, margo_instance_id mid,
uint8_t mplex_id, uint16_t provider_id,
ABT_pool pool, ABT_pool pool,
bake_provider_handle_t bake_ph, bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph, sdskv_provider_handle_t sdskv_ph,
......
...@@ -46,10 +46,8 @@ int mobject_aio_write_op_operate( ...@@ -46,10 +46,8 @@ int mobject_aio_write_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id);
margo_request mreq; margo_request mreq;
ret = margo_iforward(h, &in, &mreq); ret = margo_provider_iforward(mph->provider_id, h, &in, &mreq);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h); margo_destroy(h);
...@@ -97,10 +95,8 @@ int mobject_aio_read_op_operate( ...@@ -97,10 +95,8 @@ int mobject_aio_read_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id);
margo_request mreq; margo_request mreq;
ret = margo_iforward(h, &in, &mreq); ret = margo_provider_iforward(mph->provider_id, h, &in, &mreq);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h); margo_destroy(h);
......
...@@ -30,7 +30,7 @@ struct mobject_client { ...@@ -30,7 +30,7 @@ struct mobject_client {
struct mobject_provider_handle { struct mobject_provider_handle {
mobject_client_t client; mobject_client_t client;
hg_addr_t addr; hg_addr_t addr;
uint8_t mplex_id; uint16_t provider_id;
uint64_t refcount; uint64_t refcount;
}; };
......
...@@ -14,31 +14,13 @@ ...@@ -14,31 +14,13 @@
#include <ssg.h> #include <ssg.h>
#include "mobject-client.h" #include "mobject-client.h"
#include "src/client/mobject-client-impl.h"
#include "src/io-chain/prepare-write-op.h" #include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h" #include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h" #include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h" #include "src/rpc-types/read-op.h"
#include "src/util/log.h" #include "src/util/log.h"
struct mobject_client {
margo_instance_id mid;
char* client_addr;
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
uint64_t num_provider_handles;
};
struct mobject_provider_handle {
mobject_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
static int mobject_client_register(mobject_client_t client, margo_instance_id mid) static int mobject_client_register(mobject_client_t client, margo_instance_id mid)
{ {
int ret; int ret;
...@@ -110,7 +92,7 @@ int mobject_client_finalize(mobject_client_t client) ...@@ -110,7 +92,7 @@ int mobject_client_finalize(mobject_client_t client)
int mobject_provider_handle_create( int mobject_provider_handle_create(
mobject_client_t client, mobject_client_t client,
hg_addr_t addr, hg_addr_t addr,
uint8_t mplex_id, uint16_t provider_id,
mobject_provider_handle_t* handle) mobject_provider_handle_t* handle)
{ {
if(client == MOBJECT_CLIENT_NULL) return -1; if(client == MOBJECT_CLIENT_NULL) return -1;
...@@ -126,9 +108,9 @@ int mobject_provider_handle_create( ...@@ -126,9 +108,9 @@ int mobject_provider_handle_create(
return -1; return -1;
} }
provider->client = client; provider->client = client;
provider->mplex_id = mplex_id; provider->provider_id = provider_id;
provider->refcount = 1; provider->refcount = 1;
client->num_provider_handles += 1; client->num_provider_handles += 1;
...@@ -192,9 +174,7 @@ int mobject_write_op_operate( ...@@ -192,9 +174,7 @@ int mobject_write_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id); ret = margo_provider_forward(mph->provider_id, h, &in);
ret = margo_forward(h, &in);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n");
margo_destroy(h); margo_destroy(h);
...@@ -247,9 +227,7 @@ int mobject_read_op_operate( ...@@ -247,9 +227,7 @@ int mobject_read_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id); ret = margo_provider_forward(mph->provider_id, h, &in);
ret = margo_forward(h, &in);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_read_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_read_op_operate()\n");
margo_destroy(h); margo_destroy(h);
......
...@@ -20,7 +20,7 @@ struct mobject_server_context ...@@ -20,7 +20,7 @@ struct mobject_server_context
{ {
/* margo, bake, sds-keyval, ssg state */ /* margo, bake, sds-keyval, ssg state */
margo_instance_id mid; margo_instance_id mid;
uint8_t mplex_id; uint16_t provider_id;
ABT_pool pool; ABT_pool pool;
/* ssg-related data */ /* ssg-related data */
ssg_group_id_t gid; ssg_group_id_t gid;
......
...@@ -36,7 +36,7 @@ static void mobject_finalize_cb(void* data); ...@@ -36,7 +36,7 @@ static void mobject_finalize_cb(void* data);
int mobject_provider_register( int mobject_provider_register(
margo_instance_id mid, margo_instance_id mid,
uint8_t mplex_id, uint16_t provider_id,
ABT_pool pool, ABT_pool pool,
bake_provider_handle_t bake_ph, bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph, sdskv_provider_handle_t sdskv_ph,
...@@ -52,9 +52,9 @@ int mobject_provider_register( ...@@ -52,9 +52,9 @@ int mobject_provider_register(
{ {
hg_id_t id; hg_id_t id;
hg_bool_t flag; hg_bool_t flag;
margo_registered_name_mplex(mid, "mobject_write_op", mplex_id, &id, &flag); margo_provider_registered_name(mid, "mobject_write_op", provider_id, &id, &flag);
if(flag == HG_TRUE) { if(flag == HG_TRUE) {
fprintf(stderr, "mobject_provider_register(): a provider with the same mplex id (%d) already exists\n", mplex_id); fprintf(stderr, "mobject_provider_register(): a provider with the same id (%d) already exists\n", provider_id);
return -1; return -1;
} }
} }
...@@ -64,7 +64,7 @@ int mobject_provider_register( ...@@ -64,7 +64,7 @@ int mobject_provider_register(
if (!srv_ctx) if (!srv_ctx)
return -1; return -1;
srv_ctx->mid = mid; srv_ctx->mid = mid;
srv_ctx->mplex_id = mplex_id; srv_ctx->provider_id = provider_id;
srv_ctx->pool = pool; srv_ctx->pool = pool;
srv_ctx->ref_count = 1; srv_ctx->ref_count = 1;
...@@ -135,15 +135,15 @@ int mobject_provider_register( ...@@ -135,15 +135,15 @@ int mobject_provider_register(
hg_id_t rpc_id; hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_MPLEX(mid, "mobject_write_op", rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_write_op",
write_op_in_t, write_op_out_t, mobject_write_op_ult, write_op_in_t, write_op_out_t, mobject_write_op_ult,
mplex_id, pool); provider_id, pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, srv_ctx, NULL); margo_register_data(mid, rpc_id, srv_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "mobject_read_op", rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_read_op",
read_op_in_t, read_op_out_t, mobject_read_op_ult, read_op_in_t, read_op_out_t, mobject_read_op_ult,
mplex_id, pool); provider_id, pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, srv_ctx, NULL); margo_register_data(mid, rpc_id, srv_ctx, NULL);
margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx); margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx);
...@@ -170,7 +170,7 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h) ...@@ -170,7 +170,7 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
vargs.object_name = in.object_name; vargs.object_name = in.object_name;
vargs.oid = 0; vargs.oid = 0;
vargs.pool_name = in.pool_name; vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data_mplex(mid, info->id, info->target_id); vargs.srv_ctx = margo_registered_data(mid, info->id);
if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR; if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
vargs.client_addr_str = in.client_addr; vargs.client_addr_str = in.client_addr;
vargs.client_addr = info->addr; vargs.client_addr = info->addr;
...@@ -223,7 +223,7 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h) ...@@ -223,7 +223,7 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
vargs.object_name = in.object_name; vargs.object_name = in.object_name;
vargs.oid = 0; vargs.oid = 0;
vargs.pool_name = in.pool_name; vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data_mplex(mid, info->id, info->target_id); vargs.srv_ctx = margo_registered_data(mid, info->id);
if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR; if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
vargs.client_addr_str = in.client_addr; vargs.client_addr_str = in.client_addr;
vargs.client_addr = info->addr; vargs.client_addr = info->addr;
......
...@@ -9,11 +9,7 @@ ...@@ -9,11 +9,7 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#ifdef USE_RADOS
#include <rados/librados.h>
#else
#include <librados-mobject-store.h> #include <librados-mobject-store.h>
#endif
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment