Implement streaming xlog for backup tools

Add option for parallel streaming of the transaction log while a
base backup is running, to get the logfiles before the server has
removed them.

Also add a tool called pg_receivexlog, which streams the transaction
log into files, creating a log archive without having to wait for
segments to complete, thus decreasing the window of data loss without
having to waste space using archive_timeout. This works best in
combination with archive_command - suggested usage docs etc coming later.
This commit is contained in:
Magnus Hagander 2011-10-26 20:13:33 +02:00
parent 2b64f3f17a
commit d9bae53173
13 changed files with 1805 additions and 165 deletions

View File

@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY pgCtl SYSTEM "pg_ctl-ref.sgml">
<!ENTITY pgDump SYSTEM "pg_dump.sgml">
<!ENTITY pgDumpall SYSTEM "pg_dumpall.sgml">
<!ENTITY pgReceivexlog SYSTEM "pg_receivexlog.sgml">
<!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml">
<!ENTITY pgRestore SYSTEM "pg_restore.sgml">
<!ENTITY postgres SYSTEM "postgres-ref.sgml">

View File

@ -143,8 +143,8 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
<term><option>-x</option></term>
<term><option>--xlog</option></term>
<term><option>-x <replaceable class="parameter">method</replaceable></option></term>
<term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
<listitem>
<para>
Includes the required transaction log files (WAL files) in the
@ -154,16 +154,43 @@ PostgreSQL documentation
to consult the log archive, thus making this a completely standalone
backup.
</para>
<note>
<para>
The transaction log files are collected at the end of the backup.
Therefore, it is necessary for the
<xref linkend="guc-wal-keep-segments"> parameter to be set high
enough that the log is not removed before the end of the backup.
If the log has been rotated when it's time to transfer it, the
backup will fail and be unusable.
</para>
</note>
<para>
The following methods for collecting the transaction logs are
supported:
<variablelist>
<varlistentry>
<term><literal>f</literal></term>
<term><literal>fetch</literal></term>
<listitem>
<para>
The transaction log files are collected at the end of the backup.
Therefore, it is necessary for the
<xref linkend="guc-wal-keep-segments"> parameter to be set high
enough that the log is not removed before the end of the backup.
If the log has been rotated when it's time to transfer it, the
backup will fail and be unusable.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>s</literal></term>
<term><literal>stream</literal></term>
<listitem>
<para>
Stream the transaction log while the backup is created. This will
open a second connection to the server and start streaming the
transaction log in parallel while running the backup. Therefore,
it will use up two slots configured by the
<xref linkend="guc-max-wal-senders"> parameter. As long as the
client can keep up with transaction log received, using this mode
requires no extra transaction logs to be saved on the master.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
@ -260,6 +287,20 @@ PostgreSQL documentation
The following command-line options control the database connection parameters.
<variablelist>
<varlistentry>
<term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
<term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
<listitem>
<para>
Specifies the number of seconds between status packets sent back to the
server. This is required when streaming the transaction log (using
<literal>--xlog=stream</literal>) if replication timeout is configured
on the server, and allows for easier monitoring. The default value is
10 seconds.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-h <replaceable class="parameter">host</replaceable></option></term>
<term><option>--host=<replaceable class="parameter">host</replaceable></option></term>

View File

@ -0,0 +1,270 @@
<!--
doc/src/sgml/ref/pg_receivexlog.sgml
PostgreSQL documentation
-->
<refentry id="app-pgreceivexlog">
<refmeta>
<refentrytitle>pg_receivexlog</refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo>Application</refmiscinfo>
</refmeta>
<refnamediv>
<refname>pg_receivexlog</refname>
<refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
</refnamediv>
<indexterm zone="app-pgreceivexlog">
<primary>pg_receivexlog</primary>
</indexterm>
<refsynopsisdiv>
<cmdsynopsis>
<command>pg_receivexlog</command>
<arg rep="repeat"><replaceable>option</></arg>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>
Description
</title>
<para>
<application>pg_receivexlog</application> is used to stream transaction log
from a running <productname>PostgreSQL</productname> cluster. The transaction
log is streamed using the streaming replication protocol, and is written
to a local directory of files. This directory can be used as the archive
location for doing a restore using point-in-time recovery (see
<xref linkend="continuous-archiving">).
</para>
<para>
<application>pg_receivexlog</application> streams the transaction
log in real time as it's being generated on the server, and does not wait
for segments to complete like <xref linkend="guc-archive-command"> does.
For this reason, it is not necessary to set
<xref linkend="guc-archive-timeout"> when using
<application>pg_receivexlog</application>.
</para>
<para>
The transaction log is streamed over a regular
<productname>PostgreSQL</productname> connection, and uses the
replication protocol. The connection must be
made with a user having <literal>REPLICATION</literal> permissions (see
<xref linkend="role-attributes">), and the user must be granted explicit
permissions in <filename>pg_hba.conf</filename>. The server must also
be configured with <xref linkend="guc-max-wal-senders"> set high enough
to leave at least one session available for the stream.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<para>
The following command-line options control the location and format of the
output.
<variablelist>
<varlistentry>
<term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
<term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
<listitem>
<para>
Directory to write the output to.
</para>
<para>
This parameter is required.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
The following command-line options control the running of the program.
<variablelist>
<varlistentry>
<term><option>-v</option></term>
<term><option>--verbose</option></term>
<listitem>
<para>
Enables verbose mode.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
The following command-line options control the database connection parameters.
<variablelist>
<varlistentry>
<term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
<term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
<listitem>
<para>
Specifies the number of seconds between status packets sent back to the
server. This is required if replication timeout is configured on the
server, and allows for easier monitoring. The default value is
10 seconds.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-h <replaceable class="parameter">host</replaceable></option></term>
<term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
<listitem>
<para>
Specifies the host name of the machine on which the server is
running. If the value begins with a slash, it is used as the
directory for the Unix domain socket. The default is taken
from the <envar>PGHOST</envar> environment variable, if set,
else a Unix domain socket connection is attempted.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-p <replaceable class="parameter">port</replaceable></option></term>
<term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
<listitem>
<para>
Specifies the TCP port or local Unix domain socket file
extension on which the server is listening for connections.
Defaults to the <envar>PGPORT</envar> environment variable, if
set, or a compiled-in default.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-U <replaceable>username</replaceable></option></term>
<term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
<listitem>
<para>
User name to connect as.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-w</></term>
<term><option>--no-password</></term>
<listitem>
<para>
Never issue a password prompt. If the server requires
password authentication and a password is not available by
other means such as a <filename>.pgpass</filename> file, the
connection attempt will fail. This option can be useful in
batch jobs and scripts where no user is present to enter a
password.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-W</option></term>
<term><option>--password</option></term>
<listitem>
<para>
Force <application>pg_receivexlog</application> to prompt for a
password before connecting to a database.
</para>
<para>
This option is never essential, since
<application>pg_receivexlog</application> will automatically prompt
for a password if the server demands password authentication.
However, <application>pg_receivexlog</application> will waste a
connection attempt finding out that the server wants a password.
In some cases it is worth typing <option>-W</> to avoid the extra
connection attempt.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
Other, less commonly used, parameters are also available:
<variablelist>
<varlistentry>
<term><option>-V</></term>
<term><option>--version</></term>
<listitem>
<para>
Print the <application>pg_receivexlog</application> version and exit.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-?</></term>
<term><option>--help</></term>
<listitem>
<para>
Show help about <application>pg_receivexlog</application> command line
arguments, and exit.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</refsect1>
<refsect1>
<title>Environment</title>
<para>
This utility, like most other <productname>PostgreSQL</> utilities,
uses the environment variables supported by <application>libpq</>
(see <xref linkend="libpq-envars">).
</para>
</refsect1>
<refsect1>
<title>Notes</title>
<para>
When using <application>pg_receivexlog</application> instead of
<xref linkend="guc-archive-command">, the server will continue to
recycle transaction log files even if the backups are not properly
archived, since there is no command that fails. This can be worked
around by having an <xref linkend="guc-archive-command"> that fails
when the file has not been properly archived yet.
</para>
</refsect1>
<refsect1>
<title>Examples</title>
<para>
To stream the transaction log from the server at
<literal>mydbserver</literal> and store it in the local directory
<filename>/usr/local/pgsql/archive</filename>:
<screen>
<prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /home/pgbackup/archive</userinput>
</screen>
</para>
</refsect1>
<refsect1>
<title>See Also</title>
<simplelist type="inline">
<member><xref linkend="APP-PGBASEBACKUP"></member>
</simplelist>
</refsect1>
</refentry>

View File

@ -220,6 +220,7 @@
&pgConfig;
&pgDump;
&pgDumpall;
&pgReceivexlog;
&pgRestore;
&psqlRef;
&reindexdb;

View File

@ -1 +1,2 @@
/pg_basebackup
/pg_receivexlog

View File

@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
OBJS= pg_basebackup.o $(WIN32RES)
OBJS=receivelog.o streamutil.o $(WIN32RES)
all: pg_basebackup
all: pg_basebackup pg_receivexlog
pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
clean distclean maintainer-clean:
rm -f pg_basebackup$(X) $(OBJS)
rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o

View File

@ -11,12 +11,20 @@
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
/*
* We have to use postgres.h not postgres_fe.h here, because there's so much
* backend-only stuff in the XLOG include files we need. But we need a
* frontend-ish environment otherwise. Hence this ugly hack.
*/
#define FRONTEND 1
#include "postgres.h"
#include "libpq-fe.h"
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
@ -24,9 +32,11 @@
#include "getopt_long.h"
#include "receivelog.h"
#include "streamutil.h"
/* Global options */
static const char *progname;
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
@ -34,38 +44,38 @@ bool showprogress = false;
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
bool streamwal = false;
bool fastcheckpoint = false;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
int standby_message_timeout = 10; /* 10 sec = default */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
/* Connection kept global so we can disconnect easily */
static PGconn *conn = NULL;
/* Pipe to communicate with background wal receiver process */
#ifndef WIN32
static int bgpipe[2] = {-1, -1};
#endif
#define disconnect_and_exit(code) \
{ \
if (conn != NULL) PQfinish(conn); \
exit(code); \
}
/* Handle to child process */
static pid_t bgchild = -1;
/* End position for xlog streaming, empty string if unknown yet */
static XLogRecPtr xlogendptr;
static int has_xlogendptr = 0;
/* Function headers */
static char *xstrdup(const char *s);
static void *xmalloc0(int size);
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
static PGconn *GetConnection(void);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
@ -81,39 +91,6 @@ get_gz_error(gzFile *gzf)
}
#endif
/*
* strdup() and malloc() replacements that prints an error and exits
* if something goes wrong. Can never return NULL.
*/
static char *
xstrdup(const char *s)
{
char *result;
result = strdup(s);
if (!result)
{
fprintf(stderr, _("%s: out of memory\n"), progname);
exit(1);
}
return result;
}
static void *
xmalloc0(int size)
{
void *result;
result = malloc(size);
if (!result)
{
fprintf(stderr, _("%s: out of memory\n"), progname);
exit(1);
}
MemSet(result, 0, size);
return result;
}
static void
usage(void)
@ -125,7 +102,7 @@ usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
printf(_(" -x, --xlog include required WAL files in backup\n"));
printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n"));
printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n"));
printf(_("\nGeneral options:\n"));
@ -137,6 +114,7 @@ usage(void)
printf(_(" --help show this help, then exit\n"));
printf(_(" --version output version information, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
@ -146,6 +124,199 @@ usage(void)
}
/*
* Called in the background process whenever a complete segment of WAL
* has been received.
* On Unix, we check to see if there is any data on our pipe
* (which would mean we have a stop position), and if it is, check if
* it is time to stop.
* On Windows, we are in a single process, so we can just check if it's
* time to stop.
*/
static bool
segment_callback(XLogRecPtr segendpos, uint32 timeline)
{
if (!has_xlogendptr)
{
#ifndef WIN32
fd_set fds;
struct timeval tv;
int r;
/*
* Don't have the end pointer yet - check our pipe to see if it has
* been sent yet.
*/
FD_ZERO(&fds);
FD_SET(bgpipe[0], &fds);
MemSet(&tv, 0, sizeof(tv));
r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
if (r == 1)
{
char xlogend[64];
MemSet(xlogend, 0, sizeof(xlogend));
r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
if (r < 0)
{
fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
progname, strerror(errno));
exit(1);
}
if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
{
fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
progname, xlogend);
exit(1);
}
has_xlogendptr = 1;
/*
* Fall through to check if we've reached the point further
* already.
*/
}
else
{
/*
* No data received on the pipe means we don't know the end
* position yet - so just say it's not time to stop yet.
*/
return false;
}
#else
/*
* On win32, has_xlogendptr is set by the main thread, so if it's not
* set here, we just go back and wait until it shows up.
*/
return false;
#endif
}
/*
* At this point we have an end pointer, so compare it to the current
* position to figure out if it's time to stop.
*/
if (segendpos.xlogid > xlogendptr.xlogid ||
(segendpos.xlogid == xlogendptr.xlogid &&
segendpos.xrecoff >= xlogendptr.xrecoff))
return true;
/*
* Have end pointer, but haven't reached it yet - so tell the caller to
* keep streaming.
*/
return false;
}
typedef struct
{
PGconn *bgconn;
XLogRecPtr startptr;
char xlogdir[MAXPGPATH];
char *sysidentifier;
int timeline;
} logstreamer_param;
static int
LogStreamerMain(logstreamer_param * param)
{
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
segment_callback, NULL, standby_message_timeout))
/*
* Any errors will already have been reported in the function process,
* but we need to tell the parent that we didn't shutdown in a nice
* way.
*/
return 1;
PQfinish(param->bgconn);
return 0;
}
/*
* Initiate background process for receiving xlog during the backup.
* The background stream will use its own database connection so we can
* stream the logfile in parallel with the backups.
*/
static void
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
{
logstreamer_param *param;
param = xmalloc0(sizeof(logstreamer_param));
param->timeline = timeline;
param->sysidentifier = sysidentifier;
/* Convert the starting position */
if (sscanf(startpos, "%X/%X", &param->startptr.xlogid, &param->startptr.xrecoff) != 2)
{
fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
progname, startpos);
disconnect_and_exit(1);
}
/* Round off to even segment position */
param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
#ifndef WIN32
/* Create our background pipe */
if (pgpipe(bgpipe) < 0)
{
fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
#endif
/* Get a second connection */
param->bgconn = GetConnection();
/*
* Always in plain format, so we can write to basedir/pg_xlog. But the
* directory entry in the tar file may arrive later, so make sure it's
* created before we start.
*/
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
verify_dir_is_empty_or_create(param->xlogdir);
/*
* Start a child process and tell it to start streaming. On Unix, this is
* a fork(). On Windows, we create a thread.
*/
#ifndef WIN32
bgchild = fork();
if (bgchild == 0)
{
/* in child process */
exit(LogStreamerMain(param));
}
else if (bgchild < 0)
{
fprintf(stderr, _("%s: could not create background process: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
/*
* Else we are in the parent process and all is well.
*/
#else /* WIN32 */
bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
if (bgchild == 0)
{
fprintf(stderr, _("%s: could not create background thread: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
#endif
}
/*
* Verify that the given directory exists and is empty. If it does not
* exist, it is created. If it exists but is not empty, an error will
@ -502,11 +673,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
else
strcpy(current_path, PQgetvalue(res, rownum, 1));
/*
* Make sure we're unpacking into an empty directory
*/
verify_dir_is_empty_or_create(current_path);
/*
* Get the COPY data
*/
@ -597,13 +763,21 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/*
* Directory
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(filename, S_IRWXU) != 0)
{
fprintf(stderr,
/*
* When streaming WAL, pg_xlog will have been created
* by the wal receiver process, so just ignore failure
* on that.
*/
if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
progname, filename, strerror(errno));
disconnect_and_exit(1);
progname, filename, strerror(errno));
disconnect_and_exit(1);
}
}
#ifndef WIN32
if (chmod(filename, (mode_t) filemode))
@ -616,12 +790,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/*
* Symbolic link
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (symlink(&copybuf[157], filename) != 0)
{
fprintf(stderr,
_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
progname, filename, &copybuf[157], strerror(errno));
progname, filename, &copybuf[157], strerror(errno));
disconnect_and_exit(1);
}
}
@ -714,94 +888,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
}
static PGconn *
GetConnection(void)
{
PGconn *tmpconn;
int argcount = 4; /* dbname, replication, fallback_app_name,
* password */
int i;
const char **keywords;
const char **values;
char *password = NULL;
if (dbhost)
argcount++;
if (dbuser)
argcount++;
if (dbport)
argcount++;
keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
values = xmalloc0((argcount + 1) * sizeof(*values));
keywords[0] = "dbname";
values[0] = "replication";
keywords[1] = "replication";
values[1] = "true";
keywords[2] = "fallback_application_name";
values[2] = progname;
i = 3;
if (dbhost)
{
keywords[i] = "host";
values[i] = dbhost;
i++;
}
if (dbuser)
{
keywords[i] = "user";
values[i] = dbuser;
i++;
}
if (dbport)
{
keywords[i] = "port";
values[i] = dbport;
i++;
}
while (true)
{
if (dbgetpassword == 1)
{
/* Prompt for a password */
password = simple_prompt(_("Password: "), 100, false);
keywords[argcount - 1] = "password";
values[argcount - 1] = password;
}
tmpconn = PQconnectdbParams(keywords, values, true);
if (password)
free(password);
if (PQstatus(tmpconn) == CONNECTION_BAD &&
PQconnectionNeedsPassword(tmpconn) &&
dbgetpassword != -1)
{
dbgetpassword = 1; /* ask for password next time */
PQfinish(tmpconn);
continue;
}
if (PQstatus(tmpconn) != CONNECTION_OK)
{
fprintf(stderr, _("%s: could not connect to server: %s"),
progname, PQerrorMessage(tmpconn));
exit(1);
}
/* Connection ok! */
free(values);
free(keywords);
return tmpconn;
}
}
static void
BaseBackup(void)
{
PGresult *res;
char *sysidentifier;
uint32 timeline;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
@ -813,6 +905,26 @@ BaseBackup(void)
*/
conn = GetConnection();
/*
* Run IDENTIFY_SYSTEM so we can get the timeline
*/
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not identify system: %s\n"),
progname, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1)
{
fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
progname, PQntuples(res));
disconnect_and_exit(1);
}
sysidentifier = strdup(PQgetvalue(res, 0, 0));
timeline = atoi(PQgetvalue(res, 0, 1));
PQclear(res);
/*
* Start the actual backup
*/
@ -820,7 +932,7 @@ BaseBackup(void)
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal ? "WAL" : "",
includewal && !streamwal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
@ -898,6 +1010,18 @@ BaseBackup(void)
disconnect_and_exit(1);
}
/*
* If we're streaming WAL, start the streaming session before we start
* receiving the actual data chunks.
*/
if (streamwal)
{
if (verbose)
fprintf(stderr, _("%s: starting background WAL receiver\n"),
progname);
StartLogStreamer(xlogstart, timeline, sysidentifier);
}
/*
* Start receiving chunks
*/
@ -945,6 +1069,92 @@ BaseBackup(void)
disconnect_and_exit(1);
}
if (bgchild > 0)
{
int status;
#ifndef WIN32
int r;
#endif
if (verbose)
fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
#ifndef WIN32
if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
{
fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
/* Just wait for the background process to exit */
r = waitpid(bgchild, &status, 0);
if (r == -1)
{
fprintf(stderr, _("%s: could not wait for child process: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
if (r != bgchild)
{
fprintf(stderr, _("%s: child %i died, expected %i\n"),
progname, r, bgchild);
disconnect_and_exit(1);
}
if (!WIFEXITED(status))
{
fprintf(stderr, _("%s: child process did not exit normally\n"),
progname);
disconnect_and_exit(1);
}
if (WEXITSTATUS(status) != 0)
{
fprintf(stderr, _("%s: child process exited with error %i\n"),
progname, WEXITSTATUS(status));
disconnect_and_exit(1);
}
/* Exited normally, we're happy! */
#else /* WIN32 */
/*
* On Windows, since we are in the same process, we can just store the
* value directly in the variable, and then set the flag that says
* it's there.
*/
if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
{
fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
progname, xlogend);
exit(1);
}
InterlockedIncrement(&has_xlogendptr);
/* First wait for the thread to exit */
if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
{
_dosmaperr(GetLastError());
fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
{
_dosmaperr(GetLastError());
fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
progname, strerror(errno));
disconnect_and_exit(1);
}
if (status != 0)
{
fprintf(stderr, _("%s: child thread exited with error %u\n"),
progname, status);
disconnect_and_exit(1);
}
/* Exited normally, we're happy */
#endif
}
/*
* End of copy data. Final result is already checked inside the loop.
*/
@ -964,7 +1174,7 @@ main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
{"xlog", no_argument, NULL, 'x'},
{"xlog", required_argument, NULL, 'x'},
{"gzip", no_argument, NULL, 'z'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
@ -973,6 +1183,7 @@ main(int argc, char **argv)
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"statusint", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{"progress", no_argument, NULL, 'P'},
{NULL, 0, NULL, 0}
@ -999,7 +1210,7 @@ main(int argc, char **argv)
}
}
while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP",
while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
@ -1021,6 +1232,18 @@ main(int argc, char **argv)
break;
case 'x':
includewal = true;
if (strcmp(optarg, "f") == 0 ||
strcmp(optarg, "fetch") == 0)
streamwal = false;
else if (strcmp(optarg, "s") == 0 ||
strcmp(optarg, "stream") == 0)
streamwal = true;
else
{
fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"),
progname, optarg);
exit(1);
}
break;
case 'l':
label = xstrdup(optarg);
@ -1068,6 +1291,15 @@ main(int argc, char **argv)
case 'W':
dbgetpassword = 1;
break;
case 's':
standby_message_timeout = atoi(optarg);
if (standby_message_timeout < 0)
{
fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
progname, optarg);
exit(1);
}
break;
case 'v':
verbose++;
break;
@ -1122,6 +1354,16 @@ main(int argc, char **argv)
exit(1);
}
if (format != 'p' && streamwal)
{
fprintf(stderr,
_("%s: wal streaming can only be used in plain mode\n"),
progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{

View File

@ -0,0 +1,465 @@
/*-------------------------------------------------------------------------
*
* pg_receivexlog.c - receive streaming transaction log data and write it
* to a local file.
*
* Author: Magnus Hagander <magnus@hagander.net>
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/pg_receivexlog.c
*-------------------------------------------------------------------------
*/
/*
* We have to use postgres.h not postgres_fe.h here, because there's so much
* backend-only stuff in the XLOG include files we need. But we need a
* frontend-ish environment otherwise. Hence this ugly hack.
*/
#define FRONTEND 1
#include "postgres.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "access/xlog_internal.h"
#include "receivelog.h"
#include "streamutil.h"
#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "getopt_long.h"
/* Global options */
char *basedir = NULL;
int verbose = 0;
int standby_message_timeout = 10; /* 10 sec = default */
volatile bool time_to_abort = false;
static void usage(void);
static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
static void StreamLog();
static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
static void
usage(void)
{
printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
progname);
printf(_("Usage:\n"));
printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --dir=directory receive xlog files into this directory\n"));
printf(_("\nGeneral options:\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
static bool
segment_callback(XLogRecPtr segendpos, uint32 timeline)
{
char fn[MAXPGPATH];
struct stat statbuf;
if (verbose)
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
progname, segendpos.xlogid, segendpos.xrecoff, timeline);
/*
* Check if there is a partial file for the name we just finished, and if
* there is, remove it under the assumption that we have now got all the
* data we need.
*/
segendpos.xrecoff /= XLOG_SEG_SIZE;
PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
basedir, timeline,
segendpos.xlogid,
segendpos.xrecoff);
if (stat(fn, &statbuf) == 0)
{
/* File existed, get rid of it */
if (verbose)
fprintf(stderr, _("%s: removing file \"%s\"\n"),
progname, fn);
unlink(fn);
}
/*
* Never abort from this - we handle all aborting in continue_streaming()
*/
return false;
}
static bool
continue_streaming(void)
{
if (time_to_abort)
{
fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
progname);
return true;
}
return false;
}
/*
* Determine starting location for streaming, based on:
* 1. If there are existing xlog segments, start at the end of the last one
* 2. If the last one is a partial segment, rename it and start over, since
* we don't sync after every write.
* 3. If no existing xlog exists, start from the beginning of the current
* WAL segment.
*/
static XLogRecPtr
FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
{
DIR *dir;
struct dirent *dirent;
int i;
bool b;
uint32 high_log = 0;
uint32 high_seg = 0;
bool partial = false;
dir = opendir(basedir);
if (dir == NULL)
{
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
progname, basedir, strerror(errno));
disconnect_and_exit(1);
}
while ((dirent = readdir(dir)) != NULL)
{
char fullpath[MAXPGPATH];
struct stat statbuf;
uint32 tli,
log,
seg;
if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
continue;
/* xlog files are always 24 characters */
if (strlen(dirent->d_name) != 24)
continue;
/* Filenames are always made out of 0-9 and A-F */
b = false;
for (i = 0; i < 24; i++)
{
if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
!(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
{
b = true;
break;
}
}
if (b)
continue;
/*
* Looks like an xlog file. Parse its position.
*/
if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
{
fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
progname, dirent->d_name);
disconnect_and_exit(1);
}
/* Ignore any files that are for another timeline */
if (tli != currenttimeline)
continue;
/* Check if this is a completed segment or not */
snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
if (stat(fullpath, &statbuf) != 0)
{
fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
progname, fullpath, strerror(errno));
disconnect_and_exit(1);
}
if (statbuf.st_size == 16 * 1024 * 1024)
{
/* Completed segment */
if (log > high_log ||
(log == high_log && seg > high_seg))
{
high_log = log;
high_seg = seg;
continue;
}
}
else
{
/*
* This is a partial file. Rename it out of the way.
*/
char newfn[MAXPGPATH];
fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
progname, dirent->d_name, dirent->d_name);
snprintf(newfn, sizeof(newfn), "%s/%s.partial",
basedir, dirent->d_name);
if (stat(newfn, &statbuf) == 0)
{
/*
* XXX: perhaps we should only error out if the existing file
* is larger?
*/
fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
progname, newfn);
disconnect_and_exit(1);
}
if (rename(fullpath, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
progname, fullpath, newfn, strerror(errno));
disconnect_and_exit(1);
}
/* Don't continue looking for more, we assume this is the last */
partial = true;
break;
}
}
closedir(dir);
if (high_log > 0 || high_seg > 0)
{
XLogRecPtr high_ptr;
if (!partial)
{
/*
* If the segment was partial, the pointer is already at the right
* location since we want to re-transmit that segment. If it was
* not, we need to move it to the next segment, since we are
* tracking the last one that was complete.
*/
NextLogSeg(high_log, high_seg);
}
high_ptr.xlogid = high_log;
high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
return high_ptr;
}
else
return currentpos;
}
/*
* Start the log streaming
*/
static void
StreamLog(void)
{
PGresult *res;
uint32 timeline;
XLogRecPtr startpos;
/*
* Connect in replication mode to the server
*/
conn = GetConnection();
/*
* Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
* position.
*/
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not identify system: %s\n"),
progname, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1)
{
fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
progname, PQntuples(res));
disconnect_and_exit(1);
}
timeline = atoi(PQgetvalue(res, 0, 1));
if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
{
fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
progname, PQgetvalue(res, 0, 2));
disconnect_and_exit(1);
}
PQclear(res);
/*
* Figure out where to start streaming.
*/
startpos = FindStreamingStart(startpos, timeline);
/*
* Always start streaming at the beginning of a segment
*/
startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
/*
* Start the replication
*/
if (verbose)
fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
progname, startpos.xlogid, startpos.xrecoff, timeline);
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
segment_callback, continue_streaming,
standby_message_timeout);
}
/*
* When sigint is called, just tell the system to exit at the next possible
* moment.
*/
static void
sigint_handler(int signum)
{
time_to_abort = true;
}
int
main(int argc, char **argv)
{
static struct option long_options[] = {
{"help", no_argument, NULL, '?'},
{"version", no_argument, NULL, 'V'},
{"dir", required_argument, NULL, 'D'},
{"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"statusint", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
};
int c;
int option_index;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
if (argc > 1)
{
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
{
usage();
exit(0);
}
else if (strcmp(argv[1], "-V") == 0
|| strcmp(argv[1], "--version") == 0)
{
puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
exit(0);
}
}
while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
long_options, &option_index)) != -1)
{
switch (c)
{
case 'D':
basedir = xstrdup(optarg);
break;
case 'h':
dbhost = xstrdup(optarg);
break;
case 'p':
if (atoi(optarg) <= 0)
{
fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
progname, optarg);
exit(1);
}
dbport = xstrdup(optarg);
break;
case 'U':
dbuser = xstrdup(optarg);
break;
case 'w':
dbgetpassword = -1;
break;
case 'W':
dbgetpassword = 1;
break;
case 's':
standby_message_timeout = atoi(optarg);
if (standby_message_timeout < 0)
{
fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
progname, optarg);
exit(1);
}
break;
case 'v':
verbose++;
break;
default:
/*
* getopt_long already emitted a complaint
*/
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
}
/*
* Any non-option arguments?
*/
if (optind < argc)
{
fprintf(stderr,
_("%s: too many command-line arguments (first is \"%s\")\n"),
progname, argv[optind]);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
/*
* Required arguments
*/
if (basedir == NULL)
{
fprintf(stderr, _("%s: no target directory specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
#endif
StreamLog();
exit(0);
}

View File

@ -0,0 +1,398 @@
/*-------------------------------------------------------------------------
*
* receivelog.c - receive transaction log files using the streaming
* replication protocol.
*
* Author: Magnus Hagander <magnus@hagander.net>
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/receivelog.c
*-------------------------------------------------------------------------
*/
/*
* We have to use postgres.h not postgres_fe.h here, because there's so much
* backend-only stuff in the XLOG include files we need. But we need a
* frontend-ish environment otherwise. Hence this ugly hack.
*/
#define FRONTEND 1
#include "postgres.h"
#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "replication/walprotocol.h"
#include "utils/datetime.h"
#include "receivelog.h"
#include "streamutil.h"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
/* Size of the streaming replication protocol header */
#define STREAMING_HEADER_SIZE (1+8+8+8)
const XLogRecPtr InvalidXLogRecPtr = {0, 0};
/*
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
* enough room in this buffer...
*/
static int
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
{
int f;
char fn[MAXPGPATH];
XLogFileName(namebuf, timeline, startpoint.xlogid,
startpoint.xrecoff / XLOG_SEG_SIZE);
snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
if (f == -1)
fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
progname, namebuf, strerror(errno));
return f;
}
/*
* Local version of GetCurrentTimestamp(), since we are not linked with
* backend code.
*/
static TimestampTz
localGetCurrentTimestamp(void)
{
TimestampTz result;
struct timeval tp;
gettimeofday(&tp, NULL);
result = (TimestampTz) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
#ifdef HAVE_INT64_TIMESTAMP
result = (result * USECS_PER_SEC) + tp.tv_usec;
#else
result = result + (tp.tv_usec / 1000000.0);
#endif
return result;
}
/*
* Receive a log stream starting at the specified position.
*
* If sysidentifier is specified, validate that both the system
* identifier and the timeline matches the specified ones
* (by sending an extra IDENTIFY_SYSTEM command)
*
* All received segments will be written to the directory
* specified by basedir.
*
* The segment_finish callback will be called after each segment
* has been finished, and the stream_continue callback will be
* called every time data is received. If either of these callbacks
* return true, the streaming will stop and the function
* return. As long as they return false, streaming will continue
* indefinitely.
*
* standby_message_timeout controls how often we send a message
* back to the master letting it know our progress, in seconds.
* This message will only contain the write location, and never
* flush or replay.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
{
char query[128];
char current_walfile_name[MAXPGPATH];
PGresult *res;
char *copybuf = NULL;
int walfile = -1;
int64 last_status = -1;
XLogRecPtr blockpos = InvalidXLogRecPtr;
if (sysidentifier != NULL)
{
/* Validate system identifier and timeline hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not identify system: %s\n"),
progname, PQerrorMessage(conn));
PQclear(res);
return false;
}
if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname);
PQclear(res);
return false;
}
if (timeline != atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname);
PQclear(res);
return false;
}
PQclear(res);
}
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
fprintf(stderr, _("%s: could not start replication: %s\n"),
progname, PQresultErrorMessage(res));
return false;
}
PQclear(res);
/*
* Receive the actual xlog data
*/
while (1)
{
int r;
int xlogoff;
int bytes_left;
int bytes_written;
int64 now;
if (copybuf != NULL)
{
PQfreemem(copybuf);
copybuf = NULL;
}
/*
* Check if we should continue streaming, or abort at this point.
*/
if (stream_continue && stream_continue())
{
if (walfile != -1)
{
fsync(walfile);
close(walfile);
}
return true;
}
/*
* Potentially send a status message to the master
*/
now = localGetCurrentTimestamp();
if (standby_message_timeout > 0 &&
last_status < now - standby_message_timeout * 1000000)
{
/* Time to send feedback! */
char replybuf[sizeof(StandbyReplyMessage) + 1];
StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
replymsg->write = blockpos;
replymsg->flush = InvalidXLogRecPtr;
replymsg->apply = InvalidXLogRecPtr;
replymsg->sendTime = now;
replybuf[0] = 'r';
if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
PQflush(conn))
{
fprintf(stderr, _("%s: could not send feedback packet: %s"),
progname, PQerrorMessage(conn));
return false;
}
last_status = now;
}
r = PQgetCopyData(conn, &copybuf, 1);
if (r == 0)
{
/*
* In async mode, and no data available. We block on reading but
* not more than the specified timeout, so that we can send a
* response back to the client.
*/
fd_set input_mask;
struct timeval timeout;
struct timeval *timeoutptr;
FD_ZERO(&input_mask);
FD_SET(PQsocket(conn), &input_mask);
if (standby_message_timeout)
{
timeout.tv_sec = last_status + standby_message_timeout - now - 1;
if (timeout.tv_sec <= 0)
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
timeout.tv_usec = 0;
timeoutptr = &timeout;
}
else
timeoutptr = NULL;
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
if (r == 0 || (r < 0 && errno == EINTR))
{
/*
* Got a timeout or signal. Continue the loop and either
* deliver a status packet to the server or just go back into
* blocking.
*/
continue;
}
else if (r < 0)
{
fprintf(stderr, _("%s: select() failed: %m\n"), progname);
return false;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
progname, PQerrorMessage(conn));
return false;
}
continue;
}
if (r == -1)
/* End of copy stream */
break;
if (r == -2)
{
fprintf(stderr, _("%s: could not read copy data: %s\n"),
progname, PQerrorMessage(conn));
return false;
}
if (r < STREAMING_HEADER_SIZE + 1)
{
fprintf(stderr, _("%s: streaming header too small: %i\n"),
progname, r);
return false;
}
if (copybuf[0] != 'w')
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
return false;
}
/* Extract WAL location for this block */
memcpy(&blockpos, copybuf + 1, 8);
xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
/*
* Verify that the initial location in the stream matches where we
* think we are.
*/
if (walfile == -1)
{
/* No file open yet */
if (xlogoff != 0)
{
fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
progname, xlogoff);
return false;
}
}
else
{
/* More data in existing segment */
/* XXX: store seek value don't reseek all the time */
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
return false;
}
}
bytes_left = r - STREAMING_HEADER_SIZE;
bytes_written = 0;
while (bytes_left)
{
int bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach
* XLOG_SEG_SIZE.
*/
if (xlogoff + bytes_left > XLOG_SEG_SIZE)
bytes_to_write = XLOG_SEG_SIZE - xlogoff;
else
bytes_to_write = bytes_left;
if (walfile == -1)
{
walfile = open_walfile(blockpos, timeline,
basedir, current_walfile_name);
if (walfile == -1)
/* Error logged by open_walfile */
return false;
}
if (write(walfile,
copybuf + STREAMING_HEADER_SIZE + bytes_written,
bytes_to_write) != bytes_to_write)
{
fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
progname,
bytes_to_write,
current_walfile_name,
strerror(errno));
return false;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
XLByteAdvance(blockpos, bytes_to_write);
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{
fsync(walfile);
close(walfile);
walfile = -1;
xlogoff = 0;
if (segment_finish != NULL)
{
/*
* Callback when the segment finished, and return if it
* told us to.
*/
if (segment_finish(blockpos, timeline))
return true;
}
}
}
/* No more data left to write, start receiving next copy packet */
}
/*
* The only way to get out of the loop is if the server shut down the
* replication stream. If it's a controlled shutdown, the server will send
* a shutdown message, and we'll return the latest xlog location that has
* been streamed.
*/
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
progname, PQresultErrorMessage(res));
return false;
}
PQclear(res);
return true;
}

View File

@ -0,0 +1,22 @@
#include "access/xlogdefs.h"
/*
* Called whenever a segment is finished, return true to stop
* the streaming at this point.
*/
typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
/*
* Called before trying to read more data. Return true to stop
* the streaming at this point.
*/
typedef bool (*stream_continue_callback)(void);
extern bool ReceiveXlogStream(PGconn *conn,
XLogRecPtr startpos,
uint32 timeline,
char *sysidentifier,
char *basedir,
segment_finish_callback segment_finish,
stream_continue_callback stream_continue,
int standby_message_timeout);

View File

@ -0,0 +1,165 @@
/*-------------------------------------------------------------------------
*
* streamutil.c - utility functions for pg_basebackup and pg_receivelog
*
* Author: Magnus Hagander <magnus@hagander.net>
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/streamutil.c
*-------------------------------------------------------------------------
*/
/*
* We have to use postgres.h not postgres_fe.h here, because there's so much
* backend-only stuff in the XLOG include files we need. But we need a
* frontend-ish environment otherwise. Hence this ugly hack.
*/
#define FRONTEND 1
#include "postgres.h"
#include "streamutil.h"
#include <stdio.h>
#include <string.h>
const char *progname;
char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
static char *dbpassword = NULL;
PGconn *conn = NULL;
/*
* strdup() and malloc() replacements that prints an error and exits
* if something goes wrong. Can never return NULL.
*/
char *
xstrdup(const char *s)
{
char *result;
result = strdup(s);
if (!result)
{
fprintf(stderr, _("%s: out of memory\n"), progname);
exit(1);
}
return result;
}
void *
xmalloc0(int size)
{
void *result;
result = malloc(size);
if (!result)
{
fprintf(stderr, _("%s: out of memory\n"), progname);
exit(1);
}
MemSet(result, 0, size);
return result;
}
PGconn *
GetConnection(void)
{
PGconn *tmpconn;
int argcount = 4; /* dbname, replication, fallback_app_name,
* password */
int i;
const char **keywords;
const char **values;
char *password = NULL;
if (dbhost)
argcount++;
if (dbuser)
argcount++;
if (dbport)
argcount++;
keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
values = xmalloc0((argcount + 1) * sizeof(*values));
keywords[0] = "dbname";
values[0] = "replication";
keywords[1] = "replication";
values[1] = "true";
keywords[2] = "fallback_application_name";
values[2] = progname;
i = 3;
if (dbhost)
{
keywords[i] = "host";
values[i] = dbhost;
i++;
}
if (dbuser)
{
keywords[i] = "user";
values[i] = dbuser;
i++;
}
if (dbport)
{
keywords[i] = "port";
values[i] = dbport;
i++;
}
while (true)
{
if (password)
free(password);
if (dbpassword)
{
/*
* We've saved a password when a previous connection succeeded,
* meaning this is the call for a second session to the same
* database, so just forcibly reuse that password.
*/
keywords[argcount - 1] = "password";
values[argcount - 1] = dbpassword;
dbgetpassword = -1; /* Don't try again if this fails */
}
else if (dbgetpassword == 1)
{
password = simple_prompt(_("Password: "), 100, false);
keywords[argcount - 1] = "password";
values[argcount - 1] = password;
}
tmpconn = PQconnectdbParams(keywords, values, true);
if (PQstatus(tmpconn) == CONNECTION_BAD &&
PQconnectionNeedsPassword(tmpconn) &&
dbgetpassword != -1)
{
dbgetpassword = 1; /* ask for password next time */
PQfinish(tmpconn);
continue;
}
if (PQstatus(tmpconn) != CONNECTION_OK)
{
fprintf(stderr, _("%s: could not connect to server: %s\n"),
progname, PQerrorMessage(tmpconn));
exit(1);
}
/* Connection ok! */
free(values);
free(keywords);
/* Store the password for next run */
if (password)
dbpassword = password;
return tmpconn;
}
}

View File

@ -0,0 +1,22 @@
#include "libpq-fe.h"
extern const char *progname;
extern char *dbhost;
extern char *dbuser;
extern char *dbport;
extern int dbgetpassword;
/* Connection kept global so we can disconnect easily */
extern PGconn *conn;
#define disconnect_and_exit(code) \
{ \
if (conn != NULL) PQfinish(conn); \
exit(code); \
}
char *xstrdup(const char *s);
void *xmalloc0(int size);
PGconn *GetConnection(void);

View File

@ -305,6 +305,13 @@ sub mkvcbuild
$initdb->AddLibrary('ws2_32.lib');
my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
$pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
$pgbasebackup->AddLibrary('ws2_32.lib');
my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
$pgreceivexlog->{name} = 'pg_receivexlog';
$pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
$pgreceivexlog->AddLibrary('ws2_32.lib');
my $pgconfig = AddSimpleFrontend('pg_config');