From 3917c397f034970a8952b39f64c989b5ab46075e Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Wed, 12 Mar 2008 20:12:01 +0000 Subject: [PATCH] Fix LISTEN/NOTIFY race condition reported by Laurent Birtz, by postponing pg_listener modifications commanded by LISTEN and UNLISTEN until the end of the current transaction. This allows us to hold the ExclusiveLock on pg_listener until after commit, with no greater risk of deadlock than there was before. Aside from fixing the race condition, this gets rid of a truly ugly kludge that was there before, namely having to ignore HeapTupleBeingUpdated failures during NOTIFY. There is a small potential incompatibility, which is that if a transaction issues LISTEN or UNLISTEN and then looks into pg_listener before committing, it won't see any resulting row insertion or deletion, where before it would have. It seems unlikely that anyone would be depending on that, though. This patch also disallows LISTEN and UNLISTEN inside a prepared transaction. That case had some pretty undesirable properties already, such as possibly allowing pg_listener entries to be made for PIDs no longer present, so disallowing it seems like a better idea than trying to maintain the behavior. --- doc/src/sgml/ref/prepare_transaction.sgml | 8 +- src/backend/commands/async.c | 622 ++++++++++++---------- 2 files changed, 345 insertions(+), 285 deletions(-) diff --git a/doc/src/sgml/ref/prepare_transaction.sgml b/doc/src/sgml/ref/prepare_transaction.sgml index 0c8293f0ec..47cb1bd0dd 100644 --- a/doc/src/sgml/ref/prepare_transaction.sgml +++ b/doc/src/sgml/ref/prepare_transaction.sgml @@ -1,5 +1,5 @@ @@ -88,8 +88,10 @@ PREPARE TRANSACTION transaction_id It is not currently allowed to PREPARE a transaction that - has executed any operations involving temporary tables or - created any cursors WITH HOLD. Those features are too tightly + has executed any operations involving temporary tables, + created any cursors WITH HOLD, or executed + LISTEN or UNLISTEN. + Those features are too tightly tied to the current session to be useful in a transaction to be prepared. diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index d49bef06ac..450fd5a87c 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.134 2006/07/14 14:52:18 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.134.2.1 2008/03/12 20:12:01 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -52,6 +52,16 @@ * transaction, since by assumption it is only called from outside any * transaction. * + * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list + * of pending actions. If we reach transaction commit, the changes are + * applied to pg_listener just before executing any pending NOTIFYs. This + * method is necessary because to avoid race conditions, we must hold lock + * on pg_listener from when we insert a new listener tuple until we commit. + * To do that and not create undue hazard of deadlock, we don't want to + * touch pg_listener until we are otherwise done with the transaction; + * in particular it'd be uncool to still be taking user-commanded locks + * while holding the pg_listener lock. + * * Although we grab ExclusiveLock on pg_listener for any operation, * the lock is never held very long, so it shouldn't cause too much of * a performance problem. (Previously we used AccessExclusiveLock, but @@ -75,7 +85,6 @@ #include #include -#include #include "access/heapam.h" #include "access/twophase_rmgr.h" @@ -88,11 +97,39 @@ #include "storage/ipc.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/ps_status.h" +/* + * State for pending LISTEN/UNLISTEN actions consists of an ordered list of + * all actions requested in the current transaction. As explained above, + * we don't actually modify pg_listener until we reach transaction commit. + * + * The list is kept in CurTransactionContext. In subtransactions, each + * subtransaction has its own list in its own CurTransactionContext, but + * successful subtransactions attach their lists to their parent's list. + * Failed subtransactions simply discard their lists. + */ +typedef enum +{ + LISTEN_LISTEN, + LISTEN_UNLISTEN, + LISTEN_UNLISTEN_ALL +} ListenActionKind; + +typedef struct +{ + ListenActionKind action; + char condname[1]; /* actually, as long as needed */ +} ListenAction; + +static List *pendingActions = NIL; /* list of ListenAction */ + +static List *upperPendingActions = NIL; /* list of upper-xact lists */ + /* * State for outbound notifies consists of a list of all relnames NOTIFYed * in the current transaction. We do not actually perform a NOTIFY until @@ -103,8 +140,13 @@ * subtransaction has its own list in its own CurTransactionContext, but * successful subtransactions attach their lists to their parent's list. * Failed subtransactions simply discard their lists. + * + * Note: the action and notify lists do not interact within a transaction. + * In particular, if a transaction does NOTIFY and then LISTEN on the same + * condition name, it will get a self-notify at commit. This is a bit odd + * but is consistent with our historical behavior. */ -static List *pendingNotifies = NIL; +static List *pendingNotifies = NIL; /* list of C strings */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ @@ -118,8 +160,8 @@ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ * does not grok "volatile", you'd be best advised to compile this file * with all optimization turned off. */ -static volatile int notifyInterruptEnabled = 0; -static volatile int notifyInterruptOccurred = 0; +static volatile sig_atomic_t notifyInterruptEnabled = 0; +static volatile sig_atomic_t notifyInterruptOccurred = 0; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; @@ -127,16 +169,20 @@ static bool unlistenExitRegistered = false; bool Trace_notify = false; +static void queue_listen(ListenActionKind action, const char *condname); static void Async_UnlistenAll(void); static void Async_UnlistenOnExit(int code, Datum arg); +static void Exec_Listen(Relation lRel, const char *relname); +static void Exec_Unlisten(Relation lRel, const char *relname); +static void Exec_UnlistenAll(Relation lRel); +static void Send_Notify(Relation lRel); static void ProcessIncomingNotify(void); static void NotifyMyFrontEnd(char *relname, int32 listenerPID); static bool AsyncExistsPendingNotify(const char *relname); -static void ClearPendingNotifies(void); +static void ClearPendingActionsAndNotifies(void); /* - *-------------------------------------------------------------- * Async_Notify * * This is executed by the SQL notify command. @@ -144,8 +190,6 @@ static void ClearPendingNotifies(void); * Adds the relation to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - * - *-------------------------------------------------------------- */ void Async_Notify(const char *relname) @@ -164,6 +208,12 @@ Async_Notify(const char *relname) oldcontext = MemoryContextSwitchTo(CurTransactionContext); + /* + * Ordering of the list isn't important. We choose to put new + * entries on the front, as this might make duplicate-elimination + * a tad faster when the same condition is signaled many times in + * a row. + */ pendingNotifies = lcons(pstrdup(relname), pendingNotifies); MemoryContextSwitchTo(oldcontext); @@ -171,34 +221,245 @@ Async_Notify(const char *relname) } /* - *-------------------------------------------------------------- + * queue_listen + * Common code for listen, unlisten, unlisten all commands. + * + * Adds the request to the list of pending actions. + * Actual update of pg_listener happens during transaction commit. + * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + */ +static void +queue_listen(ListenActionKind action, const char *condname) +{ + MemoryContext oldcontext; + ListenAction *actrec; + + /* + * Unlike Async_Notify, we don't try to collapse out duplicates. + * It would be too complicated to ensure we get the right interactions + * of conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that + * there would be any performance benefit anyway in sane applications. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* space for terminating null is included in sizeof(ListenAction) */ + actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); + actrec->action = action; + strcpy(actrec->condname, condname); + + pendingActions = lappend(pendingActions, actrec); + + MemoryContextSwitchTo(oldcontext); +} + +/* * Async_Listen * * This is executed by the SQL listen command. - * - * Register the current backend as listening on the specified - * relation. - * - * Side effects: - * pg_listener is updated. - * - *-------------------------------------------------------------- */ void Async_Listen(const char *relname) +{ + if (Trace_notify) + elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); + + queue_listen(LISTEN_LISTEN, relname); +} + +/* + * Async_Unlisten + * + * This is executed by the SQL unlisten command. + */ +void +Async_Unlisten(const char *relname) +{ + /* Handle specially the `unlisten "*"' command */ + if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) + { + Async_UnlistenAll(); + } + else + { + if (Trace_notify) + elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); + + queue_listen(LISTEN_UNLISTEN, relname); + } +} + +/* + * Async_UnlistenAll + * + * This is invoked by UNLISTEN "*" command, and also at backend exit. + */ +static void +Async_UnlistenAll(void) +{ + if (Trace_notify) + elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); + + queue_listen(LISTEN_UNLISTEN_ALL, ""); +} + +/* + * Async_UnlistenOnExit + * + * Clean up the pg_listener table at backend exit. + * + * This is executed if we have done any LISTENs in this backend. + * It might not be necessary anymore, if the user UNLISTENed everything, + * but we don't try to detect that case. + */ +static void +Async_UnlistenOnExit(int code, Datum arg) +{ + /* + * We need to start/commit a transaction for the unlisten, but if there is + * already an active transaction we had better abort that one first. + * Otherwise we'd end up committing changes that probably ought to be + * discarded. + */ + AbortOutOfAnyTransaction(); + /* Now we can do the unlisten */ + StartTransactionCommand(); + Async_UnlistenAll(); + CommitTransactionCommand(); +} + +/* + * AtPrepare_Notify + * + * This is called at the prepare phase of a two-phase + * transaction. Save the state for possible commit later. + */ +void +AtPrepare_Notify(void) +{ + ListCell *p; + + /* It's not sensible to have any pending LISTEN/UNLISTEN actions */ + if (pendingActions) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); + + /* We can deal with pending NOTIFY though */ + foreach(p, pendingNotifies) + { + const char *relname = (const char *) lfirst(p); + + RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, + relname, strlen(relname) + 1); + } + + /* + * We can clear the state immediately, rather than needing a separate + * PostPrepare call, because if the transaction fails we'd just discard + * the state anyway. + */ + ClearPendingActionsAndNotifies(); +} + +/* + * AtCommit_Notify + * + * This is called at transaction commit. + * + * If there are pending LISTEN/UNLISTEN actions, insert or delete + * tuples in pg_listener accordingly. + * + * If there are outbound notify requests in the pendingNotifies list, + * scan pg_listener for matching tuples, and either signal the other + * backend or send a message to our own frontend. + * + * NOTE: we are still inside the current transaction, therefore can + * piggyback on its committing of changes. + */ +void +AtCommit_Notify(void) { Relation lRel; + ListCell *p; + + if (pendingActions == NIL && pendingNotifies == NIL) + return; /* no relevant statements in this xact */ + + /* + * NOTIFY is disabled if not normal processing mode. This test used to be + * in xact.c, but it seems cleaner to do it here. + */ + if (!IsNormalProcessingMode()) + { + ClearPendingActionsAndNotifies(); + return; + } + + if (Trace_notify) + elog(DEBUG1, "AtCommit_Notify"); + + /* Acquire ExclusiveLock on pg_listener */ + lRel = heap_open(ListenerRelationId, ExclusiveLock); + + /* Perform any pending listen/unlisten actions */ + foreach(p, pendingActions) + { + ListenAction *actrec = (ListenAction *) lfirst(p); + + switch (actrec->action) + { + case LISTEN_LISTEN: + Exec_Listen(lRel, actrec->condname); + break; + case LISTEN_UNLISTEN: + Exec_Unlisten(lRel, actrec->condname); + break; + case LISTEN_UNLISTEN_ALL: + Exec_UnlistenAll(lRel); + break; + } + + /* We must CCI after each action in case of conflicting actions */ + CommandCounterIncrement(); + } + + /* Perform any pending notifies */ + if (pendingNotifies) + Send_Notify(lRel); + + /* + * We do NOT release the lock on pg_listener here; we need to hold it + * until end of transaction (which is about to happen, anyway) to ensure + * that notified backends see our tuple updates when they look. Else they + * might disregard the signal, which would make the application programmer + * very unhappy. Also, this prevents race conditions when we have just + * inserted a listening tuple. + */ + heap_close(lRel, NoLock); + + ClearPendingActionsAndNotifies(); + + if (Trace_notify) + elog(DEBUG1, "AtCommit_Notify: done"); +} + +/* + * Exec_Listen --- subroutine for AtCommit_Notify + * + * Register the current backend as listening on the specified relation. + */ +static void +Exec_Listen(Relation lRel, const char *relname) +{ HeapScanDesc scan; HeapTuple tuple; Datum values[Natts_pg_listener]; char nulls[Natts_pg_listener]; - int i; + NameData condname; bool alreadyListener = false; if (Trace_notify) - elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); + elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid); /* Detect whether we are already listening on this relname */ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); @@ -217,27 +478,20 @@ Async_Listen(const char *relname) heap_endscan(scan); if (alreadyListener) - { - heap_close(lRel, ExclusiveLock); return; - } /* * OK to insert a new tuple */ + memset(nulls, ' ', sizeof(nulls)); - for (i = 0; i < Natts_pg_listener; i++) - { - nulls[i] = ' '; - values[i] = PointerGetDatum(NULL); - } - - i = 0; - values[i++] = (Datum) relname; - values[i++] = (Datum) MyProcPid; - values[i++] = (Datum) 0; /* no notifies pending */ + namestrcpy(&condname, relname); + values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); + values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid); + values[Anum_pg_listener_notify - 1] = Int32GetDatum(0); /* no notifies pending */ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls); + simple_heap_insert(lRel, tuple); #ifdef NOT_USED /* currently there are no indexes */ @@ -246,8 +500,6 @@ Async_Listen(const char *relname) heap_freetuple(tuple); - heap_close(lRel, ExclusiveLock); - /* * now that we are listening, make sure we will unlisten before dying. */ @@ -259,37 +511,19 @@ Async_Listen(const char *relname) } /* - *-------------------------------------------------------------- - * Async_Unlisten - * - * This is executed by the SQL unlisten command. + * Exec_Unlisten --- subroutine for AtCommit_Notify * * Remove the current backend from the list of listening backends * for the specified relation. - * - * Side effects: - * pg_listener is updated. - * - *-------------------------------------------------------------- */ -void -Async_Unlisten(const char *relname) +static void +Exec_Unlisten(Relation lRel, const char *relname) { - Relation lRel; HeapScanDesc scan; HeapTuple tuple; - /* Handle specially the `unlisten "*"' command */ - if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) - { - Async_UnlistenAll(); - return; - } - if (Trace_notify) - elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); + elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) @@ -311,8 +545,6 @@ Async_Unlisten(const char *relname) } heap_endscan(scan); - heap_close(lRel, ExclusiveLock); - /* * We do not complain about unlistening something not being listened; * should we? @@ -320,35 +552,19 @@ Async_Unlisten(const char *relname) } /* - *-------------------------------------------------------------- - * Async_UnlistenAll + * Exec_UnlistenAll --- subroutine for AtCommit_Notify * - * Unlisten all relations for this backend. - * - * This is invoked by UNLISTEN "*" command, and also at backend exit. - * - * Results: - * XXX - * - * Side effects: - * pg_listener is updated. - * - *-------------------------------------------------------------- + * Update pg_listener to unlisten all relations for this backend. */ static void -Async_UnlistenAll(void) +Exec_UnlistenAll(Relation lRel) { - Relation lRel; - TupleDesc tdesc; HeapScanDesc scan; HeapTuple lTuple; ScanKeyData key[1]; if (Trace_notify) - elog(DEBUG1, "Async_UnlistenAll"); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); - tdesc = RelationGetDescr(lRel); + elog(DEBUG1, "Exec_UnlistenAll"); /* Find and delete all entries with my listenerPID */ ScanKeyInit(&key[0], @@ -361,100 +577,18 @@ Async_UnlistenAll(void) simple_heap_delete(lRel, &lTuple->t_self); heap_endscan(scan); - heap_close(lRel, ExclusiveLock); } /* - *-------------------------------------------------------------- - * Async_UnlistenOnExit + * Send_Notify --- subroutine for AtCommit_Notify * - * Clean up the pg_listener table at backend exit. - * - * This is executed if we have done any LISTENs in this backend. - * It might not be necessary anymore, if the user UNLISTENed everything, - * but we don't try to detect that case. - * - * Results: - * XXX - * - * Side effects: - * pg_listener is updated if necessary. - * - *-------------------------------------------------------------- + * Scan pg_listener for tuples matching our pending notifies, and + * either signal the other backend or send a message to our own frontend. */ static void -Async_UnlistenOnExit(int code, Datum arg) +Send_Notify(Relation lRel) { - /* - * We need to start/commit a transaction for the unlisten, but if there is - * already an active transaction we had better abort that one first. - * Otherwise we'd end up committing changes that probably ought to be - * discarded. - */ - AbortOutOfAnyTransaction(); - /* Now we can do the unlisten */ - StartTransactionCommand(); - Async_UnlistenAll(); - CommitTransactionCommand(); -} - - -/* - *-------------------------------------------------------------- - * AtPrepare_Notify - * - * This is called at the prepare phase of a two-phase - * transaction. Save the state for possible commit later. - *-------------------------------------------------------------- - */ -void -AtPrepare_Notify(void) -{ - ListCell *p; - - foreach(p, pendingNotifies) - { - const char *relname = (const char *) lfirst(p); - - RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, - relname, strlen(relname) + 1); - } - - /* - * We can clear the state immediately, rather than needing a separate - * PostPrepare call, because if the transaction fails we'd just discard - * the state anyway. - */ - ClearPendingNotifies(); -} - -/* - *-------------------------------------------------------------- - * AtCommit_Notify - * - * This is called at transaction commit. - * - * If there are outbound notify requests in the pendingNotifies list, - * scan pg_listener for matching tuples, and either signal the other - * backend or send a message to our own frontend. - * - * NOTE: we are still inside the current transaction, therefore can - * piggyback on its committing of changes. - * - * Results: - * XXX - * - * Side effects: - * Tuples in pg_listener that have matching relnames and other peoples' - * listenerPIDs are updated with a nonzero notification field. - * - *-------------------------------------------------------------- - */ -void -AtCommit_Notify(void) -{ - Relation lRel; - TupleDesc tdesc; + TupleDesc tdesc = RelationGetDescr(lRel); HeapScanDesc scan; HeapTuple lTuple, rTuple; @@ -462,22 +596,6 @@ AtCommit_Notify(void) char repl[Natts_pg_listener], nulls[Natts_pg_listener]; - if (pendingNotifies == NIL) - return; /* no NOTIFY statements in this transaction */ - - /* - * NOTIFY is disabled if not normal processing mode. This test used to be - * in xact.c, but it seems cleaner to do it here. - */ - if (!IsNormalProcessingMode()) - { - ClearPendingNotifies(); - return; - } - - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify"); - /* preset data to update notify column to MyProcPid */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; @@ -485,8 +603,6 @@ AtCommit_Notify(void) value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid); - lRel = heap_open(ListenerRelationId, ExclusiveLock); - tdesc = RelationGetDescr(lRel); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) @@ -506,7 +622,6 @@ AtCommit_Notify(void) * could lose an outside notify, which'd be bad for applications * that ignore self-notify messages. */ - if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: notifying self"); @@ -539,98 +654,32 @@ AtCommit_Notify(void) } else if (listener->notification == 0) { - HTSU_Result result; - ItemPointerData update_ctid; - TransactionId update_xmax; + /* Rewrite the tuple with my PID in notification column */ + rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl); + simple_heap_update(lRel, &lTuple->t_self, rTuple); - rTuple = heap_modifytuple(lTuple, tdesc, - value, nulls, repl); - - /* - * We cannot use simple_heap_update here because the tuple - * could have been modified by an uncommitted transaction; - * specifically, since UNLISTEN releases exclusive lock on the - * table before commit, the other guy could already have tried - * to unlisten. There are no other cases where we should be - * able to see an uncommitted update or delete. Therefore, our - * response to a HeapTupleBeingUpdated result is just to - * ignore it. We do *not* wait for the other guy to commit - * --- that would risk deadlock, and we don't want to block - * while holding the table lock anyway for performance - * reasons. We also ignore HeapTupleUpdated, which could occur - * if the other guy commits between our heap_getnext and - * heap_update calls. - */ - result = heap_update(lRel, &lTuple->t_self, rTuple, - &update_ctid, &update_xmax, - GetCurrentCommandId(), InvalidSnapshot, - false /* no wait for commit */ ); - switch (result) - { - case HeapTupleSelfUpdated: - /* Tuple was already updated in current command? */ - elog(ERROR, "tuple already updated by self"); - break; - - case HeapTupleMayBeUpdated: - /* done successfully */ #ifdef NOT_USED /* currently there are no indexes */ - CatalogUpdateIndexes(lRel, rTuple); + CatalogUpdateIndexes(lRel, rTuple); #endif - break; - - case HeapTupleBeingUpdated: - /* ignore uncommitted tuples */ - break; - - case HeapTupleUpdated: - /* ignore just-committed tuples */ - break; - - default: - elog(ERROR, "unrecognized heap_update status: %u", - result); - break; - } } } } heap_endscan(scan); - - /* - * We do NOT release the lock on pg_listener here; we need to hold it - * until end of transaction (which is about to happen, anyway) to ensure - * that notified backends see our tuple updates when they look. Else they - * might disregard the signal, which would make the application programmer - * very unhappy. - */ - heap_close(lRel, NoLock); - - ClearPendingNotifies(); - - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify: done"); } /* - *-------------------------------------------------------------- * AtAbort_Notify * * This is called at transaction abort. * - * Gets rid of pending outbound notifies that we would have executed - * if the transaction got committed. - * - * Results: - * XXX - * - *-------------------------------------------------------------- + * Gets rid of pending actions and outbound notifies that we would have + * executed if the transaction got committed. */ void AtAbort_Notify(void) { - ClearPendingNotifies(); + ClearPendingActionsAndNotifies(); } /* @@ -646,6 +695,13 @@ AtSubStart_Notify(void) /* Keep the list-of-lists in TopTransactionContext for simplicity */ old_cxt = MemoryContextSwitchTo(TopTransactionContext); + upperPendingActions = lcons(pendingActions, upperPendingActions); + + Assert(list_length(upperPendingActions) == + GetCurrentTransactionNestLevel() - 1); + + pendingActions = NIL; + upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); Assert(list_length(upperPendingNotifies) == @@ -659,13 +715,25 @@ AtSubStart_Notify(void) /* * AtSubCommit_Notify() --- Take care of subtransaction commit. * - * Reassign all items in the pending notifies list to the parent transaction. + * Reassign all items in the pending lists to the parent transaction. */ void AtSubCommit_Notify(void) { + List *parentPendingActions; List *parentPendingNotifies; + parentPendingActions = (List *) linitial(upperPendingActions); + upperPendingActions = list_delete_first(upperPendingActions); + + Assert(list_length(upperPendingActions) == + GetCurrentTransactionNestLevel() - 2); + + /* + * Mustn't try to eliminate duplicates here --- see queue_listen() + */ + pendingActions = list_concat(parentPendingActions, pendingActions); + parentPendingNotifies = (List *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); @@ -687,7 +755,7 @@ AtSubAbort_Notify(void) int my_level = GetCurrentTransactionNestLevel(); /* - * All we have to do is pop the stack --- the notifies made in this + * All we have to do is pop the stack --- the actions/notifies made in this * subxact are no longer interesting, and the space will be freed when * CurTransactionContext is recycled. * @@ -696,6 +764,12 @@ AtSubAbort_Notify(void) * GetCurrentTransactionNestLevel as the indicator of how far we need to * prune the list. */ + while (list_length(upperPendingActions) > my_level - 2) + { + pendingActions = (List *) linitial(upperPendingActions); + upperPendingActions = list_delete_first(upperPendingActions); + } + while (list_length(upperPendingNotifies) > my_level - 2) { pendingNotifies = (List *) linitial(upperPendingNotifies); @@ -704,7 +778,6 @@ AtSubAbort_Notify(void) } /* - *-------------------------------------------------------------- * NotifyInterruptHandler * * This is the signal handler for SIGUSR2. @@ -712,13 +785,6 @@ AtSubAbort_Notify(void) * If we are idle (notifyInterruptEnabled is set), we can safely invoke * ProcessIncomingNotify directly. Otherwise, just set a flag * to do it later. - * - * Results: - * none - * - * Side effects: - * per above - *-------------------------------------------------------------- */ void NotifyInterruptHandler(SIGNAL_ARGS) @@ -794,7 +860,6 @@ NotifyInterruptHandler(SIGNAL_ARGS) } /* - * -------------------------------------------------------------- * EnableNotifyInterrupt * * This is called by the PostgresMain main loop just before waiting @@ -804,7 +869,6 @@ NotifyInterruptHandler(SIGNAL_ARGS) * * NOTE: the signal handler starts out disabled, and stays so until * PostgresMain calls this the first time. - * -------------------------------------------------------------- */ void EnableNotifyInterrupt(void) @@ -853,7 +917,6 @@ EnableNotifyInterrupt(void) } /* - * -------------------------------------------------------------- * DisableNotifyInterrupt * * This is called by the PostgresMain main loop just after receiving @@ -863,7 +926,6 @@ EnableNotifyInterrupt(void) * The SIGUSR1 signal handler also needs to call this, so as to * prevent conflicts if one signal interrupts the other. So we * must return the previous state of the flag. - * -------------------------------------------------------------- */ bool DisableNotifyInterrupt(void) @@ -876,7 +938,6 @@ DisableNotifyInterrupt(void) } /* - * -------------------------------------------------------------- * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. @@ -886,7 +947,6 @@ DisableNotifyInterrupt(void) * and clear the notification field in pg_listener until next time. * * NOTE: since we are outside any transaction, we must create our own. - * -------------------------------------------------------------- */ static void ProcessIncomingNotify(void) @@ -949,9 +1009,6 @@ ProcessIncomingNotify(void) /* * Rewrite the tuple with 0 in notification column. - * - * simple_heap_update is safe here because no one else would have - * tried to UNLISTEN us, so there can be no uncommitted changes. */ rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl); simple_heap_update(lRel, &lTuple->t_self, rTuple); @@ -1035,17 +1092,18 @@ AsyncExistsPendingNotify(const char *relname) return false; } -/* Clear the pendingNotifies list. */ +/* Clear the pendingActions and pendingNotifies lists. */ static void -ClearPendingNotifies(void) +ClearPendingActionsAndNotifies(void) { /* * We used to have to explicitly deallocate the list members and nodes, * because they were malloc'd. Now, since we know they are palloc'd in * CurTransactionContext, we need not do that --- they'll go away * automatically at transaction exit. We need only reset the list head - * pointer. + * pointers. */ + pendingActions = NIL; pendingNotifies = NIL; }