Fix two distinct errors in creation of GIN_INSERT_LISTPAGE xlog records.

In practice these mistakes were always masked when full_page_writes was on,
because XLogInsert would always choose to log the full page, and then
ginRedoInsertListPage wouldn't try to do anything.  But with full_page_writes
off a WAL replay failure was certain.

The GIN_INSERT_LISTPAGE record type could probably be eliminated entirely
in favor of using XLOG_HEAP_NEWPAGE, but I refrained from doing that now
since it would have required a significantly more invasive patch.

In passing do a little bit of code cleanup, including making the accounting
for free space on GIN list pages more precise.  (This wasn't a bug as the
errors were always in the conservative direction.)

Per report from Simon.  Back-patch to 8.4 which contains the identical code.
This commit is contained in:
Tom Lane 2009-09-15 20:31:30 +00:00
parent f3ef948592
commit 384cad5c7b

View File

@ -11,7 +11,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.3 2009/06/11 14:48:53 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.4 2009/09/15 20:31:30 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -41,13 +41,15 @@ typedef struct DatumArray
/*
* Build a pending-list page from the given array of tuples, and write it out.
*
* Returns amount of free space left on the page.
*/
static int32
writeListPage(Relation index, Buffer buffer,
IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
{
Page page = BufferGetPage(buffer);
int i,
int32 i,
freesize,
size = 0;
OffsetNumber l,
@ -100,8 +102,6 @@ writeListPage(Relation index, Buffer buffer,
GinPageGetOpaque(page)->maxoff = 0;
}
freesize = PageGetFreeSpace(page);
MarkBufferDirty(buffer);
if (!index->rd_istemp)
@ -110,26 +110,30 @@ writeListPage(Relation index, Buffer buffer,
ginxlogInsertListPage data;
XLogRecPtr recptr;
rdata[0].buffer = buffer;
rdata[0].buffer_std = true;
data.node = index->rd_node;
data.blkno = BufferGetBlockNumber(buffer);
data.rightlink = rightlink;
data.ntuples = ntuples;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
rdata[0].len = sizeof(ginxlogInsertListPage);
rdata[0].next = rdata + 1;
rdata[1].buffer = InvalidBuffer;
rdata[1].buffer = buffer;
rdata[1].buffer_std = true;
rdata[1].data = workspace;
rdata[1].len = size;
rdata[1].next = NULL;
data.blkno = BufferGetBlockNumber(buffer);
data.rightlink = rightlink;
data.ntuples = ntuples;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
/* get free space before releasing buffer */
freesize = PageGetExactFreeSpace(page);
UnlockReleaseBuffer(buffer);
END_CRIT_SECTION();
@ -165,7 +169,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
{
res->nPendingPages++;
writeListPage(index, prevBuffer,
tuples + startTuple, i - startTuple,
tuples + startTuple,
i - startTuple,
BufferGetBlockNumber(curBuffer));
}
else
@ -180,7 +185,7 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
if (size + tupsize >= GinListPageSize)
if (size + tupsize > GinListPageSize)
{
/* won't fit, force a new page and reprocess */
i--;
@ -197,7 +202,8 @@ makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
*/
res->tail = BufferGetBlockNumber(curBuffer);
res->tailFreeSize = writeListPage(index, curBuffer,
tuples + startTuple, ntuples - startTuple,
tuples + startTuple,
ntuples - startTuple,
InvalidBlockNumber);
res->nPendingPages++;
/* that was only one heap tuple */
@ -237,7 +243,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
metapage = BufferGetPage(metabuffer);
if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE)
if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
{
/*
* Total size is greater than one page => make sublist
@ -265,13 +271,12 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
if (separateList)
{
GinMetaPageData sublist;
/*
* We should make sublist separately and append it to the tail
*/
memset(&sublist, 0, sizeof(GinMetaPageData));
GinMetaPageData sublist;
memset(&sublist, 0, sizeof(GinMetaPageData));
makeSublist(index, collector->tuples, collector->ntuples, &sublist);
/*
@ -283,45 +288,44 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
if (metadata->head == InvalidBlockNumber)
{
/*
* Sublist becomes main list
* Main list is empty, so just copy sublist into main list
*/
START_CRIT_SECTION();
memcpy(metadata, &sublist, sizeof(GinMetaPageData));
memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData));
}
else
{
/*
* merge lists
* Merge lists
*/
data.prevTail = metadata->tail;
data.newRightlink = sublist.head;
buffer = ReadBuffer(index, metadata->tail);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
START_CRIT_SECTION();
GinPageGetOpaque(page)->rightlink = sublist.head;
MarkBufferDirty(buffer);
metadata->tail = sublist.tail;
metadata->tailFreeSize = sublist.tailFreeSize;
metadata->nPendingPages += sublist.nPendingPages;
metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
data.newRightlink = sublist.head;
MarkBufferDirty(buffer);
}
}
else
{
/*
* Insert into tail page, metapage is already locked
* Insert into tail page. Metapage is already locked
*/
OffsetNumber l,
off;
int i,
@ -331,6 +335,7 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
buffer = ReadBuffer(index, metadata->tail);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
@ -368,20 +373,24 @@ ginHeapTupleFastInsert(Relation index, GinState *ginstate,
off++;
}
metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
Assert((ptr - rdata[1].data) <= collector->sumsize);
metadata->tailFreeSize = PageGetExactFreeSpace(page);
MarkBufferDirty(buffer);
}
/*
* Make real write
* Write metabuffer, make xlog entry
*/
MarkBufferDirty(metabuffer);
if (!index->rd_istemp)
{
XLogRecPtr recptr;
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
PageSetLSN(metapage, recptr);
PageSetTLI(metapage, ThisTimeLineID);
@ -552,7 +561,6 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
metadata->nPendingPages = 0;
metadata->nPendingHeapTuples = 0;
}
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
MarkBufferDirty(metabuffer);
@ -567,6 +575,8 @@ shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
{
XLogRecPtr recptr;
memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
PageSetLSN(metapage, recptr);
PageSetTLI(metapage, ThisTimeLineID);