[svn-r10170] Purpose:

Bug Fixes

Description:
Fixes for several bugs, including dumping of excess output to a temporary file, fix for printing
hsize_t datatype, and the long awaited fix for intermixed output.

Solution:
Fix 1:  Overflow file
Previously, any output that a worker task made was buffered locally in memory, up to a point.  Any
output beyond the size of the buffer (used to be 10k) was discarded.  Now, the memory buffer size has been
changed to 1k and any output beyond this amount is sent a temporary file.  This way, no output is lost
and memory usage is kept under control.  The temporary file is deleted as soon as a worker task finishes
sending its contents to the manager.

Fix 2:  hsize_t printing
Printing of the hsize_t datatype used to be handled by %Hu passed to HDfprintf.  However, there is no corresponding HDvsnprintf that
is able to print hsize_t types.  These are now printed with the aid of H5_PRINTF_LL_WIDTH.

Fix 3:  Intermixed output fix
Intermixed output would occur on some machines (although I haven't seen it happen for a while) due to the unpredictability of the underlying network
and the speed at which various message would travel.  This has been fixed by having all output send to the manager
for printing.  The worker tasks no longer print the output themselves upon receipt of a token, but instead
send that data to the manager.



Platforms tested:
heping, eirene, tg-login (the only place that seems to still experience intermixed output every now and then)

Misc. update:
This commit is contained in:
Leon Arber 2005-03-09 13:38:36 -05:00
parent 0be2fb3aa3
commit 15e0a2331e
4 changed files with 160 additions and 56 deletions

View File

@ -131,8 +131,10 @@ ph5diff_worker(int nID)
struct diff_args args;
hid_t file1_id, file2_id;
char filenames[2][1024];
char out_data[PRINT_DATA_MAX_SIZE] = {0};
hsize_t nfound=0;
MPI_Status Status;
int i;
MPI_Comm_rank(MPI_COMM_WORLD, &nID);
outBuffOffset = 0;
@ -140,8 +142,6 @@ ph5diff_worker(int nID)
MPI_Recv(filenames, 1024*2, MPI_CHAR, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
if(Status.MPI_TAG == MPI_TAG_PARALLEL)
{
/*printf("We're in parallel mode...opening the files\n"); */
/* disable error reporting */
H5E_BEGIN_TRY
{
@ -180,8 +180,37 @@ ph5diff_worker(int nID)
/*Wait for print token. */
MPI_Recv(NULL, 0, MPI_BYTE, 0, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD, &Status);
/*When get token, print stuff out and return token */
printf("%s", outBuff);
/*When get token, send all of our output to the manager task and then return the token */
for(i=0; i<outBuffOffset; i+=PRINT_DATA_MAX_SIZE)
MPI_Send(outBuff+i, PRINT_DATA_MAX_SIZE, MPI_BYTE, 0, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD);
/* An overflow file exists, so we send it's output to the manager too and then delete it */
if(overflow_file)
{
int tmp;
memset(out_data, 0, PRINT_DATA_MAX_SIZE);
i=0;
rewind(overflow_file);
while((tmp = getc(overflow_file)) >= 0)
{
*(out_data + i++) = (char)tmp;
if(i==PRINT_DATA_MAX_SIZE)
{
MPI_Send(out_data, PRINT_DATA_MAX_SIZE, MPI_BYTE, 0, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD);
i=0;
memset(out_data, 0, PRINT_DATA_MAX_SIZE);
}
}
if(i>0)
MPI_Send(out_data, PRINT_DATA_MAX_SIZE, MPI_BYTE, 0, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD);
fclose(overflow_file);
overflow_file = NULL;
}
fflush(stdout);
memset(outBuff, 0, OUTBUFF_SIZE);
outBuffOffset = 0;
@ -196,17 +225,17 @@ ph5diff_worker(int nID)
}
else
MPI_Send(&nfound, sizeof(nfound), MPI_BYTE, 0, MPI_TAG_DONE, MPI_COMM_WORLD);
}
else if(Status.MPI_TAG == MPI_TAG_END)
{
MPI_Recv(NULL, 0, MPI_BYTE, 0, MPI_TAG_END, MPI_COMM_WORLD, &Status);
/* printf("exiting..., task: %d\n", nID); */
/* printf("exiting..., task: %d\n", nID);
fflush(stdout);*/
break;
}
else
{
printf("ERROR....invalid tag received\n");
printf("ph5diff_worker: ERROR: invalid tag (%d) received\n", Status.MPI_TAG);
MPI_Abort(MPI_COMM_WORLD, 0);
}

View File

@ -50,7 +50,6 @@ print_objname (diff_opt_t * options, hsize_t nfound)
void phdiff_dismiss_workers(void)
{
int i;
for(i=1; i<g_nTasks; i++)
MPI_Send(NULL, 0, MPI_BYTE, i, MPI_TAG_END, MPI_COMM_WORLD);
}
@ -76,16 +75,62 @@ void print_manager_output(void)
if( (outBuffOffset>0) && g_Parallel)
{
printf("%s", outBuff);
if(overflow_file)
{
int tmp;
rewind(overflow_file);
while((tmp = getc(overflow_file)) >= 0)
putchar(tmp);
fclose(overflow_file);
overflow_file = NULL;
}
fflush(stdout);
memset(outBuff, 0, OUTBUFF_SIZE);
outBuffOffset = 0;
}
else if( (outBuffOffset>0) && !g_Parallel)
{
printf("h5diff error: outBuffOffset>0, but we're not in parallel\n");
printf("h5diff error: outBuffOffset>0, but we're not in parallel!\n");
}
}
/*-------------------------------------------------------------------------
* Function: print_incoming_data
*
* Purpose: special function that prints any output that has been sent to the manager
* and is currently sitting in the incoming message queue
*
* Return: none
*
* Programmer: Leon Arber
*
* Date: March 7, 2005
*
*-------------------------------------------------------------------------
*/
static void print_incoming_data(void)
{
char data[PRINT_DATA_MAX_SIZE+1];
int incomingMessage;
MPI_Status Status;
do
{
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &incomingMessage, &Status);
if(incomingMessage)
{
memset(data, 0, PRINT_DATA_MAX_SIZE+1);
MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &Status);
printf("%s", data);
}
} while(incomingMessage);
}
#endif
/*-------------------------------------------------------------------------
@ -237,8 +282,7 @@ h5diff (const char *fname1,
if(g_Parallel)
{
/* Let tasks know that they won't be needed */
for(i=1; i<g_nTasks; i++)
MPI_Send(filenames, 1024*2, MPI_CHAR, i, MPI_TAG_END, MPI_COMM_WORLD);
phdiff_dismiss_workers();
}
#endif
assert (objname2);
@ -246,10 +290,6 @@ h5diff (const char *fname1,
nfound = diff_compare (file1_id, fname1, objname1, nobjects1, info1,
file2_id, fname2, objname2, nobjects2, info2,
options);
#ifdef H5_HAVE_PARALLEL
/* If there was something we buffered, let's print it now */
print_manager_output();
#endif
}
/*-------------------------------------------------------------------------
@ -494,7 +534,6 @@ diff_match (hid_t file1_id,
/* check to see if the print token was returned. */
if(!havePrintToken)
{
/* check incoming queue for token */
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, &Status);
@ -521,6 +560,9 @@ diff_match (hid_t file1_id,
havePrintToken = 0;
}
}
/* Print all the data in our incoming queue */
print_incoming_data();
}
/* check array of tasks to see which ones are free.
@ -551,28 +593,19 @@ diff_match (hid_t file1_id,
* if we don't have the token, some task is currently printing so we'll wait for that task to return it. */
if(!havePrintToken)
{
MPI_Probe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
if(Status.MPI_TAG == MPI_TAG_TOK_RETURN)
{
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
havePrintToken = 1;
nfound += nFoundbyWorker;
/* send this task the work unit. */
MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
}
else
{
printf("ERROR: Invalid (%d) tag received\n", Status.MPI_TAG);
MPI_Abort(MPI_COMM_WORLD, 0);
MPI_Finalize();
}
MPI_Recv(&nFoundbyWorker, sizeof(hsize_t), MPI_BYTE, MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
havePrintToken = 1;
nfound += nFoundbyWorker;
/* send this task the work unit. */
MPI_Send(&args, sizeof(struct diff_args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, MPI_COMM_WORLD);
}
/* if we do have the token, check for task to free up, or wait for a task to request it */
else
{
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
/* But first print all the data in our incoming queue */
print_incoming_data();
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
if(Status.MPI_TAG == MPI_TAG_DONE)
{
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
@ -595,9 +628,6 @@ diff_match (hid_t file1_id,
MPI_Finalize();
}
}
}
}
#endif
@ -611,10 +641,17 @@ diff_match (hid_t file1_id,
{
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
if(Status.MPI_TAG == MPI_TAG_DONE)
{
MPI_Recv(&nFoundbyWorker, sizeof(hsize_t), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker;
busyTasks--;
}
else if(Status.MPI_TAG == MPI_TAG_TOK_RETURN)
{
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker;
busyTasks--;
havePrintToken = 1;
}
else if(Status.MPI_TAG == MPI_TAG_TOK_REQUEST)
{
@ -642,15 +679,28 @@ diff_match (hid_t file1_id,
busyTasks--;
havePrintToken = 1;
}
else if(Status.MPI_TAG == MPI_TAG_PRINT_DATA)
{
char data[PRINT_DATA_MAX_SIZE+1];
memset(data, 0, PRINT_DATA_MAX_SIZE+1);
MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &Status);
printf("%s", data);
}
else
{
printf("ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG);
printf("ph5diff-manager: ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG);
MPI_Abort(MPI_COMM_WORLD, 0);
}
}
for(i=1; i<g_nTasks; i++)
MPI_Send(NULL, 0, MPI_BYTE, i, MPI_TAG_END, MPI_COMM_WORLD);
/* Print any final data waiting in our queue */
print_incoming_data();
}
free(workerTasks);
@ -669,10 +719,6 @@ diff_match (hid_t file1_id,
/* the manager can do this. */
nfound += diff (file1_id, "/", file2_id, "/", options, H5G_GROUP);
#ifdef H5_HAVE_PARALLEL
/* If there was something we buffered, let's print it now */
print_manager_output();
#endif
return nfound;
}
@ -712,13 +758,13 @@ diff_compare (hid_t file1_id,
if (i == -1)
{
printf ("Object <%s> could not be found in <%s>\n", obj1_name,
parallel_print ("Object <%s> could not be found in <%s>\n", obj1_name,
file1_name);
f1 = 1;
}
if (j == -1)
{
printf ("Object <%s> could not be found in <%s>\n", obj2_name,
parallel_print ("Object <%s> could not be found in <%s>\n", obj2_name,
file2_name);
f2 = 1;
}
@ -736,7 +782,7 @@ diff_compare (hid_t file1_id,
if (info1[i].type != info2[j].type)
{
if (options->m_verbose)
printf
parallel_print
("Comparison not possible: <%s> is of type %s and <%s> is of type %s\n",
obj1_name, get_type (info1[i].type), obj2_name,
get_type (info2[j].type));

View File

@ -21,6 +21,7 @@ int g_nTasks = 1;
unsigned char g_Parallel = 0; /*0 for serial, 1 for parallel */
char outBuff[OUTBUFF_SIZE];
unsigned int outBuffOffset;
FILE* overflow_file = NULL;
/*-------------------------------------------------------------------------
* Function: parallel_print
@ -35,6 +36,7 @@ unsigned int outBuffOffset;
*/
void parallel_print(const char* format, ...)
{
int bytes_written;
va_list ap;
va_start(ap, format);
@ -43,13 +45,37 @@ void parallel_print(const char* format, ...)
vprintf(format, ap);
else
{
if((OUTBUFF_SIZE-outBuffOffset) > 0)
outBuffOffset += HDvsnprintf(outBuff+outBuffOffset, OUTBUFF_SIZE-outBuffOffset, format, ap);
if(overflow_file == NULL) /*no overflow has occurred yet */
{
bytes_written = HDvsnprintf(outBuff+outBuffOffset, OUTBUFF_SIZE-outBuffOffset, format, ap);
va_end(ap);
va_start(ap, format);
if(bytes_written >= (OUTBUFF_SIZE-outBuffOffset))
{
/* Delete the characters that were written to outBuff since they will be written to the overflow_file */
memset(outBuff+outBuffOffset, 0, OUTBUFF_SIZE - outBuffOffset);
overflow_file = tmpfile();
if(overflow_file == NULL)
printf("Warning: Could not create overflow file. Output may be truncated.\n");
else
bytes_written = HDvfprintf(overflow_file, format, ap);
}
else
outBuffOffset += bytes_written;
}
else
bytes_written = HDvfprintf(overflow_file, format, ap);
}
va_end(ap);
}
/*-------------------------------------------------------------------------
* Function: print_pos
*
@ -110,7 +136,7 @@ void print_pos( int *ph,
for ( i = 0; i < rank; i++)
{
/* HDfprintf(stdout,"%Hu ", pos[i] ); */
parallel_print("%d ",(int) pos[i]);
parallel_print("%"H5_PRINTF_LL_WIDTH"u ", pos[i]);
}
parallel_print("]" );
}
@ -377,7 +403,7 @@ get_class(H5T_class_t tclass)
void print_found(hsize_t nfound)
{
if(g_Parallel)
outBuffOffset += HDsnprintf(outBuff+outBuffOffset, OUTBUFF_SIZE-outBuffOffset, "%lld differences found\n", nfound);
parallel_print("%"H5_PRINTF_LL_WIDTH"u differences found\n", nfound);
else
HDfprintf(stdout,"%Hu differences found\n",nfound);
}

View File

@ -16,7 +16,8 @@
#define _PH5DIFF_H__
#define OUTBUFF_SIZE 50000
#define PRINT_DATA_MAX_SIZE 512
#define OUTBUFF_SIZE PRINT_DATA_MAX_SIZE*2
/* Send from manager to workers */
#define MPI_TAG_ARGS 1
#define MPI_TAG_PRINT_TOK 2
@ -25,15 +26,17 @@
#define MPI_TAG_TOK_REQUEST 3
#define MPI_TAG_DONE 4
#define MPI_TAG_TOK_RETURN 5
#define MPI_TAG_PRINT_DATA 6
/* Operational tags used to init and complete diff */
#define MPI_TAG_END 6
#define MPI_TAG_PARALLEL 7
#define MPI_TAG_END 7
#define MPI_TAG_PARALLEL 8
extern int g_nTasks;
extern unsigned char g_Parallel;
extern char outBuff[OUTBUFF_SIZE];
extern char outBuff[];
extern unsigned int outBuffOffset;
extern FILE* overflow_file;
struct diff_args
{