Implement MessageDataMigrator.processAll
Upgrades schema of all messags upon startup.
This commit is contained in:
parent
b8a0bc3423
commit
070235b59b
1 changed files with 198 additions and 25 deletions
|
@ -1,12 +1,20 @@
|
|||
/* eslint-env browser */
|
||||
|
||||
const isFunction = require('lodash/isFunction');
|
||||
const isNumber = require('lodash/isNumber');
|
||||
const isObject = require('lodash/isObject');
|
||||
const isString = require('lodash/isString');
|
||||
const last = require('lodash/last');
|
||||
|
||||
const Message = require('./types/message');
|
||||
const { deferredToPromise } = require('./deferred_to_promise');
|
||||
const Migrations0DatabaseWithAttachmentData =
|
||||
require('./migrations/migrations_0_database_with_attachment_data');
|
||||
|
||||
|
||||
const DATABASE_NAME = 'signal';
|
||||
// Last version with attachment data stored in database:
|
||||
const EXPECTED_DATABASE_VERSION = 17;
|
||||
const MESSAGES_STORE_NAME = 'messages';
|
||||
const ITEMS_STORE_NAME = 'items';
|
||||
|
||||
exports.processNext = async ({
|
||||
BackboneMessage,
|
||||
|
@ -44,7 +52,7 @@ exports.processNext = async ({
|
|||
const upgradeDuration = Date.now() - startUpgradeTime;
|
||||
|
||||
const startSaveTime = Date.now();
|
||||
const saveMessage = _saveMessage({ BackboneMessage });
|
||||
const saveMessage = _saveMessageBackbone({ BackboneMessage });
|
||||
await Promise.all(upgradedMessages.map(saveMessage));
|
||||
const saveDuration = Date.now() - startSaveTime;
|
||||
|
||||
|
@ -78,20 +86,91 @@ exports.processAll = async ({
|
|||
throw new TypeError('"upgradeMessageSchema" is required');
|
||||
}
|
||||
|
||||
const lastIndex = null;
|
||||
const unprocessedMessages =
|
||||
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
|
||||
Backbone,
|
||||
count: 10,
|
||||
lastIndex,
|
||||
const connection = await openDatabase(DATABASE_NAME, EXPECTED_DATABASE_VERSION);
|
||||
const isComplete = await isMigrationComplete(connection);
|
||||
console.log('Is attachment migration complete?', isComplete);
|
||||
if (isComplete) {
|
||||
return;
|
||||
}
|
||||
|
||||
const migrationStartTime = Date.now();
|
||||
let unprocessedMessages = [];
|
||||
do {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const lastProcessedIndex = (await getLastProcessedIndex(connection)) || null;
|
||||
|
||||
const fetchUnprocessedMessagesStartTime = Date.now();
|
||||
unprocessedMessages =
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({
|
||||
connection,
|
||||
count: 10,
|
||||
lastIndex: lastProcessedIndex,
|
||||
});
|
||||
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
|
||||
const numUnprocessedMessages = unprocessedMessages.length;
|
||||
|
||||
const upgradeStartTime = Date.now();
|
||||
const upgradedMessages =
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await Promise.all(unprocessedMessages.map(upgradeMessageSchema));
|
||||
const upgradeDuration = Date.now() - upgradeStartTime;
|
||||
|
||||
const saveMessagesStartTime = Date.now();
|
||||
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
|
||||
const transactionCompletion = completeTransaction(transaction);
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await Promise.all(upgradedMessages.map(_saveMessage({ transaction })));
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await transactionCompletion;
|
||||
const saveDuration = Date.now() - saveMessagesStartTime;
|
||||
|
||||
// TODO: Confirm transaction is complete
|
||||
|
||||
const lastMessage = last(upgradedMessages);
|
||||
const newLastProcessedIndex = lastMessage ? lastMessage.id : null;
|
||||
if (newLastProcessedIndex) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await setLastProcessedIndex(connection, newLastProcessedIndex);
|
||||
}
|
||||
|
||||
console.log('Upgrade message schema on startup:', {
|
||||
lastProcessedIndex,
|
||||
numUnprocessedMessages,
|
||||
fetchDuration,
|
||||
saveDuration,
|
||||
upgradeDuration,
|
||||
newLastProcessedIndex,
|
||||
});
|
||||
} while (unprocessedMessages.length > 0);
|
||||
|
||||
await markMigrationComplete(connection);
|
||||
connection.close();
|
||||
|
||||
const totalDuration = Date.now() - migrationStartTime;
|
||||
console.log('Attachment migration complete:', { totalDuration });
|
||||
};
|
||||
|
||||
const _saveMessage = ({ BackboneMessage } = {}) => (message) => {
|
||||
const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => {
|
||||
const backboneMessage = new BackboneMessage(message);
|
||||
return deferredToPromise(backboneMessage.save());
|
||||
};
|
||||
|
||||
const _saveMessage = ({ transaction } = {}) => (message) => {
|
||||
if (!isObject(transaction)) {
|
||||
throw new TypeError('"transaction" is required');
|
||||
}
|
||||
|
||||
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
|
||||
const request = messagesStore.put(message, message.id);
|
||||
return new Promise((resolve, reject) => {
|
||||
request.onsuccess = () =>
|
||||
resolve();
|
||||
request.onerror = event =>
|
||||
reject(event.target.error);
|
||||
});
|
||||
};
|
||||
|
||||
const _fetchMessagesRequiringSchemaUpgrade =
|
||||
async ({ BackboneMessageCollection, count } = {}) => {
|
||||
if (!isFunction(BackboneMessageCollection)) {
|
||||
|
@ -119,11 +198,10 @@ const _fetchMessagesRequiringSchemaUpgrade =
|
|||
}));
|
||||
};
|
||||
|
||||
const MAX_MESSAGE_KEY = 'ffffffff-ffff-ffff-ffff-ffffffffffff';
|
||||
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex =
|
||||
async ({ Backbone, count, lastIndex } = {}) => {
|
||||
if (!isObject(Backbone)) {
|
||||
throw new TypeError('"Backbone" is required');
|
||||
({ connection, count, lastIndex } = {}) => {
|
||||
if (!isObject(connection)) {
|
||||
throw new TypeError('"connection" is required');
|
||||
}
|
||||
|
||||
if (!isNumber(count)) {
|
||||
|
@ -134,17 +212,112 @@ const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex =
|
|||
throw new TypeError('"lastIndex" must be a string');
|
||||
}
|
||||
|
||||
const storeName = 'messages';
|
||||
const collection =
|
||||
Migrations0DatabaseWithAttachmentData.createCollection({ Backbone, storeName });
|
||||
const hasLastIndex = Boolean(lastIndex);
|
||||
|
||||
const range = lastIndex ? [lastIndex, MAX_MESSAGE_KEY] : null;
|
||||
await deferredToPromise(collection.fetch({
|
||||
limit: count,
|
||||
range,
|
||||
}));
|
||||
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly');
|
||||
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
|
||||
|
||||
const models = collection.models || [];
|
||||
const messages = models.map(model => model.toJSON());
|
||||
return messages;
|
||||
const excludeLowerBound = true;
|
||||
const query = hasLastIndex
|
||||
? IDBKeyRange.lowerBound(lastIndex, excludeLowerBound)
|
||||
: undefined;
|
||||
const request = messagesStore.getAll(query, count);
|
||||
return new Promise((resolve, reject) => {
|
||||
request.onsuccess = event =>
|
||||
resolve(event.target.result);
|
||||
request.onerror = event =>
|
||||
reject(event.target.error);
|
||||
});
|
||||
};
|
||||
|
||||
const openDatabase = (name, version) => {
|
||||
const request = window.indexedDB.open(name, version);
|
||||
return new Promise((resolve, reject) => {
|
||||
request.onblocked = () =>
|
||||
reject(new Error('Database blocked'));
|
||||
|
||||
request.onupgradeneeded = event =>
|
||||
reject(new Error('Unexpected database upgrade required:' +
|
||||
`oldVersion: ${event.oldVersion}, newVersion: ${event.newVersion}`));
|
||||
|
||||
request.onerror = event =>
|
||||
reject(event.target.error);
|
||||
|
||||
request.onsuccess = (event) => {
|
||||
const connection = event.target.result;
|
||||
resolve(connection);
|
||||
};
|
||||
});
|
||||
};
|
||||
|
||||
const LAST_PROCESSED_INDEX_KEY = 'attachmentMigration_lastProcessedIndex';
|
||||
const IS_MIGRATION_COMPLETE_KEY = 'attachmentMigration_isComplete';
|
||||
|
||||
const getLastProcessedIndex = connection =>
|
||||
getItem(connection, LAST_PROCESSED_INDEX_KEY);
|
||||
|
||||
const setLastProcessedIndex = (connection, value) =>
|
||||
setItem(connection, LAST_PROCESSED_INDEX_KEY, value);
|
||||
|
||||
const isMigrationComplete = async (connection) => {
|
||||
const value = await getItem(connection, IS_MIGRATION_COMPLETE_KEY);
|
||||
return Boolean(value);
|
||||
};
|
||||
|
||||
const markMigrationComplete = connection =>
|
||||
setItem(connection, IS_MIGRATION_COMPLETE_KEY, true);
|
||||
|
||||
const getItem = (connection, key) => {
|
||||
if (!isObject(connection)) {
|
||||
throw new TypeError('"connection" is required');
|
||||
}
|
||||
|
||||
if (!isString(key)) {
|
||||
throw new TypeError('"key" must be a string');
|
||||
}
|
||||
|
||||
const transaction = connection.transaction(ITEMS_STORE_NAME, 'readonly');
|
||||
const itemsStore = transaction.objectStore(ITEMS_STORE_NAME);
|
||||
const request = itemsStore.get(key);
|
||||
return new Promise((resolve, reject) => {
|
||||
request.onerror = event =>
|
||||
reject(event.target.error);
|
||||
|
||||
request.onsuccess = event =>
|
||||
resolve(event.target.result);
|
||||
});
|
||||
};
|
||||
|
||||
const setItem = (connection, key, value) => {
|
||||
if (!isObject(connection)) {
|
||||
throw new TypeError('"connection" is required');
|
||||
}
|
||||
|
||||
if (!isString(key)) {
|
||||
throw new TypeError('"key" must be a string');
|
||||
}
|
||||
|
||||
const transaction = connection.transaction(ITEMS_STORE_NAME, 'readwrite');
|
||||
const itemsStore = transaction.objectStore(ITEMS_STORE_NAME);
|
||||
const request = itemsStore.put(value, key);
|
||||
return new Promise((resolve, reject) => {
|
||||
request.onerror = event =>
|
||||
reject(event.target.error);
|
||||
|
||||
request.onsuccess = () =>
|
||||
resolve();
|
||||
});
|
||||
};
|
||||
|
||||
const completeTransaction = transaction =>
|
||||
new Promise((resolve, reject) => {
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
transaction.onabort = event =>
|
||||
reject(event.target.error);
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
transaction.onerror = event =>
|
||||
reject(event.target.error);
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
transaction.oncomplete = () =>
|
||||
resolve();
|
||||
});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue