Improve backup export benchmark measurement
This commit is contained in:
parent
2e886235fa
commit
45e9c07125
7 changed files with 67 additions and 41 deletions
3
ts/CI.ts
3
ts/CI.ts
|
@ -11,6 +11,7 @@ import { explodePromise } from './util/explodePromise';
|
||||||
import { AccessType, ipcInvoke } from './sql/channels';
|
import { AccessType, ipcInvoke } from './sql/channels';
|
||||||
import { backupsService } from './services/backups';
|
import { backupsService } from './services/backups';
|
||||||
import { AttachmentBackupManager } from './jobs/AttachmentBackupManager';
|
import { AttachmentBackupManager } from './jobs/AttachmentBackupManager';
|
||||||
|
import { migrateAllMessages } from './messages/migrateMessageData';
|
||||||
import { SECOND } from './util/durations';
|
import { SECOND } from './util/durations';
|
||||||
import { isSignalRoute } from './util/signalRoutes';
|
import { isSignalRoute } from './util/signalRoutes';
|
||||||
import { strictAssert } from './util/assert';
|
import { strictAssert } from './util/assert';
|
||||||
|
@ -35,6 +36,7 @@ export type CIType = {
|
||||||
}
|
}
|
||||||
) => unknown;
|
) => unknown;
|
||||||
openSignalRoute(url: string): Promise<void>;
|
openSignalRoute(url: string): Promise<void>;
|
||||||
|
migrateAllMessages(): Promise<void>;
|
||||||
uploadBackup(): Promise<void>;
|
uploadBackup(): Promise<void>;
|
||||||
unlink: () => void;
|
unlink: () => void;
|
||||||
print: (...args: ReadonlyArray<unknown>) => void;
|
print: (...args: ReadonlyArray<unknown>) => void;
|
||||||
|
@ -192,6 +194,7 @@ export function getCI({ deviceName }: GetCIOptionsType): CIType {
|
||||||
solveChallenge,
|
solveChallenge,
|
||||||
waitForEvent,
|
waitForEvent,
|
||||||
openSignalRoute,
|
openSignalRoute,
|
||||||
|
migrateAllMessages,
|
||||||
uploadBackup,
|
uploadBackup,
|
||||||
unlink,
|
unlink,
|
||||||
getPendingEventCount,
|
getPendingEventCount,
|
||||||
|
|
|
@ -3,9 +3,11 @@
|
||||||
|
|
||||||
import { isFunction, isNumber } from 'lodash';
|
import { isFunction, isNumber } from 'lodash';
|
||||||
import pMap from 'p-map';
|
import pMap from 'p-map';
|
||||||
|
import PQueue from 'p-queue';
|
||||||
|
|
||||||
import { CURRENT_SCHEMA_VERSION } from '../types/Message2';
|
import { CURRENT_SCHEMA_VERSION } from '../types/Message2';
|
||||||
import { isNotNil } from '../util/isNotNil';
|
import { isNotNil } from '../util/isNotNil';
|
||||||
|
import { MINUTE } from '../util/durations';
|
||||||
import type { MessageAttributesType } from '../model-types.d';
|
import type { MessageAttributesType } from '../model-types.d';
|
||||||
import type { AciString } from '../types/ServiceId';
|
import type { AciString } from '../types/ServiceId';
|
||||||
import * as Errors from '../types/errors';
|
import * as Errors from '../types/errors';
|
||||||
|
@ -13,10 +15,30 @@ import { DataReader, DataWriter } from '../sql/Client';
|
||||||
|
|
||||||
const MAX_CONCURRENCY = 5;
|
const MAX_CONCURRENCY = 5;
|
||||||
|
|
||||||
|
// Don't migrate batches concurrently
|
||||||
|
const migrationQueue = new PQueue({
|
||||||
|
concurrency: 1,
|
||||||
|
timeout: MINUTE * 30,
|
||||||
|
});
|
||||||
|
|
||||||
|
type BatchResultType = Readonly<{
|
||||||
|
done: boolean;
|
||||||
|
numProcessed: number;
|
||||||
|
numSucceeded?: number;
|
||||||
|
numFailedSave?: number;
|
||||||
|
numFailedUpgrade?: number;
|
||||||
|
fetchDuration?: number;
|
||||||
|
upgradeDuration?: number;
|
||||||
|
saveDuration?: number;
|
||||||
|
totalDuration?: number;
|
||||||
|
}>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensures that messages in database are at the right schema.
|
* Ensures that messages in database are at the right schema.
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
*/
|
*/
|
||||||
export async function migrateMessageData({
|
export async function _migrateMessageData({
|
||||||
numMessagesPerBatch,
|
numMessagesPerBatch,
|
||||||
upgradeMessageSchema,
|
upgradeMessageSchema,
|
||||||
getMessagesNeedingUpgrade,
|
getMessagesNeedingUpgrade,
|
||||||
|
@ -41,17 +63,7 @@ export async function migrateMessageData({
|
||||||
messageIds: ReadonlyArray<string>
|
messageIds: ReadonlyArray<string>
|
||||||
) => Promise<void>;
|
) => Promise<void>;
|
||||||
maxVersion?: number;
|
maxVersion?: number;
|
||||||
}>): Promise<{
|
}>): Promise<BatchResultType> {
|
||||||
done: boolean;
|
|
||||||
numProcessed: number;
|
|
||||||
numSucceeded?: number;
|
|
||||||
numFailedSave?: number;
|
|
||||||
numFailedUpgrade?: number;
|
|
||||||
fetchDuration?: number;
|
|
||||||
upgradeDuration?: number;
|
|
||||||
saveDuration?: number;
|
|
||||||
totalDuration?: number;
|
|
||||||
}> {
|
|
||||||
if (!isNumber(numMessagesPerBatch)) {
|
if (!isNumber(numMessagesPerBatch)) {
|
||||||
throw new TypeError("'numMessagesPerBatch' is required");
|
throw new TypeError("'numMessagesPerBatch' is required");
|
||||||
}
|
}
|
||||||
|
@ -148,13 +160,33 @@ export async function migrateBatchOfMessages({
|
||||||
numMessagesPerBatch,
|
numMessagesPerBatch,
|
||||||
}: {
|
}: {
|
||||||
numMessagesPerBatch: number;
|
numMessagesPerBatch: number;
|
||||||
}): ReturnType<typeof migrateMessageData> {
|
}): ReturnType<typeof _migrateMessageData> {
|
||||||
return migrateMessageData({
|
return migrationQueue.add(() =>
|
||||||
numMessagesPerBatch,
|
_migrateMessageData({
|
||||||
upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema,
|
numMessagesPerBatch,
|
||||||
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
|
upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema,
|
||||||
saveMessagesIndividually: DataWriter.saveMessagesIndividually,
|
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
|
||||||
incrementMessagesMigrationAttempts:
|
saveMessagesIndividually: DataWriter.saveMessagesIndividually,
|
||||||
DataWriter.incrementMessagesMigrationAttempts,
|
incrementMessagesMigrationAttempts:
|
||||||
});
|
DataWriter.incrementMessagesMigrationAttempts,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function migrateAllMessages(): Promise<void> {
|
||||||
|
const { log } = window.SignalContext;
|
||||||
|
|
||||||
|
let batch: BatchResultType | undefined;
|
||||||
|
let total = 0;
|
||||||
|
while (!batch?.done) {
|
||||||
|
// eslint-disable-next-line no-await-in-loop
|
||||||
|
batch = await migrateBatchOfMessages({
|
||||||
|
numMessagesPerBatch: 1000,
|
||||||
|
});
|
||||||
|
total += batch.numProcessed;
|
||||||
|
log.info(`migrateAllMessages: Migrated batch of ${batch.numProcessed}`);
|
||||||
|
}
|
||||||
|
log.info(
|
||||||
|
`migrateAllMessages: message migration complete; ${total} messages migrated`
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ import { CallLinkRestrictions } from '../../types/CallLink';
|
||||||
import { toAdminKeyBytes } from '../../util/callLinks';
|
import { toAdminKeyBytes } from '../../util/callLinks';
|
||||||
import { getRoomIdFromRootKey } from '../../util/callLinksRingrtc';
|
import { getRoomIdFromRootKey } from '../../util/callLinksRingrtc';
|
||||||
import { SeenStatus } from '../../MessageSeenStatus';
|
import { SeenStatus } from '../../MessageSeenStatus';
|
||||||
import { migrateBatchOfMessages } from '../../messages/migrateMessageData';
|
import { migrateAllMessages } from '../../messages/migrateMessageData';
|
||||||
|
|
||||||
const MAX_CONCURRENCY = 10;
|
const MAX_CONCURRENCY = 10;
|
||||||
|
|
||||||
|
@ -226,23 +226,7 @@ export class BackupExportStream extends Readable {
|
||||||
log.info('BackupExportStream: starting...');
|
log.info('BackupExportStream: starting...');
|
||||||
drop(AttachmentBackupManager.stop());
|
drop(AttachmentBackupManager.stop());
|
||||||
log.info('BackupExportStream: message migration starting...');
|
log.info('BackupExportStream: message migration starting...');
|
||||||
let batchMigrationResult:
|
await migrateAllMessages();
|
||||||
| Awaited<ReturnType<typeof migrateBatchOfMessages>>
|
|
||||||
| undefined;
|
|
||||||
let totalMigrated = 0;
|
|
||||||
while (!batchMigrationResult?.done) {
|
|
||||||
// eslint-disable-next-line no-await-in-loop
|
|
||||||
batchMigrationResult = await migrateBatchOfMessages({
|
|
||||||
numMessagesPerBatch: 1000,
|
|
||||||
});
|
|
||||||
totalMigrated += batchMigrationResult.numProcessed;
|
|
||||||
log.info(
|
|
||||||
`BackupExportStream: Migrated batch of ${batchMigrationResult.numProcessed}`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
log.info(
|
|
||||||
`BackupExportStream: message migration complete; ${totalMigrated} messages migrated`
|
|
||||||
);
|
|
||||||
|
|
||||||
await pauseWriteAccess();
|
await pauseWriteAccess();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -577,7 +577,7 @@ export function reducer(
|
||||||
|
|
||||||
if (action.type === SHOW_BACKUP_IMPORT) {
|
if (action.type === SHOW_BACKUP_IMPORT) {
|
||||||
if (
|
if (
|
||||||
// Downloading backup after linking
|
// Downloading backup after linking
|
||||||
state.step !== InstallScreenStep.QrCodeNotScanned &&
|
state.step !== InstallScreenStep.QrCodeNotScanned &&
|
||||||
// Restarting backup download on startup
|
// Restarting backup download on startup
|
||||||
state.step !== InstallScreenStep.NotStarted
|
state.step !== InstallScreenStep.NotStarted
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import { v7 as uuid } from 'uuid';
|
import { v7 as uuid } from 'uuid';
|
||||||
import { migrateMessageData } from '../../messages/migrateMessageData';
|
import { _migrateMessageData as migrateMessageData } from '../../messages/migrateMessageData';
|
||||||
import type { MessageAttributesType } from '../../model-types';
|
import type { MessageAttributesType } from '../../model-types';
|
||||||
import { DataReader, DataWriter } from '../../sql/Client';
|
import { DataReader, DataWriter } from '../../sql/Client';
|
||||||
import { generateAci } from '../../types/ServiceId';
|
import { generateAci } from '../../types/ServiceId';
|
||||||
|
|
|
@ -22,6 +22,8 @@ Bootstrap.benchmark(async (bootstrap: Bootstrap): Promise<void> => {
|
||||||
const app = await bootstrap.link();
|
const app = await bootstrap.link();
|
||||||
const { duration: importDuration } = await app.waitForBackupImportComplete();
|
const { duration: importDuration } = await app.waitForBackupImportComplete();
|
||||||
|
|
||||||
|
await app.migrateAllMessages();
|
||||||
|
|
||||||
const exportStart = Date.now();
|
const exportStart = Date.now();
|
||||||
await app.uploadBackup();
|
await app.uploadBackup();
|
||||||
const exportEnd = Date.now();
|
const exportEnd = Date.now();
|
||||||
|
|
|
@ -195,6 +195,11 @@ export class App extends EventEmitter {
|
||||||
await window.evaluate('window.SignalCI.uploadBackup()');
|
await window.evaluate('window.SignalCI.uploadBackup()');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async migrateAllMessages(): Promise<void> {
|
||||||
|
const window = await this.getWindow();
|
||||||
|
await window.evaluate('window.SignalCI.migrateAllMessages()');
|
||||||
|
}
|
||||||
|
|
||||||
public async unlink(): Promise<void> {
|
public async unlink(): Promise<void> {
|
||||||
const window = await this.getWindow();
|
const window = await this.getWindow();
|
||||||
return window.evaluate('window.SignalCI.unlink()');
|
return window.evaluate('window.SignalCI.unlink()');
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue