proc-read-actions.c 8.78 KB
Newer Older
1 2 3 4 5 6 7 8 9
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include "proc-read-actions.h"
#include "args-read-actions.h"
#include "read-op-impl.h"
#include "utlist.h"
Rob Latham's avatar
Rob Latham committed
10
#include "log.h"
11 12 13 14 15 16 17 18 19 20 21 22 23

/**
 * This file contains the main hg_proc_mobject_store_read_op_t 
 * serialization function. It relies on helper functions to encode
 * and decode each possible write action. Encoding/decoding works
 * by creating an args_rd_action_* object and passing it the required
 * parameters, then serializing the structure along with potential
 * additional data.
 */

typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*);
typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*);

24 25 26
static hg_return_t encode_read_action_stat(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_stat_t action)
27 28 29 30
{
	return HG_SUCCESS;
}

31 32 33
static hg_return_t decode_read_action_stat(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_stat_t* action)
34 35 36 37 38 39
{
	hg_return_t ret = HG_SUCCESS;
	*action = (rd_action_stat_t)calloc(1, sizeof(**action));
	return ret;	
}

40 41 42
static hg_return_t encode_read_action_read(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_read_t action)
43 44 45 46
{
	args_rd_action_read a;
	a.offset      = action->offset;
	a.len         = action->len;
Matthieu Dorier's avatar
Matthieu Dorier committed
47
    a.bulk_offset = action->buffer.as_offset;
48 49 50 51
	*pos         += a.len;
	return hg_proc_memcpy(proc, &a, sizeof(a));
}

52 53 54
static hg_return_t decode_read_action_read(hg_proc_t proc, 
                                           uint64_t* pos, 
                                           rd_action_read_t* action)
55 56 57 58 59 60 61
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_read a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_read_t)calloc(1, sizeof(**action));
Matthieu Dorier's avatar
Matthieu Dorier committed
62 63 64 65
	(*action)->offset           = a.offset;
	(*action)->len              = a.len;
	(*action)->buffer.as_offset = a.bulk_offset;
	*pos                       += a.len;
66 67 68 69

	return ret;
}

70 71 72
static hg_return_t encode_read_action_omap_get_keys(hg_proc_t proc, 
                                                    uint64_t* pos, 
                                                    rd_action_omap_get_keys_t action)
73 74 75 76 77 78 79 80 81
{
	args_rd_action_omap_get_keys a;
	a.max_return = action->max_return;
	a.data_size  = action->data_size;
	
	hg_return_t ret;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;
	
82
	ret = hg_proc_memcpy(proc, action->data, action->data_size);
83 84 85
	return ret;
}

86 87 88
static hg_return_t decode_read_action_omap_get_keys(hg_proc_t proc, 
                                                    uint64_t* pos, 
                                                    rd_action_omap_get_keys_t* action)
89 90 91 92 93 94 95 96 97 98 99
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_keys a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
	(*action)->max_return  = a.max_return;
	(*action)->data_size   = a.data_size;
	(*action)->start_after = (*action)->data;

100
	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
101 102 103
	return ret;
}

104 105 106
static hg_return_t encode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t action)
107 108 109 110 111 112 113 114 115
{
	args_rd_action_omap_get_vals a;
	a.max_return = action->max_return;
	a.data_size  = action->data_size;
	
	hg_return_t ret;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;
	
116
	ret = hg_proc_memcpy(proc, action->data, action->data_size);
117 118 119 120
	
	return ret;
}

121 122 123
static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t* action)
124 125 126 127 128 129 130 131 132 133 134 135 136
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_vals a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_vals_t)calloc(1, sizeof(**action)-1+a.data_size);
	(*action)->max_return  = a.max_return;
	(*action)->data_size   = a.data_size;
	(*action)->start_after = (*action)->data;
	size_t s = strlen((*action)->start_after);
	(*action)->filter_prefix = (*action)->data + s + 1;

137 138
	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);

139 140 141
	return ret;
}

142 143 144
static hg_return_t encode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t action)
145 146 147 148 149 150 151 152 153 154 155 156 157 158
{
	args_rd_action_omap_get_vals_by_keys a;
	a.num_keys   = action->num_keys;
	a.data_size  = action->data_size;
	
	hg_return_t ret;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;
	
	ret = hg_proc_memcpy(proc, action->data, action->data_size);
	
	return ret;
}

159 160 161
static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t* action)
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_vals_by_keys a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_vals_by_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
	(*action)->num_keys    = a.num_keys;
	(*action)->data_size   = a.data_size;

	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
	
	return ret;
}

/**
 * The following two arrays are here to avoid a big switch.
 */

/* encoding functions */
static encode_fn encode_read_action[_READ_OPCODE_END_ENUM_] = {
	NULL,
	(encode_fn)encode_read_action_stat,
	(encode_fn)encode_read_action_read,
	(encode_fn)encode_read_action_omap_get_keys,
	(encode_fn)encode_read_action_omap_get_vals,
	(encode_fn)encode_read_action_omap_get_vals_by_keys
};

/* decoding functions */
static decode_fn decode_read_action[_READ_OPCODE_END_ENUM_] = {
	NULL,
	(decode_fn)decode_read_action_stat,
	(decode_fn)decode_read_action_read,
	(decode_fn)decode_read_action_omap_get_keys,
	(decode_fn)decode_read_action_omap_get_vals,
	(decode_fn)decode_read_action_omap_get_vals_by_keys
};

/**
 * Serialization function for mobject_store_read_op_t objects.
 * For encoding, the object should be prepared first (that is, the union fields
 * pointing to either a buffer or an offset in a bulk should be an offset in a bulk).
 */
206
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op)
207 208 209 210 211 212 213 214
{
	rd_action_base_t elem, tmp;
	hg_return_t ret = HG_SUCCESS;
	uintptr_t position = 0;

	switch(hg_proc_get_op(proc)) {

	case HG_ENCODE:
Matthieu Dorier's avatar
Matthieu Dorier committed
215

216 217
		MOBJECT_ASSERT((*read_op)->use_local_pointers == 0, 
			"Cannot encode a read_op before it has been prepared");
218
		// encode the bulk handle associated with the series of operations
219
		ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
220 221
		if(ret != HG_SUCCESS) return ret;
		// encode the number of actions
222 223
		ret = hg_proc_memcpy(proc, &((*read_op)->num_actions), 
								sizeof((*read_op)->num_actions));
224 225 226
		if(ret != HG_SUCCESS) return ret;

		// for each action ...
227
		DL_FOREACH((*read_op)->actions,elem) {
228
			read_op_code_t opcode = elem->type;
229 230
			MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
				"Invalid read_op opcode");
231 232 233 234 235 236 237 238 239 240
			// encode the type of action
			ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
			if(ret != HG_SUCCESS) return ret;
			// encode the action's arguments
			ret = encode_read_action[opcode](proc, &position, elem);
			if(ret != HG_SUCCESS) return ret;
		}
		break;

	case HG_DECODE:
241 242 243
	
		*read_op = mobject_store_create_read_op();
		(*read_op)->use_local_pointers = 0;
244
		// decode the bulk handle
245
		ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
246 247
		if(ret != HG_SUCCESS) return ret;
		// decode the number of actions
248 249
		ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
								sizeof((*read_op)->num_actions));
250 251 252 253
		if(ret != HG_SUCCESS) return ret;

		rd_action_base_t next_action;
		size_t i;
254
		for(i = 0; i < (*read_op)->num_actions; i++) {
255 256 257 258
			// decode the current action's type
			read_op_code_t opcode;
			ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
			if(ret != HG_SUCCESS) return ret;
259 260
			MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
				"Invalid write_op opcode");
261 262 263 264
			// decode the action's arguments
			ret = decode_read_action[opcode](proc, &position, &next_action);
			if(ret != HG_SUCCESS) return ret;
			// append to the list
265
			DL_APPEND((*read_op)->actions, next_action);
266 267 268 269
		}
		break;
	
	case HG_FREE:
270 271

		mobject_store_release_read_op(*read_op);
272 273 274 275 276
		return HG_SUCCESS;
	}

	return ret;
}