Subfiling VFD updates (#2106)

jhendersonHDF 2022-09-16 11:17:30 -05:00 committed by GitHub
parent 45178c87a3
commit 16aa2dbaa0
18 changed files with 5702 additions and 2626 deletions

View File

@ -740,6 +740,12 @@ option (HDF5_ENABLE_SUBFILING_VFD "Build Parallel HDF5 Subfiling VFD" OFF)
if (HDF5_ENABLE_SUBFILING_VFD)
if (NOT HDF5_ENABLE_PARALLEL)
message (FATAL_ERROR "Subfiling VFD requires a parallel HDF5 build")
else ()
# Check for MPI_Comm_split_type
CHECK_SYMBOL_EXISTS (MPI_Comm_split_type "mpi.h" H5_HAVE_MPI_Comm_split_type)
if (NOT H5_HAVE_MPI_Comm_split_type)
message (FATAL_ERROR "Subfiling VFD requires MPI-3 support for MPI_Comm_split_type")
endif ()
endif()
if (NOT DEFINED Threads_FOUND)
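For context, the symbol probed here is the MPI-3 routine that splits a communicator into per-node (shared-memory) groups, which the Subfiling VFD relies on when laying out I/O concentrators per node. A minimal standalone sketch of that call (not part of the commit):

#include <mpi.h>
#include <stdio.h>

int
main(int argc, char **argv)
{
    MPI_Comm node_comm;
    int      node_rank, node_size;

    MPI_Init(&argc, &argv);

    /* Split MPI_COMM_WORLD into one communicator per shared-memory node */
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &node_comm);

    MPI_Comm_rank(node_comm, &node_rank);
    MPI_Comm_size(node_comm, &node_size);
    printf("node-local rank %d of %d\n", node_rank, node_size);

    MPI_Comm_free(&node_comm);
    MPI_Finalize();

    return 0;
}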

View File

@ -3213,6 +3213,32 @@ if test "X$SUBFILING_VFD" = "Xyes"; then
AC_DEFINE([HAVE_IOC_VFD], [1],
[Define if the I/O Concentrator virtual file driver (VFD) should be compiled])
if test "X${PARALLEL}" != "Xyes"; then
AC_MSG_ERROR([--enable-parallel is required for --enable-subfiling-vfd])
fi
## ----------------------------------------------------------------------
## Check for MPI_Comm_split_type availability
##
AC_MSG_CHECKING([for MPI_Comm_split_type function])
AC_LINK_IFELSE(
[AC_LANG_PROGRAM(
[[
#include <mpi.h>
]],
[[
MPI_Comm intra_comm;
MPI_Init(0, (void *) 0);
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &intra_comm);
]]
)],
[AC_MSG_RESULT([yes])],
[AC_MSG_RESULT([no])
AC_MSG_ERROR([unable to link MPI program that uses MPI_Comm_split_type])
]
)
# Set-up mercury
HAVE_MERCURY="yes"
mercury_dir="$ac_abs_confdir/src/H5FDsubfiling/mercury"
@ -3221,17 +3247,13 @@ if test "X$SUBFILING_VFD" = "Xyes"; then
CPPFLAGS="$CPPFLAGS -I$mercury_inc"
AM_CPPFLAGS="$AM_CPPFLAGS -I$mercury_inc"
if test "X${PARALLEL}" != "Xyes"; then
AC_MSG_ERROR([--enable-parallel is required for --enable-subfiling-vfd])
fi
HAVE_STDATOMIC_H="yes"
AC_CHECK_HEADERS([stdatomic.h],,[HAVE_STDATOMIC_H="no"])
if test "x$HAVE_STDATOMIC_H" = "xno"; then
AC_MSG_ERROR([Subfiling VFD requires atomic operations support. C11 stdatomic.h NOT available.])
fi
# Checks for libraries.
# Checks for libraries.
AC_SEARCH_LIBS([shm_open], [rt])
AC_CHECK_LIB([pthread], [pthread_self],[], [echo "Error: Required library pthread not found." && exit 1])

View File

@ -12,7 +12,7 @@
/*
* Example of using HDF5's Subfiling VFD to write to an
* HDF5 file that is striped across multiple sub-files
* HDF5 file that is striped across multiple subfiles
*
* If the HDF5_NOCLEANUP environment variable is set, the
* files that this example creates will not be removed as
@ -35,12 +35,13 @@
#define EXAMPLE_FILE "h5_subfiling_default_example.h5"
#define EXAMPLE_FILE2 "h5_subfiling_custom_example.h5"
#define EXAMPLE_FILE3 "h5_subfiling_precreate_example.h5"
#define EXAMPLE_DSET_NAME "DSET"
#define EXAMPLE_DSET_DIMS 2
/* Have each MPI rank write 64MiB of data */
#define EXAMPLE_DSET_NY 16777216
/* Have each MPI rank write 16MiB of data */
#define EXAMPLE_DSET_NY 4194304
/* Dataset datatype */
#define EXAMPLE_DSET_DATATYPE H5T_NATIVE_INT
@ -56,6 +57,11 @@ cleanup(char *filename, hid_t fapl_id)
H5Fdelete(filename, fapl_id);
}
/*
* An example of using the HDF5 Subfiling VFD with
* its default settings of 1 subfile per node, with
* a stripe size of 32MiB
*/
static void
subfiling_write_default(hid_t fapl_id, int mpi_size, int mpi_rank)
{
@ -64,11 +70,18 @@ subfiling_write_default(hid_t fapl_id, int mpi_size, int mpi_rank)
hsize_t start[EXAMPLE_DSET_DIMS];
hsize_t count[EXAMPLE_DSET_DIMS];
hid_t file_id;
hid_t subfiling_fapl;
hid_t dset_id;
hid_t filespace;
char filename[512];
char *par_prefix;
/*
* Make a copy of the FAPL so we don't disturb
* it for the other examples
*/
subfiling_fapl = H5Pcopy(fapl_id);
/*
* Set Subfiling VFD on FAPL using default settings
* (use IOC VFD, 1 IOC per node, 32MiB stripe size)
@ -77,7 +90,7 @@ subfiling_write_default(hid_t fapl_id, int mpi_size, int mpi_rank)
* can be adjusted with environment variables as well
* in this case.
*/
H5Pset_fapl_subfiling(fapl_id, NULL);
H5Pset_fapl_subfiling(subfiling_fapl, NULL);
/*
* OPTIONAL: Set alignment of objects in HDF5 file to
@ -94,7 +107,7 @@ subfiling_write_default(hid_t fapl_id, int mpi_size, int mpi_rank)
* files, so it is a good idea to keep an eye
* on this.
*/
H5Pset_alignment(fapl_id, 0, 33554432); /* Align to default 32MiB stripe size */
H5Pset_alignment(subfiling_fapl, 0, 33554432); /* Align to default 32MiB stripe size */
/* Parse any parallel prefix and create filename */
par_prefix = getenv("HDF5_PARAPREFIX");
@ -105,7 +118,7 @@ subfiling_write_default(hid_t fapl_id, int mpi_size, int mpi_rank)
/*
* Create a new file collectively
*/
file_id = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id);
file_id = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, subfiling_fapl);
/*
* Create the dataspace for the dataset. The first
@ -155,9 +168,15 @@ subfiling_write_default(hid_t fapl_id, int mpi_size, int mpi_rank)
H5Sclose(filespace);
H5Fclose(file_id);
cleanup(EXAMPLE_FILE, fapl_id);
cleanup(EXAMPLE_FILE, subfiling_fapl);
H5Pclose(subfiling_fapl);
}
/*
* An example of using the HDF5 Subfiling VFD with
* custom settings
*/
static void
subfiling_write_custom(hid_t fapl_id, int mpi_size, int mpi_rank)
{
@ -168,17 +187,23 @@ subfiling_write_custom(hid_t fapl_id, int mpi_size, int mpi_rank)
hsize_t start[EXAMPLE_DSET_DIMS];
hsize_t count[EXAMPLE_DSET_DIMS];
hid_t file_id;
hid_t ioc_fapl;
hid_t subfiling_fapl;
hid_t dset_id;
hid_t filespace;
char filename[512];
char *par_prefix;
/*
* Make a copy of the FAPL so we don't disturb
* it for the other examples
*/
subfiling_fapl = H5Pcopy(fapl_id);
/*
* Get a default Subfiling and IOC configuration
*/
H5Pget_fapl_subfiling(fapl_id, &subf_config);
H5Pget_fapl_ioc(fapl_id, &ioc_config);
H5Pget_fapl_subfiling(subfiling_fapl, &subf_config);
H5Pget_fapl_ioc(subfiling_fapl, &ioc_config);
/*
* Set Subfiling configuration to use a 1MiB
@ -198,32 +223,18 @@ subfiling_write_custom(hid_t fapl_id, int mpi_size, int mpi_rank)
* configuration.
*/
ioc_config.thread_pool_size = 2;
ioc_config.subf_config = subf_config.shared_cfg;
/*
* Create a File Access Property List for
* the IOC VFD and set our new configuration
* on it. We make a copy of the original
* FAPL here so we get the MPI parameters
* set on it
* Set our new configuration on the IOC
* FAPL used for Subfiling
*/
ioc_fapl = H5Pcopy(fapl_id);
H5Pset_fapl_ioc(ioc_fapl, &ioc_config);
/*
* Close FAPLs in the default configurations
* we retrieved and update the subfiling
* configuration with our new IOC FAPL
*/
H5Pclose(ioc_config.under_fapl_id);
H5Pclose(subf_config.ioc_fapl_id);
subf_config.ioc_fapl_id = ioc_fapl;
H5Pset_fapl_ioc(subf_config.ioc_fapl_id, &ioc_config);
/*
* Finally, set our new Subfiling configuration
* on the original FAPL
*/
H5Pset_fapl_subfiling(fapl_id, &subf_config);
H5Pset_fapl_subfiling(subfiling_fapl, &subf_config);
/*
* OPTIONAL: Set alignment of objects in HDF5 file to
@ -240,7 +251,7 @@ subfiling_write_custom(hid_t fapl_id, int mpi_size, int mpi_rank)
* files, so it is a good idea to keep an eye
* on this.
*/
H5Pset_alignment(fapl_id, 0, 1048576); /* Align to custom 1MiB stripe size */
H5Pset_alignment(subfiling_fapl, 0, 1048576); /* Align to custom 1MiB stripe size */
/* Parse any parallel prefix and create filename */
par_prefix = getenv("HDF5_PARAPREFIX");
@ -251,7 +262,7 @@ subfiling_write_custom(hid_t fapl_id, int mpi_size, int mpi_rank)
/*
* Create a new file collectively
*/
file_id = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, fapl_id);
file_id = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, subfiling_fapl);
/*
* Create the dataspace for the dataset. The first
@ -301,7 +312,179 @@ subfiling_write_custom(hid_t fapl_id, int mpi_size, int mpi_rank)
H5Sclose(filespace);
H5Fclose(file_id);
cleanup(EXAMPLE_FILE2, fapl_id);
cleanup(EXAMPLE_FILE2, subfiling_fapl);
H5Pclose(subfiling_fapl);
}
/*
* An example of pre-creating an HDF5 file on MPI rank
* 0 when using the HDF5 Subfiling VFD. In this case,
* the subfiling stripe count must be set so that rank
* 0 knows how many subfiles to pre-create.
*/
static void
subfiling_write_precreate(hid_t fapl_id, int mpi_size, int mpi_rank)
{
EXAMPLE_DSET_C_DATATYPE *data;
H5FD_subfiling_config_t subf_config;
hsize_t dset_dims[EXAMPLE_DSET_DIMS];
hsize_t start[EXAMPLE_DSET_DIMS];
hsize_t count[EXAMPLE_DSET_DIMS];
hid_t file_id;
hid_t subfiling_fapl;
hid_t dset_id;
hid_t filespace;
char filename[512];
char *par_prefix;
/*
* Make a copy of the FAPL so we don't disturb
* it for the other examples
*/
subfiling_fapl = H5Pcopy(fapl_id);
/*
* Get a default Subfiling and IOC configuration
*/
H5Pget_fapl_subfiling(subfiling_fapl, &subf_config);
/*
* Set the Subfiling stripe count so that rank
* 0 knows how many subfiles the logical HDF5
* file should consist of. In this case, use
* 5 subfiles with a default stripe size of
* 32MiB.
*/
subf_config.shared_cfg.stripe_count = 5;
/*
* OPTIONAL: Set alignment of objects in HDF5 file to
* be equal to the Subfiling stripe size.
* Choosing a Subfiling stripe size and HDF5
* object alignment value that are some
* multiple of the disk block size can
* generally help performance by ensuring
* that I/O is well-aligned and doesn't
* excessively cross stripe boundaries.
*
* Note that this option can substantially
* increase the size of the resulting HDF5
* files, so it is a good idea to keep an eye
* on this.
*/
H5Pset_alignment(subfiling_fapl, 0, 1048576); /* Align to custom 1MiB stripe size */
/* Parse any parallel prefix and create filename */
par_prefix = getenv("HDF5_PARAPREFIX");
snprintf(filename, sizeof(filename), "%s%s%s", par_prefix ? par_prefix : "", par_prefix ? "/" : "",
EXAMPLE_FILE3);
/* Set dataset dimensionality */
dset_dims[0] = mpi_size;
dset_dims[1] = EXAMPLE_DSET_NY;
if (mpi_rank == 0) {
/*
* Make sure only this rank opens the file
*/
H5Pset_mpi_params(subfiling_fapl, MPI_COMM_SELF, MPI_INFO_NULL);
/*
* Set the Subfiling VFD on our FAPL using
* our custom configuration
*/
H5Pset_fapl_subfiling(subfiling_fapl, &subf_config);
/*
* Create a new file on rank 0
*/
file_id = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, subfiling_fapl);
/*
* Create the dataspace for the dataset. The first
* dimension varies with the number of MPI ranks
* while the second dimension is fixed.
*/
filespace = H5Screate_simple(EXAMPLE_DSET_DIMS, dset_dims, NULL);
/*
* Create the dataset with default properties
*/
dset_id = H5Dcreate2(file_id, EXAMPLE_DSET_NAME, EXAMPLE_DSET_DATATYPE, filespace, H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
/*
* Initialize data buffer
*/
data = malloc(dset_dims[0] * dset_dims[1] * sizeof(EXAMPLE_DSET_C_DATATYPE));
for (size_t i = 0; i < dset_dims[0] * dset_dims[1]; i++) {
data[i] = i;
}
/*
* Rank 0 writes to the whole dataset
*/
H5Dwrite(dset_id, EXAMPLE_DSET_DATATYPE, H5S_BLOCK, filespace, H5P_DEFAULT, data);
/*
* Close/release resources.
*/
free(data);
H5Dclose(dset_id);
H5Sclose(filespace);
H5Fclose(file_id);
}
MPI_Barrier(MPI_COMM_WORLD);
/*
* Use all MPI ranks to re-open the file and
* read back the dataset that was created
*/
H5Pset_mpi_params(subfiling_fapl, MPI_COMM_WORLD, MPI_INFO_NULL);
/*
* Use the same subfiling configuration as rank 0
* used to create the file
*/
H5Pset_fapl_subfiling(subfiling_fapl, &subf_config);
/*
* Re-open the file on all ranks
*/
file_id = H5Fopen(filename, H5F_ACC_RDONLY, subfiling_fapl);
/*
* Open the dataset that was created
*/
dset_id = H5Dopen2(file_id, EXAMPLE_DSET_NAME, H5P_DEFAULT);
/*
* Initialize data buffer
*/
data = malloc(dset_dims[0] * dset_dims[1] * sizeof(EXAMPLE_DSET_C_DATATYPE));
/*
* Read the dataset on all ranks
*/
H5Dread(dset_id, EXAMPLE_DSET_DATATYPE, H5S_BLOCK, H5S_ALL, H5P_DEFAULT, data);
/*
* Close/release resources.
*/
free(data);
H5Dclose(dset_id);
H5Fclose(file_id);
cleanup(EXAMPLE_FILE3, subfiling_fapl);
H5Pclose(subfiling_fapl);
}
int
@ -338,6 +521,12 @@ main(int argc, char **argv)
/* Use Subfiling VFD with custom settings */
subfiling_write_custom(fapl_id, mpi_size, mpi_rank);
/*
* Use Subfiling VFD to precreate the HDF5
* file on MPI rank 0
*/
subfiling_write_precreate(fapl_id, mpi_size, mpi_rank);
H5Pclose(fapl_id);
if (mpi_rank == 0)

File diff suppressed because it is too large

View File

@ -84,11 +84,6 @@
* Property List. A pointer to an instance of this structure is
* a parameter to H5Pset_fapl_ioc() and H5Pget_fapl_ioc().
*
* The #H5FD_IOC driver shares much of its configuration with the
* #H5FD_SUBFILING driver and so its configuration structure
* contains an instance of a H5FD_subfiling_shared_config_t
* configuration structure.
*
* \var uint32_t H5FD_ioc_config_t::magic
* A somewhat unique number which distinguishes the #H5FD_IOC driver
* from other drivers. Used in combination with a version number, it
@ -101,31 +96,17 @@
* number or an error will be raised. Currently, this field should be set
* to #H5FD_IOC_CURR_FAPL_VERSION.
*
* \var hid_t H5FD_ioc_config_t::under_fapl_id
* The File Access Property List which is setup with the file driver
* to use for I/O to the HDF5 stub file. The stub file looks like a
* typical HDF5 file, but currently only contains the superblock metadata
* for compatibility with legacy HDF5 applications. The default driver used
* is currently the #H5FD_MPIO driver.
*
* \var int32_t H5FD_ioc_config_t::thread_pool_size
* The number of I/O concentrator worker threads to use.
*
* This value can also be set or adjusted with the #H5FD_IOC_THREAD_POOL_SIZE
* environment variable.
*
* \var H5FD_subfiling_shared_config_t H5FD_ioc_config_t::subf_config
* Subfiling configuration data for the parent #H5FD_SUBFILING driver. This
* includes the sub-file stripe size, number of I/O concentrators, IOC
* selection method, etc.
*
*/
typedef struct H5FD_ioc_config_t {
uint32_t magic; /* Must be set to H5FD_IOC_FAPL_MAGIC */
uint32_t version; /* Must be set to H5FD_IOC_CURR_FAPL_VERSION */
hid_t under_fapl_id; /* FAPL setup with the VFD to use for I/O to the HDF5 stub file */
int32_t thread_pool_size; /* Number of I/O concentrator worker threads to use */
H5FD_subfiling_shared_config_t subf_config; /* Subfiling driver configuration */
} H5FD_ioc_config_t;
//! <!-- [H5FD_ioc_config_t_snip] -->
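With under_fapl_id and subf_config removed, the only tunable field left in H5FD_ioc_config_t is the worker thread count. A minimal sketch of the get/modify/set pattern, mirroring the example program in this commit (error checking omitted, assumes an MPI-enabled HDF5 build):

#include "hdf5.h"
#include "H5FDioc.h"

static hid_t
make_ioc_fapl(void)
{
    H5FD_ioc_config_t ioc_config;
    hid_t             fapl_id = H5Pcreate(H5P_FILE_ACCESS);

    /* Fill the structure with defaults (magic, version, thread pool size) */
    H5Pget_fapl_ioc(fapl_id, &ioc_config);

    /* Use four I/O concentrator worker threads instead of the default */
    ioc_config.thread_pool_size = 4;

    H5Pset_fapl_ioc(fapl_id, &ioc_config);

    return fapl_id;
}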
@ -152,7 +133,7 @@ H5_DLL hid_t H5FD_ioc_init(void);
*
* The #H5FD_IOC driver is a reference implementation of an "I/O concentrator"
* file driver that works in conjunction with the #H5FD_SUBFILING driver and
* provides the I/O backend for servicing I/O requests to sub-files.
* provides the I/O backend for servicing I/O requests to subfiles.
*
* Typically, an HDF5 application won't need to call this routine directly.
* The #H5FD_IOC driver is usually set up as a side effect of an HDF5 application

View File

@ -16,31 +16,36 @@
#include "H5FDioc_priv.h"
static int async_completion(void *arg);
/*
* Given a file offset, the stripe size and
* the number of IOCs, calculate the target
* IOC for I/O and the file offset for the
* subfile that IOC controls
* Given a file offset, the stripe size, the
* number of IOCs and the number of subfiles,
* calculate the target IOC for the I/O, the
* index of the target subfile among the
* subfiles that the IOC controls, and the
* file offset into that subfile
*/
static inline void
calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int n_io_concentrators, int64_t *target_ioc,
int64_t *ioc_file_offset)
calculate_target_ioc(int64_t file_offset, int64_t stripe_size, int num_io_concentrators, int num_subfiles,
int64_t *target_ioc, int64_t *ioc_file_offset, int64_t *ioc_subfile_idx)
{
int64_t stripe_idx;
int64_t subfile_row;
int64_t subfile_idx;
HDassert(stripe_size > 0);
HDassert(num_io_concentrators > 0);
HDassert(num_subfiles > 0);
HDassert(target_ioc);
HDassert(ioc_file_offset);
HDassert(stripe_size > 0);
HDassert(n_io_concentrators > 0);
HDassert(ioc_subfile_idx);
stripe_idx = file_offset / stripe_size;
subfile_row = stripe_idx / n_io_concentrators;
subfile_row = stripe_idx / num_subfiles;
subfile_idx = (stripe_idx % num_subfiles) / num_io_concentrators;
*target_ioc = stripe_idx % n_io_concentrators;
*target_ioc = (stripe_idx % num_subfiles) % num_io_concentrators;
*ioc_file_offset = (subfile_row * stripe_size) + (file_offset % stripe_size);
*ioc_subfile_idx = subfile_idx;
}
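To make the new mapping concrete, here is a short standalone sketch of the same arithmetic using hypothetical values (1MiB stripes, 2 IOCs, 5 subfiles, an I/O at offset 6MiB): stripe 6 maps to global subfile 1, which round-robins to IOC 1 as that IOC's first (index 0) local subfile, at offset 1MiB within it.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    int64_t stripe_size  = 1048576;          /* 1MiB stripes          */
    int64_t file_offset  = 6 * stripe_size;  /* I/O lands in stripe 6 */
    int64_t num_iocs     = 2;
    int64_t num_subfiles = 5;

    int64_t stripe_idx  = file_offset / stripe_size;                                 /* 6    */
    int64_t subfile_row = stripe_idx / num_subfiles;                                 /* 1    */
    int64_t target_ioc  = (stripe_idx % num_subfiles) % num_iocs;                    /* 1    */
    int64_t subfile_idx = (stripe_idx % num_subfiles) / num_iocs;                    /* 0    */
    int64_t ioc_offset  = (subfile_row * stripe_size) + (file_offset % stripe_size); /* 1MiB */

    printf("IOC %" PRId64 ", local subfile %" PRId64 ", subfile offset %" PRId64 "\n", target_ioc,
           subfile_idx, ioc_offset);

    return 0;
}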
/*
@ -90,17 +95,20 @@ cast_to_void(const void *data)
*-------------------------------------------------------------------------
*/
herr_t
ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
const void *data, io_req_t **io_req)
ioc__write_independent_async(int64_t context_id, int64_t offset, int64_t elements, const void *data,
io_req_t **io_req)
{
subfiling_context_t *sf_context = NULL;
MPI_Request ack_request = MPI_REQUEST_NULL;
io_req_t *sf_io_request = NULL;
int64_t ioc_start;
int64_t ioc_offset;
int64_t ioc_subfile_idx;
int64_t msg[3] = {0};
int *io_concentrators = NULL;
int data_tag = 0;
int num_io_concentrators;
int num_subfiles;
int data_tag = 0;
int mpi_code;
herr_t ret_value = SUCCEED;
@ -111,13 +119,16 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
HDassert(sf_context->topology);
HDassert(sf_context->topology->io_concentrators);
io_concentrators = sf_context->topology->io_concentrators;
io_concentrators = sf_context->topology->io_concentrators;
num_io_concentrators = sf_context->topology->n_io_concentrators;
num_subfiles = sf_context->sf_num_subfiles;
/*
* Calculate the IOC that we'll send the I/O request to
* and the offset within that IOC's subfile
*/
calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
calculate_target_ioc(offset, sf_context->sf_stripe_size, num_io_concentrators, num_subfiles, &ioc_start,
&ioc_offset, &ioc_subfile_idx);
/*
* Wait for memory to be allocated on the target IOC before
@ -141,37 +152,43 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
*/
msg[0] = elements;
msg[1] = ioc_offset;
msg[2] = context_id;
if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], WRITE_INDEP,
sf_context->sf_msg_comm)))
msg[2] = ioc_subfile_idx;
if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start],
WRITE_INDEP, sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
/* Wait to receive data tag */
/* Wait to receive the data tag from the IOC */
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
if (data_tag == 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "received NACK from IOC");
/* At this point in the new implementation, we should queue
* the async write so that when the top level VFD tells us
* to complete all pending IO requests, we have all the info
* we need to accomplish that.
/*
* Allocate the I/O request object that will
* be returned to the caller
*/
if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_WRITEERROR, FAIL, "couldn't allocate I/O request");
H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
sf_io_request->completion_func.io_args.context_id = context_id;
sf_io_request->completion_func.io_args.offset = offset;
sf_io_request->completion_func.io_args.elements = elements;
sf_io_request->completion_func.io_args.data = cast_to_void(data);
sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
sf_io_request->completion_func.io_function = async_completion;
sf_io_request->completion_func.pending = 0;
sf_io_request->ioc = (int)ioc_start;
sf_io_request->context_id = context_id;
sf_io_request->offset = offset;
sf_io_request->elements = elements;
sf_io_request->data = cast_to_void(data);
sf_io_request->io_transfer_req = MPI_REQUEST_NULL;
sf_io_request->io_comp_req = MPI_REQUEST_NULL;
sf_io_request->io_comp_tag = -1;
sf_io_request->prev = sf_io_request->next = NULL;
/*
* Start a non-blocking receive from the IOC that signifies
* when the actual write is complete
*/
if (MPI_SUCCESS !=
(mpi_code = MPI_Irecv(&sf_io_request->io_comp_tag, 1, MPI_INT, io_concentrators[ioc_start],
WRITE_DATA_DONE, sf_context->sf_data_comm, &sf_io_request->io_comp_req)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
/*
* Start the actual data transfer using the ack received
@ -180,7 +197,7 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
H5_CHECK_OVERFLOW(elements, int64_t, int);
if (MPI_SUCCESS !=
(mpi_code = MPI_Isend(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
sf_context->sf_data_comm, &sf_io_request->io_transfer_req)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code);
/*
@ -193,14 +210,23 @@ ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t
* to the caller.
*/
sf_io_request->completion_func.pending = 1;
*io_req = sf_io_request;
*io_req = sf_io_request;
done:
if (ret_value < 0) {
if (ack_request != MPI_REQUEST_NULL) {
if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&ack_request)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
}
if (sf_io_request) {
if (sf_io_request->io_transfer_req != MPI_REQUEST_NULL) {
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_transfer_req, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
}
if (sf_io_request->io_comp_req != MPI_REQUEST_NULL) {
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_comp_req, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
}
}
HDfree(sf_io_request);
@ -241,81 +267,141 @@ done:
*-------------------------------------------------------------------------
*/
herr_t
ioc__read_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset, int64_t elements,
void *data, io_req_t **io_req)
ioc__read_independent_async(int64_t context_id, int64_t offset, int64_t elements, void *data,
io_req_t **io_req)
{
subfiling_context_t *sf_context = NULL;
MPI_Request ack_request = MPI_REQUEST_NULL;
io_req_t *sf_io_request = NULL;
hbool_t need_data_tag = FALSE;
int64_t ioc_start;
int64_t ioc_offset;
int64_t ioc_subfile_idx;
int64_t msg[3] = {0};
int *io_concentrators = NULL;
int num_io_concentrators;
int num_subfiles;
int data_tag = 0;
int mpi_code;
herr_t ret_value = SUCCEED;
HDassert(io_req);
H5_CHECK_OVERFLOW(elements, int64_t, int);
if (NULL == (sf_context = H5_get_subfiling_object(context_id)))
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "can't get subfiling context from ID");
HDassert(sf_context->topology);
HDassert(sf_context->topology->io_concentrators);
io_concentrators = sf_context->topology->io_concentrators;
io_concentrators = sf_context->topology->io_concentrators;
num_io_concentrators = sf_context->topology->n_io_concentrators;
num_subfiles = sf_context->sf_num_subfiles;
/*
* If we are using 1 subfile per IOC, we can optimize reads
* a little since each read will go to a separate IOC and we
* won't be in danger of data being received in an
* unpredictable order. However, if some IOCs own more than
* 1 subfile, we need to associate each read with a unique
* message tag to make sure the data is received in the
* correct order.
*/
need_data_tag = num_subfiles != num_io_concentrators;
if (!need_data_tag)
data_tag = READ_INDEP_DATA;
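/*
 * Worked example of the condition above (hypothetical layout, not part
 * of the commit): with the default of 1 subfile per IOC, 2 IOCs give
 * 2 subfiles, need_data_tag is FALSE and the static READ_INDEP_DATA tag
 * is safe. With 2 IOCs but a stripe count of 5, IOC 0 owns subfiles 0,
 * 2 and 4, so two concurrent reads from this rank can target the same
 * IOC and need distinct tags to be matched in the right order.
 */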
/*
* Calculate the IOC that we'll send the I/O request to
* and the offset within that IOC's subfile
*/
calculate_target_ioc(offset, sf_context->sf_stripe_size, n_io_concentrators, &ioc_start, &ioc_offset);
calculate_target_ioc(offset, sf_context->sf_stripe_size, num_io_concentrators, num_subfiles, &ioc_start,
&ioc_offset, &ioc_subfile_idx);
/*
* At this point in the new implementation, we should queue
* the non-blocking recv so that when the top level VFD tells
* us to complete all pending IO requests, we have all the info
* we need to accomplish that.
*
* Post the early non-blocking receive here.
* Allocate the I/O request object that will
* be returned to the caller
*/
if (NULL == (sf_io_request = HDmalloc(sizeof(io_req_t))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_READERROR, FAIL, "couldn't allocate I/O request");
H5_CHECK_OVERFLOW(ioc_start, int64_t, int);
sf_io_request->completion_func.io_args.ioc = (int)ioc_start;
sf_io_request->completion_func.io_args.context_id = context_id;
sf_io_request->completion_func.io_args.offset = offset;
sf_io_request->completion_func.io_args.elements = elements;
sf_io_request->completion_func.io_args.data = data;
sf_io_request->completion_func.io_args.io_req = MPI_REQUEST_NULL;
sf_io_request->completion_func.io_function = async_completion;
sf_io_request->completion_func.pending = 0;
sf_io_request->ioc = (int)ioc_start;
sf_io_request->context_id = context_id;
sf_io_request->offset = offset;
sf_io_request->elements = elements;
sf_io_request->data = data;
sf_io_request->io_transfer_req = MPI_REQUEST_NULL;
sf_io_request->io_comp_req = MPI_REQUEST_NULL;
sf_io_request->io_comp_tag = -1;
sf_io_request->prev = sf_io_request->next = NULL;
if (need_data_tag) {
/*
* Post an early non-blocking receive for IOC to send an ACK
* (or NACK) message with a data tag that we will use for
* receiving data
*/
if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&data_tag, 1, MPI_INT, io_concentrators[ioc_start],
READ_INDEP_ACK, sf_context->sf_data_comm, &ack_request)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
H5_CHECK_OVERFLOW(elements, int64_t, int);
if (MPI_SUCCESS !=
(mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], READ_INDEP_DATA,
sf_context->sf_data_comm, &sf_io_request->completion_func.io_args.io_req)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
/*
* Prepare and send an I/O request to the IOC identified
* by the file offset
*/
msg[0] = elements;
msg[1] = ioc_offset;
msg[2] = ioc_subfile_idx;
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start], READ_INDEP,
sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
sf_io_request->completion_func.pending = 1;
*io_req = sf_io_request;
/* Wait to receive the data tag from the IOC */
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Wait failed", mpi_code);
if (data_tag == 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, FAIL, "received NACK from IOC");
}
/*
* Prepare and send an I/O request to the IOC identified
* by the file offset
* Post a non-blocking receive for the data from the IOC
* using the selected data tag (either the one received
* from the IOC or the static READ_INDEP_DATA tag)
*/
msg[0] = elements;
msg[1] = ioc_offset;
msg[2] = context_id;
if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 3, MPI_INT64_T, io_concentrators[ioc_start], READ_INDEP,
sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
if (MPI_SUCCESS !=
(mpi_code = MPI_Irecv(data, (int)elements, MPI_BYTE, io_concentrators[ioc_start], data_tag,
sf_context->sf_data_comm, &sf_io_request->io_transfer_req)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv failed", mpi_code);
if (!need_data_tag) {
/*
* Prepare and send an I/O request to the IOC identified
* by the file offset
*/
msg[0] = elements;
msg[1] = ioc_offset;
msg[2] = ioc_subfile_idx;
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, io_concentrators[ioc_start], READ_INDEP,
sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
}
*io_req = sf_io_request;
done:
if (ret_value < 0) {
if (sf_io_request && sf_io_request->completion_func.io_args.io_req != MPI_REQUEST_NULL) {
if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&sf_io_request->completion_func.io_args.io_req)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel failed", mpi_code);
if (ack_request != MPI_REQUEST_NULL) {
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&ack_request, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
}
if (sf_io_request) {
if (sf_io_request->io_transfer_req != MPI_REQUEST_NULL) {
if (MPI_SUCCESS != (mpi_code = MPI_Wait(&sf_io_request->io_transfer_req, MPI_STATUS_IGNORE)))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Wait failed", mpi_code);
}
}
HDfree(sf_io_request);
@ -326,56 +412,27 @@ done:
} /* end ioc__read_independent_async() */
/*-------------------------------------------------------------------------
* Function: async_completion
* Function: ioc__async_completion
*
* Purpose: Given a single io_func_t structure containing the function
* pointer and it's input arguments and a single MPI_Request
* argument which needs to be completed, we make progress
* by calling MPI_Test. In this initial example, we loop
* until the request is completed as indicated by a non-zero
* flag variable.
* Purpose: IOC function to complete outstanding I/O requests.
* Currently just a wrapper around MPI_Waitall on the given
* MPI_Request array.
*
* As we go further with the implementation, we anticipate that
* rather than testing a single request variable, we will
* deal with a collection of all pending IO requests (on
* this rank).
* Return: Non-negative on success/Negative on failure
*
* Return: an integer status. Zero(0) indicates success. Negative
* values (-1) indicates an error.
*-------------------------------------------------------------------------
*/
static int
async_completion(void *arg)
herr_t
ioc__async_completion(MPI_Request *mpi_reqs, size_t num_reqs)
{
int n_reqs;
int mpi_code;
int ret_value = 0;
struct async_arg {
int n_reqs;
MPI_Request *sf_reqs;
} *in_progress = (struct async_arg *)arg;
herr_t ret_value = SUCCEED;
int mpi_code;
HDassert(arg);
HDassert(mpi_reqs);
n_reqs = in_progress->n_reqs;
if (n_reqs < 0) {
#ifdef H5FD_IOC_DEBUG
HDprintf("%s: invalid number of in progress I/O requests\n", __func__);
#endif
ret_value = -1;
goto done;
}
if (MPI_SUCCESS != (mpi_code = MPI_Waitall(n_reqs, in_progress->sf_reqs, MPI_STATUSES_IGNORE))) {
#ifdef H5FD_IOC_DEBUG
HDprintf("%s: MPI_Waitall failed with rc %d\n", __func__, mpi_code);
#endif
ret_value = -1;
goto done;
}
H5_CHECK_OVERFLOW(num_reqs, size_t, int);
if (MPI_SUCCESS != (mpi_code = MPI_Waitall((int)num_reqs, mpi_reqs, MPI_STATUSES_IGNORE)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code);
done:
H5_SUBFILING_FUNC_LEAVE;

View File

@ -394,26 +394,15 @@ typedef struct ioc_io_queue {
* input arguments for the functions which were originally
* invoked. See below.
*/
typedef struct _client_io_args {
int ioc; /* ID of the IO Concentrator handling this IO. */
int64_t context_id; /* The context id provided for the read or write */
int64_t offset; /* The file offset for the IO operation */
int64_t elements; /* How many bytes */
void *data; /* A pointer to the (contiguous) data segment */
MPI_Request io_req; /* An MPI request to allow the code to loop while */
/* making progress on multiple IOs */
} io_args_t;
typedef struct _client_io_func {
int (*io_function)(void *this_io); /* pointer to a completion function */
io_args_t io_args; /* arguments passed to the completion function */
int pending; /* The function is complete (0) or pending (1)? */
} io_func_t;
typedef struct _io_req {
struct _io_req *prev; /* A simple list structure containing completion */
struct _io_req *next; /* functions. These should get removed as IO ops */
io_func_t completion_func; /* are completed */
int ioc; /* ID of the IO Concentrator handling this IO. */
int64_t context_id; /* The context id provided for the read or write */
int64_t offset; /* The file offset for the IO operation */
int64_t elements; /* How many bytes */
void *data; /* A pointer to the (contiguous) data segment */
MPI_Request io_transfer_req; /* MPI request for Isend/Irecv of I/O data */
MPI_Request io_comp_req; /* MPI request signifying when actual I/O is finished */
int io_comp_tag; /* MPI tag value used for completed I/O request */
} io_req_t;
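/*
 * Sketch of how the reworked request is expected to be completed (an
 * assumption based on ioc__async_completion() declared below, not a
 * verbatim excerpt from this commit): the caller gathers the MPI
 * requests from each io_req_t and waits on them together, e.g.
 *
 *     MPI_Request reqs[2] = {io_req->io_transfer_req, io_req->io_comp_req};
 *
 *     ioc__async_completion(reqs, 2);
 *     HDfree(io_req);
 */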
extern int *H5FD_IOC_tag_ub_val_ptr;
@ -425,10 +414,12 @@ extern "C" {
H5_DLL int initialize_ioc_threads(void *_sf_context);
H5_DLL int finalize_ioc_threads(void *_sf_context);
H5_DLL herr_t ioc__write_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset,
int64_t elements, const void *data, io_req_t **io_req);
H5_DLL herr_t ioc__read_independent_async(int64_t context_id, int n_io_concentrators, int64_t offset,
int64_t elements, void *data, io_req_t **io_req);
H5_DLL herr_t ioc__write_independent_async(int64_t context_id, int64_t offset, int64_t elements,
const void *data, io_req_t **io_req);
H5_DLL herr_t ioc__read_independent_async(int64_t context_id, int64_t offset, int64_t elements, void *data,
io_req_t **io_req);
H5_DLL herr_t ioc__async_completion(MPI_Request *mpi_reqs, size_t num_reqs);
H5_DLL int wait_for_thread_main(void);

View File

@ -72,16 +72,16 @@ static double sf_queue_delay_time = 0.0;
static HG_THREAD_RETURN_TYPE ioc_thread_main(void *arg);
static int ioc_main(ioc_data_t *ioc_data);
static int ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm,
static int ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm,
uint32_t counter);
static int ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
static int ioc_file_queue_read_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm,
uint32_t counter);
static int ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size,
int subfile_rank);
static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size,
int subfile_rank);
static int ioc_file_truncate(int fd, int64_t length, int subfile_rank);
static int ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm);
int ioc_idx);
static int ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx);
static int ioc_file_truncate(sf_work_request_t *msg);
static int ioc_file_report_eof(sf_work_request_t *msg, MPI_Comm comm);
static ioc_io_queue_entry_t *ioc_io_queue_alloc_entry(void);
static void ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_ptr);
@ -156,6 +156,8 @@ initialize_ioc_threads(void *_sf_context)
#endif
};
sf_context->ioc_data = ioc_data;
/* Initialize atomic vars */
atomic_init(&ioc_data->sf_ioc_ready, 0);
atomic_init(&ioc_data->sf_shutdown_flag, 0);
@ -194,7 +196,7 @@ initialize_ioc_threads(void *_sf_context)
t_end = MPI_Wtime();
#ifdef H5FD_IOC_DEBUG
if (sf_context->topology->subfile_rank == 0) {
if (sf_context->topology->ioc_idx == 0) {
HDprintf("%s: time = %lf seconds\n", __func__, (t_end - t_start));
HDfflush(stdout);
}
@ -202,8 +204,6 @@ initialize_ioc_threads(void *_sf_context)
#endif
sf_context->ioc_data = ioc_data;
done:
H5_SUBFILING_FUNC_LEAVE;
}
@ -245,6 +245,7 @@ finalize_ioc_threads(void *_sf_context)
ioc_data->io_queue.num_failed);
HDfree(ioc_data);
sf_context->ioc_data = NULL;
H5_SUBFILING_FUNC_LEAVE;
}
@ -346,7 +347,6 @@ ioc_main(ioc_data_t *ioc_data)
{
subfiling_context_t *context = NULL;
sf_work_request_t wk_req;
int subfile_rank;
int shutdown_requested;
int ret_value = 0;
@ -362,8 +362,6 @@ ioc_main(ioc_data_t *ioc_data)
* represent an open file).
*/
subfile_rank = context->sf_group_rank;
/* tell initialize_ioc_threads() that ioc_main() is ready to enter its main loop */
atomic_store(&ioc_data->sf_ioc_ready, 1);
@ -415,11 +413,11 @@ ioc_main(ioc_data_t *ioc_data)
queue_start_time = MPI_Wtime();
wk_req.tag = tag;
wk_req.source = source;
wk_req.subfile_rank = subfile_rank;
wk_req.context_id = ioc_data->sf_context_id;
wk_req.start_time = queue_start_time;
wk_req.tag = tag;
wk_req.source = source;
wk_req.ioc_idx = context->topology->ioc_idx;
wk_req.context_id = ioc_data->sf_context_id;
wk_req.start_time = queue_start_time;
ioc_io_queue_add_entry(ioc_data, &wk_req);
@ -506,7 +504,7 @@ handle_work_request(void *arg)
subfiling_context_t *sf_context = NULL;
sf_work_request_t *msg = &(q_entry_ptr->wk_req);
ioc_data_t *ioc_data = NULL;
int64_t file_context_id = msg->header[2];
int64_t file_context_id = msg->context_id;
int op_ret;
hg_thread_ret_t ret_value = 0;
@ -524,27 +522,27 @@ handle_work_request(void *arg)
switch (msg->tag) {
case WRITE_INDEP:
op_ret = ioc_file_queue_write_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm,
op_ret = ioc_file_queue_write_indep(msg, msg->ioc_idx, msg->source, sf_context->sf_data_comm,
q_entry_ptr->counter);
break;
case READ_INDEP:
op_ret = ioc_file_queue_read_indep(msg, msg->subfile_rank, msg->source, sf_context->sf_data_comm);
op_ret = ioc_file_queue_read_indep(msg, msg->ioc_idx, msg->source, sf_context->sf_data_comm,
q_entry_ptr->counter);
break;
case TRUNC_OP:
op_ret = ioc_file_truncate(sf_context->sf_fid, q_entry_ptr->wk_req.header[0],
sf_context->topology->subfile_rank);
op_ret = ioc_file_truncate(msg);
break;
case GET_EOF_OP:
op_ret = ioc_file_report_eof(msg, msg->subfile_rank, msg->source, sf_context->sf_eof_comm);
op_ret = ioc_file_report_eof(msg, sf_context->sf_eof_comm);
break;
default:
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id, "%s: IOC %d received unknown message with tag %x from rank %d",
__func__, msg->subfile_rank, msg->tag, msg->source);
__func__, msg->ioc_idx, msg->tag, msg->source);
#endif
op_ret = -1;
@ -555,11 +553,11 @@ handle_work_request(void *arg)
if (op_ret < 0) {
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(
file_context_id,
"%s: IOC %d request(%s) filename=%s from rank(%d), size=%ld, offset=%ld FAILED with ret %d",
__func__, msg->subfile_rank, translate_opcode((io_op_t)msg->tag), sf_context->sf_filename,
msg->source, msg->header[0], msg->header[1], op_ret);
H5_subfiling_log(file_context_id,
"%s: IOC %d request(%s) from rank(%d), (%" PRId64 ", %" PRId64 ", %" PRId64
") FAILED with ret %d",
__func__, msg->ioc_idx, translate_opcode((io_op_t)msg->tag), msg->source,
msg->header[0], msg->header[1], msg->header[2], op_ret);
#endif
q_entry_ptr->wk_ret = op_ret;
@ -686,15 +684,15 @@ from the thread pool threads...
*-------------------------------------------------------------------------
*/
static int
ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm,
uint32_t counter)
ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter)
{
subfiling_context_t *sf_context = NULL;
MPI_Status msg_status;
hbool_t send_nack = FALSE;
int64_t file_context_id;
int64_t data_size;
int64_t file_offset;
int64_t file_context_id;
int64_t subfile_idx;
int64_t stripe_id;
haddr_t sf_eof;
#ifdef H5FD_IOC_COLLECT_STATS
@ -714,10 +712,12 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
HDassert(msg);
file_context_id = msg->context_id;
/* Retrieve the fields of the RPC message for the write operation */
data_size = msg->header[0];
file_offset = msg->header[1];
file_context_id = msg->header[2];
data_size = msg->header[0];
file_offset = msg->header[1];
subfile_idx = msg->header[2];
if (data_size < 0) {
send_nack = TRUE;
@ -746,7 +746,7 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id,
"[ioc(%d) %s]: msg from %d: datasize=%ld\toffset=%ld, queue_delay = %lf seconds\n",
subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
ioc_idx, __func__, source, data_size, file_offset, t_queue_delay);
#endif
#endif
@ -764,12 +764,12 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
* allows us to distinguish between multiple concurrent
* writes from a single rank.
*/
HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= WRITE_TAG_BASE));
rcv_tag = (int)(counter % (INT_MAX - WRITE_TAG_BASE));
rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - WRITE_TAG_BASE);
rcv_tag += WRITE_TAG_BASE;
HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= IO_TAG_BASE));
rcv_tag = (int)(counter % (INT_MAX - IO_TAG_BASE));
rcv_tag %= (*H5FD_IOC_tag_ub_val_ptr - IO_TAG_BASE);
rcv_tag += IO_TAG_BASE;
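/*
 * Worked example (hypothetical values, not from the commit): with
 * IO_TAG_BASE == 100 and an MPI tag upper bound of 32767, a counter of
 * 70000 yields ((70000 % (INT_MAX - 100)) % (32767 - 100)) + 100 = 4766,
 * keeping every receive tag inside [IO_TAG_BASE, tag upper bound).
 */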
if (send_ack_to_client(rcv_tag, source, subfile_rank, WRITE_INDEP_ACK, comm) < 0)
if (send_ack_to_client(rcv_tag, source, ioc_idx, WRITE_INDEP_ACK, comm) < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send ACK to client");
/* Receive data from client */
@ -794,13 +794,14 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
t_start = t_end;
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n",
subfile_rank, __func__, data_size, source, mpi_code);
H5_subfiling_log(file_context_id, "[ioc(%d) %s] MPI_Recv(%ld bytes, from = %d) status = %d\n", ioc_idx,
__func__, data_size, source, mpi_code);
#endif
#endif
sf_fid = sf_context->sf_fid;
HDassert(subfile_idx < sf_context->sf_num_fids);
sf_fid = sf_context->sf_fids[subfile_idx];
#ifdef H5_SUBFILING_DEBUG
if (sf_fid < 0)
@ -810,7 +811,7 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
if (sf_fid >= 0) {
/* Actually write data received from client into subfile */
if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, subfile_rank)) < 0)
if ((write_ret = ioc_file_write_data(sf_fid, file_offset, recv_buf, data_size, ioc_idx)) < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_WRITEERROR, -1,
"write function(FID=%d, Source=%d) returned an error (%d)", sf_fid,
source, write_ret);
@ -834,10 +835,17 @@ ioc_file_queue_write_indep(sf_work_request_t *msg, int subfile_rank, int source,
H5FD_ioc_end_thread_exclusive();
/*
* Send a message back to the client that the I/O call has
* completed and it is safe to return from the write call
*/
if (MPI_SUCCESS != (mpi_code = MPI_Send(&rcv_tag, 1, MPI_INT, source, WRITE_DATA_DONE, comm)))
H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code);
done:
if (send_nack) {
/* Send NACK back to client so client can handle failure gracefully */
if (send_nack_to_client(source, subfile_rank, WRITE_INDEP_ACK, comm) < 0)
if (send_nack_to_client(source, ioc_idx, WRITE_INDEP_ACK, comm) < 0)
H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_WRITEERROR, -1, "couldn't send NACK to client");
}
@ -867,13 +875,16 @@ done:
*-------------------------------------------------------------------------
*/
static int
ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
ioc_file_queue_read_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm, uint32_t counter)
{
subfiling_context_t *sf_context = NULL;
hbool_t send_empty_buf = TRUE;
hbool_t send_nack = FALSE;
hbool_t need_data_tag = FALSE;
int64_t file_context_id;
int64_t data_size;
int64_t file_offset;
int64_t file_context_id;
int64_t subfile_idx;
#ifdef H5FD_IOC_COLLECT_STATS
double t_start;
double t_end;
@ -881,6 +892,7 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
double t_queue_delay;
#endif
char *send_buf = NULL;
int send_tag;
int sf_fid;
int read_ret;
int mpi_code;
@ -888,17 +900,37 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
HDassert(msg);
/* Retrieve the fields of the RPC message for the read operation */
data_size = msg->header[0];
file_offset = msg->header[1];
file_context_id = msg->header[2];
if (data_size < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read");
file_context_id = msg->context_id;
sf_context = H5_get_subfiling_object(file_context_id);
HDassert(sf_context);
/*
* If we are using 1 subfile per IOC, we can optimize reads
* a little since each read will go to a separate IOC and we
* won't be in danger of data being received in an
* unpredictable order. However, if some IOCs own more than
* 1 subfile, we need to associate each read with a unique
* message tag to make sure the data is received in the
* correct order.
*/
need_data_tag = sf_context->sf_num_subfiles != sf_context->topology->n_io_concentrators;
if (!need_data_tag)
send_tag = READ_INDEP_DATA;
/* Retrieve the fields of the RPC message for the read operation */
data_size = msg->header[0];
file_offset = msg->header[1];
subfile_idx = msg->header[2];
if (data_size < 0) {
if (need_data_tag) {
send_nack = TRUE;
send_empty_buf = FALSE;
}
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "invalid data size for read");
}
/* Flag that we've attempted to read data from the file */
sf_context->sf_read_count++;
@ -911,22 +943,48 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id,
"[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld queue_delay=%lf seconds\n",
subfile_rank, __func__, source, data_size, file_offset, t_queue_delay);
"[ioc(%d) %s] msg from %d: datasize=%ld\toffset=%ld queue_delay=%lf seconds\n", ioc_idx,
__func__, source, data_size, file_offset, t_queue_delay);
#endif
#endif
/* Allocate space to send data read from file to client */
if (NULL == (send_buf = HDmalloc((size_t)data_size)))
if (NULL == (send_buf = HDmalloc((size_t)data_size))) {
if (need_data_tag) {
send_nack = TRUE;
send_empty_buf = FALSE;
}
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, -1, "couldn't allocate send buffer for data");
}
sf_fid = sf_context->sf_fid;
if (need_data_tag) {
/*
* Calculate message tag for the client to use for receiving
* data, then send an ACK message to the client with the
* calculated message tag. This calculated message tag
* allows us to distinguish between multiple concurrent
* reads from a single rank, which can happen when a rank
* owns multiple subfiles.
*/
HDassert(H5FD_IOC_tag_ub_val_ptr && (*H5FD_IOC_tag_ub_val_ptr >= IO_TAG_BASE));
send_tag = (int)(counter % (INT_MAX - IO_TAG_BASE));
send_tag %= (*H5FD_IOC_tag_ub_val_ptr - IO_TAG_BASE);
send_tag += IO_TAG_BASE;
if (send_ack_to_client(send_tag, source, ioc_idx, READ_INDEP_ACK, comm) < 0) {
send_empty_buf = FALSE;
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "couldn't send ACK to client");
}
}
/* Read data from the subfile */
HDassert(subfile_idx < sf_context->sf_num_fids);
sf_fid = sf_context->sf_fids[subfile_idx];
if (sf_fid < 0)
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_BADVALUE, -1, "subfile file descriptor %d is invalid", sf_fid);
/* Read data from the subfile */
if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, subfile_rank)) < 0) {
if ((read_ret = ioc_file_read_data(sf_fid, file_offset, send_buf, data_size, ioc_idx)) < 0) {
H5_SUBFILING_GOTO_ERROR(H5E_IO, H5E_READERROR, read_ret,
"read function(FID=%d, Source=%d) returned an error (%d)", sf_fid, source,
read_ret);
@ -936,8 +994,7 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
/* Send read data to the client */
H5_CHECK_OVERFLOW(data_size, int64_t, int);
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, READ_INDEP_DATA, comm)))
if (MPI_SUCCESS != (mpi_code = MPI_Send(send_buf, (int)data_size, MPI_BYTE, source, send_tag, comm)))
H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send failed", mpi_code);
#ifdef H5FD_IOC_COLLECT_STATS
@ -947,19 +1004,24 @@ ioc_file_queue_read_indep(sf_work_request_t *msg, int subfile_rank, int source,
sf_queue_delay_time += t_queue_delay;
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(sf_context->sf_context_id, "[ioc(%d)] MPI_Send to source(%d) completed\n", subfile_rank,
H5_subfiling_log(sf_context->sf_context_id, "[ioc(%d)] MPI_Send to source(%d) completed\n", ioc_idx,
source);
#endif
#endif
done:
if (need_data_tag && send_nack) {
/* Send NACK back to client so client can handle failure gracefully */
if (send_nack_to_client(source, ioc_idx, READ_INDEP_ACK, comm) < 0)
H5_SUBFILING_DONE_ERROR(H5E_IO, H5E_READERROR, -1, "couldn't send NACK to client");
}
if (send_empty_buf) {
/*
* Send an empty message back to client on failure. The client will
* likely get a message truncation error, but at least shouldn't hang.
*/
if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, READ_INDEP_DATA, comm)))
if (MPI_SUCCESS != (mpi_code = MPI_Send(NULL, 0, MPI_BYTE, source, send_tag, comm)))
H5_SUBFILING_MPI_DONE_ERROR(-1, "MPI_Send failed", mpi_code);
}
@ -978,7 +1040,7 @@ being thread safe.
*/
static int
ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx)
{
ssize_t bytes_remaining = (ssize_t)data_size;
ssize_t bytes_written = 0;
@ -986,7 +1048,7 @@ ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data
int ret_value = 0;
#ifndef H5FD_IOC_DEBUG
(void)subfile_rank;
(void)ioc_idx;
#endif
HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset));
@ -1000,7 +1062,7 @@ ioc_file_write_data(int fd, int64_t file_offset, void *data_buffer, int64_t data
bytes_remaining -= bytes_written;
#ifdef H5FD_IOC_DEBUG
HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank,
HDprintf("[ioc(%d) %s]: wrote %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", ioc_idx,
__func__, bytes_written, bytes_remaining, file_offset);
#endif
@ -1024,7 +1086,7 @@ done:
} /* end ioc_file_write_data() */
static int
ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int subfile_rank)
ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_size, int ioc_idx)
{
useconds_t delay = 100;
ssize_t bytes_remaining = (ssize_t)data_size;
@ -1034,7 +1096,7 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_
int ret_value = 0;
#ifndef H5FD_IOC_DEBUG
(void)subfile_rank;
(void)ioc_idx;
#endif
HDcompile_assert(H5_SIZEOF_OFF_T == sizeof(file_offset));
@ -1052,7 +1114,7 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_
bytes_remaining -= bytes_read;
#ifdef H5FD_IOC_DEBUG
HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", subfile_rank,
HDprintf("[ioc(%d) %s]: read %ld bytes, remaining=%ld, file_offset=%" PRId64 "\n", ioc_idx,
__func__, bytes_read, bytes_remaining, file_offset);
#endif
@ -1069,8 +1131,8 @@ ioc_file_read_data(int fd, int64_t file_offset, void *data_buffer, int64_t data_
else {
if (retries == 0) {
#ifdef H5FD_IOC_DEBUG
HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", subfile_rank,
__func__, file_offset, data_size);
HDprintf("[ioc(%d) %s]: TIMEOUT: file_offset=%" PRId64 ", data_size=%ld\n", ioc_idx, __func__,
file_offset, data_size);
#endif
H5_SUBFILING_SYS_GOTO_ERROR(H5E_IO, H5E_READERROR, -1, "HDpread failed");
@ -1087,19 +1149,40 @@ done:
} /* end ioc_file_read_data() */
static int
ioc_file_truncate(int fd, int64_t length, int subfile_rank)
ioc_file_truncate(sf_work_request_t *msg)
{
int ret_value = 0;
subfiling_context_t *sf_context = NULL;
int64_t file_context_id;
int64_t length;
int64_t subfile_idx;
int fd;
int ioc_idx;
int ret_value = 0;
HDassert(msg);
file_context_id = msg->context_id;
ioc_idx = msg->ioc_idx;
length = msg->header[0];
subfile_idx = msg->header[1];
#ifndef H5FD_IOC_DEBUG
(void)subfile_rank;
(void)ioc_idx;
#endif
if (NULL == (sf_context = H5_get_subfiling_object(file_context_id)))
H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context");
HDassert(subfile_idx < sf_context->sf_num_fids);
fd = sf_context->sf_fids[subfile_idx];
if (HDftruncate(fd, (off_t)length) != 0)
H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SEEKERROR, -1, "HDftruncate failed");
#ifdef H5FD_IOC_DEBUG
HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", subfile_rank, __func__,
HDprintf("[ioc(%d) %s]: truncated subfile to %lld bytes. ret = %d\n", ioc_idx, __func__,
(long long)length, errno);
HDfflush(stdout);
#endif
@ -1111,7 +1194,7 @@ done:
/*-------------------------------------------------------------------------
* Function: ioc_file_report_eof
*
* Purpose: Determine the target sub-file's eof and report this value
* Purpose: Determine the target subfile's eof and report this value
* to the requesting rank.
*
* Notes: This function will have to be reworked once we solve
@ -1131,40 +1214,48 @@ done:
*/
static int
ioc_file_report_eof(sf_work_request_t *msg, int subfile_rank, int source, MPI_Comm comm)
ioc_file_report_eof(sf_work_request_t *msg, MPI_Comm comm)
{
subfiling_context_t *sf_context = NULL;
h5_stat_t sb;
int64_t eof_req_reply[3];
int64_t file_context_id;
int64_t subfile_idx;
int fd;
int source;
int ioc_idx;
int mpi_code;
int ret_value = 0;
HDassert(msg);
/* first get the EOF of the target file. */
file_context_id = msg->context_id;
source = msg->source;
ioc_idx = msg->ioc_idx;
file_context_id = msg->header[2];
subfile_idx = msg->header[0];
if (NULL == (sf_context = H5_get_subfiling_object(file_context_id)))
H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_CANTGET, -1, "couldn't retrieve subfiling context");
fd = sf_context->sf_fid;
HDassert(subfile_idx < sf_context->sf_num_fids);
fd = sf_context->sf_fids[subfile_idx];
if (HDfstat(fd, &sb) < 0)
H5_SUBFILING_SYS_GOTO_ERROR(H5E_FILE, H5E_SYSERRSTR, -1, "HDfstat failed");
eof_req_reply[0] = (int64_t)subfile_rank;
eof_req_reply[0] = (int64_t)ioc_idx;
eof_req_reply[1] = (int64_t)(sb.st_size);
eof_req_reply[2] = 0; /* not used */
eof_req_reply[2] = subfile_idx;
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(file_context_id, "%s: reporting file EOF as %" PRId64 ".", __func__, eof_req_reply[1]);
#endif
/* return the subfile EOF to the querying rank */
if (MPI_SUCCESS != (mpi_code = MPI_Send(eof_req_reply, 3, MPI_INT64_T, source, GET_EOF_COMPLETED, comm)))
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(eof_req_reply, 1, H5_subfiling_rpc_msg_type, source, GET_EOF_COMPLETED, comm)))
H5_SUBFILING_MPI_GOTO_ERROR(-1, "MPI_Send", mpi_code);
done:
@ -1272,12 +1363,13 @@ ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr)
atomic_fetch_add(&ioc_data->sf_io_ops_pending, 1);
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(wk_req_ptr->context_id,
"%s: request %d queued. op = %d, offset/len = %lld/%lld, q-ed/disp/ops_pend = %d/%d/%d.",
__func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
(long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]),
ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
atomic_load(&ioc_data->sf_io_ops_pending));
H5_subfiling_log(
wk_req_ptr->context_id,
"%s: request %d queued. op = %d, req = (%lld, %lld, %lld), q-ed/disp/ops_pend = %d/%d/%d.", __func__,
entry_ptr->counter, (entry_ptr->wk_req.tag), (long long)(entry_ptr->wk_req.header[0]),
(long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[2]),
ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
atomic_load(&ioc_data->sf_io_ops_pending));
#endif
HDassert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);
@ -1478,14 +1570,14 @@ ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, hbool_t try_lock)
entry_ptr->thread_wk.args = entry_ptr;
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(entry_ptr->wk_req.context_id,
"%s: request %d dispatched. op = %d, offset/len = %lld/%lld, "
"q-ed/disp/ops_pend = %d/%d/%d.",
__func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
(long long)(entry_ptr->wk_req.header[1]),
(long long)(entry_ptr->wk_req.header[0]), ioc_data->io_queue.num_pending,
ioc_data->io_queue.num_in_progress,
atomic_load(&ioc_data->sf_io_ops_pending));
H5_subfiling_log(
entry_ptr->wk_req.context_id,
"%s: request %d dispatched. op = %d, req = (%lld, %lld, %lld), "
"q-ed/disp/ops_pend = %d/%d/%d.",
__func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
(long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]),
(long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending,
ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending));
#endif
#ifdef H5FD_IOC_COLLECT_STATS
@ -1564,12 +1656,12 @@ ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_pt
#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(entry_ptr->wk_req.context_id,
"%s: request %d completed with ret %d. op = %d, offset/len = %lld/%lld, "
"%s: request %d completed with ret %d. op = %d, req = (%lld, %lld, %lld), "
"q-ed/disp/ops_pend = %d/%d/%d.",
__func__, entry_ptr->counter, entry_ptr->wk_ret, (entry_ptr->wk_req.tag),
(long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[0]),
ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
atomic_load(&ioc_data->sf_io_ops_pending));
(long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]),
(long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending,
ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending));
/*
* If this I/O request is a truncate or "get eof" op, make sure

View File

@ -30,11 +30,11 @@
* Note: This code should be moved -- most likely to the IOC
* code files.
*
* Purpose: Apply a truncate operation to the sub-files.
* Purpose: Apply a truncate operation to the subfiles.
*
* In the context of the I/O concentrators, the eof must be
* translated into the appropriate value for each of the
* sub-files, and then applied to same.
* subfiles, and then applied to each.
*
* Further, we must ensure that all prior I/O requests complete
* before the truncate is applied.
@ -44,7 +44,7 @@
* 1) Run a barrier on entry.
*
* 2) Determine if this rank is an IOC. If it is, compute
* the correct EOF for this sub-file, and send a truncate
* the correct EOF for this subfile, and send a truncate
* request to the IOC.
*
* 3) On the IOC thread, allow all pending I/O requests
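As a minimal, self-contained sketch of the per-subfile EOF computation the steps above describe (not the driver's actual code): the stripe size, subfile count, and logical file EOF below are made-up illustration values; the real driver reads them from its subfiling context and sends each result to the owning IOC in a TRUNC_OP message.

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative values only -- the real driver reads these from its subfiling context */
    #define STRIPE_SIZE  ((int64_t)1024 * 1024) /* 1 MiB data stripes */
    #define NUM_SUBFILES ((int64_t)4)           /* 4 subfiles total   */

    int
    main(void)
    {
        int64_t logical_file_eof     = (int64_t)14 * 1024 * 1024 + 512 * 1024; /* 14.5 MiB */
        int64_t blocksize_per_stripe = STRIPE_SIZE * NUM_SUBFILES;
        int64_t num_full_stripes     = logical_file_eof / blocksize_per_stripe;
        int64_t partial_stripe_len   = logical_file_eof % blocksize_per_stripe;
        int64_t num_leftover_stripes = partial_stripe_len / STRIPE_SIZE;

        for (int64_t i = 0; i < NUM_SUBFILES; i++) {
            /* Every subfile holds at least the fully-populated stripe rows */
            int64_t subfile_eof = num_full_stripes * STRIPE_SIZE;

            /* Subfiles before the break point get one extra full stripe; the
             * subfile at the break point gets the trailing partial stripe */
            if (i < num_leftover_stripes)
                subfile_eof += STRIPE_SIZE;
            else if (i == num_leftover_stripes)
                subfile_eof += partial_stripe_len % STRIPE_SIZE;

            printf("subfile %" PRId64 " EOF: %" PRId64 " bytes\n", i, subfile_eof);
        }

        return 0;
    }

With a 14.5 MiB logical EOF, this prints 4 MiB, 4 MiB, 3.5 MiB, and 3 MiB for subfiles 0 through 3, which sums back to the logical EOF.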
@ -72,50 +72,61 @@
herr_t
H5FD__subfiling__truncate_sub_files(hid_t context_id, int64_t logical_file_eof, MPI_Comm comm)
{
int mpi_code; /* MPI return code */
subfiling_context_t *sf_context = NULL;
int64_t msg[3] = {
0,
};
herr_t ret_value = SUCCEED; /* Return value */
int64_t msg[3] = {0};
int mpi_size;
int mpi_code;
herr_t ret_value = SUCCEED;
if (MPI_SUCCESS != (mpi_code = MPI_Comm_size(comm, &mpi_size)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Comm_size failed", mpi_code);
/* Barrier on entry */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);
if (mpi_size > 1)
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);
if (NULL == (sf_context = (subfiling_context_t *)H5_get_subfiling_object(context_id)))
H5_SUBFILING_GOTO_ERROR(H5E_FILE, H5E_BADVALUE, FAIL, "can't get subfile context");
/* Test to see if this rank is running an I/O concentrator. */
if (sf_context->topology->rank_is_ioc) {
int i;
int64_t subfile_eof;
int64_t num_full_stripes;
int64_t num_leftover_stripes;
int64_t partial_stripe_len;
#ifndef NDEBUG
int64_t test_file_eof;
#endif /* NDEBUG */
/* if it is, first compute the sub-file EOF */
num_full_stripes = logical_file_eof / sf_context->sf_blocksize_per_stripe;
partial_stripe_len = logical_file_eof % sf_context->sf_blocksize_per_stripe;
num_leftover_stripes = partial_stripe_len / sf_context->sf_stripe_size;
num_full_stripes = logical_file_eof / sf_context->sf_blocksize_per_stripe;
partial_stripe_len = logical_file_eof % sf_context->sf_blocksize_per_stripe;
/* Compute the EOF for each subfile this IOC owns */
for (int i = 0; i < sf_context->sf_num_fids; i++) {
int64_t subfile_eof = num_full_stripes * sf_context->sf_stripe_size;
int64_t global_subfile_idx;
subfile_eof = num_full_stripes * sf_context->sf_stripe_size;
global_subfile_idx =
(i * sf_context->topology->n_io_concentrators) + sf_context->topology->ioc_idx;
if (sf_context->topology->subfile_rank < (partial_stripe_len / sf_context->sf_stripe_size)) {
if (global_subfile_idx < num_leftover_stripes) {
subfile_eof += sf_context->sf_stripe_size;
}
else if (global_subfile_idx == num_leftover_stripes) {
subfile_eof += partial_stripe_len % sf_context->sf_stripe_size;
}
subfile_eof += sf_context->sf_stripe_size;
}
else if (sf_context->topology->subfile_rank == (partial_stripe_len / sf_context->sf_stripe_size)) {
/* Direct the IOC to truncate this subfile to the correct EOF */
msg[0] = subfile_eof;
msg[1] = i;
msg[2] = -1; /* padding -- not used in this message */
subfile_eof += partial_stripe_len % sf_context->sf_stripe_size;
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type,
sf_context->topology->io_concentrators[sf_context->topology->ioc_idx],
TRUNC_OP, sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
}
/* sanity check -- compute the file eof using the same mechanism used to
* compute the sub-file eof. Assert that the computed value and the
* compute the subfile eof. Assert that the computed value and the
* actual value match.
*
* Do this only for debug builds -- probably delete this before release.
@ -124,40 +135,29 @@ H5FD__subfiling__truncate_sub_files(hid_t context_id, int64_t logical_file_eof,
*/
#ifndef NDEBUG
test_file_eof = 0;
{
int64_t test_file_eof = 0;
for (i = 0; i < sf_context->topology->n_io_concentrators; i++) {
for (int i = 0; i < sf_context->sf_num_subfiles; i++) {
test_file_eof += num_full_stripes * sf_context->sf_stripe_size;
test_file_eof += num_full_stripes * sf_context->sf_stripe_size;
if (i < (partial_stripe_len / sf_context->sf_stripe_size)) {
test_file_eof += sf_context->sf_stripe_size;
if (i < num_leftover_stripes) {
test_file_eof += sf_context->sf_stripe_size;
}
else if (i == num_leftover_stripes) {
test_file_eof += partial_stripe_len % sf_context->sf_stripe_size;
}
}
else if (i == (partial_stripe_len / sf_context->sf_stripe_size)) {
test_file_eof += partial_stripe_len % sf_context->sf_stripe_size;
}
HDassert(test_file_eof == logical_file_eof);
}
HDassert(test_file_eof == logical_file_eof);
#endif /* NDEBUG */
/* then direct the IOC to truncate the sub-file to the correct EOF */
msg[0] = subfile_eof;
msg[1] = 0; /* padding -- not used in this message */
msg[2] = context_id;
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(msg, 3, MPI_INT64_T,
sf_context->topology->io_concentrators[sf_context->topology->subfile_rank],
TRUNC_OP, sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send failed", mpi_code);
}
/* Barrier on exit */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);
if (mpi_size > 1)
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code);
done:
@ -176,9 +176,10 @@ done:
* Do this as follows:
*
* 1) allocate an array of int64_t of length equal to the
* the number of IOCs, and initialize all fields to -1.
* number of subfiles, and initialize all fields to -1.
*
* 2) Send each IOC a message requesting that sub-file's EOF.
* 2) Send each subfile's IOC a message requesting that
* subfile's EOF.
*
* 3) Await reply from each IOC, storing the reply in
* the appropriate entry in the array allocated in 1.
@ -197,13 +198,13 @@ done:
* than for the more traditional HDF5 file implementations.
* This statement derives from the fact that unlike "normal"
* HDF5 files, subfiling introduces a multi-file representation
* of a single HDF5 file. The plurality of sub-files represents
* a software RAID-0 based HDF5 file. As such, each sub-file
* of a single HDF5 file. The plurality of subfiles represents
* a software RAID-0 based HDF5 file. As such, each subfile
* contains a designated portion of the address space of the
* virtual HDF5 storage. We have no notion of HDF5 datatypes,
* datasets, metadata, or other HDF5 structures; only BYTES.
*
* The organization of the bytes within sub-files is consistent
* The organization of the bytes within subfiles is consistent
* with the RAID-0 striping, i.e. there are IO Concentrators
* (IOCs) which correspond to a stripe-count (in Lustre) as
* well as a stripe_size. The combination of these two
@ -220,7 +221,7 @@ done:
* follows.
* 1. At file creation, each IOC is assigned a rank value
* (0 to N-1, where N is the total number of IOCs) and
* a 'sf_base_addr' = 'subfile_rank' * 'sf_stripe_size')
* a 'sf_base_addr' = 'ioc_idx' * 'sf_stripe_size'.
* We also determine the 'sf_blocksize_per_stripe', which
* is simply 'sf_stripe_size' * 'n_io_concentrators'
*
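To make those two quantities concrete, here is a tiny illustration with assumed values (an ioc_idx of 2, a 1 MiB stripe size, and 4 I/O concentrators); the variable names mirror the context fields but the numbers are not taken from any real file.

    #include <stdint.h>
    #include <stdio.h>

    int
    main(void)
    {
        /* Hypothetical values for illustration */
        int64_t sf_stripe_size     = 1024 * 1024; /* 1 MiB stripes         */
        int     n_io_concentrators = 4;           /* 4 IOCs                */
        int     ioc_idx            = 2;           /* this rank's IOC index */

        /* Offset of this IOC's first stripe within every stripe "row" */
        int64_t sf_base_addr = (int64_t)ioc_idx * sf_stripe_size; /* 2 MiB */

        /* Logical-file bytes covered by one full row of stripes */
        int64_t sf_blocksize_per_stripe = sf_stripe_size * n_io_concentrators; /* 4 MiB */

        printf("base addr = %lld, block size per stripe = %lld\n",
               (long long)sf_base_addr, (long long)sf_blocksize_per_stripe);
        return 0;
    }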
@ -263,9 +264,10 @@ H5FD__subfiling__get_real_eof(hid_t context_id, int64_t *logical_eof_ptr)
int64_t msg[3] = {0, 0, 0};
int64_t logical_eof = 0;
int64_t sf_logical_eof;
int n_io_concentrators = 0; /* copy of value in topology */
int mpi_code; /* MPI return code */
herr_t ret_value = SUCCEED; /* Return value */
int n_io_concentrators = 0;
int num_subfiles = 0;
int mpi_code; /* MPI return code */
herr_t ret_value = SUCCEED; /* Return value */
HDassert(logical_eof_ptr);
@ -275,56 +277,60 @@ H5FD__subfiling__get_real_eof(hid_t context_id, int64_t *logical_eof_ptr)
HDassert(sf_context->topology);
n_io_concentrators = sf_context->topology->n_io_concentrators;
num_subfiles = sf_context->sf_num_subfiles;
HDassert(n_io_concentrators > 0);
HDassert(num_subfiles >= n_io_concentrators);
if (NULL == (sf_eofs = HDmalloc((size_t)n_io_concentrators * sizeof(int64_t))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate sub-file EOFs array");
if (NULL == (recv_reqs = HDmalloc((size_t)n_io_concentrators * sizeof(*recv_reqs))))
if (NULL == (sf_eofs = HDmalloc((size_t)num_subfiles * sizeof(int64_t))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate subfile EOFs array");
if (NULL == (recv_reqs = HDmalloc((size_t)num_subfiles * sizeof(*recv_reqs))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate receive requests array");
if (NULL == (recv_msg = HDmalloc((size_t)n_io_concentrators * 3 * sizeof(*recv_msg))))
if (NULL == (recv_msg = HDmalloc((size_t)num_subfiles * sizeof(msg))))
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "can't allocate message array");
for (int i = 0; i < n_io_concentrators; i++) {
for (int i = 0; i < num_subfiles; i++) {
sf_eofs[i] = -1;
recv_reqs[i] = MPI_REQUEST_NULL;
}
/* Post early non-blocking receives for replies from each IOC */
for (int i = 0; i < n_io_concentrators; i++) {
int ioc_rank = sf_context->topology->io_concentrators[i];
/* Post early non-blocking receives for the EOF of each subfile */
for (int i = 0; i < num_subfiles; i++) {
int ioc_rank = sf_context->topology->io_concentrators[i % n_io_concentrators];
if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&recv_msg[3 * i], 3, MPI_INT64_T, ioc_rank,
if (MPI_SUCCESS != (mpi_code = MPI_Irecv(&recv_msg[3 * i], 1, H5_subfiling_rpc_msg_type, ioc_rank,
GET_EOF_COMPLETED, sf_context->sf_eof_comm, &recv_reqs[i])))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Irecv", mpi_code);
}
/* Send each IOC a message requesting that subfile's EOF */
/* Send each subfile's IOC a message requesting that subfile's EOF */
msg[0] = 0; /* padding -- not used in this message */
msg[1] = 0; /* padding -- not used in this message */
msg[2] = context_id;
msg[1] = -1; /* padding -- not used in this message */
msg[2] = -1; /* padding -- not used in this message */
for (int i = 0; i < n_io_concentrators; i++) {
int ioc_rank = sf_context->topology->io_concentrators[i];
for (int i = 0; i < num_subfiles; i++) {
int ioc_rank = sf_context->topology->io_concentrators[i % n_io_concentrators];
if (MPI_SUCCESS !=
(mpi_code = MPI_Send(msg, 3, MPI_INT64_T, ioc_rank, GET_EOF_OP, sf_context->sf_msg_comm)))
/* Set subfile index for receiving IOC */
msg[0] = i / n_io_concentrators;
if (MPI_SUCCESS != (mpi_code = MPI_Send(msg, 1, H5_subfiling_rpc_msg_type, ioc_rank, GET_EOF_OP,
sf_context->sf_msg_comm)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Send", mpi_code);
}
/* Wait for EOF communication to complete */
if (MPI_SUCCESS != (mpi_code = MPI_Waitall(n_io_concentrators, recv_reqs, MPI_STATUSES_IGNORE)))
if (MPI_SUCCESS != (mpi_code = MPI_Waitall(num_subfiles, recv_reqs, MPI_STATUSES_IGNORE)))
H5_SUBFILING_MPI_GOTO_ERROR(FAIL, "MPI_Waitall", mpi_code);
for (int i = 0; i < n_io_concentrators; i++) {
for (int i = 0; i < num_subfiles; i++) {
int ioc_rank = (int)recv_msg[3 * i];
HDassert(ioc_rank >= 0);
HDassert(ioc_rank < n_io_concentrators);
HDassert(sf_eofs[ioc_rank] == -1);
HDassert(sf_eofs[i] == -1);
sf_eofs[ioc_rank] = recv_msg[(3 * i) + 1];
sf_eofs[i] = recv_msg[(3 * i) + 1];
}
/* 4) After all IOCs have replied, compute the offset of
@ -333,21 +339,21 @@ H5FD__subfiling__get_real_eof(hid_t context_id, int64_t *logical_eof_ptr)
* EOF.
*/
for (int i = 0; i < n_io_concentrators; i++) {
for (int i = 0; i < num_subfiles; i++) {
/* compute number of complete stripes */
sf_logical_eof = sf_eofs[i] / sf_context->sf_stripe_size;
/* multiply by stripe size */
sf_logical_eof *= sf_context->sf_stripe_size * n_io_concentrators;
sf_logical_eof *= sf_context->sf_stripe_size * num_subfiles;
/* if the sub-file doesn't end on a stripe size boundary, must add in a partial stripe */
/* if the subfile doesn't end on a stripe size boundary, must add in a partial stripe */
if (sf_eofs[i] % sf_context->sf_stripe_size > 0) {
/* add in the size of the partial stripe up to but not including this subfile */
sf_logical_eof += i * sf_context->sf_stripe_size;
/* finally, add in the number of bytes in the last partial stripe depth in the sub-file */
/* finally, add in the number of bytes in the last partial stripe depth in the subfile */
sf_logical_eof += sf_eofs[i] % sf_context->sf_stripe_size;
}
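A worked example of the loop above, written as comments and using assumed numbers only (4 subfiles, a 1 MiB stripe size, and a reported EOF of 3.5 MiB for subfile index 2):

    /* Assumed inputs (illustration only):                                       */
    /*   num_subfiles   = 4                                                      */
    /*   sf_stripe_size = 1 MiB                                                  */
    /*   i              = 2 (third subfile)                                      */
    /*   sf_eofs[2]     = 3.5 MiB                                                */
    /*                                                                           */
    /* complete stripes in this subfile:     3.5 MiB / 1 MiB          = 3        */
    /* logical bytes in those stripe rows:   3 * (1 MiB * 4)          = 12  MiB  */
    /* partial stripe exists, so add the full stripes that precede               */
    /* this subfile in the last row:         2 * 1 MiB                = 2   MiB  */
    /* plus the bytes in the partial stripe: 3.5 MiB % 1 MiB          = 0.5 MiB  */
    /* candidate logical EOF from subfile 2: 12 + 2 + 0.5             = 14.5 MiB */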
@ -365,7 +371,7 @@ H5FD__subfiling__get_real_eof(hid_t context_id, int64_t *logical_eof_ptr)
done:
if (ret_value < 0) {
for (int i = 0; i < n_io_concentrators; i++) {
for (int i = 0; i < num_subfiles; i++) {
if (recv_reqs && (recv_reqs[i] != MPI_REQUEST_NULL)) {
if (MPI_SUCCESS != (mpi_code = MPI_Cancel(&recv_reqs[i])))
H5_SUBFILING_MPI_DONE_ERROR(FAIL, "MPI_Cancel", mpi_code);

File diff suppressed because it is too large

View File

@ -48,21 +48,51 @@
/**
* \def H5FD_SUBFILING_DEFAULT_STRIPE_SIZE
* The default stripe size (in bytes) for data stripes in sub-files
* The default stripe size (in bytes) for data stripes in subfiles
*/
#define H5FD_SUBFILING_DEFAULT_STRIPE_SIZE (32 * 1024 * 1024)
/**
* \def H5FD_SUBFILING_FILENAME_TEMPLATE
* The basic template for a sub-file filename
* \def H5FD_SUBFILING_DEFAULT_STRIPE_COUNT
* Macro for the default Subfiling stripe count value. The default
* is currently to use one subfile per node.
*/
#define H5FD_SUBFILING_FILENAME_TEMPLATE ".subfile_%" PRIu64 "_%0*d_of_%d"
#define H5FD_SUBFILING_DEFAULT_STRIPE_COUNT -1
/**
* \def H5FD_SUBFILING_FILENAME_TEMPLATE
* The basic template for a subfile filename. The format specifiers
* correspond to:
*
* %s -> base filename, e.g. "file.h5"
* %PRIu64 -> file inode, e.g. 11273556
* %0*d -> number (starting at 1) signifying the Nth (out of total
* number of subfiles) subfile. Zero-padded according
* to the number of digits in the number of subfiles
* (calculated by log10(num_subfiles) + 1)
* %d -> number of subfiles
*
* yielding filenames such as:
*
* file.h5.subfile_11273556_01_of_10
* file.h5.subfile_11273556_02_of_10
* file.h5.subfile_11273556_10_of_10
*/
#define H5FD_SUBFILING_FILENAME_TEMPLATE "%s.subfile_%" PRIu64 "_%0*d_of_%d"
/**
* \def H5FD_SUBFILING_CONFIG_FILENAME_TEMPLATE
* The basic template for a #H5FD_SUBFILING driver configuration filename
* The basic template for a #H5FD_SUBFILING driver configuration filename.
* The format specifiers correspond to:
*
* %s -> base filename, e.g. "file.h5"
* %PRIu64 -> file inode, e.g. 11273556
*
* yielding a filename such as:
*
* file.h5.subfile_11273556.config
*/
#define H5FD_SUBFILING_CONFIG_FILENAME_TEMPLATE ".subfile_%" PRIu64 ".config"
#define H5FD_SUBFILING_CONFIG_FILENAME_TEMPLATE "%s.subfile_%" PRIu64 ".config"
/*
* Environment variables interpreted by the HDF5 Subfiling feature
@ -71,7 +101,7 @@
/**
* \def H5FD_SUBFILING_STRIPE_SIZE
* Macro for name of the environment variable that specifies the size
* (in bytes) for data stripes in sub-files
* (in bytes) for data stripes in subfiles
*
* The value set for this environment variable is interpreted as a
* long long value and must be > 0.
@ -112,7 +142,7 @@
/**
* \def H5FD_SUBFILING_SUBFILE_PREFIX
* Macro for name of the environment variable that specifies a prefix
* to apply to the filenames generated for sub-files
* to apply to the filenames generated for subfiles
*
* The value set for this environment variable is interpreted as a
* pathname.
@ -153,53 +183,56 @@
* Unused. Sentinel value
*/
typedef enum {
SELECT_IOC_ONE_PER_NODE = 0, /* Default */
SELECT_IOC_ONE_PER_NODE = 0, /* Default */
SELECT_IOC_EVERY_NTH_RANK, /* Starting at rank 0, select-next += N */
SELECT_IOC_WITH_CONFIG, /* NOT IMPLEMENTED: Read-from-file */
SELECT_IOC_TOTAL, /* Starting at rank 0, mpi_size / total */
ioc_selection_options /* Sentinel value */
SELECT_IOC_WITH_CONFIG, /* NOT IMPLEMENTED: Read-from-file */
SELECT_IOC_TOTAL, /* Starting at rank 0, mpi_size / total */
ioc_selection_options /* Sentinel value */
} H5FD_subfiling_ioc_select_t;
/**
* \struct H5FD_subfiling_shared_config_t
* \brief Subfiling configuration structure that is shared between the #H5FD_SUBFILING
* \struct H5FD_subfiling_params_t
* \brief Subfiling parameter structure that is shared between the #H5FD_SUBFILING
* and #H5FD_IOC drivers
*
* \var H5FD_subfiling_ioc_select_t H5FD_subfiling_shared_config_t::ioc_selection
* \var H5FD_subfiling_ioc_select_t H5FD_subfiling_params_t::ioc_selection
* The method to use for selecting MPI ranks to be I/O concentrators. The
* current default is to select one MPI rank per node to be an I/O concentrator.
* Refer to #H5FD_subfiling_ioc_select_t for a description of the algorithms
* available for use.
*
* \var int64_t H5FD_subfiling_shared_config_t::stripe_size
* \var int64_t H5FD_subfiling_params_t::stripe_size
* The stripe size defines the size (in bytes) of the data stripes in the
* sub-files for the logical HDF5 file. Data is striped across the sub-files
* subfiles for the logical HDF5 file. Data is striped across the subfiles
* in a round-robin wrap-around fashion in segments equal to the stripe size.
*
* For example, in an HDF5 file consisting of four sub-files with a 1MiB stripe
* size, the first and fifth 1MiB of data would reside in the first sub-file,
* the second and sixth 1MiB of data would reside in the second sub-file and so
* For example, in an HDF5 file consisting of four subfiles with a 1MiB stripe
* size, the first and fifth 1MiB of data would reside in the first subfile,
* the second and sixth 1MiB of data would reside in the second subfile and so
* on.
*
* This value can also be set or adjusted with the #H5FD_SUBFILING_STRIPE_SIZE
* environment variable.
*
* \var int32_t H5FD_subfiling_shared_config_t::stripe_count
* The number of I/O concentrators (and, currently, the number of sub-files)
* to use for the logical HDF5 file. This value is used in conjunction with
* the IOC selection method to determine which MPI ranks will be assigned as
* I/O concentrators.
* \var int32_t H5FD_subfiling_params_t::stripe_count
* The target number of subfiles to use for the logical HDF5 file. The current
* default is to use one subfile per node, but it can be useful to set a
* different target number of subfiles, especially if the HDF5 application will
* pre-create the HDF5 file on a single MPI rank. In that particular case, the
* single rank will need to know how many subfiles the logical HDF5 file will
* consist of in order to properly pre-create the file.
*
* Alternatively, the mapping between MPI ranks and I/O concentrators can be
* set or adjusted with a combination of the #ioc_selection field and the
* #H5FD_SUBFILING_IOC_PER_NODE and #H5FD_SUBFILING_IOC_SELECTION_CRITERIA
* environment variables.
* This value is used in conjunction with the IOC selection method to determine
* which MPI ranks will be assigned as I/O concentrators. Alternatively, the
* mapping between MPI ranks and I/O concentrators can be set or adjusted with a
* combination of the #ioc_selection field and the #H5FD_SUBFILING_IOC_PER_NODE
* and #H5FD_SUBFILING_IOC_SELECTION_CRITERIA environment variables.
*/
typedef struct H5FD_subfiling_shared_config_t {
H5FD_subfiling_ioc_select_t ioc_selection; /* Method to select I/O concentrators */
int64_t stripe_size; /* Size (in bytes) of data stripes in sub-files */
int32_t stripe_count; /* Number of I/O concentrators to use */
} H5FD_subfiling_shared_config_t;
typedef struct H5FD_subfiling_params_t {
H5FD_subfiling_ioc_select_t ioc_selection; /* Method to select I/O concentrators */
int64_t stripe_size; /* Size (in bytes) of data stripes in subfiles */
int32_t stripe_count; /* Target number of subfiles to use */
} H5FD_subfiling_params_t;
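To make the round-robin layout described above concrete, here is a minimal sketch (not driver code) that maps a logical file offset to a subfile index and an offset within that subfile, assuming four subfiles and a 1 MiB stripe size as in the example in the stripe_size description.

    #include <stdint.h>
    #include <stdio.h>

    int
    main(void)
    {
        int64_t stripe_size  = 1024 * 1024; /* 1 MiB, as in the example above */
        int64_t num_subfiles = 4;

        int64_t logical_offset = (int64_t)5 * 1024 * 1024; /* 5 MiB into the logical file */

        int64_t stripe_idx     = logical_offset / stripe_size; /* which stripe overall  */
        int64_t subfile_idx    = stripe_idx % num_subfiles;    /* which subfile owns it */
        int64_t stripe_in_file = stripe_idx / num_subfiles;    /* which stripe row      */
        int64_t subfile_offset = stripe_in_file * stripe_size + logical_offset % stripe_size;

        /* 5 MiB -> overall stripe 5 -> subfile 1, at offset 1 MiB within that subfile */
        printf("subfile %lld, offset %lld\n", (long long)subfile_idx, (long long)subfile_offset);
        return 0;
    }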
//! <!-- [H5FD_subfiling_config_t_snip] -->
/**
@ -226,7 +259,7 @@ typedef struct H5FD_subfiling_shared_config_t {
* \var hid_t H5FD_subfiling_config_t::ioc_fapl_id
* The File Access Property List which is setup with the file driver that
* the #H5FD_SUBFILING driver will use for servicing I/O requests to the
* sub-files. Currently, the File Access Property List must be setup with
* subfiles. Currently, the File Access Property List must be setup with
* the #H5FD_IOC driver by calling H5Pset_fapl_ioc(), but future development
* may allow other file drivers to be used.
*
@ -235,19 +268,18 @@ typedef struct H5FD_subfiling_shared_config_t {
* use the #H5FD_IOC driver for its I/O operations. This field should currently
* always be set to TRUE.
*
* \var H5FD_subfiling_shared_config_t H5FD_subfiling_config_t::shared_cfg
* \var H5FD_subfiling_params_t H5FD_subfiling_config_t::shared_cfg
* A structure which contains the subfiling parameters that are shared between
* the #H5FD_SUBFILING and #H5FD_IOC drivers. This includes the sub-file stripe
* size, number of I/O concentrators, IOC selection method, etc.
* the #H5FD_SUBFILING and #H5FD_IOC drivers. This includes the subfile stripe
* size, stripe count, IOC selection method, etc.
*
*/
typedef struct H5FD_subfiling_config_t {
uint32_t magic; /* Must be set to H5FD_SUBFILING_FAPL_MAGIC */
uint32_t version; /* Must be set to H5FD_SUBFILING_CURR_FAPL_VERSION */
hid_t ioc_fapl_id; /* The FAPL setup with the stacked VFD to use for I/O concentrators */
hbool_t require_ioc; /* Whether to use the IOC VFD (currently must always be TRUE) */
H5FD_subfiling_shared_config_t
shared_cfg; /* Subfiling/IOC parameters (stripe size, stripe count, etc.) */
uint32_t magic; /* Must be set to H5FD_SUBFILING_FAPL_MAGIC */
uint32_t version; /* Must be set to H5FD_SUBFILING_CURR_FAPL_VERSION */
hid_t ioc_fapl_id; /* The FAPL setup with the stacked VFD to use for I/O concentrators */
hbool_t require_ioc; /* Whether to use the IOC VFD (currently must always be TRUE) */
H5FD_subfiling_params_t shared_cfg; /* Subfiling/IOC parameters (stripe size, stripe count, etc.) */
} H5FD_subfiling_config_t;
//! <!-- [H5FD_subfiling_config_t_snip] -->
@ -274,8 +306,8 @@ H5_DLL hid_t H5FD_subfiling_init(void);
*
* The #H5FD_SUBFILING driver is an MPI-based file driver that allows an
* HDF5 application to distribute a logical HDF5 file across a collection
* of "sub-files" in equal-sized data segment "stripes". I/O to the logical
* HDF5 file is then directed to the appropriate "sub-file" according to the
* of "subfiles" in equal-sized data segment "stripes". I/O to the logical
* HDF5 file is then directed to the appropriate "subfile" according to the
* #H5FD_SUBFILING configuration and a system of I/O concentrators, which
* are MPI ranks operating worker threads.
*

File diff suppressed because it is too large

View File

@ -20,16 +20,48 @@
#include <stdatomic.h>
#include "H5private.h"
#include "H5FDprivate.h"
#include "H5Iprivate.h"
#include "H5Pprivate.h"
#include "H5FDsubfiling.h"
#include "H5FDioc.h"
#ifndef PATH_MAX
#define PATH_MAX 4096
#endif
/*
* Some definitions for debugging the Subfiling feature
*/
/* #define H5_SUBFILING_DEBUG */
/*
* Some definitions for controlling performance across
* different machines where some types of MPI operations
* may be better optimized than others
*/
/* #define H5_SUBFILING_PREFER_ALLGATHER_TOPOLOGY */
#ifndef H5_SUBFILING_PREFER_ALLGATHER_TOPOLOGY
#if !H5_CHECK_MPI_VERSION(3, 0)
#error "MPI 3 required for MPI_Comm_split_type"
#endif
#endif
/*
* Name of the HDF5 FAPL property that the Subfiling VFD
* uses to pass its configuration down to the underlying
* IOC VFD
*/
#define H5FD_SUBFILING_CONFIG_PROP "H5FD_SUBFILING_CONFIG_PROP"
/*
* Name of the HDF5 FAPL property that the Subfiling VFD
* uses to pass the HDF5 stub file's Inode value to the
* underlying IOC VFD
*/
#define H5FD_SUBFILING_STUB_FILE_ID "H5FD_SUBFILING_STUB_FILE_ID"
/*
* MPI tags are 32 bits; we treat them as unsigned
* to allow the use of the available bits for RPC
@ -80,8 +112,10 @@
/* MPI tag values for data communicator */
#define WRITE_INDEP_ACK 0
#define READ_INDEP_DATA 1
#define WRITE_TAG_BASE 2
#define READ_INDEP_ACK 1
#define READ_INDEP_DATA 2
#define WRITE_DATA_DONE 3
#define IO_TAG_BASE 4
/*
* Object type definitions for subfiling objects.
@ -112,70 +146,70 @@ typedef enum io_ops {
LOGGING_OP = 16
} io_op_t;
/* Every application rank will record their MPI rank
* and hostid as a structure. These eventually get
* communicated to MPI rank zero(0) and sorted before
* being broadcast. The resulting sorted vector
* provides a basis for determining which MPI ranks
* will host an IO Concentrator (IOC), e.g. For
* default behavior, we choose the first vector entry
* associated with a "new" hostid.
/*
* Every MPI rank in a file's communicator will
* record its MPI rank in the file communicator
* and its node-local MPI rank in the node's
* communicator. Then the resulting information
* will be broadcast to all MPI ranks and will
* provide a basis for determining which MPI ranks
* will host an I/O concentrator.
*/
typedef struct {
long rank;
long hostid;
int rank;
int node_local_rank;
int node_local_size;
int node_lead_rank;
} layout_t;
/* This typedef defines a fixed process layout which
/*
* This typedef defines a fixed process layout which
* can be reused for any number of file open operations
*/
typedef struct app_layout_t {
long hostid; /* value returned by gethostid() */
layout_t *layout; /* Vector of {rank,hostid} values */
int *node_ranks; /* ranks extracted from sorted layout */
int node_count; /* Total nodes (different hostids) */
int node_index; /* My node: index into node_ranks */
int local_peers; /* How may local peers on my node */
int world_rank; /* My MPI rank */
int world_size; /* Total number of MPI ranks */
layout_t *layout; /* Array of (rank, node local rank, node local size) values */
int *node_ranks; /* Array of lowest MPI rank values on each node */
int node_count; /* Total number of nodes */
int world_rank; /* MPI rank in file communicator */
int world_size; /* Size of file communicator */
int node_local_rank; /* MPI rank on node */
int node_local_size; /* Size of node intra-communicator */
} app_layout_t;
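As a standalone sketch of how the node-local rank and size recorded in layout_t can be obtained with the MPI-3 MPI_Comm_split_type() call (this illustrates the general mechanism, not the driver's actual topology code):

    #include <mpi.h>
    #include <stdio.h>

    int
    main(int argc, char **argv)
    {
        MPI_Comm node_comm;
        int      world_rank, world_size;
        int      node_local_rank, node_local_size;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        /* Group ranks that share a node (shared-memory domain) into one communicator */
        MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, world_rank,
                            MPI_INFO_NULL, &node_comm);
        MPI_Comm_rank(node_comm, &node_local_rank);
        MPI_Comm_size(node_comm, &node_local_size);

        printf("world %d/%d -> node-local %d/%d\n", world_rank, world_size,
               node_local_rank, node_local_size);

        MPI_Comm_free(&node_comm);
        MPI_Finalize();
        return 0;
    }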
/* This typedef defines things related to IOC selections */
typedef struct topology {
app_layout_t *app_layout; /* Pointer to our layout struct */
bool rank_is_ioc; /* Indicates that we host an IOC */
int subfile_rank; /* Valid only if rank_is_ioc */
int n_io_concentrators; /* Number of IO concentrators */
int *io_concentrators; /* Vector of ranks which are IOCs */
int *subfile_fd; /* file descriptor (if IOC) */
H5FD_subfiling_ioc_select_t selection_type; /* Cache our IOC selection criteria */
app_layout_t *app_layout; /* Pointer to our layout struct */
MPI_Comm app_comm; /* MPI communicator for this topology */
bool rank_is_ioc; /* Indicates that we host an IOC */
int ioc_idx; /* Valid only if rank_is_ioc */
int n_io_concentrators; /* Number of I/O concentrators */
int *io_concentrators; /* Vector of ranks which are IOCs */
H5FD_subfiling_ioc_select_t selection_type; /* Cache our IOC selection criteria */
} sf_topology_t;
typedef struct {
int64_t sf_context_id; /* Generated context ID which embeds the cache index */
uint64_t h5_file_id; /* GUID (basically the inode value) */
void *h5_file_handle; /* Low-level handle for the HDF5 stub file */
int sf_fid; /* value returned by open(file,..) */
size_t sf_write_count; /* Statistics: write_count */
size_t sf_read_count; /* Statistics: read_count */
haddr_t sf_eof; /* File eof */
int64_t sf_stripe_size; /* Stripe-depth */
int64_t sf_blocksize_per_stripe; /* Stripe-depth X n_IOCs */
int64_t sf_base_addr; /* For an IOC, our base address */
MPI_Comm sf_msg_comm; /* MPI comm used to send RPC msg */
MPI_Comm sf_data_comm; /* MPI comm used to move data */
MPI_Comm sf_eof_comm; /* MPI comm used to communicate EOF */
MPI_Comm sf_barrier_comm; /* MPI comm used for barrier operations */
MPI_Comm sf_group_comm; /* Not used: for IOC collectives */
MPI_Comm sf_intercomm; /* Not used: for msgs to all IOC */
int sf_group_size; /* IOC count (in sf_group_comm) */
int sf_group_rank; /* IOC rank (in sf_group_comm) */
int sf_intercomm_root; /* Not used: for IOC comms */
char *subfile_prefix; /* If subfiles are node-local */
char *sf_filename; /* A generated subfile name */
char *h5_filename; /* The user supplied file name */
void *ioc_data; /* Private data for underlying IOC */
sf_topology_t *topology; /* pointer to our topology */
uint64_t h5_file_id; /* GUID (basically the inode value) */
int *sf_fids; /* Array of file IDs for subfiles this rank owns */
int sf_num_fids; /* Number of subfiles this rank owns */
int sf_num_subfiles; /* Total number of subfiles for logical HDF5 file */
size_t sf_write_count; /* Statistics: write_count */
size_t sf_read_count; /* Statistics: read_count */
haddr_t sf_eof; /* File eof */
int64_t sf_stripe_size; /* Stripe-depth */
int64_t sf_blocksize_per_stripe; /* Stripe-depth X n_IOCs */
int64_t sf_base_addr; /* For an IOC, our base address */
MPI_Comm sf_msg_comm; /* MPI comm used to send RPC msg */
MPI_Comm sf_data_comm; /* MPI comm used to move data */
MPI_Comm sf_eof_comm; /* MPI comm used to communicate EOF */
MPI_Comm sf_node_comm; /* MPI comm used for intra-node comms */
MPI_Comm sf_group_comm; /* Not used: for IOC collectives */
int sf_group_size; /* IOC count (in sf_group_comm) */
int sf_group_rank; /* IOC rank (in sf_group_comm) */
char *subfile_prefix; /* If subfiles are node-local */
char *h5_filename; /* The user supplied file name */
void *ioc_data; /* Private data for underlying IOC */
sf_topology_t *topology; /* Pointer to our topology */
#ifdef H5_SUBFILING_DEBUG
char sf_logfile_name[PATH_MAX];
@ -189,30 +223,45 @@ typedef struct {
* an easy gathering of statistics by the IO Concentrator.
*/
typedef struct {
/* {Datasize, Offset, FileID} */
int64_t header[3]; /* The basic RPC input plus */
int tag; /* the supplied OPCODE tag */
int source; /* Rank of who sent the message */
int subfile_rank; /* The IOC rank */
int64_t context_id; /* context to be used to complete */
double start_time; /* the request, + time of receipt */
/* from which we calc Time(queued) */
int64_t header[3]; /* The basic RPC input */
int tag; /* the supplied OPCODE tag */
int source; /* Rank of who sent the message */
int ioc_idx; /* The IOC rank */
int64_t context_id; /* context to be used to complete */
double start_time; /* the request, + time of receipt */
/* from which we calc Time(queued) */
} sf_work_request_t;
/* MPI Datatype used to send/receive an RPC message */
extern MPI_Datatype H5_subfiling_rpc_msg_type;
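The driver sends sf_work_request_t messages with the committed MPI datatype declared above. As an illustration of the general technique only (the actual type construction lives elsewhere in the Subfiling code, and rpc_msg_t below is a hypothetical stand-in that mirrors the struct), a matching datatype could be described to MPI roughly like this:

    #include <mpi.h>
    #include <stddef.h>
    #include <stdint.h>

    typedef struct {
        int64_t header[3];
        int     tag;
        int     source;
        int     ioc_idx;
        int64_t context_id;
        double  start_time;
    } rpc_msg_t; /* mirrors sf_work_request_t for illustration */

    /* Builds and commits an MPI datatype matching rpc_msg_t; returns an MPI error code */
    static int
    build_rpc_msg_type(MPI_Datatype *new_type)
    {
        int          lengths[6] = {3, 1, 1, 1, 1, 1};
        MPI_Aint     displs[6]  = {offsetof(rpc_msg_t, header),     offsetof(rpc_msg_t, tag),
                                   offsetof(rpc_msg_t, source),     offsetof(rpc_msg_t, ioc_idx),
                                   offsetof(rpc_msg_t, context_id), offsetof(rpc_msg_t, start_time)};
        MPI_Datatype types[6]   = {MPI_INT64_T, MPI_INT, MPI_INT, MPI_INT, MPI_INT64_T, MPI_DOUBLE};
        MPI_Datatype tmp_type   = MPI_DATATYPE_NULL;
        int          mpi_code;

        /* Describe the struct layout, then resize so the extent matches sizeof(rpc_msg_t) */
        if (MPI_SUCCESS != (mpi_code = MPI_Type_create_struct(6, lengths, displs, types, &tmp_type)))
            return mpi_code;
        if (MPI_SUCCESS != (mpi_code = MPI_Type_create_resized(tmp_type, 0, sizeof(rpc_msg_t), new_type)))
            return mpi_code;
        if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(new_type)))
            return mpi_code;

        MPI_Type_free(&tmp_type);
        return MPI_SUCCESS;
    }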
#ifdef __cplusplus
extern "C" {
#endif
H5_DLL herr_t H5_open_subfiles(const char *base_filename, void *h5_file_handle,
H5FD_subfiling_shared_config_t *subfiling_config, int file_acc_flags,
H5_DLL herr_t H5_open_subfiling_stub_file(const char *name, unsigned flags, MPI_Comm file_comm,
H5FD_t **file_ptr, uint64_t *file_id);
H5_DLL herr_t H5_open_subfiles(const char *base_filename, uint64_t file_id,
H5FD_subfiling_params_t *subfiling_config, int file_acc_flags,
MPI_Comm file_comm, int64_t *context_id_out);
H5_DLL herr_t H5_close_subfiles(int64_t subfiling_context_id);
H5_DLL herr_t H5_close_subfiles(int64_t subfiling_context_id, MPI_Comm file_comm);
H5_DLL int64_t H5_new_subfiling_object_id(sf_obj_type_t obj_type, int64_t index_val);
H5_DLL int64_t H5_new_subfiling_object_id(sf_obj_type_t obj_type);
H5_DLL void *H5_get_subfiling_object(int64_t object_id);
H5_DLL int64_t H5_subfile_fhandle_to_context(void *file_handle);
H5_DLL herr_t H5_free_subfiling_object(int64_t object_id);
H5_DLL herr_t H5_get_num_iocs_from_config_file(FILE *config_file, int *n_io_concentrators);
H5_DLL herr_t H5_get_subfiling_config_from_file(FILE *config_file, int64_t *stripe_size,
int64_t *num_subfiles);
H5_DLL herr_t H5_resolve_pathname(const char *filepath, MPI_Comm comm, char **resolved_filepath);
H5_DLL herr_t H5_subfiling_set_config_prop(H5P_genplist_t *plist_ptr,
const H5FD_subfiling_params_t *vfd_config);
H5_DLL herr_t H5_subfiling_get_config_prop(H5P_genplist_t *plist_ptr, H5FD_subfiling_params_t *vfd_config);
H5_DLL herr_t H5_subfiling_set_file_id_prop(H5P_genplist_t *plist_ptr, uint64_t file_id);
H5_DLL herr_t H5_subfiling_get_file_id_prop(H5P_genplist_t *plist_ptr, uint64_t *file_id);
H5_DLL int64_t H5_subfile_fid_to_context(uint64_t file_id);
H5_DLL herr_t H5_subfiling_validate_config(const H5FD_subfiling_params_t *subf_config);
H5_DLL herr_t H5_subfiling_terminate(void);
H5_DLL void H5_subfiling_log(int64_t sf_context_id, const char *fmt, ...);

View File

@ -98,6 +98,14 @@ set (test_par_CLEANFILES
t_filters_parallel.h5
MPItest.h5
ShapeSameTest.h5
test_subfiling_basic_create.h5
test_subfiling_config_file.h5
test_subfiling_stripe_sizes.h5
test_subfiling_read_different_stripe_sizes.h5
test_subfiling_precreate_rank_0.h5
test_subfiling_write_many_read_one.h5
test_subfiling_write_many_read_few.h5
test_subfiling_h5fuse.h5
)
# Remove any output file left over from previous test run

File diff suppressed because it is too large

View File

@ -327,7 +327,7 @@ setup_vfd_test_file(int file_name_id, char *file_name, int mpi_size, H5FD_mpio_x
#ifdef H5_HAVE_SUBFILING_VFD
else if (HDstrcmp(vfd_name, H5FD_SUBFILING_NAME) == 0) {
H5FD_subfiling_shared_config_t shared_conf = {
H5FD_subfiling_params_t shared_conf = {
/* ioc_selection = */ SELECT_IOC_ONE_PER_NODE,
/* stripe_size = */ (INTS_PER_RANK / 2),
/* stripe_count = */ 0, /* will over write */
@ -342,9 +342,7 @@ setup_vfd_test_file(int file_name_id, char *file_name, int mpi_size, H5FD_mpio_x
H5FD_ioc_config_t ioc_config = {
/* magic = */ H5FD_IOC_FAPL_MAGIC,
/* version = */ H5FD_IOC_CURR_FAPL_VERSION,
/* under_fapl_id = */ H5P_DEFAULT,
/* thread_pool_size = */ H5FD_IOC_DEFAULT_THREAD_POOL_SIZE,
/* subf_config = */ shared_conf,
};
hid_t ioc_fapl = H5I_INVALID_HID;

View File

@ -1,10 +1,20 @@
cmake_minimum_required (VERSION 3.18)
project (HDF5_UTILS_SUBFILINGVFD C)
configure_file (${HDF5_UTILS_SUBFILINGVFD_SOURCE_DIR}/h5fuse.sh.in ${HDF5_BINARY_DIR}/h5fuse.sh @ONLY)
configure_file (${HDF5_UTILS_SUBFILINGVFD_SOURCE_DIR}/h5fuse.sh.in ${HDF5_UTILS_SUBFILINGVFD_BINARY_DIR}/h5fuse.sh @ONLY)
# Copy h5fuse.sh to testpar directory for subfiling tests
if (HDF5_ENABLE_PARALLEL AND HDF5_TEST_PARALLEL)
file (
COPY
${HDF5_UTILS_SUBFILINGVFD_BINARY_DIR}/h5fuse.sh
DESTINATION
${HDF5_TEST_PAR_BINARY_DIR}
)
endif ()
install (
FILES ${HDF5_BINARY_DIR}/h5fuse.sh
FILES ${HDF5_UTILS_SUBFILINGVFD_BINARY_DIR}/h5fuse.sh
DESTINATION ${HDF5_INSTALL_BIN_DIR}
PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE
COMPONENT utilsapplications

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Copyright by The HDF Group.
# All rights reserved.
@ -49,7 +49,7 @@ while getopts ":h:f:" option; do
h) # display Help
usage
exit;;
f) # subfiling configureation file
f) # subfiling configuration file
file_config=$OPTARG;;
\?) # Invalid option
echo -e "$RED ERROR: Invalid option ${BLD}-${OPTARG}${RED} $NC"