// Module to upgrade the schema of messages, e.g. migrate attachments to disk. // `dangerouslyProcessAllWithoutIndex` purposely doesn’t rely on our Backbone // IndexedDB adapter to prevent automatic migrations. Rather, it uses direct // IndexedDB access. This includes avoiding usage of `storage` module which uses // Backbone under the hood. /* global IDBKeyRange */ const { isFunction, isNumber, isObject, isString, last } = require('lodash'); const database = require('./database'); const Message = require('./types/message'); const settings = require('./settings'); const MESSAGES_STORE_NAME = 'messages'; exports.processNext = async ({ BackboneMessage, BackboneMessageCollection, numMessagesPerBatch, upgradeMessageSchema, getMessagesNeedingUpgrade, saveMessage, maxVersion = Message.CURRENT_SCHEMA_VERSION, } = {}) => { if (!isFunction(BackboneMessage)) { throw new TypeError( "'BackboneMessage' (Whisper.Message) constructor is required" ); } if (!isFunction(BackboneMessageCollection)) { throw new TypeError( "'BackboneMessageCollection' (Whisper.MessageCollection)" + ' constructor is required' ); } if (!isNumber(numMessagesPerBatch)) { throw new TypeError("'numMessagesPerBatch' is required"); } if (!isFunction(upgradeMessageSchema)) { throw new TypeError("'upgradeMessageSchema' is required"); } const startTime = Date.now(); const fetchStartTime = Date.now(); const messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade( numMessagesPerBatch, { maxVersion, MessageCollection: BackboneMessageCollection, } ); const fetchDuration = Date.now() - fetchStartTime; const upgradeStartTime = Date.now(); const upgradedMessages = await Promise.all( messagesRequiringSchemaUpgrade.map(message => upgradeMessageSchema(message, { maxVersion }) ) ); const upgradeDuration = Date.now() - upgradeStartTime; const saveStartTime = Date.now(); await Promise.all( upgradedMessages.map(message => saveMessage(message, { Message: BackboneMessage }) ) ); const saveDuration = Date.now() - saveStartTime; const totalDuration = Date.now() - startTime; const numProcessed = messagesRequiringSchemaUpgrade.length; const done = numProcessed < numMessagesPerBatch; return { done, numProcessed, fetchDuration, upgradeDuration, saveDuration, totalDuration, }; }; exports.dangerouslyProcessAllWithoutIndex = async ({ databaseName, minDatabaseVersion, numMessagesPerBatch, upgradeMessageSchema, logger, maxVersion = Message.CURRENT_SCHEMA_VERSION, saveMessage, BackboneMessage, } = {}) => { if (!isString(databaseName)) { throw new TypeError("'databaseName' must be a string"); } if (!isNumber(minDatabaseVersion)) { throw new TypeError("'minDatabaseVersion' must be a number"); } if (!isNumber(numMessagesPerBatch)) { throw new TypeError("'numMessagesPerBatch' must be a number"); } if (!isFunction(upgradeMessageSchema)) { throw new TypeError("'upgradeMessageSchema' is required"); } if (!isFunction(BackboneMessage)) { throw new TypeError("'upgradeMessageSchema' is required"); } if (!isFunction(saveMessage)) { throw new TypeError("'upgradeMessageSchema' is required"); } const connection = await database.open(databaseName); const databaseVersion = connection.version; const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion; logger.info('Database status', { databaseVersion, isValidDatabaseVersion, minDatabaseVersion, }); if (!isValidDatabaseVersion) { throw new Error( `Expected database version (${databaseVersion})` + ` to be at least ${minDatabaseVersion}` ); } // NOTE: Even if we make this async using `then`, requesting `count` on an // IndexedDB store blocks all subsequent transactions, so we might as well // explicitly wait for it here: const numTotalMessages = await exports.getNumMessages({ connection }); const migrationStartTime = Date.now(); let numCumulativeMessagesProcessed = 0; // eslint-disable-next-line no-constant-condition while (true) { // eslint-disable-next-line no-await-in-loop const status = await _processBatch({ connection, numMessagesPerBatch, upgradeMessageSchema, maxVersion, saveMessage, BackboneMessage, }); if (status.done) { break; } numCumulativeMessagesProcessed += status.numMessagesProcessed; logger.info( 'Upgrade message schema:', Object.assign({}, status, { numTotalMessages, numCumulativeMessagesProcessed, }) ); } logger.info('Close database connection'); connection.close(); const totalDuration = Date.now() - migrationStartTime; logger.info('Attachment migration complete:', { totalDuration, totalMessagesProcessed: numCumulativeMessagesProcessed, }); }; exports.processNextBatchWithoutIndex = async ({ databaseName, minDatabaseVersion, numMessagesPerBatch, upgradeMessageSchema, maxVersion, BackboneMessage, saveMessage, } = {}) => { if (!isFunction(upgradeMessageSchema)) { throw new TypeError("'upgradeMessageSchema' is required"); } const connection = await _getConnection({ databaseName, minDatabaseVersion }); const batch = await _processBatch({ connection, numMessagesPerBatch, upgradeMessageSchema, maxVersion, BackboneMessage, saveMessage, }); return batch; }; // Private API const _getConnection = async ({ databaseName, minDatabaseVersion }) => { if (!isString(databaseName)) { throw new TypeError("'databaseName' must be a string"); } if (!isNumber(minDatabaseVersion)) { throw new TypeError("'minDatabaseVersion' must be a number"); } const connection = await database.open(databaseName); const databaseVersion = connection.version; const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion; if (!isValidDatabaseVersion) { throw new Error( `Expected database version (${databaseVersion})` + ` to be at least ${minDatabaseVersion}` ); } return connection; }; const _processBatch = async ({ connection, numMessagesPerBatch, upgradeMessageSchema, maxVersion, BackboneMessage, saveMessage, } = {}) => { if (!isObject(connection)) { throw new TypeError('_processBatch: connection must be a string'); } if (!isFunction(upgradeMessageSchema)) { throw new TypeError('_processBatch: upgradeMessageSchema is required'); } if (!isNumber(numMessagesPerBatch)) { throw new TypeError('_processBatch: numMessagesPerBatch is required'); } if (!isNumber(maxVersion)) { throw new TypeError('_processBatch: maxVersion is required'); } if (!isFunction(BackboneMessage)) { throw new TypeError('_processBatch: BackboneMessage is required'); } if (!isFunction(saveMessage)) { throw new TypeError('_processBatch: saveMessage is required'); } const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete( connection ); if (isAttachmentMigrationComplete) { return { done: true, }; } const lastProcessedIndex = await settings.getAttachmentMigrationLastProcessedIndex( connection ); const fetchUnprocessedMessagesStartTime = Date.now(); const unprocessedMessages = await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex( { connection, count: numMessagesPerBatch, lastIndex: lastProcessedIndex, } ); const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime; const upgradeStartTime = Date.now(); const upgradedMessages = await Promise.all( unprocessedMessages.map(message => upgradeMessageSchema(message, { maxVersion }) ) ); const upgradeDuration = Date.now() - upgradeStartTime; const saveMessagesStartTime = Date.now(); const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite'); const transactionCompletion = database.completeTransaction(transaction); await Promise.all( upgradedMessages.map(message => saveMessage(message, { Message: BackboneMessage }) ) ); await transactionCompletion; const saveDuration = Date.now() - saveMessagesStartTime; const numMessagesProcessed = upgradedMessages.length; const done = numMessagesProcessed < numMessagesPerBatch; const lastMessage = last(upgradedMessages); const newLastProcessedIndex = lastMessage ? lastMessage.id : null; if (!done) { await settings.setAttachmentMigrationLastProcessedIndex( connection, newLastProcessedIndex ); } else { await settings.markAttachmentMigrationComplete(connection); await settings.deleteAttachmentMigrationLastProcessedIndex(connection); } const batchTotalDuration = Date.now() - fetchUnprocessedMessagesStartTime; return { batchTotalDuration, done, fetchDuration, lastProcessedIndex, newLastProcessedIndex, numMessagesProcessed, saveDuration, targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION, upgradeDuration, }; }; // NOTE: Named ‘dangerous’ because it is not as efficient as using our // `messages` `schemaVersion` index: const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({ connection, count, lastIndex, } = {}) => { if (!isObject(connection)) { throw new TypeError("'connection' is required"); } if (!isNumber(count)) { throw new TypeError("'count' is required"); } if (lastIndex && !isString(lastIndex)) { throw new TypeError("'lastIndex' must be a string"); } const hasLastIndex = Boolean(lastIndex); const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly'); const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME); const excludeLowerBound = true; const range = hasLastIndex ? IDBKeyRange.lowerBound(lastIndex, excludeLowerBound) : undefined; return new Promise((resolve, reject) => { const items = []; const request = messagesStore.openCursor(range); request.onsuccess = event => { const cursor = event.target.result; const hasMoreData = Boolean(cursor); if (!hasMoreData || items.length === count) { resolve(items); return; } const item = cursor.value; items.push(item); cursor.continue(); }; request.onerror = event => reject(event.target.error); }); }; exports.getNumMessages = async ({ connection } = {}) => { if (!isObject(connection)) { throw new TypeError("'connection' is required"); } const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly'); const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME); const numTotalMessages = await database.getCount({ store: messagesStore }); await database.completeTransaction(transaction); return numTotalMessages; };