signal-desktop/js/modules/messages_data_migrator.js

291 lines
9 KiB
JavaScript
Raw Normal View History

// Module to upgrade the schema of messages, e.g. migrate attachments to disk.
// `processAll` purposely doesnt rely on our Backbone IndexedDB adapter to
// prevent automatic migrations. Rather, it uses direct IndexedDB access.
// This includes avoiding usage of `storage` module which uses Backbone under
// the hood.
/* global IDBKeyRange */
2018-04-02 19:08:53 +00:00
const {
isFunction,
isNumber,
isObject,
isString,
last,
} = require('lodash');
const database = require('./database');
2018-03-21 23:37:39 +00:00
const Message = require('./types/message');
const settings = require('./settings');
const { deferredToPromise } = require('./deferred_to_promise');
const MESSAGES_STORE_NAME = 'messages';
const NUM_MESSAGES_PER_BATCH = 1;
2018-03-21 23:37:39 +00:00
2018-03-26 20:32:22 +00:00
exports.processNext = async ({
2018-03-21 23:37:39 +00:00
BackboneMessage,
BackboneMessageCollection,
count,
upgradeMessageSchema,
} = {}) => {
if (!isFunction(BackboneMessage)) {
throw new TypeError('"BackboneMessage" (Whisper.Message) constructor is required');
2018-03-21 23:37:39 +00:00
}
if (!isFunction(BackboneMessageCollection)) {
throw new TypeError('"BackboneMessageCollection" (Whisper.MessageCollection)' +
2018-03-21 23:37:39 +00:00
' constructor is required');
}
if (!isNumber(count)) {
throw new TypeError('"count" is required');
2018-03-21 23:37:39 +00:00
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('"upgradeMessageSchema" is required');
2018-03-21 23:37:39 +00:00
}
const startTime = Date.now();
2018-03-27 16:19:55 +00:00
const fetchStartTime = Date.now();
2018-03-21 23:37:39 +00:00
const messagesRequiringSchemaUpgrade =
await _fetchMessagesRequiringSchemaUpgrade({ BackboneMessageCollection, count });
2018-03-27 16:19:55 +00:00
const fetchDuration = Date.now() - fetchStartTime;
2018-03-21 23:37:39 +00:00
2018-03-27 16:19:55 +00:00
const upgradeStartTime = Date.now();
2018-03-21 23:37:39 +00:00
const upgradedMessages =
await Promise.all(messagesRequiringSchemaUpgrade.map(upgradeMessageSchema));
2018-03-27 16:19:55 +00:00
const upgradeDuration = Date.now() - upgradeStartTime;
2018-03-21 23:37:39 +00:00
2018-03-27 16:19:55 +00:00
const saveStartTime = Date.now();
const saveMessage = _saveMessageBackbone({ BackboneMessage });
2018-03-21 23:37:39 +00:00
await Promise.all(upgradedMessages.map(saveMessage));
2018-03-27 16:19:55 +00:00
const saveDuration = Date.now() - saveStartTime;
2018-03-21 23:37:39 +00:00
const totalDuration = Date.now() - startTime;
const numProcessed = messagesRequiringSchemaUpgrade.length;
const hasMore = numProcessed > 0;
return {
hasMore,
numProcessed,
fetchDuration,
upgradeDuration,
saveDuration,
totalDuration,
};
};
exports.processAll = async ({
databaseName,
minDatabaseVersion,
upgradeMessageSchema,
} = {}) => {
if (!isString(databaseName)) {
throw new TypeError('"databaseName" must be a string');
}
if (!isNumber(minDatabaseVersion)) {
throw new TypeError('"minDatabaseVersion" must be a number');
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('"upgradeMessageSchema" is required');
}
const connection = await database.open(databaseName);
const databaseVersion = connection.version;
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
console.log('Database status', {
databaseVersion,
isValidDatabaseVersion,
minDatabaseVersion,
});
if (!isValidDatabaseVersion) {
throw new Error(`Expected database version (${databaseVersion})` +
` to be at least ${minDatabaseVersion}`);
}
const isComplete = await settings.isAttachmentMigrationComplete(connection);
2018-03-28 14:25:22 +00:00
console.log('Attachment migration status:', isComplete ? 'complete' : 'incomplete');
if (isComplete) {
return;
}
let numTotalMessages = null;
// eslint-disable-next-line more/no-then
getNumMessages({ connection }).then((numMessages) => {
numTotalMessages = numMessages;
});
const migrationStartTime = Date.now();
let unprocessedMessages = [];
2018-03-28 14:24:07 +00:00
let totalMessagesProcessed = 0;
do {
const lastProcessedIndex =
// eslint-disable-next-line no-await-in-loop
await settings.getAttachmentMigrationLastProcessedIndex(connection);
const fetchUnprocessedMessagesStartTime = Date.now();
unprocessedMessages =
// eslint-disable-next-line no-await-in-loop
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
connection,
2018-03-27 16:15:14 +00:00
count: NUM_MESSAGES_PER_BATCH,
lastIndex: lastProcessedIndex,
});
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
const numUnprocessedMessages = unprocessedMessages.length;
if (numUnprocessedMessages === 0) {
break;
}
const upgradeStartTime = Date.now();
const upgradedMessages =
// eslint-disable-next-line no-await-in-loop
await Promise.all(unprocessedMessages.map(upgradeMessageSchema));
const upgradeDuration = Date.now() - upgradeStartTime;
const saveMessagesStartTime = Date.now();
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
const transactionCompletion = database.completeTransaction(transaction);
// eslint-disable-next-line no-await-in-loop
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
// eslint-disable-next-line no-await-in-loop
await transactionCompletion;
const saveDuration = Date.now() - saveMessagesStartTime;
// TODO: Confirm transaction is complete
const lastMessage = last(upgradedMessages);
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
if (newLastProcessedIndex) {
// eslint-disable-next-line no-await-in-loop
await settings.setAttachmentMigrationLastProcessedIndex(
connection,
newLastProcessedIndex
);
}
2018-03-28 14:24:07 +00:00
totalMessagesProcessed += numUnprocessedMessages;
console.log('Upgrade message schema:', {
lastProcessedIndex,
numUnprocessedMessages,
2018-03-28 14:24:07 +00:00
numCumulativeMessagesProcessed: totalMessagesProcessed,
numTotalMessages,
fetchDuration,
saveDuration,
upgradeDuration,
newLastProcessedIndex,
2018-03-28 14:24:49 +00:00
targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
});
} while (unprocessedMessages.length > 0);
await settings.markAttachmentMigrationComplete(connection);
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
2018-03-28 14:37:21 +00:00
console.log('Close database connection');
connection.close();
const totalDuration = Date.now() - migrationStartTime;
2018-03-28 14:24:07 +00:00
console.log('Attachment migration complete:', {
totalDuration,
totalMessagesProcessed,
});
};
const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => {
2018-03-21 23:37:39 +00:00
const backboneMessage = new BackboneMessage(message);
return deferredToPromise(backboneMessage.save());
2018-03-21 23:37:39 +00:00
};
const _saveMessage = ({ transaction } = {}) => (message) => {
if (!isObject(transaction)) {
throw new TypeError('"transaction" is required');
}
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const request = messagesStore.put(message, message.id);
return new Promise((resolve, reject) => {
request.onsuccess = () =>
resolve();
request.onerror = event =>
reject(event.target.error);
});
};
2018-03-21 23:37:39 +00:00
const _fetchMessagesRequiringSchemaUpgrade =
async ({ BackboneMessageCollection, count } = {}) => {
if (!isFunction(BackboneMessageCollection)) {
throw new TypeError('"BackboneMessageCollection" (Whisper.MessageCollection)' +
2018-03-21 23:37:39 +00:00
' constructor is required');
}
if (!isNumber(count)) {
throw new TypeError('"count" is required');
2018-03-21 23:37:39 +00:00
}
const collection = new BackboneMessageCollection();
return new Promise(resolve => collection.fetch({
limit: count,
index: {
name: 'schemaVersion',
upper: Message.CURRENT_SCHEMA_VERSION,
excludeUpper: true,
order: 'desc',
},
}).always(() => {
const models = collection.models || [];
const messages = models.map(model => model.toJSON());
resolve(messages);
}));
};
// NOTE: Named dangerous because it is not as efficient as using our
// `messages` `schemaVersion` index:
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex =
({ connection, count, lastIndex } = {}) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
if (!isNumber(count)) {
throw new TypeError('"count" is required');
}
if (lastIndex && !isString(lastIndex)) {
throw new TypeError('"lastIndex" must be a string');
}
const hasLastIndex = Boolean(lastIndex);
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const excludeLowerBound = true;
const query = hasLastIndex
? IDBKeyRange.lowerBound(lastIndex, excludeLowerBound)
: undefined;
const request = messagesStore.getAll(query, count);
return new Promise((resolve, reject) => {
request.onsuccess = event =>
resolve(event.target.result);
request.onerror = event =>
reject(event.target.error);
});
};
const getNumMessages = async ({ connection } = {}) => {
if (!isObject(connection)) {
throw new TypeError('"connection" is required');
}
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const numTotalMessages = await database.getCount({ store: messagesStore });
await database.completeTransaction(transaction);
return numTotalMessages;
};