mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-03-07 19:47:50 +08:00
Add compression support to pg_receivexlog
Author: Michael Paquier, review and small changes by me
This commit is contained in:
parent
974ece58bb
commit
cada1af31d
@ -180,6 +180,19 @@ PostgreSQL documentation
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
|
||||
<term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
|
||||
<listitem>
|
||||
<para>
|
||||
Enables gzip compression of transaction logs, and specifies the
|
||||
compression level (0 through 9, 0 being no compression and 9 being best
|
||||
compression). The suffix <filename>.gz</filename> will
|
||||
automatically be added to all filenames.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
</variablelist>
|
||||
|
||||
<para>
|
||||
|
@ -494,7 +494,7 @@ LogStreamerMain(logstreamer_param *param)
|
||||
stream.replication_slot = psprintf("pg_basebackup_%d", (int) getpid());
|
||||
|
||||
if (format == 'p')
|
||||
stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
|
||||
stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
|
||||
else
|
||||
stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
|
||||
|
||||
|
@ -34,6 +34,7 @@
|
||||
/* Global options */
|
||||
static char *basedir = NULL;
|
||||
static int verbose = 0;
|
||||
static int compresslevel = 0;
|
||||
static int noloop = 0;
|
||||
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
|
||||
static volatile bool time_to_abort = false;
|
||||
@ -58,6 +59,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
|
||||
exit(code); \
|
||||
}
|
||||
|
||||
/* Routines to evaluate segment file format */
|
||||
#define IsCompressXLogFileName(fname) \
|
||||
(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
|
||||
strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
|
||||
strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
|
||||
#define IsPartialCompressXLogFileName(fname) \
|
||||
(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
|
||||
strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \
|
||||
strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
|
||||
|
||||
static void
|
||||
usage(void)
|
||||
@ -76,6 +86,7 @@ usage(void)
|
||||
printf(_(" --synchronous flush transaction log immediately after writing\n"));
|
||||
printf(_(" -v, --verbose output verbose messages\n"));
|
||||
printf(_(" -V, --version output version information, then exit\n"));
|
||||
printf(_(" -Z, --compress=0-9 compress logs with given compression level\n"));
|
||||
printf(_(" -?, --help show this help, then exit\n"));
|
||||
printf(_("\nConnection options:\n"));
|
||||
printf(_(" -d, --dbname=CONNSTR connection string\n"));
|
||||
@ -188,14 +199,31 @@ FindStreamingStart(uint32 *tli)
|
||||
uint32 tli;
|
||||
XLogSegNo segno;
|
||||
bool ispartial;
|
||||
bool iscompress;
|
||||
|
||||
/*
|
||||
* Check if the filename looks like an xlog file, or a .partial file.
|
||||
*/
|
||||
if (IsXLogFileName(dirent->d_name))
|
||||
{
|
||||
ispartial = false;
|
||||
iscompress = false;
|
||||
}
|
||||
else if (IsPartialXLogFileName(dirent->d_name))
|
||||
{
|
||||
ispartial = true;
|
||||
iscompress = false;
|
||||
}
|
||||
else if (IsCompressXLogFileName(dirent->d_name))
|
||||
{
|
||||
ispartial = false;
|
||||
iscompress = true;
|
||||
}
|
||||
else if (IsPartialCompressXLogFileName(dirent->d_name))
|
||||
{
|
||||
ispartial = true;
|
||||
iscompress = true;
|
||||
}
|
||||
else
|
||||
continue;
|
||||
|
||||
@ -206,9 +234,15 @@ FindStreamingStart(uint32 *tli)
|
||||
|
||||
/*
|
||||
* Check that the segment has the right size, if it's supposed to be
|
||||
* completed.
|
||||
* completed. For non-compressed segments just check the on-disk size
|
||||
* and see if it matches a completed segment.
|
||||
* For compressed segments, look at the last 4 bytes of the compressed
|
||||
* file, which is where the uncompressed size is located for gz files
|
||||
* with a size lower than 4GB, and then compare it to the size of a
|
||||
* completed segment. The 4 last bytes correspond to the ISIZE member
|
||||
* according to http://www.zlib.org/rfc-gzip.html.
|
||||
*/
|
||||
if (!ispartial)
|
||||
if (!ispartial && !iscompress)
|
||||
{
|
||||
struct stat statbuf;
|
||||
char fullpath[MAXPGPATH];
|
||||
@ -229,6 +263,47 @@ FindStreamingStart(uint32 *tli)
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else if (!ispartial && iscompress)
|
||||
{
|
||||
int fd;
|
||||
char buf[4];
|
||||
int bytes_out;
|
||||
char fullpath[MAXPGPATH];
|
||||
|
||||
snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
|
||||
|
||||
fd = open(fullpath, O_RDONLY | PG_BINARY);
|
||||
if (fd < 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"),
|
||||
progname, fullpath, strerror(errno));
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
|
||||
{
|
||||
fprintf(stderr, _("%s: could not seek compressed file \"%s\": %s\n"),
|
||||
progname, fullpath, strerror(errno));
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
|
||||
{
|
||||
fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"),
|
||||
progname, fullpath, strerror(errno));
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
|
||||
close(fd);
|
||||
bytes_out = (buf[3] << 24) | (buf[2] << 16) |
|
||||
(buf[1] << 8) | buf[0];
|
||||
|
||||
if (bytes_out != XLOG_SEG_SIZE)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"),
|
||||
progname, dirent->d_name, bytes_out);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/* Looks like a valid segment. Remember that we saw it. */
|
||||
if ((segno > high_segno) ||
|
||||
@ -339,7 +414,8 @@ StreamLog(void)
|
||||
stream.synchronous = synchronous;
|
||||
stream.do_sync = true;
|
||||
stream.mark_done = false;
|
||||
stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
|
||||
stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
|
||||
stream.do_sync);
|
||||
stream.partial_suffix = ".partial";
|
||||
stream.replication_slot = replication_slot;
|
||||
stream.temp_slot = false;
|
||||
@ -392,6 +468,7 @@ main(int argc, char **argv)
|
||||
{"status-interval", required_argument, NULL, 's'},
|
||||
{"slot", required_argument, NULL, 'S'},
|
||||
{"verbose", no_argument, NULL, 'v'},
|
||||
{"compress", required_argument, NULL, 'Z'},
|
||||
/* action */
|
||||
{"create-slot", no_argument, NULL, 1},
|
||||
{"drop-slot", no_argument, NULL, 2},
|
||||
@ -422,7 +499,7 @@ main(int argc, char **argv)
|
||||
}
|
||||
}
|
||||
|
||||
while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
|
||||
while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
|
||||
long_options, &option_index)) != -1)
|
||||
{
|
||||
switch (c)
|
||||
@ -472,6 +549,15 @@ main(int argc, char **argv)
|
||||
case 'v':
|
||||
verbose++;
|
||||
break;
|
||||
case 'Z':
|
||||
compresslevel = atoi(optarg);
|
||||
if (compresslevel < 0 || compresslevel > 9)
|
||||
{
|
||||
fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
|
||||
progname, optarg);
|
||||
exit(1);
|
||||
}
|
||||
break;
|
||||
/* action */
|
||||
case 1:
|
||||
do_create_slot = true;
|
||||
@ -538,6 +624,16 @@ main(int argc, char **argv)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
#ifndef HAVE_LIBZ
|
||||
if (compresslevel != 0)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: this build does not support compression\n"),
|
||||
progname);
|
||||
exit(1);
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Check existence of destination folder.
|
||||
*/
|
||||
|
@ -41,6 +41,7 @@
|
||||
typedef struct DirectoryMethodData
|
||||
{
|
||||
char *basedir;
|
||||
int compression;
|
||||
bool sync;
|
||||
} DirectoryMethodData;
|
||||
static DirectoryMethodData *dir_data = NULL;
|
||||
@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
|
||||
char *pathname;
|
||||
char *fullpath;
|
||||
char *temp_suffix;
|
||||
#ifdef HAVE_LIBZ
|
||||
gzFile gzfp;
|
||||
#endif
|
||||
} DirectoryMethodFile;
|
||||
|
||||
static char *
|
||||
@ -70,17 +74,47 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
||||
static char tmppath[MAXPGPATH];
|
||||
int fd;
|
||||
DirectoryMethodFile *f;
|
||||
#ifdef HAVE_LIBZ
|
||||
gzFile gzfp = NULL;
|
||||
#endif
|
||||
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
|
||||
dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
|
||||
dir_data->basedir, pathname,
|
||||
dir_data->compression > 0 ? ".gz" : "",
|
||||
temp_suffix ? temp_suffix : "");
|
||||
|
||||
/*
|
||||
* Open a file for non-compressed as well as compressed files. Tracking
|
||||
* the file descriptor is important for dir_sync() method as gzflush()
|
||||
* does not do any system calls to fsync() to make changes permanent on
|
||||
* disk.
|
||||
*/
|
||||
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
|
||||
if (fd < 0)
|
||||
return NULL;
|
||||
|
||||
if (pad_to_size)
|
||||
#ifdef HAVE_LIBZ
|
||||
if (dir_data->compression > 0)
|
||||
{
|
||||
gzfp = gzdopen(fd, "wb");
|
||||
if (gzfp == NULL)
|
||||
{
|
||||
close(fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (gzsetparams(gzfp, dir_data->compression,
|
||||
Z_DEFAULT_STRATEGY) != Z_OK)
|
||||
{
|
||||
gzclose(gzfp);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Do pre-padding on non-compressed files */
|
||||
if (pad_to_size && dir_data->compression == 0)
|
||||
{
|
||||
/* Always pre-pad on regular files */
|
||||
char *zerobuf;
|
||||
int bytes;
|
||||
|
||||
@ -120,12 +154,21 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
||||
if (fsync_fname(tmppath, false, progname) != 0 ||
|
||||
fsync_parent_path(tmppath, progname) != 0)
|
||||
{
|
||||
close(fd);
|
||||
#ifdef HAVE_LIBZ
|
||||
if (dir_data->compression > 0)
|
||||
gzclose(gzfp);
|
||||
else
|
||||
#endif
|
||||
close(fd);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
f = pg_malloc0(sizeof(DirectoryMethodFile));
|
||||
#ifdef HAVE_LIBZ
|
||||
if (dir_data->compression > 0)
|
||||
f->gzfp = gzfp;
|
||||
#endif
|
||||
f->fd = fd;
|
||||
f->currpos = 0;
|
||||
f->pathname = pg_strdup(pathname);
|
||||
@ -144,7 +187,12 @@ dir_write(Walfile f, const void *buf, size_t count)
|
||||
|
||||
Assert(f != NULL);
|
||||
|
||||
r = write(df->fd, buf, count);
|
||||
#ifdef HAVE_LIBZ
|
||||
if (dir_data->compression > 0)
|
||||
r = (ssize_t) gzwrite(df->gzfp, buf, count);
|
||||
else
|
||||
#endif
|
||||
r = write(df->fd, buf, count);
|
||||
if (r > 0)
|
||||
df->currpos += r;
|
||||
return r;
|
||||
@ -169,7 +217,12 @@ dir_close(Walfile f, WalCloseMethod method)
|
||||
|
||||
Assert(f != NULL);
|
||||
|
||||
r = close(df->fd);
|
||||
#ifdef HAVE_LIBZ
|
||||
if (dir_data->compression > 0)
|
||||
r = gzclose(df->gzfp);
|
||||
else
|
||||
#endif
|
||||
r = close(df->fd);
|
||||
|
||||
if (r == 0)
|
||||
{
|
||||
@ -180,17 +233,22 @@ dir_close(Walfile f, WalCloseMethod method)
|
||||
* If we have a temp prefix, normal operation is to rename the
|
||||
* file.
|
||||
*/
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
|
||||
dir_data->basedir, df->pathname, df->temp_suffix);
|
||||
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
|
||||
dir_data->basedir, df->pathname);
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
|
||||
dir_data->basedir, df->pathname,
|
||||
dir_data->compression > 0 ? ".gz" : "",
|
||||
df->temp_suffix);
|
||||
snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
|
||||
dir_data->basedir, df->pathname,
|
||||
dir_data->compression > 0 ? ".gz" : "");
|
||||
r = durable_rename(tmppath, tmppath2, progname);
|
||||
}
|
||||
else if (method == CLOSE_UNLINK)
|
||||
{
|
||||
/* Unlink the file once it's closed */
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
|
||||
dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
|
||||
dir_data->basedir, df->pathname,
|
||||
dir_data->compression > 0 ? ".gz" : "",
|
||||
df->temp_suffix ? df->temp_suffix : "");
|
||||
r = unlink(tmppath);
|
||||
}
|
||||
else
|
||||
@ -226,6 +284,14 @@ dir_sync(Walfile f)
|
||||
if (!dir_data->sync)
|
||||
return 0;
|
||||
|
||||
#ifdef HAVE_LIBZ
|
||||
if (dir_data->compression > 0)
|
||||
{
|
||||
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
return fsync(((DirectoryMethodFile *) f)->fd);
|
||||
}
|
||||
|
||||
@ -277,7 +343,7 @@ dir_finish(void)
|
||||
|
||||
|
||||
WalWriteMethod *
|
||||
CreateWalDirectoryMethod(const char *basedir, bool sync)
|
||||
CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
|
||||
{
|
||||
WalWriteMethod *method;
|
||||
|
||||
@ -293,6 +359,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
|
||||
method->getlasterror = dir_getlasterror;
|
||||
|
||||
dir_data = pg_malloc0(sizeof(DirectoryMethodData));
|
||||
dir_data->compression = compression;
|
||||
dir_data->basedir = pg_strdup(basedir);
|
||||
dir_data->sync = sync;
|
||||
|
||||
|
@ -41,7 +41,8 @@ struct WalWriteMethod
|
||||
* (only implements the methods required for pg_basebackup,
|
||||
* not all those required for pg_receivexlog)
|
||||
*/
|
||||
WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
|
||||
WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
|
||||
int compression, bool sync);
|
||||
WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
|
||||
|
||||
/* Cleanup routines for previously-created methods */
|
||||
|
Loading…
Reference in New Issue
Block a user