mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-06 15:24:56 +08:00
a89c46f9bc
This patch implements parallel copying/linking of files by tablespace using the --jobs option in pg_upgrade.
340 lines
8.1 KiB
C
340 lines
8.1 KiB
C
/*
 *	parallel.c
 *
 *	multi-process support
 *
 *	Copyright (c) 2010-2013, PostgreSQL Global Development Group
 *	contrib/pg_upgrade/parallel.c
 */
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "pg_upgrade.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <sys/types.h>
|
|
#include <sys/wait.h>
|
|
|
|
#ifdef WIN32
|
|
#include <io.h>
|
|
#endif
|
|
|
|
static int parallel_jobs;
|
|
|
|
#ifdef WIN32
|
|
/*
 *	Array holding the handles of all active threads.  It must not contain
 *	any gaps/zeros so that it can be passed directly to
 *	WaitForMultipleObjects().  The per-thread argument structs are kept in
 *	separate arrays so that this array holds nothing but handles.
 */
|
|
HANDLE *thread_handles;
|
|
|
|
typedef struct {
|
|
char log_file[MAXPGPATH];
|
|
char opt_log_file[MAXPGPATH];
|
|
char cmd[MAX_STRING];
|
|
} exec_thread_arg;
|
|
|
|
typedef struct {
|
|
DbInfoArr *old_db_arr;
|
|
DbInfoArr *new_db_arr;
|
|
char old_pgdata[MAXPGPATH];
|
|
char new_pgdata[MAXPGPATH];
|
|
char old_tablespace[MAXPGPATH];
|
|
} transfer_thread_arg;
|
|
|
|
exec_thread_arg **exec_thread_args;
|
|
transfer_thread_arg **transfer_thread_args;
|
|
|
|
/* track current thread_args struct so reap_child() can be used for all cases */
|
|
void **cur_thread_args;
|
|
|
|
DWORD win32_exec_prog(exec_thread_arg *args);
|
|
DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
|
|
|
|
#endif
|
|
|
|
/*
|
|
* parallel_exec_prog
|
|
*
|
|
* This has the same API as exec_prog, except it does parallel execution,
|
|
* and therefore must throw errors and doesn't return an error status.
|
|
*/
|
|
void
|
|
parallel_exec_prog(const char *log_file, const char *opt_log_file,
|
|
const char *fmt,...)
|
|
{
|
|
va_list args;
|
|
char cmd[MAX_STRING];
|
|
#ifndef WIN32
|
|
pid_t child;
|
|
#else
|
|
HANDLE child;
|
|
exec_thread_arg *new_arg;
|
|
#endif
|
|
|
|
va_start(args, fmt);
|
|
vsnprintf(cmd, sizeof(cmd), fmt, args);
|
|
va_end(args);
|
|
|
|
if (user_opts.jobs <= 1)
|
|
/* throw_error must be true to allow jobs */
|
|
exec_prog(log_file, opt_log_file, true, "%s", cmd);
|
|
else
|
|
{
|
|
/* parallel */
|
|
#ifdef WIN32
|
|
cur_thread_args = (void **)exec_thread_args;
|
|
#endif
|
|
/* harvest any dead children */
|
|
while (reap_child(false) == true)
|
|
;
|
|
|
|
/* must we wait for a dead child? */
|
|
if (parallel_jobs >= user_opts.jobs)
|
|
reap_child(true);
|
|
|
|
/* set this before we start the job */
|
|
parallel_jobs++;
|
|
|
|
/* Ensure stdio state is quiesced before forking */
|
|
fflush(NULL);
|
|
|
|
#ifndef WIN32
|
|
child = fork();
|
|
if (child == 0)
|
|
/* use _exit to skip atexit() functions */
|
|
_exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
|
|
else if (child < 0)
|
|
/* fork failed */
|
|
pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
|
|
#else
|
|
if (thread_handles == NULL)
|
|
{
|
|
int i;
|
|
|
|
thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
|
|
exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
|
|
|
|
/*
|
|
* For safety and performance, we keep the args allocated during
|
|
* the entire life of the process, and we don't free the args
|
|
* in a thread different from the one that allocated it.
|
|
*/
|
|
for (i = 0; i < user_opts.jobs; i++)
|
|
exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg));
|
|
}
|
|
|
|
/* use first empty array element */
|
|
new_arg = exec_thread_args[parallel_jobs-1];
|
|
|
|
/* Can only pass one pointer into the function, so use a struct */
|
|
strcpy(new_arg->log_file, log_file);
|
|
strcpy(new_arg->opt_log_file, opt_log_file);
|
|
strcpy(new_arg->cmd, cmd);
|
|
|
|
child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
|
|
new_arg, 0, NULL);
|
|
if (child == 0)
|
|
pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
|
|
|
|
thread_handles[parallel_jobs-1] = child;
|
|
#endif
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
#ifdef WIN32
|
|
DWORD
|
|
win32_exec_prog(exec_thread_arg *args)
|
|
{
|
|
int ret;
|
|
|
|
ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
|
|
|
|
/* terminates thread */
|
|
return ret;
|
|
}
|
|
#endif
|
|
|
|
|
|
/*
|
|
* parallel_transfer_all_new_dbs
|
|
*
|
|
* This has the same API as transfer_all_new_dbs, except it does parallel execution
|
|
* by transfering multiple tablespaces in parallel
|
|
*/
|
|
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
|
|
char *old_pgdata, char *new_pgdata,
|
|
char *old_tablespace)
|
|
{
|
|
#ifndef WIN32
|
|
pid_t child;
|
|
#else
|
|
HANDLE child;
|
|
transfer_thread_arg *new_arg;
|
|
#endif
|
|
|
|
if (user_opts.jobs <= 1)
|
|
/* throw_error must be true to allow jobs */
|
|
transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
|
|
else
|
|
{
|
|
/* parallel */
|
|
#ifdef WIN32
|
|
cur_thread_args = (void **)transfer_thread_args;
|
|
#endif
|
|
/* harvest any dead children */
|
|
while (reap_child(false) == true)
|
|
;
|
|
|
|
/* must we wait for a dead child? */
|
|
if (parallel_jobs >= user_opts.jobs)
|
|
reap_child(true);
|
|
|
|
/* set this before we start the job */
|
|
parallel_jobs++;
|
|
|
|
/* Ensure stdio state is quiesced before forking */
|
|
fflush(NULL);
|
|
|
|
#ifndef WIN32
|
|
child = fork();
|
|
if (child == 0)
|
|
{
|
|
transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
|
|
old_tablespace);
|
|
/* if we take another exit path, it will be non-zero */
|
|
/* use _exit to skip atexit() functions */
|
|
_exit(0);
|
|
}
|
|
else if (child < 0)
|
|
/* fork failed */
|
|
pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
|
|
#else
|
|
if (thread_handles == NULL)
|
|
{
|
|
int i;
|
|
|
|
thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
|
|
transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
|
|
|
|
/*
|
|
* For safety and performance, we keep the args allocated during
|
|
* the entire life of the process, and we don't free the args
|
|
* in a thread different from the one that allocated it.
|
|
*/
|
|
for (i = 0; i < user_opts.jobs; i++)
|
|
transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg));
|
|
}
|
|
|
|
/* use first empty array element */
|
|
new_arg = transfer_thread_args[parallel_jobs-1];
|
|
|
|
/* Can only pass one pointer into the function, so use a struct */
|
|
new_arg->old_db_arr = old_db_arr;
|
|
new_arg->new_db_arr = new_db_arr;
|
|
strcpy(new_arg->old_pgdata, old_pgdata);
|
|
strcpy(new_arg->new_pgdata, new_pgdata);
|
|
strcpy(new_arg->old_tablespace, old_tablespace);
|
|
|
|
child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
|
|
new_arg, 0, NULL);
|
|
if (child == 0)
|
|
pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
|
|
|
|
thread_handles[parallel_jobs-1] = child;
|
|
#endif
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
#ifdef WIN32
|
|
DWORD
|
|
win32_transfer_all_new_dbs(transfer_thread_arg *args)
|
|
{
|
|
transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
|
|
args->new_pgdata, args->old_tablespace);
|
|
|
|
/* terminates thread */
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
|
|
/*
|
|
* collect status from a completed worker child
|
|
*/
|
|
bool
|
|
reap_child(bool wait_for_child)
|
|
{
|
|
#ifndef WIN32
|
|
int work_status;
|
|
int ret;
|
|
#else
|
|
int thread_num;
|
|
DWORD res;
|
|
#endif
|
|
|
|
if (user_opts.jobs <= 1 || parallel_jobs == 0)
|
|
return false;
|
|
|
|
#ifndef WIN32
|
|
ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
|
|
|
|
/* no children or, for WNOHANG, no dead children */
|
|
if (ret <= 0 || !WIFEXITED(work_status))
|
|
return false;
|
|
|
|
if (WEXITSTATUS(work_status) != 0)
|
|
pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
|
|
|
|
#else
|
|
/* wait for one to finish */
|
|
thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
|
|
false, wait_for_child ? INFINITE : 0);
|
|
|
|
if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
|
|
return false;
|
|
|
|
/* compute thread index in active_threads */
|
|
thread_num -= WAIT_OBJECT_0;
|
|
|
|
/* get the result */
|
|
GetExitCodeThread(thread_handles[thread_num], &res);
|
|
if (res != 0)
|
|
pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
|
|
|
|
/* dispose of handle to stop leaks */
|
|
CloseHandle(thread_handles[thread_num]);
|
|
|
|
/* Move last slot into dead child's position */
|
|
if (thread_num != parallel_jobs - 1)
|
|
{
|
|
void *tmp_args;
|
|
|
|
thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
|
|
|
|
/*
|
|
* We must swap the arg struct pointers because the thread we
|
|
* just moved is active, and we must make sure it is not
|
|
* reused by the next created thread. Instead, the new thread
|
|
* will use the arg struct of the thread that just died.
|
|
*/
|
|
tmp_args = cur_thread_args[thread_num];
|
|
cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
|
|
cur_thread_args[parallel_jobs - 1] = tmp_args;
|
|
}
|
|
#endif
|
|
|
|
/* do this after job has been removed */
|
|
parallel_jobs--;
|
|
|
|
return true;
|
|
}
|