Introduce incrementMessagesMigrationAttempts query

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2024-08-22 22:54:41 -05:00 committed by GitHub
parent dd28a258b6
commit 187ed84de5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 113 additions and 14 deletions

View file

@ -1011,6 +1011,8 @@ export async function startApp(): Promise<void> {
upgradeMessageSchema,
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
saveMessages: DataWriter.saveMessages,
incrementMessagesMigrationAttempts:
DataWriter.incrementMessagesMigrationAttempts,
});
log.info('idleDetector/idle: Upgraded messages:', batchWithIndex);
isMigrationWithIndexComplete = batchWithIndex.done;

View file

@ -20,6 +20,7 @@ export async function migrateMessageData({
upgradeMessageSchema,
getMessagesNeedingUpgrade,
saveMessages,
incrementMessagesMigrationAttempts,
maxVersion = CURRENT_SCHEMA_VERSION,
}: Readonly<{
numMessagesPerBatch: number;
@ -35,6 +36,9 @@ export async function migrateMessageData({
data: ReadonlyArray<MessageAttributesType>,
options: { ourAci: AciString }
) => Promise<unknown>;
incrementMessagesMigrationAttempts: (
messageIds: ReadonlyArray<string>
) => Promise<void>;
maxVersion?: number;
}>): Promise<
| {
@ -80,7 +84,7 @@ export async function migrateMessageData({
const fetchDuration = Date.now() - fetchStartTime;
const upgradeStartTime = Date.now();
const failedMessages = new Array<MessageAttributesType>();
const failedMessages = new Array<string>();
const upgradedMessages = (
await pMap(
messagesRequiringSchemaUpgrade,
@ -92,7 +96,7 @@ export async function migrateMessageData({
'migrateMessageData.upgradeMessageSchema error:',
Errors.toLogFormat(error)
);
failedMessages.push(message);
failedMessages.push(message.id);
return undefined;
}
},
@ -104,18 +108,10 @@ export async function migrateMessageData({
const saveStartTime = Date.now();
const ourAci = window.textsecure.storage.user.getCheckedAci();
await saveMessages(
[
...upgradedMessages,
// Increment migration attempts
...failedMessages.map(message => ({
...message,
schemaMigrationAttempts: (message.schemaMigrationAttempts ?? 0) + 1,
})),
],
{ ourAci }
);
await saveMessages(upgradedMessages, { ourAci });
if (failedMessages.length) {
await incrementMessagesMigrationAttempts(failedMessages);
}
const saveDuration = Date.now() - saveStartTime;
const totalDuration = Date.now() - startTime;

View file

@ -778,6 +778,9 @@ type WritableInterface = {
) => void;
_removeAllReactions: () => void;
_removeAllMessages: () => void;
incrementMessagesMigrationAttempts: (
messageIds: ReadonlyArray<string>
) => void;
clearCallHistory: (target: CallLogEventTarget) => ReadonlyArray<string>;
_removeAllCallHistory: () => void;

View file

@ -445,6 +445,7 @@ export const DataWriter: ServerWritableInterface = {
migrateConversationMessages,
saveEditedMessage,
saveEditedMessages,
incrementMessagesMigrationAttempts,
removeSyncTaskById,
saveSyncTasks,
@ -6372,6 +6373,29 @@ function getMessagesNeedingUpgrade(
return rows.map(row => jsonToObject(row.json));
}
// Exported for tests
export function incrementMessagesMigrationAttempts(
db: WritableDB,
messageIds: ReadonlyArray<string>
): void {
batchMultiVarQuery(db, messageIds, (batch: ReadonlyArray<string>): void => {
const idSet = sqlJoin(batch);
const [sqlQuery, sqlParams] = sql`
UPDATE
messages
SET
json = json_set(
json,
'$.schemaMigrationAttempts',
IFNULL(json -> '$.schemaMigrationAttempts', 0) + 1
)
WHERE
id IN (${idSet})
`;
db.prepare(sqlQuery).run(sqlParams);
});
}
function getMessagesWithVisualMediaAttachments(
db: ReadableDB,
conversationId: string,

View file

@ -0,0 +1,74 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import type { WritableDB } from '../../sql/Interface';
import {
incrementMessagesMigrationAttempts,
setupTests,
} from '../../sql/Server';
import { createDB, insertData, getTableData } from './helpers';
describe('SQL/incrementMessagesMigrationAttempts', () => {
let db: WritableDB;
beforeEach(() => {
db = createDB();
setupTests(db);
});
afterEach(() => {
db.close();
});
function compactify(
message: Record<string, unknown>
): Record<string, unknown> {
const { id, conversationId, json } = message;
return {
id,
conversationId,
json,
};
}
it('should increment attempts for corrupted messages', () => {
insertData(db, 'messages', [
{
id: 'id',
conversationId: 'other',
json: {
sent_at: { low: 0, high: 0 },
},
},
]);
incrementMessagesMigrationAttempts(db, ['id']);
assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [
{
id: 'id',
conversationId: 'other',
json: {
schemaMigrationAttempts: 1,
sent_at: { low: 0, high: 0 },
},
},
]);
incrementMessagesMigrationAttempts(db, ['id']);
assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [
{
id: 'id',
conversationId: 'other',
json: {
schemaMigrationAttempts: 2,
sent_at: { low: 0, high: 0 },
},
},
]);
});
});