support buffers in comm messages

- add buffers arg to comm Python api
- support binary websocket messages when buffers are present
- reimplement utf8 in javascript, because javascript is the best
This commit is contained in:
MinRK 2014-07-10 17:03:35 -05:00
parent c0108e1089
commit 3a1c845f96
3 changed files with 161 additions and 8 deletions

View File

@ -4,6 +4,7 @@
# Distributed under the terms of the Modified BSD License.
import json
import struct
try:
from urllib.parse import urlparse # Py 3
@ -28,6 +29,61 @@ from IPython.utils.py3compat import PY3, cast_unicode
from .handlers import IPythonHandler
def serialize_binary_message(msg):
    """Serialize a message carrying binary buffers into a single bytes blob.

    Layout (every integer is a 32-bit int in network byte order):

        4 bytes: number of message parts (nbufs); the JSON part counts as one
        4 * nbufs bytes: offset of each part
        parts: the JSON-encoded message first, followed by each buffer

    Offsets are from the start of the blob, including the header.

    The 'buffers' key is popped from ``msg``; the buffer list itself is
    left unmodified.

    Returns
    -------

    The message serialized to bytes.
    """
    # Copy into a fresh list: inserting the JSON part and the header must not
    # alter the list object the caller handed us.
    parts = list(msg.pop('buffers'))
    bmsg = json.dumps(msg, default=date_default).encode('utf8')
    parts.insert(0, bmsg)
    nbufs = len(parts)
    # The first part starts right after the header (1 count + nbufs offsets).
    offsets = [4 * (nbufs + 1)]
    for part in parts[:-1]:
        offsets.append(offsets[-1] + len(part))
    offsets_buf = struct.pack('!' + 'i' * (nbufs + 1), nbufs, *offsets)
    parts.insert(0, offsets_buf)
    return b''.join(parts)
def unserialize_binary_message(bmsg):
    """Unserialize a message from a binary blob.

    Layout (every integer is a 32-bit int in network byte order):

        4 bytes: number of message parts (nbufs); the JSON part counts as one
        4 * nbufs bytes: offset of each part
        parts: the JSON-encoded message first, followed by each buffer

    Offsets are from the start of the blob, including the header.

    Returns
    -------

    message dictionary, with the trailing parts stored in msg['buffers']
    """
    # The count must be read with the same (network) byte order that the
    # offsets use; a native-order read gives garbage on little-endian hosts.
    nbufs = struct.unpack('!i', bmsg[:4])[0]
    offsets = list(struct.unpack('!' + 'i' * nbufs, bmsg[4:4 * (nbufs + 1)]))
    # None as the final stop makes the last slice run to the end of the blob.
    offsets.append(None)
    bufs = [bmsg[start:stop] for start, stop in zip(offsets[:-1], offsets[1:])]
    # The first part is UTF-8 JSON; decode explicitly so json.loads gets text.
    msg = json.loads(bufs[0].decode('utf8'))
    msg['buffers'] = bufs[1:]
    return msg
class ZMQStreamHandler(websocket.WebSocketHandler):
def check_origin(self, origin):
@ -92,8 +148,12 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
msg['parent_header'].pop('date')
except KeyError:
pass
msg.pop('buffers')
return json.dumps(msg, default=date_default)
if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)
def _on_zmq_reply(self, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
@ -104,7 +164,7 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
self.write_message(msg)
self.write_message(msg, binary=isinstance(msg, bytes))
def allow_draft76(self):
"""Allow draft 76, until browsers such as Safari update to RFC 6455.

View File

@ -553,7 +553,44 @@ define([
], callback, errback
);
};
var decode_utf8 = function (array) {
    // Decode a UTF-8 encoded Uint8Array to a String.
    //
    // Handles 1-, 2-, 3- and 4-byte sequences. Code points above U+FFFF
    // (4-byte sequences) are emitted as UTF-16 surrogate pairs, since
    // JavaScript strings are sequences of UTF-16 code units.
    // Bytes that cannot start a sequence (stray continuation bytes and
    // lead bytes >= 0xF8) are skipped.
    //
    // Based on http://stackoverflow.com/questions/17191945
    var out = "";
    var len = array.length;
    var i = 0;
    var c, char2, char3, char4, codepoint;
    while (i < len) {
        c = array[i++];
        switch (c >> 4) {
        case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
            // 0xxxxxxx
            out += String.fromCharCode(c);
            break;
        case 12: case 13:
            // 110x xxxx   10xx xxxx
            char2 = array[i++];
            out += String.fromCharCode(((c & 0x1F) << 6) | (char2 & 0x3F));
            break;
        case 14:
            // 1110 xxxx  10xx xxxx  10xx xxxx
            char2 = array[i++];
            char3 = array[i++];
            out += String.fromCharCode(((c & 0x0F) << 12) |
                                       ((char2 & 0x3F) << 6) |
                                       ((char3 & 0x3F) << 0));
            break;
        case 15:
            // 1111 0xxx  10xx xxxx  10xx xxxx  10xx xxxx
            if (c & 0x08) {
                // 0xF8 and above is never a valid lead byte
                break;
            }
            char2 = array[i++];
            char3 = array[i++];
            char4 = array[i++];
            codepoint = ((c & 0x07) << 18) |
                        ((char2 & 0x3F) << 12) |
                        ((char3 & 0x3F) << 6) |
                        (char4 & 0x3F);
            // split into a high/low surrogate pair
            codepoint -= 0x10000;
            out += String.fromCharCode(0xD800 + (codepoint >> 10),
                                       0xDC00 + (codepoint & 0x3FF));
            break;
        }
    }
    return out;
};
var utils = {
regex_split : regex_split,
uuid : uuid,
@ -579,6 +616,7 @@ define([
ajax_error_msg : ajax_error_msg,
log_ajax_error : log_ajax_error,
requireCodeMirrorMode : requireCodeMirrorMode,
decode_utf8: decode_utf8,
};
// Backwards compatibility.

View File

@ -846,12 +846,60 @@ define([
}
};
Kernel.prototype._unserialize_binary_message = function(blob, callback) {
    // Unserialize the binary message format.
    //
    // Layout: a header of (1 + nbufs) big-endian 32-bit integers, i.e. the
    // number of parts followed by the offset of each part, then the parts
    // themselves. The first part is the message as UTF-8 JSON; the
    // remaining parts are raw buffers.
    //
    // callback will be called with a message whose buffers attribute
    // will be an array of DataViews.
    var reader = new FileReader();
    reader.onload = function(e) {
        var buffer = reader.result;
        var data = new DataView(buffer);
        // read the header: 1 + nbufs 32b integers
        var nbufs = data.getInt32(0);
        var offsets = [];
        var i;
        for (i = 1; i <= nbufs; i++) {
            offsets.push(data.getInt32(i * 4));
        }
        // sentinel: the last part runs to the end of the blob
        offsets.push(buffer.byteLength);
        // the first chunk is the message as utf-8 JSON
        var msg = $.parseJSON(
            utils.decode_utf8(
                new Uint8Array(buffer.slice(offsets[0], offsets[1]))
            )
        );
        // the remaining chunks are stored as DataViews in msg.buffers
        msg.buffers = [];
        for (i = 1; i < nbufs; i++) {
            msg.buffers.push(
                new DataView(buffer.slice(offsets[i], offsets[i + 1]))
            );
        }
        callback(msg);
    };
    reader.readAsArrayBuffer(blob);
};
Kernel.prototype._unserialize_msg = function (e, callback) {
    // Decode the payload of a websocket message event and hand the
    // resulting message object to callback.
    var payload = e.data;
    if (typeof payload !== "string") {
        // not text: go through the binary message decoder
        this._unserialize_binary_message(payload, callback);
        return;
    }
    // text frames carry the message as plain JSON
    callback($.parseJSON(payload));
};
/**
* @function _handle_shell_reply
*/
Kernel.prototype._handle_shell_reply = function (e) {
    // Websocket handler for shell channel messages.
    // The payload may be a text or a binary frame, so it must not be parsed
    // as JSON here; _unserialize_msg decodes it and passes the message to
    // _finish_shell_reply, which is also where the shell_reply.Kernel event
    // is triggered.
    this._unserialize_msg(e, $.proxy(this._finish_shell_reply, this));
};
Kernel.prototype._finish_shell_reply = function (reply) {
this.events.trigger('shell_reply.Kernel', {kernel: this, reply:reply});
var content = reply.content;
var metadata = reply.metadata;
var parent_id = reply.parent_header.msg_id;
@ -978,8 +1026,11 @@ define([
* @function _handle_iopub_message
*/
Kernel.prototype._handle_iopub_message = function (e) {
    // Websocket handler for iopub channel messages.
    // The payload may be a text or a binary frame, so it must not be parsed
    // as JSON here; _unserialize_msg decodes it and passes the message to
    // _finish_iopub_message.
    this._unserialize_msg(e, $.proxy(this._finish_iopub_message, this));
};
Kernel.prototype._finish_iopub_message = function (msg) {
var handler = this.get_iopub_handler(msg.header.msg_type);
if (handler !== undefined) {
handler(msg);
@ -990,7 +1041,11 @@ define([
* @function _handle_input_request
*/
Kernel.prototype._handle_input_request = function (e) {
    // Websocket handler for stdin channel messages (input requests).
    // The payload may be a text or a binary frame, so it must not be parsed
    // as JSON here; _unserialize_msg decodes it and passes the message to
    // _finish_input_request.
    this._unserialize_msg(e, $.proxy(this._finish_input_request, this));
};
Kernel.prototype._finish_input_request = function (request) {
var header = request.header;
var content = request.content;
var metadata = request.metadata;