Commit 8aa2745a authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

fix up arguments handling

parent 73174fa5
......@@ -10,6 +10,8 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <float.h>
#include <errno.h>
#include <abt.h>
#include <abt-io.h>
......@@ -151,7 +153,7 @@ static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t si
{
ABT_thread *tid_array = NULL;
ABT_mutex mutex;
struct write_abt_arg arg;
struct write_abt_arg* args;
off_t next_offset = 0;
int ret;
double end;
......@@ -161,9 +163,12 @@ static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t si
ABT_xstream xstream;
ABT_pool pool;
abt_io_instance_id aid;
int fd;
void *buffer;
double start;
arg.fd = open(filename, O_WRONLY|O_CREAT|O_DIRECT|O_SYNC, S_IWUSR|S_IRUSR);
if(!arg.fd)
fd = open(filename, O_WRONLY|O_CREAT|O_DIRECT|O_SYNC, S_IWUSR|S_IRUSR);
if(!fd)
{
perror("open");
assert(0);
......@@ -195,41 +200,58 @@ static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t si
ABT_mutex_create(&mutex);
arg.mutex = &mutex;
arg.size = size;
arg.next_offset = &next_offset;
arg.duration = duration;
arg.aid = aid;
if(!buffer_per_thread)
if (buffer_per_thread)
buffer = NULL;
else
{
ret = posix_memalign(&arg.buffer, 4096, arg.size);
ret = posix_memalign(&buffer, 4096, size);
assert(ret == 0);
memset(arg.buffer, 0, arg.size);
memset(buffer, 0, size);
}
args = malloc(concurrency*sizeof(*args));
assert(args != NULL);
for (i = 0; i < concurrency; i++)
{
args[i].mutex = &mutex;
args[i].size = size;
args[i].next_offset = &next_offset;
args[i].duration = duration;
args[i].aid = aid;
args[i].fd = fd;
if (buffer == NULL)
{
ret = posix_memalign(&args[i].buffer, 4096, size);
assert(ret == 0);
memset(args[i].buffer, 0, size);
}
else
args[i].buffer = buffer;
}
else
arg.buffer = NULL;
arg.start_time = wtime();
for(i=0; i<concurrency; i++)
{
/* create ULTs */
ret = ABT_thread_create(pool, write_abt_bench, &arg, ABT_THREAD_ATTR_NULL, &tid_array[i]);
ret = ABT_thread_create(pool, write_abt_bench, &args[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
}
arg.start_time = wtime();
for(i=0; i<concurrency; i++)
ABT_thread_join(tid_array[i]);
end = wtime();
for(i=0; i<concurrency; i++)
ABT_thread_free(&tid_array[i]);
*seconds = end-arg.start_time;
/* compute min start time */
start = DBL_MAX;
for (i = 0; i < concurrency; i++)
start = args[i].start_time < start ? args[i].start_time : start;
*seconds = end-start;
*ops_done = next_offset/size;
abt_io_finalize(aid);
......@@ -245,10 +267,17 @@ static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t si
free(tid_array);
free(progress_xstreams);
if(!buffer_per_thread)
free(arg.buffer);
if(buffer_per_thread)
{
for (i = 0; i < concurrency; i++)
free(args[i].buffer);
}
else
free(buffer);
close(arg.fd);
free(args);
close(fd);
unlink(filename);
return;
......@@ -369,14 +398,17 @@ static void pthread_bench(int buffer_per_thread, unsigned int concurrency, size_
{
pthread_t *id_array = NULL;
pthread_mutex_t mutex;
struct write_pthread_arg arg;
struct write_pthread_arg* args;
off_t next_offset = 0;
int ret;
double end;
unsigned int i;
int fd;
void *buffer;
double start;
arg.fd = open(filename, O_WRONLY|O_CREAT|O_DIRECT|O_SYNC, S_IWUSR|S_IRUSR);
if(!arg.fd)
fd = open(filename, O_WRONLY|O_CREAT|O_DIRECT|O_SYNC, S_IWUSR|S_IRUSR);
if(!fd)
{
perror("open");
assert(0);
......@@ -387,25 +419,37 @@ static void pthread_bench(int buffer_per_thread, unsigned int concurrency, size_
pthread_mutex_init(&mutex, NULL);
arg.mutex = &mutex;
arg.size = size;
arg.next_offset = &next_offset;
arg.duration = duration;
if(!buffer_per_thread)
if (buffer_per_thread)
buffer = NULL;
else
{
ret = posix_memalign(&arg.buffer, 4096, arg.size);
ret = posix_memalign(&buffer, 4096, size);
assert(ret == 0);
memset(arg.buffer, 0, arg.size);
memset(buffer, 0, size);
}
else
arg.buffer = NULL;
arg.start_time = wtime();
args = malloc(concurrency * sizeof(*args));
assert(args != NULL);
for (i = 0; i < concurrency; i++) {
args[i].mutex = &mutex;
args[i].size = size;
args[i].next_offset = &next_offset;
args[i].duration = duration;
args[i].fd = fd;
if(buffer == NULL)
{
ret = posix_memalign(&args[i].buffer, 4096, size);
assert(ret == 0);
memset(args[i].buffer, 0, size);
}
else
args[i].buffer = buffer;
}
for(i=0; i<concurrency; i++)
{
ret = pthread_create(&id_array[i], NULL, write_pthread_bench, &arg);
ret = pthread_create(&id_array[i], NULL, write_pthread_bench, &args[i]);
assert(ret == 0);
}
......@@ -416,17 +460,28 @@ static void pthread_bench(int buffer_per_thread, unsigned int concurrency, size_
}
end = wtime();
*seconds = end-arg.start_time;
start = DBL_MAX;
for (i = 0; i < concurrency; i++)
start = args[i].start_time < start ? args[i].start_time : start;
*seconds = end-start;
*ops_done = next_offset/size;
pthread_mutex_destroy(&mutex);
free(id_array);
if(!buffer_per_thread)
free(arg.buffer);
if(buffer_per_thread)
{
for (i = 0; i < concurrency; i++)
free(args[i].buffer);
}
else
free(buffer);
free(args);
close(arg.fd);
close(fd);
unlink(filename);
return;
......@@ -437,18 +492,9 @@ static void write_abt_bench(void *_arg)
struct write_abt_arg* arg = _arg;
off_t my_offset;
size_t ret;
int do_free = 0;
if(!arg->buffer)
{
ret = posix_memalign(&arg->buffer, 4096, arg->size);
assert(ret == 0);
memset(arg->buffer, 0, arg->size);
do_free = 1;
}
double now = wtime();
while((now-arg->start_time) < arg->duration)
arg->start_time = wtime();
while((wtime()-arg->start_time) < arg->duration)
{
ABT_mutex_lock(*arg->mutex);
my_offset = *arg->next_offset;
......@@ -457,13 +503,8 @@ static void write_abt_bench(void *_arg)
ret = abt_io_pwrite(arg->aid, arg->fd, arg->buffer, arg->size, my_offset);
assert(ret == arg->size);
now = wtime();
}
if(do_free)
free(arg->buffer);
return;
}
......@@ -472,18 +513,9 @@ static void *write_pthread_bench(void *_arg)
struct write_pthread_arg* arg = _arg;
off_t my_offset;
size_t ret;
int do_free = 0;
if(!arg->buffer)
{
ret = posix_memalign(&arg->buffer, 4096, arg->size);
assert(ret == 0);
memset(arg->buffer, 0, arg->size);
do_free = 1;
}
double now = wtime();
while((now-arg->start_time) < arg->duration)
arg->start_time = wtime();
while((wtime()-arg->start_time) < arg->duration)
{
pthread_mutex_lock(arg->mutex);
my_offset = *arg->next_offset;
......@@ -492,12 +524,7 @@ static void *write_pthread_bench(void *_arg)
ret = pwrite(arg->fd, arg->buffer, arg->size, my_offset);
assert(ret == arg->size);
now = wtime();
}
if(do_free)
free(arg->buffer);
return(NULL);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment