Add pg_recvlogical —-endpos=LSN

Allow pg_recvlogical to specify an ending LSN, complementing
the existing -—startpos=LSN option.

Craig Ringer, reviewed by Euler Taveira and Naoki Okano
This commit is contained in:
Simon Riggs 2017-01-04 19:02:07 +00:00
parent 698127a4a9
commit 7c030783a5
2 changed files with 164 additions and 15 deletions

View File

@ -38,6 +38,14 @@ PostgreSQL documentation
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
replication (see <xref linkend="logicaldecoding">). replication (see <xref linkend="logicaldecoding">).
</para> </para>
<para>
<command>pg_recvlogical</> has no equivalent to the logical decoding
SQL interface's peek and get modes. It sends replay confirmations for
data lazily as it receives it and on clean exit. To examine pending data on
a slot without consuming it, use
<link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
</para>
</refsect1> </refsect1>
<refsect1> <refsect1>
@ -154,6 +162,32 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>-E <replaceable>lsn</replaceable></option></term>
<term><option>--endpos=<replaceable>lsn</replaceable></option></term>
<listitem>
<para>
In <option>--start</option> mode, automatically stop replication
and exit with normal exit status 0 when receiving reaches the
specified LSN. If specified when not in <option>--start</option>
mode, an error is raised.
</para>
<para>
If there's a record with LSN exactly equal to <replaceable>lsn</>,
the record will be output.
</para>
<para>
The <option>--endpos</option> option is not aware of transaction
boundaries and may truncate output partway through a transaction.
Any partially output transaction will not be consumed and will be
replayed again when the slot is next read from. Individual messages
are never truncated.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>--if-not-exists</option></term> <term><option>--if-not-exists</option></term>
<listitem> <listitem>

View File

@ -40,6 +40,7 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr; static XLogRecPtr startpos = InvalidXLogRecPtr;
static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false; static bool do_create_slot = false;
static bool slot_exists_ok = false; static bool slot_exists_ok = false;
static bool do_start_slot = false; static bool do_start_slot = false;
@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage(void); static void usage(void);
static void StreamLogicalLog(void); static void StreamLogicalLog(void);
static void disconnect_and_exit(int code); static void disconnect_and_exit(int code);
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
bool keepalive, XLogRecPtr lsn);
static void static void
usage(void) usage(void)
@ -81,6 +85,7 @@ usage(void)
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000)); " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n")); printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -o, --option=NAME[=VALUE]\n" printf(_(" -o, --option=NAME[=VALUE]\n"
" pass option NAME with optional value VALUE to the\n" " pass option NAME with optional value VALUE to the\n"
@ -281,6 +286,7 @@ StreamLogicalLog(void)
int bytes_written; int bytes_written;
int64 now; int64 now;
int hdr_len; int hdr_len;
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL) if (copybuf != NULL)
{ {
@ -454,6 +460,7 @@ StreamLogicalLog(void)
int pos; int pos;
bool replyRequested; bool replyRequested;
XLogRecPtr walEnd; XLogRecPtr walEnd;
bool endposReached = false;
/* /*
* Parse the keepalive message, enclosed in the CopyData message. * Parse the keepalive message, enclosed in the CopyData message.
@ -476,18 +483,32 @@ StreamLogicalLog(void)
} }
replyRequested = copybuf[pos]; replyRequested = copybuf[pos];
/* If the server requested an immediate reply, send one. */ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
if (replyRequested)
{ {
/* fsync data, so we send a recent flush pointer */ /*
if (!OutputFsync(now)) * If there's nothing to read on the socket until a keepalive
goto error; * we know that the server has nothing to send us; and if
* walEnd has passed endpos, we know nothing else can have
* committed before endpos. So we can bail out now.
*/
endposReached = true;
}
now = feGetCurrentTimestamp(); /* Send a reply, if necessary */
if (!sendFeedback(conn, now, true, false)) if (replyRequested || endposReached)
{
if (!flushAndSendFeedback(conn, &now))
goto error; goto error;
last_status = now; last_status = now;
} }
if (endposReached)
{
prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
time_to_abort = true;
break;
}
continue; continue;
} }
else if (copybuf[0] != 'w') else if (copybuf[0] != 'w')
@ -497,7 +518,6 @@ StreamLogicalLog(void)
goto error; goto error;
} }
/* /*
* Read the header of the XLogData message, enclosed in the CopyData * Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest * message. We only need the WAL location field (dataStart), the rest
@ -515,12 +535,23 @@ StreamLogicalLog(void)
} }
/* Extract WAL location for this block */ /* Extract WAL location for this block */
{ cur_record_lsn = fe_recvint64(&copybuf[1]);
XLogRecPtr temp = fe_recvint64(&copybuf[1]);
output_written_lsn = Max(temp, output_written_lsn); if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
{
/*
* We've read past our endpoint, so prepare to go away being
* cautious about what happens to our output data.
*/
if (!flushAndSendFeedback(conn, &now))
goto error;
prepareToTerminate(conn, endpos, false, cur_record_lsn);
time_to_abort = true;
break;
} }
output_written_lsn = Max(cur_record_lsn, output_written_lsn);
bytes_left = r - hdr_len; bytes_left = r - hdr_len;
bytes_written = 0; bytes_written = 0;
@ -557,10 +588,29 @@ StreamLogicalLog(void)
strerror(errno)); strerror(errno));
goto error; goto error;
} }
if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
{
/* endpos was exactly the record we just processed, we're done */
if (!flushAndSendFeedback(conn, &now))
goto error;
prepareToTerminate(conn, endpos, false, cur_record_lsn);
time_to_abort = true;
break;
}
} }
res = PQgetResult(conn); res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) == PGRES_COPY_OUT)
{
/*
* We're doing a client-initiated clean exit and have sent CopyDone to
* the server. We've already sent replay confirmation and fsync'd so
* we can just clean up the connection now.
*/
goto error;
}
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"), _("%s: unexpected termination of replication stream: %s"),
@ -638,6 +688,7 @@ main(int argc, char **argv)
{"password", no_argument, NULL, 'W'}, {"password", no_argument, NULL, 'W'},
/* replication options */ /* replication options */
{"startpos", required_argument, NULL, 'I'}, {"startpos", required_argument, NULL, 'I'},
{"endpos", required_argument, NULL, 'E'},
{"option", required_argument, NULL, 'o'}, {"option", required_argument, NULL, 'o'},
{"plugin", required_argument, NULL, 'P'}, {"plugin", required_argument, NULL, 'P'},
{"status-interval", required_argument, NULL, 's'}, {"status-interval", required_argument, NULL, 's'},
@ -673,7 +724,7 @@ main(int argc, char **argv)
} }
} }
while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:", while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
long_options, &option_index)) != -1) long_options, &option_index)) != -1)
{ {
switch (c) switch (c)
@ -733,6 +784,16 @@ main(int argc, char **argv)
} }
startpos = ((uint64) hi) << 32 | lo; startpos = ((uint64) hi) << 32 | lo;
break; break;
case 'E':
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse end position \"%s\"\n"),
progname, optarg);
exit(1);
}
endpos = ((uint64) hi) << 32 | lo;
break;
case 'o': case 'o':
{ {
char *data = pg_strdup(optarg); char *data = pg_strdup(optarg);
@ -857,6 +918,16 @@ main(int argc, char **argv)
exit(1); exit(1);
} }
if (endpos != InvalidXLogRecPtr && !do_start_slot)
{
fprintf(stderr,
_("%s: --endpos may only be specified with --start\n"),
progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
#ifndef WIN32 #ifndef WIN32
pqsignal(SIGINT, sigint_handler); pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler); pqsignal(SIGHUP, sighup_handler);
@ -923,8 +994,8 @@ main(int argc, char **argv)
if (time_to_abort) if (time_to_abort)
{ {
/* /*
* We've been Ctrl-C'ed. That's not an error, so exit without an * We've been Ctrl-C'ed or reached an exit limit condition. That's
* errorcode. * not an error, so exit without an errorcode.
*/ */
disconnect_and_exit(0); disconnect_and_exit(0);
} }
@ -943,3 +1014,47 @@ main(int argc, char **argv)
} }
} }
} }
/*
* Fsync our output data, and send a feedback message to the server. Returns
* true if successful, false otherwise.
*
* If successful, *now is updated to the current timestamp just before sending
* feedback.
*/
static bool
flushAndSendFeedback(PGconn *conn, TimestampTz *now)
{
/* flush data to disk, so that we send a recent flush pointer */
if (!OutputFsync(*now))
return false;
*now = feGetCurrentTimestamp();
if (!sendFeedback(conn, *now, true, false))
return false;
return true;
}
/*
* Try to inform the server about of upcoming demise, but don't wait around or
* retry on failure.
*/
static void
prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
{
(void) PQputCopyEnd(conn, NULL);
(void) PQflush(conn);
if (verbose)
{
if (keepalive)
fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
progname,
(uint32) (endpos >> 32), (uint32) endpos);
else
fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
progname, (uint32) (endpos >> 32), (uint32) (endpos),
(uint32) (lsn >> 32), (uint32) lsn);
}
}