mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-12-27 08:39:28 +08:00
578b229718
Previously tables declared WITH OIDS, including a significant fraction of the catalog tables, stored the oid column not as a normal column, but as part of the tuple header. This special column was not shown by default, which was somewhat odd, as it's often (consider e.g. pg_class.oid) one of the more important parts of a row. Neither pg_dump nor COPY included the contents of the oid column by default. The fact that the oid column was not an ordinary column necessitated a significant amount of special case code to support oid columns. That already was painful for the existing, but upcoming work aiming to make table storage pluggable, would have required expanding and duplicating that "specialness" significantly. WITH OIDS has been deprecated since 2005 (commit ff02d0a05280e0). Remove it. Removing includes: - CREATE TABLE and ALTER TABLE syntax for declaring the table to be WITH OIDS has been removed (WITH (oids[ = true]) will error out) - pg_dump does not support dumping tables declared WITH OIDS and will issue a warning when dumping one (and ignore the oid column). - restoring an pg_dump archive with pg_restore will warn when restoring a table with oid contents (and ignore the oid column) - COPY will refuse to load binary dump that includes oids. - pg_upgrade will error out when encountering tables declared WITH OIDS, they have to be altered to remove the oid column first. - Functionality to access the oid of the last inserted row (like plpgsql's RESULT_OID, spi's SPI_lastoid, ...) has been removed. The syntax for declaring a table WITHOUT OIDS (or WITH (oids = false) for CREATE TABLE) is still supported. While that requires a bit of support code, it seems unnecessary to break applications / dumps that do not use oids, and are explicit about not using them. The biggest user of WITH OID columns was postgres' catalog. This commit changes all 'magic' oid columns to be columns that are normally declared and stored. To reduce unnecessary query breakage all the newly added columns are still named 'oid', even if a table's column naming scheme would indicate 'reloid' or such. This obviously requires adapting a lot code, mostly replacing oid access via HeapTupleGetOid() with access to the underlying Form_pg_*->oid column. The bootstrap process now assigns oids for all oid columns in genbki.pl that do not have an explicit value (starting at the largest oid previously used), only oids assigned later by oids will be above FirstBootstrapObjectId. As the oid column now is a normal column the special bootstrap syntax for oids has been removed. Oids are not automatically assigned during insertion anymore, all backend code explicitly assigns oids with GetNewOidWithIndex(). For the rare case that insertions into the catalog via SQL are called for the new pg_nextoid() function can be used (which only works on catalog tables). The fact that oid columns on system tables are now normal columns means that they will be included in the set of columns expanded by * (i.e. SELECT * FROM pg_class will now include the table's oid, previously it did not). It'd not technically be hard to hide oid column by default, but that'd mean confusing behavior would either have to be carried forward forever, or it'd cause breakage down the line. While it's not unlikely that further adjustments are needed, the scope/invasiveness of the patch makes it worthwhile to get merge this now. It's painful to maintain externally, too complicated to commit after the code code freeze, and a dependency of a number of other patches. Catversion bump, for obvious reasons. Author: Andres Freund, with contributions by John Naylor Discussion: https://postgr.es/m/20180930034810.ywp2c7awz7opzcfr@alap3.anarazel.de
545 lines
15 KiB
C
545 lines
15 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* test_decoding.c
|
|
* example logical decoding output plugin
|
|
*
|
|
* Copyright (c) 2012-2018, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* contrib/test_decoding/test_decoding.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "catalog/pg_type.h"
|
|
|
|
#include "replication/logical.h"
|
|
#include "replication/origin.h"
|
|
|
|
#include "utils/builtins.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/rel.h"
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
/* These must be available to pg_dlsym() */
|
|
extern void _PG_init(void);
|
|
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
|
|
|
|
typedef struct
|
|
{
|
|
MemoryContext context;
|
|
bool include_xids;
|
|
bool include_timestamp;
|
|
bool skip_empty_xacts;
|
|
bool xact_wrote_changes;
|
|
bool only_local;
|
|
} TestDecodingData;
|
|
|
|
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
|
bool is_init);
|
|
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
|
|
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
static void pg_output_begin(LogicalDecodingContext *ctx,
|
|
TestDecodingData *data,
|
|
ReorderBufferTXN *txn,
|
|
bool last_write);
|
|
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
|
|
static void pg_decode_change(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn, Relation rel,
|
|
ReorderBufferChange *change);
|
|
static void pg_decode_truncate(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn,
|
|
int nrelations, Relation relations[],
|
|
ReorderBufferChange *change);
|
|
static bool pg_decode_filter(LogicalDecodingContext *ctx,
|
|
RepOriginId origin_id);
|
|
static void pg_decode_message(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
|
|
bool transactional, const char *prefix,
|
|
Size sz, const char *message);
|
|
|
|
void
|
|
_PG_init(void)
|
|
{
|
|
/* other plugins can perform things here */
|
|
}
|
|
|
|
/* specify output plugin callbacks */
|
|
void
|
|
_PG_output_plugin_init(OutputPluginCallbacks *cb)
|
|
{
|
|
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
|
|
|
|
cb->startup_cb = pg_decode_startup;
|
|
cb->begin_cb = pg_decode_begin_txn;
|
|
cb->change_cb = pg_decode_change;
|
|
cb->truncate_cb = pg_decode_truncate;
|
|
cb->commit_cb = pg_decode_commit_txn;
|
|
cb->filter_by_origin_cb = pg_decode_filter;
|
|
cb->shutdown_cb = pg_decode_shutdown;
|
|
cb->message_cb = pg_decode_message;
|
|
}
|
|
|
|
|
|
/* initialize this plugin */
|
|
static void
|
|
pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
|
|
bool is_init)
|
|
{
|
|
ListCell *option;
|
|
TestDecodingData *data;
|
|
|
|
data = palloc0(sizeof(TestDecodingData));
|
|
data->context = AllocSetContextCreate(ctx->context,
|
|
"text conversion context",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
data->include_xids = true;
|
|
data->include_timestamp = false;
|
|
data->skip_empty_xacts = false;
|
|
data->only_local = false;
|
|
|
|
ctx->output_plugin_private = data;
|
|
|
|
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
|
|
opt->receive_rewrites = false;
|
|
|
|
foreach(option, ctx->output_plugin_options)
|
|
{
|
|
DefElem *elem = lfirst(option);
|
|
|
|
Assert(elem->arg == NULL || IsA(elem->arg, String));
|
|
|
|
if (strcmp(elem->defname, "include-xids") == 0)
|
|
{
|
|
/* if option does not provide a value, it means its value is true */
|
|
if (elem->arg == NULL)
|
|
data->include_xids = true;
|
|
else if (!parse_bool(strVal(elem->arg), &data->include_xids))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
strVal(elem->arg), elem->defname)));
|
|
}
|
|
else if (strcmp(elem->defname, "include-timestamp") == 0)
|
|
{
|
|
if (elem->arg == NULL)
|
|
data->include_timestamp = true;
|
|
else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
strVal(elem->arg), elem->defname)));
|
|
}
|
|
else if (strcmp(elem->defname, "force-binary") == 0)
|
|
{
|
|
bool force_binary;
|
|
|
|
if (elem->arg == NULL)
|
|
continue;
|
|
else if (!parse_bool(strVal(elem->arg), &force_binary))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
strVal(elem->arg), elem->defname)));
|
|
|
|
if (force_binary)
|
|
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
|
}
|
|
else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
|
|
{
|
|
|
|
if (elem->arg == NULL)
|
|
data->skip_empty_xacts = true;
|
|
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
strVal(elem->arg), elem->defname)));
|
|
}
|
|
else if (strcmp(elem->defname, "only-local") == 0)
|
|
{
|
|
|
|
if (elem->arg == NULL)
|
|
data->only_local = true;
|
|
else if (!parse_bool(strVal(elem->arg), &data->only_local))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
strVal(elem->arg), elem->defname)));
|
|
}
|
|
else if (strcmp(elem->defname, "include-rewrites") == 0)
|
|
{
|
|
|
|
if (elem->arg == NULL)
|
|
continue;
|
|
else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("could not parse value \"%s\" for parameter \"%s\"",
|
|
strVal(elem->arg), elem->defname)));
|
|
}
|
|
else
|
|
{
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("option \"%s\" = \"%s\" is unknown",
|
|
elem->defname,
|
|
elem->arg ? strVal(elem->arg) : "(null)")));
|
|
}
|
|
}
|
|
}
|
|
|
|
/* cleanup this plugin's resources */
|
|
static void
|
|
pg_decode_shutdown(LogicalDecodingContext *ctx)
|
|
{
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
/* cleanup our own resources via memory context reset */
|
|
MemoryContextDelete(data->context);
|
|
}
|
|
|
|
/* BEGIN callback */
|
|
static void
|
|
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
|
{
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
data->xact_wrote_changes = false;
|
|
if (data->skip_empty_xacts)
|
|
return;
|
|
|
|
pg_output_begin(ctx, data, txn, true);
|
|
}
|
|
|
|
static void
|
|
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
|
|
{
|
|
OutputPluginPrepareWrite(ctx, last_write);
|
|
if (data->include_xids)
|
|
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
|
|
else
|
|
appendStringInfoString(ctx->out, "BEGIN");
|
|
OutputPluginWrite(ctx, last_write);
|
|
}
|
|
|
|
/* COMMIT callback */
|
|
static void
|
|
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn)
|
|
{
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
if (data->skip_empty_xacts && !data->xact_wrote_changes)
|
|
return;
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
if (data->include_xids)
|
|
appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
|
|
else
|
|
appendStringInfoString(ctx->out, "COMMIT");
|
|
|
|
if (data->include_timestamp)
|
|
appendStringInfo(ctx->out, " (at %s)",
|
|
timestamptz_to_str(txn->commit_time));
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
}
|
|
|
|
static bool
|
|
pg_decode_filter(LogicalDecodingContext *ctx,
|
|
RepOriginId origin_id)
|
|
{
|
|
TestDecodingData *data = ctx->output_plugin_private;
|
|
|
|
if (data->only_local && origin_id != InvalidRepOriginId)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Print literal `outputstr' already represented as string of type `typid'
|
|
* into stringbuf `s'.
|
|
*
|
|
* Some builtin types aren't quoted, the rest is quoted. Escaping is done as
|
|
* if standard_conforming_strings were enabled.
|
|
*/
|
|
static void
|
|
print_literal(StringInfo s, Oid typid, char *outputstr)
|
|
{
|
|
const char *valptr;
|
|
|
|
switch (typid)
|
|
{
|
|
case INT2OID:
|
|
case INT4OID:
|
|
case INT8OID:
|
|
case OIDOID:
|
|
case FLOAT4OID:
|
|
case FLOAT8OID:
|
|
case NUMERICOID:
|
|
/* NB: We don't care about Inf, NaN et al. */
|
|
appendStringInfoString(s, outputstr);
|
|
break;
|
|
|
|
case BITOID:
|
|
case VARBITOID:
|
|
appendStringInfo(s, "B'%s'", outputstr);
|
|
break;
|
|
|
|
case BOOLOID:
|
|
if (strcmp(outputstr, "t") == 0)
|
|
appendStringInfoString(s, "true");
|
|
else
|
|
appendStringInfoString(s, "false");
|
|
break;
|
|
|
|
default:
|
|
appendStringInfoChar(s, '\'');
|
|
for (valptr = outputstr; *valptr; valptr++)
|
|
{
|
|
char ch = *valptr;
|
|
|
|
if (SQL_STR_DOUBLE(ch, false))
|
|
appendStringInfoChar(s, ch);
|
|
appendStringInfoChar(s, ch);
|
|
}
|
|
appendStringInfoChar(s, '\'');
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* print the tuple 'tuple' into the StringInfo s */
|
|
static void
|
|
tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
|
|
{
|
|
int natt;
|
|
|
|
/* print all columns individually */
|
|
for (natt = 0; natt < tupdesc->natts; natt++)
|
|
{
|
|
Form_pg_attribute attr; /* the attribute itself */
|
|
Oid typid; /* type of current attribute */
|
|
Oid typoutput; /* output function */
|
|
bool typisvarlena;
|
|
Datum origval; /* possibly toasted Datum */
|
|
bool isnull; /* column is null? */
|
|
|
|
attr = TupleDescAttr(tupdesc, natt);
|
|
|
|
/*
|
|
* don't print dropped columns, we can't be sure everything is
|
|
* available for them
|
|
*/
|
|
if (attr->attisdropped)
|
|
continue;
|
|
|
|
/*
|
|
* Don't print system columns, oid will already have been printed if
|
|
* present.
|
|
*/
|
|
if (attr->attnum < 0)
|
|
continue;
|
|
|
|
typid = attr->atttypid;
|
|
|
|
/* get Datum from tuple */
|
|
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
|
|
|
|
if (isnull && skip_nulls)
|
|
continue;
|
|
|
|
/* print attribute name */
|
|
appendStringInfoChar(s, ' ');
|
|
appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
|
|
|
|
/* print attribute type */
|
|
appendStringInfoChar(s, '[');
|
|
appendStringInfoString(s, format_type_be(typid));
|
|
appendStringInfoChar(s, ']');
|
|
|
|
/* query output function */
|
|
getTypeOutputInfo(typid,
|
|
&typoutput, &typisvarlena);
|
|
|
|
/* print separator */
|
|
appendStringInfoChar(s, ':');
|
|
|
|
/* print data */
|
|
if (isnull)
|
|
appendStringInfoString(s, "null");
|
|
else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
|
|
appendStringInfoString(s, "unchanged-toast-datum");
|
|
else if (!typisvarlena)
|
|
print_literal(s, typid,
|
|
OidOutputFunctionCall(typoutput, origval));
|
|
else
|
|
{
|
|
Datum val; /* definitely detoasted Datum */
|
|
|
|
val = PointerGetDatum(PG_DETOAST_DATUM(origval));
|
|
print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* callback for individual changed tuples
|
|
*/
|
|
static void
|
|
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
Relation relation, ReorderBufferChange *change)
|
|
{
|
|
TestDecodingData *data;
|
|
Form_pg_class class_form;
|
|
TupleDesc tupdesc;
|
|
MemoryContext old;
|
|
|
|
data = ctx->output_plugin_private;
|
|
|
|
/* output BEGIN if we haven't yet */
|
|
if (data->skip_empty_xacts && !data->xact_wrote_changes)
|
|
{
|
|
pg_output_begin(ctx, data, txn, false);
|
|
}
|
|
data->xact_wrote_changes = true;
|
|
|
|
class_form = RelationGetForm(relation);
|
|
tupdesc = RelationGetDescr(relation);
|
|
|
|
/* Avoid leaking memory by using and resetting our own context */
|
|
old = MemoryContextSwitchTo(data->context);
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
|
|
appendStringInfoString(ctx->out, "table ");
|
|
appendStringInfoString(ctx->out,
|
|
quote_qualified_identifier(
|
|
get_namespace_name(
|
|
get_rel_namespace(RelationGetRelid(relation))),
|
|
class_form->relrewrite ?
|
|
get_rel_name(class_form->relrewrite) :
|
|
NameStr(class_form->relname)));
|
|
appendStringInfoChar(ctx->out, ':');
|
|
|
|
switch (change->action)
|
|
{
|
|
case REORDER_BUFFER_CHANGE_INSERT:
|
|
appendStringInfoString(ctx->out, " INSERT:");
|
|
if (change->data.tp.newtuple == NULL)
|
|
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
|
else
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
|
&change->data.tp.newtuple->tuple,
|
|
false);
|
|
break;
|
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
|
appendStringInfoString(ctx->out, " UPDATE:");
|
|
if (change->data.tp.oldtuple != NULL)
|
|
{
|
|
appendStringInfoString(ctx->out, " old-key:");
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
|
&change->data.tp.oldtuple->tuple,
|
|
true);
|
|
appendStringInfoString(ctx->out, " new-tuple:");
|
|
}
|
|
|
|
if (change->data.tp.newtuple == NULL)
|
|
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
|
else
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
|
&change->data.tp.newtuple->tuple,
|
|
false);
|
|
break;
|
|
case REORDER_BUFFER_CHANGE_DELETE:
|
|
appendStringInfoString(ctx->out, " DELETE:");
|
|
|
|
/* if there was no PK, we only know that a delete happened */
|
|
if (change->data.tp.oldtuple == NULL)
|
|
appendStringInfoString(ctx->out, " (no-tuple-data)");
|
|
/* In DELETE, only the replica identity is present; display that */
|
|
else
|
|
tuple_to_stringinfo(ctx->out, tupdesc,
|
|
&change->data.tp.oldtuple->tuple,
|
|
true);
|
|
break;
|
|
default:
|
|
Assert(false);
|
|
}
|
|
|
|
MemoryContextSwitchTo(old);
|
|
MemoryContextReset(data->context);
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
}
|
|
|
|
static void
|
|
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
int nrelations, Relation relations[], ReorderBufferChange *change)
|
|
{
|
|
TestDecodingData *data;
|
|
MemoryContext old;
|
|
int i;
|
|
|
|
data = ctx->output_plugin_private;
|
|
|
|
/* output BEGIN if we haven't yet */
|
|
if (data->skip_empty_xacts && !data->xact_wrote_changes)
|
|
{
|
|
pg_output_begin(ctx, data, txn, false);
|
|
}
|
|
data->xact_wrote_changes = true;
|
|
|
|
/* Avoid leaking memory by using and resetting our own context */
|
|
old = MemoryContextSwitchTo(data->context);
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
|
|
appendStringInfoString(ctx->out, "table ");
|
|
|
|
for (i = 0; i < nrelations; i++)
|
|
{
|
|
if (i > 0)
|
|
appendStringInfoString(ctx->out, ", ");
|
|
|
|
appendStringInfoString(ctx->out,
|
|
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
|
|
NameStr(relations[i]->rd_rel->relname)));
|
|
}
|
|
|
|
appendStringInfoString(ctx->out, ": TRUNCATE:");
|
|
|
|
if (change->data.truncate.restart_seqs
|
|
|| change->data.truncate.cascade)
|
|
{
|
|
if (change->data.truncate.restart_seqs)
|
|
appendStringInfo(ctx->out, " restart_seqs");
|
|
if (change->data.truncate.cascade)
|
|
appendStringInfo(ctx->out, " cascade");
|
|
}
|
|
else
|
|
appendStringInfoString(ctx->out, " (no-flags)");
|
|
|
|
MemoryContextSwitchTo(old);
|
|
MemoryContextReset(data->context);
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
}
|
|
|
|
static void
|
|
pg_decode_message(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
|
|
const char *prefix, Size sz, const char *message)
|
|
{
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
|
|
transactional, prefix, sz);
|
|
appendBinaryStringInfo(ctx->out, message, sz);
|
|
OutputPluginWrite(ctx, true);
|
|
}
|