Merge pull request #2775 from ellisonbg/kernelid

General cleanup of kernel manager code.

This does some general cleanup of MultiKernelManager to better reflect how it is actually being used.

Sometimes there is a need to create kernel_id's elsewhere in code.  This minor change allows a kernel_id to be created outside of the MultiKernelManager and passed in as a kwarg.  An exception is raised if the id is already used.

Tasks:

- [x] Fix cleanup of ipc files.
- [x] Allow kernel_id to be passed to MultiKernelManager.start_kernel.
- [x] Add ipc support to MultiKernelManager.
- [x] Add more tests for MultiKernelManager.
- [x] Rename sub channel to iopub channel everywhere.
- [x] Use consistent naming for all channel classes in zmq, inprocess and qt.
- [x] Move BlockingChannelMixin to zmq.blockingkernelmanager.
- [x] Create ABC for KernelManager.
- [x] Make the InProcessKernelManager a Configurable.
- [x] Cleanup docstrings in ABCs.
- [x] Add tests for KernelManager.
- [x] Check over MultiKernelManager.
- [x] Make KernelManager,kill_kernel private in ABC and implementations.
- [x] Find bug that is causing the kernel manager tests to hang unless the shell channel linger is set to 0.
- [x] Decide about critical logging in ipkernel.
- [x] Debug lack of stderr redirect in tests.
This commit is contained in:
Min RK 2013-01-17 22:06:47 -08:00
commit 40fb853bc7
3 changed files with 107 additions and 81 deletions

View File

@ -330,7 +330,7 @@ class MainKernelHandler(AuthenticatedHandler):
@web.authenticated
def get(self):
km = self.application.kernel_manager
self.finish(jsonapi.dumps(km.kernel_ids))
self.finish(jsonapi.dumps(km.list_kernel_ids()))
@web.authenticated
def post(self):
@ -364,9 +364,9 @@ class KernelActionHandler(AuthenticatedHandler):
km.interrupt_kernel(kernel_id)
self.set_status(204)
if action == 'restart':
new_kernel_id = km.restart_kernel(kernel_id)
data = {'ws_url':self.ws_url,'kernel_id':new_kernel_id}
self.set_header('Location', '/'+new_kernel_id)
km.restart_kernel(kernel_id)
data = {'ws_url':self.ws_url, 'kernel_id':kernel_id}
self.set_header('Location', '/'+kernel_id)
self.write(jsonapi.dumps(data))
self.finish()
@ -416,7 +416,7 @@ class AuthenticatedZMQStreamHandler(ZMQStreamHandler):
def open(self, kernel_id):
self.kernel_id = kernel_id.decode('ascii')
try:
cfg = self.application.ipython_app.config
cfg = self.application.config
except AttributeError:
# protect from the case where this is run from something other than
# the notebook app:
@ -541,9 +541,13 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
if not self.hb_stream.closed():
self.hb_stream.on_recv(None)
def kernel_died(self):
def _delete_kernel_data(self):
"""Remove the kernel data and notebook mapping."""
self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id)
self.application.log.error("Kernel %s failed to respond to heartbeat", self.kernel_id)
def kernel_died(self):
self._delete_kernel_data()
self.application.log.error("Kernel died: %s" % self.kernel_id)
self.write_message(
{'header': {'msg_type': 'status'},
'parent_header': {},

View File

@ -17,8 +17,6 @@ Authors:
#-----------------------------------------------------------------------------
import os
import signal
import sys
import uuid
import zmq
@ -63,56 +61,65 @@ class MultiKernelManager(LoggingConfigurable):
_kernels = Dict()
@property
def kernel_ids(self):
def list_kernel_ids(self):
"""Return a list of the kernel ids of the active kernels."""
return self._kernels.keys()
# Create a copy so we can iterate over kernels in operations
# that delete keys.
return list(self._kernels.keys())
def __len__(self):
"""Return the number of running kernels."""
return len(self.kernel_ids)
return len(self.list_kernel_ids())
def __contains__(self, kernel_id):
if kernel_id in self.kernel_ids:
return True
else:
return False
return kernel_id in self._kernels
def start_kernel(self, **kwargs):
"""Start a new kernel."""
kernel_id = unicode(uuid.uuid4())
# use base KernelManager for each Kernel
"""Start a new kernel.
The caller can pick a kernel_id by passing one in as a keyword arg,
otherwise one will be picked using a uuid.
To silence the kernel's stdout/stderr, call this using::
km.start_kernel(stdout=PIPE, stderr=PIPE)
"""
kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
if kernel_id in self:
raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
# kernel_manager_factory is the constructor for the KernelManager
# subclass we are using. It can be configured as any Configurable,
# including things like its transport and ip.
km = self.kernel_manager_factory(connection_file=os.path.join(
self.connection_dir, "kernel-%s.json" % kernel_id),
config=self.config,
)
km.start_kernel(**kwargs)
# start just the shell channel, needed for graceful restart
km.start_channels(shell=True, sub=False, stdin=False, hb=False)
km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
self._kernels[kernel_id] = km
return kernel_id
def shutdown_kernel(self, kernel_id):
def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel by its kernel uuid.
Parameters
==========
kernel_id : uuid
The id of the kernel to shutdown.
now : bool
Should the kernel be shutdown forcibly using a signal.
"""
self.get_kernel(kernel_id).shutdown_kernel()
k = self.get_kernel(kernel_id)
k.shutdown_kernel(now=now)
k.shell_channel.stop()
del self._kernels[kernel_id]
def kill_kernel(self, kernel_id):
"""Kill a kernel by its kernel uuid.
Parameters
==========
kernel_id : uuid
The id of the kernel to kill.
"""
self.get_kernel(kernel_id).kill_kernel()
del self._kernels[kernel_id]
def shutdown_all(self, now=False):
"""Shutdown all kernels."""
for kid in self.list_kernel_ids():
self.shutdown_kernel(kid, now=now)
def interrupt_kernel(self, kernel_id):
"""Interrupt (SIGINT) the kernel by its uuid.
@ -125,7 +132,7 @@ class MultiKernelManager(LoggingConfigurable):
return self.get_kernel(kernel_id).interrupt_kernel()
def signal_kernel(self, kernel_id, signum):
""" Sends a signal to the kernel by its uuid.
"""Sends a signal to the kernel by its uuid.
Note that since only SIGTERM is supported on Windows, this function
is only useful on Unix systems.
@ -161,8 +168,8 @@ class MultiKernelManager(LoggingConfigurable):
else:
raise KeyError("Kernel with id not found: %s" % kernel_id)
def get_kernel_ports(self, kernel_id):
"""Return a dictionary of ports for a kernel.
def get_connection_info(self, kernel_id):
"""Return a dictionary of connection data for a kernel.
Parameters
==========
@ -171,21 +178,39 @@ class MultiKernelManager(LoggingConfigurable):
Returns
=======
port_dict : dict
A dict of key, value pairs where the keys are the names
(stdin_port,iopub_port,shell_port) and the values are the
integer port numbers for those channels.
connection_dict : dict
A dict of the information needed to connect to a kernel.
This includes the ip address and the integer port
numbers of the different channels (stdin_port, iopub_port,
shell_port, hb_port).
"""
# this will raise a KeyError if not found:
km = self.get_kernel(kernel_id)
return dict(shell_port=km.shell_port,
return dict(transport=km.transport,
ip=km.ip,
shell_port=km.shell_port,
iopub_port=km.iopub_port,
stdin_port=km.stdin_port,
hb_port=km.hb_port,
)
)
def get_kernel_ip(self, kernel_id):
"""Return ip address for a kernel.
def _make_url(self, transport, ip, port):
"""Make a ZeroMQ URL for a given transport, ip and port."""
if transport == 'tcp':
return "tcp://%s:%i" % (ip, port)
else:
return "%s://%s-%s" % (transport, ip, port)
def _create_connected_stream(self, kernel_id, socket_type):
"""Create a connected ZMQStream for a kernel."""
cinfo = self.get_connection_info(kernel_id)
url = self._make_url(cinfo['transport'], cinfo['ip'], cinfo['port'])
sock = self.context.socket(socket_type)
self.log.info("Connecting to: %s" % url)
sock.connect(url)
return ZMQStream(sock)
def create_iopub_stream(self, kernel_id):
"""Return a ZMQStream object connected to the iopub channel.
Parameters
==========
@ -194,35 +219,40 @@ class MultiKernelManager(LoggingConfigurable):
Returns
=======
ip : str
The ip address of the kernel.
stream : ZMQStream
"""
return self.get_kernel(kernel_id).ip
def create_connected_stream(self, ip, port, socket_type):
sock = self.context.socket(socket_type)
addr = "tcp://%s:%i" % (ip, port)
self.log.info("Connecting to: %s" % addr)
sock.connect(addr)
return ZMQStream(sock)
def create_iopub_stream(self, kernel_id):
ip = self.get_kernel_ip(kernel_id)
ports = self.get_kernel_ports(kernel_id)
iopub_stream = self.create_connected_stream(ip, ports['iopub_port'], zmq.SUB)
iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB)
iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
return iopub_stream
def create_shell_stream(self, kernel_id):
ip = self.get_kernel_ip(kernel_id)
ports = self.get_kernel_ports(kernel_id)
shell_stream = self.create_connected_stream(ip, ports['shell_port'], zmq.DEALER)
"""Return a ZMQStream object connected to the shell channel.
Parameters
==========
kernel_id : uuid
The id of the kernel.
Returns
=======
stream : ZMQStream
"""
shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER)
return shell_stream
def create_hb_stream(self, kernel_id):
ip = self.get_kernel_ip(kernel_id)
ports = self.get_kernel_ports(kernel_id)
hb_stream = self.create_connected_stream(ip, ports['hb_port'], zmq.REQ)
"""Return a ZMQStream object connected to the hb channel.
Parameters
==========
kernel_id : uuid
The id of the kernel.
Returns
=======
stream : ZMQStream
"""
hb_stream = self._create_connected_stream(kernel_id, zmq.REQ)
return hb_stream
@ -289,20 +319,15 @@ class MappingKernelManager(MultiKernelManager):
self.log.info("Using existing kernel: %s" % kernel_id)
return kernel_id
def shutdown_kernel(self, kernel_id):
def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel and remove its notebook association."""
self._check_kernel_id(kernel_id)
super(MappingKernelManager, self).shutdown_kernel(kernel_id)
super(MappingKernelManager, self).shutdown_kernel(
kernel_id, now=now
)
self.delete_mapping_for_kernel(kernel_id)
self.log.info("Kernel shutdown: %s" % kernel_id)
def kill_kernel(self, kernel_id):
"""Kill a kernel and remove its notebook association."""
self._check_kernel_id(kernel_id)
super(MappingKernelManager, self).kill_kernel(kernel_id)
self.delete_mapping_for_kernel(kernel_id)
self.log.info("Kernel killed: %s" % kernel_id)
def interrupt_kernel(self, kernel_id):
"""Interrupt a kernel."""
self._check_kernel_id(kernel_id)
@ -314,7 +339,6 @@ class MappingKernelManager(MultiKernelManager):
self._check_kernel_id(kernel_id)
super(MappingKernelManager, self).restart_kernel(kernel_id)
self.log.info("Kernel restarted: %s" % kernel_id)
return kernel_id
def create_iopub_stream(self, kernel_id):
"""Create a new iopub stream."""

View File

@ -187,6 +187,7 @@ class NotebookWebApplication(web.Application):
self.cluster_manager = cluster_manager
self.ipython_app = ipython_app
self.read_only = self.ipython_app.read_only
self.config = self.ipython_app.config
self.log = log
self.jinja2_env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), "templates")))
@ -591,16 +592,13 @@ class NotebookApp(BaseIPythonApplication):
self.init_signal()
def cleanup_kernels(self):
"""shutdown all kernels
"""Shutdown all kernels.
The kernels will shutdown themselves when this process no longer exists,
but explicit shutdown allows the KernelManagers to cleanup the connection files.
"""
self.log.info('Shutting down kernels')
km = self.kernel_manager
# copy list, since shutdown_kernel deletes keys
for kid in list(km.kernel_ids):
km.shutdown_kernel(kid)
self.kernel_manager.shutdown_all()
def start(self):
ip = self.ip if self.ip else '[all ip addresses on your system]'