diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c index cc12808375..330d7fd769 100644 --- a/contrib/bloom/blinsert.c +++ b/contrib/bloom/blinsert.c @@ -200,7 +200,7 @@ blinsert(Relation index, Datum *values, bool *isnull, /* * At first, try to insert new tuple to the first page in notFullPage - * array. If success we don't need to modify the meta page. + * array. If successful, we don't need to modify the meta page. */ metaBuffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO); LockBuffer(metaBuffer, BUFFER_LOCK_SHARE); @@ -212,17 +212,20 @@ blinsert(Relation index, Datum *values, bool *isnull, Page page; blkno = metaData->notFullPage[metaData->nStart]; - Assert(blkno != InvalidBlockNumber); + + /* Don't hold metabuffer lock while doing insert */ LockBuffer(metaBuffer, BUFFER_LOCK_UNLOCK); buffer = ReadBuffer(index, blkno); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + state = GenericXLogStart(index); page = GenericXLogRegister(state, buffer, false); if (BloomPageAddItem(&blstate, page, itup)) { + /* Success! Apply the change, clean up, and exit */ GenericXLogFinish(state); UnlockReleaseBuffer(buffer); ReleaseBuffer(metaBuffer); @@ -230,15 +233,14 @@ blinsert(Relation index, Datum *values, bool *isnull, MemoryContextDelete(insertCtx); return false; } - else - { - GenericXLogAbort(state); - UnlockReleaseBuffer(buffer); - } + + /* Didn't fit, must try other pages */ + GenericXLogAbort(state); + UnlockReleaseBuffer(buffer); } else { - /* First page in notFullPage isn't suitable */ + /* No entries in notFullPage */ LockBuffer(metaBuffer, BUFFER_LOCK_UNLOCK); } @@ -248,20 +250,30 @@ blinsert(Relation index, Datum *values, bool *isnull, */ LockBuffer(metaBuffer, BUFFER_LOCK_EXCLUSIVE); - state = GenericXLogStart(index); - metaPage = GenericXLogRegister(state, metaBuffer, false); - metaData = BloomPageGetMeta(metaPage); - - /* - * Iterate over notFullPage array. Skip page we already tried first. - */ + /* nStart might have changed while we didn't have lock */ nStart = metaData->nStart; - if (metaData->nEnd > nStart && + + /* Skip first page if we already tried it above */ + if (nStart < metaData->nEnd && blkno == metaData->notFullPage[nStart]) nStart++; - while (metaData->nEnd > nStart) + /* + * This loop iterates for each page we try from the notFullPage array, and + * will also initialize a GenericXLogState for the fallback case of having + * to allocate a new page. + */ + for (;;) { + state = GenericXLogStart(index); + + /* get modifiable copy of metapage */ + metaPage = GenericXLogRegister(state, metaBuffer, false); + metaData = BloomPageGetMeta(metaPage); + + if (nStart >= metaData->nEnd) + break; /* no more entries in notFullPage array */ + blkno = metaData->notFullPage[nStart]; Assert(blkno != InvalidBlockNumber); @@ -271,6 +283,7 @@ blinsert(Relation index, Datum *values, bool *isnull, if (BloomPageAddItem(&blstate, page, itup)) { + /* Success! Apply the changes, clean up, and exit */ metaData->nStart = nStart; GenericXLogFinish(state); UnlockReleaseBuffer(buffer); @@ -279,41 +292,41 @@ blinsert(Relation index, Datum *values, bool *isnull, MemoryContextDelete(insertCtx); return false; } - else - { - GenericXLogUnregister(state, buffer); - UnlockReleaseBuffer(buffer); - } + + /* Didn't fit, must try other pages */ + GenericXLogAbort(state); + UnlockReleaseBuffer(buffer); nStart++; } - GenericXLogAbort(state); - /* * Didn't find place to insert in notFullPage array. Allocate new page. + * (XXX is it good to do this while holding ex-lock on the metapage??) */ buffer = BloomNewBuffer(index); - state = GenericXLogStart(index); - metaPage = GenericXLogRegister(state, metaBuffer, false); - metaData = BloomPageGetMeta(metaPage); page = GenericXLogRegister(state, buffer, true); BloomInitPage(page, 0); if (!BloomPageAddItem(&blstate, page, itup)) { - /* We shouldn't be here since we're inserting to the empty page */ + /* We shouldn't be here since we're inserting to an empty page */ elog(ERROR, "could not add new bloom tuple to empty page"); } + /* Reset notFullPage array to contain just this new page */ metaData->nStart = 0; metaData->nEnd = 1; metaData->notFullPage[0] = BufferGetBlockNumber(buffer); + /* Apply the changes, clean up, and exit */ GenericXLogFinish(state); UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(metaBuffer); + MemoryContextSwitchTo(oldCtx); + MemoryContextDelete(insertCtx); + return false; }