Two fixes for messages causing errors
* Queue delivery receipt sends, only start after we get 'empty' * Retry cached two minutes after empty, or any post-empty message
This commit is contained in:
parent
b5ebd034db
commit
6ac7f4ccf6
2 changed files with 50 additions and 19 deletions
|
@ -17,6 +17,10 @@
|
||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
const eventHandlerQueue = new window.PQueue({ concurrency: 1 });
|
const eventHandlerQueue = new window.PQueue({ concurrency: 1 });
|
||||||
|
const deliveryReceiptQueue = new window.PQueue({
|
||||||
|
concurrency: 1,
|
||||||
|
});
|
||||||
|
deliveryReceiptQueue.pause();
|
||||||
|
|
||||||
// Globally disable drag and drop
|
// Globally disable drag and drop
|
||||||
document.body.addEventListener(
|
document.body.addEventListener(
|
||||||
|
@ -824,6 +828,7 @@
|
||||||
serverTrustRoot: window.getServerTrustRoot(),
|
serverTrustRoot: window.getServerTrustRoot(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
deliveryReceiptQueue.pause(); // avoid flood of delivery receipts until we catch up
|
||||||
Whisper.Notifications.disable(); // avoid notification flood until empty
|
Whisper.Notifications.disable(); // avoid notification flood until empty
|
||||||
|
|
||||||
// initialize the socket and start listening for messages
|
// initialize the socket and start listening for messages
|
||||||
|
@ -996,6 +1001,7 @@
|
||||||
}
|
}
|
||||||
}, 500);
|
}, 500);
|
||||||
|
|
||||||
|
deliveryReceiptQueue.start();
|
||||||
Whisper.Notifications.enable();
|
Whisper.Notifications.enable();
|
||||||
}
|
}
|
||||||
function onReconnect() {
|
function onReconnect() {
|
||||||
|
@ -1003,6 +1009,7 @@
|
||||||
// scenarios where we're coming back from sleep, we can get offline/online events
|
// scenarios where we're coming back from sleep, we can get offline/online events
|
||||||
// very fast, and it looks like a network blip. But we need to suppress
|
// very fast, and it looks like a network blip. But we need to suppress
|
||||||
// notifications in these scenarios too. So we listen for 'reconnect' events.
|
// notifications in these scenarios too. So we listen for 'reconnect' events.
|
||||||
|
deliveryReceiptQueue.pause();
|
||||||
Whisper.Notifications.disable();
|
Whisper.Notifications.disable();
|
||||||
}
|
}
|
||||||
function onProgress(ev) {
|
function onProgress(ev) {
|
||||||
|
@ -1545,6 +1552,7 @@
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deliveryReceiptQueue.add(async () => {
|
||||||
try {
|
try {
|
||||||
const { wrap, sendOptions } = ConversationController.prepareForSend(
|
const { wrap, sendOptions } = ConversationController.prepareForSend(
|
||||||
data.source
|
data.source
|
||||||
|
@ -1564,6 +1572,7 @@
|
||||||
error && error.stack ? error.stack : error
|
error && error.stack ? error.stack : error
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
/* eslint-disable more/no-then */
|
/* eslint-disable more/no-then */
|
||||||
|
|
||||||
const WORKER_TIMEOUT = 60 * 1000; // one minute
|
const WORKER_TIMEOUT = 60 * 1000; // one minute
|
||||||
|
const RETRY_TIMEOUT = 2 * 60 * 1000;
|
||||||
|
|
||||||
const _utilWorker = new Worker('js/util_worker.js');
|
const _utilWorker = new Worker('js/util_worker.js');
|
||||||
const _jobs = Object.create(null);
|
const _jobs = Object.create(null);
|
||||||
|
@ -167,6 +168,7 @@ MessageReceiver.prototype.extend({
|
||||||
this.dispatchEvent(ev);
|
this.dispatchEvent(ev);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.isEmptied = false;
|
||||||
this.hasConnected = true;
|
this.hasConnected = true;
|
||||||
|
|
||||||
if (this.socket && this.socket.readyState !== WebSocket.CLOSED) {
|
if (this.socket && this.socket.readyState !== WebSocket.CLOSED) {
|
||||||
|
@ -218,6 +220,8 @@ MessageReceiver.prototype.extend({
|
||||||
this.wsr.close(3000, 'called close');
|
this.wsr.close(3000, 'called close');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.clearRetryTimeout();
|
||||||
|
|
||||||
return this.drain();
|
return this.drain();
|
||||||
},
|
},
|
||||||
onopen() {
|
onopen() {
|
||||||
|
@ -308,6 +312,9 @@ MessageReceiver.prototype.extend({
|
||||||
await this.addToCache(envelope, plaintext);
|
await this.addToCache(envelope, plaintext);
|
||||||
request.respond(200, 'OK');
|
request.respond(200, 'OK');
|
||||||
this.queueEnvelope(envelope);
|
this.queueEnvelope(envelope);
|
||||||
|
|
||||||
|
this.clearRetryTimeout();
|
||||||
|
this.maybeScheduleRetryTimeout();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
request.respond(500, 'Failed to cache message');
|
request.respond(500, 'Failed to cache message');
|
||||||
window.log.error(
|
window.log.error(
|
||||||
|
@ -349,6 +356,8 @@ MessageReceiver.prototype.extend({
|
||||||
window.log.info("MessageReceiver: emitting 'empty' event");
|
window.log.info("MessageReceiver: emitting 'empty' event");
|
||||||
const ev = new Event('empty');
|
const ev = new Event('empty');
|
||||||
this.dispatchAndWait(ev);
|
this.dispatchAndWait(ev);
|
||||||
|
this.isEmptied = true;
|
||||||
|
this.maybeScheduleRetryTimeout();
|
||||||
};
|
};
|
||||||
|
|
||||||
const waitForPendingQueue = () => {
|
const waitForPendingQueue = () => {
|
||||||
|
@ -465,6 +474,19 @@ MessageReceiver.prototype.extend({
|
||||||
|
|
||||||
return envelope.id;
|
return envelope.id;
|
||||||
},
|
},
|
||||||
|
clearRetryTimeout() {
|
||||||
|
if (this.retryCachedTimeout) {
|
||||||
|
clearInterval(this.retryCachedTimeout);
|
||||||
|
this.retryCachedTimeout = null;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
maybeScheduleRetryTimeout() {
|
||||||
|
if (this.isEmptied) {
|
||||||
|
this.retryCachedTimeout = setTimeout(() => {
|
||||||
|
this.pendingQueue.add(() => this.queueAllCached());
|
||||||
|
}, RETRY_TIMEOUT);
|
||||||
|
}
|
||||||
|
},
|
||||||
async getAllFromCache() {
|
async getAllFromCache() {
|
||||||
window.log.info('getAllFromCache');
|
window.log.info('getAllFromCache');
|
||||||
const count = await textsecure.storage.unprocessed.getCount();
|
const count = await textsecure.storage.unprocessed.getCount();
|
||||||
|
|
Loading…
Add table
Reference in a new issue