mirror of
https://github.com/jupyter/notebook.git
synced 2025-03-07 13:07:22 +08:00
share code between zmq channel handlers
This commit is contained in:
parent
0d6676f340
commit
f4b937767e
@ -519,23 +519,28 @@ class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
|
||||
self.on_message = self.save_on_message
|
||||
|
||||
|
||||
class IOPubHandler(AuthenticatedZMQStreamHandler):
|
||||
|
||||
class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
|
||||
|
||||
@property
|
||||
def max_msg_size(self):
|
||||
return self.settings.get('max_msg_size', 65535)
|
||||
|
||||
def create_stream(self):
|
||||
km = self.kernel_manager
|
||||
meth = getattr(km, 'connect_%s' % self.channel)
|
||||
self.zmq_stream = meth(self.kernel_id)
|
||||
|
||||
def initialize(self, *args, **kwargs):
|
||||
self.iopub_stream = None
|
||||
|
||||
self.zmq_stream = None
|
||||
|
||||
def on_first_message(self, msg):
|
||||
try:
|
||||
super(IOPubHandler, self).on_first_message(msg)
|
||||
super(ZMQChannelHandler, self).on_first_message(msg)
|
||||
except web.HTTPError:
|
||||
self.close()
|
||||
return
|
||||
km = self.kernel_manager
|
||||
kernel_id = self.kernel_id
|
||||
km.add_restart_callback(kernel_id, self.on_kernel_restarted)
|
||||
km.add_restart_callback(kernel_id, self.on_restart_failed, 'dead')
|
||||
try:
|
||||
self.iopub_stream = km.connect_iopub(kernel_id)
|
||||
self.create_stream()
|
||||
except web.HTTPError:
|
||||
# WebSockets don't response to traditional error codes so we
|
||||
# close the connection.
|
||||
@ -543,29 +548,32 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
|
||||
self.stream.close()
|
||||
self.close()
|
||||
else:
|
||||
self.iopub_stream.on_recv(self._on_zmq_reply)
|
||||
self.zmq_stream.on_recv(self._on_zmq_reply)
|
||||
|
||||
def on_message(self, msg):
|
||||
pass
|
||||
|
||||
def _send_status_message(self, status):
|
||||
msg = self.session.msg("status",
|
||||
{'execution_state': status}
|
||||
)
|
||||
self.write_message(jsonapi.dumps(msg, default=date_default))
|
||||
|
||||
def on_kernel_restarted(self):
|
||||
self.log.warn("kernel %s restarted", self.kernel_id)
|
||||
self._send_status_message('restarting')
|
||||
|
||||
def on_restart_failed(self):
|
||||
self.log.error("kernel %s restarted failed!", self.kernel_id)
|
||||
self._send_status_message('dead')
|
||||
if len(msg) < self.max_msg_size:
|
||||
msg = jsonapi.loads(msg)
|
||||
self.session.send(self.zmq_stream, msg)
|
||||
|
||||
def on_close(self):
|
||||
# This method can be called twice, once by self.kernel_died and once
|
||||
# from the WebSocket close event. If the WebSocket connection is
|
||||
# closed before the ZMQ streams are setup, they could be None.
|
||||
if self.zmq_stream is not None and not self.zmq_stream.closed():
|
||||
self.zmq_stream.on_recv(None)
|
||||
self.zmq_stream.close()
|
||||
|
||||
|
||||
class IOPubHandler(ZMQChannelHandler):
|
||||
channel = 'iopub'
|
||||
|
||||
def create_stream(self):
|
||||
super(IOPubHandler, self).create_stream()
|
||||
km = self.kernel_manager
|
||||
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
|
||||
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
|
||||
|
||||
def on_close(self):
|
||||
km = self.kernel_manager
|
||||
if self.kernel_id in km:
|
||||
km.remove_restart_callback(
|
||||
@ -574,48 +582,27 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
|
||||
km.remove_restart_callback(
|
||||
self.kernel_id, self.on_restart_failed, 'dead',
|
||||
)
|
||||
if self.iopub_stream is not None and not self.iopub_stream.closed():
|
||||
self.iopub_stream.on_recv(None)
|
||||
self.iopub_stream.close()
|
||||
|
||||
|
||||
class ShellHandler(AuthenticatedZMQStreamHandler):
|
||||
super(IOPubHandler, self).on_close()
|
||||
|
||||
@property
|
||||
def max_msg_size(self):
|
||||
return self.settings.get('max_msg_size', 65535)
|
||||
def _send_status_message(self, status):
|
||||
msg = self.session.msg("status",
|
||||
{'execution_state': status}
|
||||
)
|
||||
self.write_message(jsonapi.dumps(msg, default=date_default))
|
||||
|
||||
def initialize(self, *args, **kwargs):
|
||||
self.shell_stream = None
|
||||
def on_kernel_restarted(self):
|
||||
logging.warn("kernel %s restarted", self.kernel_id)
|
||||
self._send_status_message('restarting')
|
||||
|
||||
def on_first_message(self, msg):
|
||||
try:
|
||||
super(ShellHandler, self).on_first_message(msg)
|
||||
except web.HTTPError:
|
||||
self.close()
|
||||
return
|
||||
km = self.kernel_manager
|
||||
kernel_id = self.kernel_id
|
||||
try:
|
||||
self.shell_stream = km.connect_shell(kernel_id)
|
||||
except web.HTTPError:
|
||||
# WebSockets don't response to traditional error codes so we
|
||||
# close the connection.
|
||||
if not self.stream.closed():
|
||||
self.stream.close()
|
||||
self.close()
|
||||
else:
|
||||
self.shell_stream.on_recv(self._on_zmq_reply)
|
||||
def on_restart_failed(self):
|
||||
logging.error("kernel %s restarted failed!", self.kernel_id)
|
||||
self._send_status_message('dead')
|
||||
|
||||
class ShellHandler(ZMQChannelHandler):
|
||||
channel = 'shell'
|
||||
|
||||
def on_message(self, msg):
|
||||
if len(msg) < self.max_msg_size:
|
||||
msg = jsonapi.loads(msg)
|
||||
self.session.send(self.shell_stream, msg)
|
||||
|
||||
def on_close(self):
|
||||
# Make sure the stream exists and is not already closed.
|
||||
if self.shell_stream is not None and not self.shell_stream.closed():
|
||||
self.shell_stream.close()
|
||||
class StdinHandler(ZMQChannelHandler):
|
||||
channel = 'stdin'
|
||||
|
||||
|
||||
#-----------------------------------------------------------------------------
|
||||
|
Loading…
Reference in New Issue
Block a user