diff --git a/package.json b/package.json index ff48a0158ba..0eab967a84e 100644 --- a/package.json +++ b/package.json @@ -187,7 +187,7 @@ "@babel/preset-typescript": "7.16.0", "@chanzuckerberg/axe-storybook-testing": "3.0.2", "@electron/fuses": "1.5.0", - "@signalapp/mock-server": "1.1.2", + "@signalapp/mock-server": "1.2.0", "@storybook/addon-actions": "5.1.11", "@storybook/addon-knobs": "5.1.11", "@storybook/addons": "5.1.11", diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 43c69a0bdd2..193be8a0f59 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -2542,7 +2542,11 @@ export class ConversationModel extends window.Backbone } this.set({ verified }); - window.Signal.Data.updateConversation(this.attributes); + + // We will update the conversation during storage service sync + if (!options.viaStorageServiceSync) { + window.Signal.Data.updateConversation(this.attributes); + } if ( !options.viaStorageServiceSync && @@ -4655,7 +4659,7 @@ export class ConversationModel extends window.Backbone async setProfileKey( profileKey: string | undefined, { viaStorageServiceSync = false } = {} - ): Promise { + ): Promise { // profileKey is a string so we can compare it directly if (this.get('profileKey') !== profileKey) { log.info( @@ -4665,13 +4669,15 @@ export class ConversationModel extends window.Backbone about: undefined, aboutEmoji: undefined, profileAvatar: undefined, - profileKey, profileKeyVersion: undefined, profileKeyCredential: null, accessKey: null, sealedSender: SEALED_SENDER.UNKNOWN, }); + // Don't trigger immediate profile fetches when syncing to remote storage + this.set({ profileKey }, { silent: viaStorageServiceSync }); + // If our profile key was cleared above, we don't tell our linked devices about it. // We want linked devices to tell us what it should be, instead of telling them to // erase their local value. @@ -4684,8 +4690,14 @@ export class ConversationModel extends window.Backbone this.deriveProfileKeyVersionIfNeeded(), ]); - window.Signal.Data.updateConversation(this.attributes); + // We will update the conversation during storage service sync + if (!viaStorageServiceSync) { + window.Signal.Data.updateConversation(this.attributes); + } + + return true; } + return false; } async deriveAccessKeyIfNeeded(): Promise { @@ -4989,8 +5001,8 @@ export class ConversationModel extends window.Backbone if (!viaStorageServiceSync) { this.captureChange('mutedUntilTimestamp'); + window.Signal.Data.updateConversation(this.attributes); } - window.Signal.Data.updateConversation(this.attributes); } isMuted(): boolean { diff --git a/ts/services/storage.ts b/ts/services/storage.ts index 73bc5379a75..5f20b2522dc 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -50,8 +50,11 @@ import type { type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier; -const { eraseStorageServiceStateFromConversations, updateConversation } = - dataInterface; +const { + eraseStorageServiceStateFromConversations, + updateConversation, + updateConversations, +} = dataInterface; const uploadBucket: Array = []; @@ -738,6 +741,8 @@ type MergedRecordType = UnknownRecord & { shouldDrop: boolean; hasError: boolean; isUnsupported: boolean; + updatedConversations: ReadonlyArray; + needProfileFetch: ReadonlyArray; }; async function mergeRecord( @@ -751,6 +756,8 @@ async function mergeRecord( let mergeResult: MergeResultType = { hasConflict: false, details: [] }; let isUnsupported = false; let hasError = false; + let updatedConversations = new Array(); + const needProfileFetch = new Array(); try { if (itemType === ITEM_TYPE.UNKNOWN) { @@ -797,6 +804,15 @@ async function mergeRecord( const oldID = mergeResult.oldStorageID ? redactStorageID(mergeResult.oldStorageID, mergeResult.oldStorageVersion) : '?'; + updatedConversations = [ + ...updatedConversations, + ...(mergeResult.updatedConversations ?? []), + ]; + if (mergeResult.needsProfileFetch) { + strictAssert(mergeResult.conversation, 'needsProfileFetch, but no convo'); + needProfileFetch.push(mergeResult.conversation); + } + log.info( `storageService.merge(${redactedID}): merged item type=${itemType} ` + `oldID=${oldID} ` + @@ -821,6 +837,8 @@ async function mergeRecord( isUnsupported, itemType, storageID, + updatedConversations, + needProfileFetch, }; } @@ -1110,7 +1128,7 @@ async function processRemoteRecords( ...(await pMap( prunedStorageItems, (item: MergeableItemType) => mergeRecord(storageVersion, item), - { concurrency: 5 } + { concurrency: 32 } )), // Merge Account records last since it contains the pinned conversations @@ -1124,6 +1142,29 @@ async function processRemoteRecords( `processed records=${mergedRecords.length}` ); + const updatedConversations = mergedRecords + .map(record => record.updatedConversations) + .flat() + .map(convo => convo.attributes); + await updateConversations(updatedConversations); + + log.info( + `storageService.process(${storageVersion}): ` + + `updated conversations=${updatedConversations.length}` + ); + + const needProfileFetch = mergedRecords + .map(record => record.needProfileFetch) + .flat(); + + log.info( + `storageService.process(${storageVersion}): ` + + `kicking off profile fetches=${needProfileFetch.length}` + ); + + // Intentionally not awaiting + pMap(needProfileFetch, convo => convo.getProfiles(), { concurrency: 3 }); + // Collect full map of previously and currently unknown records const unknownRecords: Map = new Map(); diff --git a/ts/services/storageRecordOps.ts b/ts/services/storageRecordOps.ts index 3f139ac9b23..35a0a68dbbf 100644 --- a/ts/services/storageRecordOps.ts +++ b/ts/services/storageRecordOps.ts @@ -6,7 +6,6 @@ import Long from 'long'; import { deriveMasterKeyFromGroupV1 } from '../Crypto'; import * as Bytes from '../Bytes'; -import dataInterface from '../sql/Client'; import { deriveGroupFields, waitThenMaybeUpdateGroup, @@ -41,8 +40,6 @@ import * as preferredReactionEmoji from '../reactions/preferredReactionEmoji'; import { SignalService as Proto } from '../protobuf'; import * as log from '../logging/log'; -const { updateConversation } = dataInterface; - type RecordClass = | Proto.IAccountRecord | Proto.IContactRecord @@ -53,6 +50,8 @@ export type MergeResultType = Readonly<{ hasConflict: boolean; shouldDrop?: boolean; conversation?: ConversationModel; + needsProfileFetch?: boolean; + updatedConversations?: ReadonlyArray; oldStorageID?: string; oldStorageVersion?: number; details: ReadonlyArray; @@ -613,20 +612,19 @@ export async function mergeGroupV1Record( details.push('marking v1 group for an update to v2'); } - updateConversation(conversation.attributes); - return { hasConflict: hasPendingChanges, conversation, oldStorageID, oldStorageVersion, details, + updatedConversations: [conversation], }; } -async function getGroupV2Conversation( +function getGroupV2Conversation( masterKeyBuffer: Uint8Array -): Promise { +): ConversationModel { const groupFields = deriveGroupFields(masterKeyBuffer); const groupId = Bytes.toBase64(groupFields.id); @@ -637,7 +635,7 @@ async function getGroupV2Conversation( // First we check for an existing GroupV2 group const groupV2 = window.ConversationController.get(groupId); if (groupV2) { - await groupV2.maybeRepairGroupV2({ + groupV2.maybeRepairGroupV2({ masterKey, secretParams, publicParams, @@ -681,7 +679,7 @@ export async function mergeGroupV2Record( } const masterKeyBuffer = groupV2Record.masterKey; - const conversation = await getGroupV2Conversation(masterKeyBuffer); + const conversation = getGroupV2Conversation(masterKeyBuffer); const oldStorageID = conversation.get('storageID'); const oldStorageVersion = conversation.get('storageVersion'); @@ -718,8 +716,6 @@ export async function mergeGroupV2Record( details = details.concat(extraDetails); - updateConversation(conversation.attributes); - const isGroupNewToUs = !isNumber(conversation.get('revision')); const isFirstSync = !window.storage.get('storageFetchComplete'); const dropInitialJoinMessage = isFirstSync; @@ -752,6 +748,7 @@ export async function mergeGroupV2Record( return { hasConflict, conversation, + updatedConversations: [conversation], oldStorageID, oldStorageVersion, details, @@ -802,10 +799,12 @@ export async function mergeContactRecord( 'private' ); + let needsProfileFetch = false; if (contactRecord.profileKey && contactRecord.profileKey.length > 0) { - await conversation.setProfileKey(Bytes.toBase64(contactRecord.profileKey), { - viaStorageServiceSync: true, - }); + needsProfileFetch = await conversation.setProfileKey( + Bytes.toBase64(contactRecord.profileKey), + { viaStorageServiceSync: true } + ); } const remoteName = dropNull(contactRecord.givenName); @@ -878,11 +877,11 @@ export async function mergeContactRecord( ); details = details.concat(extraDetails); - updateConversation(conversation.attributes); - return { hasConflict, conversation, + updatedConversations: [conversation], + needsProfileFetch, oldStorageID, oldStorageVersion, details, @@ -917,6 +916,8 @@ export async function mergeAccountRecord( displayBadgesOnProfile, } = accountRecord; + const updatedConversations = new Array(); + window.storage.put('read-receipt-setting', Boolean(readReceipts)); if (typeof sealedSenderIndicators === 'boolean') { @@ -1086,7 +1087,7 @@ export async function mergeAccountRecord( conversationsToUnpin.forEach(conversation => { conversation.set({ isPinned: false }); - updateConversation(conversation.attributes); + updatedConversations.push(conversation); }); remotelyPinnedConversations.forEach(conversation => { @@ -1101,7 +1102,7 @@ export async function mergeAccountRecord( ); conversation.addMessageHistoryDisclaimer(); } - updateConversation(conversation.attributes); + updatedConversations.push(conversation); }); window.storage.put('pinnedConversationIds', remotelyPinnedConversationIds); @@ -1138,8 +1139,12 @@ export async function mergeAccountRecord( storageVersion, }); + let needsProfileFetch = false; if (accountRecord.profileKey && accountRecord.profileKey.length > 0) { - await conversation.setProfileKey(Bytes.toBase64(accountRecord.profileKey)); + needsProfileFetch = await conversation.setProfileKey( + Bytes.toBase64(accountRecord.profileKey), + { viaStorageServiceSync: true } + ); } if (avatarUrl) { @@ -1153,13 +1158,15 @@ export async function mergeAccountRecord( conversation ); - updateConversation(conversation.attributes); + updatedConversations.push(conversation); details = details.concat(extraDetails); return { hasConflict, conversation, + updatedConversations, + needsProfileFetch, oldStorageID, oldStorageVersion, details, diff --git a/ts/test-mock/benchmarks/storage_sync_bench.ts b/ts/test-mock/benchmarks/storage_sync_bench.ts new file mode 100644 index 00000000000..8149ebc8bd7 --- /dev/null +++ b/ts/test-mock/benchmarks/storage_sync_bench.ts @@ -0,0 +1,67 @@ +// Copyright 2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable no-await-in-loop, no-console */ + +import { StorageState } from '@signalapp/mock-server'; + +import { Bootstrap, saveLogs } from './fixtures'; + +const CONTACT_COUNT = 1000; + +(async () => { + const contactNames = new Array(); + for (let i = 0; i < CONTACT_COUNT; i += 1) { + contactNames.push(`Contact ${i}`); + } + + const bootstrap = new Bootstrap({ + benchmark: true, + }); + + await bootstrap.init(); + const { phone, server } = bootstrap; + + let state = StorageState.getEmpty(); + for (const [i, profileName] of contactNames.entries()) { + const contact = await server.createPrimaryDevice({ + profileName, + }); + + state = state.addContact(contact, { + // Make sure we fetch profile from the server + givenName: `Loading ${profileName}...`, + + identityKey: contact.publicKey.serialize(), + profileKey: contact.profileKey.serialize(), + }); + + if (i >= contactNames.length - 4) { + state = state.pin(contact); + } + } + + await phone.setStorageState(state); + + const start = Date.now(); + const app = await bootstrap.link(); + try { + const window = await app.getWindow(); + + const leftPane = window.locator('.left-pane-wrapper'); + + const item = leftPane.locator( + '_react=BaseConversationListItem' + + `[title = ${JSON.stringify(contactNames[contactNames.length - 1])}]` + ); + await item.waitFor(); + + const duration = Date.now() - start; + console.log(`Took: ${(duration / 1000).toFixed(2)} seconds`); + } catch (error) { + await saveLogs(bootstrap); + throw error; + } finally { + await app.close(); + await bootstrap.teardown(); + } +})(); diff --git a/ts/test-mock/bootstrap.ts b/ts/test-mock/bootstrap.ts index 9719e5ae4dc..87314d7bb44 100644 --- a/ts/test-mock/bootstrap.ts +++ b/ts/test-mock/bootstrap.ts @@ -59,6 +59,8 @@ export type BootstrapOptions = Readonly<{ linkedDevices?: number; contactCount?: number; + contactNames?: ReadonlyArray; + contactPreKeyCount?: number; }>; type BootstrapInternalOptions = Pick & @@ -66,6 +68,7 @@ type BootstrapInternalOptions = Pick & benchmark: boolean; linkedDevices: number; contactCount: number; + contactNames: ReadonlyArray; }>; // @@ -107,12 +110,13 @@ export class Bootstrap { this.options = { linkedDevices: 5, contactCount: MAX_CONTACTS, + contactNames: CONTACT_NAMES, benchmark: false, ...options, }; - assert(this.options.contactCount <= MAX_CONTACTS); + assert(this.options.contactCount <= this.options.contactNames.length); } public async init(): Promise { @@ -123,11 +127,16 @@ export class Bootstrap { const { port } = this.server.address(); debug('started server on port=%d', port); - const contactNames = CONTACT_NAMES.slice(0, this.options.contactCount); + const contactNames = this.options.contactNames.slice( + 0, + this.options.contactCount + ); this.privContacts = await Promise.all( contactNames.map(async profileName => { - const primary = await this.server.createPrimaryDevice({ profileName }); + const primary = await this.server.createPrimaryDevice({ + profileName, + }); for (let i = 0; i < this.options.linkedDevices; i += 1) { // eslint-disable-next-line no-await-in-loop diff --git a/yarn.lock b/yarn.lock index c81895501d3..dbbf39ffa18 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1351,10 +1351,10 @@ "@react-spring/shared" "~9.4.0" "@react-spring/types" "~9.4.0" -"@signalapp/mock-server@1.1.2": - version "1.1.2" - resolved "https://registry.yarnpkg.com/@signalapp/mock-server/-/mock-server-1.1.2.tgz#008c46cd6f29cb5b91a3414c9c74bc770021b853" - integrity sha512-rkxPzbbiD6HMeeb6fe7li5H7jwzJuWQPuA7tP74KYoRLf0LeFlbRdHSrqZ4GxpA63Zu0vzC3h2bC8E2F1JM4Eg== +"@signalapp/mock-server@1.2.0": + version "1.2.0" + resolved "https://registry.yarnpkg.com/@signalapp/mock-server/-/mock-server-1.2.0.tgz#1f17c39f113f2bc2d8ba51d2abd47607415df174" + integrity sha512-BbnYwSSPti6D2osZri8c8qMAP9Br4vLpc0Cbzet/BLEW5Jrex2g4ux1+sEQtTOoueIf3uieX7l/cWFgw2mJWDg== dependencies: "@signalapp/signal-client" "0.12.1" debug "^4.3.2"