Improve message migration error handling

This commit is contained in:
trevor-signal 2024-11-04 15:18:36 -05:00 committed by GitHub
parent 1620ccf3ab
commit aac94b0217
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 163 additions and 33 deletions

View file

@ -211,11 +211,6 @@ export function getSourceDevice(
if (isIncoming(message) || isStory(message)) { if (isIncoming(message) || isStory(message)) {
return sourceDevice; return sourceDevice;
} }
if (!isOutgoing(message)) {
log.warn(
'Message.getSourceDevice: Called for non-incoming/non-outgoing message'
);
}
return sourceDevice || window.textsecure.storage.user.getDeviceId(); return sourceDevice || window.textsecure.storage.user.getDeviceId();
} }
@ -226,11 +221,6 @@ export function getSourceServiceId(
if (isIncoming(message) || isStory(message)) { if (isIncoming(message) || isStory(message)) {
return message.sourceServiceId; return message.sourceServiceId;
} }
if (!isOutgoing(message)) {
log.warn(
'Message.getSourceServiceId: Called for non-incoming/non-outgoing message'
);
}
return window.textsecure.storage.user.getAci(); return window.textsecure.storage.user.getAci();
} }

View file

@ -20,7 +20,7 @@ export async function migrateMessageData({
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
getMessagesNeedingUpgrade, getMessagesNeedingUpgrade,
saveMessages, saveMessagesIndividually,
incrementMessagesMigrationAttempts, incrementMessagesMigrationAttempts,
maxVersion = CURRENT_SCHEMA_VERSION, maxVersion = CURRENT_SCHEMA_VERSION,
}: Readonly<{ }: Readonly<{
@ -33,28 +33,25 @@ export async function migrateMessageData({
limit: number, limit: number,
options: { maxVersion: number } options: { maxVersion: number }
) => Promise<Array<MessageAttributesType>>; ) => Promise<Array<MessageAttributesType>>;
saveMessages: ( saveMessagesIndividually: (
data: ReadonlyArray<MessageAttributesType>, data: ReadonlyArray<MessageAttributesType>,
options: { ourAci: AciString } options: { ourAci: AciString }
) => Promise<unknown>; ) => Promise<{ failedIndices: Array<number> }>;
incrementMessagesMigrationAttempts: ( incrementMessagesMigrationAttempts: (
messageIds: ReadonlyArray<string> messageIds: ReadonlyArray<string>
) => Promise<void>; ) => Promise<void>;
maxVersion?: number; maxVersion?: number;
}>): Promise< }>): Promise<{
| { done: boolean;
done: true; numProcessed: number;
numProcessed: 0; numSucceeded?: number;
} numFailedSave?: number;
| { numFailedUpgrade?: number;
done: boolean; fetchDuration?: number;
numProcessed: number; upgradeDuration?: number;
fetchDuration: number; saveDuration?: number;
upgradeDuration: number; totalDuration?: number;
saveDuration: number; }> {
totalDuration: number;
}
> {
if (!isNumber(numMessagesPerBatch)) { if (!isNumber(numMessagesPerBatch)) {
throw new TypeError("'numMessagesPerBatch' is required"); throw new TypeError("'numMessagesPerBatch' is required");
} }
@ -85,7 +82,7 @@ export async function migrateMessageData({
const fetchDuration = Date.now() - fetchStartTime; const fetchDuration = Date.now() - fetchStartTime;
const upgradeStartTime = Date.now(); const upgradeStartTime = Date.now();
const failedMessages = new Array<string>(); const failedToUpgradeMessageIds = new Array<string>();
const upgradedMessages = ( const upgradedMessages = (
await pMap( await pMap(
messagesRequiringSchemaUpgrade, messagesRequiringSchemaUpgrade,
@ -97,7 +94,7 @@ export async function migrateMessageData({
'migrateMessageData.upgradeMessageSchema error:', 'migrateMessageData.upgradeMessageSchema error:',
Errors.toLogFormat(error) Errors.toLogFormat(error)
); );
failedMessages.push(message.id); failedToUpgradeMessageIds.push(message.id);
return undefined; return undefined;
} }
}, },
@ -109,18 +106,37 @@ export async function migrateMessageData({
const saveStartTime = Date.now(); const saveStartTime = Date.now();
const ourAci = window.textsecure.storage.user.getCheckedAci(); const ourAci = window.textsecure.storage.user.getCheckedAci();
await saveMessages(upgradedMessages, { ourAci }); const { failedIndices: failedToSaveIndices } = await saveMessagesIndividually(
if (failedMessages.length) { upgradedMessages,
await incrementMessagesMigrationAttempts(failedMessages); {
ourAci,
}
);
const failedToSaveMessageIds = failedToSaveIndices.map(
idx => upgradedMessages[idx].id
);
if (failedToUpgradeMessageIds.length || failedToSaveMessageIds.length) {
await incrementMessagesMigrationAttempts([
...failedToUpgradeMessageIds,
...failedToSaveMessageIds,
]);
} }
const saveDuration = Date.now() - saveStartTime; const saveDuration = Date.now() - saveStartTime;
const totalDuration = Date.now() - startTime; const totalDuration = Date.now() - startTime;
const numProcessed = messagesRequiringSchemaUpgrade.length; const numProcessed = messagesRequiringSchemaUpgrade.length;
const numFailedUpgrade = failedToUpgradeMessageIds.length;
const numFailedSave = failedToSaveIndices.length;
const numSucceeded = numProcessed - numFailedSave - numFailedUpgrade;
const done = numProcessed < numMessagesPerBatch; const done = numProcessed < numMessagesPerBatch;
return { return {
done, done,
numProcessed, numProcessed,
numSucceeded,
numFailedUpgrade,
numFailedSave,
fetchDuration, fetchDuration,
upgradeDuration, upgradeDuration,
saveDuration, saveDuration,
@ -137,7 +153,7 @@ export async function migrateBatchOfMessages({
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema, upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema,
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade, getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
saveMessages: DataWriter.saveMessages, saveMessagesIndividually: DataWriter.saveMessagesIndividually,
incrementMessagesMigrationAttempts: incrementMessagesMigrationAttempts:
DataWriter.incrementMessagesMigrationAttempts, DataWriter.incrementMessagesMigrationAttempts,
}); });

View file

@ -761,6 +761,10 @@ type WritableInterface = {
arrayOfMessages: ReadonlyArray<ReadonlyDeep<MessageType>>, arrayOfMessages: ReadonlyArray<ReadonlyDeep<MessageType>>,
options: { forceSave?: boolean; ourAci: AciString } options: { forceSave?: boolean; ourAci: AciString }
) => Array<string>; ) => Array<string>;
saveMessagesIndividually: (
arrayOfMessages: ReadonlyArray<ReadonlyDeep<MessageType>>,
options: { forceSave?: boolean; ourAci: AciString }
) => { failedIndices: Array<number> };
getUnreadByConversationAndMarkRead: (options: { getUnreadByConversationAndMarkRead: (options: {
conversationId: string; conversationId: string;

View file

@ -438,6 +438,7 @@ export const DataWriter: ServerWritableInterface = {
saveMessage, saveMessage,
saveMessages, saveMessages,
saveMessagesIndividually,
removeMessage, removeMessage,
removeMessages, removeMessages,
markReactionAsRead, markReactionAsRead,
@ -2208,6 +2209,7 @@ export function saveMessage(
ourAci: AciString; ourAci: AciString;
} }
): string { ): string {
// NB: `saveMessagesIndividually` relies on `saveMessage` being atomic
const { alreadyInTransaction, forceSave, jobToInsert, ourAci } = options; const { alreadyInTransaction, forceSave, jobToInsert, ourAci } = options;
if (!alreadyInTransaction) { if (!alreadyInTransaction) {
@ -2435,6 +2437,31 @@ function saveMessages(
})(); })();
} }
function saveMessagesIndividually(
db: WritableDB,
arrayOfMessages: ReadonlyArray<ReadonlyDeep<MessageType>>,
options: { forceSave?: boolean; ourAci: AciString }
): { failedIndices: Array<number> } {
return db.transaction(() => {
const failedIndices: Array<number> = [];
arrayOfMessages.forEach((message, index) => {
try {
saveMessage(db, message, {
...options,
alreadyInTransaction: true,
});
} catch (e) {
logger.error(
'saveMessagesIndividually: failed to save message',
Errors.toLogFormat(e)
);
failedIndices.push(index);
}
});
return { failedIndices };
})();
}
function removeMessage(db: WritableDB, id: string): void { function removeMessage(db: WritableDB, id: string): void {
db.prepare<Query>('DELETE FROM messages WHERE id = $id;').run({ id }); db.prepare<Query>('DELETE FROM messages WHERE id = $id;').run({ id });
} }

View file

@ -0,0 +1,93 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import assert from 'assert';
import { v7 as uuid } from 'uuid';
import { migrateMessageData } from '../../messages/migrateMessageData';
import type { MessageAttributesType } from '../../model-types';
import { DataReader, DataWriter } from '../../sql/Client';
import { generateAci } from '../../types/ServiceId';
function composeMessage(timestamp: number): MessageAttributesType {
return {
schemaVersion: 1,
conversationId: uuid(),
id: uuid(),
type: 'incoming',
received_at: timestamp,
received_at_ms: timestamp,
sent_at: timestamp,
timestamp,
};
}
describe('utils/migrateMessageData', async () => {
before(async () => {
await DataWriter.removeAll();
await window.storage.put('uuid_id', generateAci());
});
after(async () => {
await DataWriter.removeAll();
});
it('increments attempts for messages which fail to save', async () => {
const messages = new Array(5)
.fill(null)
.map((_, idx) => composeMessage(idx + 1));
const CANNOT_UPGRADE_MESSAGE_ID = messages[1].id;
const CANNOT_SAVE_MESSAGE_ID = messages[2].id;
await DataWriter.saveMessages(messages, {
forceSave: true,
ourAci: generateAci(),
});
const result = await migrateMessageData({
numMessagesPerBatch: 10_000,
upgradeMessageSchema: async (message, ...rest) => {
if (message.id === CANNOT_UPGRADE_MESSAGE_ID) {
throw new Error('upgrade failed');
}
return window.Signal.Migrations.upgradeMessageSchema(message, ...rest);
},
getMessagesNeedingUpgrade: async (...args) => {
const messagesToUpgrade = await DataReader.getMessagesNeedingUpgrade(
...args
);
return messagesToUpgrade.map(message => {
if (message.id === CANNOT_SAVE_MESSAGE_ID) {
return {
...message,
// mimic bad data in DB
sent_at: { low: 0, high: 0 } as unknown as number,
};
}
return message;
});
},
saveMessagesIndividually: DataWriter.saveMessagesIndividually,
incrementMessagesMigrationAttempts:
DataWriter.incrementMessagesMigrationAttempts,
});
assert.equal(result.done, true);
assert.equal(result.numProcessed, 5);
assert.equal(result.numSucceeded, 3);
assert.equal(result.numFailedSave, 1);
assert.equal(result.numFailedUpgrade, 1);
const upgradedMessages = await DataReader._getAllMessages();
for (const message of upgradedMessages) {
if (
message.id === CANNOT_SAVE_MESSAGE_ID ||
message.id === CANNOT_UPGRADE_MESSAGE_ID
) {
assert.equal(message.schemaMigrationAttempts, 1);
assert.equal(message.schemaVersion, 1);
} else {
assert.equal(message.schemaMigrationAttempts ?? 0, 0);
assert.equal((message.schemaVersion ?? 0) > 1, true);
}
}
});
});