Remove JSON column and Backbone Model for unprocessed

This commit is contained in:
Scott Nonnenberg 2019-02-04 17:23:50 -08:00
parent 9c540ab977
commit 041fe4be05
6 changed files with 164 additions and 56 deletions

View file

@ -94,6 +94,8 @@ module.exports = {
getUnprocessedCount, getUnprocessedCount,
getAllUnprocessed, getAllUnprocessed,
saveUnprocessed, saveUnprocessed,
updateUnprocessedAttempts,
updateUnprocessedWithData,
getUnprocessedById, getUnprocessedById,
saveUnprocesseds, saveUnprocesseds,
removeUnprocessed, removeUnprocessed,
@ -563,6 +565,66 @@ async function updateToSchemaVersion9(currentVersion, instance) {
console.log('updateToSchemaVersion9: success!'); console.log('updateToSchemaVersion9: success!');
} }
async function updateToSchemaVersion10(currentVersion, instance) {
if (currentVersion >= 10) {
return;
}
console.log('updateToSchemaVersion10: starting...');
await instance.run('BEGIN TRANSACTION;');
await instance.run('DROP INDEX unprocessed_id;');
await instance.run('DROP INDEX unprocessed_timestamp;');
await instance.run('ALTER TABLE unprocessed RENAME TO unprocessed_old;');
await instance.run(`CREATE TABLE unprocessed(
id STRING,
timestamp INTEGER,
version INTEGER,
attempts INTEGER,
envelope TEXT,
decrypted TEXT,
source TEXT,
sourceDevice TEXT,
serverTimestamp INTEGER
);`);
await instance.run(`CREATE INDEX unprocessed_id ON unprocessed (
id
);`);
await instance.run(`CREATE INDEX unprocessed_timestamp ON unprocessed (
timestamp
);`);
await instance.run(`INSERT INTO unprocessed (
id,
timestamp,
version,
attempts,
envelope,
decrypted,
source,
sourceDevice,
serverTimestamp
) SELECT
id,
timestamp,
json_extract(json, '$.version'),
json_extract(json, '$.attempts'),
json_extract(json, '$.envelope'),
json_extract(json, '$.decrypted'),
json_extract(json, '$.source'),
json_extract(json, '$.sourceDevice'),
json_extract(json, '$.serverTimestamp')
FROM unprocessed_old;
`);
await instance.run('DROP TABLE unprocessed_old;');
await instance.run('PRAGMA schema_version = 10;');
await instance.run('COMMIT TRANSACTION;');
console.log('updateToSchemaVersion10: success!');
}
const SCHEMA_VERSIONS = [ const SCHEMA_VERSIONS = [
updateToSchemaVersion1, updateToSchemaVersion1,
updateToSchemaVersion2, updateToSchemaVersion2,
@ -573,6 +635,7 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion7, updateToSchemaVersion7,
updateToSchemaVersion8, updateToSchemaVersion8,
updateToSchemaVersion9, updateToSchemaVersion9,
updateToSchemaVersion10,
]; ];
async function updateSchema(instance) { async function updateSchema(instance) {
@ -1424,23 +1487,32 @@ async function getNextExpiringMessage() {
} }
async function saveUnprocessed(data, { forceSave } = {}) { async function saveUnprocessed(data, { forceSave } = {}) {
const { id, timestamp } = data; const { id, timestamp, version, attempts, envelope } = data;
if (!id) {
throw new Error('saveUnprocessed: id was falsey');
}
if (forceSave) { if (forceSave) {
await db.run( await db.run(
`INSERT INTO unprocessed ( `INSERT INTO unprocessed (
id, id,
timestamp, timestamp,
json version,
attempts,
envelope
) values ( ) values (
$id, $id,
$timestamp, $timestamp,
$json $version,
$attempts,
$envelope
);`, );`,
{ {
$id: id, $id: id,
$timestamp: timestamp, $timestamp: timestamp,
$json: objectToJSON(data), $version: version,
$attempts: attempts,
$envelope: envelope,
} }
); );
@ -1449,13 +1521,17 @@ async function saveUnprocessed(data, { forceSave } = {}) {
await db.run( await db.run(
`UPDATE unprocessed SET `UPDATE unprocessed SET
json = $json, timestamp = $timestamp,
timestamp = $timestamp version = $version,
attempts = $attempts,
envelope = $envelope
WHERE id = $id;`, WHERE id = $id;`,
{ {
$id: id, $id: id,
$timestamp: timestamp, $timestamp: timestamp,
$json: objectToJSON(data), $version: version,
$attempts: attempts,
$envelope: envelope,
} }
); );
@ -1478,16 +1554,38 @@ async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) {
await promise; await promise;
} }
async function updateUnprocessedAttempts(id, attempts) {
await db.run('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;', {
$id: id,
$attempts: attempts,
});
}
async function updateUnprocessedWithData(id, data = {}) {
const { source, sourceDevice, serverTimestamp, decrypted } = data;
await db.run(
`UPDATE unprocessed SET
source = $source,
sourceDevice = $sourceDevice,
serverTimestamp = $serverTimestamp,
decrypted = $decrypted
WHERE id = $id;`,
{
$id: id,
$source: source,
$sourceDevice: sourceDevice,
$serverTimestamp: serverTimestamp,
$decrypted: decrypted,
}
);
}
async function getUnprocessedById(id) { async function getUnprocessedById(id) {
const row = await db.get('SELECT json FROM unprocessed WHERE id = $id;', { const row = await db.get('SELECT * FROM unprocessed WHERE id = $id;', {
$id: id, $id: id,
}); });
if (!row) { return row;
return null;
}
return jsonToObject(row.json);
} }
async function getUnprocessedCount() { async function getUnprocessedCount() {
@ -1502,10 +1600,10 @@ async function getUnprocessedCount() {
async function getAllUnprocessed() { async function getAllUnprocessed() {
const rows = await db.all( const rows = await db.all(
'SELECT json FROM unprocessed ORDER BY timestamp ASC;' 'SELECT * FROM unprocessed ORDER BY timestamp ASC;'
); );
return map(rows, row => jsonToObject(row.json)); return rows;
} }
async function removeUnprocessed(id) { async function removeUnprocessed(id) {

View file

@ -131,6 +131,8 @@ module.exports = {
getUnprocessedById, getUnprocessedById,
saveUnprocessed, saveUnprocessed,
saveUnprocesseds, saveUnprocesseds,
updateUnprocessedAttempts,
updateUnprocessedWithData,
removeUnprocessed, removeUnprocessed,
removeAllUnprocessed, removeAllUnprocessed,
@ -848,13 +850,8 @@ async function getAllUnprocessed() {
return channels.getAllUnprocessed(); return channels.getAllUnprocessed();
} }
async function getUnprocessedById(id, { Unprocessed }) { async function getUnprocessedById(id) {
const unprocessed = await channels.getUnprocessedById(id); return channels.getUnprocessedById(id);
if (!unprocessed) {
return null;
}
return new Unprocessed(unprocessed);
} }
async function saveUnprocessed(data, { forceSave } = {}) { async function saveUnprocessed(data, { forceSave } = {}) {
@ -868,6 +865,13 @@ async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) {
}); });
} }
async function updateUnprocessedAttempts(id, attempts) {
await channels.updateUnprocessedAttempts(id, attempts);
}
async function updateUnprocessedWithData(id, data) {
await channels.updateUnprocessedWithData(id, data);
}
async function removeUnprocessed(id) { async function removeUnprocessed(id) {
await channels.removeUnprocessed(id); await channels.removeUnprocessed(id);
} }

View file

@ -98,7 +98,6 @@
return result === 0; return result === 0;
} }
const Unprocessed = Backbone.Model.extend();
const IdentityRecord = Backbone.Model.extend({ const IdentityRecord = Backbone.Model.extend({
storeName: 'identityKeys', storeName: 'identityKeys',
validAttributes: [ validAttributes: [
@ -872,21 +871,23 @@
return window.Signal.Data.getAllUnprocessed(); return window.Signal.Data.getAllUnprocessed();
}, },
getUnprocessedById(id) { getUnprocessedById(id) {
return window.Signal.Data.getUnprocessedById(id, { Unprocessed }); return window.Signal.Data.getUnprocessedById(id);
}, },
addUnprocessed(data) { addUnprocessed(data) {
// We need to pass forceSave because the data has an id already, which will cause // We need to pass forceSave because the data has an id already, which will cause
// an update instead of an insert. // an update instead of an insert.
return window.Signal.Data.saveUnprocessed(data, { return window.Signal.Data.saveUnprocessed(data, {
forceSave: true, forceSave: true,
Unprocessed,
}); });
}, },
saveUnprocessed(data) { updateUnprocessedAttempts(id, attempts) {
return window.Signal.Data.saveUnprocessed(data, { Unprocessed }); return window.Signal.Data.updateUnprocessedAttempts(id, attempts);
},
updateUnprocessedWithData(id, data) {
return window.Signal.Data.updateUnprocessedWithData(id, data);
}, },
removeUnprocessed(id) { removeUnprocessed(id) {
return window.Signal.Data.removeUnprocessed(id, { Unprocessed }); return window.Signal.Data.removeUnprocessed(id);
}, },
removeAllUnprocessed() { removeAllUnprocessed() {
return window.Signal.Data.removeAllUnprocessed(); return window.Signal.Data.removeAllUnprocessed();

View file

@ -498,7 +498,10 @@ MessageReceiver.prototype.extend({
); );
await textsecure.storage.unprocessed.remove(item.id); await textsecure.storage.unprocessed.remove(item.id);
} else { } else {
await textsecure.storage.unprocessed.save({ ...item, attempts }); await textsecure.storage.unprocessed.updateAttempts(
item.id,
attempts
);
} }
} catch (error) { } catch (error) {
window.log.error( window.log.error(
@ -532,23 +535,19 @@ MessageReceiver.prototype.extend({
return null; return null;
} }
if (item.get('version') === 2) { item.source = envelope.source;
item.set({ item.sourceDevice = envelope.sourceDevice;
source: envelope.source, item.serverTimestamp = envelope.serverTimestamp;
sourceDevice: envelope.sourceDevice,
serverTimestamp: envelope.serverTimestamp, if (item.version === 2) {
decrypted: await MessageReceiver.arrayBufferToStringBase64(plaintext), item.decrypted = await MessageReceiver.arrayBufferToStringBase64(
}); plaintext
);
} else { } else {
item.set({ item.decrypted = await MessageReceiver.arrayBufferToString(plaintext);
source: envelope.source,
sourceDevice: envelope.sourceDevice,
serverTimestamp: envelope.serverTimestamp,
decrypted: await MessageReceiver.arrayBufferToString(plaintext),
});
} }
return textsecure.storage.unprocessed.save(item.attributes); return textsecure.storage.unprocessed.addDecryptedData(item.id, item);
}, },
removeFromCache(envelope) { removeFromCache(envelope) {
const { id } = envelope; const { id } = envelope;

View file

@ -21,8 +21,14 @@
add(data) { add(data) {
return textsecure.storage.protocol.addUnprocessed(data); return textsecure.storage.protocol.addUnprocessed(data);
}, },
save(data) { updateAttempts(id, attempts) {
return textsecure.storage.protocol.saveUnprocessed(data); return textsecure.storage.protocol.updateUnprocessedAttempts(
id,
attempts
);
},
addDecryptedData(id, data) {
return textsecure.storage.protocol.updateUnprocessedWithData(id, data);
}, },
remove(id) { remove(id) {
return textsecure.storage.protocol.removeUnprocessed(id); return textsecure.storage.protocol.removeUnprocessed(id);

View file

@ -994,36 +994,36 @@ describe('SignalProtocolStore', () => {
assert.strictEqual(items.length, 0); assert.strictEqual(items.length, 0);
}); });
it('adds two and gets them back', async () => { it('adds three and gets them back', async () => {
await Promise.all([ await Promise.all([
store.addUnprocessed({ id: 2, name: 'second', timestamp: 2 }), store.addUnprocessed({ id: 2, envelope: 'second', timestamp: 2 }),
store.addUnprocessed({ id: 3, name: 'third', timestamp: 3 }), store.addUnprocessed({ id: 3, envelope: 'third', timestamp: 3 }),
store.addUnprocessed({ id: 1, name: 'first', timestamp: 1 }), store.addUnprocessed({ id: 1, envelope: 'first', timestamp: 1 }),
]); ]);
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessed();
assert.strictEqual(items.length, 3); assert.strictEqual(items.length, 3);
// they are in the proper order because the collection comparator is 'timestamp' // they are in the proper order because the collection comparator is 'timestamp'
assert.strictEqual(items[0].name, 'first'); assert.strictEqual(items[0].envelope, 'first');
assert.strictEqual(items[1].name, 'second'); assert.strictEqual(items[1].envelope, 'second');
assert.strictEqual(items[2].name, 'third'); assert.strictEqual(items[2].envelope, 'third');
}); });
it('saveUnprocessed successfully updates item', async () => { it('saveUnprocessed successfully updates item', async () => {
const id = 1; const id = 1;
await store.addUnprocessed({ id, name: 'first', timestamp: 1 }); await store.addUnprocessed({ id, envelope: 'first', timestamp: 1 });
await store.saveUnprocessed({ id, name: 'updated', timestamp: 1 }); await store.updateUnprocessedWithData(id, { decrypted: 'updated' });
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessed();
assert.strictEqual(items.length, 1); assert.strictEqual(items.length, 1);
assert.strictEqual(items[0].name, 'updated'); assert.strictEqual(items[0].decrypted, 'updated');
assert.strictEqual(items[0].timestamp, 1); assert.strictEqual(items[0].timestamp, 1);
}); });
it('removeUnprocessed successfully deletes item', async () => { it('removeUnprocessed successfully deletes item', async () => {
const id = 1; const id = 1;
await store.addUnprocessed({ id, name: 'first', timestamp: 1 }); await store.addUnprocessed({ id, envelope: 'first', timestamp: 1 });
await store.removeUnprocessed(id); await store.removeUnprocessed(id);
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessed();