Fixed subtle bug in kernel restarting.

* Old routers were not being shutdown and removed.
* We were incorrectly associating the new kernel with the notebook
  (we were using the *old* kernel_id for this).
* General clean ups in the kernel manager.
This commit is contained in:
Brian E. Granger 2011-08-07 11:17:54 -07:00
parent a0cbccaa03
commit 8cf1812395
2 changed files with 52 additions and 10 deletions

View File

@ -90,14 +90,12 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
rkm = self.application.routing_kernel_manager rkm = self.application.routing_kernel_manager
self.router = rkm.get_router(kernel_id, self.stream_name) self.router = rkm.get_router(kernel_id, self.stream_name)
self.client_id = self.router.register_client(self) self.client_id = self.router.register_client(self)
logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
def on_message(self, msg): def on_message(self, msg):
self.router.forward_msg(self.client_id, msg) self.router.forward_msg(self.client_id, msg)
def on_close(self): def on_close(self):
self.router.unregister_client(self.client_id) self.router.unregister_client(self.client_id)
logging.info("Connection closed: %s" % self.client_id)
#----------------------------------------------------------------------------- #-----------------------------------------------------------------------------

View File

@ -207,9 +207,20 @@ class RoutingKernelManager(LoggingConfigurable):
@property @property
def kernel_ids(self): def kernel_ids(self):
"""List the kernel ids."""
return self.kernel_manager.kernel_ids return self.kernel_manager.kernel_ids
def kernel_for_notebook(self, notebook_id):
"""Return the kernel_id for a notebook_id or None."""
return self._notebook_mapping.get(notebook_id)
def set_kernel_for_notebook(self, notebook_id, kernel_id):
"""Associate a notebook with a kernel."""
if notebook_id is not None:
self._notebook_mapping[notebook_id] = kernel_id
def notebook_for_kernel(self, kernel_id): def notebook_for_kernel(self, kernel_id):
"""Return the notebook_id for a kernel_id or None."""
notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id] notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
if len(notebook_ids) == 1: if len(notebook_ids) == 1:
return notebook_ids[0] return notebook_ids[0]
@ -217,20 +228,29 @@ class RoutingKernelManager(LoggingConfigurable):
return None return None
def delete_mapping_for_kernel(self, kernel_id): def delete_mapping_for_kernel(self, kernel_id):
"""Remove the kernel/notebook mapping for kernel_id."""
notebook_id = self.notebook_for_kernel(kernel_id) notebook_id = self.notebook_for_kernel(kernel_id)
if notebook_id is not None: if notebook_id is not None:
del self._notebook_mapping[notebook_id] del self._notebook_mapping[notebook_id]
def start_kernel(self, notebook_id=None): def start_kernel(self, notebook_id=None):
"""Start a kernel an return its kernel_id.
Parameters
----------
notebook_id : uuid
The uuid of the notebook to associate the new kernel with. If this
is not None, this kernel will be persistent whenever the notebook
requests a kernel.
"""
self.log.info self.log.info
kernel_id = self._notebook_mapping.get(notebook_id) kernel_id = self.kernel_for_notebook(notebook_id)
if kernel_id is None: if kernel_id is None:
kwargs = dict() kwargs = dict()
kwargs['extra_arguments'] = self.kernel_argv kwargs['extra_arguments'] = self.kernel_argv
kernel_id = self.kernel_manager.start_kernel(**kwargs) kernel_id = self.kernel_manager.start_kernel(**kwargs)
if notebook_id is not None: self.set_kernel_for_notebook(notebook_id, kernel_id)
self._notebook_mapping[notebook_id] = kernel_id self.log.info("Kernel started: %s" % kernel_id)
self.log.info("Kernel started for notebook %s: %s" % (notebook_id,kernel_id))
self.log.debug("Kernel args: %r" % kwargs) self.log.debug("Kernel args: %r" % kwargs)
self.start_session_manager(kernel_id) self.start_session_manager(kernel_id)
else: else:
@ -238,6 +258,7 @@ class RoutingKernelManager(LoggingConfigurable):
return kernel_id return kernel_id
def start_session_manager(self, kernel_id): def start_session_manager(self, kernel_id):
"""Start the ZMQ sockets (a "session") to connect to a kernel."""
sm = self.kernel_manager.create_session_manager(kernel_id) sm = self.kernel_manager.create_session_manager(kernel_id)
self._session_dict[kernel_id] = sm self._session_dict[kernel_id] = sm
iopub_stream = sm.get_iopub_stream() iopub_stream = sm.get_iopub_stream()
@ -248,10 +269,11 @@ class RoutingKernelManager(LoggingConfigurable):
shell_router = ShellStreamRouter( shell_router = ShellStreamRouter(
zmq_stream=shell_stream, session=sm.session, config=self.config zmq_stream=shell_stream, session=sm.session, config=self.config
) )
self._routers[(kernel_id, 'iopub')] = iopub_router self.set_router(kernel_id, 'iopub', iopub_router)
self._routers[(kernel_id, 'shell')] = shell_router self.set_router(kernel_id, 'shell', shell_router)
def kill_kernel(self, kernel_id): def kill_kernel(self, kernel_id):
"""Kill a kernel and remove its notebook association."""
if kernel_id not in self.kernel_manager: if kernel_id not in self.kernel_manager:
raise web.HTTPError(404) raise web.HTTPError(404)
try: try:
@ -264,12 +286,14 @@ class RoutingKernelManager(LoggingConfigurable):
self.log.info("Kernel killed: %s" % kernel_id) self.log.info("Kernel killed: %s" % kernel_id)
def interrupt_kernel(self, kernel_id): def interrupt_kernel(self, kernel_id):
"""Interrupt a kernel."""
if kernel_id not in self.kernel_manager: if kernel_id not in self.kernel_manager:
raise web.HTTPError(404) raise web.HTTPError(404)
self.kernel_manager.interrupt_kernel(kernel_id) self.kernel_manager.interrupt_kernel(kernel_id)
self.log.debug("Kernel interrupted: %s" % kernel_id) self.log.debug("Kernel interrupted: %s" % kernel_id)
def restart_kernel(self, kernel_id): def restart_kernel(self, kernel_id):
"""Restart a kernel while keeping clients connected."""
if kernel_id not in self.kernel_manager: if kernel_id not in self.kernel_manager:
raise web.HTTPError(404) raise web.HTTPError(404)
@ -286,6 +310,14 @@ class RoutingKernelManager(LoggingConfigurable):
new_iopub_router.copy_clients(old_iopub_router) new_iopub_router.copy_clients(old_iopub_router)
new_shell_router.copy_clients(old_shell_router) new_shell_router.copy_clients(old_shell_router)
# Shut down the old routers
old_shell_router.close()
old_iopub_router.close()
self.delete_router(kernel_id, 'shell')
self.delete_router(kernel_id, 'iopub')
del old_shell_router
del old_iopub_router
# Now shutdown the old session and the kernel. # Now shutdown the old session and the kernel.
# TODO: This causes a hard crash in ZMQStream.close, which sets # TODO: This causes a hard crash in ZMQStream.close, which sets
# self.socket to None to hastily. We will need to fix this in PyZMQ # self.socket to None to hastily. We will need to fix this in PyZMQ
@ -295,12 +327,24 @@ class RoutingKernelManager(LoggingConfigurable):
# Now save the new kernel/notebook association. We have to save it # Now save the new kernel/notebook association. We have to save it
# after the old kernel is killed as that will delete the mapping. # after the old kernel is killed as that will delete the mapping.
self._notebook_mapping[notebook_id] = kernel_id self.set_kernel_for_notebook(notebook_id, new_kernel_id)
self.log.debug("Kernel restarted: %s -> %s" % (kernel_id, new_kernel_id)) self.log.debug("Kernel restarted: %s" % new_kernel_id)
return new_kernel_id return new_kernel_id
def get_router(self, kernel_id, stream_name): def get_router(self, kernel_id, stream_name):
"""Return the router for a given kernel_id and stream name."""
router = self._routers[(kernel_id, stream_name)] router = self._routers[(kernel_id, stream_name)]
return router return router
def set_router(self, kernel_id, stream_name, router):
"""Set the router for a given kernel_id and stream_name."""
self._routers[(kernel_id, stream_name)] = router
def delete_router(self, kernel_id, stream_name):
"""Delete a router for a kernel_id and stream_name."""
try:
del self._routers[(kernel_id, stream_name)]
except KeyError:
pass