mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-24 18:55:04 +08:00
Add circular WAL decoding buffer, take II.
Teach xlogreader.c to decode the WAL into a circular buffer. This will support optimizations based on looking ahead, to follow in a later commit. * XLogReadRecord() works as before, decoding records one by one, and allowing them to be examined via the traditional XLogRecGetXXX() macros and certain traditional members like xlogreader->ReadRecPtr. * An alternative new interface XLogReadAhead()/XLogNextRecord() is added that returns pointers to DecodedXLogRecord objects so that it's now possible to look ahead in the WAL stream while replaying. * In order to be able to use the new interface effectively while streaming data, support is added for the page_read() callback to respond to a new nonblocking mode with XLREAD_WOULDBLOCK instead of waiting for more data to arrive. No direct user of the new interface is included in this commit, though XLogReadRecord() uses it internally. Existing code doesn't need to change, except in a few places where it was accessing reader internals directly and now needs to go through accessor macros. Reviewed-by: Julien Rouhaud <rjuju123@gmail.com> Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com> Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions) Discussion: https://postgr.es/m/CA+hUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq=AovOddfHpA@mail.gmail.com
This commit is contained in:
parent
7a7cd84893
commit
3f1ce97346
@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record)
|
||||
uint8 block_id;
|
||||
|
||||
/* Protect limited size of buffers[] array */
|
||||
Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
|
||||
Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES);
|
||||
|
||||
/* Iterate over blocks */
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
XLogRedoAction action;
|
||||
|
||||
@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record)
|
||||
}
|
||||
|
||||
/* Changes are done: unlock and release all buffers */
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
if (BufferIsValid(buffers[block_id]))
|
||||
UnlockReleaseBuffer(buffers[block_id]);
|
||||
|
@ -971,6 +971,8 @@ XLogInsertRecord(XLogRecData *rdata,
|
||||
if (XLOG_DEBUG)
|
||||
{
|
||||
static XLogReaderState *debug_reader = NULL;
|
||||
XLogRecord *record;
|
||||
DecodedXLogRecord *decoded;
|
||||
StringInfoData buf;
|
||||
StringInfoData recordBuf;
|
||||
char *errormsg = NULL;
|
||||
@ -990,6 +992,11 @@ XLogInsertRecord(XLogRecData *rdata,
|
||||
for (; rdata != NULL; rdata = rdata->next)
|
||||
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
|
||||
|
||||
/* We also need temporary space to decode the record. */
|
||||
record = (XLogRecord *) recordBuf.data;
|
||||
decoded = (DecodedXLogRecord *)
|
||||
palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
|
||||
|
||||
if (!debug_reader)
|
||||
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
|
||||
XL_ROUTINE(), NULL);
|
||||
@ -998,7 +1005,10 @@ XLogInsertRecord(XLogRecData *rdata,
|
||||
{
|
||||
appendStringInfoString(&buf, "error decoding record: out of memory while allocating a WAL reading processor");
|
||||
}
|
||||
else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
|
||||
else if (!DecodeXLogRecord(debug_reader,
|
||||
decoded,
|
||||
record,
|
||||
EndPos,
|
||||
&errormsg))
|
||||
{
|
||||
appendStringInfo(&buf, "error decoding record: %s",
|
||||
@ -1007,10 +1017,14 @@ XLogInsertRecord(XLogRecData *rdata,
|
||||
else
|
||||
{
|
||||
appendStringInfoString(&buf, " - ");
|
||||
|
||||
debug_reader->record = decoded;
|
||||
xlog_outdesc(&buf, debug_reader);
|
||||
debug_reader->record = NULL;
|
||||
}
|
||||
elog(LOG, "%s", buf.data);
|
||||
|
||||
pfree(decoded);
|
||||
pfree(buf.data);
|
||||
pfree(recordBuf.data);
|
||||
MemoryContextSwitchTo(oldCxt);
|
||||
@ -7738,7 +7752,7 @@ xlog_redo(XLogReaderState *record)
|
||||
* resource manager needs to generate conflicts, it has to define a
|
||||
* separate WAL record type and redo routine.
|
||||
*/
|
||||
for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
Buffer buffer;
|
||||
|
||||
|
@ -45,6 +45,7 @@ static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
|
||||
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
|
||||
int reqLen);
|
||||
static void XLogReaderInvalReadState(XLogReaderState *state);
|
||||
static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool non_blocking);
|
||||
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
|
||||
XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
|
||||
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
|
||||
@ -56,6 +57,12 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
|
||||
/* size of the buffer allocated for error message. */
|
||||
#define MAX_ERRORMSG_LEN 1000
|
||||
|
||||
/*
|
||||
* Default size; large enough that typical users of XLogReader won't often need
|
||||
* to use the 'oversized' memory allocation code path.
|
||||
*/
|
||||
#define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
|
||||
|
||||
/*
|
||||
* Construct a string in state->errormsg_buf explaining what's wrong with
|
||||
* the current record being read.
|
||||
@ -70,6 +77,24 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
|
||||
va_start(args, fmt);
|
||||
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
|
||||
va_end(args);
|
||||
|
||||
state->errormsg_deferred = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Set the size of the decoding buffer. A pointer to a caller supplied memory
|
||||
* region may also be passed in, in which case non-oversized records will be
|
||||
* decoded there.
|
||||
*/
|
||||
void
|
||||
XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
|
||||
{
|
||||
Assert(state->decode_buffer == NULL);
|
||||
|
||||
state->decode_buffer = buffer;
|
||||
state->decode_buffer_size = size;
|
||||
state->decode_buffer_tail = buffer;
|
||||
state->decode_buffer_head = buffer;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -92,8 +117,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
|
||||
/* initialize caller-provided support functions */
|
||||
state->routine = *routine;
|
||||
|
||||
state->max_block_id = -1;
|
||||
|
||||
/*
|
||||
* Permanently allocate readBuf. We do it this way, rather than just
|
||||
* making a static array, for two reasons: (1) no need to waste the
|
||||
@ -144,18 +167,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
|
||||
void
|
||||
XLogReaderFree(XLogReaderState *state)
|
||||
{
|
||||
int block_id;
|
||||
|
||||
if (state->seg.ws_file != -1)
|
||||
state->routine.segment_close(state);
|
||||
|
||||
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
|
||||
{
|
||||
if (state->blocks[block_id].data)
|
||||
pfree(state->blocks[block_id].data);
|
||||
}
|
||||
if (state->main_data)
|
||||
pfree(state->main_data);
|
||||
if (state->decode_buffer && state->free_decode_buffer)
|
||||
pfree(state->decode_buffer);
|
||||
|
||||
pfree(state->errormsg_buf);
|
||||
if (state->readRecordBuf)
|
||||
@ -251,7 +267,133 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
|
||||
|
||||
/* Begin at the passed-in record pointer. */
|
||||
state->EndRecPtr = RecPtr;
|
||||
state->NextRecPtr = RecPtr;
|
||||
state->ReadRecPtr = InvalidXLogRecPtr;
|
||||
state->DecodeRecPtr = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* See if we can release the last record that was returned by
|
||||
* XLogNextRecord(), if any, to free up space.
|
||||
*/
|
||||
void
|
||||
XLogReleasePreviousRecord(XLogReaderState *state)
|
||||
{
|
||||
DecodedXLogRecord *record;
|
||||
|
||||
if (!state->record)
|
||||
return;
|
||||
|
||||
/*
|
||||
* Remove it from the decoded record queue. It must be the oldest item
|
||||
* decoded, decode_queue_head.
|
||||
*/
|
||||
record = state->record;
|
||||
Assert(record == state->decode_queue_head);
|
||||
state->record = NULL;
|
||||
state->decode_queue_head = record->next;
|
||||
|
||||
/* It might also be the newest item decoded, decode_queue_tail. */
|
||||
if (state->decode_queue_tail == record)
|
||||
state->decode_queue_tail = NULL;
|
||||
|
||||
/* Release the space. */
|
||||
if (unlikely(record->oversized))
|
||||
{
|
||||
/* It's not in the the decode buffer, so free it to release space. */
|
||||
pfree(record);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* It must be the head (oldest) record in the decode buffer. */
|
||||
Assert(state->decode_buffer_head == (char *) record);
|
||||
|
||||
/*
|
||||
* We need to update head to point to the next record that is in the
|
||||
* decode buffer, if any, being careful to skip oversized ones
|
||||
* (they're not in the decode buffer).
|
||||
*/
|
||||
record = record->next;
|
||||
while (unlikely(record && record->oversized))
|
||||
record = record->next;
|
||||
|
||||
if (record)
|
||||
{
|
||||
/* Adjust head to release space up to the next record. */
|
||||
state->decode_buffer_head = (char *) record;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Otherwise we might as well just reset head and tail to the
|
||||
* start of the buffer space, because we're empty. This means
|
||||
* we'll keep overwriting the same piece of memory if we're not
|
||||
* doing any prefetching.
|
||||
*/
|
||||
state->decode_buffer_head = state->decode_buffer;
|
||||
state->decode_buffer_tail = state->decode_buffer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Attempt to read an XLOG record.
|
||||
*
|
||||
* XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be
|
||||
* called before the first call to XLogNextRecord(). This functions returns
|
||||
* records and errors that were put into an internal queue by XLogReadAhead().
|
||||
*
|
||||
* On success, a record is returned.
|
||||
*
|
||||
* The returned record (or *errormsg) points to an internal buffer that's
|
||||
* valid until the next call to XLogNextRecord.
|
||||
*/
|
||||
DecodedXLogRecord *
|
||||
XLogNextRecord(XLogReaderState *state, char **errormsg)
|
||||
{
|
||||
/* Release the last record returned by XLogNextRecord(). */
|
||||
XLogReleasePreviousRecord(state);
|
||||
|
||||
if (state->decode_queue_head == NULL)
|
||||
{
|
||||
*errormsg = NULL;
|
||||
if (state->errormsg_deferred)
|
||||
{
|
||||
if (state->errormsg_buf[0] != '\0')
|
||||
*errormsg = state->errormsg_buf;
|
||||
state->errormsg_deferred = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* state->EndRecPtr is expected to have been set by the last call to
|
||||
* XLogBeginRead() or XLogNextRecord(), and is the location of the
|
||||
* error.
|
||||
*/
|
||||
Assert(!XLogRecPtrIsInvalid(state->EndRecPtr));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Record this as the most recent record returned, so that we'll release
|
||||
* it next time. This also exposes it to the traditional
|
||||
* XLogRecXXX(xlogreader) macros, which work with the decoder rather than
|
||||
* the record for historical reasons.
|
||||
*/
|
||||
state->record = state->decode_queue_head;
|
||||
|
||||
/*
|
||||
* Update the pointers to the beginning and one-past-the-end of this
|
||||
* record, again for the benefit of historical code that expected the
|
||||
* decoder to track this rather than accessing these fields of the record
|
||||
* itself.
|
||||
*/
|
||||
state->ReadRecPtr = state->record->lsn;
|
||||
state->EndRecPtr = state->record->next_lsn;
|
||||
|
||||
*errormsg = NULL;
|
||||
|
||||
return state->record;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -272,6 +414,119 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
|
||||
*/
|
||||
XLogRecord *
|
||||
XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
{
|
||||
DecodedXLogRecord *decoded;
|
||||
|
||||
/*
|
||||
* Release last returned record, if there is one. We need to do this so
|
||||
* that we can check for empty decode queue accurately.
|
||||
*/
|
||||
XLogReleasePreviousRecord(state);
|
||||
|
||||
/*
|
||||
* Call XLogReadAhead() in blocking mode to make sure there is something
|
||||
* in the queue, though we don't use the result.
|
||||
*/
|
||||
if (!XLogReaderHasQueuedRecordOrError(state))
|
||||
XLogReadAhead(state, false /* nonblocking */ );
|
||||
|
||||
/* Consume the head record or error. */
|
||||
decoded = XLogNextRecord(state, errormsg);
|
||||
if (decoded)
|
||||
{
|
||||
/*
|
||||
* This function returns a pointer to the record's header, not the
|
||||
* actual decoded record. The caller will access the decoded record
|
||||
* through the XLogRecGetXXX() macros, which reach the decoded
|
||||
* recorded as xlogreader->record.
|
||||
*/
|
||||
Assert(state->record == decoded);
|
||||
return &decoded->header;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Allocate space for a decoded record. The only member of the returned
|
||||
* object that is initialized is the 'oversized' flag, indicating that the
|
||||
* decoded record wouldn't fit in the decode buffer and must eventually be
|
||||
* freed explicitly.
|
||||
*
|
||||
* The caller is responsible for adjusting decode_buffer_tail with the real
|
||||
* size after successfully decoding a record into this space. This way, if
|
||||
* decoding fails, then there is nothing to undo unless the 'oversized' flag
|
||||
* was set and pfree() must be called.
|
||||
*
|
||||
* Return NULL if there is no space in the decode buffer and allow_oversized
|
||||
* is false, or if memory allocation fails for an oversized buffer.
|
||||
*/
|
||||
static DecodedXLogRecord *
|
||||
XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
|
||||
{
|
||||
size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
|
||||
DecodedXLogRecord *decoded = NULL;
|
||||
|
||||
/* Allocate a circular decode buffer if we don't have one already. */
|
||||
if (unlikely(state->decode_buffer == NULL))
|
||||
{
|
||||
if (state->decode_buffer_size == 0)
|
||||
state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
|
||||
state->decode_buffer = palloc(state->decode_buffer_size);
|
||||
state->decode_buffer_head = state->decode_buffer;
|
||||
state->decode_buffer_tail = state->decode_buffer;
|
||||
state->free_decode_buffer = true;
|
||||
}
|
||||
|
||||
/* Try to allocate space in the circular decode buffer. */
|
||||
if (state->decode_buffer_tail >= state->decode_buffer_head)
|
||||
{
|
||||
/* Empty, or tail is to the right of head. */
|
||||
if (state->decode_buffer_tail + required_space <=
|
||||
state->decode_buffer + state->decode_buffer_size)
|
||||
{
|
||||
/* There is space between tail and end. */
|
||||
decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
|
||||
decoded->oversized = false;
|
||||
return decoded;
|
||||
}
|
||||
else if (state->decode_buffer + required_space <
|
||||
state->decode_buffer_head)
|
||||
{
|
||||
/* There is space between start and head. */
|
||||
decoded = (DecodedXLogRecord *) state->decode_buffer;
|
||||
decoded->oversized = false;
|
||||
return decoded;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Tail is to the left of head. */
|
||||
if (state->decode_buffer_tail + required_space <
|
||||
state->decode_buffer_head)
|
||||
{
|
||||
/* There is space between tail and head. */
|
||||
decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
|
||||
decoded->oversized = false;
|
||||
return decoded;
|
||||
}
|
||||
}
|
||||
|
||||
/* Not enough space in the decode buffer. Are we allowed to allocate? */
|
||||
if (allow_oversized)
|
||||
{
|
||||
decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
|
||||
if (decoded == NULL)
|
||||
return NULL;
|
||||
decoded->oversized = true;
|
||||
return decoded;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static XLogPageReadResult
|
||||
XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
|
||||
{
|
||||
XLogRecPtr RecPtr;
|
||||
XLogRecord *record;
|
||||
@ -284,6 +539,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
bool assembled;
|
||||
bool gotheader;
|
||||
int readOff;
|
||||
DecodedXLogRecord *decoded;
|
||||
char *errormsg; /* not used */
|
||||
|
||||
/*
|
||||
* randAccess indicates whether to verify the previous-record pointer of
|
||||
@ -293,21 +550,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
randAccess = false;
|
||||
|
||||
/* reset error state */
|
||||
*errormsg = NULL;
|
||||
state->errormsg_buf[0] = '\0';
|
||||
decoded = NULL;
|
||||
|
||||
ResetDecoder(state);
|
||||
state->abortedRecPtr = InvalidXLogRecPtr;
|
||||
state->missingContrecPtr = InvalidXLogRecPtr;
|
||||
|
||||
RecPtr = state->EndRecPtr;
|
||||
RecPtr = state->NextRecPtr;
|
||||
|
||||
if (state->ReadRecPtr != InvalidXLogRecPtr)
|
||||
if (state->DecodeRecPtr != InvalidXLogRecPtr)
|
||||
{
|
||||
/* read the record after the one we just read */
|
||||
|
||||
/*
|
||||
* EndRecPtr is pointing to end+1 of the previous WAL record. If
|
||||
* NextRecPtr is pointing to end+1 of the previous WAL record. If
|
||||
* we're at a page boundary, no more records can fit on the current
|
||||
* page. We must skip over the page header, but we can't do that until
|
||||
* we've read in the page, since the header size is variable.
|
||||
@ -318,7 +574,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
/*
|
||||
* Caller supplied a position to start at.
|
||||
*
|
||||
* In this case, EndRecPtr should already be pointing to a valid
|
||||
* In this case, NextRecPtr should already be pointing to a valid
|
||||
* record starting position.
|
||||
*/
|
||||
Assert(XRecOffIsValid(RecPtr));
|
||||
@ -326,6 +582,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
}
|
||||
|
||||
restart:
|
||||
state->nonblocking = nonblocking;
|
||||
state->currRecPtr = RecPtr;
|
||||
assembled = false;
|
||||
|
||||
@ -339,7 +596,9 @@ restart:
|
||||
*/
|
||||
readOff = ReadPageInternal(state, targetPagePtr,
|
||||
Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
|
||||
if (readOff < 0)
|
||||
if (readOff == XLREAD_WOULDBLOCK)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
else if (readOff < 0)
|
||||
goto err;
|
||||
|
||||
/*
|
||||
@ -395,7 +654,7 @@ restart:
|
||||
*/
|
||||
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
|
||||
{
|
||||
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
|
||||
if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
|
||||
randAccess))
|
||||
goto err;
|
||||
gotheader = true;
|
||||
@ -414,6 +673,31 @@ restart:
|
||||
gotheader = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Find space to decode this record. Don't allow oversized allocation if
|
||||
* the caller requested nonblocking. Otherwise, we *have* to try to
|
||||
* decode the record now because the caller has nothing else to do, so
|
||||
* allow an oversized record to be palloc'd if that turns out to be
|
||||
* necessary.
|
||||
*/
|
||||
decoded = XLogReadRecordAlloc(state,
|
||||
total_len,
|
||||
!nonblocking /* allow_oversized */ );
|
||||
if (decoded == NULL)
|
||||
{
|
||||
/*
|
||||
* There is no space in the decode buffer. The caller should help
|
||||
* with that problem by consuming some records.
|
||||
*/
|
||||
if (nonblocking)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
|
||||
/* We failed to allocate memory for an oversized record. */
|
||||
report_invalid_record(state,
|
||||
"out of memory while trying to decode a record of length %u", total_len);
|
||||
goto err;
|
||||
}
|
||||
|
||||
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
|
||||
if (total_len > len)
|
||||
{
|
||||
@ -453,7 +737,9 @@ restart:
|
||||
Min(total_len - gotlen + SizeOfXLogShortPHD,
|
||||
XLOG_BLCKSZ));
|
||||
|
||||
if (readOff < 0)
|
||||
if (readOff == XLREAD_WOULDBLOCK)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
else if (readOff < 0)
|
||||
goto err;
|
||||
|
||||
Assert(SizeOfXLogShortPHD <= readOff);
|
||||
@ -471,7 +757,6 @@ restart:
|
||||
if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
|
||||
{
|
||||
state->overwrittenRecPtr = RecPtr;
|
||||
ResetDecoder(state);
|
||||
RecPtr = targetPagePtr;
|
||||
goto restart;
|
||||
}
|
||||
@ -526,7 +811,7 @@ restart:
|
||||
if (!gotheader)
|
||||
{
|
||||
record = (XLogRecord *) state->readRecordBuf;
|
||||
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
|
||||
if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
|
||||
record, randAccess))
|
||||
goto err;
|
||||
gotheader = true;
|
||||
@ -540,8 +825,8 @@ restart:
|
||||
goto err;
|
||||
|
||||
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
|
||||
state->ReadRecPtr = RecPtr;
|
||||
state->EndRecPtr = targetPagePtr + pageHeaderSize
|
||||
state->DecodeRecPtr = RecPtr;
|
||||
state->NextRecPtr = targetPagePtr + pageHeaderSize
|
||||
+ MAXALIGN(pageHeader->xlp_rem_len);
|
||||
}
|
||||
else
|
||||
@ -549,16 +834,18 @@ restart:
|
||||
/* Wait for the record data to become available */
|
||||
readOff = ReadPageInternal(state, targetPagePtr,
|
||||
Min(targetRecOff + total_len, XLOG_BLCKSZ));
|
||||
if (readOff < 0)
|
||||
if (readOff == XLREAD_WOULDBLOCK)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
else if (readOff < 0)
|
||||
goto err;
|
||||
|
||||
/* Record does not cross a page boundary */
|
||||
if (!ValidXLogRecord(state, record, RecPtr))
|
||||
goto err;
|
||||
|
||||
state->EndRecPtr = RecPtr + MAXALIGN(total_len);
|
||||
state->NextRecPtr = RecPtr + MAXALIGN(total_len);
|
||||
|
||||
state->ReadRecPtr = RecPtr;
|
||||
state->DecodeRecPtr = RecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -568,14 +855,40 @@ restart:
|
||||
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
|
||||
{
|
||||
/* Pretend it extends to end of segment */
|
||||
state->EndRecPtr += state->segcxt.ws_segsize - 1;
|
||||
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
|
||||
state->NextRecPtr += state->segcxt.ws_segsize - 1;
|
||||
state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
|
||||
}
|
||||
|
||||
if (DecodeXLogRecord(state, record, errormsg))
|
||||
return record;
|
||||
if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
|
||||
{
|
||||
/* Record the location of the next record. */
|
||||
decoded->next_lsn = state->NextRecPtr;
|
||||
|
||||
/*
|
||||
* If it's in the decode buffer, mark the decode buffer space as
|
||||
* occupied.
|
||||
*/
|
||||
if (!decoded->oversized)
|
||||
{
|
||||
/* The new decode buffer head must be MAXALIGNed. */
|
||||
Assert(decoded->size == MAXALIGN(decoded->size));
|
||||
if ((char *) decoded == state->decode_buffer)
|
||||
state->decode_buffer_tail = state->decode_buffer + decoded->size;
|
||||
else
|
||||
state->decode_buffer_tail += decoded->size;
|
||||
}
|
||||
|
||||
/* Insert it into the queue of decoded records. */
|
||||
Assert(state->decode_queue_tail != decoded);
|
||||
if (state->decode_queue_tail)
|
||||
state->decode_queue_tail->next = decoded;
|
||||
state->decode_queue_tail = decoded;
|
||||
if (!state->decode_queue_head)
|
||||
state->decode_queue_head = decoded;
|
||||
return XLREAD_SUCCESS;
|
||||
}
|
||||
else
|
||||
return NULL;
|
||||
return XLREAD_FAIL;
|
||||
|
||||
err:
|
||||
if (assembled)
|
||||
@ -593,14 +906,46 @@ err:
|
||||
state->missingContrecPtr = targetPagePtr;
|
||||
}
|
||||
|
||||
if (decoded && decoded->oversized)
|
||||
pfree(decoded);
|
||||
|
||||
/*
|
||||
* Invalidate the read state. We might read from a different source after
|
||||
* failure.
|
||||
*/
|
||||
XLogReaderInvalReadState(state);
|
||||
|
||||
if (state->errormsg_buf[0] != '\0')
|
||||
*errormsg = state->errormsg_buf;
|
||||
/*
|
||||
* If an error was written to errmsg_buf, it'll be returned to the caller
|
||||
* of XLogReadRecord() after all successfully decoded records from the
|
||||
* read queue.
|
||||
*/
|
||||
|
||||
return XLREAD_FAIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to decode the next available record, and return it. The record will
|
||||
* also be returned to XLogNextRecord(), which must be called to 'consume'
|
||||
* each record.
|
||||
*
|
||||
* If nonblocking is true, may return NULL due to lack of data or WAL decoding
|
||||
* space.
|
||||
*/
|
||||
DecodedXLogRecord *
|
||||
XLogReadAhead(XLogReaderState *state, bool nonblocking)
|
||||
{
|
||||
XLogPageReadResult result;
|
||||
|
||||
if (state->errormsg_deferred)
|
||||
return NULL;
|
||||
|
||||
result = XLogDecodeNextRecord(state, nonblocking);
|
||||
if (result == XLREAD_SUCCESS)
|
||||
{
|
||||
Assert(state->decode_queue_tail != NULL);
|
||||
return state->decode_queue_tail;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
@ -609,8 +954,14 @@ err:
|
||||
* Read a single xlog page including at least [pageptr, reqLen] of valid data
|
||||
* via the page_read() callback.
|
||||
*
|
||||
* Returns -1 if the required page cannot be read for some reason; errormsg_buf
|
||||
* is set in that case (unless the error occurs in the page_read callback).
|
||||
* Returns XLREAD_FAIL if the required page cannot be read for some
|
||||
* reason; errormsg_buf is set in that case (unless the error occurs in the
|
||||
* page_read callback).
|
||||
*
|
||||
* Returns XLREAD_WOULDBLOCK if the requested data can't be read without
|
||||
* waiting. This can be returned only if the installed page_read callback
|
||||
* respects the state->nonblocking flag, and cannot read the requested data
|
||||
* immediately.
|
||||
*
|
||||
* We fetch the page from a reader-local cache if we know we have the required
|
||||
* data and if there hasn't been any error since caching the data.
|
||||
@ -652,7 +1003,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
|
||||
state->currRecPtr,
|
||||
state->readBuf);
|
||||
if (readLen < 0)
|
||||
if (readLen == XLREAD_WOULDBLOCK)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
else if (readLen < 0)
|
||||
goto err;
|
||||
|
||||
/* we can be sure to have enough WAL available, we scrolled back */
|
||||
@ -670,7 +1023,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
|
||||
state->currRecPtr,
|
||||
state->readBuf);
|
||||
if (readLen < 0)
|
||||
if (readLen == XLREAD_WOULDBLOCK)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
else if (readLen < 0)
|
||||
goto err;
|
||||
|
||||
Assert(readLen <= XLOG_BLCKSZ);
|
||||
@ -689,7 +1044,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
|
||||
state->currRecPtr,
|
||||
state->readBuf);
|
||||
if (readLen < 0)
|
||||
if (readLen == XLREAD_WOULDBLOCK)
|
||||
return XLREAD_WOULDBLOCK;
|
||||
else if (readLen < 0)
|
||||
goto err;
|
||||
}
|
||||
|
||||
@ -707,8 +1064,12 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
|
||||
return readLen;
|
||||
|
||||
err:
|
||||
XLogReaderInvalReadState(state);
|
||||
return -1;
|
||||
if (state->errormsg_buf[0] != '\0')
|
||||
{
|
||||
state->errormsg_deferred = true;
|
||||
XLogReaderInvalReadState(state);
|
||||
}
|
||||
return XLREAD_FAIL;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -987,6 +1348,9 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
|
||||
|
||||
Assert(!XLogRecPtrIsInvalid(RecPtr));
|
||||
|
||||
/* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
|
||||
state->nonblocking = false;
|
||||
|
||||
/*
|
||||
* skip over potential continuation data, keeping in mind that it may span
|
||||
* multiple pages
|
||||
@ -1187,34 +1551,83 @@ WALRead(XLogReaderState *state,
|
||||
* ----------------------------------------
|
||||
*/
|
||||
|
||||
/* private function to reset the state between records */
|
||||
/*
|
||||
* Private function to reset the state, forgetting all decoded records, if we
|
||||
* are asked to move to a new read position.
|
||||
*/
|
||||
static void
|
||||
ResetDecoder(XLogReaderState *state)
|
||||
{
|
||||
int block_id;
|
||||
DecodedXLogRecord *r;
|
||||
|
||||
state->decoded_record = NULL;
|
||||
|
||||
state->main_data_len = 0;
|
||||
|
||||
for (block_id = 0; block_id <= state->max_block_id; block_id++)
|
||||
/* Reset the decoded record queue, freeing any oversized records. */
|
||||
while ((r = state->decode_queue_head) != NULL)
|
||||
{
|
||||
state->blocks[block_id].in_use = false;
|
||||
state->blocks[block_id].has_image = false;
|
||||
state->blocks[block_id].has_data = false;
|
||||
state->blocks[block_id].apply_image = false;
|
||||
state->decode_queue_head = r->next;
|
||||
if (r->oversized)
|
||||
pfree(r);
|
||||
}
|
||||
state->max_block_id = -1;
|
||||
state->decode_queue_tail = NULL;
|
||||
state->decode_queue_head = NULL;
|
||||
state->record = NULL;
|
||||
|
||||
/* Reset the decode buffer to empty. */
|
||||
state->decode_buffer_tail = state->decode_buffer;
|
||||
state->decode_buffer_head = state->decode_buffer;
|
||||
|
||||
/* Clear error state. */
|
||||
state->errormsg_buf[0] = '\0';
|
||||
state->errormsg_deferred = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Decode the previously read record.
|
||||
* Compute the maximum possible amount of padding that could be required to
|
||||
* decode a record, given xl_tot_len from the record's header. This is the
|
||||
* amount of output buffer space that we need to decode a record, though we
|
||||
* might not finish up using it all.
|
||||
*
|
||||
* This computation is pessimistic and assumes the maximum possible number of
|
||||
* blocks, due to lack of better information.
|
||||
*/
|
||||
size_t
|
||||
DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
|
||||
{
|
||||
size_t size = 0;
|
||||
|
||||
/* Account for the fixed size part of the decoded record struct. */
|
||||
size += offsetof(DecodedXLogRecord, blocks[0]);
|
||||
/* Account for the flexible blocks array of maximum possible size. */
|
||||
size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
|
||||
/* Account for all the raw main and block data. */
|
||||
size += xl_tot_len;
|
||||
/* We might insert padding before main_data. */
|
||||
size += (MAXIMUM_ALIGNOF - 1);
|
||||
/* We might insert padding before each block's data. */
|
||||
size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
|
||||
/* We might insert padding at the end. */
|
||||
size += (MAXIMUM_ALIGNOF - 1);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
/*
|
||||
* Decode a record. "decoded" must point to a MAXALIGNed memory area that has
|
||||
* space for at least DecodeXLogRecordRequiredSpace(record) bytes. On
|
||||
* success, decoded->size contains the actual space occupied by the decoded
|
||||
* record, which may turn out to be less.
|
||||
*
|
||||
* Only decoded->oversized member must be initialized already, and will not be
|
||||
* modified. Other members will be initialized as required.
|
||||
*
|
||||
* On error, a human-readable error message is returned in *errormsg, and
|
||||
* the return value is false.
|
||||
*/
|
||||
bool
|
||||
DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
DecodeXLogRecord(XLogReaderState *state,
|
||||
DecodedXLogRecord *decoded,
|
||||
XLogRecord *record,
|
||||
XLogRecPtr lsn,
|
||||
char **errormsg)
|
||||
{
|
||||
/*
|
||||
* read next _size bytes from record buffer, but check for overrun first.
|
||||
@ -1229,17 +1642,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
} while(0)
|
||||
|
||||
char *ptr;
|
||||
char *out;
|
||||
uint32 remaining;
|
||||
uint32 datatotal;
|
||||
RelFileNode *rnode = NULL;
|
||||
uint8 block_id;
|
||||
|
||||
ResetDecoder(state);
|
||||
|
||||
state->decoded_record = record;
|
||||
state->record_origin = InvalidRepOriginId;
|
||||
state->toplevel_xid = InvalidTransactionId;
|
||||
|
||||
decoded->header = *record;
|
||||
decoded->lsn = lsn;
|
||||
decoded->next = NULL;
|
||||
decoded->record_origin = InvalidRepOriginId;
|
||||
decoded->toplevel_xid = InvalidTransactionId;
|
||||
decoded->main_data = NULL;
|
||||
decoded->main_data_len = 0;
|
||||
decoded->max_block_id = -1;
|
||||
ptr = (char *) record;
|
||||
ptr += SizeOfXLogRecord;
|
||||
remaining = record->xl_tot_len - SizeOfXLogRecord;
|
||||
@ -1257,7 +1673,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
|
||||
COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
|
||||
|
||||
state->main_data_len = main_data_len;
|
||||
decoded->main_data_len = main_data_len;
|
||||
datatotal += main_data_len;
|
||||
break; /* by convention, the main data fragment is
|
||||
* always last */
|
||||
@ -1268,18 +1684,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
uint32 main_data_len;
|
||||
|
||||
COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
|
||||
state->main_data_len = main_data_len;
|
||||
decoded->main_data_len = main_data_len;
|
||||
datatotal += main_data_len;
|
||||
break; /* by convention, the main data fragment is
|
||||
* always last */
|
||||
}
|
||||
else if (block_id == XLR_BLOCK_ID_ORIGIN)
|
||||
{
|
||||
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
|
||||
COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
|
||||
}
|
||||
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
|
||||
{
|
||||
COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
|
||||
COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
|
||||
}
|
||||
else if (block_id <= XLR_MAX_BLOCK_ID)
|
||||
{
|
||||
@ -1287,7 +1703,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
DecodedBkpBlock *blk;
|
||||
uint8 fork_flags;
|
||||
|
||||
if (block_id <= state->max_block_id)
|
||||
/* mark any intervening block IDs as not in use */
|
||||
for (int i = decoded->max_block_id + 1; i < block_id; ++i)
|
||||
decoded->blocks[i].in_use = false;
|
||||
|
||||
if (block_id <= decoded->max_block_id)
|
||||
{
|
||||
report_invalid_record(state,
|
||||
"out-of-order block_id %u at %X/%X",
|
||||
@ -1295,9 +1715,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
LSN_FORMAT_ARGS(state->ReadRecPtr));
|
||||
goto err;
|
||||
}
|
||||
state->max_block_id = block_id;
|
||||
decoded->max_block_id = block_id;
|
||||
|
||||
blk = &state->blocks[block_id];
|
||||
blk = &decoded->blocks[block_id];
|
||||
blk->in_use = true;
|
||||
blk->apply_image = false;
|
||||
|
||||
@ -1440,17 +1860,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
/*
|
||||
* Ok, we've parsed the fragment headers, and verified that the total
|
||||
* length of the payload in the fragments is equal to the amount of data
|
||||
* left. Copy the data of each fragment to a separate buffer.
|
||||
*
|
||||
* We could just set up pointers into readRecordBuf, but we want to align
|
||||
* the data for the convenience of the callers. Backup images are not
|
||||
* copied, however; they don't need alignment.
|
||||
* left. Copy the data of each fragment to contiguous space after the
|
||||
* blocks array, inserting alignment padding before the data fragments so
|
||||
* they can be cast to struct pointers by REDO routines.
|
||||
*/
|
||||
out = ((char *) decoded) +
|
||||
offsetof(DecodedXLogRecord, blocks) +
|
||||
sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
|
||||
|
||||
/* block data first */
|
||||
for (block_id = 0; block_id <= state->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
|
||||
{
|
||||
DecodedBkpBlock *blk = &state->blocks[block_id];
|
||||
DecodedBkpBlock *blk = &decoded->blocks[block_id];
|
||||
|
||||
if (!blk->in_use)
|
||||
continue;
|
||||
@ -1459,58 +1880,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
|
||||
|
||||
if (blk->has_image)
|
||||
{
|
||||
blk->bkp_image = ptr;
|
||||
/* no need to align image */
|
||||
blk->bkp_image = out;
|
||||
memcpy(out, ptr, blk->bimg_len);
|
||||
ptr += blk->bimg_len;
|
||||
out += blk->bimg_len;
|
||||
}
|
||||
if (blk->has_data)
|
||||
{
|
||||
if (!blk->data || blk->data_len > blk->data_bufsz)
|
||||
{
|
||||
if (blk->data)
|
||||
pfree(blk->data);
|
||||
|
||||
/*
|
||||
* Force the initial request to be BLCKSZ so that we don't
|
||||
* waste time with lots of trips through this stanza as a
|
||||
* result of WAL compression.
|
||||
*/
|
||||
blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
|
||||
blk->data = palloc(blk->data_bufsz);
|
||||
}
|
||||
out = (char *) MAXALIGN(out);
|
||||
blk->data = out;
|
||||
memcpy(blk->data, ptr, blk->data_len);
|
||||
ptr += blk->data_len;
|
||||
out += blk->data_len;
|
||||
}
|
||||
}
|
||||
|
||||
/* and finally, the main data */
|
||||
if (state->main_data_len > 0)
|
||||
if (decoded->main_data_len > 0)
|
||||
{
|
||||
if (!state->main_data || state->main_data_len > state->main_data_bufsz)
|
||||
{
|
||||
if (state->main_data)
|
||||
pfree(state->main_data);
|
||||
|
||||
/*
|
||||
* main_data_bufsz must be MAXALIGN'ed. In many xlog record
|
||||
* types, we omit trailing struct padding on-disk to save a few
|
||||
* bytes; but compilers may generate accesses to the xlog struct
|
||||
* that assume that padding bytes are present. If the palloc
|
||||
* request is not large enough to include such padding bytes then
|
||||
* we'll get valgrind complaints due to otherwise-harmless fetches
|
||||
* of the padding bytes.
|
||||
*
|
||||
* In addition, force the initial request to be reasonably large
|
||||
* so that we don't waste time with lots of trips through this
|
||||
* stanza. BLCKSZ / 2 seems like a good compromise choice.
|
||||
*/
|
||||
state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
|
||||
BLCKSZ / 2));
|
||||
state->main_data = palloc(state->main_data_bufsz);
|
||||
}
|
||||
memcpy(state->main_data, ptr, state->main_data_len);
|
||||
ptr += state->main_data_len;
|
||||
out = (char *) MAXALIGN(out);
|
||||
decoded->main_data = out;
|
||||
memcpy(decoded->main_data, ptr, decoded->main_data_len);
|
||||
ptr += decoded->main_data_len;
|
||||
out += decoded->main_data_len;
|
||||
}
|
||||
|
||||
/* Report the actual size we used. */
|
||||
decoded->size = MAXALIGN(out - (char *) decoded);
|
||||
Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
|
||||
decoded->size);
|
||||
|
||||
return true;
|
||||
|
||||
shortdata_err:
|
||||
@ -1536,10 +1936,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
|
||||
{
|
||||
DecodedBkpBlock *bkpb;
|
||||
|
||||
if (!record->blocks[block_id].in_use)
|
||||
if (block_id > record->record->max_block_id ||
|
||||
!record->record->blocks[block_id].in_use)
|
||||
return false;
|
||||
|
||||
bkpb = &record->blocks[block_id];
|
||||
bkpb = &record->record->blocks[block_id];
|
||||
if (rnode)
|
||||
*rnode = bkpb->rnode;
|
||||
if (forknum)
|
||||
@ -1559,10 +1960,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
|
||||
{
|
||||
DecodedBkpBlock *bkpb;
|
||||
|
||||
if (!record->blocks[block_id].in_use)
|
||||
if (block_id > record->record->max_block_id ||
|
||||
!record->record->blocks[block_id].in_use)
|
||||
return NULL;
|
||||
|
||||
bkpb = &record->blocks[block_id];
|
||||
bkpb = &record->record->blocks[block_id];
|
||||
|
||||
if (!bkpb->has_data)
|
||||
{
|
||||
@ -1590,12 +1992,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
|
||||
char *ptr;
|
||||
PGAlignedBlock tmp;
|
||||
|
||||
if (!record->blocks[block_id].in_use)
|
||||
if (block_id > record->record->max_block_id ||
|
||||
!record->record->blocks[block_id].in_use)
|
||||
return false;
|
||||
if (!record->blocks[block_id].has_image)
|
||||
if (!record->record->blocks[block_id].has_image)
|
||||
return false;
|
||||
|
||||
bkpb = &record->blocks[block_id];
|
||||
bkpb = &record->record->blocks[block_id];
|
||||
ptr = bkpb->bkp_image;
|
||||
|
||||
if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
|
||||
|
@ -2139,7 +2139,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record)
|
||||
int block_id;
|
||||
|
||||
/* decode block references */
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
RelFileNode rnode;
|
||||
ForkNumber forknum;
|
||||
@ -2271,7 +2271,7 @@ verifyBackupPageConsistency(XLogReaderState *record)
|
||||
|
||||
Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
|
||||
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
Buffer buf;
|
||||
Page page;
|
||||
|
@ -370,7 +370,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
|
||||
* going to initialize it. And vice versa.
|
||||
*/
|
||||
zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
|
||||
willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
|
||||
willinit = (XLogRecGetBlock(record, block_id)->flags & BKPBLOCK_WILL_INIT) != 0;
|
||||
if (willinit && !zeromode)
|
||||
elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
|
||||
if (!willinit && zeromode)
|
||||
|
@ -111,7 +111,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
|
||||
{
|
||||
ReorderBufferAssignChild(ctx->reorder,
|
||||
txid,
|
||||
record->decoded_record->xl_xid,
|
||||
XLogRecGetXid(record),
|
||||
buf.origptr);
|
||||
}
|
||||
|
||||
|
@ -432,7 +432,7 @@ extractPageInfo(XLogReaderState *record)
|
||||
RmgrNames[rmid], info);
|
||||
}
|
||||
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
RelFileNode rnode;
|
||||
ForkNumber forknum;
|
||||
|
@ -403,14 +403,13 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len)
|
||||
* Calculate the amount of FPI data in the record.
|
||||
*
|
||||
* XXX: We peek into xlogreader's private decoded backup blocks for the
|
||||
* bimg_len indicating the length of FPI data. It doesn't seem worth it to
|
||||
* add an accessor macro for this.
|
||||
* bimg_len indicating the length of FPI data.
|
||||
*/
|
||||
*fpi_len = 0;
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
if (XLogRecHasBlockImage(record, block_id))
|
||||
*fpi_len += record->blocks[block_id].bimg_len;
|
||||
*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -508,7 +507,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
|
||||
if (!config->bkp_details)
|
||||
{
|
||||
/* print block references (short format) */
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
if (!XLogRecHasBlockRef(record, block_id))
|
||||
continue;
|
||||
@ -539,7 +538,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
|
||||
{
|
||||
/* print block references (detailed format) */
|
||||
putchar('\n');
|
||||
for (block_id = 0; block_id <= record->max_block_id; block_id++)
|
||||
for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
|
||||
{
|
||||
if (!XLogRecHasBlockRef(record, block_id))
|
||||
continue;
|
||||
@ -552,7 +551,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
|
||||
blk);
|
||||
if (XLogRecHasBlockImage(record, block_id))
|
||||
{
|
||||
uint8 bimg_info = record->blocks[block_id].bimg_info;
|
||||
uint8 bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
|
||||
|
||||
if (BKPIMAGE_COMPRESSED(bimg_info))
|
||||
{
|
||||
@ -571,11 +570,11 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
|
||||
"compression saved: %u, method: %s",
|
||||
XLogRecBlockImageApply(record, block_id) ?
|
||||
"" : " for WAL verification",
|
||||
record->blocks[block_id].hole_offset,
|
||||
record->blocks[block_id].hole_length,
|
||||
XLogRecGetBlock(record, block_id)->hole_offset,
|
||||
XLogRecGetBlock(record, block_id)->hole_length,
|
||||
BLCKSZ -
|
||||
record->blocks[block_id].hole_length -
|
||||
record->blocks[block_id].bimg_len,
|
||||
XLogRecGetBlock(record, block_id)->hole_length -
|
||||
XLogRecGetBlock(record, block_id)->bimg_len,
|
||||
method);
|
||||
}
|
||||
else
|
||||
@ -583,8 +582,8 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
|
||||
printf(" (FPW%s); hole: offset: %u, length: %u",
|
||||
XLogRecBlockImageApply(record, block_id) ?
|
||||
"" : " for WAL verification",
|
||||
record->blocks[block_id].hole_offset,
|
||||
record->blocks[block_id].hole_length);
|
||||
XLogRecGetBlock(record, block_id)->hole_offset,
|
||||
XLogRecGetBlock(record, block_id)->hole_length);
|
||||
}
|
||||
}
|
||||
putchar('\n');
|
||||
|
@ -144,6 +144,30 @@ typedef struct
|
||||
uint16 data_bufsz;
|
||||
} DecodedBkpBlock;
|
||||
|
||||
/*
|
||||
* The decoded contents of a record. This occupies a contiguous region of
|
||||
* memory, with main_data and blocks[n].data pointing to memory after the
|
||||
* members declared here.
|
||||
*/
|
||||
typedef struct DecodedXLogRecord
|
||||
{
|
||||
/* Private member used for resource management. */
|
||||
size_t size; /* total size of decoded record */
|
||||
bool oversized; /* outside the regular decode buffer? */
|
||||
struct DecodedXLogRecord *next; /* decoded record queue link */
|
||||
|
||||
/* Public members. */
|
||||
XLogRecPtr lsn; /* location */
|
||||
XLogRecPtr next_lsn; /* location of next record */
|
||||
XLogRecord header; /* header */
|
||||
RepOriginId record_origin;
|
||||
TransactionId toplevel_xid; /* XID of top-level transaction */
|
||||
char *main_data; /* record's main data portion */
|
||||
uint32 main_data_len; /* main data portion's length */
|
||||
int max_block_id; /* highest block_id in use (-1 if none) */
|
||||
DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER];
|
||||
} DecodedXLogRecord;
|
||||
|
||||
struct XLogReaderState
|
||||
{
|
||||
/*
|
||||
@ -171,6 +195,9 @@ struct XLogReaderState
|
||||
* Start and end point of last record read. EndRecPtr is also used as the
|
||||
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the
|
||||
* starting position and ReadRecPtr to invalid.
|
||||
*
|
||||
* Start and end point of last record returned by XLogReadRecord(). These
|
||||
* are also available as record->lsn and record->next_lsn.
|
||||
*/
|
||||
XLogRecPtr ReadRecPtr; /* start of last record read */
|
||||
XLogRecPtr EndRecPtr; /* end+1 of last record read */
|
||||
@ -192,27 +219,43 @@ struct XLogReaderState
|
||||
* Use XLogRecGet* functions to investigate the record; these fields
|
||||
* should not be accessed directly.
|
||||
* ----------------------------------------
|
||||
* Start and end point of the last record read and decoded by
|
||||
* XLogReadRecordInternal(). NextRecPtr is also used as the position to
|
||||
* decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to
|
||||
* the requested starting position.
|
||||
*/
|
||||
XLogRecord *decoded_record; /* currently decoded record */
|
||||
XLogRecPtr DecodeRecPtr; /* start of last record decoded */
|
||||
XLogRecPtr NextRecPtr; /* end+1 of last record decoded */
|
||||
XLogRecPtr PrevRecPtr; /* start of previous record decoded */
|
||||
|
||||
char *main_data; /* record's main data portion */
|
||||
uint32 main_data_len; /* main data portion's length */
|
||||
uint32 main_data_bufsz; /* allocated size of the buffer */
|
||||
|
||||
RepOriginId record_origin;
|
||||
|
||||
TransactionId toplevel_xid; /* XID of top-level transaction */
|
||||
|
||||
/* information about blocks referenced by the record. */
|
||||
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
|
||||
|
||||
int max_block_id; /* highest block_id in use (-1 if none) */
|
||||
/* Last record returned by XLogReadRecord(). */
|
||||
DecodedXLogRecord *record;
|
||||
|
||||
/* ----------------------------------------
|
||||
* private/internal state
|
||||
* ----------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Buffer for decoded records. This is a circular buffer, though
|
||||
* individual records can't be split in the middle, so some space is often
|
||||
* wasted at the end. Oversized records that don't fit in this space are
|
||||
* allocated separately.
|
||||
*/
|
||||
char *decode_buffer;
|
||||
size_t decode_buffer_size;
|
||||
bool free_decode_buffer; /* need to free? */
|
||||
char *decode_buffer_head; /* data is read from the head */
|
||||
char *decode_buffer_tail; /* new data is written at the tail */
|
||||
|
||||
/*
|
||||
* Queue of records that have been decoded. This is a linked list that
|
||||
* usually consists of consecutive records in decode_buffer, but may also
|
||||
* contain oversized records allocated with palloc().
|
||||
*/
|
||||
DecodedXLogRecord *decode_queue_head; /* oldest decoded record */
|
||||
DecodedXLogRecord *decode_queue_tail; /* newest decoded record */
|
||||
|
||||
/*
|
||||
* Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
|
||||
* readLen bytes)
|
||||
@ -262,8 +305,24 @@ struct XLogReaderState
|
||||
|
||||
/* Buffer to hold error message */
|
||||
char *errormsg_buf;
|
||||
bool errormsg_deferred;
|
||||
|
||||
/*
|
||||
* Flag to indicate to XLogPageReadCB that it should not block waiting for
|
||||
* data.
|
||||
*/
|
||||
bool nonblocking;
|
||||
};
|
||||
|
||||
/*
|
||||
* Check if XLogNextRecord() has any more queued records or an error to return.
|
||||
*/
|
||||
static inline bool
|
||||
XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
|
||||
{
|
||||
return (state->decode_queue_head != NULL) || state->errormsg_deferred;
|
||||
}
|
||||
|
||||
/* Get a new XLogReader */
|
||||
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
|
||||
const char *waldir,
|
||||
@ -274,16 +333,40 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
|
||||
/* Free an XLogReader */
|
||||
extern void XLogReaderFree(XLogReaderState *state);
|
||||
|
||||
/* Optionally provide a circular decoding buffer to allow readahead. */
|
||||
extern void XLogReaderSetDecodeBuffer(XLogReaderState *state,
|
||||
void *buffer,
|
||||
size_t size);
|
||||
|
||||
/* Position the XLogReader to given record */
|
||||
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
|
||||
#ifdef FRONTEND
|
||||
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
|
||||
#endif /* FRONTEND */
|
||||
|
||||
/* Return values from XLogPageReadCB. */
|
||||
typedef enum XLogPageReadResult
|
||||
{
|
||||
XLREAD_SUCCESS = 0, /* record is successfully read */
|
||||
XLREAD_FAIL = -1, /* failed during reading a record */
|
||||
XLREAD_WOULDBLOCK = -2 /* nonblocking mode only, no data */
|
||||
} XLogPageReadResult;
|
||||
|
||||
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
|
||||
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
|
||||
char **errormsg);
|
||||
|
||||
/* Consume the next record or error. */
|
||||
extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
|
||||
char **errormsg);
|
||||
|
||||
/* Release the previously returned record, if necessary. */
|
||||
extern void XLogReleasePreviousRecord(XLogReaderState *state);
|
||||
|
||||
/* Try to read ahead, if there is data and space. */
|
||||
extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,
|
||||
bool nonblocking);
|
||||
|
||||
/* Validate a page */
|
||||
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
|
||||
XLogRecPtr recptr, char *phdr);
|
||||
@ -307,25 +390,36 @@ extern bool WALRead(XLogReaderState *state,
|
||||
|
||||
/* Functions for decoding an XLogRecord */
|
||||
|
||||
extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
|
||||
extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
|
||||
extern bool DecodeXLogRecord(XLogReaderState *state,
|
||||
DecodedXLogRecord *decoded,
|
||||
XLogRecord *record,
|
||||
XLogRecPtr lsn,
|
||||
char **errmsg);
|
||||
|
||||
#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len)
|
||||
#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev)
|
||||
#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
|
||||
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
|
||||
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
|
||||
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
|
||||
#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
|
||||
#define XLogRecGetData(decoder) ((decoder)->main_data)
|
||||
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
|
||||
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
|
||||
#define XLogRecHasBlockRef(decoder, block_id) \
|
||||
((decoder)->blocks[block_id].in_use)
|
||||
#define XLogRecHasBlockImage(decoder, block_id) \
|
||||
((decoder)->blocks[block_id].has_image)
|
||||
#define XLogRecBlockImageApply(decoder, block_id) \
|
||||
((decoder)->blocks[block_id].apply_image)
|
||||
/*
|
||||
* Macros that provide access to parts of the record most recently returned by
|
||||
* XLogReadRecord() or XLogNextRecord().
|
||||
*/
|
||||
#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len)
|
||||
#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev)
|
||||
#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info)
|
||||
#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid)
|
||||
#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid)
|
||||
#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin)
|
||||
#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid)
|
||||
#define XLogRecGetData(decoder) ((decoder)->record->main_data)
|
||||
#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len)
|
||||
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0)
|
||||
#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id)
|
||||
#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)])
|
||||
#define XLogRecHasBlockRef(decoder, block_id) \
|
||||
(((decoder)->record->max_block_id >= (block_id)) && \
|
||||
((decoder)->record->blocks[block_id].in_use))
|
||||
#define XLogRecHasBlockImage(decoder, block_id) \
|
||||
((decoder)->record->blocks[block_id].has_image)
|
||||
#define XLogRecBlockImageApply(decoder, block_id) \
|
||||
((decoder)->record->blocks[block_id].apply_image)
|
||||
|
||||
#ifndef FRONTEND
|
||||
extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record);
|
||||
|
@ -533,6 +533,7 @@ DeadLockState
|
||||
DeallocateStmt
|
||||
DeclareCursorStmt
|
||||
DecodedBkpBlock
|
||||
DecodedXLogRecord
|
||||
DecodingOutputState
|
||||
DefElem
|
||||
DefElemAction
|
||||
@ -2941,6 +2942,7 @@ XLogPageHeader
|
||||
XLogPageHeaderData
|
||||
XLogPageReadCB
|
||||
XLogPageReadPrivate
|
||||
XLogPageReadResult
|
||||
XLogReaderRoutine
|
||||
XLogReaderState
|
||||
XLogRecData
|
||||
|
Loading…
Reference in New Issue
Block a user