Merge pull request #7389 from minrk/one-websocket

use single WebSocket connection for all channels
This commit is contained in:
Brian E. Granger 2015-01-09 11:37:51 -08:00
commit ab83599d36
3 changed files with 107 additions and 142 deletions

View File

@ -138,7 +138,7 @@ class ZMQStreamHandler(WebSocketHandler):
"""meaningless for websockets""" """meaningless for websockets"""
pass pass
def _reserialize_reply(self, msg_list): def _reserialize_reply(self, msg_list, channel=None):
"""Reserialize a reply message using JSON. """Reserialize a reply message using JSON.
This takes the msg list from the ZMQ socket, deserializes it using This takes the msg list from the ZMQ socket, deserializes it using
@ -148,6 +148,8 @@ class ZMQStreamHandler(WebSocketHandler):
""" """
idents, msg_list = self.session.feed_identities(msg_list) idents, msg_list = self.session.feed_identities(msg_list)
msg = self.session.deserialize(msg_list) msg = self.session.deserialize(msg_list)
if channel:
msg['channel'] = channel
if msg['buffers']: if msg['buffers']:
buf = serialize_binary_message(msg) buf = serialize_binary_message(msg)
return buf return buf
@ -155,12 +157,13 @@ class ZMQStreamHandler(WebSocketHandler):
smsg = json.dumps(msg, default=date_default) smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg) return cast_unicode(smsg)
def _on_zmq_reply(self, msg_list): def _on_zmq_reply(self, stream, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the # Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called. # eventloop but hasn't been called.
if self.stream.closed(): return if stream.closed(): return
channel = getattr(stream, 'channel', None)
try: try:
msg = self._reserialize_reply(msg_list) msg = self._reserialize_reply(msg_list, channel=channel)
except Exception: except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True) self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else: else:

View File

@ -84,7 +84,7 @@ class KernelActionHandler(IPythonHandler):
self.finish() self.finish()
class ZMQChannelHandler(AuthenticatedZMQStreamHandler): class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
@property @property
def kernel_info_timeout(self): def kernel_info_timeout(self):
@ -95,8 +95,13 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
def create_stream(self): def create_stream(self):
km = self.kernel_manager km = self.kernel_manager
meth = getattr(km, 'connect_%s' % self.channel) identity = self.session.bsession
self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession) for channel in ('shell', 'iopub', 'stdin'):
meth = getattr(km, 'connect_' + channel)
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
stream.channel = channel
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
def request_kernel_info(self): def request_kernel_info(self):
"""send a request for kernel_info""" """send a request for kernel_info"""
@ -160,8 +165,9 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
self._kernel_info_future.set_result(info) self._kernel_info_future.set_result(info)
def initialize(self): def initialize(self):
super(ZMQChannelHandler, self).initialize() super(ZMQChannelsHandler, self).initialize()
self.zmq_stream = None self.zmq_stream = None
self.channels = {}
self.kernel_id = None self.kernel_id = None
self.kernel_info_channel = None self.kernel_info_channel = None
self._kernel_info_future = Future() self._kernel_info_future = Future()
@ -169,7 +175,7 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
@gen.coroutine @gen.coroutine
def pre_get(self): def pre_get(self):
# authenticate first # authenticate first
super(ZMQChannelHandler, self).pre_get() super(ZMQChannelsHandler, self).pre_get()
# then request kernel info, waiting up to a certain time before giving up. # then request kernel info, waiting up to a certain time before giving up.
# We don't want to wait forever, because browsers don't take it well when # We don't want to wait forever, because browsers don't take it well when
# servers never respond to websocket connection requests. # servers never respond to websocket connection requests.
@ -189,56 +195,36 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
@gen.coroutine @gen.coroutine
def get(self, kernel_id): def get(self, kernel_id):
self.kernel_id = cast_unicode(kernel_id, 'ascii') self.kernel_id = cast_unicode(kernel_id, 'ascii')
yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id) yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
def open(self, kernel_id): def open(self, kernel_id):
super(ZMQChannelHandler, self).open() super(ZMQChannelsHandler, self).open()
try: try:
self.create_stream() self.create_stream()
except web.HTTPError as e: except web.HTTPError as e:
self.log.error("Error opening stream: %s", e) self.log.error("Error opening stream: %s", e)
# WebSockets don't response to traditional error codes so we # WebSockets don't response to traditional error codes so we
# close the connection. # close the connection.
if not self.stream.closed(): for channel, stream in self.channels.items():
self.stream.close() if not stream.closed():
stream.close()
self.close() self.close()
else: else:
self.zmq_stream.on_recv(self._on_zmq_reply) for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)
def on_message(self, msg): def on_message(self, msg):
if self.zmq_stream is None:
return
elif self.zmq_stream.closed():
self.log.info("%s closed, closing websocket.", self)
self.close()
return
if isinstance(msg, bytes): if isinstance(msg, bytes):
msg = deserialize_binary_message(msg) msg = deserialize_binary_message(msg)
else: else:
msg = json.loads(msg) msg = json.loads(msg)
self.session.send(self.zmq_stream, msg) channel = msg.pop('channel', None)
if channel is None:
self.log.warn("No channel specified, assuming shell: %s", msg)
channel = 'shell'
stream = self.channels[channel]
self.session.send(stream, msg)
def on_close(self):
# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
if self.zmq_stream is not None and not self.zmq_stream.closed():
self.zmq_stream.on_recv(None)
# close the socket directly, don't wait for the stream
socket = self.zmq_stream.socket
self.zmq_stream.close()
socket.close()
class IOPubHandler(ZMQChannelHandler):
channel = 'iopub'
def create_stream(self):
super(IOPubHandler, self).create_stream()
km = self.kernel_manager
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
def on_close(self): def on_close(self):
km = self.kernel_manager km = self.kernel_manager
if self.kernel_id in km: if self.kernel_id in km:
@ -248,12 +234,24 @@ class IOPubHandler(ZMQChannelHandler):
km.remove_restart_callback( km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead', self.kernel_id, self.on_restart_failed, 'dead',
) )
super(IOPubHandler, self).on_close() # This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
for channel, stream in self.channels.items():
if stream is not None and not stream.closed():
stream.on_recv(None)
# close the socket directly, don't wait for the stream
socket = stream.socket
stream.close()
socket.close()
self.channels = {}
def _send_status_message(self, status): def _send_status_message(self, status):
msg = self.session.msg("status", msg = self.session.msg("status",
{'execution_state': status} {'execution_state': status}
) )
msg['channel'] = 'iopub'
self.write_message(json.dumps(msg, default=date_default)) self.write_message(json.dumps(msg, default=date_default))
def on_kernel_restarted(self): def on_kernel_restarted(self):
@ -263,18 +261,6 @@ class IOPubHandler(ZMQChannelHandler):
def on_restart_failed(self): def on_restart_failed(self):
logging.error("kernel %s restarted failed!", self.kernel_id) logging.error("kernel %s restarted failed!", self.kernel_id)
self._send_status_message('dead') self._send_status_message('dead')
def on_message(self, msg):
"""IOPub messages make no sense"""
pass
class ShellHandler(ZMQChannelHandler):
channel = 'shell'
class StdinHandler(ZMQChannelHandler):
channel = 'stdin'
#----------------------------------------------------------------------------- #-----------------------------------------------------------------------------
@ -289,7 +275,5 @@ default_handlers = [
(r"/api/kernels", MainKernelHandler), (r"/api/kernels", MainKernelHandler),
(r"/api/kernels/%s" % _kernel_id_regex, KernelHandler), (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
(r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler), (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
(r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler), (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
(r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
(r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
] ]

View File

@ -28,12 +28,7 @@ define([
this.id = null; this.id = null;
this.name = name; this.name = name;
this.ws = null;
this.channels = {
'shell': null,
'iopub': null,
'stdin': null
};
this.kernel_service_url = kernel_service_url; this.kernel_service_url = kernel_service_url;
this.kernel_url = null; this.kernel_url = null;
@ -429,7 +424,7 @@ define([
Kernel.prototype.start_channels = function () { Kernel.prototype.start_channels = function () {
/** /**
* Start the `shell`and `iopub` channels. * Start the websocket channels.
* Will stop and restart them if they already exist. * Will stop and restart them if they already exist.
* *
* @function start_channels * @function start_channels
@ -440,16 +435,12 @@ define([
console.log("Starting WebSockets:", ws_host_url); console.log("Starting WebSockets:", ws_host_url);
var channel_url = function(channel) { this.ws = new this.WebSocket([
return [
that.ws_url, that.ws_url,
utils.url_join_encode(that.kernel_url, channel), utils.url_join_encode(that.kernel_url, 'channels'),
"?session_id=" + that.session_id "?session_id=" + that.session_id
].join(''); ].join('')
}; );
this.channels.shell = new this.WebSocket(channel_url("shell"));
this.channels.stdin = new this.WebSocket(channel_url("stdin"));
this.channels.iopub = new this.WebSocket(channel_url("iopub"));
var already_called_onclose = false; // only alert once var already_called_onclose = false; // only alert once
var ws_closed_early = function(evt){ var ws_closed_early = function(evt){
@ -489,28 +480,22 @@ define([
that._ws_closed(ws_host_url, true); that._ws_closed(ws_host_url, true);
}; };
for (var c in this.channels) { this.ws.onopen = $.proxy(this._ws_opened, this);
this.channels[c].onopen = $.proxy(this._ws_opened, this); this.ws.onclose = ws_closed_early;
this.channels[c].onclose = ws_closed_early; this.ws.onerror = ws_error;
this.channels[c].onerror = ws_error;
}
// switch from early-close to late-close message after 1s // switch from early-close to late-close message after 1s
setTimeout(function() { setTimeout(function() {
for (var c in that.channels) { if (that.ws !== null) {
if (that.channels[c] !== null) { that.ws.onclose = ws_closed_late;
that.channels[c].onclose = ws_closed_late;
}
} }
}, 1000); }, 1000);
this.channels.shell.onmessage = $.proxy(this._handle_shell_reply, this); this.ws.onmessage = $.proxy(this._handle_ws_message, this);
this.channels.iopub.onmessage = $.proxy(this._handle_iopub_message, this);
this.channels.stdin.onmessage = $.proxy(this._handle_input_request, this);
}; };
Kernel.prototype._ws_opened = function (evt) { Kernel.prototype._ws_opened = function (evt) {
/** /**
* Handle a websocket entering the open state, * Handle a websocket entering the open state,
* signaling that the kernel is connected when all channels are open. * signaling that the kernel is connected when websocket is open.
* *
* @function _ws_opened * @function _ws_opened
*/ */
@ -522,8 +507,7 @@ define([
Kernel.prototype._ws_closed = function(ws_url, error) { Kernel.prototype._ws_closed = function(ws_url, error) {
/** /**
* Handle a websocket entering the closed state. This closes the * Handle a websocket entering the closed state. If the websocket
* other communication channels if they are open. If the websocket
* was not closed due to an error, try to reconnect to the kernel. * was not closed due to an error, try to reconnect to the kernel.
* *
* @function _ws_closed * @function _ws_closed
@ -560,27 +544,23 @@ define([
Kernel.prototype.stop_channels = function () { Kernel.prototype.stop_channels = function () {
/** /**
* Close the websocket channels. After successful close, the value * Close the websocket. After successful close, the value
* in `this.channels[channel_name]` will be null. * in `this.ws` will be null.
* *
* @function stop_channels * @function stop_channels
*/ */
var that = this; var that = this;
var close = function (c) { var close = function () {
return function () { if (that.ws && that.ws.readyState === WebSocket.CLOSED) {
if (that.channels[c] && that.channels[c].readyState === WebSocket.CLOSED) { that.ws = null;
that.channels[c] = null; }
}
};
}; };
for (var c in this.channels) { if (this.ws !== null) {
if ( this.channels[c] !== null ) { if (this.ws.readyState === WebSocket.OPEN) {
if (this.channels[c].readyState === WebSocket.OPEN) { this.ws.onclose = close;
this.channels[c].onclose = close(c); this.ws.close();
this.channels[c].close(); } else {
} else { close();
close(c)();
}
} }
} }
}; };
@ -588,20 +568,18 @@ define([
Kernel.prototype.is_connected = function () { Kernel.prototype.is_connected = function () {
/** /**
* Check whether there is a connection to the kernel. This * Check whether there is a connection to the kernel. This
* function only returns true if all channel objects have been * function only returns true if websocket has been
* created and have a state of WebSocket.OPEN. * created and has a state of WebSocket.OPEN.
* *
* @function is_connected * @function is_connected
* @returns {bool} - whether there is a connection * @returns {bool} - whether there is a connection
*/ */
for (var c in this.channels) { // if any channel is not ready, then we're not connected
// if any channel is not ready, then we're not connected if (this.ws === null) {
if (this.channels[c] === null) { return false;
return false; }
} if (this.ws.readyState !== WebSocket.OPEN) {
if (this.channels[c].readyState !== WebSocket.OPEN) { return false;
return false;
}
} }
return true; return true;
}; };
@ -615,12 +593,7 @@ define([
* @function is_fully_disconnected * @function is_fully_disconnected
* @returns {bool} - whether the kernel is fully disconnected * @returns {bool} - whether the kernel is fully disconnected
*/ */
for (var c in this.channels) { return (this.ws === null);
if (this.channels[c] === null) {
return true;
}
}
return false;
}; };
Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) { Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
@ -633,7 +606,8 @@ define([
throw new Error("kernel is not connected"); throw new Error("kernel is not connected");
} }
var msg = this._get_msg(msg_type, content, metadata, buffers); var msg = this._get_msg(msg_type, content, metadata, buffers);
this.channels.shell.send(serialize.serialize(msg)); msg.channel = 'shell';
this.ws.send(serialize.serialize(msg));
this.set_callbacks_for_msg(msg.header.msg_id, callbacks); this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
return msg.header.msg_id; return msg.header.msg_id;
}; };
@ -784,7 +758,8 @@ define([
}; };
this.events.trigger('input_reply.Kernel', {kernel: this, content: content}); this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
var msg = this._get_msg("input_reply", content); var msg = this._get_msg("input_reply", content);
this.channels.stdin.send(serialize.serialize(msg)); msg.channel = 'stdin';
this.ws.send(serialize.serialize(msg));
return msg.header.msg_id; return msg.header.msg_id;
}; };
@ -877,15 +852,28 @@ define([
this.last_msg_callbacks = {}; this.last_msg_callbacks = {};
} }
}; };
/** Kernel.prototype._handle_ws_message = function (e) {
* @function _handle_shell_reply serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this));
*/
Kernel.prototype._handle_shell_reply = function (e) {
serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
}; };
Kernel.prototype._finish_shell_reply = function (reply) { Kernel.prototype._finish_ws_message = function (msg) {
switch (msg.channel) {
case 'shell':
this._handle_shell_reply(msg);
break;
case 'iopub':
this._handle_iopub_message(msg);
break;
case 'stdin':
this._handle_input_request(msg);
break;
default:
console.error("unrecognized message channel", msg.channel, msg);
}
};
Kernel.prototype._handle_shell_reply = function (reply) {
this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply}); this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
var content = reply.content; var content = reply.content;
var metadata = reply.metadata; var metadata = reply.metadata;
@ -1030,12 +1018,7 @@ define([
* *
* @function _handle_iopub_message * @function _handle_iopub_message
*/ */
Kernel.prototype._handle_iopub_message = function (e) { Kernel.prototype._handle_iopub_message = function (msg) {
serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
};
Kernel.prototype._finish_iopub_message = function (msg) {
var handler = this.get_iopub_handler(msg.header.msg_type); var handler = this.get_iopub_handler(msg.header.msg_type);
if (handler !== undefined) { if (handler !== undefined) {
handler(msg); handler(msg);
@ -1045,12 +1028,7 @@ define([
/** /**
* @function _handle_input_request * @function _handle_input_request
*/ */
Kernel.prototype._handle_input_request = function (e) { Kernel.prototype._handle_input_request = function (request) {
serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
};
Kernel.prototype._finish_input_request = function (request) {
var header = request.header; var header = request.header;
var content = request.content; var content = request.content;
var metadata = request.metadata; var metadata = request.metadata;