Code review for txid patch: add binary I/O functions, avoid dependence

on SerializableSnapshot, minor other cleanup.  Marko Kreen, some further
editorialization by me.
This commit is contained in:
Tom Lane 2007-10-11 19:54:17 +00:00
parent 1246fcd02a
commit d77717bae7
5 changed files with 226 additions and 133 deletions

View File

@ -1,3 +1,4 @@
# $PostgreSQL: pgsql/contrib/txid/Makefile,v 1.2 2007/10/11 19:54:17 tgl Exp $
MODULES = txid
DATA_built = txid.sql
@ -5,7 +6,6 @@ DATA = uninstall_txid.sql
DOCS = README.txid
REGRESS = txid
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
@ -16,11 +16,3 @@ top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
test: install
$(MAKE) installcheck || { less regression.diffs; exit 1; }
ack:
cp results/* expected/

View File

@ -1,8 +1,7 @@
txid - export transaction IDs to user level
===========================================
txid - export transaction id's to user level
============================================
The goal is to make PostgreSQL internal transaction ID and snapshot
The goal is to make PostgreSQL's internal transaction ID and snapshot
data usable externally. This allows very efficient queue
implementation done inside database.
@ -32,7 +31,7 @@ txid_snapshot_xmax( snap ) returns int8
txid_snapshot_xip( snap ) setof int8
List of in-progress TXID's in snapshot, that are invisible.
Values are between xmin and xmax.
Values are between xmin (inclusive) and xmax (exclusive).
txid_visible_in_snapshot(id, snap) returns bool
@ -73,8 +72,8 @@ fetching possible txids below snap1.xmax explicitly:
AND NOT txid_visible_in_snapshot(ev_txid, :snap1)
AND txid_visible_in_snapshot(ev_txid, :snap2);
Note that although the above queries work, the PostgreSQL fails to
plan them correctly. For actual usage the values for txid_snapshot_xmin,
Note that although the above queries work, PostgreSQL fails to
plan them efficiently. For actual usage the values for txid_snapshot_xmin,
txid_snapshot_xmax and txid_snapshot_xip should be filled in directly,
only then will they use index.
@ -92,20 +91,16 @@ To see example code for that it's best to see pgq.batch_event_sql() function in
Dumping and restoring data containing TXIDs.
--------------------------------------------
[towrite: reason for epoch increase]
When reloading TXID data you will typically want to be sure that the current
XID counter is beyond the reloaded data. The easiest way to do this is to
increase the XID epoch to beyond the largest one in the input data.
You can look at current epoch with query:
You can look at current epoch with queries such as:
SELECT txid_current() >> 32 as epoch;
So new epoch should be:
SELECT (txid_current() >> 32) + 1 as newepoch;
SELECT MAX(txid) >> 32 as epoch FROM ...;
Epoch can be changed with pg_resetxlog command:
pg_resetxlog -e NEWEPOCH DATADIR
Database needs to be shut down for that moment.

View File

@ -1,12 +1,21 @@
/*-------------------------------------------------------------------------
* txid.c
*
* Export backend internal tranasction id's to user level.
* Export internal transaction IDs to user level.
*
* Note that only top-level transaction IDs are ever converted to TXID.
* This is important because TXIDs frequently persist beyond the global
* xmin horizon, or may even be shipped to other machines, so we cannot
* rely on being able to correlate subtransaction IDs with their parents
* via functions such as SubTransGetTopmostTransaction().
*
*
* Copyright (c) 2003-2007, PostgreSQL Global Development Group
* Author: Jan Wieck, Afilias USA INC.
*
* 64-bit txids: Marko Kreen, Skype Technologies
*
* $PostgreSQL: pgsql/contrib/txid/txid.c,v 1.4 2007/10/11 19:54:17 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -15,32 +24,33 @@
#include "access/transam.h"
#include "access/xact.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#ifdef INT64_IS_BUSTED
#error txid needs working int64
#endif
/* txid will be signed int8 in database */
/* txid will be signed int8 in database, so must limit to 63 bits */
#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
/*
* If defined, use bsearch() function for searching
* txid's inside snapshots that have more than given values.
*/
#define USE_BSEARCH_IF_NXIP_GREATER 30
/* format code for uint64 to appendStringInfo */
#define TXID_FMT UINT64_FORMAT
/* Use unsigned variant internally */
typedef uint64 txid;
/* sprintf format code for uint64 */
#define TXID_FMT UINT64_FORMAT
/*
* Snapshot for 8byte txids.
* If defined, use bsearch() function for searching for txids in snapshots
* that have more than the specified number of values.
*/
#define USE_BSEARCH_IF_NXIP_GREATER 30
/*
* Snapshot containing 8byte txids.
*/
typedef struct
{
@ -55,22 +65,27 @@ typedef struct
uint32 nxip; /* number of txids in xip array */
txid xmin;
txid xmax;
txid xip[1]; /* in-progress txids */
txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */
} TxidSnapshot;
#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
#define TXID_SNAPSHOT_SIZE(nxip) \
(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
/*
* Epoch values from backend.
* Epoch values from xact.c
*/
typedef struct {
uint64 last_value;
uint64 epoch;
typedef struct
{
TransactionId last_xid;
uint32 epoch;
} TxidEpoch;
/* public functions */
Datum txid_snapshot_in(PG_FUNCTION_ARGS);
Datum txid_snapshot_out(PG_FUNCTION_ARGS);
Datum txid_snapshot_recv(PG_FUNCTION_ARGS);
Datum txid_snapshot_send(PG_FUNCTION_ARGS);
Datum txid_current(PG_FUNCTION_ARGS);
Datum txid_current_snapshot(PG_FUNCTION_ARGS);
Datum txid_snapshot_xmin(PG_FUNCTION_ARGS);
@ -81,6 +96,8 @@ Datum txid_visible_in_snapshot(PG_FUNCTION_ARGS);
/* public function tags */
PG_FUNCTION_INFO_V1(txid_snapshot_in);
PG_FUNCTION_INFO_V1(txid_snapshot_out);
PG_FUNCTION_INFO_V1(txid_snapshot_recv);
PG_FUNCTION_INFO_V1(txid_snapshot_send);
PG_FUNCTION_INFO_V1(txid_current);
PG_FUNCTION_INFO_V1(txid_current_snapshot);
PG_FUNCTION_INFO_V1(txid_snapshot_xmin);
@ -88,8 +105,18 @@ PG_FUNCTION_INFO_V1(txid_snapshot_xmax);
PG_FUNCTION_INFO_V1(txid_snapshot_xip);
PG_FUNCTION_INFO_V1(txid_visible_in_snapshot);
/*
* do a TransactionId -> txid conversion
* Fetch epoch data from xact.c.
*/
static void
load_xid_epoch(TxidEpoch *state)
{
GetNextXidAndEpoch(&state->last_xid, &state->epoch);
}
/*
* do a TransactionId -> txid conversion for an XID near the given epoch
*/
static txid
convert_xid(TransactionId xid, const TxidEpoch *state)
@ -97,53 +124,42 @@ convert_xid(TransactionId xid, const TxidEpoch *state)
uint64 epoch;
/* return special xid's as-is */
if (xid < FirstNormalTransactionId)
return xid;
if (!TransactionIdIsNormal(xid))
return (txid) xid;
/* xid can be on either side when near wrap-around */
epoch = (uint64) state->epoch;
if (xid > state->last_xid &&
TransactionIdPrecedes(xid, state->last_xid))
epoch--;
else if (xid < state->last_xid &&
TransactionIdFollows(xid, state->last_xid))
epoch++;
/* xid can on both sides on wrap-around */
epoch = state->epoch;
if (TransactionIdPrecedes(xid, state->last_value)) {
if (xid > state->last_value)
epoch--;
} else if (TransactionIdFollows(xid, state->last_value)) {
if (xid < state->last_value)
epoch++;
}
return (epoch << 32) | xid;
}
/*
* Fetch epoch data from backend.
*/
static void
load_xid_epoch(TxidEpoch *state)
{
TransactionId xid;
uint32 epoch;
GetNextXidAndEpoch(&xid, &epoch);
state->epoch = epoch;
state->last_value = xid;
}
/*
* compare txid in memory.
* txid comparator for qsort/bsearch
*/
static int
cmp_txid(const void *aa, const void *bb)
{
const uint64 *a = aa;
const uint64 *b = bb;
if (*a < *b)
txid a = *(const txid *) aa;
txid b = *(const txid *) bb;
if (a < b)
return -1;
if (*a > *b)
if (a > b)
return 1;
return 0;
}
/*
* order txids, for bsearch().
* sort a snapshot's txids, so we can use bsearch() later.
*
* For consistency of on-disk representation, we always sort even if bsearch
* will not be used.
*/
static void
sort_snapshot(TxidSnapshot *snap)
@ -166,13 +182,16 @@ is_visible_txid(txid value, const TxidSnapshot *snap)
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
{
void *res;
res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
/* if found, transaction is still in progress */
return (res) ? false : true;
}
#endif
else
{
int i;
uint32 i;
for (i = 0; i < snap->nxip; i++)
{
if (value == snap->xip[i])
@ -206,7 +225,7 @@ buf_add_txid(StringInfo buf, txid xid)
{
TxidSnapshot *snap = (TxidSnapshot *)buf->data;
/* do it before possible realloc */
/* do this before possible realloc */
snap->nxip++;
appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid));
@ -216,6 +235,7 @@ static TxidSnapshot *
buf_finalize(StringInfo buf)
{
TxidSnapshot *snap = (TxidSnapshot *)buf->data;
SET_VARSIZE(snap, buf->len);
/* buf is not needed anymore */
@ -319,13 +339,17 @@ bad_format:
}
/*
* Public functions
* Public functions.
*
* txid_current() and txid_current_snapshot() are the only ones that
* communicate with core xid machinery. All the others work on data
* returned by them.
*/
/*
* txid_current() returns int8
*
* Return the current transaction ID
* Return the current toplevel transaction ID as TXID
*/
Datum
txid_current(PG_FUNCTION_ARGS)
@ -343,19 +367,21 @@ txid_current(PG_FUNCTION_ARGS)
/*
* txid_current_snapshot() returns txid_snapshot
*
* Return current snapshot
* Return current snapshot in TXID format
*
* Note that only top-transaction XIDs are included in the snapshot.
*/
Datum
txid_current_snapshot(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap;
unsigned nxip, i, size;
uint32 nxip, i, size;
TxidEpoch state;
Snapshot cur;
cur = SerializableSnapshot;
cur = ActiveSnapshot;
if (cur == NULL)
elog(ERROR, "get_current_snapshot: SerializableSnapshot == NULL");
elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");
load_xid_epoch(&state);
@ -372,7 +398,7 @@ txid_current_snapshot(PG_FUNCTION_ARGS)
for (i = 0; i < nxip; i++)
snap->xip[i] = convert_xid(cur->xip[i], &state);
/* we want them guaranteed ascending order */
/* we want them guaranteed to be in ascending order */
sort_snapshot(snap);
PG_RETURN_POINTER(snap);
@ -386,8 +412,8 @@ txid_current_snapshot(PG_FUNCTION_ARGS)
Datum
txid_snapshot_in(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap;
char *str = PG_GETARG_CSTRING(0);
TxidSnapshot *snap;
snap = parse_snapshot(str);
@ -402,11 +428,9 @@ txid_snapshot_in(PG_FUNCTION_ARGS)
Datum
txid_snapshot_out(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap;
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
StringInfoData str;
int i;
snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
uint32 i;
initStringInfo(&str);
@ -415,16 +439,92 @@ txid_snapshot_out(PG_FUNCTION_ARGS)
for (i = 0; i < snap->nxip; i++)
{
appendStringInfo(&str, "%s" TXID_FMT,
((i > 0) ? "," : ""),
snap->xip[i]);
if (i > 0)
appendStringInfoChar(&str, ',');
appendStringInfo(&str, TXID_FMT, snap->xip[i]);
}
PG_FREE_IF_COPY(snap, 0);
PG_RETURN_CSTRING(str.data);
}
/*
* txid_snapshot_recv(internal) returns txid_snapshot
*
* binary input function for type txid_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
*/
Datum
txid_snapshot_recv(PG_FUNCTION_ARGS)
{
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
TxidSnapshot *snap;
txid last = 0;
int nxip;
int i;
int avail;
int expect;
txid xmin, xmax;
/*
* load nxip and check for nonsense.
*
* (nxip > avail) check is against int overflows in 'expect'.
*/
nxip = pq_getmsgint(buf, 4);
avail = buf->len - buf->cursor;
expect = 8 + 8 + nxip * 8;
if (nxip < 0 || nxip > avail || expect > avail)
goto bad_format;
xmin = pq_getmsgint64(buf);
xmax = pq_getmsgint64(buf);
if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
goto bad_format;
snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
snap->xmin = xmin;
snap->xmax = xmax;
snap->nxip = nxip;
SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
for (i = 0; i < nxip; i++)
{
txid cur = pq_getmsgint64(buf);
if (cur <= last || cur < xmin || cur >= xmax)
goto bad_format;
snap->xip[i] = cur;
last = cur;
}
PG_RETURN_POINTER(snap);
bad_format:
elog(ERROR, "invalid snapshot data");
return (Datum)NULL;
}
/*
* txid_snapshot_send(txid_snapshot) returns bytea
*
* binary output function for type txid_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
*/
Datum
txid_snapshot_send(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0);
StringInfoData buf;
uint32 i;
pq_begintypsend(&buf);
pq_sendint(&buf, snap->nxip, 4);
pq_sendint64(&buf, snap->xmin);
pq_sendint64(&buf, snap->xmax);
for (i = 0; i < snap->nxip; i++)
pq_sendint64(&buf, snap->xip[i]);
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
/*
* txid_visible_in_snapshot(int8, txid_snapshot) returns bool
@ -436,12 +536,8 @@ txid_visible_in_snapshot(PG_FUNCTION_ARGS)
{
txid value = PG_GETARG_INT64(0);
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
int res;
res = is_visible_txid(value, snap) ? true : false;
PG_FREE_IF_COPY(snap, 1);
PG_RETURN_BOOL(res);
PG_RETURN_BOOL(is_visible_txid(value, snap));
}
/*
@ -453,9 +549,8 @@ Datum
txid_snapshot_xmin(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
txid res = snap->xmin;
PG_FREE_IF_COPY(snap, 0);
PG_RETURN_INT64(res);
PG_RETURN_INT64(snap->xmin);
}
/*
@ -467,9 +562,8 @@ Datum
txid_snapshot_xmax(PG_FUNCTION_ARGS)
{
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
txid res = snap->xmax;
PG_FREE_IF_COPY(snap, 0);
PG_RETURN_INT64(res);
PG_RETURN_INT64(snap->xmax);
}
/*
@ -486,15 +580,13 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)
/* on first call initialize snap_state and get copy of snapshot */
if (SRF_IS_FIRSTCALL()) {
TxidSnapshot *arg;
TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
fctx = SRF_FIRSTCALL_INIT();
/* make a copy of user snapshot */
arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
memcpy(snap, arg, VARSIZE(arg));
PG_FREE_IF_COPY(arg, 0);
fctx->user_fctx = snap;
}
@ -509,4 +601,3 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(fctx);
}
}

View File

@ -5,21 +5,38 @@
--
-- Copyright (c) 2003-2007, PostgreSQL Global Development Group
-- Author: Jan Wieck, Afilias USA INC.
--
-- 64-bit txids: Marko Kreen, Skype Technologies
--
-- $PostgreSQL: pgsql/contrib/txid/txid.sql.in,v 1.2 2007/10/11 19:54:17 tgl Exp $
--
-- ----------
-- Adjust this setting to control where the objects get created.
SET search_path = public;
BEGIN;
--
-- A special transaction snapshot data type for faster visibility checks
--
CREATE OR REPLACE FUNCTION txid_snapshot_in(cstring)
CREATE TYPE txid_snapshot;
CREATE FUNCTION txid_snapshot_in(cstring)
RETURNS txid_snapshot
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot)
CREATE FUNCTION txid_snapshot_out(txid_snapshot)
RETURNS cstring
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
CREATE FUNCTION txid_snapshot_recv(internal)
RETURNS txid_snapshot
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
CREATE FUNCTION txid_snapshot_send(txid_snapshot)
RETURNS bytea
AS 'MODULE_PATHNAME' LANGUAGE C
IMMUTABLE STRICT;
--
-- The data type itself
@ -27,42 +44,44 @@ CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot)
CREATE TYPE txid_snapshot (
INPUT = txid_snapshot_in,
OUTPUT = txid_snapshot_out,
RECEIVE = txid_snapshot_recv,
SEND = txid_snapshot_send,
INTERNALLENGTH = variable,
STORAGE = extended,
ALIGNMENT = double
);
CREATE OR REPLACE FUNCTION txid_current()
--
-- Functions for txid
--
CREATE FUNCTION txid_current()
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_current' LANGUAGE C
STABLE;
CREATE OR REPLACE FUNCTION txid_current_snapshot()
CREATE FUNCTION txid_current_snapshot()
RETURNS txid_snapshot
AS 'MODULE_PATHNAME', 'txid_current_snapshot' LANGUAGE C
STABLE;
CREATE OR REPLACE FUNCTION txid_snapshot_xmin(txid_snapshot)
CREATE FUNCTION txid_snapshot_xmin(txid_snapshot)
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xmin' LANGUAGE C
IMMUTABLE STRICT;
CREATE OR REPLACE FUNCTION txid_snapshot_xmax(txid_snapshot)
CREATE FUNCTION txid_snapshot_xmax(txid_snapshot)
RETURNS bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xmax' LANGUAGE C
IMMUTABLE STRICT;
CREATE OR REPLACE FUNCTION txid_snapshot_xip(txid_snapshot)
CREATE FUNCTION txid_snapshot_xip(txid_snapshot)
RETURNS setof bigint
AS 'MODULE_PATHNAME', 'txid_snapshot_xip' LANGUAGE C
IMMUTABLE STRICT;
--
-- Special comparision functions for visibility checks
--
CREATE OR REPLACE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot)
CREATE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot)
RETURNS boolean
AS 'MODULE_PATHNAME', 'txid_visible_in_snapshot' LANGUAGE C
IMMUTABLE STRICT;
COMMIT;

View File

@ -1,4 +1,4 @@
SET search_path = public;
DROP FUNCTION txid_current();
DROP FUNCTION txid_current_snapshot();
@ -6,10 +6,6 @@ DROP FUNCTION txid_snapshot_xmin(txid_snapshot);
DROP FUNCTION txid_snapshot_xmax(txid_snapshot);
DROP FUNCTION txid_snapshot_xip(txid_snapshot);
DROP FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot);
DROP FUNCTION txid_not_visible_in_snapshot(bigint, txid_snapshot);
DROP TYPE txid_snapshot cascade;
-- need cascade to drop those:
-- DROP FUNCTION txid_snapshot_in(cstring);
-- DROP FUNCTION txid_snapshot_out(txid_snapshot);
DROP TYPE txid_snapshot CASCADE;
-- need cascade to drop the I/O functions