diff --git a/app/sql.js b/app/sql.js index 1d3cf93256..b49cb0bea0 100644 --- a/app/sql.js +++ b/app/sql.js @@ -94,6 +94,8 @@ module.exports = { getUnprocessedCount, getAllUnprocessed, saveUnprocessed, + updateUnprocessedAttempts, + updateUnprocessedWithData, getUnprocessedById, saveUnprocesseds, removeUnprocessed, @@ -563,6 +565,66 @@ async function updateToSchemaVersion9(currentVersion, instance) { console.log('updateToSchemaVersion9: success!'); } +async function updateToSchemaVersion10(currentVersion, instance) { + if (currentVersion >= 10) { + return; + } + console.log('updateToSchemaVersion10: starting...'); + await instance.run('BEGIN TRANSACTION;'); + + await instance.run('DROP INDEX unprocessed_id;'); + await instance.run('DROP INDEX unprocessed_timestamp;'); + await instance.run('ALTER TABLE unprocessed RENAME TO unprocessed_old;'); + + await instance.run(`CREATE TABLE unprocessed( + id STRING, + timestamp INTEGER, + version INTEGER, + attempts INTEGER, + envelope TEXT, + decrypted TEXT, + source TEXT, + sourceDevice TEXT, + serverTimestamp INTEGER + );`); + + await instance.run(`CREATE INDEX unprocessed_id ON unprocessed ( + id + );`); + await instance.run(`CREATE INDEX unprocessed_timestamp ON unprocessed ( + timestamp + );`); + + await instance.run(`INSERT INTO unprocessed ( + id, + timestamp, + version, + attempts, + envelope, + decrypted, + source, + sourceDevice, + serverTimestamp + ) SELECT + id, + timestamp, + json_extract(json, '$.version'), + json_extract(json, '$.attempts'), + json_extract(json, '$.envelope'), + json_extract(json, '$.decrypted'), + json_extract(json, '$.source'), + json_extract(json, '$.sourceDevice'), + json_extract(json, '$.serverTimestamp') + FROM unprocessed_old; + `); + + await instance.run('DROP TABLE unprocessed_old;'); + + await instance.run('PRAGMA schema_version = 10;'); + await instance.run('COMMIT TRANSACTION;'); + console.log('updateToSchemaVersion10: success!'); +} + const SCHEMA_VERSIONS = [ updateToSchemaVersion1, updateToSchemaVersion2, @@ -573,6 +635,7 @@ const SCHEMA_VERSIONS = [ updateToSchemaVersion7, updateToSchemaVersion8, updateToSchemaVersion9, + updateToSchemaVersion10, ]; async function updateSchema(instance) { @@ -1424,23 +1487,32 @@ async function getNextExpiringMessage() { } async function saveUnprocessed(data, { forceSave } = {}) { - const { id, timestamp } = data; + const { id, timestamp, version, attempts, envelope } = data; + if (!id) { + throw new Error('saveUnprocessed: id was falsey'); + } if (forceSave) { await db.run( `INSERT INTO unprocessed ( id, timestamp, - json + version, + attempts, + envelope ) values ( $id, $timestamp, - $json + $version, + $attempts, + $envelope );`, { $id: id, $timestamp: timestamp, - $json: objectToJSON(data), + $version: version, + $attempts: attempts, + $envelope: envelope, } ); @@ -1449,13 +1521,17 @@ async function saveUnprocessed(data, { forceSave } = {}) { await db.run( `UPDATE unprocessed SET - json = $json, - timestamp = $timestamp + timestamp = $timestamp, + version = $version, + attempts = $attempts, + envelope = $envelope WHERE id = $id;`, { $id: id, $timestamp: timestamp, - $json: objectToJSON(data), + $version: version, + $attempts: attempts, + $envelope: envelope, } ); @@ -1478,16 +1554,38 @@ async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) { await promise; } +async function updateUnprocessedAttempts(id, attempts) { + await db.run('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;', { + $id: id, + $attempts: attempts, + }); +} +async function updateUnprocessedWithData(id, data = {}) { + const { source, sourceDevice, serverTimestamp, decrypted } = data; + + await db.run( + `UPDATE unprocessed SET + source = $source, + sourceDevice = $sourceDevice, + serverTimestamp = $serverTimestamp, + decrypted = $decrypted + WHERE id = $id;`, + { + $id: id, + $source: source, + $sourceDevice: sourceDevice, + $serverTimestamp: serverTimestamp, + $decrypted: decrypted, + } + ); +} + async function getUnprocessedById(id) { - const row = await db.get('SELECT json FROM unprocessed WHERE id = $id;', { + const row = await db.get('SELECT * FROM unprocessed WHERE id = $id;', { $id: id, }); - if (!row) { - return null; - } - - return jsonToObject(row.json); + return row; } async function getUnprocessedCount() { @@ -1502,10 +1600,10 @@ async function getUnprocessedCount() { async function getAllUnprocessed() { const rows = await db.all( - 'SELECT json FROM unprocessed ORDER BY timestamp ASC;' + 'SELECT * FROM unprocessed ORDER BY timestamp ASC;' ); - return map(rows, row => jsonToObject(row.json)); + return rows; } async function removeUnprocessed(id) { diff --git a/js/modules/data.js b/js/modules/data.js index 1b93a8d767..2b1c207fc7 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -131,6 +131,8 @@ module.exports = { getUnprocessedById, saveUnprocessed, saveUnprocesseds, + updateUnprocessedAttempts, + updateUnprocessedWithData, removeUnprocessed, removeAllUnprocessed, @@ -848,13 +850,8 @@ async function getAllUnprocessed() { return channels.getAllUnprocessed(); } -async function getUnprocessedById(id, { Unprocessed }) { - const unprocessed = await channels.getUnprocessedById(id); - if (!unprocessed) { - return null; - } - - return new Unprocessed(unprocessed); +async function getUnprocessedById(id) { + return channels.getUnprocessedById(id); } async function saveUnprocessed(data, { forceSave } = {}) { @@ -868,6 +865,13 @@ async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) { }); } +async function updateUnprocessedAttempts(id, attempts) { + await channels.updateUnprocessedAttempts(id, attempts); +} +async function updateUnprocessedWithData(id, data) { + await channels.updateUnprocessedWithData(id, data); +} + async function removeUnprocessed(id) { await channels.removeUnprocessed(id); } diff --git a/js/signal_protocol_store.js b/js/signal_protocol_store.js index ebbde8ceb7..bf27c2850a 100644 --- a/js/signal_protocol_store.js +++ b/js/signal_protocol_store.js @@ -98,7 +98,6 @@ return result === 0; } - const Unprocessed = Backbone.Model.extend(); const IdentityRecord = Backbone.Model.extend({ storeName: 'identityKeys', validAttributes: [ @@ -872,21 +871,23 @@ return window.Signal.Data.getAllUnprocessed(); }, getUnprocessedById(id) { - return window.Signal.Data.getUnprocessedById(id, { Unprocessed }); + return window.Signal.Data.getUnprocessedById(id); }, addUnprocessed(data) { // We need to pass forceSave because the data has an id already, which will cause // an update instead of an insert. return window.Signal.Data.saveUnprocessed(data, { forceSave: true, - Unprocessed, }); }, - saveUnprocessed(data) { - return window.Signal.Data.saveUnprocessed(data, { Unprocessed }); + updateUnprocessedAttempts(id, attempts) { + return window.Signal.Data.updateUnprocessedAttempts(id, attempts); + }, + updateUnprocessedWithData(id, data) { + return window.Signal.Data.updateUnprocessedWithData(id, data); }, removeUnprocessed(id) { - return window.Signal.Data.removeUnprocessed(id, { Unprocessed }); + return window.Signal.Data.removeUnprocessed(id); }, removeAllUnprocessed() { return window.Signal.Data.removeAllUnprocessed(); diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index 2bf13f3644..a4109eb5d2 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -498,7 +498,10 @@ MessageReceiver.prototype.extend({ ); await textsecure.storage.unprocessed.remove(item.id); } else { - await textsecure.storage.unprocessed.save({ ...item, attempts }); + await textsecure.storage.unprocessed.updateAttempts( + item.id, + attempts + ); } } catch (error) { window.log.error( @@ -532,23 +535,19 @@ MessageReceiver.prototype.extend({ return null; } - if (item.get('version') === 2) { - item.set({ - source: envelope.source, - sourceDevice: envelope.sourceDevice, - serverTimestamp: envelope.serverTimestamp, - decrypted: await MessageReceiver.arrayBufferToStringBase64(plaintext), - }); + item.source = envelope.source; + item.sourceDevice = envelope.sourceDevice; + item.serverTimestamp = envelope.serverTimestamp; + + if (item.version === 2) { + item.decrypted = await MessageReceiver.arrayBufferToStringBase64( + plaintext + ); } else { - item.set({ - source: envelope.source, - sourceDevice: envelope.sourceDevice, - serverTimestamp: envelope.serverTimestamp, - decrypted: await MessageReceiver.arrayBufferToString(plaintext), - }); + item.decrypted = await MessageReceiver.arrayBufferToString(plaintext); } - return textsecure.storage.unprocessed.save(item.attributes); + return textsecure.storage.unprocessed.addDecryptedData(item.id, item); }, removeFromCache(envelope) { const { id } = envelope; diff --git a/libtextsecure/storage/unprocessed.js b/libtextsecure/storage/unprocessed.js index 91bc0e255e..ac113362f7 100644 --- a/libtextsecure/storage/unprocessed.js +++ b/libtextsecure/storage/unprocessed.js @@ -21,8 +21,14 @@ add(data) { return textsecure.storage.protocol.addUnprocessed(data); }, - save(data) { - return textsecure.storage.protocol.saveUnprocessed(data); + updateAttempts(id, attempts) { + return textsecure.storage.protocol.updateUnprocessedAttempts( + id, + attempts + ); + }, + addDecryptedData(id, data) { + return textsecure.storage.protocol.updateUnprocessedWithData(id, data); }, remove(id) { return textsecure.storage.protocol.removeUnprocessed(id); diff --git a/test/storage_test.js b/test/storage_test.js index 1874915ed6..890b262776 100644 --- a/test/storage_test.js +++ b/test/storage_test.js @@ -994,36 +994,36 @@ describe('SignalProtocolStore', () => { assert.strictEqual(items.length, 0); }); - it('adds two and gets them back', async () => { + it('adds three and gets them back', async () => { await Promise.all([ - store.addUnprocessed({ id: 2, name: 'second', timestamp: 2 }), - store.addUnprocessed({ id: 3, name: 'third', timestamp: 3 }), - store.addUnprocessed({ id: 1, name: 'first', timestamp: 1 }), + store.addUnprocessed({ id: 2, envelope: 'second', timestamp: 2 }), + store.addUnprocessed({ id: 3, envelope: 'third', timestamp: 3 }), + store.addUnprocessed({ id: 1, envelope: 'first', timestamp: 1 }), ]); const items = await store.getAllUnprocessed(); assert.strictEqual(items.length, 3); // they are in the proper order because the collection comparator is 'timestamp' - assert.strictEqual(items[0].name, 'first'); - assert.strictEqual(items[1].name, 'second'); - assert.strictEqual(items[2].name, 'third'); + assert.strictEqual(items[0].envelope, 'first'); + assert.strictEqual(items[1].envelope, 'second'); + assert.strictEqual(items[2].envelope, 'third'); }); it('saveUnprocessed successfully updates item', async () => { const id = 1; - await store.addUnprocessed({ id, name: 'first', timestamp: 1 }); - await store.saveUnprocessed({ id, name: 'updated', timestamp: 1 }); + await store.addUnprocessed({ id, envelope: 'first', timestamp: 1 }); + await store.updateUnprocessedWithData(id, { decrypted: 'updated' }); const items = await store.getAllUnprocessed(); assert.strictEqual(items.length, 1); - assert.strictEqual(items[0].name, 'updated'); + assert.strictEqual(items[0].decrypted, 'updated'); assert.strictEqual(items[0].timestamp, 1); }); it('removeUnprocessed successfully deletes item', async () => { const id = 1; - await store.addUnprocessed({ id, name: 'first', timestamp: 1 }); + await store.addUnprocessed({ id, envelope: 'first', timestamp: 1 }); await store.removeUnprocessed(id); const items = await store.getAllUnprocessed();