kv-server.cc 15.2 KB
Newer Older
1
#include "sds-keyval.h"
2
3
#include "datastore.h"

4
5
6
7
#include <mercury.h>
#include <margo.h>
#include <abt-snoozer.h>
#include <abt.h>
8
#include <ssg.h>
9

10
11
12
//#include <random>
#include <stdlib.h>
#include <time.h>
13
#include <iostream>
14
#include <assert.h>
15

16
// since this is global, we're assuming this server instance will manage a single DB
17
AbstractDataStore *datastore = NULL;
18
std::string db_name;
Rob Latham's avatar
Rob Latham committed
19

20
static hg_return_t open_handler(hg_handle_t handle)
Rob Latham's avatar
Rob Latham committed
21
{
22
23
24
25
26
27
  hg_return_t ret;
  open_in_t in;
  open_out_t out;

  ret = margo_get_input(handle, &in);
  std::string in_name(in.name);
28
#ifdef KV_DEBUG
29
  std::cout << "SERVER: OPEN " << in_name << std::endl;
30
#endif
31
32

  if (!datastore) {
33
    //datastore = new BwTreeDataStore(); // testing BwTree
34
35
    datastore = new LevelDBDataStore(); // testing LevelDB
    //datastore = new BerkeleyDBDataStore(); // testing BerkeleyDB
36
    //datastore->set_in_memory(true); // testing in-memory BerkeleyDB
37
38
    db_name = in_name;
    datastore->createDatabase(db_name);
39
#ifdef KV_DEBUG
40
    std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
41
#endif
42
43
44
45
    out.ret = HG_SUCCESS;
  }
  else {
    if (db_name == in_name) {
46
#ifdef KV_DEBUG
47
      std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
48
#endif
49
      out.ret = HG_SUCCESS;
50
51
    }
    else {
52
53
#ifdef KV_DEBUG
      std::cout << "SERVER OPEN failed: currently managing " << db_name
54
		<< " and unable to process OPEN request for " << in_name << std::endl;
55
56
#endif
      out.ret = HG_OTHER_ERROR;
57
    }
Rob Latham's avatar
Rob Latham committed
58

59
60
    ret = margo_respond(handle, &out);
    assert(ret == HG_SUCCESS);
61

62
63
    margo_free_input(handle, &in);
    margo_destroy(handle);
Rob Latham's avatar
Rob Latham committed
64

65
    return HG_SUCCESS;
66
}
Rob Latham's avatar
Rob Latham committed
67
DEFINE_MARGO_RPC_HANDLER(open_handler)
68

69
static hg_return_t close_handler(hg_handle_t handle)
70
{
71
72
  hg_return_t ret;
  close_out_t out;
73

74
75
76
77
  out.ret = HG_SUCCESS;
  
  ret = margo_respond(handle, &out);
  assert(ret == HG_SUCCESS);
78

79
  margo_destroy(handle);
80

81
  return HG_SUCCESS;
82
}
Rob Latham's avatar
Rob Latham committed
83
DEFINE_MARGO_RPC_HANDLER(close_handler)
84

85
static hg_return_t put_handler(hg_handle_t handle)
86
{
87
  hg_return_t ret;
88
89
  put_in_t pin;
  put_out_t pout;
90
  double st1, et1;
91

92
  st1 = ABT_get_wtime();
93
  ret = margo_get_input(handle, &pin);
94
  assert(ret == HG_SUCCESS);
95
	
96
97
98
99
100
101
102
103
104
105
106
107
#ifdef KV_DEBUG
  std::cout << "put_handler processing key with key size " << pin.pi.ksize << std::endl;
#endif
  ds_bulk_t kdata;
  kdata.resize(pin.pi.ksize);
  memcpy(kdata.data(), pin.pi.key, pin.pi.ksize);
#ifdef KV_DEBUG
  std::cout << "put_handler processing value with value size " << pin.pi.vsize << std::endl;
#endif
  ds_bulk_t vdata;
  vdata.resize(pin.pi.vsize);
  memcpy(vdata.data(), pin.pi.value, pin.pi.vsize);
108

109
#ifdef KV_DEBUG
110
  std::cout << "put_handler calling datastore->put()" << std::endl;
111
#endif
112
113
  if (datastore->put(kdata, vdata)) {
    pout.ret = HG_SUCCESS;
114
115
  }
  else {
116
    pout.ret = HG_OTHER_ERROR;
117
  }
118

119
120
121
  et1 = ABT_get_wtime();
  std::cout << "put_handler time: " <<  (et1-st1)*1000000 << " microseconds" << std::endl;

122
  ret = margo_respond(handle, &pout);
123
  assert(ret == HG_SUCCESS);
124

125
  margo_free_input(handle, &pin);
126
  margo_destroy(handle);
127

128
  return HG_SUCCESS;
129
}
Rob Latham's avatar
Rob Latham committed
130
DEFINE_MARGO_RPC_HANDLER(put_handler)
131

132
/*
 * bulk_put_handler: RPC handler for "bulk_put".
 *
 * The key travels inline in the RPC input; the value is pulled from the
 * caller with a bulk transfer into a local buffer of bpin.bulk.vsize bytes.
 * The pair is then stored through the global data store. bpout.ret is
 * HG_SUCCESS when datastore->put() reports success and HG_OTHER_ERROR
 * otherwise, including the case where no database has been opened yet.
 * The elapsed handler time is printed to stdout. Always returns HG_SUCCESS.
 */
static hg_return_t bulk_put_handler(hg_handle_t handle)
{
  hg_return_t ret;
  bulk_put_in_t bpin;
  bulk_put_out_t bpout;
  hg_bulk_t bulk_handle;
  const struct hg_info *hgi;
  margo_instance_id mid;
  double st1, et1;

  st1 = ABT_get_wtime();
  ret = margo_get_input(handle, &bpin);
  assert(ret == HG_SUCCESS);

  /* get handle info and margo instance */
  hgi = margo_get_info(handle);
  assert(hgi);
  mid = margo_hg_info_get_instance(hgi);
  assert(mid != MARGO_INSTANCE_NULL);

#ifdef KV_DEBUG
  std::cout << "bulk_put_handler processing key with key size " << bpin.bulk.ksize << std::endl;
#endif
  ds_bulk_t kdata;
  kdata.resize(bpin.bulk.ksize);
  memcpy(kdata.data(), bpin.bulk.key, bpin.bulk.ksize);
#ifdef KV_DEBUG
  std::cout << "bulk_put_handler processing value with value size " << bpin.bulk.vsize << std::endl;
#endif
  // expose vdata's storage as the local side of the transfer and pull the
  // value from the caller's bulk region into it
  ds_bulk_t vdata;
  vdata.resize(bpin.bulk.vsize);
  void *buffer = (void*)vdata.data();
  ret = margo_bulk_create(mid, 1, (void**)&buffer, &bpin.bulk.vsize,
			  HG_BULK_WRITE_ONLY, &bulk_handle);
  assert(ret == HG_SUCCESS);
  ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, bpin.bulk.handle, 0,
			    bulk_handle, 0, bpin.bulk.vsize);
  assert(ret == HG_SUCCESS);

#ifdef KV_DEBUG
  std::cout << "bulk_put_handler calling datastore->put()" << std::endl;
#endif
  // datastore is only created by a successful "open"; report an error
  // instead of dereferencing a null pointer when bulk_put arrives first
  if (datastore && datastore->put(kdata, vdata)) {
    bpout.ret = HG_SUCCESS;
  }
  else {
    // e.g. put returns false if the key-value pair already
    // exists in the DB and duplicates are not allowed or ignored
    bpout.ret = HG_OTHER_ERROR;
  }

#ifdef KV_DEBUG
  std::cout << "bulk_put_handler sending response back with ret=" << bpout.ret << std::endl;
#endif
  et1 = ABT_get_wtime();
  std::cout << "bulk_put_handler time: " <<  (et1-st1)*1000000 << " microseconds" << std::endl;

  ret = margo_respond(handle, &bpout);
  assert(ret == HG_SUCCESS);

#ifdef KV_DEBUG
  std::cout << "bulk_put_handler performing cleanup" << std::endl;
#endif
  margo_free_input(handle, &bpin);
  margo_bulk_free(bulk_handle);
  margo_destroy(handle);

  return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bulk_put_handler)

203
static hg_return_t get_handler(hg_handle_t handle)
204
{
205
  hg_return_t ret;
206
207
  get_in_t gin;
  get_out_t gout;
208
  double st1, et1;
209

210
  st1 = ABT_get_wtime();
211
  ret = margo_get_input(handle, &gin);
212
213
  assert(ret == HG_SUCCESS);

214
#ifdef KV_DEBUG
215
216
  std::cout << "get_handler processing key with key size " << gin.gi.ksize << std::endl;
  std::cout << "get_handler receive buffer size " << gin.gi.vsize << std::endl;
217
#endif
218
219
220
221
222
223
  ds_bulk_t kdata;
  kdata.resize(gin.gi.ksize);
  memcpy(kdata.data(), gin.gi.key, gin.gi.ksize);
  ds_bulk_t vdata;
  if (datastore->get(kdata, vdata)) {
    gout.go.vsize = vdata.size();
224
#ifdef KV_DEBUG
225
    std::cout << "get_handler datastore returned value with size " << gout.go.vsize << std::endl;
226
#endif
227
228
229
230
231
232
    if (gout.go.vsize <= gin.gi.vsize) {
      gout.go.value = (kv_data_t)malloc(gout.go.vsize);
      memcpy(gout.go.value, vdata.data(), gout.go.vsize);
      gout.go.ret = HG_SUCCESS;
    }
    else {
233
      gout.go.vsize = 0; // not returning a value
234
      gout.go.ret = HG_SIZE_ERROR; // caller should be checking return value
235
236
237
238
    }
  }
  else {
    // get on key did not succeed
239
240
    // rethink the use of HG_OTHER_ERROR here
    // maybe define a kv_return_t that is a superset of hg_return_t?
241
#ifdef KV_DEBUG
242
    std::cout << "get_handler datastore did not return a value" << std::endl;
243
#endif
244
    gout.go.vsize = 0; // not returning a value
245
    gout.go.ret = HG_OTHER_ERROR; // caller should be checking return value
246
  }
Rob Latham's avatar
Rob Latham committed
247

248
249
250
  et1 = ABT_get_wtime();
  std::cout << "get_handler time: " <<  (et1-st1)*1000000 << " microseconds" << std::endl;

251
  ret = margo_respond(handle, &gout);
252
  assert(ret == HG_SUCCESS);
253

254
  margo_free_input(handle, &gin);
255
  margo_destroy(handle);
256

257
  return HG_SUCCESS;
258
}
Rob Latham's avatar
Rob Latham committed
259
260
DEFINE_MARGO_RPC_HANDLER(get_handler)

261
/*
 * bulk_get_handler: RPC handler for "bulk_get".
 *
 * Looks the key up in the global data store. On a hit whose size fits the
 * caller's receive buffer (bgin.bulk.vsize) the value is pushed to the
 * caller with a bulk transfer and bgout reports HG_SUCCESS plus the size.
 * A value that is too large yields HG_SIZE_ERROR; a miss, or a request made
 * before any database has been opened, yields HG_OTHER_ERROR. In both error
 * cases size is 0 and no transfer takes place.
 * The elapsed handler time is printed to stdout. Always returns HG_SUCCESS.
 */
static hg_return_t bulk_get_handler(hg_handle_t handle)
{
  hg_return_t ret;
  bulk_get_in_t bgin;
  bulk_get_out_t bgout;
  // only created on the success path; stays HG_BULK_NULL otherwise so the
  // cleanup below never frees an uninitialized handle
  hg_bulk_t bulk_handle = HG_BULK_NULL;
  const struct hg_info *hgi;
  margo_instance_id mid;
  double st1, et1;

  st1 = ABT_get_wtime();
  ret = margo_get_input(handle, &bgin);
  assert(ret == HG_SUCCESS);

#ifdef KV_DEBUG
  std::cout << "bulk_get_handler processing key with key size " << bgin.bulk.ksize << std::endl;
  std::cout << "bulk_get_handler receive buffer size " << bgin.bulk.vsize << std::endl;
#endif
  ds_bulk_t kdata;
  kdata.resize(bgin.bulk.ksize);
  memcpy(kdata.data(), bgin.bulk.key, bgin.bulk.ksize);
  ds_bulk_t vdata;
  // datastore is only created by a successful "open"; treat a get that
  // arrives first like a miss instead of dereferencing a null pointer
  if (datastore && datastore->get(kdata, vdata)) {
    // will the transfer fit on the client side?
    bgout.size = vdata.size();
#ifdef KV_DEBUG
    std::cout << "bulk_get_handler datastore returned value with size " << bgout.size << std::endl;
#endif
    if (bgout.size <= bgin.bulk.vsize) {
      /* get handle info and margo instance */
      hgi = margo_get_info(handle);
      assert(hgi);
      mid = margo_hg_info_get_instance(hgi);
      assert(mid != MARGO_INSTANCE_NULL);

#ifdef KV_DEBUG
      std::cout << "bulk_get_handler tranferring data with size " << bgout.size << std::endl;
#endif
      // expose vdata's storage as the local side of the transfer and push
      // it into the caller's bulk region
      void *buffer = (void*)vdata.data();
      ret = margo_bulk_create(mid, 1, (void**)&buffer, &bgout.size,
			      HG_BULK_READ_ONLY, &bulk_handle);
      assert(ret == HG_SUCCESS);
      ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, bgin.bulk.handle, 0,
				bulk_handle, 0, bgout.size);
      assert(ret == HG_SUCCESS);

      bgout.ret = HG_SUCCESS;
    }
    else {
      bgout.size = 0; // not returning a value
      bgout.ret = HG_SIZE_ERROR; // assuming caller will check return code
    }
  }
  else {
    // get on key did not find a value (return 0 for size)
    bgout.size = 0; // not returning a value
    bgout.ret = HG_OTHER_ERROR; // assuming caller will check return code
  }

#ifdef KV_DEBUG
  std::cout << "bulk_get_handler returning ret = " << bgout.ret
	    << ", anad size = " << bgout.size << std::endl;
#endif
  et1 = ABT_get_wtime();
  std::cout << "bulk_get_handler time: " <<  (et1-st1)*1000000 << " microseconds" << std::endl;

  ret = margo_respond(handle, &bgout);
  assert(ret == HG_SUCCESS);

  margo_free_input(handle, &bgin);
  if (bulk_handle != HG_BULK_NULL) {
    margo_bulk_free(bulk_handle);
  }
  margo_destroy(handle);

  return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bulk_get_handler)
337

338
339
/*
 * shutdown_handler: RPC handler for "shutdown".
 *
 * Acknowledges the request with an empty response, releases the handle and
 * then finalizes SSG and the margo instance that delivered the RPC.
 */
static void shutdown_handler(hg_handle_t handle)
{
  std::cout << "SERVER: got RPC request to shutdown" << std::endl;

  /* look up the margo instance this handle belongs to */
  margo_instance_id instance = margo_hg_handle_get_instance(handle);
  assert(instance != MARGO_INSTANCE_NULL);

  /* reply (no output payload) before tearing anything down */
  hg_return_t hret = margo_respond(handle, NULL);
  assert(hret == HG_SUCCESS);

  margo_destroy(handle);

  /* NOTE: We assume that the server daemon is using
   * margo_wait_for_finalize() to suspend until this
   * RPC executes, so there is no need to send any
   * extra signal to notify it.
   */
  ssg_finalize(); // ignore return and should be a no-op?
  margo_finalize(instance);

  std::cout << "SERVER: margo finalized" << std::endl;
}
DEFINE_MARGO_RPC_HANDLER(shutdown_handler)

366
367
368
369
/*
 * from BwTree tests:
 * RandomInsertSpeedTest() - Tests how fast it is to insert keys randomly
 */
Rob Latham's avatar
Rob Latham committed
370

371
static void RandomInsertSpeedTest(int32_t key_num, bench_result_t *results)
372
{
373
374
375
376
377
  //std::random_device r{};
  //std::default_random_engine e1(r());
  //std::uniform_int_distribution<int32_t> uniform_dist(0, key_num - 1);

  srand(time(NULL)); // initialize random seed
378

379
  BwTree<int32_t, int32_t> *t = new BwTree<int32_t, int32_t>();
380
381
382
383
384
385
386
387
  t->SetDebugLogging(0);
  t->UpdateThreadLocal(1);
  t->AssignGCID(0);

  std::chrono::time_point<std::chrono::system_clock> start, end;

  // We loop for keynum * 2 because in average half of the insertion
  // will hit an empty slot
388
  start = std::chrono::system_clock::now();
389
  for(int32_t i = 0;i < key_num * 2;i++) {
390
391
    //int32_t key = uniform_dist(e1);
    int32_t key = rand() % key_num;
392
393
394
395
396
397
398

    t->Insert(key, key);
  }
  end = std::chrono::system_clock::now();

  std::chrono::duration<double> elapsed_seconds = end - start;

399
  results->nkeys = (hg_size_t)key_num;
Rob Latham's avatar
Rob Latham committed
400
401
  results->insert_time = elapsed_seconds.count();

402
403
  std::cout << "BwTree: at least " << (key_num * 2.0 / (1000 * 1000)) / elapsed_seconds.count()
	    << " million random insertion/sec" << "\n";
404
405

  // Then test random read after random insert
406
  std::vector<int32_t> v{};
407
  v.reserve(1);
408
409

  start = std::chrono::system_clock::now();
410
  for(int32_t i = 0;i < key_num * 2;i++) {
411
412
    //int32_t key = uniform_dist(e1);
    int32_t key = rand() % key_num;
413
414
415
416
417
418
419

    t->GetValue(key, v);
    v.clear();
  }
  end = std::chrono::system_clock::now();

  elapsed_seconds = end - start;
Rob Latham's avatar
Rob Latham committed
420
  results->read_time = elapsed_seconds.count();
421

422
423
424
  std::cout << "BwTree: at least " << (key_num * 2.0 / (1000 * 1000)) / elapsed_seconds.count()
            << " million random read after random insert/sec" << "\n";

425
426
427
  // Measure the overhead

  start = std::chrono::system_clock::now();
428
  for(int32_t i = 0;i < key_num * 2;i++) {
429
430
    //int32_t key = uniform_dist(e1);
    int32_t key = rand() % key_num;
431
432
433
434
435
436
437
438
439

    v.push_back(key);
    v.clear();
  }
  end = std::chrono::system_clock::now();

  std::chrono::duration<double> overhead = end - start;

  std::cout << "    Overhead = " << overhead.count() << " seconds" << std::endl;
Rob Latham's avatar
Rob Latham committed
440
  results->overhead = overhead.count();
441

442
  delete t;
443
444

  return;
445
446
}

447
/*
 * bench_handler: RPC handler for "bench".
 *
 * Runs RandomInsertSpeedTest() with the key count taken from the request
 * and sends the measurements back. The reported nkeys is twice the count
 * returned by the test; the three timings are passed through unchanged.
 * Always returns HG_SUCCESS.
 */
static hg_return_t bench_handler(hg_handle_t handle)
{
  bench_in_t request;
  hg_return_t hret = margo_get_input(handle, &request);
  assert(hret == HG_SUCCESS);

  printf("benchmarking %d keys\n", request.count);
  bench_result_t measured;
  RandomInsertSpeedTest(request.count, &measured);

  /* copy the measurements into the reply */
  bench_out_t reply;
  reply.result.nkeys = measured.nkeys*2;
  reply.result.insert_time = measured.insert_time;
  reply.result.read_time = measured.read_time;
  reply.result.overhead = measured.overhead;

  hret = margo_respond(handle, &reply);
  assert(hret == HG_SUCCESS);

  margo_free_input(handle, &request);
  margo_destroy(handle);

  return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bench_handler)
Rob Latham's avatar
Rob Latham committed
474
#endif
475

476
/*
 * kv_server_register: set up the key-value server on an existing margo
 * instance.
 *
 * Allocates a zeroed kv_context_t, prints the address the instance is
 * listening on, and registers the open/close/put/bulk_put/get/bulk_get/
 * bench/shutdown RPCs, storing their ids in the context.
 *
 * mid      margo instance to register the RPCs with
 * returns  the new context (release it with kv_server_deregister()), or
 *          NULL on failure; on failure mid has been finalized
 */
kv_context_t *kv_server_register(const margo_instance_id mid)
{
  hg_return_t ret;
  hg_addr_t addr_self;
  char addr_self_string[128];
  hg_size_t addr_self_string_sz = 128;
  kv_context_t *context = NULL;

  /* sds keyval server init */
  context = (kv_context_t*)calloc(1, sizeof(kv_context_t));
  if(context == NULL)
  {
    std::cerr << "Error: unable to allocate kv_context_t" << std::endl;
    margo_finalize(mid);
    return NULL;
  }

  context->mid = mid;

  /* figure out what address this server is listening on */
  ret = margo_addr_self(context->mid, &addr_self);
  if(ret != HG_SUCCESS)
  {
    std::cerr << "Error: margo_addr_self()" << std::endl;
    margo_finalize(context->mid);
    free(context);
    return NULL;
  }
  ret = margo_addr_to_string(context->mid, addr_self_string,
			     &addr_self_string_sz, addr_self);
  /* the address is no longer needed whether or not the conversion worked */
  margo_addr_free(context->mid, addr_self);
  if(ret != HG_SUCCESS)
  {
    std::cerr << "Error: margo_addr_to_string()" << std::endl;
    margo_finalize(context->mid);
    free(context);
    return NULL;
  }
  std::cout << "accepting RPCs on address " << std::string(addr_self_string) << std::endl;

  context->open_id = MARGO_REGISTER(context->mid, "open",
				    open_in_t, open_out_t, open_handler);

  context->close_id = MARGO_REGISTER(context->mid, "close",
				     void, close_out_t, close_handler);

  context->put_id = MARGO_REGISTER(context->mid, "put",
				   put_in_t, put_out_t, put_handler);

  context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
					bulk_put_in_t, bulk_put_out_t, bulk_put_handler);

  context->get_id = MARGO_REGISTER(context->mid, "get",
				   get_in_t, get_out_t, get_handler);

  context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
					bulk_get_in_t, bulk_get_out_t, bulk_get_handler);

  context->bench_id = MARGO_REGISTER(context->mid, "bench",
				     bench_in_t, bench_out_t, bench_handler);

  context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
					void, void, shutdown_handler);

  /* flowing off the end of a non-void function is undefined behavior */
  return context;
}

535
/*
 * kv_server_wait_for_shutdown: call margo_wait_for_finalize() on the
 * context's margo instance, then report HG_SUCCESS.
 */
hg_return_t kv_server_wait_for_shutdown(kv_context_t *context)
{
  margo_instance_id instance = context->mid;
  margo_wait_for_finalize(instance);

  return HG_SUCCESS;
}

540
/* this is the same as client. should be moved to common utility library */
541
hg_return_t kv_server_deregister(kv_context_t *context) {
542
  free(context);
543
544
  delete datastore;
  std::cout << "SERVER: cleanup complete, deregistered" << std::endl;
545
  return HG_SUCCESS;
546
}