From 7ce4beb270d772fe9786a652cc11d67b7bfeea69 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Wed, 24 Aug 2022 22:04:42 -0700 Subject: [PATCH] Refactor contact sync processing --- sticker-creator/window/phase1-dependencies.ts | 2 - ts/background.ts | 119 ++---------- ts/models/conversations.ts | 9 +- ts/services/contactSync.ts | 179 ++++++++++++++++++ ts/sql/Client.ts | 3 +- ts/textsecure/ContactsParser.ts | 28 ++- ts/textsecure/MessageReceiver.ts | 31 +-- ts/textsecure/messageReceiverEvents.ts | 10 +- ts/window.d.ts | 2 - ts/windows/main/phase2-dependencies.ts | 2 - 10 files changed, 229 insertions(+), 156 deletions(-) create mode 100644 ts/services/contactSync.ts diff --git a/sticker-creator/window/phase1-dependencies.ts b/sticker-creator/window/phase1-dependencies.ts index 90278326c8..e3185c0dd0 100644 --- a/sticker-creator/window/phase1-dependencies.ts +++ b/sticker-creator/window/phase1-dependencies.ts @@ -1,7 +1,6 @@ // Copyright 2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import PQueue from 'p-queue'; import Backbone from 'backbone'; import { ipcRenderer as ipc } from 'electron'; @@ -17,7 +16,6 @@ window.ROOT_PATH = window.location.href.startsWith('file') ? '../../' : '/'; window.getEnvironment = getEnvironment; window.getVersion = () => window.SignalContext.config.version; -window.PQueue = PQueue; window.Backbone = Backbone; window.localeMessages = ipc.sendSync('locale-data'); diff --git a/ts/background.ts b/ts/background.ts index f928a4465e..dd902c610e 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -6,6 +6,7 @@ import { isNumber } from 'lodash'; import { bindActionCreators } from 'redux'; import { render } from 'react-dom'; import { batch as batchDispatch } from 'react-redux'; +import PQueue from 'p-queue'; import MessageReceiver from './textsecure/MessageReceiver'; import type { @@ -21,7 +22,6 @@ import type { MessageAttributesType, ConversationAttributesType, ReactionAttributesType, - ValidateConversationType, } from './model-types.d'; import * as Bytes from './Bytes'; import * as Timers from './Timers'; @@ -64,6 +64,7 @@ import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue'; import { ourProfileKeyService } from './services/ourProfileKey'; import { notificationService } from './services/notifications'; import { areWeASubscriberService } from './services/areWeASubscriber'; +import { onContactSync, setIsInitialSync } from './services/contactSync'; import { startTimeTravelDetector } from './util/startTimeTravelDetector'; import { shouldRespondWithProfileKey } from './util/shouldRespondWithProfileKey'; import { LatestQueue } from './util/LatestQueue'; @@ -71,7 +72,6 @@ import { parseIntOrThrow } from './util/parseIntOrThrow'; import { getProfile } from './util/getProfile'; import type { ConfigurationEvent, - ContactEvent, DecryptionErrorEvent, DeliveryEvent, EnvelopeEvent, @@ -153,7 +153,6 @@ import { SeenStatus } from './MessageSeenStatus'; import MessageSender from './textsecure/SendMessage'; import type AccountManager from './textsecure/AccountManager'; import { onStoryRecipientUpdate } from './util/onStoryRecipientUpdate'; -import { validateConversation } from './util/validateConversation'; const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000; @@ -314,13 +313,9 @@ export async function startApp(): Promise { 'delivery', queuedEventListener(onDeliveryReceipt) ); - messageReceiver.addEventListener( - 'contact', - queuedEventListener(onContactReceived, false) - ); messageReceiver.addEventListener( 'contactSync', - queuedEventListener(onContactSyncComplete) + queuedEventListener(onContactSync) ); messageReceiver.addEventListener( 'group', @@ -430,26 +425,26 @@ export async function startApp(): Promise { areWeASubscriberService.update(window.storage, server); }); - const eventHandlerQueue = new window.PQueue({ + const eventHandlerQueue = new PQueue({ concurrency: 1, timeout: durations.MINUTE * 30, }); // Note: this queue is meant to allow for stop/start of tasks, not limit parallelism. - const profileKeyResponseQueue = new window.PQueue(); + const profileKeyResponseQueue = new PQueue(); profileKeyResponseQueue.pause(); - const lightSessionResetQueue = new window.PQueue({ concurrency: 1 }); + const lightSessionResetQueue = new PQueue({ concurrency: 1 }); window.Signal.Services.lightSessionResetQueue = lightSessionResetQueue; lightSessionResetQueue.pause(); - const onDecryptionErrorQueue = new window.PQueue({ concurrency: 1 }); + const onDecryptionErrorQueue = new PQueue({ concurrency: 1 }); onDecryptionErrorQueue.pause(); - const onRetryRequestQueue = new window.PQueue({ concurrency: 1 }); + const onRetryRequestQueue = new PQueue({ concurrency: 1 }); onRetryRequestQueue.pause(); - window.Whisper.deliveryReceiptQueue = new window.PQueue({ + window.Whisper.deliveryReceiptQueue = new PQueue({ concurrency: 1, timeout: durations.MINUTE * 30, }); @@ -2022,10 +2017,6 @@ export async function startApp(): Promise { } } - // When true - we are running the very first storage and contact sync after - // linking. - let isInitialSync = false; - let connectCount = 0; let connecting = false; async function connect(firstRun?: boolean) { @@ -2040,7 +2031,7 @@ export async function startApp(): Promise { connecting = true; // Reset the flag and update it below if needed - isInitialSync = false; + setIsInitialSync(false); log.info('connect', { firstRun, connectCount }); @@ -2286,7 +2277,7 @@ export async function startApp(): Promise { const contactSyncComplete = waitForEvent('contactSync:complete'); log.info('firstRun: requesting initial sync'); - isInitialSync = true; + setIsInitialSync(true); // Request configuration, block, GV1 sync messages, contacts // (only avatars and inboxPosition),and Storage Service sync. @@ -2321,7 +2312,7 @@ export async function startApp(): Promise { } log.info('firstRun: initial sync complete'); - isInitialSync = false; + setIsInitialSync(false); // Switch to inbox view even if contact sync is still running if ( @@ -2717,92 +2708,6 @@ export async function startApp(): Promise { }); } - async function onContactSyncComplete() { - log.info('onContactSyncComplete'); - await window.storage.put('synced_at', Date.now()); - window.Whisper.events.trigger('contactSync:complete'); - } - - // Note: Like the handling for incoming/outgoing messages, this method is synchronous, - // deferring its async logic to the function passed to conversation.queueJob(). - function onContactReceived(ev: ContactEvent) { - const details = ev.contactDetails; - - const partialConversation: ValidateConversationType = { - e164: details.number, - uuid: UUID.cast(details.uuid), - type: 'private', - }; - - const validationError = validateConversation(partialConversation); - if (validationError) { - log.error( - 'Invalid contact received:', - Errors.toLogFormat(validationError) - ); - return; - } - - const conversation = window.ConversationController.maybeMergeContacts({ - e164: details.number, - aci: details.uuid, - reason: 'onContactReceived', - }); - strictAssert(conversation, 'need conversation to queue the job!'); - - // It's important to use queueJob here because we might update the expiration timer - // and we don't want conflicts with incoming message processing happening on the - // conversation queue. - conversation.queueJob('onContactReceived', async () => { - try { - conversation.set({ - name: details.name, - inbox_position: details.inboxPosition, - }); - - // Update the conversation avatar only if new avatar exists and hash differs - const { avatar } = details; - if (avatar && avatar.data) { - const newAttributes = await Conversation.maybeUpdateAvatar( - conversation.attributes, - avatar.data, - { - writeNewAttachmentData, - deleteAttachmentData, - doesAttachmentExist, - } - ); - conversation.set(newAttributes); - } else { - const { attributes } = conversation; - if (attributes.avatar && attributes.avatar.path) { - await deleteAttachmentData(attributes.avatar.path); - } - conversation.set({ avatar: null }); - } - - window.Signal.Data.updateConversation(conversation.attributes); - - // expireTimer isn't in Storage Service so we have to rely on contact sync. - const { expireTimer } = details; - const isValidExpireTimer = typeof expireTimer === 'number'; - if (isValidExpireTimer) { - await conversation.updateExpirationTimer(expireTimer, { - source: window.ConversationController.getOurConversationId(), - receivedAt: ev.receivedAtCounter, - fromSync: true, - isInitialSync, - reason: 'contact sync', - }); - } - - window.Whisper.events.trigger('incrementProgress'); - } catch (error) { - log.error('onContactReceived error:', Errors.toLogFormat(error)); - } - }); - } - async function onGroupSyncComplete() { log.info('onGroupSyncComplete'); await window.storage.put('synced_at', Date.now()); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index c45dccf2f5..a991653587 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -4,6 +4,7 @@ import { compact, has, isNumber, throttle, debounce } from 'lodash'; import { batch as batchDispatch } from 'react-redux'; import { v4 as generateGuid } from 'uuid'; +import PQueue from 'p-queue'; import type { ConversationAttributesType, @@ -191,9 +192,9 @@ export class ConversationModel extends window.Backbone inProgressFetch?: Promise; - newMessageQueue?: typeof window.PQueueType; + newMessageQueue?: PQueue; - jobQueue?: typeof window.PQueueType; + jobQueue?: PQueue; storeName?: string | null; @@ -1313,7 +1314,7 @@ export class ConversationModel extends window.Backbone private async beforeAddSingleMessage(): Promise { if (!this.newMessageQueue) { - this.newMessageQueue = new window.PQueue({ + this.newMessageQueue = new PQueue({ concurrency: 1, timeout: durations.MINUTE * 30, }); @@ -3427,7 +3428,7 @@ export class ConversationModel extends window.Backbone name: string, callback: (abortSignal: AbortSignal) => Promise ): Promise { - this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 }); + this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 }); const taskWithTimeout = createTaskWithTimeout( callback, diff --git a/ts/services/contactSync.ts b/ts/services/contactSync.ts new file mode 100644 index 0000000000..2d4f937a2a --- /dev/null +++ b/ts/services/contactSync.ts @@ -0,0 +1,179 @@ +// Copyright 2020-2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import PQueue from 'p-queue'; + +import type { ContactSyncEvent } from '../textsecure/messageReceiverEvents'; +import type { ModifiedContactDetails } from '../textsecure/ContactsParser'; +import { UUID } from '../types/UUID'; +import * as Conversation from '../types/Conversation'; +import * as Errors from '../types/errors'; +import type { ValidateConversationType } from '../model-types.d'; +import type { ConversationModel } from '../models/conversations'; +import { validateConversation } from '../util/validateConversation'; +import { strictAssert } from '../util/assert'; +import { isDirectConversation, isMe } from '../util/whatTypeOfConversation'; +import * as log from '../logging/log'; + +// When true - we are running the very first storage and contact sync after +// linking. +let isInitialSync = false; + +export function setIsInitialSync(newValue: boolean): void { + log.info(`setIsInitialSync(${newValue})`); + isInitialSync = newValue; +} + +async function updateConversationFromContactSync( + conversation: ConversationModel, + details: ModifiedContactDetails, + receivedAtCounter: number +): Promise { + const { writeNewAttachmentData, deleteAttachmentData, doesAttachmentExist } = + window.Signal.Migrations; + + conversation.set({ + name: details.name, + inbox_position: details.inboxPosition, + }); + + // Update the conversation avatar only if new avatar exists and hash differs + const { avatar } = details; + if (avatar && avatar.data) { + const newAttributes = await Conversation.maybeUpdateAvatar( + conversation.attributes, + avatar.data, + { + writeNewAttachmentData, + deleteAttachmentData, + doesAttachmentExist, + } + ); + conversation.set(newAttributes); + } else { + const { attributes } = conversation; + if (attributes.avatar && attributes.avatar.path) { + await deleteAttachmentData(attributes.avatar.path); + } + conversation.set({ avatar: null }); + } + + // expireTimer isn't in Storage Service so we have to rely on contact sync. + const { expireTimer } = details; + const isValidExpireTimer = typeof expireTimer === 'number'; + if (isValidExpireTimer) { + await conversation.updateExpirationTimer(expireTimer, { + source: window.ConversationController.getOurConversationId(), + receivedAt: receivedAtCounter, + fromSync: true, + isInitialSync, + reason: 'contact sync', + }); + } + + window.Whisper.events.trigger('incrementProgress'); +} + +const queue = new PQueue({ concurrency: 1 }); + +async function doContactSync({ + contacts, + receivedAtCounter, +}: ContactSyncEvent): Promise { + log.info( + `doContactSync(${receivedAtCounter}): got ${contacts.length} contacts` + ); + + const updatedConversations = new Set(); + + let promises = new Array>(); + for (const details of contacts) { + const partialConversation: ValidateConversationType = { + e164: details.number, + uuid: UUID.cast(details.uuid), + type: 'private', + }; + + const validationError = validateConversation(partialConversation); + if (validationError) { + log.error( + `doContactSync(${receivedAtCounter}): Invalid contact received`, + Errors.toLogFormat(validationError) + ); + continue; + } + + const conversation = window.ConversationController.maybeMergeContacts({ + e164: details.number, + aci: details.uuid, + reason: `doContactSync(${receivedAtCounter})`, + }); + strictAssert(conversation, 'need conversation to queue the job!'); + + // It's important to use queueJob here because we might update the expiration timer + // and we don't want conflicts with incoming message processing happening on the + // conversation queue. + const job = conversation.queueJob( + `doContactSync(${receivedAtCounter}).set`, + async () => { + try { + await updateConversationFromContactSync( + conversation, + details, + receivedAtCounter + ); + + updatedConversations.add(conversation); + } catch (error) { + log.error( + 'updateConversationFromContactSync error:', + Errors.toLogFormat(error) + ); + } + } + ); + + promises.push(job); + } + + // updatedConversations are not populated until the promises are resolved + await Promise.all(promises); + promises = []; + + const notUpdated = window.ConversationController.getAll().filter( + convo => + !updatedConversations.has(convo) && + isDirectConversation(convo.attributes) && + !isMe(convo.attributes) + ); + + log.info( + `doContactSync(${receivedAtCounter}): ` + + `updated ${updatedConversations.size} ` + + `resetting ${notUpdated.length}` + ); + + for (const conversation of notUpdated) { + conversation.set({ + name: undefined, + inbox_position: undefined, + }); + } + + // Save new conversation attributes + promises.push( + window.Signal.Data.updateConversations( + [...updatedConversations, ...notUpdated].map(convo => convo.attributes) + ) + ); + + await Promise.all(promises); + + await window.storage.put('synced_at', Date.now()); + window.Whisper.events.trigger('contactSync:complete'); +} + +export async function onContactSync(ev: ContactSyncEvent): Promise { + log.info(`onContactSync(${ev.receivedAtCounter}): queueing sync`); + await queue.add(() => doContactSync(ev)); +} diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index c57c031001..06af49e561 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -6,6 +6,7 @@ import { ipcRenderer as ipc } from 'electron'; import fs from 'fs-extra'; import pify from 'pify'; +import PQueue from 'p-queue'; import { compact, @@ -1490,7 +1491,7 @@ async function removeAllMessagesInConversation( log.info(`removeAllMessagesInConversation/${logId}: Cleanup...`); // Note: It's very important that these models are fully hydrated because // we need to delete all associated on-disk files along with the database delete. - const queue = new window.PQueue({ concurrency: 3, timeout: MINUTE * 30 }); + const queue = new PQueue({ concurrency: 3, timeout: MINUTE * 30 }); queue.addAll( messages.map( (message: MessageType) => async () => cleanupMessage(message) diff --git a/ts/textsecure/ContactsParser.ts b/ts/textsecure/ContactsParser.ts index b2ec603c6a..a83ba7a184 100644 --- a/ts/textsecure/ContactsParser.ts +++ b/ts/textsecure/ContactsParser.ts @@ -30,10 +30,12 @@ export type ModifiedGroupDetails = MessageWithAvatar; export type ModifiedContactDetails = MessageWithAvatar; -class ParserBase< +abstract class ParserBase< Message extends OptionalAvatar, - Decoder extends DecoderBase -> { + Decoder extends DecoderBase, + Result +> implements Iterable +{ protected readonly reader: protobuf.Reader; constructor(bytes: Uint8Array, private readonly decoder: Decoder) { @@ -83,17 +85,28 @@ class ParserBase< return undefined; } } + + public abstract next(): Result | undefined; + + *[Symbol.iterator](): Iterator { + let result = this.next(); + while (result !== undefined) { + yield result; + result = this.next(); + } + } } export class GroupBuffer extends ParserBase< Proto.GroupDetails, - typeof Proto.GroupDetails + typeof Proto.GroupDetails, + ModifiedGroupDetails > { constructor(arrayBuffer: Uint8Array) { super(arrayBuffer, Proto.GroupDetails); } - public next(): ModifiedGroupDetails | undefined { + public override next(): ModifiedGroupDetails | undefined { const proto = this.decodeDelimited(); if (!proto) { return undefined; @@ -120,13 +133,14 @@ export class GroupBuffer extends ParserBase< export class ContactBuffer extends ParserBase< Proto.ContactDetails, - typeof Proto.ContactDetails + typeof Proto.ContactDetails, + ModifiedContactDetails > { constructor(arrayBuffer: Uint8Array) { super(arrayBuffer, Proto.ContactDetails); } - public next(): ModifiedContactDetails | undefined { + public override next(): ModifiedContactDetails | undefined { const proto = this.decodeDelimited(); if (!proto) { return undefined; diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 7ce78790cf..d7b900f2f6 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -104,7 +104,6 @@ import { StickerPackEvent, ReadSyncEvent, ViewSyncEvent, - ContactEvent, ContactSyncEvent, GroupEvent, GroupSyncEvent, @@ -561,11 +560,6 @@ export default class MessageReceiver handler: (ev: ViewSyncEvent) => void ): void; - public override addEventListener( - name: 'contact', - handler: (ev: ContactEvent) => void - ): void; - public override addEventListener( name: 'contactSync', handler: (ev: ContactSyncEvent) => void @@ -2748,6 +2742,9 @@ export default class MessageReceiver return this.handleSentMessage(envelope, sentMessage); } if (syncMessage.contacts) { + // Note: we do not return here because we don't want to block the next + // message on this attachment download and a lot of processing of that + // attachment. this.handleContacts(envelope, syncMessage.contacts); return; } @@ -3068,26 +3065,14 @@ export default class MessageReceiver this.removeFromCache(envelope); - // Note: we do not return here because we don't want to block the next message on - // this attachment download and a lot of processing of that attachment. const attachmentPointer = await this.handleAttachment(blob); - const results = []; const contactBuffer = new ContactBuffer(attachmentPointer.data); - let contactDetails = contactBuffer.next(); - while (contactDetails !== undefined) { - const contactEvent = new ContactEvent( - contactDetails, - envelope.receivedAtCounter - ); - results.push(this.dispatchAndWait(contactEvent)); - contactDetails = contactBuffer.next(); - } - - await Promise.all(results); - - const finalEvent = new ContactSyncEvent(); - await this.dispatchAndWait(finalEvent); + const contactSync = new ContactSyncEvent( + Array.from(contactBuffer), + envelope.receivedAtCounter + ); + await this.dispatchAndWait(contactSync); log.info('handleContacts: finished'); } diff --git a/ts/textsecure/messageReceiverEvents.ts b/ts/textsecure/messageReceiverEvents.ts index 919c8e7c2e..32fbbe41e8 100644 --- a/ts/textsecure/messageReceiverEvents.ts +++ b/ts/textsecure/messageReceiverEvents.ts @@ -73,17 +73,11 @@ export class ErrorEvent extends Event { } } -export class ContactEvent extends Event { +export class ContactSyncEvent extends Event { constructor( - public readonly contactDetails: ModifiedContactDetails, + public readonly contacts: ReadonlyArray, public readonly receivedAtCounter: number ) { - super('contact'); - } -} - -export class ContactSyncEvent extends Event { - constructor() { super('contactSync'); } } diff --git a/ts/window.d.ts b/ts/window.d.ts index 5b3db6b83a..b5992ebf50 100644 --- a/ts/window.d.ts +++ b/ts/window.d.ts @@ -243,8 +243,6 @@ declare global { getAutoLaunch: () => Promise; setAutoLaunch: (value: boolean) => Promise; - PQueue: typeof PQueue; - PQueueType: PQueue; Mustache: { render: (template: string, data: any, partials?: any) => string; parse: (template: string) => void; diff --git a/ts/windows/main/phase2-dependencies.ts b/ts/windows/main/phase2-dependencies.ts index 18936303c5..faa261c355 100644 --- a/ts/windows/main/phase2-dependencies.ts +++ b/ts/windows/main/phase2-dependencies.ts @@ -7,7 +7,6 @@ import * as React from 'react'; import * as ReactDOM from 'react-dom'; import * as moment from 'moment'; import 'moment/min/locales.min'; -import PQueue from 'p-queue'; import { textsecure } from '../../textsecure'; import { imageToBlurHash } from '../../util/imageToBlurHash'; @@ -44,7 +43,6 @@ window.libphonenumberFormat = PhoneNumberFormat; window.React = React; window.ReactDOM = ReactDOM; -window.PQueue = PQueue; const { locale } = config; moment.updateLocale(locale, {