Bug fixes for GiST crash recovery.

- Add the forgotten LSN check when completing an incomplete insert
- Remove the page level field: it is hard to check during recovery
- Some cleanups
This commit is contained in:
Teodor Sigaev 2005-06-30 17:52:14 +00:00
parent 7a30b1fb96
commit 898a7bd13b
6 changed files with 97 additions and 67 deletions

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.124 2005/06/29 14:06:14 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.125 2005/06/30 17:52:13 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -587,7 +587,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
* Should have the same interface as XLogReadBuffer * Should have the same interface as XLogReadBuffer
*/ */
static Buffer static Buffer
gistReadAndLockBuffer( bool unused, Relation r, BlockNumber blkno ) { gistReadAndLockBuffer( Relation r, BlockNumber blkno ) {
Buffer buffer = ReadBuffer( r, blkno ); Buffer buffer = ReadBuffer( r, blkno );
LockBuffer( buffer, GIST_SHARE ); LockBuffer( buffer, GIST_SHARE );
return buffer; return buffer;
@ -601,7 +601,7 @@ gistReadAndLockBuffer( bool unused, Relation r, BlockNumber blkno ) {
* returns from the begining of closest parent; * returns from the begining of closest parent;
*/ */
GISTInsertStack* GISTInsertStack*
gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(bool, Relation, BlockNumber) ) { gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(Relation, BlockNumber) ) {
Page page; Page page;
Buffer buffer; Buffer buffer;
OffsetNumber i, maxoff; OffsetNumber i, maxoff;
@ -614,9 +614,15 @@ gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(bool, Relat
top->blkno = GIST_ROOT_BLKNO; top->blkno = GIST_ROOT_BLKNO;
while( top && top->blkno != child ) { while( top && top->blkno != child ) {
buffer = myReadBuffer(false, r, top->blkno); /* buffer locked */ buffer = myReadBuffer(r, top->blkno); /* buffer locked */
page = (Page)BufferGetPage( buffer ); page = (Page)BufferGetPage( buffer );
Assert( !GistPageIsLeaf(page) );
if ( GistPageIsLeaf(page) ) {
/* we can safety go away, follows only leaf pages */
LockBuffer( buffer, GIST_UNLOCK );
ReleaseBuffer( buffer );
return NULL;
}
top->lsn = PageGetLSN(page); top->lsn = PageGetLSN(page);
@ -662,7 +668,7 @@ gistFindPath( Relation r, BlockNumber child, Buffer (*myReadBuffer)(bool, Relat
LockBuffer( buffer, GIST_UNLOCK ); LockBuffer( buffer, GIST_UNLOCK );
ReleaseBuffer( buffer ); ReleaseBuffer( buffer );
return top; return top;
} else if ( GistPageGetOpaque(page)->level> 0 ) { } else {
/* Install next inner page to the end of stack */ /* Install next inner page to the end of stack */
ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) ); ptr = (GISTInsertStack*)palloc0( sizeof(GISTInsertStack) );
ptr->blkno = blkno; ptr->blkno = blkno;
@ -855,11 +861,9 @@ gistSplit(Relation r,
OffsetNumber *realoffset; OffsetNumber *realoffset;
IndexTuple *cleaneditup = itup; IndexTuple *cleaneditup = itup;
int lencleaneditup = *len; int lencleaneditup = *len;
int level;
p = (Page) BufferGetPage(buffer); p = (Page) BufferGetPage(buffer);
opaque = GistPageGetOpaque(p); opaque = GistPageGetOpaque(p);
level = opaque->level;
/* /*
* The root of the tree is the first block in the relation. If we're * The root of the tree is the first block in the relation. If we're
@ -872,7 +876,6 @@ gistSplit(Relation r,
GISTInitBuffer(leftbuf, opaque->flags&F_LEAF); GISTInitBuffer(leftbuf, opaque->flags&F_LEAF);
lbknum = BufferGetBlockNumber(leftbuf); lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf); left = (Page) BufferGetPage(leftbuf);
GistPageGetOpaque(left)->level = level;
} }
else else
{ {
@ -886,7 +889,6 @@ gistSplit(Relation r,
GISTInitBuffer(rightbuf, opaque->flags&F_LEAF); GISTInitBuffer(rightbuf, opaque->flags&F_LEAF);
rbknum = BufferGetBlockNumber(rightbuf); rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf); right = (Page) BufferGetPage(rightbuf);
GistPageGetOpaque(right)->level = level;
/* generate the item array */ /* generate the item array */
realoffset = palloc((*len + 1) * sizeof(OffsetNumber)); realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
@ -1068,13 +1070,10 @@ void
gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key) gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
{ {
Page page; Page page;
int level;
Assert( BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO ); Assert( BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO );
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
level = GistPageGetOpaque(page)->level;
GISTInitBuffer(buffer, 0); GISTInitBuffer(buffer, 0);
GistPageGetOpaque(page)->level = level+1;
gistfillbuffer(r, page, itup, len, FirstOffsetNumber); gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
if ( !r->rd_istemp ) { if ( !r->rd_istemp ) {

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.4 2005/06/28 15:51:00 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.5 2005/06/30 17:52:14 teodor Exp $
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
@ -809,8 +809,6 @@ GISTInitBuffer(Buffer b, uint32 f)
opaque = GistPageGetOpaque(page); opaque = GistPageGetOpaque(page);
opaque->flags = f; opaque->flags = f;
opaque->nsplited = 0;
opaque->level = 0;
opaque->rightlink = InvalidBlockNumber; opaque->rightlink = InvalidBlockNumber;
memset( &(opaque->nsn), 0, sizeof(GistNSN) ); memset( &(opaque->nsn), 0, sizeof(GistNSN) );
} }

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.5 2005/06/29 14:06:14 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.6 2005/06/30 17:52:14 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -60,7 +60,6 @@ gistVacuumUpdate( GistVacuum *gv, BlockNumber blkno, bool needunion ) {
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page); maxoff = PageGetMaxOffsetNumber(page);
if ( GistPageIsLeaf(page) ) { if ( GistPageIsLeaf(page) ) {
if ( GistTuplesDeleted(page) ) { if ( GistTuplesDeleted(page) ) {
needunion = needwrite = true; needunion = needwrite = true;

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.5 2005/06/28 15:51:00 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.6 2005/06/30 17:52:14 teodor Exp $
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
@ -44,6 +44,7 @@ typedef struct {
typedef struct gistIncompleteInsert { typedef struct gistIncompleteInsert {
RelFileNode node; RelFileNode node;
BlockNumber origblkno; /* for splits */
ItemPointerData key; ItemPointerData key;
int lenblk; int lenblk;
BlockNumber *blkno; BlockNumber *blkno;
@ -79,6 +80,7 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
ninsert->lenblk = lenblk; ninsert->lenblk = lenblk;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk); memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
ninsert->origblkno = *blkno;
} else { } else {
int i; int i;
@ -87,6 +89,7 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
for(i=0;i<ninsert->lenblk;i++) for(i=0;i<ninsert->lenblk;i++)
ninsert->blkno[i] = xlinfo->page[i].header->blkno; ninsert->blkno[i] = xlinfo->page[i].header->blkno;
ninsert->origblkno = xlinfo->data->origblkno;
} }
Assert( ninsert->lenblk>0 ); Assert( ninsert->lenblk>0 );
@ -209,6 +212,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer); WriteBuffer(buffer);
@ -466,81 +470,98 @@ gist_form_invalid_tuple(BlockNumber blkno) {
return tuple; return tuple;
} }
static Buffer
gistXLogReadAndLockBuffer( Relation r, BlockNumber blkno ) {
Buffer buffer = XLogReadBuffer( false, r, blkno );
if (!BufferIsValid(buffer))
elog(PANIC, "gistXLogReadAndLockBuffer: block %u unfound", blkno);
if ( PageIsNew( (PageHeader)(BufferGetPage(buffer)) ) )
elog(PANIC, "gistXLogReadAndLockBuffer: uninitialized page %u", blkno);
return buffer;
}
static void static void
gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) { gixtxlogFindPath( Relation index, gistIncompleteInsert *insert ) {
int i;
GISTInsertStack *top; GISTInsertStack *top;
insert->pathlen = 0; insert->pathlen = 0;
insert->path = NULL; insert->path = NULL;
for(i=0;insert->lenblk;i++) { if ( (top=gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL ) {
if ( (top=gistFindPath(index, insert->blkno[i], XLogReadBuffer)) != NULL ) { int i;
GISTInsertStack *ptr=top; GISTInsertStack *ptr=top;
while(ptr) { while(ptr) {
insert->pathlen++; insert->pathlen++;
ptr = ptr->parent; ptr = ptr->parent;
}
insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
i=0;
ptr = top;
while(ptr) {
insert->path[i] = ptr->blkno;
i++;
ptr = ptr->parent;
}
break;
} }
}
insert->path=(BlockNumber*)palloc( sizeof(BlockNumber) * insert->pathlen );
i=0;
ptr = top;
while(ptr) {
insert->path[i] = ptr->blkno;
i++;
ptr = ptr->parent;
}
} else
elog(LOG, "gixtxlogFindPath: lost parent for block %u", insert->origblkno);
} }
static void static void
gistContinueInsert(gistIncompleteInsert *insert) { gistContinueInsert(gistIncompleteInsert *insert) {
IndexTuple *itup; IndexTuple *itup;
int i, lenitup; int i, lenitup;
MemoryContext oldCxt;
Relation index; Relation index;
oldCxt = MemoryContextSwitchTo(opCtx);
index = XLogOpenRelation(insert->node); index = XLogOpenRelation(insert->node);
if (!RelationIsValid(index)) if (!RelationIsValid(index))
return; return;
elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
/* needed vector itup never will be more than initial lenblkno+2, /* needed vector itup never will be more than initial lenblkno+2,
because during this processing Indextuple can be only smaller */ because during this processing Indextuple can be only smaller */
lenitup = insert->lenblk; lenitup = insert->lenblk;
itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/)); itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
for(i=0;i<insert->lenblk;i++) for(i=0;i<insert->lenblk;i++)
itup[i] = gist_form_invalid_tuple( insert->blkno[i] ); itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
/* construct path */ if ( insert->origblkno==GIST_ROOT_BLKNO ) {
gixtxlogFindPath( index, insert ); /*it was split root, so we should only make new root.
it can't be simple insert into root, look at call
if ( insert->pathlen==0 ) { pushIncompleteInsert in gistRedoPageSplitRecord */
/*it was split root, so we should only make new root*/
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
Page page; Page page;
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
elog(PANIC, "gistContinueInsert: root block unfound"); elog(PANIC, "gistContinueInsert: root block unfound");
page = BufferGetPage(buffer);
if (XLByteLE(insert->lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
GISTInitBuffer(buffer, 0); GISTInitBuffer(buffer, 0);
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber); gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
PageSetLSN(page, insert->lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer); WriteBuffer(buffer);
} else { } else {
Buffer *buffers; Buffer *buffers;
Page *pages; Page *pages;
int numbuffer; int numbuffer;
/* construct path */
gixtxlogFindPath( index, insert );
Assert( insert->pathlen > 0 );
buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) ); buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) ); pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
@ -555,6 +576,12 @@ gistContinueInsert(gistIncompleteInsert *insert) {
if ( PageIsNew((PageHeader)(pages[numbuffer-1])) ) if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
elog(PANIC, "gistContinueInsert: uninitialized page"); elog(PANIC, "gistContinueInsert: uninitialized page");
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer-1]))) {
LockBuffer(buffers[numbuffer-1], BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffers[numbuffer-1]);
return;
}
pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]); pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
/* remove old IndexTuples */ /* remove old IndexTuples */
@ -587,9 +614,10 @@ gistContinueInsert(gistIncompleteInsert *insert) {
if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) { if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
IndexTuple *parentitup; IndexTuple *parentitup;
/* we split root, just copy tuples from old root to new page */
parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen); parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
/* we split root, just copy tuples from old root to new page */ /* sanity check */
if ( i+1 != insert->pathlen ) if ( i+1 != insert->pathlen )
elog(PANIC,"gistContinueInsert: can't restore index '%s'", elog(PANIC,"gistContinueInsert: can't restore index '%s'",
RelationGetRelationName( index )); RelationGetRelationName( index ));
@ -624,14 +652,15 @@ gistContinueInsert(gistIncompleteInsert *insert) {
itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) ); itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
PageSetLSN(pages[j], insert->lsn); PageSetLSN(pages[j], insert->lsn);
PageSetTLI(pages[j], ThisTimeLineID); PageSetTLI(pages[j], ThisTimeLineID);
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK); LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
WriteBuffer( buffers[j] ); WriteBuffer( buffers[j] );
} }
} }
} }
MemoryContextSwitchTo(oldCxt); elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
MemoryContextReset(opCtx); insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
} }
void void
@ -648,11 +677,22 @@ gist_xlog_startup(void) {
void void
gist_xlog_cleanup(void) { gist_xlog_cleanup(void) {
ListCell *l; ListCell *l;
List *reverse=NIL;
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
foreach(l, incomplete_inserts) { /* we should call gistContinueInsert in reverse order */
foreach(l, incomplete_inserts)
reverse = lappend(reverse, lfirst(l));
MemoryContextSwitchTo(opCtx);
foreach(l, reverse) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
gistContinueInsert(insert); gistContinueInsert(insert);
MemoryContextReset(opCtx);
} }
MemoryContextSwitchTo(oldCxt);
MemoryContextDelete(opCtx); MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx); MemoryContextDelete(insertCtx);
} }

View File

@ -9,7 +9,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/gist.h,v 1.48 2005/06/27 12:45:22 teodor Exp $ * $PostgreSQL: pgsql/src/include/access/gist.h,v 1.49 2005/06/30 17:52:14 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -45,13 +45,7 @@ typedef XLogRecPtr GistNSN;
typedef struct GISTPageOpaqueData typedef struct GISTPageOpaqueData
{ {
uint8 flags; uint32 flags; /* 29 bits are unused for now */
/* number page to which current one is splitted in last split */
uint8 nsplited;
/* level of page, 0 - leaf */
uint16 level;
BlockNumber rightlink; BlockNumber rightlink;
/* the only meaning - change this value if /* the only meaning - change this value if

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.6 2005/06/27 12:45:22 teodor Exp $ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.7 2005/06/30 17:52:14 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -234,7 +234,7 @@ extern IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
int *len, SplitedPageLayout **dist, GISTSTATE *giststate); int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
extern GISTInsertStack* gistFindPath( Relation r, BlockNumber child, extern GISTInsertStack* gistFindPath( Relation r, BlockNumber child,
Buffer (*myReadBuffer)(bool, Relation, BlockNumber) ); Buffer (*myReadBuffer)(Relation, BlockNumber) );
/* gistxlog.c */ /* gistxlog.c */
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(char *buf, uint8 xl_info, char *rec); extern void gist_desc(char *buf, uint8 xl_info, char *rec);