From aac94b02179134069b2da5ff85db0bdccba226ff Mon Sep 17 00:00:00 2001 From: trevor-signal <131492920+trevor-signal@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:18:36 -0500 Subject: [PATCH] Improve message migration error handling --- ts/messages/helpers.ts | 10 -- ts/messages/migrateMessageData.ts | 62 ++++++++----- ts/sql/Interface.ts | 4 + ts/sql/Server.ts | 27 ++++++ .../util/migrateMessageData_test.ts | 93 +++++++++++++++++++ 5 files changed, 163 insertions(+), 33 deletions(-) create mode 100644 ts/test-electron/util/migrateMessageData_test.ts diff --git a/ts/messages/helpers.ts b/ts/messages/helpers.ts index 44df2ba88..3460ad0ad 100644 --- a/ts/messages/helpers.ts +++ b/ts/messages/helpers.ts @@ -211,11 +211,6 @@ export function getSourceDevice( if (isIncoming(message) || isStory(message)) { return sourceDevice; } - if (!isOutgoing(message)) { - log.warn( - 'Message.getSourceDevice: Called for non-incoming/non-outgoing message' - ); - } return sourceDevice || window.textsecure.storage.user.getDeviceId(); } @@ -226,11 +221,6 @@ export function getSourceServiceId( if (isIncoming(message) || isStory(message)) { return message.sourceServiceId; } - if (!isOutgoing(message)) { - log.warn( - 'Message.getSourceServiceId: Called for non-incoming/non-outgoing message' - ); - } return window.textsecure.storage.user.getAci(); } diff --git a/ts/messages/migrateMessageData.ts b/ts/messages/migrateMessageData.ts index a9ac384a6..880b1aec8 100644 --- a/ts/messages/migrateMessageData.ts +++ b/ts/messages/migrateMessageData.ts @@ -20,7 +20,7 @@ export async function migrateMessageData({ numMessagesPerBatch, upgradeMessageSchema, getMessagesNeedingUpgrade, - saveMessages, + saveMessagesIndividually, incrementMessagesMigrationAttempts, maxVersion = CURRENT_SCHEMA_VERSION, }: Readonly<{ @@ -33,28 +33,25 @@ export async function migrateMessageData({ limit: number, options: { maxVersion: number } ) => Promise>; - saveMessages: ( + saveMessagesIndividually: ( data: ReadonlyArray, options: { ourAci: AciString } - ) => Promise; + ) => Promise<{ failedIndices: Array }>; incrementMessagesMigrationAttempts: ( messageIds: ReadonlyArray ) => Promise; maxVersion?: number; -}>): Promise< - | { - done: true; - numProcessed: 0; - } - | { - done: boolean; - numProcessed: number; - fetchDuration: number; - upgradeDuration: number; - saveDuration: number; - totalDuration: number; - } -> { +}>): Promise<{ + done: boolean; + numProcessed: number; + numSucceeded?: number; + numFailedSave?: number; + numFailedUpgrade?: number; + fetchDuration?: number; + upgradeDuration?: number; + saveDuration?: number; + totalDuration?: number; +}> { if (!isNumber(numMessagesPerBatch)) { throw new TypeError("'numMessagesPerBatch' is required"); } @@ -85,7 +82,7 @@ export async function migrateMessageData({ const fetchDuration = Date.now() - fetchStartTime; const upgradeStartTime = Date.now(); - const failedMessages = new Array(); + const failedToUpgradeMessageIds = new Array(); const upgradedMessages = ( await pMap( messagesRequiringSchemaUpgrade, @@ -97,7 +94,7 @@ export async function migrateMessageData({ 'migrateMessageData.upgradeMessageSchema error:', Errors.toLogFormat(error) ); - failedMessages.push(message.id); + failedToUpgradeMessageIds.push(message.id); return undefined; } }, @@ -109,18 +106,37 @@ export async function migrateMessageData({ const saveStartTime = Date.now(); const ourAci = window.textsecure.storage.user.getCheckedAci(); - await saveMessages(upgradedMessages, { ourAci }); - if (failedMessages.length) { - await incrementMessagesMigrationAttempts(failedMessages); + const { failedIndices: failedToSaveIndices } = await saveMessagesIndividually( + upgradedMessages, + { + ourAci, + } + ); + + const failedToSaveMessageIds = failedToSaveIndices.map( + idx => upgradedMessages[idx].id + ); + + if (failedToUpgradeMessageIds.length || failedToSaveMessageIds.length) { + await incrementMessagesMigrationAttempts([ + ...failedToUpgradeMessageIds, + ...failedToSaveMessageIds, + ]); } const saveDuration = Date.now() - saveStartTime; const totalDuration = Date.now() - startTime; const numProcessed = messagesRequiringSchemaUpgrade.length; + const numFailedUpgrade = failedToUpgradeMessageIds.length; + const numFailedSave = failedToSaveIndices.length; + const numSucceeded = numProcessed - numFailedSave - numFailedUpgrade; const done = numProcessed < numMessagesPerBatch; return { done, numProcessed, + numSucceeded, + numFailedUpgrade, + numFailedSave, fetchDuration, upgradeDuration, saveDuration, @@ -137,7 +153,7 @@ export async function migrateBatchOfMessages({ numMessagesPerBatch, upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema, getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade, - saveMessages: DataWriter.saveMessages, + saveMessagesIndividually: DataWriter.saveMessagesIndividually, incrementMessagesMigrationAttempts: DataWriter.incrementMessagesMigrationAttempts, }); diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 2e7e1760d..5d8ad2f9c 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -761,6 +761,10 @@ type WritableInterface = { arrayOfMessages: ReadonlyArray>, options: { forceSave?: boolean; ourAci: AciString } ) => Array; + saveMessagesIndividually: ( + arrayOfMessages: ReadonlyArray>, + options: { forceSave?: boolean; ourAci: AciString } + ) => { failedIndices: Array }; getUnreadByConversationAndMarkRead: (options: { conversationId: string; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 8e7e4c07e..bca0bba30 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -438,6 +438,7 @@ export const DataWriter: ServerWritableInterface = { saveMessage, saveMessages, + saveMessagesIndividually, removeMessage, removeMessages, markReactionAsRead, @@ -2208,6 +2209,7 @@ export function saveMessage( ourAci: AciString; } ): string { + // NB: `saveMessagesIndividually` relies on `saveMessage` being atomic const { alreadyInTransaction, forceSave, jobToInsert, ourAci } = options; if (!alreadyInTransaction) { @@ -2435,6 +2437,31 @@ function saveMessages( })(); } +function saveMessagesIndividually( + db: WritableDB, + arrayOfMessages: ReadonlyArray>, + options: { forceSave?: boolean; ourAci: AciString } +): { failedIndices: Array } { + return db.transaction(() => { + const failedIndices: Array = []; + arrayOfMessages.forEach((message, index) => { + try { + saveMessage(db, message, { + ...options, + alreadyInTransaction: true, + }); + } catch (e) { + logger.error( + 'saveMessagesIndividually: failed to save message', + Errors.toLogFormat(e) + ); + failedIndices.push(index); + } + }); + return { failedIndices }; + })(); +} + function removeMessage(db: WritableDB, id: string): void { db.prepare('DELETE FROM messages WHERE id = $id;').run({ id }); } diff --git a/ts/test-electron/util/migrateMessageData_test.ts b/ts/test-electron/util/migrateMessageData_test.ts new file mode 100644 index 000000000..eb9137dc4 --- /dev/null +++ b/ts/test-electron/util/migrateMessageData_test.ts @@ -0,0 +1,93 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import assert from 'assert'; +import { v7 as uuid } from 'uuid'; +import { migrateMessageData } from '../../messages/migrateMessageData'; +import type { MessageAttributesType } from '../../model-types'; +import { DataReader, DataWriter } from '../../sql/Client'; +import { generateAci } from '../../types/ServiceId'; + +function composeMessage(timestamp: number): MessageAttributesType { + return { + schemaVersion: 1, + conversationId: uuid(), + id: uuid(), + type: 'incoming', + received_at: timestamp, + received_at_ms: timestamp, + sent_at: timestamp, + timestamp, + }; +} + +describe('utils/migrateMessageData', async () => { + before(async () => { + await DataWriter.removeAll(); + await window.storage.put('uuid_id', generateAci()); + }); + after(async () => { + await DataWriter.removeAll(); + }); + it('increments attempts for messages which fail to save', async () => { + const messages = new Array(5) + .fill(null) + .map((_, idx) => composeMessage(idx + 1)); + + const CANNOT_UPGRADE_MESSAGE_ID = messages[1].id; + const CANNOT_SAVE_MESSAGE_ID = messages[2].id; + await DataWriter.saveMessages(messages, { + forceSave: true, + ourAci: generateAci(), + }); + + const result = await migrateMessageData({ + numMessagesPerBatch: 10_000, + upgradeMessageSchema: async (message, ...rest) => { + if (message.id === CANNOT_UPGRADE_MESSAGE_ID) { + throw new Error('upgrade failed'); + } + return window.Signal.Migrations.upgradeMessageSchema(message, ...rest); + }, + getMessagesNeedingUpgrade: async (...args) => { + const messagesToUpgrade = await DataReader.getMessagesNeedingUpgrade( + ...args + ); + + return messagesToUpgrade.map(message => { + if (message.id === CANNOT_SAVE_MESSAGE_ID) { + return { + ...message, + // mimic bad data in DB + sent_at: { low: 0, high: 0 } as unknown as number, + }; + } + return message; + }); + }, + saveMessagesIndividually: DataWriter.saveMessagesIndividually, + incrementMessagesMigrationAttempts: + DataWriter.incrementMessagesMigrationAttempts, + }); + + assert.equal(result.done, true); + assert.equal(result.numProcessed, 5); + assert.equal(result.numSucceeded, 3); + assert.equal(result.numFailedSave, 1); + assert.equal(result.numFailedUpgrade, 1); + + const upgradedMessages = await DataReader._getAllMessages(); + for (const message of upgradedMessages) { + if ( + message.id === CANNOT_SAVE_MESSAGE_ID || + message.id === CANNOT_UPGRADE_MESSAGE_ID + ) { + assert.equal(message.schemaMigrationAttempts, 1); + assert.equal(message.schemaVersion, 1); + } else { + assert.equal(message.schemaMigrationAttempts ?? 0, 0); + assert.equal((message.schemaVersion ?? 0) > 1, true); + } + } + }); +});