Add pg_recvlogical, a tool to receive data logical decoding data.

This is fairly basic at the moment, but it's at least useful for
testing and debugging, and possibly more.

Andres Freund
This commit is contained in:
Robert Haas 2014-03-18 12:19:57 -04:00
parent 250f8a7bbe
commit 8bdd12bbf0
8 changed files with 1135 additions and 125 deletions

View File

@ -1,2 +1,3 @@
/pg_basebackup
/pg_receivexlog
/pg_recvlogical

View File

@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
OBJS=receivelog.o streamutil.o $(WIN32RES)
all: pg_basebackup pg_receivexlog
all: pg_basebackup pg_receivexlog pg_recvlogical
pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@ -28,9 +28,13 @@ pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
$(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
@ -38,6 +42,9 @@ installdirs:
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
clean distclean maintainer-clean:
rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
rm -f pg_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) \
pg_basebackup.o pg_receivexlog.o pg_recvlogical.o \
$(OBJS)

View File

@ -1,4 +1,4 @@
# src/bin/pg_basebackup/nls.mk
CATALOG_NAME = pg_basebackup
AVAIL_LANGUAGES = cs de es fr it ja pl pt_BR ru zh_CN
GETTEXT_FILES = pg_basebackup.c pg_receivexlog.c receivelog.c streamutil.c ../../common/fe_memutils.c
GETTEXT_FILES = pg_basebackup.c pg_receivexlog.c pg_recvlogical.c receivelog.c streamutil.c ../../common/fe_memutils.c

View File

@ -0,0 +1,978 @@
/*-------------------------------------------------------------------------
*
* pg_recvlogical.c - receive data from a logical decoding slot in a streaming fashion
* and write it to to a local file.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/pg_recvlogical.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <dirent.h>
#include <sys/stat.h>
#include <unistd.h>
/* local includes */
#include "streamutil.h"
#include "access/xlog_internal.h"
#include "common/fe_memutils.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "pqexpbuffer.h"
/* Time to sleep between reconnection attempts */
#define RECONNECT_SLEEP_TIME 5
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool do_start_slot = false;
static bool do_drop_slot = false;
/* filled pairwise with option, value. value may be NULL */
static char **options;
static size_t noptions = 0;
static const char *plugin = "test_decoding";
/* Global State */
static int outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t output_reopen = false;
static int64 output_last_fsync = -1;
static bool output_unsynced = false;
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage(void);
static void StreamLog();
static void disconnect_and_exit(int code);
static void
usage(void)
{
printf(_("%s receives PostgreSQL logical change stream.\n\n"),
progname);
printf(_("Usage:\n"));
printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions:\n"));
printf(_(" -f, --file=FILE receive log into this file. - for stdout\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=DBNAME database to connect to\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
printf(_("\nReplication options:\n"));
printf(_(" -F --fsync-interval=INTERVAL\n"
" frequency of syncs to the output file (in seconds, defaults to 10)\n"));
printf(_(" -o, --option=NAME[=VALUE]\n"
" Specify option NAME with optional value VAL, to be passed\n"
" to the output plugin\n"));
printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (defaults to test_decoding)\n"));
printf(_(" -s, --status-interval=INTERVAL\n"
" time between status packets sent to server (in seconds, defaults to 10)\n"));
printf(_(" -S, --slot=SLOT use existing replication slot SLOT instead of starting a new one\n"));
printf(_(" -I, --startpos=PTR Where in an existing slot should the streaming start\n"));
printf(_("\nAction to be performed:\n"));
printf(_(" --create create a new replication slot (for the slotname see --slot)\n"));
printf(_(" --start start streaming in a replication slot (for the slotname see --slot)\n"));
printf(_(" --drop drop the replication slot (for the slotname see --slot)\n"));
printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
}
/*
* Send a Standby Status Update message to server.
*/
static bool
sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
{
static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
/*
* we normally don't want to send superflous feedbacks, but if it's
* because of a timeout we need to, otherwise wal_sender_timeout will
* kill us.
*/
if (!force &&
last_written_lsn == output_written_lsn &&
last_fsync_lsn != output_fsync_lsn)
return true;
if (verbose)
fprintf(stderr,
_("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
progname,
(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
replication_slot);
replybuf[len] = 'r';
len += 1;
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
len += 8;
fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
len += 8;
fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
fe_sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
startpos = output_written_lsn;
last_written_lsn = output_written_lsn;
last_fsync_lsn = output_fsync_lsn;
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send feedback packet: %s"),
progname, PQerrorMessage(conn));
return false;
}
return true;
}
static void
disconnect_and_exit(int code)
{
if (conn != NULL)
PQfinish(conn);
exit(code);
}
static bool
OutputFsync(int64 now)
{
output_last_fsync = now;
output_fsync_lsn = output_written_lsn;
if (fsync_interval <= 0)
return true;
if (!output_unsynced)
return true;
output_unsynced = false;
/* Accept EINVAL, in case output is writing to a pipe or similar. */
if (fsync(outfd) != 0 && errno != EINVAL)
{
fprintf(stderr,
_("%s: could not fsync log file \"%s\": %s\n"),
progname, outfile, strerror(errno));
return false;
}
return true;
}
/*
* Start the log streaming
*/
static void
StreamLog(void)
{
PGresult *res;
char *copybuf = NULL;
int64 last_status = -1;
int i;
PQExpBuffer query;
output_written_lsn = InvalidXLogRecPtr;
output_fsync_lsn = InvalidXLogRecPtr;
query = createPQExpBuffer();
/*
* Connect in replication mode to the server
*/
if (!conn)
conn = GetConnection();
if (!conn)
/* Error message already written in GetConnection() */
return;
/*
* Start the replication
*/
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (slot %s)\n"),
progname, (uint32) (startpos >> 32), (uint32) startpos,
replication_slot);
/* Initiate the replication stream at specified location */
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
/* print options if there are any */
if (noptions)
appendPQExpBufferStr(query, " (");
for (i = 0; i < noptions; i++)
{
/* separator */
if (i > 0)
appendPQExpBufferStr(query, ", ");
/* write option name */
appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
/* write option value if specified */
if (options[(i * 2) + 1] != NULL)
appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
}
if (noptions)
appendPQExpBufferChar(query, ')');
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
progname, query->data, PQresultErrorMessage(res));
PQclear(res);
goto error;
}
PQclear(res);
resetPQExpBuffer(query);
if (verbose)
fprintf(stderr,
_("%s: initiated streaming\n"),
progname);
while (!time_to_abort)
{
int r;
int bytes_left;
int bytes_written;
int64 now;
int hdr_len;
if (copybuf != NULL)
{
PQfreemem(copybuf);
copybuf = NULL;
}
/*
* Potentially send a status message to the master
*/
now = feGetCurrentTimestamp();
if (outfd != -1 &&
feTimestampDifferenceExceeds(output_last_fsync, now,
fsync_interval))
{
if (!OutputFsync(now))
goto error;
}
if (standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, now, true, false))
goto error;
last_status = now;
}
r = PQgetCopyData(conn, &copybuf, 1);
if (r == 0)
{
/*
* In async mode, and no data available. We block on reading but
* not more than the specified timeout, so that we can send a
* response back to the client.
*/
fd_set input_mask;
int64 message_target = 0;
int64 fsync_target = 0;
struct timeval timeout;
struct timeval *timeoutptr;
FD_ZERO(&input_mask);
FD_SET(PQsocket(conn), &input_mask);
/* Compute when we need to wakeup to send a keepalive message. */
if (standby_message_timeout)
message_target = last_status + (standby_message_timeout - 1) *
((int64) 1000);
/* Compute when we need to wakeup to fsync the output file. */
if (fsync_interval > 0 && output_unsynced)
fsync_target = output_last_fsync + (fsync_interval - 1) *
((int64) 1000);
/* Now compute when to wakeup. */
if (message_target > 0 || fsync_target > 0)
{
int64 targettime;
long secs;
int usecs;
targettime = message_target;
if (fsync_target > 0 && fsync_target < targettime)
targettime = fsync_target;
feTimestampDifference(now,
targettime,
&secs,
&usecs);
if (secs <= 0)
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
else
timeout.tv_sec = secs;
timeout.tv_usec = usecs;
timeoutptr = &timeout;
}
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
if (r == 0 || (r < 0 && errno == EINTR))
{
/*
* Got a timeout or signal. Continue the loop and either
* deliver a status packet to the server or just go back into
* blocking.
*/
continue;
}
else if (r < 0)
{
fprintf(stderr, _("%s: select() failed: %s\n"),
progname, strerror(errno));
goto error;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr,
_("%s: could not receive data from WAL stream: %s"),
progname, PQerrorMessage(conn));
goto error;
}
continue;
}
/* End of copy stream */
if (r == -1)
break;
/* Failure while reading the copy stream */
if (r == -2)
{
fprintf(stderr, _("%s: could not read COPY data: %s"),
progname, PQerrorMessage(conn));
goto error;
}
/* Check the message type. */
if (copybuf[0] == 'k')
{
int pos;
bool replyRequested;
XLogRecPtr walEnd;
/*
* Parse the keepalive message, enclosed in the CopyData message.
* We just check if the server requested a reply, and ignore the
* rest.
*/
pos = 1; /* skip msgtype 'k' */
walEnd = fe_recvint64(&copybuf[pos]);
output_written_lsn = Max(walEnd, output_written_lsn);
pos += 8; /* read walEnd */
pos += 8; /* skip sendTime */
if (r < pos + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error;
}
replyRequested = copybuf[pos];
/* If the server requested an immediate reply, send one. */
if (replyRequested)
{
/* fsync data, so we send a recent flush pointer */
if (!OutputFsync(now))
goto error;
now = feGetCurrentTimestamp();
if (!sendFeedback(conn, now, true, false))
goto error;
last_status = now;
}
continue;
}
else if (copybuf[0] != 'w')
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
goto error;
}
/*
* Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
* of the header is ignored.
*/
hdr_len = 1; /* msgtype 'w' */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
if (r < hdr_len + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error;
}
/* Extract WAL location for this block */
{
XLogRecPtr temp = fe_recvint64(&copybuf[1]);
output_written_lsn = Max(temp, output_written_lsn);
}
/* redirect output to stdout */
if (outfd == -1 && strcmp(outfile, "-") == 0)
{
outfd = fileno(stdout);
}
/* got SIGHUP, close output file */
if (outfd != -1 && output_reopen)
{
now = feGetCurrentTimestamp();
if (!OutputFsync(now))
goto error;
close(outfd);
outfd = -1;
output_reopen = false;
}
if (outfd == -1)
{
outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
S_IRUSR | S_IWUSR);
if (outfd == -1)
{
fprintf(stderr,
_("%s: could not open log file \"%s\": %s\n"),
progname, outfile, strerror(errno));
goto error;
}
}
bytes_left = r - hdr_len;
bytes_written = 0;
/* signal that a fsync is needed */
output_unsynced = true;
while (bytes_left)
{
int ret;
ret = write(outfd,
copybuf + hdr_len + bytes_written,
bytes_left);
if (ret < 0)
{
fprintf(stderr,
_("%s: could not write %u bytes to log file \"%s\": %s\n"),
progname, bytes_left, outfile,
strerror(errno));
goto error;
}
/* Write was successful, advance our position */
bytes_written += ret;
bytes_left -= ret;
}
if (write(outfd, "\n", 1) != 1)
{
fprintf(stderr,
_("%s: could not write %u bytes to log file \"%s\": %s\n"),
progname, 1, outfile,
strerror(errno));
goto error;
}
}
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
PQclear(res);
if (copybuf != NULL)
PQfreemem(copybuf);
if (outfd != -1 && strcmp(outfile, "-") != 0)
{
int64 t = feGetCurrentTimestamp();
/* no need to jump to error on failure here, we're finishing anyway */
OutputFsync(t);
if (close(outfd) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, outfile, strerror(errno));
}
outfd = -1;
error:
destroyPQExpBuffer(query);
PQfinish(conn);
conn = NULL;
}
/*
* Unfortunately we can't do sensible signal handling on windows...
*/
#ifndef WIN32
/*
* When sigint is called, just tell the system to exit at the next possible
* moment.
*/
static void
sigint_handler(int signum)
{
time_to_abort = true;
}
/*
* Trigger the output file to be reopened.
*/
static void
sighup_handler(int signum)
{
output_reopen = true;
}
#endif
int
main(int argc, char **argv)
{
PGresult *res;
static struct option long_options[] = {
/* general options */
{"file", required_argument, NULL, 'f'},
{"no-loop", no_argument, NULL, 'n'},
{"verbose", no_argument, NULL, 'v'},
{"version", no_argument, NULL, 'V'},
{"help", no_argument, NULL, '?'},
/* connnection options */
{"dbname", required_argument, NULL, 'd'},
{"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
/* replication options */
{"option", required_argument, NULL, 'o'},
{"plugin", required_argument, NULL, 'P'},
{"status-interval", required_argument, NULL, 's'},
{"fsync-interval", required_argument, NULL, 'F'},
{"slot", required_argument, NULL, 'S'},
{"startpos", required_argument, NULL, 'I'},
/* action */
{"create", no_argument, NULL, 1},
{"start", no_argument, NULL, 2},
{"drop", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
};
int c;
int option_index;
uint32 hi,
lo;
progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
if (argc > 1)
{
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
{
usage();
exit(0);
}
else if (strcmp(argv[1], "-V") == 0 ||
strcmp(argv[1], "--version") == 0)
{
puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
exit(0);
}
}
while ((c = getopt_long(argc, argv, "f:F:nvd:h:o:p:U:wWP:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
{
/* general options */
case 'f':
outfile = pg_strdup(optarg);
break;
case 'n':
noloop = 1;
break;
case 'v':
verbose++;
break;
/* connnection options */
case 'd':
dbname = pg_strdup(optarg);
break;
case 'h':
dbhost = pg_strdup(optarg);
break;
case 'p':
if (atoi(optarg) <= 0)
{
fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
progname, optarg);
exit(1);
}
dbport = pg_strdup(optarg);
break;
case 'U':
dbuser = pg_strdup(optarg);
break;
case 'w':
dbgetpassword = -1;
break;
case 'W':
dbgetpassword = 1;
break;
/* replication options */
case 'o':
{
char *data = pg_strdup(optarg);
char *val = strchr(data, '=');
if (val != NULL)
{
/* remove =; separate data from val */
*val = '\0';
val++;
}
noptions += 1;
options = pg_realloc(options, sizeof(char*) * noptions * 2);
options[(noptions - 1) * 2] = data;
options[(noptions - 1) * 2 + 1] = val;
}
break;
case 'P':
plugin = pg_strdup(optarg);
break;
case 's':
standby_message_timeout = atoi(optarg) * 1000;
if (standby_message_timeout < 0)
{
fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
progname, optarg);
exit(1);
}
break;
case 'F':
fsync_interval = atoi(optarg) * 1000;
if (fsync_interval < 0)
{
fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
progname, optarg);
exit(1);
}
break;
case 'S':
replication_slot = pg_strdup(optarg);
break;
case 'I':
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse start position \"%s\"\n"),
progname, optarg);
exit(1);
}
startpos = ((uint64) hi) << 32 | lo;
break;
/* action */
case 1:
do_create_slot = true;
break;
case 2:
do_start_slot = true;
break;
case 3:
do_drop_slot = true;
break;
default:
/*
* getopt_long already emitted a complaint
*/
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
}
/*
* Any non-option arguments?
*/
if (optind < argc)
{
fprintf(stderr,
_("%s: too many command-line arguments (first is \"%s\")\n"),
progname, argv[optind]);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
/*
* Required arguments
*/
if (replication_slot == NULL)
{
fprintf(stderr, _("%s: no slot specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (do_start_slot && outfile == NULL)
{
fprintf(stderr, _("%s: no target file specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (!do_drop_slot && dbname == NULL)
{
fprintf(stderr, _("%s: no database specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (!do_drop_slot && !do_create_slot && !do_start_slot)
{
fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (do_drop_slot && (do_create_slot || do_start_slot))
{
fprintf(stderr, _("%s: --stop cannot be combined with --init or --start\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (startpos && (do_create_slot || do_drop_slot))
{
fprintf(stderr, _("%s: --startpos cannot be combined with --init or --stop\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
#endif
/*
* don't really need this but it actually helps to get more precise error
* messages about authentication, required GUCs and such without starting
* to loop around connection attempts lateron.
*/
{
conn = GetConnection();
if (!conn)
/* Error message already written in GetConnection() */
exit(1);
/*
* Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
* position.
*/
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1 || PQnfields(res) < 4)
{
fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 4);
disconnect_and_exit(1);
}
PQclear(res);
}
/*
* stop a replication slot
*/
if (do_drop_slot)
{
char query[256];
if (verbose)
fprintf(stderr,
_("%s: freeing replication slot \"%s\"\n"),
progname, replication_slot);
snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
replication_slot);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, query, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 0 || PQnfields(res) != 0)
{
fprintf(stderr,
_("%s: could not stop logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 0, 0);
disconnect_and_exit(1);
}
PQclear(res);
disconnect_and_exit(0);
}
/*
* init a replication slot
*/
if (do_create_slot)
{
char query[256];
if (verbose)
fprintf(stderr,
_("%s: initializing replication slot \"%s\"\n"),
progname, replication_slot);
snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
replication_slot, plugin);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, query, PQerrorMessage(conn));
disconnect_and_exit(1);
}
if (PQntuples(res) != 1 || PQnfields(res) != 4)
{
fprintf(stderr,
_("%s: could not init logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 4);
disconnect_and_exit(1);
}
if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 1));
disconnect_and_exit(1);
}
startpos = ((uint64) hi) << 32 | lo;
replication_slot = strdup(PQgetvalue(res, 0, 0));
PQclear(res);
}
if (!do_start_slot)
disconnect_and_exit(0);
while (true)
{
StreamLog();
if (time_to_abort)
{
/*
* We've been Ctrl-C'ed. That's not an error, so exit without an
* errorcode.
*/
disconnect_and_exit(0);
}
else if (noloop)
{
fprintf(stderr, _("%s: disconnected.\n"), progname);
exit(1);
}
else
{
fprintf(stderr,
/* translator: check source for value for %d */
_("%s: disconnected. Waiting %d seconds to try again.\n"),
progname, RECONNECT_SLEEP_TIME);
pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
}
}
}

View File

@ -11,22 +11,19 @@
* src/bin/pg_basebackup/receivelog.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
/* for ntohl/htonl */
#include <netinet/in.h>
#include <arpa/inet.h>
/* local includes */
#include "receivelog.h"
#include "streamutil.h"
#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "receivelog.h"
#include "streamutil.h"
/* fd and filename for currently open WAL file */
static int walfile = -1;
@ -194,63 +191,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
}
/*
* Local version of GetCurrentTimestamp(), since we are not linked with
* backend code. The protocol always uses integer timestamps, regardless of
* server setting.
*/
static int64
localGetCurrentTimestamp(void)
{
int64 result;
struct timeval tp;
gettimeofday(&tp, NULL);
result = (int64) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
result = (result * USECS_PER_SEC) + tp.tv_usec;
return result;
}
/*
* Local version of TimestampDifference(), since we are not linked with
* backend code.
*/
static void
localTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs)
{
int64 diff = stop_time - start_time;
if (diff <= 0)
{
*secs = 0;
*microsecs = 0;
}
else
{
*secs = (long) (diff / USECS_PER_SEC);
*microsecs = (int) (diff % USECS_PER_SEC);
}
}
/*
* Local version of TimestampDifferenceExceeds(), since we are not
* linked with backend code.
*/
static bool
localTimestampDifferenceExceeds(int64 start_time,
int64 stop_time,
int msec)
{
int64 diff = stop_time - start_time;
return (diff >= msec * INT64CONST(1000));
}
/*
* Check if a timeline history file exists.
*/
@ -370,47 +310,6 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
return true;
}
/*
* Converts an int64 to network byte order.
*/
static void
sendint64(int64 i, char *buf)
{
uint32 n32;
/* High order half first, since we're doing MSB-first */
n32 = (uint32) (i >> 32);
n32 = htonl(n32);
memcpy(&buf[0], &n32, 4);
/* Now the low order half */
n32 = (uint32) i;
n32 = htonl(n32);
memcpy(&buf[4], &n32, 4);
}
/*
* Converts an int64 from network byte order to native format.
*/
static int64
recvint64(char *buf)
{
int64 result;
uint32 h32;
uint32 l32;
memcpy(&h32, buf, 4);
memcpy(&l32, buf + 4, 4);
h32 = ntohl(h32);
l32 = ntohl(l32);
result = h32;
result <<= 32;
result |= l32;
return result;
}
/*
* Send a Standby Status Update message to server.
*/
@ -422,16 +321,16 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
replybuf[len] = 'r';
len += 1;
sendint64(blockpos, &replybuf[len]); /* write */
fe_sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
if (reportFlushPosition)
sendint64(lastFlushPosition, &replybuf[len]); /* flush */
fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
else
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
len += 8;
sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
sendint64(now, &replybuf[len]); /* sendTime */
fe_sendint64(now, &replybuf[len]); /* sendTime */
len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
@ -864,9 +763,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Potentially send a status message to the master
*/
now = localGetCurrentTimestamp();
now = feGetCurrentTimestamp();
if (still_sending && standby_message_timeout > 0 &&
localTimestampDifferenceExceeds(last_status, now,
feTimestampDifferenceExceeds(last_status, now,
standby_message_timeout))
{
/* Time to send feedback! */
@ -895,10 +794,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int usecs;
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
localTimestampDifference(now,
targettime,
&secs,
&usecs);
feTimestampDifference(now,
targettime,
&secs,
&usecs);
if (secs <= 0)
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
else
@ -1002,7 +901,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* If the server requested an immediate reply, send one. */
if (replyRequested && still_sending)
{
now = localGetCurrentTimestamp();
now = feGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
@ -1032,7 +931,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
progname, r);
goto error;
}
blockpos = recvint64(&copybuf[1]);
blockpos = fe_recvint64(&copybuf[1]);
/* Extract WAL location for this block */
xlogoff = blockpos % XLOG_SEG_SIZE;

View File

@ -1,3 +1,5 @@
#include "libpq-fe.h"
#include "access/xlogdefs.h"
/*

View File

@ -12,10 +12,23 @@
*/
#include "postgres_fe.h"
#include "streamutil.h"
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
/* for ntohl/htonl */
#include <netinet/in.h>
#include <arpa/inet.h>
/* local includes */
#include "receivelog.h"
#include "streamutil.h"
#include "common/fe_memutils.h"
#include "datatype/timestamp.h"
const char *progname;
char *connection_string = NULL;
@ -23,6 +36,7 @@ char *dbhost = NULL;
char *dbuser = NULL;
char *dbport = NULL;
char *replication_slot = NULL;
char *dbname = NULL;
int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
static char *dbpassword = NULL;
PGconn *conn = NULL;
@ -87,10 +101,10 @@ GetConnection(void)
}
keywords[i] = "dbname";
values[i] = "replication";
values[i] = dbname == NULL ? "replication" : dbname;
i++;
keywords[i] = "replication";
values[i] = "true";
values[i] = dbname == NULL ? "true" : "database";
i++;
keywords[i] = "fallback_application_name";
values[i] = progname;
@ -212,3 +226,102 @@ GetConnection(void)
return tmpconn;
}
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
* backend code. The protocol always uses integer timestamps, regardless of
* server setting.
*/
int64
feGetCurrentTimestamp(void)
{
int64 result;
struct timeval tp;
gettimeofday(&tp, NULL);
result = (int64) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
result = (result * USECS_PER_SEC) + tp.tv_usec;
return result;
}
/*
* Frontend version of TimestampDifference(), since we are not linked with
* backend code.
*/
void
feTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs)
{
int64 diff = stop_time - start_time;
if (diff <= 0)
{
*secs = 0;
*microsecs = 0;
}
else
{
*secs = (long) (diff / USECS_PER_SEC);
*microsecs = (int) (diff % USECS_PER_SEC);
}
}
/*
* Frontend version of TimestampDifferenceExceeds(), since we are not
* linked with backend code.
*/
bool
feTimestampDifferenceExceeds(int64 start_time,
int64 stop_time,
int msec)
{
int64 diff = stop_time - start_time;
return (diff >= msec * INT64CONST(1000));
}
/*
* Converts an int64 to network byte order.
*/
void
fe_sendint64(int64 i, char *buf)
{
uint32 n32;
/* High order half first, since we're doing MSB-first */
n32 = (uint32) (i >> 32);
n32 = htonl(n32);
memcpy(&buf[0], &n32, 4);
/* Now the low order half */
n32 = (uint32) i;
n32 = htonl(n32);
memcpy(&buf[4], &n32, 4);
}
/*
* Converts an int64 from network byte order to native format.
*/
int64
fe_recvint64(char *buf)
{
int64 result;
uint32 h32;
uint32 l32;
memcpy(&h32, buf, 4);
memcpy(&l32, buf + 4, 4);
h32 = ntohl(h32);
l32 = ntohl(l32);
result = h32;
result <<= 32;
result |= l32;
return result;
}

View File

@ -5,6 +5,7 @@ extern char *connection_string;
extern char *dbhost;
extern char *dbuser;
extern char *dbport;
extern char *dbname;
extern int dbgetpassword;
extern char *replication_slot;
@ -12,3 +13,12 @@ extern char *replication_slot;
extern PGconn *conn;
extern PGconn *GetConnection(void);
extern int64 feGetCurrentTimestamp(void);
extern void feTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs);
extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time,
int msec);
extern void fe_sendint64(int64 i, char *buf);
extern int64 fe_recvint64(char *buf);