2018-03-27 16:19:40 +00:00
|
|
|
|
// Module to upgrade the schema of messages, e.g. migrate attachments to disk.
// `processAll` purposely doesn’t rely on our Backbone IndexedDB adapter to
// prevent automatic migrations. Rather, it uses direct IndexedDB access.
// This includes avoiding usage of `storage` module which uses Backbone under
// the hood.
|
2018-03-27 16:19:40 +00:00
|
|
|
|
|
2018-03-28 14:54:01 +00:00
|
|
|
|
/* global IDBKeyRange */
|
|
|
|
|
|
2018-03-21 23:37:39 +00:00
|
|
|
|
const isFunction = require('lodash/isFunction');
|
2018-03-26 23:09:06 +00:00
|
|
|
|
const isNumber = require('lodash/isNumber');
|
|
|
|
|
const isObject = require('lodash/isObject');
|
|
|
|
|
const isString = require('lodash/isString');
|
2018-03-27 15:51:21 +00:00
|
|
|
|
const last = require('lodash/last');
|
2018-03-26 20:33:23 +00:00
|
|
|
|
|
2018-03-28 14:54:01 +00:00
|
|
|
|
const database = require('./database');
|
2018-03-21 23:37:39 +00:00
|
|
|
|
const Message = require('./types/message');
|
2018-03-28 14:54:01 +00:00
|
|
|
|
const settings = require('./settings');
|
2018-03-26 20:33:23 +00:00
|
|
|
|
const { deferredToPromise } = require('./deferred_to_promise');
|
2018-03-27 15:51:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Name of the IndexedDB object store that this module opens transactions on
// when reading and writing messages directly.
const MESSAGES_STORE_NAME = 'messages';
|
2018-03-28 22:14:26 +00:00
|
|
|
|
// Maximum number of messages `processAll` fetches, upgrades, and saves per
// loop iteration (passed as `count` to the direct IndexedDB fetch).
const NUM_MESSAGES_PER_BATCH = 1;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
|
2018-03-26 20:32:22 +00:00
|
|
|
|
exports.processNext = async ({
|
2018-03-21 23:37:39 +00:00
|
|
|
|
BackboneMessage,
|
|
|
|
|
BackboneMessageCollection,
|
|
|
|
|
count,
|
|
|
|
|
upgradeMessageSchema,
|
|
|
|
|
} = {}) => {
|
|
|
|
|
if (!isFunction(BackboneMessage)) {
|
2018-03-26 23:04:38 +00:00
|
|
|
|
throw new TypeError('"BackboneMessage" (Whisper.Message) constructor is required');
|
2018-03-21 23:37:39 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!isFunction(BackboneMessageCollection)) {
|
2018-03-26 23:04:38 +00:00
|
|
|
|
throw new TypeError('"BackboneMessageCollection" (Whisper.MessageCollection)' +
|
2018-03-21 23:37:39 +00:00
|
|
|
|
' constructor is required');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!isNumber(count)) {
|
2018-03-26 23:04:38 +00:00
|
|
|
|
throw new TypeError('"count" is required');
|
2018-03-21 23:37:39 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!isFunction(upgradeMessageSchema)) {
|
2018-03-26 23:04:38 +00:00
|
|
|
|
throw new TypeError('"upgradeMessageSchema" is required');
|
2018-03-21 23:37:39 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const startTime = Date.now();
|
|
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
|
const fetchStartTime = Date.now();
|
2018-03-21 23:37:39 +00:00
|
|
|
|
const messagesRequiringSchemaUpgrade =
|
|
|
|
|
await _fetchMessagesRequiringSchemaUpgrade({ BackboneMessageCollection, count });
|
2018-03-27 16:19:55 +00:00
|
|
|
|
const fetchDuration = Date.now() - fetchStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
|
const upgradeStartTime = Date.now();
|
2018-03-21 23:37:39 +00:00
|
|
|
|
const upgradedMessages =
|
|
|
|
|
await Promise.all(messagesRequiringSchemaUpgrade.map(upgradeMessageSchema));
|
2018-03-27 16:19:55 +00:00
|
|
|
|
const upgradeDuration = Date.now() - upgradeStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
|
const saveStartTime = Date.now();
|
2018-03-27 15:51:21 +00:00
|
|
|
|
const saveMessage = _saveMessageBackbone({ BackboneMessage });
|
2018-03-21 23:37:39 +00:00
|
|
|
|
await Promise.all(upgradedMessages.map(saveMessage));
|
2018-03-27 16:19:55 +00:00
|
|
|
|
const saveDuration = Date.now() - saveStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
|
|
|
|
|
const totalDuration = Date.now() - startTime;
|
|
|
|
|
const numProcessed = messagesRequiringSchemaUpgrade.length;
|
|
|
|
|
const hasMore = numProcessed > 0;
|
|
|
|
|
return {
|
|
|
|
|
hasMore,
|
|
|
|
|
numProcessed,
|
|
|
|
|
fetchDuration,
|
|
|
|
|
upgradeDuration,
|
|
|
|
|
saveDuration,
|
|
|
|
|
totalDuration,
|
|
|
|
|
};
|
|
|
|
|
};
|
|
|
|
|
|
2018-03-28 14:23:36 +00:00
|
|
|
|
exports.processAll = async ({
|
2018-03-28 14:54:01 +00:00
|
|
|
|
Backbone,
|
|
|
|
|
databaseName,
|
2018-03-28 18:45:07 +00:00
|
|
|
|
minDatabaseVersion,
|
2018-03-28 14:54:01 +00:00
|
|
|
|
upgradeMessageSchema,
|
|
|
|
|
} = {}) => {
|
2018-03-26 23:09:06 +00:00
|
|
|
|
if (!isObject(Backbone)) {
|
|
|
|
|
throw new TypeError('"Backbone" is required');
|
|
|
|
|
}
|
|
|
|
|
|
2018-03-28 14:23:36 +00:00
|
|
|
|
if (!isString(databaseName)) {
|
|
|
|
|
throw new TypeError('"databaseName" must be a string');
|
|
|
|
|
}
|
|
|
|
|
|
2018-03-28 18:45:07 +00:00
|
|
|
|
if (!isNumber(minDatabaseVersion)) {
|
|
|
|
|
throw new TypeError('"minDatabaseVersion" must be a number');
|
2018-03-28 14:23:36 +00:00
|
|
|
|
}
|
|
|
|
|
|
2018-03-26 23:09:06 +00:00
|
|
|
|
if (!isFunction(upgradeMessageSchema)) {
|
|
|
|
|
throw new TypeError('"upgradeMessageSchema" is required');
|
|
|
|
|
}
|
|
|
|
|
|
2018-03-28 18:45:07 +00:00
|
|
|
|
const connection = await database.open(databaseName);
|
|
|
|
|
const databaseVersion = connection.version;
|
|
|
|
|
const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion;
|
|
|
|
|
console.log('Database status', {
|
|
|
|
|
databaseVersion,
|
|
|
|
|
isValidDatabaseVersion,
|
|
|
|
|
minDatabaseVersion,
|
|
|
|
|
});
|
|
|
|
|
if (!isValidDatabaseVersion) {
|
|
|
|
|
throw new Error(`Expected database version (${databaseVersion})` +
|
|
|
|
|
` to be at least ${minDatabaseVersion}`);
|
|
|
|
|
}
|
|
|
|
|
|
2018-03-28 14:54:01 +00:00
|
|
|
|
const isComplete = await settings.isAttachmentMigrationComplete(connection);
|
2018-03-28 14:25:22 +00:00
|
|
|
|
console.log('Attachment migration status:', isComplete ? 'complete' : 'incomplete');
|
2018-03-27 15:51:21 +00:00
|
|
|
|
if (isComplete) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const migrationStartTime = Date.now();
|
|
|
|
|
let unprocessedMessages = [];
|
2018-03-28 14:24:07 +00:00
|
|
|
|
let totalMessagesProcessed = 0;
|
2018-03-27 15:51:21 +00:00
|
|
|
|
do {
|
2018-03-28 14:54:01 +00:00
|
|
|
|
const lastProcessedIndex =
|
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
|
await settings.getAttachmentMigrationLastProcessedIndex(connection);
|
2018-03-27 15:51:21 +00:00
|
|
|
|
|
|
|
|
|
const fetchUnprocessedMessagesStartTime = Date.now();
|
|
|
|
|
unprocessedMessages =
|
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
|
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
|
|
|
|
|
connection,
|
2018-03-27 16:15:14 +00:00
|
|
|
|
count: NUM_MESSAGES_PER_BATCH,
|
2018-03-27 15:51:21 +00:00
|
|
|
|
lastIndex: lastProcessedIndex,
|
|
|
|
|
});
|
|
|
|
|
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
|
|
|
|
|
const numUnprocessedMessages = unprocessedMessages.length;
|
|
|
|
|
|
2018-03-28 14:10:45 +00:00
|
|
|
|
if (numUnprocessedMessages === 0) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
2018-03-27 15:51:21 +00:00
|
|
|
|
const upgradeStartTime = Date.now();
|
|
|
|
|
const upgradedMessages =
|
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
|
await Promise.all(unprocessedMessages.map(upgradeMessageSchema));
|
|
|
|
|
const upgradeDuration = Date.now() - upgradeStartTime;
|
|
|
|
|
|
|
|
|
|
const saveMessagesStartTime = Date.now();
|
|
|
|
|
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
|
2018-03-28 14:54:01 +00:00
|
|
|
|
const transactionCompletion = database.completeTransaction(transaction);
|
2018-03-27 15:51:21 +00:00
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
|
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
|
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
|
await transactionCompletion;
|
|
|
|
|
const saveDuration = Date.now() - saveMessagesStartTime;
|
|
|
|
|
|
|
|
|
|
// TODO: Confirm transaction is complete
|
|
|
|
|
|
|
|
|
|
const lastMessage = last(upgradedMessages);
|
|
|
|
|
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
|
|
|
|
|
if (newLastProcessedIndex) {
|
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
2018-03-28 14:54:01 +00:00
|
|
|
|
await settings.setAttachmentMigrationLastProcessedIndex(
|
|
|
|
|
connection,
|
|
|
|
|
newLastProcessedIndex
|
|
|
|
|
);
|
2018-03-27 15:51:21 +00:00
|
|
|
|
}
|
|
|
|
|
|
2018-03-28 14:24:07 +00:00
|
|
|
|
totalMessagesProcessed += numUnprocessedMessages;
|
|
|
|
|
console.log('Upgrade message schema:', {
|
2018-03-27 15:51:21 +00:00
|
|
|
|
lastProcessedIndex,
|
|
|
|
|
numUnprocessedMessages,
|
2018-03-28 14:24:07 +00:00
|
|
|
|
numCumulativeMessagesProcessed: totalMessagesProcessed,
|
2018-03-27 15:51:21 +00:00
|
|
|
|
fetchDuration,
|
|
|
|
|
saveDuration,
|
|
|
|
|
upgradeDuration,
|
|
|
|
|
newLastProcessedIndex,
|
2018-03-28 14:24:49 +00:00
|
|
|
|
targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION,
|
2018-03-26 23:09:06 +00:00
|
|
|
|
});
|
2018-03-27 15:51:21 +00:00
|
|
|
|
} while (unprocessedMessages.length > 0);
|
|
|
|
|
|
2018-03-28 14:54:01 +00:00
|
|
|
|
await settings.markAttachmentMigrationComplete(connection);
|
2018-03-28 21:12:51 +00:00
|
|
|
|
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
|
2018-03-28 14:37:21 +00:00
|
|
|
|
|
|
|
|
|
console.log('Close database connection');
|
2018-03-27 15:51:21 +00:00
|
|
|
|
connection.close();
|
|
|
|
|
|
|
|
|
|
const totalDuration = Date.now() - migrationStartTime;
|
2018-03-28 14:24:07 +00:00
|
|
|
|
console.log('Attachment migration complete:', {
|
|
|
|
|
totalDuration,
|
|
|
|
|
totalMessagesProcessed,
|
|
|
|
|
});
|
2018-03-26 23:09:06 +00:00
|
|
|
|
};
|
|
|
|
|
|
2018-03-27 15:51:21 +00:00
|
|
|
|
// Builds a saver bound to the given Backbone message constructor. The returned
// function wraps a plain message in a new `BackboneMessage`, calls `save()` on
// it, and converts the result to a promise via `deferredToPromise`.
const _saveMessageBackbone = ({ BackboneMessage } = {}) =>
  message => deferredToPromise(new BackboneMessage(message).save());
|
|
|
|
|
|
2018-03-27 15:51:21 +00:00
|
|
|
|
// Builds a saver bound to an IndexedDB transaction. The returned function
// `put`s the message into the messages store under `message.id` and returns a
// promise that resolves (with no value) on request success and rejects with
// the request's error otherwise.
//
// The `transaction` check runs when the returned function is called, not when
// the saver is created; it throws `TypeError` if `transaction` is not an
// object.
const _saveMessage = ({ transaction } = {}) => (message) => {
  if (!isObject(transaction)) {
    throw new TypeError('"transaction" is required');
  }

  // Issue the request before constructing the promise so that a synchronous
  // failure of `objectStore`/`put` is thrown directly to the caller.
  const putRequest = transaction
    .objectStore(MESSAGES_STORE_NAME)
    .put(message, message.id);

  return new Promise((resolve, reject) => {
    putRequest.onerror = ({ target }) => reject(target.error);
    putRequest.onsuccess = () => resolve();
  });
};
|
|
|
|
|
|
2018-03-21 23:37:39 +00:00
|
|
|
|
// Fetches, through a Backbone collection, up to `count` messages whose
// `schemaVersion` index value is strictly below
// `Message.CURRENT_SCHEMA_VERSION`, in descending index order, and resolves to
// their `toJSON()` representations.
//
// Throws `TypeError` (as a rejected promise, since this is an async function)
// when `BackboneMessageCollection` is not a function or `count` is not a
// number.
const _fetchMessagesRequiringSchemaUpgrade =
  async ({ BackboneMessageCollection, count } = {}) => {
    if (!isFunction(BackboneMessageCollection)) {
      throw new TypeError('"BackboneMessageCollection" (Whisper.MessageCollection)' +
        ' constructor is required');
    }

    if (!isNumber(count)) {
      throw new TypeError('"count" is required');
    }

    const fetchOptions = {
      limit: count,
      index: {
        name: 'schemaVersion',
        upper: Message.CURRENT_SCHEMA_VERSION,
        excludeUpper: true,
        order: 'desc',
      },
    };

    const collection = new BackboneMessageCollection();
    return new Promise((resolve) => {
      // NOTE(review): `.always` presumably follows jQuery Deferred semantics
      // and also runs when the fetch fails, in which case this resolves with
      // whatever models the collection holds rather than rejecting — confirm.
      collection.fetch(fetchOptions).always(() => {
        const fetchedModels = collection.models || [];
        resolve(fetchedModels.map(model => model.toJSON()));
      });
    });
  };
|
2018-03-26 23:09:06 +00:00
|
|
|
|
|
2018-03-29 17:12:18 +00:00
|
|
|
|
// NOTE: Named ‘dangerous’ because it is not as efficient as using our
// `messages` `schemaVersion` index.
//
// Reads up to `count` records from the messages store in a read-only
// transaction. When `lastIndex` is given, only records whose key is strictly
// greater than `lastIndex` are returned; otherwise reading starts from the
// beginning of the store. No schema-version filtering happens here.
//
// Returns a promise that resolves to the request's result and rejects with the
// request's error. Throws `TypeError` synchronously when `connection` is not
// an object, `count` is not a number, or a truthy `lastIndex` is not a string.
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex =
  ({ connection, count, lastIndex } = {}) => {
    if (!isObject(connection)) {
      throw new TypeError('"connection" is required');
    }

    if (!isNumber(count)) {
      throw new TypeError('"count" is required');
    }

    if (lastIndex && !isString(lastIndex)) {
      throw new TypeError('"lastIndex" must be a string');
    }

    const messagesStore = connection
      .transaction(MESSAGES_STORE_NAME, 'readonly')
      .objectStore(MESSAGES_STORE_NAME);

    // Without a last index the key range stays `undefined`, i.e. unbounded.
    let keyRange;
    if (lastIndex) {
      // Open lower bound: resume strictly after the last processed key.
      const isLowerBoundOpen = true;
      keyRange = IDBKeyRange.lowerBound(lastIndex, isLowerBoundOpen);
    }

    const getAllRequest = messagesStore.getAll(keyRange, count);
    return new Promise((resolve, reject) => {
      getAllRequest.onerror = ({ target }) => reject(target.error);
      getAllRequest.onsuccess = ({ target }) => resolve(target.result);
    });
  };
|