Introduce compact WAL record for the common case of commit (non-DDL).

XLOG_XACT_COMMIT_COMPACT leaves out invalidation messages and relfilenodes,
saving considerable space for the vast majority of transaction commits.
XLOG_XACT_COMMIT keeps the same definition it had under XLOG_PAGE_MAGIC 0xD067 and earlier.

Leonardo Francalanci and Simon Riggs
This commit is contained in:
Simon Riggs 2011-06-28 22:58:17 +01:00
parent 6f3efa76b0
commit 465883b0a2
4 changed files with 195 additions and 84 deletions

View File

@ -962,25 +962,9 @@ RecordTransactionCommit(void)
/* /*
* Begin commit critical section and insert the commit XLOG record. * Begin commit critical section and insert the commit XLOG record.
*/ */
XLogRecData rdata[4];
int lastrdata = 0;
xl_xact_commit xlrec;
/* Tell bufmgr and smgr to prepare for commit */ /* Tell bufmgr and smgr to prepare for commit */
BufmgrCommit(); BufmgrCommit();
/*
* Set flags required for recovery processing of commits.
*/
xlrec.xinfo = 0;
if (RelcacheInitFileInval)
xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
if (forceSyncCommit)
xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
xlrec.dbId = MyDatabaseId;
xlrec.tsId = MyDatabaseTableSpace;
/* /*
* Mark ourselves as within our "commit critical section". This * Mark ourselves as within our "commit critical section". This
* forces any concurrent checkpoint to wait until we've updated * forces any concurrent checkpoint to wait until we've updated
@ -1002,43 +986,88 @@ RecordTransactionCommit(void)
MyProc->inCommit = true; MyProc->inCommit = true;
SetCurrentTransactionStopTimestamp(); SetCurrentTransactionStopTimestamp();
xlrec.xact_time = xactStopTimestamp;
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
rdata[0].buffer = InvalidBuffer;
/* dump rels to delete */
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
}
/* dump committed child Xids */
if (nchildren > 0)
{
rdata[lastrdata].next = &(rdata[2]);
rdata[2].data = (char *) children;
rdata[2].len = nchildren * sizeof(TransactionId);
rdata[2].buffer = InvalidBuffer;
lastrdata = 2;
}
/* dump shared cache invalidation messages */
if (nmsgs > 0)
{
rdata[lastrdata].next = &(rdata[3]);
rdata[3].data = (char *) invalMessages;
rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); /*
* Do we need the long commit record? If not, use the compact format.
*/
if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
{
XLogRecData rdata[4];
int lastrdata = 0;
xl_xact_commit xlrec;
/*
* Set flags required for recovery processing of commits.
*/
xlrec.xinfo = 0;
if (RelcacheInitFileInval)
xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
if (forceSyncCommit)
xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
xlrec.dbId = MyDatabaseId;
xlrec.tsId = MyDatabaseTableSpace;
xlrec.xact_time = xactStopTimestamp;
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
rdata[0].buffer = InvalidBuffer;
/* dump rels to delete */
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
}
/* dump committed child Xids */
if (nchildren > 0)
{
rdata[lastrdata].next = &(rdata[2]);
rdata[2].data = (char *) children;
rdata[2].len = nchildren * sizeof(TransactionId);
rdata[2].buffer = InvalidBuffer;
lastrdata = 2;
}
/* dump shared cache invalidation messages */
if (nmsgs > 0)
{
rdata[lastrdata].next = &(rdata[3]);
rdata[3].data = (char *) invalMessages;
rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
}
else
{
XLogRecData rdata[2];
int lastrdata = 0;
xl_xact_commit_compact xlrec;
xlrec.xact_time = xactStopTimestamp;
xlrec.nsubxacts = nchildren;
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommitCompact;
rdata[0].buffer = InvalidBuffer;
/* dump committed child Xids */
if (nchildren > 0)
{
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) children;
rdata[1].len = nchildren * sizeof(TransactionId);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
}
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata);
}
} }
/* /*
@ -4441,19 +4470,17 @@ xactGetCommittedChildren(TransactionId **ptr)
* actions for which the order of execution is critical. * actions for which the order of execution is critical.
*/ */
static void static void
xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
TransactionId *sub_xids, int nsubxacts,
SharedInvalidationMessage *inval_msgs, int nmsgs,
RelFileNode *xnodes, int nrels,
Oid dbId, Oid tsId,
uint32 xinfo)
{ {
TransactionId *sub_xids;
SharedInvalidationMessage *inval_msgs;
TransactionId max_xid; TransactionId max_xid;
int i; int i;
/* subxid array follows relfilenodes */ max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
/* /*
* Make sure nextXid is beyond any XID mentioned in the record. * Make sure nextXid is beyond any XID mentioned in the record.
@ -4476,7 +4503,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
/* /*
* Mark the transaction committed in pg_clog. * Mark the transaction committed in pg_clog.
*/ */
TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids); TransactionIdCommitTree(xid, nsubxacts, sub_xids);
} }
else else
{ {
@ -4500,41 +4527,41 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
* bits set on changes made by transactions that haven't yet * bits set on changes made by transactions that haven't yet
* recovered. It's unlikely but it's good to be safe. * recovered. It's unlikely but it's good to be safe.
*/ */
TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn); TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
/* /*
* We must mark clog before we update the ProcArray. * We must mark clog before we update the ProcArray.
*/ */
ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
/* /*
* Send any cache invalidations attached to the commit. We must * Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as * maintain the same order of invalidation then release locks as
* occurs in CommitTransaction(). * occurs in CommitTransaction().
*/ */
ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs, ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
XactCompletionRelcacheInitFileInval(xlrec), XactCompletionRelcacheInitFileInval(xinfo),
xlrec->dbId, xlrec->tsId); dbId, tsId);
/* /*
* Release locks, if any. We do this for both two phase and normal one * Release locks, if any. We do this for both two phase and normal one
* phase transactions. In effect we are ignoring the prepare phase and * phase transactions. In effect we are ignoring the prepare phase and
* just going straight to lock release. * just going straight to lock release.
*/ */
StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids); StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
} }
/* Make sure files supposed to be dropped are dropped */ /* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++) for (i = 0; i < nrels; i++)
{ {
SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId); SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
ForkNumber fork; ForkNumber fork;
for (fork = 0; fork <= MAX_FORKNUM; fork++) for (fork = 0; fork <= MAX_FORKNUM; fork++)
{ {
if (smgrexists(srel, fork)) if (smgrexists(srel, fork))
{ {
XLogDropRelation(xlrec->xnodes[i], fork); XLogDropRelation(xnodes[i], fork);
smgrdounlink(srel, fork, true); smgrdounlink(srel, fork, true);
} }
} }
@ -4553,8 +4580,46 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
* to reduce that problem window, for any user that requested * to reduce that problem window, for any user that requested
* ForceSyncCommit(). * ForceSyncCommit().
*/ */
if (XactCompletionForceSyncCommit(xlrec)) if (XactCompletionForceSyncCommit(xinfo))
XLogFlush(lsn); XLogFlush(lsn);
}
/*
 * Replay a full-format commit record.
 *
 * The variable-length arrays sit back to back behind the fixed part of the
 * record: relfilenodes first, then subtransaction XIDs, then invalidation
 * messages.  Locate each array and pass everything on to
 * xact_redo_commit_internal.
 */
static void
xact_redo_commit(xl_xact_commit *xlrec,
				 TransactionId xid, XLogRecPtr lsn)
{
	/* the subxid array begins just past the last relfilenode */
	TransactionId *sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);

	/* the invalidation messages begin just past the last subxid */
	SharedInvalidationMessage *msgs =
		(SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);

	xact_redo_commit_internal(xid, lsn,
							  sub_xids, xlrec->nsubxacts,
							  msgs, xlrec->nmsgs,
							  xlrec->xnodes, xlrec->nrels,
							  xlrec->dbId, xlrec->tsId,
							  xlrec->xinfo);
}
/*
 * Replay a compact commit record.
 *
 * The compact form carries only the commit time and the subtransaction
 * XIDs, so every other input of xact_redo_commit_internal is passed as
 * empty.
 */
static void
xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
						 TransactionId xid, XLogRecPtr lsn)
{
	TransactionId *sub_xids = xlrec->subxacts;
	int			nsub = xlrec->nsubxacts;

	xact_redo_commit_internal(xid, lsn,
							  sub_xids, nsub,
							  NULL, 0,		/* inval msgs */
							  NULL, 0,		/* relfilenodes */
							  InvalidOid,	/* dbId */
							  InvalidOid,	/* tsId */
							  0);			/* xinfo */
}
/* /*
@ -4655,7 +4720,13 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
/* Backup blocks are not used in xact records */ /* Backup blocks are not used in xact records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
if (info == XLOG_XACT_COMMIT) if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_COMMIT)
{ {
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
@ -4703,9 +4774,9 @@ static void
xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{ {
int i; int i;
TransactionId *xacts; TransactionId *subxacts;
xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels]; subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
@ -4724,15 +4795,15 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{ {
appendStringInfo(buf, "; subxacts:"); appendStringInfo(buf, "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++) for (i = 0; i < xlrec->nsubxacts; i++)
appendStringInfo(buf, " %u", xacts[i]); appendStringInfo(buf, " %u", subxacts[i]);
} }
if (xlrec->nmsgs > 0) if (xlrec->nmsgs > 0)
{ {
SharedInvalidationMessage *msgs; SharedInvalidationMessage *msgs;
msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts]; msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
if (XactCompletionRelcacheInitFileInval(xlrec)) if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
xlrec->dbId, xlrec->tsId); xlrec->dbId, xlrec->tsId);
@ -4758,6 +4829,21 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
} }
} }
/*
 * Describe a compact commit record: append the commit timestamp and, when
 * there are any, the XIDs of the committed subtransactions to buf.
 */
static void
xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
{
	int			i;

	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));

	if (xlrec->nsubxacts > 0)
	{
		/* constant text: append it directly rather than as a format string */
		appendStringInfoString(buf, "; subxacts:");
		for (i = 0; i < xlrec->nsubxacts; i++)
			appendStringInfo(buf, " %u", xlrec->subxacts[i]);
	}
}
static void static void
xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec) xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
{ {
@ -4802,7 +4888,14 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec)
{ {
uint8 info = xl_info & ~XLR_INFO_MASK; uint8 info = xl_info & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT) if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
appendStringInfo(buf, "commit: ");
xact_desc_commit_compact(buf, xlrec);
}
else if (info == XLOG_XACT_COMMIT)
{ {
xl_xact_commit *xlrec = (xl_xact_commit *) rec; xl_xact_commit *xlrec = (xl_xact_commit *) rec;

View File

@ -5593,7 +5593,14 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID) if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
return false; return false;
record_info = record->xl_info & ~XLR_INFO_MASK; record_info = record->xl_info & ~XLR_INFO_MASK;
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *recordXactCommitData;
recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
}
else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{ {
xl_xact_commit *recordXactCommitData; xl_xact_commit *recordXactCommitData;
@ -5680,7 +5687,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recoveryStopTime = recordXtime; recoveryStopTime = recordXtime;
recoveryStopAfter = *includeThis; recoveryStopAfter = *includeThis;
if (record_info == XLOG_XACT_COMMIT) if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
{ {
if (recoveryStopAfter) if (recoveryStopAfter)
ereport(LOG, ereport(LOG,

View File

@ -106,6 +106,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50 #define XLOG_XACT_ASSIGNMENT 0x50
#define XLOG_XACT_COMMIT_COMPACT 0x60
typedef struct xl_xact_assignment typedef struct xl_xact_assignment
{ {
@ -116,6 +117,16 @@ typedef struct xl_xact_assignment
#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
/*
 * Body of an XLOG_XACT_COMMIT_COMPACT record: just the commit time and the
 * committed subtransaction XIDs.  Unlike xl_xact_commit it has no
 * relfilenode or invalidation-message arrays and no xinfo/dbId/tsId fields.
 */
typedef struct xl_xact_commit_compact
{
TimestampTz xact_time; /* time of commit */
int nsubxacts; /* number of subtransaction XIDs */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
/* declared with one element; the record holds nsubxacts entries */
TransactionId subxacts[1]; /* VARIABLE LENGTH ARRAY */
} xl_xact_commit_compact;
/*
 * Size of the fixed-length part of the record, i.e. everything ahead of the
 * subxacts array; the XIDs themselves are not counted.
 */
#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
typedef struct xl_xact_commit typedef struct xl_xact_commit
{ {
TimestampTz xact_time; /* time of commit */ TimestampTz xact_time; /* time of commit */
@ -145,8 +156,8 @@ typedef struct xl_xact_commit
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 #define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
/*
 * Access macros for above flags.
 *
 * Each takes the xinfo bitmask itself (not a pointer to the record) and
 * yields nonzero when the corresponding flag is set.  The argument is
 * parenthesized so that a compound expression such as (a | b) is masked as
 * a whole instead of being split up by operator precedence.
 */
#define XactCompletionRelcacheInitFileInval(xinfo)	((xinfo) & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
#define XactCompletionForceSyncCommit(xinfo)		((xinfo) & XACT_COMPLETION_FORCE_SYNC_COMMIT)
typedef struct xl_xact_abort typedef struct xl_xact_abort
{ {

View File

@ -71,7 +71,7 @@ typedef struct XLogContRecord
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD067 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD068 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {