Support for message retry requests

This commit is contained in:
Scott Nonnenberg 2021-05-28 12:11:19 -07:00 committed by GitHub
parent 28f016ce48
commit ee513a1965
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 1996 additions and 359 deletions

View file

@ -13,9 +13,12 @@
import { isNumber, map, omit, noop } from 'lodash';
import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { z } from 'zod';
import {
DecryptionErrorMessage,
groupDecrypt,
PlaintextContent,
PreKeySignalMessage,
processSenderKeyDistributionMessage,
ProtocolAddress,
@ -73,7 +76,30 @@ const GROUPV1_ID_LENGTH = 16;
const GROUPV2_ID_LENGTH = 32;
const RETRY_TIMEOUT = 2 * 60 * 1000;
type SessionResetsType = Record<string, number>;
const decryptionErrorTypeSchema = z
.object({
cipherTextBytes: z.instanceof(ArrayBuffer).optional(),
cipherTextType: z.number().optional(),
contentHint: z.number().optional(),
groupId: z.string().optional(),
receivedAtCounter: z.number(),
receivedAtDate: z.number(),
senderDevice: z.number(),
senderUuid: z.string(),
timestamp: z.number(),
})
.passthrough();
export type DecryptionErrorType = z.infer<typeof decryptionErrorTypeSchema>;
const retryRequestTypeSchema = z
.object({
requesterUuid: z.string(),
requesterDevice: z.number(),
senderDevice: z.number(),
sentAt: z.number(),
})
.passthrough();
export type RetryRequestType = z.infer<typeof retryRequestTypeSchema>;
declare global {
// We want to extend `Event`, so we need an interface.
@ -107,6 +133,8 @@ declare global {
timestamp?: any;
typing?: any;
verified?: any;
retryRequest?: RetryRequestType;
decryptionError?: DecryptionErrorType;
}
// We want to extend `Error`, so we need an interface.
// eslint-disable-next-line no-restricted-syntax
@ -261,8 +289,6 @@ class MessageReceiverInner extends EventTarget {
maxSize: 30,
processBatch: this.cacheRemoveBatch.bind(this),
});
this.cleanupSessionResets();
}
static stringToArrayBuffer = (string: string): ArrayBuffer =>
@ -1122,7 +1148,14 @@ class MessageReceiverInner extends EventTarget {
ArrayBuffer | { isMe: boolean } | { isBlocked: boolean } | undefined
>;
if (envelope.type === envelopeTypeEnum.CIPHERTEXT) {
if (envelope.type === envelopeTypeEnum.PLAINTEXT_CONTENT) {
const buffer = Buffer.from(ciphertext.toArrayBuffer());
const plaintextContent = PlaintextContent.deserialize(buffer);
promise = Promise.resolve(
this.unpad(typedArrayToArrayBuffer(plaintextContent.body()))
);
} else if (envelope.type === envelopeTypeEnum.CIPHERTEXT) {
window.log.info('message from', this.getEnvelopeId(envelope));
if (!identifier) {
throw new Error(
@ -1215,6 +1248,13 @@ class MessageReceiverInner extends EventTarget {
originalSource || originalSourceUuid
);
// eslint-disable-next-line no-param-reassign
envelope.contentHint = messageContent.contentHint();
// eslint-disable-next-line no-param-reassign
envelope.groupId = messageContent.groupId()?.toString('base64');
// eslint-disable-next-line no-param-reassign
envelope.usmc = messageContent;
if (
(envelope.source && this.isBlocked(envelope.source)) ||
(envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid))
@ -1231,6 +1271,17 @@ class MessageReceiverInner extends EventTarget {
);
}
if (
messageContent.msgType() ===
unidentifiedSenderTypeEnum.PLAINTEXT_CONTENT
) {
const plaintextContent = PlaintextContent.deserialize(
messageContent.contents()
);
return plaintextContent.body();
}
if (
messageContent.msgType() ===
unidentifiedSenderTypeEnum.SENDERKEY_MESSAGE
@ -1345,10 +1396,26 @@ class MessageReceiverInner extends EventTarget {
}
if (uuid && deviceId) {
// It is safe (from deadlocks) to await this call because the session
// reset is going to be scheduled on a separate p-queue in
// ts/background.ts
await this.lightSessionReset(uuid, deviceId);
const event = new Event('decryption-error');
event.decryptionError = {
cipherTextBytes: envelope.usmc
? typedArrayToArrayBuffer(envelope.usmc.contents())
: undefined,
cipherTextType: envelope.usmc ? envelope.usmc.msgType() : undefined,
contentHint: envelope.contentHint,
groupId: envelope.groupId,
receivedAtCounter: envelope.receivedAtCounter,
receivedAtDate: envelope.receivedAtDate,
senderDevice: deviceId,
senderUuid: uuid,
timestamp: envelope.timestamp.toNumber(),
};
// Avoid deadlocks by scheduling processing on decrypted queue
this.addToQueue(
() => this.dispatchAndWait(event),
TaskType.Decrypted
);
} else {
const envelopeId = this.getEnvelopeId(envelope);
window.log.error(
@ -1360,40 +1427,6 @@ class MessageReceiverInner extends EventTarget {
});
}
isOverHourIntoPast(timestamp: number): boolean {
const HOUR = 1000 * 60 * 60;
const now = Date.now();
const oneHourIntoPast = now - HOUR;
return isNumber(timestamp) && timestamp <= oneHourIntoPast;
}
// We don't lose anything if we delete keys over an hour into the past, because we only
// change our behavior if the timestamps stored are less than an hour ago.
cleanupSessionResets(): void {
const sessionResets = window.storage.get(
'sessionResets',
{}
) as SessionResetsType;
const keys = Object.keys(sessionResets);
keys.forEach(key => {
const timestamp = sessionResets[key];
if (!timestamp || this.isOverHourIntoPast(timestamp)) {
delete sessionResets[key];
}
});
window.storage.put('sessionResets', sessionResets);
}
async lightSessionReset(uuid: string, deviceId: number): Promise<void> {
const event = new Event('light-session-reset');
event.senderUuid = uuid;
event.senderDevice = deviceId;
await this.dispatchAndWait(event);
}
async handleSentMessage(
envelope: EnvelopeClass,
sentContainer: SyncMessageClass.Sent
@ -1630,7 +1663,10 @@ class MessageReceiverInner extends EventTarget {
// make sure to process it first. If that fails, we still try to process
// the rest of the message.
try {
if (content.senderKeyDistributionMessage) {
if (
content.senderKeyDistributionMessage &&
!isByteBufferEmpty(content.senderKeyDistributionMessage)
) {
await this.handleSenderKeyDistributionMessage(
envelope,
content.senderKeyDistributionMessage
@ -1643,6 +1679,16 @@ class MessageReceiverInner extends EventTarget {
);
}
if (
content.decryptionErrorMessage &&
!isByteBufferEmpty(content.decryptionErrorMessage)
) {
await this.handleDecryptionError(
envelope,
content.decryptionErrorMessage
);
return;
}
if (content.syncMessage) {
await this.handleSyncMessage(envelope, content.syncMessage);
return;
@ -1675,6 +1721,34 @@ class MessageReceiverInner extends EventTarget {
}
}
async handleDecryptionError(
envelope: EnvelopeClass,
decryptionError: ByteBufferClass
) {
const envelopeId = this.getEnvelopeId(envelope);
window.log.info(`handleDecryptionError: ${envelopeId}`);
const buffer = Buffer.from(decryptionError.toArrayBuffer());
const request = DecryptionErrorMessage.deserialize(buffer);
this.removeFromCache(envelope);
const { sourceUuid, sourceDevice } = envelope;
if (!sourceUuid || !sourceDevice) {
window.log.error('handleDecryptionError: Missing uuid or device!');
return;
}
const event = new Event('retry-request');
event.retryRequest = {
sentAt: request.timestamp(),
requesterUuid: sourceUuid,
requesterDevice: sourceDevice,
senderDevice: request.deviceId(),
};
await this.dispatchAndWait(event);
}
async handleSenderKeyDistributionMessage(
envelope: EnvelopeClass,
distributionMessage: ByteBufferClass
@ -2603,10 +2677,6 @@ export default class MessageReceiver {
this.stopProcessing = inner.stopProcessing.bind(inner);
this.unregisterBatchers = inner.unregisterBatchers.bind(inner);
// For tests
this.isOverHourIntoPast = inner.isOverHourIntoPast.bind(inner);
this.cleanupSessionResets = inner.cleanupSessionResets.bind(inner);
inner.connect();
this.getProcessedCount = () => inner.processedCount;
}
@ -2629,10 +2699,6 @@ export default class MessageReceiver {
unregisterBatchers: () => void;
isOverHourIntoPast: (timestamp: number) => boolean;
cleanupSessionResets: () => void;
getProcessedCount: () => number;
static stringToArrayBuffer = MessageReceiverInner.stringToArrayBuffer;