2023-01-03 19:55:46 +00:00
|
|
|
// Copyright 2018 Signal Messenger, LLC
|
2020-10-30 20:34:04 +00:00
|
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
|
2022-05-31 23:56:25 +00:00
|
|
|
import { isFunction, isNumber } from 'lodash';
|
2022-06-20 21:18:23 +00:00
|
|
|
import pMap from 'p-map';
|
|
|
|
|
2022-06-10 01:10:20 +00:00
|
|
|
import { CURRENT_SCHEMA_VERSION } from '../types/Message2';
|
2022-06-20 21:18:23 +00:00
|
|
|
import { isNotNil } from '../util/isNotNil';
|
2022-05-31 23:56:25 +00:00
|
|
|
import type { MessageAttributesType } from '../model-types.d';
|
2023-08-10 16:43:33 +00:00
|
|
|
import type { AciString } from '../types/ServiceId';
|
2022-06-20 21:18:23 +00:00
|
|
|
import * as Errors from '../types/errors';
|
2024-09-23 19:24:41 +00:00
|
|
|
import { DataReader, DataWriter } from '../sql/Client';
|
2022-06-20 21:18:23 +00:00
|
|
|
|
|
|
|
const MAX_CONCURRENCY = 5;
|
2018-03-27 16:19:40 +00:00
|
|
|
|
2022-05-31 23:56:25 +00:00
|
|
|
/**
|
|
|
|
* Ensures that messages in database are at the right schema.
|
|
|
|
*/
|
|
|
|
export async function migrateMessageData({
|
2018-04-03 18:43:17 +00:00
|
|
|
numMessagesPerBatch,
|
2018-03-21 23:37:39 +00:00
|
|
|
upgradeMessageSchema,
|
2018-07-25 22:02:27 +00:00
|
|
|
getMessagesNeedingUpgrade,
|
2022-06-20 21:18:23 +00:00
|
|
|
saveMessages,
|
2024-08-22 18:12:00 +00:00
|
|
|
incrementMessagesMigrationAttempts,
|
2022-06-10 01:10:20 +00:00
|
|
|
maxVersion = CURRENT_SCHEMA_VERSION,
|
2022-05-31 23:56:25 +00:00
|
|
|
}: Readonly<{
|
|
|
|
numMessagesPerBatch: number;
|
|
|
|
upgradeMessageSchema: (
|
|
|
|
message: MessageAttributesType,
|
|
|
|
options: { maxVersion: number }
|
|
|
|
) => Promise<MessageAttributesType>;
|
|
|
|
getMessagesNeedingUpgrade: (
|
|
|
|
limit: number,
|
|
|
|
options: { maxVersion: number }
|
|
|
|
) => Promise<Array<MessageAttributesType>>;
|
2022-06-20 21:18:23 +00:00
|
|
|
saveMessages: (
|
|
|
|
data: ReadonlyArray<MessageAttributesType>,
|
2023-08-10 16:43:33 +00:00
|
|
|
options: { ourAci: AciString }
|
2024-06-03 17:02:25 +00:00
|
|
|
) => Promise<unknown>;
|
2024-08-22 18:12:00 +00:00
|
|
|
incrementMessagesMigrationAttempts: (
|
|
|
|
messageIds: ReadonlyArray<string>
|
|
|
|
) => Promise<void>;
|
2022-05-31 23:56:25 +00:00
|
|
|
maxVersion?: number;
|
|
|
|
}>): Promise<
|
|
|
|
| {
|
|
|
|
done: true;
|
|
|
|
numProcessed: 0;
|
|
|
|
}
|
|
|
|
| {
|
|
|
|
done: boolean;
|
|
|
|
numProcessed: number;
|
|
|
|
fetchDuration: number;
|
|
|
|
upgradeDuration: number;
|
|
|
|
saveDuration: number;
|
|
|
|
totalDuration: number;
|
|
|
|
}
|
|
|
|
> {
|
2018-04-03 18:43:17 +00:00
|
|
|
if (!isNumber(numMessagesPerBatch)) {
|
2018-04-11 19:44:52 +00:00
|
|
|
throw new TypeError("'numMessagesPerBatch' is required");
|
2018-03-21 23:37:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (!isFunction(upgradeMessageSchema)) {
|
2018-04-11 19:44:52 +00:00
|
|
|
throw new TypeError("'upgradeMessageSchema' is required");
|
2018-03-21 23:37:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const startTime = Date.now();
|
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
const fetchStartTime = Date.now();
|
2018-09-21 01:47:19 +00:00
|
|
|
let messagesRequiringSchemaUpgrade;
|
|
|
|
try {
|
|
|
|
messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade(
|
|
|
|
numMessagesPerBatch,
|
2022-05-31 23:56:25 +00:00
|
|
|
{ maxVersion }
|
2018-09-21 01:47:19 +00:00
|
|
|
);
|
|
|
|
} catch (error) {
|
2021-10-07 23:28:47 +00:00
|
|
|
window.SignalContext.log.error(
|
2022-06-20 21:18:23 +00:00
|
|
|
'migrateMessageData.getMessagesNeedingUpgrade error:',
|
|
|
|
Errors.toLogFormat(error)
|
2018-09-21 01:47:19 +00:00
|
|
|
);
|
|
|
|
return {
|
|
|
|
done: true,
|
|
|
|
numProcessed: 0,
|
|
|
|
};
|
|
|
|
}
|
2018-03-27 16:19:55 +00:00
|
|
|
const fetchDuration = Date.now() - fetchStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
const upgradeStartTime = Date.now();
|
2024-08-22 18:12:00 +00:00
|
|
|
const failedMessages = new Array<string>();
|
2022-06-20 21:18:23 +00:00
|
|
|
const upgradedMessages = (
|
|
|
|
await pMap(
|
|
|
|
messagesRequiringSchemaUpgrade,
|
|
|
|
async message => {
|
|
|
|
try {
|
|
|
|
return await upgradeMessageSchema(message, { maxVersion });
|
|
|
|
} catch (error) {
|
|
|
|
window.SignalContext.log.error(
|
|
|
|
'migrateMessageData.upgradeMessageSchema error:',
|
|
|
|
Errors.toLogFormat(error)
|
|
|
|
);
|
2024-08-22 18:12:00 +00:00
|
|
|
failedMessages.push(message.id);
|
2022-06-20 21:18:23 +00:00
|
|
|
return undefined;
|
|
|
|
}
|
|
|
|
},
|
|
|
|
{ concurrency: MAX_CONCURRENCY }
|
2018-07-27 02:19:34 +00:00
|
|
|
)
|
2022-06-20 21:18:23 +00:00
|
|
|
).filter(isNotNil);
|
2018-03-27 16:19:55 +00:00
|
|
|
const upgradeDuration = Date.now() - upgradeStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
const saveStartTime = Date.now();
|
2022-06-20 21:18:23 +00:00
|
|
|
|
2023-08-10 16:43:33 +00:00
|
|
|
const ourAci = window.textsecure.storage.user.getCheckedAci();
|
2024-08-22 18:12:00 +00:00
|
|
|
await saveMessages(upgradedMessages, { ourAci });
|
|
|
|
if (failedMessages.length) {
|
|
|
|
await incrementMessagesMigrationAttempts(failedMessages);
|
|
|
|
}
|
2018-03-27 16:19:55 +00:00
|
|
|
const saveDuration = Date.now() - saveStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
|
|
|
const totalDuration = Date.now() - startTime;
|
|
|
|
const numProcessed = messagesRequiringSchemaUpgrade.length;
|
2018-04-03 18:43:17 +00:00
|
|
|
const done = numProcessed < numMessagesPerBatch;
|
2018-03-21 23:37:39 +00:00
|
|
|
return {
|
2018-03-30 21:20:50 +00:00
|
|
|
done,
|
2018-03-21 23:37:39 +00:00
|
|
|
numProcessed,
|
|
|
|
fetchDuration,
|
|
|
|
upgradeDuration,
|
|
|
|
saveDuration,
|
|
|
|
totalDuration,
|
|
|
|
};
|
2022-05-31 23:56:25 +00:00
|
|
|
}
|
2024-09-23 19:24:41 +00:00
|
|
|
|
|
|
|
export async function migrateBatchOfMessages({
|
|
|
|
numMessagesPerBatch,
|
|
|
|
}: {
|
|
|
|
numMessagesPerBatch: number;
|
|
|
|
}): ReturnType<typeof migrateMessageData> {
|
|
|
|
return migrateMessageData({
|
|
|
|
numMessagesPerBatch,
|
|
|
|
upgradeMessageSchema: window.Signal.Migrations.upgradeMessageSchema,
|
|
|
|
getMessagesNeedingUpgrade: DataReader.getMessagesNeedingUpgrade,
|
|
|
|
saveMessages: DataWriter.saveMessages,
|
|
|
|
incrementMessagesMigrationAttempts:
|
|
|
|
DataWriter.incrementMessagesMigrationAttempts,
|
|
|
|
});
|
|
|
|
}
|