/* write-op-impl.c */
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "mobject-store-config.h"
#include "utlist.h"
#include "libmobject-store.h"
#include "log.h"
#include "write-op-impl.h"
#include "completion.h"

mobject_store_write_op_t mobject_store_create_write_op(void)
{
	mobject_store_write_op_t write_op = 
		(mobject_store_write_op_t)calloc(1, sizeof(struct mobject_store_write_op));
20
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "Could not allocate write_op");
21 22 23
	write_op->actions     = (wr_action_base_t)0;
	write_op->bulk_handle = HG_BULK_NULL;
	write_op->num_actions = 0;
24
	write_op->ready       = 0;
25 26 27 28 29 30
	return write_op;
}

/**
 * Frees a write operation together with every action still queued on it.
 *
 * If a bulk handle is attached it is released with margo_bulk_free().
 * Passing MOBJECT_WRITE_OP_NULL is a no-op.
 *
 * @param write_op write operation to release
 */
void mobject_store_release_write_op(mobject_store_write_op_t write_op)
{
	wr_action_base_t cur, next;

	if(write_op == MOBJECT_WRITE_OP_NULL) {
		return;
	}

	if(write_op->bulk_handle != HG_BULK_NULL) {
		margo_bulk_free(write_op->bulk_handle);
	}

	/* The _SAFE iteration keeps the successor in `next`, so the current
	 * node can be unlinked and freed while walking the list. */
	DL_FOREACH_SAFE(write_op->actions, cur, next) {
		DL_DELETE(write_op->actions, cur);
		free(cur);
	}

	free(write_op);
}

/**
 * Appends a CREATE action to the actions queued on a write operation.
 *
 * @param write_op  write operation to extend; must not be
 *                  MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param exclusive stored as-is in the action's `exclusive` field
 * @param category  not used by this function
 */
void mobject_store_write_op_create(mobject_store_write_op_t write_op,
                                   int exclusive,
                                   const char* category)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	(void)category; /* currently ignored */

	wr_action_create_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference, as done for the
	 * write_op itself in mobject_store_create_write_op(). */
	MOBJECT_ASSERT(action != NULL, "Could not allocate create action");
	action->base.type         = WRITE_OPCODE_CREATE;
	action->exclusive         = exclusive;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends a WRITE action to the actions queued on a write operation.
 *
 * Only the buffer pointer is recorded; the data is not copied here.
 * NOTE(review): the caller presumably has to keep `buffer` valid until the
 * operation has been processed -- confirm against the code consuming the
 * actions.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param buffer   data pointer stored in the action
 * @param len      stored in the action's `len` field
 * @param offset   stored in the action's `offset` field
 */
void mobject_store_write_op_write(mobject_store_write_op_t write_op,
                                  const char *buffer,
                                  size_t len,
                                  uint64_t offset)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_write_t action  = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate write action");
	action->base.type         = WRITE_OPCODE_WRITE;
	action->buffer.as_pointer = buffer;
	action->len               = len;
	action->offset            = offset;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends a WRITE_FULL action to the actions queued on a write operation.
 *
 * Only the buffer pointer is recorded; the data is not copied here.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param buffer   data pointer stored in the action
 * @param len      stored in the action's `len` field
 */
void mobject_store_write_op_write_full(mobject_store_write_op_t write_op,
                                       const char *buffer,
                                       size_t len)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_write_full_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate write_full action");
	action->base.type             = WRITE_OPCODE_WRITE_FULL;
	action->buffer.as_pointer     = buffer;
	action->len                   = len;

	/*
	 * POTENTIAL OPTIMIZATION (INCOMPLETE, NOT ENABLED):
	 * a write_full replaces the entire content of the object, so the
	 * WRITE, WRITE_FULL, WRITE_SAME, APPEND, TRUNCATE and ZERO actions
	 * already queued will be overwritten anyway and could be unlinked
	 * from write_op->actions and freed here.
	 * NOTE(review): the draft of this optimization did not adjust
	 * num_actions when dropping actions; that needs to be handled before
	 * enabling it.
	 */

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends a WRITE_SAME action to the actions queued on a write operation.
 *
 * Only the buffer pointer is recorded; the data is not copied here.
 * NOTE(review): presumably `data_len` bytes of `buffer` are repeated to
 * cover `write_len` bytes starting at `offset` -- confirm against the code
 * consuming the actions.
 *
 * @param write_op  write operation to extend; must not be
 *                  MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param buffer    data pointer stored in the action
 * @param data_len  stored in the action's `data_len` field
 * @param write_len stored in the action's `write_len` field
 * @param offset    stored in the action's `offset` field
 */
void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
                                      const char *buffer,
                                      size_t data_len,
                                      size_t write_len,
                                      uint64_t offset)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_write_same_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate write_same action");
	action->base.type             = WRITE_OPCODE_WRITE_SAME;
	action->buffer.as_pointer     = buffer;
	action->data_len              = data_len;
	action->write_len             = write_len;
	action->offset                = offset;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends an APPEND action to the actions queued on a write operation.
 *
 * Only the buffer pointer is recorded; the data is not copied here.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param buffer   data pointer stored in the action
 * @param len      stored in the action's `len` field
 */
void mobject_store_write_op_append(mobject_store_write_op_t write_op,
                                   const char *buffer,
                                   size_t len)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_append_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate append action");
	action->base.type         = WRITE_OPCODE_APPEND;
	action->buffer.as_pointer = buffer;
	action->len               = len;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends a REMOVE action to the actions queued on a write operation.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 */
void mobject_store_write_op_remove(mobject_store_write_op_t write_op)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_remove_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate remove action");
	action->base.type         = WRITE_OPCODE_REMOVE;

	/*
	 * POTENTIAL OPTIMIZATION (INCOMPLETE, NOT ENABLED):
	 * a remove makes all previously queued actions unnecessary, so they
	 * could all be unlinked from write_op->actions and freed here. If one
	 * of them is a CREATE, the remove itself would not even need to be
	 * queued.
	 * NOTE(review): the draft of this optimization did not adjust
	 * num_actions and did not free `action` when the remove was skipped;
	 * both need to be handled before enabling it.
	 */

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends a TRUNCATE action to the actions queued on a write operation.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param offset   stored in the action's `offset` field
 */
void mobject_store_write_op_truncate(mobject_store_write_op_t write_op,
                                     uint64_t offset)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_truncate_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate truncate action");
	action->base.type           = WRITE_OPCODE_TRUNCATE;
	action->offset              = offset;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends a ZERO action to the actions queued on a write operation.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param offset   stored in the action's `offset` field
 * @param len      stored in the action's `len` field
 */
void mobject_store_write_op_zero(mobject_store_write_op_t write_op,
                                 uint64_t offset,
                                 uint64_t len)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	wr_action_zero_t action = calloc(1, sizeof(*action));
	/* Check the allocation before the first dereference. */
	MOBJECT_ASSERT(action != NULL, "Could not allocate zero action");
	action->base.type       = WRITE_OPCODE_ZERO;
	action->offset          = offset;
	action->len             = len;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends an OMAP_SET action to the actions queued on a write operation.
 *
 * Unlike the data-writing actions, keys and values are copied into the
 * action itself. For each of the `num` entries the payload holds, in order:
 * the key including its terminating NUL, the raw bytes of lens[i]
 * (sizeof(lens[i]) bytes), then lens[i] bytes of value.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param keys     array of `num` NUL-terminated key strings
 * @param vals     array of `num` value buffers; vals[i] is read for lens[i] bytes
 * @param lens     array of `num` value lengths, in bytes
 * @param num      number of key/value pairs
 */
void mobject_store_write_op_omap_set(mobject_store_write_op_t write_op,
                                     char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	// compute the size required to embed the keys and values; every step
	// is checked so that a bogus length cannot wrap the total around and
	// lead to an undersized allocation followed by an out-of-bounds copy
	size_t i;
	MOBJECT_ASSERT(num <= SIZE_MAX / sizeof(lens[0]), "omap_set payload size overflows size_t");
	size_t extra_size = sizeof(lens[0])*num;
	for(i = 0; i < num; i++) {
		size_t entry_size = strlen(keys[i])+1;
		MOBJECT_ASSERT(lens[i] <= SIZE_MAX - entry_size, "omap_set payload size overflows size_t");
		entry_size += lens[i];
		MOBJECT_ASSERT(entry_size <= SIZE_MAX - extra_size, "omap_set payload size overflows size_t");
		extra_size += entry_size;
	}

	wr_action_omap_set_t action = NULL;
	MOBJECT_ASSERT(extra_size <= SIZE_MAX - sizeof(*action), "omap_set payload size overflows size_t");
	// NOTE(review): the "-1" assumes the action struct ends with a one-byte
	// `data` placeholder -- confirm against write-op-impl.h
	action = calloc(1, sizeof(*action)-1+extra_size);
	MOBJECT_ASSERT(action != NULL, "Could not allocate omap_set action");
	action->base.type           = WRITE_OPCODE_OMAP_SET;
	action->num                 = num;
	action->data_size           = extra_size;

	char* data = action->data;
	for(i = 0; i < num; i++) {
		// serialize key (including the terminating NUL)
		size_t key_size = strlen(keys[i])+1;
		memcpy(data, keys[i], key_size);
		data += key_size;
		// serialize size of value
		memcpy(data, &lens[i], sizeof(lens[i]));
		data += sizeof(lens[i]);
		// serialize value; skipped when empty so that a NULL vals[i]
		// is never handed to memcpy
		if(lens[i] > 0) {
			memcpy(data, vals[i], lens[i]);
			data += lens[i];
		}
	}

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Appends an OMAP_RM_KEYS action to the actions queued on a write operation.
 *
 * The keys are copied into the action itself, back to back, each one
 * including its terminating NUL.
 *
 * @param write_op write operation to extend; must not be
 *                 MOBJECT_WRITE_OP_NULL and must not be flagged as ready
 * @param keys     array of `keys_len` NUL-terminated key strings
 * @param keys_len number of keys in `keys`
 */
void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
                                         char const* const* keys,
                                         size_t keys_len)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");

	// find out the extra memory to allocate
	size_t i;
	size_t extra_mem = 0;
	for(i = 0; i < keys_len; i++) {
		extra_mem += strlen(keys[i])+1;
	}

	// NOTE(review): the "-1" assumes the action struct ends with a one-byte
	// `data` placeholder -- confirm against write-op-impl.h
	wr_action_omap_rm_keys_t action = calloc(1, sizeof(*action)-1+extra_mem);
	// check the allocation before the first dereference
	MOBJECT_ASSERT(action != NULL, "Could not allocate omap_rm_keys action");
	action->base.type               = WRITE_OPCODE_OMAP_RM_KEYS;
	action->num_keys                = keys_len;
	action->data_size               = extra_mem;

	char* data = action->data;
	// serialize the keys (each with its terminating NUL)
	for(i = 0; i < keys_len; i++) {
		size_t key_size = strlen(keys[i])+1;
		memcpy(data, keys[i], key_size);
		data += key_size;
	}

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);

	write_op->num_actions += 1;
}

/**
 * Blocking variant of mobject_store_aio_write_op_operate().
 *
 * Creates a completion object, submits the write operation through the
 * asynchronous entry point, waits for the completion and returns the value
 * reported by it. Each intermediate step is checked with MOBJECT_ASSERT.
 *
 * @param write_op write operation to submit; must not be MOBJECT_WRITE_OP_NULL
 * @param io       forwarded unchanged to the asynchronous call
 * @param oid      forwarded unchanged to the asynchronous call
 * @param mtime    forwarded unchanged to the asynchronous call
 * @param flags    forwarded unchanged to the asynchronous call
 * @return the value of mobject_store_aio_get_return_value() for the completion
 */
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
                                   mobject_store_ioctx_t io,
                                   const char *oid,
                                   time_t *mtime,
                                   int flags)
{
	int r;
	int result;
	mobject_store_completion_t comp = MOBJECT_COMPLETION_NULL;

	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

	/* no callbacks and no user argument: the completion is only waited on */
	r = mobject_store_aio_create_completion(NULL, NULL, NULL, &comp);
	MOBJECT_ASSERT(0 == r, "Could not create completion object");

	r = mobject_store_aio_write_op_operate(write_op, io, comp, oid, mtime, flags);
	MOBJECT_ASSERT(0 == r, "Call to mobject_store_aio_write_op_operate failed");

	r = mobject_store_aio_wait_for_complete(comp);
	MOBJECT_ASSERT(0 == r, "Could not wait for completion");

	/* read the result before the completion is released */
	result = mobject_store_aio_get_return_value(comp);
	mobject_store_aio_release(comp);

	return result;
}

/**
 * Asynchronous submission of a write operation (not implemented yet).
 *
 * Until the TODO below is addressed this function only validates
 * `write_op` and reports failure. It used to fall off the end without a
 * return statement, which is undefined behavior as soon as a caller reads
 * the result, as mobject_store_write_op_operate() does.
 *
 * @param write_op   write operation to submit; must not be MOBJECT_WRITE_OP_NULL
 * @param io         currently unused
 * @param completion currently unused
 * @param oid        currently unused
 * @param mtime      currently unused
 * @param flags      currently unused
 * @return -1 (nothing is submitted yet)
 */
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
                                       mobject_store_ioctx_t io,
                                       mobject_store_completion_t completion,
                                       const char *oid,
                                       time_t *mtime,
                                       int flags)
{
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

	/* Not consumed until the operation is implemented. */
	(void)io;
	(void)completion;
	(void)oid;
	(void)mtime;
	(void)flags;

	// TODO
	return -1;
}