Commit f9535643 authored by Sangmin Seo, committed by Rob Latham

Add more nonblocking collective I/O tests.



Ported the test programs that use collective I/O in the ROMIO test
directory to nonblocking collective I/O. For now the ported tests live
in the MPICH test directory so that Jenkins and the nightly tests run
them; they may be moved back to the ROMIO test directory later.
Signed-off-by: Rob Latham <robl@mcs.anl.gov>
parent 96d8f4e9
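
Each port follows the same mechanical pattern: the blocking collective call becomes an MPIX_-prefixed initiation call paired with an MPI_Wait at the point where completion is needed. A minimal standalone sketch of the transformation (a toy program, not one of the ported tests; MPIX_ is the extension prefix these calls carried before they were standardized):

    #include <mpi.h>

    int main(int argc, char **argv)
    {
        char buf[16] = "hello";
        int rank;
        MPI_File fh;
        MPI_Request req;
        MPI_Status status;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_File_open(MPI_COMM_WORLD, "testfile",
                      MPI_MODE_WRONLY | MPI_MODE_CREATE, MPI_INFO_NULL, &fh);
        /* was: MPI_File_write_at_all(fh, off, buf, 16, MPI_CHAR, &status); */
        MPIX_File_iwrite_at_all(fh, (MPI_Offset)rank * 16, buf, 16,
                                MPI_CHAR, &req);
        /* ... other work can overlap here ... */
        MPI_Wait(&req, &status);  /* completion replaces the blocking return */
        MPI_File_close(&fh);
        MPI_Finalize();
        return 0;
    }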
@@ -28,11 +28,19 @@ noinst_PROGRAMS = \
     hindexed_io
 
 if BUILD_MPIX_TESTS
-noinst_PROGRAMS += \
-    i_bigtype \
-    i_hindexed_io \
-    i_rdwrord \
-    i_setviewcur
+noinst_PROGRAMS += \
+    i_bigtype \
+    i_hindexed_io \
+    i_rdwrord \
+    i_setviewcur \
+    i_aggregation1 \
+    i_aggregation2 \
+    i_coll_test \
+    i_darray_read \
+    i_hindexed \
+    i_noncontig_coll \
+    i_noncontig_coll2 \
+    i_types_with_zeros
 endif
 
 clean-local:
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
/* Test case from John Bent (ROMIO req #835)
 * Aggregation code was not handling certain access patterns when collective
 * buffering was forced */
/* Uses nonblocking collective I/O. */
#include <unistd.h>
#include <stdlib.h>
#include <mpi.h>
#include <stdio.h>
#include <string.h>
#define NUM_OBJS 4
#define OBJ_SIZE 1048576
extern char *optarg;
extern int optind, opterr, optopt;
char *prog = NULL;
int debug = 0;
static void Usage(int line)
{
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
fprintf(stderr,
"Usage (line %d): %s [-d] [-h] -f filename\n"
"\t-d for debugging\n"
"\t-h to turn on the hints to force collective aggregation\n",
line, prog);
}
exit(0);
}
static void fatal_error(int mpi_ret, MPI_Status *mpi_stat, const char *msg)
{
fprintf(stderr, "Fatal error %s: %d\n", msg, mpi_ret);
MPI_Abort(MPI_COMM_WORLD, -1);
}
static void print_hints(int rank, MPI_File *mfh)
{
MPI_Info info;
int nkeys;
int i, dummy_int;
char key[1024];
char value[1024];
MPI_Barrier(MPI_COMM_WORLD);
if (rank == 0) {
MPI_File_get_info(*mfh, &info);
MPI_Info_get_nkeys(info, &nkeys);
printf("HINTS:\n");
for (i = 0; i < nkeys; i++) {
MPI_Info_get_nthkey(info, i, key);
printf("%35s -> ", key);
MPI_Info_get(info, key, 1024, value, &dummy_int);
printf("%s\n", value);
}
MPI_Info_free(&info);
}
MPI_Barrier(MPI_COMM_WORLD);
}
static void fill_buffer(char *buffer, int bufsize, int rank, MPI_Offset offset)
{
memset((void *)buffer, 0, bufsize);
snprintf(buffer, bufsize, "Hello from %d at %lld\n", rank, (long long)offset);
}
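/* Each rank owns num_objs objects of obj_size bytes, laid out contiguously
 * by rank: object which_obj of a rank starts at
 * (rank * num_objs + which_obj) * obj_size. */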
static MPI_Offset get_offset(int rank, int num_objs, int obj_size, int which_obj)
{
MPI_Offset offset;
offset = (MPI_Offset)rank * num_objs * obj_size + which_obj * obj_size;
return offset;
}
static void write_file(char *target, int rank, MPI_Info *info)
{
MPI_File wfh;
MPI_Request *request;
MPI_Status *mpi_stat;
int mpi_ret;
int i;
char **buffer;
request = (MPI_Request *)malloc(NUM_OBJS * sizeof(MPI_Request));
mpi_stat = (MPI_Status *)malloc(NUM_OBJS * sizeof(MPI_Status));
buffer = (char **)malloc(NUM_OBJS * sizeof(char *));
if (debug) printf("%d writing file %s\n", rank, target);
if ((mpi_ret = MPI_File_open(MPI_COMM_WORLD, target,
MPI_MODE_WRONLY | MPI_MODE_CREATE,
*info, &wfh))
!= MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "open for write");
}
/* nonblocking collective write */
for (i = 0; i < NUM_OBJS; i++) {
MPI_Offset offset = get_offset(rank, NUM_OBJS, OBJ_SIZE, i);
buffer[i] = (char *)malloc(OBJ_SIZE);
fill_buffer(buffer[i], OBJ_SIZE, rank, offset);
if (debug) printf("%s", buffer[i]);
if ((mpi_ret = MPIX_File_iwrite_at_all(wfh, offset, buffer[i], OBJ_SIZE,
MPI_CHAR, &request[i]))
!= MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "write");
}
}
if (debug) print_hints(rank, &wfh);
MPI_Waitall(NUM_OBJS, request, mpi_stat);
if ((mpi_ret = MPI_File_close(&wfh)) != MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "close for write");
}
if (debug) printf("%d wrote file %s\n", rank, target);
for (i = 0; i < NUM_OBJS; i++) free(buffer[i]);
free(buffer);
free(mpi_stat);
free(request);
}
static int reduce_corruptions(int corrupt_blocks)
{
int mpi_ret;
int sum;
if ((mpi_ret = MPI_Reduce(&corrupt_blocks, &sum, 1, MPI_INT, MPI_SUM, 0,
MPI_COMM_WORLD)) != MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "MPI_Reduce");
}
return sum;
}
static void read_file(char *target, int rank, MPI_Info *info, int *corrupt_blocks)
{
MPI_File rfh;
MPI_Offset *offset;
MPI_Request *request;
MPI_Status *mpi_stat;
int mpi_ret;
int i;
char **buffer;
char **verify_buf = NULL;
offset = (MPI_Offset *)malloc(NUM_OBJS * sizeof(MPI_Offset));
request = (MPI_Request *)malloc(NUM_OBJS * sizeof(MPI_Request));
mpi_stat = (MPI_Status *)malloc(NUM_OBJS * sizeof(MPI_Status));
buffer = (char **)malloc(NUM_OBJS * sizeof(char *));
verify_buf = (char **)malloc(NUM_OBJS * sizeof(char *));
if (debug) printf("%d reading file %s\n", rank, target);
if ((mpi_ret = MPI_File_open(MPI_COMM_WORLD, target, MPI_MODE_RDONLY,
*info, &rfh)) != MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "open for read");
}
/* nonblocking collective read */
for (i = 0; i < NUM_OBJS; i++) {
offset[i] = get_offset(rank, NUM_OBJS, OBJ_SIZE, i);
buffer[i] = (char *)malloc(OBJ_SIZE);
verify_buf[i] = (char *)malloc(OBJ_SIZE);
fill_buffer(verify_buf[i], OBJ_SIZE, rank, offset[i]);
if (debug) printf("Expecting %s", verify_buf[i]);
if ((mpi_ret = MPIX_File_iread_at_all(rfh, offset[i], buffer[i],
OBJ_SIZE, MPI_CHAR, &request[i]))
!= MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "read");
}
}
MPI_Waitall(NUM_OBJS, request, mpi_stat);
/* verification */
for (i = 0; i < NUM_OBJS; i++) {
if (memcmp(verify_buf[i], buffer[i], OBJ_SIZE) != 0) {
(*corrupt_blocks)++;
printf("Corruption at %lld\n", offset[i]);
if (debug) {
printf("\tExpecting %s\n" "\tRecieved %s\n",
verify_buf[i], buffer[i]);
}
}
}
if ((mpi_ret = MPI_File_close(&rfh)) != MPI_SUCCESS) {
fatal_error(mpi_ret, NULL, "close for read");
}
for (i = 0; i < NUM_OBJS; i++) {
free(verify_buf[i]);
free(buffer[i]);
}
free(verify_buf);
free(buffer);
free(mpi_stat);
free(request);
free(offset);
}
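/* Force all I/O through collective buffering on a single aggregator:
 * romio_cb_write=enable turns collective buffering on unconditionally,
 * romio_no_indep_rw=1 promises that only collective I/O follows, and
 * cb_nodes/cb_buffer_size pin one aggregator with a 4 MiB buffer. */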
static void set_hints(MPI_Info *info)
{
MPI_Info_set(*info, "romio_cb_write", "enable");
MPI_Info_set(*info, "romio_no_indep_rw", "1");
MPI_Info_set(*info, "cb_nodes", "1");
MPI_Info_set(*info, "cb_buffer_size", "4194304");
}
/*
void
set_hints(MPI_Info *info, char *hints) {
char *delimiter = " ";
char *hints_cp = strdup(hints);
char *key = strtok(hints_cp, delimiter);
char *val;
while (key) {
val = strtok(NULL, delimiter);
if (debug) printf("HINT: %s = %s\n", key, val);
if (! val) {
Usage(__LINE__);
}
MPI_Info_set(*info, key, val);
key = strtok(NULL, delimiter);
}
free(hints_cp);
}
*/
int main(int argc, char *argv[])
{
int nproc = 1, rank = 0;
char *target = NULL;
int c;
MPI_Info info;
int mpi_ret;
int corrupt_blocks = 0;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if ((mpi_ret = MPI_Info_create(&info)) != MPI_SUCCESS) {
if (rank == 0) fatal_error(mpi_ret, NULL, "MPI_Info_create");
}
prog = strdup(argv[0]);
if (argc > 1) {
while ((c = getopt(argc, argv, "df:h")) != EOF) {
switch (c) {
case 'd':
debug = 1;
break;
case 'f':
target = strdup(optarg);
break;
case 'h':
set_hints(&info);
break;
default:
Usage(__LINE__);
}
}
if (!target) {
Usage(__LINE__);
}
} else {
target = "testfile";
set_hints(&info);
}
write_file(target, rank, &info);
read_file(target, rank, &info, &corrupt_blocks);
corrupt_blocks = reduce_corruptions(corrupt_blocks);
if (rank == 0) {
if (corrupt_blocks == 0) {
fprintf(stdout, " No Errors\n");
}
else {
fprintf(stdout, "%d/%d blocks corrupt\n", corrupt_blocks,
nproc * NUM_OBJS);
}
}
MPI_Info_free(&info);
MPI_Finalize();
free(prog);
exit(0);
}
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
/* Look for regressions in the aggregator code. A simpler access pattern
 * than aggregation1 */
/* Uses nonblocking collective I/O. */
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#define BUFSIZE 512
static void handle_error(int errcode, const char *str)
{
char msg[MPI_MAX_ERROR_STRING];
int resultlen;
MPI_Error_string(errcode, msg, &resultlen);
fprintf(stderr, "%s: %s\n", str, msg);
MPI_Abort(MPI_COMM_WORLD, 1);
}
int main(int argc, char **argv)
{
MPI_Info info = MPI_INFO_NULL;
MPI_File fh;
MPI_Offset off = 0;
MPI_Status status;
int errcode;
int i, rank, errs = 0, toterrs, buffer[BUFSIZE], buf2[BUFSIZE];
MPI_Request request;
char *filename = NULL;
filename = (argc > 1) ? argv[1] : "testfile";
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Info_create(&info);
MPI_Info_set(info, "romio_cb_write", "enable");
MPI_Info_set(info, "cb_nodes", "1");
for (i = 0; i < BUFSIZE; i++) {
buffer[i] = 10000 + rank;
}
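/* each rank writes one contiguous block of BUFSIZE ints at a rank-strided
 * offset */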
off = rank * sizeof(buffer);
errcode = MPI_File_open(MPI_COMM_WORLD, filename,
MPI_MODE_WRONLY | MPI_MODE_CREATE, info, &fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_open");
errcode = MPIX_File_iwrite_at_all(fh, off, buffer, BUFSIZE, MPI_INT,
&request);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPIX_File_iwrite_at_all");
MPI_Wait(&request, &status);
errcode = MPI_File_close(&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_close");
errcode = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY, info,
&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_open");
errcode = MPIX_File_iread_at_all(fh, off, buf2, BUFSIZE, MPI_INT,
&request);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPIX_File_iread_at_all");
MPI_Wait(&request, &status);
errcode = MPI_File_close(&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_close");
for (i = 0; i < BUFSIZE; i++) {
if (buf2[i] != 10000 + rank)
errs++;
}
MPI_Allreduce(&errs, &toterrs, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
if (rank == 0) {
if (toterrs > 0) {
fprintf(stderr, "Found %d errors\n", toterrs);
}
else {
fprintf(stdout, " No Errors\n");
}
}
MPI_Info_free(&info);
MPI_Finalize();
return 0;
}
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpi.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
/* A 32^3 array. For other array sizes, change array_of_gsizes below. */
/* Uses nonblocking collective I/O. Writes a 3D block-distributed array to
a file corresponding to the global array in row-major (C) order, reads it
back, and checks that the data read is correct. */
/* Note that the file access pattern is noncontiguous. */
void handle_error(int errcode, const char *str);
void handle_error(int errcode, const char *str)
{
char msg[MPI_MAX_ERROR_STRING];
int resultlen;
MPI_Error_string(errcode, msg, &resultlen);
fprintf(stderr, "%s: %s\n", str, msg);
MPI_Abort(MPI_COMM_WORLD, 1);
}
int main(int argc, char **argv)
{
MPI_Datatype newtype;
int i, ndims, array_of_gsizes[3], array_of_distribs[3];
int order, nprocs, j, len;
int array_of_dargs[3], array_of_psizes[3];
int *readbuf, *writebuf, mynod, *tmpbuf, array_size;
MPI_Count bufcount;
char *filename;
int errs = 0, toterrs;
MPI_File fh;
MPI_Status status;
MPI_Request request;
MPI_Info info = MPI_INFO_NULL;
int errcode;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mynod);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
/* process 0 broadcasts the file name to other processes */
if (!mynod) {
filename = "testfile";
len = strlen(filename);
MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(filename, len + 1, MPI_CHAR, 0, MPI_COMM_WORLD);
}
else {
MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
filename = (char *)malloc(len + 1);
MPI_Bcast(filename, len + 1, MPI_CHAR, 0, MPI_COMM_WORLD);
}
/* create the distributed array filetype */
ndims = 3;
order = MPI_ORDER_C;
array_of_gsizes[0] = 32;
array_of_gsizes[1] = 32;
array_of_gsizes[2] = 32;
array_of_distribs[0] = MPI_DISTRIBUTE_BLOCK;
array_of_distribs[1] = MPI_DISTRIBUTE_BLOCK;
array_of_distribs[2] = MPI_DISTRIBUTE_BLOCK;
array_of_dargs[0] = MPI_DISTRIBUTE_DFLT_DARG;
array_of_dargs[1] = MPI_DISTRIBUTE_DFLT_DARG;
array_of_dargs[2] = MPI_DISTRIBUTE_DFLT_DARG;
for (i = 0; i < ndims; i++) array_of_psizes[i] = 0;
MPI_Dims_create(nprocs, ndims, array_of_psizes);
MPI_Type_create_darray(nprocs, mynod, ndims, array_of_gsizes,
array_of_distribs, array_of_dargs,
array_of_psizes, order, MPI_INT, &newtype);
MPI_Type_commit(&newtype);
/* initialize writebuf */
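/* MPI_Type_size_x reports the darray type's size in bytes as an MPI_Count;
 * dividing by sizeof(int) gives the number of local array elements. */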
MPI_Type_size_x(newtype, &bufcount);
bufcount = bufcount / sizeof(int);
writebuf = (int *)malloc(bufcount * sizeof(int));
for (i = 0; i < bufcount; i++) writebuf[i] = 1;
array_size = array_of_gsizes[0] * array_of_gsizes[1] * array_of_gsizes[2];
tmpbuf = (int *) calloc(array_size, sizeof(int));
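/* Self-send trick: writebuf goes out typed as contiguous ints and is
 * received into tmpbuf through the darray type, so the nonzero entries of
 * tmpbuf mark exactly the global indices this process owns. */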
MPI_Irecv(tmpbuf, 1, newtype, mynod, 10, MPI_COMM_WORLD, &request);
MPI_Send(writebuf, bufcount, MPI_INT, mynod, 10, MPI_COMM_WORLD);
MPI_Wait(&request, &status);
j = 0;
for (i = 0; i < array_size; i++)
if (tmpbuf[i]) {
writebuf[j] = i;
j++;
}
free(tmpbuf);
if (j != bufcount) {
fprintf(stderr, "Error in initializing writebuf on process %d\n",
mynod);
MPI_Abort(MPI_COMM_WORLD, 1);
}
/* end of initialization */
/* write the array to the file */
errcode = MPI_File_open(MPI_COMM_WORLD, filename,
MPI_MODE_CREATE | MPI_MODE_RDWR, info, &fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_open");
errcode = MPI_File_set_view(fh, 0, MPI_INT, newtype, "native", info);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_set_view");
errcode = MPIX_File_iwrite_all(fh, writebuf, bufcount, MPI_INT, &request);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPIX_File_iwrite_all");
MPI_Wait(&request, &status);
errcode = MPI_File_close(&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_close");
if (!mynod) {
/* wkl suggests potential for false " No Errors" if both read
* and write use the same file view */
/* solution: rank 0 reads entire file and checks write values */
errcode = MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_RDONLY, info,
&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_open");
readbuf = (int *)malloc(array_size * sizeof(int));
errcode = MPI_File_read(fh, readbuf, array_size, MPI_INT, &status);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_read");
errcode = MPI_File_close(&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_close");
for (i = 0; i < array_size; i++)
if (readbuf[i] != i) {
errs++;
fprintf(stderr, "Error: write integer %d but read %d\n",
i, readbuf[i]);
break;
}
free(readbuf);
}
MPI_Barrier(MPI_COMM_WORLD);
/* now read it back */
readbuf = (int *)malloc(bufcount * sizeof(int));
errcode = MPI_File_open(MPI_COMM_WORLD, filename,
MPI_MODE_CREATE | MPI_MODE_RDWR, info, &fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_open");
errcode = MPI_File_set_view(fh, 0, MPI_INT, newtype, "native", info);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_set_view");
errcode = MPIX_File_iread_all(fh, readbuf, bufcount, MPI_INT, &request);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPIX_File_iread_all");
MPI_Wait(&request, &status);
errcode = MPI_File_close(&fh);
if (errcode != MPI_SUCCESS) handle_error(errcode, "MPI_File_close");
/* check the data read */
for (i = 0; i < bufcount; i++) {
if (readbuf[i] != writebuf[i]) {
errs++;
fprintf(stderr, "Process %d, readbuf %d, writebuf %d, i %d\n",
mynod, readbuf[i], writebuf[i], i);
}
}