diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index 7c50b01a57..c15776fc58 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -105,6 +105,21 @@ PostgreSQL documentation
+
+
+
+
+
+ Specifies the maximum time to issue sync commands to ensure the
+ received WAL file is safely flushed to disk, in seconds. The default
+ value is zero, which disables issuing fsyncs except when WAL file is
+ closed. If -1 is specified, WAL file is flushed as
+ soon as possible, that is, as soon as there are WAL data which has
+ not been flushed yet.
+
+
+
+
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 5df2eb8c0d..0b02c4c401 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
- NULL))
+ NULL, 0))
/*
* Any errors will already have been reported in the function process,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 9640838906..0b7af54a7b 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -36,6 +36,7 @@ static char *basedir = NULL;
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
+static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
@@ -62,6 +63,8 @@ usage(void)
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
+ printf(_(" -F --fsync-interval=INTERVAL\n"
+ " frequency of syncs to transaction log files (in seconds)\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
@@ -330,7 +333,8 @@ StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
- stop_streaming, standby_message_timeout, ".partial");
+ stop_streaming, standby_message_timeout, ".partial",
+ fsync_interval);
PQfinish(conn);
}
@@ -360,6 +364,7 @@ main(int argc, char **argv)
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
+ {"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
@@ -389,7 +394,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
+ while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
@@ -436,6 +441,15 @@ main(int argc, char **argv)
case 'n':
noloop = 1;
break;
+ case 'F':
+ fsync_interval = atoi(optarg) * 1000;
+ if (fsync_interval < -1000)
+ {
+ fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d28e13b4d8..89b22f20e2 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
- char *partial_suffix, XLogRecPtr *stoppos);
+ char *partial_suffix, XLogRecPtr *stoppos,
+ int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
+ uint32 timeline, char *basedir,
+ stream_stop_callback stream_stop,
+ char *partial_suffix, XLogRecPtr *stoppos);
+static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+ int64 last_status, int fsync_interval,
+ XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
@@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos;
+ last_fsync = feGetCurrentTimestamp();
return true;
}
@@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
+ * fsync_interval controls how often we flush to the received WAL file,
+ * in milliseconds.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
- int standby_message_timeout, char *partial_suffix)
+ int standby_message_timeout, char *partial_suffix,
+ int fsync_interval)
{
char query[128];
char slotcmd[128];
@@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
- &stoppos);
+ &stoppos, fsync_interval);
if (res == NULL)
goto error;
@@ -746,7 +760,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos)
+ XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
@@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming, or abort at this point.
*/
- if (still_sending && stream_stop(blockpos, timeline, false))
+ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+ stream_stop, partial_suffix, stoppos))
+ goto error;
+
+ now = feGetCurrentTimestamp();
+
+ /*
+ * If fsync_interval has elapsed since last WAL flush and we've written
+ * some WAL data, flush them to disk.
+ */
+ if (lastFlushPosition < blockpos &&
+ walfile != -1 &&
+ ((fsync_interval > 0 &&
+ feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
+ fsync_interval < 0))
{
- if (!close_walfile(basedir, partial_suffix, blockpos))
+ if (fsync(walfile) != 0)
{
- /* Potential error message is written by close_walfile */
+ fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ progname, current_walfile_name, strerror(errno));
goto error;
}
- if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
- {
- fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- progname, PQerrorMessage(conn));
- goto error;
- }
- still_sending = false;
+
+ lastFlushPosition = blockpos;
+ last_fsync = now;
}
/*
* Potentially send a status message to the master
*/
- now = feGetCurrentTimestamp();
if (still_sending && standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
standby_message_timeout))
@@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/*
- * Compute how long send/receive loops should sleep
+ * Calculate how long send/receive loops should sleep
*/
- if (standby_message_timeout && still_sending)
- {
- int64 targettime;
- long secs;
- int usecs;
-
- targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
- feTimestampDifference(now,
- targettime,
- &secs,
- &usecs);
- /* Always sleep at least 1 sec */
- if (secs <= 0)
- {
- secs = 1;
- usecs = 0;
- }
-
- sleeptime = secs * 1000 + usecs / 1000;
- }
- else
- sleeptime = -1;
+ sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+ last_status, fsync_interval, blockpos);
r = CopyStreamReceive(conn, sleeptime, ©buf);
- if (r == 0)
- continue;
- if (r == -1)
- goto error;
- if (r == -2)
+ while (r != 0)
{
- PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
- basedir, partial_suffix, stoppos);
- if (res == NULL)
+ if (r == -1)
goto error;
- else
- return res;
- }
+ if (r == -2)
+ {
+ PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+ basedir, partial_suffix, stoppos);
+ if (res == NULL)
+ goto error;
+ else
+ return res;
+ }
- /* Check the message type. */
- if (copybuf[0] == 'k')
- {
- if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
- &last_status))
+ /* Check the message type. */
+ if (copybuf[0] == 'k')
+ {
+ if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+ &last_status))
+ goto error;
+ }
+ else if (copybuf[0] == 'w')
+ {
+ if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+ timeline, basedir, stream_stop, partial_suffix))
+ goto error;
+
+ /*
+ * Check if we should continue streaming, or abort at this point.
+ */
+ if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+ stream_stop, partial_suffix, stoppos))
+ goto error;
+ }
+ else
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
goto error;
- }
- else if (copybuf[0] == 'w')
- {
- if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
- timeline, basedir, stream_stop, partial_suffix))
- goto error;
- }
- else
- {
- fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
- progname, copybuf[0]);
- goto error;
+ }
+
+ /*
+ * Process the received data, and any subsequent data we
+ * can read without blocking.
+ */
+ r = CopyStreamReceive(conn, 0, ©buf);
}
}
@@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*stoppos = blockpos;
return res;
}
+
+/*
+ * Check if we should continue streaming, or abort at this point.
+ */
+static bool
+CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
+ char *basedir, stream_stop_callback stream_stop,
+ char *partial_suffix, XLogRecPtr *stoppos)
+{
+ if (still_sending && stream_stop(blockpos, timeline, false))
+ {
+ if (!close_walfile(basedir, partial_suffix, blockpos))
+ {
+ /* Potential error message is written by close_walfile */
+ return false;
+ }
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ still_sending = false;
+ }
+
+ return true;
+}
+
+/*
+ * Calculate how long send/receive loops should sleep
+ */
+static long
+CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+ int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+{
+ int64 targettime = 0;
+ int64 status_targettime = 0;
+ int64 fsync_targettime = 0;
+ long sleeptime;
+
+ if (standby_message_timeout && still_sending)
+ status_targettime = last_status +
+ (standby_message_timeout - 1) * ((int64) 1000);
+
+ if (fsync_interval > 0 && lastFlushPosition < blockpos)
+ fsync_targettime = last_fsync +
+ (fsync_interval - 1) * ((int64) 1000);
+
+ if ((status_targettime < fsync_targettime && status_targettime > 0) ||
+ fsync_targettime == 0)
+ targettime = status_targettime;
+ else
+ targettime = fsync_targettime;
+
+ if (targettime > 0)
+ {
+ long secs;
+ int usecs;
+
+ feTimestampDifference(now,
+ targettime,
+ &secs,
+ &usecs);
+ /* Always sleep at least 1 sec */
+ if (secs <= 0)
+ {
+ secs = 1;
+ usecs = 0;
+ }
+
+ sleeptime = secs * 1000 + usecs / 1000;
+ }
+ else
+ sleeptime = -1;
+
+ return sleeptime;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index f4789a580a..72f8245373 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
- char *partial_suffix);
+ char *partial_suffix,
+ int fsync_interval);