From f7f84c463f43747a5669bb8362d3a6e08a98418b Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 6 Jan 2025 07:06:31 -0800 Subject: [PATCH] Upgrade message batch on import --- ts/services/backups/import.ts | 52 ++++++++- ts/test-electron/backup/attachments_test.ts | 10 ++ ts/test-electron/backup/helpers.ts | 14 ++- ts/test-electron/backup/non_bubble_test.ts | 3 + ts/test-mock/benchmarks/backup_bench.ts | 3 + ts/test-node/types/EmbeddedContact_test.ts | 120 ++++++++++++-------- ts/types/EmbeddedContact.ts | 6 +- ts/types/Message2.ts | 112 +++++++----------- 8 files changed, 192 insertions(+), 128 deletions(-) diff --git a/ts/services/backups/import.ts b/ts/services/backups/import.ts index 82f218195..4f2b49838 100644 --- a/ts/services/backups/import.ts +++ b/ts/services/backups/import.ts @@ -217,7 +217,11 @@ export class BackupImportStream extends Writable { ConversationAttributesType >(); private readonly identityKeys = new Map(); - private readonly saveMessageBatch = new Set(); + private readonly saveMessageBatch = new Map< + MessageAttributesType, + Promise + >(); + private flushMessagesPromise: Promise | undefined; private readonly stickerPacks = new Array(); private ourConversation?: ConversationAttributesType; private pinnedConversations = new Array<[number, string]>(); @@ -310,6 +314,10 @@ export class BackupImportStream extends Writable { try { // Finish saving remaining conversations/messages // Save messages first since they depend on conversations in memory + while (this.flushMessagesPromise) { + // eslint-disable-next-line no-await-in-loop + await this.flushMessagesPromise; + } await this.flushMessages(); await this.flushConversations(); log.info(`${this.logId}: flushed messages and conversations`); @@ -341,7 +349,16 @@ export class BackupImportStream extends Writable { allConversations.filter(convo => { return convo.get('active_at') || convo.get('isPinned'); }), - convo => convo.updateLastMessage(), + async convo => { + try { + await convo.updateLastMessage(); + } catch (error) { + log.error( + `${this.logId}: failed to update conversation's last message` + + `${Errors.toLogFormat(error)}` + ); + } + }, { concurrency: MAX_CONCURRENCY } ); @@ -493,9 +510,30 @@ export class BackupImportStream extends Writable { } private async saveMessage(attributes: MessageAttributesType): Promise { - this.saveMessageBatch.add(attributes); + this.saveMessageBatch.set(attributes, this.safeUpgradeMessage(attributes)); if (this.saveMessageBatch.size >= SAVE_MESSAGE_BATCH_SIZE) { - return this.flushMessages(); + // Wait for previous flush to finish before scheduling a new one. + // (Unlikely to happen, but needed to make sure we don't save too many + // messages at once) + while (this.flushMessagesPromise) { + // eslint-disable-next-line no-await-in-loop + await this.flushMessagesPromise; + } + this.flushMessagesPromise = this.flushMessages(); + } + } + + private async safeUpgradeMessage( + attributes: MessageAttributesType + ): Promise { + try { + return await window.Signal.Migrations.upgradeMessageSchema(attributes); + } catch (error) { + log.error( + `${this.logId}: failed to migrate a message ${attributes.sent_at}, ` + + `${Errors.toLogFormat(error)}` + ); + return attributes; } } @@ -528,9 +566,11 @@ export class BackupImportStream extends Writable { const ourAci = this.ourConversation?.serviceId; strictAssert(isAciString(ourAci), 'Must have our aci for messages'); - const batch = Array.from(this.saveMessageBatch); + const batchPromises = Array.from(this.saveMessageBatch.values()); this.saveMessageBatch.clear(); + const batch = await Promise.all(batchPromises); + // There are a few indexes that start with message id, and many more that // start with conversationId. Sort messages by both to make sure that we // are not doing random insertions into the database file. @@ -591,6 +631,8 @@ export class BackupImportStream extends Writable { } await Promise.allSettled(attachmentDownloadJobPromises); await AttachmentDownloadManager.saveBatchedJobs(); + + this.flushMessagesPromise = undefined; } private async saveCallHistory( diff --git a/ts/test-electron/backup/attachments_test.ts b/ts/test-electron/backup/attachments_test.ts index 61005a3a4..ae5cffb3b 100644 --- a/ts/test-electron/backup/attachments_test.ts +++ b/ts/test-electron/backup/attachments_test.ts @@ -166,6 +166,8 @@ describe('backup/attachments', () => { // path & iv will not be roundtripped [ composeMessage(1, { + hasAttachments: 1, + hasVisualMediaAttachments: 1, attachments: [ omit(longMessageAttachment, NON_ROUNDTRIPPED_FIELDS), omit(normalAttachment, NON_ROUNDTRIPPED_FIELDS), @@ -282,6 +284,8 @@ describe('backup/attachments', () => { // path & iv will not be roundtripped [ composeMessage(1, { + hasAttachments: 1, + hasVisualMediaAttachments: 1, attachments: [ omit(attachment1, NON_ROUNDTRIPPED_FIELDS), omit(attachment2, NON_ROUNDTRIPPED_FIELDS), @@ -303,6 +307,9 @@ describe('backup/attachments', () => { ], [ composeMessage(1, { + hasAttachments: 1, + hasVisualMediaAttachments: 1, + // path, iv, and uploadTimestamp will not be roundtripped, // but there will be a backupLocator attachments: [ @@ -334,6 +341,7 @@ describe('backup/attachments', () => { ], [ composeMessage(1, { + hasAttachments: 1, attachments: [ { ...omit(attachment, NON_ROUNDTRIPPED_BACKUP_LOCATOR_FIELDS), @@ -584,6 +592,8 @@ describe('backup/attachments', () => { [ { ...existingMessage, + hasAttachments: 1, + hasVisualMediaAttachments: 1, attachments: [ { ...omit( diff --git a/ts/test-electron/backup/helpers.ts b/ts/test-electron/backup/helpers.ts index 17e3e8fd5..9f547be6e 100644 --- a/ts/test-electron/backup/helpers.ts +++ b/ts/test-electron/backup/helpers.ts @@ -68,9 +68,12 @@ function sortAndNormalize( reactions, sendStateByConversationId, verifiedChanged, - attachments, + + // Set to an empty array after message migration + attachments = [], + contact = [], + preview, - contact, quote, sticker, @@ -106,6 +109,9 @@ function sortAndNormalize( // Get rid of unserializable `undefined` values. return JSON.parse( JSON.stringify({ + // Migration defaults + hasAttachments: 0, + ...rest, conversationId: mapConvoId(conversationId), reactions: reactions?.map(({ fromId, ...restOfReaction }) => { @@ -132,14 +138,14 @@ function sortAndNormalize( }; }), - attachments: attachments?.map(attachment => + attachments: attachments.map(attachment => omit(attachment, 'downloadPath') ), preview: preview?.map(previewItem => ({ ...previewItem, image: omit(previewItem.image, 'downloadPath'), })), - contact: contact?.map(contactItem => ({ + contact: contact.map(contactItem => ({ ...contactItem, avatar: { ...contactItem.avatar, diff --git a/ts/test-electron/backup/non_bubble_test.ts b/ts/test-electron/backup/non_bubble_test.ts index 182c65d16..25315568d 100644 --- a/ts/test-electron/backup/non_bubble_test.ts +++ b/ts/test-electron/backup/non_bubble_test.ts @@ -74,6 +74,9 @@ describe('backup/non-bubble messages', () => { readStatus: ReadStatus.Read, seenStatus: SeenStatus.Seen, flags: Proto.DataMessage.Flags.END_SESSION, + attachments: [], + contact: [], + hasAttachments: 0, }, ]); }); diff --git a/ts/test-mock/benchmarks/backup_bench.ts b/ts/test-mock/benchmarks/backup_bench.ts index 50fe2ac25..f58c0892f 100644 --- a/ts/test-mock/benchmarks/backup_bench.ts +++ b/ts/test-mock/benchmarks/backup_bench.ts @@ -22,7 +22,9 @@ Bootstrap.benchmark(async (bootstrap: Bootstrap): Promise => { const app = await bootstrap.link(); const { duration: importDuration } = await app.waitForBackupImportComplete(); + const migrateStart = Date.now(); await app.migrateAllMessages(); + const migrateEnd = Date.now(); const exportStart = Date.now(); await app.uploadBackup(); @@ -31,5 +33,6 @@ Bootstrap.benchmark(async (bootstrap: Bootstrap): Promise => { console.log('run=%d info=%j', 0, { importDuration, exportDuration: exportEnd - exportStart, + migrationDuration: migrateEnd - migrateStart, }); }); diff --git a/ts/test-node/types/EmbeddedContact_test.ts b/ts/test-node/types/EmbeddedContact_test.ts index 5ba81be21..471336069 100644 --- a/ts/test-node/types/EmbeddedContact_test.ts +++ b/ts/test-node/types/EmbeddedContact_test.ts @@ -260,12 +260,15 @@ describe('Contact', () => { }, ], }; - const result = await upgradeVersion(message.contact[0], { - message, - logger, - getRegionCode: () => '1', - writeNewAttachmentData, - }); + const result = await upgradeVersion( + message.contact[0], + { + logger, + getRegionCode: () => '1', + writeNewAttachmentData, + }, + message + ); assert.deepEqual(result, message.contact[0]); }); @@ -302,12 +305,15 @@ describe('Contact', () => { }, ], }; - const result = await upgradeVersion(message.contact[0], { - message, - getRegionCode: () => 'US', - logger, - writeNewAttachmentData, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => 'US', + logger, + writeNewAttachmentData, + }, + message + ); assert.deepEqual(result, expected); }); @@ -347,12 +353,15 @@ describe('Contact', () => { }, ], }; - const result = await upgradeVersion(message.contact[0], { - getRegionCode: () => '1', - writeNewAttachmentData, - message, - logger, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => '1', + writeNewAttachmentData, + logger, + }, + message + ); assert.deepEqual(result, expected); }); @@ -432,12 +441,15 @@ describe('Contact', () => { }, }; - const result = await upgradeVersion(message.contact[0], { - getRegionCode: () => '1', - writeNewAttachmentData, - message, - logger, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => '1', + writeNewAttachmentData, + logger, + }, + message + ); assert.deepEqual(result, expected); }); @@ -479,12 +491,15 @@ describe('Contact', () => { }, ], }; - const result = await upgradeVersion(message.contact[0], { - getRegionCode: () => '1', - writeNewAttachmentData, - message, - logger, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => '1', + writeNewAttachmentData, + logger, + }, + message + ); assert.deepEqual(result, expected); }); @@ -526,12 +541,15 @@ describe('Contact', () => { }, ], }; - const result = await upgradeVersion(message.contact[0], { - getRegionCode: () => '1', - writeNewAttachmentData, - message, - logger, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => '1', + writeNewAttachmentData, + logger, + }, + message + ); assert.deepEqual(result, expected); }); @@ -569,12 +587,15 @@ describe('Contact', () => { nickname: 'Someone Somewhere', }, }; - const result = await upgradeVersion(message.contact[0], { - getRegionCode: () => '1', - writeNewAttachmentData, - message, - logger, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => '1', + writeNewAttachmentData, + logger, + }, + message + ); assert.deepEqual(result, expected); }); @@ -598,12 +619,15 @@ describe('Contact', () => { }, ], }; - const result = await upgradeVersion(message.contact[0], { - getRegionCode: () => '1', - writeNewAttachmentData, - message, - logger, - }); + const result = await upgradeVersion( + message.contact[0], + { + getRegionCode: () => '1', + writeNewAttachmentData, + logger, + }, + message + ); assert.deepEqual(result, message.contact[0]); }); }); diff --git a/ts/types/EmbeddedContact.ts b/ts/types/EmbeddedContact.ts index 0ab1699d4..04ec48b2c 100644 --- a/ts/types/EmbeddedContact.ts +++ b/ts/types/EmbeddedContact.ts @@ -223,15 +223,15 @@ export function parseAndWriteAvatar( return async ( contact: EmbeddedContactType, context: { - message: ReadonlyMessageAttributesType; getRegionCode: () => string | undefined; logger: LoggerType; writeNewAttachmentData: ( data: Uint8Array ) => Promise; - } + }, + message: ReadonlyMessageAttributesType ): Promise => { - const { message, getRegionCode, logger } = context; + const { getRegionCode, logger } = context; const { avatar } = contact; const contactWithUpdatedAvatar = diff --git a/ts/types/Message2.ts b/ts/types/Message2.ts index 19d9a327d..c7a38e689 100644 --- a/ts/types/Message2.ts +++ b/ts/types/Message2.ts @@ -1,7 +1,7 @@ // Copyright 2018 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import { isFunction, isObject } from 'lodash'; +import { isFunction, isObject, identity } from 'lodash'; import type { ReadonlyDeep } from 'type-fest'; import * as Contact from './EmbeddedContact'; @@ -95,10 +95,6 @@ export type ContextType = { deleteOnDisk: (path: string) => Promise; }; -export type ContextWithMessageType = ContextType & { - message: MessageAttributesType; -}; - // Schema version history // // Version 0 @@ -278,14 +274,20 @@ export type UpgradeAttachmentType = ( message: MessageAttributesType ) => Promise; +// As regrettable as it is we have to fight back against esbuild's `__name` +// wrapper for functions that are created at high rate, because `__name` affects +// runtime performance. +const esbuildAnonymize = identity; + export const _mapAttachments = (upgradeAttachment: UpgradeAttachmentType) => async ( message: MessageAttributesType, context: ContextType ): Promise => { - const upgradeWithContext = (attachment: AttachmentType) => - upgradeAttachment(attachment, context, message); + const upgradeWithContext = esbuildAnonymize((attachment: AttachmentType) => + upgradeAttachment(attachment, context, message) + ); const attachments = await Promise.all( (message.attachments || []).map(upgradeWithContext) ); @@ -345,7 +347,8 @@ export const _mapAllAttachments = export type UpgradeContactType = ( contact: EmbeddedContactType, - contextWithMessage: ContextWithMessageType + context: ContextType, + message: MessageAttributesType ) => Promise; export const _mapContact = (upgradeContact: UpgradeContactType) => @@ -353,9 +356,10 @@ export const _mapContact = message: MessageAttributesType, context: ContextType ): Promise => { - const contextWithMessage = { ...context, message }; - const upgradeWithContext = (contact: EmbeddedContactType) => - upgradeContact(contact, contextWithMessage); + const upgradeWithContext = esbuildAnonymize( + (contact: EmbeddedContactType) => + upgradeContact(contact, context, message) + ); const contact = await Promise.all( (message.contact || []).map(upgradeWithContext) ); @@ -378,21 +382,23 @@ export const _mapQuotedAttachments = throw new Error('_mapQuotedAttachments: context must have logger object'); } - const upgradeWithContext = async ( - attachment: QuotedAttachmentType - ): Promise => { - const { thumbnail } = attachment; - if (!thumbnail) { - return attachment; - } + const upgradeWithContext = esbuildAnonymize( + async ( + attachment: QuotedAttachmentType + ): Promise => { + const { thumbnail } = attachment; + if (!thumbnail) { + return attachment; + } - const upgradedThumbnail = await upgradeAttachment( - thumbnail as AttachmentType, - context, - message - ); - return { ...attachment, thumbnail: upgradedThumbnail }; - }; + const upgradedThumbnail = await upgradeAttachment( + thumbnail as AttachmentType, + context, + message + ); + return { ...attachment, thumbnail: upgradedThumbnail }; + } + ); const quotedAttachments = (message.quote && message.quote.attachments) || []; @@ -421,15 +427,17 @@ export const _mapPreviewAttachments = ); } - const upgradeWithContext = async (preview: LinkPreviewType) => { - const { image } = preview; - if (!image) { - return preview; - } + const upgradeWithContext = esbuildAnonymize( + async (preview: LinkPreviewType) => { + const { image } = preview; + if (!image) { + return preview; + } - const upgradedImage = await upgradeAttachment(image, context, message); - return { ...preview, image: upgradedImage }; - }; + const upgradedImage = await upgradeAttachment(image, context, message); + return { ...preview, image: upgradedImage }; + } + ); const preview = await Promise.all( (message.preview || []).map(upgradeWithContext) @@ -462,9 +470,11 @@ const toVersion4 = _withSchemaVersion({ schemaVersion: 4, upgrade: _mapQuotedAttachments(migrateDataToFileSystem), }); +// NOOP: Used to be initializeAttachmentMetadata, but it happens in version 7 +// now. const toVersion5 = _withSchemaVersion({ schemaVersion: 5, - upgrade: initializeAttachmentMetadata, + upgrade: noopUpgrade, }); const toVersion6 = _withSchemaVersion({ schemaVersion: 6, @@ -723,40 +733,6 @@ export const upgradeSchema = async ( } = { versions: VERSIONS } ): Promise => { const { versions } = upgradeOptions; - if (!isFunction(readAttachmentData)) { - throw new TypeError('context.readAttachmentData is required'); - } - if (!isFunction(writeNewAttachmentData)) { - throw new TypeError('context.writeNewAttachmentData is required'); - } - if (!isFunction(getRegionCode)) { - throw new TypeError('context.getRegionCode is required'); - } - if (!isFunction(makeObjectUrl)) { - throw new TypeError('context.makeObjectUrl is required'); - } - if (!isFunction(revokeObjectUrl)) { - throw new TypeError('context.revokeObjectUrl is required'); - } - if (!isFunction(getImageDimensions)) { - throw new TypeError('context.getImageDimensions is required'); - } - if (!isFunction(makeImageThumbnail)) { - throw new TypeError('context.makeImageThumbnail is required'); - } - if (!isFunction(makeVideoScreenshot)) { - throw new TypeError('context.makeVideoScreenshot is required'); - } - if (!isObject(logger)) { - throw new TypeError('context.logger is required'); - } - if (!isFunction(writeNewStickerData)) { - throw new TypeError('context.writeNewStickerData is required'); - } - if (!isFunction(deleteOnDisk)) { - throw new TypeError('context.deleteOnDisk is required'); - } - let message = rawMessage; const startingVersion = message.schemaVersion ?? 0; for (let index = 0, max = versions.length; index < max; index += 1) {