diff --git a/contrib/pg_upgrade/Makefile b/contrib/pg_upgrade/Makefile index dec57a6130..bbb14a1b66 100644 --- a/contrib/pg_upgrade/Makefile +++ b/contrib/pg_upgrade/Makefile @@ -5,7 +5,7 @@ PGAPPICON = win32 PROGRAM = pg_upgrade OBJS = check.o controldata.o dump.o exec.o file.o function.o info.o \ - option.o page.o pg_upgrade.o relfilenode.o server.o \ + option.o page.o parallel.o pg_upgrade.o relfilenode.o server.o \ tablespace.o util.o version.o version_old_8_3.o $(WIN32RES) PG_CPPFLAGS = -DFRONTEND -DDLSUFFIX=\"$(DLSUFFIX)\" -I$(srcdir) -I$(libpq_srcdir) diff --git a/contrib/pg_upgrade/dump.c b/contrib/pg_upgrade/dump.c index f35852b5f0..a4b0127be9 100644 --- a/contrib/pg_upgrade/dump.c +++ b/contrib/pg_upgrade/dump.c @@ -33,18 +33,23 @@ generate_old_dump(void) /* create per-db dump files */ for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { - char file_name[MAXPGPATH]; + char sql_file_name[MAXPGPATH], log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; pg_log(PG_STATUS, "%s", old_db->db_name); - snprintf(file_name, sizeof(file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid); - exec_prog(RESTORE_LOG_FILE, NULL, true, + parallel_exec_prog(log_file_name, NULL, "\"%s/pg_dump\" %s --schema-only --binary-upgrade --format=custom %s --file=\"%s\" \"%s\"", new_cluster.bindir, cluster_conn_opts(&old_cluster), - log_opts.verbose ? "--verbose" : "", file_name, old_db->db_name); + log_opts.verbose ? "--verbose" : "", sql_file_name, old_db->db_name); } + /* reap all children */ + while (reap_child(true) == true) + ; + end_progress_output(); check_ok(); } diff --git a/contrib/pg_upgrade/option.c b/contrib/pg_upgrade/option.c index 19053fa9d0..88686c5f19 100644 --- a/contrib/pg_upgrade/option.c +++ b/contrib/pg_upgrade/option.c @@ -52,6 +52,7 @@ parseCommandLine(int argc, char *argv[]) {"check", no_argument, NULL, 'c'}, {"link", no_argument, NULL, 'k'}, {"retain", no_argument, NULL, 'r'}, + {"jobs", required_argument, NULL, 'j'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} }; @@ -101,7 +102,7 @@ parseCommandLine(int argc, char *argv[]) if ((log_opts.internal = fopen_priv(INTERNAL_LOG_FILE, "a")) == NULL) pg_log(PG_FATAL, "cannot write to log file %s\n", INTERNAL_LOG_FILE); - while ((option = getopt_long(argc, argv, "d:D:b:B:cko:O:p:P:ru:v", + while ((option = getopt_long(argc, argv, "d:D:b:B:cj:ko:O:p:P:ru:v", long_options, &optindex)) != -1) { switch (option) @@ -128,6 +129,10 @@ parseCommandLine(int argc, char *argv[]) new_cluster.pgconfig = pg_strdup(optarg); break; + case 'j': + user_opts.jobs = atoi(optarg); + break; + case 'k': user_opts.transfer_mode = TRANSFER_MODE_LINK; break; @@ -229,6 +234,7 @@ Options:\n\ -c, --check check clusters only, don't change any data\n\ -d, --old-datadir=OLDDATADIR old cluster data directory\n\ -D, --new-datadir=NEWDATADIR new cluster data directory\n\ + -j, --jobs number of simultaneous processes or threads to use\n\ -k, --link link instead of copying files to new cluster\n\ -o, --old-options=OPTIONS old cluster options to pass to the server\n\ -O, --new-options=OPTIONS new cluster options to pass to the server\n\ diff --git a/contrib/pg_upgrade/parallel.c b/contrib/pg_upgrade/parallel.c new file mode 100644 index 0000000000..5d97e15f19 --- /dev/null +++ b/contrib/pg_upgrade/parallel.c @@ -0,0 +1,218 @@ +/* + * parallel.c + * + * multi-process support + * + * Copyright (c) 2010-2012, PostgreSQL Global Development Group + * contrib/pg_upgrade/parallel.c + */ + +#include "postgres.h" + +#include "pg_upgrade.h" + +#include +#include +#include +#include + +#ifdef WIN32 +#include +#endif + +static int parallel_jobs; + +#ifdef WIN32 +/* + * Array holding all active threads. There can't be any gaps/zeros so + * it can be passed to WaitForMultipleObjects(). We use two arrays + * so the thread_handles array can be passed to WaitForMultipleObjects(). + */ +HANDLE *thread_handles; + +typedef struct { + char log_file[MAXPGPATH]; + char opt_log_file[MAXPGPATH]; + char cmd[MAX_STRING]; +} thread_arg; + +thread_arg **thread_args; + +DWORD win32_exec_prog(thread_arg *args); + +#endif + +/* + * parallel_exec_prog + * + * This has the same API as exec_prog, except it does parallel execution, + * and therefore must throw errors and doesn't return an error status. + */ +void +parallel_exec_prog(const char *log_file, const char *opt_log_file, + const char *fmt,...) +{ + va_list args; + char cmd[MAX_STRING]; +#ifndef WIN32 + pid_t child; +#else + HANDLE child; + thread_arg *new_arg; +#endif + + va_start(args, fmt); + vsnprintf(cmd, sizeof(cmd), fmt, args); + va_end(args); + + if (user_opts.jobs <= 1) + /* throw_error must be true to allow jobs */ + exec_prog(log_file, opt_log_file, true, "%s", cmd); + else + { + /* parallel */ + + /* harvest any dead children */ + while (reap_child(false) == true) + ; + + /* must we wait for a dead child? */ + if (parallel_jobs >= user_opts.jobs) + reap_child(true); + + /* set this before we start the job */ + parallel_jobs++; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + /* use _exit to skip atexit() functions */ + _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd)); + else if (child < 0) + /* fork failed */ + pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno)); +#else + if (thread_handles == NULL) + { + int i; + + thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); + thread_args = pg_malloc(user_opts.jobs * sizeof(thread_arg *)); + + /* + * For safety and performance, we keep the args allocated during + * the entire life of the process, and we don't free the args + * in a thread different from the one that allocated it. + */ + for (i = 0; i < user_opts.jobs; i++) + thread_args[i] = pg_malloc(sizeof(thread_arg)); + } + + /* use first empty array element */ + new_arg = thread_args[parallel_jobs-1]; + + /* Can only pass one pointer into the function, so use a struct */ + strcpy(new_arg->log_file, log_file); + strcpy(new_arg->opt_log_file, opt_log_file); + strcpy(new_arg->cmd, cmd); + + child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, + new_arg, 0, NULL); + if (child == 0) + pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno)); + + thread_handles[parallel_jobs-1] = child; +#endif + } + + return; +} + + +#ifdef WIN32 +DWORD +win32_exec_prog(thread_arg *args) +{ + int ret; + + ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd); + + /* terminates thread */ + return ret; +} +#endif + + +/* + * collect status from a completed worker child + */ +bool +reap_child(bool wait_for_child) +{ +#ifndef WIN32 + int work_status; + int ret; +#else + int thread_num; + DWORD res; +#endif + + if (user_opts.jobs <= 1 || parallel_jobs == 0) + return false; + +#ifndef WIN32 + ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG); + + /* no children or, for WNOHANG, no dead children */ + if (ret <= 0 || !WIFEXITED(work_status)) + return false; + + if (WEXITSTATUS(work_status) != 0) + pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno)); + +#else + /* wait for one to finish */ + thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles, + false, wait_for_child ? INFINITE : 0); + + if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED) + return false; + + /* compute thread index in active_threads */ + thread_num -= WAIT_OBJECT_0; + + /* get the result */ + GetExitCodeThread(thread_handles[thread_num], &res); + if (res != 0) + pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno)); + + /* dispose of handle to stop leaks */ + CloseHandle(thread_handles[thread_num]); + + /* Move last slot into dead child's position */ + if (thread_num != parallel_jobs - 1) + { + thread_arg *tmp_args; + + thread_handles[thread_num] = thread_handles[parallel_jobs - 1]; + + /* + * We must swap the arg struct pointers because the thread we + * just moved is active, and we must make sure it is not + * reused by the next created thread. Instead, the new thread + * will use the arg struct of the thread that just died. + */ + tmp_args = thread_args[thread_num]; + thread_args[thread_num] = thread_args[parallel_jobs - 1]; + thread_args[parallel_jobs - 1] = tmp_args; + } +#endif + + /* do this after job has been removed */ + parallel_jobs--; + + return true; +} diff --git a/contrib/pg_upgrade/pg_upgrade.c b/contrib/pg_upgrade/pg_upgrade.c index 2d4b6787f8..8fa64b7edf 100644 --- a/contrib/pg_upgrade/pg_upgrade.c +++ b/contrib/pg_upgrade/pg_upgrade.c @@ -61,7 +61,6 @@ char *output_files[] = { /* unique file for pg_ctl start */ SERVER_START_LOG_FILE, #endif - RESTORE_LOG_FILE, UTILITY_LOG_FILE, INTERNAL_LOG_FILE, NULL @@ -270,7 +269,7 @@ prepare_new_databases(void) * support functions in template1 but pg_dumpall creates database using * the template0 template. */ - exec_prog(RESTORE_LOG_FILE, NULL, true, + exec_prog(UTILITY_LOG_FILE, NULL, true, "\"%s/psql\" " EXEC_PSQL_ARGS " %s -f \"%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), GLOBALS_DUMP_FILE); @@ -307,22 +306,28 @@ create_new_objects(void) for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { - char file_name[MAXPGPATH]; + char sql_file_name[MAXPGPATH], log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; pg_log(PG_STATUS, "%s", old_db->db_name); - snprintf(file_name, sizeof(file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid); /* * Using pg_restore --single-transaction is faster than other * methods, like --jobs. pg_dump only produces its output at the * end, so there is little parallelism using the pipe. */ - exec_prog(RESTORE_LOG_FILE, NULL, true, + parallel_exec_prog(log_file_name, NULL, "\"%s/pg_restore\" %s --exit-on-error --single-transaction --verbose --dbname \"%s\" \"%s\"", new_cluster.bindir, cluster_conn_opts(&new_cluster), - old_db->db_name, file_name); + old_db->db_name, sql_file_name); } + + /* reap all children */ + while (reap_child(true) == true) + ; + end_progress_output(); check_ok(); @@ -494,11 +499,14 @@ cleanup(void) if (old_cluster.dbarr.dbs) for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { - char file_name[MAXPGPATH]; + char sql_file_name[MAXPGPATH], log_file_name[MAXPGPATH]; DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; - snprintf(file_name, sizeof(file_name), DB_DUMP_FILE_MASK, old_db->db_oid); - unlink(file_name); + snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + unlink(sql_file_name); + + snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + unlink(log_file_name); } } } diff --git a/contrib/pg_upgrade/pg_upgrade.h b/contrib/pg_upgrade/pg_upgrade.h index cae1e46c95..81d9d511d0 100644 --- a/contrib/pg_upgrade/pg_upgrade.h +++ b/contrib/pg_upgrade/pg_upgrade.h @@ -32,8 +32,8 @@ #define GLOBALS_DUMP_FILE "pg_upgrade_dump_globals.sql" #define DB_DUMP_FILE_MASK "pg_upgrade_dump_%u.custom" +#define DB_DUMP_LOG_FILE_MASK "pg_upgrade_dump_%u.log" #define SERVER_LOG_FILE "pg_upgrade_server.log" -#define RESTORE_LOG_FILE "pg_upgrade_restore.log" #define UTILITY_LOG_FILE "pg_upgrade_utility.log" #define INTERNAL_LOG_FILE "pg_upgrade_internal.log" @@ -264,6 +264,7 @@ typedef struct bool check; /* TRUE -> ask user for permission to make * changes */ transferMode transfer_mode; /* copy files or link them? */ + int jobs; } UserOpts; @@ -461,3 +462,11 @@ void old_8_3_invalidate_hash_gin_indexes(ClusterInfo *cluster, bool check_mode) void old_8_3_invalidate_bpchar_pattern_ops_indexes(ClusterInfo *cluster, bool check_mode); char *old_8_3_create_sequence_script(ClusterInfo *cluster); + +/* parallel.c */ +void parallel_exec_prog(const char *log_file, const char *opt_log_file, + const char *fmt,...) +__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4))); + +bool reap_child(bool wait_for_child); + diff --git a/doc/src/sgml/pgupgrade.sgml b/doc/src/sgml/pgupgrade.sgml index 998cb2fc9a..53781e45ed 100644 --- a/doc/src/sgml/pgupgrade.sgml +++ b/doc/src/sgml/pgupgrade.sgml @@ -112,6 +112,13 @@ variable PGDATANEW + + + + number of simultaneous processes or threads to use + + + @@ -331,10 +338,15 @@ NET STOP pgsql-8.3 (PostgreSQL 8.3 and older used a different s requires that the old and new cluster data directories be in the same file system. See pg_upgrade --help for a full list of options. - + - - + + The For Windows users, you must be logged into an administrative account, and