Fix in-memory transactions while accessing sessions

This commit is contained in:
Fedor Indutny 2021-05-17 17:41:28 -07:00 committed by Scott Nonnenberg
parent 6ccf97b5d0
commit 2abc331058
4 changed files with 177 additions and 118 deletions

View file

@ -39,16 +39,19 @@ function encodedNameFromAddress(address: ProtocolAddress): string {
}
export type SessionsOptions = {
readonly lock?: Lock;
readonly transactionOnly?: boolean;
};
export class Sessions extends SessionStore {
private readonly lock = new Lock();
private readonly lock: Lock;
private inTransaction = false;
constructor(private readonly options: SessionsOptions = {}) {
super();
this.lock = options.lock || new Lock();
}
public async transaction<T>(fn: () => Promise<T>): Promise<T> {
@ -109,7 +112,18 @@ export class Sessions extends SessionStore {
}
}
export type IdentityKeysOptions = {
readonly lock?: Lock;
};
export class IdentityKeys extends IdentityKeyStore {
private readonly lock: Lock;
constructor({ lock = new Lock() }: IdentityKeysOptions = {}) {
super();
this.lock = lock;
}
async getIdentityKey(): Promise<PrivateKey> {
const keyPair = await window.textsecure.storage.protocol.getIdentityKeyPair();
if (!keyPair) {
@ -144,9 +158,14 @@ export class IdentityKeys extends IdentityKeyStore {
async saveIdentity(name: ProtocolAddress, key: PublicKey): Promise<boolean> {
const encodedName = encodedNameFromAddress(name);
const publicKey = typedArrayToArrayBuffer(key.serialize());
// Pass `lock` to let `saveIdentity` archive sibling sessions when identity
// key changes.
return window.textsecure.storage.protocol.saveIdentity(
encodedName,
publicKey
publicKey,
false,
{ lock: this.lock }
);
}

View file

@ -112,6 +112,8 @@ type MapFields =
| 'sessions'
| 'signedPreKeys';
type SessionResetsType = Record<string, number>;
export type SessionTransactionOptions = {
readonly lock?: Lock;
};
@ -964,36 +966,45 @@ export class SignalProtocolStore extends EventsMixin {
});
}
async archiveSiblingSessions(encodedAddress: string): Promise<void> {
return this.sessionTransaction('archiveSiblingSessions', async () => {
if (!this.sessions) {
throw new Error(
'archiveSiblingSessions: this.sessions not yet cached!'
async archiveSiblingSessions(
encodedAddress: string,
{ lock }: SessionTransactionOptions = {}
): Promise<void> {
return this.sessionTransaction(
'archiveSiblingSessions',
async () => {
if (!this.sessions) {
throw new Error(
'archiveSiblingSessions: this.sessions not yet cached!'
);
}
window.log.info(
'archiveSiblingSessions: archiving sibling sessions for',
encodedAddress
);
}
window.log.info(
'archiveSiblingSessions: archiving sibling sessions for',
encodedAddress
);
const id = await normalizeEncodedAddress(encodedAddress);
const [identifier, deviceId] = window.textsecure.utils.unencodeNumber(
id
);
const deviceIdNumber = parseInt(deviceId, 10);
const id = await normalizeEncodedAddress(encodedAddress);
const [identifier, deviceId] = window.textsecure.utils.unencodeNumber(id);
const deviceIdNumber = parseInt(deviceId, 10);
const allEntries = this._getAllSessions();
const entries = allEntries.filter(
entry =>
entry.fromDB.conversationId === identifier &&
entry.fromDB.deviceId !== deviceIdNumber
);
const allEntries = this._getAllSessions();
const entries = allEntries.filter(
entry =>
entry.fromDB.conversationId === identifier &&
entry.fromDB.deviceId !== deviceIdNumber
);
await Promise.all(
entries.map(async entry => {
await this._archiveSession(entry);
})
);
});
await Promise.all(
entries.map(async entry => {
await this._archiveSession(entry);
})
);
},
lock
);
}
async archiveAllSessions(identifier: string): Promise<void> {
@ -1032,6 +1043,59 @@ export class SignalProtocolStore extends EventsMixin {
});
}
async lightSessionReset(uuid: string, deviceId: number): Promise<void> {
const id = `${uuid}.${deviceId}`;
const sessionResets = window.storage.get(
'sessionResets',
{}
) as SessionResetsType;
const lastReset = sessionResets[id];
const ONE_HOUR = 60 * 60 * 1000;
if (lastReset && isMoreRecentThan(lastReset, ONE_HOUR)) {
window.log.warn(
`lightSessionReset/${id}: Skipping session reset, last reset at ${lastReset}`
);
return;
}
sessionResets[id] = Date.now();
window.storage.put('sessionResets', sessionResets);
try {
// First, fetch this conversation
const conversationId = window.ConversationController.ensureContactIds({
uuid,
});
assert(conversationId, `lightSessionReset/${id}: missing conversationId`);
const conversation = window.ConversationController.get(conversationId);
assert(conversation, `lightSessionReset/${id}: missing conversation`);
window.log.warn(`lightSessionReset/${id}: Resetting session`);
// Archive open session with this device
await this.archiveSession(id);
// Send a null message with newly-created session
const sendOptions = await conversation.getSendOptions();
await window.textsecure.messaging.sendNullMessage({ uuid }, sendOptions);
} catch (error) {
// If we failed to do the session reset, then we'll allow another attempt sooner
// than one hour from now.
delete sessionResets[id];
window.storage.put('sessionResets', sessionResets);
const errorString = error && error.stack ? error.stack : error;
window.log.error(
`lightSessionReset/${id}: Encountered error`,
errorString
);
}
}
// Identity Keys
getIdentityRecord(identifier: string): IdentityKeyType | undefined {
@ -1168,7 +1232,8 @@ export class SignalProtocolStore extends EventsMixin {
async saveIdentity(
encodedAddress: string,
publicKey: ArrayBuffer,
nonblockingApproval = false
nonblockingApproval = false,
{ lock }: SessionTransactionOptions = {}
): Promise<boolean> {
if (!this.identityKeys) {
throw new Error('saveIdentity: this.identityKeys not yet cached!');
@ -1241,7 +1306,10 @@ export class SignalProtocolStore extends EventsMixin {
error && error.stack ? error.stack : error
);
}
await this.archiveSiblingSessions(encodedAddress);
// Pass the lock to facilitate transactional session use in
// MessageReceiver.ts
await this.archiveSiblingSessions(encodedAddress, { lock });
return true;
}

View file

@ -68,6 +68,9 @@ export async function startApp(): Promise<void> {
const profileKeyResponseQueue = new window.PQueue();
profileKeyResponseQueue.pause();
const lightSessionResetQueue = new window.PQueue();
lightSessionResetQueue.pause();
window.Whisper.deliveryReceiptQueue = new window.PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
@ -1892,6 +1895,7 @@ export async function startApp(): Promise<void> {
// To avoid a flood of operations before we catch up, we pause some queues.
profileKeyResponseQueue.pause();
lightSessionResetQueue.pause();
window.Whisper.deliveryReceiptQueue.pause();
window.Whisper.Notifications.disable();
@ -2222,6 +2226,7 @@ export async function startApp(): Promise<void> {
);
profileKeyResponseQueue.start();
lightSessionResetQueue.start();
window.Whisper.deliveryReceiptQueue.start();
window.Whisper.Notifications.enable();
@ -2294,6 +2299,7 @@ export async function startApp(): Promise<void> {
// very fast, and it looks like a network blip. But we need to suppress
// notifications in these scenarios too. So we listen for 'reconnect' events.
profileKeyResponseQueue.pause();
lightSessionResetQueue.pause();
window.Whisper.deliveryReceiptQueue.pause();
window.Whisper.Notifications.disable();
}
@ -3266,13 +3272,28 @@ export async function startApp(): Promise<void> {
window.log.warn('background onError: Doing nothing with incoming error');
}
type LightSessionResetEventType = {
type LightSessionResetEventType = Event & {
senderUuid: string;
senderDevice: number;
};
function onLightSessionReset(event: LightSessionResetEventType) {
const { senderUuid, senderDevice } = event;
if (event.confirm) {
event.confirm();
}
// Postpone sending light session resets until the queue is empty
lightSessionResetQueue.add(() => {
window.textsecure.storage.protocol.lightSessionReset(
senderUuid,
senderDevice
);
});
const conversationId = window.ConversationController.ensureContactIds({
uuid: event.senderUuid,
uuid: senderUuid,
});
if (!conversationId) {

View file

@ -38,8 +38,8 @@ import {
SignedPreKeys,
} from '../LibSignalStores';
import { BatcherType, createBatcher } from '../util/batcher';
import { assert } from '../util/assert';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { Lock } from '../util/Lock';
import EventTarget from './EventTarget';
import { WebAPIType } from './WebAPI';
import utils from './Helpers';
@ -128,6 +128,11 @@ type DecryptedEnvelope = {
readonly envelope: EnvelopeClass;
};
type LockedStores = {
readonly sessionStore: Sessions;
readonly identityKeyStore: IdentityKeys;
};
class MessageReceiverInner extends EventTarget {
_onClose?: (ev: any) => Promise<void>;
@ -647,7 +652,13 @@ class MessageReceiverInner extends EventTarget {
}
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
} else {
this.queueEnvelope(new Sessions(), envelope);
this.queueEnvelope(
{
sessionStore: new Sessions(),
identityKeyStore: new IdentityKeys(),
},
envelope
);
}
} catch (error) {
window.log.error(
@ -751,8 +762,13 @@ class MessageReceiverInner extends EventTarget {
const decrypted: Array<DecryptedEnvelope> = [];
try {
const lock = new Lock();
const sessionStore = new Sessions({
transactionOnly: true,
lock,
});
const identityKeyStore = new IdentityKeys({
lock,
});
const failed: Array<UnprocessedType> = [];
@ -770,7 +786,7 @@ class MessageReceiverInner extends EventTarget {
items.map(async ({ data, envelope }) => {
try {
const plaintext = await this.queueEnvelope(
sessionStore,
{ sessionStore, identityKeyStore },
envelope
);
if (plaintext) {
@ -912,13 +928,13 @@ class MessageReceiverInner extends EventTarget {
}
async queueEnvelope(
sessionStore: Sessions,
stores: LockedStores,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
const id = this.getEnvelopeId(envelope);
window.log.info('queueing envelope', id);
const task = this.decryptEnvelope.bind(this, sessionStore, envelope);
const task = this.decryptEnvelope.bind(this, stores, envelope);
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
task,
`queueEnvelope ${id}`
@ -970,7 +986,7 @@ class MessageReceiverInner extends EventTarget {
}
async decryptEnvelope(
sessionStore: Sessions,
stores: LockedStores,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
if (this.stoppingProcessing) {
@ -983,10 +999,10 @@ class MessageReceiverInner extends EventTarget {
}
if (envelope.content) {
return this.decryptContentMessage(sessionStore, envelope);
return this.decryptContentMessage(stores, envelope);
}
if (envelope.legacyMessage) {
return this.decryptLegacyMessage(sessionStore, envelope);
return this.decryptLegacyMessage(stores, envelope);
}
this.removeFromCache(envelope);
@ -1036,7 +1052,7 @@ class MessageReceiverInner extends EventTarget {
}
async decrypt(
sessionStore: Sessions,
{ sessionStore, identityKeyStore }: LockedStores,
envelope: EnvelopeClass,
ciphertext: ByteBufferClass
): Promise<ArrayBuffer | null> {
@ -1059,7 +1075,6 @@ class MessageReceiverInner extends EventTarget {
throw new Error('MessageReceiver.decrypt: Failed to fetch local UUID');
}
const identityKeyStore = new IdentityKeys();
const preKeyStore = new PreKeys();
const signedPreKeyStore = new SignedPreKeys();
@ -1311,7 +1326,10 @@ class MessageReceiverInner extends EventTarget {
}
if (uuid && deviceId) {
await this.maybeLightSessionReset(uuid, deviceId);
// It is safe (from deadlocks) to await this call because the session
// reset is going to be scheduled on a separate p-queue in
// ts/background.ts
await this.lightSessionReset(uuid, deviceId);
} else {
const envelopeId = this.getEnvelopeId(envelope);
window.log.error(
@ -1350,74 +1368,11 @@ class MessageReceiverInner extends EventTarget {
window.storage.put('sessionResets', sessionResets);
}
async maybeLightSessionReset(uuid: string, deviceId: number): Promise<void> {
const id = `${uuid}.${deviceId}`;
try {
const sessionResets = window.storage.get(
'sessionResets',
{}
) as SessionResetsType;
const lastReset = sessionResets[id];
// We emit this event every time we encounter an error, not just when we reset the
// session. This is because a message might have been lost with every decryption
// failure.
const event = new Event('light-session-reset');
event.senderUuid = uuid;
this.dispatchAndWait(event);
if (lastReset && !this.isOverHourIntoPast(lastReset)) {
window.log.warn(
`maybeLightSessionReset/${id}: Skipping session reset, last reset at ${lastReset}`
);
return;
}
sessionResets[id] = Date.now();
window.storage.put('sessionResets', sessionResets);
await this.lightSessionReset(uuid, deviceId);
} catch (error) {
// If we failed to do the session reset, then we'll allow another attempt sooner
// than one hour from now.
const sessionResets = window.storage.get(
'sessionResets',
{}
) as SessionResetsType;
delete sessionResets[id];
window.storage.put('sessionResets', sessionResets);
const errorString = error && error.stack ? error.stack : error;
window.log.error(
`maybeLightSessionReset/${id}: Encountered error`,
errorString
);
}
}
async lightSessionReset(uuid: string, deviceId: number): Promise<void> {
const id = `${uuid}.${deviceId}`;
// First, fetch this conversation
const conversationId = window.ConversationController.ensureContactIds({
uuid,
});
assert(conversationId, `lightSessionReset/${id}: missing conversationId`);
const conversation = window.ConversationController.get(conversationId);
assert(conversation, `lightSessionReset/${id}: missing conversation`);
window.log.warn(`lightSessionReset/${id}: Resetting session`);
// Archive open session with this device
await window.textsecure.storage.protocol.archiveSession(
`${uuid}.${deviceId}`
);
// Send a null message with newly-created session
const sendOptions = await conversation.getSendOptions();
await window.textsecure.messaging.sendNullMessage({ uuid }, sendOptions);
const event = new Event('light-session-reset');
event.senderUuid = uuid;
event.senderDevice = deviceId;
await this.dispatchAndWait(event);
}
async handleSentMessage(
@ -1600,7 +1555,7 @@ class MessageReceiverInner extends EventTarget {
}
async decryptLegacyMessage(
sessionStore: Sessions,
stores: LockedStores,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
window.log.info(
@ -1608,7 +1563,7 @@ class MessageReceiverInner extends EventTarget {
this.getEnvelopeId(envelope)
);
const plaintext = await this.decrypt(
sessionStore,
stores,
envelope,
envelope.legacyMessage
);
@ -1629,18 +1584,14 @@ class MessageReceiverInner extends EventTarget {
}
async decryptContentMessage(
sessionStore: Sessions,
stores: LockedStores,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
window.log.info(
'MessageReceiver.decryptContentMessage',
this.getEnvelopeId(envelope)
);
const plaintext = await this.decrypt(
sessionStore,
envelope,
envelope.content
);
const plaintext = await this.decrypt(stores, envelope, envelope.content);
if (!plaintext) {
window.log.warn('decryptContentMessage: plaintext was falsey');
return undefined;