Add a test for parallel reads of independent files using MPI subgroups

This commit is contained in:
Richard Warren 2017-10-09 16:47:21 -04:00
parent ceab5a5176
commit 837624b9cd
3 changed files with 159 additions and 65 deletions

View File

@ -29,29 +29,6 @@
#endif /* H5_HAVE_PARALLEL */
#ifdef H5_HAVE_PARALLEL
#if 0 /* delete this eventually */
#define H5FD_GET_MPI_RANK_AND_SIZE(rank,size, f) { \
(rank) = 0; (size) = 1; \
if (H5F_HAS_FEATURE((f), H5FD_FEAT_HAS_MPI)) { \
(rank) = H5F_mpi_get_rank((f)); \
(size) = H5F_mpi_get_size((f)); \
} else { \
int mpi_initialized = 0, mpi_finalized = 0; \
MPI_Initialized(&mpi_initialized); \
MPI_Finalized(&mpi_finalized); \
if (mpi_initialized && !mpi_finalized) { \
MPI_Comm_rank(MPI_COMM_WORLD, &(rank)); \
MPI_Comm_size(MPI_COMM_WORLD, &(size)); \
} \
}}
#define H5FD_GET_MPI_COMM(comm, f) { \
if (H5F_HAS_FEATURE((f), H5FD_FEAT_HAS_MPI)) \
(comm) = H5F_mpi_get_comm((f)); \
else (comm) = MPI_COMM_WORLD; \
}
#endif /* delete this eventually */
/*Turn on H5FDmpio_debug if H5F_DEBUG is on */
#ifdef H5F_DEBUG
#ifndef H5FDmpio_DEBUG

View File

@ -359,9 +359,6 @@ H5F__super_read(H5F_t *f, hid_t meta_dxpl_id, hid_t raw_dxpl_id, hbool_t initial
/* Find the superblock */
#ifdef H5_HAVE_PARALLEL
#if 0
H5FD_GET_MPI_RANK_AND_SIZE(mpi_rank, mpi_size, f);
#else
if(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI)) {
if((mpi_rank = H5F_mpi_get_rank(f)) < 0)
@ -370,9 +367,15 @@ H5F__super_read(H5F_t *f, hid_t meta_dxpl_id, hid_t raw_dxpl_id, hbool_t initial
if((mpi_size = H5F_mpi_get_size(f)) < 0)
HGOTO_ERROR(H5E_PAGEBUF, H5E_CANTGET, FAIL, "can't retrieve MPI communicator size")
}
#endif
/* If we are an MPI application with at least two processes, the
* following superblock signature location optimization is applicable.
*
* Note:: For parallel applications which don't setup for using the
* HDF5 MPIO driver, we will arrive here with mpi_size == 1.
* This occurs because of the variable initialization (above) and the
* fact that we have skipped actually calling MPI functions to determine
* our MPI rank and size.
*/
if ( mpi_size > 1 ) {
MPI_Comm this_comm = MPI_COMM_NULL;
@ -381,12 +384,6 @@ H5F__super_read(H5F_t *f, hid_t meta_dxpl_id, hid_t raw_dxpl_id, hbool_t initial
if(H5FD_locate_signature(&fdio_info, &super_addr) < 0)
HGOTO_ERROR(H5E_FILE, H5E_NOTHDF5, FAIL, "unable to locate file signature")
}
#if 0
H5FD_GET_MPI_COMM(this_comm, f);
if (( this_comm == MPI_COMM_NULL ) ||
( MPI_Bcast(&super_addr,sizeof(super_addr), MPI_BYTE, 0, this_comm) != MPI_SUCCESS))
HGOTO_ERROR(H5E_FILE, H5E_NOTHDF5, FAIL, "unable to locate file signature")
#else
HDassert(H5F_HAS_FEATURE(f, H5FD_FEAT_HAS_MPI));
if ( MPI_COMM_NULL == (this_comm = H5F_mpi_get_comm(f)) )
@ -395,7 +392,6 @@ H5F__super_read(H5F_t *f, hid_t meta_dxpl_id, hid_t raw_dxpl_id, hbool_t initial
if ( MPI_SUCCESS !=
(mpi_result = MPI_Bcast(&super_addr,sizeof(super_addr), MPI_BYTE, 0, this_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Bcast failed", mpi_result)
#endif
}
else {
/* Locate the signature as per per the serial library */

View File

@ -35,15 +35,36 @@ static const char *random_hdf5_text =
manual or go thru the tutorials!\n\
While you\'re at it, now is also the time to read up on MPI-IO.";
static int generate_test_file(int mpi_rank, int mpi_size);
static int test_parallel_read(int mpi_rank);
static const char *hitchhiker_quote =
"A common mistake that people make when trying to design something\n\
completely foolproof is to underestimate the ingenuity of complete\n\
fools.\n";
static int generate_test_file(MPI_Comm comm, int mpi_rank, int group);
static int test_parallel_read(MPI_Comm comm, int mpi_rank, int group);
/*-------------------------------------------------------------------------
* Function: generate_test_file
*
* Purpose: *** Richard -- please fill this in ***
* Purpose: This function is called to produce an HDF5 data file
* whose superblock is relocated to a non-zero offset by
* utilizing the 'h5jam' utility to write random text
* at the start of the file. Unlike simple concatenation
* of files, h5jam is used to place the superblock on a
* power-of-2 boundary.
*
* Since data will be read back and validated, we generate
* data in a predictable manner rather than randomly.
* For now, we simply use the mpi_rank of the writing
* process as a starting component of the data generation.
* Subsequent writes are increments from the initial start
* value.
*
* In the overall scheme of running the test, we'll call
* this function twice so as to create two seperate files.
* Each file will serve as the input data for two
* independent parallel reads.
*
* Return: Success: 0
*
@ -57,14 +78,17 @@ static int test_parallel_read(int mpi_rank);
*-------------------------------------------------------------------------
*/
static int
generate_test_file( int mpi_rank, int mpi_size )
generate_test_file( MPI_Comm comm, int mpi_rank, int group_id )
{
FILE *header;
const char *fcn_name = "generate_test_file()";
const char *failure_mssg = NULL;
char group_file[FILENAME_BUF_SIZE];
char data_filename[FILENAME_BUF_SIZE];
char reloc_data_filename[FILENAME_BUF_SIZE];
char prolog_filename[FILENAME_BUF_SIZE];
int group_size;
int group_rank;
int local_failure = 0;
int global_failures = 0;
hsize_t count = COUNT;
@ -82,6 +106,18 @@ generate_test_file( int mpi_rank, int mpi_size )
pass = true;
HDassert(comm != MPI_COMM_NULL);
if ( (MPI_Comm_rank(comm, &group_rank)) != MPI_SUCCESS) {
pass = FALSE;
failure_mssg = "generate_test_file: MPI_Comm_rank failed.\n";
}
if ( (MPI_Comm_size(comm, &group_size)) != MPI_SUCCESS) {
pass = FALSE;
failure_mssg = "generate_test_file: MPI_Comm_size failed.\n";
}
if ( mpi_rank == 0 ) {
HDfprintf(stdout, "Constructing test files...");
@ -90,8 +126,11 @@ generate_test_file( int mpi_rank, int mpi_size )
/* setup the file names */
if ( pass ) {
HDassert(FILENAMES[0]);
if ( h5_fixname(FILENAMES[0], H5P_DEFAULT, data_filename,
if ( HDsprintf(group_file, "%s_%d", FILENAMES[0], group_id) < 0) {
pass = FALSE;
failure_mssg = "HDsprintf(0) failed.\n";
}
else if ( h5_fixname(group_file, H5P_DEFAULT, data_filename,
sizeof(data_filename)) == NULL ) {
pass = FALSE;
failure_mssg = "h5_fixname(0) failed.\n";
@ -100,8 +139,11 @@ generate_test_file( int mpi_rank, int mpi_size )
if ( pass ) {
HDassert(FILENAMES[1]);
if ( h5_fixname(FILENAMES[1], H5P_DEFAULT, reloc_data_filename,
if ( HDsprintf(group_file, "%s_%d", FILENAMES[1], group_id) < 0) {
pass = FALSE;
failure_mssg = "HDsprintf(1) failed.\n";
}
else if ( h5_fixname(group_file, H5P_DEFAULT, reloc_data_filename,
sizeof(reloc_data_filename)) == NULL ) {
pass = FALSE;
@ -111,8 +153,11 @@ generate_test_file( int mpi_rank, int mpi_size )
if ( pass ) {
HDassert(FILENAMES[2]);
if ( h5_fixname(FILENAMES[2], H5P_DEFAULT, prolog_filename,
if ( HDsprintf(group_file, "%s_%d", FILENAMES[2], group_id) < 0) {
pass = FALSE;
failure_mssg = "HDsprintf(2) failed.\n";
}
else if ( h5_fixname(group_file, H5P_DEFAULT, prolog_filename,
sizeof(prolog_filename)) == NULL ) {
pass = FALSE;
failure_mssg = "h5_fixname(2) failed.\n";
@ -145,7 +190,7 @@ generate_test_file( int mpi_rank, int mpi_size )
}
if ( pass ) {
if ( (H5Pset_fapl_mpio(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL)) < 0 ) {
if ( (H5Pset_fapl_mpio(fapl_id, comm, MPI_INFO_NULL)) < 0 ) {
pass = FALSE;
failure_mssg = "H5Pset_fapl_mpio() failed\n";
}
@ -184,7 +229,7 @@ generate_test_file( int mpi_rank, int mpi_size )
}
if ( pass ) {
dims[0] *= (hsize_t)mpi_size;
dims[0] *= (hsize_t)group_size;
if ( (filespace = H5Screate_simple(1, dims, NULL)) < 0 ) {
pass = FALSE;
failure_mssg = "H5Screate_simple(1, dims, NULL) failed (2).\n";
@ -192,7 +237,7 @@ generate_test_file( int mpi_rank, int mpi_size )
}
if ( pass ) {
offset = (hsize_t)mpi_rank * (hsize_t)COUNT;
offset = (hsize_t)group_rank * (hsize_t)COUNT;
if ( (H5Sselect_hyperslab(filespace, H5S_SELECT_SET, &offset,
NULL, &count, NULL)) < 0 ) {
pass = FALSE;
@ -266,11 +311,17 @@ generate_test_file( int mpi_rank, int mpi_size )
*
* Also delete files that are no longer needed.
*/
if ( mpi_rank == 0 ) {
if ( group_rank == 0 ) {
const char *text_to_write;
size_t bytes_to_write;
bytes_to_write = strlen(random_hdf5_text);
if (group_id == 0)
text_to_write = random_hdf5_text;
else
text_to_write = hitchhiker_quote;
bytes_to_write = strlen(text_to_write);
if ( pass ) {
if ( (header = HDfopen(prolog_filename, "w+")) == NULL ) {
@ -280,8 +331,8 @@ generate_test_file( int mpi_rank, int mpi_size )
}
if ( pass ) {
bytes_to_write = strlen(random_hdf5_text);
if ( HDfwrite(random_hdf5_text, 1, bytes_to_write, header) !=
if ( HDfwrite(text_to_write, 1, bytes_to_write, header) !=
bytes_to_write ) {
pass = FALSE;
failure_mssg = "Unable to write header file.\n";
@ -319,6 +370,7 @@ generate_test_file( int mpi_rank, int mpi_size )
*/
local_failure = ( pass ? 0 : 1 );
/* This is a global all reduce (NOT group specific) */
if ( MPI_Allreduce(&local_failure, &global_failures, 1,
MPI_INT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS ) {
if ( pass ) {
@ -355,8 +407,25 @@ generate_test_file( int mpi_rank, int mpi_size )
/*-------------------------------------------------------------------------
* Function: test_parallel_read
*
* Purpose: *** Richard -- please fill this in ***
* Purpose: This actually tests the superblock optimization
* and covers the two primary cases we're interested in.
* 1). That HDF5 files can be opened in parallel by
* the rank 0 process and that the superblock
* offset is correctly broadcast to the other
* parallel file readers.
* 2). That a parallel application can correctly
* handle reading multiple files by using
* subgroups of MPI_COMM_WORLD and that each
* subgroup operates as described in (1) to
* collectively read the data.
*
* The global MPI rank is used for reading and
* writing data for process specific data in the
* dataset. We do this rather simplisticly, i.e.
* rank 0: writes/reads 0-9999
* rank 1: writes/reads 1000-1999
* rank 2: writes/reads 2000-2999
* ...
*
* Return: Success: 0
*
@ -370,13 +439,16 @@ generate_test_file( int mpi_rank, int mpi_size )
*-------------------------------------------------------------------------
*/
static int
test_parallel_read(int mpi_rank)
test_parallel_read(MPI_Comm comm, int mpi_rank, int group_id)
{
const char *failure_mssg;
const char *fcn_name = "test_parallel_read()";
char group_file[FILENAME_BUF_SIZE];
char reloc_data_filename[FILENAME_BUF_SIZE];
int local_failure = 0;
int global_failures = 0;
int group_size;
int group_rank;
hid_t fapl_id;
hid_t file_id;
hid_t dset_id;
@ -391,6 +463,18 @@ test_parallel_read(int mpi_rank)
pass = TRUE;
HDassert(comm != MPI_COMM_NULL);
if ( (MPI_Comm_rank(comm, &group_rank)) != MPI_SUCCESS) {
pass = FALSE;
failure_mssg = "test_parallel_read: MPI_Comm_rank failed.\n";
}
if ( (MPI_Comm_size(comm, &group_size)) != MPI_SUCCESS) {
pass = FALSE;
failure_mssg = "test_parallel_read: MPI_Comm_size failed.\n";
}
if ( mpi_rank == 0 ) {
TESTING("parallel file open test 1");
@ -408,8 +492,11 @@ test_parallel_read(int mpi_rank)
/* construct file file name */
if ( pass ) {
HDassert(FILENAMES[1]);
if ( h5_fixname(FILENAMES[1], H5P_DEFAULT, reloc_data_filename,
if ( HDsprintf(group_file, "%s_%d", FILENAMES[1], group_id) < 0) {
pass = FALSE;
failure_mssg = "HDsprintf(0) failed.\n";
}
else if ( h5_fixname(group_file, H5P_DEFAULT, reloc_data_filename,
sizeof(reloc_data_filename)) == NULL ) {
pass = FALSE;
@ -426,7 +513,7 @@ test_parallel_read(int mpi_rank)
}
if ( pass ) {
if ( (H5Pset_fapl_mpio(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL)) < 0 ) {
if ( (H5Pset_fapl_mpio(fapl_id, comm, MPI_INFO_NULL)) < 0 ) {
pass = FALSE;
failure_mssg = "H5Pset_fapl_mpio() failed\n";
}
@ -467,7 +554,7 @@ test_parallel_read(int mpi_rank)
}
if ( pass ) {
offset = (hsize_t)mpi_rank * count;
offset = (hsize_t)group_rank * count;
if ( (H5Sselect_hyperslab(filespace, H5S_SELECT_SET,
&offset, NULL, &count, NULL)) < 0 ) {
pass = FALSE;
@ -557,7 +644,7 @@ test_parallel_read(int mpi_rank)
}
/* report results and finish cleanup */
if ( mpi_rank == 0 ) {
if ( group_rank == 0 ) {
if ( pass ) {
PASSED();
} else {
@ -584,8 +671,17 @@ test_parallel_read(int mpi_rank)
/*-------------------------------------------------------------------------
* Function: main
*
* Purpose: *** Richard -- please fill this in ***
* Purpose: To implement a parallel test which validates whether the
* new superblock lookup functionality is working correctly.
*
* The test consists of creating two seperate HDF datasets
* in which random text is inserted at the start of each
* file using the 'j5jam' application. This forces the
* HDF5 file superblock to a non-zero offset.
* Having created the two independant files, we create two
* non-overlapping MPI groups, each of which is then tasked
* with the opening and validation of the data contained
* therein.
*
* WARNING: This test uses fork() and execve(), and
* therefore will not run on Windows.
@ -606,8 +702,11 @@ int
main( int argc, char **argv)
{
int nerrs = 0;
int which_group;
int mpi_rank;
int mpi_size;
int split_size;
MPI_Comm group_comm = MPI_COMM_NULL;
if ( (MPI_Init(&argc, &argv)) != MPI_SUCCESS) {
HDfprintf(stderr, "FATAL: Unable to initialize MPI\n");
@ -633,21 +732,37 @@ main( int argc, char **argv)
HDfprintf(stdout, "========================================\n");
}
if ( mpi_size < 2 ) {
if ( mpi_size < 4 ) {
if ( mpi_rank == 0 ) {
HDprintf(" Need at least 2 processes. Exiting.\n");
HDprintf(" Need at least 4 processes. Exiting.\n");
}
goto finish;
}
/* Divide the available processes into two groups
* that are the same size (plus or minus).
*/
split_size = mpi_size / 2;
which_group = (mpi_rank < split_size ? 0 : 1);
if ( (MPI_Comm_split(MPI_COMM_WORLD,
which_group,
0,
&group_comm)) != MPI_SUCCESS) {
HDfprintf(stderr, "FATAL: MPI_Comm_split returned an error\n");
exit(2);
}
/* create the test files & verify that the process
* succeeded. If not, abort the remaining tests as
* they depend on the test files.
*/
nerrs += generate_test_file( mpi_rank, mpi_size );
nerrs += generate_test_file( group_comm, mpi_rank, which_group );
/* abort tests if there were any errors in test file construction */
if ( nerrs > 0 ) {
@ -655,13 +770,19 @@ main( int argc, char **argv)
HDprintf(" Test file construction failed -- skipping tests.\n");
}
goto finish;
}
}
/* run the tests */
nerrs += test_parallel_read(mpi_rank);
/* run the tests */
nerrs += test_parallel_read(group_comm, mpi_rank, which_group);
finish:
if ((group_comm != MPI_COMM_NULL) &&
(MPI_Comm_free(&group_comm)) != MPI_SUCCESS) {
HDfprintf(stderr, "MPI_Comm_free failed!\n");
}
/* make sure all processes are finished before final report, cleanup
* and exit.
*/