diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 25fdc537edee..cdb0158f43ff 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -135,6 +135,11 @@ type LockedStores = { readonly zone?: Zone; }; +enum TaskType { + Encrypted = 'Encrypted', + Decrypted = 'Decrypted', +} + class MessageReceiverInner extends EventTarget { _onClose?: (ev: any) => Promise; @@ -162,7 +167,9 @@ class MessageReceiverInner extends EventTarget { password: string; - pendingQueue: PQueue; + encryptedQueue: PQueue; + + decryptedQueue: PQueue; retryCachedTimeout: any; @@ -227,9 +234,18 @@ class MessageReceiverInner extends EventTarget { : undefined; this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); - this.pendingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); + // All envelopes start in encryptedQueue and progress to decryptedQueue + this.encryptedQueue = new PQueue({ + concurrency: 1, + timeout: 1000 * 60 * 2, + }); + this.decryptedQueue = new PQueue({ + concurrency: 1, + timeout: 1000 * 60 * 2, + }); + this.cacheAddBatcher = createBatcher({ name: 'MessageReceiver.cacheAddBatcher', wait: 75, @@ -268,7 +284,7 @@ class MessageReceiverInner extends EventTarget { } // We always process our cache before processing a new websocket message - this.pendingQueue.add(async () => this.queueAllCached()); + this.encryptedQueue.add(async () => this.queueAllCached()); this.count = 0; if (this.hasConnected) { @@ -514,20 +530,21 @@ class MessageReceiverInner extends EventTarget { return messageAgeSec; } - async addToQueue(task: () => Promise): Promise { - this.count += 1; + async addToQueue(task: () => Promise, taskType: TaskType): Promise { + if (taskType === TaskType.Encrypted) { + this.count += 1; + } - const promise = this.pendingQueue.add(task); + const queue = + taskType === TaskType.Encrypted + ? this.encryptedQueue + : this.decryptedQueue; - const { count } = this; - - const update = () => { - this.updateProgress(count); - }; - - promise.then(update, update); - - return promise; + try { + return await queue.add(task); + } finally { + this.updateProgress(this.count); + } } hasEmptied(): boolean { @@ -549,7 +566,7 @@ class MessageReceiverInner extends EventTarget { this.maybeScheduleRetryTimeout(); }; - const waitForPendingQueue = async () => { + const waitForDecryptedQueue = async () => { window.log.info( "MessageReceiver: finished processing messages after 'empty', now waiting for application" ); @@ -558,8 +575,12 @@ class MessageReceiverInner extends EventTarget { this.appQueue.add(emitEmpty); }; + const waitForEncryptedQueue = async () => { + this.addToQueue(waitForDecryptedQueue, TaskType.Decrypted); + }; + const waitForIncomingQueue = () => { - this.addToQueue(waitForPendingQueue); + this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted); // Note: this.count is used in addToQueue // Resetting count so everything from the websocket after this starts at zero @@ -575,10 +596,13 @@ class MessageReceiverInner extends EventTarget { } async drain() { - const waitForIncomingQueue = async () => + const waitForEncryptedQueue = async () => this.addToQueue(async () => { window.log.info('drained'); - }); + }, TaskType.Decrypted); + + const waitForIncomingQueue = async () => + this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted); return this.incomingQueue.add(waitForIncomingQueue); } @@ -656,15 +680,13 @@ class MessageReceiverInner extends EventTarget { } else { throw new Error('Cached decrypted value was not a string!'); } - this.queueDecryptedEnvelope(envelope, payloadPlaintext); + + // Maintain invariant: encrypted queue => decrypted queue + this.addToQueue(async () => { + this.queueDecryptedEnvelope(envelope, payloadPlaintext); + }, TaskType.Encrypted); } else { - this.queueEnvelope( - { - sessionStore: new Sessions(), - identityKeyStore: new IdentityKeys(), - }, - envelope - ); + this.queueCachedEnvelope(envelope); } } catch (error) { window.log.error( @@ -713,7 +735,7 @@ class MessageReceiverInner extends EventTarget { if (this.isEmptied) { this.clearRetryTimeout(); this.retryCachedTimeout = setTimeout(() => { - this.pendingQueue.add(async () => this.queueAllCached()); + this.encryptedQueue.add(async () => this.queueAllCached()); }, RETRY_TIMEOUT); } } @@ -790,7 +812,7 @@ class MessageReceiverInner extends EventTarget { await Promise.all( items.map(async ({ data, envelope }) => { try { - const plaintext = await this.queueEnvelope( + const plaintext = await this.queueEncryptedEnvelope( { sessionStore, identityKeyStore, zone }, envelope ); @@ -924,7 +946,7 @@ class MessageReceiverInner extends EventTarget { task, `queueEncryptedEnvelope ${id}` ); - const promise = this.addToQueue(taskWithTimeout); + const promise = this.addToQueue(taskWithTimeout, TaskType.Decrypted); return promise.catch(error => { window.log.error( @@ -935,7 +957,7 @@ class MessageReceiverInner extends EventTarget { }); } - async queueEnvelope( + async queueEncryptedEnvelope( stores: LockedStores, envelope: EnvelopeClass ): Promise { @@ -945,14 +967,14 @@ class MessageReceiverInner extends EventTarget { const task = this.decryptEnvelope.bind(this, stores, envelope); const taskWithTimeout = window.textsecure.createTaskWithTimeout( task, - `queueEnvelope ${id}` + `queueEncryptedEnvelope ${id}` ); try { - return await this.addToQueue(taskWithTimeout); + return await this.addToQueue(taskWithTimeout, TaskType.Encrypted); } catch (error) { const args = [ - 'queueEnvelope error handling envelope', + 'queueEncryptedEnvelope error handling envelope', this.getEnvelopeId(envelope), ':', error && error.extra ? JSON.stringify(error.extra) : '', @@ -967,6 +989,22 @@ class MessageReceiverInner extends EventTarget { } } + async queueCachedEnvelope(envelope: EnvelopeClass): Promise { + const plaintext = await this.queueEncryptedEnvelope( + { + sessionStore: new Sessions(), + identityKeyStore: new IdentityKeys(), + }, + envelope + ); + + if (!plaintext) { + return; + } + + await this.queueDecryptedEnvelope(envelope, plaintext); + } + // Called after `decryptEnvelope` decrypted the message. async handleDecryptedEnvelope( envelope: EnvelopeClass,