mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-12-09 08:10:09 +08:00
postgres_fdw: Allow cancellation of transaction control commands.
Commitf039eaac71
, later back-patched with commit1b812afb0e
, allowed many of the queries issued by postgres_fdw to fetch remote data to respond to cancel interrupts in a timely fashion. However, it didn't do anything about the transaction control commands, which remained noninterruptible. Improve the situation by changing do_sql_command() to retrieve query results using pgfdw_get_result(), which uses the asynchronous interface to libpq so that it can check for interrupts every time libpq returns control. Since this might result in a situation where we can no longer be sure that the remote transaction state matches the local transaction state, add a facility to force all levels of the local transaction to abort if we've lost track of the remote state; without this, an apparently-successful commit of the local transaction might fail to commit changes made on the remote side. Also, add a 60-second timeout for queries issue during transaction abort; if that expires, give up and mark the state of the connection as unknown. Drop all such connections when we exit the local transaction. Together, these changes mean that if we're aborting the local toplevel transaction anyway, we can just drop the remote connection in lieu of waiting (possibly for a very long time) for it to complete an abort. This still leaves quite a bit of room for improvement. PQcancel() has no asynchronous interface, so if we get stuck sending the cancel request we'll still hang. Also, PQsetnonblocking() is not used, which means we could block uninterruptibly when sending a query. There might be some other optimizations possible as well. Nonetheless, this allows us to escape a wait for an unresponsive remote server quickly in many more cases than previously. Report by Suraj Kharage. Patch by me and Rafia Sabih. Review and testing by Amit Kapila and Tushar Ahuja. Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com
This commit is contained in:
parent
9d1758d1e6
commit
fc267a0c3c
@ -14,6 +14,8 @@
|
||||
|
||||
#include "postgres_fdw.h"
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "catalog/pg_user_mapping.h"
|
||||
#include "access/xact.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "miscadmin.h"
|
||||
@ -21,6 +23,8 @@
|
||||
#include "storage/proc.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
|
||||
/*
|
||||
@ -51,6 +55,7 @@ typedef struct ConnCacheEntry
|
||||
* one level of subxact open, etc */
|
||||
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
|
||||
bool have_error; /* have any subxacts aborted in this xact? */
|
||||
bool changing_xact_state; /* xact state change in process */
|
||||
} ConnCacheEntry;
|
||||
|
||||
/*
|
||||
@ -76,6 +81,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
|
||||
SubTransactionId mySubid,
|
||||
SubTransactionId parentSubid,
|
||||
void *arg);
|
||||
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
|
||||
static bool pgfdw_cancel_query(PGconn *conn);
|
||||
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
|
||||
bool ignore_errors);
|
||||
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
|
||||
PGresult **result);
|
||||
|
||||
|
||||
/*
|
||||
@ -144,8 +155,12 @@ GetConnection(ForeignServer *server, UserMapping *user,
|
||||
entry->xact_depth = 0;
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
entry->changing_xact_state = false;
|
||||
}
|
||||
|
||||
/* Reject further use of connections which failed abort cleanup. */
|
||||
pgfdw_reject_incomplete_xact_state_change(entry);
|
||||
|
||||
/*
|
||||
* We don't check the health of cached connection here, because it would
|
||||
* require some overhead. Broken connection will be detected when the
|
||||
@ -355,7 +370,9 @@ do_sql_command(PGconn *conn, const char *sql)
|
||||
{
|
||||
PGresult *res;
|
||||
|
||||
res = PQexec(conn, sql);
|
||||
if (!PQsendQuery(conn, sql))
|
||||
pgfdw_report_error(ERROR, NULL, conn, false, sql);
|
||||
res = pgfdw_get_result(conn, sql);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, true, sql);
|
||||
PQclear(res);
|
||||
@ -388,8 +405,10 @@ begin_remote_xact(ConnCacheEntry *entry)
|
||||
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
|
||||
else
|
||||
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
|
||||
entry->changing_xact_state = true;
|
||||
do_sql_command(entry->conn, sql);
|
||||
entry->xact_depth = 1;
|
||||
entry->changing_xact_state = false;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -402,8 +421,10 @@ begin_remote_xact(ConnCacheEntry *entry)
|
||||
char sql[64];
|
||||
|
||||
snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
|
||||
entry->changing_xact_state = true;
|
||||
do_sql_command(entry->conn, sql);
|
||||
entry->xact_depth++;
|
||||
entry->changing_xact_state = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -616,14 +637,25 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
/* If it has an open remote transaction, try to close it */
|
||||
if (entry->xact_depth > 0)
|
||||
{
|
||||
bool abort_cleanup_failure = false;
|
||||
|
||||
elog(DEBUG3, "closing remote transaction on connection %p",
|
||||
entry->conn);
|
||||
|
||||
switch (event)
|
||||
{
|
||||
case XACT_EVENT_PRE_COMMIT:
|
||||
|
||||
/*
|
||||
* If abort cleanup previously failed for this connection,
|
||||
* we can't issue any more commands against it.
|
||||
*/
|
||||
pgfdw_reject_incomplete_xact_state_change(entry);
|
||||
|
||||
/* Commit all remote transactions during pre-commit */
|
||||
entry->changing_xact_state = true;
|
||||
do_sql_command(entry->conn, "COMMIT TRANSACTION");
|
||||
entry->changing_xact_state = false;
|
||||
|
||||
/*
|
||||
* If there were any errors in subtransactions, and we
|
||||
@ -669,6 +701,27 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
elog(ERROR, "missed cleaning up connection during pre-commit");
|
||||
break;
|
||||
case XACT_EVENT_ABORT:
|
||||
|
||||
/*
|
||||
* Don't try to clean up the connection if we're already
|
||||
* in error recursion trouble.
|
||||
*/
|
||||
if (in_error_recursion_trouble())
|
||||
entry->changing_xact_state = true;
|
||||
|
||||
/*
|
||||
* If connection is already unsalvageable, don't touch it
|
||||
* further.
|
||||
*/
|
||||
if (entry->changing_xact_state)
|
||||
break;
|
||||
|
||||
/*
|
||||
* Mark this connection as in the process of changing
|
||||
* transaction state.
|
||||
*/
|
||||
entry->changing_xact_state = true;
|
||||
|
||||
/* Assume we might have lost track of prepared statements */
|
||||
entry->have_error = true;
|
||||
|
||||
@ -679,40 +732,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
* is still being processed by the remote server, and if so,
|
||||
* request cancellation of the command.
|
||||
*/
|
||||
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
|
||||
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
|
||||
!pgfdw_cancel_query(entry->conn))
|
||||
{
|
||||
PGcancel *cancel;
|
||||
char errbuf[256];
|
||||
|
||||
if ((cancel = PQgetCancel(entry->conn)))
|
||||
{
|
||||
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not send cancel request: %s",
|
||||
errbuf)));
|
||||
PQfreeCancel(cancel);
|
||||
}
|
||||
/* Unable to cancel running query. */
|
||||
abort_cleanup_failure = true;
|
||||
}
|
||||
else if (!pgfdw_exec_cleanup_query(entry->conn,
|
||||
"ABORT TRANSACTION",
|
||||
false))
|
||||
{
|
||||
/* Unable to abort remote transaction. */
|
||||
abort_cleanup_failure = true;
|
||||
}
|
||||
else if (entry->have_prep_stmt && entry->have_error &&
|
||||
!pgfdw_exec_cleanup_query(entry->conn,
|
||||
"DEALLOCATE ALL",
|
||||
true))
|
||||
{
|
||||
/* Trouble clearing prepared statements. */
|
||||
abort_cleanup_failure = true;
|
||||
}
|
||||
|
||||
/* If we're aborting, abort all remote transactions too */
|
||||
res = PQexec(entry->conn, "ABORT TRANSACTION");
|
||||
/* Note: can't throw ERROR, it would be infinite loop */
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(WARNING, res, entry->conn, true,
|
||||
"ABORT TRANSACTION");
|
||||
else
|
||||
{
|
||||
PQclear(res);
|
||||
/* As above, make sure to clear any prepared stmts */
|
||||
if (entry->have_prep_stmt && entry->have_error)
|
||||
{
|
||||
res = PQexec(entry->conn, "DEALLOCATE ALL");
|
||||
PQclear(res);
|
||||
}
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
}
|
||||
|
||||
/* Disarm changing_xact_state if it all worked. */
|
||||
entry->changing_xact_state = abort_cleanup_failure;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -725,11 +773,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
* recover. Next GetConnection will open a new connection.
|
||||
*/
|
||||
if (PQstatus(entry->conn) != CONNECTION_OK ||
|
||||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
|
||||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
|
||||
entry->changing_xact_state)
|
||||
{
|
||||
elog(DEBUG3, "discarding connection %p", entry->conn);
|
||||
PQfinish(entry->conn);
|
||||
entry->conn = NULL;
|
||||
entry->changing_xact_state = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -772,7 +822,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
|
||||
hash_seq_init(&scan, ConnectionHash);
|
||||
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
|
||||
{
|
||||
PGresult *res;
|
||||
char sql[100];
|
||||
|
||||
/*
|
||||
@ -788,12 +837,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
|
||||
|
||||
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
|
||||
{
|
||||
/*
|
||||
* If abort cleanup previously failed for this connection, we
|
||||
* can't issue any more commands against it.
|
||||
*/
|
||||
pgfdw_reject_incomplete_xact_state_change(entry);
|
||||
|
||||
/* Commit all remote subtransactions during pre-commit */
|
||||
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
|
||||
entry->changing_xact_state = true;
|
||||
do_sql_command(entry->conn, sql);
|
||||
entry->changing_xact_state = false;
|
||||
}
|
||||
else
|
||||
else if (in_error_recursion_trouble())
|
||||
{
|
||||
/*
|
||||
* Don't try to clean up the connection if we're already in error
|
||||
* recursion trouble.
|
||||
*/
|
||||
entry->changing_xact_state = true;
|
||||
}
|
||||
else if (!entry->changing_xact_state)
|
||||
{
|
||||
bool abort_cleanup_failure = false;
|
||||
|
||||
/* Remember that abort cleanup is in progress. */
|
||||
entry->changing_xact_state = true;
|
||||
|
||||
/* Assume we might have lost track of prepared statements */
|
||||
entry->have_error = true;
|
||||
|
||||
@ -804,34 +874,211 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
|
||||
* the remote server, and if so, request cancellation of the
|
||||
* command.
|
||||
*/
|
||||
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
|
||||
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
|
||||
!pgfdw_cancel_query(entry->conn))
|
||||
abort_cleanup_failure = true;
|
||||
else
|
||||
{
|
||||
PGcancel *cancel;
|
||||
char errbuf[256];
|
||||
|
||||
if ((cancel = PQgetCancel(entry->conn)))
|
||||
{
|
||||
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not send cancel request: %s",
|
||||
errbuf)));
|
||||
PQfreeCancel(cancel);
|
||||
}
|
||||
/* Rollback all remote subtransactions during abort */
|
||||
snprintf(sql, sizeof(sql),
|
||||
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
|
||||
curlevel, curlevel);
|
||||
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
|
||||
abort_cleanup_failure = true;
|
||||
}
|
||||
|
||||
/* Rollback all remote subtransactions during abort */
|
||||
snprintf(sql, sizeof(sql),
|
||||
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
|
||||
curlevel, curlevel);
|
||||
res = PQexec(entry->conn, sql);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(WARNING, res, entry->conn, true, sql);
|
||||
else
|
||||
PQclear(res);
|
||||
/* Disarm changing_xact_state if it all worked. */
|
||||
entry->changing_xact_state = abort_cleanup_failure;
|
||||
}
|
||||
|
||||
/* OK, we're outta that level of subtransaction */
|
||||
entry->xact_depth--;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Raise an error if the given connection cache entry is marked as being
|
||||
* in the middle of an xact state change. This should be called at which no
|
||||
* such change is expected to be in progress; if one is found to be in
|
||||
* progress, it means that we aborted in the middle of a previous state change
|
||||
* and now don't know what the remote transaction state actually is.
|
||||
* Such connections can't safely be further used. Re-establishing the
|
||||
* connection would change the snapshot and roll back any writes already
|
||||
* performed, so that's not an option, either. Thus, we must abort.
|
||||
*/
|
||||
static void
|
||||
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
|
||||
{
|
||||
ForeignServer *server;
|
||||
|
||||
if (!entry->changing_xact_state)
|
||||
return;
|
||||
|
||||
server = GetForeignServer(entry->key.serverid);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_CONNECTION_EXCEPTION),
|
||||
errmsg("connection to server \"%s\" was lost",
|
||||
server->servername)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Cancel the currently-in-progress query (whose query text we do not have)
|
||||
* and ignore the result. Returns true if we successfully cancel the query
|
||||
* and discard any pending result, and false if not.
|
||||
*/
|
||||
static bool
|
||||
pgfdw_cancel_query(PGconn *conn)
|
||||
{
|
||||
PGcancel *cancel;
|
||||
char errbuf[256];
|
||||
PGresult *result = NULL;
|
||||
TimestampTz endtime;
|
||||
|
||||
/*
|
||||
* If it takes too long to cancel the query and discard the result, assume
|
||||
* the connection is dead.
|
||||
*/
|
||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
|
||||
|
||||
/*
|
||||
* Issue cancel request. Unfortunately, there's no good way to limit the
|
||||
* amount of time that we might block inside PQgetCancel().
|
||||
*/
|
||||
if ((cancel = PQgetCancel(conn)))
|
||||
{
|
||||
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
|
||||
{
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not send cancel request: %s",
|
||||
errbuf)));
|
||||
PQfreeCancel(cancel);
|
||||
return false;
|
||||
}
|
||||
PQfreeCancel(cancel);
|
||||
}
|
||||
|
||||
/* Get and discard the result of the query. */
|
||||
if (pgfdw_get_cleanup_result(conn, endtime, &result))
|
||||
return false;
|
||||
PQclear(result);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
|
||||
* result. If the query is executed without error, the return value is true.
|
||||
* If the query is executed successfully but returns an error, the return
|
||||
* value is true if and only if ignore_errors is set. If the query can't be
|
||||
* sent or times out, the return value is false.
|
||||
*/
|
||||
static bool
|
||||
pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
|
||||
{
|
||||
PGresult *result = NULL;
|
||||
TimestampTz endtime;
|
||||
|
||||
/*
|
||||
* If it takes too long to execute a cleanup query, assume the connection
|
||||
* is dead. It's fairly likely that this is why we aborted in the first
|
||||
* place (e.g. statement timeout, user cancel), so the timeout shouldn't
|
||||
* be too long.
|
||||
*/
|
||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
|
||||
|
||||
/*
|
||||
* Submit a query. Since we don't use non-blocking mode, this also can
|
||||
* block. But its risk is relatively small, so we ignore that for now.
|
||||
*/
|
||||
if (!PQsendQuery(conn, query))
|
||||
{
|
||||
pgfdw_report_error(WARNING, NULL, conn, false, query);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Get the result of the query. */
|
||||
if (pgfdw_get_cleanup_result(conn, endtime, &result))
|
||||
return false;
|
||||
|
||||
/* Issue a warning if not successful. */
|
||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||
{
|
||||
pgfdw_report_error(WARNING, result, conn, true, query);
|
||||
return ignore_errors;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get, during abort cleanup, the result of a query that is in progress. This
|
||||
* might be a query that is being interrupted by transaction abort, or it might
|
||||
* be a query that was initiated as part of transaction abort to get the remote
|
||||
* side back to the appropriate state.
|
||||
*
|
||||
* It's not a huge problem if we throw an ERROR here, but if we get into error
|
||||
* recursion trouble, we'll end up slamming the connection shut, which will
|
||||
* necessitate failing the entire toplevel transaction even if subtransactions
|
||||
* were used. Try to use WARNING where we can.
|
||||
*
|
||||
* endtime is the time at which we should give up and assume the remote
|
||||
* side is dead. Returns true if the timeout expired, otherwise false.
|
||||
* Sets *result except in case of a timeout.
|
||||
*/
|
||||
static bool
|
||||
pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
|
||||
{
|
||||
PGresult *last_res = NULL;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
PGresult *res;
|
||||
|
||||
while (PQisBusy(conn))
|
||||
{
|
||||
int wc;
|
||||
TimestampTz now = GetCurrentTimestamp();
|
||||
long secs;
|
||||
int microsecs;
|
||||
long cur_timeout;
|
||||
|
||||
/* If timeout has expired, give up, else get sleep time. */
|
||||
if (now >= endtime)
|
||||
return true;
|
||||
TimestampDifference(now, endtime, &secs, µsecs);
|
||||
|
||||
/* To protect against clock skew, limit sleep to one minute. */
|
||||
cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(&MyProc->procLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
|
||||
PQsocket(conn),
|
||||
cur_timeout);
|
||||
ResetLatch(&MyProc->procLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(conn))
|
||||
{
|
||||
*result = NULL;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
if (res == NULL)
|
||||
break; /* query is complete */
|
||||
|
||||
PQclear(last_res);
|
||||
last_res = res;
|
||||
}
|
||||
|
||||
*result = last_res;
|
||||
return false;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user