From 227f532ec2324bb4c6429f4f5f013d808a3f4230 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 24 May 2021 14:30:56 -0700 Subject: [PATCH] Fix processing of cached envelopes --- ts/background.ts | 1 + ts/sql/Server.ts | 49 +++++++++++++++++- ts/textsecure/MessageReceiver.ts | 85 +++++++++++++++----------------- 3 files changed, 87 insertions(+), 48 deletions(-) diff --git a/ts/background.ts b/ts/background.ts index 85bcc1018672..760ea605e69b 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -3120,6 +3120,7 @@ export async function startApp(): Promise { } if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) { + event.confirm(); return Promise.resolve(); } diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 8a379754b128..b785c35249ed 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -1790,6 +1790,50 @@ function updateToSchemaVersion30(currentVersion: number, db: Database) { console.log('updateToSchemaVersion30: success!'); } +function updateToSchemaVersion31(currentVersion: number, db: Database): void { + if (currentVersion >= 31) { + return; + } + console.log('updateToSchemaVersion10: starting...'); + db.transaction(() => { + db.exec(` + DROP INDEX unprocessed_id; + DROP INDEX unprocessed_timestamp; + ALTER TABLE unprocessed RENAME TO unprocessed_old; + + CREATE TABLE unprocessed( + id STRING PRIMARY KEY ASC, + timestamp INTEGER, + version INTEGER, + attempts INTEGER, + envelope TEXT, + decrypted TEXT, + source TEXT, + sourceDevice TEXT, + serverTimestamp INTEGER, + sourceUuid STRING + ); + + CREATE INDEX unprocessed_timestamp ON unprocessed ( + timestamp + ); + + INSERT OR REPLACE INTO unprocessed + (id, timestamp, version, attempts, envelope, decrypted, source, + sourceDevice, serverTimestamp, sourceUuid) + SELECT + id, timestamp, version, attempts, envelope, decrypted, source, + sourceDevice, serverTimestamp, sourceUuid + FROM unprocessed_old; + + DROP TABLE unprocessed_old; + `); + + db.pragma('user_version = 31'); + })(); + console.log('updateToSchemaVersion31: success!'); +} + const SCHEMA_VERSIONS = [ updateToSchemaVersion1, updateToSchemaVersion2, @@ -1821,6 +1865,7 @@ const SCHEMA_VERSIONS = [ updateToSchemaVersion28, updateToSchemaVersion29, updateToSchemaVersion30, + updateToSchemaVersion31, ]; function updateSchema(db: Database): void { @@ -2229,11 +2274,11 @@ async function commitSessionsAndUnprocessed({ db.transaction(() => { for (const item of sessions) { - createOrUpdateSession(item); + assertSync(createOrUpdateSessionSync(item)); } for (const item of unprocessed) { - saveUnprocessedSync(item); + assertSync(saveUnprocessedSync(item)); } })(); } diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 59ed9b38f6e0..60aeb42095e7 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -49,7 +49,6 @@ import WebSocketResource, { import Crypto from './Crypto'; import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto'; import { ContactBuffer, GroupBuffer } from './ContactsParser'; -import { assert } from '../util/assert'; import { isByteBufferEmpty } from '../util/isByteBufferEmpty'; import { @@ -120,7 +119,7 @@ declare global { type CacheAddItemType = { envelope: EnvelopeClass; data: UnprocessedType; - request: IncomingWebSocketRequest; + request: Pick; }; type DecryptedEnvelope = { @@ -145,7 +144,7 @@ class MessageReceiverInner extends EventTarget { appQueue: PQueue; - cacheAddBatcher: BatcherType; + decryptAndCacheBatcher: BatcherType; cacheRemoveBatcher: BatcherType; @@ -246,14 +245,14 @@ class MessageReceiverInner extends EventTarget { timeout: 1000 * 60 * 2, }); - this.cacheAddBatcher = createBatcher({ - name: 'MessageReceiver.cacheAddBatcher', + this.decryptAndCacheBatcher = createBatcher({ + name: 'MessageReceiver.decryptAndCacheBatcher', wait: 75, maxSize: 30, processBatch: (items: Array) => { // Not returning the promise here because we don't want to stall // the batch. - this.cacheAndQueueBatch(items); + this.decryptAndCacheBatch(items); }, }); this.cacheRemoveBatcher = createBatcher({ @@ -330,7 +329,7 @@ class MessageReceiverInner extends EventTarget { unregisterBatchers() { window.log.info('MessageReceiver: unregister batchers'); - this.cacheAddBatcher.unregister(); + this.decryptAndCacheBatcher.unregister(); this.cacheRemoveBatcher.unregister(); } @@ -484,7 +483,7 @@ class MessageReceiverInner extends EventTarget { envelope.serverTimestamp ); - this.cacheAndQueue(envelope, plaintext, request); + this.decryptAndCache(envelope, plaintext, request); this.processedCount += 1; } catch (e) { request.respond(500, 'Bad encrypted websocket message'); @@ -554,7 +553,7 @@ class MessageReceiverInner extends EventTarget { onEmpty() { const emitEmpty = async () => { await Promise.all([ - this.cacheAddBatcher.flushAndWait(), + this.decryptAndCacheBatcher.flushAndWait(), this.cacheRemoveBatcher.flushAndWait(), ]); @@ -588,7 +587,7 @@ class MessageReceiverInner extends EventTarget { }; const waitForCacheAddBatcher = async () => { - await this.cacheAddBatcher.onIdle(); + await this.decryptAndCacheBatcher.onIdle(); this.incomingQueue.add(waitForIncomingQueue); }; @@ -635,11 +634,7 @@ class MessageReceiverInner extends EventTarget { envelopePlaintext = MessageReceiverInner.stringToArrayBufferBase64( item.envelope ); - } else if (typeof item.envelope === 'string') { - assert( - item.envelope || item.decrypted, - 'MessageReceiver.queueCached: empty envelope without decrypted data' - ); + } else if (item.envelope && typeof item.envelope === 'string') { envelopePlaintext = MessageReceiverInner.stringToArrayBuffer( item.envelope ); @@ -686,7 +681,7 @@ class MessageReceiverInner extends EventTarget { this.queueDecryptedEnvelope(envelope, payloadPlaintext); }, TaskType.Encrypted); } else { - this.queueCachedEnvelope(envelope); + this.queueCachedEnvelope(item, envelope); } } catch (error) { window.log.error( @@ -784,14 +779,14 @@ class MessageReceiverInner extends EventTarget { ); } - async cacheAndQueueBatch(items: Array) { - window.log.info('MessageReceiver.cacheAndQueueBatch', items.length); + async decryptAndCacheBatch(items: Array) { + window.log.info('MessageReceiver.decryptAndCacheBatch', items.length); const decrypted: Array = []; const storageProtocol = window.textsecure.storage.protocol; try { - const zone = new Zone('cacheAndQueueBatch', { + const zone = new Zone('decryptAndCacheBatch', { pendingSessions: true, pendingUnprocessed: true, }); @@ -822,7 +817,7 @@ class MessageReceiverInner extends EventTarget { } catch (error) { failed.push(data); window.log.error( - 'cacheAndQueue error when processing the envelope', + 'decryptAndCache error when processing the envelope', error && error.stack ? error.stack : error ); } @@ -830,7 +825,7 @@ class MessageReceiverInner extends EventTarget { ); window.log.info( - 'MessageReceiver.cacheAndQueueBatch storing ' + + 'MessageReceiver.decryptAndCacheBatch storing ' + `${decrypted.length} decrypted envelopes` ); @@ -840,10 +835,6 @@ class MessageReceiverInner extends EventTarget { return { ...data, - // We have sucessfully decrypted the message so don't bother with - // storing the envelope. - envelope: '', - source: envelope.source, sourceUuid: envelope.sourceUuid, sourceDevice: envelope.sourceDevice, @@ -862,7 +853,7 @@ class MessageReceiverInner extends EventTarget { }); window.log.info( - 'MessageReceiver.cacheAndQueueBatch acknowledging receipt' + 'MessageReceiver.decryptAndCacheBatch acknowledging receipt' ); // Acknowledge all envelopes @@ -871,13 +862,13 @@ class MessageReceiverInner extends EventTarget { request.respond(200, 'OK'); } catch (error) { window.log.error( - 'cacheAndQueueBatch: Failed to send 200 to server; still queuing envelope' + 'decryptAndCacheBatch: Failed to send 200 to server; still queuing envelope' ); } } } catch (error) { window.log.error( - 'cacheAndQueue error trying to add messages to cache:', + 'decryptAndCache error trying to add messages to cache:', error && error.stack ? error.stack : error ); @@ -893,19 +884,19 @@ class MessageReceiverInner extends EventTarget { await this.queueDecryptedEnvelope(envelope, plaintext); } catch (error) { window.log.error( - 'cacheAndQueue error when processing decrypted envelope', + 'decryptAndCache error when processing decrypted envelope', error && error.stack ? error.stack : error ); } }) ); - window.log.info('MessageReceiver.cacheAndQueueBatch fully processed'); + window.log.info('MessageReceiver.decryptAndCacheBatch fully processed'); this.maybeScheduleRetryTimeout(); } - cacheAndQueue( + decryptAndCache( envelope: EnvelopeClass, plaintext: ArrayBuffer, request: IncomingWebSocketRequest @@ -918,7 +909,7 @@ class MessageReceiverInner extends EventTarget { timestamp: envelope.receivedAtCounter, attempts: 1, }; - this.cacheAddBatcher.add({ + this.decryptAndCacheBatcher.add({ request, envelope, data, @@ -944,7 +935,7 @@ class MessageReceiverInner extends EventTarget { const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext); const taskWithTimeout = window.textsecure.createTaskWithTimeout( task, - `queueEncryptedEnvelope ${id}` + `queueDecryptedEnvelope ${id}` ); const promise = this.addToQueue(taskWithTimeout, TaskType.Decrypted); @@ -989,20 +980,22 @@ class MessageReceiverInner extends EventTarget { } } - async queueCachedEnvelope(envelope: EnvelopeClass): Promise { - const plaintext = await this.queueEncryptedEnvelope( - { - sessionStore: new Sessions(), - identityKeyStore: new IdentityKeys(), + async queueCachedEnvelope( + data: UnprocessedType, + envelope: EnvelopeClass + ): Promise { + this.decryptAndCacheBatcher.add({ + request: { + respond(code, status) { + window.log.info( + 'queueCachedEnvelope: fake response ' + + `with code ${code} and status ${status}` + ); + }, }, - envelope - ); - - if (!plaintext) { - return; - } - - await this.queueDecryptedEnvelope(envelope, plaintext); + envelope, + data, + }); } // Called after `decryptEnvelope` decrypted the message.