actually send only one kernel_info request

store the Future for the initial request,
allowing subsequent requests to wait on the same pending reply.

Previously, any incoming requests that arrived while waiting for the first reply would send their own request.
This commit is contained in:
Min RK 2014-11-01 13:09:05 -07:00
parent d26b4291f5
commit 3e82ea8319

View File

@ -103,8 +103,8 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
km = self.kernel_manager km = self.kernel_manager
kernel = km.get_kernel(self.kernel_id) kernel = km.get_kernel(self.kernel_id)
try: try:
# check for cached value # check for previous request
kernel_info = kernel._kernel_info future = kernel._kernel_info_future
except AttributeError: except AttributeError:
self.log.debug("Requesting kernel info from %s", self.kernel_id) self.log.debug("Requesting kernel info from %s", self.kernel_id)
# Create a kernel_info channel to query the kernel protocol version. # Create a kernel_info channel to query the kernel protocol version.
@ -113,9 +113,12 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
self.kernel_info_channel = km.connect_shell(self.kernel_id) self.kernel_info_channel = km.connect_shell(self.kernel_id)
self.kernel_info_channel.on_recv(self._handle_kernel_info_reply) self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
self.session.send(self.kernel_info_channel, "kernel_info_request") self.session.send(self.kernel_info_channel, "kernel_info_request")
# store the future on the kernel, so only one request is sent
kernel._kernel_info_future = self._kernel_info_future
else: else:
# use cached value, don't resend request if not future.done():
self._finish_kernel_info(kernel_info) self.log.debug("Waiting for pending kernel_info request")
future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
return self._kernel_info_future return self._kernel_info_future
def _handle_kernel_info_reply(self, msg): def _handle_kernel_info_reply(self, msg):
@ -128,16 +131,14 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
msg = self.session.deserialize(msg) msg = self.session.deserialize(msg)
except: except:
self.log.error("Bad kernel_info reply", exc_info=True) self.log.error("Bad kernel_info reply", exc_info=True)
self._kernel_info_future.set_result(None) self._kernel_info_future.set_result({})
return return
else: else:
info = msg['content'] info = msg['content']
self.log.debug("Received kernel info: %s", info) self.log.debug("Received kernel info: %s", info)
if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info: if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
self.log.error("Kernel info request failed, assuming current %s", info) self.log.error("Kernel info request failed, assuming current %s", info)
else: info = {}
kernel = self.kernel_manager.get_kernel(self.kernel_id)
kernel._kernel_info = info
self._finish_kernel_info(info) self._finish_kernel_info(info)
# close the kernel_info channel, we don't need it anymore # close the kernel_info channel, we don't need it anymore
@ -179,7 +180,7 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
if future.done(): if future.done():
return return
self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id) self.log.warn("Timeout waiting for kernel_info reply from %s", self.kernel_id)
future.set_result(None) future.set_result({})
loop = IOLoop.current() loop = IOLoop.current()
loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up) loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
# actually wait for it # actually wait for it