474 lines
15 KiB
TypeScript
474 lines
15 KiB
TypeScript
// Copyright 2022 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
import { isNumber } from 'lodash';
|
|
|
|
import * as Errors from '../../types/errors';
|
|
import type { MessageModel } from '../../models/messages';
|
|
import { getMessageById } from '../../messages/getMessageById';
|
|
import type { ConversationModel } from '../../models/conversations';
|
|
import { isGroupV2, isMe } from '../../util/whatTypeOfConversation';
|
|
import { getSendOptions } from '../../util/getSendOptions';
|
|
import { SignalService as Proto } from '../../protobuf';
|
|
import { handleMessageSend } from '../../util/handleMessageSend';
|
|
import type { CallbackResultType } from '../../textsecure/Types.d';
|
|
import { isSent } from '../../messages/MessageSendState';
|
|
import {
|
|
getLastChallengeError,
|
|
isOutgoing,
|
|
} from '../../state/selectors/message';
|
|
import type { AttachmentType } from '../../textsecure/SendMessage';
|
|
import type { LinkPreviewType } from '../../types/message/LinkPreviews';
|
|
import type { BodyRangesType } from '../../types/Util';
|
|
import type { WhatIsThis } from '../../window.d';
|
|
import type { LoggerType } from '../../types/Logging';
|
|
import type {
|
|
ConversationQueueJobBundle,
|
|
NormalMessageSendJobData,
|
|
} from '../conversationJobQueue';
|
|
|
|
import { handleMultipleSendErrors } from './handleMultipleSendErrors';
|
|
import { ourProfileKeyService } from '../../services/ourProfileKey';
|
|
import { isConversationUnregistered } from '../../util/isConversationUnregistered';
|
|
import { isConversationAccepted } from '../../util/isConversationAccepted';
|
|
|
|
/**
 * Conversation-queue job handler that tries to send one outgoing message.
 *
 * The job gives up quietly (logs and returns) when the message cannot be found,
 * belongs to a different conversation than the job, is not outgoing, was erased
 * or deleted for everyone, or has no recipients left to send to. When
 * `shouldContinue` is false, or when a direct conversation is not accepted, is
 * unregistered or is blocked, the message is marked as failed instead.
 *
 * Anything thrown while sending is passed to `handleMultipleSendErrors` together
 * with the errors captured by the `saveErrors` callback.
 * NOTE(review): that helper presumably decides whether to mark the message failed
 * or rethrow `toThrow` so the queue retries — confirm against its implementation.
 *
 * @param conversation - Conversation this job was queued for.
 * @param bundle - Job context: whether this is the final attempt, whether the job
 *   should continue, the remaining time, and the logger to use.
 * @param data - Job payload: the id of the message to send and an optional group
 *   revision.
 */
export async function sendNormalMessage(
  conversation: ConversationModel,
  {
    isFinalAttempt,
    shouldContinue,
    timeRemaining,
    log,
  }: ConversationQueueJobBundle,
  data: NormalMessageSendJobData
): Promise<void> {
  const { Message } = window.Signal.Types;

  const { messageId, revision } = data;
  const message = await getMessageById(messageId);
  if (!message) {
    log.info(
      `message ${messageId} was not found, maybe because it was deleted. Giving up on sending it`
    );
    return;
  }

  const messageConversation = message.getConversation();
  if (messageConversation !== conversation) {
    log.error(
      `Message conversation '${messageConversation?.idForLogging()}' does not match job conversation ${conversation.idForLogging()}`
    );
    return;
  }

  if (!isOutgoing(message.attributes)) {
    log.error(
      `message ${messageId} was not an outgoing message to begin with. This is probably a bogus job. Giving up on sending it`
    );
    return;
  }

  if (message.isErased() || message.get('deletedForEveryone')) {
    log.info(`message ${messageId} was erased. Giving up on sending it`);
    return;
  }

  let messageSendErrors: Array<Error> = [];

  // We don't want to save errors on messages unless we're giving up. If it's our
  // final attempt, we know upfront that we want to give up. However, we might also
  // want to give up if (1) we get a 508 from the server, asking us to please stop
  // (2) we get a 428 from the server, flagging the message for spam (3) some other
  // reason not known at the time of this writing.
  //
  // This awkward callback lets us hold onto errors we might want to save, so we can
  // decide whether to save them later on.
  const saveErrors = isFinalAttempt
    ? undefined
    : (errors: Array<Error>) => {
        messageSendErrors = errors;
      };

  if (!shouldContinue) {
    log.info(`message ${messageId} ran out of time. Giving up on sending it`);
    await markMessageFailed(message, messageSendErrors);
    return;
  }

  let profileKey: Uint8Array | undefined;
  if (conversation.get('profileSharing')) {
    profileKey = await ourProfileKeyService.get();
  }

  // Holds the error from the underlying send promise (group/direct sends only), so
  // it can be preferred over whatever ends up in the catch block below.
  let originalError: Error | undefined;

  try {
    const {
      allRecipientIdentifiers,
      recipientIdentifiersWithoutMe,
      untrustedConversationIds,
    } = getMessageRecipients({
      message,
      conversation,
    });

    if (untrustedConversationIds.length) {
      window.reduxActions.conversations.conversationStoppedByMissingVerification(
        {
          conversationId: conversation.id,
          untrustedConversationIds,
        }
      );
      throw new Error(
        `Message ${messageId} sending blocked because ${untrustedConversationIds.length} conversation(s) were untrusted. Failing this attempt.`
      );
    }

    if (!allRecipientIdentifiers.length) {
      log.warn(
        `trying to send message ${messageId} but it looks like it was already sent to everyone. This is unexpected, but we're giving up`
      );
      return;
    }

    const {
      attachments,
      body,
      deletedForEveryoneTimestamp,
      expireTimer,
      mentions,
      messageTimestamp,
      preview,
      quote,
      sticker,
    } = await getMessageSendData({ log, message });

    let messageSendPromise: Promise<CallbackResultType | void>;

    if (recipientIdentifiersWithoutMe.length === 0) {
      // The only unsent recipient is ourselves, so only a sync message is needed.
      log.info('sending sync message only');
      const dataMessage = await window.textsecure.messaging.getDataMessage({
        attachments,
        body,
        groupV2: conversation.getGroupV2Info({
          members: recipientIdentifiersWithoutMe,
        }),
        deletedForEveryoneTimestamp,
        expireTimer,
        preview,
        profileKey,
        quote,
        recipients: allRecipientIdentifiers,
        sticker,
        timestamp: messageTimestamp,
      });
      messageSendPromise = message.sendSyncMessageOnly(dataMessage, saveErrors);
    } else {
      const conversationType = conversation.get('type');
      const sendOptions = await getSendOptions(conversation.attributes);
      const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;

      let innerPromise: Promise<CallbackResultType>;
      if (conversationType === Message.GROUP) {
        // Note: this will happen for all old jobs queued before 5.32.x
        if (isGroupV2(conversation.attributes) && !isNumber(revision)) {
          log.error('No revision provided, but conversation is GroupV2');
        }

        const groupV2Info = conversation.getGroupV2Info({
          members: recipientIdentifiersWithoutMe,
        });
        // Use the revision recorded on the job, when there is one, rather than
        // whatever revision the group info was built with.
        if (groupV2Info && isNumber(revision)) {
          groupV2Info.revision = revision;
        }

        log.info('sending group message');
        innerPromise = conversation.queueJob(
          'conversationQueue/sendNormalMessage',
          () =>
            window.Signal.Util.sendToGroup({
              contentHint: ContentHint.RESENDABLE,
              groupSendOptions: {
                attachments,
                deletedForEveryoneTimestamp,
                expireTimer,
                groupV1: conversation.getGroupV1Info(
                  recipientIdentifiersWithoutMe
                ),
                groupV2: groupV2Info,
                messageText: body,
                preview,
                profileKey,
                quote,
                sticker,
                timestamp: messageTimestamp,
                mentions,
              },
              messageId,
              sendOptions,
              sendTarget: conversation.toSenderKeyTarget(),
              sendType: 'message',
            })
        );
      } else {
        // The refusals below await markMessageFailed so that a failure to persist
        // the failed state reaches the surrounding catch instead of becoming an
        // unhandled promise rejection, and so the job does not finish before the
        // message has actually been saved.
        if (!isConversationAccepted(conversation.attributes)) {
          log.info(
            `conversation ${conversation.idForLogging()} is not accepted; refusing to send`
          );
          await markMessageFailed(message, [
            new Error('Message request was not accepted'),
          ]);
          return;
        }
        if (isConversationUnregistered(conversation.attributes)) {
          log.info(
            `conversation ${conversation.idForLogging()} is unregistered; refusing to send`
          );
          await markMessageFailed(message, [
            new Error('Contact no longer has a Signal account'),
          ]);
          return;
        }
        if (conversation.isBlocked()) {
          log.info(
            `conversation ${conversation.idForLogging()} is blocked; refusing to send`
          );
          await markMessageFailed(message, [new Error('Contact is blocked')]);
          return;
        }

        log.info('sending direct message');
        innerPromise = window.textsecure.messaging.sendMessageToIdentifier({
          identifier: recipientIdentifiersWithoutMe[0],
          messageText: body,
          attachments,
          quote,
          preview,
          sticker,
          reaction: undefined,
          deletedForEveryoneTimestamp,
          timestamp: messageTimestamp,
          expireTimer,
          contentHint: ContentHint.RESENDABLE,
          groupId: undefined,
          profileKey,
          options: sendOptions,
        });
      }

      messageSendPromise = message.send(
        handleMessageSend(innerPromise, {
          messageIds: [messageId],
          sendType: 'message',
        }),
        saveErrors
      );

      // Because message.send swallows and processes errors, we'll await the inner promise
      // to get the SendMessageProtoError, which gives us information upstream
      // processors need to detect certain kinds of situations.
      try {
        await innerPromise;
      } catch (error) {
        if (error instanceof Error) {
          originalError = error;
        } else {
          log.error(
            `promiseForError threw something other than an error: ${Errors.toLogFormat(
              error
            )}`
          );
        }
      }
    }

    await messageSendPromise;

    if (
      getLastChallengeError({
        errors: messageSendErrors,
      })
    ) {
      log.info(
        `message ${messageId} hit a spam challenge. Not retrying any more`
      );
      await message.saveErrors(messageSendErrors);
      return;
    }

    const didFullySend =
      !messageSendErrors.length || didSendToEveryone(message);
    if (!didFullySend) {
      throw new Error('message did not fully send');
    }
  } catch (thrownError: unknown) {
    const errors = [thrownError, ...messageSendErrors];
    await handleMultipleSendErrors({
      errors,
      isFinalAttempt,
      log,
      markFailed: () => markMessageFailed(message, messageSendErrors),
      timeRemaining,
      // In the case of a failed group send thrownError will not be SentMessageProtoError,
      // but we should have been able to harvest the original error. In the Note to Self
      // send case, thrownError will be the error we care about, and we won't have an
      // originalError.
      toThrow: originalError || thrownError,
    });
  }
}
|
|
|
|
/**
 * Works out who still needs to receive `message`.
 *
 * Walks the message's `sendStateByConversationId` and, for every entry that is
 * not yet sent, resolves the recipient conversation. Recipients that cannot be
 * resolved, or that are neither a current recipient of `conversation` nor
 * ourselves, are skipped.
 *
 * @returns `allRecipientIdentifiers` (send targets of all pending recipients),
 *   `recipientIdentifiersWithoutMe` (the same list minus ourselves) and
 *   `untrustedConversationIds` (conversation ids of pending recipients that
 *   report themselves as untrusted).
 */
function getMessageRecipients({
  conversation,
  message,
}: Readonly<{
  conversation: ConversationModel;
  message: MessageModel;
}>): {
  allRecipientIdentifiers: Array<string>;
  recipientIdentifiersWithoutMe: Array<string>;
  untrustedConversationIds: Array<string>;
} {
  const everyone: Array<string> = [];
  const everyoneButMe: Array<string> = [];
  const untrusted: Array<string> = [];

  const conversationRecipientIds = conversation.getRecipientConversationIds();
  const sendStates = message.get('sendStateByConversationId') || {};

  for (const [recipientConversationId, sendState] of Object.entries(
    sendStates
  )) {
    // Nothing left to do for recipients that already got the message.
    if (isSent(sendState.status)) {
      continue;
    }

    const recipient = window.ConversationController.get(
      recipientConversationId
    );
    if (!recipient) {
      continue;
    }

    const isRecipientMe = isMe(recipient.attributes);
    const isRelevant =
      conversationRecipientIds.has(recipientConversationId) || isRecipientMe;
    if (!isRelevant) {
      continue;
    }

    // Recorded before the send-target lookup, so an untrusted recipient is
    // reported even when it has no send target.
    if (recipient.isUntrusted()) {
      untrusted.push(recipientConversationId);
    }

    const sendTarget = recipient.getSendTarget();
    if (!sendTarget) {
      continue;
    }

    everyone.push(sendTarget);
    if (!isRecipientMe) {
      everyoneButMe.push(sendTarget);
    }
  }

  return {
    allRecipientIdentifiers: everyone,
    recipientIdentifiersWithoutMe: everyoneButMe,
    untrustedConversationIds: untrusted,
  };
}
|
|
|
|
/**
 * Collects everything needed to put `message` on the wire: the timestamp to
 * send with, loaded attachment/preview/quote/sticker data, and the body (after
 * `getLongMessageAttachment` has processed body and attachments), plus a few
 * attributes copied straight off the message.
 *
 * The timestamp is `sent_at` when present, otherwise `timestamp`, otherwise the
 * current time; both fallbacks are logged as errors.
 */
async function getMessageSendData({
  log,
  message,
}: Readonly<{
  log: LoggerType;
  message: MessageModel;
}>): Promise<{
  attachments: Array<AttachmentType>;
  body: undefined | string;
  deletedForEveryoneTimestamp: undefined | number;
  expireTimer: undefined | number;
  mentions: undefined | BodyRangesType;
  messageTimestamp: number;
  preview: Array<LinkPreviewType>;
  quote: WhatIsThis;
  sticker: WhatIsThis;
}> {
  const {
    loadAttachmentData,
    loadPreviewData,
    loadQuoteData,
    loadStickerData,
  } = window.Signal.Migrations;

  // Picks the timestamp to send with, logging whenever a fallback is needed.
  const resolveTimestamp = (): number => {
    const sentAt = message.get('sent_at');
    const fallback = message.get('timestamp');
    if (sentAt) {
      return sentAt;
    }
    if (fallback) {
      log.error('message lacked sent_at. Falling back to timestamp');
      return fallback;
    }
    log.error(
      'message lacked sent_at and timestamp. Falling back to current time'
    );
    return Date.now();
  };
  const messageTimestamp = resolveTimestamp();

  // We don't update the caches here because (1) we expect the caches to be populated
  // on initial send, so they should be there in the 99% case (2) if you're retrying
  // a failed message across restarts, we don't touch the cache for simplicity. If
  // sends are failing, let's not add the complication of a cache.
  const pendingAttachments = Promise.all(
    (message.get('attachments') ?? []).map(loadAttachmentData)
  );
  const pendingPreview =
    message.cachedOutgoingPreviewData ||
    loadPreviewData(message.get('preview'));
  const pendingQuote =
    message.cachedOutgoingQuoteData || loadQuoteData(message.get('quote'));
  const pendingSticker =
    message.cachedOutgoingStickerData ||
    loadStickerData(message.get('sticker'));

  const [attachmentsWithData, preview, quote, sticker] = await Promise.all([
    pendingAttachments,
    pendingPreview,
    pendingQuote,
    pendingSticker,
  ]);

  const { body, attachments } = window.Whisper.Message.getLongMessageAttachment(
    {
      body: message.get('body'),
      attachments: attachmentsWithData,
      now: messageTimestamp,
    }
  );

  const deletedForEveryoneTimestamp = message.get(
    'deletedForEveryoneTimestamp'
  );
  const expireTimer = message.get('expireTimer');
  const mentions = message.get('bodyRanges');

  return {
    attachments,
    body,
    deletedForEveryoneTimestamp,
    expireTimer,
    mentions,
    messageTimestamp,
    preview,
    quote,
    sticker,
  };
}
|
|
|
|
/**
 * Marks `message` as failed, records `errors` on it, and persists the message's
 * attributes.
 *
 * @param message - The message that could not be sent.
 * @param errors - Errors to attach to the message; may be empty.
 * @returns A promise that settles once the message has been saved; it rejects if
 *   recording the errors or saving fails.
 */
async function markMessageFailed(
  message: MessageModel,
  errors: Array<Error>
): Promise<void> {
  message.markFailed();
  // saveErrors is awaited where sendNormalMessage calls it, so wait for it here
  // too: the errors are then attached before the attributes are written below,
  // and a rejection reaches our caller instead of going unhandled.
  // NOTE(review): `skipSave: true` presumably suppresses saveErrors' own write
  // because we save explicitly right after — confirm.
  await message.saveErrors(errors, { skipSave: true });
  await window.Signal.Data.saveMessage(message.attributes, {
    ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
  });
}
|
|
|
|
/**
 * Reports whether every entry in the message's `sendStateByConversationId` has
 * a sent status. A message with no send-state entries counts as fully sent.
 */
function didSendToEveryone(message: Readonly<MessageModel>): boolean {
  const sendStates = Object.values(
    message.get('sendStateByConversationId') || {}
  );
  for (const { status } of sendStates) {
    if (!isSent(status)) {
      return false;
    }
  }
  return true;
}
|