diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index 78d36429c8a..de6dc416130 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -138,12 +138,46 @@ SELECT slot_name FROM pg_stat_replication_slots;
 (3 rows)
 
 COMMIT;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
+ ?column? 
+----------
+ init
+(1 row)
+
+-- The INSERT changes are large enough to be spilled but will not be, because
+-- the transaction is aborted. The logical decoding skips collecting further
+-- changes too. The transaction is prepared to make sure the decoding processes
+-- the aborted transaction.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i);
+PREPARE TRANSACTION 'test1_abort';
+ROLLBACK PREPARED 'test1_abort';
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ count 
+-------
+     1
+(1 row)
+
+-- Verify that the decoding doesn't spill already-aborted transaction's changes.
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+ 
+(1 row)
+
+SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+            slot_name            | spill_txns | spill_count 
+---------------------------------+------------+-------------
+ regression_slot_stats4_twophase |          0 |           0
+(1 row)
+
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot_stats1'),
     pg_drop_replication_slot('regression_slot_stats2'),
-    pg_drop_replication_slot('regression_slot_stats3');
- pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot 
---------------------------+--------------------------+--------------------------
-                          |                          | 
+    pg_drop_replication_slot('regression_slot_stats3'),
+    pg_drop_replication_slot('regression_slot_stats4_twophase');
+ pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot 
+--------------------------+--------------------------+--------------------------+--------------------------
+                          |                          |                          | 
 (1 row)
 
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index a76f77601e2..9879e02ca84 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -114,7 +114,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  * detect that the subtransaction was aborted, and reset the transaction while having
  * the TOAST changes in memory, resulting in deallocating both decoded changes and
  * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ *
+ * Set debug_logical_replication_streaming to 'immediate' to disable the transaction
+ * status check happening before streaming the second insertion, so we can detect a
+ * concurrent abort while streaming.
  */
+SET debug_logical_replication_streaming = immediate;
 BEGIN;
 INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
 ALTER TABLE stream_test ADD COLUMN i INT;
@@ -128,6 +133,7 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
      5
 (1 row)
 
+RESET debug_logical_replication_streaming;
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 630371f147a..a022fe1bf07 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -50,7 +50,25 @@ SELECT slot_name FROM pg_stat_replication_slots;
 SELECT slot_name FROM pg_stat_replication_slots;
 COMMIT;
 
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
+
+-- The INSERT changes are large enough to be spilled but will not be, because
+-- the transaction is aborted. The logical decoding skips collecting further
+-- changes too. The transaction is prepared to make sure the decoding processes
+-- the aborted transaction.
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i);
+PREPARE TRANSACTION 'test1_abort';
+ROLLBACK PREPARED 'test1_abort';
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Verify that the decoding doesn't spill already-aborted transaction's changes.
+SELECT pg_stat_force_next_flush();
+SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot_stats1'),
     pg_drop_replication_slot('regression_slot_stats2'),
-    pg_drop_replication_slot('regression_slot_stats3');
+    pg_drop_replication_slot('regression_slot_stats3'),
+    pg_drop_replication_slot('regression_slot_stats4_twophase');
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 7f43f0c2ab7..f1269403e0a 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -49,7 +49,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  * detect that the subtransaction was aborted, and reset the transaction while having
  * the TOAST changes in memory, resulting in deallocating both decoded changes and
  * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ *
+ * Set debug_logical_replication_streaming to 'immediate' to disable the transaction
+ * status check happening before streaming the second insertion, so we can detect a
+ * concurrent abort while streaming.
  */
+SET debug_logical_replication_streaming = immediate;
 BEGIN;
 INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
 ALTER TABLE stream_test ADD COLUMN i INT;
@@ -58,6 +63,7 @@ INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000')
 ROLLBACK TO s1;
 COMMIT;
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+RESET debug_logical_replication_streaming;
 
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 10a37667a51..ed5a2946dc1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -106,6 +106,7 @@
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
@@ -260,6 +261,8 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									 bool txn_prepared);
+static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
@@ -793,11 +796,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	/*
-	 * While streaming the previous changes we have detected that the
-	 * transaction is aborted.  So there is no point in collecting further
-	 * changes for it.
+	 * If we have detected that the transaction is aborted while streaming the
+	 * previous changes or by checking its CLOG, there is no point in
+	 * collecting further changes for it.
 	 */
-	if (txn->concurrent_abort)
+	if (rbtxn_is_aborted(txn))
 	{
 		/*
 		 * We don't need to update memory accounting for this change as we
@@ -1620,8 +1623,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 /*
  * Discard changes from a transaction (and subtransactions), either after
- * streaming or decoding them at PREPARE. Keep the remaining info -
- * transactions, tuplecids, invalidations and snapshots.
+ * streaming, decoding them at PREPARE, or detecting the transaction abort.
+ * Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
  *
  * We additionally remove tuplecids after decoding the transaction at prepare
  * time as we only need to perform invalidation at rollback or commit prepared.
@@ -1650,6 +1654,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		Assert(rbtxn_is_known_subxact(subtxn));
 		Assert(subtxn->nsubtxns == 0);
 
+		ReorderBufferMaybeMarkTXNStreamed(rb, subtxn);
 		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
 	}
 
@@ -1680,24 +1685,6 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 	/* Update the memory counter */
 	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
 
-	/*
-	 * Mark the transaction as streamed.
-	 *
-	 * The top-level transaction, is marked as streamed always, even if it
-	 * does not contain any changes (that is, when all the changes are in
-	 * subtransactions).
-	 *
-	 * For subtransactions, we only mark them as streamed when there are
-	 * changes in them.
-	 *
-	 * We do it this way because of aborts - we don't want to send aborts for
-	 * XIDs the downstream is not aware of. And of course, it always knows
-	 * about the toplevel xact (we send the XID in all messages), but we never
-	 * stream XIDs of empty subxacts.
-	 */
-	if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
-		txn->txn_flags |= RBTXN_IS_STREAMED;
-
 	if (txn_prepared)
 	{
 		/*
@@ -1752,6 +1739,76 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 	txn->nentries = 0;
 }
 
+/*
+ * Check the transaction status by CLOG lookup and discard all changes if
+ * the transaction is aborted. The transaction status is cached in
+ * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
+ * next call.
+ *
+ * Return true if the transaction is aborted, otherwise return false.
+ *
+ * When the 'debug_logical_replication_streaming' is set to "immediate", we
+ * don't check the transaction status, meaning the caller will always process
+ * this transaction.
+ */
+static bool
+ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	/* Quick return for regression tests */
+	if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
+		return false;
+
+	/*
+	 * Quick return if the transaction status is already known.
+	 */
+
+	if (rbtxn_is_committed(txn))
+		return false;
+	if (rbtxn_is_aborted(txn))
+	{
+		/* Already-aborted transactions should not have any changes */
+		Assert(txn->size == 0);
+
+		return true;
+	}
+
+	/* Otherwise, check the transaction status using CLOG lookup */
+
+	if (TransactionIdIsInProgress(txn->xid))
+		return false;
+
+	if (TransactionIdDidCommit(txn->xid))
+	{
+		/*
+		 * Remember the transaction is committed so that we can skip CLOG
+		 * check next time, avoiding the pressure on CLOG lookup.
+		 */
+		Assert(!rbtxn_is_aborted(txn));
+		txn->txn_flags |= RBTXN_IS_COMMITTED;
+		return false;
+	}
+
+	/*
+	 * The transaction aborted. We discard both the changes collected so far
+	 * and the toast reconstruction data. The full cleanup will happen as part
+	 * of decoding ABORT record of this transaction.
+	 */
+	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+	ReorderBufferToastReset(rb, txn);
+
+	/* All changes should be discarded */
+	Assert(txn->size == 0);
+
+	/*
+	 * Mark the transaction as aborted so we can ignore future changes of this
+	 * transaction.
+	 */
+	Assert(!rbtxn_is_committed(txn));
+	txn->txn_flags |= RBTXN_IS_ABORTED;
+
+	return true;
+}
+
 /*
  * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
  * HeapTupleSatisfiesHistoricMVCC.
@@ -1917,7 +1974,9 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * Note, we send stream prepare even if a concurrent abort is
 		 * detected. See DecodePrepare for more information.
 		 */
+		Assert(!rbtxn_sent_prepare(txn));
 		rb->stream_prepare(rb, txn, txn->final_lsn);
+		txn->txn_flags |= RBTXN_SENT_PREPARE;
 
 		/*
 		 * This is a PREPARED transaction, part of a two-phase commit. The
@@ -2052,6 +2111,30 @@ ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
 												  txn, command_id);
 }
 
+/*
+ * Mark the given transaction as streamed if it's a top-level transaction
+ * or has changes.
+ */
+static void
+ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	/*
+	 * The top-level transaction, is marked as streamed always, even if it
+	 * does not contain any changes (that is, when all the changes are in
+	 * subtransactions).
+	 *
+	 * For subtransactions, we only mark them as streamed when there are
+	 * changes in them.
+	 *
+	 * We do it this way because of aborts - we don't want to send aborts for
+	 * XIDs the downstream is not aware of. And of course, it always knows
+	 * about the top-level xact (we send the XID in all messages), but we
+	 * never stream XIDs of empty subxacts.
+	 */
+	if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
+		txn->txn_flags |= RBTXN_IS_STREAMED;
+}
+
 /*
  * Helper function for ReorderBufferProcessTXN to handle the concurrent
  * abort of the streaming transaction.  This resets the TXN such that it
@@ -2543,7 +2626,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			 * regular ones).
 			 */
 			if (rbtxn_prepared(txn))
+			{
+				Assert(!rbtxn_sent_prepare(txn));
 				rb->prepare(rb, txn, commit_lsn);
+				txn->txn_flags |= RBTXN_SENT_PREPARE;
+			}
 			else
 				rb->commit(rb, txn, commit_lsn);
 		}
@@ -2595,6 +2682,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 */
 		if (streaming || rbtxn_prepared(txn))
 		{
+			if (streaming)
+				ReorderBufferMaybeMarkTXNStreamed(rb, txn);
+
 			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
 			/* Reset the CheckXidAlive */
 			CheckXidAlive = InvalidTransactionId;
@@ -2648,7 +2738,14 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			FlushErrorState();
 			FreeErrorData(errdata);
 			errdata = NULL;
-			curtxn->concurrent_abort = true;
+
+			/* Remember the transaction is aborted. */
+			Assert(!rbtxn_is_committed(curtxn));
+			curtxn->txn_flags |= RBTXN_IS_ABORTED;
+
+			/* Mark the transaction is streamed if appropriate */
+			if (stream_started)
+				ReorderBufferMaybeMarkTXNStreamed(rb, txn);
 
 			/* Reset the TXN so that it is allowed to stream remaining data. */
 			ReorderBufferResetTXN(rb, txn, snapshot_now,
@@ -2828,15 +2925,15 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 						txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
 
 	/*
-	 * We send the prepare for the concurrently aborted xacts so that later
-	 * when rollback prepared is decoded and sent, the downstream should be
-	 * able to rollback such a xact. See comments atop DecodePrepare.
-	 *
-	 * Note, for the concurrent_abort + streaming case a stream_prepare was
-	 * already sent within the ReorderBufferReplay call above.
+	 * Send a prepare if not already done so. This might occur if we have
+	 * detected a concurrent abort while replaying the non-streaming
+	 * transaction.
 	 */
-	if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+	if (!rbtxn_sent_prepare(txn))
+	{
 		rb->prepare(rb, txn, txn->final_lsn);
+		txn->txn_flags |= RBTXN_SENT_PREPARE;
+	}
 }
 
 /*
@@ -3566,7 +3663,8 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
 }
 
 /*
- * Find the largest streamable toplevel transaction to evict (by streaming).
+ * Find the largest streamable (and non-aborted) toplevel transaction to evict
+ * (by streaming).
  *
  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
  * should give us the same transaction (because we don't update memory account
@@ -3608,9 +3706,15 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 		/* base_snapshot must be set */
 		Assert(txn->base_snapshot != NULL);
 
+		/* Don't consider these kinds of transactions for eviction. */
+		if (rbtxn_has_partial_change(txn) ||
+			!rbtxn_has_streamable_change(txn) ||
+			rbtxn_is_aborted(txn))
+			continue;
+
+		/* Find the largest of the eviction candidates. */
 		if ((largest == NULL || txn->total_size > largest_size) &&
-			(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
-			rbtxn_has_streamable_change(txn))
+			(txn->total_size > 0))
 		{
 			largest = txn;
 			largest_size = txn->total_size;
@@ -3661,8 +3765,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			rb->size > 0))
 	{
 		/*
-		 * Pick the largest transaction and evict it from memory by streaming,
-		 * if possible.  Otherwise, spill to disk.
+		 * Pick the largest non-aborted transaction and evict it from memory
+		 * by streaming, if possible.  Otherwise, spill to disk.
 		 */
 		if (ReorderBufferCanStartStreaming(rb) &&
 			(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
@@ -3672,6 +3776,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			Assert(txn->total_size > 0);
 			Assert(rb->size >= txn->total_size);
 
+			/* skip the transaction if aborted */
+			if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
+				continue;
+
 			ReorderBufferStreamTXN(rb, txn);
 		}
 		else
@@ -3687,6 +3795,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			Assert(txn->size > 0);
 			Assert(rb->size >= txn->size);
 
+			/* skip the transaction if aborted */
+			if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
+				continue;
+
 			ReorderBufferSerializeTXN(rb, txn);
 		}
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index a669658b3f1..9d9ac2f0830 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -173,6 +173,9 @@ typedef struct ReorderBufferChange
 #define RBTXN_PREPARE             	0x0040
 #define RBTXN_SKIPPED_PREPARE	  	0x0080
 #define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
+#define RBTXN_SENT_PREPARE			0x0200
+#define RBTXN_IS_COMMITTED			0x0400
+#define RBTXN_IS_ABORTED			0x0800
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -224,12 +227,36 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
 )
 
-/* Has this transaction been prepared? */
+/*
+ * Is this a prepared transaction?
+ *
+ * Being true means that this transaction should be prepared instead of
+ * committed. To check whether a prepare or a stream_prepare has already
+ * been sent for this transaction, we need to use rbtxn_sent_prepare().
+ */
 #define rbtxn_prepared(txn) \
 ( \
 	((txn)->txn_flags & RBTXN_PREPARE) != 0 \
 )
 
+/* Has a prepare or stream_prepare already been sent? */
+#define rbtxn_sent_prepare(txn) \
+( \
+	((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
+)
+
+/* Is this transaction committed? */
+#define rbtxn_is_committed(txn) \
+( \
+	((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
+)
+
+/* Is this transaction aborted? */
+#define rbtxn_is_aborted(txn) \
+( \
+	((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
+)
+
 /* prepare for this transaction skipped? */
 #define rbtxn_skip_prepared(txn) \
 ( \
@@ -419,9 +446,6 @@ typedef struct ReorderBufferTXN
 	/* Size of top-transaction including sub-transactions. */
 	Size		total_size;
 
-	/* If we have detected concurrent abort then ignore future changes. */
-	bool		concurrent_abort;
-
 	/*
 	 * Private data pointer of the output plugin.
 	 */