Increment and store message migration attempts

This commit is contained in:
Fedor Indutny 2022-06-20 14:18:23 -07:00 committed by GitHub
parent d547ef362e
commit 63679f5af6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 20 deletions

View file

@ -850,7 +850,8 @@ export async function startApp(): Promise<void> {
`Starting background data migration. Target version: ${Message.CURRENT_SCHEMA_VERSION}` `Starting background data migration. Target version: ${Message.CURRENT_SCHEMA_VERSION}`
); );
idleDetector.on('idle', async () => { idleDetector.on('idle', async () => {
const NUM_MESSAGES_PER_BATCH = 1; const NUM_MESSAGES_PER_BATCH = 100;
const BATCH_DELAY = 5 * durations.SECOND;
if (!isMigrationWithIndexComplete) { if (!isMigrationWithIndexComplete) {
const batchWithIndex = await migrateMessageData({ const batchWithIndex = await migrateMessageData({
@ -858,15 +859,22 @@ export async function startApp(): Promise<void> {
upgradeMessageSchema, upgradeMessageSchema,
getMessagesNeedingUpgrade: getMessagesNeedingUpgrade:
window.Signal.Data.getMessagesNeedingUpgrade, window.Signal.Data.getMessagesNeedingUpgrade,
saveMessage: window.Signal.Data.saveMessage, saveMessages: window.Signal.Data.saveMessages,
}); });
log.info('Upgrade message schema (with index):', batchWithIndex); log.info('Upgrade message schema (with index):', batchWithIndex);
isMigrationWithIndexComplete = batchWithIndex.done; isMigrationWithIndexComplete = batchWithIndex.done;
} }
idleDetector.stop();
if (isMigrationWithIndexComplete) { if (isMigrationWithIndexComplete) {
log.info('Background migration complete. Stopping idle detector.'); log.info('Background migration complete. Stopping idle detector.');
idleDetector.stop(); } else {
log.info('Background migration not complete. Pausing idle detector.');
setTimeout(() => {
idleDetector.start();
}, BATCH_DELAY);
} }
}); });

View file

@ -2,9 +2,15 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import { isFunction, isNumber } from 'lodash'; import { isFunction, isNumber } from 'lodash';
import pMap from 'p-map';
import { CURRENT_SCHEMA_VERSION } from '../types/Message2'; import { CURRENT_SCHEMA_VERSION } from '../types/Message2';
import { isNotNil } from '../util/isNotNil';
import type { MessageAttributesType } from '../model-types.d'; import type { MessageAttributesType } from '../model-types.d';
import type { UUIDStringType } from '../types/UUID'; import type { UUIDStringType } from '../types/UUID';
import * as Errors from '../types/errors';
const MAX_CONCURRENCY = 5;
/** /**
* Ensures that messages in database are at the right schema. * Ensures that messages in database are at the right schema.
@ -13,7 +19,7 @@ export async function migrateMessageData({
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
getMessagesNeedingUpgrade, getMessagesNeedingUpgrade,
saveMessage, saveMessages,
maxVersion = CURRENT_SCHEMA_VERSION, maxVersion = CURRENT_SCHEMA_VERSION,
}: Readonly<{ }: Readonly<{
numMessagesPerBatch: number; numMessagesPerBatch: number;
@ -25,10 +31,10 @@ export async function migrateMessageData({
limit: number, limit: number,
options: { maxVersion: number } options: { maxVersion: number }
) => Promise<Array<MessageAttributesType>>; ) => Promise<Array<MessageAttributesType>>;
saveMessage: ( saveMessages: (
data: MessageAttributesType, data: ReadonlyArray<MessageAttributesType>,
options: { ourUuid: UUIDStringType } options: { ourUuid: UUIDStringType }
) => Promise<string>; ) => Promise<void>;
maxVersion?: number; maxVersion?: number;
}>): Promise< }>): Promise<
| { | {
@ -63,8 +69,8 @@ export async function migrateMessageData({
); );
} catch (error) { } catch (error) {
window.SignalContext.log.error( window.SignalContext.log.error(
'processNext error:', 'migrateMessageData.getMessagesNeedingUpgrade error:',
error && error.stack ? error.stack : error Errors.toLogFormat(error)
); );
return { return {
done: true, done: true,
@ -74,20 +80,41 @@ export async function migrateMessageData({
const fetchDuration = Date.now() - fetchStartTime; const fetchDuration = Date.now() - fetchStartTime;
const upgradeStartTime = Date.now(); const upgradeStartTime = Date.now();
const upgradedMessages = await Promise.all( const failedMessages = new Array<MessageAttributesType>();
messagesRequiringSchemaUpgrade.map(message => const upgradedMessages = (
upgradeMessageSchema(message, { maxVersion }) await pMap(
messagesRequiringSchemaUpgrade,
async message => {
try {
return await upgradeMessageSchema(message, { maxVersion });
} catch (error) {
window.SignalContext.log.error(
'migrateMessageData.upgradeMessageSchema error:',
Errors.toLogFormat(error)
);
failedMessages.push(message);
return undefined;
}
},
{ concurrency: MAX_CONCURRENCY }
) )
); ).filter(isNotNil);
const upgradeDuration = Date.now() - upgradeStartTime; const upgradeDuration = Date.now() - upgradeStartTime;
const saveStartTime = Date.now(); const saveStartTime = Date.now();
await Promise.all(
upgradedMessages.map(message => const ourUuid = window.textsecure.storage.user.getCheckedUuid().toString();
saveMessage(message, { await saveMessages(
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), [
}) ...upgradedMessages,
)
// Increment migration attempts
...failedMessages.map(message => ({
...message,
schemaMigrationAttempts: (message.schemaMigrationAttempts ?? 0) + 1,
})),
],
{ ourUuid }
); );
const saveDuration = Date.now() - saveStartTime; const saveDuration = Date.now() - saveStartTime;

3
ts/model-types.d.ts vendored
View file

@ -205,6 +205,9 @@ export type MessageAttributesType = {
// background, when we were still in IndexedDB, before attachments had gone to disk // background, when we were still in IndexedDB, before attachments had gone to disk
// We set this so that the idle message upgrade process doesn't pick this message up // We set this so that the idle message upgrade process doesn't pick this message up
schemaVersion?: number; schemaVersion?: number;
// migrateMessageData will increment this field on every failure and give up
// when the value is too high.
schemaMigrationAttempts?: number;
// This should always be set for new messages, but older messages may not have them. We // This should always be set for new messages, but older messages may not have them. We
// may not have these for outbound messages, either, as we have not needed them. // may not have these for outbound messages, either, as we have not needed them.
serverGuid?: string; serverGuid?: string;

View file

@ -4387,22 +4387,31 @@ async function removeAllConfiguration(
})(); })();
} }
const MAX_MESSAGE_MIGRATION_ATTEMPTS = 5;
async function getMessagesNeedingUpgrade( async function getMessagesNeedingUpgrade(
limit: number, limit: number,
{ maxVersion }: { maxVersion: number } { maxVersion }: { maxVersion: number }
): Promise<Array<MessageType>> { ): Promise<Array<MessageType>> {
const db = getInstance(); const db = getInstance();
const rows: JSONRows = db const rows: JSONRows = db
.prepare<Query>( .prepare<Query>(
` `
SELECT json SELECT json
FROM messages FROM messages
WHERE schemaVersion IS NULL OR schemaVersion < $maxVersion WHERE
(schemaVersion IS NULL OR schemaVersion < $maxVersion) AND
IFNULL(
json_extract(json, '$.schemaMigrationAttempts'),
0
) < $maxAttempts
LIMIT $limit; LIMIT $limit;
` `
) )
.all({ .all({
maxVersion, maxVersion,
maxAttempts: MAX_MESSAGE_MIGRATION_ATTEMPTS,
limit, limit,
}); });