// Copyright 2024 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { isNumber, partition } from 'lodash'; import * as log from '../logging/log'; import * as Errors from '../types/errors'; import * as MIME from '../types/MIME'; import * as LinkPreview from '../types/LinkPreview'; import { getAuthor, isStory, messageHasPaymentEvent } from './helpers'; import { getMessageIdForLogging } from '../util/idForLogging'; import { getSenderIdentifier } from '../util/getSenderIdentifier'; import { isNormalNumber } from '../util/isNormalNumber'; import { getOwn } from '../util/getOwn'; import { SendActionType, sendStateReducer, SendStatus, } from './MessageSendState'; import { DataReader, DataWriter } from '../sql/Client'; import { eraseMessageContents, postSaveUpdates } from '../util/cleanup'; import { isDirectConversation, isGroup, isGroupV1, } from '../util/whatTypeOfConversation'; import { generateMessageId } from '../util/generateMessageId'; import { hasErrors, isEndSession, isExpirationTimerUpdate, isGroupUpdate, isTapToView, isUnsupportedMessage, } from '../state/selectors/message'; import { drop } from '../util/drop'; import { strictAssert } from '../util/assert'; import { isAciString } from '../util/isAciString'; import { copyFromQuotedMessage } from './copyQuote'; import { findStoryMessages } from '../util/findStoryMessage'; import { getRoomIdFromCallLink } from '../util/callLinksRingrtc'; import { isNotNil } from '../util/isNotNil'; import { normalizeServiceId } from '../types/ServiceId'; import { BodyRange } from '../types/BodyRange'; import { hydrateStoryContext } from '../util/hydrateStoryContext'; import { isMessageEmpty } from '../util/isMessageEmpty'; import { isValidTapToView } from '../util/isValidTapToView'; import { getNotificationTextForMessage } from '../util/getNotificationTextForMessage'; import { getMessageAuthorText } from '../util/getMessageAuthorText'; import { GiftBadgeStates } from '../components/conversation/Message'; import { getUserLanguages } from '../util/userLanguages'; import { parseBoostBadgeListFromServer } from '../badges/parseBadgesFromServer'; import { SignalService as Proto } from '../protobuf'; import { modifyTargetMessage, ModifyTargetMessageResult, } from '../util/modifyTargetMessage'; import { saveAndNotify } from './saveAndNotify'; import { MessageModel } from '../models/messages'; import type { SentEventData } from '../textsecure/messageReceiverEvents'; import type { ProcessedDataMessage, ProcessedUnidentifiedDeliveryStatus, } from '../textsecure/Types'; import type { ServiceIdString } from '../types/ServiceId'; import type { LinkPreviewType } from '../types/message/LinkPreviews'; const CURRENT_PROTOCOL_VERSION = Proto.DataMessage.ProtocolVersion.CURRENT; const INITIAL_PROTOCOL_VERSION = Proto.DataMessage.ProtocolVersion.INITIAL; export async function handleDataMessage( message: MessageModel, initialMessage: ProcessedDataMessage, confirm: () => void, options: { data?: SentEventData } = {} ): Promise { const { data } = options; const { upgradeMessageSchema } = window.Signal.Migrations; // This function is called from the background script in a few scenarios: // 1. on an incoming message // 2. on a sent message sync'd from another device // 3. in rare cases, an incoming message can be retried, though it will // still go through one of the previous two codepaths const source = message.get('source'); const sourceServiceId = message.get('sourceServiceId'); const type = message.get('type'); const conversationId = message.get('conversationId'); const fromContact = getAuthor(message.attributes); if (fromContact) { fromContact.setRegistered(); } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const conversation = window.ConversationController.get(conversationId)!; const idLog = `handleDataMessage/${conversation.idForLogging()} ${getMessageIdForLogging(message.attributes)}`; await conversation.queueJob(idLog, async () => { log.info(`${idLog}: starting processing in queue`); // First, check for duplicates. If we find one, stop processing here. const senderIdentifier = getSenderIdentifier(message.attributes); const inMemoryMessage = window.MessageCache.findBySender(senderIdentifier); if (inMemoryMessage) { log.info(`${idLog}: cache hit`, senderIdentifier); } else { log.info(`${idLog}: duplicate check db lookup needed`, senderIdentifier); } let existingMessage = inMemoryMessage; if (!existingMessage) { const fromDb = await DataReader.getMessageBySender(message.attributes); existingMessage = fromDb ? window.MessageCache.register(new MessageModel(fromDb)) : undefined; } const isUpdate = Boolean(data && data.isRecipientUpdate); const isDuplicateMessage = existingMessage && (type === 'incoming' || (type === 'story' && existingMessage.get('storyDistributionListId') === message.attributes.storyDistributionListId)); if (isDuplicateMessage) { log.warn( `${idLog}: Received duplicate message`, getMessageIdForLogging(message.attributes) ); confirm(); return; } if (type === 'outgoing') { if (isUpdate && existingMessage) { log.info( `${idLog}: Updating message ${getMessageIdForLogging(message.attributes)} with received transcript` ); const toUpdate = window.MessageCache.register(existingMessage); const unidentifiedDeliveriesSet = new Set( toUpdate.get('unidentifiedDeliveries') ?? [] ); const sendStateByConversationId = { ...(toUpdate.get('sendStateByConversationId') || {}), }; const unidentifiedStatus: Array = data && Array.isArray(data.unidentifiedStatus) ? data.unidentifiedStatus : []; unidentifiedStatus.forEach( ({ destinationServiceId, destination, unidentified }) => { const identifier = destinationServiceId || destination; if (!identifier) { return; } const destinationConversation = window.ConversationController.lookupOrCreate({ serviceId: destinationServiceId, e164: destination || undefined, reason: `handleDataMessage(${initialMessage.timestamp})`, }); if (!destinationConversation) { return; } const updatedAt: number = data && isNormalNumber(data.timestamp) ? data.timestamp : Date.now(); const previousSendState = getOwn( sendStateByConversationId, destinationConversation.id ); sendStateByConversationId[destinationConversation.id] = previousSendState ? sendStateReducer(previousSendState, { type: SendActionType.Sent, updatedAt, }) : { status: SendStatus.Sent, updatedAt, }; if (unidentified) { unidentifiedDeliveriesSet.add(identifier); } } ); toUpdate.set({ sendStateByConversationId, unidentifiedDeliveries: [...unidentifiedDeliveriesSet], }); await DataWriter.saveMessage(toUpdate.attributes, { ourAci: window.textsecure.storage.user.getCheckedAci(), postSaveUpdates, }); confirm(); return; } if (isUpdate) { log.warn( `${idLog}: Received update transcript, but no existing entry for message ${getMessageIdForLogging(message.attributes)}. Dropping.` ); confirm(); return; } if (existingMessage) { // TODO: (DESKTOP-7301): improve this check in case previous message is not yet // registered in memory log.warn( `${idLog}: Received duplicate transcript for message ${getMessageIdForLogging(message.attributes)}, but it was not an update transcript. Dropping.` ); confirm(); return; } } // GroupV2 if (initialMessage.groupV2) { if (isGroupV1(conversation.attributes)) { // If we received a GroupV2 message in a GroupV1 group, we migrate! const { revision, groupChange } = initialMessage.groupV2; await window.Signal.Groups.respondToGroupV2Migration({ conversation, groupChange: groupChange ? { base64: groupChange, isTrusted: false, } : undefined, newRevision: revision, receivedAt: message.get('received_at'), sentAt: message.get('sent_at'), }); } else if ( initialMessage.groupV2.masterKey && initialMessage.groupV2.secretParams && initialMessage.groupV2.publicParams ) { // Repair core GroupV2 data if needed await conversation.maybeRepairGroupV2({ masterKey: initialMessage.groupV2.masterKey, secretParams: initialMessage.groupV2.secretParams, publicParams: initialMessage.groupV2.publicParams, }); const existingRevision = conversation.get('revision'); const isFirstUpdate = !isNumber(existingRevision); // Standard GroupV2 modification codepath const isV2GroupUpdate = initialMessage.groupV2 && isNumber(initialMessage.groupV2.revision) && (isFirstUpdate || initialMessage.groupV2.revision > existingRevision); if (isV2GroupUpdate && initialMessage.groupV2) { const { revision, groupChange } = initialMessage.groupV2; try { await window.Signal.Groups.maybeUpdateGroup({ conversation, groupChange: groupChange ? { base64: groupChange, isTrusted: false, } : undefined, newRevision: revision, receivedAt: message.get('received_at'), sentAt: message.get('sent_at'), }); } catch (error) { const errorText = Errors.toLogFormat(error); log.error( `${idLog}: Failed to process group update as part of message ${getMessageIdForLogging(message.attributes)}: ${errorText}` ); throw error; } } } } const ourAci = window.textsecure.storage.user.getCheckedAci(); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const sender = window.ConversationController.lookupOrCreate({ e164: source, serviceId: sourceServiceId, reason: 'handleDataMessage', })!; const hasGroupV2Prop = Boolean(initialMessage.groupV2); // Drop if from blocked user. Only GroupV2 messages should need to be dropped here. const isBlocked = (source && window.storage.blocked.isBlocked(source)) || (sourceServiceId && window.storage.blocked.isServiceIdBlocked(sourceServiceId)); if (isBlocked) { log.info( `${idLog}: Dropping message from blocked sender. hasGroupV2Prop: ${hasGroupV2Prop}` ); confirm(); return; } const areWeMember = !conversation.get('left') && conversation.hasMember(ourAci); // Drop an incoming GroupV2 message if we or the sender are not part of the group // after applying the message's associated group changes. if ( type === 'incoming' && !isDirectConversation(conversation.attributes) && hasGroupV2Prop && (!areWeMember || (sourceServiceId && !conversation.hasMember(sourceServiceId))) ) { log.warn( `${idLog}: Received message destined for group, which we or the sender are not a part of. Dropping.` ); confirm(); return; } // We drop incoming messages for v1 groups we already know about, which we're not // a part of, except for group updates. Because group v1 updates haven't been // applied by this point. // Note: if we have no information about a group at all, we will accept those // messages. We detect that via a missing 'members' field. if ( type === 'incoming' && !isDirectConversation(conversation.attributes) && !hasGroupV2Prop && conversation.get('members') && !areWeMember ) { log.warn( `Received message destined for group ${conversation.idForLogging()}, which we're not a part of. Dropping.` ); confirm(); return; } // Drop incoming messages to announcement only groups where sender is not admin if (conversation.get('announcementsOnly')) { const senderServiceId = sender.getServiceId(); if (!senderServiceId || !conversation.isAdmin(senderServiceId)) { confirm(); return; } } const messageId = message.get('id') || generateMessageId(message.get('received_at')).id; // Send delivery receipts, but only for non-story sealed sender messages // and not for messages from unaccepted conversations if ( type === 'incoming' && message.get('unidentifiedDeliveryReceived') && !hasErrors(message.attributes) && conversation.getAccepted() ) { // Note: We both queue and batch because we want to wait until we are done // processing incoming messages to start sending outgoing delivery receipts. // The queue can be paused easily. drop( window.Whisper.deliveryReceiptQueue.add(() => { strictAssert( isAciString(sourceServiceId), 'Incoming message must be from ACI' ); window.Whisper.deliveryReceiptBatcher.add({ messageId, conversationId, senderE164: source, senderAci: sourceServiceId, timestamp: message.get('sent_at'), isDirectConversation: isDirectConversation(conversation.attributes), }); }) ); } const { storyContext } = initialMessage; let storyContextLogId = 'no storyContext'; if (storyContext) { storyContextLogId = `storyContext(${storyContext.sentTimestamp}, ` + `${storyContext.authorAci})`; } // Ensure that quote author's conversation exist if (initialMessage.quote) { window.ConversationController.lookupOrCreate({ serviceId: initialMessage.quote.authorAci, reason: 'handleDataMessage.quote.author', }); } const [quote, storyQuotes] = await Promise.all([ initialMessage.quote ? copyFromQuotedMessage(initialMessage.quote, conversation.id) : undefined, findStoryMessages(conversation.id, storyContext), ]); const storyQuote = storyQuotes.find(candidateQuote => { const sendStateByConversationId = candidateQuote.sendStateByConversationId || {}; const sendState = sendStateByConversationId[sender.id]; const storyQuoteIsFromSelf = candidateQuote.sourceServiceId === window.storage.user.getCheckedAci(); if (!storyQuoteIsFromSelf) { return true; } // The sender is not a recipient for this story if (sendState === undefined) { return false; } // Group replies are always allowed if (!isDirectConversation(conversation.attributes)) { return true; } // For 1:1 stories, we need to check if they can be replied to return sendState.isAllowedToReplyToStory !== false; }); if ( storyContext && !storyQuote && !isDirectConversation(conversation.attributes) ) { log.warn( `${idLog}: Received ${storyContextLogId} message in group but no matching story. Dropping.` ); confirm(); return; } if (storyQuote) { const { storyDistributionListId } = storyQuote; if (storyDistributionListId) { const storyDistribution = await DataReader.getStoryDistributionWithMembers( storyDistributionListId ); if (!storyDistribution) { log.warn( `${idLog}: Received ${storyContextLogId} message for story with no associated distribution list. Dropping.` ); confirm(); return; } if (!storyDistribution.allowsReplies) { log.warn( `${idLog}: Received ${storyContextLogId} message but distribution list does not allow replies. Dropping.` ); confirm(); return; } } } const withQuoteReference = { ...message.attributes, ...initialMessage, quote, storyId: storyQuote?.id, }; // There are type conflicts between ModelAttributesType and protos passed in here // eslint-disable-next-line @typescript-eslint/no-explicit-any const dataMessage = await upgradeMessageSchema(withQuoteReference as any); const isGroupStoryReply = isGroup(conversation.attributes) && dataMessage.storyId; try { const now = new Date().getTime(); const urls = LinkPreview.findLinks(dataMessage.body || ''); const incomingPreview = dataMessage.preview || []; const preview = incomingPreview .map((item: LinkPreviewType) => { if (LinkPreview.isCallLink(item.url)) { return { ...item, isCallLink: true, callLinkRoomId: getRoomIdFromCallLink(item.url), }; } if ( !LinkPreview.isValidLinkPreview(urls, item, { isStory: isStory(message.attributes), }) ) { return null; } return item; }) .filter(isNotNil); if (preview.length < incomingPreview.length) { log.info( `${getMessageIdForLogging(message.attributes)}: Eliminated ${ incomingPreview.length - preview.length } previews with invalid urls'` ); } const ourPni = window.textsecure.storage.user.getCheckedPni(); const ourServiceIds: Set = new Set([ourAci, ourPni]); const [longMessageAttachments, normalAttachments] = partition( dataMessage.attachments ?? [], attachment => MIME.isLongMessage(attachment.contentType) ); // eslint-disable-next-line no-param-reassign message = window.MessageCache.register(message); message.set({ id: messageId, attachments: normalAttachments, body: dataMessage.body, bodyAttachment: longMessageAttachments[0], bodyRanges: dataMessage.bodyRanges, contact: dataMessage.contact, conversationId: conversation.id, decrypted_at: now, errors: [], flags: dataMessage.flags, giftBadge: initialMessage.giftBadge, hasAttachments: dataMessage.hasAttachments, hasFileAttachments: dataMessage.hasFileAttachments, hasVisualMediaAttachments: dataMessage.hasVisualMediaAttachments, isViewOnce: Boolean(dataMessage.isViewOnce), mentionsMe: (dataMessage.bodyRanges ?? []).some(bodyRange => { if (!BodyRange.isMention(bodyRange)) { return false; } return ourServiceIds.has( normalizeServiceId( bodyRange.mentionAci, 'handleDataMessage: mentionsMe check' ) ); }), preview, requiredProtocolVersion: dataMessage.requiredProtocolVersion || INITIAL_PROTOCOL_VERSION, supportedVersionAtReceive: CURRENT_PROTOCOL_VERSION, payment: dataMessage.payment, quote: dataMessage.quote, schemaVersion: dataMessage.schemaVersion, sticker: dataMessage.sticker, storyId: dataMessage.storyId, }); if (storyQuote) { await hydrateStoryContext(message.id, storyQuote, { shouldSave: true, }); } const isSupported = !isUnsupportedMessage(message.attributes); if (!isSupported) { await eraseMessageContents(message); } if (isSupported) { const attributes = { ...conversation.attributes, }; // Drop empty messages after. This needs to happen after the initial // message.set call and after GroupV1 processing to make sure all possible // properties are set before we determine that a message is empty. if (isMessageEmpty(message.attributes)) { log.info(`${idLog}: Dropping empty message`); confirm(); return; } if (isStory(message.attributes)) { attributes.hasPostedStory = true; } else { attributes.active_at = now; } conversation.set(attributes); // Sync group story reply expiration timers with the parent story's // expiration timer if (isGroupStoryReply && storyQuote) { message.set({ expireTimer: storyQuote.expireTimer, expirationStartTimestamp: storyQuote.expirationStartTimestamp, }); } if (dataMessage.expireTimer && !isExpirationTimerUpdate(dataMessage)) { message.set({ expireTimer: dataMessage.expireTimer }); if (isStory(message.attributes)) { log.info(`${idLog}: Starting story expiration`); message.set({ expirationStartTimestamp: dataMessage.timestamp, }); } } if (!hasGroupV2Prop && !isStory(message.attributes)) { if (isExpirationTimerUpdate(message.attributes)) { message.set({ expirationTimerUpdate: { source, sourceServiceId, expireTimer: initialMessage.expireTimer, }, }); if (conversation.get('expireTimer') !== dataMessage.expireTimer) { log.info('Incoming expirationTimerUpdate changed timer', { id: conversation.idForLogging(), expireTimer: dataMessage.expireTimer || 'disabled', source: idLog, }); conversation.set({ expireTimer: dataMessage.expireTimer, }); } } // Note: For incoming expire timer updates (not normal messages that come // along with an expireTimer), the conversation will be updated by this // point and these calls will return early. if (dataMessage.expireTimer) { void conversation.updateExpirationTimer(dataMessage.expireTimer, { source: sourceServiceId || source, receivedAt: message.get('received_at'), receivedAtMS: message.get('received_at_ms'), sentAt: message.get('sent_at'), reason: idLog, version: initialMessage.expireTimerVersion, }); } else if ( // We won't turn off timers for these kinds of messages: !isGroupUpdate(message.attributes) && !isEndSession(message.attributes) ) { void conversation.updateExpirationTimer(undefined, { source: sourceServiceId || source, receivedAt: message.get('received_at'), receivedAtMS: message.get('received_at_ms'), sentAt: message.get('sent_at'), reason: idLog, version: initialMessage.expireTimerVersion, }); } } if (initialMessage.profileKey) { const { profileKey } = initialMessage; if ( source === window.textsecure.storage.user.getNumber() || sourceServiceId === window.textsecure.storage.user.getAci() ) { conversation.set({ profileSharing: true }); } else if (isDirectConversation(conversation.attributes)) { drop( conversation.setProfileKey(profileKey, { reason: 'handleDataMessage', }) ); } else { const local = window.ConversationController.lookupOrCreate({ e164: source, serviceId: sourceServiceId, reason: 'handleDataMessage:setProfileKey', }); drop( local?.setProfileKey(profileKey, { reason: 'handleDataMessage', }) ); } } if (isTapToView(message.attributes) && type === 'outgoing') { await eraseMessageContents(message); } if ( type === 'incoming' && isTapToView(message.attributes) && !isValidTapToView(message.attributes) ) { log.warn( `${idLog}: Received tap to view message with invalid data. Erasing contents.` ); message.set({ isTapToViewInvalid: true, }); await eraseMessageContents(message); } } const conversationTimestamp = conversation.get('timestamp'); if ( !isStory(message.attributes) && !isGroupStoryReply && (!conversationTimestamp || message.get('sent_at') > conversationTimestamp) && messageHasPaymentEvent(message.attributes) ) { conversation.set({ lastMessage: getNotificationTextForMessage(message.attributes), lastMessageAuthor: getMessageAuthorText(message.attributes), timestamp: message.get('sent_at'), }); } // eslint-disable-next-line no-param-reassign message = window.MessageCache.register(message); conversation.incrementMessageCount(); // If we sent a message in a given conversation, unarchive it! if (type === 'outgoing') { conversation.setArchived(false); } await DataWriter.updateConversation(conversation.attributes); const giftBadge = message.get('giftBadge'); if (giftBadge && giftBadge.state !== GiftBadgeStates.Failed) { const { level } = giftBadge; const { updatesUrl } = window.SignalContext.config; strictAssert( typeof updatesUrl === 'string', 'getProfile: expected updatesUrl to be a defined string' ); const userLanguages = getUserLanguages( window.SignalContext.getPreferredSystemLocales(), window.SignalContext.getResolvedMessagesLocale() ); const { messaging } = window.textsecure; if (!messaging) { throw new Error(`${idLog}: messaging is not available`); } const response = await messaging.server.getSubscriptionConfiguration(userLanguages); const boostBadgesByLevel = parseBoostBadgeListFromServer( response, updatesUrl ); const badge = boostBadgesByLevel[level]; if (!badge) { log.error( `${idLog}: gift badge with level ${level} not found on server` ); } else { await window.reduxActions.badges.updateOrCreate([badge]); giftBadge.id = badge.id; } } const result = await modifyTargetMessage(message, conversation, { isFirstRun: true, skipEdits: false, }); if (result === ModifyTargetMessageResult.Deleted) { confirm(); return; } log.info(`${idLog}: Batching save`); drop(saveAndNotify(message, conversation, confirm)); } catch (error) { const errorForLog = Errors.toLogFormat(error); log.error(`${idLog}: error:`, errorForLog); throw error; } }); }