mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-18 18:44:06 +08:00
Add pg_recvlogical —-endpos=LSN
Allow pg_recvlogical to specify an ending LSN, complementing the existing -—startpos=LSN option. Craig Ringer, reviewed by Euler Taveira and Naoki Okano
This commit is contained in:
parent
698127a4a9
commit
7c030783a5
@ -38,6 +38,14 @@ PostgreSQL documentation
|
|||||||
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
|
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
|
||||||
replication (see <xref linkend="logicaldecoding">).
|
replication (see <xref linkend="logicaldecoding">).
|
||||||
</para>
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
<command>pg_recvlogical</> has no equivalent to the logical decoding
|
||||||
|
SQL interface's peek and get modes. It sends replay confirmations for
|
||||||
|
data lazily as it receives it and on clean exit. To examine pending data on
|
||||||
|
a slot without consuming it, use
|
||||||
|
<link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
|
||||||
|
</para>
|
||||||
</refsect1>
|
</refsect1>
|
||||||
|
|
||||||
<refsect1>
|
<refsect1>
|
||||||
@ -154,6 +162,32 @@ PostgreSQL documentation
|
|||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry>
|
||||||
|
<term><option>-E <replaceable>lsn</replaceable></option></term>
|
||||||
|
<term><option>--endpos=<replaceable>lsn</replaceable></option></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
In <option>--start</option> mode, automatically stop replication
|
||||||
|
and exit with normal exit status 0 when receiving reaches the
|
||||||
|
specified LSN. If specified when not in <option>--start</option>
|
||||||
|
mode, an error is raised.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
If there's a record with LSN exactly equal to <replaceable>lsn</>,
|
||||||
|
the record will be output.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<para>
|
||||||
|
The <option>--endpos</option> option is not aware of transaction
|
||||||
|
boundaries and may truncate output partway through a transaction.
|
||||||
|
Any partially output transaction will not be consumed and will be
|
||||||
|
replayed again when the slot is next read from. Individual messages
|
||||||
|
are never truncated.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
<varlistentry>
|
<varlistentry>
|
||||||
<term><option>--if-not-exists</option></term>
|
<term><option>--if-not-exists</option></term>
|
||||||
<listitem>
|
<listitem>
|
||||||
|
@ -40,6 +40,7 @@ static int noloop = 0;
|
|||||||
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
|
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
|
||||||
static int fsync_interval = 10 * 1000; /* 10 sec = default */
|
static int fsync_interval = 10 * 1000; /* 10 sec = default */
|
||||||
static XLogRecPtr startpos = InvalidXLogRecPtr;
|
static XLogRecPtr startpos = InvalidXLogRecPtr;
|
||||||
|
static XLogRecPtr endpos = InvalidXLogRecPtr;
|
||||||
static bool do_create_slot = false;
|
static bool do_create_slot = false;
|
||||||
static bool slot_exists_ok = false;
|
static bool slot_exists_ok = false;
|
||||||
static bool do_start_slot = false;
|
static bool do_start_slot = false;
|
||||||
@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
|
|||||||
static void usage(void);
|
static void usage(void);
|
||||||
static void StreamLogicalLog(void);
|
static void StreamLogicalLog(void);
|
||||||
static void disconnect_and_exit(int code);
|
static void disconnect_and_exit(int code);
|
||||||
|
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
|
||||||
|
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
|
||||||
|
bool keepalive, XLogRecPtr lsn);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
usage(void)
|
usage(void)
|
||||||
@ -81,6 +85,7 @@ usage(void)
|
|||||||
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
|
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
|
||||||
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
|
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
|
||||||
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
|
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
|
||||||
|
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
|
||||||
printf(_(" -n, --no-loop do not loop on connection lost\n"));
|
printf(_(" -n, --no-loop do not loop on connection lost\n"));
|
||||||
printf(_(" -o, --option=NAME[=VALUE]\n"
|
printf(_(" -o, --option=NAME[=VALUE]\n"
|
||||||
" pass option NAME with optional value VALUE to the\n"
|
" pass option NAME with optional value VALUE to the\n"
|
||||||
@ -281,6 +286,7 @@ StreamLogicalLog(void)
|
|||||||
int bytes_written;
|
int bytes_written;
|
||||||
int64 now;
|
int64 now;
|
||||||
int hdr_len;
|
int hdr_len;
|
||||||
|
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
|
||||||
|
|
||||||
if (copybuf != NULL)
|
if (copybuf != NULL)
|
||||||
{
|
{
|
||||||
@ -454,6 +460,7 @@ StreamLogicalLog(void)
|
|||||||
int pos;
|
int pos;
|
||||||
bool replyRequested;
|
bool replyRequested;
|
||||||
XLogRecPtr walEnd;
|
XLogRecPtr walEnd;
|
||||||
|
bool endposReached = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Parse the keepalive message, enclosed in the CopyData message.
|
* Parse the keepalive message, enclosed in the CopyData message.
|
||||||
@ -476,18 +483,32 @@ StreamLogicalLog(void)
|
|||||||
}
|
}
|
||||||
replyRequested = copybuf[pos];
|
replyRequested = copybuf[pos];
|
||||||
|
|
||||||
/* If the server requested an immediate reply, send one. */
|
if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
|
||||||
if (replyRequested)
|
|
||||||
{
|
{
|
||||||
/* fsync data, so we send a recent flush pointer */
|
/*
|
||||||
if (!OutputFsync(now))
|
* If there's nothing to read on the socket until a keepalive
|
||||||
goto error;
|
* we know that the server has nothing to send us; and if
|
||||||
|
* walEnd has passed endpos, we know nothing else can have
|
||||||
|
* committed before endpos. So we can bail out now.
|
||||||
|
*/
|
||||||
|
endposReached = true;
|
||||||
|
}
|
||||||
|
|
||||||
now = feGetCurrentTimestamp();
|
/* Send a reply, if necessary */
|
||||||
if (!sendFeedback(conn, now, true, false))
|
if (replyRequested || endposReached)
|
||||||
|
{
|
||||||
|
if (!flushAndSendFeedback(conn, &now))
|
||||||
goto error;
|
goto error;
|
||||||
last_status = now;
|
last_status = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (endposReached)
|
||||||
|
{
|
||||||
|
prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
|
||||||
|
time_to_abort = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else if (copybuf[0] != 'w')
|
else if (copybuf[0] != 'w')
|
||||||
@ -497,7 +518,6 @@ StreamLogicalLog(void)
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Read the header of the XLogData message, enclosed in the CopyData
|
* Read the header of the XLogData message, enclosed in the CopyData
|
||||||
* message. We only need the WAL location field (dataStart), the rest
|
* message. We only need the WAL location field (dataStart), the rest
|
||||||
@ -515,12 +535,23 @@ StreamLogicalLog(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Extract WAL location for this block */
|
/* Extract WAL location for this block */
|
||||||
{
|
cur_record_lsn = fe_recvint64(©buf[1]);
|
||||||
XLogRecPtr temp = fe_recvint64(©buf[1]);
|
|
||||||
|
|
||||||
output_written_lsn = Max(temp, output_written_lsn);
|
if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We've read past our endpoint, so prepare to go away being
|
||||||
|
* cautious about what happens to our output data.
|
||||||
|
*/
|
||||||
|
if (!flushAndSendFeedback(conn, &now))
|
||||||
|
goto error;
|
||||||
|
prepareToTerminate(conn, endpos, false, cur_record_lsn);
|
||||||
|
time_to_abort = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
output_written_lsn = Max(cur_record_lsn, output_written_lsn);
|
||||||
|
|
||||||
bytes_left = r - hdr_len;
|
bytes_left = r - hdr_len;
|
||||||
bytes_written = 0;
|
bytes_written = 0;
|
||||||
|
|
||||||
@ -557,10 +588,29 @@ StreamLogicalLog(void)
|
|||||||
strerror(errno));
|
strerror(errno));
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
|
||||||
|
{
|
||||||
|
/* endpos was exactly the record we just processed, we're done */
|
||||||
|
if (!flushAndSendFeedback(conn, &now))
|
||||||
|
goto error;
|
||||||
|
prepareToTerminate(conn, endpos, false, cur_record_lsn);
|
||||||
|
time_to_abort = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res = PQgetResult(conn);
|
res = PQgetResult(conn);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) == PGRES_COPY_OUT)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We're doing a client-initiated clean exit and have sent CopyDone to
|
||||||
|
* the server. We've already sent replay confirmation and fsync'd so
|
||||||
|
* we can just clean up the connection now.
|
||||||
|
*/
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
fprintf(stderr,
|
fprintf(stderr,
|
||||||
_("%s: unexpected termination of replication stream: %s"),
|
_("%s: unexpected termination of replication stream: %s"),
|
||||||
@ -638,6 +688,7 @@ main(int argc, char **argv)
|
|||||||
{"password", no_argument, NULL, 'W'},
|
{"password", no_argument, NULL, 'W'},
|
||||||
/* replication options */
|
/* replication options */
|
||||||
{"startpos", required_argument, NULL, 'I'},
|
{"startpos", required_argument, NULL, 'I'},
|
||||||
|
{"endpos", required_argument, NULL, 'E'},
|
||||||
{"option", required_argument, NULL, 'o'},
|
{"option", required_argument, NULL, 'o'},
|
||||||
{"plugin", required_argument, NULL, 'P'},
|
{"plugin", required_argument, NULL, 'P'},
|
||||||
{"status-interval", required_argument, NULL, 's'},
|
{"status-interval", required_argument, NULL, 's'},
|
||||||
@ -673,7 +724,7 @@ main(int argc, char **argv)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
|
while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
|
||||||
long_options, &option_index)) != -1)
|
long_options, &option_index)) != -1)
|
||||||
{
|
{
|
||||||
switch (c)
|
switch (c)
|
||||||
@ -733,6 +784,16 @@ main(int argc, char **argv)
|
|||||||
}
|
}
|
||||||
startpos = ((uint64) hi) << 32 | lo;
|
startpos = ((uint64) hi) << 32 | lo;
|
||||||
break;
|
break;
|
||||||
|
case 'E':
|
||||||
|
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
|
||||||
|
{
|
||||||
|
fprintf(stderr,
|
||||||
|
_("%s: could not parse end position \"%s\"\n"),
|
||||||
|
progname, optarg);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
endpos = ((uint64) hi) << 32 | lo;
|
||||||
|
break;
|
||||||
case 'o':
|
case 'o':
|
||||||
{
|
{
|
||||||
char *data = pg_strdup(optarg);
|
char *data = pg_strdup(optarg);
|
||||||
@ -857,6 +918,16 @@ main(int argc, char **argv)
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (endpos != InvalidXLogRecPtr && !do_start_slot)
|
||||||
|
{
|
||||||
|
fprintf(stderr,
|
||||||
|
_("%s: --endpos may only be specified with --start\n"),
|
||||||
|
progname);
|
||||||
|
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
||||||
|
progname);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
#ifndef WIN32
|
#ifndef WIN32
|
||||||
pqsignal(SIGINT, sigint_handler);
|
pqsignal(SIGINT, sigint_handler);
|
||||||
pqsignal(SIGHUP, sighup_handler);
|
pqsignal(SIGHUP, sighup_handler);
|
||||||
@ -923,8 +994,8 @@ main(int argc, char **argv)
|
|||||||
if (time_to_abort)
|
if (time_to_abort)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We've been Ctrl-C'ed. That's not an error, so exit without an
|
* We've been Ctrl-C'ed or reached an exit limit condition. That's
|
||||||
* errorcode.
|
* not an error, so exit without an errorcode.
|
||||||
*/
|
*/
|
||||||
disconnect_and_exit(0);
|
disconnect_and_exit(0);
|
||||||
}
|
}
|
||||||
@ -943,3 +1014,47 @@ main(int argc, char **argv)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fsync our output data, and send a feedback message to the server. Returns
|
||||||
|
* true if successful, false otherwise.
|
||||||
|
*
|
||||||
|
* If successful, *now is updated to the current timestamp just before sending
|
||||||
|
* feedback.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
flushAndSendFeedback(PGconn *conn, TimestampTz *now)
|
||||||
|
{
|
||||||
|
/* flush data to disk, so that we send a recent flush pointer */
|
||||||
|
if (!OutputFsync(*now))
|
||||||
|
return false;
|
||||||
|
*now = feGetCurrentTimestamp();
|
||||||
|
if (!sendFeedback(conn, *now, true, false))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Try to inform the server about of upcoming demise, but don't wait around or
|
||||||
|
* retry on failure.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
|
||||||
|
{
|
||||||
|
(void) PQputCopyEnd(conn, NULL);
|
||||||
|
(void) PQflush(conn);
|
||||||
|
|
||||||
|
if (verbose)
|
||||||
|
{
|
||||||
|
if (keepalive)
|
||||||
|
fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
|
||||||
|
progname,
|
||||||
|
(uint32) (endpos >> 32), (uint32) endpos);
|
||||||
|
else
|
||||||
|
fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
|
||||||
|
progname, (uint32) (endpos >> 32), (uint32) (endpos),
|
||||||
|
(uint32) (lsn >> 32), (uint32) lsn);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user