From 2abc3310587660036e36365c53d580a9709fa673 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 17 May 2021 17:41:28 -0700 Subject: [PATCH] Fix in-memory transactions while accessing sessions --- ts/LibSignalStores.ts | 23 +++++- ts/SignalProtocolStore.ts | 124 ++++++++++++++++++++++++------- ts/background.ts | 25 ++++++- ts/textsecure/MessageReceiver.ts | 123 +++++++++--------------------- 4 files changed, 177 insertions(+), 118 deletions(-) diff --git a/ts/LibSignalStores.ts b/ts/LibSignalStores.ts index 5a401ad11ec8..eb39a101fc8e 100644 --- a/ts/LibSignalStores.ts +++ b/ts/LibSignalStores.ts @@ -39,16 +39,19 @@ function encodedNameFromAddress(address: ProtocolAddress): string { } export type SessionsOptions = { + readonly lock?: Lock; readonly transactionOnly?: boolean; }; export class Sessions extends SessionStore { - private readonly lock = new Lock(); + private readonly lock: Lock; private inTransaction = false; constructor(private readonly options: SessionsOptions = {}) { super(); + + this.lock = options.lock || new Lock(); } public async transaction(fn: () => Promise): Promise { @@ -109,7 +112,18 @@ export class Sessions extends SessionStore { } } +export type IdentityKeysOptions = { + readonly lock?: Lock; +}; + export class IdentityKeys extends IdentityKeyStore { + private readonly lock: Lock; + + constructor({ lock = new Lock() }: IdentityKeysOptions = {}) { + super(); + this.lock = lock; + } + async getIdentityKey(): Promise { const keyPair = await window.textsecure.storage.protocol.getIdentityKeyPair(); if (!keyPair) { @@ -144,9 +158,14 @@ export class IdentityKeys extends IdentityKeyStore { async saveIdentity(name: ProtocolAddress, key: PublicKey): Promise { const encodedName = encodedNameFromAddress(name); const publicKey = typedArrayToArrayBuffer(key.serialize()); + + // Pass `lock` to let `saveIdentity` archive sibling sessions when identity + // key changes. return window.textsecure.storage.protocol.saveIdentity( encodedName, - publicKey + publicKey, + false, + { lock: this.lock } ); } diff --git a/ts/SignalProtocolStore.ts b/ts/SignalProtocolStore.ts index 6c13d0b75a32..f721301efac7 100644 --- a/ts/SignalProtocolStore.ts +++ b/ts/SignalProtocolStore.ts @@ -112,6 +112,8 @@ type MapFields = | 'sessions' | 'signedPreKeys'; +type SessionResetsType = Record; + export type SessionTransactionOptions = { readonly lock?: Lock; }; @@ -964,36 +966,45 @@ export class SignalProtocolStore extends EventsMixin { }); } - async archiveSiblingSessions(encodedAddress: string): Promise { - return this.sessionTransaction('archiveSiblingSessions', async () => { - if (!this.sessions) { - throw new Error( - 'archiveSiblingSessions: this.sessions not yet cached!' + async archiveSiblingSessions( + encodedAddress: string, + { lock }: SessionTransactionOptions = {} + ): Promise { + return this.sessionTransaction( + 'archiveSiblingSessions', + async () => { + if (!this.sessions) { + throw new Error( + 'archiveSiblingSessions: this.sessions not yet cached!' + ); + } + + window.log.info( + 'archiveSiblingSessions: archiving sibling sessions for', + encodedAddress ); - } - window.log.info( - 'archiveSiblingSessions: archiving sibling sessions for', - encodedAddress - ); + const id = await normalizeEncodedAddress(encodedAddress); + const [identifier, deviceId] = window.textsecure.utils.unencodeNumber( + id + ); + const deviceIdNumber = parseInt(deviceId, 10); - const id = await normalizeEncodedAddress(encodedAddress); - const [identifier, deviceId] = window.textsecure.utils.unencodeNumber(id); - const deviceIdNumber = parseInt(deviceId, 10); + const allEntries = this._getAllSessions(); + const entries = allEntries.filter( + entry => + entry.fromDB.conversationId === identifier && + entry.fromDB.deviceId !== deviceIdNumber + ); - const allEntries = this._getAllSessions(); - const entries = allEntries.filter( - entry => - entry.fromDB.conversationId === identifier && - entry.fromDB.deviceId !== deviceIdNumber - ); - - await Promise.all( - entries.map(async entry => { - await this._archiveSession(entry); - }) - ); - }); + await Promise.all( + entries.map(async entry => { + await this._archiveSession(entry); + }) + ); + }, + lock + ); } async archiveAllSessions(identifier: string): Promise { @@ -1032,6 +1043,59 @@ export class SignalProtocolStore extends EventsMixin { }); } + async lightSessionReset(uuid: string, deviceId: number): Promise { + const id = `${uuid}.${deviceId}`; + + const sessionResets = window.storage.get( + 'sessionResets', + {} + ) as SessionResetsType; + + const lastReset = sessionResets[id]; + + const ONE_HOUR = 60 * 60 * 1000; + if (lastReset && isMoreRecentThan(lastReset, ONE_HOUR)) { + window.log.warn( + `lightSessionReset/${id}: Skipping session reset, last reset at ${lastReset}` + ); + return; + } + + sessionResets[id] = Date.now(); + window.storage.put('sessionResets', sessionResets); + + try { + // First, fetch this conversation + const conversationId = window.ConversationController.ensureContactIds({ + uuid, + }); + assert(conversationId, `lightSessionReset/${id}: missing conversationId`); + + const conversation = window.ConversationController.get(conversationId); + assert(conversation, `lightSessionReset/${id}: missing conversation`); + + window.log.warn(`lightSessionReset/${id}: Resetting session`); + + // Archive open session with this device + await this.archiveSession(id); + + // Send a null message with newly-created session + const sendOptions = await conversation.getSendOptions(); + await window.textsecure.messaging.sendNullMessage({ uuid }, sendOptions); + } catch (error) { + // If we failed to do the session reset, then we'll allow another attempt sooner + // than one hour from now. + delete sessionResets[id]; + window.storage.put('sessionResets', sessionResets); + + const errorString = error && error.stack ? error.stack : error; + window.log.error( + `lightSessionReset/${id}: Encountered error`, + errorString + ); + } + } + // Identity Keys getIdentityRecord(identifier: string): IdentityKeyType | undefined { @@ -1168,7 +1232,8 @@ export class SignalProtocolStore extends EventsMixin { async saveIdentity( encodedAddress: string, publicKey: ArrayBuffer, - nonblockingApproval = false + nonblockingApproval = false, + { lock }: SessionTransactionOptions = {} ): Promise { if (!this.identityKeys) { throw new Error('saveIdentity: this.identityKeys not yet cached!'); @@ -1241,7 +1306,10 @@ export class SignalProtocolStore extends EventsMixin { error && error.stack ? error.stack : error ); } - await this.archiveSiblingSessions(encodedAddress); + + // Pass the lock to facilitate transactional session use in + // MessageReceiver.ts + await this.archiveSiblingSessions(encodedAddress, { lock }); return true; } diff --git a/ts/background.ts b/ts/background.ts index f2419fe33cbe..1f2ff7a29af2 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -68,6 +68,9 @@ export async function startApp(): Promise { const profileKeyResponseQueue = new window.PQueue(); profileKeyResponseQueue.pause(); + const lightSessionResetQueue = new window.PQueue(); + lightSessionResetQueue.pause(); + window.Whisper.deliveryReceiptQueue = new window.PQueue({ concurrency: 1, timeout: 1000 * 60 * 2, @@ -1892,6 +1895,7 @@ export async function startApp(): Promise { // To avoid a flood of operations before we catch up, we pause some queues. profileKeyResponseQueue.pause(); + lightSessionResetQueue.pause(); window.Whisper.deliveryReceiptQueue.pause(); window.Whisper.Notifications.disable(); @@ -2222,6 +2226,7 @@ export async function startApp(): Promise { ); profileKeyResponseQueue.start(); + lightSessionResetQueue.start(); window.Whisper.deliveryReceiptQueue.start(); window.Whisper.Notifications.enable(); @@ -2294,6 +2299,7 @@ export async function startApp(): Promise { // very fast, and it looks like a network blip. But we need to suppress // notifications in these scenarios too. So we listen for 'reconnect' events. profileKeyResponseQueue.pause(); + lightSessionResetQueue.pause(); window.Whisper.deliveryReceiptQueue.pause(); window.Whisper.Notifications.disable(); } @@ -3266,13 +3272,28 @@ export async function startApp(): Promise { window.log.warn('background onError: Doing nothing with incoming error'); } - type LightSessionResetEventType = { + type LightSessionResetEventType = Event & { senderUuid: string; + senderDevice: number; }; function onLightSessionReset(event: LightSessionResetEventType) { + const { senderUuid, senderDevice } = event; + + if (event.confirm) { + event.confirm(); + } + + // Postpone sending light session resets until the queue is empty + lightSessionResetQueue.add(() => { + window.textsecure.storage.protocol.lightSessionReset( + senderUuid, + senderDevice + ); + }); + const conversationId = window.ConversationController.ensureContactIds({ - uuid: event.senderUuid, + uuid: senderUuid, }); if (!conversationId) { diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 427c02203ccf..b5d49fc0da97 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -38,8 +38,8 @@ import { SignedPreKeys, } from '../LibSignalStores'; import { BatcherType, createBatcher } from '../util/batcher'; -import { assert } from '../util/assert'; import { parseIntOrThrow } from '../util/parseIntOrThrow'; +import { Lock } from '../util/Lock'; import EventTarget from './EventTarget'; import { WebAPIType } from './WebAPI'; import utils from './Helpers'; @@ -128,6 +128,11 @@ type DecryptedEnvelope = { readonly envelope: EnvelopeClass; }; +type LockedStores = { + readonly sessionStore: Sessions; + readonly identityKeyStore: IdentityKeys; +}; + class MessageReceiverInner extends EventTarget { _onClose?: (ev: any) => Promise; @@ -647,7 +652,13 @@ class MessageReceiverInner extends EventTarget { } this.queueDecryptedEnvelope(envelope, payloadPlaintext); } else { - this.queueEnvelope(new Sessions(), envelope); + this.queueEnvelope( + { + sessionStore: new Sessions(), + identityKeyStore: new IdentityKeys(), + }, + envelope + ); } } catch (error) { window.log.error( @@ -751,8 +762,13 @@ class MessageReceiverInner extends EventTarget { const decrypted: Array = []; try { + const lock = new Lock(); const sessionStore = new Sessions({ transactionOnly: true, + lock, + }); + const identityKeyStore = new IdentityKeys({ + lock, }); const failed: Array = []; @@ -770,7 +786,7 @@ class MessageReceiverInner extends EventTarget { items.map(async ({ data, envelope }) => { try { const plaintext = await this.queueEnvelope( - sessionStore, + { sessionStore, identityKeyStore }, envelope ); if (plaintext) { @@ -912,13 +928,13 @@ class MessageReceiverInner extends EventTarget { } async queueEnvelope( - sessionStore: Sessions, + stores: LockedStores, envelope: EnvelopeClass ): Promise { const id = this.getEnvelopeId(envelope); window.log.info('queueing envelope', id); - const task = this.decryptEnvelope.bind(this, sessionStore, envelope); + const task = this.decryptEnvelope.bind(this, stores, envelope); const taskWithTimeout = window.textsecure.createTaskWithTimeout( task, `queueEnvelope ${id}` @@ -970,7 +986,7 @@ class MessageReceiverInner extends EventTarget { } async decryptEnvelope( - sessionStore: Sessions, + stores: LockedStores, envelope: EnvelopeClass ): Promise { if (this.stoppingProcessing) { @@ -983,10 +999,10 @@ class MessageReceiverInner extends EventTarget { } if (envelope.content) { - return this.decryptContentMessage(sessionStore, envelope); + return this.decryptContentMessage(stores, envelope); } if (envelope.legacyMessage) { - return this.decryptLegacyMessage(sessionStore, envelope); + return this.decryptLegacyMessage(stores, envelope); } this.removeFromCache(envelope); @@ -1036,7 +1052,7 @@ class MessageReceiverInner extends EventTarget { } async decrypt( - sessionStore: Sessions, + { sessionStore, identityKeyStore }: LockedStores, envelope: EnvelopeClass, ciphertext: ByteBufferClass ): Promise { @@ -1059,7 +1075,6 @@ class MessageReceiverInner extends EventTarget { throw new Error('MessageReceiver.decrypt: Failed to fetch local UUID'); } - const identityKeyStore = new IdentityKeys(); const preKeyStore = new PreKeys(); const signedPreKeyStore = new SignedPreKeys(); @@ -1311,7 +1326,10 @@ class MessageReceiverInner extends EventTarget { } if (uuid && deviceId) { - await this.maybeLightSessionReset(uuid, deviceId); + // It is safe (from deadlocks) to await this call because the session + // reset is going to be scheduled on a separate p-queue in + // ts/background.ts + await this.lightSessionReset(uuid, deviceId); } else { const envelopeId = this.getEnvelopeId(envelope); window.log.error( @@ -1350,74 +1368,11 @@ class MessageReceiverInner extends EventTarget { window.storage.put('sessionResets', sessionResets); } - async maybeLightSessionReset(uuid: string, deviceId: number): Promise { - const id = `${uuid}.${deviceId}`; - - try { - const sessionResets = window.storage.get( - 'sessionResets', - {} - ) as SessionResetsType; - const lastReset = sessionResets[id]; - - // We emit this event every time we encounter an error, not just when we reset the - // session. This is because a message might have been lost with every decryption - // failure. - const event = new Event('light-session-reset'); - event.senderUuid = uuid; - this.dispatchAndWait(event); - - if (lastReset && !this.isOverHourIntoPast(lastReset)) { - window.log.warn( - `maybeLightSessionReset/${id}: Skipping session reset, last reset at ${lastReset}` - ); - return; - } - - sessionResets[id] = Date.now(); - window.storage.put('sessionResets', sessionResets); - - await this.lightSessionReset(uuid, deviceId); - } catch (error) { - // If we failed to do the session reset, then we'll allow another attempt sooner - // than one hour from now. - const sessionResets = window.storage.get( - 'sessionResets', - {} - ) as SessionResetsType; - delete sessionResets[id]; - window.storage.put('sessionResets', sessionResets); - - const errorString = error && error.stack ? error.stack : error; - window.log.error( - `maybeLightSessionReset/${id}: Encountered error`, - errorString - ); - } - } - async lightSessionReset(uuid: string, deviceId: number): Promise { - const id = `${uuid}.${deviceId}`; - - // First, fetch this conversation - const conversationId = window.ConversationController.ensureContactIds({ - uuid, - }); - assert(conversationId, `lightSessionReset/${id}: missing conversationId`); - - const conversation = window.ConversationController.get(conversationId); - assert(conversation, `lightSessionReset/${id}: missing conversation`); - - window.log.warn(`lightSessionReset/${id}: Resetting session`); - - // Archive open session with this device - await window.textsecure.storage.protocol.archiveSession( - `${uuid}.${deviceId}` - ); - - // Send a null message with newly-created session - const sendOptions = await conversation.getSendOptions(); - await window.textsecure.messaging.sendNullMessage({ uuid }, sendOptions); + const event = new Event('light-session-reset'); + event.senderUuid = uuid; + event.senderDevice = deviceId; + await this.dispatchAndWait(event); } async handleSentMessage( @@ -1600,7 +1555,7 @@ class MessageReceiverInner extends EventTarget { } async decryptLegacyMessage( - sessionStore: Sessions, + stores: LockedStores, envelope: EnvelopeClass ): Promise { window.log.info( @@ -1608,7 +1563,7 @@ class MessageReceiverInner extends EventTarget { this.getEnvelopeId(envelope) ); const plaintext = await this.decrypt( - sessionStore, + stores, envelope, envelope.legacyMessage ); @@ -1629,18 +1584,14 @@ class MessageReceiverInner extends EventTarget { } async decryptContentMessage( - sessionStore: Sessions, + stores: LockedStores, envelope: EnvelopeClass ): Promise { window.log.info( 'MessageReceiver.decryptContentMessage', this.getEnvelopeId(envelope) ); - const plaintext = await this.decrypt( - sessionStore, - envelope, - envelope.content - ); + const plaintext = await this.decrypt(stores, envelope, envelope.content); if (!plaintext) { window.log.warn('decryptContentMessage: plaintext was falsey'); return undefined;