diff --git a/ts/CI.ts b/ts/CI.ts index 54ee01539..d3174793f 100644 --- a/ts/CI.ts +++ b/ts/CI.ts @@ -11,6 +11,7 @@ import { explodePromise } from './util/explodePromise'; import { AccessType, ipcInvoke } from './sql/channels'; import { backupsService } from './services/backups'; import { AttachmentBackupManager } from './jobs/AttachmentBackupManager'; +import { migrateAllMessages } from './messages/migrateMessageData'; import { SECOND } from './util/durations'; import { isSignalRoute } from './util/signalRoutes'; import { strictAssert } from './util/assert'; @@ -35,6 +36,7 @@ export type CIType = { } ) => unknown; openSignalRoute(url: string): Promise; + migrateAllMessages(): Promise; uploadBackup(): Promise; unlink: () => void; print: (...args: ReadonlyArray) => void; @@ -192,6 +194,7 @@ export function getCI({ deviceName }: GetCIOptionsType): CIType { solveChallenge, waitForEvent, openSignalRoute, + migrateAllMessages, uploadBackup, unlink, getPendingEventCount, diff --git a/ts/messages/migrateMessageData.ts b/ts/messages/migrateMessageData.ts index 880b1aec8..148e6be9e 100644 --- a/ts/messages/migrateMessageData.ts +++ b/ts/messages/migrateMessageData.ts @@ -3,9 +3,11 @@ import { isFunction, isNumber } from 'lodash'; import pMap from 'p-map'; +import PQueue from 'p-queue'; import { CURRENT_SCHEMA_VERSION } from '../types/Message2'; import { isNotNil } from '../util/isNotNil'; +import { MINUTE } from '../util/durations'; import type { MessageAttributesType } from '../model-types.d'; import type { AciString } from '../types/ServiceId'; import * as Errors from '../types/errors'; @@ -13,10 +15,30 @@ import { DataReader, DataWriter } from '../sql/Client'; const MAX_CONCURRENCY = 5; +// Don't migrate batches concurrently +const migrationQueue = new PQueue({ + concurrency: 1, + timeout: MINUTE * 30, +}); + +type BatchResultType = Readonly<{ + done: boolean; + numProcessed: number; + numSucceeded?: number; + numFailedSave?: number; + numFailedUpgrade?: number; + fetchDuration?: number; + upgradeDuration?: number; + saveDuration?: number; + totalDuration?: number; +}>; + /** * Ensures that messages in database are at the right schema. + * + * @internal */ -export async function migrateMessageData({ +export async function _migrateMessageData({ numMessagesPerBatch, upgradeMessageSchema, getMessagesNeedingUpgrade, @@ -41,17 +63,7 @@ export async function migrateMessageData({ messageIds: ReadonlyArray ) => Promise; maxVersion?: number; -}>): Promise<{ - done: boolean; - numProcessed: number; - numSucceeded?: number; - numFailedSave?: number; - numFailedUpgrade?: number; - fetchDuration?: number; - upgradeDuration?: number; - saveDuration?: number; - totalDuration?: number; -}> { +}>): Promise { if (!isNumber(numMessagesPerBatch)) { throw new TypeError("'numMessagesPerBatch' is required"); } @@ -148,13 +160,33 @@ export async function migrateBatchOfMessages({ numMessagesPerBatch, }: { numMessagesPerBatch: number; -}): ReturnType { - return migrateMessageData({ - numMessagesPerBatch, - upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema, - getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade, - saveMessagesIndividually: DataWriter.saveMessagesIndividually, - incrementMessagesMigrationAttempts: - DataWriter.incrementMessagesMigrationAttempts, - }); +}): ReturnType { + return migrationQueue.add(() => + _migrateMessageData({ + numMessagesPerBatch, + upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema, + getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade, + saveMessagesIndividually: DataWriter.saveMessagesIndividually, + incrementMessagesMigrationAttempts: + DataWriter.incrementMessagesMigrationAttempts, + }) + ); +} + +export async function migrateAllMessages(): Promise { + const { log } = window.SignalContext; + + let batch: BatchResultType | undefined; + let total = 0; + while (!batch?.done) { + // eslint-disable-next-line no-await-in-loop + batch = await migrateBatchOfMessages({ + numMessagesPerBatch: 1000, + }); + total += batch.numProcessed; + log.info(`migrateAllMessages: Migrated batch of ${batch.numProcessed}`); + } + log.info( + `migrateAllMessages: message migration complete; ${total} messages migrated` + ); } diff --git a/ts/services/backups/export.ts b/ts/services/backups/export.ts index 242e277c2..11fc85404 100644 --- a/ts/services/backups/export.ts +++ b/ts/services/backups/export.ts @@ -138,7 +138,7 @@ import { CallLinkRestrictions } from '../../types/CallLink'; import { toAdminKeyBytes } from '../../util/callLinks'; import { getRoomIdFromRootKey } from '../../util/callLinksRingrtc'; import { SeenStatus } from '../../MessageSeenStatus'; -import { migrateBatchOfMessages } from '../../messages/migrateMessageData'; +import { migrateAllMessages } from '../../messages/migrateMessageData'; const MAX_CONCURRENCY = 10; @@ -226,23 +226,7 @@ export class BackupExportStream extends Readable { log.info('BackupExportStream: starting...'); drop(AttachmentBackupManager.stop()); log.info('BackupExportStream: message migration starting...'); - let batchMigrationResult: - | Awaited> - | undefined; - let totalMigrated = 0; - while (!batchMigrationResult?.done) { - // eslint-disable-next-line no-await-in-loop - batchMigrationResult = await migrateBatchOfMessages({ - numMessagesPerBatch: 1000, - }); - totalMigrated += batchMigrationResult.numProcessed; - log.info( - `BackupExportStream: Migrated batch of ${batchMigrationResult.numProcessed}` - ); - } - log.info( - `BackupExportStream: message migration complete; ${totalMigrated} messages migrated` - ); + await migrateAllMessages(); await pauseWriteAccess(); try { diff --git a/ts/state/ducks/installer.ts b/ts/state/ducks/installer.ts index eeecb3364..1341ca1f9 100644 --- a/ts/state/ducks/installer.ts +++ b/ts/state/ducks/installer.ts @@ -577,7 +577,7 @@ export function reducer( if (action.type === SHOW_BACKUP_IMPORT) { if ( - // Downloading backup after linking + // Downloading backup after linking state.step !== InstallScreenStep.QrCodeNotScanned && // Restarting backup download on startup state.step !== InstallScreenStep.NotStarted diff --git a/ts/test-electron/util/migrateMessageData_test.ts b/ts/test-electron/util/migrateMessageData_test.ts index eb9137dc4..b856dbf81 100644 --- a/ts/test-electron/util/migrateMessageData_test.ts +++ b/ts/test-electron/util/migrateMessageData_test.ts @@ -3,7 +3,7 @@ import assert from 'assert'; import { v7 as uuid } from 'uuid'; -import { migrateMessageData } from '../../messages/migrateMessageData'; +import { _migrateMessageData as migrateMessageData } from '../../messages/migrateMessageData'; import type { MessageAttributesType } from '../../model-types'; import { DataReader, DataWriter } from '../../sql/Client'; import { generateAci } from '../../types/ServiceId'; diff --git a/ts/test-mock/benchmarks/backup_bench.ts b/ts/test-mock/benchmarks/backup_bench.ts index a930b0ac3..50fe2ac25 100644 --- a/ts/test-mock/benchmarks/backup_bench.ts +++ b/ts/test-mock/benchmarks/backup_bench.ts @@ -22,6 +22,8 @@ Bootstrap.benchmark(async (bootstrap: Bootstrap): Promise => { const app = await bootstrap.link(); const { duration: importDuration } = await app.waitForBackupImportComplete(); + await app.migrateAllMessages(); + const exportStart = Date.now(); await app.uploadBackup(); const exportEnd = Date.now(); diff --git a/ts/test-mock/playwright.ts b/ts/test-mock/playwright.ts index 7760663c8..b908adce6 100644 --- a/ts/test-mock/playwright.ts +++ b/ts/test-mock/playwright.ts @@ -195,6 +195,11 @@ export class App extends EventEmitter { await window.evaluate('window.SignalCI.uploadBackup()'); } + public async migrateAllMessages(): Promise { + const window = await this.getWindow(); + await window.evaluate('window.SignalCI.migrateAllMessages()'); + } + public async unlink(): Promise { const window = await this.getWindow(); return window.evaluate('window.SignalCI.unlink()');