mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-12-15 08:20:16 +08:00
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted all still existing files can get archived again. With a high wal_keep_segment settings that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup (for -X stream) itself to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes show pretty clearly that it needs some refactoring, but that'd result in not be backpatchable changes. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
This commit is contained in:
parent
ccb161b66a
commit
2c0a485896
@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
|
||||
}
|
||||
|
||||
/* send the WAL file itself */
|
||||
_tarWriteHeader(pathbuf, NULL, &statbuf);
|
||||
|
||||
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
|
||||
@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
}
|
||||
|
||||
/* XLogSegSize is a multiple of 512, so no need for padding */
|
||||
|
||||
FreeFile(fp);
|
||||
|
||||
/*
|
||||
* Mark file as archived, otherwise files can get archived again
|
||||
* after promotion of a new node. This is in line with
|
||||
* walreceiver.c always doing a XLogArchiveForceDone() after a
|
||||
* complete segment.
|
||||
*/
|
||||
StatusFilePath(pathbuf, walFiles[i], ".done");
|
||||
sendFileWithContent(pathbuf, "");
|
||||
}
|
||||
|
||||
/*
|
||||
@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
|
||||
errmsg("could not stat file \"%s\": %m", pathbuf)));
|
||||
|
||||
sendFile(pathbuf, pathbuf, &statbuf, false);
|
||||
|
||||
/* unconditionally mark file as archived */
|
||||
StatusFilePath(pathbuf, fname, ".done");
|
||||
sendFileWithContent(pathbuf, "");
|
||||
}
|
||||
|
||||
/* Send CopyDone message for the last tar file */
|
||||
@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
|
||||
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
|
||||
}
|
||||
size += 512; /* Size of the header just added */
|
||||
|
||||
/*
|
||||
* Also send archive_status directory (by hackishly reusing
|
||||
* statbuf from above ...).
|
||||
*/
|
||||
if (!sizeonly)
|
||||
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
|
||||
size += 512; /* Size of the header just added */
|
||||
|
||||
continue; /* don't recurse into pg_xlog */
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
#include "common/string.h"
|
||||
#include "getopt_long.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "pqexpbuffer.h"
|
||||
@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
|
||||
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
|
||||
param->sysidentifier, param->xlogdir,
|
||||
reached_end_position, standby_message_timeout,
|
||||
NULL, false))
|
||||
NULL, false, true))
|
||||
|
||||
/*
|
||||
* Any errors will already have been reported in the function process,
|
||||
@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
||||
logstreamer_param *param;
|
||||
uint32 hi,
|
||||
lo;
|
||||
char statusdir[MAXPGPATH];
|
||||
|
||||
param = pg_malloc0(sizeof(logstreamer_param));
|
||||
param->timeline = timeline;
|
||||
@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
||||
/* Error message already written in GetConnection() */
|
||||
exit(1);
|
||||
|
||||
/*
|
||||
* Always in plain format, so we can write to basedir/pg_xlog. But the
|
||||
* directory entry in the tar file may arrive later, so make sure it's
|
||||
* created before we start.
|
||||
*/
|
||||
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
|
||||
verify_dir_is_empty_or_create(param->xlogdir);
|
||||
|
||||
/*
|
||||
* Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
|
||||
* basedir/pg_xlog as the directory entry in the tar file may arrive
|
||||
* later.
|
||||
*/
|
||||
snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
|
||||
basedir);
|
||||
|
||||
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not create directory \"%s\": %s\n"),
|
||||
progname, statusdir, strerror(errno));
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
|
||||
/*
|
||||
* Start a child process and tell it to start streaming. On Unix, this is
|
||||
@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
|
||||
* by the wal receiver process. Also, when transaction
|
||||
* log directory location was specified, pg_xlog has
|
||||
* already been created as a symbolic link before
|
||||
* starting the actual backup. So just ignore failure
|
||||
* on them.
|
||||
* starting the actual backup. So just ignore creation
|
||||
* failures on related directories.
|
||||
*/
|
||||
if ((!streamwal && (strcmp(xlog_dir, "") == 0))
|
||||
|| strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
|
||||
if (!((pg_str_endswith(filename, "/pg_xlog") ||
|
||||
pg_str_endswith(filename, "/archive_status")) &&
|
||||
errno == EEXIST))
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not create directory \"%s\": %s\n"),
|
||||
|
@ -342,7 +342,7 @@ StreamLog(void)
|
||||
|
||||
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
|
||||
stop_streaming, standby_message_timeout, ".partial",
|
||||
synchronous);
|
||||
synchronous, false);
|
||||
|
||||
PQfinish(conn);
|
||||
conn = NULL;
|
||||
|
@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
|
||||
uint32 timeline, char *basedir,
|
||||
stream_stop_callback stream_stop, int standby_message_timeout,
|
||||
char *partial_suffix, XLogRecPtr *stoppos,
|
||||
bool synchronous);
|
||||
bool synchronous, bool mark_done);
|
||||
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
|
||||
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
|
||||
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
|
||||
@ -45,20 +45,50 @@ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
|
||||
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
|
||||
XLogRecPtr *blockpos, uint32 timeline,
|
||||
char *basedir, stream_stop_callback stream_stop,
|
||||
char *partial_suffix);
|
||||
char *partial_suffix, bool mark_done);
|
||||
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
|
||||
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
|
||||
XLogRecPtr *stoppos);
|
||||
XLogRecPtr *stoppos, bool mark_done);
|
||||
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
|
||||
uint32 timeline, char *basedir,
|
||||
stream_stop_callback stream_stop,
|
||||
char *partial_suffix, XLogRecPtr *stoppos);
|
||||
char *partial_suffix, XLogRecPtr *stoppos,
|
||||
bool mark_done);
|
||||
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
|
||||
int64 last_status);
|
||||
|
||||
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
|
||||
uint32 *timeline);
|
||||
|
||||
static bool
|
||||
mark_file_as_archived(const char *basedir, const char *fname)
|
||||
{
|
||||
int fd;
|
||||
static char tmppath[MAXPGPATH];
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
|
||||
basedir, fname);
|
||||
|
||||
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (fd < 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
|
||||
progname, tmppath, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fsync(fd) != 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
|
||||
progname, tmppath, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
close(fd);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Open a new WAL file in the specified directory.
|
||||
*
|
||||
@ -152,7 +182,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
|
||||
* and returns false, otherwise returns true.
|
||||
*/
|
||||
static bool
|
||||
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
|
||||
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
|
||||
{
|
||||
off_t currpos;
|
||||
|
||||
@ -206,6 +236,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
|
||||
_("%s: not renaming \"%s%s\", segment is not complete\n"),
|
||||
progname, current_walfile_name, partial_suffix);
|
||||
|
||||
/*
|
||||
* Mark file as archived if requested by the caller - pg_basebackup needs
|
||||
* to do so as files can otherwise get archived again after promotion of a
|
||||
* new node. This is in line with walreceiver.c always doing a
|
||||
* XLogArchiveForceDone() after a complete segment.
|
||||
*/
|
||||
if (currpos == XLOG_SEG_SIZE && mark_done)
|
||||
{
|
||||
/* writes error message if failed */
|
||||
if (!mark_file_as_archived(basedir, current_walfile_name))
|
||||
return false;
|
||||
}
|
||||
|
||||
lastFlushPosition = pos;
|
||||
return true;
|
||||
}
|
||||
@ -248,7 +291,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
|
||||
}
|
||||
|
||||
static bool
|
||||
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
|
||||
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
|
||||
char *content, bool mark_done)
|
||||
{
|
||||
int size = strlen(content);
|
||||
char path[MAXPGPATH];
|
||||
@ -327,6 +371,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Maintain archive_status, check close_walfile() for details. */
|
||||
if (mark_done)
|
||||
{
|
||||
/* writes error message if failed */
|
||||
if (!mark_file_as_archived(basedir, histfname))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -447,7 +499,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
char *sysidentifier, char *basedir,
|
||||
stream_stop_callback stream_stop,
|
||||
int standby_message_timeout, char *partial_suffix,
|
||||
bool synchronous)
|
||||
bool synchronous, bool mark_done)
|
||||
{
|
||||
char query[128];
|
||||
char slotcmd[128];
|
||||
@ -562,7 +614,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
/* Write the history file to disk */
|
||||
writeTimeLineHistoryFile(basedir, timeline,
|
||||
PQgetvalue(res, 0, 0),
|
||||
PQgetvalue(res, 0, 1));
|
||||
PQgetvalue(res, 0, 1),
|
||||
mark_done);
|
||||
|
||||
PQclear(res);
|
||||
}
|
||||
@ -592,7 +645,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
/* Stream the WAL */
|
||||
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
|
||||
standby_message_timeout, partial_suffix,
|
||||
&stoppos, synchronous);
|
||||
&stoppos, synchronous, mark_done);
|
||||
if (res == NULL)
|
||||
goto error;
|
||||
|
||||
@ -757,7 +810,7 @@ static PGresult *
|
||||
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
char *basedir, stream_stop_callback stream_stop,
|
||||
int standby_message_timeout, char *partial_suffix,
|
||||
XLogRecPtr *stoppos, bool synchronous)
|
||||
XLogRecPtr *stoppos, bool synchronous, bool mark_done)
|
||||
{
|
||||
char *copybuf = NULL;
|
||||
int64 last_status = -1;
|
||||
@ -775,7 +828,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
* Check if we should continue streaming, or abort at this point.
|
||||
*/
|
||||
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
|
||||
stream_stop, partial_suffix, stoppos))
|
||||
stream_stop, partial_suffix, stoppos,
|
||||
mark_done))
|
||||
goto error;
|
||||
|
||||
now = feGetCurrentTimestamp();
|
||||
@ -830,7 +884,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
if (r == -2)
|
||||
{
|
||||
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
|
||||
basedir, partial_suffix, stoppos);
|
||||
basedir, partial_suffix,
|
||||
stoppos, mark_done);
|
||||
if (res == NULL)
|
||||
goto error;
|
||||
else
|
||||
@ -847,14 +902,16 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
|
||||
else if (copybuf[0] == 'w')
|
||||
{
|
||||
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
|
||||
timeline, basedir, stream_stop, partial_suffix))
|
||||
timeline, basedir, stream_stop,
|
||||
partial_suffix, true))
|
||||
goto error;
|
||||
|
||||
/*
|
||||
* Check if we should continue streaming, or abort at this point.
|
||||
*/
|
||||
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
|
||||
stream_stop, partial_suffix, stoppos))
|
||||
stream_stop, partial_suffix, stoppos,
|
||||
mark_done))
|
||||
goto error;
|
||||
}
|
||||
else
|
||||
@ -1055,7 +1112,7 @@ static bool
|
||||
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
|
||||
XLogRecPtr *blockpos, uint32 timeline,
|
||||
char *basedir, stream_stop_callback stream_stop,
|
||||
char *partial_suffix)
|
||||
char *partial_suffix, bool mark_done)
|
||||
{
|
||||
int xlogoff;
|
||||
int bytes_left;
|
||||
@ -1163,7 +1220,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
|
||||
/* Did we reach the end of a WAL segment? */
|
||||
if (*blockpos % XLOG_SEG_SIZE == 0)
|
||||
{
|
||||
if (!close_walfile(basedir, partial_suffix, *blockpos))
|
||||
if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
|
||||
/* Error message written in close_walfile() */
|
||||
return false;
|
||||
|
||||
@ -1193,7 +1250,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
|
||||
static PGresult *
|
||||
HandleEndOfCopyStream(PGconn *conn, char *copybuf,
|
||||
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
|
||||
XLogRecPtr *stoppos)
|
||||
XLogRecPtr *stoppos, bool mark_done)
|
||||
{
|
||||
PGresult *res = PQgetResult(conn);
|
||||
|
||||
@ -1204,7 +1261,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
|
||||
*/
|
||||
if (still_sending)
|
||||
{
|
||||
if (!close_walfile(basedir, partial_suffix, blockpos))
|
||||
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
|
||||
{
|
||||
/* Error message written in close_walfile() */
|
||||
PQclear(res);
|
||||
@ -1236,11 +1293,11 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
|
||||
static bool
|
||||
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
|
||||
char *basedir, stream_stop_callback stream_stop,
|
||||
char *partial_suffix, XLogRecPtr *stoppos)
|
||||
char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
|
||||
{
|
||||
if (still_sending && stream_stop(blockpos, timeline, false))
|
||||
{
|
||||
if (!close_walfile(basedir, partial_suffix, blockpos))
|
||||
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
|
||||
{
|
||||
/* Potential error message is written by close_walfile */
|
||||
return false;
|
||||
|
@ -31,6 +31,7 @@ extern bool ReceiveXlogStream(PGconn *conn,
|
||||
stream_stop_callback stream_stop,
|
||||
int standby_message_timeout,
|
||||
char *partial_suffix,
|
||||
bool synchronous);
|
||||
bool synchronous,
|
||||
bool mark_done);
|
||||
|
||||
#endif /* RECEIVELOG_H */
|
||||
|
Loading…
Reference in New Issue
Block a user