mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-12-27 08:39:28 +08:00
8396447cdb
libpgcommon is a new static library to allow sharing code among the various frontend programs and backend; this lets us eliminate duplicate implementations of common routines. We avoid libpgport, because that's intended as a place for porting issues; per discussion, it seems better to keep them separate. The first use case, and the only implemented by this patch, is pg_malloc and friends, which many frontend programs were already using. At the same time, we can use this to provide palloc emulation functions for the frontend; this way, some palloc-using files in the backend can also be used by the frontend cleanly. To do this, we change palloc() in the backend to be a function instead of a macro on top of MemoryContextAlloc(). This was previously believed to cause loss of performance, but this implementation has been tweaked by Tom and Andres so that on modern compilers it provides a slight improvement over the previous one. This lets us clean up some places that were already with localized hacks. Most of the pg_malloc/palloc changes in this patch were authored by Andres Freund. Zoltán Böszörményi also independently provided a form of that. libpgcommon infrastructure was authored by Álvaro.
340 lines
8.1 KiB
C
340 lines
8.1 KiB
C
/*
|
|
* parallel.c
|
|
*
|
|
* multi-process support
|
|
*
|
|
* Copyright (c) 2010-2013, PostgreSQL Global Development Group
|
|
* contrib/pg_upgrade/parallel.c
|
|
*/
|
|
|
|
#include "postgres_fe.h"
|
|
|
|
#include "pg_upgrade.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <sys/types.h>
|
|
#include <sys/wait.h>
|
|
|
|
#ifdef WIN32
|
|
#include <io.h>
|
|
#endif
|
|
|
|
/* number of worker children started by this file that have not yet been reaped */
static int parallel_jobs;
|
|
|
|
#ifdef WIN32
/*
 *	Handles of all active worker threads.  The array must stay dense (no
 *	gaps/zeros) so that it can be handed straight to
 *	WaitForMultipleObjects(); for that reason the per-thread argument
 *	structs are kept in separate arrays rather than alongside the handles.
 */
HANDLE	   *thread_handles;

/* arguments passed to a win32_exec_prog() worker thread */
typedef struct
{
	char		log_file[MAXPGPATH];
	char		opt_log_file[MAXPGPATH];
	char		cmd[MAX_STRING];
} exec_thread_arg;

/* arguments passed to a win32_transfer_all_new_dbs() worker thread */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char		old_pgdata[MAXPGPATH];
	char		new_pgdata[MAXPGPATH];
	char		old_tablespace[MAXPGPATH];
} transfer_thread_arg;

/* one argument slot per potential worker, indexed like thread_handles */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/* track current thread_args struct so reap_child() can be used for all cases */
void	  **cur_thread_args;

/* thread entry points */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);

#endif
|
|
|
|
/*
|
|
* parallel_exec_prog
|
|
*
|
|
* This has the same API as exec_prog, except it does parallel execution,
|
|
* and therefore must throw errors and doesn't return an error status.
|
|
*/
|
|
void
|
|
parallel_exec_prog(const char *log_file, const char *opt_log_file,
|
|
const char *fmt,...)
|
|
{
|
|
va_list args;
|
|
char cmd[MAX_STRING];
|
|
#ifndef WIN32
|
|
pid_t child;
|
|
#else
|
|
HANDLE child;
|
|
exec_thread_arg *new_arg;
|
|
#endif
|
|
|
|
va_start(args, fmt);
|
|
vsnprintf(cmd, sizeof(cmd), fmt, args);
|
|
va_end(args);
|
|
|
|
if (user_opts.jobs <= 1)
|
|
/* throw_error must be true to allow jobs */
|
|
exec_prog(log_file, opt_log_file, true, "%s", cmd);
|
|
else
|
|
{
|
|
/* parallel */
|
|
#ifdef WIN32
|
|
cur_thread_args = (void **)exec_thread_args;
|
|
#endif
|
|
/* harvest any dead children */
|
|
while (reap_child(false) == true)
|
|
;
|
|
|
|
/* must we wait for a dead child? */
|
|
if (parallel_jobs >= user_opts.jobs)
|
|
reap_child(true);
|
|
|
|
/* set this before we start the job */
|
|
parallel_jobs++;
|
|
|
|
/* Ensure stdio state is quiesced before forking */
|
|
fflush(NULL);
|
|
|
|
#ifndef WIN32
|
|
child = fork();
|
|
if (child == 0)
|
|
/* use _exit to skip atexit() functions */
|
|
_exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
|
|
else if (child < 0)
|
|
/* fork failed */
|
|
pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
|
|
#else
|
|
if (thread_handles == NULL)
|
|
{
|
|
int i;
|
|
|
|
thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
|
|
exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
|
|
|
|
/*
|
|
* For safety and performance, we keep the args allocated during
|
|
* the entire life of the process, and we don't free the args
|
|
* in a thread different from the one that allocated it.
|
|
*/
|
|
for (i = 0; i < user_opts.jobs; i++)
|
|
exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg));
|
|
}
|
|
|
|
/* use first empty array element */
|
|
new_arg = exec_thread_args[parallel_jobs-1];
|
|
|
|
/* Can only pass one pointer into the function, so use a struct */
|
|
strcpy(new_arg->log_file, log_file);
|
|
strcpy(new_arg->opt_log_file, opt_log_file);
|
|
strcpy(new_arg->cmd, cmd);
|
|
|
|
child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
|
|
new_arg, 0, NULL);
|
|
if (child == 0)
|
|
pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
|
|
|
|
thread_handles[parallel_jobs-1] = child;
|
|
#endif
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
#ifdef WIN32
/*
 *	win32_exec_prog
 *
 *	Worker-thread body used by parallel_exec_prog(): runs exec_prog() with
 *	the arguments saved in *args.  The value returned (and hence the thread's
 *	exit code) is the logical negation of exec_prog()'s result, i.e. 0 when
 *	exec_prog() returns true.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	/* returning from here terminates the thread */
	return !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
}
#endif
|
|
|
|
|
|
/*
|
|
* parallel_transfer_all_new_dbs
|
|
*
|
|
* This has the same API as transfer_all_new_dbs, except it does parallel execution
|
|
* by transfering multiple tablespaces in parallel
|
|
*/
|
|
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
|
|
char *old_pgdata, char *new_pgdata,
|
|
char *old_tablespace)
|
|
{
|
|
#ifndef WIN32
|
|
pid_t child;
|
|
#else
|
|
HANDLE child;
|
|
transfer_thread_arg *new_arg;
|
|
#endif
|
|
|
|
if (user_opts.jobs <= 1)
|
|
/* throw_error must be true to allow jobs */
|
|
transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
|
|
else
|
|
{
|
|
/* parallel */
|
|
#ifdef WIN32
|
|
cur_thread_args = (void **)transfer_thread_args;
|
|
#endif
|
|
/* harvest any dead children */
|
|
while (reap_child(false) == true)
|
|
;
|
|
|
|
/* must we wait for a dead child? */
|
|
if (parallel_jobs >= user_opts.jobs)
|
|
reap_child(true);
|
|
|
|
/* set this before we start the job */
|
|
parallel_jobs++;
|
|
|
|
/* Ensure stdio state is quiesced before forking */
|
|
fflush(NULL);
|
|
|
|
#ifndef WIN32
|
|
child = fork();
|
|
if (child == 0)
|
|
{
|
|
transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
|
|
old_tablespace);
|
|
/* if we take another exit path, it will be non-zero */
|
|
/* use _exit to skip atexit() functions */
|
|
_exit(0);
|
|
}
|
|
else if (child < 0)
|
|
/* fork failed */
|
|
pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
|
|
#else
|
|
if (thread_handles == NULL)
|
|
{
|
|
int i;
|
|
|
|
thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
|
|
transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
|
|
|
|
/*
|
|
* For safety and performance, we keep the args allocated during
|
|
* the entire life of the process, and we don't free the args
|
|
* in a thread different from the one that allocated it.
|
|
*/
|
|
for (i = 0; i < user_opts.jobs; i++)
|
|
transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg));
|
|
}
|
|
|
|
/* use first empty array element */
|
|
new_arg = transfer_thread_args[parallel_jobs-1];
|
|
|
|
/* Can only pass one pointer into the function, so use a struct */
|
|
new_arg->old_db_arr = old_db_arr;
|
|
new_arg->new_db_arr = new_db_arr;
|
|
strcpy(new_arg->old_pgdata, old_pgdata);
|
|
strcpy(new_arg->new_pgdata, new_pgdata);
|
|
strcpy(new_arg->old_tablespace, old_tablespace);
|
|
|
|
child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
|
|
new_arg, 0, NULL);
|
|
if (child == 0)
|
|
pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
|
|
|
|
thread_handles[parallel_jobs-1] = child;
|
|
#endif
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
#ifdef WIN32
/*
 *	win32_transfer_all_new_dbs
 *
 *	Worker-thread body: unpacks *args and calls transfer_all_new_dbs() with
 *	them.  Always returns 0 as the thread's exit code.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	transfer_all_new_dbs(args->old_db_arr,
						 args->new_db_arr,
						 args->old_pgdata,
						 args->new_pgdata,
						 args->old_tablespace);

	/* returning from here terminates the thread */
	return 0;
}
#endif
|
|
|
|
|
|
/*
|
|
* collect status from a completed worker child
|
|
*/
|
|
bool
|
|
reap_child(bool wait_for_child)
|
|
{
|
|
#ifndef WIN32
|
|
int work_status;
|
|
int ret;
|
|
#else
|
|
int thread_num;
|
|
DWORD res;
|
|
#endif
|
|
|
|
if (user_opts.jobs <= 1 || parallel_jobs == 0)
|
|
return false;
|
|
|
|
#ifndef WIN32
|
|
ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
|
|
|
|
/* no children or, for WNOHANG, no dead children */
|
|
if (ret <= 0 || !WIFEXITED(work_status))
|
|
return false;
|
|
|
|
if (WEXITSTATUS(work_status) != 0)
|
|
pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
|
|
|
|
#else
|
|
/* wait for one to finish */
|
|
thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
|
|
false, wait_for_child ? INFINITE : 0);
|
|
|
|
if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
|
|
return false;
|
|
|
|
/* compute thread index in active_threads */
|
|
thread_num -= WAIT_OBJECT_0;
|
|
|
|
/* get the result */
|
|
GetExitCodeThread(thread_handles[thread_num], &res);
|
|
if (res != 0)
|
|
pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
|
|
|
|
/* dispose of handle to stop leaks */
|
|
CloseHandle(thread_handles[thread_num]);
|
|
|
|
/* Move last slot into dead child's position */
|
|
if (thread_num != parallel_jobs - 1)
|
|
{
|
|
void *tmp_args;
|
|
|
|
thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
|
|
|
|
/*
|
|
* We must swap the arg struct pointers because the thread we
|
|
* just moved is active, and we must make sure it is not
|
|
* reused by the next created thread. Instead, the new thread
|
|
* will use the arg struct of the thread that just died.
|
|
*/
|
|
tmp_args = cur_thread_args[thread_num];
|
|
cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
|
|
cur_thread_args[parallel_jobs - 1] = tmp_args;
|
|
}
|
|
#endif
|
|
|
|
/* do this after job has been removed */
|
|
parallel_jobs--;
|
|
|
|
return true;
|
|
}
|