mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-18 18:44:06 +08:00
Use asynchronous connect API in libpqwalreceiver
This makes the connection attempt from CREATE SUBSCRIPTION and from WalReceiver interruptable by the user in case the libpq connection is hanging. The previous coding required immediate shutdown (SIGQUIT) of PostgreSQL in that situation. From: Petr Jelinek <petr.jelinek@2ndquadrant.com> Tested-by: Thom Brown <thom@linux.com>
This commit is contained in:
parent
9eb344faf5
commit
1e8a850094
@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w)
|
||||
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
|
||||
event_name = "WalReceiverWaitStart";
|
||||
break;
|
||||
case WAIT_EVENT_LIBPQWALRECEIVER_READ:
|
||||
event_name = "LibPQWalReceiverRead";
|
||||
case WAIT_EVENT_LIBPQWALRECEIVER:
|
||||
event_name = "LibPQWalReceiver";
|
||||
break;
|
||||
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
|
||||
event_name = "WalSenderWaitForWAL";
|
||||
|
@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
|
||||
char **err)
|
||||
{
|
||||
WalReceiverConn *conn;
|
||||
PostgresPollingStatusType status;
|
||||
const char *keys[5];
|
||||
const char *vals[5];
|
||||
int i = 0;
|
||||
@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
|
||||
Assert(i < sizeof(keys));
|
||||
|
||||
conn = palloc0(sizeof(WalReceiverConn));
|
||||
conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
|
||||
conn->streamConn = PQconnectStartParams(keys, vals,
|
||||
/* expand_dbname = */ true);
|
||||
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
|
||||
{
|
||||
*err = pchomp(PQerrorMessage(conn->streamConn));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Poll connection. */
|
||||
do
|
||||
{
|
||||
/* Determine current state of the connection. */
|
||||
status = PQconnectPoll(conn->streamConn);
|
||||
|
||||
/* Sleep a bit if waiting for socket. */
|
||||
if (status == PGRES_POLLING_READING ||
|
||||
status == PGRES_POLLING_WRITING)
|
||||
{
|
||||
int extra_flag;
|
||||
int rc;
|
||||
|
||||
extra_flag = (status == PGRES_POLLING_READING
|
||||
? WL_SOCKET_READABLE
|
||||
: WL_SOCKET_WRITEABLE);
|
||||
|
||||
ResetLatch(&MyProc->procLatch);
|
||||
rc = WaitLatchOrSocket(&MyProc->procLatch,
|
||||
WL_POSTMASTER_DEATH |
|
||||
WL_LATCH_SET | extra_flag,
|
||||
PQsocket(conn->streamConn),
|
||||
0,
|
||||
WAIT_EVENT_LIBPQWALRECEIVER);
|
||||
|
||||
/* Emergency bailout. */
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
exit(1);
|
||||
|
||||
/* Interrupted. */
|
||||
if (rc & WL_LATCH_SET)
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
/* Otherwise loop until we have OK or FAILED status. */
|
||||
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
|
||||
|
||||
if (PQstatus(conn->streamConn) != CONNECTION_OK)
|
||||
{
|
||||
*err = pchomp(PQerrorMessage(conn->streamConn));
|
||||
@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
|
||||
WL_LATCH_SET,
|
||||
PQsocket(streamConn),
|
||||
0,
|
||||
WAIT_EVENT_LIBPQWALRECEIVER_READ);
|
||||
WAIT_EVENT_LIBPQWALRECEIVER);
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
exit(1);
|
||||
|
||||
|
@ -764,7 +764,7 @@ typedef enum
|
||||
WAIT_EVENT_CLIENT_WRITE,
|
||||
WAIT_EVENT_SSL_OPEN_SERVER,
|
||||
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
|
||||
WAIT_EVENT_LIBPQWALRECEIVER_READ,
|
||||
WAIT_EVENT_LIBPQWALRECEIVER,
|
||||
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
|
||||
WAIT_EVENT_WAL_SENDER_WRITE_DATA
|
||||
} WaitEventClient;
|
||||
|
Loading…
Reference in New Issue
Block a user