kv-client.c 9.57 KB
Newer Older
1
2
3
4
5
6
#include "sds-keyval.h"
#include <mercury.h>
#include <margo.h>
#include <abt-snoozer.h>
#include <abt.h>

Rob Latham's avatar
Rob Latham committed
7
#include <assert.h>
8
9


10
11
12
13
14
15
// pass in Margo instance ID
kv_context_t *kv_client_register(const margo_instance_id mid) {
  hg_return_t ret;
  kv_context_t *context;
  context = (kv_context_t*)malloc(sizeof(kv_context_t));
  memset(context, 0, sizeof(kv_context_t));
16

17
18
19
20
  context->mid = mid;
  
  context->put_id = MARGO_REGISTER(context->mid, "put",
				   put_in_t, put_out_t, NULL);
21

22
23
  context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
					bulk_put_in_t, bulk_put_out_t, NULL);
24

25
26
  context->get_id = MARGO_REGISTER(context->mid, "get",
				   get_in_t, get_out_t, NULL);
27

28
29
  context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
					bulk_get_in_t, bulk_get_out_t, NULL);
30

31
32
  context->open_id = MARGO_REGISTER(context->mid, "open",
				    open_in_t, open_out_t, NULL);
33

34
35
  context->close_id = MARGO_REGISTER(context->mid, "close",
				     void, close_out_t, NULL);
36

37
38
  context->bench_id = MARGO_REGISTER(context->mid, "bench",
				     bench_in_t, bench_out_t, NULL);
39

40
41
42
  context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
					void, void, NULL);
  return context;
43
44
}

45
hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *db_name) {
46
47
48
49
  hg_return_t ret = HG_SUCCESS;
  hg_handle_t handle;
  open_in_t open_in;
  open_out_t open_out;
Rob Latham's avatar
Rob Latham committed
50

51
52
53
  printf("kv-client: kv_open, server_addr %s\n", server_addr);
  ret = margo_addr_lookup(context->mid, server_addr, &(context->svr_addr));
  assert(ret == HG_SUCCESS);
54

55
56
57
  ret = margo_create(context->mid, context->svr_addr,
		     context->open_id, &handle);
  assert(ret == HG_SUCCESS);
Rob Latham's avatar
Rob Latham committed
58

59
  open_in.name = (hg_string_t)db_name;
60
	
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
  ret = margo_forward(handle, &open_in);
  assert(ret == HG_SUCCESS);
  ret = margo_get_output(handle, &open_out);
  assert(ret == HG_SUCCESS);
  assert(open_out.ret == HG_SUCCESS);

  /* set up the other calls here: idea is we'll pay the registration cost
   * once here but can reuse the handles and identifiers multiple times */
  
  /* put/get handles: can have as many in flight as we have registered.
   * BAKE has a handle-caching mechanism we should consult.
   * should margo be caching handles? */
  
  ret = margo_create(context->mid, context->svr_addr,
		     context->put_id, &(context->put_handle));
  assert(ret == HG_SUCCESS);
  ret = margo_create(context->mid, context->svr_addr,
		     context->bulk_put_id, &(context->bulk_put_handle));
  assert(ret == HG_SUCCESS);
  ret = margo_create(context->mid, context->svr_addr,
		     context->get_id, &(context->get_handle));
  assert(ret == HG_SUCCESS);
  ret = margo_create(context->mid, context->svr_addr,
		     context->bulk_get_id, &(context->bulk_get_handle));
  assert(ret == HG_SUCCESS);
  ret = margo_create(context->mid, context->svr_addr,
		     context->shutdown_id, &(context->shutdown_handle));
  assert(ret == HG_SUCCESS);
89
	
90
91
  margo_free_output(handle, &open_out);
  margo_destroy(handle);
92
	
93
  return HG_SUCCESS;
Rob Latham's avatar
Rob Latham committed
94
95
96
97
}

/* we gave types in the open call.  Will need to maintain in 'context' the
 * size. */
98
99
100
101
102
103
104
105
hg_return_t kv_put(kv_context_t *context, 
		   void *key, hg_size_t ksize,
		   void *value, hg_size_t vsize) {
  hg_return_t ret;
  hg_size_t msize;
  
  msize = ksize + vsize + 2*sizeof(hg_size_t);

106
  printf("kv_put ksize %lu, vsize %lu, msize %lu\n", ksize, vsize, msize);
107
108
109
110
  /* 
   * If total payload is large, we'll do our own
   * explicit transfer of the value data.
   */
111
112
  double st1, et1, st2, et2;
  st1 = ABT_get_wtime();
113
114
115
116
117
118
119
120
121
  if (msize <= MAX_RPC_MESSAGE_SIZE) {
    put_in_t pin;
    put_out_t pout;

    pin.pi.key = (kv_data_t)key;
    pin.pi.ksize = ksize;
    pin.pi.value = (kv_data_t)value;
    pin.pi.vsize = vsize;

122
    st2 = ABT_get_wtime();
123
    ret = margo_forward(context->put_handle, &pin);
124
    et2 = ABT_get_wtime();
125
    printf("kv_put forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
126
127
    assert(ret == HG_SUCCESS);

128
    ret = margo_get_output(context->put_handle, &pout);
129
    assert(ret == HG_SUCCESS);
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
    ret = pout.ret;

    margo_free_output(context->put_handle, &pout);
  }
  else {
    // use bulk transfer method to move value
    bulk_put_in_t bpin;
    bulk_put_out_t bpout;

    /*
     * If (ksize + sizeof(hg_size_t) is too large
     * we'll simply rely on HG to handle it rather
     * than do multiple bulk transfers. Most likely
     * key payload size is << value payload size
     */
    bpin.bulk.key = (kv_data_t)key;
    bpin.bulk.ksize = ksize;
    bpin.bulk.vsize = vsize;

149
    st2 = ABT_get_wtime();
150
151
    ret = margo_bulk_create(context->mid, 1, &value, &bpin.bulk.vsize,
			    HG_BULK_READ_ONLY, &bpin.bulk.handle);
152
153
    et2 = ABT_get_wtime();
    printf("kv_put bulk create time: %f microseconds\n", (et2-st2)*1000000);
154
155
    assert(ret == HG_SUCCESS);

156
    st2 = ABT_get_wtime();
157
    ret = margo_forward(context->bulk_put_handle, &bpin);
158
    et2 = ABT_get_wtime();
159
    printf("kv_put bulk forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
160
161
    assert(ret == HG_SUCCESS);

162
163
164
    ret = margo_get_output(context->bulk_put_handle, &bpout);
    assert(ret == HG_SUCCESS);
    ret = bpout.ret; // make sure the server side says all is OK
165

166
167
    margo_free_output(context->bulk_put_handle, &bpout);
  }
168
169
  et1 = ABT_get_wtime();
  printf("kv_put time: %f microseconds\n", (et1-st1)*1000000);
170

171
    return ret;
Rob Latham's avatar
Rob Latham committed
172
}
David Rich's avatar
David Rich committed
173

174
175
176
177
// vsize is in/out
hg_return_t kv_get(kv_context_t *context, 
		   void *key, hg_size_t ksize,
		   void *value, hg_size_t *vsize)
David Rich's avatar
David Rich committed
178
{
179
  hg_return_t ret;
180
181
182
183
  hg_size_t size;
  hg_size_t msize;
  
  size = *(hg_size_t*)vsize;
184
  msize = size + sizeof(hg_size_t) + sizeof(hg_return_t);
185

186
  printf("kv_get ksize %lu, vsize %lu, msize %lu\n", ksize, size, msize);
187
188
189
190
  /* 
   * If return payload is large, we'll do our own
   * explicit transfer of the value data.
   */
191
192
  double st1, et1, st2, et2;
  st1 = ABT_get_wtime();
193
194
195
196
197
198
199
200
  if (msize <= MAX_RPC_MESSAGE_SIZE) {
    get_in_t gin;
    get_out_t gout;

    gin.gi.key = (kv_data_t)key;
    gin.gi.ksize = ksize;
    gin.gi.vsize = size;

201
    st2 = ABT_get_wtime();
202
    ret = margo_forward(context->get_handle, &gin);
203
    et2 = ABT_get_wtime();
204
    printf("kv_get forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, size);
205
    assert(ret == HG_SUCCESS);
206

207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
    ret = margo_get_output(context->get_handle, &gout);
    assert(ret == HG_SUCCESS);
    ret = gout.go.ret;

    /*
     * Return size of data transferred. Note that
     * size may be zero if there was a problem 
     * with the transfer.
     */
    *vsize = (hg_size_t)gout.go.vsize;
    if (gout.go.vsize > 0) {
      memcpy(value, gout.go.value, gout.go.vsize);
    }

    margo_free_output(context->get_handle, &gout);
  }
  else {
    bulk_get_in_t bgin;
    bulk_get_out_t bgout;

    bgin.bulk.key = (kv_data_t)key;
    bgin.bulk.ksize = ksize;
    bgin.bulk.vsize = size;

231
    st2 = ABT_get_wtime();
232
233
    ret = margo_bulk_create(context->mid, 1, &value, &bgin.bulk.vsize,
			    HG_BULK_WRITE_ONLY, &bgin.bulk.handle);
234
235
    et2 = ABT_get_wtime();
    printf("kv_get bulk create time: %f microseconds\n", (et2-st2)*1000000);
236
    assert(ret == HG_SUCCESS);
237

238
    st2 = ABT_get_wtime();
239
    ret = margo_forward(context->bulk_get_handle, &bgin);
240
    et2 = ABT_get_wtime();
241
    printf("kv_get bulk forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, size);
242
    assert(ret == HG_SUCCESS);
243

244
245
246
    ret = margo_get_output(context->bulk_get_handle, &bgout);
    assert(ret == HG_SUCCESS);
    ret = bgout.ret; // make sure the server side says all is OK
247

248
249
250
251
252
253
    /*
     * Return size of data transferred. Note that
     * size may be zero if there was a problem 
     * with the transfer.
     */
    *vsize = (hg_size_t)bgout.size;
254

255
256
    margo_free_output(context->bulk_get_handle, &bgout);
  }
257
258
  et1 = ABT_get_wtime();
  printf("kv_get time: %f microseconds\n", (et1-st1)*1000000);
David Rich's avatar
David Rich committed
259

260
  return ret;
David Rich's avatar
David Rich committed
261
262
}

263
hg_return_t kv_close(kv_context_t *context)
Rob Latham's avatar
Rob Latham committed
264
{
265
266
267
    hg_return_t ret;
    hg_handle_t handle;
    close_out_t close_out;
Rob Latham's avatar
Rob Latham committed
268

269
270
271
    ret = margo_create(context->mid, context->svr_addr,
	    context->close_id, &handle);
    assert(ret == HG_SUCCESS);
272

273
274
    ret = margo_forward(handle, NULL);
    assert(ret == HG_SUCCESS);
275

276
277
278
279
280
281
282
283
    ret = margo_get_output(handle, &close_out);
    assert(ret == HG_SUCCESS);
    assert(close_out.ret == HG_SUCCESS);

    margo_free_output(handle, &close_out);
    margo_destroy(handle);

    return HG_SUCCESS;
Rob Latham's avatar
Rob Latham committed
284
}
285

286
287
288
289
290
291
bench_result_t *kv_benchmark(kv_context *context, int32_t count) {
  hg_return_t ret;
  hg_handle_t handle;
  bench_in_t bench_in;
  bench_out_t bench_out;
  bench_result_t *result = NULL;
292

293
    ret = margo_create(context->mid, context->svr_addr,
294
	    context->bench_id, &handle);
295
    assert(ret == HG_SUCCESS);
296

297
298
299
300
301
302
303
304
305
306
307
  ret = margo_get_output(handle, &bench_out);
  assert(ret == HG_SUCCESS);
    
  result = malloc(sizeof(bench_result_t));
  result->nkeys = bench_out.result.nkeys;
  result->insert_time = bench_out.result.insert_time;
  result->read_time = bench_out.result.read_time;
  result->overhead = bench_out.result.overhead;

  margo_free_output(handle, &bench_out);
  margo_destroy(handle);
Rob Latham's avatar
Rob Latham committed
308
309

    return result;
310
311
}

312
hg_return_t kv_client_deregister(kv_context_t *context) {
313
  hg_return_t ret;
314

315
316
317
318
319
  margo_destroy(context->put_handle);
  margo_destroy(context->get_handle);
  margo_destroy(context->bulk_put_handle);
  margo_destroy(context->bulk_get_handle);
  margo_destroy(context->shutdown_handle);
320

321
322
  ret = margo_addr_free(context->mid, context->svr_addr);
  assert(ret == HG_SUCCESS);
323

324
325
326
  free(context);

  return HG_SUCCESS;
327
}
328

329
// only one client calls shutdown
330
hg_return_t kv_client_signal_shutdown(kv_context_t *context) {
331
  hg_return_t ret;
332
333
334
335
336
337

  ret = margo_forward(context->shutdown_handle, NULL);
  assert(ret == HG_SUCCESS);

  return HG_SUCCESS;
}