Migrate unprocessed table to BLOBs

This commit is contained in:
Fedor Indutny 2025-01-21 13:42:14 -08:00 committed by GitHub
parent 06aa2f6ce4
commit 4b6ef3a1ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 492 additions and 253 deletions

View file

@ -5,7 +5,7 @@
import { isBoolean, isNumber, isString, noop, omit } from 'lodash';
import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { v7 as getGuid } from 'uuid';
import type {
SealedSenderDecryptionResult,
@ -417,6 +417,7 @@ export default class MessageReceiver
// Proto.Envelope fields
type: decoded.type ?? Proto.Envelope.Type.UNKNOWN,
source: undefined,
sourceServiceId: decoded.sourceServiceId
? normalizeServiceId(
decoded.sourceServiceId,
@ -443,16 +444,17 @@ export default class MessageReceiver
serverTimestamp,
urgent: isBoolean(decoded.urgent) ? decoded.urgent : true,
story: decoded.story ?? false,
reportingToken: decoded.reportingToken?.length
reportingToken: Bytes.isNotEmpty(decoded.reportingToken)
? decoded.reportingToken
: undefined,
groupId: undefined,
};
// After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
this.#decryptAndCache(envelope, plaintext, request);
this.#decryptAndCache(envelope, request);
this.#processedCount += 1;
} catch (e) {
request.respond(500, 'Bad encrypted websocket message');
@ -865,20 +867,6 @@ export default class MessageReceiver
async #queueCached(item: UnprocessedType): Promise<void> {
log.info('MessageReceiver.queueCached', item.id);
try {
let envelopePlaintext: Uint8Array;
if (item.envelope && item.version === 2) {
envelopePlaintext = Bytes.fromBase64(item.envelope);
} else if (item.envelope && typeof item.envelope === 'string') {
envelopePlaintext = Bytes.fromBinary(item.envelope);
} else {
throw new Error(
'MessageReceiver.queueCached: item.envelope was malformed'
);
}
const decoded = Proto.Envelope.decode(envelopePlaintext);
const ourAci = this.#storage.user.getCheckedAci();
const envelope: ProcessedEnvelope = {
@ -886,50 +874,37 @@ export default class MessageReceiver
receivedAtCounter: item.receivedAtCounter ?? item.timestamp,
receivedAtDate:
item.receivedAtCounter == null ? Date.now() : item.timestamp,
messageAgeSec: item.messageAgeSec || 0,
messageAgeSec: item.messageAgeSec,
// Proto.Envelope fields
type: decoded.type ?? Proto.Envelope.Type.UNKNOWN,
type: item.type,
source: item.source,
sourceServiceId: normalizeServiceId(
item.sourceServiceId || decoded.sourceServiceId,
item.sourceServiceId,
'CachedEnvelope.sourceServiceId'
),
sourceDevice: decoded.sourceDevice || item.sourceDevice,
sourceDevice: item.sourceDevice,
destinationServiceId: normalizeServiceId(
decoded.destinationServiceId || item.destinationServiceId || ourAci,
item.destinationServiceId || ourAci,
'CachedEnvelope.destinationServiceId'
),
updatedPni: isUntaggedPniString(decoded.updatedPni)
updatedPni: isUntaggedPniString(item.updatedPni)
? normalizePni(
toTaggedPni(decoded.updatedPni),
toTaggedPni(item.updatedPni),
'CachedEnvelope.updatedPni'
)
: undefined,
timestamp: decoded.timestamp?.toNumber() ?? 0,
content: dropNull(decoded.content),
serverGuid: decoded.serverGuid ?? getGuid(),
serverTimestamp:
item.serverTimestamp || decoded.serverTimestamp?.toNumber() || 0,
timestamp: item.timestamp,
content: item.isEncrypted ? item.content : undefined,
serverGuid: item.serverGuid,
serverTimestamp: item.serverTimestamp,
urgent: isBoolean(item.urgent) ? item.urgent : true,
story: Boolean(item.story),
reportingToken: item.reportingToken
? Bytes.fromBase64(item.reportingToken)
: undefined,
reportingToken: item.reportingToken,
groupId: item.groupId,
};
const { decrypted } = item;
if (decrypted) {
let payloadPlaintext: Uint8Array;
if (item.version === 2) {
payloadPlaintext = Bytes.fromBase64(decrypted);
} else if (typeof decrypted === 'string') {
payloadPlaintext = Bytes.fromBinary(decrypted);
} else {
throw new Error('Cached decrypted value was not a string!');
}
if (!item.isEncrypted) {
strictAssert(
envelope.sourceServiceId,
'Decrypted envelope must have source uuid'
@ -960,7 +935,7 @@ export default class MessageReceiver
async () => {
void this.#queueDecryptedEnvelope(
decryptedEnvelope,
payloadPlaintext
item.content
);
},
`queueDecryptedEnvelope(${getEnvelopeId(decryptedEnvelope)})`,
@ -1129,7 +1104,7 @@ export default class MessageReceiver
updatedPni: envelope.updatedPni,
serverGuid: envelope.serverGuid,
serverTimestamp: envelope.serverTimestamp,
decrypted: Bytes.toBase64(plaintext),
decrypted: plaintext,
};
}
);
@ -1194,27 +1169,35 @@ export default class MessageReceiver
#decryptAndCache(
envelope: ProcessedEnvelope,
plaintext: Uint8Array,
request: IncomingWebSocketRequest
): void {
strictAssert(envelope.content, 'Content is required for envelopes');
const { id } = envelope;
const data: UnprocessedType = {
id,
version: 2,
type: envelope.type,
source: envelope.source,
sourceServiceId: envelope.sourceServiceId,
sourceDevice: envelope.sourceDevice,
destinationServiceId: envelope.destinationServiceId,
// This field is only used for aging items out of the cache. The original
// envelope's timestamp will be used when retrying this item.
timestamp: envelope.receivedAtDate,
attempts: 0,
envelope: Bytes.toBase64(plaintext),
isEncrypted: true,
content: envelope.content,
messageAgeSec: envelope.messageAgeSec,
receivedAtCounter: envelope.receivedAtCounter,
serverGuid: envelope.serverGuid,
serverTimestamp: envelope.serverTimestamp,
urgent: envelope.urgent,
story: envelope.story,
reportingToken: envelope.reportingToken
? Bytes.toBase64(envelope.reportingToken)
: undefined,
updatedPni: envelope.updatedPni,
reportingToken: envelope.reportingToken,
groupId: envelope.groupId,
};
this.#decryptAndCacheBatcher.add({
request,