Upgrade message batch on import

This commit is contained in:
Fedor Indutny 2025-01-06 07:06:31 -08:00 committed by GitHub
parent 9b84402fb0
commit f7f84c463f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 192 additions and 128 deletions

View file

@ -217,7 +217,11 @@ export class BackupImportStream extends Writable {
ConversationAttributesType
>();
private readonly identityKeys = new Map<ServiceIdString, IdentityKeyType>();
private readonly saveMessageBatch = new Set<MessageAttributesType>();
private readonly saveMessageBatch = new Map<
MessageAttributesType,
Promise<MessageAttributesType>
>();
private flushMessagesPromise: Promise<void> | undefined;
private readonly stickerPacks = new Array<StickerPackPointerType>();
private ourConversation?: ConversationAttributesType;
private pinnedConversations = new Array<[number, string]>();
@ -310,6 +314,10 @@ export class BackupImportStream extends Writable {
try {
// Finish saving remaining conversations/messages
// Save messages first since they depend on conversations in memory
while (this.flushMessagesPromise) {
// eslint-disable-next-line no-await-in-loop
await this.flushMessagesPromise;
}
await this.flushMessages();
await this.flushConversations();
log.info(`${this.logId}: flushed messages and conversations`);
@ -341,7 +349,16 @@ export class BackupImportStream extends Writable {
allConversations.filter(convo => {
return convo.get('active_at') || convo.get('isPinned');
}),
convo => convo.updateLastMessage(),
async convo => {
try {
await convo.updateLastMessage();
} catch (error) {
log.error(
`${this.logId}: failed to update conversation's last message` +
`${Errors.toLogFormat(error)}`
);
}
},
{ concurrency: MAX_CONCURRENCY }
);
@ -493,9 +510,30 @@ export class BackupImportStream extends Writable {
}
private async saveMessage(attributes: MessageAttributesType): Promise<void> {
this.saveMessageBatch.add(attributes);
this.saveMessageBatch.set(attributes, this.safeUpgradeMessage(attributes));
if (this.saveMessageBatch.size >= SAVE_MESSAGE_BATCH_SIZE) {
return this.flushMessages();
// Wait for previous flush to finish before scheduling a new one.
// (Unlikely to happen, but needed to make sure we don't save too many
// messages at once)
while (this.flushMessagesPromise) {
// eslint-disable-next-line no-await-in-loop
await this.flushMessagesPromise;
}
this.flushMessagesPromise = this.flushMessages();
}
}
private async safeUpgradeMessage(
attributes: MessageAttributesType
): Promise<MessageAttributesType> {
try {
return await window.Signal.Migrations.upgradeMessageSchema(attributes);
} catch (error) {
log.error(
`${this.logId}: failed to migrate a message ${attributes.sent_at}, ` +
`${Errors.toLogFormat(error)}`
);
return attributes;
}
}
@ -528,9 +566,11 @@ export class BackupImportStream extends Writable {
const ourAci = this.ourConversation?.serviceId;
strictAssert(isAciString(ourAci), 'Must have our aci for messages');
const batch = Array.from(this.saveMessageBatch);
const batchPromises = Array.from(this.saveMessageBatch.values());
this.saveMessageBatch.clear();
const batch = await Promise.all(batchPromises);
// There are a few indexes that start with message id, and many more that
// start with conversationId. Sort messages by both to make sure that we
// are not doing random insertions into the database file.
@ -591,6 +631,8 @@ export class BackupImportStream extends Writable {
}
await Promise.allSettled(attachmentDownloadJobPromises);
await AttachmentDownloadManager.saveBatchedJobs();
this.flushMessagesPromise = undefined;
}
private async saveCallHistory(

View file

@ -166,6 +166,8 @@ describe('backup/attachments', () => {
// path & iv will not be roundtripped
[
composeMessage(1, {
hasAttachments: 1,
hasVisualMediaAttachments: 1,
attachments: [
omit(longMessageAttachment, NON_ROUNDTRIPPED_FIELDS),
omit(normalAttachment, NON_ROUNDTRIPPED_FIELDS),
@ -282,6 +284,8 @@ describe('backup/attachments', () => {
// path & iv will not be roundtripped
[
composeMessage(1, {
hasAttachments: 1,
hasVisualMediaAttachments: 1,
attachments: [
omit(attachment1, NON_ROUNDTRIPPED_FIELDS),
omit(attachment2, NON_ROUNDTRIPPED_FIELDS),
@ -303,6 +307,9 @@ describe('backup/attachments', () => {
],
[
composeMessage(1, {
hasAttachments: 1,
hasVisualMediaAttachments: 1,
// path, iv, and uploadTimestamp will not be roundtripped,
// but there will be a backupLocator
attachments: [
@ -334,6 +341,7 @@ describe('backup/attachments', () => {
],
[
composeMessage(1, {
hasAttachments: 1,
attachments: [
{
...omit(attachment, NON_ROUNDTRIPPED_BACKUP_LOCATOR_FIELDS),
@ -584,6 +592,8 @@ describe('backup/attachments', () => {
[
{
...existingMessage,
hasAttachments: 1,
hasVisualMediaAttachments: 1,
attachments: [
{
...omit(

View file

@ -68,9 +68,12 @@ function sortAndNormalize(
reactions,
sendStateByConversationId,
verifiedChanged,
attachments,
// Set to an empty array after message migration
attachments = [],
contact = [],
preview,
contact,
quote,
sticker,
@ -106,6 +109,9 @@ function sortAndNormalize(
// Get rid of unserializable `undefined` values.
return JSON.parse(
JSON.stringify({
// Migration defaults
hasAttachments: 0,
...rest,
conversationId: mapConvoId(conversationId),
reactions: reactions?.map(({ fromId, ...restOfReaction }) => {
@ -132,14 +138,14 @@ function sortAndNormalize(
};
}),
attachments: attachments?.map(attachment =>
attachments: attachments.map(attachment =>
omit(attachment, 'downloadPath')
),
preview: preview?.map(previewItem => ({
...previewItem,
image: omit(previewItem.image, 'downloadPath'),
})),
contact: contact?.map(contactItem => ({
contact: contact.map(contactItem => ({
...contactItem,
avatar: {
...contactItem.avatar,

View file

@ -74,6 +74,9 @@ describe('backup/non-bubble messages', () => {
readStatus: ReadStatus.Read,
seenStatus: SeenStatus.Seen,
flags: Proto.DataMessage.Flags.END_SESSION,
attachments: [],
contact: [],
hasAttachments: 0,
},
]);
});

View file

@ -22,7 +22,9 @@ Bootstrap.benchmark(async (bootstrap: Bootstrap): Promise<void> => {
const app = await bootstrap.link();
const { duration: importDuration } = await app.waitForBackupImportComplete();
const migrateStart = Date.now();
await app.migrateAllMessages();
const migrateEnd = Date.now();
const exportStart = Date.now();
await app.uploadBackup();
@ -31,5 +33,6 @@ Bootstrap.benchmark(async (bootstrap: Bootstrap): Promise<void> => {
console.log('run=%d info=%j', 0, {
importDuration,
exportDuration: exportEnd - exportStart,
migrationDuration: migrateEnd - migrateStart,
});
});

View file

@ -260,12 +260,15 @@ describe('Contact', () => {
},
],
};
const result = await upgradeVersion(message.contact[0], {
message,
logger,
getRegionCode: () => '1',
writeNewAttachmentData,
});
const result = await upgradeVersion(
message.contact[0],
{
logger,
getRegionCode: () => '1',
writeNewAttachmentData,
},
message
);
assert.deepEqual(result, message.contact[0]);
});
@ -302,12 +305,15 @@ describe('Contact', () => {
},
],
};
const result = await upgradeVersion(message.contact[0], {
message,
getRegionCode: () => 'US',
logger,
writeNewAttachmentData,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => 'US',
logger,
writeNewAttachmentData,
},
message
);
assert.deepEqual(result, expected);
});
@ -347,12 +353,15 @@ describe('Contact', () => {
},
],
};
const result = await upgradeVersion(message.contact[0], {
getRegionCode: () => '1',
writeNewAttachmentData,
message,
logger,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => '1',
writeNewAttachmentData,
logger,
},
message
);
assert.deepEqual(result, expected);
});
@ -432,12 +441,15 @@ describe('Contact', () => {
},
};
const result = await upgradeVersion(message.contact[0], {
getRegionCode: () => '1',
writeNewAttachmentData,
message,
logger,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => '1',
writeNewAttachmentData,
logger,
},
message
);
assert.deepEqual(result, expected);
});
@ -479,12 +491,15 @@ describe('Contact', () => {
},
],
};
const result = await upgradeVersion(message.contact[0], {
getRegionCode: () => '1',
writeNewAttachmentData,
message,
logger,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => '1',
writeNewAttachmentData,
logger,
},
message
);
assert.deepEqual(result, expected);
});
@ -526,12 +541,15 @@ describe('Contact', () => {
},
],
};
const result = await upgradeVersion(message.contact[0], {
getRegionCode: () => '1',
writeNewAttachmentData,
message,
logger,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => '1',
writeNewAttachmentData,
logger,
},
message
);
assert.deepEqual(result, expected);
});
@ -569,12 +587,15 @@ describe('Contact', () => {
nickname: 'Someone Somewhere',
},
};
const result = await upgradeVersion(message.contact[0], {
getRegionCode: () => '1',
writeNewAttachmentData,
message,
logger,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => '1',
writeNewAttachmentData,
logger,
},
message
);
assert.deepEqual(result, expected);
});
@ -598,12 +619,15 @@ describe('Contact', () => {
},
],
};
const result = await upgradeVersion(message.contact[0], {
getRegionCode: () => '1',
writeNewAttachmentData,
message,
logger,
});
const result = await upgradeVersion(
message.contact[0],
{
getRegionCode: () => '1',
writeNewAttachmentData,
logger,
},
message
);
assert.deepEqual(result, message.contact[0]);
});
});

View file

@ -223,15 +223,15 @@ export function parseAndWriteAvatar(
return async (
contact: EmbeddedContactType,
context: {
message: ReadonlyMessageAttributesType;
getRegionCode: () => string | undefined;
logger: LoggerType;
writeNewAttachmentData: (
data: Uint8Array
) => Promise<LocalAttachmentV2Type>;
}
},
message: ReadonlyMessageAttributesType
): Promise<EmbeddedContactType> => {
const { message, getRegionCode, logger } = context;
const { getRegionCode, logger } = context;
const { avatar } = contact;
const contactWithUpdatedAvatar =

View file

@ -1,7 +1,7 @@
// Copyright 2018 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isFunction, isObject } from 'lodash';
import { isFunction, isObject, identity } from 'lodash';
import type { ReadonlyDeep } from 'type-fest';
import * as Contact from './EmbeddedContact';
@ -95,10 +95,6 @@ export type ContextType = {
deleteOnDisk: (path: string) => Promise<void>;
};
export type ContextWithMessageType = ContextType & {
message: MessageAttributesType;
};
// Schema version history
//
// Version 0
@ -278,14 +274,20 @@ export type UpgradeAttachmentType = (
message: MessageAttributesType
) => Promise<AttachmentType>;
// As regrettable as it is we have to fight back against esbuild's `__name`
// wrapper for functions that are created at high rate, because `__name` affects
// runtime performance.
const esbuildAnonymize = identity;
export const _mapAttachments =
(upgradeAttachment: UpgradeAttachmentType) =>
async (
message: MessageAttributesType,
context: ContextType
): Promise<MessageAttributesType> => {
const upgradeWithContext = (attachment: AttachmentType) =>
upgradeAttachment(attachment, context, message);
const upgradeWithContext = esbuildAnonymize((attachment: AttachmentType) =>
upgradeAttachment(attachment, context, message)
);
const attachments = await Promise.all(
(message.attachments || []).map(upgradeWithContext)
);
@ -345,7 +347,8 @@ export const _mapAllAttachments =
export type UpgradeContactType = (
contact: EmbeddedContactType,
contextWithMessage: ContextWithMessageType
context: ContextType,
message: MessageAttributesType
) => Promise<EmbeddedContactType>;
export const _mapContact =
(upgradeContact: UpgradeContactType) =>
@ -353,9 +356,10 @@ export const _mapContact =
message: MessageAttributesType,
context: ContextType
): Promise<MessageAttributesType> => {
const contextWithMessage = { ...context, message };
const upgradeWithContext = (contact: EmbeddedContactType) =>
upgradeContact(contact, contextWithMessage);
const upgradeWithContext = esbuildAnonymize(
(contact: EmbeddedContactType) =>
upgradeContact(contact, context, message)
);
const contact = await Promise.all(
(message.contact || []).map(upgradeWithContext)
);
@ -378,21 +382,23 @@ export const _mapQuotedAttachments =
throw new Error('_mapQuotedAttachments: context must have logger object');
}
const upgradeWithContext = async (
attachment: QuotedAttachmentType
): Promise<QuotedAttachmentType> => {
const { thumbnail } = attachment;
if (!thumbnail) {
return attachment;
}
const upgradeWithContext = esbuildAnonymize(
async (
attachment: QuotedAttachmentType
): Promise<QuotedAttachmentType> => {
const { thumbnail } = attachment;
if (!thumbnail) {
return attachment;
}
const upgradedThumbnail = await upgradeAttachment(
thumbnail as AttachmentType,
context,
message
);
return { ...attachment, thumbnail: upgradedThumbnail };
};
const upgradedThumbnail = await upgradeAttachment(
thumbnail as AttachmentType,
context,
message
);
return { ...attachment, thumbnail: upgradedThumbnail };
}
);
const quotedAttachments =
(message.quote && message.quote.attachments) || [];
@ -421,15 +427,17 @@ export const _mapPreviewAttachments =
);
}
const upgradeWithContext = async (preview: LinkPreviewType) => {
const { image } = preview;
if (!image) {
return preview;
}
const upgradeWithContext = esbuildAnonymize(
async (preview: LinkPreviewType) => {
const { image } = preview;
if (!image) {
return preview;
}
const upgradedImage = await upgradeAttachment(image, context, message);
return { ...preview, image: upgradedImage };
};
const upgradedImage = await upgradeAttachment(image, context, message);
return { ...preview, image: upgradedImage };
}
);
const preview = await Promise.all(
(message.preview || []).map(upgradeWithContext)
@ -462,9 +470,11 @@ const toVersion4 = _withSchemaVersion({
schemaVersion: 4,
upgrade: _mapQuotedAttachments(migrateDataToFileSystem),
});
// NOOP: Used to be initializeAttachmentMetadata, but it happens in version 7
// now.
const toVersion5 = _withSchemaVersion({
schemaVersion: 5,
upgrade: initializeAttachmentMetadata,
upgrade: noopUpgrade,
});
const toVersion6 = _withSchemaVersion({
schemaVersion: 6,
@ -723,40 +733,6 @@ export const upgradeSchema = async (
} = { versions: VERSIONS }
): Promise<MessageAttributesType> => {
const { versions } = upgradeOptions;
if (!isFunction(readAttachmentData)) {
throw new TypeError('context.readAttachmentData is required');
}
if (!isFunction(writeNewAttachmentData)) {
throw new TypeError('context.writeNewAttachmentData is required');
}
if (!isFunction(getRegionCode)) {
throw new TypeError('context.getRegionCode is required');
}
if (!isFunction(makeObjectUrl)) {
throw new TypeError('context.makeObjectUrl is required');
}
if (!isFunction(revokeObjectUrl)) {
throw new TypeError('context.revokeObjectUrl is required');
}
if (!isFunction(getImageDimensions)) {
throw new TypeError('context.getImageDimensions is required');
}
if (!isFunction(makeImageThumbnail)) {
throw new TypeError('context.makeImageThumbnail is required');
}
if (!isFunction(makeVideoScreenshot)) {
throw new TypeError('context.makeVideoScreenshot is required');
}
if (!isObject(logger)) {
throw new TypeError('context.logger is required');
}
if (!isFunction(writeNewStickerData)) {
throw new TypeError('context.writeNewStickerData is required');
}
if (!isFunction(deleteOnDisk)) {
throw new TypeError('context.deleteOnDisk is required');
}
let message = rawMessage;
const startingVersion = message.schemaVersion ?? 0;
for (let index = 0, max = versions.length; index < max; index += 1) {