mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-12-15 08:20:16 +08:00
Refactor the syslogger pipe protocol to use a bitmask for its options
The previous protocol expected a set of matching characters to check if a message sent was the last one or not, that changed depending on the destination wanted: - 't' and 'f' tracked the last message of a log sent to stderr. - 'T' and 'F' tracked the last message of a log sent to csvlog. This could be extended with more characters when introducing new destinations, but using a bitmask is much more elegant. This commit changes the protocol so as a bitmask is used in the header of a log chunk message sent to the syslogger, with the following options available for now: - log_destination as stderr. - log_destination as csvlog. - if a message is the last chunk of a message. Sehrope found this issue in a patch set to introduce JSON as an option for log_destination, but his patch made the size of the protocol header larger. This commit keeps the same size as the original, and adapts the protocol as wanted. Thanks also to Andrew Dunstan and Greg Stark for the discussion. Author: Michael Paquier, Sehrope Sarkuni Discussion: https://postgr.es/m/CAH7T-aqswBM6JWe4pDehi1uOiufqe06DJWaU5=X7dDLyqUExHg@mail.gmail.com
This commit is contained in:
parent
e757080e04
commit
2d77d83540
@ -38,6 +38,7 @@
|
||||
#include "nodes/pg_list.h"
|
||||
#include "pgstat.h"
|
||||
#include "pgtime.h"
|
||||
#include "port/pg_bitutils.h"
|
||||
#include "postmaster/fork_process.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
@ -885,14 +886,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
||||
{
|
||||
PipeProtoHeader p;
|
||||
int chunklen;
|
||||
bits8 dest_flags;
|
||||
|
||||
/* Do we have a valid header? */
|
||||
memcpy(&p, cursor, offsetof(PipeProtoHeader, data));
|
||||
dest_flags = p.flags & (PIPE_PROTO_DEST_STDERR | PIPE_PROTO_DEST_CSVLOG);
|
||||
if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
|
||||
p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
|
||||
p.pid != 0 &&
|
||||
(p.is_last == 't' || p.is_last == 'f' ||
|
||||
p.is_last == 'T' || p.is_last == 'F'))
|
||||
pg_popcount((char *) &dest_flags, 1) == 1)
|
||||
{
|
||||
List *buffer_list;
|
||||
ListCell *cell;
|
||||
@ -906,8 +908,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
||||
if (count < chunklen)
|
||||
break;
|
||||
|
||||
dest = (p.is_last == 'T' || p.is_last == 'F') ?
|
||||
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
|
||||
if ((p.flags & PIPE_PROTO_DEST_STDERR) != 0)
|
||||
dest = LOG_DESTINATION_STDERR;
|
||||
else if ((p.flags & PIPE_PROTO_DEST_CSVLOG) != 0)
|
||||
dest = LOG_DESTINATION_CSVLOG;
|
||||
else
|
||||
{
|
||||
/* this should never happen as of the header validation */
|
||||
Assert(false);
|
||||
}
|
||||
|
||||
/* Locate any existing buffer for this source pid */
|
||||
buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
|
||||
@ -924,7 +933,7 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
||||
free_slot = buf;
|
||||
}
|
||||
|
||||
if (p.is_last == 'f' || p.is_last == 'F')
|
||||
if ((p.flags & PIPE_PROTO_IS_LAST) == 0)
|
||||
{
|
||||
/*
|
||||
* Save a complete non-final chunk in a per-pid buffer
|
||||
|
@ -3250,11 +3250,16 @@ write_pipe_chunks(char *data, int len, int dest)
|
||||
|
||||
p.proto.nuls[0] = p.proto.nuls[1] = '\0';
|
||||
p.proto.pid = MyProcPid;
|
||||
p.proto.flags = 0;
|
||||
if (dest == LOG_DESTINATION_STDERR)
|
||||
p.proto.flags |= PIPE_PROTO_DEST_STDERR;
|
||||
else if (dest == LOG_DESTINATION_CSVLOG)
|
||||
p.proto.flags |= PIPE_PROTO_DEST_CSVLOG;
|
||||
|
||||
/* write all but the last chunk */
|
||||
while (len > PIPE_MAX_PAYLOAD)
|
||||
{
|
||||
p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'F' : 'f');
|
||||
/* no need to set PIPE_PROTO_IS_LAST yet */
|
||||
p.proto.len = PIPE_MAX_PAYLOAD;
|
||||
memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
|
||||
rc = write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
|
||||
@ -3264,7 +3269,7 @@ write_pipe_chunks(char *data, int len, int dest)
|
||||
}
|
||||
|
||||
/* write the last chunk */
|
||||
p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'T' : 't');
|
||||
p.proto.flags |= PIPE_PROTO_IS_LAST;
|
||||
p.proto.len = len;
|
||||
memcpy(p.proto.data, data, len);
|
||||
rc = write(fd, &p, PIPE_HEADER_SIZE + len);
|
||||
|
@ -46,8 +46,7 @@ typedef struct
|
||||
char nuls[2]; /* always \0\0 */
|
||||
uint16 len; /* size of this chunk (counts data only) */
|
||||
int32 pid; /* writer's pid */
|
||||
char is_last; /* last chunk of message? 't' or 'f' ('T' or
|
||||
* 'F' for CSV case) */
|
||||
bits8 flags; /* bitmask of PIPE_PROTO_* */
|
||||
char data[FLEXIBLE_ARRAY_MEMBER]; /* data payload starts here */
|
||||
} PipeProtoHeader;
|
||||
|
||||
@ -60,6 +59,11 @@ typedef union
|
||||
#define PIPE_HEADER_SIZE offsetof(PipeProtoHeader, data)
|
||||
#define PIPE_MAX_PAYLOAD ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))
|
||||
|
||||
/* flag bits for PipeProtoHeader->flags */
|
||||
#define PIPE_PROTO_IS_LAST 0x01 /* last chunk of message? */
|
||||
/* log destinations */
|
||||
#define PIPE_PROTO_DEST_STDERR 0x10
|
||||
#define PIPE_PROTO_DEST_CSVLOG 0x20
|
||||
|
||||
/* GUC options */
|
||||
extern bool Logging_collector;
|
||||
|
Loading…
Reference in New Issue
Block a user