onDecryptionError/onRetryRequest: Don't run until queue is empty

This commit is contained in:
Scott Nonnenberg 2021-10-20 14:50:00 -07:00 committed by GitHub
parent fe49edce8a
commit 51af6947d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 75 deletions

View file

@ -51,28 +51,30 @@ import { LatestQueue } from './util/LatestQueue';
import { parseIntOrThrow } from './util/parseIntOrThrow';
import { getProfile } from './util/getProfile';
import {
TypingEvent,
ErrorEvent,
ConfigurationEvent,
ContactEvent,
DecryptionErrorEvent,
DeliveryEvent,
SentEvent,
SentEventData,
ProfileKeyUpdateEvent,
EnvelopeEvent,
ErrorEvent,
FetchLatestEvent,
GroupEvent,
KeysEvent,
MessageEvent,
MessageEventData,
ReadEvent,
ViewEvent,
ConfigurationEvent,
ViewOnceOpenSyncEvent,
MessageRequestResponseEvent,
FetchLatestEvent,
KeysEvent,
StickerPackEvent,
VerifiedEvent,
ProfileKeyUpdateEvent,
ReadEvent,
ReadSyncEvent,
RetryRequestEvent,
SentEvent,
SentEventData,
StickerPackEvent,
TypingEvent,
VerifiedEvent,
ViewEvent,
ViewOnceOpenSyncEvent,
ViewSyncEvent,
ContactEvent,
GroupEvent,
EnvelopeEvent,
} from './textsecure/messageReceiverEvents';
import type { WebAPIType } from './textsecure/WebAPI';
import * as KeyChangeListener from './textsecure/KeyChangeListener';
@ -258,11 +260,15 @@ export async function startApp(): Promise<void> {
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener(onDecryptionError)
queuedEventListener((event: DecryptionErrorEvent) => {
onDecryptionErrorQueue.add(() => onDecryptionError(event));
})
);
messageReceiver.addEventListener(
'retry-request',
queuedEventListener(onRetryRequest)
queuedEventListener((event: RetryRequestEvent) => {
onRetryRequestQueue.add(() => onRetryRequest(event));
})
);
messageReceiver.addEventListener('empty', queuedEventListener(onEmpty));
messageReceiver.addEventListener(
@ -338,6 +344,12 @@ export async function startApp(): Promise<void> {
window.Signal.Services.lightSessionResetQueue = lightSessionResetQueue;
lightSessionResetQueue.pause();
const onDecryptionErrorQueue = new window.PQueue();
onDecryptionErrorQueue.pause();
const onRetryRequestQueue = new window.PQueue();
onRetryRequestQueue.pause();
window.Whisper.deliveryReceiptQueue = new window.PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
@ -2077,6 +2089,8 @@ export async function startApp(): Promise<void> {
// To avoid a flood of operations before we catch up, we pause some queues.
profileKeyResponseQueue.pause();
lightSessionResetQueue.pause();
onDecryptionErrorQueue.pause();
onRetryRequestQueue.pause();
window.Whisper.deliveryReceiptQueue.pause();
notificationService.disable();
@ -2327,6 +2341,8 @@ export async function startApp(): Promise<void> {
profileKeyResponseQueue.start();
lightSessionResetQueue.start();
onDecryptionErrorQueue.start();
onRetryRequestQueue.start();
window.Whisper.deliveryReceiptQueue.start();
notificationService.enable();
@ -2391,6 +2407,8 @@ export async function startApp(): Promise<void> {
// notifications in these scenarios too. So we listen for 'reconnect' events.
profileKeyResponseQueue.pause();
lightSessionResetQueue.pause();
onDecryptionErrorQueue.pause();
onRetryRequestQueue.pause();
window.Whisper.deliveryReceiptQueue.pause();
notificationService.disable();
}

View file

@ -1447,7 +1447,6 @@ export default class MessageReceiver
try {
return await this.innerDecrypt(stores, envelope, ciphertext);
} catch (error) {
this.removeFromCache(envelope);
const uuid = envelope.sourceUuid;
const deviceId = envelope.sourceDevice;
@ -1456,6 +1455,7 @@ export default class MessageReceiver
error?.message?.includes &&
error.message.includes('message with old counter')
) {
this.removeFromCache(envelope);
throw error;
}
@ -1465,6 +1465,7 @@ export default class MessageReceiver
error?.message?.includes &&
error.message.includes('trust root validation failed')
) {
this.removeFromCache(envelope);
throw error;
}
@ -1475,22 +1476,26 @@ export default class MessageReceiver
log.info(
'MessageReceiver.decrypt: Error from blocked sender; no further processing'
);
this.removeFromCache(envelope);
throw error;
}
if (uuid && deviceId) {
const { usmc } = envelope;
const event = new DecryptionErrorEvent({
cipherTextBytes: usmc ? usmc.contents() : undefined,
cipherTextType: usmc ? usmc.msgType() : undefined,
contentHint: envelope.contentHint,
groupId: envelope.groupId,
receivedAtCounter: envelope.receivedAtCounter,
receivedAtDate: envelope.receivedAtDate,
senderDevice: deviceId,
senderUuid: uuid,
timestamp: envelope.timestamp,
});
const event = new DecryptionErrorEvent(
{
cipherTextBytes: usmc ? usmc.contents() : undefined,
cipherTextType: usmc ? usmc.msgType() : undefined,
contentHint: envelope.contentHint,
groupId: envelope.groupId,
receivedAtCounter: envelope.receivedAtCounter,
receivedAtDate: envelope.receivedAtDate,
senderDevice: deviceId,
senderUuid: uuid,
timestamp: envelope.timestamp,
},
() => this.removeFromCache(envelope)
);
// Avoid deadlocks by scheduling processing on decrypted queue
this.addToQueue(
@ -1499,6 +1504,7 @@ export default class MessageReceiver
);
} else {
const envelopeId = this.getEnvelopeId(envelope);
this.removeFromCache(envelope);
log.error(
`MessageReceiver.decrypt: Envelope ${envelopeId} missing uuid or deviceId`
);
@ -1801,22 +1807,24 @@ export default class MessageReceiver
const buffer = Buffer.from(decryptionError);
const request = DecryptionErrorMessage.deserialize(buffer);
this.removeFromCache(envelope);
const { sourceUuid, sourceDevice } = envelope;
if (!sourceUuid || !sourceDevice) {
log.error(`handleDecryptionError/${logId}: Missing uuid or device!`);
this.removeFromCache(envelope);
return;
}
const event = new RetryRequestEvent({
groupId: envelope.groupId,
requesterDevice: sourceDevice,
requesterUuid: sourceUuid,
ratchetKey: request.ratchetKey(),
senderDevice: request.deviceId(),
sentAt: request.timestamp(),
});
const event = new RetryRequestEvent(
{
groupId: envelope.groupId,
requesterDevice: sourceDevice,
requesterUuid: sourceUuid,
ratchetKey: request.ratchetKey(),
senderDevice: request.deviceId(),
sentAt: request.timestamp(),
},
() => this.removeFromCache(envelope)
);
await this.dispatchEvent(event);
}

View file

@ -78,39 +78,6 @@ export class ErrorEvent extends Event {
}
}
export type DecryptionErrorEventData = Readonly<{
cipherTextBytes?: Uint8Array;
cipherTextType?: number;
contentHint?: number;
groupId?: string;
receivedAtCounter: number;
receivedAtDate: number;
senderDevice: number;
senderUuid: string;
timestamp: number;
}>;
export class DecryptionErrorEvent extends Event {
constructor(public readonly decryptionError: DecryptionErrorEventData) {
super('decryption-error');
}
}
export type RetryRequestEventData = Readonly<{
groupId?: string;
ratchetKey?: PublicKey;
requesterUuid: string;
requesterDevice: number;
senderDevice: number;
sentAt: number;
}>;
export class RetryRequestEvent extends Event {
constructor(public readonly retryRequest: RetryRequestEventData) {
super('retry-request');
}
}
export class ContactEvent extends Event {
constructor(public readonly contactDetails: ModifiedContactDetails) {
super('contact');
@ -175,6 +142,45 @@ export class DeliveryEvent extends ConfirmableEvent {
}
}
export type DecryptionErrorEventData = Readonly<{
cipherTextBytes?: Uint8Array;
cipherTextType?: number;
contentHint?: number;
groupId?: string;
receivedAtCounter: number;
receivedAtDate: number;
senderDevice: number;
senderUuid: string;
timestamp: number;
}>;
export class DecryptionErrorEvent extends ConfirmableEvent {
constructor(
public readonly decryptionError: DecryptionErrorEventData,
confirm: ConfirmCallback
) {
super('decryption-error', confirm);
}
}
export type RetryRequestEventData = Readonly<{
groupId?: string;
ratchetKey?: PublicKey;
requesterUuid: string;
requesterDevice: number;
senderDevice: number;
sentAt: number;
}>;
export class RetryRequestEvent extends ConfirmableEvent {
constructor(
public readonly retryRequest: RetryRequestEventData,
confirm: ConfirmCallback
) {
super('retry-request', confirm);
}
}
export type SentEventData = Readonly<{
destination?: string;
destinationUuid?: string;

View file

@ -34,7 +34,7 @@ import * as log from '../logging/log';
// Entrypoints
export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
const { retryRequest } = event;
const { confirm, retryRequest } = event;
const {
groupId: requestGroupId,
requesterDevice,
@ -50,6 +50,7 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
log.warn(
`onRetryRequest/${logId}: Feature flag disabled, returning early.`
);
confirm();
return;
}
@ -80,6 +81,7 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
`onRetryRequest/${logId}: Message is too old, refusing to send again.`
);
await sendDistributionMessageOrNullMessage(logId, retryRequest);
confirm();
return;
}
@ -92,6 +94,7 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
if (!sentProto) {
log.info(`onRetryRequest/${logId}: Did not find sent proto`);
await sendDistributionMessageOrNullMessage(logId, retryRequest);
confirm();
return;
}
@ -125,6 +128,9 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
messageIds: [],
sendType: 'resendFromLog',
});
confirm();
log.info(`onRetryRequest/${logId}: Resend complete.`);
}
function maybeShowDecryptionToast(logId: string) {
@ -141,7 +147,7 @@ function maybeShowDecryptionToast(logId: string) {
export async function onDecryptionError(
event: DecryptionErrorEvent
): Promise<void> {
const { decryptionError } = event;
const { confirm, decryptionError } = event;
const { senderUuid, senderDevice, timestamp } = decryptionError;
const logId = `${senderUuid}.${senderDevice} ${timestamp}`;
@ -166,6 +172,7 @@ export async function onDecryptionError(
await startAutomaticSessionReset(decryptionError);
}
confirm();
log.info(`onDecryptionError/${logId}: ...complete`);
}