signal-desktop/ts/messages/handleDataMessage.ts
automated-signal 0d33057cc4
Check validity of link previews on import
Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
2025-01-17 10:03:47 -08:00

810 lines
27 KiB
TypeScript

// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isNumber, partition } from 'lodash';
import * as log from '../logging/log';
import * as Errors from '../types/errors';
import * as MIME from '../types/MIME';
import * as LinkPreview from '../types/LinkPreview';
import { getAuthor, isStory, messageHasPaymentEvent } from './helpers';
import { getMessageIdForLogging } from '../util/idForLogging';
import { getSenderIdentifier } from '../util/getSenderIdentifier';
import { isNormalNumber } from '../util/isNormalNumber';
import { getOwn } from '../util/getOwn';
import {
SendActionType,
sendStateReducer,
SendStatus,
} from './MessageSendState';
import { DataReader, DataWriter } from '../sql/Client';
import { eraseMessageContents, postSaveUpdates } from '../util/cleanup';
import {
isDirectConversation,
isGroup,
isGroupV1,
} from '../util/whatTypeOfConversation';
import { generateMessageId } from '../util/generateMessageId';
import {
hasErrors,
isEndSession,
isExpirationTimerUpdate,
isGroupUpdate,
isTapToView,
isUnsupportedMessage,
} from '../state/selectors/message';
import { drop } from '../util/drop';
import { strictAssert } from '../util/assert';
import { isAciString } from '../util/isAciString';
import { copyFromQuotedMessage } from './copyQuote';
import { findStoryMessages } from '../util/findStoryMessage';
import { getRoomIdFromCallLink } from '../util/callLinksRingrtc';
import { isNotNil } from '../util/isNotNil';
import { normalizeServiceId } from '../types/ServiceId';
import { BodyRange } from '../types/BodyRange';
import { hydrateStoryContext } from '../util/hydrateStoryContext';
import { isMessageEmpty } from '../util/isMessageEmpty';
import { isValidTapToView } from '../util/isValidTapToView';
import { getNotificationTextForMessage } from '../util/getNotificationTextForMessage';
import { getMessageAuthorText } from '../util/getMessageAuthorText';
import { GiftBadgeStates } from '../components/conversation/Message';
import { getUserLanguages } from '../util/userLanguages';
import { parseBoostBadgeListFromServer } from '../badges/parseBadgesFromServer';
import { SignalService as Proto } from '../protobuf';
import {
modifyTargetMessage,
ModifyTargetMessageResult,
} from '../util/modifyTargetMessage';
import { saveAndNotify } from './saveAndNotify';
import { MessageModel } from '../models/messages';
import type { SentEventData } from '../textsecure/messageReceiverEvents';
import type {
ProcessedDataMessage,
ProcessedUnidentifiedDeliveryStatus,
} from '../textsecure/Types';
import type { ServiceIdString } from '../types/ServiceId';
import type { LinkPreviewType } from '../types/message/LinkPreviews';
const CURRENT_PROTOCOL_VERSION = Proto.DataMessage.ProtocolVersion.CURRENT;
const INITIAL_PROTOCOL_VERSION = Proto.DataMessage.ProtocolVersion.INITIAL;
export async function handleDataMessage(
message: MessageModel,
initialMessage: ProcessedDataMessage,
confirm: () => void,
options: { data?: SentEventData } = {}
): Promise<void> {
const { data } = options;
const { upgradeMessageSchema } = window.Signal.Migrations;
// This function is called from the background script in a few scenarios:
// 1. on an incoming message
// 2. on a sent message sync'd from another device
// 3. in rare cases, an incoming message can be retried, though it will
// still go through one of the previous two codepaths
const source = message.get('source');
const sourceServiceId = message.get('sourceServiceId');
const type = message.get('type');
const conversationId = message.get('conversationId');
const fromContact = getAuthor(message.attributes);
if (fromContact) {
fromContact.setRegistered();
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const conversation = window.ConversationController.get(conversationId)!;
const idLog = `handleDataMessage/${conversation.idForLogging()} ${getMessageIdForLogging(message.attributes)}`;
await conversation.queueJob(idLog, async () => {
log.info(`${idLog}: starting processing in queue`);
// First, check for duplicates. If we find one, stop processing here.
const senderIdentifier = getSenderIdentifier(message.attributes);
const inMemoryMessage = window.MessageCache.findBySender(senderIdentifier);
if (inMemoryMessage) {
log.info(`${idLog}: cache hit`, senderIdentifier);
} else {
log.info(`${idLog}: duplicate check db lookup needed`, senderIdentifier);
}
let existingMessage = inMemoryMessage;
if (!existingMessage) {
const fromDb = await DataReader.getMessageBySender(message.attributes);
existingMessage = fromDb
? window.MessageCache.register(new MessageModel(fromDb))
: undefined;
}
const isUpdate = Boolean(data && data.isRecipientUpdate);
const isDuplicateMessage =
existingMessage &&
(type === 'incoming' ||
(type === 'story' &&
existingMessage.get('storyDistributionListId') ===
message.attributes.storyDistributionListId));
if (isDuplicateMessage) {
log.warn(
`${idLog}: Received duplicate message`,
getMessageIdForLogging(message.attributes)
);
confirm();
return;
}
if (type === 'outgoing') {
if (isUpdate && existingMessage) {
log.info(
`${idLog}: Updating message ${getMessageIdForLogging(message.attributes)} with received transcript`
);
const toUpdate = window.MessageCache.register(existingMessage);
const unidentifiedDeliveriesSet = new Set<string>(
toUpdate.get('unidentifiedDeliveries') ?? []
);
const sendStateByConversationId = {
...(toUpdate.get('sendStateByConversationId') || {}),
};
const unidentifiedStatus: Array<ProcessedUnidentifiedDeliveryStatus> =
data && Array.isArray(data.unidentifiedStatus)
? data.unidentifiedStatus
: [];
unidentifiedStatus.forEach(
({ destinationServiceId, destination, unidentified }) => {
const identifier = destinationServiceId || destination;
if (!identifier) {
return;
}
const destinationConversation =
window.ConversationController.lookupOrCreate({
serviceId: destinationServiceId,
e164: destination || undefined,
reason: `handleDataMessage(${initialMessage.timestamp})`,
});
if (!destinationConversation) {
return;
}
const updatedAt: number =
data && isNormalNumber(data.timestamp)
? data.timestamp
: Date.now();
const previousSendState = getOwn(
sendStateByConversationId,
destinationConversation.id
);
sendStateByConversationId[destinationConversation.id] =
previousSendState
? sendStateReducer(previousSendState, {
type: SendActionType.Sent,
updatedAt,
})
: {
status: SendStatus.Sent,
updatedAt,
};
if (unidentified) {
unidentifiedDeliveriesSet.add(identifier);
}
}
);
toUpdate.set({
sendStateByConversationId,
unidentifiedDeliveries: [...unidentifiedDeliveriesSet],
});
await DataWriter.saveMessage(toUpdate.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
confirm();
return;
}
if (isUpdate) {
log.warn(
`${idLog}: Received update transcript, but no existing entry for message ${getMessageIdForLogging(message.attributes)}. Dropping.`
);
confirm();
return;
}
if (existingMessage) {
// TODO: (DESKTOP-7301): improve this check in case previous message is not yet
// registered in memory
log.warn(
`${idLog}: Received duplicate transcript for message ${getMessageIdForLogging(message.attributes)}, but it was not an update transcript. Dropping.`
);
confirm();
return;
}
}
// GroupV2
if (initialMessage.groupV2) {
if (isGroupV1(conversation.attributes)) {
// If we received a GroupV2 message in a GroupV1 group, we migrate!
const { revision, groupChange } = initialMessage.groupV2;
await window.Signal.Groups.respondToGroupV2Migration({
conversation,
groupChange: groupChange
? {
base64: groupChange,
isTrusted: false,
}
: undefined,
newRevision: revision,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
});
} else if (
initialMessage.groupV2.masterKey &&
initialMessage.groupV2.secretParams &&
initialMessage.groupV2.publicParams
) {
// Repair core GroupV2 data if needed
await conversation.maybeRepairGroupV2({
masterKey: initialMessage.groupV2.masterKey,
secretParams: initialMessage.groupV2.secretParams,
publicParams: initialMessage.groupV2.publicParams,
});
const existingRevision = conversation.get('revision');
const isFirstUpdate = !isNumber(existingRevision);
// Standard GroupV2 modification codepath
const isV2GroupUpdate =
initialMessage.groupV2 &&
isNumber(initialMessage.groupV2.revision) &&
(isFirstUpdate || initialMessage.groupV2.revision > existingRevision);
if (isV2GroupUpdate && initialMessage.groupV2) {
const { revision, groupChange } = initialMessage.groupV2;
try {
await window.Signal.Groups.maybeUpdateGroup({
conversation,
groupChange: groupChange
? {
base64: groupChange,
isTrusted: false,
}
: undefined,
newRevision: revision,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
});
} catch (error) {
const errorText = Errors.toLogFormat(error);
log.error(
`${idLog}: Failed to process group update as part of message ${getMessageIdForLogging(message.attributes)}: ${errorText}`
);
throw error;
}
}
}
}
const ourAci = window.textsecure.storage.user.getCheckedAci();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const sender = window.ConversationController.lookupOrCreate({
e164: source,
serviceId: sourceServiceId,
reason: 'handleDataMessage',
})!;
const hasGroupV2Prop = Boolean(initialMessage.groupV2);
// Drop if from blocked user. Only GroupV2 messages should need to be dropped here.
const isBlocked =
(source && window.storage.blocked.isBlocked(source)) ||
(sourceServiceId &&
window.storage.blocked.isServiceIdBlocked(sourceServiceId));
if (isBlocked) {
log.info(
`${idLog}: Dropping message from blocked sender. hasGroupV2Prop: ${hasGroupV2Prop}`
);
confirm();
return;
}
const areWeMember =
!conversation.get('left') && conversation.hasMember(ourAci);
// Drop an incoming GroupV2 message if we or the sender are not part of the group
// after applying the message's associated group changes.
if (
type === 'incoming' &&
!isDirectConversation(conversation.attributes) &&
hasGroupV2Prop &&
(!areWeMember ||
(sourceServiceId && !conversation.hasMember(sourceServiceId)))
) {
log.warn(
`${idLog}: Received message destined for group, which we or the sender are not a part of. Dropping.`
);
confirm();
return;
}
// We drop incoming messages for v1 groups we already know about, which we're not
// a part of, except for group updates. Because group v1 updates haven't been
// applied by this point.
// Note: if we have no information about a group at all, we will accept those
// messages. We detect that via a missing 'members' field.
if (
type === 'incoming' &&
!isDirectConversation(conversation.attributes) &&
!hasGroupV2Prop &&
conversation.get('members') &&
!areWeMember
) {
log.warn(
`Received message destined for group ${conversation.idForLogging()}, which we're not a part of. Dropping.`
);
confirm();
return;
}
// Drop incoming messages to announcement only groups where sender is not admin
if (conversation.get('announcementsOnly')) {
const senderServiceId = sender.getServiceId();
if (!senderServiceId || !conversation.isAdmin(senderServiceId)) {
confirm();
return;
}
}
const messageId =
message.get('id') || generateMessageId(message.get('received_at')).id;
// Send delivery receipts, but only for non-story sealed sender messages
// and not for messages from unaccepted conversations
if (
type === 'incoming' &&
message.get('unidentifiedDeliveryReceived') &&
!hasErrors(message.attributes) &&
conversation.getAccepted()
) {
// Note: We both queue and batch because we want to wait until we are done
// processing incoming messages to start sending outgoing delivery receipts.
// The queue can be paused easily.
drop(
window.Whisper.deliveryReceiptQueue.add(() => {
strictAssert(
isAciString(sourceServiceId),
'Incoming message must be from ACI'
);
window.Whisper.deliveryReceiptBatcher.add({
messageId,
conversationId,
senderE164: source,
senderAci: sourceServiceId,
timestamp: message.get('sent_at'),
isDirectConversation: isDirectConversation(conversation.attributes),
});
})
);
}
const { storyContext } = initialMessage;
let storyContextLogId = 'no storyContext';
if (storyContext) {
storyContextLogId =
`storyContext(${storyContext.sentTimestamp}, ` +
`${storyContext.authorAci})`;
}
// Ensure that quote author's conversation exist
if (initialMessage.quote) {
window.ConversationController.lookupOrCreate({
serviceId: initialMessage.quote.authorAci,
reason: 'handleDataMessage.quote.author',
});
}
const [quote, storyQuotes] = await Promise.all([
initialMessage.quote
? copyFromQuotedMessage(initialMessage.quote, conversation.id)
: undefined,
findStoryMessages(conversation.id, storyContext),
]);
const storyQuote = storyQuotes.find(candidateQuote => {
const sendStateByConversationId =
candidateQuote.sendStateByConversationId || {};
const sendState = sendStateByConversationId[sender.id];
const storyQuoteIsFromSelf =
candidateQuote.sourceServiceId === window.storage.user.getCheckedAci();
if (!storyQuoteIsFromSelf) {
return true;
}
// The sender is not a recipient for this story
if (sendState === undefined) {
return false;
}
// Group replies are always allowed
if (!isDirectConversation(conversation.attributes)) {
return true;
}
// For 1:1 stories, we need to check if they can be replied to
return sendState.isAllowedToReplyToStory !== false;
});
if (
storyContext &&
!storyQuote &&
!isDirectConversation(conversation.attributes)
) {
log.warn(
`${idLog}: Received ${storyContextLogId} message in group but no matching story. Dropping.`
);
confirm();
return;
}
if (storyQuote) {
const { storyDistributionListId } = storyQuote;
if (storyDistributionListId) {
const storyDistribution =
await DataReader.getStoryDistributionWithMembers(
storyDistributionListId
);
if (!storyDistribution) {
log.warn(
`${idLog}: Received ${storyContextLogId} message for story with no associated distribution list. Dropping.`
);
confirm();
return;
}
if (!storyDistribution.allowsReplies) {
log.warn(
`${idLog}: Received ${storyContextLogId} message but distribution list does not allow replies. Dropping.`
);
confirm();
return;
}
}
}
const withQuoteReference = {
...message.attributes,
...initialMessage,
quote,
storyId: storyQuote?.id,
};
// There are type conflicts between ModelAttributesType and protos passed in here
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const dataMessage = await upgradeMessageSchema(withQuoteReference as any);
const isGroupStoryReply =
isGroup(conversation.attributes) && dataMessage.storyId;
try {
const now = new Date().getTime();
const urls = LinkPreview.findLinks(dataMessage.body || '');
const incomingPreview = dataMessage.preview || [];
const preview = incomingPreview
.map((item: LinkPreviewType) => {
if (LinkPreview.isCallLink(item.url)) {
return {
...item,
isCallLink: true,
callLinkRoomId: getRoomIdFromCallLink(item.url),
};
}
if (
!LinkPreview.isValidLinkPreview(urls, item, {
isStory: isStory(message.attributes),
})
) {
return null;
}
return item;
})
.filter(isNotNil);
if (preview.length < incomingPreview.length) {
log.info(
`${getMessageIdForLogging(message.attributes)}: Eliminated ${
incomingPreview.length - preview.length
} previews with invalid urls'`
);
}
const ourPni = window.textsecure.storage.user.getCheckedPni();
const ourServiceIds: Set<ServiceIdString> = new Set([ourAci, ourPni]);
const [longMessageAttachments, normalAttachments] = partition(
dataMessage.attachments ?? [],
attachment => MIME.isLongMessage(attachment.contentType)
);
// eslint-disable-next-line no-param-reassign
message = window.MessageCache.register(message);
message.set({
id: messageId,
attachments: normalAttachments,
body: dataMessage.body,
bodyAttachment: longMessageAttachments[0],
bodyRanges: dataMessage.bodyRanges,
contact: dataMessage.contact,
conversationId: conversation.id,
decrypted_at: now,
errors: [],
flags: dataMessage.flags,
giftBadge: initialMessage.giftBadge,
hasAttachments: dataMessage.hasAttachments,
hasFileAttachments: dataMessage.hasFileAttachments,
hasVisualMediaAttachments: dataMessage.hasVisualMediaAttachments,
isViewOnce: Boolean(dataMessage.isViewOnce),
mentionsMe: (dataMessage.bodyRanges ?? []).some(bodyRange => {
if (!BodyRange.isMention(bodyRange)) {
return false;
}
return ourServiceIds.has(
normalizeServiceId(
bodyRange.mentionAci,
'handleDataMessage: mentionsMe check'
)
);
}),
preview,
requiredProtocolVersion:
dataMessage.requiredProtocolVersion || INITIAL_PROTOCOL_VERSION,
supportedVersionAtReceive: CURRENT_PROTOCOL_VERSION,
payment: dataMessage.payment,
quote: dataMessage.quote,
schemaVersion: dataMessage.schemaVersion,
sticker: dataMessage.sticker,
storyId: dataMessage.storyId,
});
if (storyQuote) {
await hydrateStoryContext(message.id, storyQuote, {
shouldSave: true,
});
}
const isSupported = !isUnsupportedMessage(message.attributes);
if (!isSupported) {
await eraseMessageContents(message);
}
if (isSupported) {
const attributes = {
...conversation.attributes,
};
// Drop empty messages after. This needs to happen after the initial
// message.set call and after GroupV1 processing to make sure all possible
// properties are set before we determine that a message is empty.
if (isMessageEmpty(message.attributes)) {
log.info(`${idLog}: Dropping empty message`);
confirm();
return;
}
if (isStory(message.attributes)) {
attributes.hasPostedStory = true;
} else {
attributes.active_at = now;
}
conversation.set(attributes);
// Sync group story reply expiration timers with the parent story's
// expiration timer
if (isGroupStoryReply && storyQuote) {
message.set({
expireTimer: storyQuote.expireTimer,
expirationStartTimestamp: storyQuote.expirationStartTimestamp,
});
}
if (dataMessage.expireTimer && !isExpirationTimerUpdate(dataMessage)) {
message.set({ expireTimer: dataMessage.expireTimer });
if (isStory(message.attributes)) {
log.info(`${idLog}: Starting story expiration`);
message.set({
expirationStartTimestamp: dataMessage.timestamp,
});
}
}
if (!hasGroupV2Prop && !isStory(message.attributes)) {
if (isExpirationTimerUpdate(message.attributes)) {
message.set({
expirationTimerUpdate: {
source,
sourceServiceId,
expireTimer: initialMessage.expireTimer,
},
});
if (conversation.get('expireTimer') !== dataMessage.expireTimer) {
log.info('Incoming expirationTimerUpdate changed timer', {
id: conversation.idForLogging(),
expireTimer: dataMessage.expireTimer || 'disabled',
source: idLog,
});
conversation.set({
expireTimer: dataMessage.expireTimer,
});
}
}
// Note: For incoming expire timer updates (not normal messages that come
// along with an expireTimer), the conversation will be updated by this
// point and these calls will return early.
if (dataMessage.expireTimer) {
void conversation.updateExpirationTimer(dataMessage.expireTimer, {
source: sourceServiceId || source,
receivedAt: message.get('received_at'),
receivedAtMS: message.get('received_at_ms'),
sentAt: message.get('sent_at'),
reason: idLog,
version: initialMessage.expireTimerVersion,
});
} else if (
// We won't turn off timers for these kinds of messages:
!isGroupUpdate(message.attributes) &&
!isEndSession(message.attributes)
) {
void conversation.updateExpirationTimer(undefined, {
source: sourceServiceId || source,
receivedAt: message.get('received_at'),
receivedAtMS: message.get('received_at_ms'),
sentAt: message.get('sent_at'),
reason: idLog,
version: initialMessage.expireTimerVersion,
});
}
}
if (initialMessage.profileKey) {
const { profileKey } = initialMessage;
if (
source === window.textsecure.storage.user.getNumber() ||
sourceServiceId === window.textsecure.storage.user.getAci()
) {
conversation.set({ profileSharing: true });
} else if (isDirectConversation(conversation.attributes)) {
drop(
conversation.setProfileKey(profileKey, {
reason: 'handleDataMessage',
})
);
} else {
const local = window.ConversationController.lookupOrCreate({
e164: source,
serviceId: sourceServiceId,
reason: 'handleDataMessage:setProfileKey',
});
drop(
local?.setProfileKey(profileKey, {
reason: 'handleDataMessage',
})
);
}
}
if (isTapToView(message.attributes) && type === 'outgoing') {
await eraseMessageContents(message);
}
if (
type === 'incoming' &&
isTapToView(message.attributes) &&
!isValidTapToView(message.attributes)
) {
log.warn(
`${idLog}: Received tap to view message with invalid data. Erasing contents.`
);
message.set({
isTapToViewInvalid: true,
});
await eraseMessageContents(message);
}
}
const conversationTimestamp = conversation.get('timestamp');
if (
!isStory(message.attributes) &&
!isGroupStoryReply &&
(!conversationTimestamp ||
message.get('sent_at') > conversationTimestamp) &&
messageHasPaymentEvent(message.attributes)
) {
conversation.set({
lastMessage: getNotificationTextForMessage(message.attributes),
lastMessageAuthor: getMessageAuthorText(message.attributes),
timestamp: message.get('sent_at'),
});
}
// eslint-disable-next-line no-param-reassign
message = window.MessageCache.register(message);
conversation.incrementMessageCount();
// If we sent a message in a given conversation, unarchive it!
if (type === 'outgoing') {
conversation.setArchived(false);
}
await DataWriter.updateConversation(conversation.attributes);
const giftBadge = message.get('giftBadge');
if (giftBadge && giftBadge.state !== GiftBadgeStates.Failed) {
const { level } = giftBadge;
const { updatesUrl } = window.SignalContext.config;
strictAssert(
typeof updatesUrl === 'string',
'getProfile: expected updatesUrl to be a defined string'
);
const userLanguages = getUserLanguages(
window.SignalContext.getPreferredSystemLocales(),
window.SignalContext.getResolvedMessagesLocale()
);
const { messaging } = window.textsecure;
if (!messaging) {
throw new Error(`${idLog}: messaging is not available`);
}
const response =
await messaging.server.getSubscriptionConfiguration(userLanguages);
const boostBadgesByLevel = parseBoostBadgeListFromServer(
response,
updatesUrl
);
const badge = boostBadgesByLevel[level];
if (!badge) {
log.error(
`${idLog}: gift badge with level ${level} not found on server`
);
} else {
await window.reduxActions.badges.updateOrCreate([badge]);
giftBadge.id = badge.id;
}
}
const result = await modifyTargetMessage(message, conversation, {
isFirstRun: true,
skipEdits: false,
});
if (result === ModifyTargetMessageResult.Deleted) {
confirm();
return;
}
log.info(`${idLog}: Batching save`);
drop(saveAndNotify(message, conversation, confirm));
} catch (error) {
const errorForLog = Errors.toLogFormat(error);
log.error(`${idLog}: error:`, errorForLog);
throw error;
}
});
}