mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-03-13 19:57:53 +08:00
Clean up thread management in parallel pg_dump for Windows.
Since we start the worker threads with _beginthreadex(), we should use _endthreadex() to terminate them. We got this right in the normal-exit code path, but not so much during an error exit from a worker. In addition, be sure to apply CloseHandle to the thread handle after each thread exits. It's not clear that these oversights cause any user-visible problems, since the pg_dump run is about to terminate anyway. Still, it's clearly better to follow Microsoft's API specifications than ignore them. Also a few cosmetic cleanups in WaitForTerminatingWorkers(), including being a bit less random about where to cast between uintptr_t and HANDLE, and being sure to clear the worker identity field for each dead worker (not that false matches should be possible later, but let's be careful). Original observation and patch by Armin Schöffmann, cosmetic improvements by Michael Paquier and me. (Armin's patch also included closing sockets in ShutdownWorkersHard(), but that's been dealt with already in commit df8d2d8c4.) Back-patch to 9.3 where parallel pg_dump was introduced. Discussion: <zarafa.570306bd.3418.074bf1420d8f2ba2@root.aegaeon.de>
This commit is contained in:
parent
d81ecb9b20
commit
807b45375b
@ -327,7 +327,7 @@ ShutdownWorkersHard(ParallelState *pstate)
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for the termination of the processes using the OS-specific method.
|
||||
* Wait for all workers to terminate.
|
||||
*/
|
||||
static void
|
||||
WaitForTerminatingWorkers(ParallelState *pstate)
|
||||
@ -338,39 +338,58 @@ WaitForTerminatingWorkers(ParallelState *pstate)
|
||||
int j;
|
||||
|
||||
#ifndef WIN32
|
||||
/* On non-Windows, use wait() to wait for next worker to end */
|
||||
int status;
|
||||
pid_t pid = wait(&status);
|
||||
|
||||
/* Find dead worker's slot, and clear the PID field */
|
||||
for (j = 0; j < pstate->numWorkers; j++)
|
||||
if (pstate->parallelSlot[j].pid == pid)
|
||||
slot = &(pstate->parallelSlot[j]);
|
||||
#else
|
||||
uintptr_t hThread;
|
||||
DWORD ret;
|
||||
uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
|
||||
{
|
||||
slot = &(pstate->parallelSlot[j]);
|
||||
if (slot->pid == pid)
|
||||
{
|
||||
slot->pid = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
#else /* WIN32 */
|
||||
/* On Windows, we must use WaitForMultipleObjects() */
|
||||
HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
|
||||
int nrun = 0;
|
||||
DWORD ret;
|
||||
uintptr_t hThread;
|
||||
|
||||
for (j = 0; j < pstate->numWorkers; j++)
|
||||
{
|
||||
if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
|
||||
{
|
||||
lpHandles[nrun] = pstate->parallelSlot[j].hThread;
|
||||
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
|
||||
nrun++;
|
||||
}
|
||||
ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
|
||||
}
|
||||
ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
|
||||
Assert(ret != WAIT_FAILED);
|
||||
hThread = lpHandles[ret - WAIT_OBJECT_0];
|
||||
|
||||
for (j = 0; j < pstate->numWorkers; j++)
|
||||
if (pstate->parallelSlot[j].hThread == hThread)
|
||||
slot = &(pstate->parallelSlot[j]);
|
||||
|
||||
hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
|
||||
free(lpHandles);
|
||||
#endif
|
||||
Assert(slot);
|
||||
|
||||
/* Find dead worker's slot, and clear the hThread field */
|
||||
for (j = 0; j < pstate->numWorkers; j++)
|
||||
{
|
||||
slot = &(pstate->parallelSlot[j]);
|
||||
if (slot->hThread == hThread)
|
||||
{
|
||||
/* For cleanliness, close handles for dead threads */
|
||||
CloseHandle((HANDLE) slot->hThread);
|
||||
slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
#endif /* WIN32 */
|
||||
|
||||
/* On all platforms, update workerStatus as well */
|
||||
Assert(j < pstate->numWorkers);
|
||||
slot->workerStatus = WRKR_TERMINATED;
|
||||
}
|
||||
Assert(HasEveryWorkerTerminated(pstate));
|
||||
}
|
||||
|
||||
#ifndef WIN32
|
||||
|
@ -149,7 +149,7 @@ exit_nicely(int code)
|
||||
|
||||
#ifdef WIN32
|
||||
if (parallel_init_done && GetCurrentThreadId() != mainThreadId)
|
||||
ExitThread(code);
|
||||
_endthreadex(code);
|
||||
#endif
|
||||
|
||||
exit(code);
|
||||
|
Loading…
x
Reference in New Issue
Block a user