From 71ca3c07646a48ebbc9fce3d8524fbcbef0d1eb1 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Tue, 14 Dec 2021 02:25:44 +0100 Subject: [PATCH] Drop old unprocessed envelopes --- ts/sql/Server.ts | 13 +++++++++++ ts/test-electron/SignalProtocolStore_test.ts | 24 ++++++++++++++------ ts/util/durations.ts | 1 + 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 4f5bf49a7286..38db7c3aff0d 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -3028,6 +3028,19 @@ async function getUnprocessedCount(): Promise { async function getAllUnprocessed(): Promise> { const db = getInstance(); + + const { changes: deletedCount } = db + .prepare('DELETE FROM unprocessed WHERE timestamp < $monthAgo') + .run({ + monthAgo: Date.now() - durations.MONTH, + }); + + if (deletedCount !== 0) { + logger.warn( + `getAllUnprocessed: deleting ${deletedCount} old unprocessed envelopes` + ); + } + const rows = db .prepare( ` diff --git a/ts/test-electron/SignalProtocolStore_test.ts b/ts/test-electron/SignalProtocolStore_test.ts index 38312217a093..4b30f1dacec6 100644 --- a/ts/test-electron/SignalProtocolStore_test.ts +++ b/ts/test-electron/SignalProtocolStore_test.ts @@ -13,6 +13,7 @@ import { import { signal } from '../protobuf/compiled'; import { sessionStructureToBytes } from '../util/sessionTranslation'; +import * as durations from '../util/durations'; import { Zone } from '../util/Zone'; import * as Bytes from '../Bytes'; @@ -1477,7 +1478,7 @@ describe('SignalProtocolStore', () => { { id: '2-two', envelope: 'second', - timestamp: 2, + timestamp: Date.now() + 2, version: 2, attempts: 0, }, @@ -1617,6 +1618,8 @@ describe('SignalProtocolStore', () => { }); describe('Not yet processed messages', () => { + const NOW = Date.now(); + beforeEach(async () => { await store.removeAllUnprocessed(); const items = await store.getAllUnprocessed(); @@ -1625,24 +1628,31 @@ describe('SignalProtocolStore', () => { it('adds three and gets them back', async () => { await Promise.all([ + store.addUnprocessed({ + id: '0-dropped', + envelope: 'old envelope', + timestamp: NOW - 2 * durations.MONTH, + version: 2, + attempts: 0, + }), store.addUnprocessed({ id: '2-two', envelope: 'second', - timestamp: 2, + timestamp: NOW + 2, version: 2, attempts: 0, }), store.addUnprocessed({ id: '3-three', envelope: 'third', - timestamp: 3, + timestamp: NOW + 3, version: 2, attempts: 0, }), store.addUnprocessed({ id: '1-one', envelope: 'first', - timestamp: 1, + timestamp: NOW + 1, version: 2, attempts: 0, }), @@ -1662,7 +1672,7 @@ describe('SignalProtocolStore', () => { await store.addUnprocessed({ id, envelope: 'first', - timestamp: 1, + timestamp: NOW + 1, version: 2, attempts: 0, }); @@ -1671,7 +1681,7 @@ describe('SignalProtocolStore', () => { const items = await store.getAllUnprocessed(); assert.strictEqual(items.length, 1); assert.strictEqual(items[0].decrypted, 'updated'); - assert.strictEqual(items[0].timestamp, 1); + assert.strictEqual(items[0].timestamp, NOW + 1); }); it('removeUnprocessed successfully deletes item', async () => { @@ -1679,7 +1689,7 @@ describe('SignalProtocolStore', () => { await store.addUnprocessed({ id, envelope: 'first', - timestamp: 1, + timestamp: NOW + 1, version: 2, attempts: 0, }); diff --git a/ts/util/durations.ts b/ts/util/durations.ts index b4c22fa977fb..95717ab795fd 100644 --- a/ts/util/durations.ts +++ b/ts/util/durations.ts @@ -6,3 +6,4 @@ export const MINUTE = SECOND * 60; export const HOUR = MINUTE * 60; export const DAY = HOUR * 24; export const WEEK = DAY * 7; +export const MONTH = DAY * 30;