...
 
Commits (2)
......@@ -94,7 +94,6 @@ int mobject_store_connect(mobject_store_t cluster)
proto[i] = svr_addr_str[i];
/* intialize margo */
fprintf(stderr,"Client initialized with proto = %s\n",proto);
/* XXX: probably want to expose some way of tweaking threading parameters */
margo_instance_id mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
if (mid == MARGO_INSTANCE_NULL)
......@@ -340,7 +339,6 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
// XXX multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, server_idx);
fprintf(stderr,"Object oid=%s will go to server %ld\n", oid, server_idx);
// TODO for now multiplex id is hard-coded as 1
int r = mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
......@@ -418,7 +416,6 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
ch_placement_find_closest(ioctx->cluster->ch_instance, oid_hash, 1, &server_idx);
// XXX multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, server_idx);
fprintf(stderr,"Object oid=%s is read from server %ld\n", oid, server_idx);
// TODO for now multiplex id is hard-coded as 1
int r = mobject_provider_handle_create(ioctx->cluster->mobject_clt, svr_addr, 1, &mph);
if(r != 0) return r;
......
......@@ -143,16 +143,16 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
if(len > SMALL_REGION_THRESHOLD) {
ret = bake_create(bake_ph, bti, len, &rid);
if(ret != 0) {
ERROR fprintf(stderr,"bake_create returned %d\n",ret);
ERROR bake_perror("bake_create",ret);
return;
}
ret = bake_proxy_write(bake_ph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
if(ret != 0) {
ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
ERROR bake_perror( "bake_proxy_write", ret);
}
ret = bake_persist(bake_ph, rid, offset, len);
if(ret != 0) {
ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
ERROR bake_perror("bake_persist", ret);
}
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
......@@ -215,19 +215,19 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
ret = bake_create(bph, bti, data_len, &rid);
if(ret != 0) {
ERROR fprintf(stderr, "bake_create returned %d\n", ret);
ERROR bake_perror("bake_create", ret);
LEAVING;
return;
}
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
if(ret != 0) {
ERROR fprintf(stderr, "bake_proxy_write returned %d\n", ret);
ERROR bake_perror("bake_proxy_write", ret);
LEAVING;
return;
}
ret = bake_persist(bph, rid, offset, data_len);
if(ret != 0) {
ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
ERROR bake_perror("bake_persist", ret);
LEAVING;
return;
}
......@@ -299,8 +299,11 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
bake_region_id_t rid;
ret = bake_create(bph, bti, len, &rid);
if (ret != 0) bake_perror("bake_create", ret);
ret = bake_proxy_write(bph, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
if (ret != 0) bake_perror("bake_proxy_write", ret);
ret = bake_persist(bph, rid, offset, len);
if (ret != 0) bake_perror("bake_persist", ret);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);
......@@ -406,8 +409,8 @@ void write_op_exec_remove(void* u)
ret = bake_remove(bake_ph, region);
if (ret != BAKE_SUCCESS) {
/* XXX should save the error and keep removing */
ERROR fprintf(stderr, "write_op_exec_remove: "
"error in bake_remove() (ret = %d)\n", ret);
ERROR bake_perror("write_op_exec_remove: "
"error in bake_remove()", ret);
LEAVING;
return;
}
......
......@@ -84,21 +84,26 @@ int main(int argc, char *argv[])
if(-1 == access(bake_target_name, F_OK)) {
// XXX creating a pool of 10MB - this should come from a config file
ret = bake_makepool(bake_target_name, 10*1024*1024, 0664);
if (ret != 0) bake_perror("bake_makepool", ret);
ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
}
bake_provider_t bake_prov;
bake_target_id_t bake_tid;
ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
if (ret != 0) bake_perror("bake_provider_register", ret);
ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
if (ret != 0) bake_perror("bake_provider_add_storage_target", ret);
ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
bake_target_name, ret);
/* Bake provider handle initialization from self addr */
bake_client_data bake_clt_data;
ret = bake_client_init(mid, &(bake_clt_data.client));
if (ret != 0) bake_perror("bake_client_init", ret);
ASSERT(ret == 0, "bake_client_init() failed (ret = %d)\n", ret);
ret = bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
if (ret != 0) bake_perror("bake_provider_handle_create", ret);
ASSERT(ret == 0, "bake_provider_handle_create() failed (ret = %d)\n", ret);
margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);
......