Commit 96f35fd9 authored by Philip Carns's avatar Philip Carns
Browse files

fix memory leaks

- unrelated to the purpose of this branch, but necessary to get asan to
  pass while testing changes.  One of these is important and would have
  leaked a small amount of memory with every blocking abt_io call
parent 85807d78
---
Language: Cpp
# BasedOnStyle: LLVM
AccessModifierOffset: -2
AlignAfterOpenBracket: Align
AlignConsecutiveMacros: true
AlignConsecutiveAssignments: true
AlignConsecutiveBitFields: true
AlignConsecutiveDeclarations: true
AlignEscapedNewlines: Left
AlignOperands: AlignAfterOperator
AlignTrailingComments: true
AllowAllArgumentsOnNextLine: true
AllowAllConstructorInitializersOnNextLine: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortEnumsOnASingleLine: false
AllowShortBlocksOnASingleLine: Always
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: All
AllowShortLambdasOnASingleLine: All
AllowShortIfStatementsOnASingleLine: WithoutElse
AllowShortLoopsOnASingleLine: true
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: MultiLine
BinPackArguments: true
BinPackParameters: false
BraceWrapping:
AfterCaseLabel: false
AfterClass: false
AfterControlStatement: Never
AfterEnum: false
AfterFunction: true
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
AfterExternBlock: false
BeforeCatch: false
BeforeElse: false
BeforeLambdaBody: false
BeforeWhile: false
IndentBraces: false
SplitEmptyFunction: false
SplitEmptyRecord: false
SplitEmptyNamespace: true
BreakBeforeBinaryOperators: All
BreakBeforeBraces: Custom
BreakBeforeInheritanceComma: false
BreakInheritanceList: BeforeColon
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: false
BreakConstructorInitializers: BeforeColon
BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: false
ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DeriveLineEnding: true
DerivePointerAlignment: false
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
FixNamespaceComments: true
ForEachMacros:
- foreach
- Q_FOREACH
- BOOST_FOREACH
IncludeBlocks: Preserve
IncludeCategories:
- Regex: '^"(margo)/'
Priority: 2
SortPriority: 0
- Regex: '^(<|/)'
Priority: 3
SortPriority: 0
- Regex: '.*'
Priority: 1
SortPriority: 0
IncludeIsMainRegex: '(Test)?$'
IncludeIsMainSourceRegex: ''
IndentCaseLabels: false
IndentCaseBlocks: false
IndentGotoLabels: false
IndentPPDirectives: BeforeHash
IndentExternBlock: AfterExternBlock
IndentWidth: 4
IndentWrappedFunctionNames: false
InsertTrailingCommas: None
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: true
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCBinPackProtocolList: Auto
ObjCBlockIndentWidth: 2
ObjCBreakBeforeNestedBlockParam: true
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: true
PenaltyBreakAssignment: 2
PenaltyBreakBeforeFirstCallParameter: 19
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyBreakTemplateDeclaration: 10
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
PointerAlignment: Left
ReflowComments: true
SortIncludes: false
SortUsingDeclarations: true
SpaceAfterCStyleCast: false
SpaceAfterLogicalNot: false
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
SpaceBeforeCpp11BracedList: false
SpaceBeforeCtorInitializerColon: true
SpaceBeforeInheritanceColon: true
SpaceBeforeParens: ControlStatements
SpaceBeforeRangeBasedForLoopColon: true
SpaceInEmptyBlock: false
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 1
SpacesInAngles: false
SpacesInConditionalStatement: false
SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
SpaceBeforeSquareBrackets: false
Standard: Latest
StatementMacros:
- MARGO_REGISTER
- MARGO_REGISTER_PROVIDER
- DEFINE_MARGO_RPC_HANDLER
- DECLARE_MARGO_RPC_HANDLER
- MERCURY_GEN_PROC
TabWidth: 4
UseCRLF: false
UseTab: Never
WhitespaceSensitiveMacros:
- MERCURY_GEN_PROC
...
......@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.67])
AC_INIT([abt-io], [0.4], [],[],[])
AC_INIT([abt-io], [0.5], [],[],[])
AC_CONFIG_MACRO_DIRS([m4])
LT_INIT
......@@ -50,11 +50,11 @@ LIBS="$ARGOBOTS_LIBS $LIBS"
CPPFLAGS="$ARGOBOTS_CFLAGS $CPPFLAGS"
CFLAGS="$ARGOBOTS_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([MOCHICFG],[mochi-cfg],[],
[AC_MSG_ERROR([Could not find working mochi-cfg installation!])])
LIBS="$MOCHICFG_LIBS $LIBS"
CPPFLAGS="$MOCHICFG_CFLAGS $CPPFLAGS"
CFLAGS="$MOCHICFG_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([JSONC],[json-c],[],
[AC_MSG_ERROR([Could not find working json-c installation!])])
LIBS="$JSONC_LIBS $LIBS"
CPPFLAGS="$JSONC_CFLAGS $CPPFLAGS"
CFLAGS="$JSONC_CFLAGS -DMARGO_USE_JSON_C $CFLAGS"
AC_MSG_CHECKING([for fallocate])
AC_TRY_COMPILE([
......
#define _GNU_SOURCE
#define _GNU_SOURCE
#include <stdio.h>
#include <assert.h>
......@@ -18,54 +18,56 @@
#include <abt-io.h>
#ifndef HAVE_ODIRECT
#define O_DIRECT 0
#define O_DIRECT 0
#endif
struct worker_ult_common
{
int opt_io;
int opt_compute;
int opt_abt_io;
int opt_unit_size;
int opt_num_units;
struct worker_ult_common {
int opt_io;
int opt_compute;
int opt_abt_io;
int opt_unit_size;
int opt_num_units;
abt_io_instance_id aid;
ABT_cond cond;
ABT_mutex mutex;
int completed;
int inflight_threads;
ABT_cond cond;
ABT_mutex mutex;
int completed;
int inflight_threads;
};
struct worker_ult_arg
{
struct worker_ult_common *common;
void* buffer;
struct worker_ult_arg {
struct worker_ult_common* common;
void* buffer;
};
static void worker_ult(void *_arg);
static void worker_ult(void* _arg);
static double wtime(void);
static int ABT_xstream_create_helper(int num_xstreams, ABT_pool *newpool, ABT_xstream *newxstreams);
static int ABT_xstream_create_helper(int num_xstreams,
ABT_pool* newpool,
ABT_xstream* newxstreams);
int main(int argc, char **argv)
int main(int argc, char** argv)
{
int ret;
double seconds;
double end, start;
int i;
ABT_xstream *io_xstreams;
ABT_pool io_pool;
ABT_xstream *compute_xstreams;
ABT_pool compute_pool;
int io_es_count = -1;
int compute_es_count = -1;
struct worker_ult_arg *arg_array;
int ret;
double seconds;
double end, start;
int i;
ABT_xstream* io_xstreams;
ABT_pool io_pool;
ABT_xstream* compute_xstreams;
ABT_pool compute_pool;
int io_es_count = -1;
int compute_es_count = -1;
struct worker_ult_arg* arg_array;
struct worker_ult_common common;
ABT_sched self_sched;
ABT_xstream self_xstream;
if(argc != 9)
{
fprintf(stderr, "Usage: abt-io-overlap <compute> <io> <abt_io 0|1> <unit_size> <num_units> <compute_es_count> <io_es_count> <inflight_threads>\n");
return(-1);
ABT_sched self_sched;
ABT_xstream self_xstream;
if (argc != 9) {
fprintf(stderr,
"Usage: abt-io-overlap <compute> <io> <abt_io 0|1> <unit_size> "
"<num_units> <compute_es_count> <io_es_count> "
"<inflight_threads>\n");
return (-1);
}
ret = sscanf(argv[1], "%d", &common.opt_compute);
......@@ -98,23 +100,26 @@ int main(int argc, char **argv)
/* set caller (self) ES to sleep when idle by using SCHED_BASIC_WAIT */
ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 0, NULL,
ABT_SCHED_CONFIG_NULL, &self_sched);
ABT_SCHED_CONFIG_NULL, &self_sched);
assert(ret == ABT_SUCCESS);
ret = ABT_xstream_self(&self_xstream);
assert(ret == ABT_SUCCESS);
ret = ABT_xstream_set_main_sched(self_xstream, self_sched);
assert(ret == ABT_SUCCESS);
ret = ABT_xstream_create_helper(compute_es_count, &compute_pool, compute_xstreams);
ret = ABT_xstream_create_helper(compute_es_count, &compute_pool,
compute_xstreams);
assert(ret == 0);
if(common.opt_abt_io)
{
if (common.opt_abt_io) {
struct abt_io_init_info info = {0};
ret = ABT_xstream_create_helper(io_es_count, &io_pool, io_xstreams);
assert(ret == 0);
/* initialize abt_io */
common.aid = abt_io_init_pool(io_pool);
info.progress_pool = io_pool;
common.aid = abt_io_init_ext(&info);
assert(common.aid != NULL);
}
......@@ -122,11 +127,10 @@ int main(int argc, char **argv)
ABT_mutex_create(&common.mutex);
common.completed = 0;
arg_array = malloc(sizeof(*arg_array)*common.opt_num_units);
arg_array = malloc(sizeof(*arg_array) * common.opt_num_units);
assert(arg_array);
for(i=0; i<common.opt_num_units; i++)
{
for (i = 0; i < common.opt_num_units; i++) {
arg_array[i].common = &common;
ret = posix_memalign(&arg_array[i].buffer, 4096, common.opt_unit_size);
assert(ret == 0);
......@@ -134,96 +138,93 @@ int main(int argc, char **argv)
}
start = wtime();
for(i=0; i<common.opt_num_units; i++)
{
for (i = 0; i < common.opt_num_units; i++) {
ABT_mutex_lock(common.mutex);
while((i + 1 - common.completed) >= common.inflight_threads)
while ((i + 1 - common.completed) >= common.inflight_threads)
ABT_cond_wait(common.cond, common.mutex);
ABT_mutex_unlock(common.mutex);
/* create ULTs */
ret = ABT_thread_create(compute_pool, worker_ult, &arg_array[i], ABT_THREAD_ATTR_NULL, NULL);
ret = ABT_thread_create(compute_pool, worker_ult, &arg_array[i],
ABT_THREAD_ATTR_NULL, NULL);
assert(ret == 0);
}
ABT_mutex_lock(common.mutex);
while(common.completed < common.opt_num_units)
while (common.completed < common.opt_num_units)
ABT_cond_wait(common.cond, common.mutex);
ABT_mutex_unlock(common.mutex);
end = wtime();
seconds = end-start;
seconds = end - start;
/* wait on the compute ESs to complete */
for(i=0; i<compute_es_count; i++)
{
for (i = 0; i < compute_es_count; i++) {
ABT_xstream_join(compute_xstreams[i]);
ABT_xstream_free(&compute_xstreams[i]);
}
if(common.opt_abt_io)
{
if (common.opt_abt_io) {
abt_io_finalize(common.aid);
/* wait on IO ESs to complete */
for(i=0; i<io_es_count; i++)
{
for (i = 0; i < io_es_count; i++) {
ABT_xstream_join(io_xstreams[i]);
ABT_xstream_free(&io_xstreams[i]);
}
}
ABT_cond_free(&common.cond);
ABT_mutex_free(&common.mutex);
for(i=0; i<common.opt_num_units; i++)
free(arg_array[i].buffer);
for (i = 0; i < common.opt_num_units; i++) free(arg_array[i].buffer);
free(arg_array);
ABT_finalize();
free(io_xstreams);
free(compute_xstreams);
assert(common.opt_num_units == common.completed);
printf("#<opt_compute>\t<opt_io>\t<opt_abt_io>\t<opt_unit_size>\t<opt_num_units>\t<opt_compute_es_count>\t<opt_io_es_count>\t<time (s)>\t<bytes/s>\t<ops/s>\n");
printf("%d\t%d\t%d\t%d\t%d\t%d\t%d\t%f\t%f\t%f\n", common.opt_compute, common.opt_io, common.opt_abt_io,
common.opt_unit_size, common.opt_num_units, compute_es_count, io_es_count, seconds, ((double)common.opt_unit_size* (double)common.opt_num_units)/seconds, (double)common.opt_num_units/seconds);
return(0);
printf(
"#<opt_compute>\t<opt_io>\t<opt_abt_io>\t<opt_unit_size>\t<opt_num_"
"units>\t<opt_compute_es_count>\t<opt_io_es_count>\t<time "
"(s)>\t<bytes/s>\t<ops/s>\n");
printf("%d\t%d\t%d\t%d\t%d\t%d\t%d\t%f\t%f\t%f\n", common.opt_compute,
common.opt_io, common.opt_abt_io, common.opt_unit_size,
common.opt_num_units, compute_es_count, io_es_count, seconds,
((double)common.opt_unit_size * (double)common.opt_num_units)
/ seconds,
(double)common.opt_num_units / seconds);
return (0);
}
static void worker_ult(void *_arg)
static void worker_ult(void* _arg)
{
struct worker_ult_arg* arg = _arg;
struct worker_ult_common *common = arg->common;
void *buffer = arg->buffer;
size_t ret;
struct worker_ult_arg* arg = _arg;
struct worker_ult_common* common = arg->common;
void* buffer = arg->buffer;
size_t ret;
char template[256];
int fd;
if(common->opt_compute)
{
if (common->opt_compute) {
ret = RAND_bytes(buffer, common->opt_unit_size);
assert(ret == 1);
}
sprintf(template, "./data-XXXXXX");
if(common->opt_io)
{
if(common->opt_abt_io)
{
fd = abt_io_mkostemp(common->aid, template, O_DIRECT|O_SYNC);
if(fd < 0)
{
fprintf(stderr, "abt_io_mkostemp: %d\n", fd);
}
if (common->opt_io) {
if (common->opt_abt_io) {
fd = abt_io_mkostemp(common->aid, template, O_DIRECT | O_SYNC);
if (fd < 0) { fprintf(stderr, "abt_io_mkostemp: %d\n", fd); }
assert(fd >= 0);
ret = abt_io_pwrite(common->aid, fd, buffer, common->opt_unit_size, 0);
ret = abt_io_pwrite(common->aid, fd, buffer, common->opt_unit_size,
0);
assert(ret == common->opt_unit_size);
ret = abt_io_close(common->aid, fd);
......@@ -231,16 +232,13 @@ static void worker_ult(void *_arg)
ret = abt_io_unlink(common->aid, template);
assert(ret == 0);
}
else
{
} else {
#ifdef HAVE_MKOSTEMP
fd = mkostemp(template, O_DIRECT|O_SYNC);
fd = mkostemp(template, O_DIRECT | O_SYNC);
#else
fd = mkstemp(template);
#endif
if(fd < 0)
{
if (fd < 0) {
perror("mkostemp");
fprintf(stderr, "errno: %d\n", errno);
}
......@@ -269,28 +267,29 @@ static double wtime(void)
{
struct timeval t;
gettimeofday(&t, NULL);
return((double)t.tv_sec + (double)t.tv_usec / 1000000.0);
return ((double)t.tv_sec + (double)t.tv_usec / 1000000.0);
}
static int ABT_xstream_create_helper(int num_xstreams, ABT_pool *newpool, ABT_xstream *newxstreams)
static int ABT_xstream_create_helper(int num_xstreams,
ABT_pool* newpool,
ABT_xstream* newxstreams)
{
ABT_sched *scheds;
int i;
ABT_sched* scheds;
int i;
scheds = malloc(num_xstreams * sizeof(*scheds));
if(!scheds)
return(-1);
if (!scheds) return (-1);
/* Create a shared pool */
ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
ABT_TRUE, newpool);
ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE,
newpool);
/* create schedulers */
for (i = 0; i < num_xstreams; i++) {
ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, newpool,
ABT_SCHED_CONFIG_NULL, &scheds[i]);
ABT_SCHED_CONFIG_NULL, &scheds[i]);
}
/* create ESs */
for (i = 0; i < num_xstreams; i++) {
ABT_xstream_create(scheds[i], &newxstreams[i]);
......@@ -298,6 +297,6 @@ static int ABT_xstream_create_helper(int num_xstreams, ABT_pool *newpool, ABT_xs
free(scheds);
return(0);
return (0);
}
This diff is collapsed.
#define _GNU_SOURCE
#define _GNU_SOURCE
#include <stdio.h>
#include <assert.h>
......@@ -17,45 +17,44 @@
#include "abt-io-config.h"
#ifndef HAVE_ODIRECT
#define O_DIRECT 0
#define O_DIRECT 0
#endif
struct worker_pthread_common
{
int opt_io;
int opt_compute;
int opt_unit_size;
int opt_num_units;
pthread_cond_t cond;
struct worker_pthread_common {
int opt_io;
int opt_compute;
int opt_unit_size;
int opt_num_units;
pthread_cond_t cond;
pthread_mutex_t mutex;
int completed;
int inflight_threads;
int completed;
int inflight_threads;
};
struct worker_pthread_arg
{
struct worker_pthread_common *common;
void* buffer;
struct worker_pthread_arg {
struct worker_pthread_common* common;
void* buffer;
};
static void *worker_pthread(void *_arg);
static void* worker_pthread(void* _arg);
static double wtime(void);
int main(int argc, char **argv)
int main(int argc, char** argv)
{
int ret;
double seconds;
double end, start;
int i;
struct worker_pthread_arg *arg_array;
int ret;
double seconds;
double end, start;
int i;
struct worker_pthread_arg* arg_array;
struct worker_pthread_common common;
pthread_attr_t attr;
pthread_t tid;
if(argc != 6)
{
fprintf(stderr, "Usage: pthread-overlap <compute> <io> <unit_size> <num_units> <inflight_threads>\n");
return(-1);
pthread_attr_t attr;
pthread_t tid;
if (argc != 6) {
fprintf(stderr,
"Usage: pthread-overlap <compute> <io> <unit_size> <num_units> "
"<inflight_threads>\n");
return (-1);
}
ret = sscanf(argv[1], "%d", &common.opt_compute);
......@@ -74,11 +73,10 @@ int main(int argc, char **argv)
pthread_mutex_init(&common.mutex, NULL);
common.completed = 0;
arg_array = malloc(sizeof(*arg_array)*common.opt_num_units);
arg_array = malloc(sizeof(*arg_array) * common.opt_num_units);
assert(arg_array);
for(i=0; i<common.opt_num_units; i++)
{
for (i = 0; i < common.opt_num_units; i++) {
arg_array[i].common = &common;
ret = posix_memalign(&arg_array[i].buffer, 4096, common.opt_unit_size);
assert(ret == 0);
......@@ -90,10 +88,9 @@ int main(int argc, char **argv)
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(i=0; i<common.opt_num_units; i++)
{
for (i = 0; i < common.opt_num_units; i++) {
pthread_mutex_lock(&common.mutex);
while((i + 1 - common.completed) >= common.inflight_threads)
while ((i + 1 - common.completed) >= common.inflight_threads)
pthread_cond_wait(&common.cond, &common.mutex);
pthread_mutex_unlock(&common.mutex);
......@@ -103,55 +100,56 @@ int main(int argc, char **argv)