diff --git a/ts/SignalProtocolStore.ts b/ts/SignalProtocolStore.ts index c4d887c2f5c..b0cb1352cca 100644 --- a/ts/SignalProtocolStore.ts +++ b/ts/SignalProtocolStore.ts @@ -1859,9 +1859,9 @@ export class SignalProtocolStore extends EventsMixin { }); } - getAllUnprocessed(): Promise> { + getAllUnprocessedAndIncrementAttempts(): Promise> { return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => { - return window.Signal.Data.getAllUnprocessed(); + return window.Signal.Data.getAllUnprocessedAndIncrementAttempts(); }); } diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index b47a520cd2c..5ce13818d19 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -247,7 +247,7 @@ const dataInterface: ClientInterface = { migrateConversationMessages, getUnprocessedCount, - getAllUnprocessed, + getAllUnprocessedAndIncrementAttempts, getUnprocessedById, updateUnprocessedWithData, updateUnprocessedsWithData, @@ -1443,8 +1443,8 @@ async function getUnprocessedCount() { return channels.getUnprocessedCount(); } -async function getAllUnprocessed() { - return channels.getAllUnprocessed(); +async function getAllUnprocessedAndIncrementAttempts() { + return channels.getAllUnprocessedAndIncrementAttempts(); } async function getUnprocessedById(id: string) { diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 5d393af333c..dd655ca5727 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -474,7 +474,7 @@ export type DataInterface = { ) => Promise; getUnprocessedCount: () => Promise; - getAllUnprocessed: () => Promise>; + getAllUnprocessedAndIncrementAttempts: () => Promise>; updateUnprocessedWithData: ( id: string, data: UnprocessedUpdateType diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index bc7f4357bcf..0189215531e 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -244,7 +244,7 @@ const dataInterface: ServerInterface = { migrateConversationMessages, getUnprocessedCount, - getAllUnprocessed, + getAllUnprocessedAndIncrementAttempts, updateUnprocessedWithData, updateUnprocessedsWithData, getUnprocessedById, @@ -3181,32 +3181,58 @@ async function getUnprocessedCount(): Promise { return getCountFromTable(getInstance(), 'unprocessed'); } -async function getAllUnprocessed(): Promise> { +async function getAllUnprocessedAndIncrementAttempts(): Promise< + Array +> { const db = getInstance(); - const { changes: deletedCount } = db - .prepare('DELETE FROM unprocessed WHERE timestamp < $monthAgo') - .run({ - monthAgo: Date.now() - durations.MONTH, - }); + return db.transaction(() => { + const { changes: deletedStaleCount } = db + .prepare('DELETE FROM unprocessed WHERE timestamp < $monthAgo') + .run({ + monthAgo: Date.now() - durations.MONTH, + }); - if (deletedCount !== 0) { - logger.warn( - `getAllUnprocessed: deleting ${deletedCount} old unprocessed envelopes` - ); - } + if (deletedStaleCount !== 0) { + logger.warn( + 'getAllUnprocessedAndIncrementAttempts: ' + + `deleting ${deletedStaleCount} old unprocessed envelopes` + ); + } - const rows = db - .prepare( + db.prepare( ` - SELECT * - FROM unprocessed - ORDER BY timestamp ASC; + UPDATE unprocessed + SET attempts = attempts + 1 ` - ) - .all(); + ).run(); - return rows; + const { changes: deletedInvalidCount } = db + .prepare( + ` + DELETE FROM unprocessed + WHERE attempts >= $MAX_UNPROCESSED_ATTEMPTS + ` + ) + .run({ MAX_UNPROCESSED_ATTEMPTS }); + + if (deletedInvalidCount !== 0) { + logger.warn( + 'getAllUnprocessedAndIncrementAttempts: ' + + `deleting ${deletedInvalidCount} invalid unprocessed envelopes` + ); + } + + return db + .prepare( + ` + SELECT * + FROM unprocessed + ORDER BY timestamp ASC; + ` + ) + .all(); + })(); } function removeUnprocessedsSync(ids: Array): void { diff --git a/ts/test-electron/SignalProtocolStore_test.ts b/ts/test-electron/SignalProtocolStore_test.ts index e6d383e406c..ecfbee4208f 100644 --- a/ts/test-electron/SignalProtocolStore_test.ts +++ b/ts/test-electron/SignalProtocolStore_test.ts @@ -1499,7 +1499,8 @@ describe('SignalProtocolStore', () => { assert.equal(await store.loadSession(id), testSession); assert.equal(await store.getSenderKey(id, distributionId), testSenderKey); - const allUnprocessed = await store.getAllUnprocessed(); + const allUnprocessed = + await store.getAllUnprocessedAndIncrementAttempts(); assert.deepEqual( allUnprocessed.map(({ envelope }) => envelope), ['second'] @@ -1551,7 +1552,7 @@ describe('SignalProtocolStore', () => { assert.equal(await store.loadSession(id), testSession); assert.equal(await store.getSenderKey(id, distributionId), testSenderKey); - assert.deepEqual(await store.getAllUnprocessed(), []); + assert.deepEqual(await store.getAllUnprocessedAndIncrementAttempts(), []); }); it('can be re-entered', async () => { @@ -1647,7 +1648,7 @@ describe('SignalProtocolStore', () => { beforeEach(async () => { await store.removeAllUnprocessed(); - const items = await store.getAllUnprocessed(); + const items = await store.getAllUnprocessedAndIncrementAttempts(); assert.strictEqual(items.length, 0); }); @@ -1687,7 +1688,7 @@ describe('SignalProtocolStore', () => { }), ]); - const items = await store.getAllUnprocessed(); + const items = await store.getAllUnprocessedAndIncrementAttempts(); assert.strictEqual(items.length, 3); // they are in the proper order because the collection comparator is 'timestamp' @@ -1708,10 +1709,11 @@ describe('SignalProtocolStore', () => { }); await store.updateUnprocessedWithData(id, { decrypted: 'updated' }); - const items = await store.getAllUnprocessed(); + const items = await store.getAllUnprocessedAndIncrementAttempts(); assert.strictEqual(items.length, 1); assert.strictEqual(items[0].decrypted, 'updated'); assert.strictEqual(items[0].timestamp, NOW + 1); + assert.strictEqual(items[0].attempts, 1); }); it('removeUnprocessed successfully deletes item', async () => { @@ -1726,7 +1728,21 @@ describe('SignalProtocolStore', () => { }); await store.removeUnprocessed(id); - const items = await store.getAllUnprocessed(); + const items = await store.getAllUnprocessedAndIncrementAttempts(); + assert.strictEqual(items.length, 0); + }); + + it('getAllUnprocessedAndIncrementAttempts deletes items', async () => { + await store.addUnprocessed({ + id: '1-one', + envelope: 'first', + timestamp: NOW + 1, + receivedAtCounter: 0, + version: 2, + attempts: 3, + }); + + const items = await store.getAllUnprocessedAndIncrementAttempts(); assert.strictEqual(items.length, 0); }); }); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 631a865d833..631037156ab 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -802,17 +802,11 @@ export default class MessageReceiver return []; } - const items = await this.storage.protocol.getAllUnprocessed(); + const items = + await this.storage.protocol.getAllUnprocessedAndIncrementAttempts(); log.info('getAllFromCache loaded', items.length, 'saved envelopes'); - return items.map(item => { - const { attempts = 0 } = item; - - return { - ...item, - attempts: attempts + 1, - }; - }); + return items; } private async decryptAndCacheBatch(