When a cursor is opened using dblink_open, only start a transaction

if there isn't one already open. Upon dblink_close, only commit
the open transaction if it was started by dblink_open, and only
then when all cursors opened by dblink_open are closed. The transaction
accounting is done individually for all named connections, plus
the persistent unnamed connection.
This commit is contained in:
Joe Conway 2005-10-18 02:55:49 +00:00
parent c62b29a603
commit 056eb1412c
3 changed files with 193 additions and 39 deletions

View File

@ -60,9 +60,9 @@
typedef struct remoteConn typedef struct remoteConn
{ {
PGconn *conn; /* Hold the remote connection */ PGconn *conn; /* Hold the remote connection */
int autoXactCursors;/* Indicates the number of open cursors, int openCursorCount; /* The number of open cursors */
* non-zero means we opened the xact ourselves */ bool newXactForCursor; /* Opened a transaction for a cursor */
} remoteConn; } remoteConn;
/* /*
@ -84,10 +84,8 @@ static Oid get_relid_from_relname(text *relname_text);
static char *generate_relation_name(Oid relid); static char *generate_relation_name(Oid relid);
/* Global */ /* Global */
List *res_id = NIL; static remoteConn *pconn = NULL;
int res_id_index = 0; static HTAB *remoteConnHash = NULL;
PGconn *persistent_conn = NULL;
static HTAB *remoteConnHash = NULL;
/* /*
* Following is list that holds multiple remote connections. * Following is list that holds multiple remote connections.
@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt
} \ } \
} while (0) } while (0)
#define DBLINK_INIT \
do { \
if (!pconn) \
{ \
pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
pconn->conn = NULL; \
pconn->openCursorCount = 0; \
pconn->newXactForCursor = FALSE; \
} \
} while (0)
/* /*
* Create a persistent connection to another database * Create a persistent connection to another database
@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS)
PGconn *conn = NULL; PGconn *conn = NULL;
remoteConn *rconn = NULL; remoteConn *rconn = NULL;
DBLINK_INIT;
if (PG_NARGS() == 2) if (PG_NARGS() == 2)
{ {
connstr = GET_STR(PG_GETARG_TEXT_P(1)); connstr = GET_STR(PG_GETARG_TEXT_P(1));
@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS)
createNewConnection(connname, rconn); createNewConnection(connname, rconn);
} }
else else
persistent_conn = conn; pconn->conn = conn;
PG_RETURN_TEXT_P(GET_TEXT("OK")); PG_RETURN_TEXT_P(GET_TEXT("OK"));
} }
@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL; remoteConn *rconn = NULL;
PGconn *conn = NULL; PGconn *conn = NULL;
DBLINK_INIT;
if (PG_NARGS() == 1) if (PG_NARGS() == 1)
{ {
conname = GET_STR(PG_GETARG_TEXT_P(0)); conname = GET_STR(PG_GETARG_TEXT_P(0));
@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
conn = rconn->conn; conn = rconn->conn;
} }
else else
conn = persistent_conn; conn = pconn->conn;
if (!conn) if (!conn)
DBLINK_CONN_NOT_AVAIL; DBLINK_CONN_NOT_AVAIL;
@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
pfree(rconn); pfree(rconn);
} }
else else
persistent_conn = NULL; pconn->conn = NULL;
PG_RETURN_TEXT_P(GET_TEXT("OK")); PG_RETURN_TEXT_P(GET_TEXT("OK"));
} }
@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL; remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */ bool fail = true; /* default to backward compatible behavior */
DBLINK_INIT;
if (PG_NARGS() == 2) if (PG_NARGS() == 2)
{ {
/* text,text */ /* text,text */
curname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(1));
conn = persistent_conn; rconn = pconn;
} }
else if (PG_NARGS() == 3) else if (PG_NARGS() == 3)
{ {
@ -307,7 +321,7 @@ dblink_open(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(0));
sql = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2); fail = PG_GETARG_BOOL(2);
conn = persistent_conn; rconn = pconn;
} }
else else
{ {
@ -315,8 +329,6 @@ dblink_open(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(1)); curname = GET_STR(PG_GETARG_TEXT_P(1));
sql = GET_STR(PG_GETARG_TEXT_P(2)); sql = GET_STR(PG_GETARG_TEXT_P(2));
rconn = getConnectionByName(conname); rconn = getConnectionByName(conname);
if (rconn)
conn = rconn->conn;
} }
} }
else if (PG_NARGS() == 4) else if (PG_NARGS() == 4)
@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS)
sql = GET_STR(PG_GETARG_TEXT_P(2)); sql = GET_STR(PG_GETARG_TEXT_P(2));
fail = PG_GETARG_BOOL(3); fail = PG_GETARG_BOOL(3);
rconn = getConnectionByName(conname); rconn = getConnectionByName(conname);
if (rconn)
conn = rconn->conn;
} }
if (!conn) if (!rconn || !rconn->conn)
DBLINK_CONN_NOT_AVAIL; DBLINK_CONN_NOT_AVAIL;
else
conn = rconn->conn;
res = PQexec(conn, "BEGIN"); /* If we are not in a transaction, start one */
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQtransactionStatus(conn) == PQTRANS_IDLE)
DBLINK_RES_INTERNALERROR("begin error"); {
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
DBLINK_RES_INTERNALERROR("begin error");
PQclear(res);
rconn->newXactForCursor = TRUE;
}
PQclear(res); /* if we started a transaction, increment cursor count */
if (rconn->newXactForCursor)
(rconn->openCursorCount)++;
appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
res = PQexec(conn, str->data); res = PQexec(conn, str->data);
@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL; remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */ bool fail = true; /* default to backward compatible behavior */
DBLINK_INIT;
if (PG_NARGS() == 1) if (PG_NARGS() == 1)
{ {
/* text */ /* text */
curname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(0));
conn = persistent_conn; rconn = pconn;
} }
else if (PG_NARGS() == 2) else if (PG_NARGS() == 2)
{ {
@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS)
{ {
curname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1); fail = PG_GETARG_BOOL(1);
conn = persistent_conn; rconn = pconn;
} }
else else
{ {
conname = GET_STR(PG_GETARG_TEXT_P(0)); conname = GET_STR(PG_GETARG_TEXT_P(0));
curname = GET_STR(PG_GETARG_TEXT_P(1)); curname = GET_STR(PG_GETARG_TEXT_P(1));
rconn = getConnectionByName(conname); rconn = getConnectionByName(conname);
if (rconn)
conn = rconn->conn;
} }
} }
if (PG_NARGS() == 3) if (PG_NARGS() == 3)
@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(1)); curname = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2); fail = PG_GETARG_BOOL(2);
rconn = getConnectionByName(conname); rconn = getConnectionByName(conname);
if (rconn)
conn = rconn->conn;
} }
if (!conn) if (!rconn || !rconn->conn)
DBLINK_CONN_NOT_AVAIL; DBLINK_CONN_NOT_AVAIL;
else
conn = rconn->conn;
appendStringInfo(str, "CLOSE %s", curname); appendStringInfo(str, "CLOSE %s", curname);
@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS)
PQclear(res); PQclear(res);
/* commit the transaction */ /* if we started a transaction, decrement cursor count */
res = PQexec(conn, "COMMIT"); if (rconn->newXactForCursor)
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
DBLINK_RES_INTERNALERROR("commit error"); (rconn->openCursorCount)--;
PQclear(res); /* if count is zero, commit the transaction */
if (rconn->openCursorCount == 0)
{
rconn->newXactForCursor = FALSE;
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
DBLINK_RES_INTERNALERROR("commit error");
PQclear(res);
}
}
PG_RETURN_TEXT_P(GET_TEXT("OK")); PG_RETURN_TEXT_P(GET_TEXT("OK"));
} }
@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
char *conname = NULL; char *conname = NULL;
remoteConn *rconn = NULL; remoteConn *rconn = NULL;
DBLINK_INIT;
/* stuff done only on the first call of the function */ /* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL()) if (SRF_IS_FIRSTCALL())
{ {
@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
curname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1); howmany = PG_GETARG_INT32(1);
fail = PG_GETARG_BOOL(2); fail = PG_GETARG_BOOL(2);
conn = persistent_conn; conn = pconn->conn;
} }
else else
{ {
@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
/* text,int */ /* text,int */
curname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(0));
howmany = PG_GETARG_INT32(1); howmany = PG_GETARG_INT32(1);
conn = persistent_conn; conn = pconn->conn;
} }
if (!conn) if (!conn)
@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS)
MemoryContext oldcontext; MemoryContext oldcontext;
bool freeconn = false; bool freeconn = false;
DBLINK_INIT;
/* stuff done only on the first call of the function */ /* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL()) if (SRF_IS_FIRSTCALL())
{ {
@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS)
/* text,text or text,bool */ /* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{ {
conn = persistent_conn; conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1); fail = PG_GETARG_BOOL(1);
} }
@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS)
else if (PG_NARGS() == 1) else if (PG_NARGS() == 1)
{ {
/* text */ /* text */
conn = persistent_conn; conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(0));
} }
else else
@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS)
bool freeconn = false; bool freeconn = false;
bool fail = true; /* default to backward compatible behavior */ bool fail = true; /* default to backward compatible behavior */
DBLINK_INIT;
if (PG_NARGS() == 3) if (PG_NARGS() == 3)
{ {
/* must be text,text,bool */ /* must be text,text,bool */
@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS)
/* might be text,text or text,bool */ /* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{ {
conn = persistent_conn; conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1); fail = PG_GETARG_BOOL(1);
} }
@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS)
else if (PG_NARGS() == 1) else if (PG_NARGS() == 1)
{ {
/* must be single text argument */ /* must be single text argument */
conn = persistent_conn; conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(0));
} }
else else

View File

@ -436,6 +436,88 @@ SELECT dblink_exec('myconn','ABORT');
ROLLBACK ROLLBACK
(1 row) (1 row)
-- test opening cursor in a transaction
SELECT dblink_exec('myconn','BEGIN');
dblink_exec
-------------
BEGIN
(1 row)
-- an open transaction will prevent dblink_open() from opening its own
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open
-------------
OK
(1 row)
-- this should not commit the transaction because the client opened it
SELECT dblink_close('myconn','rmt_foo_cursor');
dblink_close
--------------
OK
(1 row)
-- this should succeed because we have an open transaction
SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
dblink_exec
----------------
DECLARE CURSOR
(1 row)
-- commit remote transaction
SELECT dblink_exec('myconn','COMMIT');
dblink_exec
-------------
COMMIT
(1 row)
-- test automatic transactions for multiple cursor opens
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open
-------------
OK
(1 row)
-- the second cursor
SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
dblink_open
-------------
OK
(1 row)
-- this should not commit the transaction
SELECT dblink_close('myconn','rmt_foo_cursor2');
dblink_close
--------------
OK
(1 row)
-- this should succeed because we have an open transaction
SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
dblink_exec
----------------
DECLARE CURSOR
(1 row)
-- this should commit the transaction
SELECT dblink_close('myconn','rmt_foo_cursor');
dblink_close
--------------
OK
(1 row)
-- this should fail because there is no open transaction
SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
ERROR: sql error
DETAIL: ERROR: cursor "xact_test" already exists
-- reset remote transaction state
SELECT dblink_exec('myconn','ABORT');
dblink_exec
-------------
ROLLBACK
(1 row)
-- open a cursor -- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
dblink_open dblink_open

View File

@ -217,6 +217,42 @@ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foobar',false);
-- reset remote transaction state -- reset remote transaction state
SELECT dblink_exec('myconn','ABORT'); SELECT dblink_exec('myconn','ABORT');
-- test opening cursor in a transaction
SELECT dblink_exec('myconn','BEGIN');
-- an open transaction will prevent dblink_open() from opening its own
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
-- this should not commit the transaction because the client opened it
SELECT dblink_close('myconn','rmt_foo_cursor');
-- this should succeed because we have an open transaction
SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
-- commit remote transaction
SELECT dblink_exec('myconn','COMMIT');
-- test automatic transactions for multiple cursor opens
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
-- the second cursor
SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
-- this should not commit the transaction
SELECT dblink_close('myconn','rmt_foo_cursor2');
-- this should succeed because we have an open transaction
SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
-- this should commit the transaction
SELECT dblink_close('myconn','rmt_foo_cursor');
-- this should fail because there is no open transaction
SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
-- reset remote transaction state
SELECT dblink_exec('myconn','ABORT');
-- open a cursor -- open a cursor
SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo'); SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');