mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-03-07 19:47:50 +08:00
Use a nonblocking socket for FE/BE communication and block using latches.
This allows to introduce more elaborate handling of interrupts while reading from a socket. Currently some interrupt handlers have to do significant work from inside signal handlers, and it's very hard to correctly write code to do so. Generic signal handler limitations, combined with the fact that we can't safely jump out of a signal handler while reading from the client have prohibited implementation of features like timeouts for idle-in-transaction. Additionally we use the latch code to wait in a couple places where we previously only had waiting code on windows as other platforms just busy looped. This can increase the number of systemcalls happening during FE/BE communication. Benchmarks so far indicate that the impact isn't very high, and there's room for optimization in the latch code. The chance of cleaning up the usage of latches gives us, seem to outweigh the risk of small performance regressions. This commit theoretically can't used without the next patch in the series, as WaitLatchOrSocket is not defined to be fully signal safe. As we already do that in some cases though, it seems better to keep the commits separate, so they're easier to understand. Author: Andres Freund Reviewed-By: Heikki Linnakangas
This commit is contained in:
parent
778d498c7d
commit
387da18874
@ -71,6 +71,8 @@
|
||||
#endif
|
||||
|
||||
#include "libpq/libpq.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/latch.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
@ -338,6 +340,7 @@ be_tls_open_server(Port *port)
|
||||
{
|
||||
int r;
|
||||
int err;
|
||||
int waitfor;
|
||||
|
||||
Assert(!port->ssl);
|
||||
Assert(!port->peer);
|
||||
@ -371,12 +374,15 @@ aloop:
|
||||
{
|
||||
case SSL_ERROR_WANT_READ:
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
#ifdef WIN32
|
||||
pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl),
|
||||
(err == SSL_ERROR_WANT_READ) ?
|
||||
FD_READ | FD_CLOSE | FD_ACCEPT : FD_WRITE | FD_CLOSE,
|
||||
INFINITE);
|
||||
#endif
|
||||
/* not allowed during connection establishment */
|
||||
Assert(!port->noblock);
|
||||
|
||||
if (err == SSL_ERROR_WANT_READ)
|
||||
waitfor = WL_SOCKET_READABLE;
|
||||
else
|
||||
waitfor = WL_SOCKET_WRITEABLE;
|
||||
|
||||
WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
|
||||
goto aloop;
|
||||
case SSL_ERROR_SYSCALL:
|
||||
if (r < 0)
|
||||
@ -504,6 +510,7 @@ be_tls_read(Port *port, void *ptr, size_t len)
|
||||
{
|
||||
ssize_t n;
|
||||
int err;
|
||||
int waitfor;
|
||||
|
||||
rloop:
|
||||
errno = 0;
|
||||
@ -516,18 +523,20 @@ rloop:
|
||||
break;
|
||||
case SSL_ERROR_WANT_READ:
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
/* Don't retry if the socket is in nonblocking mode. */
|
||||
if (port->noblock)
|
||||
{
|
||||
errno = EWOULDBLOCK;
|
||||
n = -1;
|
||||
break;
|
||||
}
|
||||
#ifdef WIN32
|
||||
pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl),
|
||||
(err == SSL_ERROR_WANT_READ) ?
|
||||
FD_READ | FD_CLOSE : FD_WRITE | FD_CLOSE,
|
||||
INFINITE);
|
||||
#endif
|
||||
|
||||
if (err == SSL_ERROR_WANT_READ)
|
||||
waitfor = WL_SOCKET_READABLE;
|
||||
else
|
||||
waitfor = WL_SOCKET_WRITEABLE;
|
||||
|
||||
WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
|
||||
goto rloop;
|
||||
case SSL_ERROR_SYSCALL:
|
||||
/* leave it to caller to ereport the value of errno */
|
||||
@ -567,6 +576,7 @@ be_tls_write(Port *port, void *ptr, size_t len)
|
||||
{
|
||||
ssize_t n;
|
||||
int err;
|
||||
int waitfor;
|
||||
|
||||
/*
|
||||
* If SSL renegotiations are enabled and we're getting close to the
|
||||
@ -630,12 +640,13 @@ wloop:
|
||||
break;
|
||||
case SSL_ERROR_WANT_READ:
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
#ifdef WIN32
|
||||
pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl),
|
||||
(err == SSL_ERROR_WANT_READ) ?
|
||||
FD_READ | FD_CLOSE : FD_WRITE | FD_CLOSE,
|
||||
INFINITE);
|
||||
#endif
|
||||
|
||||
if (err == SSL_ERROR_WANT_READ)
|
||||
waitfor = WL_SOCKET_READABLE;
|
||||
else
|
||||
waitfor = WL_SOCKET_WRITEABLE;
|
||||
|
||||
WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
|
||||
goto wloop;
|
||||
case SSL_ERROR_SYSCALL:
|
||||
/* leave it to caller to ereport the value of errno */
|
||||
@ -722,7 +733,7 @@ my_sock_read(BIO *h, char *buf, int size)
|
||||
if (res <= 0)
|
||||
{
|
||||
/* If we were interrupted, tell caller to retry */
|
||||
if (errno == EINTR)
|
||||
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
|
||||
{
|
||||
BIO_set_retry_read(h);
|
||||
}
|
||||
@ -741,7 +752,8 @@ my_sock_write(BIO *h, const char *buf, int size)
|
||||
BIO_clear_retry_flags(h);
|
||||
if (res <= 0)
|
||||
{
|
||||
if (errno == EINTR)
|
||||
/* If we were interrupted, tell caller to retry */
|
||||
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
|
||||
{
|
||||
BIO_set_retry_write(h);
|
||||
}
|
||||
|
@ -32,8 +32,10 @@
|
||||
#endif
|
||||
|
||||
#include "libpq/libpq.h"
|
||||
#include "miscadmin.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "storage/proc.h"
|
||||
|
||||
|
||||
char *ssl_cert_file;
|
||||
@ -147,7 +149,39 @@ secure_raw_read(Port *port, void *ptr, size_t len)
|
||||
|
||||
prepare_for_client_read();
|
||||
|
||||
/*
|
||||
* Try to read from the socket without blocking. If it succeeds we're
|
||||
* done, otherwise we'll wait for the socket using the latch mechanism.
|
||||
*/
|
||||
rloop:
|
||||
#ifdef WIN32
|
||||
pgwin32_noblock = true;
|
||||
#endif
|
||||
n = recv(port->sock, ptr, len, 0);
|
||||
#ifdef WIN32
|
||||
pgwin32_noblock = false;
|
||||
#endif
|
||||
|
||||
if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
|
||||
{
|
||||
int w;
|
||||
int save_errno = errno;
|
||||
|
||||
w = WaitLatchOrSocket(MyLatch,
|
||||
WL_SOCKET_READABLE,
|
||||
port->sock, 0);
|
||||
|
||||
if (w & WL_SOCKET_READABLE)
|
||||
{
|
||||
goto rloop;
|
||||
}
|
||||
|
||||
/*
|
||||
* Restore errno, clobbered by WaitLatchOrSocket, so the caller can
|
||||
* react properly.
|
||||
*/
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
client_read_ended();
|
||||
|
||||
@ -170,7 +204,9 @@ secure_write(Port *port, void *ptr, size_t len)
|
||||
}
|
||||
else
|
||||
#endif
|
||||
{
|
||||
n = secure_raw_write(port, ptr, len);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
@ -178,5 +214,44 @@ secure_write(Port *port, void *ptr, size_t len)
|
||||
ssize_t
|
||||
secure_raw_write(Port *port, const void *ptr, size_t len)
|
||||
{
|
||||
return send(port->sock, ptr, len, 0);
|
||||
ssize_t n;
|
||||
|
||||
wloop:
|
||||
|
||||
#ifdef WIN32
|
||||
pgwin32_noblock = true;
|
||||
#endif
|
||||
n = send(port->sock, ptr, len, 0);
|
||||
#ifdef WIN32
|
||||
pgwin32_noblock = false;
|
||||
#endif
|
||||
|
||||
if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
|
||||
{
|
||||
int w;
|
||||
int save_errno = errno;
|
||||
|
||||
/*
|
||||
* We probably want to check for latches being set at some point
|
||||
* here. That'd allow us to handle interrupts while blocked on
|
||||
* writes. If set we'd not retry directly, but return. That way we
|
||||
* don't do anything while (possibly) inside a ssl library.
|
||||
*/
|
||||
w = WaitLatchOrSocket(MyLatch,
|
||||
WL_SOCKET_WRITEABLE,
|
||||
port->sock, 0);
|
||||
|
||||
if (w & WL_SOCKET_WRITEABLE)
|
||||
{
|
||||
goto wloop;
|
||||
}
|
||||
|
||||
/*
|
||||
* Restore errno, clobbered by WaitLatchOrSocket, so the caller can
|
||||
* react properly.
|
||||
*/
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
@ -181,6 +181,22 @@ pq_init(void)
|
||||
PqCommReadingMsg = false;
|
||||
DoingCopyOut = false;
|
||||
on_proc_exit(socket_close, 0);
|
||||
|
||||
/*
|
||||
* In backends (as soon as forked) we operate the underlying socket in
|
||||
* nonblocking mode and use latches to implement blocking semantics if
|
||||
* needed. That allows us to provide safely interruptible reads.
|
||||
*
|
||||
* Use COMMERROR on failure, because ERROR would try to send the error to
|
||||
* the client, which might require changing the mode again, leading to
|
||||
* infinite recursion.
|
||||
*/
|
||||
#ifndef WIN32
|
||||
if (!pg_set_noblock(MyProcPort->sock))
|
||||
ereport(COMMERROR,
|
||||
(errmsg("could not set socket to nonblocking mode: %m")));
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
@ -820,31 +836,6 @@ socket_set_nonblocking(bool nonblocking)
|
||||
(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
|
||||
errmsg("there is no client connection")));
|
||||
|
||||
if (MyProcPort->noblock == nonblocking)
|
||||
return;
|
||||
|
||||
#ifdef WIN32
|
||||
pgwin32_noblock = nonblocking ? 1 : 0;
|
||||
#else
|
||||
|
||||
/*
|
||||
* Use COMMERROR on failure, because ERROR would try to send the error to
|
||||
* the client, which might require changing the mode again, leading to
|
||||
* infinite recursion.
|
||||
*/
|
||||
if (nonblocking)
|
||||
{
|
||||
if (!pg_set_noblock(MyProcPort->sock))
|
||||
ereport(COMMERROR,
|
||||
(errmsg("could not set socket to nonblocking mode: %m")));
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!pg_set_block(MyProcPort->sock))
|
||||
ereport(COMMERROR,
|
||||
(errmsg("could not set socket to blocking mode: %m")));
|
||||
}
|
||||
#endif
|
||||
MyProcPort->noblock = nonblocking;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user