mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-18 18:44:06 +08:00
Add pg_recvlogical —-endpos=LSN
Allow pg_recvlogical to specify an ending LSN, complementing the existing -—startpos=LSN option. Craig Ringer, reviewed by Euler Taveira and Naoki Okano
This commit is contained in:
parent
698127a4a9
commit
7c030783a5
@ -38,6 +38,14 @@ PostgreSQL documentation
|
||||
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
|
||||
replication (see <xref linkend="logicaldecoding">).
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<command>pg_recvlogical</> has no equivalent to the logical decoding
|
||||
SQL interface's peek and get modes. It sends replay confirmations for
|
||||
data lazily as it receives it and on clean exit. To examine pending data on
|
||||
a slot without consuming it, use
|
||||
<link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
|
||||
</para>
|
||||
</refsect1>
|
||||
|
||||
<refsect1>
|
||||
@ -154,6 +162,32 @@ PostgreSQL documentation
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><option>-E <replaceable>lsn</replaceable></option></term>
|
||||
<term><option>--endpos=<replaceable>lsn</replaceable></option></term>
|
||||
<listitem>
|
||||
<para>
|
||||
In <option>--start</option> mode, automatically stop replication
|
||||
and exit with normal exit status 0 when receiving reaches the
|
||||
specified LSN. If specified when not in <option>--start</option>
|
||||
mode, an error is raised.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
If there's a record with LSN exactly equal to <replaceable>lsn</>,
|
||||
the record will be output.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The <option>--endpos</option> option is not aware of transaction
|
||||
boundaries and may truncate output partway through a transaction.
|
||||
Any partially output transaction will not be consumed and will be
|
||||
replayed again when the slot is next read from. Individual messages
|
||||
are never truncated.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><option>--if-not-exists</option></term>
|
||||
<listitem>
|
||||
|
@ -40,6 +40,7 @@ static int noloop = 0;
|
||||
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
|
||||
static int fsync_interval = 10 * 1000; /* 10 sec = default */
|
||||
static XLogRecPtr startpos = InvalidXLogRecPtr;
|
||||
static XLogRecPtr endpos = InvalidXLogRecPtr;
|
||||
static bool do_create_slot = false;
|
||||
static bool slot_exists_ok = false;
|
||||
static bool do_start_slot = false;
|
||||
@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
|
||||
static void usage(void);
|
||||
static void StreamLogicalLog(void);
|
||||
static void disconnect_and_exit(int code);
|
||||
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
|
||||
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
|
||||
bool keepalive, XLogRecPtr lsn);
|
||||
|
||||
static void
|
||||
usage(void)
|
||||
@ -81,6 +85,7 @@ usage(void)
|
||||
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
|
||||
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
|
||||
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
|
||||
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
|
||||
printf(_(" -n, --no-loop do not loop on connection lost\n"));
|
||||
printf(_(" -o, --option=NAME[=VALUE]\n"
|
||||
" pass option NAME with optional value VALUE to the\n"
|
||||
@ -281,6 +286,7 @@ StreamLogicalLog(void)
|
||||
int bytes_written;
|
||||
int64 now;
|
||||
int hdr_len;
|
||||
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
|
||||
|
||||
if (copybuf != NULL)
|
||||
{
|
||||
@ -454,6 +460,7 @@ StreamLogicalLog(void)
|
||||
int pos;
|
||||
bool replyRequested;
|
||||
XLogRecPtr walEnd;
|
||||
bool endposReached = false;
|
||||
|
||||
/*
|
||||
* Parse the keepalive message, enclosed in the CopyData message.
|
||||
@ -476,18 +483,32 @@ StreamLogicalLog(void)
|
||||
}
|
||||
replyRequested = copybuf[pos];
|
||||
|
||||
/* If the server requested an immediate reply, send one. */
|
||||
if (replyRequested)
|
||||
if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
|
||||
{
|
||||
/* fsync data, so we send a recent flush pointer */
|
||||
if (!OutputFsync(now))
|
||||
goto error;
|
||||
/*
|
||||
* If there's nothing to read on the socket until a keepalive
|
||||
* we know that the server has nothing to send us; and if
|
||||
* walEnd has passed endpos, we know nothing else can have
|
||||
* committed before endpos. So we can bail out now.
|
||||
*/
|
||||
endposReached = true;
|
||||
}
|
||||
|
||||
now = feGetCurrentTimestamp();
|
||||
if (!sendFeedback(conn, now, true, false))
|
||||
/* Send a reply, if necessary */
|
||||
if (replyRequested || endposReached)
|
||||
{
|
||||
if (!flushAndSendFeedback(conn, &now))
|
||||
goto error;
|
||||
last_status = now;
|
||||
}
|
||||
|
||||
if (endposReached)
|
||||
{
|
||||
prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
|
||||
time_to_abort = true;
|
||||
break;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
else if (copybuf[0] != 'w')
|
||||
@ -497,7 +518,6 @@ StreamLogicalLog(void)
|
||||
goto error;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Read the header of the XLogData message, enclosed in the CopyData
|
||||
* message. We only need the WAL location field (dataStart), the rest
|
||||
@ -515,12 +535,23 @@ StreamLogicalLog(void)
|
||||
}
|
||||
|
||||
/* Extract WAL location for this block */
|
||||
{
|
||||
XLogRecPtr temp = fe_recvint64(©buf[1]);
|
||||
cur_record_lsn = fe_recvint64(©buf[1]);
|
||||
|
||||
output_written_lsn = Max(temp, output_written_lsn);
|
||||
if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
|
||||
{
|
||||
/*
|
||||
* We've read past our endpoint, so prepare to go away being
|
||||
* cautious about what happens to our output data.
|
||||
*/
|
||||
if (!flushAndSendFeedback(conn, &now))
|
||||
goto error;
|
||||
prepareToTerminate(conn, endpos, false, cur_record_lsn);
|
||||
time_to_abort = true;
|
||||
break;
|
||||
}
|
||||
|
||||
output_written_lsn = Max(cur_record_lsn, output_written_lsn);
|
||||
|
||||
bytes_left = r - hdr_len;
|
||||
bytes_written = 0;
|
||||
|
||||
@ -557,10 +588,29 @@ StreamLogicalLog(void)
|
||||
strerror(errno));
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
|
||||
{
|
||||
/* endpos was exactly the record we just processed, we're done */
|
||||
if (!flushAndSendFeedback(conn, &now))
|
||||
goto error;
|
||||
prepareToTerminate(conn, endpos, false, cur_record_lsn);
|
||||
time_to_abort = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
if (PQresultStatus(res) == PGRES_COPY_OUT)
|
||||
{
|
||||
/*
|
||||
* We're doing a client-initiated clean exit and have sent CopyDone to
|
||||
* the server. We've already sent replay confirmation and fsync'd so
|
||||
* we can just clean up the connection now.
|
||||
*/
|
||||
goto error;
|
||||
}
|
||||
else if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: unexpected termination of replication stream: %s"),
|
||||
@ -638,6 +688,7 @@ main(int argc, char **argv)
|
||||
{"password", no_argument, NULL, 'W'},
|
||||
/* replication options */
|
||||
{"startpos", required_argument, NULL, 'I'},
|
||||
{"endpos", required_argument, NULL, 'E'},
|
||||
{"option", required_argument, NULL, 'o'},
|
||||
{"plugin", required_argument, NULL, 'P'},
|
||||
{"status-interval", required_argument, NULL, 's'},
|
||||
@ -673,7 +724,7 @@ main(int argc, char **argv)
|
||||
}
|
||||
}
|
||||
|
||||
while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
|
||||
while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
|
||||
long_options, &option_index)) != -1)
|
||||
{
|
||||
switch (c)
|
||||
@ -733,6 +784,16 @@ main(int argc, char **argv)
|
||||
}
|
||||
startpos = ((uint64) hi) << 32 | lo;
|
||||
break;
|
||||
case 'E':
|
||||
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: could not parse end position \"%s\"\n"),
|
||||
progname, optarg);
|
||||
exit(1);
|
||||
}
|
||||
endpos = ((uint64) hi) << 32 | lo;
|
||||
break;
|
||||
case 'o':
|
||||
{
|
||||
char *data = pg_strdup(optarg);
|
||||
@ -857,6 +918,16 @@ main(int argc, char **argv)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (endpos != InvalidXLogRecPtr && !do_start_slot)
|
||||
{
|
||||
fprintf(stderr,
|
||||
_("%s: --endpos may only be specified with --start\n"),
|
||||
progname);
|
||||
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
||||
progname);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
#ifndef WIN32
|
||||
pqsignal(SIGINT, sigint_handler);
|
||||
pqsignal(SIGHUP, sighup_handler);
|
||||
@ -923,8 +994,8 @@ main(int argc, char **argv)
|
||||
if (time_to_abort)
|
||||
{
|
||||
/*
|
||||
* We've been Ctrl-C'ed. That's not an error, so exit without an
|
||||
* errorcode.
|
||||
* We've been Ctrl-C'ed or reached an exit limit condition. That's
|
||||
* not an error, so exit without an errorcode.
|
||||
*/
|
||||
disconnect_and_exit(0);
|
||||
}
|
||||
@ -943,3 +1014,47 @@ main(int argc, char **argv)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Fsync our output data, and send a feedback message to the server. Returns
|
||||
* true if successful, false otherwise.
|
||||
*
|
||||
* If successful, *now is updated to the current timestamp just before sending
|
||||
* feedback.
|
||||
*/
|
||||
static bool
|
||||
flushAndSendFeedback(PGconn *conn, TimestampTz *now)
|
||||
{
|
||||
/* flush data to disk, so that we send a recent flush pointer */
|
||||
if (!OutputFsync(*now))
|
||||
return false;
|
||||
*now = feGetCurrentTimestamp();
|
||||
if (!sendFeedback(conn, *now, true, false))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to inform the server about of upcoming demise, but don't wait around or
|
||||
* retry on failure.
|
||||
*/
|
||||
static void
|
||||
prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
|
||||
{
|
||||
(void) PQputCopyEnd(conn, NULL);
|
||||
(void) PQflush(conn);
|
||||
|
||||
if (verbose)
|
||||
{
|
||||
if (keepalive)
|
||||
fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
|
||||
progname,
|
||||
(uint32) (endpos >> 32), (uint32) endpos);
|
||||
else
|
||||
fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
|
||||
progname, (uint32) (endpos >> 32), (uint32) (endpos),
|
||||
(uint32) (lsn >> 32), (uint32) lsn);
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user