Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the affected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Geoghegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Michael
Paquier, Simon Riggs, Craig Ringer, and Steve Singer.
2014-03-04 05:32:18 +08:00
|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* test_decoding.c
|
|
|
|
* example logical decoding output plugin
|
|
|
|
*
|
|
|
|
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
|
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* contrib/test_decoding/test_decoding.c
|
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
|
|
|
|
#include "access/sysattr.h"
|
|
|
|
|
|
|
|
#include "catalog/pg_class.h"
|
|
|
|
#include "catalog/pg_type.h"
|
|
|
|
|
|
|
|
#include "nodes/parsenodes.h"
|
|
|
|
|
|
|
|
#include "replication/output_plugin.h"
|
|
|
|
#include "replication/logical.h"
|
|
|
|
|
|
|
|
#include "utils/builtins.h"
|
|
|
|
#include "utils/lsyscache.h"
|
|
|
|
#include "utils/memutils.h"
|
|
|
|
#include "utils/rel.h"
|
|
|
|
#include "utils/relcache.h"
|
|
|
|
#include "utils/syscache.h"
|
|
|
|
#include "utils/typcache.h"
|
|
|
|
|
|
|
|
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
|
2014-03-13 02:03:09 +08:00
|
|
|
/* These must be available to pg_dlsym() */
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the affected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Geoghegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Michael
Paquier, Simon Riggs, Craig Ringer, and Steve Singer.
2014-03-04 05:32:18 +08:00
|
|
|
extern void _PG_init(void);
|
|
|
|
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
|
|
|
|
|
|
|
|
typedef struct
|
|
|
|
{
|
|
|
|
MemoryContext context;
|
|
|
|
bool include_xids;
|
|
|
|
bool include_timestamp;
|
|
|
|
} TestDecodingData;
|
|
|
|
|
|
|
|
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
|
|
|
bool is_init);
|
|
|
|
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
|
|
|
|
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
|
|
|
ReorderBufferTXN *txn);
|
|
|
|
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
|
|
|
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
|
|
|
|
static void pg_decode_change(LogicalDecodingContext *ctx,
|
|
|
|
ReorderBufferTXN *txn, Relation rel,
|
|
|
|
ReorderBufferChange *change);
|
|
|
|
|
|
|
|
/*
 * Module load hook.
 *
 * This example plugin needs no setup, so the body is deliberately empty;
 * other plugins can perform their initialization here.
 */
void
_PG_init(void)
{
}
|
|
|
|
|
|
|
|
/* specify output plugin callbacks */
|
|
|
|
void
|
|
|
|
_PG_output_plugin_init(OutputPluginCallbacks *cb)
|
|
|
|
{
|
|
|
|
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
|
|
|
|
|
|
|
|
cb->startup_cb = pg_decode_startup;
|
|
|
|
cb->begin_cb = pg_decode_begin_txn;
|
|
|
|
cb->change_cb = pg_decode_change;
|
|
|
|
cb->commit_cb = pg_decode_commit_txn;
|
|
|
|
cb->shutdown_cb = pg_decode_shutdown;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/* initialize this plugin */
|
|
|
|
static void
|
|
|
|
pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
|
|
|
bool is_init)
|
|
|
|
{
|
|
|
|
ListCell *option;
|
|
|
|
TestDecodingData *data;
|
|
|
|
|
|
|
|
data = palloc(sizeof(TestDecodingData));
|
|
|
|
data->context = AllocSetContextCreate(ctx->context,
|
|
|
|
"text conversion context",
|
|
|
|
ALLOCSET_DEFAULT_MINSIZE,
|
|
|
|
ALLOCSET_DEFAULT_INITSIZE,
|
|
|
|
ALLOCSET_DEFAULT_MAXSIZE);
|
|
|
|
data->include_xids = true;
|
|
|
|
data->include_timestamp = false;
|
|
|
|
|
|
|
|
ctx->output_plugin_private = data;
|
|
|
|
|
|
|
|
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
|
|
|
|
|
|
|
|
foreach(option, ctx->output_plugin_options)
|
|
|
|
{
|
|
|
|
DefElem *elem = lfirst(option);
|
|
|
|
|
|
|
|
Assert(elem->arg == NULL || IsA(elem->arg, String));
|
|
|
|
|
|
|
|
if (strcmp(elem->defname, "include-xids") == 0)
|
|
|
|
{
|
|
|
|
/* if option does not provide a value, it means its value is true */
|
|
|
|
if (elem->arg == NULL)
|
|
|
|
data->include_xids = true;
|
|
|
|
else if (!parse_bool(strVal(elem->arg), &data->include_xids))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
|
|
strVal(elem->arg), elem->defname)));
|
|
|
|
}
|
|
|
|
else if (strcmp(elem->defname, "include-timestamp") == 0)
|
|
|
|
{
|
|
|
|
if (elem->arg == NULL)
|
|
|
|
data->include_timestamp = true;
|
|
|
|
else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
|
|
strVal(elem->arg), elem->defname)));
|
|
|
|
}
|
|
|
|
else if (strcmp(elem->defname, "force-binary") == 0)
|
|
|
|
{
|
|
|
|
bool force_binary;
|
|
|
|
|
|
|
|
if (elem->arg == NULL)
|
|
|
|
continue;
|
|
|
|
else if (!parse_bool(strVal(elem->arg), &force_binary))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
|
|
strVal(elem->arg), elem->defname)));
|
|
|
|
|
|
|
|
if (force_binary)
|
|
|
|
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
errmsg("option \"%s\" = \"%s\" is unknown",
|
|
|
|
elem->defname,
|
|
|
|
elem->arg ? strVal(elem->arg) : "(null)")));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* cleanup this plugin's resources */
|
|
|
|
static void
|
|
|
|
pg_decode_shutdown(LogicalDecodingContext *ctx)
|
|
|
|
{
|
|
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
|
|
|
|
/* cleanup our own resources via memory context reset */
|
|
|
|
MemoryContextDelete(data->context);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* BEGIN callback */
|
|
|
|
static void
|
|
|
|
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
|
|
|
{
|
|
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
|
|
if (data->include_xids)
|
|
|
|
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
|
|
|
|
else
|
|
|
|
appendStringInfoString(ctx->out, "BEGIN");
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* COMMIT callback */
|
|
|
|
static void
|
|
|
|
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
|
|
XLogRecPtr commit_lsn)
|
|
|
|
{
|
|
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
|
|
if (data->include_xids)
|
|
|
|
appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
|
|
|
|
else
|
|
|
|
appendStringInfoString(ctx->out, "COMMIT");
|
|
|
|
|
|
|
|
if (data->include_timestamp)
|
|
|
|
appendStringInfo(ctx->out, " (at %s)",
|
|
|
|
timestamptz_to_str(txn->commit_time));
|
|
|
|
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Print literal `outputstr' already represented as string of type `typid'
|
|
|
|
* into stringbuf `s'.
|
|
|
|
*
|
|
|
|
* Some builtin types aren't quoted, the rest is quoted. Escaping is done as
|
|
|
|
* if standard_conforming_strings were enabled.
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
print_literal(StringInfo s, Oid typid, char *outputstr)
|
|
|
|
{
|
|
|
|
const char *valptr;
|
|
|
|
|
|
|
|
switch (typid)
|
|
|
|
{
|
|
|
|
case INT2OID:
|
|
|
|
case INT4OID:
|
|
|
|
case INT8OID:
|
|
|
|
case OIDOID:
|
|
|
|
case FLOAT4OID:
|
|
|
|
case FLOAT8OID:
|
|
|
|
case NUMERICOID:
|
|
|
|
/* NB: We don't care about Inf, NaN et al. */
|
|
|
|
appendStringInfoString(s, outputstr);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case BITOID:
|
|
|
|
case VARBITOID:
|
|
|
|
appendStringInfo(s, "B'%s'", outputstr);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case BOOLOID:
|
|
|
|
if (strcmp(outputstr, "t") == 0)
|
|
|
|
appendStringInfoString(s, "true");
|
|
|
|
else
|
|
|
|
appendStringInfoString(s, "false");
|
|
|
|
break;
|
|
|
|
|
|
|
|
default:
|
|
|
|
appendStringInfoChar(s, '\'');
|
|
|
|
for (valptr = outputstr; *valptr; valptr++)
|
|
|
|
{
|
|
|
|
char ch = *valptr;
|
|
|
|
|
|
|
|
if (SQL_STR_DOUBLE(ch, false))
|
|
|
|
appendStringInfoChar(s, ch);
|
|
|
|
appendStringInfoChar(s, ch);
|
|
|
|
}
|
|
|
|
appendStringInfoChar(s, '\'');
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* print the tuple 'tuple' into the StringInfo s */
|
|
|
|
static void
|
|
|
|
tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
|
|
|
|
{
|
|
|
|
int natt;
|
|
|
|
Oid oid;
|
|
|
|
|
|
|
|
/* print oid of tuple, it's not included in the TupleDesc */
|
|
|
|
if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
|
|
|
|
{
|
|
|
|
appendStringInfo(s, " oid[oid]:%u", oid);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* print all columns individually */
|
|
|
|
for (natt = 0; natt < tupdesc->natts; natt++)
|
|
|
|
{
|
|
|
|
Form_pg_attribute attr; /* the attribute itself */
|
|
|
|
Oid typid; /* type of current attribute */
|
|
|
|
Oid typoutput; /* output function */
|
|
|
|
bool typisvarlena;
|
|
|
|
Datum origval; /* possibly toasted Datum */
|
|
|
|
bool isnull; /* column is null? */
|
|
|
|
|
|
|
|
attr = tupdesc->attrs[natt];
|
|
|
|
|
|
|
|
/*
|
|
|
|
* don't print dropped columns, we can't be sure everything is
|
|
|
|
* available for them
|
|
|
|
*/
|
|
|
|
if (attr->attisdropped)
|
|
|
|
continue;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Don't print system columns, oid will already have been printed if
|
|
|
|
* present.
|
|
|
|
*/
|
|
|
|
if (attr->attnum < 0)
|
|
|
|
continue;
|
|
|
|
|
|
|
|
typid = attr->atttypid;
|
|
|
|
|
|
|
|
/* get Datum from tuple */
|
|
|
|
origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull);
|
|
|
|
|
|
|
|
if (isnull && skip_nulls)
|
|
|
|
continue;
|
|
|
|
|
|
|
|
/* print attribute name */
|
|
|
|
appendStringInfoChar(s, ' ');
|
|
|
|
appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
|
|
|
|
|
|
|
|
/* print attribute type */
|
|
|
|
appendStringInfoChar(s, '[');
|
|
|
|
appendStringInfoString(s, format_type_be(typid));
|
|
|
|
appendStringInfoChar(s, ']');
|
|
|
|
|
|
|
|
/* query output function */
|
|
|
|
getTypeOutputInfo(typid,
|
|
|
|
&typoutput, &typisvarlena);
|
|
|
|
|
|
|
|
/* print separator */
|
|
|
|
appendStringInfoChar(s, ':');
|
|
|
|
|
|
|
|
/* print data */
|
|
|
|
if (isnull)
|
|
|
|
appendStringInfoString(s, "null");
|
|
|
|
else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
|
|
|
|
appendStringInfoString(s, "unchanged-toast-datum");
|
|
|
|
else if (!typisvarlena)
|
|
|
|
print_literal(s, typid,
|
|
|
|
OidOutputFunctionCall(typoutput, origval));
|
|
|
|
else
|
|
|
|
{
|
|
|
|
Datum val; /* definitely detoasted Datum */
|
|
|
|
val = PointerGetDatum(PG_DETOAST_DATUM(origval));
|
|
|
|
print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* callback for individual changed tuples
|
|
|
|
*/
|
|
|
|
static void
|
|
|
|
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
|
|
Relation relation, ReorderBufferChange *change)
|
|
|
|
{
|
|
|
|
TestDecodingData *data;
|
|
|
|
Form_pg_class class_form;
|
|
|
|
TupleDesc tupdesc;
|
|
|
|
MemoryContext old;
|
|
|
|
|
|
|
|
data = ctx->output_plugin_private;
|
|
|
|
class_form = RelationGetForm(relation);
|
|
|
|
tupdesc = RelationGetDescr(relation);
|
|
|
|
|
|
|
|
/* Avoid leaking memory by using and resetting our own context */
|
|
|
|
old = MemoryContextSwitchTo(data->context);
|
|
|
|
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
|
|
|
|
|
|
appendStringInfoString(ctx->out, "table ");
|
|
|
|
appendStringInfoString(ctx->out,
|
|
|
|
quote_qualified_identifier(
|
|
|
|
get_namespace_name(
|
|
|
|
get_rel_namespace(RelationGetRelid(relation))),
|
|
|
|
NameStr(class_form->relname)));
|
|
|
|
appendStringInfoString(ctx->out, ":");
|
|
|
|
|
|
|
|
switch (change->action)
|
|
|
|
{
|
|
|
|
case REORDER_BUFFER_CHANGE_INSERT:
|
|
|
|
appendStringInfoString(ctx->out, " INSERT:");
|
2014-03-08 06:02:48 +08:00
|
|
|
if (change->data.tp.newtuple == NULL)
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
|
|
|
else
|
|
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
2014-03-08 06:02:48 +08:00
|
|
|
&change->data.tp.newtuple->tuple,
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
false);
|
|
|
|
break;
|
|
|
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
|
|
|
appendStringInfoString(ctx->out, " UPDATE:");
|
2014-03-08 06:02:48 +08:00
|
|
|
if (change->data.tp.oldtuple != NULL)
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
{
|
|
|
|
appendStringInfoString(ctx->out, " old-key:");
|
|
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
2014-03-08 06:02:48 +08:00
|
|
|
&change->data.tp.oldtuple->tuple,
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
true);
|
|
|
|
appendStringInfoString(ctx->out, " new-tuple:");
|
|
|
|
}
|
|
|
|
|
2014-03-08 06:02:48 +08:00
|
|
|
if (change->data.tp.newtuple == NULL)
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
|
|
|
else
|
|
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
2014-03-08 06:02:48 +08:00
|
|
|
&change->data.tp.newtuple->tuple,
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
false);
|
|
|
|
break;
|
|
|
|
case REORDER_BUFFER_CHANGE_DELETE:
|
|
|
|
appendStringInfoString(ctx->out, " DELETE:");
|
|
|
|
|
|
|
|
/* if there was no PK, we only know that a delete happened */
|
2014-03-08 06:02:48 +08:00
|
|
|
if (change->data.tp.oldtuple == NULL)
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
|
|
|
/* In DELETE, only the replica identity is present; display that */
|
|
|
|
else
|
|
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
2014-03-08 06:02:48 +08:00
|
|
|
&change->data.tp.oldtuple->tuple,
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
true);
|
|
|
|
break;
|
2014-03-08 06:02:48 +08:00
|
|
|
default:
|
|
|
|
Assert(false);
|
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the effected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Gheogegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Abhijit
Menon-Sen, Michael Paquier, Simon Riggs, Craig Ringer, and Steve
Singer.
2014-03-04 05:32:18 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
MemoryContextSwitchTo(old);
|
|
|
|
MemoryContextReset(data->context);
|
|
|
|
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
|
|
}
|