diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 525dc55b4d14..60545bf97895 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -206,7 +206,7 @@ class MessageReceiverInner extends EventTarget { this.cacheAddBatcher = createBatcher({ wait: 200, maxSize: 30, - processBatch: this.cacheAndHandleBatch.bind(this), + processBatch: this.cacheAndQueueBatch.bind(this), }); this.cacheUpdateBatcher = createBatcher({ wait: 500, @@ -240,7 +240,7 @@ class MessageReceiverInner extends EventTarget { } // We always process our cache before processing a new websocket message - this.pendingQueue.add(async () => this.handleAllCached()); + this.pendingQueue.add(async () => this.queueAllCached()); this.count = 0; if (this.hasConnected) { @@ -440,7 +440,7 @@ class MessageReceiverInner extends EventTarget { envelope.serverTimestamp ); - this.cacheAndHandle(envelope, plaintext, request); + this.cacheAndQueue(envelope, plaintext, request); this.processedCount += 1; } catch (e) { request.respond(500, 'Bad encrypted websocket message'); @@ -560,17 +560,17 @@ class MessageReceiverInner extends EventTarget { this.dispatchEvent(ev); } - async handleAllCached() { + async queueAllCached() { const items = await this.getAllFromCache(); const max = items.length; for (let i = 0; i < max; i += 1) { // eslint-disable-next-line no-await-in-loop - await this.handleCachedEnvelope(items[i]); + await this.queueCached(items[i]); } } - async handleCachedEnvelope(item: UnprocessedType) { - window.log.info('MessageReceiver.handleCachedEnvelope', item.id); + async queueCached(item: UnprocessedType) { + window.log.info('MessageReceiver.queueCached', item.id); try { let envelopePlaintext: ArrayBuffer; @@ -584,7 +584,7 @@ class MessageReceiverInner extends EventTarget { ); } else { throw new Error( - 'MessageReceiver.handleCachedEnvelope: item.envelope was malformed' + 'MessageReceiver.queueCached: item.envelope was malformed' ); } @@ -615,13 +615,13 @@ class MessageReceiverInner extends EventTarget { } else { throw new Error('Cached decrypted value was not a string!'); } - this.handleDecryptedEnvelope(envelope, payloadPlaintext); + this.queueDecryptedEnvelope(envelope, payloadPlaintext); } else { - this.handleEnvelope(envelope); + this.queueEnvelope(envelope); } } catch (error) { window.log.error( - 'handleCachedEnvelope error handling item', + 'queueCached error handling item', item.id, 'removing it. Error:', error && error.stack ? error.stack : error @@ -632,7 +632,7 @@ class MessageReceiverInner extends EventTarget { await window.textsecure.storage.unprocessed.remove(id); } catch (deleteError) { window.log.error( - 'handleCachedEnvelope error deleting item', + 'queueCached error deleting item', item.id, 'Error:', deleteError && deleteError.stack ? deleteError.stack : deleteError @@ -666,7 +666,7 @@ class MessageReceiverInner extends EventTarget { if (this.isEmptied) { this.clearRetryTimeout(); this.retryCachedTimeout = setTimeout(() => { - this.pendingQueue.add(async () => this.handleAllCached()); + this.pendingQueue.add(async () => this.queueAllCached()); }, RETRY_TIMEOUT); } } @@ -715,8 +715,8 @@ class MessageReceiverInner extends EventTarget { ); } - async cacheAndHandleBatch(items: Array) { - window.log.info('MessageReceiver.cacheAndHandleBatch', items.length); + async cacheAndQueueBatch(items: Array) { + window.log.info('MessageReceiver.cacheAndQueueBatch', items.length); const dataArray = items.map(item => item.data); try { await window.textsecure.storage.unprocessed.batchAdd(dataArray); @@ -725,16 +725,16 @@ class MessageReceiverInner extends EventTarget { item.request.respond(200, 'OK'); } catch (error) { window.log.error( - 'cacheAndHandleBatch: Failed to send 200 to server; still queuing envelope' + 'cacheAndQueueBatch: Failed to send 200 to server; still queuing envelope' ); } - this.handleEnvelope(item.envelope); + this.queueEnvelope(item.envelope); }); this.maybeScheduleRetryTimeout(); } catch (error) { window.log.error( - 'cacheAndHandleBatch error trying to add messages to cache:', + 'cacheAndQueue error trying to add messages to cache:', error && error.stack ? error.stack : error ); @@ -744,7 +744,7 @@ class MessageReceiverInner extends EventTarget { } } - cacheAndHandle( + cacheAndQueue( envelope: EnvelopeClass, plaintext: ArrayBuffer, request: IncomingWebSocketRequest @@ -790,71 +790,43 @@ class MessageReceiverInner extends EventTarget { this.cacheRemoveBatcher.add(id); } - // Same as handleEnvelope, just without the decryption step. Necessary for handling - // messages which were successfully decrypted, but application logic didn't finish - // processing. - async handleDecryptedEnvelope( + async queueDecryptedEnvelope( envelope: EnvelopeClass, plaintext: ArrayBuffer - ): Promise { + ) { const id = this.getEnvelopeId(envelope); - window.log.info('MessageReceiver.handleDecryptedEnvelope', id); + window.log.info('queueing decrypted envelope', id); - try { - if (this.stoppingProcessing) { - return; - } - // No decryption is required for delivery receipts, so the decrypted field of - // the Unprocessed model will never be set + const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext); + const taskWithTimeout = window.textsecure.createTaskWithTimeout( + task, + `queueEncryptedEnvelope ${id}` + ); + const promise = this.addToQueue(taskWithTimeout); - if (envelope.content) { - await this.innerHandleContentMessage(envelope, plaintext); - - return; - } - if (envelope.legacyMessage) { - await this.innerHandleLegacyMessage(envelope, plaintext); - - return; - } - - this.removeFromCache(envelope); - throw new Error('Received message with no content and no legacyMessage'); - } catch (error) { + return promise.catch(error => { window.log.error( - `handleDecryptedEnvelope error handling envelope ${id}:`, + `queueDecryptedEnvelope error handling envelope ${id}:`, error && error.extra ? JSON.stringify(error.extra) : '', error && error.stack ? error.stack : error ); - } + }); } - async handleEnvelope(envelope: EnvelopeClass) { + async queueEnvelope(envelope: EnvelopeClass) { const id = this.getEnvelopeId(envelope); - window.log.info('MessageReceiver.handleEnvelope', id); + window.log.info('queueing envelope', id); - try { - if (this.stoppingProcessing) { - return Promise.resolve(); - } + const task = this.handleEnvelope.bind(this, envelope); + const taskWithTimeout = window.textsecure.createTaskWithTimeout( + task, + `queueEnvelope ${id}` + ); + const promise = this.addToQueue(taskWithTimeout); - if (envelope.type === window.textsecure.protobuf.Envelope.Type.RECEIPT) { - return this.onDeliveryReceipt(envelope); - } - - if (envelope.content) { - return this.handleContentMessage(envelope); - } - if (envelope.legacyMessage) { - return this.handleLegacyMessage(envelope); - } - - this.removeFromCache(envelope); - - throw new Error('Received message with no content and no legacyMessage'); - } catch (error) { + return promise.catch(error => { const args = [ - 'handleEnvelope error handling envelope', + 'queueEnvelope error handling envelope', this.getEnvelopeId(envelope), ':', error && error.extra ? JSON.stringify(error.extra) : '', @@ -865,9 +837,54 @@ class MessageReceiverInner extends EventTarget { } else { window.log.error(...args); } + }); + } + + // Same as handleEnvelope, just without the decryption step. Necessary for handling + // messages which were successfully decrypted, but application logic didn't finish + // processing. + async handleDecryptedEnvelope( + envelope: EnvelopeClass, + plaintext: ArrayBuffer + ): Promise { + if (this.stoppingProcessing) { + return; + } + // No decryption is required for delivery receipts, so the decrypted field of + // the Unprocessed model will never be set + + if (envelope.content) { + await this.innerHandleContentMessage(envelope, plaintext); + + return; + } + if (envelope.legacyMessage) { + await this.innerHandleLegacyMessage(envelope, plaintext); + + return; } - return undefined; + this.removeFromCache(envelope); + throw new Error('Received message with no content and no legacyMessage'); + } + + async handleEnvelope(envelope: EnvelopeClass) { + if (this.stoppingProcessing) { + return Promise.resolve(); + } + + if (envelope.type === window.textsecure.protobuf.Envelope.Type.RECEIPT) { + return this.onDeliveryReceipt(envelope); + } + + if (envelope.content) { + return this.handleContentMessage(envelope); + } + if (envelope.legacyMessage) { + return this.handleLegacyMessage(envelope); + } + this.removeFromCache(envelope); + throw new Error('Received message with no content and no legacyMessage'); } getStatus() {