Consistently declare timestamp variables as TimestampTz.

Twiddle the replication-related code so that its timestamp variables
are declared TimestampTz, rather than the uninformative "int64" that
was previously used for meant-to-be-always-integer timestamps.
This resolves the int64-vs-TimestampTz declaration inconsistencies
introduced by commit 7c030783a, though in the opposite direction to
what was originally suggested.

This required including datatype/timestamp.h in a couple more places
than before.  I decided it would be a good idea to slim down that
header by not having it pull in <float.h> etc, as those headers are
no longer at all relevant to its purpose.  Unsurprisingly, a small number
of .c files turn out to have been depending on those inclusions, so add
them back in the .c files as needed.

Discussion: https://postgr.es/m/26788.1487455319@sss.pgh.pa.us
Discussion: https://postgr.es/m/27694.1487456324@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2017-02-23 15:57:08 -05:00
parent b9d092c962
commit c29aff959d
22 changed files with 100 additions and 94 deletions

View File

@ -11,6 +11,8 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include "libpq/auth.h" #include "libpq/auth.h"
#include "port.h" #include "port.h"
#include "utils/guc.h" #include "utils/guc.h"

View File

@ -3,6 +3,8 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include "btree_gist.h" #include "btree_gist.h"
#include "btree_utils_num.h" #include "btree_utils_num.h"
#include "utils/builtins.h" #include "utils/builtins.h"

View File

@ -15,6 +15,8 @@
#include "postgres.h" #include "postgres.h"
#include <float.h>
#include "access/gist_private.h" #include "access/gist_private.h"
#include "access/hash.h" #include "access/hash.h"
#include "access/htup_details.h" #include "access/htup_details.h"

View File

@ -15,6 +15,7 @@
#include "postgres.h" #include "postgres.h"
#include <ctype.h> #include <ctype.h>
#include <math.h>
#include <time.h> #include <time.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>

View File

@ -16,6 +16,8 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include "access/xact.h" #include "access/xact.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/createas.h" #include "commands/createas.h"

View File

@ -35,6 +35,8 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include <math.h>
#include "access/relscan.h" #include "access/relscan.h"
#include "access/transam.h" #include "access/transam.h"
#include "executor/execdebug.h" #include "executor/execdebug.h"

View File

@ -92,10 +92,10 @@ static uint64 throttling_sample;
static int64 throttling_counter; static int64 throttling_counter;
/* The minimum time required to transfer throttling_sample bytes. */ /* The minimum time required to transfer throttling_sample bytes. */
static int64 elapsed_min_unit; static TimeOffset elapsed_min_unit;
/* The last check of the transfer rate. */ /* The last check of the transfer rate. */
static int64 throttled_last; static TimestampTz throttled_last;
/* /*
* The contents of these directories are removed or recreated during server * The contents of these directories are removed or recreated during server
@ -254,7 +254,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
throttling_counter = 0; throttling_counter = 0;
/* The 'real data' starts now (header was ignored). */ /* The 'real data' starts now (header was ignored). */
throttled_last = GetCurrentIntegerTimestamp(); throttled_last = GetCurrentTimestamp();
} }
else else
{ {
@ -1333,7 +1333,7 @@ _tarWriteDir(const char *pathbuf, int basepathlen, struct stat *statbuf,
static void static void
throttle(size_t increment) throttle(size_t increment)
{ {
int64 elapsed, TimeOffset elapsed,
elapsed_min, elapsed_min,
sleep; sleep;
int wait_result; int wait_result;
@ -1346,7 +1346,7 @@ throttle(size_t increment)
return; return;
/* Time elapsed since the last measurement (and possible wake up). */ /* Time elapsed since the last measurement (and possible wake up). */
elapsed = GetCurrentIntegerTimestamp() - throttled_last; elapsed = GetCurrentTimestamp() - throttled_last;
/* How much should have elapsed at minimum? */ /* How much should have elapsed at minimum? */
elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample); elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
sleep = elapsed_min - elapsed; sleep = elapsed_min - elapsed;
@ -1381,5 +1381,5 @@ throttle(size_t increment)
* Time interval for the remaining amount and possible next increments * Time interval for the remaining amount and possible next increments
* starts now. * starts now.
*/ */
throttled_last = GetCurrentIntegerTimestamp(); throttled_last = GetCurrentTimestamp();
} }

View File

@ -985,8 +985,7 @@ ApplyLoop(void)
start_lsn = pq_getmsgint64(&s); start_lsn = pq_getmsgint64(&s);
end_lsn = pq_getmsgint64(&s); end_lsn = pq_getmsgint64(&s);
send_time = send_time = pq_getmsgint64(&s);
IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
if (last_received < start_lsn) if (last_received < start_lsn)
last_received = start_lsn; last_received = start_lsn;
@ -1005,8 +1004,7 @@ ApplyLoop(void)
bool reply_requested; bool reply_requested;
endpos = pq_getmsgint64(&s); endpos = pq_getmsgint64(&s);
timestamp = timestamp = pq_getmsgint64(&s);
IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
reply_requested = pq_getmsgbyte(&s); reply_requested = pq_getmsgbyte(&s);
send_feedback(endpos, reply_requested, false); send_feedback(endpos, reply_requested, false);

View File

@ -892,8 +892,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
/* read the fields */ /* read the fields */
dataStart = pq_getmsgint64(&incoming_message); dataStart = pq_getmsgint64(&incoming_message);
walEnd = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message);
sendTime = IntegerTimestampToTimestampTz( sendTime = pq_getmsgint64(&incoming_message);
pq_getmsgint64(&incoming_message));
ProcessWalSndrMessage(walEnd, sendTime); ProcessWalSndrMessage(walEnd, sendTime);
buf += hdrlen; buf += hdrlen;
@ -913,8 +912,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
/* read the fields */ /* read the fields */
walEnd = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message);
sendTime = IntegerTimestampToTimestampTz( sendTime = pq_getmsgint64(&incoming_message);
pq_getmsgint64(&incoming_message));
replyRequested = pq_getmsgbyte(&incoming_message); replyRequested = pq_getmsgbyte(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime); ProcessWalSndrMessage(walEnd, sendTime);
@ -1149,7 +1147,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
pq_sendint64(&reply_message, writePtr); pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr); pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendbyte(&reply_message, requestReply ? 1 : 0); pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */ /* Send it */
@ -1241,7 +1239,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */ /* Construct the message and send it. */
resetStringInfo(&reply_message); resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'h'); pq_sendbyte(&reply_message, 'h');
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint(&reply_message, xmin, 4); pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4); pq_sendint(&reply_message, nextEpoch, 4);
walrcv_send(wrconn, reply_message.data, reply_message.len); walrcv_send(wrconn, reply_message.data, reply_message.len);

View File

@ -823,12 +823,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
dest = CreateDestReceiver(DestRemoteSimple); dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls)); MemSet(nulls, false, sizeof(nulls));
/* /*----------
* Need a tuple descriptor representing four columns: * Need a tuple descriptor representing four columns:
* - first field: the slot name * - first field: the slot name
* - second field: LSN at which we became consistent * - second field: LSN at which we became consistent
* - third field: exported snapshot's name * - third field: exported snapshot's name
* - fourth field: output plugin * - fourth field: output plugin
*----------
*/ */
tupdesc = CreateTemplateTupleDesc(4, false); tupdesc = CreateTemplateTupleDesc(4, false);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name", TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
@ -1014,7 +1015,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
* several releases by streaming physical replication. * several releases by streaming physical replication.
*/ */
resetStringInfo(&tmpbuf); resetStringInfo(&tmpbuf);
pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp()); pq_sendint64(&tmpbuf, GetCurrentTimestamp());
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64)); tmpbuf.data, sizeof(int64));
@ -2334,7 +2335,7 @@ XLogSendPhysical(void)
* Fill the send timestamp last, so that it is taken as late as possible. * Fill the send timestamp last, so that it is taken as late as possible.
*/ */
resetStringInfo(&tmpbuf); resetStringInfo(&tmpbuf);
pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp()); pq_sendint64(&tmpbuf, GetCurrentTimestamp());
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64)); tmpbuf.data, sizeof(int64));
@ -2842,7 +2843,7 @@ WalSndKeepalive(bool requestReply)
resetStringInfo(&output_message); resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'k'); pq_sendbyte(&output_message, 'k');
pq_sendint64(&output_message, sentPtr); pq_sendint64(&output_message, sentPtr);
pq_sendint64(&output_message, GetCurrentIntegerTimestamp()); pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0); pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */ /* ... and send it wrapped in CopyData */

View File

@ -15,6 +15,8 @@
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include "access/xact.h" #include "access/xact.h"
#include "commands/prepare.h" #include "commands/prepare.h"
#include "executor/tstoreReceiver.h" #include "executor/tstoreReceiver.h"

View File

@ -19,6 +19,7 @@
#include <ctype.h> #include <ctype.h>
#include <float.h> #include <float.h>
#include <limits.h> #include <limits.h>
#include <math.h>
#include <time.h> #include <time.h>
#include <sys/time.h> #include <sys/time.h>

View File

@ -95,6 +95,8 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include <limits.h>
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"

View File

@ -83,13 +83,13 @@ typedef struct OldSnapshotControlData
* only allowed to move forward. * only allowed to move forward.
*/ */
slock_t mutex_current; /* protect current_timestamp */ slock_t mutex_current; /* protect current_timestamp */
int64 current_timestamp; /* latest snapshot timestamp */ TimestampTz current_timestamp; /* latest snapshot timestamp */
slock_t mutex_latest_xmin; /* protect latest_xmin and slock_t mutex_latest_xmin; /* protect latest_xmin and
* next_map_update */ * next_map_update */
TransactionId latest_xmin; /* latest snapshot xmin */ TransactionId latest_xmin; /* latest snapshot xmin */
int64 next_map_update; /* latest snapshot valid up to */ TimestampTz next_map_update; /* latest snapshot valid up to */
slock_t mutex_threshold; /* protect threshold fields */ slock_t mutex_threshold; /* protect threshold fields */
int64 threshold_timestamp; /* earlier snapshot is old */ TimestampTz threshold_timestamp; /* earlier snapshot is old */
TransactionId threshold_xid; /* earlier xid may be gone */ TransactionId threshold_xid; /* earlier xid may be gone */
/* /*
@ -121,7 +121,7 @@ typedef struct OldSnapshotControlData
* Persistence is not needed. * Persistence is not needed.
*/ */
int head_offset; /* subscript of oldest tracked time */ int head_offset; /* subscript of oldest tracked time */
int64 head_timestamp; /* time corresponding to head xid */ TimestampTz head_timestamp; /* time corresponding to head xid */
int count_used; /* how many slots are in use */ int count_used; /* how many slots are in use */
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER]; TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData; } OldSnapshotControlData;
@ -219,7 +219,7 @@ static Snapshot FirstXactSnapshot = NULL;
static List *exportedSnapshots = NIL; static List *exportedSnapshots = NIL;
/* Prototypes for local functions */ /* Prototypes for local functions */
static int64 AlignTimestampToMinuteBoundary(int64 ts); static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot); static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void); static void SnapshotResetXmin(void);
@ -239,7 +239,7 @@ typedef struct SerializedSnapshotData
bool suboverflowed; bool suboverflowed;
bool takenDuringRecovery; bool takenDuringRecovery;
CommandId curcid; CommandId curcid;
int64 whenTaken; TimestampTz whenTaken;
XLogRecPtr lsn; XLogRecPtr lsn;
} SerializedSnapshotData; } SerializedSnapshotData;
@ -1611,26 +1611,29 @@ ThereAreNoPriorRegisteredSnapshots(void)
/* /*
* Return an int64 timestamp which is exactly on a minute boundary. * Return a timestamp that is exactly on a minute boundary.
* *
* If the argument is already aligned, return that value, otherwise move to * If the argument is already aligned, return that value, otherwise move to
* the next minute boundary following the given time. * the next minute boundary following the given time.
*/ */
static int64 static TimestampTz
AlignTimestampToMinuteBoundary(int64 ts) AlignTimestampToMinuteBoundary(TimestampTz ts)
{ {
int64 retval = ts + (USECS_PER_MINUTE - 1); TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
return retval - (retval % USECS_PER_MINUTE); return retval - (retval % USECS_PER_MINUTE);
} }
/* /*
* Get current timestamp for snapshots as int64 that never moves backward. * Get current timestamp for snapshots
*
* This is basically GetCurrentTimestamp(), but with a guarantee that
* the result never moves backward.
*/ */
int64 TimestampTz
GetSnapshotCurrentTimestamp(void) GetSnapshotCurrentTimestamp(void)
{ {
int64 now = GetCurrentIntegerTimestamp(); TimestampTz now = GetCurrentTimestamp();
/* /*
* Don't let time move backward; if it hasn't advanced, use the old value. * Don't let time move backward; if it hasn't advanced, use the old value.
@ -1652,10 +1655,10 @@ GetSnapshotCurrentTimestamp(void)
* XXX: So far, we never trust that a 64-bit value can be read atomically; if * XXX: So far, we never trust that a 64-bit value can be read atomically; if
* that ever changes, we could get rid of the spinlock here. * that ever changes, we could get rid of the spinlock here.
*/ */
int64 TimestampTz
GetOldSnapshotThresholdTimestamp(void) GetOldSnapshotThresholdTimestamp(void)
{ {
int64 threshold_timestamp; TimestampTz threshold_timestamp;
SpinLockAcquire(&oldSnapshotControl->mutex_threshold); SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
threshold_timestamp = oldSnapshotControl->threshold_timestamp; threshold_timestamp = oldSnapshotControl->threshold_timestamp;
@ -1665,7 +1668,7 @@ GetOldSnapshotThresholdTimestamp(void)
} }
static void static void
SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit) SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
{ {
SpinLockAcquire(&oldSnapshotControl->mutex_threshold); SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
oldSnapshotControl->threshold_timestamp = ts; oldSnapshotControl->threshold_timestamp = ts;
@ -1690,10 +1693,10 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
&& old_snapshot_threshold >= 0 && old_snapshot_threshold >= 0
&& RelationAllowsEarlyPruning(relation)) && RelationAllowsEarlyPruning(relation))
{ {
int64 ts = GetSnapshotCurrentTimestamp(); TimestampTz ts = GetSnapshotCurrentTimestamp();
TransactionId xlimit = recentXmin; TransactionId xlimit = recentXmin;
TransactionId latest_xmin; TransactionId latest_xmin;
int64 update_ts; TimestampTz update_ts;
bool same_ts_as_threshold = false; bool same_ts_as_threshold = false;
SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin); SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
@ -1790,11 +1793,11 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
* Take care of the circular buffer that maps time to xid. * Take care of the circular buffer that maps time to xid.
*/ */
void void
MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin) MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
{ {
int64 ts; TimestampTz ts;
TransactionId latest_xmin; TransactionId latest_xmin;
int64 update_ts; TimestampTz update_ts;
bool map_update_required = false; bool map_update_required = false;
/* Never call this function when old snapshot checking is disabled. */ /* Never call this function when old snapshot checking is disabled. */

View File

@ -57,7 +57,7 @@ static int outfd = -1;
static volatile sig_atomic_t time_to_abort = false; static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t output_reopen = false; static volatile sig_atomic_t output_reopen = false;
static bool output_isfile; static bool output_isfile;
static int64 output_last_fsync = -1; static TimestampTz output_last_fsync = -1;
static bool output_needs_fsync = false; static bool output_needs_fsync = false;
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr; static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
@ -112,7 +112,7 @@ usage(void)
* Send a Standby Status Update message to server. * Send a Standby Status Update message to server.
*/ */
static bool static bool
sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested) sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
{ {
static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
@ -175,7 +175,7 @@ disconnect_and_exit(int code)
} }
static bool static bool
OutputFsync(int64 now) OutputFsync(TimestampTz now)
{ {
output_last_fsync = now; output_last_fsync = now;
@ -212,7 +212,7 @@ StreamLogicalLog(void)
{ {
PGresult *res; PGresult *res;
char *copybuf = NULL; char *copybuf = NULL;
int64 last_status = -1; TimestampTz last_status = -1;
int i; int i;
PQExpBuffer query; PQExpBuffer query;
@ -285,7 +285,7 @@ StreamLogicalLog(void)
int r; int r;
int bytes_left; int bytes_left;
int bytes_written; int bytes_written;
int64 now; TimestampTz now;
int hdr_len; int hdr_len;
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
@ -365,8 +365,8 @@ StreamLogicalLog(void)
* response back to the client. * response back to the client.
*/ */
fd_set input_mask; fd_set input_mask;
int64 message_target = 0; TimestampTz message_target = 0;
int64 fsync_target = 0; TimestampTz fsync_target = 0;
struct timeval timeout; struct timeval timeout;
struct timeval *timeoutptr = NULL; struct timeval *timeoutptr = NULL;
@ -394,7 +394,7 @@ StreamLogicalLog(void)
/* Now compute when to wakeup. */ /* Now compute when to wakeup. */
if (message_target > 0 || fsync_target > 0) if (message_target > 0 || fsync_target > 0)
{ {
int64 targettime; TimestampTz targettime;
long secs; long secs;
int usecs; int usecs;
@ -622,7 +622,7 @@ StreamLogicalLog(void)
if (outfd != -1 && strcmp(outfile, "-") != 0) if (outfd != -1 && strcmp(outfile, "-") != 0)
{ {
int64 t = feGetCurrentTimestamp(); TimestampTz t = feGetCurrentTimestamp();
/* no need to jump to error on failure here, we're finishing anyway */ /* no need to jump to error on failure here, we're finishing anyway */
OutputFsync(t); OutputFsync(t);

View File

@ -42,15 +42,15 @@ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
int len, XLogRecPtr blockpos, int64 *last_status); int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
XLogRecPtr *blockpos); XLogRecPtr *blockpos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
XLogRecPtr blockpos, XLogRecPtr *stoppos); XLogRecPtr blockpos, XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos, static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
XLogRecPtr *stoppos); XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
int64 last_status); TimestampTz last_status);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline); uint32 *timeline);
@ -319,7 +319,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
* Send a Standby Status Update message to server. * Send a Standby Status Update message to server.
*/ */
static bool static bool
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
{ {
char replybuf[1 + 8 + 8 + 8 + 8 + 1]; char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0; int len = 0;
@ -761,7 +761,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
XLogRecPtr *stoppos) XLogRecPtr *stoppos)
{ {
char *copybuf = NULL; char *copybuf = NULL;
int64 last_status = -1; TimestampTz last_status = -1;
XLogRecPtr blockpos = stream->startpos; XLogRecPtr blockpos = stream->startpos;
still_sending = true; still_sending = true;
@ -769,7 +769,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
while (1) while (1)
{ {
int r; int r;
int64 now; TimestampTz now;
long sleeptime; long sleeptime;
/* /*
@ -994,11 +994,11 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
*/ */
static bool static bool
ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status) XLogRecPtr blockpos, TimestampTz *last_status)
{ {
int pos; int pos;
bool replyRequested; bool replyRequested;
int64 now; TimestampTz now;
/* /*
* Parse the keepalive message, enclosed in the CopyData message. We just * Parse the keepalive message, enclosed in the CopyData message. We just
@ -1253,10 +1253,10 @@ CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
* Calculate how long send/receive loops should sleep * Calculate how long send/receive loops should sleep
*/ */
static long static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
int64 last_status) TimestampTz last_status)
{ {
int64 status_targettime = 0; TimestampTz status_targettime = 0;
long sleeptime; long sleeptime;
if (standby_message_timeout && still_sending) if (standby_message_timeout && still_sending)

View File

@ -433,20 +433,18 @@ DropReplicationSlot(PGconn *conn, const char *slot_name)
/* /*
* Frontend version of GetCurrentTimestamp(), since we are not linked with * Frontend version of GetCurrentTimestamp(), since we are not linked with
* backend code. The replication protocol always uses integer timestamps, * backend code.
* regardless of the server setting.
*/ */
int64 TimestampTz
feGetCurrentTimestamp(void) feGetCurrentTimestamp(void)
{ {
int64 result; TimestampTz result;
struct timeval tp; struct timeval tp;
gettimeofday(&tp, NULL); gettimeofday(&tp, NULL);
result = (int64) tp.tv_sec - result = (TimestampTz) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
result = (result * USECS_PER_SEC) + tp.tv_usec; result = (result * USECS_PER_SEC) + tp.tv_usec;
return result; return result;
@ -457,10 +455,10 @@ feGetCurrentTimestamp(void)
* backend code. * backend code.
*/ */
void void
feTimestampDifference(int64 start_time, int64 stop_time, feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
long *secs, int *microsecs) long *secs, int *microsecs)
{ {
int64 diff = stop_time - start_time; TimestampTz diff = stop_time - start_time;
if (diff <= 0) if (diff <= 0)
{ {
@ -479,11 +477,11 @@ feTimestampDifference(int64 start_time, int64 stop_time,
* linked with backend code. * linked with backend code.
*/ */
bool bool
feTimestampDifferenceExceeds(int64 start_time, feTimestampDifferenceExceeds(TimestampTz start_time,
int64 stop_time, TimestampTz stop_time,
int msec) int msec)
{ {
int64 diff = stop_time - start_time; TimestampTz diff = stop_time - start_time;
return (diff >= msec * INT64CONST(1000)); return (diff >= msec * INT64CONST(1000));
} }

View File

@ -15,6 +15,7 @@
#include "libpq-fe.h" #include "libpq-fe.h"
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#include "datatype/timestamp.h"
extern const char *progname; extern const char *progname;
extern char *connection_string; extern char *connection_string;
@ -38,11 +39,11 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
TimeLineID *starttli, TimeLineID *starttli,
XLogRecPtr *startpos, XLogRecPtr *startpos,
char **db_name); char **db_name);
extern int64 feGetCurrentTimestamp(void); extern TimestampTz feGetCurrentTimestamp(void);
extern void feTimestampDifference(int64 start_time, int64 stop_time, extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
long *secs, int *microsecs); long *secs, int *microsecs);
extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time, extern bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time,
int msec); int msec);
extern void fe_sendint64(int64 i, char *buf); extern void fe_sendint64(int64 i, char *buf);
extern int64 fe_recvint64(char *buf); extern int64 fe_recvint64(char *buf);

View File

@ -15,10 +15,6 @@
#ifndef DATATYPE_TIMESTAMP_H #ifndef DATATYPE_TIMESTAMP_H
#define DATATYPE_TIMESTAMP_H #define DATATYPE_TIMESTAMP_H
#include <math.h>
#include <limits.h>
#include <float.h>
/* /*
* Timestamp represents absolute time. * Timestamp represents absolute time.
* *

View File

@ -51,8 +51,8 @@ extern PGDLLIMPORT int old_snapshot_threshold;
extern Size SnapMgrShmemSize(void); extern Size SnapMgrShmemSize(void);
extern void SnapMgrInit(void); extern void SnapMgrInit(void);
extern int64 GetSnapshotCurrentTimestamp(void); extern TimestampTz GetSnapshotCurrentTimestamp(void);
extern int64 GetOldSnapshotThresholdTimestamp(void); extern TimestampTz GetOldSnapshotThresholdTimestamp(void);
extern bool FirstSnapshotSet; extern bool FirstSnapshotSet;
@ -93,7 +93,8 @@ extern void DeleteAllExportedSnapshotFiles(void);
extern bool ThereAreNoPriorRegisteredSnapshots(void); extern bool ThereAreNoPriorRegisteredSnapshots(void);
extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin, extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
Relation relation); Relation relation);
extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin); extern void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken,
TransactionId xmin);
extern char *ExportSnapshot(Snapshot snapshot); extern char *ExportSnapshot(Snapshot snapshot);

View File

@ -15,6 +15,7 @@
#include "access/htup.h" #include "access/htup.h"
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
#include "datatype/timestamp.h"
#include "lib/pairingheap.h" #include "lib/pairingheap.h"
#include "storage/buf.h" #include "storage/buf.h"
@ -107,7 +108,7 @@ typedef struct SnapshotData
uint32 regd_count; /* refcount on RegisteredSnapshots */ uint32 regd_count; /* refcount on RegisteredSnapshots */
pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */ pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */
int64 whenTaken; /* timestamp when snapshot was taken */ TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */ XLogRecPtr lsn; /* position in the WAL stream when taken */
} SnapshotData; } SnapshotData;

View File

@ -76,13 +76,6 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
TimestampTz stop_time, TimestampTz stop_time,
int msec); int msec);
/*
* Prototypes for functions to deal with integer timestamps, when the native
* format is float timestamps.
*/
#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
extern TimestampTz time_t_to_timestamptz(pg_time_t tm); extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
extern pg_time_t timestamptz_to_time_t(TimestampTz t); extern pg_time_t timestamptz_to_time_t(TimestampTz t);