From c11a894ad124a9018d5af56aaf7ff19f9d67ad35 Mon Sep 17 00:00:00 2001 From: trevor-signal <131492920+trevor-signal@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:52:31 -0400 Subject: [PATCH] Don't block message processing on contact syncs --- ts/background.ts | 1 - ts/services/contactSync.ts | 52 +++++++++++++++++++-- ts/test-electron/MessageReceiver_test.ts | 2 - ts/textsecure/MessageReceiver.ts | 59 ++++-------------------- ts/textsecure/messageReceiverEvents.ts | 4 +- 5 files changed, 59 insertions(+), 59 deletions(-) diff --git a/ts/background.ts b/ts/background.ts index a99e80148663..cf5b6d325557 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -542,7 +542,6 @@ export async function startApp(): Promise { log.info('Initializing MessageReceiver'); messageReceiver = new MessageReceiver({ - server, storage: window.storage, serverTrustRoot: window.getServerTrustRoot(), }); diff --git a/ts/services/contactSync.ts b/ts/services/contactSync.ts index e9d49ffa9709..ca9f6e291434 100644 --- a/ts/services/contactSync.ts +++ b/ts/services/contactSync.ts @@ -5,7 +5,10 @@ import PQueue from 'p-queue'; import { DataWriter } from '../sql/Client'; import type { ContactSyncEvent } from '../textsecure/messageReceiverEvents'; -import type { ContactDetailsWithAvatar } from '../textsecure/ContactsParser'; +import { + parseContactsV2, + type ContactDetailsWithAvatar, +} from '../textsecure/ContactsParser'; import { normalizeAci } from '../util/normalizeAci'; import * as Conversation from '../types/Conversation'; import * as Errors from '../types/errors'; @@ -15,6 +18,11 @@ import { validateConversation } from '../util/validateConversation'; import { isDirectConversation, isMe } from '../util/whatTypeOfConversation'; import * as log from '../logging/log'; import { dropNull } from '../util/dropNull'; +import type { ProcessedAttachment } from '../textsecure/Types'; +import { downloadAttachment } from '../textsecure/downloadAttachment'; +import { strictAssert } from '../util/assert'; +import type { ReencryptedAttachmentV2 } from '../AttachmentCrypto'; +import { SECOND } from '../util/durations'; // When true - we are running the very first storage and contact sync after // linking. @@ -86,8 +94,34 @@ async function updateConversationFromContactSync( const queue = new PQueue({ concurrency: 1 }); +async function downloadAndParseContactAttachment( + contactAttachment: ProcessedAttachment +) { + strictAssert(window.textsecure.server, 'server must exist'); + let downloaded: ReencryptedAttachmentV2 | undefined; + try { + downloaded = await downloadAttachment( + window.textsecure.server, + contactAttachment, + { + disableRetries: true, + timeout: 90 * SECOND, + } + ); + + return await parseContactsV2({ + ...contactAttachment, + ...downloaded, + }); + } finally { + if (downloaded?.path) { + await window.Signal.Migrations.deleteAttachmentData(downloaded.path); + } + } +} + async function doContactSync({ - contacts, + contactAttachment, complete: isFullSync, receivedAtCounter, sentAt, @@ -95,8 +129,11 @@ async function doContactSync({ const logId = `doContactSync(sent=${sentAt}, ` + `receivedAt=${receivedAtCounter}, isFullSync=${isFullSync})`; - log.info(`${logId}: got ${contacts.length} contacts`); + log.info(`${logId}: downloading contact attachment`); + const contacts = await downloadAndParseContactAttachment(contactAttachment); + + log.info(`${logId}: got ${contacts.length} contacts`); const updatedConversations = new Set(); let promises = new Array>(); @@ -195,5 +232,12 @@ export async function onContactSync(ev: ContactSyncEvent): Promise { log.info( `onContactSync(sent=${ev.sentAt}, receivedAt=${ev.receivedAtCounter}): queueing sync` ); - await queue.add(() => doContactSync(ev)); + + const promise = queue.add(() => doContactSync(ev)); + + // Returning the promise blocks MessageReceiver.appQueue, which we only want to do on + // initial sync + if (isInitialSync) { + return promise; + } } diff --git a/ts/test-electron/MessageReceiver_test.ts b/ts/test-electron/MessageReceiver_test.ts index 0c9df43609db..006f82d81b7d 100644 --- a/ts/test-electron/MessageReceiver_test.ts +++ b/ts/test-electron/MessageReceiver_test.ts @@ -8,7 +8,6 @@ import Long from 'long'; import MessageReceiver from '../textsecure/MessageReceiver'; import { IncomingWebSocketRequestLegacy } from '../textsecure/WebsocketResources'; -import type { WebAPIType } from '../textsecure/WebAPI'; import type { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents'; import { generateAci } from '../types/ServiceId'; import type { AciString } from '../types/ServiceId'; @@ -39,7 +38,6 @@ describe('MessageReceiver', () => { describe('connecting', () => { it('generates decryption-error event when it cannot decrypt', async () => { const messageReceiver = new MessageReceiver({ - server: {} as WebAPIType, storage: window.storage, serverTrustRoot: 'AAAAAAAA', }); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 08425d07cc47..3820da5565e7 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -49,8 +49,7 @@ import { parseIntOrThrow } from '../util/parseIntOrThrow'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; import { Zone } from '../util/Zone'; import * as durations from '../util/durations'; -import { DurationInSeconds, SECOND } from '../util/durations'; -import type { AttachmentType } from '../types/Attachment'; +import { DurationInSeconds } from '../util/durations'; import { Address } from '../types/Address'; import { QualifiedAddress } from '../types/QualifiedAddress'; import { normalizeStoryDistributionId } from '../types/StoryDistributionId'; @@ -82,11 +81,8 @@ import { import { processSyncMessage } from './processSyncMessage'; import type { EventHandler } from './EventTarget'; import EventTarget from './EventTarget'; -import { downloadAttachment } from './downloadAttachment'; import type { IncomingWebSocketRequest } from './WebsocketResources'; import { ServerRequestType } from './WebsocketResources'; -import { parseContactsV2 } from './ContactsParser'; -import type { WebAPIType } from './WebAPI'; import type { Storage } from './Storage'; import { WarnOnlyError } from './Errors'; import * as Bytes from '../Bytes'; @@ -215,7 +211,6 @@ enum TaskType { } export type MessageReceiverOptions = { - server: WebAPIType; storage: Storage; serverTrustRoot: string; }; @@ -287,8 +282,6 @@ export default class MessageReceiver { /* eslint-enable @typescript-eslint/brace-style */ - private server: WebAPIType; - private storage: Storage; private appQueue: PQueue; @@ -319,10 +312,9 @@ export default class MessageReceiver private isAppReadyForProcessing: boolean = false; - constructor({ server, storage, serverTrustRoot }: MessageReceiverOptions) { + constructor({ storage, serverTrustRoot }: MessageReceiverOptions) { super(); - this.server = server; this.storage = storage; this.count = 0; @@ -3823,34 +3815,13 @@ export default class MessageReceiver this.removeFromCache(envelope); - let attachment: AttachmentType | undefined; - try { - attachment = await this.handleAttachmentV2(blob, { - disableRetries: true, - timeout: 90 * SECOND, - }); - - const { path } = attachment; - if (!path) { - throw new Error('Failed no path field in returned attachment'); - } - - const contacts = await parseContactsV2(attachment); - - const contactSync = new ContactSyncEvent( - contacts, - Boolean(contactSyncProto.complete), - envelope.receivedAtCounter, - envelope.timestamp - ); - await this.dispatchAndWait(logId, contactSync); - - log.info('handleContacts: finished'); - } finally { - if (attachment?.path) { - await window.Signal.Migrations.deleteAttachmentData(attachment.path); - } - } + const contactSync = new ContactSyncEvent( + processAttachment(blob), + Boolean(contactSyncProto.complete), + envelope.receivedAtCounter, + envelope.timestamp + ); + await this.dispatchAndWait(logId, contactSync); } private async handleBlocked( @@ -3937,18 +3908,6 @@ export default class MessageReceiver return this.storage.blocked.isGroupBlocked(groupId); } - private async handleAttachmentV2( - attachment: Proto.IAttachmentPointer, - options?: { timeout?: number; disableRetries?: boolean } - ): Promise { - const cleaned = processAttachment(attachment); - const downloaded = await downloadAttachment(this.server, cleaned, options); - return { - ...cleaned, - ...downloaded, - }; - } - private async handleEndSession( envelope: ProcessedEnvelope, theirServiceId: ServiceIdString diff --git a/ts/textsecure/messageReceiverEvents.ts b/ts/textsecure/messageReceiverEvents.ts index 26f4d1342cf3..fc433162743e 100644 --- a/ts/textsecure/messageReceiverEvents.ts +++ b/ts/textsecure/messageReceiverEvents.ts @@ -16,8 +16,8 @@ import type { ProcessedEnvelope, ProcessedDataMessage, ProcessedSent, + ProcessedAttachment, } from './Types.d'; -import type { ContactDetailsWithAvatar } from './ContactsParser'; import type { CallEventDetails, CallLogEventDetails, @@ -84,7 +84,7 @@ export class ErrorEvent extends Event { export class ContactSyncEvent extends Event { constructor( - public readonly contacts: ReadonlyArray, + public readonly contactAttachment: ProcessedAttachment, public readonly complete: boolean, public readonly receivedAtCounter: number, public readonly sentAt: number