diff --git a/doc/src/sgml/ref/prepare_transaction.sgml b/doc/src/sgml/ref/prepare_transaction.sgml index 0c8293f0ec..47cb1bd0dd 100644 --- a/doc/src/sgml/ref/prepare_transaction.sgml +++ b/doc/src/sgml/ref/prepare_transaction.sgml @@ -1,5 +1,5 @@ @@ -88,8 +88,10 @@ PREPARE TRANSACTION transaction_id It is not currently allowed to PREPARE a transaction that - has executed any operations involving temporary tables or - created any cursors WITH HOLD. Those features are too tightly + has executed any operations involving temporary tables, + created any cursors WITH HOLD, or executed + LISTEN or UNLISTEN. + Those features are too tightly tied to the current session to be useful in a transaction to be prepared. diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index d49bef06ac..450fd5a87c 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.134 2006/07/14 14:52:18 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.134.2.1 2008/03/12 20:12:01 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -52,6 +52,16 @@ * transaction, since by assumption it is only called from outside any * transaction. * + * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list + * of pending actions. If we reach transaction commit, the changes are + * applied to pg_listener just before executing any pending NOTIFYs. This + * method is necessary because to avoid race conditions, we must hold lock + * on pg_listener from when we insert a new listener tuple until we commit. + * To do that and not create undue hazard of deadlock, we don't want to + * touch pg_listener until we are otherwise done with the transaction; + * in particular it'd be uncool to still be taking user-commanded locks + * while holding the pg_listener lock. + * * Although we grab ExclusiveLock on pg_listener for any operation, * the lock is never held very long, so it shouldn't cause too much of * a performance problem. (Previously we used AccessExclusiveLock, but @@ -75,7 +85,6 @@ #include #include -#include #include "access/heapam.h" #include "access/twophase_rmgr.h" @@ -88,11 +97,39 @@ #include "storage/ipc.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/ps_status.h" +/* + * State for pending LISTEN/UNLISTEN actions consists of an ordered list of + * all actions requested in the current transaction. As explained above, + * we don't actually modify pg_listener until we reach transaction commit. + * + * The list is kept in CurTransactionContext. In subtransactions, each + * subtransaction has its own list in its own CurTransactionContext, but + * successful subtransactions attach their lists to their parent's list. + * Failed subtransactions simply discard their lists. + */ +typedef enum +{ + LISTEN_LISTEN, + LISTEN_UNLISTEN, + LISTEN_UNLISTEN_ALL +} ListenActionKind; + +typedef struct +{ + ListenActionKind action; + char condname[1]; /* actually, as long as needed */ +} ListenAction; + +static List *pendingActions = NIL; /* list of ListenAction */ + +static List *upperPendingActions = NIL; /* list of upper-xact lists */ + /* * State for outbound notifies consists of a list of all relnames NOTIFYed * in the current transaction. We do not actually perform a NOTIFY until @@ -103,8 +140,13 @@ * subtransaction has its own list in its own CurTransactionContext, but * successful subtransactions attach their lists to their parent's list. * Failed subtransactions simply discard their lists. + * + * Note: the action and notify lists do not interact within a transaction. + * In particular, if a transaction does NOTIFY and then LISTEN on the same + * condition name, it will get a self-notify at commit. This is a bit odd + * but is consistent with our historical behavior. */ -static List *pendingNotifies = NIL; +static List *pendingNotifies = NIL; /* list of C strings */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ @@ -118,8 +160,8 @@ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ * does not grok "volatile", you'd be best advised to compile this file * with all optimization turned off. */ -static volatile int notifyInterruptEnabled = 0; -static volatile int notifyInterruptOccurred = 0; +static volatile sig_atomic_t notifyInterruptEnabled = 0; +static volatile sig_atomic_t notifyInterruptOccurred = 0; /* True if we've registered an on_shmem_exit cleanup */ static bool unlistenExitRegistered = false; @@ -127,16 +169,20 @@ static bool unlistenExitRegistered = false; bool Trace_notify = false; +static void queue_listen(ListenActionKind action, const char *condname); static void Async_UnlistenAll(void); static void Async_UnlistenOnExit(int code, Datum arg); +static void Exec_Listen(Relation lRel, const char *relname); +static void Exec_Unlisten(Relation lRel, const char *relname); +static void Exec_UnlistenAll(Relation lRel); +static void Send_Notify(Relation lRel); static void ProcessIncomingNotify(void); static void NotifyMyFrontEnd(char *relname, int32 listenerPID); static bool AsyncExistsPendingNotify(const char *relname); -static void ClearPendingNotifies(void); +static void ClearPendingActionsAndNotifies(void); /* - *-------------------------------------------------------------- * Async_Notify * * This is executed by the SQL notify command. @@ -144,8 +190,6 @@ static void ClearPendingNotifies(void); * Adds the relation to the list of pending notifies. * Actual notification happens during transaction commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - * - *-------------------------------------------------------------- */ void Async_Notify(const char *relname) @@ -164,6 +208,12 @@ Async_Notify(const char *relname) oldcontext = MemoryContextSwitchTo(CurTransactionContext); + /* + * Ordering of the list isn't important. We choose to put new + * entries on the front, as this might make duplicate-elimination + * a tad faster when the same condition is signaled many times in + * a row. + */ pendingNotifies = lcons(pstrdup(relname), pendingNotifies); MemoryContextSwitchTo(oldcontext); @@ -171,34 +221,245 @@ Async_Notify(const char *relname) } /* - *-------------------------------------------------------------- + * queue_listen + * Common code for listen, unlisten, unlisten all commands. + * + * Adds the request to the list of pending actions. + * Actual update of pg_listener happens during transaction commit. + * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + */ +static void +queue_listen(ListenActionKind action, const char *condname) +{ + MemoryContext oldcontext; + ListenAction *actrec; + + /* + * Unlike Async_Notify, we don't try to collapse out duplicates. + * It would be too complicated to ensure we get the right interactions + * of conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that + * there would be any performance benefit anyway in sane applications. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* space for terminating null is included in sizeof(ListenAction) */ + actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname)); + actrec->action = action; + strcpy(actrec->condname, condname); + + pendingActions = lappend(pendingActions, actrec); + + MemoryContextSwitchTo(oldcontext); +} + +/* * Async_Listen * * This is executed by the SQL listen command. - * - * Register the current backend as listening on the specified - * relation. - * - * Side effects: - * pg_listener is updated. - * - *-------------------------------------------------------------- */ void Async_Listen(const char *relname) +{ + if (Trace_notify) + elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); + + queue_listen(LISTEN_LISTEN, relname); +} + +/* + * Async_Unlisten + * + * This is executed by the SQL unlisten command. + */ +void +Async_Unlisten(const char *relname) +{ + /* Handle specially the `unlisten "*"' command */ + if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) + { + Async_UnlistenAll(); + } + else + { + if (Trace_notify) + elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); + + queue_listen(LISTEN_UNLISTEN, relname); + } +} + +/* + * Async_UnlistenAll + * + * This is invoked by UNLISTEN "*" command, and also at backend exit. + */ +static void +Async_UnlistenAll(void) +{ + if (Trace_notify) + elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); + + queue_listen(LISTEN_UNLISTEN_ALL, ""); +} + +/* + * Async_UnlistenOnExit + * + * Clean up the pg_listener table at backend exit. + * + * This is executed if we have done any LISTENs in this backend. + * It might not be necessary anymore, if the user UNLISTENed everything, + * but we don't try to detect that case. + */ +static void +Async_UnlistenOnExit(int code, Datum arg) +{ + /* + * We need to start/commit a transaction for the unlisten, but if there is + * already an active transaction we had better abort that one first. + * Otherwise we'd end up committing changes that probably ought to be + * discarded. + */ + AbortOutOfAnyTransaction(); + /* Now we can do the unlisten */ + StartTransactionCommand(); + Async_UnlistenAll(); + CommitTransactionCommand(); +} + +/* + * AtPrepare_Notify + * + * This is called at the prepare phase of a two-phase + * transaction. Save the state for possible commit later. + */ +void +AtPrepare_Notify(void) +{ + ListCell *p; + + /* It's not sensible to have any pending LISTEN/UNLISTEN actions */ + if (pendingActions) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN"))); + + /* We can deal with pending NOTIFY though */ + foreach(p, pendingNotifies) + { + const char *relname = (const char *) lfirst(p); + + RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, + relname, strlen(relname) + 1); + } + + /* + * We can clear the state immediately, rather than needing a separate + * PostPrepare call, because if the transaction fails we'd just discard + * the state anyway. + */ + ClearPendingActionsAndNotifies(); +} + +/* + * AtCommit_Notify + * + * This is called at transaction commit. + * + * If there are pending LISTEN/UNLISTEN actions, insert or delete + * tuples in pg_listener accordingly. + * + * If there are outbound notify requests in the pendingNotifies list, + * scan pg_listener for matching tuples, and either signal the other + * backend or send a message to our own frontend. + * + * NOTE: we are still inside the current transaction, therefore can + * piggyback on its committing of changes. + */ +void +AtCommit_Notify(void) { Relation lRel; + ListCell *p; + + if (pendingActions == NIL && pendingNotifies == NIL) + return; /* no relevant statements in this xact */ + + /* + * NOTIFY is disabled if not normal processing mode. This test used to be + * in xact.c, but it seems cleaner to do it here. + */ + if (!IsNormalProcessingMode()) + { + ClearPendingActionsAndNotifies(); + return; + } + + if (Trace_notify) + elog(DEBUG1, "AtCommit_Notify"); + + /* Acquire ExclusiveLock on pg_listener */ + lRel = heap_open(ListenerRelationId, ExclusiveLock); + + /* Perform any pending listen/unlisten actions */ + foreach(p, pendingActions) + { + ListenAction *actrec = (ListenAction *) lfirst(p); + + switch (actrec->action) + { + case LISTEN_LISTEN: + Exec_Listen(lRel, actrec->condname); + break; + case LISTEN_UNLISTEN: + Exec_Unlisten(lRel, actrec->condname); + break; + case LISTEN_UNLISTEN_ALL: + Exec_UnlistenAll(lRel); + break; + } + + /* We must CCI after each action in case of conflicting actions */ + CommandCounterIncrement(); + } + + /* Perform any pending notifies */ + if (pendingNotifies) + Send_Notify(lRel); + + /* + * We do NOT release the lock on pg_listener here; we need to hold it + * until end of transaction (which is about to happen, anyway) to ensure + * that notified backends see our tuple updates when they look. Else they + * might disregard the signal, which would make the application programmer + * very unhappy. Also, this prevents race conditions when we have just + * inserted a listening tuple. + */ + heap_close(lRel, NoLock); + + ClearPendingActionsAndNotifies(); + + if (Trace_notify) + elog(DEBUG1, "AtCommit_Notify: done"); +} + +/* + * Exec_Listen --- subroutine for AtCommit_Notify + * + * Register the current backend as listening on the specified relation. + */ +static void +Exec_Listen(Relation lRel, const char *relname) +{ HeapScanDesc scan; HeapTuple tuple; Datum values[Natts_pg_listener]; char nulls[Natts_pg_listener]; - int i; + NameData condname; bool alreadyListener = false; if (Trace_notify) - elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); + elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid); /* Detect whether we are already listening on this relname */ scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); @@ -217,27 +478,20 @@ Async_Listen(const char *relname) heap_endscan(scan); if (alreadyListener) - { - heap_close(lRel, ExclusiveLock); return; - } /* * OK to insert a new tuple */ + memset(nulls, ' ', sizeof(nulls)); - for (i = 0; i < Natts_pg_listener; i++) - { - nulls[i] = ' '; - values[i] = PointerGetDatum(NULL); - } - - i = 0; - values[i++] = (Datum) relname; - values[i++] = (Datum) MyProcPid; - values[i++] = (Datum) 0; /* no notifies pending */ + namestrcpy(&condname, relname); + values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname); + values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid); + values[Anum_pg_listener_notify - 1] = Int32GetDatum(0); /* no notifies pending */ tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls); + simple_heap_insert(lRel, tuple); #ifdef NOT_USED /* currently there are no indexes */ @@ -246,8 +500,6 @@ Async_Listen(const char *relname) heap_freetuple(tuple); - heap_close(lRel, ExclusiveLock); - /* * now that we are listening, make sure we will unlisten before dying. */ @@ -259,37 +511,19 @@ Async_Listen(const char *relname) } /* - *-------------------------------------------------------------- - * Async_Unlisten - * - * This is executed by the SQL unlisten command. + * Exec_Unlisten --- subroutine for AtCommit_Notify * * Remove the current backend from the list of listening backends * for the specified relation. - * - * Side effects: - * pg_listener is updated. - * - *-------------------------------------------------------------- */ -void -Async_Unlisten(const char *relname) +static void +Exec_Unlisten(Relation lRel, const char *relname) { - Relation lRel; HeapScanDesc scan; HeapTuple tuple; - /* Handle specially the `unlisten "*"' command */ - if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0)) - { - Async_UnlistenAll(); - return; - } - if (Trace_notify) - elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); + elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) @@ -311,8 +545,6 @@ Async_Unlisten(const char *relname) } heap_endscan(scan); - heap_close(lRel, ExclusiveLock); - /* * We do not complain about unlistening something not being listened; * should we? @@ -320,35 +552,19 @@ Async_Unlisten(const char *relname) } /* - *-------------------------------------------------------------- - * Async_UnlistenAll + * Exec_UnlistenAll --- subroutine for AtCommit_Notify * - * Unlisten all relations for this backend. - * - * This is invoked by UNLISTEN "*" command, and also at backend exit. - * - * Results: - * XXX - * - * Side effects: - * pg_listener is updated. - * - *-------------------------------------------------------------- + * Update pg_listener to unlisten all relations for this backend. */ static void -Async_UnlistenAll(void) +Exec_UnlistenAll(Relation lRel) { - Relation lRel; - TupleDesc tdesc; HeapScanDesc scan; HeapTuple lTuple; ScanKeyData key[1]; if (Trace_notify) - elog(DEBUG1, "Async_UnlistenAll"); - - lRel = heap_open(ListenerRelationId, ExclusiveLock); - tdesc = RelationGetDescr(lRel); + elog(DEBUG1, "Exec_UnlistenAll"); /* Find and delete all entries with my listenerPID */ ScanKeyInit(&key[0], @@ -361,100 +577,18 @@ Async_UnlistenAll(void) simple_heap_delete(lRel, &lTuple->t_self); heap_endscan(scan); - heap_close(lRel, ExclusiveLock); } /* - *-------------------------------------------------------------- - * Async_UnlistenOnExit + * Send_Notify --- subroutine for AtCommit_Notify * - * Clean up the pg_listener table at backend exit. - * - * This is executed if we have done any LISTENs in this backend. - * It might not be necessary anymore, if the user UNLISTENed everything, - * but we don't try to detect that case. - * - * Results: - * XXX - * - * Side effects: - * pg_listener is updated if necessary. - * - *-------------------------------------------------------------- + * Scan pg_listener for tuples matching our pending notifies, and + * either signal the other backend or send a message to our own frontend. */ static void -Async_UnlistenOnExit(int code, Datum arg) +Send_Notify(Relation lRel) { - /* - * We need to start/commit a transaction for the unlisten, but if there is - * already an active transaction we had better abort that one first. - * Otherwise we'd end up committing changes that probably ought to be - * discarded. - */ - AbortOutOfAnyTransaction(); - /* Now we can do the unlisten */ - StartTransactionCommand(); - Async_UnlistenAll(); - CommitTransactionCommand(); -} - - -/* - *-------------------------------------------------------------- - * AtPrepare_Notify - * - * This is called at the prepare phase of a two-phase - * transaction. Save the state for possible commit later. - *-------------------------------------------------------------- - */ -void -AtPrepare_Notify(void) -{ - ListCell *p; - - foreach(p, pendingNotifies) - { - const char *relname = (const char *) lfirst(p); - - RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0, - relname, strlen(relname) + 1); - } - - /* - * We can clear the state immediately, rather than needing a separate - * PostPrepare call, because if the transaction fails we'd just discard - * the state anyway. - */ - ClearPendingNotifies(); -} - -/* - *-------------------------------------------------------------- - * AtCommit_Notify - * - * This is called at transaction commit. - * - * If there are outbound notify requests in the pendingNotifies list, - * scan pg_listener for matching tuples, and either signal the other - * backend or send a message to our own frontend. - * - * NOTE: we are still inside the current transaction, therefore can - * piggyback on its committing of changes. - * - * Results: - * XXX - * - * Side effects: - * Tuples in pg_listener that have matching relnames and other peoples' - * listenerPIDs are updated with a nonzero notification field. - * - *-------------------------------------------------------------- - */ -void -AtCommit_Notify(void) -{ - Relation lRel; - TupleDesc tdesc; + TupleDesc tdesc = RelationGetDescr(lRel); HeapScanDesc scan; HeapTuple lTuple, rTuple; @@ -462,22 +596,6 @@ AtCommit_Notify(void) char repl[Natts_pg_listener], nulls[Natts_pg_listener]; - if (pendingNotifies == NIL) - return; /* no NOTIFY statements in this transaction */ - - /* - * NOTIFY is disabled if not normal processing mode. This test used to be - * in xact.c, but it seems cleaner to do it here. - */ - if (!IsNormalProcessingMode()) - { - ClearPendingNotifies(); - return; - } - - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify"); - /* preset data to update notify column to MyProcPid */ nulls[0] = nulls[1] = nulls[2] = ' '; repl[0] = repl[1] = repl[2] = ' '; @@ -485,8 +603,6 @@ AtCommit_Notify(void) value[0] = value[1] = value[2] = (Datum) 0; value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid); - lRel = heap_open(ListenerRelationId, ExclusiveLock); - tdesc = RelationGetDescr(lRel); scan = heap_beginscan(lRel, SnapshotNow, 0, NULL); while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) @@ -506,7 +622,6 @@ AtCommit_Notify(void) * could lose an outside notify, which'd be bad for applications * that ignore self-notify messages. */ - if (Trace_notify) elog(DEBUG1, "AtCommit_Notify: notifying self"); @@ -539,98 +654,32 @@ AtCommit_Notify(void) } else if (listener->notification == 0) { - HTSU_Result result; - ItemPointerData update_ctid; - TransactionId update_xmax; + /* Rewrite the tuple with my PID in notification column */ + rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl); + simple_heap_update(lRel, &lTuple->t_self, rTuple); - rTuple = heap_modifytuple(lTuple, tdesc, - value, nulls, repl); - - /* - * We cannot use simple_heap_update here because the tuple - * could have been modified by an uncommitted transaction; - * specifically, since UNLISTEN releases exclusive lock on the - * table before commit, the other guy could already have tried - * to unlisten. There are no other cases where we should be - * able to see an uncommitted update or delete. Therefore, our - * response to a HeapTupleBeingUpdated result is just to - * ignore it. We do *not* wait for the other guy to commit - * --- that would risk deadlock, and we don't want to block - * while holding the table lock anyway for performance - * reasons. We also ignore HeapTupleUpdated, which could occur - * if the other guy commits between our heap_getnext and - * heap_update calls. - */ - result = heap_update(lRel, &lTuple->t_self, rTuple, - &update_ctid, &update_xmax, - GetCurrentCommandId(), InvalidSnapshot, - false /* no wait for commit */ ); - switch (result) - { - case HeapTupleSelfUpdated: - /* Tuple was already updated in current command? */ - elog(ERROR, "tuple already updated by self"); - break; - - case HeapTupleMayBeUpdated: - /* done successfully */ #ifdef NOT_USED /* currently there are no indexes */ - CatalogUpdateIndexes(lRel, rTuple); + CatalogUpdateIndexes(lRel, rTuple); #endif - break; - - case HeapTupleBeingUpdated: - /* ignore uncommitted tuples */ - break; - - case HeapTupleUpdated: - /* ignore just-committed tuples */ - break; - - default: - elog(ERROR, "unrecognized heap_update status: %u", - result); - break; - } } } } heap_endscan(scan); - - /* - * We do NOT release the lock on pg_listener here; we need to hold it - * until end of transaction (which is about to happen, anyway) to ensure - * that notified backends see our tuple updates when they look. Else they - * might disregard the signal, which would make the application programmer - * very unhappy. - */ - heap_close(lRel, NoLock); - - ClearPendingNotifies(); - - if (Trace_notify) - elog(DEBUG1, "AtCommit_Notify: done"); } /* - *-------------------------------------------------------------- * AtAbort_Notify * * This is called at transaction abort. * - * Gets rid of pending outbound notifies that we would have executed - * if the transaction got committed. - * - * Results: - * XXX - * - *-------------------------------------------------------------- + * Gets rid of pending actions and outbound notifies that we would have + * executed if the transaction got committed. */ void AtAbort_Notify(void) { - ClearPendingNotifies(); + ClearPendingActionsAndNotifies(); } /* @@ -646,6 +695,13 @@ AtSubStart_Notify(void) /* Keep the list-of-lists in TopTransactionContext for simplicity */ old_cxt = MemoryContextSwitchTo(TopTransactionContext); + upperPendingActions = lcons(pendingActions, upperPendingActions); + + Assert(list_length(upperPendingActions) == + GetCurrentTransactionNestLevel() - 1); + + pendingActions = NIL; + upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); Assert(list_length(upperPendingNotifies) == @@ -659,13 +715,25 @@ AtSubStart_Notify(void) /* * AtSubCommit_Notify() --- Take care of subtransaction commit. * - * Reassign all items in the pending notifies list to the parent transaction. + * Reassign all items in the pending lists to the parent transaction. */ void AtSubCommit_Notify(void) { + List *parentPendingActions; List *parentPendingNotifies; + parentPendingActions = (List *) linitial(upperPendingActions); + upperPendingActions = list_delete_first(upperPendingActions); + + Assert(list_length(upperPendingActions) == + GetCurrentTransactionNestLevel() - 2); + + /* + * Mustn't try to eliminate duplicates here --- see queue_listen() + */ + pendingActions = list_concat(parentPendingActions, pendingActions); + parentPendingNotifies = (List *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); @@ -687,7 +755,7 @@ AtSubAbort_Notify(void) int my_level = GetCurrentTransactionNestLevel(); /* - * All we have to do is pop the stack --- the notifies made in this + * All we have to do is pop the stack --- the actions/notifies made in this * subxact are no longer interesting, and the space will be freed when * CurTransactionContext is recycled. * @@ -696,6 +764,12 @@ AtSubAbort_Notify(void) * GetCurrentTransactionNestLevel as the indicator of how far we need to * prune the list. */ + while (list_length(upperPendingActions) > my_level - 2) + { + pendingActions = (List *) linitial(upperPendingActions); + upperPendingActions = list_delete_first(upperPendingActions); + } + while (list_length(upperPendingNotifies) > my_level - 2) { pendingNotifies = (List *) linitial(upperPendingNotifies); @@ -704,7 +778,6 @@ AtSubAbort_Notify(void) } /* - *-------------------------------------------------------------- * NotifyInterruptHandler * * This is the signal handler for SIGUSR2. @@ -712,13 +785,6 @@ AtSubAbort_Notify(void) * If we are idle (notifyInterruptEnabled is set), we can safely invoke * ProcessIncomingNotify directly. Otherwise, just set a flag * to do it later. - * - * Results: - * none - * - * Side effects: - * per above - *-------------------------------------------------------------- */ void NotifyInterruptHandler(SIGNAL_ARGS) @@ -794,7 +860,6 @@ NotifyInterruptHandler(SIGNAL_ARGS) } /* - * -------------------------------------------------------------- * EnableNotifyInterrupt * * This is called by the PostgresMain main loop just before waiting @@ -804,7 +869,6 @@ NotifyInterruptHandler(SIGNAL_ARGS) * * NOTE: the signal handler starts out disabled, and stays so until * PostgresMain calls this the first time. - * -------------------------------------------------------------- */ void EnableNotifyInterrupt(void) @@ -853,7 +917,6 @@ EnableNotifyInterrupt(void) } /* - * -------------------------------------------------------------- * DisableNotifyInterrupt * * This is called by the PostgresMain main loop just after receiving @@ -863,7 +926,6 @@ EnableNotifyInterrupt(void) * The SIGUSR1 signal handler also needs to call this, so as to * prevent conflicts if one signal interrupts the other. So we * must return the previous state of the flag. - * -------------------------------------------------------------- */ bool DisableNotifyInterrupt(void) @@ -876,7 +938,6 @@ DisableNotifyInterrupt(void) } /* - * -------------------------------------------------------------- * ProcessIncomingNotify * * Deal with arriving NOTIFYs from other backends. @@ -886,7 +947,6 @@ DisableNotifyInterrupt(void) * and clear the notification field in pg_listener until next time. * * NOTE: since we are outside any transaction, we must create our own. - * -------------------------------------------------------------- */ static void ProcessIncomingNotify(void) @@ -949,9 +1009,6 @@ ProcessIncomingNotify(void) /* * Rewrite the tuple with 0 in notification column. - * - * simple_heap_update is safe here because no one else would have - * tried to UNLISTEN us, so there can be no uncommitted changes. */ rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl); simple_heap_update(lRel, &lTuple->t_self, rTuple); @@ -1035,17 +1092,18 @@ AsyncExistsPendingNotify(const char *relname) return false; } -/* Clear the pendingNotifies list. */ +/* Clear the pendingActions and pendingNotifies lists. */ static void -ClearPendingNotifies(void) +ClearPendingActionsAndNotifies(void) { /* * We used to have to explicitly deallocate the list members and nodes, * because they were malloc'd. Now, since we know they are palloc'd in * CurTransactionContext, we need not do that --- they'll go away * automatically at transaction exit. We need only reset the list head - * pointer. + * pointers. */ + pendingActions = NIL; pendingNotifies = NIL; }