mirror of
git://gcc.gnu.org/git/gcc.git
synced 2025-04-25 15:00:25 +08:00
PR go/56172 net: Fixes for select based pollster. Make Close work properly, mainly for testing. Restart the select if a descriptor is closed. From-SVN: r195823
672 lines
14 KiB
Go
672 lines
14 KiB
Go
// Copyright 2009 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// +build darwin freebsd linux netbsd openbsd
|
|
|
|
package net
|
|
|
|
import (
|
|
"io"
|
|
"os"
|
|
"runtime"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
// Network file descriptor.
|
|
type netFD struct {
|
|
// locking/lifetime of sysfd
|
|
sysmu sync.Mutex
|
|
sysref int
|
|
|
|
// must lock both sysmu and pollserver to write
|
|
// can lock either to read
|
|
closing bool
|
|
|
|
// immutable until Close
|
|
sysfd int
|
|
family int
|
|
sotype int
|
|
isConnected bool
|
|
sysfile *os.File
|
|
cr chan error
|
|
cw chan error
|
|
net string
|
|
laddr Addr
|
|
raddr Addr
|
|
|
|
// serialize access to Read and Write methods
|
|
rio, wio sync.Mutex
|
|
|
|
// read and write deadlines
|
|
rdeadline, wdeadline deadline
|
|
|
|
// owned by fd wait server
|
|
ncr, ncw int
|
|
|
|
// wait server
|
|
pollServer *pollServer
|
|
}
|
|
|
|
// A pollServer helps FDs determine when to retry a non-blocking
|
|
// read or write after they get EAGAIN. When an FD needs to wait,
|
|
// call s.WaitRead() or s.WaitWrite() to pass the request to the poll server.
|
|
// When the pollServer finds that i/o on FD should be possible
|
|
// again, it will send on fd.cr/fd.cw to wake any waiting goroutines.
|
|
//
|
|
// To avoid races in closing, all fd operations are locked and
|
|
// refcounted. when netFD.Close() is called, it calls syscall.Shutdown
|
|
// and sets a closing flag. Only when the last reference is removed
|
|
// will the fd be closed.
|
|
|
|
type pollServer struct {
|
|
pr, pw *os.File
|
|
poll *pollster // low-level OS hooks
|
|
sync.Mutex // controls pending and deadline
|
|
pending map[int]*netFD
|
|
deadline int64 // next deadline (nsec since 1970)
|
|
}
|
|
|
|
func (s *pollServer) AddFD(fd *netFD, mode int) error {
|
|
s.Lock()
|
|
intfd := fd.sysfd
|
|
if intfd < 0 || fd.closing {
|
|
// fd closed underfoot
|
|
s.Unlock()
|
|
return errClosing
|
|
}
|
|
|
|
var t int64
|
|
key := intfd << 1
|
|
if mode == 'r' {
|
|
fd.ncr++
|
|
t = fd.rdeadline.value()
|
|
} else {
|
|
fd.ncw++
|
|
key++
|
|
t = fd.wdeadline.value()
|
|
}
|
|
s.pending[key] = fd
|
|
doWakeup := false
|
|
if t > 0 && (s.deadline == 0 || t < s.deadline) {
|
|
s.deadline = t
|
|
doWakeup = true
|
|
}
|
|
|
|
wake, err := s.poll.AddFD(intfd, mode, false)
|
|
s.Unlock()
|
|
if err != nil {
|
|
return &OpError{"addfd", fd.net, fd.laddr, err}
|
|
}
|
|
if wake || doWakeup {
|
|
s.Wakeup()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Evict evicts fd from the pending list, unblocking
|
|
// any I/O running on fd. The caller must have locked
|
|
// pollserver.
|
|
func (s *pollServer) Evict(fd *netFD) {
|
|
doWakeup := false
|
|
if s.pending[fd.sysfd<<1] == fd {
|
|
s.WakeFD(fd, 'r', errClosing)
|
|
if s.poll.DelFD(fd.sysfd, 'r') {
|
|
doWakeup = true
|
|
}
|
|
delete(s.pending, fd.sysfd<<1)
|
|
}
|
|
if s.pending[fd.sysfd<<1|1] == fd {
|
|
s.WakeFD(fd, 'w', errClosing)
|
|
if s.poll.DelFD(fd.sysfd, 'w') {
|
|
doWakeup = true
|
|
}
|
|
delete(s.pending, fd.sysfd<<1|1)
|
|
}
|
|
if doWakeup {
|
|
s.Wakeup()
|
|
}
|
|
}
|
|
|
|
var wakeupbuf [1]byte
|
|
|
|
func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
|
|
|
|
func (s *pollServer) LookupFD(fd int, mode int) *netFD {
|
|
key := fd << 1
|
|
if mode == 'w' {
|
|
key++
|
|
}
|
|
netfd, ok := s.pending[key]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
delete(s.pending, key)
|
|
return netfd
|
|
}
|
|
|
|
func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
|
|
if mode == 'r' {
|
|
for fd.ncr > 0 {
|
|
fd.ncr--
|
|
fd.cr <- err
|
|
}
|
|
} else {
|
|
for fd.ncw > 0 {
|
|
fd.ncw--
|
|
fd.cw <- err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *pollServer) CheckDeadlines() {
|
|
now := time.Now().UnixNano()
|
|
// TODO(rsc): This will need to be handled more efficiently,
|
|
// probably with a heap indexed by wakeup time.
|
|
|
|
var nextDeadline int64
|
|
for key, fd := range s.pending {
|
|
var t int64
|
|
var mode int
|
|
if key&1 == 0 {
|
|
mode = 'r'
|
|
} else {
|
|
mode = 'w'
|
|
}
|
|
if mode == 'r' {
|
|
t = fd.rdeadline.value()
|
|
} else {
|
|
t = fd.wdeadline.value()
|
|
}
|
|
if t > 0 {
|
|
if t <= now {
|
|
delete(s.pending, key)
|
|
if mode == 'r' {
|
|
s.poll.DelFD(fd.sysfd, mode)
|
|
} else {
|
|
s.poll.DelFD(fd.sysfd, mode)
|
|
}
|
|
s.WakeFD(fd, mode, errTimeout)
|
|
} else if nextDeadline == 0 || t < nextDeadline {
|
|
nextDeadline = t
|
|
}
|
|
}
|
|
}
|
|
s.deadline = nextDeadline
|
|
}
|
|
|
|
func (s *pollServer) Run() {
|
|
var scratch [100]byte
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
for {
|
|
var timeout int64 // nsec to wait for or 0 for none
|
|
if s.deadline > 0 {
|
|
timeout = s.deadline - time.Now().UnixNano()
|
|
if timeout <= 0 {
|
|
s.CheckDeadlines()
|
|
continue
|
|
}
|
|
}
|
|
fd, mode, err := s.poll.WaitFD(s, timeout)
|
|
if err != nil {
|
|
print("pollServer WaitFD: ", err.Error(), "\n")
|
|
return
|
|
}
|
|
if fd < 0 {
|
|
// Timeout happened.
|
|
s.CheckDeadlines()
|
|
continue
|
|
}
|
|
if fd == int(s.pr.Fd()) {
|
|
// Drain our wakeup pipe (we could loop here,
|
|
// but it's unlikely that there are more than
|
|
// len(scratch) wakeup calls).
|
|
s.pr.Read(scratch[0:])
|
|
s.CheckDeadlines()
|
|
} else {
|
|
netfd := s.LookupFD(fd, mode)
|
|
if netfd == nil {
|
|
// This can happen because the WaitFD runs without
|
|
// holding s's lock, so there might be a pending wakeup
|
|
// for an fd that has been evicted. No harm done.
|
|
continue
|
|
}
|
|
s.WakeFD(netfd, mode, nil)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *pollServer) WaitRead(fd *netFD) error {
|
|
err := s.AddFD(fd, 'r')
|
|
if err == nil {
|
|
err = <-fd.cr
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *pollServer) WaitWrite(fd *netFD) error {
|
|
err := s.AddFD(fd, 'w')
|
|
if err == nil {
|
|
err = <-fd.cw
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Network FD methods.
|
|
// Spread network FDs over several pollServers.
|
|
|
|
var pollMaxN int
|
|
var pollservers []*pollServer
|
|
var startServersOnce []func()
|
|
|
|
var canCancelIO = true // used for testing current package
|
|
|
|
func sysInit() {
|
|
pollMaxN = runtime.NumCPU()
|
|
if pollMaxN > 8 {
|
|
pollMaxN = 8 // No improvement then.
|
|
}
|
|
pollservers = make([]*pollServer, pollMaxN)
|
|
startServersOnce = make([]func(), pollMaxN)
|
|
for i := 0; i < pollMaxN; i++ {
|
|
k := i
|
|
once := new(sync.Once)
|
|
startServersOnce[i] = func() { once.Do(func() { startServer(k) }) }
|
|
}
|
|
}
|
|
|
|
func startServer(k int) {
|
|
p, err := newPollServer()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
pollservers[k] = p
|
|
}
|
|
|
|
func server(fd int) *pollServer {
|
|
pollN := runtime.GOMAXPROCS(0)
|
|
if pollN > pollMaxN {
|
|
pollN = pollMaxN
|
|
}
|
|
k := fd % pollN
|
|
startServersOnce[k]()
|
|
return pollservers[k]
|
|
}
|
|
|
|
func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) {
|
|
deadline := time.Now().Add(timeout)
|
|
_, addri, err := resolveNetAddr("dial", net, addr, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return dialAddr(net, addr, addri, deadline)
|
|
}
|
|
|
|
func newFD(fd, family, sotype int, net string) (*netFD, error) {
|
|
netfd := &netFD{
|
|
sysfd: fd,
|
|
family: family,
|
|
sotype: sotype,
|
|
net: net,
|
|
}
|
|
netfd.cr = make(chan error, 1)
|
|
netfd.cw = make(chan error, 1)
|
|
netfd.pollServer = server(fd)
|
|
return netfd, nil
|
|
}
|
|
|
|
func (fd *netFD) setAddr(laddr, raddr Addr) {
|
|
fd.laddr = laddr
|
|
fd.raddr = raddr
|
|
fd.sysfile = os.NewFile(uintptr(fd.sysfd), fd.net)
|
|
}
|
|
|
|
func (fd *netFD) name() string {
|
|
var ls, rs string
|
|
if fd.laddr != nil {
|
|
ls = fd.laddr.String()
|
|
}
|
|
if fd.raddr != nil {
|
|
rs = fd.raddr.String()
|
|
}
|
|
return fd.net + ":" + ls + "->" + rs
|
|
}
|
|
|
|
func (fd *netFD) connect(ra syscall.Sockaddr) error {
|
|
err := syscall.Connect(fd.sysfd, ra)
|
|
if err == syscall.EINPROGRESS {
|
|
if err = fd.pollServer.WaitWrite(fd); err != nil {
|
|
return err
|
|
}
|
|
var e int
|
|
e, err = syscall.GetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
|
|
if err != nil {
|
|
return os.NewSyscallError("getsockopt", err)
|
|
}
|
|
if e != 0 {
|
|
err = syscall.Errno(e)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Add a reference to this fd.
|
|
// If closing==true, pollserver must be locked; mark the fd as closing.
|
|
// Returns an error if the fd cannot be used.
|
|
func (fd *netFD) incref(closing bool) error {
|
|
fd.sysmu.Lock()
|
|
if fd.closing {
|
|
fd.sysmu.Unlock()
|
|
return errClosing
|
|
}
|
|
fd.sysref++
|
|
if closing {
|
|
fd.closing = true
|
|
}
|
|
fd.sysmu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Remove a reference to this FD and close if we've been asked to do so (and
|
|
// there are no references left.
|
|
func (fd *netFD) decref() {
|
|
fd.sysmu.Lock()
|
|
fd.sysref--
|
|
if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
|
|
fd.sysfile.Close()
|
|
fd.sysfile = nil
|
|
fd.sysfd = -1
|
|
}
|
|
fd.sysmu.Unlock()
|
|
}
|
|
|
|
func (fd *netFD) Close() error {
|
|
fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict
|
|
if err := fd.incref(true); err != nil {
|
|
fd.pollServer.Unlock()
|
|
return err
|
|
}
|
|
// Unblock any I/O. Once it all unblocks and returns,
|
|
// so that it cannot be referring to fd.sysfd anymore,
|
|
// the final decref will close fd.sysfd. This should happen
|
|
// fairly quickly, since all the I/O is non-blocking, and any
|
|
// attempts to block in the pollserver will return errClosing.
|
|
fd.pollServer.Evict(fd)
|
|
fd.pollServer.Unlock()
|
|
fd.decref()
|
|
return nil
|
|
}
|
|
|
|
func (fd *netFD) shutdown(how int) error {
|
|
if err := fd.incref(false); err != nil {
|
|
return err
|
|
}
|
|
defer fd.decref()
|
|
err := syscall.Shutdown(fd.sysfd, how)
|
|
if err != nil {
|
|
return &OpError{"shutdown", fd.net, fd.laddr, err}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (fd *netFD) CloseRead() error {
|
|
return fd.shutdown(syscall.SHUT_RD)
|
|
}
|
|
|
|
func (fd *netFD) CloseWrite() error {
|
|
return fd.shutdown(syscall.SHUT_WR)
|
|
}
|
|
|
|
func (fd *netFD) Read(p []byte) (n int, err error) {
|
|
fd.rio.Lock()
|
|
defer fd.rio.Unlock()
|
|
if err := fd.incref(false); err != nil {
|
|
return 0, err
|
|
}
|
|
defer fd.decref()
|
|
for {
|
|
if fd.rdeadline.expired() {
|
|
err = errTimeout
|
|
break
|
|
}
|
|
n, err = syscall.Read(int(fd.sysfd), p)
|
|
if err != nil {
|
|
n = 0
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitRead(fd); err == nil {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
err = chkReadErr(n, err, fd)
|
|
break
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
err = &OpError{"read", fd.net, fd.raddr, err}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
|
|
fd.rio.Lock()
|
|
defer fd.rio.Unlock()
|
|
if err := fd.incref(false); err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defer fd.decref()
|
|
for {
|
|
if fd.rdeadline.expired() {
|
|
err = errTimeout
|
|
break
|
|
}
|
|
n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
|
|
if err != nil {
|
|
n = 0
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitRead(fd); err == nil {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
err = chkReadErr(n, err, fd)
|
|
break
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
err = &OpError{"read", fd.net, fd.laddr, err}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
|
|
fd.rio.Lock()
|
|
defer fd.rio.Unlock()
|
|
if err := fd.incref(false); err != nil {
|
|
return 0, 0, 0, nil, err
|
|
}
|
|
defer fd.decref()
|
|
for {
|
|
if fd.rdeadline.expired() {
|
|
err = errTimeout
|
|
break
|
|
}
|
|
n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
|
|
if err != nil {
|
|
// TODO(dfc) should n and oobn be set to 0
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitRead(fd); err == nil {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
err = chkReadErr(n, err, fd)
|
|
break
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
err = &OpError{"read", fd.net, fd.laddr, err}
|
|
}
|
|
return
|
|
}
|
|
|
|
func chkReadErr(n int, err error, fd *netFD) error {
|
|
if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
|
|
return io.EOF
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (fd *netFD) Write(p []byte) (nn int, err error) {
|
|
fd.wio.Lock()
|
|
defer fd.wio.Unlock()
|
|
if err := fd.incref(false); err != nil {
|
|
return 0, err
|
|
}
|
|
defer fd.decref()
|
|
for {
|
|
if fd.wdeadline.expired() {
|
|
err = errTimeout
|
|
break
|
|
}
|
|
var n int
|
|
n, err = syscall.Write(int(fd.sysfd), p[nn:])
|
|
if n > 0 {
|
|
nn += n
|
|
}
|
|
if nn == len(p) {
|
|
break
|
|
}
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitWrite(fd); err == nil {
|
|
continue
|
|
}
|
|
}
|
|
if err != nil {
|
|
n = 0
|
|
break
|
|
}
|
|
if n == 0 {
|
|
err = io.ErrUnexpectedEOF
|
|
break
|
|
}
|
|
}
|
|
if err != nil {
|
|
err = &OpError{"write", fd.net, fd.raddr, err}
|
|
}
|
|
return nn, err
|
|
}
|
|
|
|
func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
|
|
fd.wio.Lock()
|
|
defer fd.wio.Unlock()
|
|
if err := fd.incref(false); err != nil {
|
|
return 0, err
|
|
}
|
|
defer fd.decref()
|
|
for {
|
|
if fd.wdeadline.expired() {
|
|
err = errTimeout
|
|
break
|
|
}
|
|
err = syscall.Sendto(fd.sysfd, p, 0, sa)
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitWrite(fd); err == nil {
|
|
continue
|
|
}
|
|
}
|
|
break
|
|
}
|
|
if err == nil {
|
|
n = len(p)
|
|
} else {
|
|
err = &OpError{"write", fd.net, fd.raddr, err}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
|
|
fd.wio.Lock()
|
|
defer fd.wio.Unlock()
|
|
if err := fd.incref(false); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer fd.decref()
|
|
for {
|
|
if fd.wdeadline.expired() {
|
|
err = errTimeout
|
|
break
|
|
}
|
|
err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitWrite(fd); err == nil {
|
|
continue
|
|
}
|
|
}
|
|
break
|
|
}
|
|
if err == nil {
|
|
n = len(p)
|
|
oobn = len(oob)
|
|
} else {
|
|
err = &OpError{"write", fd.net, fd.raddr, err}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
|
|
if err := fd.incref(false); err != nil {
|
|
return nil, err
|
|
}
|
|
defer fd.decref()
|
|
|
|
var s int
|
|
var rsa syscall.Sockaddr
|
|
for {
|
|
s, rsa, err = accept(fd.sysfd)
|
|
if err != nil {
|
|
if err == syscall.EAGAIN {
|
|
if err = fd.pollServer.WaitRead(fd); err == nil {
|
|
continue
|
|
}
|
|
} else if err == syscall.ECONNABORTED {
|
|
// This means that a socket on the listen queue was closed
|
|
// before we Accept()ed it; it's a silly error, so try again.
|
|
continue
|
|
}
|
|
return nil, &OpError{"accept", fd.net, fd.laddr, err}
|
|
}
|
|
break
|
|
}
|
|
|
|
if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
|
|
closesocket(s)
|
|
return nil, err
|
|
}
|
|
lsa, _ := syscall.Getsockname(netfd.sysfd)
|
|
netfd.setAddr(toAddr(lsa), toAddr(rsa))
|
|
return netfd, nil
|
|
}
|
|
|
|
func (fd *netFD) dup() (f *os.File, err error) {
|
|
syscall.ForkLock.RLock()
|
|
ns, err := syscall.Dup(fd.sysfd)
|
|
if err != nil {
|
|
syscall.ForkLock.RUnlock()
|
|
return nil, &OpError{"dup", fd.net, fd.laddr, err}
|
|
}
|
|
syscall.CloseOnExec(ns)
|
|
syscall.ForkLock.RUnlock()
|
|
|
|
// We want blocking mode for the new fd, hence the double negative.
|
|
if err = syscall.SetNonblock(ns, false); err != nil {
|
|
return nil, &OpError{"setnonblock", fd.net, fd.laddr, err}
|
|
}
|
|
|
|
return os.NewFile(uintptr(ns), fd.name()), nil
|
|
}
|
|
|
|
func closesocket(s int) error {
|
|
return syscall.Close(s)
|
|
}
|