Add support for attachment background migration without index

This commit is contained in:
Daniel Gasienica 2018-04-02 18:59:24 -04:00
parent 32ac99b439
commit 057762806e
2 changed files with 153 additions and 95 deletions

View file

@ -19,7 +19,7 @@
const { upgradeMessageSchema } = window.Signal.Migrations; const { upgradeMessageSchema } = window.Signal.Migrations;
const { const {
Migrations0DatabaseWithAttachmentData, Migrations0DatabaseWithAttachmentData,
// Migrations1DatabaseWithoutAttachmentData, Migrations1DatabaseWithoutAttachmentData,
} = window.Signal.Migrations; } = window.Signal.Migrations;
const { Views } = window.Signal; const { Views } = window.Signal;
@ -86,11 +86,21 @@
console.log('Run migrations on database with attachment data'); console.log('Run migrations on database with attachment data');
await Migrations0DatabaseWithAttachmentData.run({ Backbone }); await Migrations0DatabaseWithAttachmentData.run({ Backbone });
const database = Whisper.Database; await MessageDataMigrator.dangerouslyProcessAllWithoutIndex({
const status = await Migrations1DatabaseWithoutAttachmentData.getStatus({ database }); databaseName: Migrations0DatabaseWithAttachmentData.getDatabase().name,
minDatabaseVersion: Migrations0DatabaseWithAttachmentData.getDatabase().version,
upgradeMessageSchema,
});
const status = await Migrations1DatabaseWithoutAttachmentData.getStatus({
database: Whisper.Database,
});
console.log('Run migrations on database without attachment data:', status); console.log('Run migrations on database without attachment data:', status);
if (status.canRun) { if (status.canRun) {
await Migrations1DatabaseWithoutAttachmentData.run({ Backbone, database }); await Migrations1DatabaseWithoutAttachmentData.run({
Backbone,
database: Whisper.Database,
});
} }
console.log('Storage fetch'); console.log('Storage fetch');
@ -98,19 +108,18 @@
const idleDetector = new IdleDetector(); const idleDetector = new IdleDetector();
const NUM_MESSAGE_UPGRADES_PER_IDLE = 2;
idleDetector.on('idle', async () => { idleDetector.on('idle', async () => {
const results = await MessageDataMigrator.processNext({ // const database = Migrations0DatabaseWithAttachmentData.getDatabase();
BackboneMessage: Whisper.Message, // const batch = await MessageDataMigrator.processNextBatchWithoutIndex({
BackboneMessageCollection: Whisper.MessageCollection, // databaseName: database.name,
count: NUM_MESSAGE_UPGRADES_PER_IDLE, // minDatabaseVersion: database.version,
upgradeMessageSchema, // upgradeMessageSchema,
}); // });
console.log('Upgrade message schema:', results); // console.log('Upgrade message schema:', batch);
if (!results.done) { // if (batch.done) {
idleDetector.stop(); // idleDetector.stop();
} // }
}); });
/* eslint-disable */ /* eslint-disable */

View file

@ -1,8 +1,8 @@
// Module to upgrade the schema of messages, e.g. migrate attachments to disk. // Module to upgrade the schema of messages, e.g. migrate attachments to disk.
// `processAll` purposely doesnt rely on our Backbone IndexedDB adapter to // `dangerouslyProcessAllWithoutIndex` purposely doesnt rely on our Backbone
// prevent automatic migrations. Rather, it uses direct IndexedDB access. // IndexedDB adapter to prevent automatic migrations. Rather, it uses direct
// This includes avoiding usage of `storage` module which uses Backbone under // IndexedDB access. This includes avoiding usage of `storage` module which uses
// the hood. // Backbone under the hood.
/* global IDBKeyRange */ /* global IDBKeyRange */
@ -81,7 +81,7 @@ exports.processNext = async ({
}; };
}; };
exports.processAll = async ({ exports.dangerouslyProcessAllWithoutIndex = async ({
databaseName, databaseName,
minDatabaseVersion, minDatabaseVersion,
upgradeMessageSchema, upgradeMessageSchema,
@ -111,84 +111,26 @@ exports.processAll = async ({
` to be at least ${minDatabaseVersion}`); ` to be at least ${minDatabaseVersion}`);
} }
const isComplete = await settings.isAttachmentMigrationComplete(connection); // NOTE: Even if we make this async using `then`, requesting `count` on an
console.log('Attachment migration status:', isComplete ? 'complete' : 'incomplete'); // IndexedDB store blocks all subsequent transactions, so we might as well
if (isComplete) { // explicitly wait for it here:
return; const numTotalMessages = await _getNumMessages({ connection });
}
let numTotalMessages = null;
// eslint-disable-next-line more/no-then
getNumMessages({ connection }).then((numMessages) => {
numTotalMessages = numMessages;
});
const migrationStartTime = Date.now(); const migrationStartTime = Date.now();
let unprocessedMessages = []; let numCumulativeMessagesProcessed = 0;
let totalMessagesProcessed = 0; // eslint-disable-next-line no-constant-condition
do { while (true) {
const lastProcessedIndex =
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await settings.getAttachmentMigrationLastProcessedIndex(connection); const status = await _processBatch({ connection, upgradeMessageSchema });
if (status.done) {
const fetchUnprocessedMessagesStartTime = Date.now();
unprocessedMessages =
// eslint-disable-next-line no-await-in-loop
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
connection,
count: NUM_MESSAGES_PER_BATCH,
lastIndex: lastProcessedIndex,
});
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
const numUnprocessedMessages = unprocessedMessages.length;
if (numUnprocessedMessages === 0) {
break; break;
} }
numCumulativeMessagesProcessed += status.numMessagesProcessed;
const upgradeStartTime = Date.now(); console.log('Upgrade message schema:', Object.assign({}, status, {
const upgradedMessages =
// eslint-disable-next-line no-await-in-loop
await Promise.all(unprocessedMessages.map(upgradeMessageSchema));
const upgradeDuration = Date.now() - upgradeStartTime;
const saveMessagesStartTime = Date.now();
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
const transactionCompletion = database.completeTransaction(transaction);
// eslint-disable-next-line no-await-in-loop
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
// eslint-disable-next-line no-await-in-loop
await transactionCompletion;
const saveDuration = Date.now() - saveMessagesStartTime;
// TODO: Confirm transaction is complete
const lastMessage = last(upgradedMessages);
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
if (newLastProcessedIndex) {
// eslint-disable-next-line no-await-in-loop
await settings.setAttachmentMigrationLastProcessedIndex(
connection,
newLastProcessedIndex
);
}
totalMessagesProcessed += numUnprocessedMessages;
console.log('Upgrade message schema:', {
lastProcessedIndex,
numUnprocessedMessages,
numCumulativeMessagesProcessed: totalMessagesProcessed,
numTotalMessages, numTotalMessages,
fetchDuration, numCumulativeMessagesProcessed,
saveDuration, }));
upgradeDuration, }
newLastProcessedIndex,
targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
});
} while (unprocessedMessages.length > 0);
await settings.markAttachmentMigrationComplete(connection);
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
console.log('Close database connection'); console.log('Close database connection');
connection.close(); connection.close();
@ -196,10 +138,117 @@ exports.processAll = async ({
const totalDuration = Date.now() - migrationStartTime; const totalDuration = Date.now() - migrationStartTime;
console.log('Attachment migration complete:', { console.log('Attachment migration complete:', {
totalDuration, totalDuration,
totalMessagesProcessed, totalMessagesProcessed: numCumulativeMessagesProcessed,
}); });
}; };
exports.processNextBatchWithoutIndex = async ({
databaseName,
minDatabaseVersion,
upgradeMessageSchema,
} = {}) => {
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('"upgradeMessageSchema" is required');
}
const connection = await _getConnection({ databaseName, minDatabaseVersion });
const batch = await _processBatch({ connection, upgradeMessageSchema });
return batch;
};
// Private API
const _getConnection = async ({ databaseName, minDatabaseVersion }) => {
if (!isString(databaseName)) {
throw new TypeError('"databaseName" must be a string');
}
if (!isNumber(minDatabaseVersion)) {
throw new TypeError('"minDatabaseVersion" must be a number');
}
const connection = await database.open(databaseName);
const databaseVersion = connection.version;
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
console.log('Database status', {
databaseVersion,
isValidDatabaseVersion,
minDatabaseVersion,
});
if (!isValidDatabaseVersion) {
throw new Error(`Expected database version (${databaseVersion})` +
` to be at least ${minDatabaseVersion}`);
}
return connection;
};
const _processBatch = async ({ connection, upgradeMessageSchema } = {}) => {
if (!isObject(connection)) {
throw new TypeError('"connection" must be a string');
}
if (!isFunction(upgradeMessageSchema)) {
throw new TypeError('"upgradeMessageSchema" is required');
}
const isAttachmentMigrationComplete =
await settings.isAttachmentMigrationComplete(connection);
if (isAttachmentMigrationComplete) {
return {
done: true,
};
}
const lastProcessedIndex =
await settings.getAttachmentMigrationLastProcessedIndex(connection);
const fetchUnprocessedMessagesStartTime = Date.now();
const unprocessedMessages =
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
connection,
count: NUM_MESSAGES_PER_BATCH,
lastIndex: lastProcessedIndex,
});
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
const upgradeStartTime = Date.now();
const upgradedMessages =
await Promise.all(unprocessedMessages.map(upgradeMessageSchema));
const upgradeDuration = Date.now() - upgradeStartTime;
const saveMessagesStartTime = Date.now();
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
const transactionCompletion = database.completeTransaction(transaction);
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
await transactionCompletion;
const saveDuration = Date.now() - saveMessagesStartTime;
const numMessagesProcessed = upgradedMessages.length;
const done = numMessagesProcessed === 0;
const lastMessage = last(upgradedMessages);
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
if (!done) {
await settings.setAttachmentMigrationLastProcessedIndex(
connection,
newLastProcessedIndex
);
} else {
await settings.markAttachmentMigrationComplete(connection);
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
}
return {
done,
fetchDuration,
lastProcessedIndex,
newLastProcessedIndex,
numMessagesProcessed,
saveDuration,
targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
upgradeDuration,
};
};
const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => { const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => {
const backboneMessage = new BackboneMessage(message); const backboneMessage = new BackboneMessage(message);
return deferredToPromise(backboneMessage.save()); return deferredToPromise(backboneMessage.save());
@ -281,7 +330,7 @@ const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex =
}); });
}; };
const getNumMessages = async ({ connection } = {}) => { const _getNumMessages = async ({ connection } = {}) => {
if (!isObject(connection)) { if (!isObject(connection)) {
throw new TypeError('"connection" is required'); throw new TypeError('"connection" is required');
} }