proc-read-actions.c 8.76 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include "proc-read-actions.h"
#include "args-read-actions.h"
#include "read-op-impl.h"
#include "utlist.h"

/**
 * This file contains the main hg_proc_mobject_store_read_op_t 
 * serialization function. It relies on helper functions to encode
 * and decode each possible read action. Encoding/decoding works
 * by creating an args_rd_action_* object and passing it the required
 * parameters, then serializing the structure along with potential
 * additional data.
 */

/*
 * Common signature of the per-opcode encoders stored in the dispatch table:
 * (proc handle, pointer to a running 64-bit position, action passed as void*).
 * The concrete encoders take a typed action handle and are cast to this type.
 */
typedef hg_return_t (*encode_fn)(hg_proc_t, uint64_t*, void*);
/*
 * Common signature of the per-opcode decoders stored in the dispatch table.
 * The concrete decoders take a pointer to a typed action handle (an output
 * slot they allocate into) and are cast to this type.
 */
typedef hg_return_t (*decode_fn)(hg_proc_t, uint64_t*, void*);

23 24 25
static hg_return_t encode_read_action_stat(hg_proc_t proc,
                                           uint64_t* pos,
                                           rd_action_stat_t action)
{
	/* A stat action carries no arguments: nothing is written to the proc
	 * and the running position is left untouched. */
	(void)proc;
	(void)pos;
	(void)action;

	return HG_SUCCESS;
}

30 31 32
/**
 * Decodes a stat action. Nothing is read from the proc; a zero-initialized
 * action object is allocated and stored in *action.
 *
 * @param proc   Mercury proc handle (unused).
 * @param pos    running position (unused, left untouched).
 * @param action output slot receiving the newly allocated action.
 * @return HG_SUCCESS, or HG_NOMEM_ERROR if the allocation fails
 *         (in which case *action is NULL).
 */
static hg_return_t decode_read_action_stat(hg_proc_t proc,
                                           uint64_t* pos,
                                           rd_action_stat_t* action)
{
	(void)proc;
	(void)pos;

	*action = (rd_action_stat_t)calloc(1, sizeof(**action));
	/* Report allocation failure instead of returning success with a NULL action. */
	if(*action == NULL) return HG_NOMEM_ERROR;

	return HG_SUCCESS;
}

39 40 41
static hg_return_t encode_read_action_read(hg_proc_t proc,
                                           uint64_t* pos,
                                           rd_action_read_t action)
{
	/* Flatten the action into its wire representation. */
	args_rd_action_read wire;
	wire.offset      = action->offset;
	wire.len         = action->len;
	wire.bulk_offset = action->buffer.as_offset;

	/* Account for the bytes this read contributes to the running position,
	 * then serialize the flattened arguments. */
	*pos = *pos + wire.len;

	return hg_proc_memcpy(proc, &wire, sizeof(wire));
}

51 52 53
/**
 * Decodes a read action: reads an args_rd_action_read from the proc,
 * allocates a new action and fills it from the decoded arguments.
 * The decoded length is added to *pos.
 *
 * @param proc   Mercury proc handle to read from.
 * @param pos    running position, advanced by the action's length on success.
 * @param action output slot receiving the newly allocated action.
 * @return HG_SUCCESS, the error returned by hg_proc_memcpy, or
 *         HG_NOMEM_ERROR if the allocation fails (in which case *action is NULL).
 */
static hg_return_t decode_read_action_read(hg_proc_t proc,
                                           uint64_t* pos,
                                           rd_action_read_t* action)
{
	hg_return_t ret = HG_SUCCESS;
	args_rd_action_read a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_read_t)calloc(1, sizeof(**action));
	/* Do not dereference a failed allocation. */
	if(*action == NULL) return HG_NOMEM_ERROR;

	(*action)->offset           = a.offset;
	(*action)->len              = a.len;
	(*action)->buffer.as_offset = a.bulk_offset;
	*pos                       += a.len;

	return ret;
}

69 70 71
static hg_return_t encode_read_action_omap_get_keys(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_keys_t action)
{
	(void)pos; /* the running position is not affected by this action */

	/* Fixed-size header first ... */
	args_rd_action_omap_get_keys header;
	header.max_return = action->max_return;
	header.data_size  = action->data_size;

	hg_return_t status = hg_proc_memcpy(proc, &header, sizeof(header));
	if(status == HG_SUCCESS) {
		/* ... followed by the variable-size payload attached to the action. */
		status = hg_proc_memcpy(proc, action->data, action->data_size);
	}
	return status;
}

85 86 87
/**
 * Decodes an omap_get_keys action: reads the fixed-size
 * args_rd_action_omap_get_keys header, allocates an action large enough to
 * hold data_size trailing bytes, then reads those bytes into the action's
 * data area. start_after points at the beginning of that data area.
 *
 * @param proc   Mercury proc handle to read from.
 * @param pos    running position (unused, left untouched).
 * @param action output slot receiving the newly allocated action.
 * @return HG_SUCCESS, the error returned by hg_proc_memcpy, or
 *         HG_NOMEM_ERROR if the allocation fails. On any failure *action
 *         is NULL and nothing is left allocated.
 */
static hg_return_t decode_read_action_omap_get_keys(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_keys_t* action)
{
	(void)pos;

	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_keys a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	/* The "-1" matches the allocation convention used for the trailing data area. */
	*action = (rd_action_omap_get_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
	if(*action == NULL) return HG_NOMEM_ERROR;

	(*action)->max_return  = a.max_return;
	(*action)->data_size   = a.data_size;
	(*action)->start_after = (*action)->data;

	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
	if(ret != HG_SUCCESS) {
		/* The caller drops the action on error, so release it here. */
		free(*action);
		*action = NULL;
	}
	return ret;
}

103 104 105
static hg_return_t encode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t action)
{
	(void)pos; /* the running position is not affected by this action */

	/* Serialize the fixed-size arguments. */
	args_rd_action_omap_get_vals args;
	args.data_size  = action->data_size;
	args.max_return = action->max_return;

	hg_return_t rc = hg_proc_memcpy(proc, &args, sizeof(args));
	if(rc != HG_SUCCESS) {
		return rc;
	}

	/* Then the raw payload carried by the action. */
	return hg_proc_memcpy(proc, action->data, action->data_size);
}

120 121 122
/**
 * Decodes an omap_get_vals action: reads the fixed-size
 * args_rd_action_omap_get_vals header, allocates an action large enough to
 * hold data_size trailing bytes, reads those bytes into the action's data
 * area, then sets start_after to the beginning of the data area and
 * filter_prefix to the byte following start_after's terminating NUL.
 *
 * @param proc   Mercury proc handle to read from.
 * @param pos    running position (unused, left untouched).
 * @param action output slot receiving the newly allocated action.
 * @return HG_SUCCESS, the error returned by hg_proc_memcpy, or
 *         HG_NOMEM_ERROR if the allocation fails. On any failure *action
 *         is NULL and nothing is left allocated.
 */
static hg_return_t decode_read_action_omap_get_vals(hg_proc_t proc,
                                                    uint64_t* pos,
                                                    rd_action_omap_get_vals_t* action)
{
	(void)pos;

	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_vals a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_vals_t)calloc(1, sizeof(**action)-1+a.data_size);
	if(*action == NULL) return HG_NOMEM_ERROR;

	(*action)->max_return  = a.max_return;
	(*action)->data_size   = a.data_size;

	/* The payload must be copied in BEFORE locating the second string:
	 * the buffer is zero-filled until then, so strlen() would always be 0. */
	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
	if(ret != HG_SUCCESS) {
		/* The caller drops the action on error, so release it here. */
		free(*action);
		*action = NULL;
		return ret;
	}

	/* NOTE(review): assumes the payload holds two consecutive NUL-terminated
	 * strings (start_after, then filter_prefix) within data_size bytes;
	 * this is not validated here -- confirm against the encoder's producer. */
	(*action)->start_after = (*action)->data;
	size_t start_after_len = strlen((*action)->start_after);
	(*action)->filter_prefix = (*action)->data + start_after_len + 1;

	return ret;
}

141 142 143
static hg_return_t encode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t action)
{
	(void)pos; /* the running position is not affected by this action */

	/* Key count and payload size travel in a fixed-size header. */
	args_rd_action_omap_get_vals_by_keys hdr;
	hdr.num_keys  = action->num_keys;
	hdr.data_size = action->data_size;

	hg_return_t result = hg_proc_memcpy(proc, &hdr, sizeof(hdr));

	/* Only send the payload once the header went through. */
	if(result == HG_SUCCESS)
		result = hg_proc_memcpy(proc, action->data, action->data_size);

	return result;
}

158 159 160
/**
 * Decodes an omap_get_vals_by_keys action: reads the fixed-size
 * args_rd_action_omap_get_vals_by_keys header, allocates an action large
 * enough to hold data_size trailing bytes, then reads those bytes into the
 * action's data area.
 *
 * @param proc   Mercury proc handle to read from.
 * @param pos    running position (unused, left untouched).
 * @param action output slot receiving the newly allocated action.
 * @return HG_SUCCESS, the error returned by hg_proc_memcpy, or
 *         HG_NOMEM_ERROR if the allocation fails. On any failure *action
 *         is NULL and nothing is left allocated.
 */
static hg_return_t decode_read_action_omap_get_vals_by_keys(hg_proc_t proc,
                                                            uint64_t* pos,
                                                            rd_action_omap_get_vals_by_keys_t* action)
{
	(void)pos;

	hg_return_t ret = HG_SUCCESS;
	args_rd_action_omap_get_vals_by_keys a;
	ret = hg_proc_memcpy(proc, &a, sizeof(a));
	if(ret != HG_SUCCESS) return ret;

	*action = (rd_action_omap_get_vals_by_keys_t)calloc(1, sizeof(**action)-1+a.data_size);
	if(*action == NULL) return HG_NOMEM_ERROR;

	(*action)->num_keys    = a.num_keys;
	(*action)->data_size   = a.data_size;

	ret = hg_proc_memcpy(proc, (*action)->data, (*action)->data_size);
	if(ret != HG_SUCCESS) {
		/* The caller drops the action on error, so release it here. */
		free(*action);
		*action = NULL;
	}

	return ret;
}

/**
 * The following two arrays are here to avoid a big switch.
 */

/*
 * Encoding functions, looked up by opcode value (encode_read_action[opcode]).
 * Slot 0 is NULL: it is never a valid opcode to dispatch on.
 * Each helper takes a typed action handle, hence the cast to the generic
 * encode_fn signature.
 * NOTE(review): the entry order presumably mirrors the read_op_code_t
 * enumerator order -- verify against the enum declaration when adding entries.
 */
static encode_fn encode_read_action[_READ_OPCODE_END_ENUM_] = {
	NULL,
	(encode_fn)encode_read_action_stat,
	(encode_fn)encode_read_action_read,
	(encode_fn)encode_read_action_omap_get_keys,
	(encode_fn)encode_read_action_omap_get_vals,
	(encode_fn)encode_read_action_omap_get_vals_by_keys
};

/*
 * Decoding functions, looked up by opcode value (decode_read_action[opcode]).
 * Slot 0 is NULL: it is never a valid opcode to dispatch on.
 * Each helper takes a pointer to a typed action handle, hence the cast to
 * the generic decode_fn signature.
 * NOTE(review): the entry order presumably mirrors the read_op_code_t
 * enumerator order -- verify against the enum declaration when adding entries.
 */
static decode_fn decode_read_action[_READ_OPCODE_END_ENUM_] = {
	NULL,
	(decode_fn)decode_read_action_stat,
	(decode_fn)decode_read_action_read,
	(decode_fn)decode_read_action_omap_get_keys,
	(decode_fn)decode_read_action_omap_get_vals,
	(decode_fn)decode_read_action_omap_get_vals_by_keys
};

/**
 * Serialization function for mobject_store_read_op_t objects.
 * For encoding, the object should be prepared first (that is, the union fields
 * pointing to either a buffer or an offset in a bulk should be an offset in a bulk).
 */
205
hg_return_t hg_proc_mobject_store_read_op_t(hg_proc_t proc, mobject_store_read_op_t* read_op)
206 207 208 209 210 211 212 213
{
	rd_action_base_t elem, tmp;
	hg_return_t ret = HG_SUCCESS;
	uintptr_t position = 0;

	switch(hg_proc_get_op(proc)) {

	case HG_ENCODE:
Matthieu Dorier's avatar
Matthieu Dorier committed
214

215 216
		MOBJECT_ASSERT((*read_op)->use_local_pointers == 0, 
			"Cannot encode a read_op before it has been prepared");
217
		// encode the bulk handle associated with the series of operations
218
		ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
219 220
		if(ret != HG_SUCCESS) return ret;
		// encode the number of actions
221 222
		ret = hg_proc_memcpy(proc, &((*read_op)->num_actions), 
								sizeof((*read_op)->num_actions));
223 224 225
		if(ret != HG_SUCCESS) return ret;

		// for each action ...
226
		DL_FOREACH((*read_op)->actions,elem) {
227
			read_op_code_t opcode = elem->type;
228 229
			MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
				"Invalid read_op opcode");
230 231 232 233 234 235 236 237 238 239
			// encode the type of action
			ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
			if(ret != HG_SUCCESS) return ret;
			// encode the action's arguments
			ret = encode_read_action[opcode](proc, &position, elem);
			if(ret != HG_SUCCESS) return ret;
		}
		break;

	case HG_DECODE:
240 241 242
	
		*read_op = mobject_store_create_read_op();
		(*read_op)->use_local_pointers = 0;
243
		// decode the bulk handle
244
		ret = hg_proc_hg_bulk_t(proc, &((*read_op)->bulk_handle));
245 246
		if(ret != HG_SUCCESS) return ret;
		// decode the number of actions
247 248
		ret = hg_proc_memcpy(proc, &((*read_op)->num_actions),
								sizeof((*read_op)->num_actions));
249 250 251 252
		if(ret != HG_SUCCESS) return ret;

		rd_action_base_t next_action;
		size_t i;
253
		for(i = 0; i < (*read_op)->num_actions; i++) {
254 255 256 257
			// decode the current action's type
			read_op_code_t opcode;
			ret = hg_proc_memcpy(proc, &opcode, sizeof(opcode));
			if(ret != HG_SUCCESS) return ret;
258 259
			MOBJECT_ASSERT((opcode <= 0 || opcode >= _READ_OPCODE_END_ENUM_),
				"Invalid write_op opcode");
260 261 262 263
			// decode the action's arguments
			ret = decode_read_action[opcode](proc, &position, &next_action);
			if(ret != HG_SUCCESS) return ret;
			// append to the list
264
			DL_APPEND((*read_op)->actions, next_action);
265 266 267 268
		}
		break;
	
	case HG_FREE:
269 270

		mobject_store_release_read_op(*read_op);
271 272 273 274 275
		return HG_SUCCESS;
	}

	return ret;
}