diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 03d14758e..4ea675697 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -375,6 +375,11 @@ export type StickerPackRefType = Readonly<{ export type UnprocessedType = { id: string; timestamp: number; + /* + * A client generated date used for removing old envelopes from the table + * on startup. + */ + receivedAtDate: number; receivedAtCounter: number; attempts: number; type: number; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index bad24d8b7..c9df8c42d 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -4629,6 +4629,7 @@ function saveUnprocessed(db: WritableDB, data: UnprocessedType): string { const { id, timestamp, + receivedAtDate, receivedAtCounter, attempts, type, @@ -4659,6 +4660,7 @@ function saveUnprocessed(db: WritableDB, data: UnprocessedType): string { id, timestamp, receivedAtCounter, + receivedAtDate, attempts, type, isEncrypted, @@ -4680,6 +4682,7 @@ function saveUnprocessed(db: WritableDB, data: UnprocessedType): string { $id, $timestamp, $receivedAtCounter, + $receivedAtDate, $attempts, $type, $isEncrypted, @@ -4702,7 +4705,8 @@ function saveUnprocessed(db: WritableDB, data: UnprocessedType): string { ).run({ id, timestamp, - receivedAtCounter: receivedAtCounter ?? null, + receivedAtCounter, + receivedAtDate, attempts, type, isEncrypted: isEncrypted ? 1 : 0, @@ -4750,9 +4754,11 @@ function getAllUnprocessedIds(db: WritableDB): Array { return db.transaction(() => { // cleanup first const { changes: deletedStaleCount } = db - .prepare('DELETE FROM unprocessed WHERE timestamp < $monthAgo') + .prepare( + 'DELETE FROM unprocessed WHERE receivedAtDate < $messageQueueCutoff' + ) .run({ - monthAgo: Date.now() - 45 * durations.DAY, + messageQueueCutoff: Date.now() - 45 * durations.DAY, }); if (deletedStaleCount !== 0) { diff --git a/ts/sql/migrations/1320-unprocessed-received-at-date.ts b/ts/sql/migrations/1320-unprocessed-received-at-date.ts new file mode 100644 index 000000000..4a4156204 --- /dev/null +++ b/ts/sql/migrations/1320-unprocessed-received-at-date.ts @@ -0,0 +1,38 @@ +// Copyright 2025 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { LoggerType } from '../../types/Logging'; +import { sql } from '../util'; +import type { WritableDB } from '../Interface'; + +export const version = 1320; + +export function updateToSchemaVersion1320( + currentVersion: number, + db: WritableDB, + logger: LoggerType +): void { + if (currentVersion >= 1320) { + return; + } + + db.transaction(() => { + const [query] = sql` + DROP INDEX unprocessed_timestamp; + + ALTER TABLE unprocessed + ADD COLUMN receivedAtDate INTEGER DEFAULT 0 NOT NULL; + + UPDATE unprocessed + SET receivedAtDate = timestamp; + + CREATE INDEX unprocessed_byReceivedAtDate ON unprocessed + (receivedAtDate); + `; + db.exec(query); + + db.pragma('user_version = 1320'); + })(); + + logger.info('updateToSchemaVersion1320: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index 15d879db3..a7db78ca8 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -107,10 +107,11 @@ import { updateToSchemaVersion1270 } from './1270-normalize-messages'; import { updateToSchemaVersion1280 } from './1280-blob-unprocessed'; import { updateToSchemaVersion1290 } from './1290-int-unprocessed-source-device'; import { updateToSchemaVersion1300 } from './1300-sticker-pack-refs'; +import { updateToSchemaVersion1310 } from './1310-muted-fixup'; import { - updateToSchemaVersion1310, + updateToSchemaVersion1320, version as MAX_VERSION, -} from './1310-muted-fixup'; +} from './1320-unprocessed-received-at-date'; import { DataWriter } from '../Server'; function updateToSchemaVersion1( @@ -2089,6 +2090,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1300, updateToSchemaVersion1310, + updateToSchemaVersion1320, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/test-electron/SignalProtocolStore_test.ts b/ts/test-electron/SignalProtocolStore_test.ts index cb654a0f9..723be4d19 100644 --- a/ts/test-electron/SignalProtocolStore_test.ts +++ b/ts/test-electron/SignalProtocolStore_test.ts @@ -54,6 +54,8 @@ describe('SignalProtocolStore', () => { let identityKey: KeyPairType; let testKey: KeyPairType; + const NOW = Date.now(); + const unprocessedDefaults = { type: 1, messageAgeSec: 1, @@ -73,6 +75,7 @@ describe('SignalProtocolStore', () => { isEncrypted: true, content: Buffer.from('content'), + timestamp: NOW, }; function getSessionRecord(isOpen?: boolean): SessionRecord { @@ -1254,7 +1257,7 @@ describe('SignalProtocolStore', () => { id: '2-two', content: Buffer.from('second'), - timestamp: Date.now() + 2, + receivedAtDate: Date.now() + 2, }, { zone } ); @@ -1312,7 +1315,7 @@ describe('SignalProtocolStore', () => { id: '2-two', content: Buffer.from('second'), - timestamp: 2, + receivedAtDate: 2, }, { zone } ); @@ -1436,8 +1439,6 @@ describe('SignalProtocolStore', () => { }); describe('Not yet processed messages', () => { - const NOW = Date.now(); - beforeEach(async () => { await store.removeAllUnprocessed(); const items = await store.getUnprocessedByIdsAndIncrementAttempts( @@ -1454,7 +1455,7 @@ describe('SignalProtocolStore', () => { content: Buffer.from('old envelope'), receivedAtCounter: -1, - timestamp: NOW - 2 * durations.MONTH, + receivedAtDate: NOW - 2 * durations.MONTH, }), store.addUnprocessed({ ...unprocessedDefaults, @@ -1462,7 +1463,7 @@ describe('SignalProtocolStore', () => { content: Buffer.from('second'), receivedAtCounter: 1, - timestamp: NOW + 2, + receivedAtDate: NOW + 2, }), store.addUnprocessed({ ...unprocessedDefaults, @@ -1470,7 +1471,7 @@ describe('SignalProtocolStore', () => { content: Buffer.from('third'), receivedAtCounter: 2, - timestamp: NOW + 3, + receivedAtDate: NOW + 3, }), store.addUnprocessed({ ...unprocessedDefaults, @@ -1478,7 +1479,7 @@ describe('SignalProtocolStore', () => { content: Buffer.from('first'), receivedAtCounter: 0, - timestamp: NOW + 1, + receivedAtDate: NOW + 1, }), ]); @@ -1501,7 +1502,7 @@ describe('SignalProtocolStore', () => { id, - timestamp: NOW + 1, + receivedAtDate: NOW + 1, }); await store.removeUnprocessed(id); @@ -1518,7 +1519,7 @@ describe('SignalProtocolStore', () => { id: '1-one', attempts: 10, - timestamp: NOW + 1, + receivedAtDate: NOW + 1, }); const items = await store.getUnprocessedByIdsAndIncrementAttempts( diff --git a/ts/test-mock/messaging/unprocessed_test.ts b/ts/test-mock/messaging/unprocessed_test.ts index 0176af7ea..7edb02fb2 100644 --- a/ts/test-mock/messaging/unprocessed_test.ts +++ b/ts/test-mock/messaging/unprocessed_test.ts @@ -65,6 +65,7 @@ describe('unprocessed', function (this: Mocha.Suite) { sends.push( alice.sendText(desktop, `hello: ${i}`, { timestamp: bootstrap.getTimestamp(), + sealed: i % 2 === 0, }) ); } @@ -91,6 +92,7 @@ describe('unprocessed', function (this: Mocha.Suite) { .locator(`[data-testid="${alice.device.aci}"] >> "${alice.profileName}"`) .click(); + await page.locator('.module-message__text >> "hello: 4"').waitFor(); await page.locator('.module-message__text >> "hello: 5"').waitFor(); }); }); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index f8837deb0..c16711e58 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -874,9 +874,8 @@ export default class MessageReceiver const envelope: ProcessedEnvelope = { id: item.id, - receivedAtCounter: item.receivedAtCounter ?? item.timestamp, - receivedAtDate: - item.receivedAtCounter == null ? Date.now() : item.timestamp, + receivedAtCounter: item.receivedAtCounter, + receivedAtDate: item.receivedAtDate, messageAgeSec: item.messageAgeSec, // Proto.Envelope fields @@ -1189,11 +1188,8 @@ export default class MessageReceiver sourceServiceId: envelope.sourceServiceId, sourceDevice: envelope.sourceDevice, destinationServiceId: envelope.destinationServiceId, - - // This field is only used for aging items out of the cache. The original - // envelope's timestamp will be used when retrying this item. - timestamp: envelope.receivedAtDate, - + timestamp: envelope.timestamp, + receivedAtDate: envelope.receivedAtDate, attempts: 0, isEncrypted: true, content: envelope.content,