mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-03-13 19:57:53 +08:00
Refactor pg_receivexlog main loop code, for readability.
Previously the source codes for receiving the data and for polling the socket were included in pg_receivexlog main loop. This commit splits out them as separate functions. This is useful for improving the readability of main loop code and making the future pg_receivexlog-related patch simpler.
This commit is contained in:
parent
644d85351e
commit
74cbe966fe
@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
|
|||||||
uint32 timeline, char *basedir,
|
uint32 timeline, char *basedir,
|
||||||
stream_stop_callback stream_stop, int standby_message_timeout,
|
stream_stop_callback stream_stop, int standby_message_timeout,
|
||||||
char *partial_suffix, XLogRecPtr *stoppos);
|
char *partial_suffix, XLogRecPtr *stoppos);
|
||||||
|
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
|
||||||
|
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
|
||||||
|
|
||||||
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
|
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
|
||||||
uint32 *timeline);
|
uint32 *timeline);
|
||||||
@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
|||||||
int bytes_written;
|
int bytes_written;
|
||||||
int64 now;
|
int64 now;
|
||||||
int hdr_len;
|
int hdr_len;
|
||||||
|
long sleeptime;
|
||||||
if (copybuf != NULL)
|
|
||||||
{
|
|
||||||
PQfreemem(copybuf);
|
|
||||||
copybuf = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if we should continue streaming, or abort at this point.
|
* Check if we should continue streaming, or abort at this point.
|
||||||
@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
|||||||
last_status = now;
|
last_status = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
r = PQgetCopyData(conn, ©buf, 1);
|
/*
|
||||||
if (r == 0)
|
* Compute how long send/receive loops should sleep
|
||||||
|
*/
|
||||||
|
if (standby_message_timeout && still_sending)
|
||||||
{
|
{
|
||||||
/*
|
int64 targettime;
|
||||||
* No data available. Wait for some to appear, but not longer than
|
long secs;
|
||||||
* the specified timeout, so that we can ping the server.
|
int usecs;
|
||||||
*/
|
|
||||||
fd_set input_mask;
|
|
||||||
struct timeval timeout;
|
|
||||||
struct timeval *timeoutptr;
|
|
||||||
|
|
||||||
FD_ZERO(&input_mask);
|
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
|
||||||
FD_SET(PQsocket(conn), &input_mask);
|
feTimestampDifference(now,
|
||||||
if (standby_message_timeout && still_sending)
|
targettime,
|
||||||
|
&secs,
|
||||||
|
&usecs);
|
||||||
|
/* Always sleep at least 1 sec */
|
||||||
|
if (secs <= 0)
|
||||||
{
|
{
|
||||||
int64 targettime;
|
secs = 1;
|
||||||
long secs;
|
usecs = 0;
|
||||||
int usecs;
|
}
|
||||||
|
|
||||||
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
|
sleeptime = secs * 1000 + usecs / 1000;
|
||||||
feTimestampDifference(now,
|
|
||||||
targettime,
|
|
||||||
&secs,
|
|
||||||
&usecs);
|
|
||||||
if (secs <= 0)
|
|
||||||
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
|
|
||||||
else
|
|
||||||
timeout.tv_sec = secs;
|
|
||||||
timeout.tv_usec = usecs;
|
|
||||||
timeoutptr = &timeout;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
timeoutptr = NULL;
|
|
||||||
|
|
||||||
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
|
|
||||||
if (r == 0 || (r < 0 && errno == EINTR))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Got a timeout or signal. Continue the loop and either
|
|
||||||
* deliver a status packet to the server or just go back into
|
|
||||||
* blocking.
|
|
||||||
*/
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else if (r < 0)
|
|
||||||
{
|
|
||||||
fprintf(stderr, _("%s: select() failed: %s\n"),
|
|
||||||
progname, strerror(errno));
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
/* Else there is actually data on the socket */
|
|
||||||
if (PQconsumeInput(conn) == 0)
|
|
||||||
{
|
|
||||||
fprintf(stderr,
|
|
||||||
_("%s: could not receive data from WAL stream: %s"),
|
|
||||||
progname, PQerrorMessage(conn));
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
sleeptime = -1;
|
||||||
|
|
||||||
|
r = CopyStreamReceive(conn, sleeptime, ©buf);
|
||||||
|
if (r == 0)
|
||||||
|
continue;
|
||||||
if (r == -1)
|
if (r == -1)
|
||||||
|
goto error;
|
||||||
|
if (r == -2)
|
||||||
{
|
{
|
||||||
PGresult *res = PQgetResult(conn);
|
PGresult *res = PQgetResult(conn);
|
||||||
|
|
||||||
@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
|||||||
}
|
}
|
||||||
if (copybuf != NULL)
|
if (copybuf != NULL)
|
||||||
PQfreemem(copybuf);
|
PQfreemem(copybuf);
|
||||||
|
copybuf = NULL;
|
||||||
*stoppos = blockpos;
|
*stoppos = blockpos;
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
if (r == -2)
|
|
||||||
{
|
|
||||||
fprintf(stderr, _("%s: could not read COPY data: %s"),
|
|
||||||
progname, PQerrorMessage(conn));
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Check the message type. */
|
/* Check the message type. */
|
||||||
if (copybuf[0] == 'k')
|
if (copybuf[0] == 'k')
|
||||||
@ -1056,3 +1019,115 @@ error:
|
|||||||
PQfreemem(copybuf);
|
PQfreemem(copybuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Wait until we can read CopyData message, or timeout.
|
||||||
|
*
|
||||||
|
* Returns 1 if data has become available for reading, 0 if timed out
|
||||||
|
* or interrupted by signal, and -1 on an error.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
CopyStreamPoll(PGconn *conn, long timeout_ms)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
fd_set input_mask;
|
||||||
|
struct timeval timeout;
|
||||||
|
struct timeval *timeoutptr;
|
||||||
|
|
||||||
|
if (PQsocket(conn) < 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, _("%s: socket not open"), progname);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
FD_ZERO(&input_mask);
|
||||||
|
FD_SET(PQsocket(conn), &input_mask);
|
||||||
|
|
||||||
|
if (timeout_ms < 0)
|
||||||
|
timeoutptr = NULL;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
timeout.tv_sec = timeout_ms / 1000L;
|
||||||
|
timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
|
||||||
|
timeoutptr = &timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
|
||||||
|
if (ret == 0 || (ret < 0 && errno == EINTR))
|
||||||
|
return 0; /* Got a timeout or signal */
|
||||||
|
else if (ret < 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr, _("%s: select() failed: %s\n"),
|
||||||
|
progname, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Receive CopyData message available from XLOG stream, blocking for
|
||||||
|
* maximum of 'timeout' ms.
|
||||||
|
*
|
||||||
|
* If data was received, returns the length of the data. *buffer is set to
|
||||||
|
* point to a buffer holding the received message. The buffer is only valid
|
||||||
|
* until the next CopyStreamReceive call.
|
||||||
|
*
|
||||||
|
* 0 if no data was available within timeout, or wait was interrupted
|
||||||
|
* by signal. -1 on error. -2 if the server ended the COPY.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
|
||||||
|
{
|
||||||
|
static char *copybuf = NULL;
|
||||||
|
int rawlen;
|
||||||
|
|
||||||
|
if (copybuf != NULL)
|
||||||
|
PQfreemem(copybuf);
|
||||||
|
copybuf = NULL;
|
||||||
|
*buffer = NULL;
|
||||||
|
|
||||||
|
/* Try to receive a CopyData message */
|
||||||
|
rawlen = PQgetCopyData(conn, ©buf, 1);
|
||||||
|
if (rawlen == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* No data available. Wait for some to appear, but not longer than
|
||||||
|
* the specified timeout, so that we can ping the server.
|
||||||
|
*/
|
||||||
|
if (timeout > 0)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
ret = CopyStreamPoll(conn, timeout);
|
||||||
|
if (ret <= 0)
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Else there is actually data on the socket */
|
||||||
|
if (PQconsumeInput(conn) == 0)
|
||||||
|
{
|
||||||
|
fprintf(stderr,
|
||||||
|
_("%s: could not receive data from WAL stream: %s"),
|
||||||
|
progname, PQerrorMessage(conn));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Now that we've consumed some input, try again */
|
||||||
|
rawlen = PQgetCopyData(conn, ©buf, 1);
|
||||||
|
if (rawlen == 0)
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (rawlen == -1) /* end-of-streaming or error */
|
||||||
|
return -2;
|
||||||
|
if (rawlen == -2)
|
||||||
|
{
|
||||||
|
fprintf(stderr, _("%s: could not read COPY data: %s"),
|
||||||
|
progname, PQerrorMessage(conn));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Return received messages to caller */
|
||||||
|
*buffer = copybuf;
|
||||||
|
return rawlen;
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user