signal-desktop/js/modules/messages_data_migrator.js
2018-10-01 18:18:37 -07:00

405 lines
11 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Module to upgrade the schema of messages, e.g. migrate attachments to disk.
// `dangerouslyProcessAllWithoutIndex` purposely doesn't rely on our Backbone
// IndexedDB adapter to prevent automatic migrations. Rather, it uses direct
// IndexedDB access. This includes avoiding usage of `storage` module which uses
// Backbone under the hood.
/* global IDBKeyRange, window */
const { isFunction, isNumber, isObject, isString, last } = require('lodash');
const database = require('./database');
const Message = require('./types/message');
const settings = require('./settings');
// Name of the object store that this module opens transactions on when
// counting, reading, and saving messages.
const MESSAGES_STORE_NAME = 'messages';
exports.processNext = async ({
BackboneMessage,
BackboneMessageCollection,
numMessagesPerBatch,
upgradeMessageSchema,
getMessagesNeedingUpgrade,
saveMessage,
maxVersion = Message.CURRENT_SCHEMA_VERSION,
} = {}) => {
if (!isFunction(BackboneMessage)) {
throw new TypeError(
"'BackboneMessage' (Whisper.Message) constructor is required"
);
}
if (!isFunction(BackboneMessageCollection)) {
throw new TypeError(
"'BackboneMessageCollection' (Whisper.MessageCollection)" +
' constructor is required'
);
}
if (!isNumber(numMessagesPerBatch)) {
throw new TypeError("'numMessagesPerBatch' is required");
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
const startTime = Date.now();
const fetchStartTime = Date.now();
let messagesRequiringSchemaUpgrade;
try {
messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade(
numMessagesPerBatch,
{
maxVersion,
MessageCollection: BackboneMessageCollection,
}
);
} catch (error) {
window.log.error(
'processNext error:',
error && error.stack ? error.stack : error
);
return {
done: true,
numProcessed: 0,
};
}
const fetchDuration = Date.now() - fetchStartTime;
const upgradeStartTime = Date.now();
const upgradedMessages = await Promise.all(
messagesRequiringSchemaUpgrade.map(message =>
upgradeMessageSchema(message, { maxVersion })
)
);
const upgradeDuration = Date.now() - upgradeStartTime;
const saveStartTime = Date.now();
await Promise.all(
upgradedMessages.map(message =>
saveMessage(message, { Message: BackboneMessage })
)
);
const saveDuration = Date.now() - saveStartTime;
const totalDuration = Date.now() - startTime;
const numProcessed = messagesRequiringSchemaUpgrade.length;
const done = numProcessed < numMessagesPerBatch;
return {
done,
numProcessed,
fetchDuration,
upgradeDuration,
saveDuration,
totalDuration,
};
};
exports.dangerouslyProcessAllWithoutIndex = async ({
databaseName,
minDatabaseVersion,
numMessagesPerBatch,
upgradeMessageSchema,
logger,
maxVersion = Message.CURRENT_SCHEMA_VERSION,
saveMessage,
BackboneMessage,
} = {}) => {
if (!isString(databaseName)) {
throw new TypeError("'databaseName' must be a string");
}
if (!isNumber(minDatabaseVersion)) {
throw new TypeError("'minDatabaseVersion' must be a number");
}
if (!isNumber(numMessagesPerBatch)) {
throw new TypeError("'numMessagesPerBatch' must be a number");
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
if (!isFunction(BackboneMessage)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
if (!isFunction(saveMessage)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
const connection = await database.open(databaseName);
const databaseVersion = connection.version;
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
logger.info('Database status', {
databaseVersion,
isValidDatabaseVersion,
minDatabaseVersion,
});
if (!isValidDatabaseVersion) {
throw new Error(
`Expected database version (${databaseVersion})` +
` to be at least ${minDatabaseVersion}`
);
}
// NOTE: Even if we make this async using `then`, requesting `count` on an
// IndexedDB store blocks all subsequent transactions, so we might as well
// explicitly wait for it here:
const numTotalMessages = await exports.getNumMessages({ connection });
const migrationStartTime = Date.now();
let numCumulativeMessagesProcessed = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
// eslint-disable-next-line no-await-in-loop
const status = await _processBatch({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
maxVersion,
saveMessage,
BackboneMessage,
});
if (status.done) {
break;
}
numCumulativeMessagesProcessed += status.numMessagesProcessed;
logger.info(
'Upgrade message schema:',
Object.assign({}, status, {
numTotalMessages,
numCumulativeMessagesProcessed,
})
);
}
logger.info('Close database connection');
connection.close();
const totalDuration = Date.now() - migrationStartTime;
logger.info('Attachment migration complete:', {
totalDuration,
totalMessagesProcessed: numCumulativeMessagesProcessed,
});
};
exports.processNextBatchWithoutIndex = async ({
databaseName,
minDatabaseVersion,
numMessagesPerBatch,
upgradeMessageSchema,
maxVersion,
BackboneMessage,
saveMessage,
} = {}) => {
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
const connection = await _getConnection({ databaseName, minDatabaseVersion });
const batch = await _processBatch({
connection,
numMessagesPerBatch,
upgradeMessageSchema,
maxVersion,
BackboneMessage,
saveMessage,
});
return batch;
};
// Private API
// Opens `databaseName` through `database.open` and resolves to the connection
// once its `version` is confirmed to be at least `minDatabaseVersion`.
//
// Throws `TypeError` when `databaseName` is not a string or
// `minDatabaseVersion` is not a number, and `Error` when the opened database
// is older than required. In the latter case the connection is closed before
// throwing, because the caller never receives a handle it could close itself.
const _getConnection = async ({ databaseName, minDatabaseVersion }) => {
  if (!isString(databaseName)) {
    throw new TypeError("'databaseName' must be a string");
  }
  if (!isNumber(minDatabaseVersion)) {
    throw new TypeError("'minDatabaseVersion' must be a number");
  }

  const connection = await database.open(databaseName);
  const databaseVersion = connection.version;
  const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
  if (!isValidDatabaseVersion) {
    connection.close();
    throw new Error(
      `Expected database version (${databaseVersion})` +
        ` to be at least ${minDatabaseVersion}`
    );
  }

  return connection;
};
// Processes one batch of messages on an already-open `connection`:
// 1. returns `{ done: true }` right away if `settings` reports the attachment
//    migration as complete;
// 2. fetches up to `numMessagesPerBatch` messages after the last processed
//    index stored in `settings`;
// 3. runs `upgradeMessageSchema(message, { maxVersion })` on each of them;
// 4. saves each result with `saveMessage(message, { Message: BackboneMessage })`;
// 5. stores the new last processed index, or marks the migration complete
//    when the batch was shorter than requested.
//
// Resolves to a status object with timings, `done`, `numMessagesProcessed`,
// `lastProcessedIndex`, `newLastProcessedIndex` and `targetSchemaVersion`.
// If fetching fails, the error is logged, the migration is marked complete,
// and `{ done: true }` is returned.
//
// Throws `TypeError` when an option is missing or of the wrong type.
const _processBatch = async ({
  connection,
  numMessagesPerBatch,
  upgradeMessageSchema,
  maxVersion,
  BackboneMessage,
  saveMessage,
} = {}) => {
  if (!isObject(connection)) {
    throw new TypeError('_processBatch: connection must be an object');
  }
  if (!isFunction(upgradeMessageSchema)) {
    throw new TypeError('_processBatch: upgradeMessageSchema is required');
  }
  if (!isNumber(numMessagesPerBatch)) {
    throw new TypeError('_processBatch: numMessagesPerBatch is required');
  }
  if (!isNumber(maxVersion)) {
    throw new TypeError('_processBatch: maxVersion is required');
  }
  if (!isFunction(BackboneMessage)) {
    throw new TypeError('_processBatch: BackboneMessage is required');
  }
  if (!isFunction(saveMessage)) {
    throw new TypeError('_processBatch: saveMessage is required');
  }

  const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete(
    connection
  );
  if (isAttachmentMigrationComplete) {
    return {
      done: true,
    };
  }

  const lastProcessedIndex = await settings.getAttachmentMigrationLastProcessedIndex(
    connection
  );

  const fetchUnprocessedMessagesStartTime = Date.now();
  let unprocessedMessages;
  try {
    unprocessedMessages = await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex(
      {
        connection,
        count: numMessagesPerBatch,
        lastIndex: lastProcessedIndex,
      }
    );
  } catch (error) {
    // Best effort: when the store cannot be read, stop the migration for good
    // instead of retrying the same failing fetch on every call.
    window.log.error(
      '_processBatch error:',
      error && error.stack ? error.stack : error
    );
    await settings.markAttachmentMigrationComplete(connection);
    await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
    return {
      done: true,
    };
  }
  const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;

  const upgradeStartTime = Date.now();
  const upgradedMessages = await Promise.all(
    unprocessedMessages.map(message =>
      upgradeMessageSchema(message, { maxVersion })
    )
  );
  const upgradeDuration = Date.now() - upgradeStartTime;

  const saveMessagesStartTime = Date.now();
  // NOTE(review): this transaction is opened and awaited, but it is not handed
  // to `saveMessage`, so no request in this function is issued on it. Left in
  // place to keep behavior unchanged; confirm whether it is still needed.
  const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
  const transactionCompletion = database.completeTransaction(transaction);
  await Promise.all(
    upgradedMessages.map(message =>
      saveMessage(message, { Message: BackboneMessage })
    )
  );
  await transactionCompletion;
  const saveDuration = Date.now() - saveMessagesStartTime;

  const numMessagesProcessed = upgradedMessages.length;
  // A short batch means the cursor reached the end of the store.
  const done = numMessagesProcessed < numMessagesPerBatch;
  const lastMessage = last(upgradedMessages);
  const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
  if (!done) {
    await settings.setAttachmentMigrationLastProcessedIndex(
      connection,
      newLastProcessedIndex
    );
  } else {
    await settings.markAttachmentMigrationComplete(connection);
    await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
  }

  const batchTotalDuration = Date.now() - fetchUnprocessedMessagesStartTime;

  return {
    batchTotalDuration,
    done,
    fetchDuration,
    lastProcessedIndex,
    newLastProcessedIndex,
    numMessagesProcessed,
    saveDuration,
    targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
    upgradeDuration,
  };
};
// Walks the `messages` object store with a cursor and resolves to an array of
// at most `count` stored values. When `lastIndex` is given, iteration starts
// strictly after that key; otherwise it starts at the beginning of the store.
//
// NOTE: Named dangerous because it is not as efficient as using our
// `messages` `schemaVersion` index.
//
// Argument validation throws `TypeError` synchronously; cursor request errors
// reject the returned promise.
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({
  connection,
  count,
  lastIndex,
} = {}) => {
  if (!isObject(connection)) {
    throw new TypeError("'connection' is required");
  }
  if (!isNumber(count)) {
    throw new TypeError("'count' is required");
  }
  if (lastIndex && !isString(lastIndex)) {
    throw new TypeError("'lastIndex' must be a string");
  }

  const store = connection
    .transaction(MESSAGES_STORE_NAME, 'readonly')
    .objectStore(MESSAGES_STORE_NAME);

  // Open lower bound: resume after `lastIndex` instead of re-reading it.
  let keyRange;
  if (lastIndex) {
    keyRange = IDBKeyRange.lowerBound(lastIndex, true);
  }

  return new Promise((resolve, reject) => {
    const collected = [];
    const cursorRequest = store.openCursor(keyRange);

    cursorRequest.onerror = ({ target }) => reject(target.error);

    cursorRequest.onsuccess = ({ target }) => {
      const cursor = target.result;
      if (cursor && collected.length !== count) {
        collected.push(cursor.value);
        cursor.continue();
        return;
      }

      // Either the store is exhausted or the batch is full.
      resolve(collected);
    };
  });
};
exports.getNumMessages = async ({ connection } = {}) => {
if (!isObject(connection)) {
throw new TypeError("'connection' is required");
}
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const numTotalMessages = await database.getCount({ store: messagesStore });
await database.completeTransaction(transaction);
return numTotalMessages;
};