From 2b44d74dd4caa0d5cec2aeb0ceec7923b69109d3 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 17 Feb 2010 16:54:06 +0000 Subject: [PATCH] Take care to reprocess an uncommitted notify message. Oversight in my changes to cope with possible errors during message processing; spotted by Joachim Wieland. --- src/backend/commands/async.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 23c57cea04..8a31182c99 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.152 2010/02/17 00:52:09 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.153 2010/02/17 16:54:06 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1951,6 +1951,7 @@ asyncQueueReadAllNotifications(void) * The function returns true once we have reached the stop position or an * uncommitted notification, and false if we have finished with the page. * In other words: once it returns true there is no need to look further. + * The QueuePosition *current is advanced past all processed messages. */ static bool asyncQueueProcessPageEntries(QueuePosition *current, @@ -1963,10 +1964,12 @@ asyncQueueProcessPageEntries(QueuePosition *current, do { - if (QUEUE_POS_EQUAL(*current, stop)) + QueuePosition thisentry = *current; + + if (QUEUE_POS_EQUAL(thisentry, stop)) break; - qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(*current)); + qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); /* * Advance *current over this message, possibly to the next page. @@ -2002,8 +2005,14 @@ asyncQueueProcessPageEntries(QueuePosition *current, { /* * The transaction has neither committed nor aborted so far, - * so we can't process its message yet. Break out of the loop. + * so we can't process its message yet. Break out of the loop, + * but first back up *current so we will reprocess the message + * next time. (Note: it is unlikely but not impossible for + * TransactionIdDidCommit to fail, so we can't really avoid + * this advance-then-back-up behavior when dealing with an + * uncommitted message.) */ + *current = thisentry; reachedStop = true; break; }