mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-11-27 07:21:09 +08:00
Make standby server continuously retry restoring the next WAL segment with
restore_command, if the connection to the primary server is lost. This ensures that the standby can recover automatically, if the connection is lost for a long time and standby falls behind so much that the required WAL segments have been archived and deleted in the master. This also makes standby_mode useful without streaming replication; the server will keep retrying restore_command every few seconds until the trigger file is found. That's the same basic functionality pg_standby offers, but without the bells and whistles. To implement that, refactor the ReadRecord/FetchRecord functions. The FetchRecord() function introduced in the original streaming replication patch is removed, and all the retry logic is now in a new function called XLogReadPage(). XLogReadPage() is now responsible for executing restore_command, launching walreceiver, and waiting for new WAL to arrive from primary, as required. This also changes the life cycle of walreceiver. When launched, it now only tries to connect to the master once, and exits if the connection fails, or is lost during streaming for any reason. The startup process detects the death, and re-launches walreceiver if necessary.
This commit is contained in:
parent
ab13d1e925
commit
1bb2558046
@ -7,7 +7,7 @@
|
||||
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -143,16 +143,6 @@ HotStandbyState standbyState = STANDBY_DISABLED;
|
||||
|
||||
static XLogRecPtr LastRec;
|
||||
|
||||
/*
|
||||
* Are we doing recovery from XLOG stream? If so, we recover without using
|
||||
* offline XLOG archives even though InArchiveRecovery==true. This flag is
|
||||
* used only in standby mode.
|
||||
*/
|
||||
static bool InStreamingRecovery = false;
|
||||
|
||||
/* The current log page is partially-filled, and so needs to be read again? */
|
||||
static bool needReread = false;
|
||||
|
||||
/*
|
||||
* Local copy of SharedRecoveryInProgress variable. True actually means "not
|
||||
* known, need to check the shared state".
|
||||
@ -457,12 +447,16 @@ static uint32 openLogOff = 0;
|
||||
* These variables are used similarly to the ones above, but for reading
|
||||
* the XLOG. Note, however, that readOff generally represents the offset
|
||||
* of the page just read, not the seek position of the FD itself, which
|
||||
* will be just past that page.
|
||||
* will be just past that page. readLen indicates how much of the current
|
||||
* page has been read into readBuf.
|
||||
*/
|
||||
static int readFile = -1;
|
||||
static uint32 readId = 0;
|
||||
static uint32 readSeg = 0;
|
||||
static uint32 readOff = 0;
|
||||
static uint32 readLen = 0;
|
||||
/* Is the currently open segment being streamed from primary? */
|
||||
static bool readStreamed = false;
|
||||
|
||||
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
|
||||
static char *readBuf = NULL;
|
||||
@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0;
|
||||
/* State information for XLOG reading */
|
||||
static XLogRecPtr ReadRecPtr; /* start of last record read */
|
||||
static XLogRecPtr EndRecPtr; /* end+1 of last record read */
|
||||
static XLogRecord *nextRecord = NULL;
|
||||
static TimeLineID lastPageTLI = 0;
|
||||
|
||||
static XLogRecPtr minRecoveryPoint; /* local copy of
|
||||
@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
|
||||
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
|
||||
bool find_free, int *max_advance,
|
||||
bool use_lock);
|
||||
static int XLogFileRead(uint32 log, uint32 seg, int emode);
|
||||
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
|
||||
bool fromArchive, bool notexistOk);
|
||||
static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
|
||||
bool fromArchive);
|
||||
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
|
||||
bool randAccess);
|
||||
static void XLogFileClose(void);
|
||||
static bool RestoreArchivedFile(char *path, const char *xlogfname,
|
||||
const char *recovername, off_t expectedSize);
|
||||
@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
|
||||
static void ValidateXLOGDirectoryStructure(void);
|
||||
static void CleanupBackupHistory(void);
|
||||
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
|
||||
static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
|
||||
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
|
||||
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
|
||||
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
|
||||
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
|
||||
static List *readTimeLineHistory(TimeLineID targetTLI);
|
||||
@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
|
||||
static void WriteControlFile(void);
|
||||
static void ReadControlFile(void);
|
||||
static char *str_time(pg_time_t tnow);
|
||||
static bool CheckForStandbyTrigger(void);
|
||||
|
||||
#ifdef WAL_DEBUG
|
||||
static void xlog_outrec(StringInfo buf, XLogRecord *record);
|
||||
@ -2586,13 +2584,72 @@ XLogFileOpen(uint32 log, uint32 seg)
|
||||
|
||||
/*
|
||||
* Open a logfile segment for reading (during recovery).
|
||||
*
|
||||
* If fromArchive is true, the segment is retrieved from archive, otherwise
|
||||
* it's read from pg_xlog.
|
||||
*/
|
||||
static int
|
||||
XLogFileRead(uint32 log, uint32 seg, int emode)
|
||||
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
|
||||
bool fromArchive, bool notfoundOk)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
char xlogfname[MAXFNAMELEN];
|
||||
char activitymsg[MAXFNAMELEN + 16];
|
||||
char path[MAXPGPATH];
|
||||
int fd;
|
||||
|
||||
XLogFileName(xlogfname, tli, log, seg);
|
||||
|
||||
if (fromArchive)
|
||||
{
|
||||
/* Report recovery progress in PS display */
|
||||
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
|
||||
xlogfname);
|
||||
set_ps_display(activitymsg, false);
|
||||
|
||||
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
|
||||
"RECOVERYXLOG",
|
||||
XLogSegSize);
|
||||
if (!restoredFromArchive)
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogFilePath(path, tli, log, seg);
|
||||
restoredFromArchive = false;
|
||||
}
|
||||
|
||||
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
|
||||
if (fd >= 0)
|
||||
{
|
||||
/* Success! */
|
||||
curFileTLI = tli;
|
||||
|
||||
/* Report recovery progress in PS display */
|
||||
snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
|
||||
xlogfname);
|
||||
set_ps_display(activitymsg, false);
|
||||
|
||||
return fd;
|
||||
}
|
||||
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
|
||||
ereport(PANIC,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
|
||||
path, log, seg)));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a logfile segment for reading (during recovery).
|
||||
*
|
||||
* This version searches for the segment with any TLI listed in expectedTLIs.
|
||||
* If not in StandbyMode and fromArchive is true, the segment is also
|
||||
* searched in pg_xlog if not found in archive.
|
||||
*/
|
||||
static int
|
||||
XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
ListCell *cell;
|
||||
int fd;
|
||||
|
||||
@ -2613,40 +2670,23 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
|
||||
if (tli < curFileTLI)
|
||||
break; /* don't bother looking at too-old TLIs */
|
||||
|
||||
XLogFileName(xlogfname, tli, log, seg);
|
||||
|
||||
if (InArchiveRecovery && !InStreamingRecovery)
|
||||
{
|
||||
/* Report recovery progress in PS display */
|
||||
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
|
||||
xlogfname);
|
||||
set_ps_display(activitymsg, false);
|
||||
|
||||
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
|
||||
"RECOVERYXLOG",
|
||||
XLogSegSize);
|
||||
}
|
||||
else
|
||||
XLogFilePath(path, tli, log, seg);
|
||||
|
||||
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
|
||||
if (fd >= 0)
|
||||
{
|
||||
/* Success! */
|
||||
curFileTLI = tli;
|
||||
|
||||
/* Report recovery progress in PS display */
|
||||
snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
|
||||
xlogfname);
|
||||
set_ps_display(activitymsg, false);
|
||||
|
||||
fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
|
||||
if (fd != -1)
|
||||
return fd;
|
||||
|
||||
/*
|
||||
* If not in StandbyMode, fall back to searching pg_xlog. In
|
||||
* StandbyMode we're streaming segments from the primary to pg_xlog,
|
||||
* and we mustn't confuse the (possibly partial) segments in pg_xlog
|
||||
* with complete segments ready to be applied. We rather wait for
|
||||
* the records to arrive through streaming.
|
||||
*/
|
||||
if (!StandbyMode && fromArchive)
|
||||
{
|
||||
fd = XLogFileRead(log, seg, emode, tli, false, true);
|
||||
if (fd != -1)
|
||||
return fd;
|
||||
}
|
||||
if (errno != ENOENT) /* unexpected failure? */
|
||||
ereport(PANIC,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
|
||||
path, log, seg)));
|
||||
}
|
||||
|
||||
/* Couldn't find it. For simplicity, complain about front timeline */
|
||||
@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
|
||||
* different filename that can't be confused with regular XLOG
|
||||
* files.
|
||||
*/
|
||||
if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
|
||||
if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
|
||||
{
|
||||
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
|
||||
|
||||
@ -3473,79 +3513,6 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempt to fetch an XLOG record.
|
||||
*
|
||||
* If RecPtr is not NULL, try to fetch a record at that position. Otherwise
|
||||
* try to fetch a record just after the last one previously read.
|
||||
*
|
||||
* In standby mode, if we failed in reading a valid record and are not doing
|
||||
* recovery from XLOG stream yet, we ignore the failure and start walreceiver
|
||||
* process to fetch the record from the primary. Otherwise, returns NULL,
|
||||
* or fails if emode is PANIC. (emode must be either PANIC or LOG.)
|
||||
*
|
||||
* If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
|
||||
* this case, if we have to start XLOG streaming, we use RedoStartLSN as the
|
||||
* streaming start position instead of RecPtr.
|
||||
*
|
||||
* The record is copied into readRecordBuf, so that on successful return,
|
||||
* the returned record pointer always points there.
|
||||
*/
|
||||
static XLogRecord *
|
||||
FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
|
||||
{
|
||||
if (StandbyMode && !InStreamingRecovery)
|
||||
{
|
||||
XLogRecord *record;
|
||||
XLogRecPtr startlsn;
|
||||
bool haveNextRecord = (nextRecord != NULL);
|
||||
|
||||
/* An invalid record is OK here, so we set emode to DEBUG2 */
|
||||
record = ReadRecord(RecPtr, DEBUG2);
|
||||
if (record != NULL)
|
||||
return record;
|
||||
|
||||
/*
|
||||
* Start XLOG streaming if there is no more valid records available
|
||||
* in the archive.
|
||||
*
|
||||
* We need to calculate the start position of XLOG streaming. If we
|
||||
* read a record in the middle of a segment which doesn't exist in
|
||||
* pg_xlog, we use the start of the segment as the start position.
|
||||
* That prevents a broken segment (i.e., with no records in the
|
||||
* first half of a segment) from being created by XLOG streaming,
|
||||
* which might cause trouble later on if the segment is e.g
|
||||
* archived.
|
||||
*/
|
||||
startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
|
||||
if (startlsn.xrecoff % XLogSegSize != 0)
|
||||
{
|
||||
char xlogpath[MAXPGPATH];
|
||||
struct stat stat_buf;
|
||||
uint32 log;
|
||||
uint32 seg;
|
||||
|
||||
XLByteToSeg(startlsn, log, seg);
|
||||
XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
|
||||
|
||||
if (stat(xlogpath, &stat_buf) != 0)
|
||||
startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
|
||||
}
|
||||
RequestXLogStreaming(startlsn, PrimaryConnInfo);
|
||||
|
||||
/* Needs to read the current page again if the next record is in it */
|
||||
needReread = haveNextRecord;
|
||||
nextRecord = NULL;
|
||||
|
||||
InStreamingRecovery = true;
|
||||
ereport(LOG,
|
||||
(errmsg("starting streaming recovery at %X/%X",
|
||||
startlsn.xlogid, startlsn.xrecoff)));
|
||||
}
|
||||
|
||||
return ReadRecord(RecPtr, emode);
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempt to read an XLOG record.
|
||||
*
|
||||
@ -3553,13 +3520,13 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
|
||||
* try to read a record just after the last one previously read.
|
||||
*
|
||||
* If no valid record is available, returns NULL, or fails if emode is PANIC.
|
||||
* (emode must be either PANIC, LOG or DEBUG2.)
|
||||
* (emode must be either PANIC, LOG)
|
||||
*
|
||||
* The record is copied into readRecordBuf, so that on successful return,
|
||||
* the returned record pointer always points there.
|
||||
*/
|
||||
static XLogRecord *
|
||||
ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
|
||||
ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
|
||||
{
|
||||
XLogRecord *record;
|
||||
char *buffer;
|
||||
@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
|
||||
bool randAccess = false;
|
||||
uint32 len,
|
||||
total_len;
|
||||
uint32 targetPageOff;
|
||||
uint32 targetRecOff;
|
||||
uint32 pageHeaderSize;
|
||||
XLogRecPtr receivedUpto = {0,0};
|
||||
bool finished;
|
||||
int emode;
|
||||
|
||||
/*
|
||||
@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
|
||||
* should never hit the end of WAL because we wait for it to be streamed.
|
||||
* Therefore treat any broken WAL as PANIC, instead of failing over.
|
||||
*/
|
||||
if (InStreamingRecovery)
|
||||
if (StandbyMode)
|
||||
emode = PANIC;
|
||||
else
|
||||
emode = emode_arg;
|
||||
@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
|
||||
if (RecPtr == NULL)
|
||||
{
|
||||
RecPtr = &tmpRecPtr;
|
||||
/* fast case if next record is on same page */
|
||||
if (nextRecord != NULL)
|
||||
{
|
||||
record = nextRecord;
|
||||
goto got_record;
|
||||
}
|
||||
|
||||
/*
|
||||
* Align old recptr to next page if the current page is filled and
|
||||
* doesn't need to be read again.
|
||||
* Align recptr to next page if no more records can fit on the
|
||||
* current page.
|
||||
*/
|
||||
if (!needReread)
|
||||
if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
|
||||
{
|
||||
NextLogPage(tmpRecPtr);
|
||||
/* We will account for page header size below */
|
||||
/* We will account for page header size below */
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
|
||||
randAccess = true; /* allow curFileTLI to go backwards too */
|
||||
}
|
||||
|
||||
if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
|
||||
{
|
||||
close(readFile);
|
||||
readFile = -1;
|
||||
}
|
||||
/* Read the page containing the record */
|
||||
if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
|
||||
return NULL;
|
||||
|
||||
/* Is the target record ready yet? */
|
||||
if (InStreamingRecovery)
|
||||
{
|
||||
receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
|
||||
if (finished)
|
||||
{
|
||||
if (emode_arg == PANIC)
|
||||
ereport(PANIC,
|
||||
(errmsg("streaming recovery ended")));
|
||||
else
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
XLByteToSeg(*RecPtr, readId, readSeg);
|
||||
if (readFile < 0)
|
||||
{
|
||||
/* Now it's okay to reset curFileTLI if random fetch */
|
||||
if (randAccess)
|
||||
curFileTLI = 0;
|
||||
|
||||
readFile = XLogFileRead(readId, readSeg, emode);
|
||||
if (readFile < 0)
|
||||
goto next_record_is_invalid;
|
||||
|
||||
/*
|
||||
* Whenever switching to a new WAL segment, we read the first page of
|
||||
* the file and validate its header, even if that's not where the
|
||||
* target record is. This is so that we can check the additional
|
||||
* identification info that is present in the first page's "long"
|
||||
* header.
|
||||
*/
|
||||
readOff = 0;
|
||||
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log file %u, segment %u, offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
|
||||
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
|
||||
if (readOff != targetPageOff || needReread)
|
||||
{
|
||||
readOff = targetPageOff;
|
||||
needReread = false;
|
||||
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek in log file %u, segment %u to offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log file %u, segment %u, offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
|
||||
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
|
||||
if (targetRecOff == 0)
|
||||
@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
|
||||
}
|
||||
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
|
||||
|
||||
got_record:;
|
||||
|
||||
/*
|
||||
* xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
|
||||
* required.
|
||||
@ -3838,58 +3725,35 @@ got_record:;
|
||||
}
|
||||
|
||||
buffer = readRecordBuf;
|
||||
nextRecord = NULL;
|
||||
len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
|
||||
if (total_len > len)
|
||||
{
|
||||
/* Need to reassemble record */
|
||||
XLogContRecord *contrecord;
|
||||
XLogRecPtr nextpagelsn = *RecPtr;
|
||||
XLogRecPtr pagelsn;
|
||||
uint32 gotlen = len;
|
||||
|
||||
/* Initialize pagelsn to the beginning of the page this record is on */
|
||||
pagelsn = *RecPtr;
|
||||
pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
|
||||
|
||||
memcpy(buffer, record, len);
|
||||
record = (XLogRecord *) buffer;
|
||||
buffer += len;
|
||||
for (;;)
|
||||
{
|
||||
/* Is the next page ready yet? */
|
||||
if (InStreamingRecovery)
|
||||
/* Calculate pointer to beginning of next page */
|
||||
pagelsn.xrecoff += XLOG_BLCKSZ;
|
||||
if (pagelsn.xrecoff >= XLogFileSize)
|
||||
{
|
||||
if (gotlen != len)
|
||||
nextpagelsn.xrecoff += XLOG_BLCKSZ;
|
||||
NextLogPage(nextpagelsn);
|
||||
receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
|
||||
if (finished)
|
||||
{
|
||||
if (emode_arg == PANIC)
|
||||
ereport(PANIC,
|
||||
(errmsg("streaming recovery ended")));
|
||||
else
|
||||
return NULL;
|
||||
}
|
||||
(pagelsn.xlogid)++;
|
||||
pagelsn.xrecoff = 0;
|
||||
}
|
||||
/* Wait for the next page to become available */
|
||||
if (!XLogPageRead(&pagelsn, emode, false, false))
|
||||
return NULL;
|
||||
|
||||
readOff += XLOG_BLCKSZ;
|
||||
if (readOff >= XLogSegSize)
|
||||
{
|
||||
close(readFile);
|
||||
readFile = -1;
|
||||
NextLogSeg(readId, readSeg);
|
||||
readFile = XLogFileRead(readId, readSeg, emode);
|
||||
if (readFile < 0)
|
||||
goto next_record_is_invalid;
|
||||
readOff = 0;
|
||||
}
|
||||
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log file %u, segment %u, offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
|
||||
goto next_record_is_invalid;
|
||||
/* Check that the continuation record looks valid */
|
||||
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
|
||||
{
|
||||
ereport(emode,
|
||||
@ -3923,31 +3787,11 @@ got_record:;
|
||||
if (!RecordIsValid(record, *RecPtr, emode))
|
||||
goto next_record_is_invalid;
|
||||
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
|
||||
if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
|
||||
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
|
||||
{
|
||||
nextRecord = (XLogRecord *) ((char *) contrecord +
|
||||
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
|
||||
}
|
||||
EndRecPtr.xlogid = readId;
|
||||
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
|
||||
pageHeaderSize +
|
||||
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
|
||||
|
||||
/*
|
||||
* Check whether the current page needs to be read again. If there is no
|
||||
* unread record in the current page (nextRecord == NULL), obviously we
|
||||
* don't need to reread it. If we're not in streaming recovery mode yet,
|
||||
* partially-filled page doesn't need to be reread because it is the
|
||||
* last valid page.
|
||||
*/
|
||||
if (nextRecord != NULL && InStreamingRecovery &&
|
||||
XLByteLE(receivedUpto, EndRecPtr))
|
||||
{
|
||||
nextRecord = NULL;
|
||||
needReread = true;
|
||||
}
|
||||
|
||||
ReadRecPtr = *RecPtr;
|
||||
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */
|
||||
return record;
|
||||
@ -3956,26 +3800,9 @@ got_record:;
|
||||
/* Record does not cross a page boundary */
|
||||
if (!RecordIsValid(record, *RecPtr, emode))
|
||||
goto next_record_is_invalid;
|
||||
if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
|
||||
MAXALIGN(total_len))
|
||||
nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
|
||||
EndRecPtr.xlogid = RecPtr->xlogid;
|
||||
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
|
||||
|
||||
/*
|
||||
* Check whether the current page needs to be read again. If there is no
|
||||
* unread record in the current page (nextRecord == NULL), obviously we
|
||||
* don't need to reread it. If we're not in streaming recovery mode yet,
|
||||
* partially-filled page doesn't need to be reread because it is the last
|
||||
* valid page.
|
||||
*/
|
||||
if (nextRecord != NULL && InStreamingRecovery &&
|
||||
XLByteLE(receivedUpto, EndRecPtr))
|
||||
{
|
||||
nextRecord = NULL;
|
||||
needReread = true;
|
||||
}
|
||||
|
||||
ReadRecPtr = *RecPtr;
|
||||
memcpy(buffer, record, total_len);
|
||||
|
||||
@ -3987,8 +3814,6 @@ got_record:;
|
||||
/* Pretend it extends to end of segment */
|
||||
EndRecPtr.xrecoff += XLogSegSize - 1;
|
||||
EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
|
||||
nextRecord = NULL; /* definitely not on same page */
|
||||
needReread = false;
|
||||
|
||||
/*
|
||||
* Pretend that readBuf contains the last page of the segment. This is
|
||||
@ -4005,7 +3830,6 @@ next_record_is_invalid:;
|
||||
close(readFile);
|
||||
readFile = -1;
|
||||
}
|
||||
nextRecord = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -5730,7 +5554,7 @@ StartupXLOG(void)
|
||||
(errmsg("checkpoint record is at %X/%X",
|
||||
checkPointLoc.xlogid, checkPointLoc.xrecoff)));
|
||||
}
|
||||
else if (InStreamingRecovery)
|
||||
else if (StandbyMode)
|
||||
{
|
||||
/*
|
||||
* The last valid checkpoint record required for a streaming
|
||||
@ -5938,12 +5762,12 @@ StartupXLOG(void)
|
||||
if (XLByteLT(checkPoint.redo, RecPtr))
|
||||
{
|
||||
/* back up to find the record */
|
||||
record = FetchRecord(&(checkPoint.redo), PANIC, false);
|
||||
record = ReadRecord(&(checkPoint.redo), PANIC, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* just have to read next record after CheckPoint */
|
||||
record = FetchRecord(NULL, LOG, false);
|
||||
record = ReadRecord(NULL, LOG, false);
|
||||
}
|
||||
|
||||
if (record != NULL)
|
||||
@ -6096,7 +5920,7 @@ StartupXLOG(void)
|
||||
|
||||
LastRec = ReadRecPtr;
|
||||
|
||||
record = FetchRecord(NULL, LOG, false);
|
||||
record = ReadRecord(NULL, LOG, false);
|
||||
} while (record != NULL && recoveryContinue);
|
||||
|
||||
/*
|
||||
@ -6130,22 +5954,17 @@ StartupXLOG(void)
|
||||
|
||||
/*
|
||||
* We are now done reading the xlog from stream. Turn off streaming
|
||||
* recovery, and restart fetching the files (which would be required
|
||||
* at end of recovery, e.g., timeline history file) from archive.
|
||||
* recovery to force fetching the files (which would be required
|
||||
* at end of recovery, e.g., timeline history file) from archive or
|
||||
* pg_xlog.
|
||||
*/
|
||||
if (InStreamingRecovery)
|
||||
{
|
||||
/* We are no longer in streaming recovery state */
|
||||
InStreamingRecovery = false;
|
||||
ereport(LOG,
|
||||
(errmsg("streaming recovery complete")));
|
||||
}
|
||||
StandbyMode = false;
|
||||
|
||||
/*
|
||||
* Re-fetch the last valid or last applied record, so we can identify the
|
||||
* exact endpoint of what we consider the valid portion of WAL.
|
||||
*/
|
||||
record = ReadRecord(&LastRec, PANIC);
|
||||
record = ReadRecord(&LastRec, PANIC, false);
|
||||
EndOfLog = EndRecPtr;
|
||||
XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
|
||||
|
||||
@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
record = FetchRecord(&RecPtr, LOG, true);
|
||||
record = ReadRecord(&RecPtr, LOG, true);
|
||||
|
||||
if (record == NULL)
|
||||
{
|
||||
@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags)
|
||||
}
|
||||
LWLockRelease(ControlFileLock);
|
||||
|
||||
/* Are we doing recovery from XLOG stream? */
|
||||
if (!InStreamingRecovery)
|
||||
InStreamingRecovery = WalRcvInProgress();
|
||||
|
||||
/*
|
||||
* Delete old log files (those no longer needed even for previous
|
||||
* checkpoint/restartpoint) to prevent the disk holding the xlog from
|
||||
@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags)
|
||||
* streaming recovery we have to or the disk will eventually fill up from
|
||||
* old log files streamed from master.
|
||||
*/
|
||||
if (InStreamingRecovery && (_logId || _logSeg))
|
||||
if (WalRcvInProgress() && (_logId || _logSeg))
|
||||
{
|
||||
XLogRecPtr endptr;
|
||||
|
||||
@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void)
|
||||
*/
|
||||
if (shutdown_requested)
|
||||
proc_exit(1);
|
||||
|
||||
/*
|
||||
* Emergency bailout if postmaster has died. This is to avoid the
|
||||
* necessity for manual cleanup of all postmaster children.
|
||||
*/
|
||||
if (IsUnderPostmaster && !PostmasterIsAlive(true))
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Main entry point for startup process */
|
||||
@ -8843,3 +8665,281 @@ StartupProcessMain(void)
|
||||
*/
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Read the XLOG page containing RecPtr into readBuf (if not read already).
|
||||
* Returns true if successful, false otherwise or fails if emode is PANIC.
|
||||
*
|
||||
* This is responsible for restoring files from archive as needed, as well
|
||||
* as for waiting for the requested WAL record to arrive in standby mode.
|
||||
*/
|
||||
static bool
|
||||
XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
|
||||
bool randAccess)
|
||||
{
|
||||
static XLogRecPtr receivedUpto = {0, 0};
|
||||
bool switched_segment = false;
|
||||
uint32 targetPageOff;
|
||||
uint32 targetRecOff;
|
||||
uint32 targetId;
|
||||
uint32 targetSeg;
|
||||
|
||||
XLByteToSeg(*RecPtr, targetId, targetSeg);
|
||||
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
|
||||
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
|
||||
|
||||
/* Fast exit if we have read the record in the current buffer already */
|
||||
if (targetId == readId && targetSeg == readSeg &&
|
||||
targetPageOff == readOff && targetRecOff < readLen)
|
||||
return true;
|
||||
|
||||
/*
|
||||
* See if we need to switch to a new segment because the requested record
|
||||
* is not in the currently open one.
|
||||
*/
|
||||
if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
|
||||
{
|
||||
close(readFile);
|
||||
readFile = -1;
|
||||
}
|
||||
|
||||
XLByteToSeg(*RecPtr, readId, readSeg);
|
||||
|
||||
/* See if we need to retrieve more data */
|
||||
if (readFile < 0 ||
|
||||
(readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
|
||||
{
|
||||
if (StandbyMode)
|
||||
{
|
||||
bool last_restore_failed = false;
|
||||
|
||||
/*
|
||||
* In standby mode, wait for the requested record to become
|
||||
* available, either via restore_command succeeding to restore
|
||||
* the segment, or via walreceiver having streamed the record.
|
||||
*/
|
||||
for (;;)
|
||||
{
|
||||
if (WalRcvInProgress())
|
||||
{
|
||||
/*
|
||||
* While walreceiver is active, wait for new WAL to
|
||||
* arrive from primary.
|
||||
*/
|
||||
receivedUpto = GetWalRcvWriteRecPtr();
|
||||
if (XLByteLT(*RecPtr, receivedUpto))
|
||||
{
|
||||
/*
|
||||
* Great, streamed far enough. Open the file if it's
|
||||
* not open already.
|
||||
*/
|
||||
if (readFile < 0)
|
||||
{
|
||||
readFile =
|
||||
XLogFileRead(readId, readSeg, PANIC,
|
||||
recoveryTargetTLI, false, false);
|
||||
switched_segment = true;
|
||||
readStreamed = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (CheckForStandbyTrigger())
|
||||
goto next_record_is_invalid;
|
||||
|
||||
/*
|
||||
* When streaming is active, we want to react quickly when
|
||||
* the next WAL record arrives, so sleep only a bit.
|
||||
*/
|
||||
pg_usleep(100000L); /* 100ms */
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Until walreceiver manages to reconnect, poll the
|
||||
* archive.
|
||||
*/
|
||||
if (readFile >= 0)
|
||||
{
|
||||
close(readFile);
|
||||
readFile = -1;
|
||||
}
|
||||
/* Reset curFileTLI if random fetch. */
|
||||
if (randAccess)
|
||||
curFileTLI = 0;
|
||||
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
|
||||
switched_segment = true;
|
||||
readStreamed = false;
|
||||
if (readFile != -1)
|
||||
{
|
||||
elog(DEBUG1, "got WAL segment from archive");
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* If we succeeded restoring some segments from archive
|
||||
* since the last connection attempt (or we haven't
|
||||
* tried streaming yet, retry immediately. But if we
|
||||
* haven't, assume the problem is persistent, so be
|
||||
* less aggressive.
|
||||
*/
|
||||
if (last_restore_failed)
|
||||
{
|
||||
/*
|
||||
* Check to see if the trigger file exists. Note that
|
||||
* we do this only after failure, so when you create
|
||||
* the trigger file, we still finish replaying as much
|
||||
* as we can before failover.
|
||||
*/
|
||||
if (CheckForStandbyTrigger())
|
||||
goto next_record_is_invalid;
|
||||
pg_usleep(5000000L); /* 5 seconds */
|
||||
}
|
||||
last_restore_failed = true;
|
||||
|
||||
/*
|
||||
* Nope, not found in archive. Try to stream it.
|
||||
*
|
||||
* If fetching_ckpt is TRUE, RecPtr points to the initial
|
||||
* checkpoint location. In that case, we use RedoStartLSN
|
||||
* as the streaming start position instead of RecPtr, so
|
||||
* that when we later jump backwards to start redo at
|
||||
* RedoStartLSN, we will have the logs streamed already.
|
||||
*/
|
||||
RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
|
||||
PrimaryConnInfo);
|
||||
}
|
||||
|
||||
/*
|
||||
* This possibly-long loop needs to handle interrupts of startup
|
||||
* process.
|
||||
*/
|
||||
HandleStartupProcInterrupts();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* In archive or crash recovery. */
|
||||
if (readFile < 0)
|
||||
{
|
||||
/* Reset curFileTLI if random fetch. */
|
||||
if (randAccess)
|
||||
curFileTLI = 0;
|
||||
readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
|
||||
InArchiveRecovery);
|
||||
switched_segment = true;
|
||||
readStreamed = false;
|
||||
if (readFile < 0)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* At this point, we have the right segment open and we know the
|
||||
* requested record is in it.
|
||||
*/
|
||||
Assert(readFile != -1);
|
||||
|
||||
/*
|
||||
* If the current segment is being streamed from master, calculate
|
||||
* how much of the current page we have received already. We know the
|
||||
* requested record has been received, but this is for the benefit
|
||||
* of future calls, to allow quick exit at the top of this function.
|
||||
*/
|
||||
if (readStreamed)
|
||||
{
|
||||
if (RecPtr->xlogid != receivedUpto.xlogid ||
|
||||
(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
|
||||
{
|
||||
readLen = XLOG_BLCKSZ;
|
||||
}
|
||||
else
|
||||
readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
|
||||
}
|
||||
else
|
||||
readLen = XLOG_BLCKSZ;
|
||||
|
||||
if (switched_segment && targetPageOff != 0)
|
||||
{
|
||||
/*
|
||||
* Whenever switching to a new WAL segment, we read the first page of
|
||||
* the file and validate its header, even if that's not where the
|
||||
* target record is. This is so that we can check the additional
|
||||
* identification info that is present in the first page's "long"
|
||||
* header.
|
||||
*/
|
||||
readOff = 0;
|
||||
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log file %u, segment %u, offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
|
||||
/* Read the requested page */
|
||||
readOff = targetPageOff;
|
||||
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not seek in log file %u, segment %u to offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
{
|
||||
ereport(emode,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read from log file %u, segment %u, offset %u: %m",
|
||||
readId, readSeg, readOff)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
|
||||
goto next_record_is_invalid;
|
||||
|
||||
Assert(targetId == readId);
|
||||
Assert(targetSeg == readSeg);
|
||||
Assert(targetPageOff == readOff);
|
||||
Assert(targetRecOff < readLen);
|
||||
|
||||
return true;
|
||||
|
||||
next_record_is_invalid:
|
||||
if (readFile >= 0)
|
||||
close(readFile);
|
||||
readFile = -1;
|
||||
readStreamed = false;
|
||||
readLen = 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check to see if the trigger file exists. If it does, request postmaster
|
||||
* to shut down walreceiver, wait for it to exit, remove the trigger
|
||||
* file, and return true.
|
||||
*/
|
||||
static bool
|
||||
CheckForStandbyTrigger(void)
|
||||
{
|
||||
struct stat stat_buf;
|
||||
|
||||
if (TriggerFile == NULL)
|
||||
return false;
|
||||
|
||||
if (stat(TriggerFile, &stat_buf) == 0)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("trigger file found: %s", TriggerFile)));
|
||||
ShutdownWalRcv();
|
||||
unlink(TriggerFile);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -37,7 +37,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.601 2010/01/15 09:19:02 heikki Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.602 2010/01/27 15:27:50 heikki Exp $
|
||||
*
|
||||
* NOTES
|
||||
*
|
||||
@ -224,9 +224,6 @@ static int Shutdown = NoShutdown;
|
||||
static bool FatalError = false; /* T if recovering from backend crash */
|
||||
static bool RecoveryError = false; /* T if WAL recovery failed */
|
||||
|
||||
/* If WalReceiverActive is true, restart walreceiver if it dies */
|
||||
static bool WalReceiverActive = false;
|
||||
|
||||
/*
|
||||
* We use a simple state machine to control startup, shutdown, and
|
||||
* crash recovery (which is rather like shutdown followed by startup).
|
||||
@ -1469,11 +1466,6 @@ ServerLoop(void)
|
||||
if (PgStatPID == 0 && pmState == PM_RUN)
|
||||
PgStatPID = pgstat_start();
|
||||
|
||||
/* If we have lost walreceiver, try to start a new one */
|
||||
if (WalReceiverPID == 0 && WalReceiverActive &&
|
||||
(pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT))
|
||||
WalReceiverPID = StartWalReceiver();
|
||||
|
||||
/* If we need to signal the autovacuum launcher, do so now */
|
||||
if (avlauncher_needs_signal)
|
||||
{
|
||||
@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS)
|
||||
WalReceiverPID == 0)
|
||||
{
|
||||
/* Startup Process wants us to start the walreceiver process. */
|
||||
WalReceiverActive = true;
|
||||
WalReceiverPID = StartWalReceiver();
|
||||
}
|
||||
|
||||
if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER))
|
||||
{
|
||||
/* The walreceiver process doesn't want to be restarted anymore */
|
||||
WalReceiverActive = false;
|
||||
}
|
||||
|
||||
PG_SETMASK(&UnBlockSig);
|
||||
|
||||
errno = save_errno;
|
||||
|
@ -29,7 +29,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
|
||||
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
|
||||
|
||||
/* Prototypes for private functions */
|
||||
static void InitWalRcv(void);
|
||||
static void WalRcvKill(int code, Datum arg);
|
||||
static void WalRcvDie(int code, Datum arg);
|
||||
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
|
||||
static void XLogWalRcvFlush(void);
|
||||
|
||||
@ -153,21 +152,57 @@ static struct
|
||||
void
|
||||
WalReceiverMain(void)
|
||||
{
|
||||
sigjmp_buf local_sigjmp_buf;
|
||||
MemoryContext walrcv_context;
|
||||
char conninfo[MAXCONNINFO];
|
||||
XLogRecPtr startpoint;
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
|
||||
/* Load the libpq-specific functions */
|
||||
load_file("libpqwalreceiver", false);
|
||||
if (walrcv_connect == NULL || walrcv_receive == NULL ||
|
||||
walrcv_disconnect == NULL)
|
||||
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
|
||||
/*
|
||||
* WalRcv should be set up already (if we are a backend, we inherit
|
||||
* this by fork() or EXEC_BACKEND mechanism from the postmaster).
|
||||
*/
|
||||
Assert(walrcv != NULL);
|
||||
|
||||
/* Mark walreceiver in progress */
|
||||
InitWalRcv();
|
||||
/*
|
||||
* Mark walreceiver as running in shared memory.
|
||||
*
|
||||
* Do this as early as possible, so that if we fail later on, we'll
|
||||
* set state to STOPPED. If we die before this, the startup process
|
||||
* will keep waiting for us to start up, until it times out.
|
||||
*/
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
Assert(walrcv->pid == 0);
|
||||
switch(walrcv->walRcvState)
|
||||
{
|
||||
case WALRCV_STOPPING:
|
||||
/* If we've already been requested to stop, don't start up. */
|
||||
walrcv->walRcvState = WALRCV_STOPPED;
|
||||
/* fall through */
|
||||
|
||||
case WALRCV_STOPPED:
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
proc_exit(1);
|
||||
break;
|
||||
|
||||
case WALRCV_STARTING:
|
||||
/* The usual case */
|
||||
break;
|
||||
|
||||
case WALRCV_RUNNING:
|
||||
/* Shouldn't happen */
|
||||
elog(PANIC, "walreceiver still running according to shared memory state");
|
||||
}
|
||||
/* Advertise our PID so that the startup process can kill us */
|
||||
walrcv->pid = MyProcPid;
|
||||
walrcv->walRcvState = WALRCV_RUNNING;
|
||||
|
||||
/* Fetch information required to start streaming */
|
||||
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
|
||||
startpoint = walrcv->receivedUpto;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
/* Arrange to clean up at walreceiver exit */
|
||||
on_shmem_exit(WalRcvDie, 0);
|
||||
|
||||
/*
|
||||
* If possible, make this process a group leader, so that the postmaster
|
||||
@ -200,81 +235,21 @@ WalReceiverMain(void)
|
||||
/* We allow SIGQUIT (quickdie) at all times */
|
||||
sigdelset(&BlockSig, SIGQUIT);
|
||||
|
||||
/* Load the libpq-specific functions */
|
||||
load_file("libpqwalreceiver", false);
|
||||
if (walrcv_connect == NULL || walrcv_receive == NULL ||
|
||||
walrcv_disconnect == NULL)
|
||||
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
|
||||
|
||||
/*
|
||||
* Create a resource owner to keep track of our resources (not clear that
|
||||
* we need this, but may as well have one).
|
||||
*/
|
||||
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
|
||||
|
||||
/*
|
||||
* Create a memory context that we will do all our work in. We do this so
|
||||
* that we can reset the context during error recovery and thereby avoid
|
||||
* possible memory leaks.
|
||||
*/
|
||||
walrcv_context = AllocSetContextCreate(TopMemoryContext,
|
||||
"Wal Receiver",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
MemoryContextSwitchTo(walrcv_context);
|
||||
|
||||
/*
|
||||
* If an exception is encountered, processing resumes here.
|
||||
*
|
||||
* This code is heavily based on bgwriter.c, q.v.
|
||||
*/
|
||||
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
|
||||
{
|
||||
/* Since not using PG_TRY, must reset error stack by hand */
|
||||
error_context_stack = NULL;
|
||||
|
||||
/* Reset WalRcvImmediateInterruptOK */
|
||||
DisableWalRcvImmediateExit();
|
||||
|
||||
/* Prevent interrupts while cleaning up */
|
||||
HOLD_INTERRUPTS();
|
||||
|
||||
/* Report the error to the server log */
|
||||
EmitErrorReport();
|
||||
|
||||
/* Disconnect any previous connection. */
|
||||
EnableWalRcvImmediateExit();
|
||||
walrcv_disconnect();
|
||||
DisableWalRcvImmediateExit();
|
||||
|
||||
/*
|
||||
* Now return to normal top-level context and clear ErrorContext for
|
||||
* next time.
|
||||
*/
|
||||
MemoryContextSwitchTo(walrcv_context);
|
||||
FlushErrorState();
|
||||
|
||||
/* Flush any leaked data in the top-level context */
|
||||
MemoryContextResetAndDeleteChildren(walrcv_context);
|
||||
|
||||
/* Now we can allow interrupts again */
|
||||
RESUME_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* Sleep at least 1 second after any error. A write error is likely
|
||||
* to be repeated, and we don't want to be filling the error logs as
|
||||
* fast as we can.
|
||||
*/
|
||||
pg_usleep(1000000L);
|
||||
}
|
||||
|
||||
/* We can now handle ereport(ERROR) */
|
||||
PG_exception_stack = &local_sigjmp_buf;
|
||||
|
||||
/* Unblock signals (they were blocked when the postmaster forked us) */
|
||||
PG_SETMASK(&UnBlockSig);
|
||||
|
||||
/* Fetch connection information from shared memory */
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
|
||||
startpoint = walrcv->receivedUpto;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
/* Establish the connection to the primary for XLOG streaming */
|
||||
EnableWalRcvImmediateExit();
|
||||
walrcv_connect(conninfo, startpoint);
|
||||
@ -330,63 +305,24 @@ WalReceiverMain(void)
|
||||
}
|
||||
}
|
||||
|
||||
/* Advertise our pid in shared memory, so that startup process can kill us. */
|
||||
static void
|
||||
InitWalRcv(void)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
|
||||
/*
|
||||
* WalRcv should be set up already (if we are a backend, we inherit
|
||||
* this by fork() or EXEC_BACKEND mechanism from the postmaster).
|
||||
*/
|
||||
if (walrcv == NULL)
|
||||
elog(PANIC, "walreceiver control data uninitialized");
|
||||
|
||||
/* If we've already been requested to stop, don't start up */
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
Assert(walrcv->pid == 0);
|
||||
if (walrcv->walRcvState == WALRCV_STOPPED ||
|
||||
walrcv->walRcvState == WALRCV_STOPPING)
|
||||
{
|
||||
walrcv->walRcvState = WALRCV_STOPPED;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
proc_exit(1);
|
||||
}
|
||||
walrcv->pid = MyProcPid;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
/* Arrange to clean up at walreceiver exit */
|
||||
on_shmem_exit(WalRcvKill, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Clear our pid from shared memory at exit.
|
||||
* Mark us as STOPPED in shared memory at exit.
|
||||
*/
|
||||
static void
|
||||
WalRcvKill(int code, Datum arg)
|
||||
WalRcvDie(int code, Datum arg)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
bool stopped = false;
|
||||
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
if (walrcv->walRcvState == WALRCV_STOPPING ||
|
||||
walrcv->walRcvState == WALRCV_STOPPED)
|
||||
{
|
||||
walrcv->walRcvState = WALRCV_STOPPED;
|
||||
stopped = true;
|
||||
elog(LOG, "walreceiver stopped");
|
||||
}
|
||||
Assert(walrcv->walRcvState == WALRCV_RUNNING ||
|
||||
walrcv->walRcvState == WALRCV_STOPPING);
|
||||
walrcv->walRcvState = WALRCV_STOPPED;
|
||||
walrcv->pid = 0;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
/* Terminate the connection gracefully. */
|
||||
walrcv_disconnect();
|
||||
|
||||
/* If requested to stop, tell postmaster to not restart us. */
|
||||
if (stopped)
|
||||
SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
|
||||
}
|
||||
|
||||
/* SIGHUP: set flag to re-read config file at next convenient time */
|
||||
|
@ -10,7 +10,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -18,6 +18,8 @@
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
|
||||
@ -30,8 +32,11 @@
|
||||
|
||||
WalRcvData *WalRcv = NULL;
|
||||
|
||||
static bool CheckForStandbyTrigger(void);
|
||||
static void ShutdownWalRcv(void);
|
||||
/*
|
||||
* How long to wait for walreceiver to start up after requesting
|
||||
* postmaster to launch it. In seconds.
|
||||
*/
|
||||
#define WALRCV_STARTUP_TIMEOUT 10
|
||||
|
||||
/* Report shared memory space needed by WalRcvShmemInit */
|
||||
Size
|
||||
@ -62,7 +67,7 @@ WalRcvShmemInit(void)
|
||||
|
||||
/* Initialize the data structures */
|
||||
MemSet(WalRcv, 0, WalRcvShmemSize());
|
||||
WalRcv->walRcvState = WALRCV_NOT_STARTED;
|
||||
WalRcv->walRcvState = WALRCV_STOPPED;
|
||||
SpinLockInit(&WalRcv->mutex);
|
||||
}
|
||||
|
||||
@ -73,90 +78,51 @@ WalRcvInProgress(void)
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
WalRcvState state;
|
||||
pg_time_t startTime;
|
||||
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
|
||||
state = walrcv->walRcvState;
|
||||
startTime = walrcv->startTime;
|
||||
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
if (state == WALRCV_RUNNING || state == WALRCV_STOPPING)
|
||||
/*
|
||||
* If it has taken too long for walreceiver to start up, give up.
|
||||
* Setting the state to STOPPED ensures that if walreceiver later
|
||||
* does start up after all, it will see that it's not supposed to be
|
||||
* running and die without doing anything.
|
||||
*/
|
||||
if (state == WALRCV_STARTING)
|
||||
{
|
||||
pg_time_t now = (pg_time_t) time(NULL);
|
||||
|
||||
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
|
||||
{
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
|
||||
if (walrcv->walRcvState == WALRCV_STARTING)
|
||||
state = walrcv->walRcvState = WALRCV_STOPPED;
|
||||
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
if (state != WALRCV_STOPPED)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for the XLOG record at given position to become available.
|
||||
*
|
||||
* 'recptr' indicates the byte position which caller wants to read the
|
||||
* XLOG record up to. The byte position actually written and flushed
|
||||
* by walreceiver is returned. It can be higher than the requested
|
||||
* location, and the caller can safely read up to that point without
|
||||
* calling WaitNextXLogAvailable() again.
|
||||
*
|
||||
* If WAL streaming is ended (because a trigger file is found), *finished
|
||||
* is set to true and function returns immediately. The returned position
|
||||
* can be lower than requested in that case.
|
||||
*
|
||||
* Called by the startup process during streaming recovery.
|
||||
* Stop walreceiver (if running) and wait for it to die.
|
||||
*/
|
||||
XLogRecPtr
|
||||
WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
|
||||
{
|
||||
static XLogRecPtr receivedUpto = {0, 0};
|
||||
|
||||
*finished = false;
|
||||
|
||||
/* Quick exit if already known available */
|
||||
if (XLByteLT(recptr, receivedUpto))
|
||||
return receivedUpto;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
|
||||
/* Update local status */
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
receivedUpto = walrcv->receivedUpto;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
/* If available already, leave here */
|
||||
if (XLByteLT(recptr, receivedUpto))
|
||||
return receivedUpto;
|
||||
|
||||
/* Check to see if the trigger file exists */
|
||||
if (CheckForStandbyTrigger())
|
||||
{
|
||||
*finished = true;
|
||||
return receivedUpto;
|
||||
}
|
||||
|
||||
pg_usleep(100000L); /* 100ms */
|
||||
|
||||
/*
|
||||
* This possibly-long loop needs to handle interrupts of startup
|
||||
* process.
|
||||
*/
|
||||
HandleStartupProcInterrupts();
|
||||
|
||||
/*
|
||||
* Emergency bailout if postmaster has died. This is to avoid the
|
||||
* necessity for manual cleanup of all postmaster children.
|
||||
*/
|
||||
if (!PostmasterIsAlive(true))
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Stop walreceiver and wait for it to die.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
ShutdownWalRcv(void)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
pid_t walrcvpid;
|
||||
pid_t walrcvpid = 0;
|
||||
|
||||
/*
|
||||
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
|
||||
@ -164,15 +130,25 @@ ShutdownWalRcv(void)
|
||||
* restart itself.
|
||||
*/
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
Assert(walrcv->walRcvState == WALRCV_RUNNING);
|
||||
walrcv->walRcvState = WALRCV_STOPPING;
|
||||
walrcvpid = walrcv->pid;
|
||||
switch(walrcv->walRcvState)
|
||||
{
|
||||
case WALRCV_STOPPED:
|
||||
break;
|
||||
case WALRCV_STARTING:
|
||||
walrcv->walRcvState = WALRCV_STOPPED;
|
||||
break;
|
||||
|
||||
case WALRCV_RUNNING:
|
||||
walrcv->walRcvState = WALRCV_STOPPING;
|
||||
/* fall through */
|
||||
case WALRCV_STOPPING:
|
||||
walrcvpid = walrcv->pid;
|
||||
break;
|
||||
}
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
/*
|
||||
* Pid can be 0, if no walreceiver process is active right now.
|
||||
* Postmaster should restart it, and when it does, it will see the
|
||||
* STOPPING state.
|
||||
* Signal walreceiver process if it was still running.
|
||||
*/
|
||||
if (walrcvpid != 0)
|
||||
kill(walrcvpid, SIGTERM);
|
||||
@ -193,30 +169,6 @@ ShutdownWalRcv(void)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Check to see if the trigger file exists. If it does, request postmaster
|
||||
* to shut down walreceiver and wait for it to exit, and remove the trigger
|
||||
* file.
|
||||
*/
|
||||
static bool
|
||||
CheckForStandbyTrigger(void)
|
||||
{
|
||||
struct stat stat_buf;
|
||||
|
||||
if (TriggerFile == NULL)
|
||||
return false;
|
||||
|
||||
if (stat(TriggerFile, &stat_buf) == 0)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("trigger file found: %s", TriggerFile)));
|
||||
ShutdownWalRcv();
|
||||
unlink(TriggerFile);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Request postmaster to start walreceiver.
|
||||
*
|
||||
@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
|
||||
{
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = WalRcv;
|
||||
pg_time_t now = (pg_time_t) time(NULL);
|
||||
|
||||
Assert(walrcv->walRcvState == WALRCV_NOT_STARTED);
|
||||
/*
|
||||
* We always start at the beginning of the segment.
|
||||
* That prevents a broken segment (i.e., with no records in the
|
||||
* first half of a segment) from being created by XLOG streaming,
|
||||
* which might cause trouble later on if the segment is e.g
|
||||
* archived.
|
||||
*/
|
||||
if (recptr.xrecoff % XLogSegSize != 0)
|
||||
recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
|
||||
|
||||
/* It better be stopped before we try to restart it */
|
||||
Assert(walrcv->walRcvState == WALRCV_STOPPED);
|
||||
|
||||
/* locking is just pro forma here; walreceiver isn't started yet */
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
walrcv->receivedUpto = recptr;
|
||||
if (conninfo != NULL)
|
||||
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
|
||||
else
|
||||
walrcv->conninfo[0] = '\0';
|
||||
walrcv->walRcvState = WALRCV_RUNNING;
|
||||
walrcv->walRcvState = WALRCV_STARTING;
|
||||
walrcv->startTime = now;
|
||||
|
||||
walrcv->receivedUpto = recptr;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
|
||||
@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void)
|
||||
|
||||
return recptr;
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
*
|
||||
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
|
||||
*
|
||||
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.4 2010/01/20 18:54:27 heikki Exp $
|
||||
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -27,10 +27,10 @@
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
WALRCV_NOT_STARTED,
|
||||
WALRCV_RUNNING, /* walreceiver has been started */
|
||||
WALRCV_STOPPING, /* requested to stop, but still running */
|
||||
WALRCV_STOPPED /* stopped and mustn't start up again */
|
||||
WALRCV_STOPPED, /* stopped and mustn't start up again */
|
||||
WALRCV_STARTING, /* launched, but the process hasn't initialized yet */
|
||||
WALRCV_RUNNING, /* walreceiver is running */
|
||||
WALRCV_STOPPING /* requested to stop, but still running */
|
||||
} WalRcvState;
|
||||
|
||||
/* Shared memory area for management of walreceiver process */
|
||||
@ -47,6 +47,7 @@ typedef struct
|
||||
*/
|
||||
pid_t pid;
|
||||
WalRcvState walRcvState;
|
||||
pg_time_t startTime;
|
||||
|
||||
/*
|
||||
* receivedUpto-1 is the last byte position that has been already
|
||||
@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
|
||||
extern void WalReceiverMain(void);
|
||||
extern Size WalRcvShmemSize(void);
|
||||
extern void WalRcvShmemInit(void);
|
||||
extern void ShutdownWalRcv(void);
|
||||
extern bool WalRcvInProgress(void);
|
||||
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
|
||||
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
|
||||
|
@ -7,7 +7,7 @@
|
||||
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.28 2010/01/15 09:19:09 heikki Exp $
|
||||
* $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.29 2010/01/27 15:27:51 heikki Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -30,7 +30,6 @@ typedef enum
|
||||
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
|
||||
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
|
||||
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
|
||||
PMSIGNAL_SHUTDOWN_WALRECEIVER, /* shut down a walreceiver */
|
||||
|
||||
NUM_PMSIGNALS /* Must be last value of enum! */
|
||||
} PMSignalReason;
|
||||
|
Loading…
Reference in New Issue
Block a user