proc-read-actions.c 11.1 KB
Newer Older
1 2 3 4 5
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
6 7 8 9 10
#include "src/io-chain/proc-read-actions.h"
#include "src/io-chain/args-read-actions.h"
#include "src/io-chain/read-op-impl.h"
#include "src/util/utlist.h"
#include "src/util/log.h"
11 12 13 14 15 16 17 18 19 20 21 22 23

/**
 * This file contains the main hg_proc_mobject_store_read_op_t 
 * serialization function. It relies on helper functions to encode
 * and decode each possible write action. Encoding/decoding works
 * by creating an args_rd_action_* object and passing it the required
 * parameters, then serializing the structure along with potential
 * additional data.
 */

typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*);

24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
static hg_return_t encode_read_action_stat(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_stat_t action);

static hg_return_t decode_read_action_stat(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_stat_t* action);

static hg_return_t encode_read_action_read(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_read_t action);

static hg_return_t decode_read_action_read(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_read_t* action);

static hg_return_t encode_read_action_omap_get_keys(hg_proc_t proc, 
                                                    uint64_t* pos, 
                                                    rd_action_omap_get_keys_t action);

static hg_return_t decode_read_action_omap_get_keys(hg_proc_t proc, 
                                                    uint64_t* pos, 
                                                    rd_action_omap_get_keys_t* action);

static hg_return_t encode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t action);

static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t* action);

static hg_return_t encode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t action);

static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t* action);

/**
 * The following two arrays are here to avoid a big switch.
 */

/* encoding functions */
static encode_fn encode_read_action[_READ_OPCODE_END_ENUM_] = {
	NULL,
	(encode_fn)encode_read_action_stat,
	(encode_fn)encode_read_action_read,
	(encode_fn)encode_read_action_omap_get_keys,
	(encode_fn)encode_read_action_omap_get_vals,
	(encode_fn)encode_read_action_omap_get_vals_by_keys
};

/* decoding functions */
static decode_fn decode_read_action[_READ_OPCODE_END_ENUM_] = {
	NULL,
	(decode_fn)decode_read_action_stat,
	(decode_fn)decode_read_action_read,
	(decode_fn)decode_read_action_omap_get_keys,
	(decode_fn)decode_read_action_omap_get_vals,
	(decode_fn)decode_read_action_omap_get_vals_by_keys
};

/**
 * Serialization function for mobject_store_read_op_t objects.
 * For encoding, the object should be prepared first (that is, the union fields
 * pointing to either a buffer or an offset in a bulk should be an offset in a bulk).
 */
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op)
{
Rob Latham's avatar
Rob Latham committed
95
	rd_action_base_t elem;
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
	hg_return_t ret = HG_SUCCESS;
	uintptr_t position = 0;

	switch(hg_proc_get_op(proc)) {

	case HG_ENCODE:

		MOBJECT_ASSERT((*read_op)->ready, 
			"Cannot encode a read_op before it has been prepared");
		// encode the bulk handle associated with the series of operations
		ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
		if(ret != HG_SUCCESS) return ret;
		// encode the number of actions
		ret = hg_proc_memcpy(proc, &((*read_op)->num_actions), 
								sizeof((*read_op)->num_actions));
		if(ret != HG_SUCCESS) return ret;

		// for each action ...
		DL_FOREACH((*read_op)->actions,elem) {
			read_op_code_t opcode = elem->type;
116
			MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_),
117 118 119 120 121 122 123 124 125 126 127 128
				"Invalid read_op opcode");
			// encode the type of action
			ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
			if(ret != HG_SUCCESS) return ret;
			// encode the action's arguments
			ret = encode_read_action[opcode](proc, &position, elem);
			if(ret != HG_SUCCESS) return ret;
		}
		break;

	case HG_DECODE:
	
129
		*read_op = create_read_op();
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
		(*read_op)->ready = 1;
		// decode the bulk handle
		ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
		if(ret != HG_SUCCESS) return ret;
		// decode the number of actions
		ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
								sizeof((*read_op)->num_actions));
		if(ret != HG_SUCCESS) return ret;

		rd_action_base_t next_action;
		size_t i;
		for(i = 0; i < (*read_op)->num_actions; i++) {
			// decode the current action's type
			read_op_code_t opcode;
			ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
			if(ret != HG_SUCCESS) return ret;
146
			MOBJECT_ASSERT((opcode > 0 || opcode < _READ_OPCODE_END_ENUM_),
147 148 149 150
				"Invalid write_op opcode");
			// decode the action's arguments
			ret = decode_read_action[opcode](proc, &position, &next_action);
			if(ret != HG_SUCCESS) return ret;
151
			next_action->type = opcode;
152 153 154 155 156 157 158
			// append to the list
			DL_APPEND((*read_op)->actions, next_action);
		}
		break;
	
	case HG_FREE:

159
		release_read_op(*read_op);
160 161 162 163 164 165 166 167 168 169
		return HG_SUCCESS;
	}

	return ret;
}

////////////////////////////////////////////////////////////////////////////////
//                          STATIC FUNCTIONS BELOW                            //
////////////////////////////////////////////////////////////////////////////////

170 171 172
static hg_return_t encode_read_action_stat(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_stat_t action)
173 174 175 176
{
	return HG_SUCCESS;
}

177 178 179
static hg_return_t decode_read_action_stat(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_stat_t* action)
180 181 182 183 184 185
{
	hg_return_t ret = HG_SUCCESS;
	*action = (rd_action_stat_t)calloc(1, sizeof(**action));
	return ret;	
}

186 187 188
static hg_return_t encode_read_action_read(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_read_t action)
189 190 191 192
{
	args_rd_action_read a;
	a.offset      = action->offset;
	a.len         = action->len;
193
	a.bulk_offset = action->buffer.as_offset;
194 195 196 197
	*pos         += a.len;
	return hg_proc_memcpy(proc, &a, sizeof(a));
}

198 199 200
static hg_return_t decode_read_action_read(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_read_t* action)
201 202 203 204 205 206 207
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_read a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_read_t)calloc(1, sizeof(**action));
208 209 210 211
	(*action)->offset           = a.offset;
	(*action)->len              = a.len;
	(*action)->buffer.as_offset = a.bulk_offset;
	*pos                       += a.len;
212 213 214 215

	return ret;
}

216 217 218
static hg_return_t encode_read_action_omap_get_keys(hg_proc_t proc, 
                                                    uint64_t* pos, 
                                                    rd_action_omap_get_keys_t action)
219 220 221 222 223 224 225 226 227
{
	args_rd_action_omap_get_keys a;
	a.max_return = action->max_return;
	a.data_size  = action->data_size;
	
	hg_return_t ret;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;
	
228
	ret = hg_proc_memcpy(proc, action->data, action->data_size);
229 230 231
	return ret;
}

232 233 234
static hg_return_t decode_read_action_omap_get_keys(hg_proc_t proc, 
                                                    uint64_t* pos, 
                                                    rd_action_omap_get_keys_t* action)
235 236 237 238 239 240 241 242 243 244 245
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_keys a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
	(*action)->max_return  = a.max_return;
	(*action)->data_size   = a.data_size;
	(*action)->start_after = (*action)->data;

246
	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
247 248 249
	return ret;
}

250 251 252
static hg_return_t encode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t action)
253 254 255 256 257 258 259 260 261
{
	args_rd_action_omap_get_vals a;
	a.max_return = action->max_return;
	a.data_size  = action->data_size;
	
	hg_return_t ret;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;
	
262
	ret = hg_proc_memcpy(proc, action->data, action->data_size);
263 264 265 266
	
	return ret;
}

267 268 269
static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t* action)
270 271 272 273 274 275 276 277 278 279 280
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_vals a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_vals_t)calloc(1, sizeof(**action)-1+a.data_size);
	(*action)->max_return  = a.max_return;
	(*action)->data_size   = a.data_size;
	(*action)->start_after = (*action)->data;

281
	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
282 283
	size_t s = strlen((*action)->start_after);
	(*action)->filter_prefix = (*action)->data + s + 1;
284

285 286 287
	return ret;
}

288 289 290
static hg_return_t encode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t action)
291 292 293 294 295 296 297 298 299 300 301 302 303 304
{
	args_rd_action_omap_get_vals_by_keys a;
	a.num_keys   = action->num_keys;
	a.data_size  = action->data_size;
	
	hg_return_t ret;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;
	
	ret = hg_proc_memcpy(proc, action->data, action->data_size);
	
	return ret;
}

305 306 307
static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t* action)
308 309 310 311 312 313 314 315 316 317 318 319 320 321
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_vals_by_keys a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_vals_by_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
	(*action)->num_keys    = a.num_keys;
	(*action)->data_size   = a.data_size;

	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
	
	return ret;
}