From 4b6ef3a1edc2d2b9797439ef10436e663ab722b3 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Tue, 21 Jan 2025 13:42:14 -0800 Subject: [PATCH] Migrate unprocessed table to BLOBs --- ts/SignalProtocolStore.ts | 22 --- ts/sql/Interface.ts | 44 ++--- ts/sql/Server.ts | 109 +++++------- ts/sql/migrations/1280-blob-unprocessed.ts | 176 +++++++++++++++++++ ts/sql/migrations/index.ts | 6 +- ts/test-electron/SignalProtocolStore_test.ts | 108 +++++------- ts/test-node/sql/migration_1280_test.ts | 159 +++++++++++++++++ ts/textsecure/MessageReceiver.ts | 87 ++++----- ts/textsecure/Types.d.ts | 20 +-- ts/textsecure/messageReceiverEvents.ts | 14 +- 10 files changed, 492 insertions(+), 253 deletions(-) create mode 100644 ts/sql/migrations/1280-blob-unprocessed.ts create mode 100644 ts/test-node/sql/migration_1280_test.ts diff --git a/ts/SignalProtocolStore.ts b/ts/SignalProtocolStore.ts index 4b8182f2a93..927720b636d 100644 --- a/ts/SignalProtocolStore.ts +++ b/ts/SignalProtocolStore.ts @@ -46,7 +46,6 @@ import type { SignedPreKeyIdType, SignedPreKeyType, UnprocessedType, - UnprocessedUpdateType, CompatPreKeyType, } from './textsecure/Types.d'; import type { ServiceIdString, PniString, AciString } from './types/ServiceId'; @@ -2454,27 +2453,6 @@ export class SignalProtocolStore extends EventEmitter { }); } - updateUnprocessedWithData( - id: string, - data: UnprocessedUpdateType - ): Promise { - return this.withZone(GLOBAL_ZONE, 'updateUnprocessedWithData', async () => { - await DataWriter.updateUnprocessedWithData(id, data); - }); - } - - updateUnprocessedsWithData( - items: Array<{ id: string; data: UnprocessedUpdateType }> - ): Promise { - return this.withZone( - GLOBAL_ZONE, - 'updateUnprocessedsWithData', - async () => { - await DataWriter.updateUnprocessedsWithData(items); - } - ); - } - removeUnprocessed(idOrArray: string | Array): Promise { return this.withZone(GLOBAL_ZONE, 'removeUnprocessed', async () => { await DataWriter.removeUnprocessed(idOrArray); diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 5dfc2b64921..ff411e95b78 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -368,32 +368,24 @@ export type StickerPackType = InstalledStickerPackType & export type UnprocessedType = { id: string; timestamp: number; - receivedAtCounter: number | null; - version: number; + receivedAtCounter: number; attempts: number; - envelope?: string; + type: number; + isEncrypted: boolean; + content: Uint8Array; - messageAgeSec?: number; - source?: string; - sourceServiceId?: ServiceIdString; - sourceDevice?: number; - destinationServiceId?: ServiceIdString; - updatedPni?: PniString; - serverGuid?: string; - serverTimestamp?: number; - decrypted?: string; - urgent?: boolean; - story?: boolean; - reportingToken?: string; -}; - -export type UnprocessedUpdateType = { - source?: string; - sourceServiceId?: ServiceIdString; - sourceDevice?: number; - serverGuid?: string; - serverTimestamp?: number; - decrypted?: string; + messageAgeSec: number; + source: string | undefined; + sourceServiceId: ServiceIdString | undefined; + sourceDevice: number | undefined; + destinationServiceId: ServiceIdString; + updatedPni: PniString | undefined; + serverGuid: string; + serverTimestamp: number; + urgent: boolean; + story: boolean; + reportingToken: Uint8Array | undefined; + groupId: string | undefined; }; export type ConversationMessageStatsType = { @@ -901,10 +893,6 @@ type WritableInterface = { getUnprocessedByIdsAndIncrementAttempts: ( ids: ReadonlyArray ) => Array; - updateUnprocessedWithData: (id: string, data: UnprocessedUpdateType) => void; - updateUnprocessedsWithData: ( - array: Array<{ id: string; data: UnprocessedUpdateType }> - ) => void; removeUnprocessed: (id: string | Array) => void; /** only for testing */ diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 543c4a05d91..48fd22d40f1 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -174,7 +174,6 @@ import type { StoryReadType, UninstalledStickerPackType, UnprocessedType, - UnprocessedUpdateType, WritableDB, } from './Interface'; import { AttachmentDownloadSource, MESSAGE_COLUMNS } from './Interface'; @@ -474,8 +473,6 @@ export const DataWriter: ServerWritableInterface = { getUnprocessedByIdsAndIncrementAttempts, getAllUnprocessedIds, - updateUnprocessedWithData, - updateUnprocessedsWithData, removeUnprocessed, removeAllUnprocessed, @@ -4632,17 +4629,23 @@ function saveUnprocessed(db: WritableDB, data: UnprocessedType): string { id, timestamp, receivedAtCounter, - version, attempts, - envelope, + type, + isEncrypted, + content, + + messageAgeSec, source, sourceServiceId, sourceDevice, + destinationServiceId, + updatedPni, serverGuid, serverTimestamp, - decrypted, urgent, story, + reportingToken, + groupId, } = data; if (!id) { throw new Error('saveUnprocessed: id was falsey'); @@ -4655,102 +4658,72 @@ function saveUnprocessed(db: WritableDB, data: UnprocessedType): string { id, timestamp, receivedAtCounter, - version, attempts, - envelope, + type, + isEncrypted, + content, + + messageAgeSec, source, sourceServiceId, sourceDevice, + destinationServiceId, + updatedPni, serverGuid, serverTimestamp, - decrypted, urgent, - story + story, + reportingToken, + groupId ) values ( $id, $timestamp, $receivedAtCounter, - $version, $attempts, - $envelope, + $type, + $isEncrypted, + $content, + + $messageAgeSec, $source, $sourceServiceId, $sourceDevice, + $destinationServiceId, + $updatedPni, $serverGuid, $serverTimestamp, - $decrypted, $urgent, - $story + $story, + $reportingToken, + $groupId ); ` ).run({ id, timestamp, receivedAtCounter: receivedAtCounter ?? null, - version, attempts, - envelope: envelope || null, + type, + isEncrypted: isEncrypted ? 1 : 0, + content, + + messageAgeSec, source: source || null, sourceServiceId: sourceServiceId || null, sourceDevice: sourceDevice || null, - serverGuid: serverGuid || null, - serverTimestamp: serverTimestamp || null, - decrypted: decrypted || null, + destinationServiceId, + updatedPni: updatedPni || null, + serverGuid, + serverTimestamp, urgent: urgent || !isBoolean(urgent) ? 1 : 0, story: story ? 1 : 0, + reportingToken: reportingToken || null, + groupId: groupId || null, }); return id; } -function updateUnprocessedWithData( - db: WritableDB, - id: string, - data: UnprocessedUpdateType -): void { - const { - source, - sourceServiceId, - sourceDevice, - serverGuid, - serverTimestamp, - decrypted, - } = data; - - prepare( - db, - ` - UPDATE unprocessed SET - source = $source, - sourceServiceId = $sourceServiceId, - sourceDevice = $sourceDevice, - serverGuid = $serverGuid, - serverTimestamp = $serverTimestamp, - decrypted = $decrypted - WHERE id = $id; - ` - ).run({ - id, - source: source || null, - sourceServiceId: sourceServiceId || null, - sourceDevice: sourceDevice || null, - serverGuid: serverGuid || null, - serverTimestamp: serverTimestamp || null, - decrypted: decrypted || null, - }); -} - -function updateUnprocessedsWithData( - db: WritableDB, - arrayOfUnprocessed: Array<{ id: string; data: UnprocessedUpdateType }> -): void { - db.transaction(() => { - for (const { id, data } of arrayOfUnprocessed) { - updateUnprocessedWithData(db, id, data); - } - })(); -} - function getUnprocessedById( db: ReadableDB, id: string @@ -4778,7 +4751,7 @@ function getAllUnprocessedIds(db: WritableDB): Array { const { changes: deletedStaleCount } = db .prepare('DELETE FROM unprocessed WHERE timestamp < $monthAgo') .run({ - monthAgo: Date.now() - durations.MONTH, + monthAgo: Date.now() - 45 * durations.DAY, }); if (deletedStaleCount !== 0) { diff --git a/ts/sql/migrations/1280-blob-unprocessed.ts b/ts/sql/migrations/1280-blob-unprocessed.ts new file mode 100644 index 00000000000..fab7579b628 --- /dev/null +++ b/ts/sql/migrations/1280-blob-unprocessed.ts @@ -0,0 +1,176 @@ +// Copyright 2025 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { v7 as getGuid } from 'uuid'; + +import type { LoggerType } from '../../types/Logging'; +import { + normalizePni, + normalizeServiceId, + toTaggedPni, + isUntaggedPniString, +} from '../../types/ServiceId'; +import { SignalService as Proto } from '../../protobuf'; +import { sql } from '../util'; +import type { WritableDB } from '../Interface'; +import { getOurUuid } from './41-uuid-keys'; + +export const version = 1280; + +export function updateToSchemaVersion1280( + currentVersion: number, + db: WritableDB, + logger: LoggerType +): void { + if (currentVersion >= 1280) { + return; + } + + db.transaction(() => { + const ourAci = getOurUuid(db); + + let rows = db.prepare('SELECT * FROM unprocessed').all(); + + const [query] = sql` + DROP TABLE unprocessed; + + CREATE TABLE unprocessed( + id TEXT NOT NULL PRIMARY KEY ASC, + type INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + attempts INTEGER NOT NULL, + receivedAtCounter INTEGER NOT NULL, + urgent INTEGER NOT NULL, + story INTEGER NOT NULL, + serverGuid TEXT NOT NULL, + serverTimestamp INTEGER NOT NULL, + isEncrypted INTEGER NOT NULL, + content BLOB NOT NULL, + messageAgeSec INTEGER NOT NULL, + destinationServiceId TEXT NOT NULL, + + -- Not present for 1:1 messages and not sealed messages + groupId TEXT, + + -- Not present for sealed envelopes + reportingToken BLOB, + source TEXT, + sourceServiceId TEXT, + sourceDevice TEXT, + + -- Present only for PNP change number + updatedPni TEXT + ) STRICT; + + CREATE INDEX unprocessed_timestamp ON unprocessed + (timestamp); + + CREATE INDEX unprocessed_byReceivedAtCounter ON unprocessed + (receivedAtCounter); + `; + db.exec(query); + + const insertStmt = db.prepare(` + INSERT INTO unprocessed + (id, type, timestamp, attempts, receivedAtCounter, urgent, story, + serverGuid, serverTimestamp, isEncrypted, content, source, + messageAgeSec, sourceServiceId, sourceDevice, + destinationServiceId, reportingToken) + VALUES + ($id, $type, $timestamp, $attempts, $receivedAtCounter, $urgent, $story, + $serverGuid, $serverTimestamp, $isEncrypted, $content, $source, + $messageAgeSec, $sourceServiceId, $sourceDevice, + $destinationServiceId, $reportingToken); + `); + + let oldEnvelopes = 0; + + if (!ourAci) { + if (rows.length) { + logger.warn( + `updateToSchemaVersion1280: no aci, dropping ${rows.length} envelopes` + ); + rows = []; + } + } + + for (const row of rows) { + const { + id, + envelope, + decrypted, + timestamp, + attempts, + version: envelopeVersion, + receivedAtCounter, + urgent, + story, + serverGuid, + serverTimestamp, + ...rest + } = row; + + // Skip old and/or invalid rows + if (envelopeVersion !== 2 || !envelope) { + oldEnvelopes += 1; + continue; + } + + try { + const decoded = Proto.Envelope.decode(Buffer.from(envelope, 'base64')); + if (!decoded.content) { + throw new Error('Missing envelope content'); + } + + const content = decrypted + ? Buffer.from(decrypted, 'base64') + : decoded.content; + + insertStmt.run({ + ...rest, + id, + type: decoded.type ?? Proto.Envelope.Type.UNKNOWN, + content, + isEncrypted: decrypted ? 0 : 1, + timestamp: timestamp || Date.now(), + attempts: attempts || 0, + receivedAtCounter: receivedAtCounter || 0, + urgent: urgent ? 1 : 0, + story: story ? 1 : 0, + serverGuid: serverGuid || getGuid(), + serverTimestamp: serverTimestamp || 0, + destinationServiceId: normalizeServiceId( + decoded.destinationServiceId || ourAci, + 'Envelope.destinationServiceId' + ), + updatedPni: isUntaggedPniString(decoded.updatedPni) + ? normalizePni( + toTaggedPni(decoded.updatedPni), + 'Envelope.updatedPni' + ) + : undefined, + // Sadly not captured previously + messageAgeSec: 0, + reportingToken: decoded.reportingToken?.length + ? decoded.reportingToken + : null, + }); + } catch (error) { + logger.warn( + 'updateToSchemaVersion1280: failed to migrate unprocessed', + id, + error + ); + } + } + + if (oldEnvelopes !== 0) { + logger.warn( + `updateToSchemaVersion1280: dropped ${oldEnvelopes} envelopes` + ); + } + + db.pragma('user_version = 1280'); + })(); + + logger.info('updateToSchemaVersion1280: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index e2f10b88581..609bf3aaade 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -103,10 +103,11 @@ import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index'; import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table'; import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage'; import { updateToSchemaVersion1260 } from './1260-sync-tasks-rowid'; +import { updateToSchemaVersion1270 } from './1270-normalize-messages'; import { - updateToSchemaVersion1270, + updateToSchemaVersion1280, version as MAX_VERSION, -} from './1270-normalize-messages'; +} from './1280-blob-unprocessed'; import { DataWriter } from '../Server'; function updateToSchemaVersion1( @@ -2080,6 +2081,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1250, updateToSchemaVersion1260, updateToSchemaVersion1270, + updateToSchemaVersion1280, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/test-electron/SignalProtocolStore_test.ts b/ts/test-electron/SignalProtocolStore_test.ts index 0c1f3de9a5b..cb654a0f95a 100644 --- a/ts/test-electron/SignalProtocolStore_test.ts +++ b/ts/test-electron/SignalProtocolStore_test.ts @@ -44,6 +44,8 @@ const { SenderKeyStateStructure, } = signal.proto.storage; +const ZERO = new Uint8Array(0); + describe('SignalProtocolStore', () => { const ourAci = generateAci(); const ourPni = generatePni(); @@ -52,6 +54,27 @@ describe('SignalProtocolStore', () => { let identityKey: KeyPairType; let testKey: KeyPairType; + const unprocessedDefaults = { + type: 1, + messageAgeSec: 1, + source: undefined, + sourceDevice: undefined, + sourceServiceId: undefined, + destinationServiceId: ourAci, + reportingToken: undefined, + groupId: undefined, + updatedPni: undefined, + story: false, + urgent: false, + receivedAtCounter: 0, + serverGuid: generateUuid(), + serverTimestamp: 1, + attempts: 0, + + isEncrypted: true, + content: Buffer.from('content'), + }; + function getSessionRecord(isOpen?: boolean): SessionRecord { const proto = new RecordStructure(); @@ -1227,14 +1250,11 @@ describe('SignalProtocolStore', () => { await store.addUnprocessed( { + ...unprocessedDefaults, id: '2-two', - version: 2, - attempts: 0, - envelope: 'second', - receivedAtCounter: 0, + content: Buffer.from('second'), timestamp: Date.now() + 2, - urgent: true, }, { zone } ); @@ -1255,7 +1275,7 @@ describe('SignalProtocolStore', () => { ); assert.deepEqual( - allUnprocessed.map(({ envelope }) => envelope), + allUnprocessed.map(({ content }) => Bytes.toString(content || ZERO)), ['second'] ); }); @@ -1288,14 +1308,11 @@ describe('SignalProtocolStore', () => { await store.addUnprocessed( { + ...unprocessedDefaults, id: '2-two', - version: 2, - attempts: 0, - envelope: 'second', - receivedAtCounter: 0, + content: Buffer.from('second'), timestamp: 2, - urgent: true, }, { zone } ); @@ -1432,44 +1449,36 @@ describe('SignalProtocolStore', () => { it('adds three and gets them back', async () => { await Promise.all([ store.addUnprocessed({ + ...unprocessedDefaults, id: '0-dropped', - version: 2, - attempts: 0, - envelope: 'old envelope', + content: Buffer.from('old envelope'), receivedAtCounter: -1, timestamp: NOW - 2 * durations.MONTH, - urgent: true, }), store.addUnprocessed({ + ...unprocessedDefaults, id: '2-two', - version: 2, - attempts: 0, - envelope: 'second', + content: Buffer.from('second'), receivedAtCounter: 1, timestamp: NOW + 2, - urgent: true, }), store.addUnprocessed({ + ...unprocessedDefaults, id: '3-three', - version: 2, - attempts: 0, - envelope: 'third', + content: Buffer.from('third'), receivedAtCounter: 2, timestamp: NOW + 3, - urgent: true, }), store.addUnprocessed({ + ...unprocessedDefaults, id: '1-one', - version: 2, - attempts: 0, - envelope: 'first', + content: Buffer.from('first'), receivedAtCounter: 0, timestamp: NOW + 1, - urgent: true, }), ]); @@ -1480,46 +1489,19 @@ describe('SignalProtocolStore', () => { // they are in the proper order because the collection comparator is // 'receivedAtCounter' - assert.strictEqual(items[0].envelope, 'first'); - assert.strictEqual(items[1].envelope, 'second'); - assert.strictEqual(items[2].envelope, 'third'); - }); - - it('can updates items', async () => { - const id = '1-one'; - await store.addUnprocessed({ - id, - version: 2, - - attempts: 0, - envelope: 'first', - receivedAtCounter: 0, - timestamp: NOW + 1, - urgent: false, - }); - await store.updateUnprocessedWithData(id, { decrypted: 'updated' }); - - const items = await store.getUnprocessedByIdsAndIncrementAttempts( - await store.getAllUnprocessedIds() - ); - assert.strictEqual(items.length, 1); - assert.strictEqual(items[0].decrypted, 'updated'); - assert.strictEqual(items[0].timestamp, NOW + 1); - assert.strictEqual(items[0].attempts, 1); - assert.strictEqual(items[0].urgent, false); + assert.strictEqual(Bytes.toString(items[0].content || ZERO), 'first'); + assert.strictEqual(Bytes.toString(items[1].content || ZERO), 'second'); + assert.strictEqual(Bytes.toString(items[2].content || ZERO), 'third'); }); it('removeUnprocessed successfully deletes item', async () => { const id = '1-one'; await store.addUnprocessed({ - id, - version: 2, + ...unprocessedDefaults, + + id, - attempts: 0, - envelope: 'first', - receivedAtCounter: 0, timestamp: NOW + 1, - urgent: true, }); await store.removeUnprocessed(id); @@ -1531,14 +1513,12 @@ describe('SignalProtocolStore', () => { it('getAllUnprocessedAndIncrementAttempts deletes items', async () => { await store.addUnprocessed({ + ...unprocessedDefaults, + id: '1-one', - version: 2, attempts: 10, - envelope: 'first', - receivedAtCounter: 0, timestamp: NOW + 1, - urgent: true, }); const items = await store.getUnprocessedByIdsAndIncrementAttempts( diff --git a/ts/test-node/sql/migration_1280_test.ts b/ts/test-node/sql/migration_1280_test.ts new file mode 100644 index 00000000000..d9b2a37f18b --- /dev/null +++ b/ts/test-node/sql/migration_1280_test.ts @@ -0,0 +1,159 @@ +// Copyright 2025 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { type WritableDB } from '../../sql/Interface'; +import { SignalService as Proto } from '../../protobuf'; +import { generateAci } from '../../types/ServiceId'; +import { createDB, updateToVersion, insertData, getTableData } from './helpers'; + +describe('SQL/updateToSchemaVersion1280', () => { + let db: WritableDB; + + const OUR_ACI = generateAci(); + const THEIR_ACI = generateAci(); + + afterEach(() => { + db.close(); + }); + + beforeEach(() => { + db = createDB(); + updateToVersion(db, 1270); + + insertData(db, 'items', [ + { + id: 'uuid_id', + json: { + id: 'uuid_id', + value: `${OUR_ACI}.2`, + }, + }, + ]); + }); + + it('drops v1 envelopes', () => { + insertData(db, 'unprocessed', [ + { + id: 'old', + }, + ]); + updateToVersion(db, 1280); + + assert.deepStrictEqual(getTableData(db, 'unprocessed'), []); + }); + + it('does not drop v2 envelopes', () => { + insertData(db, 'unprocessed', [ + { + id: 'new', + version: 2, + + receivedAtCounter: 1, + story: 1, + urgent: 1, + timestamp: 4, + attempts: 5, + envelope: Buffer.from( + Proto.Envelope.encode({ + destinationServiceId: THEIR_ACI, + content: Buffer.from('encrypted1'), + reportingToken: Buffer.from('token'), + }).finish() + ).toString('base64'), + serverTimestamp: 6, + serverGuid: 'guid1', + }, + { + id: 'new-2', + version: 2, + + receivedAtCounter: 2, + story: 1, + urgent: 1, + timestamp: 4, + attempts: 5, + envelope: Buffer.from( + Proto.Envelope.encode({ + type: 3, + content: Buffer.from('encrypted2'), + }).finish() + ).toString('base64'), + serverTimestamp: 7, + serverGuid: 'guid2', + }, + { + id: 'new-3', + version: 2, + + receivedAtCounter: 3, + story: 0, + urgent: 0, + timestamp: 5, + attempts: 6, + envelope: Buffer.from( + Proto.Envelope.encode({ + content: Buffer.from('unused'), + }).finish() + ).toString('base64'), + decrypted: 'CAFE', + serverTimestamp: 8, + serverGuid: 'guid3', + }, + ]); + updateToVersion(db, 1280); + + assert.deepStrictEqual(getTableData(db, 'unprocessed'), [ + { + id: 'new', + + type: 0, + receivedAtCounter: 1, + story: 1, + urgent: 1, + messageAgeSec: 0, + timestamp: 4, + attempts: 5, + destinationServiceId: THEIR_ACI, + content: '656e6372797074656431', + isEncrypted: 1, + serverTimestamp: 6, + serverGuid: 'guid1', + reportingToken: '746f6b656e', + }, + { + id: 'new-2', + + receivedAtCounter: 2, + story: 1, + urgent: 1, + timestamp: 4, + messageAgeSec: 0, + attempts: 5, + destinationServiceId: OUR_ACI, + content: '656e6372797074656432', + isEncrypted: 1, + type: 3, + serverTimestamp: 7, + serverGuid: 'guid2', + }, + { + id: 'new-3', + + receivedAtCounter: 3, + urgent: 0, + story: 0, + timestamp: 5, + messageAgeSec: 0, + attempts: 6, + destinationServiceId: OUR_ACI, + content: '080144', + isEncrypted: 0, + type: 0, + serverTimestamp: 8, + serverGuid: 'guid3', + }, + ]); + }); +}); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 05409effb37..85ca288d328 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -5,7 +5,7 @@ import { isBoolean, isNumber, isString, noop, omit } from 'lodash'; import PQueue from 'p-queue'; -import { v4 as getGuid } from 'uuid'; +import { v7 as getGuid } from 'uuid'; import type { SealedSenderDecryptionResult, @@ -417,6 +417,7 @@ export default class MessageReceiver // Proto.Envelope fields type: decoded.type ?? Proto.Envelope.Type.UNKNOWN, + source: undefined, sourceServiceId: decoded.sourceServiceId ? normalizeServiceId( decoded.sourceServiceId, @@ -443,16 +444,17 @@ export default class MessageReceiver serverTimestamp, urgent: isBoolean(decoded.urgent) ? decoded.urgent : true, story: decoded.story ?? false, - reportingToken: decoded.reportingToken?.length + reportingToken: Bytes.isNotEmpty(decoded.reportingToken) ? decoded.reportingToken : undefined, + groupId: undefined, }; // After this point, decoding errors are not the server's // fault, and we should handle them gracefully and tell the // user they received an invalid message - this.#decryptAndCache(envelope, plaintext, request); + this.#decryptAndCache(envelope, request); this.#processedCount += 1; } catch (e) { request.respond(500, 'Bad encrypted websocket message'); @@ -865,20 +867,6 @@ export default class MessageReceiver async #queueCached(item: UnprocessedType): Promise { log.info('MessageReceiver.queueCached', item.id); try { - let envelopePlaintext: Uint8Array; - - if (item.envelope && item.version === 2) { - envelopePlaintext = Bytes.fromBase64(item.envelope); - } else if (item.envelope && typeof item.envelope === 'string') { - envelopePlaintext = Bytes.fromBinary(item.envelope); - } else { - throw new Error( - 'MessageReceiver.queueCached: item.envelope was malformed' - ); - } - - const decoded = Proto.Envelope.decode(envelopePlaintext); - const ourAci = this.#storage.user.getCheckedAci(); const envelope: ProcessedEnvelope = { @@ -886,50 +874,37 @@ export default class MessageReceiver receivedAtCounter: item.receivedAtCounter ?? item.timestamp, receivedAtDate: item.receivedAtCounter == null ? Date.now() : item.timestamp, - messageAgeSec: item.messageAgeSec || 0, + messageAgeSec: item.messageAgeSec, // Proto.Envelope fields - type: decoded.type ?? Proto.Envelope.Type.UNKNOWN, + type: item.type, source: item.source, sourceServiceId: normalizeServiceId( - item.sourceServiceId || decoded.sourceServiceId, + item.sourceServiceId, 'CachedEnvelope.sourceServiceId' ), - sourceDevice: decoded.sourceDevice || item.sourceDevice, + sourceDevice: item.sourceDevice, destinationServiceId: normalizeServiceId( - decoded.destinationServiceId || item.destinationServiceId || ourAci, + item.destinationServiceId || ourAci, 'CachedEnvelope.destinationServiceId' ), - updatedPni: isUntaggedPniString(decoded.updatedPni) + updatedPni: isUntaggedPniString(item.updatedPni) ? normalizePni( - toTaggedPni(decoded.updatedPni), + toTaggedPni(item.updatedPni), 'CachedEnvelope.updatedPni' ) : undefined, - timestamp: decoded.timestamp?.toNumber() ?? 0, - content: dropNull(decoded.content), - serverGuid: decoded.serverGuid ?? getGuid(), - serverTimestamp: - item.serverTimestamp || decoded.serverTimestamp?.toNumber() || 0, + timestamp: item.timestamp, + content: item.isEncrypted ? item.content : undefined, + serverGuid: item.serverGuid, + serverTimestamp: item.serverTimestamp, urgent: isBoolean(item.urgent) ? item.urgent : true, story: Boolean(item.story), - reportingToken: item.reportingToken - ? Bytes.fromBase64(item.reportingToken) - : undefined, + reportingToken: item.reportingToken, + groupId: item.groupId, }; - const { decrypted } = item; - if (decrypted) { - let payloadPlaintext: Uint8Array; - - if (item.version === 2) { - payloadPlaintext = Bytes.fromBase64(decrypted); - } else if (typeof decrypted === 'string') { - payloadPlaintext = Bytes.fromBinary(decrypted); - } else { - throw new Error('Cached decrypted value was not a string!'); - } - + if (!item.isEncrypted) { strictAssert( envelope.sourceServiceId, 'Decrypted envelope must have source uuid' @@ -960,7 +935,7 @@ export default class MessageReceiver async () => { void this.#queueDecryptedEnvelope( decryptedEnvelope, - payloadPlaintext + item.content ); }, `queueDecryptedEnvelope(${getEnvelopeId(decryptedEnvelope)})`, @@ -1129,7 +1104,7 @@ export default class MessageReceiver updatedPni: envelope.updatedPni, serverGuid: envelope.serverGuid, serverTimestamp: envelope.serverTimestamp, - decrypted: Bytes.toBase64(plaintext), + decrypted: plaintext, }; } ); @@ -1194,27 +1169,35 @@ export default class MessageReceiver #decryptAndCache( envelope: ProcessedEnvelope, - plaintext: Uint8Array, request: IncomingWebSocketRequest ): void { + strictAssert(envelope.content, 'Content is required for envelopes'); + const { id } = envelope; const data: UnprocessedType = { id, - version: 2, + type: envelope.type, + source: envelope.source, + sourceServiceId: envelope.sourceServiceId, + sourceDevice: envelope.sourceDevice, + destinationServiceId: envelope.destinationServiceId, // This field is only used for aging items out of the cache. The original // envelope's timestamp will be used when retrying this item. timestamp: envelope.receivedAtDate, attempts: 0, - envelope: Bytes.toBase64(plaintext), + isEncrypted: true, + content: envelope.content, messageAgeSec: envelope.messageAgeSec, receivedAtCounter: envelope.receivedAtCounter, + serverGuid: envelope.serverGuid, + serverTimestamp: envelope.serverTimestamp, urgent: envelope.urgent, story: envelope.story, - reportingToken: envelope.reportingToken - ? Bytes.toBase64(envelope.reportingToken) - : undefined, + updatedPni: envelope.updatedPni, + reportingToken: envelope.reportingToken, + groupId: envelope.groupId, }; this.#decryptAndCacheBatcher.add({ request, diff --git a/ts/textsecure/Types.d.ts b/ts/textsecure/Types.d.ts index fa039f66159..0f4c6db61c6 100644 --- a/ts/textsecure/Types.d.ts +++ b/ts/textsecure/Types.d.ts @@ -24,7 +24,6 @@ export { SignedPreKeyIdType, SignedPreKeyType, UnprocessedType, - UnprocessedUpdateType, } from '../sql/Interface'; export type StorageServiceCallOptionsType = { @@ -87,19 +86,20 @@ export type ProcessedEnvelope = Readonly<{ // Mostly from Proto.Envelope except for null/undefined type: Proto.Envelope.Type; - source?: string; - sourceServiceId?: ServiceIdString; - sourceDevice?: number; + source: string | undefined; + sourceServiceId: ServiceIdString | undefined; + sourceDevice: number | Undefined; destinationServiceId: ServiceIdString; - updatedPni?: PniString; + updatedPni: PniString | undefined; timestamp: number; - content?: Uint8Array; + content: Uint8Array | undefined; serverGuid: string; serverTimestamp: number; - groupId?: string; - urgent?: boolean; - story?: boolean; - reportingToken?: Uint8Array; + groupId: string | undefined; + urgent: boolean; + story: boolean; + reportingToken: Uint8Array | undefined; + groupId: string | undefined; }>; export type ProcessedAttachment = { diff --git a/ts/textsecure/messageReceiverEvents.ts b/ts/textsecure/messageReceiverEvents.ts index 289cd8cca14..6473126928a 100644 --- a/ts/textsecure/messageReceiverEvents.ts +++ b/ts/textsecure/messageReceiverEvents.ts @@ -156,10 +156,10 @@ export class SuccessfulDecryptEvent extends ConfirmableEvent { } export type DecryptionErrorEventData = Readonly<{ - cipherTextBytes?: Uint8Array; - cipherTextType?: number; - contentHint?: number; - groupId?: string; + cipherTextBytes: Uint8Array | undefined; + cipherTextType: number | undefined; + contentHint: number | undefined; + groupId: string | undefined; receivedAtCounter: number; receivedAtDate: number; senderDevice: number; @@ -211,7 +211,7 @@ export type SentEventData = Readonly<{ destination?: string; destinationServiceId?: ServiceIdString; timestamp?: number; - serverTimestamp?: number; + serverTimestamp: number; device: number | undefined; unidentifiedStatus: ProcessedSent['unidentifiedStatus']; message: ProcessedDataMessage; @@ -254,8 +254,8 @@ export type MessageEventData = Readonly<{ sourceDevice?: number; destinationServiceId: ServiceIdString; timestamp: number; - serverGuid?: string; - serverTimestamp?: number; + serverGuid: string; + serverTimestamp: number; unidentifiedDeliveryReceived: boolean; message: ProcessedDataMessage; receivedAtCounter: number;