diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 66d800f0cff..1024d51dca8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -931,10 +931,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, logicalrep_worker_stop(sub->oid, relid); /* - * For READY state and SYNCDONE state, we would have already - * dropped the tablesync origin. + * For READY state, we would have already dropped the + * tablesync origin. */ - if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE) + if (state != SUBREL_STATE_READY) { char originname[NAMEDATALEN]; @@ -942,8 +942,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Drop the tablesync's origin tracking if exists. * * It is possible that the origin is not yet created for - * tablesync worker so passing missing_ok = true. This can - * happen for the states before SUBREL_STATE_FINISHEDCOPY. + * tablesync worker, this can happen for the states before + * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or + * apply worker can also concurrently try to drop the + * origin and by this time the origin might be already + * removed. For these reasons, passing missing_ok = true. */ ReplicationOriginNameForTablesync(sub->oid, relid, originname, sizeof(originname)); @@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* * Drop the tablesync's origin tracking if exists. * - * For SYNCDONE/READY states, the tablesync origin tracking is known - * to have already been dropped by the tablesync worker. - * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. 
*/ - if (rstate->state != SUBREL_STATE_SYNCDONE) - { - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - } + ReplicationOriginNameForTablesync(subid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); } /* Clean up dependencies */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8514835ff4c..831d42016c1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -300,7 +300,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * UpdateSubscriptionRelState must be called within a transaction. - * That transaction will be ended within the finish_sync_worker(). */ if (!IsTransactionState()) StartTransactionCommand(); @@ -310,30 +309,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - /* - * Cleanup the tablesync origin tracking. - * - * Resetting the origin session removes the ownership of the slot. - * This is needed to allow the origin to be dropped. - */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; - - /* - * We expect that origin must be present. The concurrent operations - * that remove origin like a refresh for the subscription take an - * access exclusive lock on pg_subscription which prevent the previous - * operation to update the rel state to SUBREL_STATE_SYNCDONE to - * succeed. - */ - replorigin_drop_by_name(originname, false, false); - /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop * the slot. 
@@ -343,7 +318,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Cleanup the tablesync slot. * - * This has to be done after the data changes because otherwise if + * This has to be done after updating the state because otherwise if * there is an error while doing the database operations we won't be * able to rollback dropped slot. */ @@ -359,6 +334,49 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); + CommitTransactionCommand(); + pgstat_report_stat(false); + + /* + * Start a new transaction to clean up the tablesync origin tracking. + * This transaction will be ended within the finish_sync_worker(). + * Even if we fail to remove the origin here, the apply worker will + * ensure that it is cleaned up afterward. + * + * We need to do this after the table state is set to SYNCDONE. + * Otherwise, if an error occurs while performing the database + * operation, the worker will be restarted and the in-memory state of + * replication progress (remote_lsn) won't be rolled-back which would + * have been cleared before restart. So, the restarted worker will use + * invalid replication progress state resulting in replay of + * transactions that have already been applied. + */ + StartTransactionCommand(); + + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* + * Resetting the origin session removes the ownership of the slot. + * This is needed to allow the origin to be dropped. + */ + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + /* + * Drop the tablesync's origin tracking if exists. 
+ * + * There is a chance that the user is concurrently performing refresh + * for the subscription where we remove the table state and its origin + * or the apply worker would have removed this origin. So passing + * missing_ok = true. + */ + replorigin_drop_by_name(originname, true, false); + finish_sync_worker(); } else @@ -466,6 +484,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { + char originname[NAMEDATALEN]; + rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -475,7 +495,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Update the state to READY. + * Remove the tablesync origin tracking if exists. + * + * There is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin or the tablesync worker would have + * already removed this origin. We can't rely on tablesync + * worker to remove the origin tracking as if there is any + * error while dropping we won't restart it to drop the + * origin. So passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + + /* + * Update the state to READY only after the origin cleanup. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state,