2012-01-20 12:15:15 +08:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* tcn.c
|
|
|
|
* triggered change notification support for PostgreSQL
|
|
|
|
*
|
2013-01-02 06:15:01 +08:00
|
|
|
* Portions Copyright (c) 2011-2013, PostgreSQL Global Development Group
|
2012-01-20 12:15:15 +08:00
|
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* contrib/tcn/tcn.c
|
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "postgres.h"
|
|
|
|
|
2012-08-31 04:15:44 +08:00
|
|
|
#include "access/htup_details.h"
|
2012-01-20 12:15:15 +08:00
|
|
|
#include "executor/spi.h"
|
|
|
|
#include "commands/async.h"
|
|
|
|
#include "commands/trigger.h"
|
|
|
|
#include "lib/stringinfo.h"
|
|
|
|
#include "utils/rel.h"
|
|
|
|
#include "utils/syscache.h"
|
|
|
|
|
|
|
|
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
|
|
|
|
|
|
|
|
/* forward declarations: prototype for the V1 trigger function defined later in this file */
|
|
|
|
Datum triggered_change_notification(PG_FUNCTION_ARGS);
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Copy from s (for source) to r (for result), wrapping with q (quote)
|
|
|
|
* characters and doubling any quote characters found.
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
strcpy_quoted(StringInfo r, const char *s, const char q)
|
|
|
|
{
|
|
|
|
appendStringInfoCharMacro(r, q);
|
|
|
|
while (*s)
|
|
|
|
{
|
|
|
|
if (*s == q)
|
|
|
|
appendStringInfoCharMacro(r, q);
|
|
|
|
appendStringInfoCharMacro(r, *s);
|
|
|
|
s++;
|
|
|
|
}
|
|
|
|
appendStringInfoCharMacro(r, q);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* triggered_change_notification
|
|
|
|
*
|
|
|
|
* This trigger function will send a notification of data modification with
|
|
|
|
* primary key values. The channel will be "tcn" unless the trigger is
|
|
|
|
* created with a parameter, in which case that parameter will be used.
|
|
|
|
*/
|
|
|
|
PG_FUNCTION_INFO_V1(triggered_change_notification);
|
|
|
|
|
|
|
|
Datum
|
|
|
|
triggered_change_notification(PG_FUNCTION_ARGS)
|
|
|
|
{
|
|
|
|
TriggerData *trigdata = (TriggerData *) fcinfo->context;
|
|
|
|
Trigger *trigger;
|
|
|
|
int nargs;
|
|
|
|
HeapTuple trigtuple;
|
|
|
|
Relation rel;
|
|
|
|
TupleDesc tupdesc;
|
|
|
|
char *channel;
|
|
|
|
char operation;
|
|
|
|
StringInfo payload = makeStringInfo();
|
|
|
|
bool foundPK;
|
|
|
|
|
|
|
|
List *indexoidlist;
|
|
|
|
ListCell *indexoidscan;
|
|
|
|
|
|
|
|
/* make sure it's called as a trigger */
|
|
|
|
if (!CALLED_AS_TRIGGER(fcinfo))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
|
|
|
errmsg("triggered_change_notification: must be called as trigger")));
|
|
|
|
|
|
|
|
/* and that it's called after the change */
|
|
|
|
if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
|
|
|
errmsg("triggered_change_notification: must be called after the change")));
|
|
|
|
|
|
|
|
/* and that it's called for each row */
|
|
|
|
if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
|
|
|
errmsg("triggered_change_notification: must be called for each row")));
|
|
|
|
|
|
|
|
if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
|
|
|
|
operation = 'I';
|
|
|
|
else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
|
|
|
|
operation = 'U';
|
|
|
|
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
|
|
|
|
operation = 'D';
|
|
|
|
else
|
|
|
|
{
|
|
|
|
elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
|
|
|
|
operation = 'X'; /* silence compiler warning */
|
|
|
|
}
|
|
|
|
|
|
|
|
trigger = trigdata->tg_trigger;
|
|
|
|
nargs = trigger->tgnargs;
|
|
|
|
if (nargs > 1)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
|
|
|
errmsg("triggered_change_notification: must not be called with more than one parameter")));
|
|
|
|
|
|
|
|
if (nargs == 0)
|
|
|
|
channel = "tcn";
|
|
|
|
else
|
|
|
|
channel = trigger->tgargs[0];
|
|
|
|
|
|
|
|
/* get tuple data */
|
|
|
|
trigtuple = trigdata->tg_trigtuple;
|
|
|
|
rel = trigdata->tg_relation;
|
|
|
|
tupdesc = rel->rd_att;
|
|
|
|
|
|
|
|
foundPK = false;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Get the list of index OIDs for the table from the relcache, and look up
|
|
|
|
* each one in the pg_index syscache until we find one marked primary key
|
|
|
|
* (hopefully there isn't more than one such).
|
|
|
|
*/
|
|
|
|
indexoidlist = RelationGetIndexList(rel);
|
|
|
|
|
|
|
|
foreach(indexoidscan, indexoidlist)
|
|
|
|
{
|
|
|
|
Oid indexoid = lfirst_oid(indexoidscan);
|
|
|
|
HeapTuple indexTuple;
|
|
|
|
Form_pg_index index;
|
|
|
|
|
|
|
|
indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
|
|
|
|
if (!HeapTupleIsValid(indexTuple)) /* should not happen */
|
|
|
|
elog(ERROR, "cache lookup failed for index %u", indexoid);
|
|
|
|
index = (Form_pg_index) GETSTRUCT(indexTuple);
|
Fix assorted bugs in CREATE/DROP INDEX CONCURRENTLY.
Commit 8cb53654dbdb4c386369eb988062d0bbb6de725e, which introduced DROP
INDEX CONCURRENTLY, managed to break CREATE INDEX CONCURRENTLY via a poor
choice of catalog state representation. The pg_index state for an index
that's reached the final pre-drop stage was the same as the state for an
index just created by CREATE INDEX CONCURRENTLY. This meant that the
(necessary) change to make RelationGetIndexList ignore about-to-die indexes
also made it ignore freshly-created indexes; which is catastrophic because
the latter do need to be considered in HOT-safety decisions. Failure to
do so leads to incorrect index entries and subsequently wrong results from
queries depending on the concurrently-created index.
To fix, add an additional boolean column "indislive" to pg_index, so that
the freshly-created and about-to-die states can be distinguished. (This
change obviously is only possible in HEAD. This patch will need to be
back-patched, but in 9.2 we'll use a kluge consisting of overloading the
formerly-impossible state of indisvalid = true and indisready = false.)
In addition, change CREATE/DROP INDEX CONCURRENTLY so that the pg_index
flag changes they make without exclusive lock on the index are made via
heap_inplace_update() rather than a normal transactional update. The
latter is not very safe because moving the pg_index tuple could result in
concurrent SnapshotNow scans finding it twice or not at all, thus possibly
resulting in index corruption. This is a pre-existing bug in CREATE INDEX
CONCURRENTLY, which was copied into the DROP code.
In addition, fix various places in the code that ought to check to make
sure that the indexes they are manipulating are valid and/or ready as
appropriate. These represent bugs that have existed since 8.2, since
a failed CREATE INDEX CONCURRENTLY could leave a corrupt or invalid
index behind, and we ought not try to do anything that might fail with
such an index.
Also fix RelationReloadIndexInfo to ensure it copies all the pg_index
columns that are allowed to change after initial creation. Previously we
could have been left with stale values of some fields in an index relcache
entry. It's not clear whether this actually had any user-visible
consequences, but it's at least a bug waiting to happen.
In addition, do some code and docs review for DROP INDEX CONCURRENTLY;
some cosmetic code cleanup but mostly addition and revision of comments.
This will need to be back-patched, but in a noticeably different form,
so I'm committing it to HEAD before working on the back-patch.
Problem reported by Amit Kapila, diagnosis by Pavan Deolassee,
fix by Tom Lane and Andres Freund.
2012-11-29 10:25:27 +08:00
|
|
|
/* we're only interested if it is the primary key and valid */
|
|
|
|
if (index->indisprimary && IndexIsValid(index))
|
2012-01-20 12:15:15 +08:00
|
|
|
{
|
|
|
|
int numatts = index->indnatts;
|
|
|
|
|
|
|
|
if (numatts > 0)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
|
|
|
|
foundPK = true;
|
|
|
|
|
|
|
|
strcpy_quoted(payload, RelationGetRelationName(rel), '"');
|
|
|
|
appendStringInfoCharMacro(payload, ',');
|
|
|
|
appendStringInfoCharMacro(payload, operation);
|
|
|
|
|
|
|
|
for (i = 0; i < numatts; i++)
|
|
|
|
{
|
|
|
|
int colno = index->indkey.values[i];
|
|
|
|
|
|
|
|
appendStringInfoCharMacro(payload, ',');
|
|
|
|
strcpy_quoted(payload, NameStr((tupdesc->attrs[colno - 1])->attname), '"');
|
|
|
|
appendStringInfoCharMacro(payload, '=');
|
|
|
|
strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
|
|
|
|
}
|
|
|
|
|
|
|
|
Async_Notify(channel, payload->data);
|
|
|
|
}
|
|
|
|
ReleaseSysCache(indexTuple);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
ReleaseSysCache(indexTuple);
|
|
|
|
}
|
|
|
|
|
|
|
|
list_free(indexoidlist);
|
|
|
|
|
|
|
|
if (!foundPK)
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
|
|
|
errmsg("triggered_change_notification: must be called on a table with a primary key")));
|
|
|
|
|
|
|
|
return PointerGetDatum(NULL); /* after trigger; value doesn't matter */
|
|
|
|
}
|