Fix walsender failure at promotion.

If a standby server has a cascading standby server connected to it, it's
possible that WAL has already been sent up to the next WAL page boundary,
splitting a WAL record in the middle, when the first standby server is
promoted. Don't throw an assertion failure or error in walsender if that
happens.

Also, fix a variant of the same bug in pg_receivexlog: if it had already
received WAL on previous timeline up to a segment boundary, when the
upstream standby server is promoted so that the timeline switch record falls
on the previous segment, pg_receivexlog would miss the segment containing
the timeline switch. To fix that, have walsender send the position of the
timeline switch at end-of-streaming, in addition to the next timeline's ID.
It was previously assumed that the switch happened exactly where the
streaming stopped.

Note: this is an incompatible change in the streaming protocol. You might
get an error if you try to stream over timeline switches, if the client is
running 9.3beta1 and the server is more recent. It should be fine after a
reconnect, however.

Reported by Fujii Masao.
This commit is contained in:
Heikki Linnakangas 2013-05-08 20:10:17 +03:00
parent cb953d8b1b
commit 2ffa66f497
7 changed files with 142 additions and 43 deletions

View File

@ -1423,10 +1423,15 @@ The commands accepted in walsender mode are:
<para> <para>
After streaming all the WAL on a timeline that is not the latest one, After streaming all the WAL on a timeline that is not the latest one,
the server will end streaming by exiting the COPY mode. When the client the server will end streaming by exiting the COPY mode. When the client
acknowledges this by also exiting COPY mode, the server sends a acknowledges this by also exiting COPY mode, the server sends a result
single-row, single-column result set indicating the next timeline in set with one row and two columns, indicating the next timeline in this
this server's history. That is followed by a CommandComplete message, server's history. The first column is the next timeline's ID, and the
and the server is ready to accept a new command. second column is the XLOG position where the switch happened. Usually,
the switch position is the end of the WAL that was streamed, but there
are corner cases where the server can send some WAL from the old
timeline that it has not itself replayed before promoting. Finally, the
server sends CommandComplete message, and is ready to accept a new
command.
</para> </para>
<para> <para>

View File

@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
} }
else else
{ {
ptr = RecPtr; ptr = tliRecPtr;
tli = tliOfPointInHistory(tliRecPtr, expectedTLEs); tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
if (curFileTLI > 0 && tli < curFileTLI) if (curFileTLI > 0 && tli < curFileTLI)
@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
tli, curFileTLI); tli, curFileTLI);
} }
curFileTLI = tli; curFileTLI = tli;
RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo); RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
receivedUpto = 0;
} }
/* /*
* Move to XLOG_FROM_STREAM state in either case. We'll get * Move to XLOG_FROM_STREAM state in either case. We'll get

View File

@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
res = PQgetResult(streamConn); res = PQgetResult(streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) if (PQresultStatus(res) == PGRES_TUPLES_OK)
{ {
/* Read the next timeline's ID */ /*
if (PQnfields(res) != 1 || PQntuples(res) != 1) * Read the next timeline's ID. The server also sends the timeline's
* starting point, but it is ignored.
*/
if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR, ereport(ERROR,
(errmsg("unexpected result set after end-of-streaming"))); (errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0); *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);

View File

@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
walrcv->startTime = now; walrcv->startTime = now;
/* /*
* If this is the first startup of walreceiver, we initialize receivedUpto * If this is the first startup of walreceiver (on this timeline),
* and latestChunkStart to receiveStart. * initialize receivedUpto and latestChunkStart to the starting point.
*/ */
if (walrcv->receiveStart == 0) if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
{ {
walrcv->receivedUpto = recptr; walrcv->receivedUpto = recptr;
walrcv->receivedTLI = tli;
walrcv->latestChunkStart = recptr; walrcv->latestChunkStart = recptr;
} }
walrcv->receiveStart = recptr; walrcv->receiveStart = recptr;

View File

@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd)
*/ */
if (sendTimeLineIsHistoric) if (sendTimeLineIsHistoric)
{ {
char str[11]; char tli_str[11];
snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); char startpos_str[8+1+8+1];
pq_beginmessage(&buf, 'T'); /* RowDescription */ snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
pq_sendint(&buf, 1, 2); /* 1 field */ snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
pq_beginmessage(&buf, 'T'); /* RowDescription */
pq_sendint(&buf, 2, 2); /* 2 fields */
/* Field header */ /* Field header */
pq_sendstring(&buf, "next_tli"); pq_sendstring(&buf, "next_tli");
pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, 0, 2); /* attnum */
/* /*
* int8 may seem like a surprising data type for this, but in theory * int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned. * int4 would not be wide enough for this, as TimeLineID is unsigned.
@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd)
pq_sendint(&buf, -1, 2); pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2); pq_sendint(&buf, 0, 2);
pq_sendstring(&buf, "next_tli_startpos");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf); pq_endmessage(&buf);
/* Data row */ /* Data row */
pq_beginmessage(&buf, 'D'); pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 1, 2); /* number of columns */ pq_sendint(&buf, 2, 2); /* number of columns */
pq_sendint(&buf, strlen(str), 4); /* length */
pq_sendbytes(&buf, str, strlen(str)); pq_sendint(&buf, strlen(tli_str), 4); /* length */
pq_sendbytes(&buf, tli_str, strlen(tli_str));
pq_sendint(&buf, strlen(startpos_str), 4); /* length */
pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
pq_endmessage(&buf); pq_endmessage(&buf);
} }
@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup)
history = readTimeLineHistory(ThisTimeLineID); history = readTimeLineHistory(ThisTimeLineID);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sentPtr <= sendTimeLineValidUpto);
Assert(sendTimeLine < sendTimeLineNextTLI); Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history); list_free_deep(history);
/* the current send pointer should be <= the switchpoint */
if (!(sentPtr <= sendTimeLineValidUpto))
elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
sendTimeLine,
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto,
(uint32) (sentPtr >> 32),
(uint32) sentPtr);
sendTimeLineIsHistoric = true; sendTimeLineIsHistoric = true;
SendRqstPtr = sendTimeLineValidUpto; SendRqstPtr = sendTimeLineValidUpto;
@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup)
/* /*
* If this is a historic timeline and we've reached the point where we * If this is a historic timeline and we've reached the point where we
* forked to the next timeline, stop streaming. * forked to the next timeline, stop streaming.
*
* Note: We might already have sent WAL > sendTimeLineValidUpto. The
* startup process will normally replay all WAL that has been received from
* the master, before promoting, but if the WAL streaming is terminated at
* a WAL page boundary, the valid portion of the timeline might end in the
* middle of a WAL record. We might've already sent the first half of that
* partial WAL record to the cascading standby, so that sentPtr >
* sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
* partial WAL record either, so it can still follow our timeline switch.
*/ */
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{ {
@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup)
streamingDoneSending = true; streamingDoneSending = true;
*caughtup = true; *caughtup = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
(uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
(uint32) (sentPtr >> 32), (uint32) sentPtr);
return; return;
} }

View File

@ -83,10 +83,12 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
timeline); timeline);
/* /*
* Note that we report the previous, not current, position here. That's * Note that we report the previous, not current, position here. After a
* the exact location where the timeline switch happend. After the switch, * timeline switch, xlogpos points to the beginning of the segment because
* we restart streaming from the beginning of the segment, so xlogpos can * that's where we always begin streaming. Reporting the end of previous
* smaller than prevpos if we just switched to new timeline. * timeline isn't totally accurate, because the next timeline can begin
* slightly before the end of the WAL that we received on the previous
* timeline, but it's close enough for reporting purposes.
*/ */
if (prevtimeline != 0 && prevtimeline != timeline) if (prevtimeline != 0 && prevtimeline != timeline)
fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"), fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),

View File

@ -37,6 +37,9 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
stream_stop_callback stream_stop, int standby_message_timeout, stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos); char *partial_suffix, XLogRecPtr *stoppos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
/* /*
* Open a new WAL file in the specified directory. * Open a new WAL file in the specified directory.
* *
@ -627,26 +630,44 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* There are two possible reasons for that: a controlled shutdown, * There are two possible reasons for that: a controlled shutdown,
* or we reached the end of the current timeline. In case of * or we reached the end of the current timeline. In case of
* end-of-timeline, the server sends a result set after Copy has * end-of-timeline, the server sends a result set after Copy has
* finished, containing the next timeline's ID. Read that, and * finished, containing information about the next timeline. Read
* restart streaming from the next timeline. * that, and restart streaming from the next timeline. In case of
* controlled shutdown, stop here.
*/ */
if (PQresultStatus(res) == PGRES_TUPLES_OK) if (PQresultStatus(res) == PGRES_TUPLES_OK)
{ {
/* /*
* End-of-timeline. Read the next timeline's ID. * End-of-timeline. Read the next timeline's ID and starting
* position. Usually, the starting position will match the end of
* the previous timeline, but there are corner cases like if the
* server had sent us half of a WAL record, when it was promoted.
* The new timeline will begin at the end of the last complete
* record in that case, overlapping the partial WAL record on the
* the old timeline.
*/ */
uint32 newtimeline; uint32 newtimeline;
bool parsed;
newtimeline = atoi(PQgetvalue(res, 0, 0)); parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
PQclear(res); PQclear(res);
if (!parsed)
goto error;
/* Sanity check the values the server gave us */
if (newtimeline <= timeline) if (newtimeline <= timeline)
{ {
/* shouldn't happen */
fprintf(stderr, fprintf(stderr,
"server reported unexpected next timeline %u, following timeline %u\n", _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
newtimeline, timeline); progname, newtimeline, timeline);
goto error;
}
if (startpos > stoppos)
{
fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
goto error; goto error;
} }
@ -666,7 +687,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Always start streaming at the beginning of a segment. * Always start streaming at the beginning of a segment.
*/ */
timeline = newtimeline; timeline = newtimeline;
startpos = stoppos - (stoppos % XLOG_SEG_SIZE); startpos = startpos - (startpos % XLOG_SEG_SIZE);
continue; continue;
} }
else if (PQresultStatus(res) == PGRES_COMMAND_OK) else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@ -704,6 +725,50 @@ error:
return false; return false;
} }
/*
* Helper function to parse the result set returned by server after streaming
* has finished. On failure, prints an error to stderr and returns false.
*/
static bool
ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
{
uint32 startpos_xlogid,
startpos_xrecoff;
/*----------
* The result set consists of one row and two columns, e.g:
*
* next_tli | next_tli_startpos
* ----------+-------------------
* 4 | 0/9949AE0
*
* next_tli is the timeline ID of the next timeline after the one that
* just finished streaming. next_tli_startpos is the XLOG position where
* the server switched to it.
*----------
*/
if (PQnfields(res) < 2 || PQntuples(res) != 1)
{
fprintf(stderr,
_("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 2);
return false;
}
*timeline = atoi(PQgetvalue(res, 0, 0));
if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
&startpos_xrecoff) != 2)
{
fprintf(stderr,
_("%s: could not parse next timeline's starting point \"%s\"\n"),
progname, PQgetvalue(res, 0, 1));
return false;
}
*startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
return true;
}
/* /*
* The main loop of ReceiveXLogStream. Handles the COPY stream after * The main loop of ReceiveXLogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command. * initiating streaming with the START_STREAMING command.