diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index bfd023f3e1..634b4444f9 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -35,9 +35,11 @@ * the required action (dump or restore) and returns a malloc'd status string. * The status string is passed back to the master where it is interpreted by * AH->MasterEndParallelItemPtr, another format-specific routine. That - * function can update state or catalog information on the master's side, + * function can update format-specific information on the master's side, * depending on the reply from the worker process. In the end it returns a - * status code, which is 0 for successful execution. + * status code, which we pass to the ParallelCompletionPtr callback function + * that was passed to DispatchJobForTocEntry(). The callback function does + * state updating for the master control logic in pg_backup_archiver.c. * * Remember that we have forked off the workers only after we have read in * the catalog. That's why our worker processes can also access the catalog @@ -48,13 +50,8 @@ * In the master process, the workerStatus field for each worker has one of * the following values: * WRKR_IDLE: it's waiting for a command - * WRKR_WORKING: it's been sent a command - * WRKR_FINISHED: it's returned a result + * WRKR_WORKING: it's working on a command * WRKR_TERMINATED: process ended - * The FINISHED state indicates that the worker is idle, but we've not yet - * dealt with the status code it returned from the prior command. - * ReapWorkerStatus() extracts the unhandled command status value and sets - * the workerStatus back to WRKR_IDLE. */ #include "postgres_fe.h" @@ -79,6 +76,8 @@ #define PIPE_READ 0 #define PIPE_WRITE 1 +#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ + #ifdef WIN32 /* @@ -175,9 +174,12 @@ static void setup_cancel_handler(void); static void set_cancel_pstate(ParallelState *pstate); static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH); static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot); +static int GetIdleWorker(ParallelState *pstate); static bool HasEveryWorkerTerminated(ParallelState *pstate); static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te); static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); +static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, + bool do_wait); static char *getMessageFromMaster(int pipefd[2]); static void sendMessageToMaster(int pipefd[2], const char *str); static int select_loop(int maxFd, fd_set *workerset); @@ -349,8 +351,8 @@ archive_close_connection(int code, void *arg) * fail to detect it because there would be no EOF condition on * the other end of the pipe.) */ - if (slot->args->AH) - DisconnectDatabase(&(slot->args->AH->public)); + if (slot->AH) + DisconnectDatabase(&(slot->AH->public)); #ifdef WIN32 closesocket(slot->pipeRevRead); @@ -407,7 +409,7 @@ ShutdownWorkersHard(ParallelState *pstate) EnterCriticalSection(&signal_info_lock); for (i = 0; i < pstate->numWorkers; i++) { - ArchiveHandle *AH = pstate->parallelSlot[i].args->AH; + ArchiveHandle *AH = pstate->parallelSlot[i].AH; char errbuf[1]; if (AH != NULL && AH->connCancel != NULL) @@ -634,7 +636,7 @@ consoleHandler(DWORD dwCtrlType) for (i = 0; i < signal_info.pstate->numWorkers; i++) { ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]); - ArchiveHandle *AH = slot->args->AH; + ArchiveHandle *AH = slot->AH; HANDLE hThread = (HANDLE) slot->hThread; /* @@ -789,7 +791,7 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH) EnterCriticalSection(&signal_info_lock); #endif - slot->args->AH = AH; + slot->AH = AH; #ifdef WIN32 LeaveCriticalSection(&signal_info_lock); @@ -935,9 +937,10 @@ ParallelBackupStart(ArchiveHandle *AH) strerror(errno)); slot->workerStatus = WRKR_IDLE; - slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); - slot->args->AH = NULL; - slot->args->te = NULL; + slot->AH = NULL; + slot->te = NULL; + slot->callback = NULL; + slot->callback_data = NULL; /* master's ends of the pipes */ slot->pipeRead = pipeWM[PIPE_READ]; @@ -1071,20 +1074,28 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) } /* - * Dispatch a job to some free worker (caller must ensure there is one!) + * Dispatch a job to some free worker. * * te is the TocEntry to be processed, act is the action to be taken on it. + * callback is the function to call on completion of the job. + * + * If no worker is currently available, this will block, and previously + * registered callback functions may be called. */ void -DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, - T_Action act) +DispatchJobForTocEntry(ArchiveHandle *AH, + ParallelState *pstate, + TocEntry *te, + T_Action act, + ParallelCompletionPtr callback, + void *callback_data) { int worker; char *arg; - /* our caller makes sure that at least one worker is idle */ - worker = GetIdleWorker(pstate); - Assert(worker != NO_SLOT); + /* Get a worker, waiting if none are idle */ + while ((worker = GetIdleWorker(pstate)) == NO_SLOT) + WaitForWorkers(AH, pstate, WFW_ONE_IDLE); /* Construct and send command string */ arg = (AH->MasterStartParallelItemPtr) (AH, te, act); @@ -1095,14 +1106,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; - pstate->parallelSlot[worker].args->te = te; + pstate->parallelSlot[worker].te = te; + pstate->parallelSlot[worker].callback = callback; + pstate->parallelSlot[worker].callback_data = callback_data; } /* * Find an idle worker and return its slot number. * Return NO_SLOT if none are idle. */ -int +static int GetIdleWorker(ParallelState *pstate) { int i; @@ -1274,17 +1287,16 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) * immediately if there is none available. * * When we get a status message, we let MasterEndParallelItemPtr process it, - * then save the resulting status code and switch the worker's state to - * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify - * that the status was "OK" and push the worker back to IDLE state. + * then pass the resulting status code to the callback function that was + * specified to DispatchJobForTocEntry, then reset the worker status to IDLE. * - * XXX Rube Goldberg would be proud of this API, but no one else should be. + * Returns true if we collected a status message, else false. * * XXX is it worth checking for more than one status message per call? * It seems somewhat unlikely that multiple workers would finish at exactly * the same time. */ -void +static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) { int worker; @@ -1298,34 +1310,39 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) /* If do_wait is true, we must have detected EOF on some socket */ if (do_wait) exit_horribly(modulename, "a worker process died unexpectedly\n"); - return; + return false; } /* Process it and update our idea of the worker's status */ if (messageStartsWith(msg, "OK ")) { - TocEntry *te = pstate->parallelSlot[worker].args->te; + ParallelSlot *slot = &pstate->parallelSlot[worker]; + TocEntry *te = slot->te; char *statusString; + int status; if (messageStartsWith(msg, "OK RESTORE ")) { statusString = msg + strlen("OK RESTORE "); - pstate->parallelSlot[worker].status = + status = (AH->MasterEndParallelItemPtr) (AH, te, statusString, ACT_RESTORE); + slot->callback(AH, te, status, slot->callback_data); } else if (messageStartsWith(msg, "OK DUMP ")) { statusString = msg + strlen("OK DUMP "); - pstate->parallelSlot[worker].status = + status = (AH->MasterEndParallelItemPtr) (AH, te, statusString, ACT_DUMP); + slot->callback(AH, te, status, slot->callback_data); } else exit_horribly(modulename, "invalid message received from worker: \"%s\"\n", msg); - pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED; + slot->workerStatus = WRKR_IDLE; + slot->te = NULL; } else exit_horribly(modulename, @@ -1334,110 +1351,79 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) /* Free the string returned from getMessageFromWorker */ free(msg); + + return true; } /* - * Check to see if any worker is in WRKR_FINISHED state. If so, - * return its command status code into *status, reset it to IDLE state, - * and return its slot number. Otherwise return NO_SLOT. + * Check for status results from workers, waiting if necessary. * - * This function is executed in the master process. - */ -int -ReapWorkerStatus(ParallelState *pstate, int *status) -{ - int i; - - for (i = 0; i < pstate->numWorkers; i++) - { - if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED) - { - *status = pstate->parallelSlot[i].status; - pstate->parallelSlot[i].status = 0; - pstate->parallelSlot[i].workerStatus = WRKR_IDLE; - return i; - } - } - return NO_SLOT; -} - -/* - * Wait, if necessary, until we have at least one idle worker. - * Reap worker status as necessary to move FINISHED workers to IDLE state. + * Available wait modes are: + * WFW_NO_WAIT: reap any available status, but don't block + * WFW_GOT_STATUS: wait for at least one more worker to finish + * WFW_ONE_IDLE: wait for at least one worker to be idle + * WFW_ALL_IDLE: wait for all workers to be idle * - * We assume that no extra processing is required when reaping a finished - * command, except for checking that the status was OK (zero). - * Caution: that assumption means that this function can only be used in - * parallel dump, not parallel restore, because the latter has a more - * complex set of rules about handling status. + * Any received results are passed to MasterEndParallelItemPtr and then + * to the callback specified to DispatchJobForTocEntry. * * This function is executed in the master process. */ void -EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate) +WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) { - int ret_worker; - int work_status; + bool do_wait = false; + + /* + * In GOT_STATUS mode, always block waiting for a message, since we can't + * return till we get something. In other modes, we don't block the first + * time through the loop. + */ + if (mode == WFW_GOT_STATUS) + { + /* Assert that caller knows what it's doing */ + Assert(!IsEveryWorkerIdle(pstate)); + do_wait = true; + } for (;;) { - int nTerm = 0; - - while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT) + /* + * Check for status messages, even if we don't need to block. We do + * not try very hard to reap all available messages, though, since + * there's unlikely to be more than one. + */ + if (ListenToWorkers(AH, pstate, do_wait)) { - if (work_status != 0) - exit_horribly(modulename, "error processing a parallel work item\n"); - - nTerm++; + /* + * If we got a message, we are done by definition for GOT_STATUS + * mode, and we can also be certain that there's at least one idle + * worker. So we're done in all but ALL_IDLE mode. + */ + if (mode != WFW_ALL_IDLE) + return; } - /* - * We need to make sure that we have an idle worker before dispatching - * the next item. If nTerm > 0 we already have that (quick check). - */ - if (nTerm > 0) - return; + /* Check whether we must wait for new status messages */ + switch (mode) + { + case WFW_NO_WAIT: + return; /* never wait */ + case WFW_GOT_STATUS: + Assert(false); /* can't get here, because we waited */ + break; + case WFW_ONE_IDLE: + if (GetIdleWorker(pstate) != NO_SLOT) + return; + break; + case WFW_ALL_IDLE: + if (IsEveryWorkerIdle(pstate)) + return; + break; + } - /* explicit check for an idle worker */ - if (GetIdleWorker(pstate) != NO_SLOT) - return; - - /* - * If we have no idle worker, read the result of one or more workers - * and loop the loop to call ReapWorkerStatus() on them - */ - ListenToWorkers(AH, pstate, true); - } -} - -/* - * Wait for all workers to be idle. - * Reap worker status as necessary to move FINISHED workers to IDLE state. - * - * We assume that no extra processing is required when reaping a finished - * command, except for checking that the status was OK (zero). - * Caution: that assumption means that this function can only be used in - * parallel dump, not parallel restore, because the latter has a more - * complex set of rules about handling status. - * - * This function is executed in the master process. - */ -void -EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate) -{ - int work_status; - - if (!pstate || pstate->numWorkers == 1) - return; - - /* Waiting for the remaining worker processes to finish */ - while (!IsEveryWorkerIdle(pstate)) - { - if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT) - ListenToWorkers(AH, pstate, true); - else if (work_status != 0) - exit_horribly(modulename, - "error processing a parallel work item\n"); + /* Loop back, and this time wait for something to happen */ + do_wait = true; } } diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h index 21739ca87c..8ee629b106 100644 --- a/src/bin/pg_dump/parallel.h +++ b/src/bin/pg_dump/parallel.h @@ -2,14 +2,11 @@ * * parallel.h * - * Parallel support header file for the pg_dump archiver + * Parallel support for pg_dump and pg_restore * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * The author is not responsible for loss or damages that may - * result from its use. - * * IDENTIFICATION * src/bin/pg_dump/parallel.h * @@ -21,31 +18,53 @@ #include "pg_backup_archiver.h" +/* Function to call in master process on completion of a worker task */ +typedef void (*ParallelCompletionPtr) (ArchiveHandle *AH, + TocEntry *te, + int status, + void *callback_data); + +/* Wait options for WaitForWorkers */ +typedef enum +{ + WFW_NO_WAIT, + WFW_GOT_STATUS, + WFW_ONE_IDLE, + WFW_ALL_IDLE +} WFW_WaitOption; + +/* Worker process statuses */ typedef enum { - WRKR_TERMINATED = 0, WRKR_IDLE, WRKR_WORKING, - WRKR_FINISHED + WRKR_TERMINATED } T_WorkerStatus; -/* Arguments needed for a worker process */ -typedef struct ParallelArgs -{ - ArchiveHandle *AH; - TocEntry *te; -} ParallelArgs; - -/* State for each parallel activity slot */ +/* + * Per-parallel-worker state of parallel.c. + * + * Much of this is valid only in the master process (or, on Windows, should + * be touched only by the master thread). But the AH field should be touched + * only by workers. The pipe descriptors are valid everywhere. + */ typedef struct ParallelSlot { - ParallelArgs *args; - T_WorkerStatus workerStatus; - int status; + T_WorkerStatus workerStatus; /* see enum above */ + + /* These fields are valid if workerStatus == WRKR_WORKING: */ + TocEntry *te; /* item being worked on */ + ParallelCompletionPtr callback; /* function to call on completion */ + void *callback_data; /* passthru data for it */ + + ArchiveHandle *AH; /* Archive data worker is using */ + int pipeRead; /* master's end of the pipes */ int pipeWrite; int pipeRevRead; /* child's end of the pipes */ int pipeRevWrite; + + /* Child process/thread identity info: */ #ifdef WIN32 uintptr_t hThread; unsigned int threadId; @@ -54,12 +73,11 @@ typedef struct ParallelSlot #endif } ParallelSlot; -#define NO_SLOT (-1) - +/* Overall state for parallel.c */ typedef struct ParallelState { - int numWorkers; - ParallelSlot *parallelSlot; + int numWorkers; /* allowed number of workers */ + ParallelSlot *parallelSlot; /* array of numWorkers slots */ } ParallelState; #ifdef WIN32 @@ -69,17 +87,17 @@ extern DWORD mainThreadId; extern void init_parallel_dump_utils(void); -extern int GetIdleWorker(ParallelState *pstate); extern bool IsEveryWorkerIdle(ParallelState *pstate); -extern void ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait); -extern int ReapWorkerStatus(ParallelState *pstate, int *status); -extern void EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate); -extern void EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate); +extern void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, + WFW_WaitOption mode); extern ParallelState *ParallelBackupStart(ArchiveHandle *AH); extern void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, - TocEntry *te, T_Action act); + TocEntry *te, + T_Action act, + ParallelCompletionPtr callback, + void *callback_data); extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate); extern void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index a69b06f6d7..e19c24aec9 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -97,9 +97,14 @@ static void par_list_remove(TocEntry *te); static TocEntry *get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ParallelState *pstate); -static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, - int worker, int status, - ParallelState *pstate); +static void mark_dump_job_done(ArchiveHandle *AH, + TocEntry *te, + int status, + void *callback_data); +static void mark_restore_job_done(ArchiveHandle *AH, + TocEntry *te, + int status, + void *callback_data); static void fix_dependencies(ArchiveHandle *AH); static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); @@ -2355,8 +2360,8 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) * If we are in a parallel backup, then we are always the master * process. Dispatch each data-transfer job to a worker. */ - EnsureIdleWorker(AH, pstate); - DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP); + DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP, + mark_dump_job_done, NULL); } else WriteDataChunksForTocEntry(AH, te); @@ -2365,9 +2370,32 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) /* * If parallel, wait for workers to finish. */ - EnsureWorkersFinished(AH, pstate); + if (pstate && pstate->numWorkers > 1) + WaitForWorkers(AH, pstate, WFW_ALL_IDLE); } + +/* + * Callback function that's invoked in the master process after a step has + * been parallel dumped. + * + * We don't need to do anything except check for worker failure. + */ +static void +mark_dump_job_done(ArchiveHandle *AH, + TocEntry *te, + int status, + void *callback_data) +{ + ahlog(AH, 1, "finished item %d %s %s\n", + te->dumpId, te->desc, te->tag); + + if (status != 0) + exit_horribly(modulename, "worker process failed: exit code %d\n", + status); +} + + void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te) { @@ -2751,9 +2779,9 @@ _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt) return 0; } - if (ropt->schemaExcludeNames.head != NULL - && te->namespace - && simple_string_list_member(&ropt->schemaExcludeNames, te->namespace)) + if (ropt->schemaExcludeNames.head != NULL && + te->namespace && + simple_string_list_member(&ropt->schemaExcludeNames, te->namespace)) return 0; if (ropt->selTypes) @@ -3769,11 +3797,9 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - int work_status; bool skipped_some; TocEntry ready_list; TocEntry *next_work_item; - int ret_child; ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); @@ -3850,54 +3876,29 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, par_list_remove(next_work_item); - DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE); + DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, + mark_restore_job_done, &ready_list); } else { /* at least one child is working and we have nothing ready. */ } - for (;;) - { - int nTerm = 0; - - /* - * In order to reduce dependencies as soon as possible and - * especially to reap the status of workers who are working on - * items that pending items depend on, we do a non-blocking check - * for ended workers first. - * - * However, if we do not have any other work items currently that - * workers can work on, we do not busy-loop here but instead - * really wait for at least one worker to terminate. Hence we call - * ListenToWorkers(..., ..., do_wait = true) in this case. - */ - ListenToWorkers(AH, pstate, !next_work_item); - - while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT) - { - nTerm++; - mark_work_done(AH, &ready_list, ret_child, work_status, pstate); - } - - /* - * We need to make sure that we have an idle worker before - * re-running the loop. If nTerm > 0 we already have that (quick - * check). - */ - if (nTerm > 0) - break; - - /* if nobody terminated, explicitly check for an idle worker */ - if (GetIdleWorker(pstate) != NO_SLOT) - break; - - /* - * If we have no idle worker, read the result of one or more - * workers and loop the loop to call ReapWorkerStatus() on them. - */ - ListenToWorkers(AH, pstate, true); - } + /* + * Before dispatching another job, check to see if anything has + * finished. We should check every time through the loop so as to + * reduce dependencies as soon as possible. If we were unable to + * dispatch any job this time through, wait until some worker finishes + * (and, hopefully, unblocks some pending item). If we did dispatch + * something, continue as soon as there's at least one idle worker. + * Note that in either case, there's guaranteed to be at least one + * idle worker when we return to the top of the loop. This ensures we + * won't block inside DispatchJobForTocEntry, which would be + * undesirable: we'd rather postpone dispatching until we see what's + * been unblocked by finished jobs. + */ + WaitForWorkers(AH, pstate, + next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS); } ahlog(AH, 1, "finished main parallel loop\n"); @@ -4025,9 +4026,11 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, int count = 0; for (k = 0; k < pstate->numWorkers; k++) - if (pstate->parallelSlot[k].args->te != NULL && - pstate->parallelSlot[k].args->te->section == SECTION_DATA) + { + if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING && + pstate->parallelSlot[k].te->section == SECTION_DATA) count++; + } if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) pref_non_data = false; } @@ -4044,13 +4047,13 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. */ - for (i = 0; i < pstate->numWorkers && !conflicts; i++) + for (i = 0; i < pstate->numWorkers; i++) { TocEntry *running_te; if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) continue; - running_te = pstate->parallelSlot[i].args->te; + running_te = pstate->parallelSlot[i].te; if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) @@ -4091,10 +4094,8 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, * our work is finished, the master process will assign us a new work item. */ int -parallel_restore(ParallelArgs *args) +parallel_restore(ArchiveHandle *AH, TocEntry *te) { - ArchiveHandle *AH = args->AH; - TocEntry *te = args->te; int status; Assert(AH->connection != NULL); @@ -4110,22 +4111,18 @@ parallel_restore(ParallelArgs *args) /* - * Housekeeping to be done after a step has been parallel restored. + * Callback function that's invoked in the master process after a step has + * been parallel restored. * - * Clear the appropriate slot, free all the extra memory we allocated, - * update status, and reduce the dependency count of any dependent items. + * Update status and reduce the dependency count of any dependent items. */ static void -mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, - int worker, int status, - ParallelState *pstate) +mark_restore_job_done(ArchiveHandle *AH, + TocEntry *te, + int status, + void *callback_data) { - TocEntry *te = NULL; - - te = pstate->parallelSlot[worker].args->te; - - if (te == NULL) - exit_horribly(modulename, "could not find slot of finished worker\n"); + TocEntry *ready_list = (TocEntry *) callback_data; ahlog(AH, 1, "finished item %d %s %s\n", te->dumpId, te->desc, te->tag); diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 0376f2bff7..123aa5dc84 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -111,7 +111,6 @@ typedef z_stream *z_streamp; typedef struct _archiveHandle ArchiveHandle; typedef struct _tocEntry TocEntry; -struct ParallelArgs; struct ParallelState; #define READ_ERROR_EXIT(fd) \ @@ -375,7 +374,7 @@ struct _tocEntry int nLockDeps; /* number of such dependencies */ }; -extern int parallel_restore(struct ParallelArgs *args); +extern int parallel_restore(ArchiveHandle *AH, TocEntry *te); extern void on_exit_close_archive(Archive *AHX); extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3, 4); diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 66329dc90c..c4f487a7ca 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -820,13 +820,9 @@ _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) */ const int buflen = 64; char *buf = (char *) pg_malloc(buflen); - ParallelArgs pargs; int status; - pargs.AH = AH; - pargs.te = te; - - status = parallel_restore(&pargs); + status = parallel_restore(AH, te); snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index e52f12258a..8071259acb 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -826,13 +826,9 @@ _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te) */ const int buflen = 64; char *buf = (char *) pg_malloc(buflen); - ParallelArgs pargs; int status; - pargs.AH = AH; - pargs.te = te; - - status = parallel_restore(&pargs); + status = parallel_restore(AH, te); snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);