kv-client.c 6.14 KB
Newer Older
1
2
3
4
5
6
#include "sds-keyval.h"
#include <mercury.h>
#include <margo.h>
#include <abt-snoozer.h>
#include <abt.h>

Rob Latham's avatar
Rob Latham committed
7
#include <assert.h>
8
9
10



David Rich's avatar
Fixups.    
David Rich committed
11
/*
 * Create a client-side context: start margo in client mode and register the
 * "put", "get", "open" and "close" RPCs.
 *
 * addr_str: address/protocol string handed to margo_init(); when NULL,
 *           "ofi+tcp://" is used.
 *
 * Returns a heap-allocated context (release it with kv_client_deregister()),
 * or NULL if the allocation or margo_init() fails.
 *
 * NOTE(review): the default argument in the signature is C++ syntax; it is
 * kept so existing callers are unaffected -- confirm which compiler builds
 * this file.
 */
kv_context *kv_client_register(char *addr_str=0) {
	/* Zero-fill so members that are only set later (svr_addr, bench_id,
	 * the cached handles) never hold indeterminate values.  The cast is
	 * redundant in C but required if this is compiled as C++. */
	kv_context *context = (kv_context *)calloc(1, sizeof(*context));
	if (!context)
		return NULL;

	/* client side: no custom xstreams */
	context->mid = margo_init(addr_str ? addr_str : "ofi+tcp://",
				  MARGO_CLIENT_MODE, 0, -1);
	if (context->mid == MARGO_INSTANCE_NULL) {
		free(context);
		return NULL;
	}

	context->put_id = MARGO_REGISTER(context->mid, "put",
			put_in_t, put_out_t, NULL);

	context->get_id = MARGO_REGISTER(context->mid, "get",
			get_in_t, get_out_t, NULL);

	context->open_id = MARGO_REGISTER(context->mid, "open",
			open_in_t, open_out_t, NULL);

	context->close_id = MARGO_REGISTER(context->mid, "close",
			close_in_t, close_out_t, NULL);

	return context;
}

42
/*
 * Open the store called `name` on `server` and create the handles that the
 * put/get calls reuse.
 *
 * The server address is looked up and stored in context->svr_addr, the
 * "open" RPC is sent carrying `name`, and then the put, bulk_put, get and
 * bulk_get handles are created in the context.
 *
 * keytype / valtype are accepted but not transmitted: only open_in.name is
 * filled in.
 *
 * Returns the `ret` field of the server's reply to the open RPC.  Transport
 * level failures trip assert().
 */
int kv_open(kv_context *context, char * server, char *name,
		kv_type keytype, kv_type valtype) {
	int ret = HG_SUCCESS;
	int open_status;
	hg_handle_t handle;
	open_in_t open_in;
	open_out_t open_out;

	(void)keytype;
	(void)valtype;

	ret = margo_addr_lookup(context->mid, server, &(context->svr_addr));
	assert(ret == HG_SUCCESS);

	ret = margo_create(context->mid, context->svr_addr,
			context->open_id, &handle);
	assert(ret == HG_SUCCESS);

	open_in.name = name;

	ret = margo_forward(handle, &open_in);
	assert(ret == HG_SUCCESS);
	ret = HG_Get_output(handle, &open_out);
	assert(ret == HG_SUCCESS);

	/* Keep the server's answer in its own variable: `ret` is reused by
	 * the handle set-up below and must not clobber what we return. */
	open_status = open_out.ret;

	margo_free_output(handle, &open_out);
	margo_destroy(handle);

	/* set up the other calls here: idea is we'll pay the registration cost
	 * once here but can reuse the handles and identifiers multiple times */

	/* put/get handles: can have as many in flight as we have registered.
	 * BAKE has a handle-caching mechanism we should consult.
	 * should margo be caching handles? */
	ret = margo_create(context->mid, context->svr_addr,
			context->put_id, &(context->put_handle) );
	assert(ret == HG_SUCCESS);
	/* NOTE(review): the bulk handles are created with the plain put/get
	 * RPC ids -- confirm that is what the server expects. */
	ret = margo_create(context->mid, context->svr_addr,
			context->put_id, &(context->bulk_put_handle) );
	assert(ret == HG_SUCCESS);
	ret = margo_create(context->mid, context->svr_addr,
			context->get_id, &(context->get_handle) );
	assert(ret == HG_SUCCESS);
	ret = margo_create(context->mid, context->svr_addr,
			context->get_id, &(context->bulk_get_handle) );
	assert(ret == HG_SUCCESS);

	/* No bench handle is created here: bench_id is only registered inside
	 * kv_benchmark(), which creates (and destroys) its own handle. */

	return open_status;
}

/*
 * Store one key/value pair through the put handle cached in `context`.
 *
 * we gave types in the open call.  Will need to maintain in 'context' the
 * size.  For now both `key` and `value` are read as int.
 *
 * Returns the status of fetching the RPC output; failures trip assert().
 */
int kv_put(kv_context *context, void *key, void *value) {
	hg_handle_t h = context->put_handle;
	put_in_t request;
	put_out_t reply;
	int status;

	request.key = *(int*)key;
	request.value = *(int*)value;

	/* send the request, then collect (and immediately release) the reply */
	status = margo_forward(h, &request);
	assert(status == HG_SUCCESS);

	status = HG_Get_output(h, &reply);
	assert(status == HG_SUCCESS);
	HG_Free_output(h, &reply);

	return status;
}

David Rich's avatar
Fixups.    
David Rich committed
106
/*
 * Store a buffer under `key` using a bulk transfer.
 *
 * key:       read as a uint64_t.
 * data:      start of the buffer to send.
 * data_size: number of bytes in `data`.
 *
 * The buffer is exposed read-only through a bulk handle, the request is
 * forwarded on context->bulk_put_handle, and the bulk handle is released
 * again once the RPC has completed.
 *
 * Returns HG_SUCCESS; any transport or server-side failure trips assert().
 */
int kv_bulk_put(kv_context *context, void *key, void *data, hg_size_t data_size) {
	int ret;
	bulk_put_in_t bpin;
	bulk_put_out_t bpout;

	bpin.key = *(uint64_t*)key;
	bpin.size = data_size;
	ret = margo_bulk_create(context->mid, 1, &data, &data_size,
				HG_BULK_READ_ONLY, &bpin.bulk_handle);
	assert(ret == HG_SUCCESS);
	ret = margo_forward(context->bulk_put_handle, &bpin);
	assert(ret == HG_SUCCESS);
	ret = HG_Get_output(context->bulk_put_handle, &bpout);
	assert(ret == HG_SUCCESS);
	assert(bpout.ret == HG_SUCCESS); // make sure the server side says all is OK
	HG_Free_output(context->bulk_put_handle, &bpout);

	/* The bulk handle is per-call; without this every put leaks one. */
	ret = margo_bulk_free(bpin.bulk_handle);
	assert(ret == HG_SUCCESS);

	return HG_SUCCESS;
}

Rob Latham's avatar
Rob Latham committed
126
127
/*
 * Fetch the value stored under `key` through the get handle cached in
 * `context`.
 *
 * key:   read as an int.
 * value: receives the result, written as an int.
 *
 * Returns the status of fetching the RPC output; failures trip assert().
 */
int kv_get(kv_context *context, void *key, void *value)
{
	int ret=0;
	get_in_t get_in;
	get_out_t get_out;

	get_in.key = *(int*)key;
	ret = margo_forward(context->get_handle, &get_in);
	assert(ret == HG_SUCCESS);
	ret = HG_Get_output(context->get_handle, &get_out);
	/* Check the status first: get_out is only meaningful on success. */
	assert(ret == HG_SUCCESS);
	*(int*) value  = get_out.value;
	HG_Free_output(context->get_handle, &get_out);
	return ret;
}
David Rich's avatar
David Rich committed
141

David Rich's avatar
Fixups.    
David Rich committed
142
/*
 * Fetch the data stored under `key` into a caller-supplied buffer using a
 * bulk transfer.
 *
 * key:       read as a uint64_t.
 * data:      buffer that receives the data.
 * data_size: capacity of `data` in bytes.
 *
 * The buffer is exposed write-only through a bulk handle, the request is
 * forwarded on context->bulk_get_handle, and the bulk handle is released
 * again once the RPC has completed.
 *
 * Returns HG_SUCCESS; any transport or server-side failure trips assert().
 */
int kv_bulk_get(kv_context *context, void *key, void *data, hg_size_t data_size)
{
	int ret;
	bulk_get_in_t bgin;
	bulk_get_out_t bgout;

	bgin.key = *(uint64_t*)key;
	bgin.size = data_size;
	ret = margo_bulk_create(context->mid, 1, &data, &data_size,
				HG_BULK_WRITE_ONLY, &bgin.bulk_handle);
	assert(ret == HG_SUCCESS);
	ret = margo_forward(context->bulk_get_handle, &bgin);
	assert(ret == HG_SUCCESS);
	ret = HG_Get_output(context->bulk_get_handle, &bgout);
	assert(ret == HG_SUCCESS);
	assert(bgout.ret == HG_SUCCESS); // make sure the server side says all is OK
	/* Release the output on the handle it was obtained from. */
	HG_Free_output(context->bulk_get_handle, &bgout);

	/* The bulk handle is per-call; without this every get leaks one. */
	ret = margo_bulk_free(bgin.bulk_handle);
	assert(ret == HG_SUCCESS);

	return HG_SUCCESS;
}

Rob Latham's avatar
Rob Latham committed
163
164
/*
 * Send the "close" RPC to the server and destroy the put/get handles
 * (plain and bulk) that kv_open() created in `context`.
 *
 * The context itself and the server address stay valid; they are released
 * by kv_client_deregister().
 *
 * Returns the status of fetching the RPC output; failures trip assert().
 */
int kv_close(kv_context *context)
{
	int ret=0;
	hg_handle_t handle;
	/* "close" is registered with close_in_t/close_out_t, so use those
	 * types here.  Nothing is set on the request; it is zero-filled so
	 * the encoder never reads indeterminate memory.
	 * NOTE(review): confirm close_in_t needs no caller-supplied fields. */
	close_in_t close_in = {0};
	close_out_t close_out;

	ret = margo_create(context->mid, context->svr_addr,
			context->close_id, &handle);
	assert(ret == HG_SUCCESS);
	ret = margo_forward(handle, &close_in);
	assert(ret == HG_SUCCESS);
	ret = HG_Get_output(handle, &close_out);
	assert(ret == HG_SUCCESS);
	HG_Free_output(handle, &close_out);

	/* Tear down every handle kv_open() cached, including the bulk ones. */
	HG_Destroy(context->put_handle);
	HG_Destroy(context->bulk_put_handle);
	HG_Destroy(context->get_handle);
	HG_Destroy(context->bulk_get_handle);
	HG_Destroy(handle);
	return ret;
}
184

Rob Latham's avatar
Rob Latham committed
185
/*
 * Ask the server to run its benchmark with `count` as the request's count
 * field and return the timings it reports.
 *
 * Registers the "bench" RPC, creates a handle for it in
 * context->bench_handle, forwards the request, copies nkeys / insert_time /
 * read_time out of the reply, and destroys the handle again.
 *
 * Returns a malloc()ed bench_result the caller must free(), or NULL if the
 * allocation fails.  Transport failures trip assert().
 *
 * NOTE(review): "bench" is registered on every call -- confirm repeated
 * registration of the same name is acceptable to margo.
 */
bench_result *kv_benchmark(kv_context *context, int count) {
    int ret;
    bench_in_t bench_in;
    bench_out_t bench_out;
    bench_result *result=NULL;

    context->bench_id= MARGO_REGISTER(context->mid, "bench",
			bench_in_t, bench_out_t, NULL);
    /* One handle is enough: it is the one forwarded, read from and
     * destroyed below. */
    ret = margo_create(context->mid, context->svr_addr,
		context->bench_id, &(context->bench_handle) );
    assert(ret == HG_SUCCESS);

    bench_in.count = count;
    ret = margo_forward(context->bench_handle, &bench_in);
    assert(ret == HG_SUCCESS);
    ret = HG_Get_output(context->bench_handle, &bench_out);
    assert(ret == HG_SUCCESS);

    /* The cast keeps this valid under a C++ compiler as well. */
    result = (bench_result *)malloc(sizeof(*result));
    if (result) {
	result->nkeys = bench_out.result.nkeys;
	result->insert_time = bench_out.result.insert_time;
	result->read_time = bench_out.result.read_time;
    }

    /* Release the output on the handle it was obtained from. */
    margo_free_output(context->bench_handle, &bench_out);
    HG_Destroy(context->bench_handle);

    return result;
}

218
/*
 * Release everything owned by a client context: the server address, the
 * margo instance, and the context allocation itself.
 *
 * A NULL context is accepted and ignored, in the spirit of free(NULL).
 *
 * NOTE(review): svr_addr is only assigned in kv_open(); this presumably
 * expects kv_open() to have been called first -- confirm.
 *
 * Always returns 0.
 */
int kv_client_deregister(kv_context *context) {
	if (!context)
		return 0;

	margo_addr_free(context->mid, context->svr_addr);
	margo_finalize(context->mid);
	free(context);
	return 0;
}