signal-desktop/js/modules/messages_data_migrator.js

391 lines
11 KiB
JavaScript
Raw Normal View History

// Module to upgrade the schema of messages, e.g. migrate attachments to disk.
// `dangerouslyProcessAllWithoutIndex` purposely doesnt rely on our Backbone
// IndexedDB adapter to prevent automatic migrations. Rather, it uses direct
// IndexedDB access. This includes avoiding usage of `storage` module which uses
// Backbone under the hood.
/* global IDBKeyRange */
2018-04-27 21:25:04 +00:00
const { isFunction, isNumber, isObject, isString, last } = require('lodash');
const database = require('./database');
2018-03-21 23:37:39 +00:00
const Message = require('./types/message');
const settings = require('./settings');
const { deferredToPromise } = require('./deferred_to_promise');
const MESSAGES_STORE_NAME = 'messages';
2018-03-21 23:37:39 +00:00
2018-03-26 20:32:22 +00:00
exports.processNext = async ({
2018-03-21 23:37:39 +00:00
BackboneMessage,
BackboneMessageCollection,
numMessagesPerBatch,
2018-03-21 23:37:39 +00:00
upgradeMessageSchema,
} = {}) => {
if (!isFunction(BackboneMessage)) {
2018-04-27 21:25:04 +00:00
throw new TypeError(
"'BackboneMessage' (Whisper.Message) constructor is required"
);
2018-03-21 23:37:39 +00:00
}
if (!isFunction(BackboneMessageCollection)) {
2018-04-27 21:25:04 +00:00
throw new TypeError(
"'BackboneMessageCollection' (Whisper.MessageCollection)" +
' constructor is required'
);
2018-03-21 23:37:39 +00:00
}
if (!isNumber(numMessagesPerBatch)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'numMessagesPerBatch' is required");
2018-03-21 23:37:39 +00:00
}
if (!isFunction(upgradeMessageSchema)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'upgradeMessageSchema' is required");
2018-03-21 23:37:39 +00:00
}
const startTime = Date.now();
2018-03-27 16:19:55 +00:00
const fetchStartTime = Date.now();
2018-04-27 21:25:04 +00:00
const messagesRequiringSchemaUpgrade = await _fetchMessagesRequiringSchemaUpgrade(
{
BackboneMessageCollection,
count: numMessagesPerBatch,
2018-04-27 21:25:04 +00:00
}
);
2018-03-27 16:19:55 +00:00
const fetchDuration = Date.now() - fetchStartTime;
2018-03-21 23:37:39 +00:00
2018-03-27 16:19:55 +00:00
const upgradeStartTime = Date.now();
2018-04-27 21:25:04 +00:00
const upgradedMessages = await Promise.all(
messagesRequiringSchemaUpgrade.map(upgradeMessageSchema)
);
2018-03-27 16:19:55 +00:00
const upgradeDuration = Date.now() - upgradeStartTime;
2018-03-21 23:37:39 +00:00
2018-03-27 16:19:55 +00:00
const saveStartTime = Date.now();
const saveMessage = _saveMessageBackbone({ BackboneMessage });
2018-03-21 23:37:39 +00:00
await Promise.all(upgradedMessages.map(saveMessage));
2018-03-27 16:19:55 +00:00
const saveDuration = Date.now() - saveStartTime;
2018-03-21 23:37:39 +00:00
const totalDuration = Date.now() - startTime;
const numProcessed = messagesRequiringSchemaUpgrade.length;
const done = numProcessed < numMessagesPerBatch;
2018-03-21 23:37:39 +00:00
return {
done,
2018-03-21 23:37:39 +00:00
numProcessed,
fetchDuration,
upgradeDuration,
saveDuration,
totalDuration,
};
};
exports.dangerouslyProcessAllWithoutIndex = async ({
databaseName,
minDatabaseVersion,
numMessagesPerBatch,
upgradeMessageSchema,
logger,
} = {}) => {
if (!isString(databaseName)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'databaseName' must be a string");
}
if (!isNumber(minDatabaseVersion)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'minDatabaseVersion' must be a number");
}
if (!isNumber(numMessagesPerBatch)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'numMessagesPerBatch' must be a number");
}
if (!isFunction(upgradeMessageSchema)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'upgradeMessageSchema' is required");
}
const connection = await database.open(databaseName);
const databaseVersion = connection.version;
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
logger.info('Database status', {
databaseVersion,
isValidDatabaseVersion,
minDatabaseVersion,
});
if (!isValidDatabaseVersion) {
2018-04-27 21:25:04 +00:00
throw new Error(
`Expected database version (${databaseVersion})` +
` to be at least ${minDatabaseVersion}`
);
}
// NOTE: Even if we make this async using `then`, requesting `count` on an
// IndexedDB store blocks all subsequent transactions, so we might as well
// explicitly wait for it here:
const numTotalMessages = await _getNumMessages({ connection });
const migrationStartTime = Date.now();
let numCumulativeMessagesProcessed = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
// eslint-disable-next-line no-await-in-loop
const status = await _processBatch({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
});
if (status.done) {
break;
}
numCumulativeMessagesProcessed += status.numMessagesProcessed;
logger.info(
2018-04-27 21:25:04 +00:00
'Upgrade message schema:',
Object.assign({}, status, {
numTotalMessages,
numCumulativeMessagesProcessed,
})
);
}
2018-03-28 14:37:21 +00:00
logger.info('Close database connection');
connection.close();
const totalDuration = Date.now() - migrationStartTime;
logger.info('Attachment migration complete:', {
2018-03-28 14:24:07 +00:00
totalDuration,
totalMessagesProcessed: numCumulativeMessagesProcessed,
2018-03-28 14:24:07 +00:00
});
};
exports.processNextBatchWithoutIndex = async ({
databaseName,
minDatabaseVersion,
numMessagesPerBatch,
upgradeMessageSchema,
} = {}) => {
if (!isFunction(upgradeMessageSchema)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'upgradeMessageSchema' is required");
}
const connection = await _getConnection({ databaseName, minDatabaseVersion });
const batch = await _processBatch({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
});
return batch;
};
// Private API
const _getConnection = async ({ databaseName, minDatabaseVersion }) => {
if (!isString(databaseName)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'databaseName' must be a string");
}
if (!isNumber(minDatabaseVersion)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'minDatabaseVersion' must be a number");
}
const connection = await database.open(databaseName);
const databaseVersion = connection.version;
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
if (!isValidDatabaseVersion) {
2018-04-27 21:25:04 +00:00
throw new Error(
`Expected database version (${databaseVersion})` +
` to be at least ${minDatabaseVersion}`
);
}
return connection;
};
const _processBatch = async ({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
} = {}) => {
if (!isObject(connection)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'connection' must be a string");
}
if (!isFunction(upgradeMessageSchema)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'upgradeMessageSchema' is required");
}
if (!isNumber(numMessagesPerBatch)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'numMessagesPerBatch' is required");
}
2018-04-27 21:25:04 +00:00
const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete(
connection
);
if (isAttachmentMigrationComplete) {
return {
done: true,
};
}
2018-04-27 21:25:04 +00:00
const lastProcessedIndex = await settings.getAttachmentMigrationLastProcessedIndex(
connection
);
const fetchUnprocessedMessagesStartTime = Date.now();
2018-04-27 21:25:04 +00:00
const unprocessedMessages = await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex(
{
connection,
count: numMessagesPerBatch,
lastIndex: lastProcessedIndex,
2018-04-27 21:25:04 +00:00
}
);
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
const upgradeStartTime = Date.now();
2018-04-27 21:25:04 +00:00
const upgradedMessages = await Promise.all(
unprocessedMessages.map(upgradeMessageSchema)
);
const upgradeDuration = Date.now() - upgradeStartTime;
const saveMessagesStartTime = Date.now();
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
const transactionCompletion = database.completeTransaction(transaction);
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
await transactionCompletion;
const saveDuration = Date.now() - saveMessagesStartTime;
const numMessagesProcessed = upgradedMessages.length;
const done = numMessagesProcessed < numMessagesPerBatch;
const lastMessage = last(upgradedMessages);
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
if (!done) {
await settings.setAttachmentMigrationLastProcessedIndex(
connection,
newLastProcessedIndex
);
} else {
await settings.markAttachmentMigrationComplete(connection);
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
}
const batchTotalDuration = Date.now() - fetchUnprocessedMessagesStartTime;
return {
batchTotalDuration,
done,
fetchDuration,
lastProcessedIndex,
newLastProcessedIndex,
numMessagesProcessed,
saveDuration,
targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
upgradeDuration,
};
};
2018-04-27 21:25:04 +00:00
const _saveMessageBackbone = ({ BackboneMessage } = {}) => message => {
2018-03-21 23:37:39 +00:00
const backboneMessage = new BackboneMessage(message);
return deferredToPromise(backboneMessage.save());
2018-03-21 23:37:39 +00:00
};
2018-04-27 21:25:04 +00:00
const _saveMessage = ({ transaction } = {}) => message => {
if (!isObject(transaction)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'transaction' is required");
}
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const request = messagesStore.put(message, message.id);
return new Promise((resolve, reject) => {
2018-04-27 21:25:04 +00:00
request.onsuccess = () => resolve();
request.onerror = event => reject(event.target.error);
});
};
2018-04-27 21:25:04 +00:00
const _fetchMessagesRequiringSchemaUpgrade = async ({
BackboneMessageCollection,
count,
} = {}) => {
if (!isFunction(BackboneMessageCollection)) {
throw new TypeError(
"'BackboneMessageCollection' (Whisper.MessageCollection)" +
' constructor is required'
);
}
2018-03-21 23:37:39 +00:00
2018-04-27 21:25:04 +00:00
if (!isNumber(count)) {
throw new TypeError("'count' is required");
}
2018-03-21 23:37:39 +00:00
2018-04-27 21:25:04 +00:00
const collection = new BackboneMessageCollection();
return new Promise(resolve =>
collection
.fetch({
limit: count,
index: {
name: 'schemaVersion',
upper: Message.CURRENT_SCHEMA_VERSION,
excludeUpper: true,
order: 'desc',
},
})
.always(() => {
const models = collection.models || [];
const messages = models.map(model => model.toJSON());
resolve(messages);
})
);
};
// NOTE: Named dangerous because it is not as efficient as using our
// `messages` `schemaVersion` index:
2018-04-27 21:25:04 +00:00
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({
connection,
count,
lastIndex,
} = {}) => {
if (!isObject(connection)) {
throw new TypeError("'connection' is required");
}
2018-04-27 21:25:04 +00:00
if (!isNumber(count)) {
throw new TypeError("'count' is required");
}
2018-04-27 21:25:04 +00:00
if (lastIndex && !isString(lastIndex)) {
throw new TypeError("'lastIndex' must be a string");
}
2018-04-27 21:25:04 +00:00
const hasLastIndex = Boolean(lastIndex);
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const excludeLowerBound = true;
const range = hasLastIndex
? IDBKeyRange.lowerBound(lastIndex, excludeLowerBound)
: undefined;
return new Promise((resolve, reject) => {
const items = [];
const request = messagesStore.openCursor(range);
request.onsuccess = event => {
const cursor = event.target.result;
const hasMoreData = Boolean(cursor);
if (!hasMoreData || items.length === count) {
resolve(items);
return;
}
const item = cursor.value;
items.push(item);
cursor.continue();
};
request.onerror = event => reject(event.target.error);
});
};
const _getNumMessages = async ({ connection } = {}) => {
if (!isObject(connection)) {
2018-04-11 19:44:52 +00:00
throw new TypeError("'connection' is required");
}
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const numTotalMessages = await database.getCount({ store: messagesStore });
await database.completeTransaction(transaction);
return numTotalMessages;
};