Improve queuing strategies in the app

This commit is contained in:
Scott Nonnenberg 2019-07-09 11:46:48 -07:00
parent 7b645011c2
commit cb2c691667
6 changed files with 85 additions and 131 deletions

View file

@ -736,23 +736,14 @@
}, },
queueJob(callback) { queueJob(callback) {
const previous = this.pending || Promise.resolve(); this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 });
const taskWithTimeout = textsecure.createTaskWithTimeout( const taskWithTimeout = textsecure.createTaskWithTimeout(
callback, callback,
`conversation ${this.idForLogging()}` `conversation ${this.idForLogging()}`
); );
this.pending = previous.then(taskWithTimeout, taskWithTimeout); return this.jobQueue.add(taskWithTimeout);
const current = this.pending;
current.then(() => {
if (this.pending === current) {
delete this.pending;
}
});
return current;
}, },
getRecipients() { getRecipients() {

View file

@ -327,10 +327,11 @@
}); });
}, },
queueTask(task) { queueTask(task) {
this.pendingQueue =
this.pendingQueue || new window.PQueue({ concurrency: 1 });
const taskWithTimeout = textsecure.createTaskWithTimeout(task); const taskWithTimeout = textsecure.createTaskWithTimeout(task);
this.pending = this.pending.then(taskWithTimeout, taskWithTimeout);
return this.pending; return this.pendingQueue.add(taskWithTimeout);
}, },
cleanSignedPreKeys() { cleanSignedPreKeys() {
const MINIMUM_KEYS = 3; const MINIMUM_KEYS = 3;

View file

@ -36486,14 +36486,10 @@ Internal.SessionLock = {};
var jobQueue = {}; var jobQueue = {};
Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJob) { Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJob) {
var runPrevious = jobQueue[number] || Promise.resolve(); jobQueue[number] = jobQueue[number] || new window.PQueue({ concurrency: 1 });
var runCurrent = jobQueue[number] = runPrevious.then(runJob, runJob); var queue = jobQueue[number];
runCurrent.then(function() {
if (jobQueue[number] === runCurrent) { return queue.add(runJob);
delete jobQueue[number];
}
});
return runCurrent;
}; };
})(); })();

View file

@ -134,10 +134,12 @@ function MessageReceiver(username, password, signalingKey, options = {}) {
this.number = address.getName(); this.number = address.getName();
this.deviceId = address.getDeviceId(); this.deviceId = address.getDeviceId();
this.pending = Promise.resolve(); this.pendingQueue = new window.PQueue({ concurrency: 1 });
this.incomingQueue = new window.PQueue({ concurrency: 1 });
this.appQueue = new window.PQueue({ concurrency: 1 });
if (options.retryCached) { if (options.retryCached) {
this.pending = this.queueAllCached(); this.pendingQueue.add(() => this.queueAllCached());
} }
} }
@ -187,10 +189,6 @@ MessageReceiver.prototype.extend({
// Because sometimes the socket doesn't properly emit its close event // Because sometimes the socket doesn't properly emit its close event
this._onClose = this.onclose.bind(this); this._onClose = this.onclose.bind(this);
this.wsr.addEventListener('close', this._onClose); this.wsr.addEventListener('close', this._onClose);
// Ensures that an immediate 'empty' event from the websocket will fire only after
// all cached envelopes are processed.
this.incoming = [this.pending];
}, },
stopProcessing() { stopProcessing() {
window.log.info('MessageReceiver: stopProcessing requested'); window.log.info('MessageReceiver: stopProcessing requested');
@ -229,11 +227,7 @@ MessageReceiver.prototype.extend({
window.log.error('websocket error'); window.log.error('websocket error');
}, },
dispatchAndWait(event) { dispatchAndWait(event) {
const promise = this.appPromise || Promise.resolve(); this.appQueue.add(() => Promise.all(this.dispatchEvent(event)));
const appJobPromise = Promise.all(this.dispatchEvent(event));
const job = () => appJobPromise;
this.appPromise = promise.then(job, job);
return Promise.resolve(); return Promise.resolve();
}, },
@ -268,9 +262,6 @@ MessageReceiver.prototype.extend({
}); });
}, },
handleRequest(request) { handleRequest(request) {
this.incoming = this.incoming || [];
const lastPromise = _.last(this.incoming);
// We do the message decryption here, instead of in the ordered pending queue, // We do the message decryption here, instead of in the ordered pending queue,
// to avoid exposing the time it took us to process messages through the time-to-ack. // to avoid exposing the time it took us to process messages through the time-to-ack.
@ -279,31 +270,33 @@ MessageReceiver.prototype.extend({
request.respond(200, 'OK'); request.respond(200, 'OK');
if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') { if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') {
this.onEmpty(); this.incomingQueue.add(() => this.onEmpty());
} }
return; return;
} }
let promise; const job = async () => {
const headers = request.headers || []; let plaintext;
if (headers.includes('X-Signal-Key: true')) { const headers = request.headers || [];
promise = textsecure.crypto.decryptWebsocketMessage(
request.body,
this.signalingKey
);
} else {
promise = Promise.resolve(request.body.toArrayBuffer());
}
promise = promise if (headers.includes('X-Signal-Key: true')) {
.then(plaintext => { plaintext = await textsecure.crypto.decryptWebsocketMessage(
request.body,
this.signalingKey
);
} else {
plaintext = request.body.toArrayBuffer();
}
try {
const envelope = textsecure.protobuf.Envelope.decode(plaintext); const envelope = textsecure.protobuf.Envelope.decode(plaintext);
// After this point, decoding errors are not the server's // After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the // fault, and we should handle them gracefully and tell the
// user they received an invalid message // user they received an invalid message
if (this.isBlocked(envelope.source)) { if (this.isBlocked(envelope.source)) {
return request.respond(200, 'OK'); request.respond(200, 'OK');
return;
} }
envelope.id = envelope.serverGuid || window.getGuid(); envelope.id = envelope.serverGuid || window.getGuid();
@ -311,24 +304,18 @@ MessageReceiver.prototype.extend({
? envelope.serverTimestamp.toNumber() ? envelope.serverTimestamp.toNumber()
: null; : null;
return this.addToCache(envelope, plaintext).then( try {
async () => { await this.addToCache(envelope, plaintext);
request.respond(200, 'OK'); request.respond(200, 'OK');
this.queueEnvelope(envelope);
// To ensure that we queue in the same order we receive messages } catch (error) {
await lastPromise; request.respond(500, 'Failed to cache message');
this.queueEnvelope(envelope); window.log.error(
}, 'handleRequest error trying to add message to cache:',
error => { error && error.stack ? error.stack : error
request.respond(500, 'Failed to cache message'); );
window.log.error( }
'handleRequest error trying to add message to cache:', } catch (e) {
error && error.stack ? error.stack : error
);
}
);
})
.catch(e => {
request.respond(500, 'Bad encrypted websocket message'); request.respond(500, 'Bad encrypted websocket message');
window.log.error( window.log.error(
'Error handling incoming message:', 'Error handling incoming message:',
@ -336,75 +323,60 @@ MessageReceiver.prototype.extend({
); );
const ev = new Event('error'); const ev = new Event('error');
ev.error = e; ev.error = e;
return this.dispatchAndWait(ev); await this.dispatchAndWait(ev);
});
this.incoming.push(promise);
},
addToQueue(task) {
this.count += 1;
this.pending = this.pending.then(task, task);
const { count, pending } = this;
const cleanup = () => {
this.updateProgress(count);
// We want to clear out the promise chain whenever possible because it could
// lead to large memory usage over time:
// https://github.com/nodejs/node/issues/6673#issuecomment-244331609
if (this.pending === pending) {
this.pending = Promise.resolve();
} }
}; };
pending.then(cleanup, cleanup); this.incomingQueue.add(job);
},
addToQueue(task) {
this.count += 1;
return pending; const promise = this.pendingQueue.add(task);
const { count } = this;
const update = () => {
this.updateProgress(count);
};
promise.then(update, update);
return promise;
}, },
onEmpty() { onEmpty() {
const { incoming } = this;
this.incoming = [];
const emitEmpty = () => { const emitEmpty = () => {
window.log.info("MessageReceiver: emitting 'empty' event"); window.log.info("MessageReceiver: emitting 'empty' event");
const ev = new Event('empty'); const ev = new Event('empty');
this.dispatchAndWait(ev); this.dispatchAndWait(ev);
}; };
const waitForApplication = async () => { const waitForPendingQueue = () => {
window.log.info( window.log.info(
"MessageReceiver: finished processing messages after 'empty', now waiting for application" "MessageReceiver: finished processing messages after 'empty', now waiting for application"
); );
const promise = this.appPromise || Promise.resolve();
this.appPromise = Promise.resolve();
// We don't await here because we don't this to gate future message processing // We don't await here because we don't want this to gate future message processing
promise.then(emitEmpty, emitEmpty); this.appQueue.add(emitEmpty);
}; };
const waitForEmptyQueue = () => { const waitForIncomingQueue = () => {
// resetting count to zero so everything queued after this starts over again this.addToQueue(waitForPendingQueue);
// Note: this.count is used in addToQueue
// Resetting count so everything from the websocket after this starts at zero
this.count = 0; this.count = 0;
this.addToQueue(waitForApplication);
}; };
// We first wait for all recently-received messages (this.incoming) to be queued, this.incomingQueue.add(waitForIncomingQueue);
// then we queue a task to wait for the application to finish its processing, then
// finally we emit the 'empty' event to the queue.
Promise.all(incoming).then(waitForEmptyQueue, waitForEmptyQueue);
}, },
drain() { drain() {
const { incoming } = this; const waitForIncomingQueue = () =>
this.incoming = [];
const queueDispatch = () =>
this.addToQueue(() => { this.addToQueue(() => {
window.log.info('drained'); window.log.info('drained');
}); });
// This promise will resolve when there are no more messages to be processed. return this.incomingQueue.add(waitForIncomingQueue);
return Promise.all(incoming).then(queueDispatch, queueDispatch);
}, },
updateProgress(count) { updateProgress(count) {
// count by 10s // count by 10s

View file

@ -249,23 +249,17 @@ MessageSender.prototype = {
}, },
queueJobForNumber(number, runJob) { queueJobForNumber(number, runJob) {
this.pendingMessages[number] =
this.pendingMessages[number] || new window.PQueue({ concurrency: 1 });
const queue = this.pendingMessages[number];
const taskWithTimeout = textsecure.createTaskWithTimeout( const taskWithTimeout = textsecure.createTaskWithTimeout(
runJob, runJob,
`queueJobForNumber ${number}` `queueJobForNumber ${number}`
); );
const runPrevious = this.pendingMessages[number] || Promise.resolve(); queue.add(taskWithTimeout);
this.pendingMessages[number] = runPrevious.then(
taskWithTimeout,
taskWithTimeout
);
const runCurrent = this.pendingMessages[number];
runCurrent.then(() => {
if (this.pendingMessages[number] === runCurrent) {
delete this.pendingMessages[number];
}
});
}, },
uploadAttachments(message) { uploadAttachments(message) {

View file

@ -1336,23 +1336,23 @@
"rule": "jQuery-wrap(", "rule": "jQuery-wrap(",
"path": "libtextsecure/message_receiver.js", "path": "libtextsecure/message_receiver.js",
"line": " Promise.resolve(dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer());", "line": " Promise.resolve(dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer());",
"lineNumber": 145,
"reasonCategory": "falseMatch",
"updated": "2018-09-19T18:13:29.628Z"
},
{
"rule": "jQuery-wrap(",
"path": "libtextsecure/message_receiver.js",
"line": " Promise.resolve(dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'));",
"lineNumber": 147, "lineNumber": 147,
"reasonCategory": "falseMatch", "reasonCategory": "falseMatch",
"updated": "2018-09-19T18:13:29.628Z" "updated": "2018-09-19T18:13:29.628Z"
}, },
{
"rule": "jQuery-wrap(",
"path": "libtextsecure/message_receiver.js",
"line": " Promise.resolve(dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'));",
"lineNumber": 149,
"reasonCategory": "falseMatch",
"updated": "2018-09-19T18:13:29.628Z"
},
{ {
"rule": "jQuery-wrap(", "rule": "jQuery-wrap(",
"path": "libtextsecure/message_receiver.js", "path": "libtextsecure/message_receiver.js",
"line": " const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);", "line": " const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);",
"lineNumber": 833, "lineNumber": 805,
"reasonCategory": "falseMatch", "reasonCategory": "falseMatch",
"updated": "2018-09-19T18:13:29.628Z" "updated": "2018-09-19T18:13:29.628Z"
}, },
@ -1360,7 +1360,7 @@
"rule": "jQuery-wrap(", "rule": "jQuery-wrap(",
"path": "libtextsecure/message_receiver.js", "path": "libtextsecure/message_receiver.js",
"line": " const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);", "line": " const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);",
"lineNumber": 858, "lineNumber": 830,
"reasonCategory": "falseMatch", "reasonCategory": "falseMatch",
"updated": "2018-09-19T18:13:29.628Z" "updated": "2018-09-19T18:13:29.628Z"
}, },