Take care to reprocess an uncommitted notify message.

Oversight in my changes to cope with possible errors during message
processing; spotted by Joachim Wieland.
This commit is contained in:
Tom Lane 2010-02-17 16:54:06 +00:00
parent 075d43a01a
commit 2b44d74dd4

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.152 2010/02/17 00:52:09 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.153 2010/02/17 16:54:06 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -1951,6 +1951,7 @@ asyncQueueReadAllNotifications(void)
* The function returns true once we have reached the stop position or an * The function returns true once we have reached the stop position or an
* uncommitted notification, and false if we have finished with the page. * uncommitted notification, and false if we have finished with the page.
* In other words: once it returns true there is no need to look further. * In other words: once it returns true there is no need to look further.
* The QueuePosition *current is advanced past all processed messages.
*/ */
static bool static bool
asyncQueueProcessPageEntries(QueuePosition *current, asyncQueueProcessPageEntries(QueuePosition *current,
@ -1963,10 +1964,12 @@ asyncQueueProcessPageEntries(QueuePosition *current,
do do
{ {
if (QUEUE_POS_EQUAL(*current, stop)) QueuePosition thisentry = *current;
if (QUEUE_POS_EQUAL(thisentry, stop))
break; break;
qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(*current)); qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
/* /*
* Advance *current over this message, possibly to the next page. * Advance *current over this message, possibly to the next page.
@ -2002,8 +2005,14 @@ asyncQueueProcessPageEntries(QueuePosition *current,
{ {
/* /*
* The transaction has neither committed nor aborted so far, * The transaction has neither committed nor aborted so far,
* so we can't process its message yet. Break out of the loop. * so we can't process its message yet. Break out of the loop,
* but first back up *current so we will reprocess the message
* next time. (Note: it is unlikely but not impossible for
* TransactionIdDidCommit to fail, so we can't really avoid
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*/ */
*current = thisentry;
reachedStop = true; reachedStop = true;
break; break;
} }