Introduce logging for unexpected incoming urgent values
This commit is contained in:
parent
f92be05b15
commit
5fcf97b43b
8 changed files with 278 additions and 83 deletions
|
@ -208,6 +208,7 @@ export type UnprocessedType = {
|
|||
serverGuid?: string;
|
||||
serverTimestamp?: number;
|
||||
decrypted?: string;
|
||||
urgent?: boolean;
|
||||
};
|
||||
|
||||
export type UnprocessedUpdateType = {
|
||||
|
|
|
@ -15,6 +15,7 @@ import {
|
|||
forEach,
|
||||
fromPairs,
|
||||
groupBy,
|
||||
isBoolean,
|
||||
isNil,
|
||||
isNumber,
|
||||
isString,
|
||||
|
@ -3085,6 +3086,7 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
serverGuid,
|
||||
serverTimestamp,
|
||||
decrypted,
|
||||
urgent,
|
||||
} = data;
|
||||
if (!id) {
|
||||
throw new Error('saveUnprocessedSync: id was falsey');
|
||||
|
@ -3110,7 +3112,8 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
sourceDevice,
|
||||
serverGuid,
|
||||
serverTimestamp,
|
||||
decrypted
|
||||
decrypted,
|
||||
urgent
|
||||
) values (
|
||||
$id,
|
||||
$timestamp,
|
||||
|
@ -3123,7 +3126,8 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
$sourceDevice,
|
||||
$serverGuid,
|
||||
$serverTimestamp,
|
||||
$decrypted
|
||||
$decrypted,
|
||||
$urgent
|
||||
);
|
||||
`
|
||||
).run({
|
||||
|
@ -3139,6 +3143,7 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
serverGuid: serverGuid || null,
|
||||
serverTimestamp: serverTimestamp || null,
|
||||
decrypted: decrypted || null,
|
||||
urgent: urgent || !isBoolean(urgent) ? 1 : 0,
|
||||
});
|
||||
|
||||
return id;
|
||||
|
@ -3210,7 +3215,10 @@ async function getUnprocessedById(
|
|||
id,
|
||||
});
|
||||
|
||||
return row;
|
||||
return {
|
||||
...row,
|
||||
urgent: isNumber(row.urgent) ? Boolean(row.urgent) : true,
|
||||
};
|
||||
}
|
||||
|
||||
async function getUnprocessedCount(): Promise<number> {
|
||||
|
@ -3267,7 +3275,11 @@ async function getAllUnprocessedAndIncrementAttempts(): Promise<
|
|||
ORDER BY receivedAtCounter ASC;
|
||||
`
|
||||
)
|
||||
.all();
|
||||
.all()
|
||||
.map(row => ({
|
||||
...row,
|
||||
urgent: isNumber(row.urgent) ? Boolean(row.urgent) : true,
|
||||
}));
|
||||
})();
|
||||
}
|
||||
|
||||
|
|
28
ts/sql/migrations/63-add-urgent-to-unprocessed.ts
Normal file
28
ts/sql/migrations/63-add-urgent-to-unprocessed.ts
Normal file
|
@ -0,0 +1,28 @@
|
|||
// Copyright 2021-2022 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { Database } from 'better-sqlite3';
|
||||
|
||||
import type { LoggerType } from '../../types/Logging';
|
||||
|
||||
export default function updateToSchemaVersion63(
|
||||
currentVersion: number,
|
||||
db: Database,
|
||||
logger: LoggerType
|
||||
): void {
|
||||
if (currentVersion >= 63) {
|
||||
return;
|
||||
}
|
||||
|
||||
db.transaction(() => {
|
||||
db.exec(
|
||||
`
|
||||
ALTER TABLE unprocessed ADD COLUMN urgent INTEGER;
|
||||
`
|
||||
);
|
||||
|
||||
db.pragma('user_version = 63');
|
||||
})();
|
||||
|
||||
logger.info('updateToSchemaVersion63: success!');
|
||||
}
|
|
@ -38,6 +38,7 @@ import updateToSchemaVersion59 from './59-unprocessed-received-at-counter-index'
|
|||
import updateToSchemaVersion60 from './60-update-expiring-index';
|
||||
import updateToSchemaVersion61 from './61-distribution-list-storage';
|
||||
import updateToSchemaVersion62 from './62-add-urgent-to-send-log';
|
||||
import updateToSchemaVersion63 from './63-add-urgent-to-unprocessed';
|
||||
|
||||
function updateToSchemaVersion1(
|
||||
currentVersion: number,
|
||||
|
@ -1939,6 +1940,7 @@ export const SCHEMA_VERSIONS = [
|
|||
updateToSchemaVersion60,
|
||||
updateToSchemaVersion61,
|
||||
updateToSchemaVersion62,
|
||||
updateToSchemaVersion63,
|
||||
];
|
||||
|
||||
export function updateSchema(db: Database, logger: LoggerType): void {
|
||||
|
|
|
@ -1480,11 +1480,13 @@ describe('SignalProtocolStore', () => {
|
|||
await store.addUnprocessed(
|
||||
{
|
||||
id: '2-two',
|
||||
envelope: 'second',
|
||||
timestamp: Date.now() + 2,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'second',
|
||||
receivedAtCounter: 0,
|
||||
timestamp: Date.now() + 2,
|
||||
urgent: true,
|
||||
},
|
||||
{ zone }
|
||||
);
|
||||
|
@ -1536,11 +1538,13 @@ describe('SignalProtocolStore', () => {
|
|||
await store.addUnprocessed(
|
||||
{
|
||||
id: '2-two',
|
||||
envelope: 'second',
|
||||
timestamp: 2,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'second',
|
||||
receivedAtCounter: 0,
|
||||
timestamp: 2,
|
||||
urgent: true,
|
||||
},
|
||||
{ zone }
|
||||
);
|
||||
|
@ -1656,35 +1660,43 @@ describe('SignalProtocolStore', () => {
|
|||
await Promise.all([
|
||||
store.addUnprocessed({
|
||||
id: '0-dropped',
|
||||
envelope: 'old envelope',
|
||||
timestamp: NOW - 2 * durations.MONTH,
|
||||
receivedAtCounter: -1,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'old envelope',
|
||||
receivedAtCounter: -1,
|
||||
timestamp: NOW - 2 * durations.MONTH,
|
||||
urgent: true,
|
||||
}),
|
||||
store.addUnprocessed({
|
||||
id: '2-two',
|
||||
envelope: 'second',
|
||||
timestamp: NOW + 2,
|
||||
receivedAtCounter: 1,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'second',
|
||||
receivedAtCounter: 1,
|
||||
timestamp: NOW + 2,
|
||||
urgent: true,
|
||||
}),
|
||||
store.addUnprocessed({
|
||||
id: '3-three',
|
||||
envelope: 'third',
|
||||
timestamp: NOW + 3,
|
||||
receivedAtCounter: 2,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'third',
|
||||
receivedAtCounter: 2,
|
||||
timestamp: NOW + 3,
|
||||
urgent: true,
|
||||
}),
|
||||
store.addUnprocessed({
|
||||
id: '1-one',
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'first',
|
||||
receivedAtCounter: 0,
|
||||
timestamp: NOW + 1,
|
||||
urgent: true,
|
||||
}),
|
||||
]);
|
||||
|
||||
|
@ -1702,11 +1714,13 @@ describe('SignalProtocolStore', () => {
|
|||
const id = '1-one';
|
||||
await store.addUnprocessed({
|
||||
id,
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'first',
|
||||
receivedAtCounter: 0,
|
||||
timestamp: NOW + 1,
|
||||
urgent: false,
|
||||
});
|
||||
await store.updateUnprocessedWithData(id, { decrypted: 'updated' });
|
||||
|
||||
|
@ -1715,17 +1729,20 @@ describe('SignalProtocolStore', () => {
|
|||
assert.strictEqual(items[0].decrypted, 'updated');
|
||||
assert.strictEqual(items[0].timestamp, NOW + 1);
|
||||
assert.strictEqual(items[0].attempts, 1);
|
||||
assert.strictEqual(items[0].urgent, false);
|
||||
});
|
||||
|
||||
it('removeUnprocessed successfully deletes item', async () => {
|
||||
const id = '1-one';
|
||||
await store.addUnprocessed({
|
||||
id,
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
|
||||
attempts: 0,
|
||||
envelope: 'first',
|
||||
receivedAtCounter: 0,
|
||||
timestamp: NOW + 1,
|
||||
urgent: true,
|
||||
});
|
||||
await store.removeUnprocessed(id);
|
||||
|
||||
|
@ -1736,11 +1753,13 @@ describe('SignalProtocolStore', () => {
|
|||
it('getAllUnprocessedAndIncrementAttempts deletes items', async () => {
|
||||
await store.addUnprocessed({
|
||||
id: '1-one',
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
|
||||
attempts: 3,
|
||||
envelope: 'first',
|
||||
receivedAtCounter: 0,
|
||||
timestamp: NOW + 1,
|
||||
urgent: true,
|
||||
});
|
||||
|
||||
const items = await store.getAllUnprocessedAndIncrementAttempts();
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
/* eslint-disable no-bitwise */
|
||||
|
||||
import { isNumber } from 'lodash';
|
||||
import { isBoolean, isNumber } from 'lodash';
|
||||
import PQueue from 'p-queue';
|
||||
import { v4 as getGuid } from 'uuid';
|
||||
|
||||
|
@ -115,6 +115,7 @@ import * as durations from '../util/durations';
|
|||
import { areArraysMatchingSets } from '../util/areArraysMatchingSets';
|
||||
import { generateBlurHash } from '../util/generateBlurHash';
|
||||
import { APPLICATION_OCTET_STREAM } from '../types/MIME';
|
||||
import type { SendTypesType } from '../util/handleMessageSend';
|
||||
|
||||
const GROUPV1_ID_LENGTH = 16;
|
||||
const GROUPV2_ID_LENGTH = 32;
|
||||
|
@ -166,6 +167,62 @@ export type MessageReceiverOptions = {
|
|||
serverTrustRoot: string;
|
||||
};
|
||||
|
||||
const LOG_UNEXPECTED_URGENT_VALUES = false;
|
||||
const MUST_BE_URGENT_TYPES: Array<SendTypesType> = [
|
||||
'message',
|
||||
'deleteForEveryone',
|
||||
'reaction',
|
||||
'readSync',
|
||||
];
|
||||
const CAN_BE_URGENT_TYPES: Array<SendTypesType> = [
|
||||
'callingMessage',
|
||||
'senderKeyDistributionMessage',
|
||||
|
||||
// Deprecated
|
||||
'resetSession',
|
||||
'legacyGroupChange',
|
||||
];
|
||||
|
||||
function logUnexpectedUrgentValue(
|
||||
envelope: ProcessedEnvelope,
|
||||
type: SendTypesType
|
||||
) {
|
||||
if (!LOG_UNEXPECTED_URGENT_VALUES) {
|
||||
return;
|
||||
}
|
||||
|
||||
const mustBeUrgent = MUST_BE_URGENT_TYPES.includes(type);
|
||||
const canBeUrgent = mustBeUrgent || CAN_BE_URGENT_TYPES.includes(type);
|
||||
|
||||
if (envelope.urgent && !canBeUrgent) {
|
||||
const envelopeId = getEnvelopeId(envelope);
|
||||
log.warn(
|
||||
`${envelopeId}: Message of type '${type}' was marked urgent, but shouldn't be!`
|
||||
);
|
||||
}
|
||||
if (!envelope.urgent && mustBeUrgent) {
|
||||
const envelopeId = getEnvelopeId(envelope);
|
||||
log.warn(
|
||||
`${envelopeId}: Message of type '${type}' wasn't marked urgent, but should be!`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function getEnvelopeId(envelope: ProcessedEnvelope): string {
|
||||
const { timestamp } = envelope;
|
||||
|
||||
let prefix = '';
|
||||
|
||||
if (envelope.sourceUuid || envelope.source) {
|
||||
const sender = envelope.sourceUuid || envelope.source;
|
||||
prefix += `${sender}.${envelope.sourceDevice} `;
|
||||
}
|
||||
|
||||
prefix += `> ${envelope.destinationUuid.toString()}`;
|
||||
|
||||
return `${prefix} ${timestamp} (${envelope.id})`;
|
||||
}
|
||||
|
||||
export default class MessageReceiver
|
||||
extends EventTarget
|
||||
implements IRequestHandler
|
||||
|
@ -322,6 +379,7 @@ export default class MessageReceiver
|
|||
content: dropNull(decoded.content),
|
||||
serverGuid: decoded.serverGuid,
|
||||
serverTimestamp,
|
||||
urgent: isBoolean(decoded.urgent) ? decoded.urgent : true,
|
||||
};
|
||||
|
||||
// After this point, decoding errors are not the server's
|
||||
|
@ -711,6 +769,7 @@ export default class MessageReceiver
|
|||
serverGuid: decoded.serverGuid,
|
||||
serverTimestamp:
|
||||
item.serverTimestamp || decoded.serverTimestamp?.toNumber(),
|
||||
urgent: isBoolean(item.urgent) ? item.urgent : true,
|
||||
};
|
||||
|
||||
const { decrypted } = item;
|
||||
|
@ -758,21 +817,6 @@ export default class MessageReceiver
|
|||
}
|
||||
}
|
||||
|
||||
private getEnvelopeId(envelope: ProcessedEnvelope): string {
|
||||
const { timestamp } = envelope;
|
||||
|
||||
let prefix = '';
|
||||
|
||||
if (envelope.sourceUuid || envelope.source) {
|
||||
const sender = envelope.sourceUuid || envelope.source;
|
||||
prefix += `${sender}.${envelope.sourceDevice} `;
|
||||
}
|
||||
|
||||
prefix += `> ${envelope.destinationUuid.toString()}`;
|
||||
|
||||
return `${prefix} ${timestamp} (${envelope.id})`;
|
||||
}
|
||||
|
||||
private clearRetryTimeout(): void {
|
||||
clearTimeoutIfNecessary(this.retryCachedTimeout);
|
||||
this.retryCachedTimeout = undefined;
|
||||
|
@ -855,7 +899,7 @@ export default class MessageReceiver
|
|||
if (uuidKind === UUIDKind.Unknown) {
|
||||
log.warn(
|
||||
'MessageReceiver.decryptAndCacheBatch: ' +
|
||||
`Rejecting envelope ${this.getEnvelopeId(envelope)}, ` +
|
||||
`Rejecting envelope ${getEnvelopeId(envelope)}, ` +
|
||||
`unknown uuid: ${destinationUuid}`
|
||||
);
|
||||
return;
|
||||
|
@ -984,11 +1028,13 @@ export default class MessageReceiver
|
|||
const data: UnprocessedType = {
|
||||
id,
|
||||
version: 2,
|
||||
|
||||
attempts: 1,
|
||||
envelope: Bytes.toBase64(plaintext),
|
||||
messageAgeSec: envelope.messageAgeSec,
|
||||
receivedAtCounter: envelope.receivedAtCounter,
|
||||
timestamp: envelope.timestamp,
|
||||
attempts: 1,
|
||||
messageAgeSec: envelope.messageAgeSec,
|
||||
urgent: envelope.urgent,
|
||||
};
|
||||
this.decryptAndCacheBatcher.add({
|
||||
request,
|
||||
|
@ -1010,7 +1056,7 @@ export default class MessageReceiver
|
|||
envelope: UnsealedEnvelope,
|
||||
plaintext: Uint8Array
|
||||
): Promise<void> {
|
||||
const id = this.getEnvelopeId(envelope);
|
||||
const id = getEnvelopeId(envelope);
|
||||
log.info('queueing decrypted envelope', id);
|
||||
|
||||
const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
|
||||
|
@ -1038,7 +1084,7 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
uuidKind: UUIDKind
|
||||
): Promise<DecryptResult> {
|
||||
let logId = this.getEnvelopeId(envelope);
|
||||
let logId = getEnvelopeId(envelope);
|
||||
log.info(`queueing ${uuidKind} envelope`, logId);
|
||||
|
||||
const task = async (): Promise<DecryptResult> => {
|
||||
|
@ -1053,7 +1099,7 @@ export default class MessageReceiver
|
|||
return { plaintext: undefined, envelope };
|
||||
}
|
||||
|
||||
logId = this.getEnvelopeId(unsealedEnvelope);
|
||||
logId = getEnvelopeId(unsealedEnvelope);
|
||||
|
||||
this.addToQueue(
|
||||
async () => this.dispatchEvent(new EnvelopeEvent(unsealedEnvelope)),
|
||||
|
@ -1128,7 +1174,7 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
uuidKind: UUIDKind
|
||||
): Promise<UnsealedEnvelope | undefined> {
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
|
||||
if (this.stoppingProcessing) {
|
||||
log.warn(`MessageReceiver.unsealEnvelope(${logId}): dropping`);
|
||||
|
@ -1202,7 +1248,7 @@ export default class MessageReceiver
|
|||
envelope: UnsealedEnvelope,
|
||||
uuidKind: UUIDKind
|
||||
): Promise<DecryptResult> {
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
|
||||
if (this.stoppingProcessing) {
|
||||
log.warn(`MessageReceiver.decryptEnvelope(${logId}): dropping unsealed`);
|
||||
|
@ -1325,7 +1371,7 @@ export default class MessageReceiver
|
|||
);
|
||||
}
|
||||
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
|
||||
if (envelope.serverTimestamp > certificate.expiration()) {
|
||||
throw new Error(
|
||||
|
@ -1338,6 +1384,8 @@ export default class MessageReceiver
|
|||
}
|
||||
|
||||
private async onDeliveryReceipt(envelope: ProcessedEnvelope): Promise<void> {
|
||||
logUnexpectedUrgentValue(envelope, 'deliveryReceipt');
|
||||
|
||||
await this.dispatchAndWait(
|
||||
new DeliveryEvent(
|
||||
{
|
||||
|
@ -1377,7 +1425,7 @@ export default class MessageReceiver
|
|||
'MessageReceiver.decryptSealedSender: localDeviceId'
|
||||
);
|
||||
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
|
||||
const { unsealedContent: messageContent, certificate } = envelope;
|
||||
strictAssert(
|
||||
|
@ -1489,7 +1537,7 @@ export default class MessageReceiver
|
|||
): Promise<Uint8Array | undefined> {
|
||||
const { sessionStore, identityKeyStore, zone } = stores;
|
||||
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
const envelopeTypeEnum = Proto.Envelope.Type;
|
||||
|
||||
const identifier = envelope.sourceUuid;
|
||||
|
@ -1692,7 +1740,7 @@ export default class MessageReceiver
|
|||
TaskType.Decrypted
|
||||
);
|
||||
} else {
|
||||
const envelopeId = this.getEnvelopeId(envelope);
|
||||
const envelopeId = getEnvelopeId(envelope);
|
||||
this.removeFromCache(envelope);
|
||||
log.error(
|
||||
`MessageReceiver.decrypt: Envelope ${envelopeId} missing uuid or deviceId`
|
||||
|
@ -1707,7 +1755,10 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
sentContainer: ProcessedSent
|
||||
) {
|
||||
log.info('MessageReceiver.handleSentMessage', this.getEnvelopeId(envelope));
|
||||
log.info('MessageReceiver.handleSentMessage', getEnvelopeId(envelope));
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'sentSync');
|
||||
|
||||
const {
|
||||
destination,
|
||||
destinationUuid,
|
||||
|
@ -1725,11 +1776,11 @@ export default class MessageReceiver
|
|||
let p: Promise<void> = Promise.resolve();
|
||||
if (msg.flags && msg.flags & Proto.DataMessage.Flags.END_SESSION) {
|
||||
if (destinationUuid) {
|
||||
p = this.handleEndSession(new UUID(destinationUuid));
|
||||
p = this.handleEndSession(envelope, new UUID(destinationUuid));
|
||||
} else if (destination) {
|
||||
const theirUuid = UUID.lookup(destination);
|
||||
if (theirUuid) {
|
||||
p = this.handleEndSession(theirUuid);
|
||||
p = this.handleEndSession(envelope, theirUuid);
|
||||
} else {
|
||||
log.warn(`handleSentMessage: uuid not found for ${destination}`);
|
||||
p = Promise.resolve();
|
||||
|
@ -1759,9 +1810,7 @@ export default class MessageReceiver
|
|||
|
||||
if (groupId && isBlocked && !(isMe && isLeavingGroup)) {
|
||||
log.warn(
|
||||
`Message ${this.getEnvelopeId(
|
||||
envelope
|
||||
)} ignored; destined for blocked group`
|
||||
`Message ${getEnvelopeId(envelope)} ignored; destined for blocked group`
|
||||
);
|
||||
this.removeFromCache(envelope);
|
||||
return undefined;
|
||||
|
@ -1791,7 +1840,7 @@ export default class MessageReceiver
|
|||
msg: Proto.IStoryMessage,
|
||||
sentMessage?: ProcessedSent
|
||||
): Promise<void> {
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
log.info('MessageReceiver.handleStoryMessage', logId);
|
||||
|
||||
const attachments: Array<ProcessedAttachment> = [];
|
||||
|
@ -1823,7 +1872,7 @@ export default class MessageReceiver
|
|||
const groupV2 = msg.group ? processGroupV2Context(msg.group) : undefined;
|
||||
if (groupV2 && this.isGroupBlocked(groupV2.id)) {
|
||||
log.warn(
|
||||
`MessageReceiver.handleStoryMessage: envelope ${this.getEnvelopeId(
|
||||
`MessageReceiver.handleStoryMessage: envelope ${getEnvelopeId(
|
||||
envelope
|
||||
)} ignored; destined for blocked group`
|
||||
);
|
||||
|
@ -1933,12 +1982,14 @@ export default class MessageReceiver
|
|||
envelope: UnsealedEnvelope,
|
||||
msg: Proto.IDataMessage
|
||||
): Promise<void> {
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
log.info('MessageReceiver.handleDataMessage', logId);
|
||||
|
||||
const isStoriesEnabled =
|
||||
isEnabled('desktop.stories') || isEnabled('desktop.internalUser');
|
||||
if (!isStoriesEnabled && msg.storyContext) {
|
||||
logUnexpectedUrgentValue(envelope, 'story');
|
||||
|
||||
log.info(
|
||||
`MessageReceiver.handleDataMessage/${logId}: Dropping incoming dataMessage with storyContext field`
|
||||
);
|
||||
|
@ -1962,7 +2013,7 @@ export default class MessageReceiver
|
|||
await this.checkGroupV1Data(msg);
|
||||
|
||||
if (msg.flags && msg.flags & Proto.DataMessage.Flags.END_SESSION) {
|
||||
p = this.handleEndSession(new UUID(destination));
|
||||
p = this.handleEndSession(envelope, new UUID(destination));
|
||||
}
|
||||
|
||||
if (msg.flags && msg.flags & Proto.DataMessage.Flags.PROFILE_KEY_UPDATE) {
|
||||
|
@ -1971,6 +2022,8 @@ export default class MessageReceiver
|
|||
'PROFILE_KEY_UPDATE without profileKey'
|
||||
);
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'profileKeyUpdate');
|
||||
|
||||
const ev = new ProfileKeyUpdateEvent(
|
||||
{
|
||||
source: envelope.source,
|
||||
|
@ -1983,6 +2036,29 @@ export default class MessageReceiver
|
|||
}
|
||||
await p;
|
||||
|
||||
let type: SendTypesType = 'message';
|
||||
|
||||
if (msg.storyContext) {
|
||||
type = 'story';
|
||||
} else if (msg.body) {
|
||||
type = 'message';
|
||||
} else if (msg.reaction) {
|
||||
type = 'reaction';
|
||||
} else if (msg.delete) {
|
||||
type = 'deleteForEveryone';
|
||||
} else if (
|
||||
msg.flags &&
|
||||
msg.flags & Proto.DataMessage.Flags.EXPIRATION_TIMER_UPDATE
|
||||
) {
|
||||
type = 'expirationTimerUpdate';
|
||||
} else if (msg.group) {
|
||||
type = 'legacyGroupChange';
|
||||
}
|
||||
// Note: other data messages without any of these attributes will fall into the
|
||||
// 'message' bucket - like stickers, gift badges, etc.
|
||||
|
||||
logUnexpectedUrgentValue(envelope, type);
|
||||
|
||||
const message = await this.processDecrypted(envelope, msg);
|
||||
const groupId = this.getProcessedGroupId(message);
|
||||
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
|
||||
|
@ -2000,9 +2076,7 @@ export default class MessageReceiver
|
|||
|
||||
if (groupId && isBlocked && !(isMe && isLeavingGroup)) {
|
||||
log.warn(
|
||||
`Message ${this.getEnvelopeId(
|
||||
envelope
|
||||
)} ignored; destined for blocked group`
|
||||
`Message ${getEnvelopeId(envelope)} ignored; destined for blocked group`
|
||||
);
|
||||
this.removeFromCache(envelope);
|
||||
return undefined;
|
||||
|
@ -2136,7 +2210,7 @@ export default class MessageReceiver
|
|||
return;
|
||||
}
|
||||
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
log.info(
|
||||
`innerHandleContentMessage/${logId}: Dropping incoming message with storyMessage field`
|
||||
);
|
||||
|
@ -2155,9 +2229,11 @@ export default class MessageReceiver
|
|||
envelope: UnsealedEnvelope,
|
||||
decryptionError: Uint8Array
|
||||
) {
|
||||
const logId = this.getEnvelopeId(envelope);
|
||||
const logId = getEnvelopeId(envelope);
|
||||
log.info(`handleDecryptionError: ${logId}`);
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'retryRequest');
|
||||
|
||||
const buffer = Buffer.from(decryptionError);
|
||||
const request = DecryptionErrorMessage.deserialize(buffer);
|
||||
|
||||
|
@ -2187,9 +2263,11 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
distributionMessage: Uint8Array
|
||||
): Promise<void> {
|
||||
const envelopeId = this.getEnvelopeId(envelope);
|
||||
const envelopeId = getEnvelopeId(envelope);
|
||||
log.info(`handleSenderKeyDistributionMessage/${envelopeId}`);
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'senderKeyDistributionMessage');
|
||||
|
||||
// Note: we don't call removeFromCache here because this message can be combined
|
||||
// with a dataMessage, for example. That processing will dictate cache removal.
|
||||
|
||||
|
@ -2233,6 +2311,8 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
callingMessage: Proto.ICallingMessage
|
||||
): Promise<void> {
|
||||
logUnexpectedUrgentValue(envelope, 'callingMessage');
|
||||
|
||||
this.removeFromCache(envelope);
|
||||
await window.Signal.Services.calling.handleCallingMessage(
|
||||
envelope,
|
||||
|
@ -2247,15 +2327,19 @@ export default class MessageReceiver
|
|||
strictAssert(receiptMessage.timestamp, 'Receipt message without timestamp');
|
||||
|
||||
let EventClass: typeof DeliveryEvent | typeof ReadEvent | typeof ViewEvent;
|
||||
let type: SendTypesType;
|
||||
switch (receiptMessage.type) {
|
||||
case Proto.ReceiptMessage.Type.DELIVERY:
|
||||
EventClass = DeliveryEvent;
|
||||
type = 'deliveryReceipt';
|
||||
break;
|
||||
case Proto.ReceiptMessage.Type.READ:
|
||||
EventClass = ReadEvent;
|
||||
type = 'readReceipt';
|
||||
break;
|
||||
case Proto.ReceiptMessage.Type.VIEWED:
|
||||
EventClass = ViewEvent;
|
||||
type = 'viewedReceipt';
|
||||
break;
|
||||
default:
|
||||
// This can happen if we get a receipt type we don't know about yet, which
|
||||
|
@ -2263,6 +2347,8 @@ export default class MessageReceiver
|
|||
return;
|
||||
}
|
||||
|
||||
logUnexpectedUrgentValue(envelope, type);
|
||||
|
||||
await Promise.all(
|
||||
receiptMessage.timestamp.map(async rawTimestamp => {
|
||||
const ev = new EventClass(
|
||||
|
@ -2286,6 +2372,8 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
this.removeFromCache(envelope);
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'typing');
|
||||
|
||||
if (envelope.timestamp && typingMessage.timestamp) {
|
||||
const envelopeTimestamp = envelope.timestamp;
|
||||
const typingTimestamp = typingMessage.timestamp?.toNumber();
|
||||
|
@ -2337,7 +2425,10 @@ export default class MessageReceiver
|
|||
}
|
||||
|
||||
private handleNullMessage(envelope: ProcessedEnvelope): void {
|
||||
log.info('MessageReceiver.handleNullMessage', this.getEnvelopeId(envelope));
|
||||
log.info('MessageReceiver.handleNullMessage', getEnvelopeId(envelope));
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'nullMessage');
|
||||
|
||||
this.removeFromCache(envelope);
|
||||
}
|
||||
|
||||
|
@ -2355,7 +2446,7 @@ export default class MessageReceiver
|
|||
if (isInvalid) {
|
||||
log.info(
|
||||
'isInvalidGroupData: invalid GroupV1 message from',
|
||||
this.getEnvelopeId(envelope)
|
||||
getEnvelopeId(envelope)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -2370,7 +2461,7 @@ export default class MessageReceiver
|
|||
if (isInvalid) {
|
||||
log.info(
|
||||
'isInvalidGroupData: invalid GroupV2 message from',
|
||||
this.getEnvelopeId(envelope)
|
||||
getEnvelopeId(envelope)
|
||||
);
|
||||
}
|
||||
return isInvalid;
|
||||
|
@ -2499,7 +2590,7 @@ export default class MessageReceiver
|
|||
this.getDestination(sentMessage),
|
||||
sentMessage.timestamp?.toNumber(),
|
||||
'from',
|
||||
this.getEnvelopeId(envelope)
|
||||
getEnvelopeId(envelope)
|
||||
);
|
||||
return this.handleSentMessage(envelope, sentMessage);
|
||||
}
|
||||
|
@ -2563,7 +2654,7 @@ export default class MessageReceiver
|
|||
|
||||
this.removeFromCache(envelope);
|
||||
log.warn(
|
||||
`handleSyncMessage/${this.getEnvelopeId(envelope)}: Got empty SyncMessage`
|
||||
`handleSyncMessage/${getEnvelopeId(envelope)}: Got empty SyncMessage`
|
||||
);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
@ -2573,6 +2664,9 @@ export default class MessageReceiver
|
|||
configuration: Proto.SyncMessage.IConfiguration
|
||||
): Promise<void> {
|
||||
log.info('got configuration sync message');
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'configurationSync');
|
||||
|
||||
const ev = new ConfigurationEvent(
|
||||
configuration,
|
||||
this.removeFromCache.bind(this, envelope)
|
||||
|
@ -2586,6 +2680,8 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
log.info('got view once open sync message');
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'viewOnceSync');
|
||||
|
||||
const ev = new ViewOnceOpenSyncEvent(
|
||||
{
|
||||
source: dropNull(sync.sender),
|
||||
|
@ -2606,6 +2702,8 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
log.info('got message request response sync message');
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'messageRequestSync');
|
||||
|
||||
const { groupId } = sync;
|
||||
|
||||
let groupIdString: string | undefined;
|
||||
|
@ -2648,6 +2746,8 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
log.info('got fetch latest sync message');
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'fetchLatestManifestSync');
|
||||
|
||||
const ev = new FetchLatestEvent(
|
||||
sync.type,
|
||||
this.removeFromCache.bind(this, envelope)
|
||||
|
@ -2662,6 +2762,8 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
log.info('got keys sync message');
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'keySync');
|
||||
|
||||
if (!sync.storageService) {
|
||||
return undefined;
|
||||
}
|
||||
|
@ -2680,6 +2782,8 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
log.info('MessageReceiver: got pni identity sync message');
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'pniIdentitySync');
|
||||
|
||||
if (!publicKey || !privateKey) {
|
||||
log.warn('MessageReceiver: empty pni identity sync message');
|
||||
return undefined;
|
||||
|
@ -2706,6 +2810,7 @@ export default class MessageReceiver
|
|||
): Promise<void> {
|
||||
const ENUM = Proto.SyncMessage.StickerPackOperation.Type;
|
||||
log.info('got sticker pack operation sync message');
|
||||
logUnexpectedUrgentValue(envelope, 'stickerPackSync');
|
||||
|
||||
const stickerPacks = operations.map(operation => ({
|
||||
id: operation.packId ? Bytes.toHex(operation.packId) : undefined,
|
||||
|
@ -2726,7 +2831,10 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
read: Array<Proto.SyncMessage.IRead>
|
||||
): Promise<void> {
|
||||
log.info('MessageReceiver.handleRead', this.getEnvelopeId(envelope));
|
||||
log.info('MessageReceiver.handleRead', getEnvelopeId(envelope));
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'readSync');
|
||||
|
||||
const results = [];
|
||||
for (const { timestamp, sender, senderUuid } of read) {
|
||||
const ev = new ReadSyncEvent(
|
||||
|
@ -2749,7 +2857,10 @@ export default class MessageReceiver
|
|||
envelope: ProcessedEnvelope,
|
||||
viewed: ReadonlyArray<Proto.SyncMessage.IViewed>
|
||||
): Promise<void> {
|
||||
log.info('MessageReceiver.handleViewed', this.getEnvelopeId(envelope));
|
||||
log.info('MessageReceiver.handleViewed', getEnvelopeId(envelope));
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'viewSync');
|
||||
|
||||
await Promise.all(
|
||||
viewed.map(async ({ timestamp, senderE164, senderUuid }) => {
|
||||
const ev = new ViewSyncEvent(
|
||||
|
@ -2778,6 +2889,8 @@ export default class MessageReceiver
|
|||
throw new Error('MessageReceiver.handleContacts: blob field was missing');
|
||||
}
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'contactSync');
|
||||
|
||||
this.removeFromCache(envelope);
|
||||
|
||||
// Note: we do not return here because we don't want to block the next message on
|
||||
|
@ -2813,6 +2926,8 @@ export default class MessageReceiver
|
|||
|
||||
this.removeFromCache(envelope);
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'groupSync');
|
||||
|
||||
if (!blob) {
|
||||
throw new Error('MessageReceiver.handleGroups: blob field was missing');
|
||||
}
|
||||
|
@ -2861,6 +2976,8 @@ export default class MessageReceiver
|
|||
const allIdentifiers = [];
|
||||
let changed = false;
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'blockSync');
|
||||
|
||||
if (blocked.numbers) {
|
||||
const previous = this.storage.get('blocked', []);
|
||||
|
||||
|
@ -2948,8 +3065,14 @@ export default class MessageReceiver
|
|||
return downloadAttachment(this.server, cleaned);
|
||||
}
|
||||
|
||||
private async handleEndSession(theirUuid: UUID): Promise<void> {
|
||||
private async handleEndSession(
|
||||
envelope: ProcessedEnvelope,
|
||||
theirUuid: UUID
|
||||
): Promise<void> {
|
||||
log.info(`handleEndSession: closing sessions for ${theirUuid.toString()}`);
|
||||
|
||||
logUnexpectedUrgentValue(envelope, 'resetSession');
|
||||
|
||||
await this.storage.protocol.archiveAllSessions(theirUuid);
|
||||
}
|
||||
|
||||
|
|
1
ts/textsecure/Types.d.ts
vendored
1
ts/textsecure/Types.d.ts
vendored
|
@ -92,6 +92,7 @@ export type ProcessedEnvelope = Readonly<{
|
|||
serverGuid: string;
|
||||
serverTimestamp: number;
|
||||
groupId?: string;
|
||||
urgent?: boolean;
|
||||
}>;
|
||||
|
||||
export type ProcessedAttachment = {
|
||||
|
|
|
@ -19,6 +19,7 @@ const { insertSentProto, updateConversation } = dataInterface;
|
|||
export const sendTypesEnum = z.enum([
|
||||
// Core user interactions, default urgent
|
||||
'message',
|
||||
'story', // non-urgent
|
||||
'callingMessage', // excluded from send log; only call-initiation messages are urgent
|
||||
'deleteForEveryone',
|
||||
'expirationTimerUpdate', // non-urgent
|
||||
|
@ -46,6 +47,14 @@ export const sendTypesEnum = z.enum([
|
|||
'keySyncRequest', // urgent because it blocks the link process
|
||||
'pniIdentitySyncRequest', // urgent because we need our PNI to be fully functional
|
||||
|
||||
// The actual sync messages, which we never send, just receive - non-urgent
|
||||
'blockSync',
|
||||
'configurationSync',
|
||||
'contactSync',
|
||||
'groupSync',
|
||||
'keySync',
|
||||
'pniIdentitySync',
|
||||
|
||||
// Syncs, default non-urgent
|
||||
'fetchLatestManifestSync',
|
||||
'fetchLocalProfileSync',
|
||||
|
|
Loading…
Add table
Reference in a new issue