Introduce in-memory transactions for sessions

This commit is contained in:
Fedor Indutny 2021-05-17 11:03:42 -07:00 committed by Scott Nonnenberg
parent 403b3c5fc6
commit 94d2c56ab9
12 changed files with 874 additions and 391 deletions

View file

@ -8,6 +8,7 @@
/* eslint-disable camelcase */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable max-classes-per-file */
/* eslint-disable no-restricted-syntax */
import { isNumber, map, omit, noop } from 'lodash';
import PQueue from 'p-queue';
@ -121,9 +122,10 @@ type CacheAddItemType = {
request: IncomingWebSocketRequest;
};
type CacheUpdateItemType = {
id: string;
data: Partial<UnprocessedType>;
type DecryptedEnvelope = {
readonly plaintext: ArrayBuffer;
readonly data: UnprocessedType;
readonly envelope: EnvelopeClass;
};
class MessageReceiverInner extends EventTarget {
@ -135,8 +137,6 @@ class MessageReceiverInner extends EventTarget {
cacheRemoveBatcher: BatcherType<string>;
cacheUpdateBatcher: BatcherType<CacheUpdateItemType>;
calledClose?: boolean;
count: number;
@ -233,12 +233,6 @@ class MessageReceiverInner extends EventTarget {
this.cacheAndQueueBatch(items);
},
});
this.cacheUpdateBatcher = createBatcher<CacheUpdateItemType>({
name: 'MessageReceiver.cacheUpdateBatcher',
wait: 75,
maxSize: 30,
processBatch: this.cacheUpdateBatch.bind(this),
});
this.cacheRemoveBatcher = createBatcher<string>({
name: 'MessageReceiver.cacheRemoveBatcher',
wait: 75,
@ -314,7 +308,6 @@ class MessageReceiverInner extends EventTarget {
unregisterBatchers() {
window.log.info('MessageReceiver: unregister batchers');
this.cacheAddBatcher.unregister();
this.cacheUpdateBatcher.unregister();
this.cacheRemoveBatcher.unregister();
}
@ -514,7 +507,7 @@ class MessageReceiverInner extends EventTarget {
return messageAgeSec;
}
async addToQueue(task: () => Promise<void>) {
async addToQueue<T>(task: () => Promise<T>): Promise<T> {
this.count += 1;
const promise = this.pendingQueue.add(task);
@ -538,7 +531,6 @@ class MessageReceiverInner extends EventTarget {
const emitEmpty = async () => {
await Promise.all([
this.cacheAddBatcher.flushAndWait(),
this.cacheUpdateBatcher.flushAndWait(),
this.cacheRemoveBatcher.flushAndWait(),
]);
@ -655,7 +647,7 @@ class MessageReceiverInner extends EventTarget {
}
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
} else {
this.queueEnvelope(envelope);
this.queueEnvelope(new Sessions(), envelope);
}
} catch (error) {
window.log.error(
@ -755,21 +747,88 @@ class MessageReceiverInner extends EventTarget {
async cacheAndQueueBatch(items: Array<CacheAddItemType>) {
window.log.info('MessageReceiver.cacheAndQueueBatch', items.length);
const dataArray = items.map(item => item.data);
const decrypted: Array<DecryptedEnvelope> = [];
try {
await window.textsecure.storage.unprocessed.batchAdd(dataArray);
items.forEach(item => {
const sessionStore = new Sessions({
transactionOnly: true,
});
const failed: Array<UnprocessedType> = [];
// Below we:
//
// 1. Enter session transaction
// 2. Decrypt all batched envelopes
// 3. Persist both decrypted envelopes and envelopes that we failed to
// decrypt (for future retries, see `attempts` field)
// 4. Leave session transaction and commit all pending session updates
// 5. Acknowledge envelopes (can't fail)
// 6. Finally process decrypted envelopes
await sessionStore.transaction(async () => {
await Promise.all<void>(
items.map(async ({ data, envelope }) => {
try {
const plaintext = await this.queueEnvelope(
sessionStore,
envelope
);
if (plaintext) {
decrypted.push({ plaintext, data, envelope });
}
} catch (error) {
failed.push(data);
window.log.error(
'cacheAndQueue error when processing the envelope',
error && error.stack ? error.stack : error
);
}
})
);
window.log.info(
'MessageReceiver.cacheAndQueueBatch storing ' +
`${decrypted.length} decrypted envelopes`
);
// Store both decrypted and failed unprocessed envelopes
const unprocesseds: Array<UnprocessedType> = decrypted.map(
({ envelope, data, plaintext }) => {
return {
...data,
// We have sucessfully decrypted the message so don't bother with
// storing the envelope.
envelope: '',
source: envelope.source,
sourceUuid: envelope.sourceUuid,
sourceDevice: envelope.sourceDevice,
serverTimestamp: envelope.serverTimestamp,
decrypted: MessageReceiverInner.arrayBufferToStringBase64(
plaintext
),
};
}
);
await sessionStore.addUnprocessed(unprocesseds.concat(failed));
});
window.log.info(
'MessageReceiver.cacheAndQueueBatch acknowledging receipt'
);
// Acknowledge all envelopes
for (const { request } of items) {
try {
item.request.respond(200, 'OK');
request.respond(200, 'OK');
} catch (error) {
window.log.error(
'cacheAndQueueBatch: Failed to send 200 to server; still queuing envelope'
);
}
this.queueEnvelope(item.envelope);
});
this.maybeScheduleRetryTimeout();
}
} catch (error) {
window.log.error(
'cacheAndQueue error trying to add messages to cache:',
@ -779,7 +838,25 @@ class MessageReceiverInner extends EventTarget {
items.forEach(item => {
item.request.respond(500, 'Failed to cache message');
});
return;
}
await Promise.all(
decrypted.map(async ({ envelope, plaintext }) => {
try {
await this.queueDecryptedEnvelope(envelope, plaintext);
} catch (error) {
window.log.error(
'cacheAndQueue error when processing decrypted envelope',
error && error.stack ? error.stack : error
);
}
})
);
window.log.info('MessageReceiver.cacheAndQueueBatch fully processed');
this.maybeScheduleRetryTimeout();
}
cacheAndQueue(
@ -802,23 +879,6 @@ class MessageReceiverInner extends EventTarget {
});
}
async cacheUpdateBatch(items: Array<Partial<UnprocessedType>>) {
window.log.info('MessageReceiver.cacheUpdateBatch', items.length);
await window.textsecure.storage.unprocessed.addDecryptedDataToList(items);
}
updateCache(envelope: EnvelopeClass, plaintext: ArrayBuffer) {
const { id } = envelope;
const data = {
source: envelope.source,
sourceUuid: envelope.sourceUuid,
sourceDevice: envelope.sourceDevice,
serverTimestamp: envelope.serverTimestamp,
decrypted: MessageReceiverInner.arrayBufferToStringBase64(plaintext),
};
this.cacheUpdateBatcher.add({ id, data });
}
async cacheRemoveBatch(items: Array<string>) {
await window.textsecure.storage.unprocessed.remove(items);
}
@ -851,18 +911,22 @@ class MessageReceiverInner extends EventTarget {
});
}
async queueEnvelope(envelope: EnvelopeClass) {
async queueEnvelope(
sessionStore: Sessions,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
const id = this.getEnvelopeId(envelope);
window.log.info('queueing envelope', id);
const task = this.handleEnvelope.bind(this, envelope);
const task = this.decryptEnvelope.bind(this, sessionStore, envelope);
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
task,
`queueEnvelope ${id}`
);
const promise = this.addToQueue(taskWithTimeout);
return promise.catch(error => {
try {
return await this.addToQueue(taskWithTimeout);
} catch (error) {
const args = [
'queueEnvelope error handling envelope',
this.getEnvelopeId(envelope),
@ -875,12 +939,11 @@ class MessageReceiverInner extends EventTarget {
} else {
window.log.error(...args);
}
});
return undefined;
}
}
// Same as handleEnvelope, just without the decryption step. Necessary for handling
// messages which were successfully decrypted, but application logic didn't finish
// processing.
// Called after `decryptEnvelope` decrypted the message.
async handleDecryptedEnvelope(
envelope: EnvelopeClass,
plaintext: ArrayBuffer
@ -906,21 +969,26 @@ class MessageReceiverInner extends EventTarget {
throw new Error('Received message with no content and no legacyMessage');
}
async handleEnvelope(envelope: EnvelopeClass) {
async decryptEnvelope(
sessionStore: Sessions,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
if (this.stoppingProcessing) {
return Promise.resolve();
return undefined;
}
if (envelope.type === window.textsecure.protobuf.Envelope.Type.RECEIPT) {
return this.onDeliveryReceipt(envelope);
await this.onDeliveryReceipt(envelope);
return undefined;
}
if (envelope.content) {
return this.handleContentMessage(envelope);
return this.decryptContentMessage(sessionStore, envelope);
}
if (envelope.legacyMessage) {
return this.handleLegacyMessage(envelope);
return this.decryptLegacyMessage(sessionStore, envelope);
}
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
}
@ -935,7 +1003,7 @@ class MessageReceiverInner extends EventTarget {
return -1;
}
async onDeliveryReceipt(envelope: EnvelopeClass) {
async onDeliveryReceipt(envelope: EnvelopeClass): Promise<void> {
return new Promise((resolve, reject) => {
const ev = new Event('delivery');
ev.confirm = this.removeFromCache.bind(this, envelope);
@ -968,6 +1036,7 @@ class MessageReceiverInner extends EventTarget {
}
async decrypt(
sessionStore: Sessions,
envelope: EnvelopeClass,
ciphertext: ByteBufferClass
): Promise<ArrayBuffer | null> {
@ -990,7 +1059,6 @@ class MessageReceiverInner extends EventTarget {
throw new Error('MessageReceiver.decrypt: Failed to fetch local UUID');
}
const sessionStore = new Sessions();
const identityKeyStore = new IdentityKeys();
const preKeyStore = new PreKeys();
const signedPreKeyStore = new SignedPreKeys();
@ -1216,15 +1284,6 @@ class MessageReceiverInner extends EventTarget {
return null;
}
// Note: this is an out of band update; there are cases where the item in the
// cache has already been deleted by the time this runs. That's okay.
try {
this.updateCache(envelope, plaintext);
} catch (error) {
const errorString = error && error.stack ? error.stack : error;
window.log.error(`decrypt: updateCache failed: ${errorString}`);
}
return plaintext;
}
)
@ -1540,18 +1599,25 @@ class MessageReceiverInner extends EventTarget {
);
}
async handleLegacyMessage(envelope: EnvelopeClass) {
async decryptLegacyMessage(
sessionStore: Sessions,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
window.log.info(
'MessageReceiver.handleLegacyMessage',
'MessageReceiver.decryptLegacyMessage',
this.getEnvelopeId(envelope)
);
return this.decrypt(envelope, envelope.legacyMessage).then(plaintext => {
if (!plaintext) {
window.log.warn('handleLegacyMessage: plaintext was falsey');
return null;
}
return this.innerHandleLegacyMessage(envelope, plaintext);
});
const plaintext = await this.decrypt(
sessionStore,
envelope,
envelope.legacyMessage
);
if (!plaintext) {
window.log.warn('decryptLegacyMessage: plaintext was falsey');
return undefined;
}
return plaintext;
}
async innerHandleLegacyMessage(
@ -1562,18 +1628,25 @@ class MessageReceiverInner extends EventTarget {
return this.handleDataMessage(envelope, message);
}
async handleContentMessage(envelope: EnvelopeClass) {
async decryptContentMessage(
sessionStore: Sessions,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
window.log.info(
'MessageReceiver.handleContentMessage',
'MessageReceiver.decryptContentMessage',
this.getEnvelopeId(envelope)
);
return this.decrypt(envelope, envelope.content).then(plaintext => {
if (!plaintext) {
window.log.warn('handleContentMessage: plaintext was falsey');
return null;
}
return this.innerHandleContentMessage(envelope, plaintext);
});
const plaintext = await this.decrypt(
sessionStore,
envelope,
envelope.content
);
if (!plaintext) {
window.log.warn('decryptContentMessage: plaintext was falsey');
return undefined;
}
return plaintext;
}
async innerHandleContentMessage(