diff --git a/ts/background.ts b/ts/background.ts index e18ea39593..1cb523aff7 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -1011,6 +1011,8 @@ export async function startApp(): Promise { upgradeMessageSchema, getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade, saveMessages: DataWriter.saveMessages, + incrementMessagesMigrationAttempts: + DataWriter.incrementMessagesMigrationAttempts, }); log.info('idleDetector/idle: Upgraded messages:', batchWithIndex); isMigrationWithIndexComplete = batchWithIndex.done; diff --git a/ts/messages/migrateMessageData.ts b/ts/messages/migrateMessageData.ts index 110a39c323..862e64c50e 100644 --- a/ts/messages/migrateMessageData.ts +++ b/ts/messages/migrateMessageData.ts @@ -20,6 +20,7 @@ export async function migrateMessageData({ upgradeMessageSchema, getMessagesNeedingUpgrade, saveMessages, + incrementMessagesMigrationAttempts, maxVersion = CURRENT_SCHEMA_VERSION, }: Readonly<{ numMessagesPerBatch: number; @@ -35,6 +36,9 @@ export async function migrateMessageData({ data: ReadonlyArray, options: { ourAci: AciString } ) => Promise; + incrementMessagesMigrationAttempts: ( + messageIds: ReadonlyArray + ) => Promise; maxVersion?: number; }>): Promise< | { @@ -80,7 +84,7 @@ export async function migrateMessageData({ const fetchDuration = Date.now() - fetchStartTime; const upgradeStartTime = Date.now(); - const failedMessages = new Array(); + const failedMessages = new Array(); const upgradedMessages = ( await pMap( messagesRequiringSchemaUpgrade, @@ -92,7 +96,7 @@ export async function migrateMessageData({ 'migrateMessageData.upgradeMessageSchema error:', Errors.toLogFormat(error) ); - failedMessages.push(message); + failedMessages.push(message.id); return undefined; } }, @@ -104,18 +108,10 @@ export async function migrateMessageData({ const saveStartTime = Date.now(); const ourAci = window.textsecure.storage.user.getCheckedAci(); - await saveMessages( - [ - ...upgradedMessages, - - // Increment migration attempts - ...failedMessages.map(message => ({ - ...message, - schemaMigrationAttempts: (message.schemaMigrationAttempts ?? 0) + 1, - })), - ], - { ourAci } - ); + await saveMessages(upgradedMessages, { ourAci }); + if (failedMessages.length) { + await incrementMessagesMigrationAttempts(failedMessages); + } const saveDuration = Date.now() - saveStartTime; const totalDuration = Date.now() - startTime; diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 96910f7685..899f2f56fb 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -778,6 +778,9 @@ type WritableInterface = { ) => void; _removeAllReactions: () => void; _removeAllMessages: () => void; + incrementMessagesMigrationAttempts: ( + messageIds: ReadonlyArray + ) => void; clearCallHistory: (target: CallLogEventTarget) => ReadonlyArray; _removeAllCallHistory: () => void; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 6739edc0ee..782836eed4 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -445,6 +445,7 @@ export const DataWriter: ServerWritableInterface = { migrateConversationMessages, saveEditedMessage, saveEditedMessages, + incrementMessagesMigrationAttempts, removeSyncTaskById, saveSyncTasks, @@ -6372,6 +6373,29 @@ function getMessagesNeedingUpgrade( return rows.map(row => jsonToObject(row.json)); } +// Exported for tests +export function incrementMessagesMigrationAttempts( + db: WritableDB, + messageIds: ReadonlyArray +): void { + batchMultiVarQuery(db, messageIds, (batch: ReadonlyArray): void => { + const idSet = sqlJoin(batch); + const [sqlQuery, sqlParams] = sql` + UPDATE + messages + SET + json = json_set( + json, + '$.schemaMigrationAttempts', + IFNULL(json -> '$.schemaMigrationAttempts', 0) + 1 + ) + WHERE + id IN (${idSet}) + `; + db.prepare(sqlQuery).run(sqlParams); + }); +} + function getMessagesWithVisualMediaAttachments( db: ReadableDB, conversationId: string, diff --git a/ts/test-node/sql/incrementMessagesMigrationAttempts_test.ts b/ts/test-node/sql/incrementMessagesMigrationAttempts_test.ts new file mode 100644 index 0000000000..70a3b93ef9 --- /dev/null +++ b/ts/test-node/sql/incrementMessagesMigrationAttempts_test.ts @@ -0,0 +1,74 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import type { WritableDB } from '../../sql/Interface'; +import { + incrementMessagesMigrationAttempts, + setupTests, +} from '../../sql/Server'; +import { createDB, insertData, getTableData } from './helpers'; + +describe('SQL/incrementMessagesMigrationAttempts', () => { + let db: WritableDB; + + beforeEach(() => { + db = createDB(); + setupTests(db); + }); + + afterEach(() => { + db.close(); + }); + + function compactify( + message: Record + ): Record { + const { id, conversationId, json } = message; + + return { + id, + conversationId, + json, + }; + } + + it('should increment attempts for corrupted messages', () => { + insertData(db, 'messages', [ + { + id: 'id', + conversationId: 'other', + json: { + sent_at: { low: 0, high: 0 }, + }, + }, + ]); + + incrementMessagesMigrationAttempts(db, ['id']); + + assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [ + { + id: 'id', + conversationId: 'other', + json: { + schemaMigrationAttempts: 1, + sent_at: { low: 0, high: 0 }, + }, + }, + ]); + + incrementMessagesMigrationAttempts(db, ['id']); + + assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [ + { + id: 'id', + conversationId: 'other', + json: { + schemaMigrationAttempts: 2, + sent_at: { low: 0, high: 0 }, + }, + }, + ]); + }); +});