diff --git a/IPython/testing/iptest.py b/IPython/testing/iptest.py index e94540a04..e95109468 100644 --- a/IPython/testing/iptest.py +++ b/IPython/testing/iptest.py @@ -365,47 +365,22 @@ class StreamCapturer(Thread): super(StreamCapturer, self).__init__() self.streams = [] self.buffer = BytesIO() - self.streams_lock = Lock() + self.readfd, self.writefd = os.pipe() self.buffer_lock = Lock() - self.stream_added = Event() self.stop = Event() def run(self): self.started = True + while not self.stop.is_set(): - with self.streams_lock: - streams = self.streams + ready = select([self.readfd], [], [], 1)[0] - if not streams: - self.stream_added.wait(timeout=1) - self.stream_added.clear() - continue - - ready = select(streams, [], [], 0.5)[0] - dead = [] - with self.buffer_lock: - for fd in ready: - try: - self.buffer.write(os.read(fd, 1024)) - except OSError as e: - import errno - if e.errno == errno.EBADF: - dead.append(fd) - else: - raise - - with self.streams_lock: - for fd in dead: - self.streams.remove(fd) + if ready: + with self.buffer_lock: + self.buffer.write(os.read(self.readfd, 1024)) - def add_stream(self, fd): - with self.streams_lock: - self.streams.append(fd) - self.stream_added.set() - - def remove_stream(self, fd): - with self.streams_lock: - self.streams.remove(fd) + os.close(self.readfd) + os.close(self.writefd) def reset_buffer(self): with self.buffer_lock: @@ -426,7 +401,7 @@ class SubprocessStreamCapturePlugin(Plugin): Plugin.__init__(self) self.stream_capturer = StreamCapturer() # This is ugly, but distant parts of the test machinery need to be able - # to add streams, so we make the object globally accessible. + # to redirect streams, so we make the object globally accessible. nose.ipy_stream_capturer = self.stream_capturer def configure(self, options, config):