Move conversations to SQLCipher

This commit is contained in:
Scott Nonnenberg 2018-09-20 18:47:19 -07:00
parent 8cd3db0262
commit cd60bdd08a
31 changed files with 1354 additions and 774 deletions

View file

@ -224,7 +224,49 @@ function eliminateClientConfigInBackup(data, targetPath) {
}
}
function importFromJsonString(db, jsonString, targetPath, options) {
// Imports stringified conversation records into SQLCipher, skipping any
// conversation already present in options.conversationLookup. Attachment
// data embedded in each record is migrated to disk before saving.
// Logs total and skipped counts when finished.
async function importConversationsFromJSON(conversations, options) {
  const { writeNewAttachmentData } = window.Signal.Migrations;
  const { conversationLookup } = options;

  let count = 0;
  let skipCount = 0;

  for (const stringified of conversations) {
    const toAdd = unstringify(stringified);
    count += 1;

    if (conversationLookup[getConversationKey(toAdd)]) {
      // Already present locally; don't overwrite the existing record.
      skipCount += 1;
    } else {
      // eslint-disable-next-line no-await-in-loop
      const migrated = await window.Signal.Types.Conversation.migrateConversation(
        toAdd,
        { writeNewAttachmentData }
      );
      // eslint-disable-next-line no-await-in-loop
      await window.Signal.Data.saveConversation(migrated, {
        Conversation: Whisper.Conversation,
      });
    }
  }

  window.log.info(
    'Done importing conversations:',
    'Total count:',
    count,
    'Skipped:',
    skipCount
  );
}
async function importFromJsonString(db, jsonString, targetPath, options) {
options = options || {};
_.defaults(options, {
forceLightImport: false,
@ -232,12 +274,12 @@ function importFromJsonString(db, jsonString, targetPath, options) {
groupLookup: {},
});
const { conversationLookup, groupLookup } = options;
const { groupLookup } = options;
const result = {
fullImport: true,
};
return new Promise((resolve, reject) => {
return new Promise(async (resolve, reject) => {
const importObject = JSON.parse(jsonString);
delete importObject.debug;
@ -273,7 +315,25 @@ function importFromJsonString(db, jsonString, targetPath, options) {
finished = true;
};
const transaction = db.transaction(storeNames, 'readwrite');
// Special-case conversations key here, going to SQLCipher
const { conversations } = importObject;
const remainingStoreNames = _.without(
storeNames,
'conversations',
'unprocessed'
);
try {
await importConversationsFromJSON(conversations, options);
} catch (error) {
reject(error);
}
// Because the 'are we done?' check below looks at the keys remaining in importObject
delete importObject.conversations;
delete importObject.unprocessed;
// The rest go to IndexedDB
const transaction = db.transaction(remainingStoreNames, 'readwrite');
transaction.onerror = () => {
Whisper.Database.handleDOMException(
'importFromJsonString transaction error',
@ -283,7 +343,7 @@ function importFromJsonString(db, jsonString, targetPath, options) {
};
transaction.oncomplete = finish.bind(null, 'transaction complete');
_.each(storeNames, storeName => {
_.each(remainingStoreNames, storeName => {
window.log.info('Importing items for store', storeName);
if (!importObject[storeName].length) {
@ -315,13 +375,10 @@ function importFromJsonString(db, jsonString, targetPath, options) {
_.each(importObject[storeName], toAdd => {
toAdd = unstringify(toAdd);
const haveConversationAlready =
storeName === 'conversations' &&
conversationLookup[getConversationKey(toAdd)];
const haveGroupAlready =
storeName === 'groups' && groupLookup[getGroupKey(toAdd)];
if (haveConversationAlready || haveGroupAlready) {
if (haveGroupAlready) {
skipCount += 1;
count += 1;
return;
@ -1137,20 +1194,17 @@ function getMessageKey(message) {
const sourceDevice = message.sourceDevice || 1;
return `${source}.${sourceDevice} ${message.timestamp}`;
}
async function loadMessagesLookup(db) {
const array = await window.Signal.Data.getAllMessageIds({
db,
getMessageKey,
handleDOMException: Whisper.Database.handleDOMException,
});
return fromPairs(map(array, item => [item, true]));
// Builds a { messageKey: true } lookup for every message already in SQL,
// used by import to detect duplicates.
async function loadMessagesLookup() {
  const items = await window.Signal.Data.getAllMessageIds();

  const lookup = {};
  items.forEach(item => {
    lookup[getMessageKey(item)] = true;
  });
  return lookup;
}
// A conversation's dedupe key is simply its unique id.
function getConversationKey(conversation) {
  const { id } = conversation;
  return id;
}
function loadConversationLookup(db) {
return assembleLookup(db, 'conversations', getConversationKey);
// Builds a { conversationKey: true } lookup for every conversation already
// in SQL, used by import to detect duplicates.
async function loadConversationLookup() {
  const items = await window.Signal.Data.getAllConversationIds();

  const lookup = {};
  items.forEach(item => {
    lookup[getConversationKey(item)] = true;
  });
  return lookup;
}
function getGroupKey(group) {

View file

@ -1,7 +1,8 @@
/* global window, setTimeout */
const electron = require('electron');
const { forEach, isFunction, isObject } = require('lodash');
const { forEach, isFunction, isObject, merge } = require('lodash');
const { deferredToPromise } = require('./deferred_to_promise');
const MessageType = require('./types/message');
@ -37,6 +38,20 @@ module.exports = {
close,
removeDB,
getConversationCount,
saveConversation,
saveConversations,
getConversationById,
updateConversation,
removeConversation,
_removeConversations,
getAllConversations,
getAllConversationIds,
getAllPrivateConversations,
getAllGroupsInvolvingId,
searchConversations,
getMessageCount,
saveMessage,
saveLegacyMessage,
@ -49,6 +64,7 @@ module.exports = {
getMessageBySender,
getMessageById,
getAllMessages,
getAllMessageIds,
getMessagesBySentAt,
getExpiredMessages,
@ -222,6 +238,86 @@ async function removeDB() {
await channels.removeDB();
}
// Number of conversation rows in the SQL database.
async function getConversationCount() {
  const count = await channels.getConversationCount();
  return count;
}
// Inserts a single conversation's attributes into the SQL database.
async function saveConversation(data) {
  await channels.saveConversation(data);
}
// Inserts a batch of conversation attribute objects into the SQL database.
async function saveConversations(data) {
  await channels.saveConversations(data);
}
// Fetches one conversation by id and hydrates it into the provided Backbone
// model class. Returns null when no row exists.
//
// Fix: previously a missing row still produced `new Conversation(undefined)`,
// a truthy empty model, so callers' existence checks (updateConversation's
// `if (!existing)` throw and removeConversation's `if (existing)` guard)
// could never detect a missing conversation.
async function getConversationById(id, { Conversation }) {
  const data = await channels.getConversationById(id);
  if (!data) {
    return null;
  }
  return new Conversation(data);
}
// Deep-merges `data` into the existing conversation's attributes and writes
// the result back to SQL. Throws if the conversation doesn't exist.
// NOTE(review): this existence check relies on getConversationById returning
// a falsy value for a missing row — confirm; `new Conversation(undefined)`
// would be a truthy empty model.
async function updateConversation(id, data, { Conversation }) {
  const existing = await getConversationById(id, { Conversation });
  if (!existing) {
    throw new Error(`Conversation ${id} does not exist!`);
  }

  // lodash merge is recursive: nested attribute objects are merged, not replaced.
  const merged = merge({}, existing.attributes, data);
  await channels.updateConversation(merged);
}
// Removes one conversation from SQL along with its on-disk files.
async function removeConversation(id, { Conversation }) {
  const existing = await getConversationById(id, { Conversation });

  // Note: It's important to have a fully database-hydrated model to delete here because
  // it needs to delete all associated on-disk files along with the database delete.
  if (existing) {
    await channels.removeConversation(id);
    // cleanup() is the model's hook for deleting files referenced by the row.
    await existing.cleanup();
  }
}
// Note: this method will not clean up external files, just delete from SQL
// NOTE(review): forwards the whole ids array to channels.removeConversation —
// confirm the channel accepts arrays as well as single ids.
async function _removeConversations(ids) {
  await channels.removeConversation(ids);
}
// Loads every conversation's attributes and wraps them in a fresh Backbone
// collection of the provided class.
async function getAllConversations({ ConversationCollection }) {
  const attributesList = await channels.getAllConversations();

  const result = new ConversationCollection();
  result.add(attributesList);
  return result;
}
// Ids of every conversation in the SQL database.
async function getAllConversationIds() {
  return channels.getAllConversationIds();
}
// Loads every private (1:1) conversation into a fresh Backbone collection.
async function getAllPrivateConversations({ ConversationCollection }) {
  const attributesList = await channels.getAllPrivateConversations();

  const result = new ConversationCollection();
  result.add(attributesList);
  return result;
}
// Loads every group conversation that includes the given id into a fresh
// Backbone collection.
async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
  const attributesList = await channels.getAllGroupsInvolvingId(id);

  const result = new ConversationCollection();
  result.add(attributesList);
  return result;
}
// Runs a SQL-side conversation search and returns the matches in a fresh
// Backbone collection.
async function searchConversations(query, { ConversationCollection }) {
  const attributesList = await channels.searchConversations(query);

  const result = new ConversationCollection();
  result.add(attributesList);
  return result;
}
// Number of message rows in the SQL database.
async function getMessageCount() {
  const count = await channels.getMessageCount();
  return count;
}
@ -267,6 +363,12 @@ async function getMessageById(id, { Message }) {
return new Message(message);
}
// For testing only
async function getAllMessages({ MessageCollection }) {
  const attributesList = await channels.getAllMessages();
  return new MessageCollection(attributesList);
}
async function getAllMessageIds() {
const ids = await channels.getAllMessageIds();
return ids;

View file

@ -16,7 +16,6 @@ const {
const Attachments = require('../../app/attachments');
const Message = require('./types/message');
const { deferredToPromise } = require('./deferred_to_promise');
const { sleep } = require('./sleep');
// See: https://en.wikipedia.org/wiki/Fictitious_telephone_number#North_American_Numbering_Plan
@ -50,9 +49,12 @@ exports.createConversation = async ({
active_at: Date.now(),
unread: numMessages,
});
await deferredToPromise(conversation.save());
const conversationId = conversation.get('id');
await Signal.Data.updateConversation(
conversationId,
conversation.attributes,
{ Conversation: Whisper.Conversation }
);
await Promise.all(
range(0, numMessages).map(async index => {

View file

@ -4,7 +4,7 @@
// IndexedDB access. This includes avoiding usage of `storage` module which uses
// Backbone under the hood.
/* global IDBKeyRange */
/* global IDBKeyRange, window */
const { isFunction, isNumber, isObject, isString, last } = require('lodash');
@ -47,13 +47,25 @@ exports.processNext = async ({
const startTime = Date.now();
const fetchStartTime = Date.now();
const messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade(
numMessagesPerBatch,
{
maxVersion,
MessageCollection: BackboneMessageCollection,
}
);
let messagesRequiringSchemaUpgrade;
try {
messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade(
numMessagesPerBatch,
{
maxVersion,
MessageCollection: BackboneMessageCollection,
}
);
} catch (error) {
window.log.error(
'processNext error:',
error && error.stack ? error.stack : error
);
return {
done: true,
numProcessed: 0,
};
}
const fetchDuration = Date.now() - fetchStartTime;
const upgradeStartTime = Date.now();
@ -263,13 +275,26 @@ const _processBatch = async ({
);
const fetchUnprocessedMessagesStartTime = Date.now();
const unprocessedMessages = await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex(
{
connection,
count: numMessagesPerBatch,
lastIndex: lastProcessedIndex,
}
);
let unprocessedMessages;
try {
unprocessedMessages = await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex(
{
connection,
count: numMessagesPerBatch,
lastIndex: lastProcessedIndex,
}
);
} catch (error) {
window.log.error(
'_processBatch error:',
error && error.stack ? error.stack : error
);
await settings.markAttachmentMigrationComplete(connection);
await settings.deleteAttachmentMigrationLastProcessedIndex(connection);
return {
done: true,
};
}
const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime;
const upgradeStartTime = Date.now();

View file

@ -6,6 +6,8 @@ const {
_removeMessages,
saveUnprocesseds,
removeUnprocessed,
saveConversations,
_removeConversations,
} = require('./data');
const {
getMessageExportLastIndex,
@ -15,6 +17,7 @@ const {
getUnprocessedExportLastIndex,
setUnprocessedExportLastIndex,
} = require('./settings');
const { migrateConversation } = require('./types/conversation');
module.exports = {
migrateToSQL,
@ -26,6 +29,7 @@ async function migrateToSQL({
handleDOMException,
countCallback,
arrayBufferToString,
writeNewAttachmentData,
}) {
if (!db) {
throw new Error('Need db for IndexedDB connection!');
@ -74,6 +78,11 @@ async function migrateToSQL({
}
}
window.log.info('migrateToSQL: migrate of messages complete');
try {
await clearStores(['messages']);
} catch (error) {
window.log.warn('Failed to clear messages store');
}
lastIndex = await getUnprocessedExportLastIndex(db);
complete = false;
@ -116,8 +125,43 @@ async function migrateToSQL({
await setUnprocessedExportLastIndex(db, lastIndex);
}
window.log.info('migrateToSQL: migrate of unprocessed complete');
try {
await clearStores(['unprocessed']);
} catch (error) {
window.log.warn('Failed to clear unprocessed store');
}
await clearStores(['messages', 'unprocessed']);
complete = false;
while (!complete) {
// eslint-disable-next-line no-await-in-loop
const status = await migrateStoreToSQLite({
db,
// eslint-disable-next-line no-loop-func
save: async array => {
const conversations = await Promise.all(
map(array, async conversation =>
migrateConversation(conversation, { writeNewAttachmentData })
)
);
saveConversations(conversations);
},
remove: _removeConversations,
storeName: 'conversations',
handleDOMException,
lastIndex,
// Because we're doing real-time moves to the filesystem, minimize parallelism
batchSize: 5,
});
({ complete, lastIndex } = status);
}
window.log.info('migrateToSQL: migrate of conversations complete');
try {
await clearStores(['conversations']);
} catch (error) {
window.log.warn('Failed to clear conversations store');
}
window.log.info('migrateToSQL: complete');
}

View file

@ -1,15 +1,13 @@
/* global window, Whisper */
const Migrations0DatabaseWithAttachmentData = require('./migrations_0_database_with_attachment_data');
const Migrations1DatabaseWithoutAttachmentData = require('./migrations_1_database_without_attachment_data');
exports.getPlaceholderMigrations = () => {
const last0MigrationVersion = Migrations0DatabaseWithAttachmentData.getLatestVersion();
const last1MigrationVersion = Migrations1DatabaseWithoutAttachmentData.getLatestVersion();
const lastMigrationVersion = last1MigrationVersion || last0MigrationVersion;
return [
{
version: lastMigrationVersion,
version: last0MigrationVersion,
migrate() {
throw new Error(
'Unexpected invocation of placeholder migration!' +
@ -20,3 +18,18 @@ exports.getPlaceholderMigrations = () => {
},
];
};
// Resolves with the current IndexedDB schema version of the Whisper database.
// Opening with no explicit version never upgrades an existing database; if
// onupgradeneeded fires, the database didn't exist yet, which we treat as an
// error here. NOTE(review): rejection receives the raw event, not an Error.
exports.getCurrentVersion = () =>
  new Promise((resolve, reject) => {
    const request = window.indexedDB.open(Whisper.Database.id);

    request.onerror = reject;
    request.onupgradeneeded = reject;

    request.onsuccess = () => {
      const connection = request.result;
      const { version } = connection;

      // Fix: close this version-probe connection. Leaving it open would block
      // any subsequent versionchange (upgrade) open of the same database.
      connection.close();

      return resolve(version);
    };
  });

View file

@ -1,22 +1,38 @@
/* global window */
const { last } = require('lodash');
const db = require('../database');
const settings = require('../settings');
const { runMigrations } = require('./run_migrations');
// IMPORTANT: Add new migrations that need to traverse entire database, e.g.
// messages store, below. Whenever we need this, we need to force attachment
// migration on startup:
const migrations = [
// {
// version: 0,
// migrate(transaction, next) {
// next();
// },
// },
// These are cleanup migrations, to be run after migration to SQLCipher
exports.migrations = [
  {
    version: 19,
    migrate(transaction, next) {
      window.log.info('Migration 19');
      window.log.info(
        'Removing messages, unprocessed, and conversations object stores'
      );

      // This should be run after things are migrated to SQLCipher
      // These stores' contents now live in SQLCipher, so the IndexedDB
      // copies can be dropped entirely.
      transaction.db.deleteObjectStore('messages');
      transaction.db.deleteObjectStore('unprocessed');
      transaction.db.deleteObjectStore('conversations');

      next();
    },
  },
];
exports.run = async ({ Backbone, database, logger } = {}) => {
exports.run = async ({ Backbone, logger } = {}) => {
const database = {
id: 'signal',
nolog: true,
migrations: exports.migrations,
};
const { canRun } = await exports.getStatus({ database });
if (!canRun) {
throw new Error(
@ -24,7 +40,11 @@ exports.run = async ({ Backbone, database, logger } = {}) => {
);
}
await runMigrations({ Backbone, database, logger });
await runMigrations({
Backbone,
logger,
database,
});
};
exports.getStatus = async ({ database } = {}) => {
@ -32,7 +52,7 @@ exports.getStatus = async ({ database } = {}) => {
const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete(
connection
);
const hasMigrations = migrations.length > 0;
const hasMigrations = exports.migrations.length > 0;
const canRun = isAttachmentMigrationComplete && hasMigrations;
return {
@ -43,7 +63,7 @@ exports.getStatus = async ({ database } = {}) => {
};
exports.getLatestVersion = () => {
const lastMigration = last(migrations);
const lastMigration = last(exports.migrations);
if (!lastMigration) {
return null;
}

View file

@ -58,6 +58,7 @@ const {
// Migrations
const {
getPlaceholderMigrations,
getCurrentVersion,
} = require('./migrations/get_placeholder_migrations');
const Migrations0DatabaseWithAttachmentData = require('./migrations/migrations_0_database_with_attachment_data');
@ -67,7 +68,7 @@ const Migrations1DatabaseWithoutAttachmentData = require('./migrations/migration
const AttachmentType = require('./types/attachment');
const VisualAttachment = require('./types/visual_attachment');
const Contact = require('../../ts/types/Contact');
const Conversation = require('../../ts/types/Conversation');
const Conversation = require('./types/conversation');
const Errors = require('./types/errors');
const MediaGalleryMessage = require('../../ts/components/conversation/media-gallery/types/Message');
const MessageType = require('./types/message');
@ -123,11 +124,14 @@ function initializeMigrations({
}),
getAbsoluteAttachmentPath,
getPlaceholderMigrations,
getCurrentVersion,
loadAttachmentData,
loadQuoteData,
loadMessage: MessageType.createAttachmentLoader(loadAttachmentData),
Migrations0DatabaseWithAttachmentData,
Migrations1DatabaseWithoutAttachmentData,
writeNewAttachmentData: createWriterForNew(attachmentsPath),
deleteAttachmentData: deleteOnDisk,
upgradeMessageSchema: (message, options = {}) => {
const { maxVersion } = options;

View file

@ -0,0 +1,133 @@
/* global dcodeIO, crypto */
const { isFunction, isNumber } = require('lodash');
const { createLastMessageUpdate } = require('../../../ts/types/Conversation');
// Returns the base64-encoded SHA-512 digest of the given ArrayBuffer.
async function computeHash(arraybuffer) {
  const digest = await crypto.subtle.digest({ name: 'SHA-512' }, arraybuffer);
  return arrayBufferToBase64(digest);
}
// Encodes an ArrayBuffer as a base64 string via dcodeIO's ByteBuffer.
function arrayBufferToBase64(arraybuffer) {
  return dcodeIO.ByteBuffer.wrap(arraybuffer).toString('base64');
}
// Decodes a base64 string back into an ArrayBuffer via dcodeIO's ByteBuffer.
function base64ToArrayBuffer(base64) {
  return dcodeIO.ByteBuffer.wrap(base64, 'base64').toArrayBuffer();
}
// Returns an updater for the named avatar field ('avatar' or 'profileAvatar').
// The updater hashes the incoming data; if it differs from the stored hash,
// the old file is deleted and the new data written to disk. Returns a new
// conversation object (never mutates the input).
//
// Fix: the result previously wrote to a hard-coded `avatar` key even though
// it read from `conversation[field]`, so the profileAvatar updater would
// clobber the wrong field. Now uses the computed key.
function buildAvatarUpdater({ field }) {
  return async (conversation, data, options = {}) => {
    if (!conversation) {
      return conversation;
    }

    const avatar = conversation[field];
    const { writeNewAttachmentData, deleteAttachmentData } = options;
    if (!isFunction(writeNewAttachmentData)) {
      throw new Error(
        'Conversation.buildAvatarUpdater: writeNewAttachmentData must be a function'
      );
    }
    if (!isFunction(deleteAttachmentData)) {
      throw new Error(
        'Conversation.buildAvatarUpdater: deleteAttachmentData must be a function'
      );
    }

    const newHash = await computeHash(data);

    // No previous file (or no hash recorded): just write the new data.
    if (!avatar || !avatar.hash) {
      return {
        ...conversation,
        [field]: {
          hash: newHash,
          path: await writeNewAttachmentData(data),
        },
      };
    }

    const { hash, path } = avatar;
    if (hash === newHash) {
      // Unchanged; keep the existing on-disk file.
      return conversation;
    }

    await deleteAttachmentData(path);
    return {
      ...conversation,
      [field]: {
        hash: newHash,
        path: await writeNewAttachmentData(data),
      },
    };
  };
}
// Updaters for the two avatar fields a conversation can carry.
const maybeUpdateAvatar = buildAvatarUpdater({ field: 'avatar' });
const maybeUpdateProfileAvatar = buildAvatarUpdater({
  field: 'profileAvatar',
});
// Schema version 2: inline avatar/profileAvatar binary data moves to disk
// (stored as { hash, path }), and profileKey is stored as base64 text instead
// of an ArrayBuffer. Returns a new object; no-op for version >= 2.
async function upgradeToVersion2(conversation, options) {
  if (conversation.version >= 2) {
    return conversation;
  }

  const { writeNewAttachmentData } = options;
  if (!isFunction(writeNewAttachmentData)) {
    throw new Error(
      'Conversation.upgradeToVersion2: writeNewAttachmentData must be a function'
    );
  }

  const { avatar, profileAvatar, profileKey } = conversation;

  const migratedAvatar =
    avatar && avatar.data
      ? {
          hash: await computeHash(avatar.data),
          path: await writeNewAttachmentData(avatar.data),
        }
      : avatar;

  const migratedProfileAvatar =
    profileAvatar && profileAvatar.data
      ? {
          hash: await computeHash(profileAvatar.data),
          path: await writeNewAttachmentData(profileAvatar.data),
        }
      : profileAvatar;

  const migratedProfileKey =
    profileKey && profileKey.byteLength
      ? arrayBufferToBase64(profileKey)
      : profileKey;

  return {
    ...conversation,
    version: 2,
    avatar: migratedAvatar,
    profileAvatar: migratedProfileAvatar,
    profileKey: migratedProfileKey,
  };
}
// Brings a conversation record up to the latest schema version. Records with
// no numeric version are treated as version 1. Passes through falsy input.
//
// Fix: no longer mutates the caller's object to default the version (the
// original carried an eslint-disable for exactly that); a shallow copy is
// used instead.
async function migrateConversation(conversation, options = {}) {
  if (!conversation) {
    return conversation;
  }

  const withVersion = isNumber(conversation.version)
    ? conversation
    : { ...conversation, version: 1 };

  return upgradeToVersion2(withVersion, options);
}
// Public API: schema migration, avatar maintenance, and base64 helpers.
module.exports = {
  migrateConversation,
  maybeUpdateAvatar,
  maybeUpdateProfileAvatar,
  createLastMessageUpdate,
  arrayBufferToBase64,
  base64ToArrayBuffer,
};