Commit 0a437100 authored by Rob Latham's avatar Rob Latham
Browse files

Two-phase I/O with threaded write

Experimental async-with-pthread I/O approach to hiding some of the I/O
latency/variability from the two-phase collectives.

heavily modified from Paul Coffman's (pkcoffman@us.ibm.com) original work
parent d1e292ca
......@@ -88,6 +88,8 @@ extern void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
ADIO_Offset *end_offset_ptr, int
*contig_access_count_ptr);
void ADIOI_BG_ReadStridedColl(ADIO_File fd, void *buf, int count,
MPI_Datatype datatype, int file_ptr_type,
ADIO_Offset offset, ADIO_Status *status, int
......
......@@ -35,6 +35,7 @@ int bgmpio_tunegather;
int bgmpio_tuneblocking;
long bglocklessmpio_f_type;
int bgmpio_bg_nagg_pset;
int bgmpio_pthreadio;
double bgmpio_prof_cw [BGMPIO_CIO_LAST];
double bgmpio_prof_cr [BGMPIO_CIO_LAST];
......@@ -81,7 +82,17 @@ double bgmpio_prof_cr [BGMPIO_CIO_LAST];
* - any integer
* - Default is 8
*
* - BGMPIO_PTHREADIO - Enables a very simple form of asyncronous io where a
* pthread is spawned to do the posix writes while the main thread does the
* data aggregation - useful for large files where multiple rounds are
* required (more that the cb_buffer_size of data per aggregator). User
* must ensure there is hw resource available for the thread to run. I
* am sure there is a better way to do this involving comm threads - this is
* just a start. NOTE: For some reason the stats collected when this is
* enabled misses some of the data so the data sizes are off a bit - this is
* a statistical issue only, the data is still accurately written out
*/
void ad_bg_get_env_vars() {
char *x, *dummy;
......@@ -108,6 +119,12 @@ void ad_bg_get_env_vars() {
bgmpio_bg_nagg_pset = ADIOI_BG_NAGG_PSET_DFLT;
x = getenv("BGMPIO_NAGG_PSET");
if (x) bgmpio_bg_nagg_pset = atoi(x);
bgmpio_pthreadio = 0;
x = getenv( "BGMPIO_PTHREADIO" );
if (x) bgmpio_pthreadio = atoi(x);
}
/* report timing breakdown for MPI I/O collective call */
......
......@@ -65,6 +65,7 @@ extern int bgmpio_comm;
extern int bgmpio_tunegather;
extern int bgmpio_tuneblocking;
extern long bglocklessmpio_f_type;
extern int bgmpio_pthreadio;
/* Default is, well, kind of complicated. Blue Gene /L and /P had "psets": one
* i/o node and all compute nodes wired to it. On Blue Gene /Q that
......
......@@ -26,6 +26,8 @@
#include "mpe.h"
#endif
#include <pthread.h>
/* prototypes of functions used for collective writes only. */
static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
datatype, int nprocs, int myrank, ADIOI_Access
......@@ -428,7 +430,7 @@ static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
ADIO_Offset size=0;
int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
char *write_buf=NULL;
char *write_buf=NULL, *write_buf2=NULL;
int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
int *partial_recv, *sent_to_proc, *start_pos, flag;
int *send_buf_idx, *curr_to_proc, *done_to_proc;
......@@ -438,6 +440,9 @@ static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
int info_flag, coll_bufsize;
char *value;
static char myname[] = "ADIOI_EXCH_AND_WRITE";
pthread_t io_thread;
void *thread_ret;
ADIOI_IO_ThreadFuncData io_thread_args;
*error_code = MPI_SUCCESS; /* changed below if error */
/* only I/O errors are currently reported */
......@@ -452,6 +457,11 @@ static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
coll_bufsize = atoi(value);
ADIOI_Free(value);
if (bgmpio_pthreadio == 1){
/* ROMIO will spawn an additional thread. both threads use separate
* halves of the collective buffer*/
coll_bufsize = coll_bufsize/2;
}
for (i=0; i < nprocs; i++) {
if (others_req[i].count) {
......@@ -480,6 +490,9 @@ static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
fd->comm);
write_buf = fd->io_buf;
if (bgmpio_pthreadio == 1) {
write_buf2 = fd->io_buf + coll_bufsize;
}
curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
/* its use is explained below. calloc initializes to 0. */
......@@ -536,6 +549,9 @@ static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
done = 0;
off = st_loc;
if(bgmpio_pthreadio == 1)
io_thread = pthread_self();
#ifdef PROFILE
MPE_Log_event(14, 0, "end computation");
#endif
......@@ -658,14 +674,48 @@ static void ADIOI_Exch_and_write(ADIO_File fd, const void *buf, MPI_Datatype
if (flag) {
ADIOI_Assert(size == (int)size);
ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE, ADIO_EXPLICIT_OFFSET,
off, &status, error_code);
if (*error_code != MPI_SUCCESS) return;
if (bgmpio_pthreadio == 1) {
/* there is no such thing as "invalid pthread identifier", so
* we'll use pthread_self() instead. Before we do I/O we want
* to complete I/O from any previous iteration -- but only a
* previous iteration that had I/O work to do (i.e. set 'flag')
*/
if(!pthread_equal(io_thread, pthread_self())) {
pthread_join(io_thread, &thread_ret);
*error_code = *(int *)thread_ret;
if (*error_code != MPI_SUCCESS) return;
io_thread = pthread_self();
}
io_thread_args.fd = fd;
/* do a little pointer shuffling: background I/O works from one
* buffer while two-phase machinery fills up another */
io_thread_args.buf = write_buf;
ADIOI_SWAP(write_buf, write_buf2, char*);
io_thread_args.io_kind = ADIOI_WRITE;
io_thread_args.size = size;
io_thread_args.offset = off;
io_thread_args.status = status;
io_thread_args.error_code = *error_code;
if ( (pthread_create(&io_thread, NULL,
ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
io_thread = pthread_self();
} else {
ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE,
ADIO_EXPLICIT_OFFSET, off, &status, error_code);
if (*error_code != MPI_SUCCESS) return;
}
}
off += size;
done += size;
}
if (bgmpio_pthreadio == 1) {
if ( !pthread_equal(io_thread, pthread_self()) ) {
pthread_join(io_thread, &thread_ret);
*error_code = *(int *)thread_ret;
}
}
for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
#ifdef PROFILE
......
......@@ -66,5 +66,6 @@ romio_other_sources += \
adio/common/status_setb.c \
adio/common/strfns.c \
adio/common/system_hints.c \
adio/common/hint_fns.c
adio/common/hint_fns.c \
adio/common/ad_threaded_io.c
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* Copyright (C) 1997-2001 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*/
#include "adio.h"
#include "adio_extern.h"
/* Function for running in another thread for doing the file reading while the
* main thread is doing data aggregation - useful only when multiple rounds are
* needed due to file size relative to the read buffer size and number of
* aggregators */
void *ADIOI_IO_Thread_Func(void *vptr_args) {
ADIOI_IO_ThreadFuncData *args = (ADIOI_IO_ThreadFuncData*)vptr_args;
ADIOI_Assert(args->size == (int)(args->size));
if (args->io_kind == ADIOI_READ) {
ADIO_ReadContig(args->fd, args->buf, args->size, MPI_BYTE,
ADIO_EXPLICIT_OFFSET, args->offset,
&(args->status), &(args->error_code));
} else {
ADIO_WriteContig(args->fd, args->buf, args->size, MPI_BYTE,
ADIO_EXPLICIT_OFFSET, args->offset,
&(args->status), &(args->error_code));
}
pthread_exit(&(args->error_code));
return NULL;
}
......@@ -197,6 +197,9 @@ struct ADIOI_Fns_struct {
#define ADIOI_MIN(a, b) ((a) < (b) ? (a) : (b))
#define ADIOI_MAX(a, b) ((a) > (b) ? (a) : (b))
/* thanks stackoverflow:
* http://stackoverflow.com/questions/3982348/implement-generic-swap-macro-in-c */
#define ADIOI_SWAP(x, y, T) do { T temp##x##y = x; x = y; y = temp##x##y; } while (0);
#define ADIOI_PREALLOC_BUFSZ 16777216 /* buffer size used to
preallocate disk space */
......@@ -859,5 +862,23 @@ if (MPIR_Ext_dbg_romio_typical_enabled) fprintf
#define DBG_FPRINTF if (0) fprintf
#define DBGV_FPRINTF if (0) fprintf
#endif
/* declarations for threaded I/O */
/* i/o thread data structure (bgmpio_pthreadwc) */
typedef struct wcThreadFuncData {
ADIO_File fd;
int io_kind;
char *buf;
int size;
ADIO_Offset offset;
ADIO_Status status;
int error_code;
} ADIOI_IO_ThreadFuncData;
void *ADIOI_IO_Thread_Func(void *vptr_args);
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment