Support for UnifyFS with MPI_File_sync (#1801)

* Initial implementation for supporting UnifyFS in HDF5 with MPI_File_sync after write

* Committing clang-format changes

* Fix format

* Fix env variable and return value check

* Fix flag retrieve

* Fix issues with getting/setting the flag

* Fix merge conflicts

* Update

* Committing clang-format changes

* Update based on suggestions

* Committing clang-format changes

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Houjun Tang 2022-07-11 13:59:19 -07:00 committed by GitHub
parent 63ce6839b5
commit 663321087a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 172 additions and 11 deletions

View File

@ -830,6 +830,18 @@ H5D__ioinfo_adjust(H5D_io_info_t *io_info, const H5D_t *dset, const H5S_t *file_
} /* end if */
} /* end if */
else {
/* Fail when file sync is required, since it requires collective write */
if (io_info->op_type == H5D_IO_OP_WRITE) {
hbool_t mpi_file_sync_required = FALSE;
if (H5F_shared_get_mpi_file_sync_required(io_info->f_sh, &mpi_file_sync_required) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "can't get MPI file_sync_required flag")
if (mpi_file_sync_required)
HGOTO_ERROR(
H5E_DATASET, H5E_NO_INDEPENDENT, FAIL,
"Can't perform independent write when MPI_File_sync is required by ROMIO driver.")
}
/* Check if there are any filters in the pipeline. If there are,
* we cannot break to independent I/O if this is a write operation
* with multiple ranks involved; otherwise, there will be metadata

View File

@ -252,6 +252,43 @@ H5FD_mpi_haddr_to_MPIOff(haddr_t addr, MPI_Offset *mpi_off /*out*/)
FUNC_LEAVE_NOAPI(ret_value)
}
/*-------------------------------------------------------------------------
* Function: H5FD_mpi_get_file_sync_required
*
* Purpose: Retrieves the mpi_file_sync_required used for the file
*
* Return: Success: Non-negative
*
* Failure: Negative
*
* Programmer: Houjun Tang
* May 19, 2022
*
*-------------------------------------------------------------------------
*/
herr_t
H5FD_mpi_get_file_sync_required(H5FD_t *file, hbool_t *file_sync_required)
{
const H5FD_class_t *cls;
uint64_t flags = H5FD_CTL_ROUTE_TO_TERMINAL_VFD_FLAG;
void * file_sync_required_ptr = (void *)(&file_sync_required);
herr_t ret_value = SUCCEED;
FUNC_ENTER_NOAPI(FAIL)
HDassert(file);
cls = (const H5FD_class_t *)(file->cls);
HDassert(cls);
HDassert(cls->ctl); /* All MPI drivers must implement this */
/* Dispatch to driver */
if ((cls->ctl)(file, H5FD_CTL_GET_MPI_FILE_SYNC_OPCODE, flags, NULL, file_sync_required_ptr) < 0)
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "driver get_mpi_file_synce request failed")
done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5FD_mpi_get_file_sync_required() */
#ifdef NOT_YET
/*-------------------------------------------------------------------------

View File

@ -60,16 +60,17 @@ static char H5FD_mpi_native_g[] = "native";
* driver doesn't bother to keep it updated since it's an expensive operation.
*/
typedef struct H5FD_mpio_t {
H5FD_t pub; /* Public stuff, must be first */
MPI_File f; /* MPIO file handle */
MPI_Comm comm; /* MPI Communicator */
MPI_Info info; /* MPI info object */
int mpi_rank; /* This process's rank */
int mpi_size; /* Total number of processes */
haddr_t eof; /* End-of-file marker */
haddr_t eoa; /* End-of-address marker */
haddr_t last_eoa; /* Last known end-of-address marker */
haddr_t local_eof; /* Local end-of-file address for each process */
H5FD_t pub; /* Public stuff, must be first */
MPI_File f; /* MPIO file handle */
MPI_Comm comm; /* MPI Communicator */
MPI_Info info; /* MPI info object */
int mpi_rank; /* This process's rank */
int mpi_size; /* Total number of processes */
haddr_t eof; /* End-of-file marker */
haddr_t eoa; /* End-of-address marker */
haddr_t last_eoa; /* Last known end-of-address marker */
haddr_t local_eof; /* Local end-of-file address for each process */
hbool_t mpi_file_sync_required; /* Whether the ROMIO driver requires MPI_File_sync after write */
} H5FD_mpio_t;
/* Private Prototypes */
@ -964,6 +965,10 @@ H5FD__mpio_open(const char *name, unsigned flags, hid_t fapl_id, haddr_t H5_ATTR
file->mpi_rank = mpi_rank;
file->mpi_size = mpi_size;
/* Retrieve the flag indicating whether MPI_File_sync is needed after each write */
if (H5_mpio_get_file_sync_required(fh, &file->mpi_file_sync_required) < 0)
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, NULL, "unable to get mpi_file_sync_required hint")
/* Only processor p0 will get the filesize and broadcast it. */
if (mpi_rank == 0) {
/* If MPI_File_get_size fails, broadcast file size as -1 to signal error */
@ -1629,6 +1634,12 @@ H5FD__mpio_write(H5FD_t *_file, H5FD_mem_t type, hid_t H5_ATTR_UNUSED dxpl_id, h
if (MPI_SUCCESS !=
(mpi_code = MPI_File_write_at_all(file->f, mpi_off, buf, size_i, buf_type, &mpi_stat)))
HMPI_GOTO_ERROR(FAIL, "MPI_File_write_at_all failed", mpi_code)
/* Do MPI_File_sync when needed by underlying ROMIO driver */
if (file->mpi_file_sync_required) {
if (MPI_SUCCESS != (mpi_code = MPI_File_sync(file->f)))
HMPI_GOTO_ERROR(FAIL, "MPI_File_sync failed", mpi_code)
}
} /* end if */
else {
if (type != H5FD_MEM_DRAW)
@ -2638,6 +2649,12 @@ H5FD__mpio_write_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t co
if (MPI_SUCCESS != (mpi_code = MPI_File_write_at_all(file->f, mpi_off, mpi_bufs_base, size_i,
buf_type, &mpi_stat)))
HMPI_GOTO_ERROR(FAIL, "MPI_File_write_at_all failed", mpi_code)
/* Do MPI_File_sync when needed by underlying ROMIO driver */
if (file->mpi_file_sync_required) {
if (MPI_SUCCESS != (mpi_code = MPI_File_sync(file->f)))
HMPI_GOTO_ERROR(FAIL, "MPI_File_sync failed", mpi_code)
}
} /* end if */
else if (size_i > 0) {
#ifdef H5FDmpio_DEBUG
@ -3020,6 +3037,7 @@ done:
* H5FD_CTL_GET_MPI_COMMUNICATOR_OPCODE
* H5FD_CTL_GET_MPI_RANK_OPCODE
* H5FD_CTL_GET_MPI_SIZE_OPCODE
* H5FD_CTL_GET_MPI_FILE_SYNC_OPCODE
*
* Note that these opcodes must be supported by all VFDs that
* support MPI.
@ -3063,6 +3081,12 @@ H5FD__mpio_ctl(H5FD_t *_file, uint64_t op_code, uint64_t flags, const void H5_AT
**((int **)output) = file->mpi_size;
break;
case H5FD_CTL_GET_MPI_FILE_SYNC_OPCODE:
HDassert(output);
HDassert(*output);
**((hbool_t **)output) = file->mpi_file_sync_required;
break;
default: /* unknown op code */
if (flags & H5FD_CTL_FAIL_IF_UNKNOWN_FLAG) {

View File

@ -194,6 +194,7 @@ H5_DLL herr_t H5FD_get_mpio_atomicity(H5FD_t *file, hbool_t *flag);
H5_DLL int H5FD_mpi_get_rank(H5FD_t *file);
H5_DLL int H5FD_mpi_get_size(H5FD_t *file);
H5_DLL MPI_Comm H5FD_mpi_get_comm(H5FD_t *file);
H5_DLL herr_t H5FD_mpi_get_file_sync_required(H5FD_t *file, hbool_t *file_sync_required);
#endif /* H5_HAVE_PARALLEL */
#endif /* H5FDprivate_H */

View File

@ -196,6 +196,7 @@
#define H5FD_CTL_MEM_ALLOC 5
#define H5FD_CTL_MEM_FREE 6
#define H5FD_CTL_MEM_COPY 7
#define H5FD_CTL_GET_MPI_FILE_SYNC_OPCODE 8
/* ctl function flags: */

View File

@ -587,5 +587,4 @@ done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5F_mpi_get_file_block_type() */
#endif /* H5_HAVE_PARALLEL */

View File

@ -974,6 +974,7 @@ H5_DLL herr_t H5F_mpi_retrieve_comm(hid_t loc_id, hid_t acspl_id, MPI_Comm *mp
H5_DLL herr_t H5F_mpi_get_file_block_type(hbool_t commit, MPI_Datatype *new_type, hbool_t *new_type_derived);
H5_DLL hbool_t H5F_get_coll_metadata_reads(const H5F_t *f);
H5_DLL void H5F_set_coll_metadata_reads(H5F_t *f, H5P_coll_md_read_flag_t *file_flag, hbool_t *context_flag);
H5_DLL herr_t H5F_shared_get_mpi_file_sync_required(const H5F_shared_t *f_sh, hbool_t *flag);
#endif /* H5_HAVE_PARALLEL */
/* External file cache routines */

View File

@ -1068,6 +1068,37 @@ H5F_coll_md_read(const H5F_t *f)
FUNC_LEAVE_NOAPI(f->shared->coll_md_read)
} /* end H5F_coll_md_read() */
/*-------------------------------------------------------------------------
* Function: H5F_shared_get_mpi_file_sync_required
*
* Purpose: Returns the mpi_file_sync_required flag
*
* Return: Success: Non-negative
* Failure: Negative
*
* Programmer: Houjun Tang
* May 19, 2022
*
*-------------------------------------------------------------------------
*/
herr_t
H5F_shared_get_mpi_file_sync_required(const H5F_shared_t *f_sh, hbool_t *flag /*out*/)
{
herr_t ret_value = SUCCEED; /* Return value */
FUNC_ENTER_NOAPI(FAIL)
HDassert(f_sh);
HDassert(flag);
/* Dispatch to driver */
if ((ret_value = H5FD_mpi_get_file_sync_required(f_sh->lf, flag)) < 0)
HGOTO_ERROR(H5E_FILE, H5E_CANTGET, FAIL, "driver get_file_sync_required request failed")
done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5F_shared_get_mpi_file_sync_required() */
#endif /* H5_HAVE_PARALLEL */
/*-------------------------------------------------------------------------

View File

@ -782,4 +782,58 @@ done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5_mpio_gatherv_alloc_simple() */
/*-------------------------------------------------------------------------
* Function: H5_mpio_get_file_sync_required
*
* Purpose: Retrieve the MPI hint indicating whether the data written
* by the MPI ROMIO driver is immediately visible to all MPI
* ranks.
*
* Notes: This routine is designed for supporting UnifyFS that needs
* MPI_File_sync in order to make the written data available
* to all ranks.
*
* Return: Non-negative on success/Negative on failure
*
* Programmer: Houjun Tang, April 7, 2022
*
*-------------------------------------------------------------------------
*/
herr_t
H5_mpio_get_file_sync_required(MPI_File fh, hbool_t *file_sync_required)
{
MPI_Info info_used;
int flag;
char value[MPI_MAX_INFO_VAL];
herr_t ret_value = SUCCEED;
FUNC_ENTER_NOAPI(FAIL)
HDassert(file_sync_required);
*file_sync_required = FALSE;
if (MPI_SUCCESS != MPI_File_get_info(fh, &info_used))
HGOTO_ERROR(H5E_LIB, H5E_CANTGET, FAIL, "can't get MPI info")
if (MPI_SUCCESS !=
MPI_Info_get(info_used, "romio_visibility_immediate", MPI_MAX_INFO_VAL - 1, value, &flag))
HGOTO_ERROR(H5E_LIB, H5E_CANTGET, FAIL, "can't get MPI info")
if (flag && !HDstrcmp(value, "false"))
*file_sync_required = TRUE;
if (MPI_SUCCESS != MPI_Info_free(&info_used))
HGOTO_ERROR(H5E_LIB, H5E_CANTFREE, FAIL, "can't free MPI info")
/* Force setting the flag via env variable (temp solution before the flag is implemented in MPI) */
char *sync_env_var = HDgetenv("HDF5_DO_MPI_FILE_SYNC");
if (sync_env_var && (!HDstrcmp(sync_env_var, "TRUE") || !HDstrcmp(sync_env_var, "1")))
*file_sync_required = TRUE;
if (sync_env_var && (!HDstrcmp(sync_env_var, "FALSE") || !HDstrcmp(sync_env_var, "0")))
*file_sync_required = FALSE;
done:
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5_mpio_get_file_sync_required() */
#endif /* H5_HAVE_PARALLEL */

View File

@ -2614,6 +2614,7 @@ H5_DLL herr_t H5_mpio_gatherv_alloc_simple(void *send_buf, int send_count, MPI_
MPI_Datatype recv_type, hbool_t allgather, int root, MPI_Comm comm,
int mpi_rank, int mpi_size, void **out_buf,
size_t *out_buf_num_entries);
H5_DLL herr_t H5_mpio_get_file_sync_required(MPI_File fh, hbool_t *file_sync_required);
#endif /* H5_HAVE_PARALLEL */
/* Functions for debugging */