MessageReceiver: Restore queuing for in-order decryption

This commit is contained in:
Scott Nonnenberg 2021-03-17 14:20:05 -07:00 committed by Josh Perez
parent 0eec84391f
commit 7ffc01f0b0

View file

@ -206,7 +206,7 @@ class MessageReceiverInner extends EventTarget {
this.cacheAddBatcher = createBatcher<CacheAddItemType>({ this.cacheAddBatcher = createBatcher<CacheAddItemType>({
wait: 200, wait: 200,
maxSize: 30, maxSize: 30,
processBatch: this.cacheAndHandleBatch.bind(this), processBatch: this.cacheAndQueueBatch.bind(this),
}); });
this.cacheUpdateBatcher = createBatcher<CacheUpdateItemType>({ this.cacheUpdateBatcher = createBatcher<CacheUpdateItemType>({
wait: 500, wait: 500,
@ -240,7 +240,7 @@ class MessageReceiverInner extends EventTarget {
} }
// We always process our cache before processing a new websocket message // We always process our cache before processing a new websocket message
this.pendingQueue.add(async () => this.handleAllCached()); this.pendingQueue.add(async () => this.queueAllCached());
this.count = 0; this.count = 0;
if (this.hasConnected) { if (this.hasConnected) {
@ -440,7 +440,7 @@ class MessageReceiverInner extends EventTarget {
envelope.serverTimestamp envelope.serverTimestamp
); );
this.cacheAndHandle(envelope, plaintext, request); this.cacheAndQueue(envelope, plaintext, request);
this.processedCount += 1; this.processedCount += 1;
} catch (e) { } catch (e) {
request.respond(500, 'Bad encrypted websocket message'); request.respond(500, 'Bad encrypted websocket message');
@ -560,17 +560,17 @@ class MessageReceiverInner extends EventTarget {
this.dispatchEvent(ev); this.dispatchEvent(ev);
} }
async handleAllCached() { async queueAllCached() {
const items = await this.getAllFromCache(); const items = await this.getAllFromCache();
const max = items.length; const max = items.length;
for (let i = 0; i < max; i += 1) { for (let i = 0; i < max; i += 1) {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.handleCachedEnvelope(items[i]); await this.queueCached(items[i]);
} }
} }
async handleCachedEnvelope(item: UnprocessedType) { async queueCached(item: UnprocessedType) {
window.log.info('MessageReceiver.handleCachedEnvelope', item.id); window.log.info('MessageReceiver.queueCached', item.id);
try { try {
let envelopePlaintext: ArrayBuffer; let envelopePlaintext: ArrayBuffer;
@ -584,7 +584,7 @@ class MessageReceiverInner extends EventTarget {
); );
} else { } else {
throw new Error( throw new Error(
'MessageReceiver.handleCachedEnvelope: item.envelope was malformed' 'MessageReceiver.queueCached: item.envelope was malformed'
); );
} }
@ -615,13 +615,13 @@ class MessageReceiverInner extends EventTarget {
} else { } else {
throw new Error('Cached decrypted value was not a string!'); throw new Error('Cached decrypted value was not a string!');
} }
this.handleDecryptedEnvelope(envelope, payloadPlaintext); this.queueDecryptedEnvelope(envelope, payloadPlaintext);
} else { } else {
this.handleEnvelope(envelope); this.queueEnvelope(envelope);
} }
} catch (error) { } catch (error) {
window.log.error( window.log.error(
'handleCachedEnvelope error handling item', 'queueCached error handling item',
item.id, item.id,
'removing it. Error:', 'removing it. Error:',
error && error.stack ? error.stack : error error && error.stack ? error.stack : error
@ -632,7 +632,7 @@ class MessageReceiverInner extends EventTarget {
await window.textsecure.storage.unprocessed.remove(id); await window.textsecure.storage.unprocessed.remove(id);
} catch (deleteError) { } catch (deleteError) {
window.log.error( window.log.error(
'handleCachedEnvelope error deleting item', 'queueCached error deleting item',
item.id, item.id,
'Error:', 'Error:',
deleteError && deleteError.stack ? deleteError.stack : deleteError deleteError && deleteError.stack ? deleteError.stack : deleteError
@ -666,7 +666,7 @@ class MessageReceiverInner extends EventTarget {
if (this.isEmptied) { if (this.isEmptied) {
this.clearRetryTimeout(); this.clearRetryTimeout();
this.retryCachedTimeout = setTimeout(() => { this.retryCachedTimeout = setTimeout(() => {
this.pendingQueue.add(async () => this.handleAllCached()); this.pendingQueue.add(async () => this.queueAllCached());
}, RETRY_TIMEOUT); }, RETRY_TIMEOUT);
} }
} }
@ -715,8 +715,8 @@ class MessageReceiverInner extends EventTarget {
); );
} }
async cacheAndHandleBatch(items: Array<CacheAddItemType>) { async cacheAndQueueBatch(items: Array<CacheAddItemType>) {
window.log.info('MessageReceiver.cacheAndHandleBatch', items.length); window.log.info('MessageReceiver.cacheAndQueueBatch', items.length);
const dataArray = items.map(item => item.data); const dataArray = items.map(item => item.data);
try { try {
await window.textsecure.storage.unprocessed.batchAdd(dataArray); await window.textsecure.storage.unprocessed.batchAdd(dataArray);
@ -725,16 +725,16 @@ class MessageReceiverInner extends EventTarget {
item.request.respond(200, 'OK'); item.request.respond(200, 'OK');
} catch (error) { } catch (error) {
window.log.error( window.log.error(
'cacheAndHandleBatch: Failed to send 200 to server; still queuing envelope' 'cacheAndQueueBatch: Failed to send 200 to server; still queuing envelope'
); );
} }
this.handleEnvelope(item.envelope); this.queueEnvelope(item.envelope);
}); });
this.maybeScheduleRetryTimeout(); this.maybeScheduleRetryTimeout();
} catch (error) { } catch (error) {
window.log.error( window.log.error(
'cacheAndHandleBatch error trying to add messages to cache:', 'cacheAndQueue error trying to add messages to cache:',
error && error.stack ? error.stack : error error && error.stack ? error.stack : error
); );
@ -744,7 +744,7 @@ class MessageReceiverInner extends EventTarget {
} }
} }
cacheAndHandle( cacheAndQueue(
envelope: EnvelopeClass, envelope: EnvelopeClass,
plaintext: ArrayBuffer, plaintext: ArrayBuffer,
request: IncomingWebSocketRequest request: IncomingWebSocketRequest
@ -790,71 +790,43 @@ class MessageReceiverInner extends EventTarget {
this.cacheRemoveBatcher.add(id); this.cacheRemoveBatcher.add(id);
} }
// Same as handleEnvelope, just without the decryption step. Necessary for handling async queueDecryptedEnvelope(
// messages which were successfully decrypted, but application logic didn't finish
// processing.
async handleDecryptedEnvelope(
envelope: EnvelopeClass, envelope: EnvelopeClass,
plaintext: ArrayBuffer plaintext: ArrayBuffer
): Promise<void> { ) {
const id = this.getEnvelopeId(envelope); const id = this.getEnvelopeId(envelope);
window.log.info('MessageReceiver.handleDecryptedEnvelope', id); window.log.info('queueing decrypted envelope', id);
try { const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
if (this.stoppingProcessing) { const taskWithTimeout = window.textsecure.createTaskWithTimeout(
return; task,
} `queueEncryptedEnvelope ${id}`
// No decryption is required for delivery receipts, so the decrypted field of );
// the Unprocessed model will never be set const promise = this.addToQueue(taskWithTimeout);
if (envelope.content) { return promise.catch(error => {
await this.innerHandleContentMessage(envelope, plaintext);
return;
}
if (envelope.legacyMessage) {
await this.innerHandleLegacyMessage(envelope, plaintext);
return;
}
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
} catch (error) {
window.log.error( window.log.error(
`handleDecryptedEnvelope error handling envelope ${id}:`, `queueDecryptedEnvelope error handling envelope ${id}:`,
error && error.extra ? JSON.stringify(error.extra) : '', error && error.extra ? JSON.stringify(error.extra) : '',
error && error.stack ? error.stack : error error && error.stack ? error.stack : error
); );
} });
} }
async handleEnvelope(envelope: EnvelopeClass) { async queueEnvelope(envelope: EnvelopeClass) {
const id = this.getEnvelopeId(envelope); const id = this.getEnvelopeId(envelope);
window.log.info('MessageReceiver.handleEnvelope', id); window.log.info('queueing envelope', id);
try { const task = this.handleEnvelope.bind(this, envelope);
if (this.stoppingProcessing) { const taskWithTimeout = window.textsecure.createTaskWithTimeout(
return Promise.resolve(); task,
} `queueEnvelope ${id}`
);
const promise = this.addToQueue(taskWithTimeout);
if (envelope.type === window.textsecure.protobuf.Envelope.Type.RECEIPT) { return promise.catch(error => {
return this.onDeliveryReceipt(envelope);
}
if (envelope.content) {
return this.handleContentMessage(envelope);
}
if (envelope.legacyMessage) {
return this.handleLegacyMessage(envelope);
}
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
} catch (error) {
const args = [ const args = [
'handleEnvelope error handling envelope', 'queueEnvelope error handling envelope',
this.getEnvelopeId(envelope), this.getEnvelopeId(envelope),
':', ':',
error && error.extra ? JSON.stringify(error.extra) : '', error && error.extra ? JSON.stringify(error.extra) : '',
@ -865,9 +837,54 @@ class MessageReceiverInner extends EventTarget {
} else { } else {
window.log.error(...args); window.log.error(...args);
} }
});
}
// Same as handleEnvelope, just without the decryption step. Necessary for handling
// messages which were successfully decrypted, but application logic didn't finish
// processing.
async handleDecryptedEnvelope(
envelope: EnvelopeClass,
plaintext: ArrayBuffer
): Promise<void> {
if (this.stoppingProcessing) {
return;
}
// No decryption is required for delivery receipts, so the decrypted field of
// the Unprocessed model will never be set
if (envelope.content) {
await this.innerHandleContentMessage(envelope, plaintext);
return;
}
if (envelope.legacyMessage) {
await this.innerHandleLegacyMessage(envelope, plaintext);
return;
} }
return undefined; this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
}
async handleEnvelope(envelope: EnvelopeClass) {
if (this.stoppingProcessing) {
return Promise.resolve();
}
if (envelope.type === window.textsecure.protobuf.Envelope.Type.RECEIPT) {
return this.onDeliveryReceipt(envelope);
}
if (envelope.content) {
return this.handleContentMessage(envelope);
}
if (envelope.legacyMessage) {
return this.handleLegacyMessage(envelope);
}
this.removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
} }
getStatus() { getStatus() {