mirror of
https://github.com/HDFGroup/hdf5.git
synced 2024-11-21 01:04:10 +08:00
Implement selection vector I/O with collective chunk filling (#3826)
* Changes for ECP-344: Implement selection vector I/O with collective chunk filling. Also fix a bug in H5FD__mpio_write_vector() to account for fixed size optimization when computing max address. * Fixes based on PR review comments: For H5Dchunk.c: fix H5MM_xfree() For H5FDmpio.c: 1) Revert the fix to H5FD__mpio_write_vector() 2) Apply the patch from Neil on the proper length of s_sizes reported by H5FD__mpio_vector_build_types() * Put back the logic of dividing up the work among all the mpi ranks similar to the original H5D__chunk_collective_fill() routine. * Add a test to verify the fix for the illegal reference problem in H5FD__mpio_write_vector().
This commit is contained in:
parent
ef39882fa1
commit
ed31aaca79
264
src/H5Dchunk.c
264
src/H5Dchunk.c
@ -5536,11 +5536,9 @@ done:
|
||||
/*-------------------------------------------------------------------------
|
||||
* Function: H5D__chunk_collective_fill
|
||||
*
|
||||
* Purpose: Use MPIO collective write to fill the chunks (if number of
|
||||
* chunks to fill is greater than the number of MPI procs;
|
||||
* otherwise use independent I/O).
|
||||
* Purpose: Use MPIO selection vector I/O for writing fill chunks
|
||||
*
|
||||
* Return: Non-negative on success/Negative on failure
|
||||
* Return: Non-negative on success/Negative on failure
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -5554,19 +5552,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
|
||||
int mpi_code; /* MPI return code */
|
||||
size_t num_blocks; /* Number of blocks between processes. */
|
||||
size_t leftover_blocks; /* Number of leftover blocks to handle */
|
||||
int blocks, leftover; /* converted to int for MPI */
|
||||
MPI_Aint *chunk_disp_array = NULL;
|
||||
MPI_Aint *block_disps = NULL;
|
||||
int *block_lens = NULL;
|
||||
MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
|
||||
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
|
||||
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
|
||||
bool need_sort = false;
|
||||
size_t i; /* Local index variable */
|
||||
int blocks; /* converted to int for MPI */
|
||||
int leftover; /* converted to int for MPI */
|
||||
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
|
||||
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
|
||||
size_t i; /* Local index variable */
|
||||
haddr_t *io_addrs = NULL;
|
||||
size_t *io_sizes = NULL;
|
||||
const void **io_wbufs = NULL;
|
||||
H5FD_mem_t io_types[2];
|
||||
bool all_same_block_len = true;
|
||||
bool need_sort = false;
|
||||
size_t io_2sizes[2];
|
||||
herr_t ret_value = SUCCEED; /* Return value */
|
||||
|
||||
FUNC_ENTER_PACKAGE
|
||||
|
||||
assert(chunk_fill_info->num_chunks != 0);
|
||||
|
||||
/*
|
||||
* If a separate fill buffer is provided for partial chunks, ensure
|
||||
* that the "don't filter partial edge chunks" flag is set.
|
||||
@ -5589,6 +5592,7 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
|
||||
/* Distribute evenly the number of blocks between processes. */
|
||||
if (mpi_size == 0)
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");
|
||||
|
||||
num_blocks =
|
||||
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */
|
||||
|
||||
@ -5602,157 +5606,97 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
|
||||
H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t);
|
||||
|
||||
/* Check if we have any chunks to write on this rank */
|
||||
if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
|
||||
MPI_Aint partial_fill_buf_disp = 0;
|
||||
bool all_same_block_len = true;
|
||||
if (num_blocks > 0 || leftover > mpi_rank) {
|
||||
|
||||
/* Allocate buffers */
|
||||
if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");
|
||||
if (NULL == (io_addrs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_addrs))))
|
||||
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
|
||||
"couldn't allocate space for I/O addresses vector");
|
||||
|
||||
if (partial_chunk_fill_buf) {
|
||||
MPI_Aint fill_buf_addr;
|
||||
MPI_Aint partial_fill_buf_addr;
|
||||
if (NULL == (io_wbufs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_wbufs))))
|
||||
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");
|
||||
}
|
||||
|
||||
/* Calculate the displacement between the fill buffer and partial chunk fill buffer */
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
|
||||
/*
|
||||
* Perform initial scan of chunk info list to:
|
||||
* - make sure that chunk addresses are monotonically non-decreasing
|
||||
* - check if all blocks have the same length
|
||||
*/
|
||||
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
|
||||
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
|
||||
need_sort = true;
|
||||
|
||||
#if H5_CHECK_MPI_VERSION(3, 1)
|
||||
partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
|
||||
#else
|
||||
partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
|
||||
#endif
|
||||
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
|
||||
all_same_block_len = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Allocate all-zero block displacements array. If a block's displacement
|
||||
* is left as zero, that block will be written to from the regular fill
|
||||
* buffer. If a block represents an unfiltered partial edge chunk, its
|
||||
* displacement will be set so that the block is written to from the
|
||||
* unfiltered fill buffer.
|
||||
*/
|
||||
if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
|
||||
}
|
||||
/*
|
||||
* Note that we sort all of the chunks here, and not just a subset
|
||||
* corresponding to this rank. We do this since we have found MPI I/O to work
|
||||
* better when each rank writes blocks that are contiguous in the file,
|
||||
* and by sorting the full list we maximize the chance of that happening.
|
||||
*/
|
||||
if (need_sort)
|
||||
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, sizeof(struct chunk_coll_fill_info),
|
||||
H5D__chunk_cmp_coll_fill_info);
|
||||
|
||||
/*
|
||||
* Perform initial scan of chunk info list to:
|
||||
* - make sure that chunk addresses are monotonically non-decreasing
|
||||
* - check if all blocks have the same length
|
||||
*/
|
||||
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
|
||||
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
|
||||
need_sort = true;
|
||||
/*
|
||||
* If all the chunks have the same length, use the compressed feature
|
||||
* to store the size.
|
||||
* Otherwise, allocate the array of sizes for storing chunk sizes.
|
||||
*/
|
||||
if (all_same_block_len) {
|
||||
io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
|
||||
io_2sizes[1] = 0;
|
||||
}
|
||||
else {
|
||||
if (NULL == (io_sizes = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_sizes))))
|
||||
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
|
||||
}
|
||||
|
||||
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
|
||||
all_same_block_len = false;
|
||||
}
|
||||
/*
|
||||
* Since the type of all chunks is raw data, use the compressed feature
|
||||
* to store the chunk type.
|
||||
*/
|
||||
io_types[0] = H5FD_MEM_DRAW;
|
||||
io_types[1] = H5FD_MEM_NOLIST;
|
||||
|
||||
if (need_sort)
|
||||
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
|
||||
sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);
|
||||
/*
|
||||
* For the chunks corresponding to this rank, fill in the
|
||||
* address, size and buf pointer for each chunk.
|
||||
*/
|
||||
for (i = 0; i < (size_t)blocks; i++) {
|
||||
size_t idx = i + (size_t)(mpi_rank * blocks);
|
||||
|
||||
io_addrs[i] = chunk_fill_info->chunk_info[idx].addr;
|
||||
|
||||
/* Allocate buffer for block lengths if necessary */
|
||||
if (!all_same_block_len)
|
||||
if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");
|
||||
io_sizes[i] = chunk_fill_info->chunk_info[idx].chunk_size;
|
||||
|
||||
for (i = 0; i < (size_t)blocks; i++) {
|
||||
size_t idx = i + (size_t)(mpi_rank * blocks);
|
||||
if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk)
|
||||
io_wbufs[i] = partial_chunk_fill_buf;
|
||||
else
|
||||
io_wbufs[i] = fill_buf;
|
||||
}
|
||||
|
||||
/* store the chunk address as an MPI_Aint */
|
||||
chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);
|
||||
/*
|
||||
* For the leftover chunk corresponding to this rank, fill in the
|
||||
* address, size and buf pointer for the chunk.
|
||||
*/
|
||||
if (leftover > mpi_rank) {
|
||||
io_addrs[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;
|
||||
|
||||
if (!all_same_block_len)
|
||||
H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);
|
||||
if (!all_same_block_len)
|
||||
io_sizes[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size;
|
||||
|
||||
if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
|
||||
assert(partial_chunk_fill_buf);
|
||||
block_disps[i] = partial_fill_buf_disp;
|
||||
}
|
||||
} /* end for */
|
||||
|
||||
/* Calculate if there are any leftover blocks after evenly
|
||||
* distributing. If there are, then round-robin the distribution
|
||||
* to processes 0 -> leftover.
|
||||
*/
|
||||
if (leftover && leftover > mpi_rank) {
|
||||
chunk_disp_array[blocks] =
|
||||
(MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;
|
||||
|
||||
if (!all_same_block_len)
|
||||
H5_CHECKED_ASSIGN(block_lens[blocks], int,
|
||||
chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
|
||||
size_t);
|
||||
|
||||
if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
|
||||
assert(partial_chunk_fill_buf);
|
||||
block_disps[blocks] = partial_fill_buf_disp;
|
||||
}
|
||||
|
||||
blocks++;
|
||||
}
|
||||
|
||||
/* Create file and memory types for the write operation */
|
||||
if (all_same_block_len) {
|
||||
int block_len;
|
||||
|
||||
H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);
|
||||
|
||||
mpi_code =
|
||||
MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
|
||||
if (mpi_code != MPI_SUCCESS)
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
|
||||
|
||||
if (partial_chunk_fill_buf) {
|
||||
/*
|
||||
* If filters are disabled for partial edge chunks, those chunks could
|
||||
* potentially have the same block length as the other chunks, but still
|
||||
* need to be written to using the unfiltered fill buffer. Use an hindexed
|
||||
* block type rather than an hvector.
|
||||
*/
|
||||
mpi_code =
|
||||
MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
|
||||
if (mpi_code != MPI_SUCCESS)
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
|
||||
}
|
||||
else {
|
||||
mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
|
||||
if (mpi_code != MPI_SUCCESS)
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
|
||||
}
|
||||
}
|
||||
else {
|
||||
/*
|
||||
* Currently, different block lengths implies that there are partial
|
||||
* edge chunks and the "don't filter partial edge chunks" flag is set.
|
||||
*/
|
||||
if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
|
||||
assert(partial_chunk_fill_buf);
|
||||
assert(block_lens);
|
||||
assert(block_disps);
|
||||
|
||||
mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
|
||||
if (mpi_code != MPI_SUCCESS)
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
|
||||
|
||||
mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
|
||||
if (mpi_code != MPI_SUCCESS)
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
|
||||
io_wbufs[blocks] = partial_chunk_fill_buf;
|
||||
}
|
||||
else
|
||||
io_wbufs[blocks] = fill_buf;
|
||||
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
|
||||
} /* end if */
|
||||
|
||||
/* Set MPI-IO VFD properties */
|
||||
|
||||
/* Set MPI datatypes for operation */
|
||||
if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
|
||||
blocks++;
|
||||
}
|
||||
|
||||
/* Get current transfer mode */
|
||||
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
|
||||
@ -5763,31 +5707,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
|
||||
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");
|
||||
|
||||
/* Low-level write (collective) */
|
||||
if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
|
||||
(blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
|
||||
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");
|
||||
|
||||
/* Barrier so processes don't race ahead */
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
|
||||
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
|
||||
|
||||
/* Perform the selection vector I/O for the chunks */
|
||||
if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), (uint32_t)blocks, io_types, io_addrs,
|
||||
all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
|
||||
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");
|
||||
|
||||
done:
|
||||
if (have_xfer_mode)
|
||||
/* Set transfer mode */
|
||||
/* Restore transfer mode */
|
||||
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
|
||||
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");
|
||||
|
||||
/* free things */
|
||||
if (MPI_BYTE != file_type)
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
|
||||
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
|
||||
if (MPI_BYTE != mem_type)
|
||||
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
|
||||
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
|
||||
H5MM_xfree(chunk_disp_array);
|
||||
H5MM_xfree(block_disps);
|
||||
H5MM_xfree(block_lens);
|
||||
H5MM_xfree(io_addrs);
|
||||
H5MM_xfree(io_wbufs);
|
||||
H5MM_xfree(io_sizes);
|
||||
|
||||
FUNC_LEAVE_NOAPI(ret_value)
|
||||
} /* end H5D__chunk_collective_fill() */
|
||||
@ -5805,6 +5742,7 @@ H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)
|
||||
|
||||
FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
|
||||
} /* end H5D__chunk_cmp_coll_fill_info() */
|
||||
|
||||
#endif /* H5_HAVE_PARALLEL */
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
|
@ -106,7 +106,7 @@ static herr_t H5FD__mpio_ctl(H5FD_t *_file, uint64_t op_code, uint64_t flags, co
|
||||
/* Other functions */
|
||||
static herr_t H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[],
|
||||
size_t sizes[], H5_flexible_const_ptr_t bufs[],
|
||||
haddr_t *s_addrs[], size_t *s_sizes[],
|
||||
haddr_t *s_addrs[], size_t *s_sizes[], uint32_t *s_sizes_len,
|
||||
H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted,
|
||||
MPI_Offset *mpi_off, H5_flexible_const_ptr_t *mpi_bufs_base,
|
||||
int *size_i, MPI_Datatype *buf_type, bool *buf_type_created,
|
||||
@ -1675,7 +1675,8 @@ done:
|
||||
static herr_t
|
||||
H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
|
||||
H5_flexible_const_ptr_t bufs[], haddr_t *s_addrs[], size_t *s_sizes[],
|
||||
H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted, MPI_Offset *mpi_off,
|
||||
uint32_t *s_sizes_len, H5_flexible_const_ptr_t *s_bufs[],
|
||||
bool *vector_was_sorted, MPI_Offset *mpi_off,
|
||||
H5_flexible_const_ptr_t *mpi_bufs_base, int *size_i, MPI_Datatype *buf_type,
|
||||
bool *buf_type_created, MPI_Datatype *file_type, bool *file_type_created,
|
||||
char *unused)
|
||||
@ -1716,6 +1717,10 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[
|
||||
/* Get bigio I/O transition point (may be lower than 2G for testing) */
|
||||
bigio_count = H5_mpi_get_bigio_count();
|
||||
|
||||
/* Start with s_sizes_len at count */
|
||||
if (s_sizes_len)
|
||||
*s_sizes_len = count;
|
||||
|
||||
if (count == 1) {
|
||||
/* Single block. Just use a series of MPI_BYTEs for the file view.
|
||||
*/
|
||||
@ -1808,8 +1813,13 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[
|
||||
if (!fixed_size) {
|
||||
if ((*s_sizes)[i] == 0) {
|
||||
assert(vector_was_sorted);
|
||||
assert(i > 0);
|
||||
fixed_size = true;
|
||||
size = sizes[i - 1];
|
||||
|
||||
/* Return the used length of the s_sizes buffer */
|
||||
if (s_sizes_len)
|
||||
*s_sizes_len = (uint32_t)i;
|
||||
}
|
||||
else {
|
||||
size = (*s_sizes)[i];
|
||||
@ -2098,7 +2108,7 @@ H5FD__mpio_read_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t cou
|
||||
if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
|
||||
/* Build MPI types, etc. */
|
||||
if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs,
|
||||
&s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs,
|
||||
&s_addrs, &s_sizes, NULL, (H5_flexible_const_ptr_t **)&s_bufs,
|
||||
&vector_was_sorted, &mpi_off,
|
||||
(H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type,
|
||||
&buf_type_created, &file_type, &file_type_created, &unused) < 0)
|
||||
@ -2464,17 +2474,21 @@ H5FD__mpio_write_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t co
|
||||
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode");
|
||||
|
||||
if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
|
||||
uint32_t s_sizes_len;
|
||||
|
||||
/* Build MPI types, etc. */
|
||||
if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs,
|
||||
&s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs,
|
||||
&vector_was_sorted, &mpi_off,
|
||||
&s_addrs, &s_sizes, &s_sizes_len,
|
||||
(H5_flexible_const_ptr_t **)&s_bufs, &vector_was_sorted, &mpi_off,
|
||||
(H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type,
|
||||
&buf_type_created, &file_type, &file_type_created, &unused) < 0)
|
||||
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't build MPI datatypes for I/O");
|
||||
|
||||
/* Compute max address written to */
|
||||
/* Compute max address written to. Note s_sizes is indexed according to the length of that array as
|
||||
* reported by H5FD__mpio_vector_build_types(), which may be shorter if using the compressed arrays
|
||||
* feature. */
|
||||
if (count > 0)
|
||||
max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[count - 1]);
|
||||
max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[s_sizes_len - 1]);
|
||||
|
||||
/* free sorted vectors if they exist */
|
||||
if (!vector_was_sorted) {
|
||||
|
@ -547,8 +547,9 @@ verify_chunk_opt_status(size_t num_dsets, test_mode_t test_mode, bool any_io, bo
|
||||
|
||||
/* Verify selection I/O mode on rank 0 */
|
||||
if (mpi_rank == 0) {
|
||||
/* No actual I/O performed, only reported I/O will be from allocation, even if "no" datasets were
|
||||
* involved (num_dsets == 0 implies the call was expected to fail, but it fails after allocation).
|
||||
/* No actual I/O performed, the only reported I/O will be from allocation which is vector I/O,
|
||||
* even if "no" datasets were involved (num_dsets == 0 implies the call was expected to fail,
|
||||
* but it fails after allocation).
|
||||
* Also if the test mode is mixed filtered and unfiltered and the call did not fail, then there
|
||||
* will always be an I/O callback made with raw data. This is because unfiltered datasets fall
|
||||
* back to scalar I/O when mixed with filtered, and scalar I/O reports an I/O call was made even
|
||||
@ -557,9 +558,18 @@ verify_chunk_opt_status(size_t num_dsets, test_mode_t test_mode, bool any_io, bo
|
||||
* filtered dataset with no selection. Vector I/O does report an I/O call was made if passed a raw
|
||||
* data element of size 0, so this is consistent. */
|
||||
if (!any_io) {
|
||||
if (did_alloc || (num_dsets > 0 && test_mode == USE_MULTIPLE_DATASETS_MIXED_FILTERED))
|
||||
if (did_alloc && (num_dsets > 0 && test_mode == USE_MULTIPLE_DATASETS_MIXED_FILTERED)) {
|
||||
VRFY((H5D_VECTOR_IO | H5D_SCALAR_IO) == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was vector and scalar I/O");
|
||||
}
|
||||
else if (did_alloc) {
|
||||
VRFY(H5D_VECTOR_IO == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was vector I/O");
|
||||
}
|
||||
else if (num_dsets > 0 && test_mode == USE_MULTIPLE_DATASETS_MIXED_FILTERED) {
|
||||
VRFY(H5D_SCALAR_IO == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was scalar I/O");
|
||||
}
|
||||
else
|
||||
VRFY(0 == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was 0 (no I/O)");
|
||||
@ -588,19 +598,18 @@ verify_chunk_opt_status(size_t num_dsets, test_mode_t test_mode, bool any_io, bo
|
||||
switch (test_mode) {
|
||||
case USE_SINGLE_DATASET:
|
||||
case USE_MULTIPLE_DATASETS:
|
||||
/* Collective case with only filtered datasets. If we performed allocation then there
|
||||
* should be scalar I/O for allocation in addition to vector I/O for the actual data.
|
||||
* If we're reading from an unallocated dataset then there should be no actual I/O.
|
||||
* Otherwise there should only be vector I/O. */
|
||||
if (did_alloc)
|
||||
VRFY((H5D_SCALAR_IO | H5D_VECTOR_IO) == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was scalar and vector I/O");
|
||||
else if (unalloc_read)
|
||||
/* Collective case with only filtered datasets.
|
||||
* If we're reading from an unallocated dataset then there
|
||||
* should be no actual I/O.
|
||||
* Otherwise, only vector I/O is reported whether or not
|
||||
* allocation happened. */
|
||||
if (unalloc_read)
|
||||
VRFY(0 == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was 0 (no I/O)");
|
||||
else
|
||||
else { /* did_alloc || !unalloc_read */
|
||||
VRFY(H5D_VECTOR_IO == actual_sel_io_mode_reduced,
|
||||
"verified actual selection I/O mode was vector I/O");
|
||||
}
|
||||
break;
|
||||
|
||||
case USE_MULTIPLE_DATASETS_MIXED_FILTERED:
|
||||
|
298
testpar/t_vfd.c
298
testpar/t_vfd.c
@ -40,13 +40,14 @@ const char *FILENAMES[] = {"mpio_vfd_test_file_0", /*0*/
|
||||
"mpio_vfd_test_file_4", /*4*/
|
||||
"mpio_vfd_test_file_5", /*5*/
|
||||
"mpio_vfd_test_file_6", /*6*/
|
||||
"subfiling_vfd_test_file_0", /*7*/
|
||||
"subfiling_vfd_test_file_1", /*8*/
|
||||
"subfiling_vfd_test_file_2", /*9*/
|
||||
"subfiling_vfd_test_file_3", /*10*/
|
||||
"subfiling_vfd_test_file_4", /*11*/
|
||||
"subfiling_vfd_test_file_5", /*12*/
|
||||
"subfiling_vfd_test_file_6", /*13*/
|
||||
"mpio_vfd_test_file_7", /*7*/
|
||||
"subfiling_vfd_test_file_0", /*8*/
|
||||
"subfiling_vfd_test_file_1", /*9*/
|
||||
"subfiling_vfd_test_file_2", /*10*/
|
||||
"subfiling_vfd_test_file_3", /*11*/
|
||||
"subfiling_vfd_test_file_4", /*12*/
|
||||
"subfiling_vfd_test_file_5", /*13*/
|
||||
"subfiling_vfd_test_file_6", /*14*/
|
||||
NULL};
|
||||
|
||||
/* File Test Images
|
||||
@ -100,6 +101,8 @@ static unsigned vector_write_test_6(int file_name_id, int mpi_rank, int mpi_size
|
||||
H5FD_mpio_collective_opt_t coll_opt_mode, const char *vfd_name);
|
||||
static unsigned vector_write_test_7(int file_name_id, int mpi_rank, int mpi_size, H5FD_mpio_xfer_t xfer_mode,
|
||||
H5FD_mpio_collective_opt_t coll_opt_mode, const char *vfd_name);
|
||||
static unsigned vector_write_test_8(int file_name_id, int mpi_rank, int mpi_size, H5FD_mpio_xfer_t xfer_mode,
|
||||
H5FD_mpio_collective_opt_t coll_opt_mode, const char *vfd_name);
|
||||
/*
|
||||
* Tests for selection I/O:
|
||||
* They are derived from test_selection_io() in test/vfd.c and modified for parallel testing.
|
||||
@ -4159,6 +4162,280 @@ vector_write_test_7(int file_name_id, int mpi_rank, int mpi_size, H5FD_mpio_xfer
|
||||
|
||||
} /* vector_write_test_7() */
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
* Function: vector_write_test_8()
|
||||
*
|
||||
* Purpose: This test is to verify the fix for the following problem
|
||||
* in H5FD__mpio_write_vector when calculating max_addr:
|
||||
* --illegal reference occurs when referencing the s_sizes array
|
||||
* with <count - 1> due to <count> exceeding the length of the
|
||||
* size array which uses the compressed feature.
|
||||
*
|
||||
* 1) Open the test file with the specified VFD, and set
|
||||
* the eoa.
|
||||
*
|
||||
* 2) Set the test file in a known state by writing zeros
|
||||
* to all bytes in the test file. Since we have already
|
||||
* tested this, do this via a vector write of zero_fi_buf.
|
||||
*
|
||||
* 3) Barrier
|
||||
*
|
||||
* 4) For each rank, define base_index equal to:
|
||||
*
|
||||
* mpi_rank * INTS_PER_RANK
|
||||
*
|
||||
* and define base_addr equal to
|
||||
*
|
||||
* base_index * sizeof(int32_t).
|
||||
*
|
||||
* Setup a vector of length INTS_PER_RANK - 1.
|
||||
* Set up the size array with the compressed feature:
|
||||
* --The first element has size (2 * sizeof(int32_t))
|
||||
* --The second and third elements are of size sizeof(int32_t)
|
||||
* --The fourth element is zero.
|
||||
* Set up addrs and bufs accordingly.
|
||||
*
|
||||
* Write the vector.
|
||||
*
|
||||
* 5) Barrier
|
||||
*
|
||||
* 6) On each rank, read the entire file into the read_fi_buf,
|
||||
* and compare against increasing_fi_buf.
|
||||
* Report failure if any differences are detected.
|
||||
*
|
||||
* 7) Close the test file. On rank 0, delete the test file.
|
||||
*
|
||||
* Return: false on success, true if any errors are detected.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
static unsigned
|
||||
vector_write_test_8(int file_name_id, int mpi_rank, int mpi_size, H5FD_mpio_xfer_t xfer_mode,
|
||||
H5FD_mpio_collective_opt_t coll_opt_mode, const char *vfd_name)
|
||||
{
|
||||
const char *fcn_name = "vector_write_test_8()";
|
||||
char test_title[120];
|
||||
char filename[512];
|
||||
haddr_t eoa;
|
||||
haddr_t base_addr;
|
||||
bool show_progress = false;
|
||||
hid_t fapl_id = H5I_INVALID_HID; /* file access property list ID */
|
||||
hid_t dxpl_id = H5I_INVALID_HID; /* data access property list ID */
|
||||
H5FD_t *lf = NULL; /* VFD struct ptr */
|
||||
int cp = 0;
|
||||
int i;
|
||||
int base_index;
|
||||
uint32_t count = 0;
|
||||
size_t sizes[4];
|
||||
H5FD_mem_t types[2];
|
||||
|
||||
haddr_t *tt_addrs = NULL; /* For storing addrs */
|
||||
const void **tt_bufs = NULL; /* For storing buf pointers */
|
||||
|
||||
pass = true;
|
||||
|
||||
if (mpi_rank == 0) {
|
||||
|
||||
if (xfer_mode == H5FD_MPIO_INDEPENDENT) {
|
||||
|
||||
snprintf(test_title, sizeof(test_title), "parallel vector write test 8 -- %s / independent",
|
||||
vfd_name);
|
||||
}
|
||||
else if (coll_opt_mode == H5FD_MPIO_INDIVIDUAL_IO) {
|
||||
|
||||
snprintf(test_title, sizeof(test_title), "parallel vector write test 8 -- %s / col op / ind I/O",
|
||||
vfd_name);
|
||||
}
|
||||
else {
|
||||
|
||||
assert(coll_opt_mode == H5FD_MPIO_COLLECTIVE_IO);
|
||||
|
||||
snprintf(test_title, sizeof(test_title), "parallel vector write test 8 -- %s / col op / col I/O",
|
||||
vfd_name);
|
||||
}
|
||||
|
||||
TESTING(test_title);
|
||||
}
|
||||
|
||||
show_progress = ((show_progress) && (mpi_rank == 0));
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "\n%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 1) Allocate local buffers for addrs and bufs,
|
||||
open the test file with the specified VFD, set the eoa, and setup the dxpl */
|
||||
if (pass) {
|
||||
|
||||
tt_addrs = (haddr_t *)malloc((INTS_PER_RANK) * sizeof(haddr_t *));
|
||||
tt_bufs = (const void **)malloc((INTS_PER_RANK) * sizeof(void *));
|
||||
|
||||
if (tt_addrs == NULL || tt_bufs == NULL) {
|
||||
pass = false;
|
||||
failure_mssg = "Can't allocate local addrs and bufs buffers.";
|
||||
}
|
||||
|
||||
if (pass) {
|
||||
eoa = (haddr_t)mpi_size * (haddr_t)INTS_PER_RANK * (haddr_t)(sizeof(int32_t));
|
||||
|
||||
setup_vfd_test_file(file_name_id, filename, mpi_size, xfer_mode, coll_opt_mode, vfd_name, eoa,
|
||||
&lf, &fapl_id, &dxpl_id);
|
||||
}
|
||||
}
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 2) Using rank zero, write the entire negative_fi_buf to
|
||||
* the file.
|
||||
*/
|
||||
if (pass) {
|
||||
|
||||
size_t image_size = (size_t)mpi_size * (size_t)INTS_PER_RANK * sizeof(int32_t);
|
||||
|
||||
if (mpi_rank == 0) {
|
||||
|
||||
if (H5FDwrite(lf, H5FD_MEM_DRAW, H5P_DEFAULT, (haddr_t)0, image_size, (void *)zero_fi_buf) < 0) {
|
||||
|
||||
pass = false;
|
||||
failure_mssg = "H5FDwrite() on rank 0 failed.\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* 3) Barrier */
|
||||
MPI_Barrier(comm);
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 4) For each rank, define base_index equal to:
|
||||
*
|
||||
* mpi_rank * INTS_PER_RANK
|
||||
*
|
||||
* and define base_addr equal to
|
||||
*
|
||||
* base_index * sizeof(int32_t).
|
||||
*
|
||||
* Set up the array of sizes and types with the compressed feature
|
||||
* as described in the routine header description.
|
||||
*/
|
||||
if (pass) {
|
||||
|
||||
base_index = (mpi_rank * INTS_PER_RANK);
|
||||
base_addr = (haddr_t)base_index * (haddr_t)sizeof(int32_t);
|
||||
|
||||
count = INTS_PER_RANK - 1;
|
||||
|
||||
types[0] = H5FD_MEM_DRAW;
|
||||
types[1] = H5FD_MEM_NOLIST;
|
||||
|
||||
sizes[0] = 2 * sizeof(int32_t);
|
||||
sizes[1] = sizeof(int32_t);
|
||||
sizes[2] = sizeof(int32_t);
|
||||
sizes[3] = 0;
|
||||
|
||||
tt_addrs[0] = base_addr;
|
||||
tt_bufs[0] = (const void *)(&(increasing_fi_buf[base_index]));
|
||||
|
||||
tt_addrs[0] = base_addr;
|
||||
base_index += 2;
|
||||
base_addr = (haddr_t)base_index * (haddr_t)sizeof(int32_t);
|
||||
|
||||
for (i = 1; i < (INTS_PER_RANK - 1); i++) {
|
||||
|
||||
tt_addrs[i] = base_addr + ((haddr_t)(i - 1) * (haddr_t)sizeof(int32_t));
|
||||
tt_bufs[i] = (const void *)(&(increasing_fi_buf[base_index + (i - 1)]));
|
||||
}
|
||||
|
||||
if (H5FDwrite_vector(lf, dxpl_id, count, types, tt_addrs, sizes, tt_bufs) < 0) {
|
||||
|
||||
pass = false;
|
||||
failure_mssg = "H5FDwrite_vector() failed (1).\n";
|
||||
}
|
||||
}
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 5) Barrier */
|
||||
MPI_Barrier(comm);
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 6) On each rank, read the entire file into the read_fi_buf,
|
||||
* and compare against increasing_fi_buf
|
||||
* Report failure if any differences are detected.
|
||||
*/
|
||||
if (pass) {
|
||||
|
||||
size_t image_size = (size_t)mpi_size * (size_t)INTS_PER_RANK * sizeof(int32_t);
|
||||
|
||||
if (H5FDread(lf, H5FD_MEM_DRAW, H5P_DEFAULT, (haddr_t)0, image_size, (void *)read_fi_buf) < 0) {
|
||||
|
||||
pass = false;
|
||||
failure_mssg = "H5FDread() failed.\n";
|
||||
}
|
||||
|
||||
for (i = 0; ((pass) && (i < mpi_size * INTS_PER_RANK)); i++) {
|
||||
|
||||
if (read_fi_buf[i] != increasing_fi_buf[i]) {
|
||||
|
||||
pass = false;
|
||||
failure_mssg = "unexpected data read from file (1)";
|
||||
}
|
||||
}
|
||||
} /* end if */
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 7) Barrier */
|
||||
MPI_Barrier(comm);
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* 8) Close the test file and delete it (on rank 0 only).
|
||||
* Close FAPL and DXPL.
|
||||
*/
|
||||
takedown_vfd_test_file(mpi_rank, filename, &lf, &fapl_id, &dxpl_id);
|
||||
|
||||
/* Free the local buffers */
|
||||
if (tt_addrs) {
|
||||
free(tt_addrs);
|
||||
tt_addrs = NULL;
|
||||
}
|
||||
|
||||
if (tt_bufs) {
|
||||
free(tt_bufs);
|
||||
tt_bufs = NULL;
|
||||
}
|
||||
|
||||
if (show_progress)
|
||||
fprintf(stdout, "%s: cp = %d, pass = %d.\n", fcn_name, cp++, pass);
|
||||
|
||||
/* report results */
|
||||
if (mpi_rank == 0) {
|
||||
|
||||
if (pass) {
|
||||
|
||||
PASSED();
|
||||
}
|
||||
else {
|
||||
|
||||
H5_FAILED();
|
||||
|
||||
if ((disp_failure_mssgs) || (show_progress)) {
|
||||
fprintf(stdout, "%s: failure_mssg = \"%s\"\n", fcn_name, failure_mssg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (!pass);
|
||||
|
||||
} /* vector_write_test_8() */
|
||||
|
||||
static void
|
||||
test_vector_io(int mpi_rank, int mpi_size)
|
||||
{
|
||||
@ -4249,6 +4526,13 @@ test_vector_io(int mpi_rank, int mpi_size)
|
||||
nerrs +=
|
||||
vector_write_test_7(6, mpi_rank, mpi_size, H5FD_MPIO_COLLECTIVE, H5FD_MPIO_COLLECTIVE_IO, "mpio");
|
||||
|
||||
nerrs +=
|
||||
vector_write_test_8(7, mpi_rank, mpi_size, H5FD_MPIO_INDEPENDENT, H5FD_MPIO_INDIVIDUAL_IO, "mpio");
|
||||
nerrs +=
|
||||
vector_write_test_8(7, mpi_rank, mpi_size, H5FD_MPIO_COLLECTIVE, H5FD_MPIO_INDIVIDUAL_IO, "mpio");
|
||||
nerrs +=
|
||||
vector_write_test_8(7, mpi_rank, mpi_size, H5FD_MPIO_COLLECTIVE, H5FD_MPIO_COLLECTIVE_IO, "mpio");
|
||||
|
||||
MPI_Barrier(comm);
|
||||
|
||||
#ifdef H5_HAVE_SUBFILING_VFD
|
||||
|
Loading…
Reference in New Issue
Block a user