Test code for sending chunk modification data around

This commit is contained in:
Jordan Henderson 2017-01-24 15:01:31 -06:00
parent b19b0ea67d
commit aab742c9a2

View File

@ -758,7 +758,7 @@ H5D__chunk_collective_io(H5D_io_info_t *io_info, const H5D_type_info_t *type_inf
/* step 1: choose an IO option */
/* If the average number of chunk per process is greater than a threshold, we will do one link chunked IO. */
if((unsigned)sum_chunk / mpi_size >= one_link_chunk_io_threshold)
if((unsigned)sum_chunk / (unsigned)mpi_size >= one_link_chunk_io_threshold)
io_option = H5D_ONE_LINK_CHUNK_IO_MORE_OPT;
#ifdef H5_HAVE_INSTRUMENTED_LIBRARY
else
@ -2660,10 +2660,13 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
{
H5D_filtered_collective_io_info_t *local_info_array = NULL;
H5D_filtered_collective_io_info_t *overlap_info_array = NULL;
H5S_sel_iter_t *mem_iter = NULL;
H5SL_node_t *chunk_node;
MPI_Request *send_requests = NULL;
MPI_Status *send_statuses = NULL;
hbool_t no_overlap = FALSE;
hbool_t mem_iter_init = FALSE;
uint8_t *mod_data = NULL;
size_t num_send_requests;
size_t num_chunks_selected;
size_t overlap_info_array_num_entries;
@ -2751,6 +2754,9 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
if (NULL == (send_requests = (MPI_Request *) H5MM_malloc(num_chunks_selected * sizeof(*send_requests))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate send requests buffer")
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(H5S_sel_iter_t))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
/* XXX: Change minor error code */
if (H5D__mpio_array_gather(io_info, local_info_array, num_chunks_selected,
sizeof(*local_info_array), (void **) &overlap_info_array, &overlap_info_array_num_entries,
@ -2771,7 +2777,7 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
/* Store the correct chunk entry information in case this process
* becomes the new chunk's owner. The chunk entry that this process
* contributed will be the only one with a valid dataspace selection
* on this particular process
* on that particular process
*/
if (mpi_rank == overlap_info_array[i].owner)
chunk_entry = overlap_info_array[i];
@ -2785,9 +2791,8 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
}
num_writers++;
i++;
if (i == overlap_info_array_num_entries) break;
if (++i == overlap_info_array_num_entries) break;
} while (overlap_info_array[i].old_chunk.offset == chunk_addr);
if (mpi_rank == new_owner) {
@ -2800,11 +2805,80 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
overlap_info_array[num_chunks_selected++] = chunk_entry;
} /* end if */
else {
/* Send modification data to new owner */
hssize_t iter_nelmts;
hssize_t mod_data_size;
uint8_t *mod_data_p = NULL;
/* if (MPI_SUCCESS != (mpi_code= MPI_Isend(, , , new_owner,
chunk_entry.old_chunk.offset, io_info->comm, &send_requests[num_send_requests++])))
HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code) */
/* XXX: Need some way of checking chunk entry to validate that this process
* is actually contributing some data for this chunk update
*/
/* Determine size of serialized chunk selection plus the size
* of the data being written
*/
if ((mod_data_size = H5S_SELECT_SERIAL_SIZE(chunk_entry.chunk_info.mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to check dataspace selection size")
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(chunk_entry.chunk_info.mspace)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
/* XXX: For now, make sure enough memory is allocated by just adding the chunk
* size
*/
mod_data_size += iter_nelmts * type_info->src_type_size;
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Allocing %zd bytes for mod. data buffer.\n", (size_t) mod_data_size);
#endif
if (NULL == (mod_data = (uint8_t *) H5MM_malloc(mod_data_size)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk modification send buffer")
/* Serialize the current selection into the buffer */
mod_data_p = mod_data;
if (H5S_SELECT_SERIALIZE(chunk_entry.chunk_info.mspace, &mod_data_p) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTENCODE, FAIL, "unable to serialize dataspace selection")
/* Initialize iterator for memory selection */
if (H5S_select_iter_init(mem_iter, chunk_entry.chunk_info.mspace, type_info->src_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Iterating over %lld elements.\n", iter_nelmts);
#endif
/* Collect the modification data into the buffer */
if (!H5D__gather_mem(io_info->u.wbuf, chunk_entry.chunk_info.mspace, mem_iter,
(size_t) iter_nelmts, io_info->dxpl_cache, mod_data_p))
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "| - Mod. Data Buffer:\n");
HDfprintf(debug_file, "| - [");
for (size_t j = 0; j < (size_t) iter_nelmts; j++) {
if (j > 0) HDfprintf(debug_file, ", ");
HDfprintf(debug_file, "%lld", ((long *) mod_data_p)[j]);
}
HDfprintf(debug_file, "]\n|\n");
HDfprintf(debug_file, "Sending modification data for chunk at address %a to process %d.\n", chunk_entry.old_chunk.offset, new_owner);
#endif
/* Send modification data to new owner */
H5_CHECK_OVERFLOW(mod_data_size, hssize_t, int)
if (MPI_SUCCESS != (mpi_code = MPI_Isend(mod_data, (int) mod_data_size, MPI_BYTE, new_owner,
chunk_entry.chunk_info.index, io_info->comm, &send_requests[num_send_requests++])))
HMPI_GOTO_ERROR(FAIL, "MPI_Isend failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Mod. data sent.\n");
#endif
if (mod_data)
mod_data = (uint8_t *) H5MM_free(mod_data);
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
} /* end else */
#ifdef PARALLEL_COMPRESS_DEBUG
@ -2866,12 +2940,15 @@ H5D__construct_filtered_io_info_list(const H5D_io_info_t *io_info, const H5D_typ
HMPI_GOTO_ERROR(FAIL, "MPI_Waitall failed", mpi_code)
}
done:
if (send_requests)
H5MM_free(send_requests);
if (send_statuses)
H5MM_free(send_statuses);
if (mod_data)
H5MM_free(mod_data);
if (mem_iter)
H5MM_free(mem_iter);
FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__construct_filtered_io_info_list() */
@ -3001,7 +3078,7 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
H5S_sel_iter_t *mem_iter = NULL;
unsigned filter_mask = 0;
hssize_t iter_nelmts;
hbool_t full_overwrite = TRUE;
hbool_t full_overwrite = FALSE;
hbool_t mem_iter_init = FALSE;
size_t buf_size;
herr_t ret_value = SUCCEED;
@ -3066,10 +3143,10 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
/* Owner of this chunk, receive modification data from other processes */
/* Initialize iterator for memory selection */
if (NULL == (mem_iter = (H5S_sel_iter_t *) H5MM_malloc(sizeof(*mem_iter))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate memory iterator")
/* Initialize iterator for memory selection */
/* XXX: dst_type_size may need to be src_type_size depending on operation */
if (H5S_select_iter_init(mem_iter, chunk_entry->chunk_info.mspace, type_info->dst_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
@ -3097,8 +3174,111 @@ H5D__filtered_collective_chunk_entry_io(H5D_filtered_collective_io_info_t *chunk
(size_t) iter_nelmts, io_info->dxpl_cache, chunk_entry->buf))
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "couldn't gather from write buffer")
/* Update the chunk data with any modifications from other processes */
if (mem_iter_init && H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
/* Update the chunk data with any modifications from other processes */
while (chunk_entry->num_writers > 1) {
MPI_Status status;
uint8_t *buf = NULL;
uint8_t *buf_p = NULL;
H5S_t *selection = NULL;
int count;
int mpi_code;
/* XXX: Since the receive tag needs to be an int, it is possible that a chunk's index
* may fall outside the range of an int and cause an overflow problem when casting down
* here
*/
H5_CHECK_OVERFLOW(chunk_entry->chunk_info.index, hsize_t, int)
if (MPI_SUCCESS != (mpi_code = MPI_Probe(MPI_ANY_SOURCE, (int) chunk_entry->chunk_info.index,
io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Probe failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Found message from source %d with tag %d.\n", status.MPI_SOURCE, status.MPI_TAG);
#endif
/* Retrieve the message size */
if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_count failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Message size is %d bytes.\n", count);
#endif
if (NULL == (buf = (uint8_t *) H5MM_malloc(count)))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate modification data receive buffer")
buf_p = buf;
if (MPI_SUCCESS != (mpi_code = MPI_Recv(buf, count, MPI_BYTE, MPI_ANY_SOURCE,
chunk_entry->chunk_info.index, io_info->comm, &status)))
HMPI_GOTO_ERROR(FAIL, "MPI_Recv failed", mpi_code)
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Received the message.\n");
#endif
/* Deserialize the selection in the chunk's dataspace */
if (H5S_SELECT_DESERIALIZE(&selection, &buf_p) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTDECODE, FAIL, "unable to deserialize dataspace selection")
/* H5S_extent_copy(selection, chunk_entry->chunk_info.mspace); */
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Deserialized selection info:\n");
HDfprintf(debug_file, "| Mem Space:\n");
HDfprintf(debug_file, "| - Extent Num elements: %zd\n", H5S_GET_EXTENT_NPOINTS(selection));
HDfprintf(debug_file, "| - Extent type: %d\n", H5S_GET_EXTENT_TYPE(selection));
HDfprintf(debug_file, "| - Selection Num elements: %zd\n", H5S_GET_SELECT_NPOINTS(selection));
HDfprintf(debug_file, "| - Selection type: %d\n", H5S_GET_SELECT_TYPE(selection));
#endif
/* XXX: After receiving the selection information, the extent is not
* set/valid. Possibly copy the selection information directly into
* chunk entry mem space, or copy extent into received selection H5S type
*/
if (H5S_select_iter_init(mem_iter, selection, type_info->dst_type_size) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize memory selection information")
mem_iter_init = TRUE;
if ((iter_nelmts = H5S_GET_SELECT_NPOINTS(selection)) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTCOUNT, FAIL, "dataspace is invalid")
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Contents of message:\n[");
for (size_t j = 0; j < iter_nelmts; j++) {
if (j > 0) HDfprintf(debug_file, ", ");
HDfprintf(debug_file, "%lld", ((long *) buf_p)[j]);
}
HDfprintf(debug_file, "]\n");
#endif
#ifdef PARALLEL_COMPRESS_DEBUG
HDfprintf(debug_file, "Iter nelmts=%lld.\n", iter_nelmts);
HDfprintf(debug_file, "Mem space selected points: %zd.\n", H5S_GET_SELECT_NPOINTS(selection));
#endif
/* Update the chunk data with the received modification data */
/* if (!H5D__gather_mem(buf_p, selection, mem_iter, (size_t) iter_nelmts,
io_info->dxpl_cache, chunk_entry->buf))
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't gather to write buffer") */
if (H5D__scatter_mem(buf_p, selection, mem_iter, (size_t) iter_nelmts,
io_info->dxpl_cache, chunk_entry->buf) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "couldn't scatter to write buffer")
chunk_entry->num_writers--;
if (buf)
H5MM_free(buf);
if (H5S_SELECT_ITER_RELEASE(mem_iter) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTFREE, FAIL, "couldn't release selection iterator")
if (selection)
if (H5S_close(selection) < 0)
HGOTO_ERROR(H5E_DATASPACE, H5E_CANTFREE, FAIL, "can't close dataspace")
}
#ifdef PARALLEL_COMPRESS_DEBUG