mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-24 18:55:04 +08:00
Allow pg_basebackup to stream transaction log in tar mode
This will write the received transaction log into a file called pg_wal.tar(.gz) next to the other tarfiles instead of writing it to base.tar. When using fetch mode, the transaction log is still written to base.tar like before, and when used against a pre-10 server, the file is named pg_xlog.tar. To do this, implement a new concept of a "walmethod", which is responsible for writing the WAL. Two implementations exist, one that writes to a plain directory (which is also used by pg_receivexlog) and one that writes to a tar file with optional compression. Reviewed by Michael Paquier
This commit is contained in:
parent
1885c88459
commit
56c7d8d455
@ -180,7 +180,8 @@ PostgreSQL documentation
|
||||
target directory, the tar contents will be written to
|
||||
standard output, suitable for piping to for example
|
||||
<productname>gzip</productname>. This is only possible if
|
||||
the cluster has no additional tablespaces.
|
||||
the cluster has no additional tablespaces and transaction
|
||||
log streaming is not used.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
@ -323,6 +324,10 @@ PostgreSQL documentation
|
||||
If the log has been rotated when it's time to transfer it, the
|
||||
backup will fail and be unusable.
|
||||
</para>
|
||||
<para>
|
||||
The transaction log files will be written to
|
||||
the <filename>base.tar</filename> file.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
@ -339,6 +344,11 @@ PostgreSQL documentation
|
||||
client can keep up with transaction log received, using this mode
|
||||
requires no extra transaction logs to be saved on the master.
|
||||
</para>
|
||||
<para>
|
||||
The transaction log files are written to a separate file
|
||||
named <filename>pg_wal.tar</filename> (if the server is a version
|
||||
earlier than 10, the file will be named <filename>pg_xlog.tar</filename>).
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
@ -353,7 +363,8 @@ PostgreSQL documentation
|
||||
<para>
|
||||
Enables gzip compression of tar file output, with the default
|
||||
compression level. Compression is only available when using
|
||||
the tar format.
|
||||
the tar format, and the suffix <filename>.gz</filename> will
|
||||
automatically be added to all tar filenames.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
@ -366,7 +377,8 @@ PostgreSQL documentation
|
||||
Enables gzip compression of tar file output, and specifies the
|
||||
compression level (0 through 9, 0 being no compression and 9 being best
|
||||
compression). Compression is only available when using the tar
|
||||
format.
|
||||
format, and the suffix <filename>.gz</filename> will
|
||||
automatically be added to all tar filenames.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
|
||||
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
|
||||
LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
|
||||
|
||||
OBJS=receivelog.o streamutil.o $(WIN32RES)
|
||||
OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
|
||||
|
||||
all: pg_basebackup pg_receivexlog pg_recvlogical
|
||||
|
||||
|
@ -449,7 +449,7 @@ typedef struct
|
||||
{
|
||||
PGconn *bgconn;
|
||||
XLogRecPtr startptr;
|
||||
char xlogdir[MAXPGPATH];
|
||||
char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */
|
||||
char *sysidentifier;
|
||||
int timeline;
|
||||
} logstreamer_param;
|
||||
@ -470,9 +470,13 @@ LogStreamerMain(logstreamer_param *param)
|
||||
stream.synchronous = false;
|
||||
stream.do_sync = do_sync;
|
||||
stream.mark_done = true;
|
||||
stream.basedir = param->xlogdir;
|
||||
stream.partial_suffix = NULL;
|
||||
|
||||
if (format == 'p')
|
||||
stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
|
||||
else
|
||||
stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
|
||||
|
||||
if (!ReceiveXlogStream(param->bgconn, &stream))
|
||||
|
||||
/*
|
||||
@ -482,6 +486,14 @@ LogStreamerMain(logstreamer_param *param)
|
||||
*/
|
||||
return 1;
|
||||
|
||||
if (!stream.walmethod->finish())
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not finish writing WAL files: %s\n"),
|
||||
progname, strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
|
||||
PQfinish(param->bgconn);
|
||||
return 0;
|
||||
}
|
||||
@ -533,28 +545,32 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
|
||||
exit(1);
|
||||
|
||||
/* In post-10 cluster, pg_xlog has been renamed to pg_wal */
|
||||
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/%s",
|
||||
snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
|
||||
basedir,
|
||||
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
|
||||
"pg_xlog" : "pg_wal");
|
||||
|
||||
/*
|
||||
* Create pg_wal/archive_status or pg_xlog/archive_status (and thus
|
||||
* pg_wal or pg_xlog) depending on the target server so we can write to
|
||||
* basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
|
||||
* file may arrive later.
|
||||
*/
|
||||
snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
|
||||
basedir,
|
||||
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
|
||||
"pg_xlog" : "pg_wal");
|
||||
|
||||
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
|
||||
if (format == 'p')
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not create directory \"%s\": %s\n"),
|
||||
progname, statusdir, strerror(errno));
|
||||
disconnect_and_exit(1);
|
||||
/*
|
||||
* Create pg_wal/archive_status or pg_xlog/archive_status (and thus
|
||||
* pg_wal or pg_xlog) depending on the target server so we can write to
|
||||
* basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
|
||||
* file may arrive later.
|
||||
*/
|
||||
snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
|
||||
basedir,
|
||||
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
|
||||
"pg_xlog" : "pg_wal");
|
||||
|
||||
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not create directory \"%s\": %s\n"),
|
||||
progname, statusdir, strerror(errno));
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2245,16 +2261,6 @@ main(int argc, char **argv)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (format != 'p' && streamwal)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: WAL streaming can only be used in plain mode\n"),
|
||||
progname);
|
||||
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
||||
progname);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (replication_slot && !streamwal)
|
||||
{
|
||||
fprintf(stderr,
|
||||
|
@ -338,11 +338,19 @@ StreamLog(void)
|
||||
stream.synchronous = synchronous;
|
||||
stream.do_sync = true;
|
||||
stream.mark_done = false;
|
||||
stream.basedir = basedir;
|
||||
stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
|
||||
stream.partial_suffix = ".partial";
|
||||
|
||||
ReceiveXlogStream(conn, &stream);
|
||||
|
||||
if (!stream.walmethod->finish())
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not finish writing WAL files: %s\n"),
|
||||
progname, strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
||||
PQfinish(conn);
|
||||
conn = NULL;
|
||||
}
|
||||
|
@ -30,7 +30,7 @@
|
||||
|
||||
|
||||
/* fd and filename for currently open WAL file */
|
||||
static int walfile = -1;
|
||||
static Walfile *walfile = NULL;
|
||||
static char current_walfile_name[MAXPGPATH] = "";
|
||||
static bool reportFlushPosition = false;
|
||||
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
|
||||
@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
|
||||
uint32 *timeline);
|
||||
|
||||
static bool
|
||||
mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
|
||||
mark_file_as_archived(StreamCtl *stream, const char *fname)
|
||||
{
|
||||
int fd;
|
||||
Walfile *f;
|
||||
static char tmppath[MAXPGPATH];
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
|
||||
basedir, fname);
|
||||
snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
|
||||
fname);
|
||||
|
||||
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (fd < 0)
|
||||
f = stream->walmethod->open_for_write(tmppath, NULL, 0);
|
||||
if (f == NULL)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
|
||||
progname, tmppath, strerror(errno));
|
||||
progname, tmppath, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
close(fd);
|
||||
|
||||
if (do_sync && fsync_fname(tmppath, false, progname) != 0)
|
||||
return false;
|
||||
|
||||
if (do_sync && fsync_parent_path(tmppath, progname) != 0)
|
||||
return false;
|
||||
stream->walmethod->close(f, CLOSE_NORMAL);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
|
||||
static bool
|
||||
open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
|
||||
{
|
||||
int f;
|
||||
Walfile *f;
|
||||
char fn[MAXPGPATH];
|
||||
struct stat statbuf;
|
||||
char *zerobuf;
|
||||
int bytes;
|
||||
ssize_t size;
|
||||
XLogSegNo segno;
|
||||
|
||||
XLByteToSeg(startpoint, segno);
|
||||
XLogFileName(current_walfile_name, stream->timeline, segno);
|
||||
|
||||
snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
|
||||
snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
|
||||
stream->partial_suffix ? stream->partial_suffix : "");
|
||||
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (f == -1)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not open transaction log file \"%s\": %s\n"),
|
||||
progname, fn, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Verify that the file is either empty (just created), or a complete
|
||||
* XLogSegSize segment. Anything in between indicates a corrupt file.
|
||||
* When streaming to files, if an existing file exists we verify that it's
|
||||
* either empty (just created), or a complete XLogSegSize segment (in
|
||||
* which case it has been created and padded). Anything else indicates a
|
||||
* corrupt file.
|
||||
*
|
||||
* When streaming to tar, no file with this name will exist before, so we
|
||||
* never have to verify a size.
|
||||
*/
|
||||
if (fstat(f, &statbuf) != 0)
|
||||
if (stream->walmethod->existsfile(fn))
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not stat transaction log file \"%s\": %s\n"),
|
||||
progname, fn, strerror(errno));
|
||||
close(f);
|
||||
return false;
|
||||
}
|
||||
if (statbuf.st_size == XLogSegSize)
|
||||
{
|
||||
/*
|
||||
* fsync, in case of a previous crash between padding and fsyncing the
|
||||
* file.
|
||||
*/
|
||||
if (stream->do_sync)
|
||||
size = stream->walmethod->get_file_size(fn);
|
||||
if (size < 0)
|
||||
{
|
||||
if (fsync_fname(fn, false, progname) != 0 ||
|
||||
fsync_parent_path(fn, progname) != 0)
|
||||
fprintf(stderr,
|
||||
_("%s: could not get size of transaction log file \"%s\": %s\n"),
|
||||
progname, fn, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
if (size == XLogSegSize)
|
||||
{
|
||||
/* Already padded file. Open it for use */
|
||||
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
|
||||
if (f == NULL)
|
||||
{
|
||||
/* error already printed */
|
||||
close(f);
|
||||
fprintf(stderr,
|
||||
_("%s: could not open existing transaction log file \"%s\": %s\n"),
|
||||
progname, fn, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
/* fsync file in case of a previous crash */
|
||||
if (!stream->walmethod->fsync(f))
|
||||
{
|
||||
stream->walmethod->close(f, CLOSE_UNLINK);
|
||||
return false;
|
||||
}
|
||||
|
||||
walfile = f;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* File is open and ready to use */
|
||||
walfile = f;
|
||||
return true;
|
||||
}
|
||||
if (statbuf.st_size != 0)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
|
||||
progname, fn, (int) statbuf.st_size, XLogSegSize);
|
||||
close(f);
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* New, empty, file. So pad it to 16Mb with zeroes. If we fail partway
|
||||
* through padding, we should attempt to unlink the file on failure, so as
|
||||
* not to leave behind a partially-filled file.
|
||||
*/
|
||||
zerobuf = pg_malloc0(XLOG_BLCKSZ);
|
||||
for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
|
||||
{
|
||||
errno = 0;
|
||||
if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
if (size != 0)
|
||||
{
|
||||
/* if write didn't set errno, assume problem is no disk space */
|
||||
if (errno == 0)
|
||||
errno = ENOSPC;
|
||||
fprintf(stderr,
|
||||
_("%s: could not pad transaction log file \"%s\": %s\n"),
|
||||
progname, fn, strerror(errno));
|
||||
free(zerobuf);
|
||||
close(f);
|
||||
unlink(fn);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
free(zerobuf);
|
||||
|
||||
/*
|
||||
* fsync WAL file and containing directory, to ensure the file is
|
||||
* persistently created and zeroed. That's particularly important when
|
||||
* using synchronous mode, where the file is modified and fsynced
|
||||
* in-place, without a directory fsync.
|
||||
*/
|
||||
if (stream->do_sync)
|
||||
{
|
||||
if (fsync_fname(fn, false, progname) != 0 ||
|
||||
fsync_parent_path(fn, progname) != 0)
|
||||
{
|
||||
/* error already printed */
|
||||
close(f);
|
||||
_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
|
||||
progname, fn, (int) size, XLogSegSize);
|
||||
return false;
|
||||
}
|
||||
/* File existed and was empty, so fall through and open */
|
||||
}
|
||||
|
||||
if (lseek(f, SEEK_SET, 0) != 0)
|
||||
/* No file existed, so create one */
|
||||
|
||||
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
|
||||
if (f == NULL)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
|
||||
progname, fn, strerror(errno));
|
||||
close(f);
|
||||
_("%s: could not open transaction log file \"%s\": %s\n"),
|
||||
progname, fn, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
/* File is open and ready to use */
|
||||
walfile = f;
|
||||
return true;
|
||||
}
|
||||
@ -223,59 +178,46 @@ static bool
|
||||
close_walfile(StreamCtl *stream, XLogRecPtr pos)
|
||||
{
|
||||
off_t currpos;
|
||||
int r;
|
||||
|
||||
if (walfile == -1)
|
||||
if (walfile == NULL)
|
||||
return true;
|
||||
|
||||
currpos = lseek(walfile, 0, SEEK_CUR);
|
||||
currpos = stream->walmethod->get_current_pos(walfile);
|
||||
if (currpos == -1)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not determine seek position in file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, strerror(errno));
|
||||
close(walfile);
|
||||
walfile = -1;
|
||||
progname, current_walfile_name, stream->walmethod->getlasterror());
|
||||
stream->walmethod->close(walfile, CLOSE_UNLINK);
|
||||
walfile = NULL;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (stream->do_sync && fsync(walfile) != 0)
|
||||
if (stream->partial_suffix)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, strerror(errno));
|
||||
close(walfile);
|
||||
walfile = -1;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (close(walfile) != 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, strerror(errno));
|
||||
walfile = -1;
|
||||
return false;
|
||||
}
|
||||
walfile = -1;
|
||||
|
||||
/*
|
||||
* If we finished writing a .partial file, rename it into place.
|
||||
*/
|
||||
if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
|
||||
{
|
||||
char oldfn[MAXPGPATH];
|
||||
char newfn[MAXPGPATH];
|
||||
|
||||
snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
|
||||
snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
|
||||
if (durable_rename(oldfn, newfn, progname) != 0)
|
||||
if (currpos == XLOG_SEG_SIZE)
|
||||
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
|
||||
else
|
||||
{
|
||||
/* durable_rename produced a log entry */
|
||||
return false;
|
||||
fprintf(stderr,
|
||||
_("%s: not renaming \"%s%s\", segment is not complete\n"),
|
||||
progname, current_walfile_name, stream->partial_suffix);
|
||||
r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
|
||||
}
|
||||
}
|
||||
else if (stream->partial_suffix)
|
||||
fprintf(stderr,
|
||||
_("%s: not renaming \"%s%s\", segment is not complete\n"),
|
||||
progname, current_walfile_name, stream->partial_suffix);
|
||||
else
|
||||
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
|
||||
|
||||
walfile = NULL;
|
||||
|
||||
if (r != 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Mark file as archived if requested by the caller - pg_basebackup needs
|
||||
@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
|
||||
if (currpos == XLOG_SEG_SIZE && stream->mark_done)
|
||||
{
|
||||
/* writes error message if failed */
|
||||
if (!mark_file_as_archived(stream->basedir, current_walfile_name,
|
||||
stream->do_sync))
|
||||
if (!mark_file_as_archived(stream, current_walfile_name))
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
|
||||
static bool
|
||||
existsTimeLineHistoryFile(StreamCtl *stream)
|
||||
{
|
||||
char path[MAXPGPATH];
|
||||
char histfname[MAXFNAMELEN];
|
||||
int fd;
|
||||
|
||||
/*
|
||||
* Timeline 1 never has a history file. We treat that as if it existed,
|
||||
@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream)
|
||||
|
||||
TLHistoryFileName(histfname, stream->timeline);
|
||||
|
||||
snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
|
||||
|
||||
fd = open(path, O_RDONLY | PG_BINARY, 0);
|
||||
if (fd < 0)
|
||||
{
|
||||
if (errno != ENOENT)
|
||||
fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
|
||||
progname, path, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
close(fd);
|
||||
return true;
|
||||
}
|
||||
return stream->walmethod->existsfile(histfname);
|
||||
}
|
||||
|
||||
static bool
|
||||
writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
|
||||
{
|
||||
int size = strlen(content);
|
||||
char path[MAXPGPATH];
|
||||
char tmppath[MAXPGPATH];
|
||||
char histfname[MAXFNAMELEN];
|
||||
int fd;
|
||||
Walfile *f;
|
||||
|
||||
/*
|
||||
* Check that the server's idea of how timeline history files should be
|
||||
@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
|
||||
return false;
|
||||
}
|
||||
|
||||
snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
|
||||
|
||||
/*
|
||||
* Write into a temp file name.
|
||||
*/
|
||||
snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
|
||||
|
||||
unlink(tmppath);
|
||||
|
||||
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (fd < 0)
|
||||
f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
|
||||
if (f == NULL)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
|
||||
progname, tmppath, strerror(errno));
|
||||
progname, histfname, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
if ((int) write(fd, content, size) != size)
|
||||
if ((int) stream->walmethod->write(f, content, size) != size)
|
||||
{
|
||||
int save_errno = errno;
|
||||
fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
|
||||
progname, histfname, stream->walmethod->getlasterror());
|
||||
|
||||
/*
|
||||
* If we fail to make the file, delete it to release disk space
|
||||
*/
|
||||
close(fd);
|
||||
unlink(tmppath);
|
||||
errno = save_errno;
|
||||
stream->walmethod->close(f, CLOSE_UNLINK);
|
||||
|
||||
fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
|
||||
progname, tmppath, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (close(fd) != 0)
|
||||
if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
|
||||
progname, tmppath, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Now move the completed history file into place with its final name.
|
||||
*/
|
||||
if (durable_rename(tmppath, path, progname) < 0)
|
||||
{
|
||||
/* durable_rename produced a log entry */
|
||||
progname, histfname, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
|
||||
if (stream->mark_done)
|
||||
{
|
||||
/* writes error message if failed */
|
||||
if (!mark_file_as_archived(stream->basedir, histfname,
|
||||
stream->do_sync))
|
||||
if (!mark_file_as_archived(stream, histfname))
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
|
||||
{
|
||||
/*
|
||||
* Fetch the timeline history file for this timeline, if we don't have
|
||||
* it already.
|
||||
* it already. When streaming log to tar, this will always return
|
||||
* false, as we are never streaming into an existing file and
|
||||
* therefore there can be no pre-existing timeline history file.
|
||||
*/
|
||||
if (!existsTimeLineHistoryFile(stream))
|
||||
{
|
||||
@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
|
||||
}
|
||||
|
||||
error:
|
||||
if (walfile != -1 && close(walfile) != 0)
|
||||
if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
|
||||
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, strerror(errno));
|
||||
walfile = -1;
|
||||
progname, current_walfile_name, stream->walmethod->getlasterror());
|
||||
walfile = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
|
||||
* If synchronous option is true, issue sync command as soon as there
|
||||
* are WAL data which has not been flushed yet.
|
||||
*/
|
||||
if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
|
||||
if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
|
||||
{
|
||||
if (stream->do_sync && fsync(walfile) != 0)
|
||||
if (stream->walmethod->fsync(walfile) != 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, strerror(errno));
|
||||
progname, current_walfile_name, stream->walmethod->getlasterror());
|
||||
goto error;
|
||||
}
|
||||
lastFlushPosition = blockpos;
|
||||
@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
if (replyRequested && still_sending)
|
||||
{
|
||||
if (reportFlushPosition && lastFlushPosition < blockpos &&
|
||||
walfile != -1)
|
||||
walfile != NULL)
|
||||
{
|
||||
/*
|
||||
* If a valid flush location needs to be reported, flush the
|
||||
@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
* data has been successfully replicated or not, at the normal
|
||||
* shutdown of the server.
|
||||
*/
|
||||
if (stream->do_sync && fsync(walfile) != 0)
|
||||
if (stream->walmethod->fsync(walfile) != 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
|
||||
progname, current_walfile_name, strerror(errno));
|
||||
progname, current_walfile_name, stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
lastFlushPosition = blockpos;
|
||||
@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
* Verify that the initial location in the stream matches where we think
|
||||
* we are.
|
||||
*/
|
||||
if (walfile == -1)
|
||||
if (walfile == NULL)
|
||||
{
|
||||
/* No file open yet */
|
||||
if (xlogoff != 0)
|
||||
@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
else
|
||||
{
|
||||
/* More data in existing segment */
|
||||
/* XXX: store seek value don't reseek all the time */
|
||||
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
|
||||
if (stream->walmethod->get_current_pos(walfile) != xlogoff)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: got WAL data offset %08x, expected %08x\n"),
|
||||
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
|
||||
progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
else
|
||||
bytes_to_write = bytes_left;
|
||||
|
||||
if (walfile == -1)
|
||||
if (walfile == NULL)
|
||||
{
|
||||
if (!open_walfile(stream, *blockpos))
|
||||
{
|
||||
@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
|
||||
}
|
||||
}
|
||||
|
||||
if (write(walfile,
|
||||
copybuf + hdr_len + bytes_written,
|
||||
bytes_to_write) != bytes_to_write)
|
||||
if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
|
||||
bytes_to_write) != bytes_to_write)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
|
||||
progname, bytes_to_write, current_walfile_name,
|
||||
strerror(errno));
|
||||
stream->walmethod->getlasterror());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
#define RECEIVELOG_H
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "walmethods.h"
|
||||
|
||||
#include "access/xlogdefs.h"
|
||||
|
||||
@ -41,7 +42,7 @@ typedef struct StreamCtl
|
||||
|
||||
stream_stop_callback stream_stop; /* Stop streaming when returns true */
|
||||
|
||||
char *basedir; /* Received segments written to this dir */
|
||||
WalWriteMethod *walmethod; /* How to write the WAL */
|
||||
char *partial_suffix; /* Suffix appended to partially received files */
|
||||
} StreamCtl;
|
||||
|
||||
|
@ -4,7 +4,7 @@ use Cwd;
|
||||
use Config;
|
||||
use PostgresNode;
|
||||
use TestLib;
|
||||
use Test::More tests => 67;
|
||||
use Test::More tests => 69;
|
||||
|
||||
program_help_ok('pg_basebackup');
|
||||
program_version_ok('pg_basebackup');
|
||||
@ -237,6 +237,10 @@ $node->command_ok(
|
||||
'pg_basebackup -X stream runs');
|
||||
ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")),
|
||||
'WAL files copied');
|
||||
$node->command_ok(
|
||||
[ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ],
|
||||
'pg_basebackup -X stream runs in tar mode');
|
||||
ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created");
|
||||
|
||||
$node->command_fails(
|
||||
[ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ],
|
||||
|
886
src/bin/pg_basebackup/walmethods.c
Normal file
886
src/bin/pg_basebackup/walmethods.c
Normal file
@ -0,0 +1,886 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* walmethods.c - implementations of different ways to write received wal
|
||||
*
|
||||
* NOTE! The caller must ensure that only one method is instantiated in
|
||||
* any given program, and that it's only instantiated once!
|
||||
*
|
||||
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/bin/pg_basebackup/walmethods.c
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres_fe.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#ifdef HAVE_LIBZ
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
|
||||
#include "pgtar.h"
|
||||
#include "common/file_utils.h"
|
||||
|
||||
#include "receivelog.h"
|
||||
#include "streamutil.h"
|
||||
|
||||
/* Size of zlib buffer for .tar.gz */
|
||||
#define ZLIB_OUT_SIZE 4096
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
* WalDirectoryMethod - write wal to a directory looking like pg_xlog
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Global static data for this method
|
||||
*/
|
||||
typedef struct DirectoryMethodData
|
||||
{
|
||||
char *basedir;
|
||||
bool sync;
|
||||
} DirectoryMethodData;
|
||||
static DirectoryMethodData *dir_data = NULL;
|
||||
|
||||
/*
|
||||
* Local file handle
|
||||
*/
|
||||
typedef struct DirectoryMethodFile
|
||||
{
|
||||
int fd;
|
||||
off_t currpos;
|
||||
char *pathname;
|
||||
char *fullpath;
|
||||
char *temp_suffix;
|
||||
} DirectoryMethodFile;
|
||||
|
||||
static char *
|
||||
dir_getlasterror(void)
|
||||
{
|
||||
/* Directory method always sets errno, so just use strerror */
|
||||
return strerror(errno);
|
||||
}
|
||||
|
||||
static Walfile
|
||||
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
|
||||
{
|
||||
static char tmppath[MAXPGPATH];
|
||||
int fd;
|
||||
DirectoryMethodFile *f;
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
|
||||
dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
|
||||
|
||||
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (fd < 0)
|
||||
return NULL;
|
||||
|
||||
if (pad_to_size)
|
||||
{
|
||||
/* Always pre-pad on regular files */
|
||||
char *zerobuf;
|
||||
int bytes;
|
||||
|
||||
zerobuf = pg_malloc0(XLOG_BLCKSZ);
|
||||
for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
|
||||
{
|
||||
if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
pg_free(zerobuf);
|
||||
close(fd);
|
||||
errno = save_errno;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
pg_free(zerobuf);
|
||||
|
||||
if (lseek(fd, 0, SEEK_SET) != 0)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
close(fd);
|
||||
errno = save_errno;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* fsync WAL file and containing directory, to ensure the file is
|
||||
* persistently created and zeroed (if padded). That's particularly
|
||||
* important when using synchronous mode, where the file is modified and
|
||||
* fsynced in-place, without a directory fsync.
|
||||
*/
|
||||
if (dir_data->sync)
|
||||
{
|
||||
if (fsync_fname(tmppath, false, progname) != 0 ||
|
||||
fsync_parent_path(tmppath, progname) != 0)
|
||||
{
|
||||
close(fd);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
f = pg_malloc0(sizeof(DirectoryMethodFile));
|
||||
f->fd = fd;
|
||||
f->currpos = 0;
|
||||
f->pathname = pg_strdup(pathname);
|
||||
f->fullpath = pg_strdup(tmppath);
|
||||
if (temp_suffix)
|
||||
f->temp_suffix = pg_strdup(temp_suffix);
|
||||
|
||||
return f;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
dir_write(Walfile f, const void *buf, size_t count)
|
||||
{
|
||||
ssize_t r;
|
||||
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
|
||||
|
||||
Assert(f != NULL);
|
||||
|
||||
r = write(df->fd, buf, count);
|
||||
if (r > 0)
|
||||
df->currpos += r;
|
||||
return r;
|
||||
}
|
||||
|
||||
static off_t
|
||||
dir_get_current_pos(Walfile f)
|
||||
{
|
||||
Assert(f != NULL);
|
||||
|
||||
/* Use a cached value to prevent lots of reseeks */
|
||||
return ((DirectoryMethodFile *) f)->currpos;
|
||||
}
|
||||
|
||||
static int
|
||||
dir_close(Walfile f, WalCloseMethod method)
|
||||
{
|
||||
int r;
|
||||
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
|
||||
static char tmppath[MAXPGPATH];
|
||||
static char tmppath2[MAXPGPATH];
|
||||
|
||||
Assert(f != NULL);
|
||||
|
||||
r = close(df->fd);
|
||||
|
||||
if (r == 0)
|
||||
{
|
||||
/* Build path to the current version of the file */
|
||||
if (method == CLOSE_NORMAL && df->temp_suffix)
|
||||
{
|
||||
/*
|
||||
* If we have a temp prefix, normal operation is to rename the
|
||||
* file.
|
||||
*/
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
|
||||
dir_data->basedir, df->pathname, df->temp_suffix);
|
||||
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
|
||||
dir_data->basedir, df->pathname);
|
||||
r = durable_rename(tmppath, tmppath2, progname);
|
||||
}
|
||||
else if (method == CLOSE_UNLINK)
|
||||
{
|
||||
/* Unlink the file once it's closed */
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
|
||||
dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
|
||||
r = unlink(tmppath);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Else either CLOSE_NORMAL and no temp suffix, or
|
||||
* CLOSE_NO_RENAME. In this case, fsync the file and containing
|
||||
* directory if sync mode is requested.
|
||||
*/
|
||||
if (dir_data->sync)
|
||||
{
|
||||
r = fsync_fname(df->fullpath, false, progname);
|
||||
if (r == 0)
|
||||
r = fsync_parent_path(df->fullpath, progname);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pg_free(df->pathname);
|
||||
pg_free(df->fullpath);
|
||||
if (df->temp_suffix)
|
||||
pg_free(df->temp_suffix);
|
||||
pg_free(df);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
static int
|
||||
dir_fsync(Walfile f)
|
||||
{
|
||||
Assert(f != NULL);
|
||||
|
||||
if (!dir_data->sync)
|
||||
return 0;
|
||||
|
||||
return fsync(((DirectoryMethodFile *) f)->fd);
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
dir_get_file_size(const char *pathname)
|
||||
{
|
||||
struct stat statbuf;
|
||||
static char tmppath[MAXPGPATH];
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s",
|
||||
dir_data->basedir, pathname);
|
||||
|
||||
if (stat(tmppath, &statbuf) != 0)
|
||||
return -1;
|
||||
|
||||
return statbuf.st_size;
|
||||
}
|
||||
|
||||
static bool
|
||||
dir_existsfile(const char *pathname)
|
||||
{
|
||||
static char tmppath[MAXPGPATH];
|
||||
int fd;
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s",
|
||||
dir_data->basedir, pathname);
|
||||
|
||||
fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
|
||||
if (fd < 0)
|
||||
return false;
|
||||
close(fd);
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool
|
||||
dir_finish(void)
|
||||
{
|
||||
if (dir_data->sync)
|
||||
{
|
||||
/*
|
||||
* Files are fsynced when they are closed, but we need to fsync the
|
||||
* directory entry here as well.
|
||||
*/
|
||||
if (fsync_fname(dir_data->basedir, true, progname) != 0)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
WalWriteMethod *
|
||||
CreateWalDirectoryMethod(const char *basedir, bool sync)
|
||||
{
|
||||
WalWriteMethod *method;
|
||||
|
||||
method = pg_malloc0(sizeof(WalWriteMethod));
|
||||
method->open_for_write = dir_open_for_write;
|
||||
method->write = dir_write;
|
||||
method->get_current_pos = dir_get_current_pos;
|
||||
method->get_file_size = dir_get_file_size;
|
||||
method->close = dir_close;
|
||||
method->fsync = dir_fsync;
|
||||
method->existsfile = dir_existsfile;
|
||||
method->finish = dir_finish;
|
||||
method->getlasterror = dir_getlasterror;
|
||||
|
||||
dir_data = pg_malloc0(sizeof(DirectoryMethodData));
|
||||
dir_data->basedir = pg_strdup(basedir);
|
||||
dir_data->sync = sync;
|
||||
|
||||
return method;
|
||||
}
|
||||
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
* WalTarMethod - write wal to a tar file containing pg_xlog contents
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
typedef struct TarMethodFile
|
||||
{
|
||||
off_t ofs_start; /* Where does the *header* for this file start */
|
||||
off_t currpos;
|
||||
char header[512];
|
||||
char *pathname;
|
||||
size_t pad_to_size;
|
||||
} TarMethodFile;
|
||||
|
||||
typedef struct TarMethodData
|
||||
{
|
||||
char *tarfilename;
|
||||
int fd;
|
||||
int compression;
|
||||
bool sync;
|
||||
TarMethodFile *currentfile;
|
||||
char lasterror[1024];
|
||||
#ifdef HAVE_LIBZ
|
||||
z_streamp zp;
|
||||
void *zlibOut;
|
||||
#endif
|
||||
} TarMethodData;
|
||||
static TarMethodData *tar_data = NULL;
|
||||
|
||||
#define tar_clear_error() tar_data->lasterror[0] = '\0'
|
||||
#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
|
||||
|
||||
static char *
|
||||
tar_getlasterror(void)
|
||||
{
|
||||
/*
|
||||
* If a custom error is set, return that one. Otherwise, assume errno is
|
||||
* set and return that one.
|
||||
*/
|
||||
if (tar_data->lasterror[0])
|
||||
return tar_data->lasterror;
|
||||
return strerror(errno);
|
||||
}
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
static bool
|
||||
tar_write_compressed_data(void *buf, size_t count, bool flush)
|
||||
{
|
||||
tar_data->zp->next_in = buf;
|
||||
tar_data->zp->avail_in = count;
|
||||
|
||||
while (tar_data->zp->avail_in || flush)
|
||||
{
|
||||
int r;
|
||||
|
||||
r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
|
||||
if (r == Z_STREAM_ERROR)
|
||||
{
|
||||
tar_set_error("deflate failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
|
||||
{
|
||||
size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
|
||||
|
||||
if (write(tar_data->fd, tar_data->zlibOut, len) != len)
|
||||
return false;
|
||||
|
||||
tar_data->zp->next_out = tar_data->zlibOut;
|
||||
tar_data->zp->avail_out = ZLIB_OUT_SIZE;
|
||||
}
|
||||
|
||||
if (r == Z_STREAM_END)
|
||||
break;
|
||||
}
|
||||
|
||||
if (flush)
|
||||
{
|
||||
/* Reset the stream for writing */
|
||||
if (deflateReset(tar_data->zp) != Z_OK)
|
||||
{
|
||||
tar_set_error("deflateReset failed");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
static ssize_t
|
||||
tar_write(Walfile f, const void *buf, size_t count)
|
||||
{
|
||||
ssize_t r;
|
||||
|
||||
Assert(f != NULL);
|
||||
tar_clear_error();
|
||||
|
||||
/* Tarfile will always be positioned at the end */
|
||||
if (!tar_data->compression)
|
||||
{
|
||||
r = write(tar_data->fd, buf, count);
|
||||
if (r > 0)
|
||||
((TarMethodFile *) f)->currpos += r;
|
||||
return r;
|
||||
}
|
||||
#ifdef HAVE_LIBZ
|
||||
else
|
||||
{
|
||||
if (!tar_write_compressed_data((void *) buf, count, false))
|
||||
return -1;
|
||||
((TarMethodFile *) f)->currpos += count;
|
||||
return count;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static bool
|
||||
tar_write_padding_data(TarMethodFile * f, size_t bytes)
|
||||
{
|
||||
char *zerobuf = pg_malloc0(XLOG_BLCKSZ);
|
||||
size_t bytesleft = bytes;
|
||||
|
||||
while (bytesleft)
|
||||
{
|
||||
size_t bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
|
||||
|
||||
size_t r = tar_write(f, zerobuf, bytestowrite);
|
||||
|
||||
if (r < 0)
|
||||
return false;
|
||||
bytesleft -= r;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static Walfile
|
||||
tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
|
||||
{
|
||||
int save_errno;
|
||||
static char tmppath[MAXPGPATH];
|
||||
|
||||
tar_clear_error();
|
||||
|
||||
if (tar_data->fd < 0)
|
||||
{
|
||||
/*
|
||||
* We open the tar file only when we first try to write to it.
|
||||
*/
|
||||
tar_data->fd = open(tar_data->tarfilename,
|
||||
O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (tar_data->fd < 0)
|
||||
return NULL;
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
if (tar_data->compression)
|
||||
{
|
||||
tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
|
||||
tar_data->zp->zalloc = Z_NULL;
|
||||
tar_data->zp->zfree = Z_NULL;
|
||||
tar_data->zp->opaque = Z_NULL;
|
||||
tar_data->zp->next_out = tar_data->zlibOut;
|
||||
tar_data->zp->avail_out = ZLIB_OUT_SIZE;
|
||||
|
||||
/*
|
||||
* Initialize deflation library. Adding the magic value 16 to the
|
||||
* default 15 for the windowBits parameter makes the output be
|
||||
* gzip instead of zlib.
|
||||
*/
|
||||
if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
|
||||
{
|
||||
pg_free(tar_data->zp);
|
||||
tar_data->zp = NULL;
|
||||
tar_set_error("deflateInit2 failed");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* There's no tar header itself, the file starts with regular files */
|
||||
}
|
||||
|
||||
Assert(tar_data->currentfile == NULL);
|
||||
if (tar_data->currentfile != NULL)
|
||||
{
|
||||
tar_set_error("implementation error: tar files can't have more than one open file\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s%s",
|
||||
pathname, temp_suffix ? temp_suffix : "");
|
||||
|
||||
/* Create a header with size set to 0 - we will fill out the size on close */
|
||||
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
|
||||
{
|
||||
pg_free(tar_data->currentfile);
|
||||
tar_data->currentfile = NULL;
|
||||
tar_set_error("could not create tar header");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
if (tar_data->compression)
|
||||
{
|
||||
/* Flush existing data */
|
||||
if (!tar_write_compressed_data(NULL, 0, true))
|
||||
return NULL;
|
||||
|
||||
/* Turn off compression for header */
|
||||
if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
|
||||
{
|
||||
tar_set_error("deflateParams failed");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
|
||||
if (tar_data->currentfile->ofs_start == -1)
|
||||
{
|
||||
save_errno = errno;
|
||||
pg_free(tar_data->currentfile);
|
||||
tar_data->currentfile = NULL;
|
||||
errno = save_errno;
|
||||
return NULL;
|
||||
}
|
||||
tar_data->currentfile->currpos = 0;
|
||||
|
||||
if (!tar_data->compression)
|
||||
{
|
||||
if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
|
||||
{
|
||||
save_errno = errno;
|
||||
pg_free(tar_data->currentfile);
|
||||
tar_data->currentfile = NULL;
|
||||
errno = save_errno;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
#ifdef HAVE_LIBZ
|
||||
else
|
||||
{
|
||||
/* Write header through the zlib APIs but with no compression */
|
||||
if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
|
||||
return NULL;
|
||||
|
||||
/* Re-enable compression for the rest of the file */
|
||||
if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
|
||||
{
|
||||
tar_set_error("deflateParams failed");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
tar_data->currentfile->pathname = pg_strdup(pathname);
|
||||
|
||||
/*
|
||||
* Uncompressed files are padded on creation, but for compression we can't
|
||||
* do that
|
||||
*/
|
||||
if (pad_to_size)
|
||||
{
|
||||
tar_data->currentfile->pad_to_size = pad_to_size;
|
||||
if (!tar_data->compression)
|
||||
{
|
||||
/* Uncompressed, so pad now */
|
||||
tar_write_padding_data(tar_data->currentfile, pad_to_size);
|
||||
/* Seek back to start */
|
||||
if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
|
||||
return NULL;
|
||||
|
||||
tar_data->currentfile->currpos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return tar_data->currentfile;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
tar_get_file_size(const char *pathname)
|
||||
{
|
||||
tar_clear_error();
|
||||
|
||||
/* Currently not used, so not supported */
|
||||
errno = ENOSYS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
static off_t
|
||||
tar_get_current_pos(Walfile f)
|
||||
{
|
||||
Assert(f != NULL);
|
||||
tar_clear_error();
|
||||
|
||||
return ((TarMethodFile *) f)->currpos;
|
||||
}
|
||||
|
||||
static int
|
||||
tar_fsync(Walfile f)
|
||||
{
|
||||
Assert(f != NULL);
|
||||
tar_clear_error();
|
||||
|
||||
/*
|
||||
* Always sync the whole tarfile, because that's all we can do. This makes
|
||||
* no sense on compressed files, so just ignore those.
|
||||
*/
|
||||
if (tar_data->compression)
|
||||
return 0;
|
||||
|
||||
return fsync(tar_data->fd);
|
||||
}
|
||||
|
||||
static int
|
||||
tar_close(Walfile f, WalCloseMethod method)
|
||||
{
|
||||
ssize_t filesize;
|
||||
int padding;
|
||||
TarMethodFile *tf = (TarMethodFile *) f;
|
||||
|
||||
Assert(f != NULL);
|
||||
tar_clear_error();
|
||||
|
||||
if (method == CLOSE_UNLINK)
|
||||
{
|
||||
if (tar_data->compression)
|
||||
{
|
||||
tar_set_error("unlink not supported with compression");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Unlink the file that we just wrote to the tar. We do this by
|
||||
* truncating it to the start of the header. This is safe as we only
|
||||
* allow writing of the very last file.
|
||||
*/
|
||||
if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
|
||||
return -1;
|
||||
|
||||
pg_free(tf->pathname);
|
||||
pg_free(tf);
|
||||
tar_data->currentfile = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Pad the file itself with zeroes if necessary. Note that this is
|
||||
* different from the tar format padding -- this is the padding we asked
|
||||
* for when the file was opened.
|
||||
*/
|
||||
if (tf->pad_to_size)
|
||||
{
|
||||
if (tar_data->compression)
|
||||
{
|
||||
/*
|
||||
* A compressed tarfile is padded on close since we cannot know
|
||||
* the size of the compressed output until the end.
|
||||
*/
|
||||
size_t sizeleft = tf->pad_to_size - tf->currpos;
|
||||
|
||||
if (sizeleft)
|
||||
{
|
||||
if (!tar_write_padding_data(tf, sizeleft))
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* An uncompressed tarfile was padded on creation, so just adjust
|
||||
* the current position as if we seeked to the end.
|
||||
*/
|
||||
tf->currpos = tf->pad_to_size;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the size of the file, and pad the current data up to the nearest
|
||||
* 512 byte boundary.
|
||||
*/
|
||||
filesize = tar_get_current_pos(f);
|
||||
padding = ((filesize + 511) & ~511) - filesize;
|
||||
if (padding)
|
||||
{
|
||||
char zerobuf[512];
|
||||
|
||||
MemSet(zerobuf, 0, padding);
|
||||
if (tar_write(f, zerobuf, padding) != padding)
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
if (tar_data->compression)
|
||||
{
|
||||
/* Flush the current buffer */
|
||||
if (!tar_write_compressed_data(NULL, 0, true))
|
||||
{
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Now go back and update the header with the correct filesize and
|
||||
* possibly also renaming the file. We overwrite the entire current header
|
||||
* when done, including the checksum.
|
||||
*/
|
||||
print_tar_number(&(tf->header[124]), 12, filesize);
|
||||
|
||||
if (method == CLOSE_NORMAL)
|
||||
|
||||
/*
|
||||
* We overwrite it with what it was before if we have no tempname,
|
||||
* since we're going to write the buffer anyway.
|
||||
*/
|
||||
strlcpy(&(tf->header[0]), tf->pathname, 100);
|
||||
|
||||
print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
|
||||
if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
|
||||
return -1;
|
||||
if (!tar_data->compression)
|
||||
{
|
||||
if (write(tar_data->fd, tf->header, 512) != 512)
|
||||
return -1;
|
||||
}
|
||||
#ifdef HAVE_LIBZ
|
||||
else
|
||||
{
|
||||
/* Turn off compression */
|
||||
if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
|
||||
{
|
||||
tar_set_error("deflateParams failed");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Overwrite the header, assuming the size will be the same */
|
||||
if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
|
||||
return -1;
|
||||
|
||||
/* Turn compression back on */
|
||||
if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
|
||||
{
|
||||
tar_set_error("deflateParams failed");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Move file pointer back down to end, so we can write the next file */
|
||||
if (lseek(tar_data->fd, 0, SEEK_END) < 0)
|
||||
return -1;
|
||||
|
||||
/* Always fsync on close, so the padding gets fsynced */
|
||||
tar_fsync(f);
|
||||
|
||||
/* Clean up and done */
|
||||
pg_free(tf->pathname);
|
||||
pg_free(tf);
|
||||
tar_data->currentfile = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool
|
||||
tar_existsfile(const char *pathname)
|
||||
{
|
||||
tar_clear_error();
|
||||
/* We only deal with new tarfiles, so nothing externally created exists */
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool
|
||||
tar_finish(void)
|
||||
{
|
||||
char zerobuf[1024];
|
||||
|
||||
tar_clear_error();
|
||||
|
||||
if (tar_data->currentfile)
|
||||
{
|
||||
if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
|
||||
return false;
|
||||
}
|
||||
|
||||
/* A tarfile always ends with two empty blocks */
|
||||
MemSet(zerobuf, 0, sizeof(zerobuf));
|
||||
if (!tar_data->compression)
|
||||
{
|
||||
if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
|
||||
return false;
|
||||
}
|
||||
#ifdef HAVE_LIBZ
|
||||
else
|
||||
{
|
||||
if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
|
||||
return false;
|
||||
|
||||
/* Also flush all data to make sure the gzip stream is finished */
|
||||
tar_data->zp->next_in = NULL;
|
||||
tar_data->zp->avail_in = 0;
|
||||
while (true)
|
||||
{
|
||||
int r;
|
||||
|
||||
r = deflate(tar_data->zp, Z_FINISH);
|
||||
|
||||
if (r == Z_STREAM_ERROR)
|
||||
{
|
||||
tar_set_error("deflate failed");
|
||||
return false;
|
||||
}
|
||||
if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
|
||||
{
|
||||
size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
|
||||
|
||||
if (write(tar_data->fd, tar_data->zlibOut, len) != len)
|
||||
return false;
|
||||
}
|
||||
if (r == Z_STREAM_END)
|
||||
break;
|
||||
}
|
||||
|
||||
if (deflateEnd(tar_data->zp) != Z_OK)
|
||||
{
|
||||
tar_set_error("deflateEnd failed");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* sync the empty blocks as well, since they're after the last file */
|
||||
fsync(tar_data->fd);
|
||||
|
||||
if (close(tar_data->fd) != 0)
|
||||
return false;
|
||||
|
||||
tar_data->fd = -1;
|
||||
|
||||
if (tar_data->sync)
|
||||
{
|
||||
if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
|
||||
return false;
|
||||
if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
WalWriteMethod *
|
||||
CreateWalTarMethod(const char *tarbase, int compression, bool sync)
|
||||
{
|
||||
WalWriteMethod *method;
|
||||
const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
|
||||
|
||||
method = pg_malloc0(sizeof(WalWriteMethod));
|
||||
method->open_for_write = tar_open_for_write;
|
||||
method->write = tar_write;
|
||||
method->get_current_pos = tar_get_current_pos;
|
||||
method->get_file_size = tar_get_file_size;
|
||||
method->close = tar_close;
|
||||
method->fsync = tar_fsync;
|
||||
method->existsfile = tar_existsfile;
|
||||
method->finish = tar_finish;
|
||||
method->getlasterror = tar_getlasterror;
|
||||
|
||||
tar_data = pg_malloc0(sizeof(TarMethodData));
|
||||
tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
|
||||
sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
|
||||
tar_data->fd = -1;
|
||||
tar_data->compression = compression;
|
||||
tar_data->sync = sync;
|
||||
if (compression)
|
||||
tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
|
||||
|
||||
return method;
|
||||
}
|
45
src/bin/pg_basebackup/walmethods.h
Normal file
45
src/bin/pg_basebackup/walmethods.h
Normal file
@ -0,0 +1,45 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* walmethods.h
|
||||
*
|
||||
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/bin/pg_basebackup/walmethods.h
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
typedef void *Walfile;
|
||||
|
||||
typedef enum
|
||||
{
|
||||
CLOSE_NORMAL,
|
||||
CLOSE_UNLINK,
|
||||
CLOSE_NO_RENAME,
|
||||
} WalCloseMethod;
|
||||
|
||||
typedef struct WalWriteMethod WalWriteMethod;
|
||||
struct WalWriteMethod
|
||||
{
|
||||
Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
|
||||
int (*close) (Walfile f, WalCloseMethod method);
|
||||
bool (*existsfile) (const char *pathname);
|
||||
ssize_t (*get_file_size) (const char *pathname);
|
||||
|
||||
ssize_t (*write) (Walfile f, const void *buf, size_t count);
|
||||
off_t (*get_current_pos) (Walfile f);
|
||||
int (*fsync) (Walfile f);
|
||||
bool (*finish) (void);
|
||||
char *(*getlasterror) (void);
|
||||
};
|
||||
|
||||
/*
|
||||
* Available WAL methods:
|
||||
* - WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
|
||||
* - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_xlog
|
||||
* (only implements the methods required for pg_basebackup,
|
||||
* not all those required for pg_receivexlog)
|
||||
*/
|
||||
WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
|
||||
WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
|
@ -22,4 +22,5 @@ enum tarError
|
||||
extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
|
||||
pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
|
||||
extern uint64 read_tar_number(const char *s, int len);
|
||||
extern void print_tar_number(char *s, int len, uint64 val);
|
||||
extern int tarChecksum(char *header);
|
||||
|
@ -16,7 +16,7 @@
|
||||
* support only non-negative numbers, so we don't worry about the GNU rules
|
||||
* for handling negative numbers.)
|
||||
*/
|
||||
static void
|
||||
void
|
||||
print_tar_number(char *s, int len, uint64 val)
|
||||
{
|
||||
if (val < (((uint64) 1) << ((len - 1) * 3)))
|
||||
|
Loading…
Reference in New Issue
Block a user