Further improve in-memory transactions in MessageReceiver

This commit is contained in:
Fedor Indutny 2021-05-20 17:24:53 -07:00 committed by Scott Nonnenberg
parent 7c07fdd589
commit 7b164fdf91

View file

@ -135,6 +135,11 @@ type LockedStores = {
readonly zone?: Zone;
};
enum TaskType {
Encrypted = 'Encrypted',
Decrypted = 'Decrypted',
}
class MessageReceiverInner extends EventTarget {
_onClose?: (ev: any) => Promise<void>;
@ -162,7 +167,9 @@ class MessageReceiverInner extends EventTarget {
password: string;
pendingQueue: PQueue;
encryptedQueue: PQueue;
decryptedQueue: PQueue;
retryCachedTimeout: any;
@ -227,9 +234,18 @@ class MessageReceiverInner extends EventTarget {
: undefined;
this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.pendingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
// All envelopes start in encryptedQueue and progress to decryptedQueue
this.encryptedQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
});
this.decryptedQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
});
this.cacheAddBatcher = createBatcher<CacheAddItemType>({
name: 'MessageReceiver.cacheAddBatcher',
wait: 75,
@ -268,7 +284,7 @@ class MessageReceiverInner extends EventTarget {
}
// We always process our cache before processing a new websocket message
this.pendingQueue.add(async () => this.queueAllCached());
this.encryptedQueue.add(async () => this.queueAllCached());
this.count = 0;
if (this.hasConnected) {
@ -514,20 +530,21 @@ class MessageReceiverInner extends EventTarget {
return messageAgeSec;
}
async addToQueue<T>(task: () => Promise<T>): Promise<T> {
this.count += 1;
async addToQueue<T>(task: () => Promise<T>, taskType: TaskType): Promise<T> {
if (taskType === TaskType.Encrypted) {
this.count += 1;
}
const promise = this.pendingQueue.add(task);
const queue =
taskType === TaskType.Encrypted
? this.encryptedQueue
: this.decryptedQueue;
const { count } = this;
const update = () => {
this.updateProgress(count);
};
promise.then(update, update);
return promise;
try {
return await queue.add(task);
} finally {
this.updateProgress(this.count);
}
}
hasEmptied(): boolean {
@ -549,7 +566,7 @@ class MessageReceiverInner extends EventTarget {
this.maybeScheduleRetryTimeout();
};
const waitForPendingQueue = async () => {
const waitForDecryptedQueue = async () => {
window.log.info(
"MessageReceiver: finished processing messages after 'empty', now waiting for application"
);
@ -558,8 +575,12 @@ class MessageReceiverInner extends EventTarget {
this.appQueue.add(emitEmpty);
};
const waitForEncryptedQueue = async () => {
this.addToQueue(waitForDecryptedQueue, TaskType.Decrypted);
};
const waitForIncomingQueue = () => {
this.addToQueue(waitForPendingQueue);
this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted);
// Note: this.count is used in addToQueue
// Resetting count so everything from the websocket after this starts at zero
@ -575,10 +596,13 @@ class MessageReceiverInner extends EventTarget {
}
async drain() {
const waitForIncomingQueue = async () =>
const waitForEncryptedQueue = async () =>
this.addToQueue(async () => {
window.log.info('drained');
});
}, TaskType.Decrypted);
const waitForIncomingQueue = async () =>
this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted);
return this.incomingQueue.add(waitForIncomingQueue);
}
@ -656,15 +680,13 @@ class MessageReceiverInner extends EventTarget {
} else {
throw new Error('Cached decrypted value was not a string!');
}
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
// Maintain invariant: encrypted queue => decrypted queue
this.addToQueue(async () => {
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
}, TaskType.Encrypted);
} else {
this.queueEnvelope(
{
sessionStore: new Sessions(),
identityKeyStore: new IdentityKeys(),
},
envelope
);
this.queueCachedEnvelope(envelope);
}
} catch (error) {
window.log.error(
@ -713,7 +735,7 @@ class MessageReceiverInner extends EventTarget {
if (this.isEmptied) {
this.clearRetryTimeout();
this.retryCachedTimeout = setTimeout(() => {
this.pendingQueue.add(async () => this.queueAllCached());
this.encryptedQueue.add(async () => this.queueAllCached());
}, RETRY_TIMEOUT);
}
}
@ -790,7 +812,7 @@ class MessageReceiverInner extends EventTarget {
await Promise.all<void>(
items.map(async ({ data, envelope }) => {
try {
const plaintext = await this.queueEnvelope(
const plaintext = await this.queueEncryptedEnvelope(
{ sessionStore, identityKeyStore, zone },
envelope
);
@ -924,7 +946,7 @@ class MessageReceiverInner extends EventTarget {
task,
`queueEncryptedEnvelope ${id}`
);
const promise = this.addToQueue(taskWithTimeout);
const promise = this.addToQueue(taskWithTimeout, TaskType.Decrypted);
return promise.catch(error => {
window.log.error(
@ -935,7 +957,7 @@ class MessageReceiverInner extends EventTarget {
});
}
async queueEnvelope(
async queueEncryptedEnvelope(
stores: LockedStores,
envelope: EnvelopeClass
): Promise<ArrayBuffer | undefined> {
@ -945,14 +967,14 @@ class MessageReceiverInner extends EventTarget {
const task = this.decryptEnvelope.bind(this, stores, envelope);
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
task,
`queueEnvelope ${id}`
`queueEncryptedEnvelope ${id}`
);
try {
return await this.addToQueue(taskWithTimeout);
return await this.addToQueue(taskWithTimeout, TaskType.Encrypted);
} catch (error) {
const args = [
'queueEnvelope error handling envelope',
'queueEncryptedEnvelope error handling envelope',
this.getEnvelopeId(envelope),
':',
error && error.extra ? JSON.stringify(error.extra) : '',
@ -967,6 +989,22 @@ class MessageReceiverInner extends EventTarget {
}
}
async queueCachedEnvelope(envelope: EnvelopeClass): Promise<void> {
const plaintext = await this.queueEncryptedEnvelope(
{
sessionStore: new Sessions(),
identityKeyStore: new IdentityKeys(),
},
envelope
);
if (!plaintext) {
return;
}
await this.queueDecryptedEnvelope(envelope, plaintext);
}
// Called after `decryptEnvelope` decrypted the message.
async handleDecryptedEnvelope(
envelope: EnvelopeClass,