kv-client.c 7.86 KB
Newer Older
1
2
3
4
5
6
#include "sds-keyval.h"
#include <mercury.h>
#include <margo.h>
#include <abt-snoozer.h>
#include <abt.h>

Rob Latham's avatar
Rob Latham committed
7
#include <assert.h>
8
9
10



David Rich's avatar
Fixups.    
David Rich committed
11
12
// pass in NULL pointer to get default behavior
kv_context *kv_client_register(char *addr_str) {
13
	hg_return_t ret;
14
15
16
	kv_context * context;
	context = malloc(sizeof(kv_context));

17
	/* client side: no custom xstreams */
18

David Rich's avatar
David Rich committed
19
20
21
22
23
24
25
26
	if (!addr_str) {
	  context->mid = margo_init("ofi+tcp://",
				    MARGO_CLIENT_MODE, 0, -1);
	}
	else {
	  context->mid = margo_init(addr_str,
				    MARGO_CLIENT_MODE, 0, -1);
	}
27

Rob Latham's avatar
Rob Latham committed
28
	context->put_id = MARGO_REGISTER(context->mid, "put",
29
					 put_in_t, put_out_t, NULL);
30

31
	context->put_id = MARGO_REGISTER(context->mid, "put",
32
					 put_in_t, put_out_t, NULL);
33
34
35
36

	context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
					      bulk_put_in_t, bulk_put_out_t, NULL);

Rob Latham's avatar
Rob Latham committed
37
	context->get_id = MARGO_REGISTER(context->mid, "get",
38
					 get_in_t, get_out_t, NULL);
39

40
41
42
	context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
					      bulk_get_in_t, bulk_get_out_t, NULL);

Rob Latham's avatar
Rob Latham committed
43
	context->open_id = MARGO_REGISTER(context->mid, "open",
44
					  open_in_t, open_out_t, NULL);
45

Rob Latham's avatar
Rob Latham committed
46
	context->close_id = MARGO_REGISTER(context->mid, "close",
47
					   close_in_t, close_out_t, NULL);
48

49
50
51
52
53
	context->bench_id= MARGO_REGISTER(context->mid, "bench",
					  bench_in_t, bench_out_t, NULL);

	context->shutdown_id= MARGO_REGISTER(context->mid, "shutdown",
					     void, void, NULL);
54
55
56
	return context;
}

David Rich's avatar
David Rich committed
57
/*
 * Resolve `server`, send the "open" RPC carrying `name`, and create the RPC
 * handles that the other client calls reuse.
 *
 * The resolved address is stored in context->svr_addr.  keytype and valtype
 * are accepted but not used by this function.
 *
 * Every margo call and the server's reply code are checked with assert();
 * when they all pass the function returns HG_SUCCESS.
 */
hg_return_t kv_open(kv_context *context, char * server, char *name,
		kv_type keytype, kv_type valtype) {
	hg_return_t status = HG_SUCCESS;
	hg_handle_t open_handle;
	open_in_t request;
	open_out_t reply;

	status = margo_addr_lookup(context->mid, server, &context->svr_addr);
	assert(status == HG_SUCCESS);

	status = margo_create(context->mid, context->svr_addr,
			      context->open_id, &open_handle);
	assert(status == HG_SUCCESS);

	request.name = name;

	status = margo_forward(open_handle, &request);
	assert(status == HG_SUCCESS);
	status = margo_get_output(open_handle, &reply);
	assert(status == HG_SUCCESS);
	assert(reply.ret == HG_SUCCESS);

	/* Set up the other calls here: the idea is to pay the handle-creation
	 * cost once and then reuse the handles and identifiers many times.
	 *
	 * Open question carried over from the original author: BAKE has a
	 * handle-caching mechanism we should consult -- should margo be
	 * caching handles? */
	struct {
		hg_id_t rpc_id;
		hg_handle_t *slot;
	} cached[] = {
		{ context->put_id,      &context->put_handle },
		{ context->bulk_put_id, &context->bulk_put_handle },
		{ context->get_id,      &context->get_handle },
		{ context->bulk_get_id, &context->bulk_get_handle },
		{ context->bench_id,    &context->bench_handle },
		{ context->shutdown_id, &context->shutdown_handle },
	};

	for (size_t i = 0; i < sizeof cached / sizeof cached[0]; i++) {
		status = margo_create(context->mid, context->svr_addr,
				      cached[i].rpc_id, cached[i].slot);
		assert(status == HG_SUCCESS);
	}

	margo_free_output(open_handle, &reply);
	margo_destroy(open_handle);

	return HG_SUCCESS;
}

/* we gave types in the open call.  Will need to maintain in 'context' the
 * size. */
David Rich's avatar
David Rich committed
111
hg_return_t kv_put(kv_context *context, void *key, void *value) {
112
	hg_return_t ret;
Rob Latham's avatar
Rob Latham committed
113
114
115
116
117
	put_in_t put_in;
	put_out_t put_out;

	put_in.key = *(int*)key;
	put_in.value = *(int*)value;
Rob Latham's avatar
Rob Latham committed
118
	ret = margo_forward(context->put_handle, &put_in);
Rob Latham's avatar
Rob Latham committed
119
	assert(ret == HG_SUCCESS);
120
	ret = margo_get_output(context->put_handle, &put_out);
Rob Latham's avatar
Rob Latham committed
121
	assert(ret == HG_SUCCESS);
122
	assert(put_out.ret == HG_SUCCESS);
123
	margo_free_output(context->put_handle, &put_out);
Rob Latham's avatar
Rob Latham committed
124
	return ret;
Rob Latham's avatar
Rob Latham committed
125
126
}

David Rich's avatar
David Rich committed
127
/*
 * Store a buffer under `key` using a bulk transfer.
 *
 * key:       read as a uint64_t.
 * data:      start of the buffer exposed read-only to the server.
 * data_size: points at the number of bytes in `data`.
 *
 * The margo calls and the server's reply code are checked with assert();
 * returns HG_SUCCESS when they all pass.
 */
hg_return_t kv_bulk_put(kv_context *context, void *key, void *data, uint64_t *data_size) {
	hg_return_t ret;
	bulk_put_in_t bpin;
	bulk_put_out_t bpout;

	bpin.key = *(uint64_t*)key;
	bpin.size = *data_size;

	ret = margo_bulk_create(context->mid, 1, &data, data_size,
				HG_BULK_READ_ONLY, &bpin.bulk_handle);
	assert(ret == HG_SUCCESS);
	ret = margo_forward(context->bulk_put_handle, &bpin);
	assert(ret == HG_SUCCESS);
	ret = margo_get_output(context->bulk_put_handle, &bpout);
	assert(ret == HG_SUCCESS);
	assert(bpout.ret == HG_SUCCESS); // make sure the server side says all is OK
	margo_free_output(context->bulk_put_handle, &bpout);

	/* the bulk descriptor is created per call, so release it per call;
	 * otherwise every put leaks one */
	ret = margo_bulk_free(bpin.bulk_handle);
	assert(ret == HG_SUCCESS);

	return HG_SUCCESS;
}

David Rich's avatar
David Rich committed
147
/*
 * Fetch the value stored under `key` through the "get" handle held in the
 * context.
 *
 * `key` is read as an int and the result is written through `value` as an
 * int.  Failures of the margo calls or a non-success reply code trip an
 * assert().
 */
hg_return_t kv_get(kv_context *context, void *key, void *value)
{
	const int *key_ptr = key;
	int *value_ptr = value;
	get_in_t request;
	get_out_t reply;
	hg_return_t status;

	request.key = *key_ptr;

	status = margo_forward(context->get_handle, &request);
	assert(status == HG_SUCCESS);

	status = margo_get_output(context->get_handle, &reply);
	assert(status == HG_SUCCESS);
	assert(reply.ret == HG_SUCCESS);

	/* copy the value out before the reply is released */
	*value_ptr = reply.value;
	margo_free_output(context->get_handle, &reply);

	return status;
}
David Rich's avatar
David Rich committed
163

David Rich's avatar
David Rich committed
164
/*
 * Fetch the data stored under `key` into a caller-supplied buffer using a
 * bulk transfer.
 *
 * key:       read as a uint64_t.
 * data:      start of the buffer exposed write-only to the server.
 * data_size: in: points at the capacity of `data` in bytes;
 *            out: overwritten with the size reported by the server.
 *
 * The margo calls and the server's reply code are checked with assert();
 * returns HG_SUCCESS when they all pass.
 */
hg_return_t kv_bulk_get(kv_context *context, void *key, void *data, uint64_t *data_size)
{
	hg_return_t ret;
	bulk_get_in_t bgin;
	bulk_get_out_t bgout;

	bgin.key = *(uint64_t*)key;
	bgin.size = *data_size;

	ret = margo_bulk_create(context->mid, 1, &data, data_size,
				HG_BULK_WRITE_ONLY, &bgin.bulk_handle);
	assert(ret == HG_SUCCESS);
	ret = margo_forward(context->bulk_get_handle, &bgin);
	assert(ret == HG_SUCCESS);
	ret = margo_get_output(context->bulk_get_handle, &bgout);
	assert(ret == HG_SUCCESS);
	assert(bgout.ret == HG_SUCCESS); // make sure the server side says all is OK
	*data_size = (uint64_t)bgout.size; // report actual size of data transferred to caller
	margo_free_output(context->bulk_get_handle, &bgout);

	/* the bulk descriptor is created per call, so release it per call;
	 * otherwise every get leaks one */
	ret = margo_bulk_free(bgin.bulk_handle);
	assert(ret == HG_SUCCESS);

	return HG_SUCCESS;
}

David Rich's avatar
David Rich committed
188
/*
 * Send the "close" RPC to the server recorded in the context.
 *
 * A fresh handle is created for the call and destroyed before returning.
 * The margo calls and the server's reply code are checked with assert();
 * returns HG_SUCCESS when they all pass.
 */
hg_return_t kv_close(kv_context *context)
{
	hg_return_t ret;
	hg_handle_t handle;
	/* Use the types the "close" RPC was registered with (not the put
	 * types), and send a zeroed input rather than uninitialized stack
	 * contents. */
	close_in_t close_in = {0};
	close_out_t close_out;

	ret = margo_create(context->mid, context->svr_addr,
			   context->close_id, &handle);
	assert(ret == HG_SUCCESS);
	ret = margo_forward(handle, &close_in);
	assert(ret == HG_SUCCESS);
	ret = margo_get_output(handle, &close_out);
	assert(ret == HG_SUCCESS);
	assert(close_out.ret == HG_SUCCESS);

	margo_free_output(handle, &close_out);
	margo_destroy(handle);

	return HG_SUCCESS;
}
209

Rob Latham's avatar
Rob Latham committed
210
/*
 * Ask the server to run its benchmark with `count` as the request's count
 * field and return a copy of the reported numbers.
 *
 * The call uses its own short-lived handle.  context->bench_handle is left
 * untouched, so it is still valid when kv_client_deregister() destroys it.
 * The "bench" RPC id registered by kv_client_register() is reused rather
 * than registered again.
 *
 * Returns a malloc()ed bench_result that the caller must free(), or NULL if
 * the allocation fails.  The margo calls are checked with assert().
 */
bench_result *kv_benchmark(kv_context *context, int count) {
    hg_return_t ret;
    hg_handle_t handle;
    bench_in_t bench_in;
    bench_out_t bench_out;
    bench_result *result = NULL;

    bench_in.count = count;

    ret = margo_create(context->mid, context->svr_addr,
		       context->bench_id, &handle);
    assert(ret == HG_SUCCESS);
    ret = margo_forward(handle, &bench_in);
    assert(ret == HG_SUCCESS);
    ret = margo_get_output(handle, &bench_out);
    assert(ret == HG_SUCCESS);

    /* copy the numbers out while the decoded output is still alive */
    result = malloc(sizeof *result);
    if (result != NULL) {
	result->nkeys = bench_out.result.nkeys;
	result->insert_time = bench_out.result.insert_time;
	result->read_time = bench_out.result.read_time;
    }

    /* release the output exactly once, on the handle it was read from */
    margo_free_output(handle, &bench_out);
    margo_destroy(handle);

    return result;
}

David Rich's avatar
David Rich committed
246
/*
 * Tear down a client context: destroy the handles stored in it, free the
 * server address, finalize margo and free the context itself.
 *
 * The context must not be used after this call.  The result of
 * margo_addr_free() is checked with assert(); returns HG_SUCCESS.
 */
hg_return_t kv_client_deregister(kv_context *context) {
  hg_return_t ret;

  margo_destroy(context->put_handle);
  margo_destroy(context->get_handle);
  margo_destroy(context->bulk_put_handle);
  margo_destroy(context->bulk_get_handle);
  margo_destroy(context->bench_handle);
  margo_destroy(context->shutdown_handle);

  /* check the status only after it has been assigned */
  ret = margo_addr_free(context->mid, context->svr_addr);
  assert(ret == HG_SUCCESS);

  margo_finalize(context->mid);

  free(context);

  return HG_SUCCESS;
}
266

David Rich's avatar
David Rich committed
267
/*
 * Forward the "shutdown" RPC on the handle stored in the context.
 *
 * NULL is passed as the input argument.  The forward result is checked
 * with assert(); returns HG_SUCCESS.
 */
hg_return_t kv_client_signal_shutdown(kv_context *context) {
  hg_return_t status = margo_forward(context->shutdown_handle, NULL);

  assert(status == HG_SUCCESS);
  return HG_SUCCESS;
}