Merge pull request #6110 from minrk/binarycomm

support binary buffers in comm messages
This commit is contained in:
Matthias Bussonnier 2014-10-19 11:03:24 +02:00
commit d269912958
10 changed files with 358 additions and 33 deletions

View File

@ -4,6 +4,7 @@
# Distributed under the terms of the Modified BSD License.
import json
import struct
try:
from urllib.parse import urlparse # Py 3
@ -22,12 +23,70 @@ from tornado import web
from tornado import websocket
from IPython.kernel.zmq.session import Session
from IPython.utils.jsonutil import date_default
from IPython.utils.jsonutil import date_default, extract_dates
from IPython.utils.py3compat import PY3, cast_unicode
from .handlers import IPythonHandler
def serialize_binary_message(msg):
"""serialize a message as a binary blob
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
The message serialized to bytes.
"""
# don't modify msg or buffer list in-place
msg = msg.copy()
buffers = list(msg.pop('buffers'))
bmsg = json.dumps(msg, default=date_default).encode('utf8')
buffers.insert(0, bmsg)
nbufs = len(buffers)
offsets = [4 * (nbufs + 1)]
for buf in buffers[:-1]:
offsets.append(offsets[-1] + len(buf))
offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
buffers.insert(0, offsets_buf)
return b''.join(buffers)
def deserialize_binary_message(bmsg):
"""deserialize a message from a binary blog
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
message dictionary
"""
nbufs = struct.unpack('!i', bmsg[:4])[0]
offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
offsets.append(None)
bufs = []
for start, stop in zip(offsets[:-1], offsets[1:]):
bufs.append(bmsg[start:stop])
msg = json.loads(bufs[0].decode('utf8'))
msg['header'] = extract_dates(msg['header'])
msg['parent_header'] = extract_dates(msg['parent_header'])
msg['buffers'] = bufs[1:]
return msg
class ZMQStreamHandler(websocket.WebSocketHandler):
def check_origin(self, origin):
@ -77,23 +136,19 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
def _reserialize_reply(self, msg_list):
"""Reserialize a reply message using JSON.
This takes the msg list from the ZMQ socket, unserializes it using
This takes the msg list from the ZMQ socket, deserializes it using
self.session and then serializes the result using JSON. This method
should be used by self._on_zmq_reply to build messages that can
be sent back to the browser.
"""
idents, msg_list = self.session.feed_identities(msg_list)
msg = self.session.unserialize(msg_list)
try:
msg['header'].pop('date')
except KeyError:
pass
try:
msg['parent_header'].pop('date')
except KeyError:
pass
msg.pop('buffers')
return json.dumps(msg, default=date_default)
msg = self.session.deserialize(msg_list)
if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)
def _on_zmq_reply(self, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
@ -104,7 +159,7 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
self.write_message(msg)
self.write_message(msg, binary=isinstance(msg, bytes))
def allow_draft76(self):
"""Allow draft 76, until browsers such as Safari update to RFC 6455.

View File

@ -12,7 +12,7 @@ from IPython.utils.py3compat import string_types
from IPython.html.utils import url_path_join, url_escape
from ...base.handlers import IPythonHandler, json_errors
from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
from IPython.core.release import kernel_protocol_version
@ -110,7 +110,7 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
"""
idents,msg = self.session.feed_identities(msg)
try:
msg = self.session.unserialize(msg)
msg = self.session.deserialize(msg)
except:
self.log.error("Bad kernel_info reply", exc_info=True)
self._request_kernel_info()
@ -150,6 +150,9 @@ class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
self.log.info("%s closed, closing websocket.", self)
self.close()
return
if isinstance(msg, bytes):
msg = deserialize_binary_message(msg)
else:
msg = json.loads(msg)
self.session.send(self.zmq_stream, msg)

View File

@ -129,12 +129,12 @@ define([
return this.kernel.send_shell_message("comm_open", content, callbacks, metadata);
};
Comm.prototype.send = function (data, callbacks, metadata) {
Comm.prototype.send = function (data, callbacks, metadata, buffers) {
var content = {
comm_id : this.comm_id,
data : data || {},
};
return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata);
return this.kernel.send_shell_message("comm_msg", content, callbacks, metadata, buffers);
};
Comm.prototype.close = function (data, callbacks, metadata) {

View File

@ -5,9 +5,10 @@ define([
'base/js/namespace',
'jquery',
'base/js/utils',
'services/kernels/js/comm',
'widgets/js/init',
], function(IPython, $, utils, comm, widgetmanager) {
'./comm',
'./serialize',
'widgets/js/init'
], function(IPython, $, utils, comm, serialize, widgetmanager) {
"use strict";
/**
@ -69,7 +70,7 @@ define([
/**
* @function _get_msg
*/
Kernel.prototype._get_msg = function (msg_type, content, metadata) {
Kernel.prototype._get_msg = function (msg_type, content, metadata, buffers) {
var msg = {
header : {
msg_id : utils.uuid(),
@ -80,6 +81,7 @@ define([
},
metadata : metadata || {},
content : content,
buffers : buffers || [],
parent_header : {}
};
return msg;
@ -596,12 +598,12 @@ define([
*
* @function send_shell_message
*/
Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata) {
Kernel.prototype.send_shell_message = function (msg_type, content, callbacks, metadata, buffers) {
if (!this.is_connected()) {
throw new Error("kernel is not connected");
}
var msg = this._get_msg(msg_type, content, metadata);
this.channels.shell.send(JSON.stringify(msg));
var msg = this._get_msg(msg_type, content, metadata, buffers);
this.channels.shell.send(serialize.serialize(msg));
this.set_callbacks_for_msg(msg.header.msg_id, callbacks);
return msg.header.msg_id;
};
@ -752,7 +754,7 @@ define([
};
this.events.trigger('input_reply.Kernel', {kernel: this, content: content});
var msg = this._get_msg("input_reply", content);
this.channels.stdin.send(JSON.stringify(msg));
this.channels.stdin.send(serialize.serialize(msg));
return msg.header.msg_id;
};
@ -850,8 +852,11 @@ define([
* @function _handle_shell_reply
*/
Kernel.prototype._handle_shell_reply = function (e) {
var reply = $.parseJSON(e.data);
this.events.trigger('shell_reply.Kernel', {kernel: this, reply: reply});
serialize.deserialize(e.data, $.proxy(this._finish_shell_reply, this));
};
Kernel.prototype._finish_shell_reply = function (reply) {
this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
var content = reply.content;
var metadata = reply.metadata;
var parent_id = reply.parent_header.msg_id;
@ -978,8 +983,11 @@ define([
* @function _handle_iopub_message
*/
Kernel.prototype._handle_iopub_message = function (e) {
var msg = $.parseJSON(e.data);
serialize.deserialize(e.data, $.proxy(this._finish_iopub_message, this));
};
Kernel.prototype._finish_iopub_message = function (msg) {
var handler = this.get_iopub_handler(msg.header.msg_type);
if (handler !== undefined) {
handler(msg);
@ -990,7 +998,11 @@ define([
* @function _handle_input_request
*/
Kernel.prototype._handle_input_request = function (e) {
var request = $.parseJSON(e.data);
serialize.deserialize(e.data, $.proxy(this._finish_input_request, this));
};
Kernel.prototype._finish_input_request = function (request) {
var header = request.header;
var content = request.content;
var metadata = request.metadata;

View File

@ -0,0 +1,114 @@
// Copyright (c) IPython Development Team.
// Distributed under the terms of the Modified BSD License.
define([
'underscore',
], function (_) {
"use strict";
var _deserialize_array_buffer = function (buf) {
var data = new DataView(buf);
// read the header: 1 + nbufs 32b integers
var nbufs = data.getUint32(0);
var offsets = [];
var i;
for (i = 1; i <= nbufs; i++) {
offsets.push(data.getUint32(i * 4));
}
var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1]));
var msg = JSON.parse(
(new TextDecoder('utf8')).decode(json_bytes)
);
// the remaining chunks are stored as DataViews in msg.buffers
msg.buffers = [];
var start, stop;
for (i = 1; i < nbufs; i++) {
start = offsets[i];
stop = offsets[i+1] || buf.byteLength;
msg.buffers.push(new DataView(buf.slice(start, stop)));
}
return msg;
};
var _deserialize_binary = function(data, callback) {
// deserialize the binary message format
// callback will be called with a message whose buffers attribute
// will be an array of DataViews.
if (data instanceof Blob) {
// data is Blob, have to deserialize from ArrayBuffer in reader callback
var reader = new FileReader();
reader.onload = function () {
var msg = _deserialize_array_buffer(this.result);
callback(msg);
};
reader.readAsArrayBuffer(data);
} else {
// data is ArrayBuffer, can deserialize directly
var msg = _deserialize_array_buffer(data);
callback(msg);
}
};
var deserialize = function (data, callback) {
// deserialize a message and pass the unpacked message object to callback
if (typeof data === "string") {
// text JSON message
callback(JSON.parse(data));
} else {
// binary message
_deserialize_binary(data, callback);
}
};
var _serialize_binary = function (msg) {
// implement the binary serialization protocol
// serializes JSON message to ArrayBuffer
msg = _.clone(msg);
var offsets = [];
var buffers = [];
msg.buffers.map(function (buf) {
buffers.push(buf);
});
delete msg.buffers;
var json_utf8 = (new TextEncoder('utf8')).encode(JSON.stringify(msg));
buffers.unshift(json_utf8);
var nbufs = buffers.length;
offsets.push(4 * (nbufs + 1));
var i;
for (i = 0; i + 1 < buffers.length; i++) {
offsets.push(offsets[offsets.length-1] + buffers[i].byteLength);
}
var msg_buf = new Uint8Array(
offsets[offsets.length-1] + buffers[buffers.length-1].byteLength
);
// use DataView.setUint32 for network byte-order
var view = new DataView(msg_buf.buffer);
// write nbufs to first 4 bytes
view.setUint32(0, nbufs);
// write offsets to next 4 * nbufs bytes
for (i = 0; i < offsets.length; i++) {
view.setUint32(4 * (i+1), offsets[i]);
}
// write all the buffers at their respective offsets
for (i = 0; i < buffers.length; i++) {
msg_buf.set(new Uint8Array(buffers[i].buffer), offsets[i]);
}
// return raw ArrayBuffer
return msg_buf.buffer;
};
var serialize = function (msg) {
if (msg.buffers && msg.buffers.length) {
return _serialize_binary(msg);
} else {
return JSON.stringify(msg);
}
};
var exports = {
deserialize : deserialize,
serialize: serialize
};
return exports;
});

View File

@ -317,6 +317,7 @@ class="notebook_app"
{% block script %}
{{super()}}
<script src="{{ static_url("components/text-encoding/lib/encoding.js") }}" charset="utf-8"></script>
<script src="{{ static_url("notebook/js/main.js") }}" charset="utf-8"></script>

View File

@ -0,0 +1,113 @@
//
// Test binary messages on websockets.
// Only works on slimer for now, due to old websocket impl in phantomjs.
//
casper.notebook_test(function () {
if (!this.slimerjs) {
console.log("Can't test binary websockets on phantomjs.");
return;
}
// create EchoBuffers target on js-side.
// it just captures and echos comm messages.
this.then(function () {
var success = this.evaluate(function () {
IPython._msgs = [];
var EchoBuffers = function(comm) {
this.comm = comm;
this.comm.on_msg($.proxy(this.on_msg, this));
};
EchoBuffers.prototype.on_msg = function (msg) {
IPython._msgs.push(msg);
this.comm.send(msg.content.data, {}, {}, msg.buffers);
};
IPython.notebook.kernel.comm_manager.register_target("echo", function (comm) {
return new EchoBuffers(comm);
});
return true;
});
this.test.assertEquals(success, true, "Created echo comm target");
});
// Create a similar comm that captures messages Python-side
this.then(function () {
var index = this.append_cell([
"import os",
"from IPython.kernel.comm import Comm",
"comm = Comm(target_name='echo')",
"msgs = []",
"def on_msg(msg):",
" msgs.append(msg)",
"comm.on_msg(on_msg)"
].join('\n'), 'code');
this.execute_cell(index);
});
// send a message with binary data
this.then(function () {
var index = this.append_cell([
"buffers = [b'\\xFF\\x00', b'\\x00\\x01\\x02']",
"comm.send(data='hi', buffers=buffers)"
].join('\n'), 'code');
this.execute_cell(index);
});
// wait for capture
this.waitFor(function () {
return this.evaluate(function () {
return IPython._msgs.length > 0;
});
});
// validate captured buffers js-side
this.then(function () {
var msgs = this.evaluate(function () {
return IPython._msgs;
});
this.test.assertEquals(msgs.length, 1, "Captured comm message");
var buffers = msgs[0].buffers;
this.test.assertEquals(buffers.length, 2, "comm message has buffers");
// extract attributes to test in evaluate,
// because the raw DataViews can't be passed across
var buf_info = function (index) {
var buf = IPython._msgs[0].buffers[index];
var data = {};
data.byteLength = buf.byteLength;
data.bytes = [];
for (var i = 0; i < data.byteLength; i++) {
data.bytes.push(buf.getUint8(i));
}
return data;
};
buf0 = this.evaluate(buf_info, 0);
buf1 = this.evaluate(buf_info, 1);
this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size');
this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes');
this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size');
this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes');
});
// validate captured buffers Python-side
this.then(function () {
var index = this.append_cell([
"assert len(msgs) == 1, len(msgs)",
"bufs = msgs[0]['buffers']",
"assert len(bufs) == len(buffers), bufs",
"assert bufs[0].bytes == buffers[0], bufs[0].bytes",
"assert bufs[1].bytes == buffers[1], bufs[1].bytes",
"1",
].join('\n'), 'code');
this.execute_cell(index);
this.wait_for_output(index);
this.then(function () {
var out = this.get_output_cell(index);
this.test.assertEquals(out['text/plain'], '1', "Python received buffers");
});
});
});

View File

@ -0,0 +1,26 @@
"""Test serialize/deserialize messages with buffers"""
import os
import nose.tools as nt
from IPython.kernel.zmq.session import Session
from ..base.zmqhandlers import (
serialize_binary_message,
deserialize_binary_message,
)
def test_serialize_binary():
s = Session()
msg = s.msg('data_pub', content={'a': 'b'})
msg['buffers'] = [ os.urandom(3) for i in range(3) ]
bmsg = serialize_binary_message(msg)
nt.assert_is_instance(bmsg, bytes)
def test_deserialize_binary():
s = Session()
msg = s.msg('data_pub', content={'a': 'b'})
msg['buffers'] = [ os.urandom(2) for i in range(3) ]
bmsg = serialize_binary_message(msg)
msg2 = deserialize_binary_message(bmsg)
nt.assert_equal(msg2, msg)

View File

@ -161,7 +161,8 @@ def find_package_data():
pjoin(components, "requirejs", "require.js"),
pjoin(components, "underscore", "underscore-min.js"),
pjoin(components, "moment", "moment.js"),
pjoin(components, "moment", "min","moment.min.js"),
pjoin(components, "moment", "min", "moment.min.js"),
pjoin(components, "text-encoding", "lib", "encoding.js"),
])
# Ship all of Codemirror's CSS and JS