Fix processing of cached envelopes

This commit is contained in:
Fedor Indutny 2021-05-24 14:30:56 -07:00 committed by GitHub
parent 25f4154cde
commit 227f532ec2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 48 deletions

View file

@ -3120,6 +3120,7 @@ export async function startApp(): Promise<void> {
}
if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
event.confirm();
return Promise.resolve();
}

View file

@ -1790,6 +1790,50 @@ function updateToSchemaVersion30(currentVersion: number, db: Database) {
console.log('updateToSchemaVersion30: success!');
}
function updateToSchemaVersion31(currentVersion: number, db: Database): void {
if (currentVersion >= 31) {
return;
}
console.log('updateToSchemaVersion10: starting...');
db.transaction(() => {
db.exec(`
DROP INDEX unprocessed_id;
DROP INDEX unprocessed_timestamp;
ALTER TABLE unprocessed RENAME TO unprocessed_old;
CREATE TABLE unprocessed(
id STRING PRIMARY KEY ASC,
timestamp INTEGER,
version INTEGER,
attempts INTEGER,
envelope TEXT,
decrypted TEXT,
source TEXT,
sourceDevice TEXT,
serverTimestamp INTEGER,
sourceUuid STRING
);
CREATE INDEX unprocessed_timestamp ON unprocessed (
timestamp
);
INSERT OR REPLACE INTO unprocessed
(id, timestamp, version, attempts, envelope, decrypted, source,
sourceDevice, serverTimestamp, sourceUuid)
SELECT
id, timestamp, version, attempts, envelope, decrypted, source,
sourceDevice, serverTimestamp, sourceUuid
FROM unprocessed_old;
DROP TABLE unprocessed_old;
`);
db.pragma('user_version = 31');
})();
console.log('updateToSchemaVersion31: success!');
}
const SCHEMA_VERSIONS = [
updateToSchemaVersion1,
updateToSchemaVersion2,
@ -1821,6 +1865,7 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion28,
updateToSchemaVersion29,
updateToSchemaVersion30,
updateToSchemaVersion31,
];
function updateSchema(db: Database): void {
@ -2229,11 +2274,11 @@ async function commitSessionsAndUnprocessed({
db.transaction(() => {
for (const item of sessions) {
createOrUpdateSession(item);
assertSync(createOrUpdateSessionSync(item));
}
for (const item of unprocessed) {
saveUnprocessedSync(item);
assertSync(saveUnprocessedSync(item));
}
})();
}

View file

@ -49,7 +49,6 @@ import WebSocketResource, {
import Crypto from './Crypto';
import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto';
import { ContactBuffer, GroupBuffer } from './ContactsParser';
import { assert } from '../util/assert';
import { isByteBufferEmpty } from '../util/isByteBufferEmpty';
import {
@ -120,7 +119,7 @@ declare global {
type CacheAddItemType = {
envelope: EnvelopeClass;
data: UnprocessedType;
request: IncomingWebSocketRequest;
request: Pick<IncomingWebSocketRequest, 'respond'>;
};
type DecryptedEnvelope = {
@ -145,7 +144,7 @@ class MessageReceiverInner extends EventTarget {
appQueue: PQueue;
cacheAddBatcher: BatcherType<CacheAddItemType>;
decryptAndCacheBatcher: BatcherType<CacheAddItemType>;
cacheRemoveBatcher: BatcherType<string>;
@ -246,14 +245,14 @@ class MessageReceiverInner extends EventTarget {
timeout: 1000 * 60 * 2,
});
this.cacheAddBatcher = createBatcher<CacheAddItemType>({
name: 'MessageReceiver.cacheAddBatcher',
this.decryptAndCacheBatcher = createBatcher<CacheAddItemType>({
name: 'MessageReceiver.decryptAndCacheBatcher',
wait: 75,
maxSize: 30,
processBatch: (items: Array<CacheAddItemType>) => {
// Not returning the promise here because we don't want to stall
// the batch.
this.cacheAndQueueBatch(items);
this.decryptAndCacheBatch(items);
},
});
this.cacheRemoveBatcher = createBatcher<string>({
@ -330,7 +329,7 @@ class MessageReceiverInner extends EventTarget {
unregisterBatchers() {
window.log.info('MessageReceiver: unregister batchers');
this.cacheAddBatcher.unregister();
this.decryptAndCacheBatcher.unregister();
this.cacheRemoveBatcher.unregister();
}
@ -484,7 +483,7 @@ class MessageReceiverInner extends EventTarget {
envelope.serverTimestamp
);
this.cacheAndQueue(envelope, plaintext, request);
this.decryptAndCache(envelope, plaintext, request);
this.processedCount += 1;
} catch (e) {
request.respond(500, 'Bad encrypted websocket message');
@ -554,7 +553,7 @@ class MessageReceiverInner extends EventTarget {
onEmpty() {
const emitEmpty = async () => {
await Promise.all([
this.cacheAddBatcher.flushAndWait(),
this.decryptAndCacheBatcher.flushAndWait(),
this.cacheRemoveBatcher.flushAndWait(),
]);
@ -588,7 +587,7 @@ class MessageReceiverInner extends EventTarget {
};
const waitForCacheAddBatcher = async () => {
await this.cacheAddBatcher.onIdle();
await this.decryptAndCacheBatcher.onIdle();
this.incomingQueue.add(waitForIncomingQueue);
};
@ -635,11 +634,7 @@ class MessageReceiverInner extends EventTarget {
envelopePlaintext = MessageReceiverInner.stringToArrayBufferBase64(
item.envelope
);
} else if (typeof item.envelope === 'string') {
assert(
item.envelope || item.decrypted,
'MessageReceiver.queueCached: empty envelope without decrypted data'
);
} else if (item.envelope && typeof item.envelope === 'string') {
envelopePlaintext = MessageReceiverInner.stringToArrayBuffer(
item.envelope
);
@ -686,7 +681,7 @@ class MessageReceiverInner extends EventTarget {
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
}, TaskType.Encrypted);
} else {
this.queueCachedEnvelope(envelope);
this.queueCachedEnvelope(item, envelope);
}
} catch (error) {
window.log.error(
@ -784,14 +779,14 @@ class MessageReceiverInner extends EventTarget {
);
}
async cacheAndQueueBatch(items: Array<CacheAddItemType>) {
window.log.info('MessageReceiver.cacheAndQueueBatch', items.length);
async decryptAndCacheBatch(items: Array<CacheAddItemType>) {
window.log.info('MessageReceiver.decryptAndCacheBatch', items.length);
const decrypted: Array<DecryptedEnvelope> = [];
const storageProtocol = window.textsecure.storage.protocol;
try {
const zone = new Zone('cacheAndQueueBatch', {
const zone = new Zone('decryptAndCacheBatch', {
pendingSessions: true,
pendingUnprocessed: true,
});
@ -822,7 +817,7 @@ class MessageReceiverInner extends EventTarget {
} catch (error) {
failed.push(data);
window.log.error(
'cacheAndQueue error when processing the envelope',
'decryptAndCache error when processing the envelope',
error && error.stack ? error.stack : error
);
}
@ -830,7 +825,7 @@ class MessageReceiverInner extends EventTarget {
);
window.log.info(
'MessageReceiver.cacheAndQueueBatch storing ' +
'MessageReceiver.decryptAndCacheBatch storing ' +
`${decrypted.length} decrypted envelopes`
);
@ -840,10 +835,6 @@ class MessageReceiverInner extends EventTarget {
return {
...data,
// We have sucessfully decrypted the message so don't bother with
// storing the envelope.
envelope: '',
source: envelope.source,
sourceUuid: envelope.sourceUuid,
sourceDevice: envelope.sourceDevice,
@ -862,7 +853,7 @@ class MessageReceiverInner extends EventTarget {
});
window.log.info(
'MessageReceiver.cacheAndQueueBatch acknowledging receipt'
'MessageReceiver.decryptAndCacheBatch acknowledging receipt'
);
// Acknowledge all envelopes
@ -871,13 +862,13 @@ class MessageReceiverInner extends EventTarget {
request.respond(200, 'OK');
} catch (error) {
window.log.error(
'cacheAndQueueBatch: Failed to send 200 to server; still queuing envelope'
'decryptAndCacheBatch: Failed to send 200 to server; still queuing envelope'
);
}
}
} catch (error) {
window.log.error(
'cacheAndQueue error trying to add messages to cache:',
'decryptAndCache error trying to add messages to cache:',
error && error.stack ? error.stack : error
);
@ -893,19 +884,19 @@ class MessageReceiverInner extends EventTarget {
await this.queueDecryptedEnvelope(envelope, plaintext);
} catch (error) {
window.log.error(
'cacheAndQueue error when processing decrypted envelope',
'decryptAndCache error when processing decrypted envelope',
error && error.stack ? error.stack : error
);
}
})
);
window.log.info('MessageReceiver.cacheAndQueueBatch fully processed');
window.log.info('MessageReceiver.decryptAndCacheBatch fully processed');
this.maybeScheduleRetryTimeout();
}
cacheAndQueue(
decryptAndCache(
envelope: EnvelopeClass,
plaintext: ArrayBuffer,
request: IncomingWebSocketRequest
@ -918,7 +909,7 @@ class MessageReceiverInner extends EventTarget {
timestamp: envelope.receivedAtCounter,
attempts: 1,
};
this.cacheAddBatcher.add({
this.decryptAndCacheBatcher.add({
request,
envelope,
data,
@ -944,7 +935,7 @@ class MessageReceiverInner extends EventTarget {
const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
task,
`queueEncryptedEnvelope ${id}`
`queueDecryptedEnvelope ${id}`
);
const promise = this.addToQueue(taskWithTimeout, TaskType.Decrypted);
@ -989,20 +980,22 @@ class MessageReceiverInner extends EventTarget {
}
}
async queueCachedEnvelope(envelope: EnvelopeClass): Promise<void> {
const plaintext = await this.queueEncryptedEnvelope(
{
sessionStore: new Sessions(),
identityKeyStore: new IdentityKeys(),
async queueCachedEnvelope(
data: UnprocessedType,
envelope: EnvelopeClass
): Promise<void> {
this.decryptAndCacheBatcher.add({
request: {
respond(code, status) {
window.log.info(
'queueCachedEnvelope: fake response ' +
`with code ${code} and status ${status}`
);
},
},
envelope
);
if (!plaintext) {
return;
}
await this.queueDecryptedEnvelope(envelope, plaintext);
envelope,
data,
});
}
// Called after `decryptEnvelope` decrypted the message.