Skip to content

Commit 01a7b4f

Browse files
authored
Add a whitelist option to the javascript library. Can speed up dispatching substantially (#381)
* Add a whitelist option to the javascript library. Can speed up decoding/dispatching substantially if you know you want to ignore some messages. * Add a test and a benchmark. * travis doesn't like microtime, and we don't need it
1 parent bce18da commit 01a7b4f

File tree

4 files changed

+195
-15
lines changed

4 files changed

+195
-15
lines changed
+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Benchmark how much time is saved in dispatch by ignoring some messages (in this case half)
2+
var Benchmark = require('benchmark');
3+
var stream = require('stream');
4+
var path = require('path');
5+
6+
var dispatch = require(path.resolve(__dirname, './sbp/')).dispatch;
7+
var MsgPosLlh = require(path.resolve(__dirname, './sbp/navigation')).MsgPosLlh;
8+
9+
var suite = new Benchmark.Suite();
10+
var Readable = stream.Readable;
11+
12+
var msgLlhPayload = new Buffer('VQEC9tciFC4nAPod4rrrtkJAE8szxBiLXsAfnaDoenNRQAAAAAAJAOyL', 'base64');
13+
var msgVelEcefPayload = new Buffer('VQQC9tcUFC4nANoLAACG9f//o/z//wAACQBQ7A==', 'base64');
14+
15+
function runDispatch (whitelist, deferred) {
16+
var rs = new Readable();
17+
rs.push(msgVelEcefPayload);
18+
rs.push(msgLlhPayload);
19+
rs.push(null);
20+
21+
var callbacks = 0;
22+
dispatch(rs, whitelist, function (err, framedMessage) {
23+
callbacks++;
24+
if (whitelist && callbacks === 1) {
25+
deferred.resolve();
26+
} else if (!whitelist && callbacks === 2) {
27+
deferred.resolve();
28+
}
29+
});
30+
}
31+
32+
suite.add('dispatch without whitelist', {
33+
defer: true,
34+
fn: function (deferred) {
35+
runDispatch(undefined, deferred);
36+
}
37+
}).add('dispatch with whitelist', {
38+
defer: true,
39+
fn: function (deferred) {
40+
runDispatch([MsgPosLlh.prototype.msg_type], deferred);
41+
}
42+
}).on('cycle', function (event) {
43+
console.log(String(event.target));
44+
}).on('complete', function () {
45+
console.log('Fastest is ' + this.filter('fastest').map('name'));
46+
}).run({ async: false });

javascript/sbp/msg.js

+52-15
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ var mergeDict = function (dest, src) {
6363
return dest;
6464
};
6565

66+
function ParameterValidationError (message) {
67+
this.name = 'ParameterValidationError';
68+
this.message = message;
69+
this.stack = (new Error()).stack;
70+
}
71+
ParameterValidationError.prototype = Object.create(Error.prototype);
72+
ParameterValidationError.prototype.constructor = ParameterValidationError;
73+
6674
function BufferTooShortError (message) {
6775
this.name = 'BufferTooShortError';
6876
this.message = message;
@@ -169,14 +177,31 @@ module.exports = {
169177
*
170178
* This corresponds to the logic in `framer.py`.
171179
*
172-
* @param stream: A Readable stream of bytes.
173-
* @param callback: a callback function invoked when a framed message is found and decoded in the stream.
180+
* @param {Stream} stream - A Readable stream of bytes.
181+
* @param {function|Array|number} [messageWhitelist]: An optional parameter that will filter
182+
* messages. Filtered messages will not be fully decoded, improving performance. Whitelist
183+
* can be an array of message types, a numeric mask that will be applied to the message type,
184+
* or a function that takes the message type and returns a boolean.
185+
* @param {function} callback: a callback function invoked when a framed message is found and decoded in the stream.
174186
* @returns [parsed SBP object, Buffer]
175187
*/
176-
dispatch: function dispatch (stream, callback) {
188+
dispatch: function dispatch (stream, messageWhitelistIn, callbackIn) {
177189
var offset = 0;
178190
var streamBuffer = new Buffer(0);
179191

192+
var callback, messageWhitelist;
193+
194+
if (typeof callbackIn === 'undefined' && typeof messageWhitelistIn === 'function') {
195+
callback = messageWhitelistIn;
196+
} else {
197+
callback = callbackIn;
198+
messageWhitelist = messageWhitelistIn;
199+
}
200+
201+
if (messageWhitelist && !(Array.isArray(messageWhitelist) || ['function', 'number'].indexOf(typeof messageWhitelist) !== -1)) {
202+
throw ParameterValidationError('dispatch: messageWhitelist must be function, number, or array');
203+
}
204+
180205
var getFramedMessage = function () {
181206
var headerBuf, payloadBuf;
182207
var preamble, msgType, sender, length, crc;
@@ -203,6 +228,15 @@ module.exports = {
203228
sender = streamBuffer.readUInt16LE(preamblePos+3);
204229
length = streamBuffer.readUInt8(preamblePos+5);
205230

231+
// Don't continue if message isn't whitelisted...
232+
var whitelistedArray = messageWhitelist && Array.isArray(messageWhitelist) && messageWhitelist.indexOf(msgType) !== -1;
233+
var whitelistedMask = messageWhitelist && typeof messageWhitelist === 'number' && (messageWhitelist & msgType);
234+
var whitelistedFn = messageWhitelist && typeof messageWhitelist === 'function' && messageWhitelist(msgType);
235+
if (messageWhitelist && !(whitelistedArray || whitelistedMask || whitelistedFn)) {
236+
streamBuffer = streamBuffer.slice(preamblePos+6+length+2);
237+
return null;
238+
}
239+
206240
// Get full payload
207241
// First, check payload length + CRC
208242
if (preamblePos+8+length > streamBuffer.length) {
@@ -239,27 +273,30 @@ module.exports = {
239273
return;
240274
}
241275
var pair = getFramedMessage();
242-
var framedMessage = pair[0];
243-
var fullBuffer = pair[1];
244276

245-
// If there is data left to process after a successful parse, process again
246-
if (streamBuffer.length > 0) {
247-
setTimeout(function () {
248-
processData(new Buffer(0));
249-
}, 0);
277+
// pair may have been filtered and not fully decoded
278+
if (pair === null) {
279+
return;
250280
}
251281

282+
var framedMessage = pair[0];
283+
var fullBuffer = pair[1];
284+
252285
callback(null, framedMessage, fullBuffer);
253286
} catch (e) {
254-
// If the buffer was corrupt but there's more in the stream, try again immediately
255-
if (e instanceof BufferCorruptError && streamBuffer.length > 0) {
256-
setTimeout(function () {
257-
processData(new Buffer(0));
258-
}, 0);
287+
if (!(e instanceof BufferTooShortError || e instanceof BufferCorruptError)) {
288+
throw e;
259289
}
260290
} finally {
261291
offset = 0;
262292
stream.resume();
293+
294+
// If there's more in the stream, try again immediately
295+
if (streamBuffer.length > 0) {
296+
setTimeout(function () {
297+
processData(new Buffer(0));
298+
}, 0);
299+
}
263300
}
264301
};
265302

javascript/tests/test_dispatch.js

+96
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ var path = require('path');
1414
var assert = require('assert');
1515
var Readable = require('stream').Readable;
1616
var dispatch = require(path.resolve(__dirname, '../sbp/')).dispatch;
17+
var MsgPosLlh = require(path.resolve(__dirname, '../sbp/navigation')).MsgPosLlh;
18+
var MsgVelEcef = require(path.resolve(__dirname, '../sbp/navigation')).MsgVelEcef;
1719

1820
var framedMessage = [0x55, 0x02, 0x02, 0xcc, 0x04, 0x14, 0x70, 0x3d, 0xd0, 0x18, 0xcf, 0xef, 0xff, 0xff, 0xef, 0xe8, 0xff, 0xff, 0xf0, 0x18, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x43, 0x94];
1921
var corruptedMessageTooShort = [0x55, 0x02, 0x02, 0xcc, 0x04, 0x12, 0x70, 0x3d, 0xd0, 0x18, 0xcf, 0xef, 0xff, 0xff, 0xef, 0xe8, 0xff, 0xff, 0xf0, 0x18, 0x00, 0x00, 0x00, 0x00, 0x05, 0x00, 0x43, 0x94];
@@ -140,4 +142,98 @@ describe('dispatcher', function () {
140142
}
141143
});
142144
});
145+
146+
it('should whitelist messages properly - no whitelist', function (done) {
147+
var msgLlhPayload = new Buffer('VQEC9tciFC4nAPod4rrrtkJAE8szxBiLXsAfnaDoenNRQAAAAAAJAOyL', 'base64');
148+
var msgVelEcefPayload = new Buffer('VQQC9tcUFC4nANoLAACG9f//o/z//wAACQBQ7A==', 'base64');
149+
150+
var rs = new Readable();
151+
rs.push(msgLlhPayload);
152+
rs.push(msgVelEcefPayload);
153+
rs.push(null);
154+
155+
var callbacks = 0;
156+
var validMessages = 0;
157+
dispatch(rs, function (err, framedMessage) {
158+
if (framedMessage && framedMessage.fields && framedMessage.fields.tow) {
159+
validMessages++;
160+
}
161+
if ((++callbacks) === 2) {
162+
assert.equal(validMessages, 2);
163+
done();
164+
}
165+
});
166+
});
167+
168+
it('should whitelist messages properly - array whitelist', function (done) {
169+
var msgLlhPayload = new Buffer('VQEC9tciFC4nAPod4rrrtkJAE8szxBiLXsAfnaDoenNRQAAAAAAJAOyL', 'base64');
170+
var msgVelEcefPayload = new Buffer('VQQC9tcUFC4nANoLAACG9f//o/z//wAACQBQ7A==', 'base64');
171+
172+
var rs = new Readable();
173+
rs.push(msgVelEcefPayload);
174+
rs.push(msgLlhPayload);
175+
rs.push(null);
176+
177+
var callbacks = 0;
178+
var validMessages = 0;
179+
dispatch(rs, [MsgPosLlh.prototype.msg_type], function (err, framedMessage) {
180+
assert.equal(framedMessage.msg_type, MsgPosLlh.prototype.msg_type);
181+
if (framedMessage && framedMessage.fields && framedMessage.fields.tow) {
182+
validMessages++;
183+
}
184+
if ((++callbacks) === 1) {
185+
assert.equal(validMessages, 1);
186+
done();
187+
}
188+
});
189+
});
190+
191+
it('should whitelist messages properly - mask whitelist', function (done) {
192+
var msgLlhPayload = new Buffer('VQEC9tciFC4nAPod4rrrtkJAE8szxBiLXsAfnaDoenNRQAAAAAAJAOyL', 'base64');
193+
var msgVelEcefPayload = new Buffer('VQQC9tcUFC4nANoLAACG9f//o/z//wAACQBQ7A==', 'base64');
194+
195+
var rs = new Readable();
196+
rs.push(msgVelEcefPayload);
197+
rs.push(msgLlhPayload);
198+
rs.push(null);
199+
200+
var callbacks = 0;
201+
var validMessages = 0;
202+
dispatch(rs, ~MsgVelEcef.prototype.msg_type, function (err, framedMessage) {
203+
assert.equal(framedMessage.msg_type, MsgPosLlh.prototype.msg_type);
204+
if (framedMessage && framedMessage.fields && framedMessage.fields.tow) {
205+
validMessages++;
206+
}
207+
if ((++callbacks) === 1) {
208+
assert.equal(validMessages, 1);
209+
done();
210+
}
211+
});
212+
});
213+
214+
it('should whitelist messages properly - function whitelist', function (done) {
215+
var msgLlhPayload = new Buffer('VQEC9tciFC4nAPod4rrrtkJAE8szxBiLXsAfnaDoenNRQAAAAAAJAOyL', 'base64');
216+
var msgVelEcefPayload = new Buffer('VQQC9tcUFC4nANoLAACG9f//o/z//wAACQBQ7A==', 'base64');
217+
218+
var rs = new Readable();
219+
rs.push(msgVelEcefPayload);
220+
rs.push(msgLlhPayload);
221+
rs.push(null);
222+
223+
var callbacks = 0;
224+
var validMessages = 0;
225+
var whitelist = function (msgType) {
226+
return msgType === MsgVelEcef.prototype.msg_type;
227+
};
228+
dispatch(rs, whitelist, function (err, framedMessage) {
229+
assert.equal(framedMessage.msg_type, MsgVelEcef.prototype.msg_type);
230+
if (framedMessage && framedMessage.fields && framedMessage.fields.tow) {
231+
validMessages++;
232+
}
233+
if ((++callbacks) === 1) {
234+
assert.equal(validMessages, 1);
235+
done();
236+
}
237+
});
238+
});
143239
});

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"js-yaml": "^3.4.3"
1818
},
1919
"devDependencies": {
20+
"benchmark": "^2.1.1",
2021
"mocha": "^2.4.5",
2122
"request": "^2.74.0",
2223
"serialport": "^4.0.1"

0 commit comments

Comments
 (0)