522 lines
16 KiB
TypeScript
522 lines
16 KiB
TypeScript
|
// Copyright 2021 Signal Messenger, LLC
|
||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||
|
|
||
|
/* eslint-disable class-methods-use-this */
|
||
|
|
||
|
import PQueue from 'p-queue';
|
||
|
import type { LoggerType } from '../logging/log';
|
||
|
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||
|
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
|
||
|
import { MessageModel, getMessageById } from '../models/messages';
|
||
|
import type { ConversationModel } from '../models/conversations';
|
||
|
import { ourProfileKeyService } from '../services/ourProfileKey';
|
||
|
import { strictAssert } from '../util/assert';
|
||
|
import { isRecord } from '../util/isRecord';
|
||
|
import * as durations from '../util/durations';
|
||
|
import { isMe } from '../util/whatTypeOfConversation';
|
||
|
import { getSendOptions } from '../util/getSendOptions';
|
||
|
import { SignalService as Proto } from '../protobuf';
|
||
|
import { handleMessageSend } from '../util/handleMessageSend';
|
||
|
import type { CallbackResultType } from '../textsecure/Types.d';
|
||
|
import { isSent } from '../messages/MessageSendState';
|
||
|
import { getLastChallengeError, isOutgoing } from '../state/selectors/message';
|
||
|
import { parseIntWithFallback } from '../util/parseIntWithFallback';
|
||
|
import type {
|
||
|
AttachmentType,
|
||
|
GroupV1InfoType,
|
||
|
GroupV2InfoType,
|
||
|
PreviewType,
|
||
|
} from '../textsecure/SendMessage';
|
||
|
import type { BodyRangesType } from '../types/Util';
|
||
|
import type { WhatIsThis } from '../window.d';
|
||
|
|
||
|
import type { ParsedJob } from './types';
|
||
|
import { JobQueue } from './JobQueue';
|
||
|
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
||
|
import { Job } from './Job';
|
||
|
|
||
|
const {
|
||
|
loadAttachmentData,
|
||
|
loadPreviewData,
|
||
|
loadQuoteData,
|
||
|
loadStickerData,
|
||
|
} = window.Signal.Migrations;
|
||
|
const { Message } = window.Signal.Types;
|
||
|
|
||
|
const MAX_RETRY_TIME = durations.DAY;
|
||
|
const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
|
||
|
|
||
|
type NormalMessageSendJobData = {
|
||
|
messageId: string;
|
||
|
conversationId: string;
|
||
|
};
|
||
|
|
||
|
export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData> {
|
||
|
private readonly queues = new Map<string, PQueue>();
|
||
|
|
||
|
/**
|
||
|
* Add a job (see `JobQueue.prototype.add`).
|
||
|
*
|
||
|
* You can override `insert` to change the way the job is added to the database. This is
|
||
|
* useful if you're trying to save a message and a job in the same database transaction.
|
||
|
*/
|
||
|
async add(
|
||
|
data: Readonly<NormalMessageSendJobData>,
|
||
|
insert?: (job: ParsedJob<NormalMessageSendJobData>) => Promise<void>
|
||
|
): Promise<Job<NormalMessageSendJobData>> {
|
||
|
if (!insert) {
|
||
|
return super.add(data);
|
||
|
}
|
||
|
|
||
|
this.throwIfNotStarted();
|
||
|
|
||
|
const job = this.createJob(data);
|
||
|
await insert(job);
|
||
|
await jobQueueDatabaseStore.insert(job, {
|
||
|
shouldInsertIntoDatabase: false,
|
||
|
});
|
||
|
return job;
|
||
|
}
|
||
|
|
||
|
protected parseData(data: unknown): NormalMessageSendJobData {
|
||
|
// Because we do this so often and Zod is a bit slower, we do "manual" parsing here.
|
||
|
strictAssert(isRecord(data), 'Job data is not an object');
|
||
|
const { messageId, conversationId } = data;
|
||
|
strictAssert(
|
||
|
typeof messageId === 'string',
|
||
|
'Job data had a non-string message ID'
|
||
|
);
|
||
|
strictAssert(
|
||
|
typeof conversationId === 'string',
|
||
|
'Job data had a non-string conversation ID'
|
||
|
);
|
||
|
return { messageId, conversationId };
|
||
|
}
|
||
|
|
||
|
private getQueue(queueKey: string): PQueue {
|
||
|
const existingQueue = this.queues.get(queueKey);
|
||
|
if (existingQueue) {
|
||
|
return existingQueue;
|
||
|
}
|
||
|
|
||
|
const newQueue = new PQueue({ concurrency: 1 });
|
||
|
newQueue.once('idle', () => {
|
||
|
this.queues.delete(queueKey);
|
||
|
});
|
||
|
|
||
|
this.queues.set(queueKey, newQueue);
|
||
|
return newQueue;
|
||
|
}
|
||
|
|
||
|
private enqueue(queueKey: string, fn: () => Promise<void>): Promise<void> {
|
||
|
return this.getQueue(queueKey).add(fn);
|
||
|
}
|
||
|
|
||
|
protected async run(
|
||
|
{
|
||
|
data,
|
||
|
timestamp,
|
||
|
}: Readonly<{ data: NormalMessageSendJobData; timestamp: number }>,
|
||
|
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||
|
): Promise<void> {
|
||
|
const { messageId, conversationId } = data;
|
||
|
|
||
|
await this.enqueue(conversationId, async () => {
|
||
|
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
|
||
|
|
||
|
// We don't immediately use this value because we may want to mark the message
|
||
|
// failed before doing so.
|
||
|
const shouldContinue = await commonShouldJobContinue({
|
||
|
attempt,
|
||
|
log,
|
||
|
maxRetryTime: MAX_RETRY_TIME,
|
||
|
timestamp,
|
||
|
});
|
||
|
|
||
|
await window.ConversationController.loadPromise();
|
||
|
|
||
|
const message = await getMessageById(messageId);
|
||
|
if (!message) {
|
||
|
log.info(
|
||
|
`message ${messageId} was not found, maybe because it was deleted. Giving up on sending it`
|
||
|
);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (!isOutgoing(message.attributes)) {
|
||
|
log.error(
|
||
|
`message ${messageId} was not an outgoing message to begin with. This is probably a bogus job. Giving up on sending it`
|
||
|
);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (message.isErased() || message.get('deletedForEveryone')) {
|
||
|
log.info(`message ${messageId} was erased. Giving up on sending it`);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
let messageSendErrors: Array<Error> = [];
|
||
|
|
||
|
// We don't want to save errors on messages unless we're giving up. If it's our
|
||
|
// final attempt, we know upfront that we want to give up. However, we might also
|
||
|
// want to give up if (1) we get a 508 from the server, asking us to please stop
|
||
|
// (2) we get a 428 from the server, flagging the message for spam (3) some other
|
||
|
// reason not known at the time of this writing.
|
||
|
//
|
||
|
// This awkward callback lets us hold onto errors we might want to save, so we can
|
||
|
// decide whether to save them later on.
|
||
|
const saveErrors = isFinalAttempt
|
||
|
? undefined
|
||
|
: (errors: Array<Error>) => {
|
||
|
messageSendErrors = errors;
|
||
|
};
|
||
|
|
||
|
if (!shouldContinue) {
|
||
|
log.info(
|
||
|
`message ${messageId} ran out of time. Giving up on sending it`
|
||
|
);
|
||
|
await markMessageFailed(message, messageSendErrors);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
try {
|
||
|
const conversation = message.getConversation();
|
||
|
if (!conversation) {
|
||
|
throw new Error(
|
||
|
`could not find conversation for message with ID ${messageId}`
|
||
|
);
|
||
|
}
|
||
|
|
||
|
const {
|
||
|
allRecipientIdentifiers,
|
||
|
recipientIdentifiersWithoutMe,
|
||
|
untrustedConversationIds,
|
||
|
} = getMessageRecipients({
|
||
|
message,
|
||
|
conversation,
|
||
|
});
|
||
|
|
||
|
if (untrustedConversationIds.length) {
|
||
|
log.info(
|
||
|
`message ${messageId} sending blocked because ${untrustedConversationIds.length} conversation(s) were untrusted. Giving up on the job, but it may be reborn later`
|
||
|
);
|
||
|
window.reduxActions.conversations.messageStoppedByMissingVerification(
|
||
|
messageId,
|
||
|
untrustedConversationIds
|
||
|
);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (!allRecipientIdentifiers.length) {
|
||
|
log.warn(
|
||
|
`trying to send message ${messageId} but it looks like it was already sent to everyone. This is unexpected, but we're giving up`
|
||
|
);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
const {
|
||
|
attachments,
|
||
|
body,
|
||
|
deletedForEveryoneTimestamp,
|
||
|
expireTimer,
|
||
|
mentions,
|
||
|
messageTimestamp,
|
||
|
preview,
|
||
|
profileKey,
|
||
|
quote,
|
||
|
sticker,
|
||
|
} = await getMessageSendData({ conversation, message });
|
||
|
|
||
|
let messageSendPromise: Promise<unknown>;
|
||
|
|
||
|
if (recipientIdentifiersWithoutMe.length === 0) {
|
||
|
log.info('sending sync message only');
|
||
|
const dataMessage = await window.textsecure.messaging.getDataMessage({
|
||
|
attachments,
|
||
|
body,
|
||
|
deletedForEveryoneTimestamp,
|
||
|
expireTimer,
|
||
|
preview,
|
||
|
profileKey,
|
||
|
quote,
|
||
|
recipients: allRecipientIdentifiers,
|
||
|
sticker,
|
||
|
timestamp: messageTimestamp,
|
||
|
});
|
||
|
messageSendPromise = message.sendSyncMessageOnly(
|
||
|
dataMessage,
|
||
|
saveErrors
|
||
|
);
|
||
|
} else {
|
||
|
const conversationType = conversation.get('type');
|
||
|
const sendOptions = await getSendOptions(conversation.attributes);
|
||
|
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
|
||
|
|
||
|
let innerPromise: Promise<CallbackResultType>;
|
||
|
if (conversationType === Message.GROUP) {
|
||
|
log.info('sending group message');
|
||
|
innerPromise = window.Signal.Util.sendToGroup({
|
||
|
groupSendOptions: {
|
||
|
attachments,
|
||
|
deletedForEveryoneTimestamp,
|
||
|
expireTimer,
|
||
|
groupV1: updateRecipients(
|
||
|
conversation.getGroupV1Info(),
|
||
|
recipientIdentifiersWithoutMe
|
||
|
),
|
||
|
groupV2: updateRecipients(
|
||
|
conversation.getGroupV2Info(),
|
||
|
recipientIdentifiersWithoutMe
|
||
|
),
|
||
|
messageText: body,
|
||
|
preview,
|
||
|
profileKey,
|
||
|
quote,
|
||
|
sticker,
|
||
|
timestamp: messageTimestamp,
|
||
|
mentions,
|
||
|
},
|
||
|
conversation,
|
||
|
contentHint: ContentHint.RESENDABLE,
|
||
|
messageId,
|
||
|
sendOptions,
|
||
|
sendType: 'message',
|
||
|
});
|
||
|
} else {
|
||
|
log.info('sending direct message');
|
||
|
innerPromise = window.textsecure.messaging.sendMessageToIdentifier({
|
||
|
identifier: recipientIdentifiersWithoutMe[0],
|
||
|
messageText: body,
|
||
|
attachments,
|
||
|
quote,
|
||
|
preview,
|
||
|
sticker,
|
||
|
reaction: null,
|
||
|
deletedForEveryoneTimestamp,
|
||
|
timestamp: messageTimestamp,
|
||
|
expireTimer,
|
||
|
contentHint: ContentHint.RESENDABLE,
|
||
|
groupId: undefined,
|
||
|
profileKey,
|
||
|
options: sendOptions,
|
||
|
});
|
||
|
}
|
||
|
|
||
|
messageSendPromise = message.send(
|
||
|
handleMessageSend(innerPromise, {
|
||
|
messageIds: [messageId],
|
||
|
sendType: 'message',
|
||
|
}),
|
||
|
saveErrors
|
||
|
);
|
||
|
}
|
||
|
|
||
|
await messageSendPromise;
|
||
|
|
||
|
if (
|
||
|
getLastChallengeError({
|
||
|
errors: messageSendErrors,
|
||
|
})
|
||
|
) {
|
||
|
log.info(
|
||
|
`message ${messageId} hit a spam challenge. Not retrying any more`
|
||
|
);
|
||
|
await message.saveErrors(messageSendErrors);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
const didFullySend =
|
||
|
!messageSendErrors.length || didSendToEveryone(message);
|
||
|
if (!didFullySend) {
|
||
|
throw new Error('message did not fully send');
|
||
|
}
|
||
|
} catch (err: unknown) {
|
||
|
const serverAskedUsToStop: boolean = messageSendErrors.some(
|
||
|
(messageSendError: unknown) =>
|
||
|
messageSendError instanceof Error &&
|
||
|
parseIntWithFallback(messageSendError.code, -1) === 508
|
||
|
);
|
||
|
|
||
|
if (isFinalAttempt || serverAskedUsToStop) {
|
||
|
await markMessageFailed(message, messageSendErrors);
|
||
|
}
|
||
|
|
||
|
if (serverAskedUsToStop) {
|
||
|
log.info('server responded with 508. Giving up on this job');
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
throw err;
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
export const normalMessageSendJobQueue = new NormalMessageSendJobQueue({
|
||
|
store: jobQueueDatabaseStore,
|
||
|
queueType: 'normal message send',
|
||
|
maxAttempts: MAX_ATTEMPTS,
|
||
|
});
|
||
|
|
||
|
function getMessageRecipients({
|
||
|
conversation,
|
||
|
message,
|
||
|
}: Readonly<{
|
||
|
conversation: ConversationModel;
|
||
|
message: MessageModel;
|
||
|
}>): {
|
||
|
allRecipientIdentifiers: Array<string>;
|
||
|
recipientIdentifiersWithoutMe: Array<string>;
|
||
|
untrustedConversationIds: Array<string>;
|
||
|
} {
|
||
|
const allRecipientIdentifiers: Array<string> = [];
|
||
|
const recipientIdentifiersWithoutMe: Array<string> = [];
|
||
|
const untrustedConversationIds: Array<string> = [];
|
||
|
|
||
|
const currentConversationRecipients = conversation.getRecipientConversationIds();
|
||
|
|
||
|
Object.entries(message.get('sendStateByConversationId') || {}).forEach(
|
||
|
([recipientConversationId, sendState]) => {
|
||
|
if (isSent(sendState.status)) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
const recipient = window.ConversationController.get(
|
||
|
recipientConversationId
|
||
|
);
|
||
|
if (!recipient) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
const isRecipientMe = isMe(recipient.attributes);
|
||
|
|
||
|
if (
|
||
|
!currentConversationRecipients.has(recipientConversationId) &&
|
||
|
!isRecipientMe
|
||
|
) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (recipient.isUntrusted()) {
|
||
|
untrustedConversationIds.push(recipientConversationId);
|
||
|
}
|
||
|
|
||
|
const recipientIdentifier = recipient.getSendTarget();
|
||
|
if (!recipientIdentifier) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
allRecipientIdentifiers.push(recipientIdentifier);
|
||
|
if (!isRecipientMe) {
|
||
|
recipientIdentifiersWithoutMe.push(recipientIdentifier);
|
||
|
}
|
||
|
}
|
||
|
);
|
||
|
|
||
|
return {
|
||
|
allRecipientIdentifiers,
|
||
|
recipientIdentifiersWithoutMe,
|
||
|
untrustedConversationIds,
|
||
|
};
|
||
|
}
|
||
|
|
||
|
async function getMessageSendData({
|
||
|
conversation,
|
||
|
message,
|
||
|
}: Readonly<{
|
||
|
conversation: ConversationModel;
|
||
|
message: MessageModel;
|
||
|
}>): Promise<{
|
||
|
attachments: Array<AttachmentType>;
|
||
|
body: undefined | string;
|
||
|
deletedForEveryoneTimestamp: undefined | number;
|
||
|
expireTimer: undefined | number;
|
||
|
mentions: undefined | BodyRangesType;
|
||
|
messageTimestamp: number;
|
||
|
preview: Array<PreviewType>;
|
||
|
profileKey: undefined | ArrayBuffer;
|
||
|
quote: WhatIsThis;
|
||
|
sticker: WhatIsThis;
|
||
|
}> {
|
||
|
const messageTimestamp =
|
||
|
message.get('sent_at') || message.get('timestamp') || Date.now();
|
||
|
|
||
|
const [
|
||
|
attachmentsWithData,
|
||
|
preview,
|
||
|
quote,
|
||
|
sticker,
|
||
|
profileKey,
|
||
|
] = await Promise.all([
|
||
|
// We don't update the caches here because (1) we expect the caches to be populated on
|
||
|
// initial send, so they should be there in the 99% case (2) if you're retrying a
|
||
|
// failed message across restarts, we don't touch the cache for simplicity. If sends
|
||
|
// are failing, let's not add the complication of a cache.
|
||
|
Promise.all((message.get('attachments') ?? []).map(loadAttachmentData)),
|
||
|
message.cachedOutgoingPreviewData ||
|
||
|
loadPreviewData(message.get('preview')),
|
||
|
message.cachedOutgoingQuoteData || loadQuoteData(message.get('quote')),
|
||
|
message.cachedOutgoingStickerData ||
|
||
|
loadStickerData(message.get('sticker')),
|
||
|
conversation.get('profileSharing') ? ourProfileKeyService.get() : undefined,
|
||
|
]);
|
||
|
|
||
|
const { body, attachments } = window.Whisper.Message.getLongMessageAttachment(
|
||
|
{
|
||
|
body: message.get('body'),
|
||
|
attachments: attachmentsWithData,
|
||
|
now: messageTimestamp,
|
||
|
}
|
||
|
);
|
||
|
|
||
|
return {
|
||
|
attachments,
|
||
|
body,
|
||
|
deletedForEveryoneTimestamp: message.get('deletedForEveryoneTimestamp'),
|
||
|
expireTimer: conversation.get('expireTimer'),
|
||
|
mentions: message.get('bodyRanges'),
|
||
|
messageTimestamp,
|
||
|
preview,
|
||
|
profileKey,
|
||
|
quote,
|
||
|
sticker,
|
||
|
};
|
||
|
}
|
||
|
|
||
|
async function markMessageFailed(
|
||
|
message: MessageModel,
|
||
|
errors: Array<Error>
|
||
|
): Promise<void> {
|
||
|
message.markFailed();
|
||
|
message.saveErrors(errors, { skipSave: true });
|
||
|
await window.Signal.Data.saveMessage(message.attributes);
|
||
|
}
|
||
|
|
||
|
function didSendToEveryone(message: Readonly<MessageModel>): boolean {
|
||
|
const sendStateByConversationId =
|
||
|
message.get('sendStateByConversationId') || {};
|
||
|
return Object.values(sendStateByConversationId).every(sendState =>
|
||
|
isSent(sendState.status)
|
||
|
);
|
||
|
}
|
||
|
|
||
|
function updateRecipients(
|
||
|
groupInfo: undefined | GroupV1InfoType,
|
||
|
recipients: Array<string>
|
||
|
): undefined | GroupV1InfoType;
|
||
|
function updateRecipients(
|
||
|
groupInfo: undefined | GroupV2InfoType,
|
||
|
recipients: Array<string>
|
||
|
): undefined | GroupV2InfoType;
|
||
|
function updateRecipients(
|
||
|
groupInfo: undefined | GroupV1InfoType | GroupV2InfoType,
|
||
|
recipients: Array<string>
|
||
|
): undefined | GroupV1InfoType | GroupV2InfoType {
|
||
|
return (
|
||
|
groupInfo && {
|
||
|
...groupInfo,
|
||
|
members: recipients,
|
||
|
}
|
||
|
);
|
||
|
}
|