mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-30 19:00:29 +08:00
Refactor pg_receivexlog main loop code, for readability.
Previously the source codes for receiving the data and for polling the socket were included in pg_receivexlog main loop. This commit splits out them as separate functions. This is useful for improving the readability of main loop code and making the future pg_receivexlog-related patch simpler.
This commit is contained in:
parent
644d85351e
commit
74cbe966fe
@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
|
||||
uint32 timeline, char *basedir,
|
||||
stream_stop_callback stream_stop, int standby_message_timeout,
|
||||
char *partial_suffix, XLogRecPtr *stoppos);
|
||||
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
|
||||
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
|
||||
|
||||
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
|
||||
uint32 *timeline);
|
||||
@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
int bytes_written;
|
||||
int64 now;
|
||||
int hdr_len;
|
||||
|
||||
if (copybuf != NULL)
|
||||
{
|
||||
PQfreemem(copybuf);
|
||||
copybuf = NULL;
|
||||
}
|
||||
long sleeptime;
|
||||
|
||||
/*
|
||||
* Check if we should continue streaming, or abort at this point.
|
||||
@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
last_status = now;
|
||||
}
|
||||
|
||||
r = PQgetCopyData(conn, ©buf, 1);
|
||||
if (r == 0)
|
||||
/*
|
||||
* Compute how long send/receive loops should sleep
|
||||
*/
|
||||
if (standby_message_timeout && still_sending)
|
||||
{
|
||||
/*
|
||||
* No data available. Wait for some to appear, but not longer than
|
||||
* the specified timeout, so that we can ping the server.
|
||||
*/
|
||||
fd_set input_mask;
|
||||
struct timeval timeout;
|
||||
struct timeval *timeoutptr;
|
||||
int64 targettime;
|
||||
long secs;
|
||||
int usecs;
|
||||
|
||||
FD_ZERO(&input_mask);
|
||||
FD_SET(PQsocket(conn), &input_mask);
|
||||
if (standby_message_timeout && still_sending)
|
||||
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
|
||||
feTimestampDifference(now,
|
||||
targettime,
|
||||
&secs,
|
||||
&usecs);
|
||||
/* Always sleep at least 1 sec */
|
||||
if (secs <= 0)
|
||||
{
|
||||
int64 targettime;
|
||||
long secs;
|
||||
int usecs;
|
||||
secs = 1;
|
||||
usecs = 0;
|
||||
}
|
||||
|
||||
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
|
||||
feTimestampDifference(now,
|
||||
targettime,
|
||||
&secs,
|
||||
&usecs);
|
||||
if (secs <= 0)
|
||||
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
|
||||
else
|
||||
timeout.tv_sec = secs;
|
||||
timeout.tv_usec = usecs;
|
||||
timeoutptr = &timeout;
|
||||
}
|
||||
else
|
||||
timeoutptr = NULL;
|
||||
|
||||
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
|
||||
if (r == 0 || (r < 0 && errno == EINTR))
|
||||
{
|
||||
/*
|
||||
* Got a timeout or signal. Continue the loop and either
|
||||
* deliver a status packet to the server or just go back into
|
||||
* blocking.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
else if (r < 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: select() failed: %s\n"),
|
||||
progname, strerror(errno));
|
||||
goto error;
|
||||
}
|
||||
/* Else there is actually data on the socket */
|
||||
if (PQconsumeInput(conn) == 0)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not receive data from WAL stream: %s"),
|
||||
progname, PQerrorMessage(conn));
|
||||
goto error;
|
||||
}
|
||||
continue;
|
||||
sleeptime = secs * 1000 + usecs / 1000;
|
||||
}
|
||||
else
|
||||
sleeptime = -1;
|
||||
|
||||
r = CopyStreamReceive(conn, sleeptime, ©buf);
|
||||
if (r == 0)
|
||||
continue;
|
||||
if (r == -1)
|
||||
goto error;
|
||||
if (r == -2)
|
||||
{
|
||||
PGresult *res = PQgetResult(conn);
|
||||
|
||||
@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
}
|
||||
if (copybuf != NULL)
|
||||
PQfreemem(copybuf);
|
||||
copybuf = NULL;
|
||||
*stoppos = blockpos;
|
||||
return res;
|
||||
}
|
||||
if (r == -2)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not read COPY data: %s"),
|
||||
progname, PQerrorMessage(conn));
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Check the message type. */
|
||||
if (copybuf[0] == 'k')
|
||||
@ -1056,3 +1019,115 @@ error:
|
||||
PQfreemem(copybuf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait until we can read CopyData message, or timeout.
|
||||
*
|
||||
* Returns 1 if data has become available for reading, 0 if timed out
|
||||
* or interrupted by signal, and -1 on an error.
|
||||
*/
|
||||
static int
|
||||
CopyStreamPoll(PGconn *conn, long timeout_ms)
|
||||
{
|
||||
int ret;
|
||||
fd_set input_mask;
|
||||
struct timeval timeout;
|
||||
struct timeval *timeoutptr;
|
||||
|
||||
if (PQsocket(conn) < 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: socket not open"), progname);
|
||||
return -1;
|
||||
}
|
||||
|
||||
FD_ZERO(&input_mask);
|
||||
FD_SET(PQsocket(conn), &input_mask);
|
||||
|
||||
if (timeout_ms < 0)
|
||||
timeoutptr = NULL;
|
||||
else
|
||||
{
|
||||
timeout.tv_sec = timeout_ms / 1000L;
|
||||
timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
|
||||
timeoutptr = &timeout;
|
||||
}
|
||||
|
||||
ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
|
||||
if (ret == 0 || (ret < 0 && errno == EINTR))
|
||||
return 0; /* Got a timeout or signal */
|
||||
else if (ret < 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: select() failed: %s\n"),
|
||||
progname, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive CopyData message available from XLOG stream, blocking for
|
||||
* maximum of 'timeout' ms.
|
||||
*
|
||||
* If data was received, returns the length of the data. *buffer is set to
|
||||
* point to a buffer holding the received message. The buffer is only valid
|
||||
* until the next CopyStreamReceive call.
|
||||
*
|
||||
* 0 if no data was available within timeout, or wait was interrupted
|
||||
* by signal. -1 on error. -2 if the server ended the COPY.
|
||||
*/
|
||||
static int
|
||||
CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
|
||||
{
|
||||
static char *copybuf = NULL;
|
||||
int rawlen;
|
||||
|
||||
if (copybuf != NULL)
|
||||
PQfreemem(copybuf);
|
||||
copybuf = NULL;
|
||||
*buffer = NULL;
|
||||
|
||||
/* Try to receive a CopyData message */
|
||||
rawlen = PQgetCopyData(conn, ©buf, 1);
|
||||
if (rawlen == 0)
|
||||
{
|
||||
/*
|
||||
* No data available. Wait for some to appear, but not longer than
|
||||
* the specified timeout, so that we can ping the server.
|
||||
*/
|
||||
if (timeout > 0)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = CopyStreamPoll(conn, timeout);
|
||||
if (ret <= 0)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Else there is actually data on the socket */
|
||||
if (PQconsumeInput(conn) == 0)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not receive data from WAL stream: %s"),
|
||||
progname, PQerrorMessage(conn));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Now that we've consumed some input, try again */
|
||||
rawlen = PQgetCopyData(conn, ©buf, 1);
|
||||
if (rawlen == 0)
|
||||
return 0;
|
||||
}
|
||||
if (rawlen == -1) /* end-of-streaming or error */
|
||||
return -2;
|
||||
if (rawlen == -2)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not read COPY data: %s"),
|
||||
progname, PQerrorMessage(conn));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Return received messages to caller */
|
||||
*buffer = copybuf;
|
||||
return rawlen;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user