kvgroup-client.cc 4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
#include "sds-keyval-group.h"

#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <iostream>

unsigned long server_indexes[CH_MAX_REPLICATION];

kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
{
12
  kvgroup_context_t *context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  memset(context, 0, sizeof(kvgroup_context_t));
  
  int sret = ssg_init(mid);
  assert(sret == SSG_SUCCESS);

  sret = ssg_group_attach(gid);
  assert(sret == SSG_SUCCESS);
  
  /* update kvgroup_context_t with MID, GID */
  context->mid = mid;
  context->gid = gid;

  return context;
}

hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
{
30
  hg_size_t addr_str_sz = 128;
31
32
33
34
35
  char addr_str[addr_str_sz];
  hg_return_t ret = HG_SUCCESS;
  
  // register and open a connection with each kv-server in the group
  hg_size_t gsize = ssg_get_group_size(context->gid);
36
37
  context->gsize = gsize;
  context->kv_context = (kv_context_t**)malloc(gsize*sizeof(kv_context_t*));
38
39
40
41
  for (hg_size_t i=0; i<gsize; i++) {
    // register this client context with Margo
    context->kv_context[i] = kv_client_register(context->mid);
    assert(context->kv_context[i] != NULL);
42
    hg_addr_t server_addr = ssg_get_addr(context->gid, i);
43
44
45
    // turn server_addr into string
    ret = margo_addr_to_string(context->mid, addr_str, &addr_str_sz, server_addr);
    assert(ret == HG_SUCCESS);
46
47
48
    margo_addr_free(context->mid, server_addr);
    std::string dbname(db_name);
    dbname += std::string(".") + std::to_string(i); // each session uses unique db name
49
    // open client connection with this server
50
    std::cout << "request open of " << dbname << " from server " << addr_str << std::endl;
51
52
53
54
55
56
57
    ret = kv_open(context->kv_context[i], addr_str, dbname.c_str());
    assert(ret == HG_SUCCESS);
  }

  // initialize consistent hash using "hash_lookup3" with gsize servers each with 1 virtual node for now
  context->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);

58
  return HG_SUCCESS;
59
60
61
62
63
64
65
66
67
68
69
}

// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
			void *key, hg_size_t ksize,
			void *value, hg_size_t vsize)
{

  // not using any replication for now (is this right?)
  ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
70
  kv_context_t *kv_context = context->kv_context[server_indexes[0]];
71
72
73
74
75
76
77
78
79
80
81
82
83
  
  return kv_put(kv_context, key, ksize, value, vsize);
}

// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
// vsize is in/out
hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
			void *key, hg_size_t ksize,
			void *value, hg_size_t *vsize)
{
  // not using any replication for now (is this right?)
  ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
84
  kv_context_t *kv_context = context->kv_context[server_indexes[0]];
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
  
  return kv_get(kv_context, key, ksize, value, vsize);
}

hg_return_t kvgroup_close(kvgroup_context_t *context)
{
  hg_return_t ret;
  for (hg_size_t i=0; i<context->gsize; i++) {
    ret = kv_close(context->kv_context[i]);
    assert(ret == HG_SUCCESS);
  }
  return HG_SUCCESS;
}

hg_return_t kvgroup_client_deregister(kvgroup_context_t *context)
{
  hg_return_t ret;
  for (hg_size_t i=0; i<context->gsize; i++) {
    ret = kv_client_deregister(context->kv_context[i]);
    assert(ret == HG_SUCCESS);
  }
  ch_placement_finalize(context->ch_instance);
  ssg_group_detach(context->gid);
  ssg_finalize();
109
110
111
112
  margo_diag_dump(context->mid, "-", 0);
  //margo_finalize(context->mid); // workaround since we core dump here
  ssg_group_id_free(context->gid);
  free(context->kv_context);
113
  free(context);
114
  return HG_SUCCESS;
115
116
117
118
119
120
121
}

// only one client calls shutdown
hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context)
{
  hg_return_t ret;
  for (hg_size_t i=0; i<context->gsize; i++) {
122
    ret = kv_client_signal_shutdown(context->kv_context[i]);
123
124
125
126
    assert(ret == HG_SUCCESS);
  }
  return HG_SUCCESS;
}