signal-desktop/ts/jobs/conversationJobQueue.ts

348 lines
11 KiB
TypeScript
Raw Normal View History

2022-02-16 18:36:21 +00:00
// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { z } from 'zod';
import type PQueue from 'p-queue';
import * as globalLogger from '../logging/log';
import * as durations from '../util/durations';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
import { InMemoryQueues } from './helpers/InMemoryQueues';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { JobQueue } from './JobQueue';
import { sendNormalMessage } from './helpers/sendNormalMessage';
import { sendDirectExpirationTimerUpdate } from './helpers/sendDirectExpirationTimerUpdate';
import { sendGroupUpdate } from './helpers/sendGroupUpdate';
import { sendDeleteForEveryone } from './helpers/sendDeleteForEveryone';
import { sendProfileKey } from './helpers/sendProfileKey';
import { sendReaction } from './helpers/sendReaction';
import type { LoggerType } from '../types/Logging';
import { ConversationVerificationState } from '../state/ducks/conversationsEnums';
import { sleep } from '../util/sleep';
import { SECOND } from '../util/durations';
import {
OutgoingIdentityKeyError,
SendMessageProtoError,
} from '../textsecure/Errors';
import { strictAssert } from '../util/assert';
import { missingCaseError } from '../util/missingCaseError';
import { explodePromise } from '../util/explodePromise';
// Note: generally, we only want to add to this list. If you do need to change one of
// these values, you'll likely need to write a database migration.
export const conversationQueueJobEnum = z.enum([
'DeleteForEveryone',
'DirectExpirationTimerUpdate',
'GroupUpdate',
'NormalMessage',
'ProfileKey',
'Reaction',
]);
const deleteForEveryoneJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.DeleteForEveryone),
conversationId: z.string(),
messageId: z.string(),
recipients: z.array(z.string()),
revision: z.number().optional(),
targetTimestamp: z.number(),
});
export type DeleteForEveryoneJobData = z.infer<
typeof deleteForEveryoneJobDataSchema
>;
const expirationTimerUpdateJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.DirectExpirationTimerUpdate),
conversationId: z.string(),
expireTimer: z.number().or(z.undefined()),
// Note: no recipients/revision, because this job is for 1:1 conversations only!
});
export type ExpirationTimerUpdateJobData = z.infer<
typeof expirationTimerUpdateJobDataSchema
>;
const groupUpdateJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.GroupUpdate),
conversationId: z.string(),
groupChangeBase64: z.string().optional(),
recipients: z.array(z.string()),
revision: z.number(),
});
export type GroupUpdateJobData = z.infer<typeof groupUpdateJobDataSchema>;
const normalMessageSendJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.NormalMessage),
conversationId: z.string(),
messageId: z.string(),
// Note: recipients are baked into the message itself
revision: z.number().optional(),
});
export type NormalMessageSendJobData = z.infer<
typeof normalMessageSendJobDataSchema
>;
const profileKeyJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.ProfileKey),
conversationId: z.string(),
// Note: we will use whichever recipients list is up to date when this job runs
revision: z.number().optional(),
});
export type ProfileKeyJobData = z.infer<typeof profileKeyJobDataSchema>;
const reactionJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.Reaction),
conversationId: z.string(),
messageId: z.string(),
// Note: recipients are baked into the message itself
revision: z.number().optional(),
});
export type ReactionJobData = z.infer<typeof reactionJobDataSchema>;
export const conversationQueueJobDataSchema = z.union([
deleteForEveryoneJobDataSchema,
expirationTimerUpdateJobDataSchema,
groupUpdateJobDataSchema,
normalMessageSendJobDataSchema,
profileKeyJobDataSchema,
reactionJobDataSchema,
]);
export type ConversationQueueJobData = z.infer<
typeof conversationQueueJobDataSchema
>;
export type ConversationQueueJobBundle = {
isFinalAttempt: boolean;
shouldContinue: boolean;
timeRemaining: number;
timestamp: number;
log: LoggerType;
};
const MAX_RETRY_TIME = durations.DAY;
const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
private readonly inMemoryQueues = new InMemoryQueues();
private readonly verificationWaitMap = new Map<
string,
{
resolve: (value: unknown) => unknown;
reject: (error: Error) => unknown;
promise: Promise<unknown>;
}
>();
protected parseData(data: unknown): ConversationQueueJobData {
return conversationQueueJobDataSchema.parse(data);
}
protected override getInMemoryQueue({
data,
}: Readonly<{ data: ConversationQueueJobData }>): PQueue {
return this.inMemoryQueues.get(data.conversationId);
}
private startVerificationWaiter(conversationId: string): Promise<unknown> {
const existing = this.verificationWaitMap.get(conversationId);
if (existing) {
globalLogger.info(
`startVerificationWaiter: Found existing waiter for conversation ${conversationId}. Returning it.`
);
return existing.promise;
}
globalLogger.info(
`startVerificationWaiter: Starting new waiter for conversation ${conversationId}.`
);
const { resolve, reject, promise } = explodePromise();
this.verificationWaitMap.set(conversationId, {
resolve,
reject,
promise,
});
return promise;
}
public resolveVerificationWaiter(conversationId: string): void {
const existing = this.verificationWaitMap.get(conversationId);
if (existing) {
globalLogger.info(
`resolveVerificationWaiter: Found waiter for conversation ${conversationId}. Resolving.`
);
existing.resolve('resolveVerificationWaiter: success');
this.verificationWaitMap.delete(conversationId);
} else {
globalLogger.warn(
`resolveVerificationWaiter: Missing waiter for conversation ${conversationId}.`
);
}
}
protected async run(
{
data,
timestamp,
}: Readonly<{ data: ConversationQueueJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
const { type, conversationId } = data;
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
await window.ConversationController.load();
const conversation = window.ConversationController.get(conversationId);
if (!conversation) {
throw new Error(`Failed to find conversation ${conversationId}`);
}
let timeRemaining: number;
let shouldContinue: boolean;
// eslint-disable-next-line no-constant-condition
while (true) {
log.info('calculating timeRemaining and shouldContinue...');
timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
// eslint-disable-next-line no-await-in-loop
shouldContinue = await commonShouldJobContinue({
attempt,
log,
timeRemaining,
});
if (!shouldContinue) {
break;
}
const verificationData =
window.reduxStore.getState().conversations
.verificationDataByConversation[conversationId];
if (!verificationData) {
break;
}
if (
verificationData.type ===
ConversationVerificationState.PendingVerification
) {
log.info(
'verification is pending for this conversation; waiting at most 30s...'
);
// eslint-disable-next-line no-await-in-loop
await Promise.race([
this.startVerificationWaiter(conversation.id),
sleep(30 * SECOND),
]);
continue;
}
if (
verificationData.type ===
ConversationVerificationState.VerificationCancelled
) {
if (verificationData.canceledAt >= timestamp) {
log.info(
'cancelling job; user cancelled out of verification dialog.'
);
shouldContinue = false;
} else {
log.info(
'clearing cancellation tombstone; continuing ahead with job'
);
window.reduxActions.conversations.clearCancelledConversationVerification(
conversation.id
);
}
break;
}
throw missingCaseError(verificationData);
}
const jobBundle = {
isFinalAttempt,
shouldContinue,
timeRemaining,
timestamp,
log,
};
// Note: A six-letter variable makes below code autoformatting easier to read.
const jobSet = conversationQueueJobEnum.enum;
try {
switch (type) {
case jobSet.DeleteForEveryone:
await sendDeleteForEveryone(conversation, jobBundle, data);
break;
case jobSet.DirectExpirationTimerUpdate:
await sendDirectExpirationTimerUpdate(conversation, jobBundle, data);
break;
case jobSet.GroupUpdate:
await sendGroupUpdate(conversation, jobBundle, data);
break;
case jobSet.NormalMessage:
await sendNormalMessage(conversation, jobBundle, data);
break;
case jobSet.ProfileKey:
await sendProfileKey(conversation, jobBundle, data);
break;
case jobSet.Reaction:
await sendReaction(conversation, jobBundle, data);
break;
default: {
// Note: This should never happen, because the zod call in parseData wouldn't
// accept data that doesn't look like our type specification.
const problem: never = type;
log.error(
`conversationJobQueue: Got job with type ${problem}; Cancelling job.`
);
}
}
} catch (error: unknown) {
const untrustedConversationIds: Array<string> = [];
if (error instanceof OutgoingIdentityKeyError) {
const failedConversation = window.ConversationController.getOrCreate(
error.identifier,
'private'
);
strictAssert(failedConversation, 'Conversation should be created');
untrustedConversationIds.push(failedConversation.id);
2022-02-16 18:36:21 +00:00
} else if (error instanceof SendMessageProtoError) {
(error.errors || []).forEach(innerError => {
if (innerError instanceof OutgoingIdentityKeyError) {
const failedConversation =
window.ConversationController.getOrCreate(
innerError.identifier,
'private'
);
strictAssert(failedConversation, 'Conversation should be created');
untrustedConversationIds.push(failedConversation.id);
2022-02-16 18:36:21 +00:00
}
});
}
if (untrustedConversationIds.length) {
log.error(
`Send failed because ${untrustedConversationIds.length} conversation(s) were untrusted. Adding to verification list.`
);
window.reduxActions.conversations.conversationStoppedByMissingVerification(
{
conversationId: conversation.id,
untrustedConversationIds,
}
);
}
throw error;
}
}
}
export const conversationJobQueue = new ConversationJobQueue({
store: jobQueueDatabaseStore,
queueType: 'conversation',
maxAttempts: MAX_ATTEMPTS,
});