Simplify StreamCapturer for subprocess testing

Rather than using a transient pipe for each subprocess started, the
StreamCapturer now makes a single pipe, and subprocesses redirect their
output to it.

So long as this works on Windows (I've done brief testing, and os.pipe()
seems to be functional), this will hopefully make this much more robust.
The recent failures in ShiningPanda on IPython.parallel have been caused
by StreamCapturer.
This commit is contained in:
Thomas Kluyver 2013-10-29 12:24:24 -07:00
parent 0f25ac58b3
commit acd00de9dd

View File

@ -365,47 +365,22 @@ class StreamCapturer(Thread):
super(StreamCapturer, self).__init__()
self.streams = []
self.buffer = BytesIO()
self.streams_lock = Lock()
self.readfd, self.writefd = os.pipe()
self.buffer_lock = Lock()
self.stream_added = Event()
self.stop = Event()
def run(self):
self.started = True
while not self.stop.is_set():
with self.streams_lock:
streams = self.streams
ready = select([self.readfd], [], [], 1)[0]
if not streams:
self.stream_added.wait(timeout=1)
self.stream_added.clear()
continue
ready = select(streams, [], [], 0.5)[0]
dead = []
with self.buffer_lock:
for fd in ready:
try:
self.buffer.write(os.read(fd, 1024))
except OSError as e:
import errno
if e.errno == errno.EBADF:
dead.append(fd)
else:
raise
with self.streams_lock:
for fd in dead:
self.streams.remove(fd)
if ready:
with self.buffer_lock:
self.buffer.write(os.read(self.readfd, 1024))
def add_stream(self, fd):
with self.streams_lock:
self.streams.append(fd)
self.stream_added.set()
def remove_stream(self, fd):
with self.streams_lock:
self.streams.remove(fd)
os.close(self.readfd)
os.close(self.writefd)
def reset_buffer(self):
with self.buffer_lock:
@ -426,7 +401,7 @@ class SubprocessStreamCapturePlugin(Plugin):
Plugin.__init__(self)
self.stream_capturer = StreamCapturer()
# This is ugly, but distant parts of the test machinery need to be able
# to add streams, so we make the object globally accessible.
# to redirect streams, so we make the object globally accessible.
nose.ipy_stream_capturer = self.stream_capturer
def configure(self, options, config):