signal-desktop/ts/sql/migrations/1280-blob-unprocessed.ts
2025-01-21 13:42:14 -08:00

176 lines
4.8 KiB
TypeScript

// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { v7 as getGuid } from 'uuid';
import type { LoggerType } from '../../types/Logging';
import {
normalizePni,
normalizeServiceId,
toTaggedPni,
isUntaggedPniString,
} from '../../types/ServiceId';
import { SignalService as Proto } from '../../protobuf';
import { sql } from '../util';
import type { WritableDB } from '../Interface';
import { getOurUuid } from './41-uuid-keys';
export const version = 1280;
export function updateToSchemaVersion1280(
currentVersion: number,
db: WritableDB,
logger: LoggerType
): void {
if (currentVersion >= 1280) {
return;
}
db.transaction(() => {
const ourAci = getOurUuid(db);
let rows = db.prepare('SELECT * FROM unprocessed').all();
const [query] = sql`
DROP TABLE unprocessed;
CREATE TABLE unprocessed(
id TEXT NOT NULL PRIMARY KEY ASC,
type INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
attempts INTEGER NOT NULL,
receivedAtCounter INTEGER NOT NULL,
urgent INTEGER NOT NULL,
story INTEGER NOT NULL,
serverGuid TEXT NOT NULL,
serverTimestamp INTEGER NOT NULL,
isEncrypted INTEGER NOT NULL,
content BLOB NOT NULL,
messageAgeSec INTEGER NOT NULL,
destinationServiceId TEXT NOT NULL,
-- Not present for 1:1 messages and not sealed messages
groupId TEXT,
-- Not present for sealed envelopes
reportingToken BLOB,
source TEXT,
sourceServiceId TEXT,
sourceDevice TEXT,
-- Present only for PNP change number
updatedPni TEXT
) STRICT;
CREATE INDEX unprocessed_timestamp ON unprocessed
(timestamp);
CREATE INDEX unprocessed_byReceivedAtCounter ON unprocessed
(receivedAtCounter);
`;
db.exec(query);
const insertStmt = db.prepare(`
INSERT INTO unprocessed
(id, type, timestamp, attempts, receivedAtCounter, urgent, story,
serverGuid, serverTimestamp, isEncrypted, content, source,
messageAgeSec, sourceServiceId, sourceDevice,
destinationServiceId, reportingToken)
VALUES
($id, $type, $timestamp, $attempts, $receivedAtCounter, $urgent, $story,
$serverGuid, $serverTimestamp, $isEncrypted, $content, $source,
$messageAgeSec, $sourceServiceId, $sourceDevice,
$destinationServiceId, $reportingToken);
`);
let oldEnvelopes = 0;
if (!ourAci) {
if (rows.length) {
logger.warn(
`updateToSchemaVersion1280: no aci, dropping ${rows.length} envelopes`
);
rows = [];
}
}
for (const row of rows) {
const {
id,
envelope,
decrypted,
timestamp,
attempts,
version: envelopeVersion,
receivedAtCounter,
urgent,
story,
serverGuid,
serverTimestamp,
...rest
} = row;
// Skip old and/or invalid rows
if (envelopeVersion !== 2 || !envelope) {
oldEnvelopes += 1;
continue;
}
try {
const decoded = Proto.Envelope.decode(Buffer.from(envelope, 'base64'));
if (!decoded.content) {
throw new Error('Missing envelope content');
}
const content = decrypted
? Buffer.from(decrypted, 'base64')
: decoded.content;
insertStmt.run({
...rest,
id,
type: decoded.type ?? Proto.Envelope.Type.UNKNOWN,
content,
isEncrypted: decrypted ? 0 : 1,
timestamp: timestamp || Date.now(),
attempts: attempts || 0,
receivedAtCounter: receivedAtCounter || 0,
urgent: urgent ? 1 : 0,
story: story ? 1 : 0,
serverGuid: serverGuid || getGuid(),
serverTimestamp: serverTimestamp || 0,
destinationServiceId: normalizeServiceId(
decoded.destinationServiceId || ourAci,
'Envelope.destinationServiceId'
),
updatedPni: isUntaggedPniString(decoded.updatedPni)
? normalizePni(
toTaggedPni(decoded.updatedPni),
'Envelope.updatedPni'
)
: undefined,
// Sadly not captured previously
messageAgeSec: 0,
reportingToken: decoded.reportingToken?.length
? decoded.reportingToken
: null,
});
} catch (error) {
logger.warn(
'updateToSchemaVersion1280: failed to migrate unprocessed',
id,
error
);
}
}
if (oldEnvelopes !== 0) {
logger.warn(
`updateToSchemaVersion1280: dropped ${oldEnvelopes} envelopes`
);
}
db.pragma('user_version = 1280');
})();
logger.info('updateToSchemaVersion1280: success!');
}