write-op.c 9.56 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5 6 7
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdlib.h>
8
#include <string.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
9
#include "mobject-store-config.h"
10
#include "utlist.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include "libmobject-store.h"
12
#include "log.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
13 14 15 16 17 18 19
#include "write-op.h"
#include "completion.h"

mobject_store_write_op_t mobject_store_create_write_op(void)
{
	mobject_store_write_op_t write_op = 
		(mobject_store_write_op_t)calloc(1, sizeof(struct mobject_store_write_op));
20 21
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "Could not allocate write_op");
	write_op->actions = (wr_action_base_t)0;
Matthieu Dorier's avatar
Matthieu Dorier committed
22 23 24 25 26 27
	return write_op;
}

void mobject_store_release_write_op(mobject_store_write_op_t write_op)
{
	if(write_op == MOBJECT_WRITE_OP_NULL) return;
28 29 30 31 32 33 34 35
	
	wr_action_base_t action, tmp;

	DL_FOREACH_SAFE(write_op->actions, action, tmp) {
		DL_DELETE(write_op->actions, action);
		free(action);
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
36 37 38 39 40 41 42
	free(write_op);
}

void mobject_store_write_op_create(mobject_store_write_op_t write_op,
                                   int exclusive,
                                   const char* category)
{
43 44
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

45
	wr_action_create_t action = (wr_action_create_t)calloc(1, sizeof(*action));
46 47 48 49 50
	action->base.type         = WRITE_OPCODE_CREATE;
	action->exclusive         = exclusive;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
51 52 53 54 55 56 57
}

void mobject_store_write_op_write(mobject_store_write_op_t write_op,
                                  const char *buffer,
                                  size_t len,
                                  uint64_t offset)
{
58 59 60 61 62 63 64 65 66 67
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

	wr_action_write_t action = (wr_action_write_t)calloc(1, sizeof(*action));
	action->base.type        = WRITE_OPCODE_WRITE;
	action->buffer           = buffer;
	action->len              = len;
	action->offset           = offset;
	
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
68 69 70 71 72 73
}

void mobject_store_write_op_write_full(mobject_store_write_op_t write_op,
                                       const char *buffer,
                                       size_t len)
{
74 75 76 77 78 79 80
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
	
	wr_action_write_full_t action = (wr_action_write_full_t)calloc(1, sizeof(*action));
	action->base.type             = WRITE_OPCODE_WRITE_FULL;
	action->buffer                = buffer;
	action->len                   = len;

81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
	// a write_full will replace the entire content of the object
	// so we can try and optimize by removing some operations that will
	// be overwritten anyway
	wr_action_base_t current, temp;
	DL_FOREACH_SAFE(write_op->actions, current, temp) {
		if(current->type == WRITE_OPCODE_WRITE
		|| current->type == WRITE_OPCODE_WRITE_FULL
		|| current->type == WRITE_OPCODE_WRITE_SAME
		|| current->type == WRITE_OPCODE_APPEND
		|| current->type == WRITE_OPCODE_TRUNCATE
		|| current->type == WRITE_OPCODE_ZERO) {
			DL_DELETE(write_op->actions, current);
			free(current);
		}
	}

97 98
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
99 100 101 102 103 104 105 106
}

void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
                                      const char *buffer,
                                      size_t data_len,
                                      size_t write_len,
                                      uint64_t offset)
{
107 108 109 110 111 112 113 114
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

	wr_action_write_same_t action = (wr_action_write_same_t)calloc(1, sizeof(*action));
	action->base.type             = WRITE_OPCODE_WRITE_SAME;
	action->buffer                = buffer;
	action->data_len              = data_len;
	action->write_len             = write_len;
	action->offset                = offset;
Matthieu Dorier's avatar
Matthieu Dorier committed
115

116 117
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
118 119 120 121 122 123
}

void mobject_store_write_op_append(mobject_store_write_op_t write_op,
                                   const char *buffer,
                                   size_t len)
{
124
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
Matthieu Dorier's avatar
Matthieu Dorier committed
125

126 127 128 129 130 131 132
	wr_action_append_t action = (wr_action_append_t)calloc(1, sizeof(*action));
	action->base.type         = WRITE_OPCODE_APPEND;
	action->buffer            = buffer;
	action->len               = len;

	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
133 134 135 136
}

void mobject_store_write_op_remove(mobject_store_write_op_t write_op)
{
137
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
Matthieu Dorier's avatar
Matthieu Dorier committed
138

139 140
	wr_action_remove_t action = (wr_action_remove_t)calloc(1, sizeof(*action));
	action->base.type         = WRITE_OPCODE_REMOVE;
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157

	// a remove operation will make all previous operations unecessary
	// so we can delete all previously posted operations (and potentially
	// not even post the remove)	
	wr_action_base_t current, temp;
	int do_not_even_post = 0;
	DL_FOREACH_SAFE(write_op->actions, current, temp) {
		if(current->type == WRITE_OPCODE_CREATE)
			do_not_even_post = 1;
		DL_DELETE(write_op->actions, current);
		free(current);
	}

	if(!do_not_even_post) {
		WRITE_ACTION_UPCAST(base, action);
		DL_APPEND(write_op->actions, base);
	}
Matthieu Dorier's avatar
Matthieu Dorier committed
158 159 160 161 162
}

void mobject_store_write_op_truncate(mobject_store_write_op_t write_op,
                                     uint64_t offset)
{
163
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
Matthieu Dorier's avatar
Matthieu Dorier committed
164

165 166 167 168 169
	wr_action_truncate_t action = (wr_action_truncate_t)calloc(1, sizeof(*action));
	action->base.type           = WRITE_OPCODE_TRUNCATE;
	
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
170 171 172 173 174 175
}

void mobject_store_write_op_zero(mobject_store_write_op_t write_op,
                                 uint64_t offset,
                                 uint64_t len)
{
176 177 178 179 180 181 182 183 184
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

	wr_action_zero_t action = (wr_action_zero_t)calloc(1, sizeof(*action));
	action->base.type       = WRITE_OPCODE_ZERO;
	action->offset          = offset;
	action->len             = len;
	
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
185 186 187 188 189 190 191 192
}

void mobject_store_write_op_omap_set(mobject_store_write_op_t write_op,
                                     char const* const* keys,
                                     char const* const* vals,
                                     const size_t *lens,
                                     size_t num)
{
193 194
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");

195 196 197 198 199 200 201 202 203
	// compute the size required to embed the keys and values
	size_t i;
	size_t extra_size = sizeof(lens[0])*num;
	for(i = 0; i < num; i++) {
		extra_size += strlen(keys[i])+1;
		extra_size += lens[i];
	}

	wr_action_omap_set_t action = (wr_action_omap_set_t)calloc(1, sizeof(*action)-1+extra_size);
204 205
	action->base.type           = WRITE_OPCODE_OMAP_SET;
	action->num                 = num;
Matthieu Dorier's avatar
Matthieu Dorier committed
206

207 208 209 210 211 212 213 214 215 216 217 218 219
	char* data = action->data;
	for(i = 0; i < num; i++) {
		// serialize key
		strcpy(data, keys[i]);
		data += strlen(keys[i])+1;
		// serialize size of value
		memcpy(data, &lens[i], sizeof(lens[i]));
		data += sizeof(lens[i]);
		// serialize value
		memcpy(data, vals[i], lens[i]);
		data += lens[i];
	}

220 221
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
222 223 224 225 226 227
}

void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
                                         char const* const* keys,
                                         size_t keys_len)
{
228
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
Matthieu Dorier's avatar
Matthieu Dorier committed
229

230 231 232 233 234 235 236 237
	// find out the extra memory to allocate
	size_t i;
	size_t extra_mem = 0;
	for(i = 0; i < keys_len; i++) {
		extra_mem += strlen(keys[i])+1;
	}

	wr_action_omap_rm_keys_t action = (wr_action_omap_rm_keys_t)calloc(1, sizeof(*action)-1+extra_mem);
238 239 240
	action->base.type               = WRITE_OPCODE_OMAP_RM_KEYS;
	action->keys_len                = keys_len;

241 242 243 244 245 246 247
	char* data = action->keys;
	// serialize the keys
	for(i = 0; i < keys_len; i++) {
		strcpy(data, keys[i]);
		data += strlen(keys[i])+1;
	}

248 249
	WRITE_ACTION_UPCAST(base, action);
	DL_APPEND(write_op->actions, base);
Matthieu Dorier's avatar
Matthieu Dorier committed
250 251 252 253 254 255 256 257
}

int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
                                   mobject_store_ioctx_t io,
                                   const char *oid,
                                   time_t *mtime,
                                   int flags)
{
258
	int r;
259
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
260 261 262 263 264 265 266 267 268 269
	mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
	r = mobject_store_aio_create_completion(NULL, NULL, NULL, &completion);
	MOBJECT_ASSERT(0 == r, "Could not create completion object");
	r = mobject_store_aio_write_op_operate(write_op, io, completion, oid, mtime, flags);
	MOBJECT_ASSERT(0 == r, "Call to mobject_store_aio_write_op_operate failed");
	r = mobject_store_aio_wait_for_complete(completion);
	MOBJECT_ASSERT(0 == r, "Could not wait for completion");
	int ret = mobject_store_aio_get_return_value(completion);
	mobject_store_aio_release(completion);
	return ret;
Matthieu Dorier's avatar
Matthieu Dorier committed
270 271 272 273 274 275 276 277 278
}

int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
                                       mobject_store_ioctx_t io,
                                       mobject_store_completion_t completion,
                                       const char *oid,
                                       time_t *mtime,
                                       int flags)
{
279
	MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
280
	// TODO
Matthieu Dorier's avatar
Matthieu Dorier committed
281
}