[svn-r4090]

Purpose:
    Implemented port hunting
    Really catch SIGPIPE signals
Description:
    Port hunting allows to try and bind to a successive port number
    if the port number given in the filename is already used.

    Because of a stupid typo in the code, SIGPIPE wasn't catched
    which might have caused the sending side to hang.
Solution:
    The Stream VFD's file access property list was extended by two
    elements:
      - maxhunt: how many successive ports to try if the one
                 given in the filename is already in use
                 Default is not to hunt for additional ports.
      - port:    port number which is finally used to bind a socket
                 This might be different to the port number
                 as given in the filename if port hunting is enabled.
                 The H5Pget_fapl_stream() can be used to obtain
                 this port number.
Platforms tested:
    x86 Linux, Irix 32/64 bit, Dec Alpha, Unicos on T3E, AIX on SP2
    Hitachi SR8000
This commit is contained in:
Thomas Radke 2001-07-02 07:57:46 -05:00
parent 8c2c4cd51c
commit 9f06972b48
2 changed files with 63 additions and 29 deletions

View File

@ -21,7 +21,7 @@
*
*/
#include "H5public.h" /* H5_HAVE_STREAM */
#include "H5public.h" /* H5_HAVE_STREAM */
/* Only build this driver if it was configured with --with-Stream-VFD */
#ifdef H5_HAVE_STREAM
@ -71,12 +71,10 @@ typedef int socklen_t;
#ifdef H5FD_STREAM_HAVE_UNIX_SOCKETS
#define H5FD_STREAM_CLOSE_SOCKET(a) close(a)
#define H5FD_STREAM_IOCTL_SOCKET(a, b, c) ioctl(a, b, c)
#define H5FD_STREAM_INVALID_SOCKET -1
#define H5FD_STREAM_ERROR_CHECK(rc) ((rc) < 0)
#else
#define H5FD_STREAM_CLOSE_SOCKET(a) closesocket (a)
#define H5FD_STREAM_IOCTL_SOCKET(a, b, c) ioctlsocket (a, b, (u_long *) (c))
#define H5FD_STREAM_INVALID_SOCKET SOCKET_ERROR
#define H5FD_STREAM_ERROR_CHECK(rc) ((rc) == (SOCKET) (SOCKET_ERROR))
#endif
@ -107,15 +105,21 @@ typedef struct H5FD_stream_t
/* default backlog argument for listen call */
#define H5FD_STREAM_BACKLOG 1
/* number of successive ports to hunt for until bind(2) succeeds
(default 0 means no port hunting - only try the one given in the filename) */
#define H5FD_STREAM_MAXHUNT 0
/* default file access property list */
static const H5FD_stream_fapl_t default_fapl =
{
H5FD_STREAM_INCREMENT, /* address space allocation blocksize */
H5FD_STREAM_INVALID_SOCKET, /* no external socket descriptor */
H5FD_STREAM_INVALID_SOCKET, /* no external socket descriptor */
TRUE, /* enable I/O on socket */
H5FD_STREAM_BACKLOG, /* default backlog for listen(2) */
NULL, /* do not broadcast received files */
NULL /* argument to READ broadcast routine */
NULL, /* argument to READ broadcast routine */
H5FD_STREAM_MAXHUNT, /* default number of ports to hunt */
0 /* unknown port for unbound socket */
};
/*
@ -282,6 +286,7 @@ herr_t H5Pset_fapl_stream (hid_t fapl_id, H5FD_stream_fapl_t *fapl)
{
user_fapl.increment = H5FD_STREAM_INCREMENT;
}
user_fapl.port = 0;
result = H5Pset_driver (fapl_id, H5FD_STREAM, &user_fapl);
}
else
@ -374,15 +379,15 @@ static void *H5FD_stream_fapl_get (H5FD_t *_stream)
static H5FD_STREAM_SOCKET_TYPE
H5FDstream_open_socket (const char *filename, int o_flags,
int backlog,
H5FD_stream_fapl_t *fapl,
const char **errormsg,
H5E_major_t *major, H5E_minor_t *minor)
{
struct sockaddr_in server;
struct hostent *he;
unsigned short int port;
H5FD_STREAM_SOCKET_TYPE sock;
char *hostname;
unsigned short int first_port;
const char *separator, *tmp;
int on = 1;
@ -431,11 +436,11 @@ H5FDstream_open_socket (const char *filename, int o_flags,
HDstrncpy (hostname, filename, (size_t)(separator - filename));
hostname[separator - filename] = 0;
port = atoi (separator + 1);
fapl->port = atoi (separator + 1);
HDmemset (&server, 0, sizeof (server));
server.sin_family = AF_INET;
server.sin_port = htons (port);
server.sin_port = htons (fapl->port);
if (! (he = gethostbyname (hostname)))
{
@ -453,7 +458,7 @@ H5FDstream_open_socket (const char *filename, int o_flags,
HDmemcpy (&server.sin_addr, he->h_addr, (size_t)he->h_length);
#ifdef DEBUG
fprintf (stderr, "Stream VFD: connecting to host '%s' port %d\n",
hostname, port);
hostname, fapl->port);
#endif
if (connect (sock, (struct sockaddr *) &server, sizeof (server)) < 0)
{
@ -477,13 +482,35 @@ H5FDstream_open_socket (const char *filename, int o_flags,
{
*errormsg = "unable to set socket option SO_REUSEADDR";
}
else if (bind (sock, (struct sockaddr *) &server, sizeof (server)) < 0)
else
{
*errormsg = "unable to bind socket";
}
else if (listen (sock, backlog) < 0)
{
*errormsg = "unable to listen on socket";
/* Try to bind the socket to the given port.
If maxhunt is given try some successive ports also. */
first_port = fapl->port;
while (fapl->port <= first_port + fapl->maxhunt)
{
#ifdef DEBUG
fprintf (stderr, "Stream VFD: binding to port %d\n", fapl->port);
#endif
server.sin_port = htons (fapl->port);
if (bind (sock, (struct sockaddr *) &server, sizeof (server)) < 0)
{
fapl->port++;
}
else
{
break;
}
}
if (fapl->port > first_port + fapl->maxhunt)
{
fapl->port = 0;
*errormsg = "unable to bind socket";
}
else if (listen (sock, fapl->backlog) < 0)
{
*errormsg = "unable to listen on socket";
}
}
}
}
@ -675,8 +702,14 @@ static H5FD_t *H5FD_stream_open (const char *filename,
else
{
_stream.internal_socket = TRUE;
_stream.socket = H5FDstream_open_socket (filename, o_flags, fapl->backlog,
_stream.socket = H5FDstream_open_socket (filename, o_flags, &_stream.fapl,
&errormsg, &major, &minor);
if (_stream.socket != H5FD_STREAM_INVALID_SOCKET)
{
/* update the port ID in the file access property
so that it can be queried via H5P_get_fapl_stream() later on */
H5Pset_driver (fapl_id, H5FD_STREAM, &_stream.fapl);
}
}
}
else
@ -805,7 +838,7 @@ static herr_t H5FD_stream_flush (H5FD_t *_stream)
if (H5FD_STREAM_IOCTL_SOCKET (sock, FIONBIO, &on) < 0)
{
H5FD_STREAM_CLOSE_SOCKET (sock);
continue; /* continue the loop for other clients' requests */
continue; /* continue the loop for other clients to connect */
}
size = stream->eof;
@ -814,16 +847,15 @@ static herr_t H5FD_stream_flush (H5FD_t *_stream)
while (size)
{
bytes_send = send (sock, ptr, size, 0);
if (bytes_send < 0 && (EINTR == errno || EAGAIN == errno || EWOULDBLOCK))
{
continue;
}
if (bytes_send < 0)
{
H5FD_STREAM_CLOSE_SOCKET (sock);
/* FIXME: continue the loop for other clients here */
HRETURN_ERROR (H5E_IO, H5E_WRITEERROR, FAIL,
"error writing to socket");
if (EINTR == errno || EAGAIN == errno || EWOULDBLOCK == errno)
{
continue;
}
/* continue the outermost loop for other clients to connect */
break;
}
ptr += bytes_send;
size -= bytes_send;
@ -864,7 +896,7 @@ static herr_t H5FD_stream_close (H5FD_t *_stream)
FUNC_ENTER (H5FD_stream_close, FAIL);
/* Flush */
if (H5FD_stream_flush (_stream) < 0)
if (H5FD_stream_flush (_stream) != SUCCEED)
{
HRETURN_ERROR (H5E_FILE, H5E_CANTFLUSH, FAIL, "unable to flush file");
}

View File

@ -40,7 +40,7 @@
#include <winsock.h>
#define H5FD_STREAM_SOCKET_TYPE SOCKET
#define H5FD_STREAM_INVALID_SOCKET SOCKET_ERROR
#define H5FD_STREAM_INVALID_SOCKET INVALID_SOCKET
#endif
@ -59,11 +59,13 @@ typedef int (*H5FD_stream_broadcast_t) (unsigned char **file,
typedef struct H5FD_stream_fapl_t
{
size_t increment; /* how much to grow memory in reallocs */
H5FD_STREAM_SOCKET_TYPE socket; /* external socket descriptor */
H5FD_STREAM_SOCKET_TYPE socket; /* externally provided socket descriptor*/
hbool_t do_socket_io; /* do I/O on socket */
int backlog; /* backlog argument for listen call */
H5FD_stream_broadcast_t broadcast_fn; /* READ broadcast callback */
void *broadcast_arg; /* READ broadcast callback user argument*/
unsigned int maxhunt; /* how many more ports to try to bind to*/
unsigned short int port; /* port a socket was bound/connected to */
} H5FD_stream_fapl_t;