Optimize initial storage service fetch
This commit is contained in:
parent
cc51cdccc7
commit
a72cf075ef
7 changed files with 172 additions and 36 deletions
|
@ -187,7 +187,7 @@
|
|||
"@babel/preset-typescript": "7.16.0",
|
||||
"@chanzuckerberg/axe-storybook-testing": "3.0.2",
|
||||
"@electron/fuses": "1.5.0",
|
||||
"@signalapp/mock-server": "1.1.2",
|
||||
"@signalapp/mock-server": "1.2.0",
|
||||
"@storybook/addon-actions": "5.1.11",
|
||||
"@storybook/addon-knobs": "5.1.11",
|
||||
"@storybook/addons": "5.1.11",
|
||||
|
|
|
@ -2542,7 +2542,11 @@ export class ConversationModel extends window.Backbone
|
|||
}
|
||||
|
||||
this.set({ verified });
|
||||
window.Signal.Data.updateConversation(this.attributes);
|
||||
|
||||
// We will update the conversation during storage service sync
|
||||
if (!options.viaStorageServiceSync) {
|
||||
window.Signal.Data.updateConversation(this.attributes);
|
||||
}
|
||||
|
||||
if (
|
||||
!options.viaStorageServiceSync &&
|
||||
|
@ -4655,7 +4659,7 @@ export class ConversationModel extends window.Backbone
|
|||
async setProfileKey(
|
||||
profileKey: string | undefined,
|
||||
{ viaStorageServiceSync = false } = {}
|
||||
): Promise<void> {
|
||||
): Promise<boolean> {
|
||||
// profileKey is a string so we can compare it directly
|
||||
if (this.get('profileKey') !== profileKey) {
|
||||
log.info(
|
||||
|
@ -4665,13 +4669,15 @@ export class ConversationModel extends window.Backbone
|
|||
about: undefined,
|
||||
aboutEmoji: undefined,
|
||||
profileAvatar: undefined,
|
||||
profileKey,
|
||||
profileKeyVersion: undefined,
|
||||
profileKeyCredential: null,
|
||||
accessKey: null,
|
||||
sealedSender: SEALED_SENDER.UNKNOWN,
|
||||
});
|
||||
|
||||
// Don't trigger immediate profile fetches when syncing to remote storage
|
||||
this.set({ profileKey }, { silent: viaStorageServiceSync });
|
||||
|
||||
// If our profile key was cleared above, we don't tell our linked devices about it.
|
||||
// We want linked devices to tell us what it should be, instead of telling them to
|
||||
// erase their local value.
|
||||
|
@ -4684,8 +4690,14 @@ export class ConversationModel extends window.Backbone
|
|||
this.deriveProfileKeyVersionIfNeeded(),
|
||||
]);
|
||||
|
||||
window.Signal.Data.updateConversation(this.attributes);
|
||||
// We will update the conversation during storage service sync
|
||||
if (!viaStorageServiceSync) {
|
||||
window.Signal.Data.updateConversation(this.attributes);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async deriveAccessKeyIfNeeded(): Promise<void> {
|
||||
|
@ -4989,8 +5001,8 @@ export class ConversationModel extends window.Backbone
|
|||
|
||||
if (!viaStorageServiceSync) {
|
||||
this.captureChange('mutedUntilTimestamp');
|
||||
window.Signal.Data.updateConversation(this.attributes);
|
||||
}
|
||||
window.Signal.Data.updateConversation(this.attributes);
|
||||
}
|
||||
|
||||
isMuted(): boolean {
|
||||
|
|
|
@ -50,8 +50,11 @@ import type {
|
|||
|
||||
type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier;
|
||||
|
||||
const { eraseStorageServiceStateFromConversations, updateConversation } =
|
||||
dataInterface;
|
||||
const {
|
||||
eraseStorageServiceStateFromConversations,
|
||||
updateConversation,
|
||||
updateConversations,
|
||||
} = dataInterface;
|
||||
|
||||
const uploadBucket: Array<number> = [];
|
||||
|
||||
|
@ -738,6 +741,8 @@ type MergedRecordType = UnknownRecord & {
|
|||
shouldDrop: boolean;
|
||||
hasError: boolean;
|
||||
isUnsupported: boolean;
|
||||
updatedConversations: ReadonlyArray<ConversationModel>;
|
||||
needProfileFetch: ReadonlyArray<ConversationModel>;
|
||||
};
|
||||
|
||||
async function mergeRecord(
|
||||
|
@ -751,6 +756,8 @@ async function mergeRecord(
|
|||
let mergeResult: MergeResultType = { hasConflict: false, details: [] };
|
||||
let isUnsupported = false;
|
||||
let hasError = false;
|
||||
let updatedConversations = new Array<ConversationModel>();
|
||||
const needProfileFetch = new Array<ConversationModel>();
|
||||
|
||||
try {
|
||||
if (itemType === ITEM_TYPE.UNKNOWN) {
|
||||
|
@ -797,6 +804,15 @@ async function mergeRecord(
|
|||
const oldID = mergeResult.oldStorageID
|
||||
? redactStorageID(mergeResult.oldStorageID, mergeResult.oldStorageVersion)
|
||||
: '?';
|
||||
updatedConversations = [
|
||||
...updatedConversations,
|
||||
...(mergeResult.updatedConversations ?? []),
|
||||
];
|
||||
if (mergeResult.needsProfileFetch) {
|
||||
strictAssert(mergeResult.conversation, 'needsProfileFetch, but no convo');
|
||||
needProfileFetch.push(mergeResult.conversation);
|
||||
}
|
||||
|
||||
log.info(
|
||||
`storageService.merge(${redactedID}): merged item type=${itemType} ` +
|
||||
`oldID=${oldID} ` +
|
||||
|
@ -821,6 +837,8 @@ async function mergeRecord(
|
|||
isUnsupported,
|
||||
itemType,
|
||||
storageID,
|
||||
updatedConversations,
|
||||
needProfileFetch,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -1110,7 +1128,7 @@ async function processRemoteRecords(
|
|||
...(await pMap(
|
||||
prunedStorageItems,
|
||||
(item: MergeableItemType) => mergeRecord(storageVersion, item),
|
||||
{ concurrency: 5 }
|
||||
{ concurrency: 32 }
|
||||
)),
|
||||
|
||||
// Merge Account records last since it contains the pinned conversations
|
||||
|
@ -1124,6 +1142,29 @@ async function processRemoteRecords(
|
|||
`processed records=${mergedRecords.length}`
|
||||
);
|
||||
|
||||
const updatedConversations = mergedRecords
|
||||
.map(record => record.updatedConversations)
|
||||
.flat()
|
||||
.map(convo => convo.attributes);
|
||||
await updateConversations(updatedConversations);
|
||||
|
||||
log.info(
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
`updated conversations=${updatedConversations.length}`
|
||||
);
|
||||
|
||||
const needProfileFetch = mergedRecords
|
||||
.map(record => record.needProfileFetch)
|
||||
.flat();
|
||||
|
||||
log.info(
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
`kicking off profile fetches=${needProfileFetch.length}`
|
||||
);
|
||||
|
||||
// Intentionally not awaiting
|
||||
pMap(needProfileFetch, convo => convo.getProfiles(), { concurrency: 3 });
|
||||
|
||||
// Collect full map of previously and currently unknown records
|
||||
const unknownRecords: Map<string, UnknownRecord> = new Map();
|
||||
|
||||
|
|
|
@ -6,7 +6,6 @@ import Long from 'long';
|
|||
|
||||
import { deriveMasterKeyFromGroupV1 } from '../Crypto';
|
||||
import * as Bytes from '../Bytes';
|
||||
import dataInterface from '../sql/Client';
|
||||
import {
|
||||
deriveGroupFields,
|
||||
waitThenMaybeUpdateGroup,
|
||||
|
@ -41,8 +40,6 @@ import * as preferredReactionEmoji from '../reactions/preferredReactionEmoji';
|
|||
import { SignalService as Proto } from '../protobuf';
|
||||
import * as log from '../logging/log';
|
||||
|
||||
const { updateConversation } = dataInterface;
|
||||
|
||||
type RecordClass =
|
||||
| Proto.IAccountRecord
|
||||
| Proto.IContactRecord
|
||||
|
@ -53,6 +50,8 @@ export type MergeResultType = Readonly<{
|
|||
hasConflict: boolean;
|
||||
shouldDrop?: boolean;
|
||||
conversation?: ConversationModel;
|
||||
needsProfileFetch?: boolean;
|
||||
updatedConversations?: ReadonlyArray<ConversationModel>;
|
||||
oldStorageID?: string;
|
||||
oldStorageVersion?: number;
|
||||
details: ReadonlyArray<string>;
|
||||
|
@ -613,20 +612,19 @@ export async function mergeGroupV1Record(
|
|||
details.push('marking v1 group for an update to v2');
|
||||
}
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
return {
|
||||
hasConflict: hasPendingChanges,
|
||||
conversation,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
updatedConversations: [conversation],
|
||||
};
|
||||
}
|
||||
|
||||
async function getGroupV2Conversation(
|
||||
function getGroupV2Conversation(
|
||||
masterKeyBuffer: Uint8Array
|
||||
): Promise<ConversationModel> {
|
||||
): ConversationModel {
|
||||
const groupFields = deriveGroupFields(masterKeyBuffer);
|
||||
|
||||
const groupId = Bytes.toBase64(groupFields.id);
|
||||
|
@ -637,7 +635,7 @@ async function getGroupV2Conversation(
|
|||
// First we check for an existing GroupV2 group
|
||||
const groupV2 = window.ConversationController.get(groupId);
|
||||
if (groupV2) {
|
||||
await groupV2.maybeRepairGroupV2({
|
||||
groupV2.maybeRepairGroupV2({
|
||||
masterKey,
|
||||
secretParams,
|
||||
publicParams,
|
||||
|
@ -681,7 +679,7 @@ export async function mergeGroupV2Record(
|
|||
}
|
||||
|
||||
const masterKeyBuffer = groupV2Record.masterKey;
|
||||
const conversation = await getGroupV2Conversation(masterKeyBuffer);
|
||||
const conversation = getGroupV2Conversation(masterKeyBuffer);
|
||||
|
||||
const oldStorageID = conversation.get('storageID');
|
||||
const oldStorageVersion = conversation.get('storageVersion');
|
||||
|
@ -718,8 +716,6 @@ export async function mergeGroupV2Record(
|
|||
|
||||
details = details.concat(extraDetails);
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
const isGroupNewToUs = !isNumber(conversation.get('revision'));
|
||||
const isFirstSync = !window.storage.get('storageFetchComplete');
|
||||
const dropInitialJoinMessage = isFirstSync;
|
||||
|
@ -752,6 +748,7 @@ export async function mergeGroupV2Record(
|
|||
return {
|
||||
hasConflict,
|
||||
conversation,
|
||||
updatedConversations: [conversation],
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
|
@ -802,10 +799,12 @@ export async function mergeContactRecord(
|
|||
'private'
|
||||
);
|
||||
|
||||
let needsProfileFetch = false;
|
||||
if (contactRecord.profileKey && contactRecord.profileKey.length > 0) {
|
||||
await conversation.setProfileKey(Bytes.toBase64(contactRecord.profileKey), {
|
||||
viaStorageServiceSync: true,
|
||||
});
|
||||
needsProfileFetch = await conversation.setProfileKey(
|
||||
Bytes.toBase64(contactRecord.profileKey),
|
||||
{ viaStorageServiceSync: true }
|
||||
);
|
||||
}
|
||||
|
||||
const remoteName = dropNull(contactRecord.givenName);
|
||||
|
@ -878,11 +877,11 @@ export async function mergeContactRecord(
|
|||
);
|
||||
details = details.concat(extraDetails);
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
return {
|
||||
hasConflict,
|
||||
conversation,
|
||||
updatedConversations: [conversation],
|
||||
needsProfileFetch,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
|
@ -917,6 +916,8 @@ export async function mergeAccountRecord(
|
|||
displayBadgesOnProfile,
|
||||
} = accountRecord;
|
||||
|
||||
const updatedConversations = new Array<ConversationModel>();
|
||||
|
||||
window.storage.put('read-receipt-setting', Boolean(readReceipts));
|
||||
|
||||
if (typeof sealedSenderIndicators === 'boolean') {
|
||||
|
@ -1086,7 +1087,7 @@ export async function mergeAccountRecord(
|
|||
|
||||
conversationsToUnpin.forEach(conversation => {
|
||||
conversation.set({ isPinned: false });
|
||||
updateConversation(conversation.attributes);
|
||||
updatedConversations.push(conversation);
|
||||
});
|
||||
|
||||
remotelyPinnedConversations.forEach(conversation => {
|
||||
|
@ -1101,7 +1102,7 @@ export async function mergeAccountRecord(
|
|||
);
|
||||
conversation.addMessageHistoryDisclaimer();
|
||||
}
|
||||
updateConversation(conversation.attributes);
|
||||
updatedConversations.push(conversation);
|
||||
});
|
||||
|
||||
window.storage.put('pinnedConversationIds', remotelyPinnedConversationIds);
|
||||
|
@ -1138,8 +1139,12 @@ export async function mergeAccountRecord(
|
|||
storageVersion,
|
||||
});
|
||||
|
||||
let needsProfileFetch = false;
|
||||
if (accountRecord.profileKey && accountRecord.profileKey.length > 0) {
|
||||
await conversation.setProfileKey(Bytes.toBase64(accountRecord.profileKey));
|
||||
needsProfileFetch = await conversation.setProfileKey(
|
||||
Bytes.toBase64(accountRecord.profileKey),
|
||||
{ viaStorageServiceSync: true }
|
||||
);
|
||||
}
|
||||
|
||||
if (avatarUrl) {
|
||||
|
@ -1153,13 +1158,15 @@ export async function mergeAccountRecord(
|
|||
conversation
|
||||
);
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
updatedConversations.push(conversation);
|
||||
|
||||
details = details.concat(extraDetails);
|
||||
|
||||
return {
|
||||
hasConflict,
|
||||
conversation,
|
||||
updatedConversations,
|
||||
needsProfileFetch,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
|
|
67
ts/test-mock/benchmarks/storage_sync_bench.ts
Normal file
67
ts/test-mock/benchmarks/storage_sync_bench.ts
Normal file
|
@ -0,0 +1,67 @@
|
|||
// Copyright 2022 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
/* eslint-disable no-await-in-loop, no-console */
|
||||
|
||||
import { StorageState } from '@signalapp/mock-server';
|
||||
|
||||
import { Bootstrap, saveLogs } from './fixtures';
|
||||
|
||||
const CONTACT_COUNT = 1000;
|
||||
|
||||
(async () => {
|
||||
const contactNames = new Array<string>();
|
||||
for (let i = 0; i < CONTACT_COUNT; i += 1) {
|
||||
contactNames.push(`Contact ${i}`);
|
||||
}
|
||||
|
||||
const bootstrap = new Bootstrap({
|
||||
benchmark: true,
|
||||
});
|
||||
|
||||
await bootstrap.init();
|
||||
const { phone, server } = bootstrap;
|
||||
|
||||
let state = StorageState.getEmpty();
|
||||
for (const [i, profileName] of contactNames.entries()) {
|
||||
const contact = await server.createPrimaryDevice({
|
||||
profileName,
|
||||
});
|
||||
|
||||
state = state.addContact(contact, {
|
||||
// Make sure we fetch profile from the server
|
||||
givenName: `Loading ${profileName}...`,
|
||||
|
||||
identityKey: contact.publicKey.serialize(),
|
||||
profileKey: contact.profileKey.serialize(),
|
||||
});
|
||||
|
||||
if (i >= contactNames.length - 4) {
|
||||
state = state.pin(contact);
|
||||
}
|
||||
}
|
||||
|
||||
await phone.setStorageState(state);
|
||||
|
||||
const start = Date.now();
|
||||
const app = await bootstrap.link();
|
||||
try {
|
||||
const window = await app.getWindow();
|
||||
|
||||
const leftPane = window.locator('.left-pane-wrapper');
|
||||
|
||||
const item = leftPane.locator(
|
||||
'_react=BaseConversationListItem' +
|
||||
`[title = ${JSON.stringify(contactNames[contactNames.length - 1])}]`
|
||||
);
|
||||
await item.waitFor();
|
||||
|
||||
const duration = Date.now() - start;
|
||||
console.log(`Took: ${(duration / 1000).toFixed(2)} seconds`);
|
||||
} catch (error) {
|
||||
await saveLogs(bootstrap);
|
||||
throw error;
|
||||
} finally {
|
||||
await app.close();
|
||||
await bootstrap.teardown();
|
||||
}
|
||||
})();
|
|
@ -59,6 +59,8 @@ export type BootstrapOptions = Readonly<{
|
|||
|
||||
linkedDevices?: number;
|
||||
contactCount?: number;
|
||||
contactNames?: ReadonlyArray<string>;
|
||||
contactPreKeyCount?: number;
|
||||
}>;
|
||||
|
||||
type BootstrapInternalOptions = Pick<BootstrapOptions, 'extraConfig'> &
|
||||
|
@ -66,6 +68,7 @@ type BootstrapInternalOptions = Pick<BootstrapOptions, 'extraConfig'> &
|
|||
benchmark: boolean;
|
||||
linkedDevices: number;
|
||||
contactCount: number;
|
||||
contactNames: ReadonlyArray<string>;
|
||||
}>;
|
||||
|
||||
//
|
||||
|
@ -107,12 +110,13 @@ export class Bootstrap {
|
|||
this.options = {
|
||||
linkedDevices: 5,
|
||||
contactCount: MAX_CONTACTS,
|
||||
contactNames: CONTACT_NAMES,
|
||||
benchmark: false,
|
||||
|
||||
...options,
|
||||
};
|
||||
|
||||
assert(this.options.contactCount <= MAX_CONTACTS);
|
||||
assert(this.options.contactCount <= this.options.contactNames.length);
|
||||
}
|
||||
|
||||
public async init(): Promise<void> {
|
||||
|
@ -123,11 +127,16 @@ export class Bootstrap {
|
|||
const { port } = this.server.address();
|
||||
debug('started server on port=%d', port);
|
||||
|
||||
const contactNames = CONTACT_NAMES.slice(0, this.options.contactCount);
|
||||
const contactNames = this.options.contactNames.slice(
|
||||
0,
|
||||
this.options.contactCount
|
||||
);
|
||||
|
||||
this.privContacts = await Promise.all(
|
||||
contactNames.map(async profileName => {
|
||||
const primary = await this.server.createPrimaryDevice({ profileName });
|
||||
const primary = await this.server.createPrimaryDevice({
|
||||
profileName,
|
||||
});
|
||||
|
||||
for (let i = 0; i < this.options.linkedDevices; i += 1) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
|
|
|
@ -1351,10 +1351,10 @@
|
|||
"@react-spring/shared" "~9.4.0"
|
||||
"@react-spring/types" "~9.4.0"
|
||||
|
||||
"@signalapp/mock-server@1.1.2":
|
||||
version "1.1.2"
|
||||
resolved "https://registry.yarnpkg.com/@signalapp/mock-server/-/mock-server-1.1.2.tgz#008c46cd6f29cb5b91a3414c9c74bc770021b853"
|
||||
integrity sha512-rkxPzbbiD6HMeeb6fe7li5H7jwzJuWQPuA7tP74KYoRLf0LeFlbRdHSrqZ4GxpA63Zu0vzC3h2bC8E2F1JM4Eg==
|
||||
"@signalapp/mock-server@1.2.0":
|
||||
version "1.2.0"
|
||||
resolved "https://registry.yarnpkg.com/@signalapp/mock-server/-/mock-server-1.2.0.tgz#1f17c39f113f2bc2d8ba51d2abd47607415df174"
|
||||
integrity sha512-BbnYwSSPti6D2osZri8c8qMAP9Br4vLpc0Cbzet/BLEW5Jrex2g4ux1+sEQtTOoueIf3uieX7l/cWFgw2mJWDg==
|
||||
dependencies:
|
||||
"@signalapp/signal-client" "0.12.1"
|
||||
debug "^4.3.2"
|
||||
|
|
Loading…
Reference in a new issue