Introduce incrementMessagesMigrationAttempts query
Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
parent
7db33a6708
commit
67252866cf
5 changed files with 113 additions and 14 deletions
|
@ -1011,6 +1011,8 @@ export async function startApp(): Promise<void> {
|
||||||
upgradeMessageSchema,
|
upgradeMessageSchema,
|
||||||
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
|
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
|
||||||
saveMessages: DataWriter.saveMessages,
|
saveMessages: DataWriter.saveMessages,
|
||||||
|
incrementMessagesMigrationAttempts:
|
||||||
|
DataWriter.incrementMessagesMigrationAttempts,
|
||||||
});
|
});
|
||||||
log.info('idleDetector/idle: Upgraded messages:', batchWithIndex);
|
log.info('idleDetector/idle: Upgraded messages:', batchWithIndex);
|
||||||
isMigrationWithIndexComplete = batchWithIndex.done;
|
isMigrationWithIndexComplete = batchWithIndex.done;
|
||||||
|
|
|
@ -20,6 +20,7 @@ export async function migrateMessageData({
|
||||||
upgradeMessageSchema,
|
upgradeMessageSchema,
|
||||||
getMessagesNeedingUpgrade,
|
getMessagesNeedingUpgrade,
|
||||||
saveMessages,
|
saveMessages,
|
||||||
|
incrementMessagesMigrationAttempts,
|
||||||
maxVersion = CURRENT_SCHEMA_VERSION,
|
maxVersion = CURRENT_SCHEMA_VERSION,
|
||||||
}: Readonly<{
|
}: Readonly<{
|
||||||
numMessagesPerBatch: number;
|
numMessagesPerBatch: number;
|
||||||
|
@ -35,6 +36,9 @@ export async function migrateMessageData({
|
||||||
data: ReadonlyArray<MessageAttributesType>,
|
data: ReadonlyArray<MessageAttributesType>,
|
||||||
options: { ourAci: AciString }
|
options: { ourAci: AciString }
|
||||||
) => Promise<unknown>;
|
) => Promise<unknown>;
|
||||||
|
incrementMessagesMigrationAttempts: (
|
||||||
|
messageIds: ReadonlyArray<string>
|
||||||
|
) => Promise<void>;
|
||||||
maxVersion?: number;
|
maxVersion?: number;
|
||||||
}>): Promise<
|
}>): Promise<
|
||||||
| {
|
| {
|
||||||
|
@ -80,7 +84,7 @@ export async function migrateMessageData({
|
||||||
const fetchDuration = Date.now() - fetchStartTime;
|
const fetchDuration = Date.now() - fetchStartTime;
|
||||||
|
|
||||||
const upgradeStartTime = Date.now();
|
const upgradeStartTime = Date.now();
|
||||||
const failedMessages = new Array<MessageAttributesType>();
|
const failedMessages = new Array<string>();
|
||||||
const upgradedMessages = (
|
const upgradedMessages = (
|
||||||
await pMap(
|
await pMap(
|
||||||
messagesRequiringSchemaUpgrade,
|
messagesRequiringSchemaUpgrade,
|
||||||
|
@ -92,7 +96,7 @@ export async function migrateMessageData({
|
||||||
'migrateMessageData.upgradeMessageSchema error:',
|
'migrateMessageData.upgradeMessageSchema error:',
|
||||||
Errors.toLogFormat(error)
|
Errors.toLogFormat(error)
|
||||||
);
|
);
|
||||||
failedMessages.push(message);
|
failedMessages.push(message.id);
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -104,18 +108,10 @@ export async function migrateMessageData({
|
||||||
const saveStartTime = Date.now();
|
const saveStartTime = Date.now();
|
||||||
|
|
||||||
const ourAci = window.textsecure.storage.user.getCheckedAci();
|
const ourAci = window.textsecure.storage.user.getCheckedAci();
|
||||||
await saveMessages(
|
await saveMessages(upgradedMessages, { ourAci });
|
||||||
[
|
if (failedMessages.length) {
|
||||||
...upgradedMessages,
|
await incrementMessagesMigrationAttempts(failedMessages);
|
||||||
|
}
|
||||||
// Increment migration attempts
|
|
||||||
...failedMessages.map(message => ({
|
|
||||||
...message,
|
|
||||||
schemaMigrationAttempts: (message.schemaMigrationAttempts ?? 0) + 1,
|
|
||||||
})),
|
|
||||||
],
|
|
||||||
{ ourAci }
|
|
||||||
);
|
|
||||||
const saveDuration = Date.now() - saveStartTime;
|
const saveDuration = Date.now() - saveStartTime;
|
||||||
|
|
||||||
const totalDuration = Date.now() - startTime;
|
const totalDuration = Date.now() - startTime;
|
||||||
|
|
|
@ -778,6 +778,9 @@ type WritableInterface = {
|
||||||
) => void;
|
) => void;
|
||||||
_removeAllReactions: () => void;
|
_removeAllReactions: () => void;
|
||||||
_removeAllMessages: () => void;
|
_removeAllMessages: () => void;
|
||||||
|
incrementMessagesMigrationAttempts: (
|
||||||
|
messageIds: ReadonlyArray<string>
|
||||||
|
) => void;
|
||||||
|
|
||||||
clearCallHistory: (target: CallLogEventTarget) => ReadonlyArray<string>;
|
clearCallHistory: (target: CallLogEventTarget) => ReadonlyArray<string>;
|
||||||
_removeAllCallHistory: () => void;
|
_removeAllCallHistory: () => void;
|
||||||
|
|
|
@ -445,6 +445,7 @@ export const DataWriter: ServerWritableInterface = {
|
||||||
migrateConversationMessages,
|
migrateConversationMessages,
|
||||||
saveEditedMessage,
|
saveEditedMessage,
|
||||||
saveEditedMessages,
|
saveEditedMessages,
|
||||||
|
incrementMessagesMigrationAttempts,
|
||||||
|
|
||||||
removeSyncTaskById,
|
removeSyncTaskById,
|
||||||
saveSyncTasks,
|
saveSyncTasks,
|
||||||
|
@ -6370,6 +6371,29 @@ function getMessagesNeedingUpgrade(
|
||||||
return rows.map(row => jsonToObject(row.json));
|
return rows.map(row => jsonToObject(row.json));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Exported for tests
|
||||||
|
export function incrementMessagesMigrationAttempts(
|
||||||
|
db: WritableDB,
|
||||||
|
messageIds: ReadonlyArray<string>
|
||||||
|
): void {
|
||||||
|
batchMultiVarQuery(db, messageIds, (batch: ReadonlyArray<string>): void => {
|
||||||
|
const idSet = sqlJoin(batch);
|
||||||
|
const [sqlQuery, sqlParams] = sql`
|
||||||
|
UPDATE
|
||||||
|
messages
|
||||||
|
SET
|
||||||
|
json = json_set(
|
||||||
|
json,
|
||||||
|
'$.schemaMigrationAttempts',
|
||||||
|
IFNULL(json -> '$.schemaMigrationAttempts', 0) + 1
|
||||||
|
)
|
||||||
|
WHERE
|
||||||
|
id IN (${idSet})
|
||||||
|
`;
|
||||||
|
db.prepare(sqlQuery).run(sqlParams);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
function getMessagesWithVisualMediaAttachments(
|
function getMessagesWithVisualMediaAttachments(
|
||||||
db: ReadableDB,
|
db: ReadableDB,
|
||||||
conversationId: string,
|
conversationId: string,
|
||||||
|
|
74
ts/test-node/sql/incrementMessagesMigrationAttempts_test.ts
Normal file
74
ts/test-node/sql/incrementMessagesMigrationAttempts_test.ts
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
// Copyright 2024 Signal Messenger, LLC
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
import { assert } from 'chai';
|
||||||
|
|
||||||
|
import type { WritableDB } from '../../sql/Interface';
|
||||||
|
import {
|
||||||
|
incrementMessagesMigrationAttempts,
|
||||||
|
setupTests,
|
||||||
|
} from '../../sql/Server';
|
||||||
|
import { createDB, insertData, getTableData } from './helpers';
|
||||||
|
|
||||||
|
describe('SQL/incrementMessagesMigrationAttempts', () => {
|
||||||
|
let db: WritableDB;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
db = createDB();
|
||||||
|
setupTests(db);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
db.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
function compactify(
|
||||||
|
message: Record<string, unknown>
|
||||||
|
): Record<string, unknown> {
|
||||||
|
const { id, conversationId, json } = message;
|
||||||
|
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
conversationId,
|
||||||
|
json,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
it('should increment attempts for corrupted messages', () => {
|
||||||
|
insertData(db, 'messages', [
|
||||||
|
{
|
||||||
|
id: 'id',
|
||||||
|
conversationId: 'other',
|
||||||
|
json: {
|
||||||
|
sent_at: { low: 0, high: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
incrementMessagesMigrationAttempts(db, ['id']);
|
||||||
|
|
||||||
|
assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [
|
||||||
|
{
|
||||||
|
id: 'id',
|
||||||
|
conversationId: 'other',
|
||||||
|
json: {
|
||||||
|
schemaMigrationAttempts: 1,
|
||||||
|
sent_at: { low: 0, high: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
incrementMessagesMigrationAttempts(db, ['id']);
|
||||||
|
|
||||||
|
assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [
|
||||||
|
{
|
||||||
|
id: 'id',
|
||||||
|
conversationId: 'other',
|
||||||
|
json: {
|
||||||
|
schemaMigrationAttempts: 2,
|
||||||
|
sent_at: { low: 0, high: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
});
|
Loading…
Add table
Add a link
Reference in a new issue