diff --git a/IPython/html/base/zmqhandlers.py b/IPython/html/base/zmqhandlers.py index b3d781036..d0b77a1f4 100644 --- a/IPython/html/base/zmqhandlers.py +++ b/IPython/html/base/zmqhandlers.py @@ -4,6 +4,7 @@ # Distributed under the terms of the Modified BSD License. import json +import struct try: from urllib.parse import urlparse # Py 3 @@ -22,12 +23,70 @@ from tornado import web from tornado import websocket from IPython.kernel.zmq.session import Session -from IPython.utils.jsonutil import date_default +from IPython.utils.jsonutil import date_default, extract_dates from IPython.utils.py3compat import PY3, cast_unicode from .handlers import IPythonHandler +def serialize_binary_message(msg): + """serialize a message as a binary blob + + Header: + + 4 bytes: number of msg parts (nbufs) as 32b int + 4 * nbufs bytes: offset for each buffer as integer as 32b int + + Offsets are from the start of the buffer, including the header. + + Returns + ------- + + The message serialized to bytes. + + """ + # don't modify msg or buffer list in-place + msg = msg.copy() + buffers = list(msg.pop('buffers')) + bmsg = json.dumps(msg, default=date_default).encode('utf8') + buffers.insert(0, bmsg) + nbufs = len(buffers) + offsets = [4 * (nbufs + 1)] + for buf in buffers[:-1]: + offsets.append(offsets[-1] + len(buf)) + offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets) + buffers.insert(0, offsets_buf) + return b''.join(buffers) + + +def deserialize_binary_message(bmsg): + """deserialize a message from a binary blog + + Header: + + 4 bytes: number of msg parts (nbufs) as 32b int + 4 * nbufs bytes: offset for each buffer as integer as 32b int + + Offsets are from the start of the buffer, including the header. + + Returns + ------- + + message dictionary + """ + nbufs = struct.unpack('!i', bmsg[:4])[0] + offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)])) + offsets.append(None) + bufs = [] + for start, stop in zip(offsets[:-1], offsets[1:]): + bufs.append(bmsg[start:stop]) + msg = json.loads(bufs[0].decode('utf8')) + msg['header'] = extract_dates(msg['header']) + msg['parent_header'] = extract_dates(msg['parent_header']) + msg['buffers'] = bufs[1:] + return msg + + class ZMQStreamHandler(websocket.WebSocketHandler): def check_origin(self, origin): @@ -77,23 +136,19 @@ class ZMQStreamHandler(websocket.WebSocketHandler): def _reserialize_reply(self, msg_list): """Reserialize a reply message using JSON. - This takes the msg list from the ZMQ socket, unserializes it using + This takes the msg list from the ZMQ socket, deserializes it using self.session and then serializes the result using JSON. This method should be used by self._on_zmq_reply to build messages that can be sent back to the browser. """ idents, msg_list = self.session.feed_identities(msg_list) - msg = self.session.unserialize(msg_list) - try: - msg['header'].pop('date') - except KeyError: - pass - try: - msg['parent_header'].pop('date') - except KeyError: - pass - msg.pop('buffers') - return json.dumps(msg, default=date_default) + msg = self.session.deserialize(msg_list) + if msg['buffers']: + buf = serialize_binary_message(msg) + return buf + else: + smsg = json.dumps(msg, default=date_default) + return cast_unicode(smsg) def _on_zmq_reply(self, msg_list): # Sometimes this gets triggered when the on_close method is scheduled in the @@ -104,7 +159,7 @@ class ZMQStreamHandler(websocket.WebSocketHandler): except Exception: self.log.critical("Malformed message: %r" % msg_list, exc_info=True) else: - self.write_message(msg) + self.write_message(msg, binary=isinstance(msg, bytes)) def allow_draft76(self): """Allow draft 76, until browsers such as Safari update to RFC 6455. diff --git a/IPython/html/services/kernels/handlers.py b/IPython/html/services/kernels/handlers.py index 7d89ad6c7..6c5f2dfbf 100644 --- a/IPython/html/services/kernels/handlers.py +++ b/IPython/html/services/kernels/handlers.py @@ -12,7 +12,7 @@ from IPython.utils.py3compat import string_types from IPython.html.utils import url_path_join, url_escape from ...base.handlers import IPythonHandler, json_errors -from ...base.zmqhandlers import AuthenticatedZMQStreamHandler +from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message from IPython.core.release import kernel_protocol_version @@ -110,7 +110,7 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): """ idents,msg = self.session.feed_identities(msg) try: - msg = self.session.unserialize(msg) + msg = self.session.deserialize(msg) except: self.log.error("Bad kernel_info reply", exc_info=True) self._request_kernel_info() @@ -150,7 +150,10 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler): self.log.info("%s closed, closing websocket.", self) self.close() return - msg = json.loads(msg) + if isinstance(msg, bytes): + msg = deserialize_binary_message(msg) + else: + msg = json.loads(msg) self.session.send(self.zmq_stream, msg) def on_close(self): diff --git a/IPython/html/static/base/js/utils.js b/IPython/html/static/base/js/utils.js index 61eb84ab7..8e2530354 100644 --- a/IPython/html/static/base/js/utils.js +++ b/IPython/html/static/base/js/utils.js @@ -553,7 +553,7 @@ define([ ], callback, errback ); }; - + var utils = { regex_split : regex_split, uuid : uuid, diff --git a/IPython/html/static/services/kernels/js/comm.js b/IPython/html/static/services/kernels/js/comm.js index 04307cf40..91f3dc826 100644 --- a/IPython/html/static/services/kernels/js/comm.js +++ b/IPython/html/static/services/kernels/js/comm.js @@ -129,12 +129,12 @@ define([ return this.kernel.send_shell_message("comm_open", content, callbacks, metadata); }; - Comm.prototype.send = function (data, callbacks, metadata) { + Comm.prototype.send = function (data, callbacks, metadata, buffers) { var content = { comm_id : this.comm_id, data : data || {}, }; - return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata); + return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata, buffers); }; Comm.prototype.close = function (data, callbacks, metadata) { diff --git a/IPython/html/static/services/kernels/js/kernel.js b/IPython/html/static/services/kernels/js/kernel.js index 3a25e9066..9ad4fcb00 100644 --- a/IPython/html/static/services/kernels/js/kernel.js +++ b/IPython/html/static/services/kernels/js/kernel.js @@ -5,9 +5,10 @@ define([ 'base/js/namespace', 'jquery', 'base/js/utils', - 'services/kernels/js/comm', - 'widgets/js/init', -], function(IPython, $, utils, comm, widgetmanager) { + './comm', + './serialize', + 'widgets/js/init' +], function(IPython, $, utils, comm, serialize, widgetmanager) { "use strict"; /** @@ -69,7 +70,7 @@ define([ /** * @function _get_msg */ - Kernel.prototype._get_msg = function (msg_type, content, metadata) { + Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) { var msg = { header : { msg_id : utils.uuid(), @@ -80,6 +81,7 @@ define([ }, metadata : metadata || {}, content : content, + buffers : buffers || [], parent_header : {} }; return msg; @@ -596,12 +598,12 @@ define([ * * @function send_shell_message */ - Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata) { + Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) { if (!this.is_connected()) { throw new Error("kernel is not connected"); } - var msg = this._get_msg(msg_type, content, metadata); - this.channels.shell.send(JSON.stringify(msg)); + var msg = this._get_msg(msg_type, content, metadata, buffers); + this.channels.shell.send(serialize.serialize(msg)); this.set_callbacks_for_msg(msg.header.msg_id, callbacks); return msg.header.msg_id; }; @@ -752,7 +754,7 @@ define([ }; this.events.trigger('input_reply.Kernel', {kernel: this, content: content}); var msg = this._get_msg("input_reply", content); - this.channels.stdin.send(JSON.stringify(msg)); + this.channels.stdin.send(serialize.serialize(msg)); return msg.header.msg_id; }; @@ -850,8 +852,11 @@ define([ * @function _handle_shell_reply */ Kernel.prototype._handle_shell_reply = function (e) { - var reply = $.parseJSON(e.data); - this.events.trigger('shell_reply.Kernel', {kernel: this, reply: reply}); + serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this)); + }; + + Kernel.prototype._finish_shell_reply = function (reply) { + this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply}); var content = reply.content; var metadata = reply.metadata; var parent_id = reply.parent_header.msg_id; @@ -978,8 +983,11 @@ define([ * @function _handle_iopub_message */ Kernel.prototype._handle_iopub_message = function (e) { - var msg = $.parseJSON(e.data); + serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this)); + }; + + Kernel.prototype._finish_iopub_message = function (msg) { var handler = this.get_iopub_handler(msg.header.msg_type); if (handler !== undefined) { handler(msg); @@ -990,7 +998,11 @@ define([ * @function _handle_input_request */ Kernel.prototype._handle_input_request = function (e) { - var request = $.parseJSON(e.data); + serialize.deserialize(e.data, $.proxy(this._finish_input_request, this)); + }; + + + Kernel.prototype._finish_input_request = function (request) { var header = request.header; var content = request.content; var metadata = request.metadata; diff --git a/IPython/html/static/services/kernels/js/serialize.js b/IPython/html/static/services/kernels/js/serialize.js new file mode 100644 index 000000000..c86e3662f --- /dev/null +++ b/IPython/html/static/services/kernels/js/serialize.js @@ -0,0 +1,114 @@ +// Copyright (c) IPython Development Team. +// Distributed under the terms of the Modified BSD License. + +define([ + 'underscore', + ], function (_) { + "use strict"; + + var _deserialize_array_buffer = function (buf) { + var data = new DataView(buf); + // read the header: 1 + nbufs 32b integers + var nbufs = data.getUint32(0); + var offsets = []; + var i; + for (i = 1; i <= nbufs; i++) { + offsets.push(data.getUint32(i * 4)); + } + var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1])); + var msg = JSON.parse( + (new TextDecoder('utf8')).decode(json_bytes) + ); + // the remaining chunks are stored as DataViews in msg.buffers + msg.buffers = []; + var start, stop; + for (i = 1; i < nbufs; i++) { + start = offsets[i]; + stop = offsets[i+1] || buf.byteLength; + msg.buffers.push(new DataView(buf.slice(start, stop))); + } + return msg; + }; + + var _deserialize_binary = function(data, callback) { + // deserialize the binary message format + // callback will be called with a message whose buffers attribute + // will be an array of DataViews. + if (data instanceof Blob) { + // data is Blob, have to deserialize from ArrayBuffer in reader callback + var reader = new FileReader(); + reader.onload = function () { + var msg = _deserialize_array_buffer(this.result); + callback(msg); + }; + reader.readAsArrayBuffer(data); + } else { + // data is ArrayBuffer, can deserialize directly + var msg = _deserialize_array_buffer(data); + callback(msg); + } + }; + + var deserialize = function (data, callback) { + // deserialize a message and pass the unpacked message object to callback + if (typeof data === "string") { + // text JSON message + callback(JSON.parse(data)); + } else { + // binary message + _deserialize_binary(data, callback); + } + }; + + var _serialize_binary = function (msg) { + // implement the binary serialization protocol + // serializes JSON message to ArrayBuffer + msg = _.clone(msg); + var offsets = []; + var buffers = []; + msg.buffers.map(function (buf) { + buffers.push(buf); + }); + delete msg.buffers; + var json_utf8 = (new TextEncoder('utf8')).encode(JSON.stringify(msg)); + buffers.unshift(json_utf8); + var nbufs = buffers.length; + offsets.push(4 * (nbufs + 1)); + var i; + for (i = 0; i + 1 < buffers.length; i++) { + offsets.push(offsets[offsets.length-1] + buffers[i].byteLength); + } + var msg_buf = new Uint8Array( + offsets[offsets.length-1] + buffers[buffers.length-1].byteLength + ); + // use DataView.setUint32 for network byte-order + var view = new DataView(msg_buf.buffer); + // write nbufs to first 4 bytes + view.setUint32(0, nbufs); + // write offsets to next 4 * nbufs bytes + for (i = 0; i < offsets.length; i++) { + view.setUint32(4 * (i+1), offsets[i]); + } + // write all the buffers at their respective offsets + for (i = 0; i < buffers.length; i++) { + msg_buf.set(new Uint8Array(buffers[i].buffer), offsets[i]); + } + + // return raw ArrayBuffer + return msg_buf.buffer; + }; + + var serialize = function (msg) { + if (msg.buffers && msg.buffers.length) { + return _serialize_binary(msg); + } else { + return JSON.stringify(msg); + } + }; + + var exports = { + deserialize : deserialize, + serialize: serialize + }; + return exports; +}); \ No newline at end of file diff --git a/IPython/html/templates/notebook.html b/IPython/html/templates/notebook.html index c116506cb..790f74838 100644 --- a/IPython/html/templates/notebook.html +++ b/IPython/html/templates/notebook.html @@ -317,6 +317,7 @@ class="notebook_app" {% block script %} {{super()}} + diff --git a/IPython/html/tests/services/serialize.js b/IPython/html/tests/services/serialize.js new file mode 100644 index 000000000..fe61bf11f --- /dev/null +++ b/IPython/html/tests/services/serialize.js @@ -0,0 +1,113 @@ +// +// Test binary messages on websockets. +// Only works on slimer for now, due to old websocket impl in phantomjs. +// + +casper.notebook_test(function () { + if (!this.slimerjs) { + console.log("Can't test binary websockets on phantomjs."); + return; + } + // create EchoBuffers target on js-side. + // it just captures and echos comm messages. + this.then(function () { + var success = this.evaluate(function () { + IPython._msgs = []; + + var EchoBuffers = function(comm) { + this.comm = comm; + this.comm.on_msg($.proxy(this.on_msg, this)); + }; + + EchoBuffers.prototype.on_msg = function (msg) { + IPython._msgs.push(msg); + this.comm.send(msg.content.data, {}, {}, msg.buffers); + }; + + IPython.notebook.kernel.comm_manager.register_target("echo", function (comm) { + return new EchoBuffers(comm); + }); + + return true; + }); + this.test.assertEquals(success, true, "Created echo comm target"); + }); + + // Create a similar comm that captures messages Python-side + this.then(function () { + var index = this.append_cell([ + "import os", + "from IPython.kernel.comm import Comm", + "comm = Comm(target_name='echo')", + "msgs = []", + "def on_msg(msg):", + " msgs.append(msg)", + "comm.on_msg(on_msg)" + ].join('\n'), 'code'); + this.execute_cell(index); + }); + + // send a message with binary data + this.then(function () { + var index = this.append_cell([ + "buffers = [b'\\xFF\\x00', b'\\x00\\x01\\x02']", + "comm.send(data='hi', buffers=buffers)" + ].join('\n'), 'code'); + this.execute_cell(index); + }); + + // wait for capture + this.waitFor(function () { + return this.evaluate(function () { + return IPython._msgs.length > 0; + }); + }); + + // validate captured buffers js-side + this.then(function () { + var msgs = this.evaluate(function () { + return IPython._msgs; + }); + this.test.assertEquals(msgs.length, 1, "Captured comm message"); + var buffers = msgs[0].buffers; + this.test.assertEquals(buffers.length, 2, "comm message has buffers"); + + // extract attributes to test in evaluate, + // because the raw DataViews can't be passed across + var buf_info = function (index) { + var buf = IPython._msgs[0].buffers[index]; + var data = {}; + data.byteLength = buf.byteLength; + data.bytes = []; + for (var i = 0; i < data.byteLength; i++) { + data.bytes.push(buf.getUint8(i)); + } + return data; + }; + + buf0 = this.evaluate(buf_info, 0); + buf1 = this.evaluate(buf_info, 1); + this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size'); + this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes'); + this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size'); + this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes'); + }); + + // validate captured buffers Python-side + this.then(function () { + var index = this.append_cell([ + "assert len(msgs) == 1, len(msgs)", + "bufs = msgs[0]['buffers']", + "assert len(bufs) == len(buffers), bufs", + "assert bufs[0].bytes == buffers[0], bufs[0].bytes", + "assert bufs[1].bytes == buffers[1], bufs[1].bytes", + "1", + ].join('\n'), 'code'); + this.execute_cell(index); + this.wait_for_output(index); + this.then(function () { + var out = this.get_output_cell(index); + this.test.assertEquals(out['text/plain'], '1', "Python received buffers"); + }); + }); +}); diff --git a/IPython/html/tests/test_serialize.py b/IPython/html/tests/test_serialize.py new file mode 100644 index 000000000..7a88b29f4 --- /dev/null +++ b/IPython/html/tests/test_serialize.py @@ -0,0 +1,26 @@ +"""Test serialize/deserialize messages with buffers""" + +import os + +import nose.tools as nt + +from IPython.kernel.zmq.session import Session +from ..base.zmqhandlers import ( + serialize_binary_message, + deserialize_binary_message, +) + +def test_serialize_binary(): + s = Session() + msg = s.msg('data_pub', content={'a': 'b'}) + msg['buffers'] = [ os.urandom(3) for i in range(3) ] + bmsg = serialize_binary_message(msg) + nt.assert_is_instance(bmsg, bytes) + +def test_deserialize_binary(): + s = Session() + msg = s.msg('data_pub', content={'a': 'b'}) + msg['buffers'] = [ os.urandom(2) for i in range(3) ] + bmsg = serialize_binary_message(msg) + msg2 = deserialize_binary_message(bmsg) + nt.assert_equal(msg2, msg) diff --git a/setupbase.py b/setupbase.py index 5e85d4b75..7a9da5e18 100644 --- a/setupbase.py +++ b/setupbase.py @@ -161,7 +161,8 @@ def find_package_data(): pjoin(components, "requirejs", "require.js"), pjoin(components, "underscore", "underscore-min.js"), pjoin(components, "moment", "moment.js"), - pjoin(components, "moment", "min","moment.min.js"), + pjoin(components, "moment", "min", "moment.min.js"), + pjoin(components, "text-encoding", "lib", "encoding.js"), ]) # Ship all of Codemirror's CSS and JS