Add all sends needed for retry to conversationJobQueue

This commit is contained in:
Scott Nonnenberg 2023-03-14 13:25:05 -07:00 committed by GitHub
parent 0d37396339
commit 5949cc11b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 687 additions and 197 deletions

View file

@ -54,10 +54,9 @@ import type { Address } from './types/Address';
import type { QualifiedAddressStringType } from './types/QualifiedAddress'; import type { QualifiedAddressStringType } from './types/QualifiedAddress';
import { QualifiedAddress } from './types/QualifiedAddress'; import { QualifiedAddress } from './types/QualifiedAddress';
import * as log from './logging/log'; import * as log from './logging/log';
import { singleProtoJobQueue } from './jobs/singleProtoJobQueue';
import * as Errors from './types/errors'; import * as Errors from './types/errors';
import MessageSender from './textsecure/SendMessage';
import { MINUTE } from './util/durations'; import { MINUTE } from './util/durations';
import { conversationJobQueue } from './jobs/conversationJobQueue';
const TIMESTAMP_THRESHOLD = 5 * 1000; // 5 seconds const TIMESTAMP_THRESHOLD = 5 * 1000; // 5 seconds
@ -1437,13 +1436,13 @@ export class SignalProtocolStore extends EventEmitter {
await this.archiveSession(qualifiedAddress); await this.archiveSession(qualifiedAddress);
// Enqueue a null message with newly-created session // Enqueue a null message with newly-created session
await singleProtoJobQueue.add( await conversationJobQueue.add({
MessageSender.getNullMessage({ type: 'NullMessage',
uuid: uuid.toString(), conversationId: conversation.id,
}) idForTracking: id,
); });
} catch (error) { } catch (error) {
// If we failed to do the session reset, then we'll allow another attempt sooner // If we failed to queue the session reset, then we'll allow another attempt sooner
// than one hour from now. // than one hour from now.
delete sessionResets[id]; delete sessionResets[id];
await window.storage.put('sessionResets', sessionResets); await window.storage.put('sessionResets', sessionResets);

View file

@ -39,6 +39,10 @@ import type { UUIDStringType } from '../types/UUID';
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
import { sleeper } from '../util/sleeper'; import { sleeper } from '../util/sleeper';
import { receiptSchema, ReceiptType } from '../types/Receipt'; import { receiptSchema, ReceiptType } from '../types/Receipt';
import { sendResendRequest } from './helpers/sendResendRequest';
import { sendNullMessage } from './helpers/sendNullMessage';
import { sendSenderKeyDistribution } from './helpers/sendSenderKeyDistribution';
import { sendSavedProto } from './helpers/sendSavedProto';
// Note: generally, we only want to add to this list. If you do need to change one of // Note: generally, we only want to add to this list. If you do need to change one of
// these values, you'll likely need to write a database migration. // these values, you'll likely need to write a database migration.
@ -48,8 +52,12 @@ export const conversationQueueJobEnum = z.enum([
'DirectExpirationTimerUpdate', 'DirectExpirationTimerUpdate',
'GroupUpdate', 'GroupUpdate',
'NormalMessage', 'NormalMessage',
'NullMessage',
'ProfileKey', 'ProfileKey',
'Reaction', 'Reaction',
'ResendRequest',
'SavedProto',
'SenderKeyDistribution',
'Story', 'Story',
'Receipts', 'Receipts',
]); ]);
@ -115,6 +123,13 @@ export type NormalMessageSendJobData = z.infer<
typeof normalMessageSendJobDataSchema typeof normalMessageSendJobDataSchema
>; >;
const nullMessageJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.NullMessage),
conversationId: z.string(),
idForTracking: z.string().optional(),
});
export type NullMessageJobData = z.infer<typeof nullMessageJobDataSchema>;
const profileKeyJobDataSchema = z.object({ const profileKeyJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.ProfileKey), type: z.literal(conversationQueueJobEnum.enum.ProfileKey),
conversationId: z.string(), conversationId: z.string(),
@ -132,6 +147,41 @@ const reactionJobDataSchema = z.object({
}); });
export type ReactionJobData = z.infer<typeof reactionJobDataSchema>; export type ReactionJobData = z.infer<typeof reactionJobDataSchema>;
const resendRequestJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.ResendRequest),
conversationId: z.string(),
contentHint: z.number().optional(),
groupId: z.string().optional(),
plaintext: z.string(),
receivedAtCounter: z.number(),
receivedAtDate: z.number(),
senderUuid: z.string(),
senderDevice: z.number(),
timestamp: z.number(),
});
export type ResendRequestJobData = z.infer<typeof resendRequestJobDataSchema>;
const savedProtoJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.SavedProto),
conversationId: z.string(),
contentHint: z.number(),
groupId: z.string().optional(),
protoBase64: z.string(),
story: z.boolean(),
timestamp: z.number(),
urgent: z.boolean(),
});
export type SavedProtoJobData = z.infer<typeof savedProtoJobDataSchema>;
const senderKeyDistributionJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.SenderKeyDistribution),
conversationId: z.string(),
groupId: z.string(),
});
export type SenderKeyDistributionJobData = z.infer<
typeof senderKeyDistributionJobDataSchema
>;
const storyJobDataSchema = z.object({ const storyJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.Story), type: z.literal(conversationQueueJobEnum.enum.Story),
conversationId: z.string(), conversationId: z.string(),
@ -156,8 +206,12 @@ export const conversationQueueJobDataSchema = z.union([
expirationTimerUpdateJobDataSchema, expirationTimerUpdateJobDataSchema,
groupUpdateJobDataSchema, groupUpdateJobDataSchema,
normalMessageSendJobDataSchema, normalMessageSendJobDataSchema,
nullMessageJobDataSchema,
profileKeyJobDataSchema, profileKeyJobDataSchema,
reactionJobDataSchema, reactionJobDataSchema,
resendRequestJobDataSchema,
savedProtoJobDataSchema,
senderKeyDistributionJobDataSchema,
storyJobDataSchema, storyJobDataSchema,
receiptsJobDataSchema, receiptsJobDataSchema,
]); ]);
@ -408,12 +462,24 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
case jobSet.NormalMessage: case jobSet.NormalMessage:
await sendNormalMessage(conversation, jobBundle, data); await sendNormalMessage(conversation, jobBundle, data);
break; break;
case jobSet.NullMessage:
await sendNullMessage(conversation, jobBundle, data);
break;
case jobSet.ProfileKey: case jobSet.ProfileKey:
await sendProfileKey(conversation, jobBundle, data); await sendProfileKey(conversation, jobBundle, data);
break; break;
case jobSet.Reaction: case jobSet.Reaction:
await sendReaction(conversation, jobBundle, data); await sendReaction(conversation, jobBundle, data);
break; break;
case jobSet.ResendRequest:
await sendResendRequest(conversation, jobBundle, data);
break;
case jobSet.SavedProto:
await sendSavedProto(conversation, jobBundle, data);
break;
case jobSet.SenderKeyDistribution:
await sendSenderKeyDistribution(conversation, jobBundle, data);
break;
case jobSet.Story: case jobSet.Story:
await sendStory(conversation, jobBundle, data); await sendStory(conversation, jobBundle, data);
break; break;

View file

@ -0,0 +1,124 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { handleMessageSend } from '../../util/handleMessageSend';
import { getSendOptions } from '../../util/getSendOptions';
import { isDirectConversation } from '../../util/whatTypeOfConversation';
import { SignalService as Proto } from '../../protobuf';
import {
handleMultipleSendErrors,
maybeExpandErrors,
} from './handleMultipleSendErrors';
import type { ConversationModel } from '../../models/conversations';
import type {
ConversationQueueJobBundle,
NullMessageJobData,
} from '../conversationJobQueue';
import type { SessionResetsType } from '../../textsecure/Types.d';
import { isConversationUnregistered } from '../../util/isConversationUnregistered';
import {
OutgoingIdentityKeyError,
UnregisteredUserError,
} from '../../textsecure/Errors';
import MessageSender from '../../textsecure/SendMessage';
async function clearResetsTracking(idForTracking: string | undefined) {
if (!idForTracking) {
return;
}
const sessionResets = window.storage.get(
'sessionResets',
<SessionResetsType>{}
);
delete sessionResets[idForTracking];
await window.storage.put('sessionResets', sessionResets);
}
export async function sendNullMessage(
conversation: ConversationModel,
{
isFinalAttempt,
messaging,
shouldContinue,
timestamp,
timeRemaining,
log,
}: ConversationQueueJobBundle,
data: NullMessageJobData
): Promise<void> {
const { idForTracking } = data;
if (!shouldContinue) {
log.info('Ran out of time. Giving up on sending null message');
await clearResetsTracking(idForTracking);
return;
}
log.info(
`starting null message send to ${conversation.idForLogging()} with timestamp ${timestamp}`
);
const sendOptions = await getSendOptions(conversation.attributes);
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
const contentHint = ContentHint.RESENDABLE;
const sendType = 'nullMessage';
if (!isDirectConversation(conversation.attributes)) {
log.info('Failing attempt to send null message to group');
return;
}
// Note: we will send to blocked users, to those still in message request state, etc.
// Any needed blocking should still apply once the decryption error is fixed.
if (isConversationUnregistered(conversation.attributes)) {
await clearResetsTracking(idForTracking);
log.info(
`conversation ${conversation.idForLogging()} is unregistered; refusing to send null message`
);
return;
}
try {
const proto = MessageSender.getNullMessage();
await handleMessageSend(
messaging.sendIndividualProto({
contentHint,
identifier: conversation.getSendTarget(),
options: sendOptions,
proto,
timestamp,
urgent: false,
}),
{
messageIds: [],
sendType,
}
);
} catch (error: unknown) {
if (
error instanceof OutgoingIdentityKeyError ||
error instanceof UnregisteredUserError
) {
log.info(
'Send failure was OutgoingIdentityKeyError or UnregisteredUserError. Cancelling job.'
);
return;
}
if (isFinalAttempt) {
await clearResetsTracking(idForTracking);
}
await handleMultipleSendErrors({
errors: maybeExpandErrors(error),
isFinalAttempt,
log,
timeRemaining,
toThrow: error,
});
}
}

View file

@ -0,0 +1,187 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { PlaintextContent } from '@signalapp/libsignal-client';
import { handleMessageSend } from '../../util/handleMessageSend';
import { getSendOptions } from '../../util/getSendOptions';
import { isDirectConversation } from '../../util/whatTypeOfConversation';
import { SignalService as Proto } from '../../protobuf';
import {
handleMultipleSendErrors,
maybeExpandErrors,
} from './handleMultipleSendErrors';
import type { ConversationModel } from '../../models/conversations';
import type {
ConversationQueueJobBundle,
ResendRequestJobData,
} from '../conversationJobQueue';
import { isConversationUnregistered } from '../../util/isConversationUnregistered';
import {
OutgoingIdentityKeyError,
UnregisteredUserError,
} from '../../textsecure/Errors';
import { drop } from '../../util/drop';
import { strictAssert } from '../../util/assert';
import type { DecryptionErrorEventData } from '../../textsecure/messageReceiverEvents';
import type { LoggerType } from '../../types/Logging';
import { startAutomaticSessionReset } from '../../util/handleRetry';
function failoverToLocalReset(
logger: LoggerType,
options: Pick<
DecryptionErrorEventData,
'senderUuid' | 'senderDevice' | 'timestamp'
>
) {
logger.error('Failing over to local reset');
startAutomaticSessionReset(options);
}
export async function sendResendRequest(
conversation: ConversationModel,
{
isFinalAttempt,
messaging,
shouldContinue,
timestamp,
timeRemaining,
log,
}: ConversationQueueJobBundle,
data: ResendRequestJobData
): Promise<void> {
const {
contentHint,
groupId,
plaintext: plaintextBase64,
receivedAtCounter,
receivedAtDate,
} = data;
if (!shouldContinue) {
log.info('Ran out of time. Giving up on sending resend request');
failoverToLocalReset(log, data);
return;
}
log.info(
`starting resend request send to ${conversation.idForLogging()} with timestamp ${timestamp}`
);
if (!isDirectConversation(conversation.attributes)) {
log.error('conversation is not direct, cancelling job.');
return;
}
if (isConversationUnregistered(conversation.attributes)) {
log.error('conversation is unregistered, cancelling job.');
failoverToLocalReset(log, data);
return;
}
// Note: we will send to blocked users, to those still in message request state, etc.
// Any needed blocking should still apply once the decryption error is fixed.
const senderUuid = conversation.get('uuid');
if (!senderUuid) {
log.error('conversation was missing a uuid, cancelling job.');
failoverToLocalReset(log, data);
return;
}
const plaintext = PlaintextContent.deserialize(
Buffer.from(plaintextBase64, 'base64')
);
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
// We run this job on the queue for the individual sender we want the resend from, but
// the original message might have been sent in a group - and that's where we'll put
// the error or placeholder.
const groupConversationId = window.ConversationController.get(groupId)?.id;
const targetConversationId = groupConversationId ?? conversation.get('id');
try {
const options = await getSendOptions(conversation.attributes);
await handleMessageSend(
messaging.sendMessageProtoAndWait({
timestamp,
recipients: [senderUuid],
proto: plaintext,
contentHint: ContentHint.DEFAULT,
groupId,
options,
urgent: false,
}),
{ messageIds: [], sendType: 'retryRequest' }
);
// Now that we've successfully sent, represent this to the user. Three options:
// 1. We believe that it could be successfully re-sent, so we'll add a placeholder.
if (contentHint === ContentHint.RESENDABLE) {
const { retryPlaceholders } = window.Signal.Services;
strictAssert(retryPlaceholders, 'sendResendRequest: adding placeholder');
log.info('contentHint is RESENDABLE, adding placeholder');
const state = window.reduxStore.getState();
const selectedId = state.conversations.selectedConversationId;
const wasOpened = selectedId === targetConversationId;
await retryPlaceholders.add({
conversationId: targetConversationId,
receivedAt: receivedAtDate,
receivedAtCounter,
sentAt: timestamp,
senderUuid,
wasOpened,
});
return;
}
// 2. This message cannot be resent. We'll show no error and trust the other side to
// reset their session.
if (contentHint === ContentHint.IMPLICIT) {
log.info('contentHint is IMPLICIT, adding no timeline item.');
return;
}
// 3. We don't know what kind of message this was, and add an eror
log.warn('No contentHint, adding error in conversation immediately');
drop(
conversation.queueJob('addDeliveryIssue', async () => {
await conversation.addDeliveryIssue({
receivedAt: receivedAtDate,
receivedAtCounter,
senderUuid,
sentAt: timestamp,
});
})
);
} catch (error: unknown) {
if (
error instanceof OutgoingIdentityKeyError ||
error instanceof UnregisteredUserError
) {
log.info(
'Group send failures were all OutgoingIdentityKeyError or UnregisteredUserError. Cancelling job.'
);
return;
}
if (isFinalAttempt) {
failoverToLocalReset(log, data);
}
await handleMultipleSendErrors({
errors: maybeExpandErrors(error),
isFinalAttempt,
log,
timeRemaining,
toThrow: error,
});
}
}

View file

@ -0,0 +1,116 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { handleMessageSend } from '../../util/handleMessageSend';
import { getSendOptions } from '../../util/getSendOptions';
import { isDirectConversation } from '../../util/whatTypeOfConversation';
import { SignalService as Proto } from '../../protobuf';
import {
handleMultipleSendErrors,
maybeExpandErrors,
} from './handleMultipleSendErrors';
import type { ConversationModel } from '../../models/conversations';
import type {
ConversationQueueJobBundle,
SavedProtoJobData,
} from '../conversationJobQueue';
import { isConversationUnregistered } from '../../util/isConversationUnregistered';
import {
OutgoingIdentityKeyError,
UnregisteredUserError,
} from '../../textsecure/Errors';
export async function sendSavedProto(
conversation: ConversationModel,
{
isFinalAttempt,
messaging,
shouldContinue,
timestamp,
timeRemaining,
log,
}: ConversationQueueJobBundle,
data: SavedProtoJobData
): Promise<void> {
if (!shouldContinue) {
log.info('Ran out of time. Giving up on sending null message');
return;
}
log.info(
`starting saved proto send to ${conversation.idForLogging()} with timestamp ${timestamp}`
);
if (!isDirectConversation(conversation.attributes)) {
log.info('Failing attempt to send null message to group');
return;
}
// Note: we will send to blocked users, to those still in message request state, etc.
// Any needed blocking should still apply once the decryption error is fixed.
if (isConversationUnregistered(conversation.attributes)) {
log.info(
`conversation ${conversation.idForLogging()} is unregistered; refusing to send null message`
);
return;
}
const uuid = conversation.get('uuid');
if (!uuid) {
log.info(
`conversation ${conversation.idForLogging()} was missing uuid, cancelling job.`
);
return;
}
const {
protoBase64,
groupId,
contentHint,
story,
timestamp: originalTimestamp,
urgent,
} = data;
const sendOptions = await getSendOptions(conversation.attributes, { story });
const sendType = 'resendFromLog';
try {
const proto = Proto.Content.decode(Buffer.from(protoBase64, 'base64'));
await handleMessageSend(
messaging.sendMessageProtoAndWait({
contentHint,
groupId,
options: sendOptions,
proto,
recipients: [uuid],
timestamp: originalTimestamp,
urgent,
story,
}),
{
messageIds: [],
sendType,
}
);
} catch (error: unknown) {
if (
error instanceof OutgoingIdentityKeyError ||
error instanceof UnregisteredUserError
) {
log.info(
'Send failure was OutgoingIdentityKeyError or UnregisteredUserError. Cancelling job.'
);
return;
}
await handleMultipleSendErrors({
errors: maybeExpandErrors(error),
isFinalAttempt,
log,
timeRemaining,
toThrow: error,
});
}
}

View file

@ -0,0 +1,120 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { handleMessageSend } from '../../util/handleMessageSend';
import { getSendOptions } from '../../util/getSendOptions';
import { isDirectConversation } from '../../util/whatTypeOfConversation';
import {
handleMultipleSendErrors,
maybeExpandErrors,
} from './handleMultipleSendErrors';
import type { ConversationModel } from '../../models/conversations';
import type {
ConversationQueueJobBundle,
SenderKeyDistributionJobData,
} from '../conversationJobQueue';
import { isConversationUnregistered } from '../../util/isConversationUnregistered';
import {
NoSenderKeyError,
OutgoingIdentityKeyError,
UnregisteredUserError,
} from '../../textsecure/Errors';
import { shouldSendToConversation } from './shouldSendToConversation';
// Note: in regular scenarios, sender keys are sent as part of a group send. This job type
// is only used in decryption error recovery scenarios.
export async function sendSenderKeyDistribution(
conversation: ConversationModel,
{
isFinalAttempt,
messaging,
shouldContinue,
timestamp,
timeRemaining,
log,
}: ConversationQueueJobBundle,
data: SenderKeyDistributionJobData
): Promise<void> {
if (!shouldContinue) {
log.info(
'Ran out of time. Giving up on sending sender key distribution message'
);
return;
}
log.info(
`starting sender key distribution message send to ${conversation.idForLogging()} with timestamp ${timestamp}`
);
if (!isDirectConversation(conversation.attributes)) {
log.info('Failing attempt to send null message to group');
return;
}
if (!shouldSendToConversation(conversation, log)) {
return;
}
if (isConversationUnregistered(conversation.attributes)) {
log.info(
`conversation ${conversation.idForLogging()} is unregistered; refusing to send sender key distribution message`
);
return;
}
const sendOptions = await getSendOptions(conversation.attributes);
const { groupId } = data;
const group = window.ConversationController.get(groupId);
const distributionId = group?.get('senderKeyInfo')?.distributionId;
const uuid = conversation.get('uuid');
if (!distributionId) {
log.info(
`group ${group?.idForLogging()} had no distributionid, cancelling job.`
);
return;
}
if (!uuid) {
log.info(
`conversation ${conversation.idForLogging()} was missing uuid, cancelling job.`
);
return;
}
try {
await handleMessageSend(
messaging.sendSenderKeyDistributionMessage(
{
distributionId,
groupId,
identifiers: [uuid],
throwIfNotInDatabase: true,
urgent: false,
},
sendOptions
),
{ messageIds: [], sendType: 'senderKeyDistributionMessage' }
);
} catch (error: unknown) {
if (
error instanceof NoSenderKeyError ||
error instanceof OutgoingIdentityKeyError ||
error instanceof UnregisteredUserError
) {
log.info(
'Send failure was NoSenderKeyError, OutgoingIdentityKeyError or UnregisteredUserError. Cancelling job.'
);
return;
}
await handleMultipleSendErrors({
errors: maybeExpandErrors(error),
isFinalAttempt,
log,
timeRemaining,
toThrow: error,
});
}
}

View file

@ -312,3 +312,5 @@ export class UnknownRecipientError extends Error {}
export class IncorrectSenderKeyAuthError extends Error {} export class IncorrectSenderKeyAuthError extends Error {}
export class WarnOnlyError extends Error {} export class WarnOnlyError extends Error {}
export class NoSenderKeyError extends Error {}

View file

@ -55,6 +55,7 @@ import {
SignedPreKeyRotationError, SignedPreKeyRotationError,
SendMessageProtoError, SendMessageProtoError,
HTTPError, HTTPError,
NoSenderKeyError,
} from './Errors'; } from './Errors';
import type { BodyRangesType, StoryContextType } from '../types/Util'; import type { BodyRangesType, StoryContextType } from '../types/Util';
import type { import type {
@ -2123,63 +2124,18 @@ export default class MessageSender {
}); });
} }
static getNullMessage({ static getNullMessage(
uuid, options: Readonly<{
e164,
padding,
}: Readonly<{
uuid?: string;
e164?: string;
padding?: Uint8Array; padding?: Uint8Array;
}>): SingleProtoJobData { }> = {}
): Proto.Content {
const nullMessage = new Proto.NullMessage(); const nullMessage = new Proto.NullMessage();
nullMessage.padding = options.padding || MessageSender.getRandomPadding();
const identifier = uuid || e164;
if (!identifier) {
throw new Error('sendNullMessage: Got neither uuid nor e164!');
}
nullMessage.padding = padding || MessageSender.getRandomPadding();
const contentMessage = new Proto.Content(); const contentMessage = new Proto.Content();
contentMessage.nullMessage = nullMessage; contentMessage.nullMessage = nullMessage;
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; return contentMessage;
return {
contentHint: ContentHint.RESENDABLE,
identifier,
isSyncMessage: false,
protoBase64: Bytes.toBase64(
Proto.Content.encode(contentMessage).finish()
),
type: 'nullMessage',
urgent: false,
};
}
async sendRetryRequest({
groupId,
options,
plaintext,
uuid,
}: Readonly<{
groupId?: string;
options?: SendOptionsType;
plaintext: PlaintextContent;
uuid: string;
}>): Promise<CallbackResultType> {
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
return this.sendMessageProtoAndWait({
timestamp: Date.now(),
recipients: [uuid],
proto: plaintext,
contentHint: ContentHint.DEFAULT,
groupId,
options,
urgent: false,
});
} }
// Group sends // Group sends
@ -2358,7 +2314,7 @@ export default class MessageSender {
distributionId distributionId
); );
if (!key) { if (!key) {
throw new Error( throw new NoSenderKeyError(
`getSenderKeyDistributionMessage: Distribution ${distributionId} was not in database as expected` `getSenderKeyDistributionMessage: Distribution ${distributionId} was not in database as expected`
); );
} }

View file

@ -5,14 +5,12 @@ import {
DecryptionErrorMessage, DecryptionErrorMessage,
PlaintextContent, PlaintextContent,
} from '@signalapp/libsignal-client'; } from '@signalapp/libsignal-client';
import { isBoolean, isNumber } from 'lodash'; import { isNumber } from 'lodash';
import * as Bytes from '../Bytes'; import * as Bytes from '../Bytes';
import dataInterface from '../sql/Client'; import dataInterface from '../sql/Client';
import { isProduction } from './version'; import { isProduction } from './version';
import { strictAssert } from './assert'; import { strictAssert } from './assert';
import { getSendOptions } from './getSendOptions';
import { handleMessageSend } from './handleMessageSend';
import { isGroupV2 } from './whatTypeOfConversation'; import { isGroupV2 } from './whatTypeOfConversation';
import { isOlderThan } from './timestamp'; import { isOlderThan } from './timestamp';
import { parseIntOrThrow } from './parseIntOrThrow'; import { parseIntOrThrow } from './parseIntOrThrow';
@ -37,17 +35,13 @@ import type {
import { SignalService as Proto } from '../protobuf'; import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log'; import * as log from '../logging/log';
import MessageSender from '../textsecure/SendMessage'; import type MessageSender from '../textsecure/SendMessage';
import type { StoryDistributionListDataType } from '../state/ducks/storyDistributionLists'; import type { StoryDistributionListDataType } from '../state/ducks/storyDistributionLists';
import { drop } from './drop'; import { drop } from './drop';
import { conversationJobQueue } from '../jobs/conversationJobQueue';
const RETRY_LIMIT = 5; const RETRY_LIMIT = 5;
// Note: Neither of the the two functions onRetryRequest and onDecrytionError use a job
// queue to make sure sends are reliable. That's unnecessary because these tasks are
// tied to incoming message processing queue, and will only confirm() completion on
// successful send.
// Entrypoints // Entrypoints
const retryRecord = new Map<number, number>(); const retryRecord = new Map<number, number>();
@ -175,23 +169,17 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
requesterUuid, requesterUuid,
'private' 'private'
); );
const sendOptions = await getSendOptions(recipientConversation.attributes, { const protoToSend = new Proto.Content(contentProto);
story,
}); await conversationJobQueue.add({
const promise = messaging.sendMessageProtoAndWait({ type: 'SavedProto',
conversationId: recipientConversation.id,
contentHint, contentHint,
groupId, groupId,
options: sendOptions, protoBase64: Bytes.toBase64(Proto.Content.encode(protoToSend).finish()),
proto: new Proto.Content(contentProto), story,
recipients: [requesterUuid],
timestamp, timestamp,
urgent, urgent,
story,
});
await handleMessageSend(promise, {
messageIds: [],
sendType: 'resendFromLog',
}); });
confirm(); confirm();
@ -313,7 +301,6 @@ async function sendDistributionMessageOrNullMessage(
requesterUuid, requesterUuid,
'private' 'private'
); );
const sendOptions = await getSendOptions(conversation.attributes);
if (groupId) { if (groupId) {
const group = window.ConversationController.get(groupId); const group = window.ConversationController.get(groupId);
@ -331,23 +318,15 @@ async function sendDistributionMessageOrNullMessage(
); );
try { try {
await handleMessageSend( await conversationJobQueue.add({
messaging.sendSenderKeyDistributionMessage( type: 'SenderKeyDistribution',
{ conversationId: conversation.id,
distributionId,
groupId, groupId,
identifiers: [requesterUuid], });
throwIfNotInDatabase: true,
urgent: false,
},
sendOptions
),
{ messageIds: [], sendType: 'senderKeyDistributionMessage' }
);
sentDistributionMessage = true; sentDistributionMessage = true;
} catch (error) { } catch (error) {
log.error( log.error(
`sendDistributionMessageOrNullMessage/${logId}: Failed to send sender key distribution message`, `sendDistributionMessageOrNullMessage/${logId}: Failed to queue sender key distribution message`,
Errors.toLogFormat(error) Errors.toLogFormat(error)
); );
} }
@ -368,24 +347,13 @@ async function sendDistributionMessageOrNullMessage(
// Enqueue a null message using the newly-created session // Enqueue a null message using the newly-created session
try { try {
const nullMessage = MessageSender.getNullMessage({ await conversationJobQueue.add({
uuid: requesterUuid, type: 'NullMessage',
conversationId: conversation.id,
}); });
await handleMessageSend(
messaging.sendIndividualProto({
...nullMessage,
options: sendOptions,
proto: Proto.Content.decode(
Bytes.fromBase64(nullMessage.protoBase64)
),
timestamp: Date.now(),
urgent: isBoolean(nullMessage.urgent) ? nullMessage.urgent : true,
}),
{ messageIds: [], sendType: nullMessage.type }
);
} catch (error) { } catch (error) {
log.error( log.error(
'sendDistributionMessageOrNullMessage: Failed to send null message', 'sendDistributionMessageOrNullMessage: Failed to queue null message',
Errors.toLogFormat(error) Errors.toLogFormat(error)
); );
} }
@ -606,16 +574,12 @@ async function requestResend(decryptionError: DecryptionErrorEventData) {
// 1. Find the target conversation // 1. Find the target conversation
const group = groupId
? window.ConversationController.get(groupId)
: undefined;
const sender = window.ConversationController.getOrCreate( const sender = window.ConversationController.getOrCreate(
senderUuid, senderUuid,
'private' 'private'
); );
const conversation = group || sender;
// 2. Send resend request // 2. Prepare resend request
if (!cipherTextBytes || !isNumber(cipherTextType)) { if (!cipherTextBytes || !isNumber(cipherTextType)) {
log.warn( log.warn(
@ -625,7 +589,6 @@ async function requestResend(decryptionError: DecryptionErrorEventData) {
return; return;
} }
try {
const message = DecryptionErrorMessage.forOriginal( const message = DecryptionErrorMessage.forOriginal(
Buffer.from(cipherTextBytes), Buffer.from(cipherTextBytes),
cipherTextType, cipherTextType,
@ -634,75 +597,29 @@ async function requestResend(decryptionError: DecryptionErrorEventData) {
); );
const plaintext = PlaintextContent.from(message); const plaintext = PlaintextContent.from(message);
const options = await getSendOptions(conversation.attributes);
const result = await handleMessageSend( // 3. Queue resend request
messaging.sendRetryRequest({
plaintext, try {
options, await conversationJobQueue.add({
type: 'ResendRequest',
contentHint,
conversationId: sender.id,
groupId, groupId,
uuid: senderUuid, plaintext: Bytes.toBase64(plaintext.serialize()),
}), receivedAtCounter,
{ messageIds: [], sendType: 'retryRequest' } receivedAtDate,
); senderUuid,
if (result && result.errors && result.errors.length > 0) { senderDevice,
throw result.errors[0]; timestamp,
} });
} catch (error) { } catch (error) {
log.error( log.error(
`requestResend/${logId}: Failed to send retry request, failing over to automatic reset`, `requestResend/${logId}: Failed to queue resend request, failing over to automatic reset`,
Errors.toLogFormat(error) Errors.toLogFormat(error)
); );
startAutomaticSessionReset(decryptionError); startAutomaticSessionReset(decryptionError);
return;
} }
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
// 3. Determine how to represent this to the user. Three different options.
// We believe that it could be successfully re-sent, so we'll add a placeholder.
if (contentHint === ContentHint.RESENDABLE) {
const { retryPlaceholders } = window.Signal.Services;
strictAssert(retryPlaceholders, 'requestResend: adding placeholder');
log.info(`requestResend/${logId}: Adding placeholder`);
const state = window.reduxStore.getState();
const selectedId = state.conversations.selectedConversationId;
const wasOpened = selectedId === conversation.id;
await retryPlaceholders.add({
conversationId: conversation.get('id'),
receivedAt: receivedAtDate,
receivedAtCounter,
sentAt: timestamp,
senderUuid,
wasOpened,
});
return;
}
// This message cannot be resent. We'll show no error and trust the other side to
// reset their session.
if (contentHint === ContentHint.IMPLICIT) {
log.info(`requestResend/${logId}: contentHint is IMPLICIT, doing nothing.`);
return;
}
log.warn(`requestResend/${logId}: No content hint, adding error immediately`);
drop(
conversation.queueJob('addDeliveryIssue', async () => {
drop(
conversation.addDeliveryIssue({
receivedAt: receivedAtDate,
receivedAtCounter,
senderUuid,
sentAt: timestamp,
})
);
})
);
} }
function scheduleSessionReset(senderUuid: string, senderDevice: number) { function scheduleSessionReset(senderUuid: string, senderDevice: number) {
@ -726,7 +643,12 @@ function scheduleSessionReset(senderUuid: string, senderDevice: number) {
); );
} }
function startAutomaticSessionReset(decryptionError: DecryptionErrorEventData) { export function startAutomaticSessionReset(
decryptionError: Pick<
DecryptionErrorEventData,
'senderUuid' | 'senderDevice' | 'timestamp'
>
): void {
const { senderUuid, senderDevice, timestamp } = decryptionError; const { senderUuid, senderDevice, timestamp } = decryptionError;
const logId = `${senderUuid}.${senderDevice} ${timestamp}`; const logId = `${senderUuid}.${senderDevice} ${timestamp}`;
@ -740,7 +662,7 @@ function startAutomaticSessionReset(decryptionError: DecryptionErrorEventData) {
}); });
if (!conversation) { if (!conversation) {
log.warn( log.warn(
'onLightSessionReset: No conversation, cannot add message to timeline' 'startAutomaticSessionReset: No conversation, cannot add message to timeline'
); );
return; return;
} }
@ -749,12 +671,10 @@ function startAutomaticSessionReset(decryptionError: DecryptionErrorEventData) {
const receivedAtCounter = window.Signal.Util.incrementMessageCounter(); const receivedAtCounter = window.Signal.Util.incrementMessageCounter();
drop( drop(
conversation.queueJob('addChatSessionRefreshed', async () => { conversation.queueJob('addChatSessionRefreshed', async () => {
drop( await conversation.addChatSessionRefreshed({
conversation.addChatSessionRefreshed({
receivedAt, receivedAt,
receivedAtCounter, receivedAtCounter,
}) });
);
}) })
); );
} }