From fd0b9dcebda7b931a41ce5c8e86d13f2efd0af2e Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 2 Jun 2022 08:31:50 +0530 Subject: [PATCH] Prohibit combining publications with different column lists. Currently, we simply combine the column lists when publishing tables on multiple publications and that can sometimes lead to unexpected behavior. Say, if a column is published in any row-filtered publication, then the values for that column are sent to the subscriber even for rows that don't match the row filter, as long as the row matches the row filter for any other publication, even if that other publication doesn't include the column. The main purpose of introducing a column list is to have statically different shapes on publisher and subscriber or hide sensitive column data. In both cases, it doesn't seem to make sense to combine column lists. So, we disallow the cases where the column list is different for the same table when combining publications. It can be later extended to combine the column lists for selective cases where required. Reported-by: Alvaro Herrera Author: Hou Zhijie Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/202204251548.mudq7jbqnh7r@alvherre.pgsql --- doc/src/sgml/ref/alter_publication.sgml | 12 +- doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/commands/subscriptioncmds.c | 28 +++- src/backend/replication/logical/tablesync.c | 72 +++++----- src/backend/replication/pgoutput/pgoutput.c | 80 +++++------ src/test/subscription/t/031_column_list.pl | 150 +++++++++----------- 6 files changed, 181 insertions(+), 166 deletions(-) diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index e2cce49471b..3e338f4cc5d 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -116,7 +116,17 @@ ALTER PUBLICATION name RENAME TO Optionally, a column list can be specified. See for details. + linkend="sql-createpublication"/> for details. Note that a subscription + having several publications in which the same table has been published + with different column lists is not supported. So, changing the column + lists of the tables being subscribed could cause inconsistency of column + lists among publications, in which case ALTER PUBLICATION + will be successful but later the WalSender on the publisher or the + subscriber may throw an error. In this scenario, the user needs to + recreate the subscription after adjusting the column list or drop the + problematic publication using + ALTER SUBSCRIPTION ... DROP PUBLICATION and then add + it back after adjusting the column list. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 203bb41844f..35b39c28dac 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -355,6 +355,11 @@ CREATE SUBSCRIPTION subscription_name + + Subscriptions having several publications in which the same table has been + published with different column lists are not supported. + + We allow non-existent publications to be specified so that users can add those later. This means diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 690cdaa426e..83e6eae855f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1754,6 +1754,11 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) /* * Get the list of tables which belong to specified publications on the * publisher connection. + * + * Note that we don't support the case where the column list is different for + * the same table in different publications to avoid sending unwanted column + * information for some of the rows. This can happen when both the column + * list and row filter are specified for different publications. */ static List * fetch_table_list(WalReceiverConn *wrconn, List *publications) @@ -1761,17 +1766,23 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID}; List *tablelist = NIL; + bool check_columnlist = (walrcv_server_version(wrconn) >= 150000); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + + /* Get column lists for each relation if the publisher supports it */ + if (check_columnlist) + appendStringInfoString(&cmd, ", t.attnames\n"); + + appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" " WHERE t.pubname IN ("); get_publications_str(publications, &cmd, true); appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1795,7 +1806,14 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(!isnull); rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); + + if (check_columnlist && list_member(tablelist, rv)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + else + tablelist = lappend(tablelist, rv); ExecClearTuple(slot); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 994c7a09d92..670c6fcada5 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -753,17 +753,6 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Get column lists for each relation. * - * For initial synchronization, column lists can be ignored in following - * cases: - * - * 1) one of the subscribed publications for the table hasn't specified - * any column list - * - * 2) one of the subscribed publications has puballtables set to true - * - * 3) one of the subscribed publications is declared as ALL TABLES IN - * SCHEMA that includes this relation - * * We need to do this before fetching info about column names and types, * so that we can skip columns that should not be replicated. */ @@ -771,7 +760,7 @@ fetch_remote_table_info(char *nspname, char *relname, { WalRcvExecResult *pubres; TupleTableSlot *slot; - Oid attrsRow[] = {INT2OID}; + Oid attrsRow[] = {INT2VECTOROID}; StringInfoData pub_names; bool first = true; @@ -786,19 +775,17 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Fetch info about column lists for the relation (from all the - * publications). We unnest the int2vector values, because that makes - * it easier to combine lists by simply adding the attnums to a new - * bitmap (without having to parse the int2vector data). This - * preserves NULL values, so that if one of the publications has no - * column list, we'll know that. + * publications). */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT DISTINCT unnest" + "SELECT DISTINCT" + " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)" + " THEN NULL ELSE gpt.attrs END)" " FROM pg_publication p," - " LATERAL pg_get_publication_tables(p.pubname) gpt" - " LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE" - " WHERE gpt.relid = %u" + " LATERAL pg_get_publication_tables(p.pubname) gpt," + " pg_class c" + " WHERE gpt.relid = %u AND c.oid = gpt.relid" " AND p.pubname IN ( %s )", lrel->remoteid, pub_names.data); @@ -813,26 +800,43 @@ fetch_remote_table_info(char *nspname, char *relname, nspname, relname, pubres->err))); /* - * Merge the column lists (from different publications) by creating a - * single bitmap with all the attnums. If we find a NULL value, that - * means one of the publications has no column list for the table - * we're syncing. + * We don't support the case where the column list is different for + * the same table when combining publications. See comments atop + * fetch_table_list. So there should be only one row returned. + * Although we already checked this when creating the subscription, we + * still need to check here in case the column list was changed after + * creating the subscription and before the sync worker is started. + */ + if (tuplestore_tuple_count(pubres->tuplestore) > 1) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + + /* + * Get the column list and build a single bitmap with the attnums. + * + * If we find a NULL value, it means all the columns should be + * replicated. */ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) { Datum cfval = slot_getattr(slot, 1, &isnull); - /* NULL means empty column list, so we're done. */ - if (isnull) + if (!isnull) { - bms_free(included_cols); - included_cols = NULL; - break; - } + ArrayType *arr; + int nelems; + int16 *elems; - included_cols = bms_add_member(included_cols, - DatumGetInt16(cfval)); + arr = DatumGetArrayTypeP(cfval); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (natt = 0; natt < nelems; natt++) + included_cols = bms_add_member(included_cols, elems[natt]); + } ExecClearTuple(slot); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 42c06af2391..8deae571433 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -979,30 +979,31 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry) { ListCell *lc; + bool first = true; + Relation relation = RelationIdGetRelation(entry->publish_as_relid); /* * Find if there are any column lists for this relation. If there are, - * build a bitmap merging all the column lists. - * - * All the given publication-table mappings must be checked. + * build a bitmap using the column lists. * * Multiple publications might have multiple column lists for this * relation. * + * Note that we don't support the case where the column list is different + * for the same table when combining publications. See comments atop + * fetch_table_list. But one can later change the publication so we still + * need to check all the given publication-table mappings and report an + * error if any publications have a different column list. + * * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column - * list" so it takes precedence. + * list". */ foreach(lc, publications) { Publication *pub = lfirst(lc); HeapTuple cftuple = NULL; Datum cfdatum = 0; - - /* - * Assume there's no column list. Only if we find pg_publication_rel - * entry with a column list we'll switch it to false. - */ - bool pub_no_list = true; + Bitmapset *cols = NULL; /* * If the publication is FOR ALL TABLES then it is treated the same as @@ -1011,6 +1012,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, */ if (!pub->alltables) { + bool pub_no_list = true; + /* * Check for the presence of a column list in this publication. * @@ -1024,51 +1027,48 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, if (HeapTupleIsValid(cftuple)) { - /* - * Lookup the column list attribute. - * - * Note: We update the pub_no_list value directly, because if - * the value is NULL, we have no list (and vice versa). - */ + /* Lookup the column list attribute. */ cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, Anum_pg_publication_rel_prattrs, &pub_no_list); - /* - * Build the column list bitmap in the per-entry context. - * - * We need to merge column lists from all publications, so we - * update the same bitmapset. If the column list is null, we - * interpret it as replicating all columns. - */ + /* Build the column list bitmap in the per-entry context. */ if (!pub_no_list) /* when not null */ { pgoutput_ensure_entry_cxt(data, entry); - entry->columns = pub_collist_to_bitmapset(entry->columns, - cfdatum, - entry->entry_cxt); + cols = pub_collist_to_bitmapset(cols, cfdatum, + entry->entry_cxt); + + /* + * If column list includes all the columns of the table, + * set it to NULL. + */ + if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation)) + { + bms_free(cols); + cols = NULL; + } } + + ReleaseSysCache(cftuple); } } - /* - * Found a publication with no column list, so we're done. But first - * discard column list we might have from preceding publications. - */ - if (pub_no_list) + if (first) { - if (cftuple) - ReleaseSysCache(cftuple); - - bms_free(entry->columns); - entry->columns = NULL; - - break; + entry->columns = cols; + first = false; } - - ReleaseSysCache(cftuple); + else if (!bms_equal(entry->columns, cols)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation))); } /* loop all subscribed publications */ + + RelationClose(relation); } /* diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 19812e11f31..7f031bc1951 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -20,6 +20,7 @@ $node_subscriber->append_conf('postgresql.conf', $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $offset = 0; sub wait_for_subscription_sync { @@ -361,13 +362,13 @@ is( $result, qq(1|abc 2|xyz), 'update on column tab2.c is not replicated'); -# TEST: add a table to two publications with different column lists, and +# TEST: add a table to two publications with same column lists, and # create a single subscription replicating both publications $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int); CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b); - CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d); + CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b); -- insert a couple initial records INSERT INTO tab5 VALUES (1, 11, 111, 1111); @@ -388,8 +389,7 @@ wait_for_subscription_sync($node_subscriber); $node_publisher->wait_for_catchup('sub1'); -# insert data and make sure all the columns (union of the columns lists) -# get fully replicated +# insert data and make sure the columns in column list get fully replicated $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab5 VALUES (3, 33, 333, 3333); @@ -399,42 +399,12 @@ $node_publisher->safe_psql( $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"), - qq(1|11|1111 -2|22|2222 -3|33|3333 -4|44|4444), + qq(1|11| +2|22| +3|33| +4|44|), 'overlapping publications with overlapping column lists'); -# and finally, remove the column list for one of the publications, which -# means replicating all columns (removing the column list), but first add -# the missing column to the table on subscriber -$node_publisher->safe_psql( - 'postgres', qq( - ALTER PUBLICATION pub3 SET TABLE tab5; -)); - -$node_subscriber->safe_psql( - 'postgres', qq( - ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; - ALTER TABLE tab5 ADD COLUMN c INT; -)); - -wait_for_subscription_sync($node_subscriber); - -$node_publisher->safe_psql( - 'postgres', qq( - INSERT INTO tab5 VALUES (5, 55, 555, 5555); -)); - -$node_publisher->wait_for_catchup('sub1'); - -is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"), - qq(1|11|1111| -2|22|2222| -3|33|3333| -4|44|4444| -5|55|5555|555), - 'overlapping publications with overlapping column lists'); # TEST: create a table with a column list, then change the replica # identity by replacing a primary key (but use a different column in @@ -900,57 +870,21 @@ is( $node_subscriber->safe_psql( 3|), 'partitions with different replica identities not replicated correctly'); -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. So with column lists (a,b) and (a,c) we -# should replicate (a,b,c). -$node_publisher->safe_psql( - 'postgres', qq( - CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); - CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); - - -- initial data - INSERT INTO test_mix_1 VALUES (1, 2, 3); -)); - -$node_subscriber->safe_psql( - 'postgres', qq( - CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); - ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2; -)); - -wait_for_subscription_sync($node_subscriber); - -$node_publisher->safe_psql( - 'postgres', qq( - INSERT INTO test_mix_1 VALUES (4, 5, 6); -)); - -$node_publisher->wait_for_catchup('sub1'); - -is( $node_subscriber->safe_psql( - 'postgres', "SELECT * FROM test_mix_1 ORDER BY a"), - qq(1|2|3 -4|5|6), - 'a mix of publications should use a union of column list'); - - -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. If any of the publications is FOR ALL -# TABLES, we should replicate all columns. +# TEST: With a table included in the publications which is FOR ALL TABLES, it +# means replicate all columns. # drop unnecessary tables, so as not to interfere with the FOR ALL TABLES $node_publisher->safe_psql( 'postgres', qq( - DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1, + DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_part, test_part_a, test_part_b, test_part_c, test_part_d; )); $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b); + CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c); CREATE PUBLICATION pub_mix_4 FOR ALL TABLES; -- initial data @@ -976,12 +910,11 @@ $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_2"), qq(1|2|3 4|5|6), - 'a mix of publications should use a union of column list'); + 'all columns should be replicated'); -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. If any of the publications is FOR ALL -# TABLES IN SCHEMA, we should replicate all columns. +# TEST: With a table included in the publication which is FOR ALL TABLES +# IN SCHEMA, it means replicate all columns. $node_subscriber->safe_psql( 'postgres', qq( @@ -993,7 +926,7 @@ $node_publisher->safe_psql( 'postgres', qq( DROP TABLE test_mix_2; CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b); + CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c); CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public; -- initial data @@ -1017,7 +950,7 @@ $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_3"), qq(1|2|3 4|5|6), - 'a mix of publications should use a union of column list'); + 'all columns should be replicated'); # TEST: Check handling of publish_via_partition_root - if a partition is @@ -1074,7 +1007,7 @@ is( $node_subscriber->safe_psql( # TEST: Multiple publications which publish schema of parent table and # partition. The partition is published through two publications, once # through a schema (so no column list) containing the parent, and then -# also directly (with a columns list). The expected outcome is there is +# also directly (with all columns). The expected outcome is there is # no column list. $node_publisher->safe_psql( @@ -1086,7 +1019,7 @@ $node_publisher->safe_psql( CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1; - CREATE PUBLICATION pub2 FOR TABLE t_1(b); + CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c); -- initial data INSERT INTO s1.t VALUES (1, 2, 3); @@ -1233,6 +1166,51 @@ is( $node_subscriber->safe_psql( 'publication containing both parent and child relation'); +# TEST: With a table included in multiple publications with different column +# lists, we should catch the error when creating the subscription. + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); + CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); +)); + +$node_subscriber->safe_psql('postgres', qq( + DROP SUBSCRIPTION sub1; + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); +)); + +my ($cmdret, $stdout, $stderr) = $node_subscriber->psql( + 'postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; +)); + +ok( $stderr =~ + qr/cannot use different column lists for table "public.test_mix_1" in different publications/, + 'different column lists detected'); + +# TEST: If the column list is changed after creating the subscription, we +# should catch the error reported by walsender. + +$node_publisher->safe_psql('postgres', qq( + ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, c); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; +)); + +$node_publisher->wait_for_catchup('sub1'); + +$node_publisher->safe_psql('postgres', qq( + ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, b); + INSERT INTO test_mix_1 VALUES(1, 1, 1); +)); + +$offset = $node_publisher->wait_for_log( + qr/cannot use different column lists for table "public.test_mix_1" in different publications/, + $offset); + $node_subscriber->stop('fast'); $node_publisher->stop('fast');