Merge pull request #7780 from jasongrout/message-race

Fix race condition in javascript kernel message processing
This commit is contained in:
Min RK 2015-03-02 09:27:36 -08:00
commit 6cb15fbffe
3 changed files with 48 additions and 27 deletions

View File

@ -41,6 +41,7 @@ define([
this.username = "username";
this.session_id = utils.uuid();
this._msg_callbacks = {};
this._msg_queue = Promise.resolve();
this.info_reply = {}; // kernel_info_reply stored here after starting
if (typeof(WebSocket) !== 'undefined') {
@ -854,7 +855,10 @@ define([
};
Kernel.prototype._handle_ws_message = function (e) {
serialize.deserialize(e.data, $.proxy(this._finish_ws_message, this));
this._msg_queue = this._msg_queue.then(function() {
return serialize.deserialize(e.data);
}).then($.proxy(this._finish_ws_message, this))
.catch(utils.reject("Couldn't process kernel message", true));
};
Kernel.prototype._finish_ws_message = function (msg) {

View File

@ -30,7 +30,7 @@ define([
return msg;
};
var _deserialize_binary = function(data, callback) {
var _deserialize_binary = function(data) {
/**
* deserialize the binary message format
* callback will be called with a message whose buffers attribute
@ -39,28 +39,31 @@ define([
if (data instanceof Blob) {
// data is Blob, have to deserialize from ArrayBuffer in reader callback
var reader = new FileReader();
reader.onload = function () {
var msg = _deserialize_array_buffer(this.result);
callback(msg);
};
var promise = new Promise(function(resolve, reject) {
reader.onload = function () {
var msg = _deserialize_array_buffer(this.result);
resolve(msg);
};
});
reader.readAsArrayBuffer(data);
return promise;
} else {
// data is ArrayBuffer, can deserialize directly
var msg = _deserialize_array_buffer(data);
callback(msg);
return msg;
}
};
var deserialize = function (data, callback) {
var deserialize = function (data) {
/**
* deserialize a message and pass the unpacked message object to callback
* deserialize a message and return a promise for the unpacked message
*/
if (typeof data === "string") {
// text JSON message
callback(JSON.parse(data));
return Promise.resolve(JSON.parse(data));
} else {
// binary message
_deserialize_binary(data, callback);
return Promise.resolve(_deserialize_binary(data));
}
};
@ -117,4 +120,4 @@ define([
serialize: serialize
};
return exports;
});
});

View File

@ -51,7 +51,9 @@ casper.notebook_test(function () {
this.then(function () {
var index = this.append_cell([
"buffers = [b'\\xFF\\x00', b'\\x00\\x01\\x02']",
"comm.send(data='hi', buffers=buffers)"
"comm.send(data='message 0', buffers=buffers)",
"comm.send(data='message 1')",
"comm.send(data='message 2', buffers=buffers)",
].join('\n'), 'code');
this.execute_cell(index);
});
@ -59,7 +61,7 @@ casper.notebook_test(function () {
// wait for capture
this.waitFor(function () {
return this.evaluate(function () {
return IPython._msgs.length > 0;
return IPython._msgs.length >= 3;
});
});
@ -68,14 +70,22 @@ casper.notebook_test(function () {
var msgs = this.evaluate(function () {
return IPython._msgs;
});
this.test.assertEquals(msgs.length, 1, "Captured comm message");
var buffers = msgs[0].buffers;
this.test.assertEquals(buffers.length, 2, "comm message has buffers");
this.test.assertEquals(msgs.length, 3, "Captured three comm messages");
// check the messages came in the right order
this.test.assertEquals(msgs[0].content.data, "message 0", "message 0 processed first");
this.test.assertEquals(msgs[0].buffers.length, 2, "comm message 0 has two buffers");
this.test.assertEquals(msgs[1].content.data, "message 1", "message 1 processed second");
this.test.assertEquals(msgs[1].buffers.length, 0, "comm message 1 has no buffers");
this.test.assertEquals(msgs[2].content.data, "message 2", "message 2 processed third");
this.test.assertEquals(msgs[2].buffers.length, 2, "comm message 2 has two buffers");
// extract attributes to test in evaluate,
// because the raw DataViews can't be passed across
var buf_info = function (index) {
var buf = IPython._msgs[0].buffers[index];
var buf_info = function (message, index) {
var buf = IPython._msgs[message].buffers[index];
var data = {};
data.byteLength = buf.byteLength;
data.bytes = [];
@ -85,18 +95,22 @@ casper.notebook_test(function () {
return data;
};
buf0 = this.evaluate(buf_info, 0);
buf1 = this.evaluate(buf_info, 1);
this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size');
this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes');
this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size');
this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes');
var msgs_with_buffers = [0, 2];
for (var i = 0; i < msgs_with_buffers.length; i++) {
msg_index = msgs_with_buffers[i];
buf0 = this.evaluate(buf_info, msg_index, 0);
buf1 = this.evaluate(buf_info, msg_index, 1);
this.test.assertEquals(buf0.byteLength, 2, 'buf[0] has correct size in message '+msg_index);
this.test.assertEquals(buf0.bytes, [255, 0], 'buf[0] has correct bytes in message '+msg_index);
this.test.assertEquals(buf1.byteLength, 3, 'buf[1] has correct size in message '+msg_index);
this.test.assertEquals(buf1.bytes, [0, 1, 2], 'buf[1] has correct bytes in message '+msg_index);
}
});
// validate captured buffers Python-side
this.then(function () {
var index = this.append_cell([
"assert len(msgs) == 1, len(msgs)",
"assert len(msgs) == 3, len(msgs)",
"bufs = msgs[0]['buffers']",
"assert len(bufs) == len(buffers), bufs",
"assert bufs[0].bytes == buffers[0], bufs[0].bytes",
@ -107,7 +121,7 @@ casper.notebook_test(function () {
this.wait_for_output(index);
this.then(function () {
var out = this.get_output_cell(index);
this.test.assertEquals(out['text/plain'], '1', "Python received buffers");
this.test.assertEquals(out.data['text/plain'], '1', "Python received buffers");
});
});
});