Reset MessageReciever queue whenever possible
(like we do with the conversation queue already) FREEBIE
This commit is contained in:
parent
305bd6b3b8
commit
af2ce56c8d
2 changed files with 46 additions and 26 deletions
|
@ -38349,6 +38349,24 @@ MessageReceiver.prototype.extend({
|
||||||
return this.dispatchAndWait(ev);
|
return this.dispatchAndWait(ev);
|
||||||
}.bind(this)));
|
}.bind(this)));
|
||||||
},
|
},
|
||||||
|
addToQueue: function(task) {
|
||||||
|
var count = this.count += 1;
|
||||||
|
var current = this.pending = this.pending.then(task, task);
|
||||||
|
|
||||||
|
var cleanup = function() {
|
||||||
|
this.updateProgress(count);
|
||||||
|
// We want to clear out the promise chain whenever possible because it could
|
||||||
|
// lead to large memory usage over time:
|
||||||
|
// https://github.com/nodejs/node/issues/6673#issuecomment-244331609
|
||||||
|
if (this.pending === current) {
|
||||||
|
this.pending = Promise.resolve();
|
||||||
|
}
|
||||||
|
}.bind(this);
|
||||||
|
|
||||||
|
current.then(cleanup, cleanup);
|
||||||
|
|
||||||
|
return current;
|
||||||
|
},
|
||||||
onEmpty: function() {
|
onEmpty: function() {
|
||||||
var incoming = this.incoming;
|
var incoming = this.incoming;
|
||||||
this.incoming = [];
|
this.incoming = [];
|
||||||
|
@ -38362,7 +38380,7 @@ MessageReceiver.prototype.extend({
|
||||||
// resetting count to zero so everything queued after this starts over again
|
// resetting count to zero so everything queued after this starts over again
|
||||||
this.count = 0;
|
this.count = 0;
|
||||||
|
|
||||||
this.pending = this.pending.then(dispatchEmpty, dispatchEmpty);
|
this.addToQueue(dispatchEmpty);
|
||||||
}.bind(this);
|
}.bind(this);
|
||||||
|
|
||||||
Promise.all(incoming).then(scheduleDispatch, scheduleDispatch);
|
Promise.all(incoming).then(scheduleDispatch, scheduleDispatch);
|
||||||
|
@ -38460,34 +38478,26 @@ MessageReceiver.prototype.extend({
|
||||||
return textsecure.storage.unprocessed.remove(id);
|
return textsecure.storage.unprocessed.remove(id);
|
||||||
},
|
},
|
||||||
queueDecryptedEnvelope: function(envelope, plaintext) {
|
queueDecryptedEnvelope: function(envelope, plaintext) {
|
||||||
var count = this.count += 1;
|
|
||||||
var id = this.getEnvelopeId(envelope);
|
var id = this.getEnvelopeId(envelope);
|
||||||
console.log('queueing decrypted envelope', id);
|
console.log('queueing decrypted envelope', id);
|
||||||
|
|
||||||
var task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
|
var task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
|
||||||
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEncryptedEnvelope ' + id);
|
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEncryptedEnvelope ' + id);
|
||||||
|
var promise = this.addToQueue(taskWithTimeout);
|
||||||
|
|
||||||
this.pending = this.pending.then(taskWithTimeout, taskWithTimeout);
|
return promise.catch(function(error) {
|
||||||
|
|
||||||
return this.pending.then(function() {
|
|
||||||
this.updateProgress(count);
|
|
||||||
}.bind(this), function(error) {
|
|
||||||
console.log('queueDecryptedEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
console.log('queueDecryptedEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
queueEnvelope: function(envelope) {
|
queueEnvelope: function(envelope) {
|
||||||
var count = this.count += 1;
|
|
||||||
var id = this.getEnvelopeId(envelope);
|
var id = this.getEnvelopeId(envelope);
|
||||||
console.log('queueing envelope', id);
|
console.log('queueing envelope', id);
|
||||||
|
|
||||||
var task = this.handleEnvelope.bind(this, envelope);
|
var task = this.handleEnvelope.bind(this, envelope);
|
||||||
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEnvelope ' + id);
|
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEnvelope ' + id);
|
||||||
|
var promise = this.addToQueue(taskWithTimeout);
|
||||||
|
|
||||||
this.pending = this.pending.then(taskWithTimeout, taskWithTimeout);
|
return promise.catch(function(error) {
|
||||||
|
|
||||||
return this.pending.then(function() {
|
|
||||||
this.updateProgress(count);
|
|
||||||
}.bind(this), function(error) {
|
|
||||||
console.log('queueEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
console.log('queueEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
|
@ -107,6 +107,24 @@ MessageReceiver.prototype.extend({
|
||||||
return this.dispatchAndWait(ev);
|
return this.dispatchAndWait(ev);
|
||||||
}.bind(this)));
|
}.bind(this)));
|
||||||
},
|
},
|
||||||
|
addToQueue: function(task) {
|
||||||
|
var count = this.count += 1;
|
||||||
|
var current = this.pending = this.pending.then(task, task);
|
||||||
|
|
||||||
|
var cleanup = function() {
|
||||||
|
this.updateProgress(count);
|
||||||
|
// We want to clear out the promise chain whenever possible because it could
|
||||||
|
// lead to large memory usage over time:
|
||||||
|
// https://github.com/nodejs/node/issues/6673#issuecomment-244331609
|
||||||
|
if (this.pending === current) {
|
||||||
|
this.pending = Promise.resolve();
|
||||||
|
}
|
||||||
|
}.bind(this);
|
||||||
|
|
||||||
|
current.then(cleanup, cleanup);
|
||||||
|
|
||||||
|
return current;
|
||||||
|
},
|
||||||
onEmpty: function() {
|
onEmpty: function() {
|
||||||
var incoming = this.incoming;
|
var incoming = this.incoming;
|
||||||
this.incoming = [];
|
this.incoming = [];
|
||||||
|
@ -120,7 +138,7 @@ MessageReceiver.prototype.extend({
|
||||||
// resetting count to zero so everything queued after this starts over again
|
// resetting count to zero so everything queued after this starts over again
|
||||||
this.count = 0;
|
this.count = 0;
|
||||||
|
|
||||||
this.pending = this.pending.then(dispatchEmpty, dispatchEmpty);
|
this.addToQueue(dispatchEmpty);
|
||||||
}.bind(this);
|
}.bind(this);
|
||||||
|
|
||||||
Promise.all(incoming).then(scheduleDispatch, scheduleDispatch);
|
Promise.all(incoming).then(scheduleDispatch, scheduleDispatch);
|
||||||
|
@ -218,34 +236,26 @@ MessageReceiver.prototype.extend({
|
||||||
return textsecure.storage.unprocessed.remove(id);
|
return textsecure.storage.unprocessed.remove(id);
|
||||||
},
|
},
|
||||||
queueDecryptedEnvelope: function(envelope, plaintext) {
|
queueDecryptedEnvelope: function(envelope, plaintext) {
|
||||||
var count = this.count += 1;
|
|
||||||
var id = this.getEnvelopeId(envelope);
|
var id = this.getEnvelopeId(envelope);
|
||||||
console.log('queueing decrypted envelope', id);
|
console.log('queueing decrypted envelope', id);
|
||||||
|
|
||||||
var task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
|
var task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
|
||||||
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEncryptedEnvelope ' + id);
|
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEncryptedEnvelope ' + id);
|
||||||
|
var promise = this.addToQueue(taskWithTimeout);
|
||||||
|
|
||||||
this.pending = this.pending.then(taskWithTimeout, taskWithTimeout);
|
return promise.catch(function(error) {
|
||||||
|
|
||||||
return this.pending.then(function() {
|
|
||||||
this.updateProgress(count);
|
|
||||||
}.bind(this), function(error) {
|
|
||||||
console.log('queueDecryptedEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
console.log('queueDecryptedEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
queueEnvelope: function(envelope) {
|
queueEnvelope: function(envelope) {
|
||||||
var count = this.count += 1;
|
|
||||||
var id = this.getEnvelopeId(envelope);
|
var id = this.getEnvelopeId(envelope);
|
||||||
console.log('queueing envelope', id);
|
console.log('queueing envelope', id);
|
||||||
|
|
||||||
var task = this.handleEnvelope.bind(this, envelope);
|
var task = this.handleEnvelope.bind(this, envelope);
|
||||||
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEnvelope ' + id);
|
var taskWithTimeout = textsecure.createTaskWithTimeout(task, 'queueEnvelope ' + id);
|
||||||
|
var promise = this.addToQueue(taskWithTimeout);
|
||||||
|
|
||||||
this.pending = this.pending.then(taskWithTimeout, taskWithTimeout);
|
return promise.catch(function(error) {
|
||||||
|
|
||||||
return this.pending.then(function() {
|
|
||||||
this.updateProgress(count);
|
|
||||||
}.bind(this), function(error) {
|
|
||||||
console.log('queueEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
console.log('queueEnvelope error handling envelope', id, ':', error && error.stack ? error.stack : error);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue