From 03a571a4cf26e06ff504e5b38a9432a003008c19 Mon Sep 17 00:00:00 2001 From: Magnus Hagander Date: Mon, 19 Apr 2010 14:10:45 +0000 Subject: [PATCH] Add wrapper function libpqrcv_PQexec() in the walreceiver that uses async libpq to send queries, making the waiting for responses interruptible on platforms where PQexec() can't normally be interrupted by signals, such as win32. Fujii Masao and Magnus Hagander --- .../libpqwalreceiver/libpqwalreceiver.c | 88 ++++++++++++++++++- src/backend/replication/walreceiver.c | 6 +- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9e318e669d..d41858e49a 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $ + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ * *------------------------------------------------------------------------- */ @@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void); /* Prototypes for private functions */ static bool libpq_select(int timeout_ms); +static PGresult *libpqrcv_PQexec(const char *query); /* * Module load callback @@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec("IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) /* Start streaming from the point requested by startup process */ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); - res = PQexec(streamConn, cmd); + res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) + { + PQclear(res); ereport(ERROR, (errmsg("could not start WAL streaming: %s", PQerrorMessage(streamConn)))); + } PQclear(res); justconnected = true; @@ -224,6 +228,84 @@ libpq_select(int timeout_ms) return true; } +/* + * Send a query and wait for the results by using the asynchronous libpq + * functions and the backend version of select(). + * + * We must not use the regular blocking libpq functions like PQexec() + * since they are uninterruptible by signals on some platforms, such as + * Windows. + * + * We must also not use vanilla select() here since it cannot handle the + * signal emulation layer on Windows. + * + * The function is modeled on PQexec() in libpq, but only implements + * those parts that are in use in the walreceiver. + * + * Queries are always executed on the connection in streamConn. + */ +static PGresult * +libpqrcv_PQexec(const char *query) +{ + PGresult *result = NULL; + PGresult *lastResult = NULL; + + /* + * PQexec() silently discards any prior query results on the + * connection. This is not required for walreceiver since it's + * expected that walsender won't generate any such junk results. + */ + + /* + * Submit a query. Since we don't use non-blocking mode, this also + * can block. But its risk is relatively small, so we ignore that + * for now. + */ + if (!PQsendQuery(streamConn, query)) + return NULL; + + for (;;) + { + /* + * Receive data until PQgetResult is ready to get the result + * without blocking. + */ + while (PQisBusy(streamConn)) + { + /* + * We don't need to break down the sleep into smaller increments, + * and check for interrupts after each nap, since we can just + * elog(FATAL) within SIGTERM signal handler if the signal + * arrives in the middle of establishment of replication connection. + */ + if (!libpq_select(-1)) + continue; /* interrupted */ + if (PQconsumeInput(streamConn) == 0) + return NULL; /* trouble */ + } + + /* + * Emulate the PQexec()'s behavior of returning the last result + * when there are many. + * Since walsender will never generate multiple results, we skip + * the concatenation of error messages. + */ + result = PQgetResult(streamConn); + if (result == NULL) + break; /* query is complete */ + + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQstatus(streamConn) == CONNECTION_BAD) + break; + } + + return lastResult; +} + /* * Disconnect connection to primary, if any. */ diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 090111bb11..f2694db873 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ * *------------------------------------------------------------------------- */ @@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void); * We can't just exit(1) within SIGTERM signal handler, because the signal * might arrive in the middle of some critical operation, like while we're * holding a spinlock. We also can't just set a flag in signal handler and - * check it in the main loop, because we perform some blocking libpq - * operations like PQexec(), which can take a long time to finish. + * check it in the main loop, because we perform some blocking operations + * like libpqrcv_PQexec(), which can take a long time to finish. * * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's * safe for the signal handler to elog(FATAL) immediately. Otherwise it just