diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index fd920fc197..4f5ac6df1a 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -35,10 +35,16 @@ OBJS = \ streamutil.o \ walmethods.o +BBOBJS = \ + pg_basebackup.o \ + bbstreamer_file.o \ + bbstreamer_inject.o \ + bbstreamer_tar.o + all: pg_basebackup pg_receivewal pg_recvlogical -pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils - $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) +pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils + $(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils $(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) @@ -61,7 +67,7 @@ uninstall: clean distclean maintainer-clean: rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \ - pg_basebackup.o pg_receivewal.o pg_recvlogical.o \ + $(BBOBJS) pg_receivewal.o pg_recvlogical.o \ $(OBJS) rm -rf tmp_check diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h new file mode 100644 index 0000000000..b24dc848c1 --- /dev/null +++ b/src/bin/pg_basebackup/bbstreamer.h @@ -0,0 +1,217 @@ +/*------------------------------------------------------------------------- + * + * bbstreamer.h + * + * Each tar archive returned by the server is passed to one or more + * bbstreamer objects for further processing. The bbstreamer may do + * something simple, like write the archive to a file, perhaps after + * compressing it, but it can also do more complicated things, like + * annotating the byte stream to indicate which parts of the data + * correspond to tar headers or trailing padding, vs. which parts are + * payload data. A subsequent bbstreamer may use this information to + * make further decisions about how to process the data; for example, + * it might choose to modify the archive contents. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/bbstreamer.h + *------------------------------------------------------------------------- + */ + +#ifndef BBSTREAMER_H +#define BBSTREAMER_H + +#include "lib/stringinfo.h" +#include "pqexpbuffer.h" + +struct bbstreamer; +struct bbstreamer_ops; +typedef struct bbstreamer bbstreamer; +typedef struct bbstreamer_ops bbstreamer_ops; + +/* + * Each chunk of archive data passed to a bbstreamer is classified into one + * of these categories. When data is first received from the remote server, + * each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will + * be of whatever size the remote server chose to send. + * + * If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all + * chunks should be labelled as one of the other types listed here. In + * addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and + * exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if + * that means a zero-length call. There can be any number of + * BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There + * should be exactly one BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the + * last BBSTREAMER_MEMBER_TRAILER chunk.
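+ * The BBSTREAMER_ARCHIVE_TRAILER chunk may itself be zero-length if the
+ * server supplied no trailing padding blocks.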
+ * + * In theory, we could need other classifications here, such as a way of + * indicating an archive header, but the "tar" format doesn't need anything + * else, so for the time being there's no point. + */ +typedef enum +{ + BBSTREAMER_UNKNOWN, + BBSTREAMER_MEMBER_HEADER, + BBSTREAMER_MEMBER_CONTENTS, + BBSTREAMER_MEMBER_TRAILER, + BBSTREAMER_ARCHIVE_TRAILER +} bbstreamer_archive_context; + +/* + * Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER, + * BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also + * pass a pointer to an instance of this struct. The details are expected + * to be present in the archive header and used to fill the struct, after + * which all subsequent calls for the same archive member are expected to + * pass the same details. + */ +typedef struct +{ + char pathname[MAXPGPATH]; + pgoff_t size; + mode_t mode; + uid_t uid; + gid_t gid; + bool is_directory; + bool is_link; + char linktarget[MAXPGPATH]; +} bbstreamer_member; + +/* + * Generally, each type of bbstreamer will define its own struct, but the + * first element should be 'bbstreamer base'. A bbstreamer that does not + * require any additional private data could use this structure directly. + * + * bbs_ops is a pointer to the bbstreamer_ops object which contains the + * function pointers appropriate to this type of bbstreamer. + * + * bbs_next is a pointer to the successor bbstreamer, for those types of + * bbstreamer which forward data to a successor. It need not be used and + * should be set to NULL when not relevant. + * + * bbs_buffer is a buffer for accumulating data for temporary storage. Each + * type of bbstreamer makes its own decisions about whether and how to use + * this buffer. + */ +struct bbstreamer +{ + const bbstreamer_ops *bbs_ops; + bbstreamer *bbs_next; + StringInfoData bbs_buffer; +}; + +/* + * There are three callbacks for a bbstreamer. The 'content' callback is + * called repeatedly, as described in the bbstreamer_archive_context comments. + * Then, the 'finalize' callback is called once at the end, to give the + * bbstreamer a chance to perform cleanup such as closing files. Finally, + * because this code is running in a frontend environment where, as of this + * writing, there are no memory contexts, the 'free' callback is called to + * release memory. These callbacks should always be invoked using the static + * inline functions defined below. + */ +struct bbstreamer_ops +{ + void (*content) (bbstreamer *streamer, bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); + void (*finalize) (bbstreamer *streamer); + void (*free) (bbstreamer *streamer); +}; + +/* Send some content to a bbstreamer. */ +static inline void +bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + Assert(streamer != NULL); + streamer->bbs_ops->content(streamer, member, data, len, context); +} + +/* Finalize a bbstreamer. */ +static inline void +bbstreamer_finalize(bbstreamer *streamer) +{ + Assert(streamer != NULL); + streamer->bbs_ops->finalize(streamer); +} + +/* Free a bbstreamer. */ +static inline void +bbstreamer_free(bbstreamer *streamer) +{ + Assert(streamer != NULL); + streamer->bbs_ops->free(streamer); +} + +/* + * This is a convenience method for use when implementing a bbstreamer; it is + * not for use by outside callers. 
It adds the amount of data specified by + * 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data' + * accordingly. + */ +static inline void +bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len, + int nbytes) +{ + Assert(nbytes <= *len); + + appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes); + *len -= nbytes; + *data += nbytes; +} + +/* + * This is a convenence method for use when implementing a bbstreamer; it is + * not for use by outsider callers. It attempts to add enough data to the + * bbstreamer's buffer to reach a length of target_bytes and adjusts '*len' + * and '*data' accordingly. It returns true if the target length has been + * reached and false otherwise. + */ +static inline bool +bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len, + int target_bytes) +{ + int buflen = streamer->bbs_buffer.len; + + if (buflen >= target_bytes) + { + /* Target length already reached; nothing to do. */ + return true; + } + + if (buflen + *len < target_bytes) + { + /* Not enough data to reach target length; buffer all of it. */ + bbstreamer_buffer_bytes(streamer, data, len, *len); + return false; + } + + /* Buffer just enough to reach the target length. */ + bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen); + return true; +} + +/* + * Functions for creating bbstreamer objects of various types. See the header + * comments for each of these functions for details. + */ +extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file); +extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file, + int compresslevel); +extern bbstreamer *bbstreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)); + +extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); +extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); + +extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next, + bool is_recovery_guc_supported, + PQExpBuffer recoveryconfcontents); +extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname, + char *data, int len); + +#endif diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/bbstreamer_file.c new file mode 100644 index 0000000000..03e1ea2550 --- /dev/null +++ b/src/bin/pg_basebackup/bbstreamer_file.c @@ -0,0 +1,579 @@ +/*------------------------------------------------------------------------- + * + * bbstreamer_file.c + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/bbstreamer_file.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#ifdef HAVE_LIBZ +#include +#endif + +#include + +#include "bbstreamer.h" +#include "common/logging.h" +#include "common/file_perm.h" +#include "common/string.h" + +typedef struct bbstreamer_plain_writer +{ + bbstreamer base; + char *pathname; + FILE *file; + bool should_close_file; +} bbstreamer_plain_writer; + +#ifdef HAVE_LIBZ +typedef struct bbstreamer_gzip_writer +{ + bbstreamer base; + char *pathname; + gzFile gzfile; +} bbstreamer_gzip_writer; +#endif + +typedef struct bbstreamer_extractor +{ + bbstreamer base; + char *basepath; + const char *(*link_map) (const char *); + void (*report_output_file) (const char *); + char filename[MAXPGPATH]; + FILE *file; +} bbstreamer_extractor; + +static void bbstreamer_plain_writer_content(bbstreamer *streamer, + bbstreamer_member 
*member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_plain_writer_finalize(bbstreamer *streamer); +static void bbstreamer_plain_writer_free(bbstreamer *streamer); + +const bbstreamer_ops bbstreamer_plain_writer_ops = { + .content = bbstreamer_plain_writer_content, + .finalize = bbstreamer_plain_writer_finalize, + .free = bbstreamer_plain_writer_free +}; + +#ifdef HAVE_LIBZ +static void bbstreamer_gzip_writer_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer); +static void bbstreamer_gzip_writer_free(bbstreamer *streamer); +static const char *get_gz_error(gzFile gzf); + +const bbstreamer_ops bbstreamer_gzip_writer_ops = { + .content = bbstreamer_gzip_writer_content, + .finalize = bbstreamer_gzip_writer_finalize, + .free = bbstreamer_gzip_writer_free +}; +#endif + +static void bbstreamer_extractor_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_extractor_finalize(bbstreamer *streamer); +static void bbstreamer_extractor_free(bbstreamer *streamer); +static void extract_directory(const char *filename, mode_t mode); +static void extract_link(const char *filename, const char *linktarget); +static FILE *create_file_for_extract(const char *filename, mode_t mode); + +const bbstreamer_ops bbstreamer_extractor_ops = { + .content = bbstreamer_extractor_content, + .finalize = bbstreamer_extractor_finalize, + .free = bbstreamer_extractor_free +}; + +/* + * Create a bbstreamer that just writes data to a file. + * + * The caller must specify a pathname and may specify a file. The pathname is + * used for error-reporting purposes either way. If file is NULL, the pathname + * also identifies the file to which the data should be written: it is opened + * for writing and closed when done. If file is not NULL, the data is written + * there. + */ +bbstreamer * +bbstreamer_plain_writer_new(char *pathname, FILE *file) +{ + bbstreamer_plain_writer *streamer; + + streamer = palloc0(sizeof(bbstreamer_plain_writer)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_plain_writer_ops; + + streamer->pathname = pstrdup(pathname); + streamer->file = file; + + if (file == NULL) + { + streamer->file = fopen(pathname, "wb"); + if (streamer->file == NULL) + { + pg_log_error("could not create file \"%s\": %m", pathname); + exit(1); + } + streamer->should_close_file = true; + } + + return &streamer->base; +} + +/* + * Write archive content to file. + */ +static void +bbstreamer_plain_writer_content(bbstreamer *streamer, + bbstreamer_member *member, const char *data, + int len, bbstreamer_archive_context context) +{ + bbstreamer_plain_writer *mystreamer; + + mystreamer = (bbstreamer_plain_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_log_error("could not write to file \"%s\": %m", + mystreamer->pathname); + exit(1); + } +} + +/* + * End-of-archive processing when writing to a plain file consists of closing + * the file if we opened it, but not if the caller provided it. 
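+ * Either way, the state is reset so that bbstreamer_plain_writer_free() can
+ * assert that finalization has already happened.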
+ */ +static void +bbstreamer_plain_writer_finalize(bbstreamer *streamer) +{ + bbstreamer_plain_writer *mystreamer; + + mystreamer = (bbstreamer_plain_writer *) streamer; + + if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) + { + pg_log_error("could not close file \"%s\": %m", + mystreamer->pathname); + exit(1); + } + + mystreamer->file = NULL; + mystreamer->should_close_file = false; +} + +/* + * Free memory associated with this bbstreamer. + */ +static void +bbstreamer_plain_writer_free(bbstreamer *streamer) +{ + bbstreamer_plain_writer *mystreamer; + + mystreamer = (bbstreamer_plain_writer *) streamer; + + Assert(!mystreamer->should_close_file); + Assert(mystreamer->base.bbs_next == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Create a bbstreamer that just compresses data using gzip, and then writes + * it to a file. + * + * As in the case of bbstreamer_plain_writer_new, pathname is always used + * for error reporting purposes; if file is NULL, it is also the opened and + * closed so that the data may be written there. + */ +bbstreamer * +bbstreamer_gzip_writer_new(char *pathname, FILE *file, int compresslevel) +{ +#ifdef HAVE_LIBZ + bbstreamer_gzip_writer *streamer; + + streamer = palloc0(sizeof(bbstreamer_gzip_writer)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_gzip_writer_ops; + + streamer->pathname = pstrdup(pathname); + + if (file == NULL) + { + streamer->gzfile = gzopen(pathname, "wb"); + if (streamer->gzfile == NULL) + { + pg_log_error("could not create compressed file \"%s\": %m", + pathname); + exit(1); + } + } + else + { + int fd = dup(fileno(file)); + + if (fd < 0) + { + pg_log_error("could not duplicate stdout: %m"); + exit(1); + } + + streamer->gzfile = gzdopen(fd, "wb"); + if (streamer->gzfile == NULL) + { + pg_log_error("could not open output file: %m"); + exit(1); + } + } + + if (gzsetparams(streamer->gzfile, compresslevel, + Z_DEFAULT_STRATEGY) != Z_OK) + { + pg_log_error("could not set compression level %d: %s", + compresslevel, get_gz_error(streamer->gzfile)); + exit(1); + } + + return &streamer->base; +#else + pg_log_error("this build does not support compression"); + exit(1); +#endif +} + +#ifdef HAVE_LIBZ +/* + * Write archive content to gzip file. + */ +static void +bbstreamer_gzip_writer_content(bbstreamer *streamer, + bbstreamer_member *member, const char *data, + int len, bbstreamer_archive_context context) +{ + bbstreamer_gzip_writer *mystreamer; + + mystreamer = (bbstreamer_gzip_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (gzwrite(mystreamer->gzfile, data, len) != len) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_log_error("could not write to compressed file \"%s\": %s", + mystreamer->pathname, get_gz_error(mystreamer->gzfile)); + exit(1); + } +} + +/* + * End-of-archive processing when writing to a gzip file consists of just + * calling gzclose. + * + * It makes no difference whether we opened the file or the caller did it, + * because libz provides no way of avoiding a close on the underling file + * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to + * work around this issue, so that the behavior from the caller's viewpoint + * is the same as for bbstreamer_plain_writer. 
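+ * Note that gzclose() also flushes any buffered compressed data, so a
+ * failure reported here may actually be a write error.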
+ */ +static void +bbstreamer_gzip_writer_finalize(bbstreamer *streamer) +{ + bbstreamer_gzip_writer *mystreamer; + + mystreamer = (bbstreamer_gzip_writer *) streamer; + + if (gzclose(mystreamer->gzfile) != 0) + { + pg_log_error("could not close compressed file \"%s\": %s", + mystreamer->pathname, + get_gz_error(mystreamer->gzfile)); + exit(1); + } + + mystreamer->gzfile = NULL; +} + +/* + * Free memory associated with this bbstreamer. + */ +static void +bbstreamer_gzip_writer_free(bbstreamer *streamer) +{ + bbstreamer_gzip_writer *mystreamer; + + mystreamer = (bbstreamer_gzip_writer *) streamer; + + Assert(mystreamer->base.bbs_next == NULL); + Assert(mystreamer->gzfile == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Helper function for libz error reporting. + */ +static const char * +get_gz_error(gzFile gzf) +{ + int errnum; + const char *errmsg; + + errmsg = gzerror(gzf, &errnum); + if (errnum == Z_ERRNO) + return strerror(errno); + else + return errmsg; +} +#endif + +/* + * Create a bbstreamer that extracts an archive. + * + * All pathnames in the archive are interpreted relative to basepath. + * + * Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here + * with untyped chunks; we need typed chunks which follow the rules described + * in bbstreamer.h. Assuming we have that, we don't need to worry about the + * original archive format; it's enough to just look at the member information + * provided and write to the corresponding file. + * + * 'link_map' is a function that will be applied to the target of any + * symbolic link, and which should return a replacement pathname to be used + * in its place. If NULL, the symbolic link target is used without + * modification. + * + * 'report_output_file' is a function that will be called each time we open a + * new output file. The pathname to that file is passed as an argument. If + * NULL, the call is skipped. + */ +bbstreamer * +bbstreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)) +{ + bbstreamer_extractor *streamer; + + streamer = palloc0(sizeof(bbstreamer_extractor)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_extractor_ops; + streamer->basepath = pstrdup(basepath); + streamer->link_map = link_map; + streamer->report_output_file = report_output_file; + + return &streamer->base; +} + +/* + * Extract archive contents to the filesystem. + */ +static void +bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; + int fnamelen; + + Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER); + Assert(context != BBSTREAMER_UNKNOWN); + + switch (context) + { + case BBSTREAMER_MEMBER_HEADER: + Assert(mystreamer->file == NULL); + + /* Prepend basepath. */ + snprintf(mystreamer->filename, sizeof(mystreamer->filename), + "%s/%s", mystreamer->basepath, member->pathname); + + /* Remove any trailing slash. */ + fnamelen = strlen(mystreamer->filename); + if (mystreamer->filename[fnamelen - 1] == '/') + mystreamer->filename[fnamelen - 1] = '\0'; + + /* Dispatch based on file type. 
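+ Directories and symbolic links are handled entirely here; for regular files we only open the file, and its contents are written out as BBSTREAMER_MEMBER_CONTENTS chunks arrive.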
*/ + if (member->is_directory) + extract_directory(mystreamer->filename, member->mode); + else if (member->is_link) + { + const char *linktarget = member->linktarget; + + if (mystreamer->link_map) + linktarget = mystreamer->link_map(linktarget); + extract_link(mystreamer->filename, linktarget); + } + else + mystreamer->file = + create_file_for_extract(mystreamer->filename, + member->mode); + + /* Report output file change. */ + if (mystreamer->report_output_file) + mystreamer->report_output_file(mystreamer->filename); + break; + + case BBSTREAMER_MEMBER_CONTENTS: + if (mystreamer->file == NULL) + break; + + errno = 0; + if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_log_error("could not write to file \"%s\": %m", + mystreamer->filename); + exit(1); + } + break; + + case BBSTREAMER_MEMBER_TRAILER: + if (mystreamer->file == NULL) + break; + fclose(mystreamer->file); + mystreamer->file = NULL; + break; + + case BBSTREAMER_ARCHIVE_TRAILER: + break; + + default: + /* Shouldn't happen. */ + pg_log_error("unexpected state while extracting archive"); + exit(1); + } +} + +/* + * Create a directory. + */ +static void +extract_directory(const char *filename, mode_t mode) +{ + if (mkdir(filename, pg_dir_create_mode) != 0) + { + /* + * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will + * have been created by the wal receiver process. Also, when the WAL + * directory location was specified, pg_wal (or pg_xlog) has already + * been created as a symbolic link before starting the actual backup. + * So just ignore creation failures on related directories. + */ + if (!((pg_str_endswith(filename, "/pg_wal") || + pg_str_endswith(filename, "/pg_xlog") || + pg_str_endswith(filename, "/archive_status")) && + errno == EEXIST)) + { + pg_log_error("could not create directory \"%s\": %m", + filename); + exit(1); + } + } + +#ifndef WIN32 + if (chmod(filename, mode)) + { + pg_log_error("could not set permissions on directory \"%s\": %m", + filename); + exit(1); + } +#endif +} + +/* + * Create a symbolic link. + * + * It's most likely a link in pg_tblspc directory, to the location of a + * tablespace. Apply any tablespace mapping given on the command line + * (--tablespace-mapping). (We blindly apply the mapping without checking that + * the link really is inside pg_tblspc. We don't expect there to be other + * symlinks in a data directory, but if there are, you can call it an + * undocumented feature that you can map them too.) + */ +static void +extract_link(const char *filename, const char *linktarget) +{ + if (symlink(linktarget, filename) != 0) + { + pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m", + filename, linktarget); + exit(1); + } +} + +/* + * Create a regular file. + * + * Return the resulting handle so we can write the content to the file. + */ +static FILE * +create_file_for_extract(const char *filename, mode_t mode) +{ + FILE *file; + + file = fopen(filename, "wb"); + if (file == NULL) + { + pg_log_error("could not create file \"%s\": %m", filename); + exit(1); + } + +#ifndef WIN32 + if (chmod(filename, mode)) + { + pg_log_error("could not set permissions on file \"%s\": %m", + filename); + exit(1); + } +#endif + + return file; +} + +/* + * End-of-stream processing for extracting an archive. + * + * There's nothing to do here but sanity checking. 
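+ * Any file opened for a regular archive member should already have been
+ * closed when its BBSTREAMER_MEMBER_TRAILER chunk was processed.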
+ */ +static void +bbstreamer_extractor_finalize(bbstreamer *streamer) +{ + bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; + + Assert(mystreamer->file == NULL); +} + +/* + * Free memory. + */ +static void +bbstreamer_extractor_free(bbstreamer *streamer) +{ + bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; + + pfree(mystreamer->basepath); + pfree(mystreamer); +} diff --git a/src/bin/pg_basebackup/bbstreamer_inject.c b/src/bin/pg_basebackup/bbstreamer_inject.c new file mode 100644 index 0000000000..4d15251fdc --- /dev/null +++ b/src/bin/pg_basebackup/bbstreamer_inject.c @@ -0,0 +1,250 @@ +/*------------------------------------------------------------------------- + * + * bbstreamer_inject.c + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/bbstreamer_inject.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include "bbstreamer.h" +#include "common/file_perm.h" +#include "common/logging.h" + +typedef struct bbstreamer_recovery_injector +{ + bbstreamer base; + bool skip_file; + bool is_recovery_guc_supported; + bool is_postgresql_auto_conf; + bool found_postgresql_auto_conf; + PQExpBuffer recoveryconfcontents; + bbstreamer_member member; +} bbstreamer_recovery_injector; + +static void bbstreamer_recovery_injector_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer); +static void bbstreamer_recovery_injector_free(bbstreamer *streamer); + +const bbstreamer_ops bbstreamer_recovery_injector_ops = { + .content = bbstreamer_recovery_injector_content, + .finalize = bbstreamer_recovery_injector_finalize, + .free = bbstreamer_recovery_injector_free +}; + +/* + * Create a bbstreamer that can edit recoverydata into an archive stream. + * + * The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as + * per the conventions described in bbstreamer.h; the chunks forwarded to + * the next bbstreamer will be similarly typed, but the + * BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've + * edited the archive stream. + * + * Our goal is to do one of the following three things with the content passed + * via recoveryconfcontents: (1) if is_recovery_guc_supported is false, then + * put the content into recovery.conf, replacing any existing archive member + * by that name; (2) if is_recovery_guc_supported is true and + * postgresql.auto.conf exists in the archive, then append the content + * provided to the existing file; and (3) if is_recovery_guc_supported is + * true but postgresql.auto.conf does not exist in the archive, then create + * it with the specified content. + * + * In addition, if is_recovery_guc_supported is true, then we create a + * zero-length standby.signal file, dropping any file with that name from + * the archive. 
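+ *
+ * Note that we store only a pointer to 'recoveryconfcontents'; the caller
+ * must keep that buffer valid until this bbstreamer has been freed.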
+ */ +extern bbstreamer * +bbstreamer_recovery_injector_new(bbstreamer *next, + bool is_recovery_guc_supported, + PQExpBuffer recoveryconfcontents) +{ + bbstreamer_recovery_injector *streamer; + + streamer = palloc0(sizeof(bbstreamer_recovery_injector)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_recovery_injector_ops; + streamer->base.bbs_next = next; + streamer->is_recovery_guc_supported = is_recovery_guc_supported; + streamer->recoveryconfcontents = recoveryconfcontents; + + return &streamer->base; +} + +/* + * Handle each chunk of tar content while injecting recovery configuration. + */ +static void +bbstreamer_recovery_injector_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_recovery_injector *mystreamer; + + mystreamer = (bbstreamer_recovery_injector *) streamer; + Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER); + + switch (context) + { + case BBSTREAMER_MEMBER_HEADER: + /* Must copy provided data so we have the option to modify it. */ + memcpy(&mystreamer->member, member, sizeof(bbstreamer_member)); + + /* + * On v12+, skip standby.signal and edit postgresql.auto.conf; on + * older versions, skip recovery.conf. + */ + if (mystreamer->is_recovery_guc_supported) + { + mystreamer->skip_file = + (strcmp(member->pathname, "standby.signal") == 0); + mystreamer->is_postgresql_auto_conf = + (strcmp(member->pathname, "postgresql.auto.conf") == 0); + if (mystreamer->is_postgresql_auto_conf) + { + /* Remember we saw it so we don't add it again. */ + mystreamer->found_postgresql_auto_conf = true; + + /* Increment length by data to be injected. */ + mystreamer->member.size += + mystreamer->recoveryconfcontents->len; + + /* + * Zap data and len because the archive header is no + * longer valid; some subsequent bbstreamer must + * regenerate it if it's necessary. + */ + data = NULL; + len = 0; + } + } + else + mystreamer->skip_file = + (strcmp(member->pathname, "recovery.conf") == 0); + + /* Do not forward if the file is to be skipped. */ + if (mystreamer->skip_file) + return; + break; + + case BBSTREAMER_MEMBER_CONTENTS: + /* Do not forward if the file is to be skipped. */ + if (mystreamer->skip_file) + return; + break; + + case BBSTREAMER_MEMBER_TRAILER: + /* Do not forward it the file is to be skipped. */ + if (mystreamer->skip_file) + return; + + /* Append provided content to whatever we already sent. */ + if (mystreamer->is_postgresql_auto_conf) + bbstreamer_content(mystreamer->base.bbs_next, member, + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len, + BBSTREAMER_MEMBER_CONTENTS); + break; + + case BBSTREAMER_ARCHIVE_TRAILER: + if (mystreamer->is_recovery_guc_supported) + { + /* + * If we didn't already find (and thus modify) + * postgresql.auto.conf, inject it as an additional archive + * member now. + */ + if (!mystreamer->found_postgresql_auto_conf) + bbstreamer_inject_file(mystreamer->base.bbs_next, + "postgresql.auto.conf", + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len); + + /* Inject empty standby.signal file. */ + bbstreamer_inject_file(mystreamer->base.bbs_next, + "standby.signal", "", 0); + } + else + { + /* Inject recovery.conf file with specified contents. */ + bbstreamer_inject_file(mystreamer->base.bbs_next, + "recovery.conf", + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len); + } + + /* Nothing to do here. 
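+ The ARCHIVE_TRAILER chunk itself is forwarded below, after the switch.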
*/ + break; + + default: + /* Shouldn't happen. */ + pg_log_error("unexpected state while injecting recovery settings"); + exit(1); + } + + bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member, + data, len, context); +} + +/* + * End-of-stream processing for this bbstreamer. + */ +static void +bbstreamer_recovery_injector_finalize(bbstreamer *streamer) +{ + bbstreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with this bbstreamer. + */ +static void +bbstreamer_recovery_injector_free(bbstreamer *streamer) +{ + bbstreamer_free(streamer->bbs_next); + pfree(streamer); +} + +/* + * Inject a member into the archive with specified contents. + */ +void +bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data, + int len) +{ + bbstreamer_member member; + + strlcpy(member.pathname, pathname, MAXPGPATH); + member.size = len; + member.mode = pg_file_create_mode; + member.is_directory = false; + member.is_link = false; + member.linktarget[0] = '\0'; + + /* + * There seems to be no principled argument for these values, but they are + * what PostgreSQL has historically used. + */ + member.uid = 04000; + member.gid = 02000; + + /* + * We don't know here how to generate valid member headers and trailers + * for the archiving format in use, so if those are needed, some successor + * bbstreamer will have to generate them using the data from 'member'. + */ + bbstreamer_content(streamer, &member, NULL, 0, + BBSTREAMER_MEMBER_HEADER); + bbstreamer_content(streamer, &member, data, len, + BBSTREAMER_MEMBER_CONTENTS); + bbstreamer_content(streamer, &member, NULL, 0, + BBSTREAMER_MEMBER_TRAILER); +} diff --git a/src/bin/pg_basebackup/bbstreamer_tar.c b/src/bin/pg_basebackup/bbstreamer_tar.c new file mode 100644 index 0000000000..5a9f587dca --- /dev/null +++ b/src/bin/pg_basebackup/bbstreamer_tar.c @@ -0,0 +1,444 @@ +/*------------------------------------------------------------------------- + * + * bbstreamer_tar.c + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/bbstreamer_tar.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#include "bbstreamer.h" +#include "common/logging.h" +#include "pgtar.h" + +typedef struct bbstreamer_tar_parser +{ + bbstreamer base; + bbstreamer_archive_context next_context; + bbstreamer_member member; + size_t file_bytes_sent; + size_t pad_bytes_expected; +} bbstreamer_tar_parser; + +typedef struct bbstreamer_tar_archiver +{ + bbstreamer base; + bool rearchive_member; +} bbstreamer_tar_archiver; + +static void bbstreamer_tar_parser_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_tar_parser_finalize(bbstreamer *streamer); +static void bbstreamer_tar_parser_free(bbstreamer *streamer); +static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer); + +const bbstreamer_ops bbstreamer_tar_parser_ops = { + .content = bbstreamer_tar_parser_content, + .finalize = bbstreamer_tar_parser_finalize, + .free = bbstreamer_tar_parser_free +}; + +static void bbstreamer_tar_archiver_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context); +static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer); +static void bbstreamer_tar_archiver_free(bbstreamer *streamer); + +const bbstreamer_ops bbstreamer_tar_archiver_ops = { 
+ .content = bbstreamer_tar_archiver_content, + .finalize = bbstreamer_tar_archiver_finalize, + .free = bbstreamer_tar_archiver_free +}; + +/* + * Create a bbstreamer that can parse a stream of content as tar data. + * + * The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer + * specified by 'next' will receive a series of typed chunks, as per the + * conventions described in bbstreamer.h. + */ +extern bbstreamer * +bbstreamer_tar_parser_new(bbstreamer *next) +{ + bbstreamer_tar_parser *streamer; + + streamer = palloc0(sizeof(bbstreamer_tar_parser)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_tar_parser_ops; + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->next_context = BBSTREAMER_MEMBER_HEADER; + + return &streamer->base; +} + +/* + * Parse unknown content as tar data. + */ +static void +bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer; + size_t nbytes; + + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == BBSTREAMER_UNKNOWN); + + while (len > 0) + { + switch (mystreamer->next_context) + { + case BBSTREAMER_MEMBER_HEADER: + + /* + * If we're expecting an archive member header, accumulate a + * full block of data before doing anything further. + */ + if (!bbstreamer_buffer_until(streamer, &data, &len, + TAR_BLOCK_SIZE)) + return; + + /* + * Now we can process the header and get ready to process the + * file contents; however, we might find out that what we + * thought was the next file header is actually the start of + * the archive trailer. Switch modes accordingly. + */ + if (bbstreamer_tar_header(mystreamer)) + { + if (mystreamer->member.size == 0) + { + /* No content; trailer is zero-length. */ + bbstreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + BBSTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; + } + else + { + /* Expect contents. */ + mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS; + } + mystreamer->base.bbs_buffer.len = 0; + mystreamer->file_bytes_sent = 0; + } + else + mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER; + break; + + case BBSTREAMER_MEMBER_CONTENTS: + + /* + * Send as much content as we have, but not more than the + * remaining file length. + */ + Assert(mystreamer->file_bytes_sent < mystreamer->member.size); + nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; + nbytes = Min(nbytes, len); + Assert(nbytes > 0); + bbstreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, nbytes, + BBSTREAMER_MEMBER_CONTENTS); + mystreamer->file_bytes_sent += nbytes; + data += nbytes; + len -= nbytes; + + /* + * If we've not yet sent the whole file, then there's more + * content to come; otherwise, it's time to expect the file + * trailer. + */ + Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); + if (mystreamer->file_bytes_sent == mystreamer->member.size) + { + if (mystreamer->pad_bytes_expected == 0) + { + /* Trailer is zero-length. */ + bbstreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + BBSTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; + } + else + { + /* Trailer is not zero-length. 
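+ The padding bytes still need to be accumulated before the trailer chunk can be sent.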
*/ + mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER; + } + mystreamer->base.bbs_buffer.len = 0; + } + break; + + case BBSTREAMER_MEMBER_TRAILER: + + /* + * If we're expecting an archive member trailer, accumulate + * the expected number of padding bytes before sending + * anything onward. + */ + if (!bbstreamer_buffer_until(streamer, &data, &len, + mystreamer->pad_bytes_expected)) + return; + + /* OK, now we can send it. */ + bbstreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, mystreamer->pad_bytes_expected, + BBSTREAMER_MEMBER_TRAILER); + + /* Expect next file header. */ + mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; + mystreamer->base.bbs_buffer.len = 0; + break; + + case BBSTREAMER_ARCHIVE_TRAILER: + + /* + * We've seen an end-of-archive indicator, so anything more is + * buffered and sent as part of the archive trailer. But we + * don't expect more than 2 blocks. + */ + bbstreamer_buffer_bytes(streamer, &data, &len, len); + if (len > 2 * TAR_BLOCK_SIZE) + { + pg_log_error("tar file trailer exceeds 2 blocks"); + exit(1); + } + return; + + default: + /* Shouldn't happen. */ + pg_log_error("unexpected state while parsing tar archive"); + exit(1); + } + } +} + +/* + * Parse a file header within a tar stream. + * + * The return value is true if we found a file header and passed it on to the + * next bbstreamer; it is false if we have reached the archive trailer. + */ +static bool +bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer) +{ + bool has_nonzero_byte = false; + int i; + bbstreamer_member *member = &mystreamer->member; + char *buffer = mystreamer->base.bbs_buffer.data; + + Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); + + /* Check whether we've got a block of all zero bytes. */ + for (i = 0; i < TAR_BLOCK_SIZE; ++i) + { + if (buffer[i] != '\0') + { + has_nonzero_byte = true; + break; + } + } + + /* + * If the entire block was zeros, this is the end of the archive, not the + * start of the next file. + */ + if (!has_nonzero_byte) + return false; + + /* + * Parse key fields out of the header. + * + * FIXME: It's terrible that we use hard-coded values here instead of some + * more principled approach. It's been like this for a long time, but we + * ought to do better. + */ + strlcpy(member->pathname, &buffer[0], MAXPGPATH); + if (member->pathname[0] == '\0') + { + pg_log_error("tar member has empty name"); + exit(1); + } + member->size = read_tar_number(&buffer[124], 12); + member->mode = read_tar_number(&buffer[100], 8); + member->uid = read_tar_number(&buffer[108], 8); + member->gid = read_tar_number(&buffer[116], 8); + member->is_directory = (buffer[156] == '5'); + member->is_link = (buffer[156] == '2'); + if (member->is_link) + strlcpy(member->linktarget, &buffer[157], 100); + + /* Compute number of padding bytes. */ + mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); + + /* Forward the entire header to the next bbstreamer. */ + bbstreamer_content(mystreamer->base.bbs_next, member, + buffer, TAR_BLOCK_SIZE, + BBSTREAMER_MEMBER_HEADER); + + return true; +} + +/* + * End-of-stream processing for a tar parser. 
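+ * If the stream ends in the middle of an archive member, report an error;
+ * otherwise, forward whatever trailer data has been buffered and finalize
+ * the successor.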
+ */ +static void +bbstreamer_tar_parser_finalize(bbstreamer *streamer) +{ + bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer; + + if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER && + (mystreamer->next_context != BBSTREAMER_MEMBER_HEADER || + mystreamer->base.bbs_buffer.len > 0)) + { + pg_log_error("COPY stream ended before last file was finished"); + exit(1); + } + + /* Send the archive trailer, even if empty. */ + bbstreamer_content(streamer->bbs_next, NULL, + streamer->bbs_buffer.data, streamer->bbs_buffer.len, + BBSTREAMER_ARCHIVE_TRAILER); + + /* Now finalize successor. */ + bbstreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar parser. + */ +static void +bbstreamer_tar_parser_free(bbstreamer *streamer) +{ + pfree(streamer->bbs_buffer.data); + bbstreamer_free(streamer->bbs_next); +} + +/* + * Create an bbstreamer that can generate a tar archive. + * + * This is intended to be usable either for generating a brand-new tar archive + * or for modifying one on the fly. The input should be a series of typed + * chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for + * bbstreamer_tar_parser_content. + */ +extern bbstreamer * +bbstreamer_tar_archiver_new(bbstreamer *next) +{ + bbstreamer_tar_archiver *streamer; + + streamer = palloc0(sizeof(bbstreamer_tar_archiver)); + *((const bbstreamer_ops **) &streamer->base.bbs_ops) = + &bbstreamer_tar_archiver_ops; + streamer->base.bbs_next = next; + + return &streamer->base; +} + +/* + * Fix up the stream of input chunks to create a valid tar file. + * + * If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a + * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is + * passed through without change. Any other size is a fatal error (and + * indicates a bug). + * + * Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the + * corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from + * scratch. Specifically, we construct a block of zero bytes sufficient to + * pad out to a block boundary, as required by the tar format. Other + * BBSTREAMER_MEMBER_TRAILER chunks are passed through without change. + * + * Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change. + * + * The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two + * blocks of zero bytes. Not all tar programs require this, but apparently + * some do. The server does not supply this trailer. If no archive trailer is + * present, one will be added by bbstreamer_tar_parser_finalize. + */ +static void +bbstreamer_tar_archiver_content(bbstreamer *streamer, + bbstreamer_member *member, + const char *data, int len, + bbstreamer_archive_context context) +{ + bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer; + char buffer[2 * TAR_BLOCK_SIZE]; + + Assert(context != BBSTREAMER_UNKNOWN); + + if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) + { + Assert(len == 0); + + /* Replace zero-length tar header with a newly constructed one. */ + tarCreateHeader(buffer, member->pathname, NULL, + member->size, member->mode, member->uid, member->gid, + time(NULL)); + data = buffer; + len = TAR_BLOCK_SIZE; + + /* Also make a note to replace padding, in case size changed. */ + mystreamer->rearchive_member = true; + } + else if (context == BBSTREAMER_MEMBER_TRAILER && + mystreamer->rearchive_member) + { + int pad_bytes = tarPaddingBytesRequired(member->size); + + /* Also replace padding, if we regenerated the header. 
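+ The member size may have changed, so the original padding length can no longer be trusted.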
*/ + memset(buffer, 0, pad_bytes); + data = buffer; + len = pad_bytes; + + /* Don't do this agian unless we replace another header. */ + mystreamer->rearchive_member = false; + } + else if (context == BBSTREAMER_ARCHIVE_TRAILER) + { + /* Trailer should always be two blocks of zero bytes. */ + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + data = buffer; + len = 2 * TAR_BLOCK_SIZE; + } + + bbstreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * End-of-stream processing for a tar archiver. + */ +static void +bbstreamer_tar_archiver_finalize(bbstreamer *streamer) +{ + bbstreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar archiver. + */ +static void +bbstreamer_tar_archiver_free(bbstreamer *streamer) +{ + bbstreamer_free(streamer->bbs_next); + pfree(streamer); +} diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index cdea3711b7..169afa5645 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -28,18 +28,13 @@ #endif #include "access/xlog_internal.h" +#include "bbstreamer.h" #include "common/file_perm.h" #include "common/file_utils.h" #include "common/logging.h" -#include "common/string.h" #include "fe_utils/option_utils.h" #include "fe_utils/recovery_gen.h" -#include "fe_utils/string_utils.h" #include "getopt_long.h" -#include "libpq-fe.h" -#include "pgtar.h" -#include "pgtime.h" -#include "pqexpbuffer.h" #include "receivelog.h" #include "replication/basebackup.h" #include "streamutil.h" @@ -62,34 +57,9 @@ typedef struct TablespaceList typedef struct WriteTarState { int tablespacenum; - char filename[MAXPGPATH]; - FILE *tarfile; - char tarhdr[TAR_BLOCK_SIZE]; - bool basetablespace; - bool in_tarhdr; - bool skip_file; - bool is_recovery_guc_supported; - bool is_postgresql_auto_conf; - bool found_postgresql_auto_conf; - int file_padding_len; - size_t tarhdrsz; - pgoff_t filesz; -#ifdef HAVE_LIBZ - gzFile ztarfile; -#endif + bbstreamer *streamer; } WriteTarState; -typedef struct UnpackTarState -{ - int tablespacenum; - char current_path[MAXPGPATH]; - char filename[MAXPGPATH]; - const char *mapped_tblspc_path; - pgoff_t current_len_left; - int current_padding; - FILE *file; -} UnpackTarState; - typedef struct WriteManifestState { char filename[MAXPGPATH]; @@ -161,10 +131,11 @@ static bool found_existing_xlogdir = false; static bool made_tablespace_dirs = false; static bool found_tablespace_dirs = false; -/* Progress counters */ +/* Progress indicators */ static uint64 totalsize_kb; static uint64 totaldone; static int tablespacecount; +static const char *progress_filename; /* Pipe to communicate with background wal receiver process */ #ifndef WIN32 @@ -190,14 +161,15 @@ static PQExpBuffer recoveryconfcontents = NULL; /* Function headers */ static void usage(void); static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found); -static void progress_report(int tablespacenum, const char *filename, bool force, - bool finished); +static void progress_update_filename(const char *filename); +static void progress_report(int tablespacenum, bool force, bool finished); -static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); +static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, + bbstreamer **manifest_inject_streamer_p, + bool is_recovery_guc_supported); +static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, + bool tablespacenum); static void ReceiveTarCopyChunk(size_t r, char *copybuf, 
void *callback_data); -static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); -static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, - void *callback_data); static void ReceiveBackupManifest(PGconn *conn); static void ReceiveBackupManifestChunk(size_t r, char *copybuf, void *callback_data); @@ -360,21 +332,6 @@ tablespace_list_append(const char *arg) } -#ifdef HAVE_LIBZ -static const char * -get_gz_error(gzFile gzf) -{ - int errnum; - const char *errmsg; - - errmsg = gzerror(gzf, &errnum); - if (errnum == Z_ERRNO) - return strerror(errno); - else - return errmsg; -} -#endif - static void usage(void) { @@ -766,6 +723,14 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found) } } +/* + * Callback to update our notion of the current filename. + */ +static void +progress_update_filename(const char *filename) +{ + progress_filename = filename; +} /* * Print a progress report based on the global variables. If verbose output @@ -778,8 +743,7 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found) * is moved to the next line. */ static void -progress_report(int tablespacenum, const char *filename, - bool force, bool finished) +progress_report(int tablespacenum, bool force, bool finished) { int percent; char totaldone_str[32]; @@ -814,7 +778,7 @@ progress_report(int tablespacenum, const char *filename, #define VERBOSE_FILENAME_LENGTH 35 if (verbose) { - if (!filename) + if (!progress_filename) /* * No filename given, so clear the status line (used for last @@ -830,7 +794,7 @@ progress_report(int tablespacenum, const char *filename, VERBOSE_FILENAME_LENGTH + 5, ""); else { - bool truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH); + bool truncate = (strlen(progress_filename) > VERBOSE_FILENAME_LENGTH); fprintf(stderr, ngettext("%*s/%s kB (%d%%), %d/%d tablespace (%s%-*.*s)", @@ -844,7 +808,7 @@ progress_report(int tablespacenum, const char *filename, truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH, truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH, /* Truncate filename at beginning if it's too long */ - truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename); + truncate ? progress_filename + strlen(progress_filename) - VERBOSE_FILENAME_LENGTH + 3 : progress_filename); } } else @@ -990,257 +954,170 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback, } /* - * Write a piece of tar data + * Figure out what to do with an archive received from the server based on + * the options selected by the user. We may just write the results directly + * to a file, or we might compress first, or we might extract the tar file + * and write each member separately. This function doesn't do any of that + * directly, but it works out what kind of bbstreamer we need to create so + * that the right stuff happens when, down the road, we actually receive + * the data. 
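+ *
+ * For example, a plain-format backup that needs recovery settings injected
+ * ends up with a chain of tar parser -> recovery injector -> extractor,
+ * while a compressed tar-format backup with nothing to inject is handled by
+ * a gzip writer alone.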
*/ -static void -writeTarData(WriteTarState *state, char *buf, int r) +static bbstreamer * +CreateBackupStreamer(char *archive_name, char *spclocation, + bbstreamer **manifest_inject_streamer_p, + bool is_recovery_guc_supported) { -#ifdef HAVE_LIBZ - if (state->ztarfile != NULL) - { - errno = 0; - if (gzwrite(state->ztarfile, buf, r) != r) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_log_error("could not write to compressed file \"%s\": %s", - state->filename, get_gz_error(state->ztarfile)); - exit(1); - } - } - else -#endif - { - errno = 0; - if (fwrite(buf, r, 1, state->tarfile) != 1) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_log_error("could not write to file \"%s\": %m", - state->filename); - exit(1); - } - } -} - -/* - * Receive a tar format file from the connection to the server, and write - * the data from this file directly into a tar file. If compression is - * enabled, the data will be compressed while written to the file. - * - * The file will be named base.tar[.gz] if it's for the main data directory - * or .tar[.gz] if it's for another tablespace. - * - * No attempt to inspect or validate the contents of the file is done. - */ -static void -ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) -{ - char zerobuf[TAR_BLOCK_SIZE * 2]; - WriteTarState state; - - memset(&state, 0, sizeof(state)); - state.tablespacenum = rownum; - state.basetablespace = PQgetisnull(res, rownum, 0); - state.in_tarhdr = true; - - /* recovery.conf is integrated into postgresql.conf in 12 and newer */ - if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC) - state.is_recovery_guc_supported = true; - - if (state.basetablespace) - { - /* - * Base tablespaces - */ - if (strcmp(basedir, "-") == 0) - { -#ifdef WIN32 - _setmode(fileno(stdout), _O_BINARY); -#endif - -#ifdef HAVE_LIBZ - if (compresslevel != 0) - { - int fd = dup(fileno(stdout)); - - if (fd < 0) - { - pg_log_error("could not duplicate stdout: %m"); - exit(1); - } - - state.ztarfile = gzdopen(fd, "wb"); - if (state.ztarfile == NULL) - { - pg_log_error("could not open output file: %m"); - exit(1); - } - - if (gzsetparams(state.ztarfile, compresslevel, - Z_DEFAULT_STRATEGY) != Z_OK) - { - pg_log_error("could not set compression level %d: %s", - compresslevel, get_gz_error(state.ztarfile)); - exit(1); - } - } - else -#endif - state.tarfile = stdout; - strcpy(state.filename, "-"); - } - else - { -#ifdef HAVE_LIBZ - if (compresslevel != 0) - { - snprintf(state.filename, sizeof(state.filename), - "%s/base.tar.gz", basedir); - state.ztarfile = gzopen(state.filename, "wb"); - if (gzsetparams(state.ztarfile, compresslevel, - Z_DEFAULT_STRATEGY) != Z_OK) - { - pg_log_error("could not set compression level %d: %s", - compresslevel, get_gz_error(state.ztarfile)); - exit(1); - } - } - else -#endif - { - snprintf(state.filename, sizeof(state.filename), - "%s/base.tar", basedir); - state.tarfile = fopen(state.filename, "wb"); - } - } - } - else - { - /* - * Specific tablespace - */ -#ifdef HAVE_LIBZ - if (compresslevel != 0) - { - snprintf(state.filename, sizeof(state.filename), - "%s/%s.tar.gz", - basedir, PQgetvalue(res, rownum, 0)); - state.ztarfile = gzopen(state.filename, "wb"); - if (gzsetparams(state.ztarfile, compresslevel, - Z_DEFAULT_STRATEGY) != Z_OK) - { - pg_log_error("could not set compression level %d: %s", - compresslevel, get_gz_error(state.ztarfile)); - exit(1); - } - } - else -#endif - { - 
snprintf(state.filename, sizeof(state.filename), "%s/%s.tar", - basedir, PQgetvalue(res, rownum, 0)); - state.tarfile = fopen(state.filename, "wb"); - } - } - -#ifdef HAVE_LIBZ - if (compresslevel != 0) - { - if (!state.ztarfile) - { - /* Compression is in use */ - pg_log_error("could not create compressed file \"%s\": %s", - state.filename, get_gz_error(state.ztarfile)); - exit(1); - } - } - else -#endif - { - /* Either no zlib support, or zlib support but compresslevel = 0 */ - if (!state.tarfile) - { - pg_log_error("could not create file \"%s\": %m", state.filename); - exit(1); - } - } - - ReceiveCopyData(conn, ReceiveTarCopyChunk, &state); - - /* - * End of copy data. If requested, and this is the base tablespace, write - * configuration file into the tarfile. When done, close the file (but not - * stdout). - * - * Also, write two completely empty blocks at the end of the tar file, as - * required by some tar programs. - */ - - MemSet(zerobuf, 0, sizeof(zerobuf)); - - if (state.basetablespace && writerecoveryconf) - { - char header[TAR_BLOCK_SIZE]; - - /* - * If postgresql.auto.conf has not been found in the streamed data, - * add recovery configuration to postgresql.auto.conf if recovery - * parameters are GUCs. If the instance connected to is older than - * 12, create recovery.conf with this data otherwise. - */ - if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported) - { - int padding; - - tarCreateHeader(header, - state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf", - NULL, - recoveryconfcontents->len, - pg_file_create_mode, 04000, 02000, - time(NULL)); - - padding = tarPaddingBytesRequired(recoveryconfcontents->len); - - writeTarData(&state, header, sizeof(header)); - writeTarData(&state, recoveryconfcontents->data, - recoveryconfcontents->len); - if (padding) - writeTarData(&state, zerobuf, padding); - } - - /* - * standby.signal is supported only if recovery parameters are GUCs. - */ - if (state.is_recovery_guc_supported) - { - tarCreateHeader(header, "standby.signal", NULL, - 0, /* zero-length file */ - pg_file_create_mode, 04000, 02000, - time(NULL)); - - writeTarData(&state, header, sizeof(header)); - - /* - * we don't need to pad out to a multiple of the tar block size - * here, because the file is zero length, which is a multiple of - * any block size. - */ - } - } + bbstreamer *streamer; + bbstreamer *manifest_inject_streamer = NULL; + bool inject_manifest; + bool must_parse_archive; /* * Normally, we emit the backup manifest as a separate file, but when * we're writing a tarfile to stdout, we don't have that option, so * include it in the one tarfile we've got. */ - if (strcmp(basedir, "-") == 0 && manifest) + inject_manifest = (format == 't' && strcmp(basedir, "-") == 0 && manifest); + + /* + * We have to parse the archive if (1) we're suppose to extract it, or if + * (2) we need to inject backup_manifest or recovery configuration into it. + */ + must_parse_archive = (format == 'p' || inject_manifest || + (spclocation == NULL && writerecoveryconf)); + + if (format == 'p') + { + const char *directory; + + /* + * In plain format, we must extract the archive. The data for the main + * tablespace will be written to the base directory, and the data for + * other tablespaces will be written to the directory where they're + * located on the server, after applying any user-specified tablespace + * mappings. + */ + directory = spclocation == NULL ? 
basedir + : get_tablespace_mapping(spclocation); + streamer = bbstreamer_extractor_new(directory, + get_tablespace_mapping, + progress_update_filename); + } + else + { + FILE *archive_file; + char archive_filename[MAXPGPATH]; + + /* + * In tar format, we just write the archive without extracting it. + * Normally, we write it to the archive name provided by the caller, + * but when the base directory is "-" that means we need to write + * to standard output. + */ + if (strcmp(basedir, "-") == 0) + { + snprintf(archive_filename, sizeof(archive_filename), "-"); + archive_file = stdout; + } + else + { + snprintf(archive_filename, sizeof(archive_filename), + "%s/%s", basedir, archive_name); + archive_file = NULL; + } + +#ifdef HAVE_LIBZ + if (compresslevel != 0) + { + strlcat(archive_filename, ".gz", sizeof(archive_filename)); + streamer = bbstreamer_gzip_writer_new(archive_filename, + archive_file, + compresslevel); + } + else +#endif + streamer = bbstreamer_plain_writer_new(archive_filename, + archive_file); + + + /* + * If we need to parse the archive for whatever reason, then we'll + * also need to re-archive, because, if the output format is tar, the + * only point of parsing the archive is to be able to inject stuff + * into it. + */ + if (must_parse_archive) + streamer = bbstreamer_tar_archiver_new(streamer); + progress_filename = archive_filename; + } + + /* + * If we're supposed to inject the backup manifest into the results, + * it should be done here, so that the file content can be injected + * directly, without worrying about the details of the tar format. + */ + if (inject_manifest) + manifest_inject_streamer = streamer; + + /* + * If this is the main tablespace and we're supposed to write + * recovery information, arrange to do that. + */ + if (spclocation == NULL && writerecoveryconf) + { + Assert(must_parse_archive); + streamer = bbstreamer_recovery_injector_new(streamer, + is_recovery_guc_supported, + recoveryconfcontents); + } + + /* + * If we're doing anything that involves understanding the contents of + * the archive, we'll need to parse it. + */ + if (must_parse_archive) + streamer = bbstreamer_tar_parser_new(streamer); + + /* Return the results. */ + *manifest_inject_streamer_p = manifest_inject_streamer; + return streamer; +} + +/* + * Receive raw tar data from the server, and stream it to the appropriate + * location. If we're writing a single tarfile to standard output, also + * receive the backup manifest and inject it into that tarfile. + */ +static void +ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, + bool tablespacenum) +{ + WriteTarState state; + bbstreamer *manifest_inject_streamer; + bool is_recovery_guc_supported; + + /* Pass all COPY data through to the backup streamer. */ + memset(&state, 0, sizeof(state)); + is_recovery_guc_supported = + PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC; + state.streamer = CreateBackupStreamer(archive_name, spclocation, + &manifest_inject_streamer, + is_recovery_guc_supported); + state.tablespacenum = tablespacenum; + ReceiveCopyData(conn, ReceiveTarCopyChunk, &state); + progress_filename = NULL; + + /* + * The decision as to whether we need to inject the backup manifest into + * the output at this stage is made by CreateBackupStreamer; if that is + * needed, manifest_inject_streamer will be non-NULL; otherwise, it will + * be NULL. + */ + if (manifest_inject_streamer != NULL) { - char header[TAR_BLOCK_SIZE]; PQExpBufferData buf; + /* Slurp the entire backup manifest into a buffer. 
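To make the composition in CreateBackupStreamer() above easier to follow, here is a sketch (illustrative only, not additional patch code) of the chain it ends up building for the main tablespace with -Ft, -D -, -R, the manifest enabled, and no compression. The assignments mirror the constructor calls in the hunk; data later flows in the reverse order, entering at the parser and ending at the writer:

    /* Illustration only; mirrors the constructor calls above. */
    streamer = bbstreamer_plain_writer_new("-", stdout);      /* sink: stdout      */
    streamer = bbstreamer_tar_archiver_new(streamer);         /* re-emit tar bytes */
    manifest_inject_streamer = streamer;                      /* manifest injected
                                                               * at this level     */
    streamer = bbstreamer_recovery_injector_new(streamer,
                                                is_recovery_guc_supported,
                                                recoveryconfcontents);
    streamer = bbstreamer_tar_parser_new(streamer);           /* classifies the raw
                                                               * COPY chunks       */

Each wrapper only sees chunks already classified by its predecessor, which is what lets the recovery injector and the manifest injection work without re-implementing tar parsing themselves.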
*/ initPQExpBuffer(&buf); ReceiveBackupManifestInMemory(conn, &buf); if (PQExpBufferDataBroken(buf)) @@ -1248,42 +1125,20 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) pg_log_error("out of memory"); exit(1); } - tarCreateHeader(header, "backup_manifest", NULL, buf.len, - pg_file_create_mode, 04000, 02000, - time(NULL)); - writeTarData(&state, header, sizeof(header)); - writeTarData(&state, buf.data, buf.len); + + /* Inject it into the output tarfile. */ + bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest", + buf.data, buf.len); + + /* Free memory. */ termPQExpBuffer(&buf); } - /* 2 * TAR_BLOCK_SIZE bytes empty data at end of file */ - writeTarData(&state, zerobuf, sizeof(zerobuf)); + /* Cleanup. */ + bbstreamer_finalize(state.streamer); + bbstreamer_free(state.streamer); -#ifdef HAVE_LIBZ - if (state.ztarfile != NULL) - { - if (gzclose(state.ztarfile) != 0) - { - pg_log_error("could not close compressed file \"%s\": %s", - state.filename, get_gz_error(state.ztarfile)); - exit(1); - } - } - else -#endif - { - if (strcmp(basedir, "-") != 0) - { - if (fclose(state.tarfile) != 0) - { - pg_log_error("could not close file \"%s\": %m", - state.filename); - exit(1); - } - } - } - - progress_report(rownum, state.filename, true, false); + progress_report(tablespacenum, true, false); /* * Do not sync the resulting tar file yet, all files are synced once at @@ -1299,184 +1154,10 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data) { WriteTarState *state = callback_data; - if (!writerecoveryconf || !state->basetablespace) - { - /* - * When not writing config file, or when not working on the base - * tablespace, we never have to look for an existing configuration - * file in the stream. - */ - writeTarData(state, copybuf, r); - } - else - { - /* - * Look for a config file in the existing tar stream. If it's there, - * we must skip it so we can later overwrite it with our own version - * of the file. - * - * To do this, we have to process the individual files inside the TAR - * stream. The stream consists of a header and zero or more chunks, - * each with a length equal to TAR_BLOCK_SIZE. The stream from the - * server is broken up into smaller pieces, so we have to track the - * size of the files to find the next header structure. - */ - int rr = r; - int pos = 0; + bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN); - while (rr > 0) - { - if (state->in_tarhdr) - { - /* - * We're currently reading a header structure inside the TAR - * stream, i.e. the file metadata. - */ - if (state->tarhdrsz < TAR_BLOCK_SIZE) - { - /* - * Copy the header structure into tarhdr in case the - * header is not aligned properly or it's not returned in - * whole by the last PQgetCopyData call. - */ - int hdrleft; - int bytes2copy; - - hdrleft = TAR_BLOCK_SIZE - state->tarhdrsz; - bytes2copy = (rr > hdrleft ? hdrleft : rr); - - memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos, - bytes2copy); - - rr -= bytes2copy; - pos += bytes2copy; - state->tarhdrsz += bytes2copy; - } - else - { - /* - * We have the complete header structure in tarhdr, look - * at the file metadata: we may want append recovery info - * into postgresql.auto.conf and skip standby.signal file - * if recovery parameters are integrated as GUCs, and - * recovery.conf otherwise. In both cases we must - * calculate tar padding. 
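The open-coded header tracking that starts being deleted here is exactly what bbstreamer_tar_parser_new() now encapsulates: ReceiveTarCopyChunk() simply forwards raw chunks as BBSTREAMER_UNKNOWN and lets the parser classify them. For orientation only, a consumer sitting downstream of the parser might dispatch on that classification roughly like this (my_consumer_content is a hypothetical example, not something this patch adds):

    /* Hypothetical consumer sketch; not part of the patch. */
    static void
    my_consumer_content(bbstreamer *streamer, bbstreamer_member *member,
                        const char *data, int len,
                        bbstreamer_archive_context context)
    {
        switch (context)
        {
            case BBSTREAMER_MEMBER_HEADER:
                /* 'member' now describes the archive member that follows */
                break;
            case BBSTREAMER_MEMBER_CONTENTS:
                /* 'len' bytes of payload for the current member */
                break;
            case BBSTREAMER_MEMBER_TRAILER:
            case BBSTREAMER_ARCHIVE_TRAILER:
                /* end of one member, or of the whole archive */
                break;
            default:
                /* BBSTREAMER_UNKNOWN should not reach a post-parser consumer */
                break;
        }

        /* Pass everything through unchanged to the next streamer, if any. */
        if (streamer->bbs_next != NULL)
            bbstreamer_content(streamer->bbs_next, member, data, len, context);
    }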
- */ - if (state->is_recovery_guc_supported) - { - state->skip_file = - (strcmp(&state->tarhdr[0], "standby.signal") == 0); - state->is_postgresql_auto_conf = - (strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0); - } - else - state->skip_file = - (strcmp(&state->tarhdr[0], "recovery.conf") == 0); - - state->filesz = read_tar_number(&state->tarhdr[124], 12); - state->file_padding_len = - tarPaddingBytesRequired(state->filesz); - - if (state->is_recovery_guc_supported && - state->is_postgresql_auto_conf && - writerecoveryconf) - { - /* replace tar header */ - char header[TAR_BLOCK_SIZE]; - - tarCreateHeader(header, "postgresql.auto.conf", NULL, - state->filesz + recoveryconfcontents->len, - pg_file_create_mode, 04000, 02000, - time(NULL)); - - writeTarData(state, header, sizeof(header)); - } - else - { - /* copy stream with padding */ - state->filesz += state->file_padding_len; - - if (!state->skip_file) - { - /* - * If we're not skipping the file, write the tar - * header unmodified. - */ - writeTarData(state, state->tarhdr, TAR_BLOCK_SIZE); - } - } - - /* Next part is the file, not the header */ - state->in_tarhdr = false; - } - } - else - { - /* - * We're processing a file's contents. - */ - if (state->filesz > 0) - { - /* - * We still have data to read (and possibly write). - */ - int bytes2write; - - bytes2write = (state->filesz > rr ? rr : state->filesz); - - if (!state->skip_file) - writeTarData(state, copybuf + pos, bytes2write); - - rr -= bytes2write; - pos += bytes2write; - state->filesz -= bytes2write; - } - else if (state->is_recovery_guc_supported && - state->is_postgresql_auto_conf && - writerecoveryconf) - { - /* append recovery config to postgresql.auto.conf */ - int padding; - int tailsize; - - tailsize = (TAR_BLOCK_SIZE - state->file_padding_len) + recoveryconfcontents->len; - padding = tarPaddingBytesRequired(tailsize); - - writeTarData(state, recoveryconfcontents->data, - recoveryconfcontents->len); - - if (padding) - { - char zerobuf[TAR_BLOCK_SIZE]; - - MemSet(zerobuf, 0, sizeof(zerobuf)); - writeTarData(state, zerobuf, padding); - } - - /* skip original file padding */ - state->is_postgresql_auto_conf = false; - state->skip_file = true; - state->filesz += state->file_padding_len; - - state->found_postgresql_auto_conf = true; - } - else - { - /* - * No more data in the current file, the next piece of - * data (if any) will be a new file header structure. - */ - state->in_tarhdr = true; - state->skip_file = false; - state->is_postgresql_auto_conf = false; - state->tarhdrsz = 0; - state->filesz = 0; - } - } - } - } totaldone += r; - progress_report(state->tablespacenum, state->filename, false, false); + progress_report(state->tablespacenum, false, false); } @@ -1501,242 +1182,6 @@ get_tablespace_mapping(const char *dir) return dir; } - -/* - * Receive a tar format stream from the connection to the server, and unpack - * the contents of it into a directory. Only files, directories and - * symlinks are supported, no other kinds of special files. - * - * If the data is for the main data directory, it will be restored in the - * specified directory. If it's for another tablespace, it will be restored - * in the original or mapped directory. 
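A good deal of the bookkeeping deleted above exists only because tar pads every member out to a multiple of TAR_BLOCK_SIZE (512 bytes). For reference, the tarPaddingBytesRequired() calls scattered through the old code amount to the following round-up (a sketch under that assumption, not code from this patch):

    /* Padding needed to bring 'len' up to the next TAR_BLOCK_SIZE boundary. */
    static size_t
    tar_padding_bytes(size_t len)
    {
        size_t      rounded;

        rounded = (len + TAR_BLOCK_SIZE - 1) & ~((size_t) (TAR_BLOCK_SIZE - 1));
        return rounded - len;       /* 0 when len is already block-aligned */
    }

With the parser emitting explicit BBSTREAMER_MEMBER_TRAILER chunks, none of the surviving callers need to redo this arithmetic by hand.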
- */ -static void -ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) -{ - UnpackTarState state; - bool basetablespace; - - memset(&state, 0, sizeof(state)); - state.tablespacenum = rownum; - - basetablespace = PQgetisnull(res, rownum, 0); - if (basetablespace) - strlcpy(state.current_path, basedir, sizeof(state.current_path)); - else - strlcpy(state.current_path, - get_tablespace_mapping(PQgetvalue(res, rownum, 1)), - sizeof(state.current_path)); - - ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state); - - - if (state.file) - fclose(state.file); - - progress_report(rownum, state.filename, true, false); - - if (state.file != NULL) - { - pg_log_error("COPY stream ended before last file was finished"); - exit(1); - } - - if (basetablespace && writerecoveryconf) - WriteRecoveryConfig(conn, basedir, recoveryconfcontents); - - /* - * No data is synced here, everything is done for all tablespaces at the - * end. - */ -} - -static void -ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data) -{ - UnpackTarState *state = callback_data; - - if (state->file == NULL) - { -#ifndef WIN32 - int filemode; -#endif - - /* - * No current file, so this must be the header for a new file - */ - if (r != TAR_BLOCK_SIZE) - { - pg_log_error("invalid tar block header size: %zu", r); - exit(1); - } - totaldone += TAR_BLOCK_SIZE; - - state->current_len_left = read_tar_number(©buf[124], 12); - -#ifndef WIN32 - /* Set permissions on the file */ - filemode = read_tar_number(©buf[100], 8); -#endif - - /* - * All files are padded up to a multiple of TAR_BLOCK_SIZE - */ - state->current_padding = - tarPaddingBytesRequired(state->current_len_left); - - /* - * First part of header is zero terminated filename - */ - snprintf(state->filename, sizeof(state->filename), - "%s/%s", state->current_path, copybuf); - if (state->filename[strlen(state->filename) - 1] == '/') - { - /* - * Ends in a slash means directory or symlink to directory - */ - if (copybuf[156] == '5') - { - /* - * Directory. Remove trailing slash first. - */ - state->filename[strlen(state->filename) - 1] = '\0'; - if (mkdir(state->filename, pg_dir_create_mode) != 0) - { - /* - * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 - * clusters) will have been created by the wal receiver - * process. Also, when the WAL directory location was - * specified, pg_wal (or pg_xlog) has already been created - * as a symbolic link before starting the actual backup. - * So just ignore creation failures on related - * directories. - */ - if (!((pg_str_endswith(state->filename, "/pg_wal") || - pg_str_endswith(state->filename, "/pg_xlog") || - pg_str_endswith(state->filename, "/archive_status")) && - errno == EEXIST)) - { - pg_log_error("could not create directory \"%s\": %m", - state->filename); - exit(1); - } - } -#ifndef WIN32 - if (chmod(state->filename, (mode_t) filemode)) - { - pg_log_error("could not set permissions on directory \"%s\": %m", - state->filename); - exit(1); - } -#endif - } - else if (copybuf[156] == '2') - { - /* - * Symbolic link - * - * It's most likely a link in pg_tblspc directory, to the - * location of a tablespace. Apply any tablespace mapping - * given on the command line (--tablespace-mapping). (We - * blindly apply the mapping without checking that the link - * really is inside pg_tblspc. We don't expect there to be - * other symlinks in a data directory, but if there are, you - * can call it an undocumented feature that you can map them - * too.) 
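The unpack path being removed here and just below (mkdir with the pg_wal/pg_xlog/archive_status EEXIST tolerance, chmod, and symlink relocation) is what bbstreamer_extractor_new() now provides, with get_tablespace_mapping passed in as a callback rather than called inline. The core of the symlink relocation, for orientation (link_target and local_path are placeholder names, not variables from the patch):

    /* Sketch only: relocate a tablespace symlink per --tablespace-mapping. */
    const char *mapped = get_tablespace_mapping(link_target);

    if (symlink(mapped, local_path) != 0)
    {
        pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
                     local_path, mapped);
        exit(1);
    }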
- */ - state->filename[strlen(state->filename) - 1] = '\0'; /* Remove trailing slash */ - - state->mapped_tblspc_path = - get_tablespace_mapping(©buf[157]); - if (symlink(state->mapped_tblspc_path, state->filename) != 0) - { - pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m", - state->filename, state->mapped_tblspc_path); - exit(1); - } - } - else - { - pg_log_error("unrecognized link indicator \"%c\"", - copybuf[156]); - exit(1); - } - return; /* directory or link handled */ - } - - /* - * regular file - */ - state->file = fopen(state->filename, "wb"); - if (!state->file) - { - pg_log_error("could not create file \"%s\": %m", state->filename); - exit(1); - } - -#ifndef WIN32 - if (chmod(state->filename, (mode_t) filemode)) - { - pg_log_error("could not set permissions on file \"%s\": %m", - state->filename); - exit(1); - } -#endif - - if (state->current_len_left == 0) - { - /* - * Done with this file, next one will be a new tar header - */ - fclose(state->file); - state->file = NULL; - return; - } - } /* new file */ - else - { - /* - * Continuing blocks in existing file - */ - if (state->current_len_left == 0 && r == state->current_padding) - { - /* - * Received the padding block for this file, ignore it and close - * the file, then move on to the next tar header. - */ - fclose(state->file); - state->file = NULL; - totaldone += r; - return; - } - - errno = 0; - if (fwrite(copybuf, r, 1, state->file) != 1) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_log_error("could not write to file \"%s\": %m", state->filename); - exit(1); - } - totaldone += r; - progress_report(state->tablespacenum, state->filename, false, false); - - state->current_len_left -= r; - if (state->current_len_left == 0 && state->current_padding == 0) - { - /* - * Received the last block, and there is no padding to be - * expected. Close the file and move on to the next tar header. - */ - fclose(state->file); - state->file = NULL; - return; - } - } /* continuing data in existing file */ -} - /* * Receive the backup manifest file and write it out to a file. */ @@ -2035,16 +1480,32 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* - * Start receiving chunks - */ + /* Receive a tar file for each tablespace in turn */ for (i = 0; i < PQntuples(res); i++) { - if (format == 't') - ReceiveTarFile(conn, res, i); + char archive_name[MAXPGPATH]; + char *spclocation; + + /* + * If we write the data out to a tar file, it will be named base.tar + * if it's the main data directory or .tar if it's for + * another tablespace. CreateBackupStreamer() will arrange to add .gz + * to the archive name if pg_basebackup is performing compression. + */ + if (PQgetisnull(res, i, 0)) + { + strlcpy(archive_name, "base.tar", sizeof(archive_name)); + spclocation = NULL; + } else - ReceiveAndUnpackTarFile(conn, res, i); - } /* Loop over all tablespaces */ + { + snprintf(archive_name, sizeof(archive_name), + "%s.tar", PQgetvalue(res, i, 0)); + spclocation = PQgetvalue(res, i, 1); + } + + ReceiveTarFile(conn, archive_name, spclocation, i); + } /* * Now receive backup manifest, if appropriate. @@ -2060,7 +1521,10 @@ BaseBackup(void) ReceiveBackupManifest(conn); if (showprogress) - progress_report(PQntuples(res), NULL, true, true); + { + progress_filename = NULL; + progress_report(PQntuples(res), true, true); + } PQclear(res);
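For readers tracing the new per-tablespace loop in BaseBackup(): the naming above means a tar-format backup produces one archive per tablespace, plus backup_manifest as a separate file whenever the output is not stdout. An illustrative example (the OID is made up):

    /*
     * Illustrative only: "pg_basebackup -D /backups -Ft -z" against a cluster
     * with one user tablespace of OID 16385 would be expected to produce
     *
     *     /backups/base.tar.gz
     *     /backups/16385.tar.gz
     *     /backups/backup_manifest
     *
     * plus a WAL archive, depending on the -X option.  The ".gz" suffix is
     * added by CreateBackupStreamer() because compression was requested.
     */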