// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { z } from 'zod'; import type PQueue from 'p-queue'; import * as globalLogger from '../logging/log'; import * as durations from '../util/durations'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; import { InMemoryQueues } from './helpers/InMemoryQueues'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { JobQueue } from './JobQueue'; import { sendNormalMessage } from './helpers/sendNormalMessage'; import { sendDirectExpirationTimerUpdate } from './helpers/sendDirectExpirationTimerUpdate'; import { sendGroupUpdate } from './helpers/sendGroupUpdate'; import { sendDeleteForEveryone } from './helpers/sendDeleteForEveryone'; import { sendProfileKey } from './helpers/sendProfileKey'; import { sendReaction } from './helpers/sendReaction'; import type { LoggerType } from '../types/Logging'; import { ConversationVerificationState } from '../state/ducks/conversationsEnums'; import { sleep } from '../util/sleep'; import { SECOND } from '../util/durations'; import { OutgoingIdentityKeyError, SendMessageProtoError, } from '../textsecure/Errors'; import { strictAssert } from '../util/assert'; import { missingCaseError } from '../util/missingCaseError'; import { explodePromise } from '../util/explodePromise'; // Note: generally, we only want to add to this list. If you do need to change one of // these values, you'll likely need to write a database migration. export const conversationQueueJobEnum = z.enum([ 'DeleteForEveryone', 'DirectExpirationTimerUpdate', 'GroupUpdate', 'NormalMessage', 'ProfileKey', 'Reaction', ]); const deleteForEveryoneJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.DeleteForEveryone), conversationId: z.string(), messageId: z.string(), recipients: z.array(z.string()), revision: z.number().optional(), targetTimestamp: z.number(), }); export type DeleteForEveryoneJobData = z.infer< typeof deleteForEveryoneJobDataSchema >; const expirationTimerUpdateJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.DirectExpirationTimerUpdate), conversationId: z.string(), expireTimer: z.number().or(z.undefined()), // Note: no recipients/revision, because this job is for 1:1 conversations only! }); export type ExpirationTimerUpdateJobData = z.infer< typeof expirationTimerUpdateJobDataSchema >; const groupUpdateJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.GroupUpdate), conversationId: z.string(), groupChangeBase64: z.string().optional(), recipients: z.array(z.string()), revision: z.number(), }); export type GroupUpdateJobData = z.infer; const normalMessageSendJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.NormalMessage), conversationId: z.string(), messageId: z.string(), // Note: recipients are baked into the message itself revision: z.number().optional(), }); export type NormalMessageSendJobData = z.infer< typeof normalMessageSendJobDataSchema >; const profileKeyJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.ProfileKey), conversationId: z.string(), // Note: we will use whichever recipients list is up to date when this job runs revision: z.number().optional(), }); export type ProfileKeyJobData = z.infer; const reactionJobDataSchema = z.object({ type: z.literal(conversationQueueJobEnum.enum.Reaction), conversationId: z.string(), messageId: z.string(), // Note: recipients are baked into the message itself revision: z.number().optional(), }); export type ReactionJobData = z.infer; export const conversationQueueJobDataSchema = z.union([ deleteForEveryoneJobDataSchema, expirationTimerUpdateJobDataSchema, groupUpdateJobDataSchema, normalMessageSendJobDataSchema, profileKeyJobDataSchema, reactionJobDataSchema, ]); export type ConversationQueueJobData = z.infer< typeof conversationQueueJobDataSchema >; export type ConversationQueueJobBundle = { isFinalAttempt: boolean; shouldContinue: boolean; timeRemaining: number; timestamp: number; log: LoggerType; }; const MAX_RETRY_TIME = durations.DAY; const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); export class ConversationJobQueue extends JobQueue { private readonly inMemoryQueues = new InMemoryQueues(); private readonly verificationWaitMap = new Map< string, { resolve: (value: unknown) => unknown; reject: (error: Error) => unknown; promise: Promise; } >(); protected parseData(data: unknown): ConversationQueueJobData { return conversationQueueJobDataSchema.parse(data); } protected override getInMemoryQueue({ data, }: Readonly<{ data: ConversationQueueJobData }>): PQueue { return this.inMemoryQueues.get(data.conversationId); } private startVerificationWaiter(conversationId: string): Promise { const existing = this.verificationWaitMap.get(conversationId); if (existing) { globalLogger.info( `startVerificationWaiter: Found existing waiter for conversation ${conversationId}. Returning it.` ); return existing.promise; } globalLogger.info( `startVerificationWaiter: Starting new waiter for conversation ${conversationId}.` ); const { resolve, reject, promise } = explodePromise(); this.verificationWaitMap.set(conversationId, { resolve, reject, promise, }); return promise; } public resolveVerificationWaiter(conversationId: string): void { const existing = this.verificationWaitMap.get(conversationId); if (existing) { globalLogger.info( `resolveVerificationWaiter: Found waiter for conversation ${conversationId}. Resolving.` ); existing.resolve('resolveVerificationWaiter: success'); this.verificationWaitMap.delete(conversationId); } else { globalLogger.warn( `resolveVerificationWaiter: Missing waiter for conversation ${conversationId}.` ); } } protected async run( { data, timestamp, }: Readonly<{ data: ConversationQueueJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> ): Promise { const { type, conversationId } = data; const isFinalAttempt = attempt >= MAX_ATTEMPTS; await window.ConversationController.load(); const conversation = window.ConversationController.get(conversationId); if (!conversation) { throw new Error(`Failed to find conversation ${conversationId}`); } let timeRemaining: number; let shouldContinue: boolean; // eslint-disable-next-line no-constant-condition while (true) { log.info('calculating timeRemaining and shouldContinue...'); timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); // eslint-disable-next-line no-await-in-loop shouldContinue = await commonShouldJobContinue({ attempt, log, timeRemaining, }); if (!shouldContinue) { break; } const verificationData = window.reduxStore.getState().conversations .verificationDataByConversation[conversationId]; if (!verificationData) { break; } if ( verificationData.type === ConversationVerificationState.PendingVerification ) { log.info( 'verification is pending for this conversation; waiting at most 30s...' ); // eslint-disable-next-line no-await-in-loop await Promise.race([ this.startVerificationWaiter(conversation.id), sleep(30 * SECOND), ]); continue; } if ( verificationData.type === ConversationVerificationState.VerificationCancelled ) { if (verificationData.canceledAt >= timestamp) { log.info( 'cancelling job; user cancelled out of verification dialog.' ); shouldContinue = false; } else { log.info( 'clearing cancellation tombstone; continuing ahead with job' ); window.reduxActions.conversations.clearCancelledConversationVerification( conversation.id ); } break; } throw missingCaseError(verificationData); } const jobBundle = { isFinalAttempt, shouldContinue, timeRemaining, timestamp, log, }; // Note: A six-letter variable makes below code autoformatting easier to read. const jobSet = conversationQueueJobEnum.enum; try { switch (type) { case jobSet.DeleteForEveryone: await sendDeleteForEveryone(conversation, jobBundle, data); break; case jobSet.DirectExpirationTimerUpdate: await sendDirectExpirationTimerUpdate(conversation, jobBundle, data); break; case jobSet.GroupUpdate: await sendGroupUpdate(conversation, jobBundle, data); break; case jobSet.NormalMessage: await sendNormalMessage(conversation, jobBundle, data); break; case jobSet.ProfileKey: await sendProfileKey(conversation, jobBundle, data); break; case jobSet.Reaction: await sendReaction(conversation, jobBundle, data); break; default: { // Note: This should never happen, because the zod call in parseData wouldn't // accept data that doesn't look like our type specification. const problem: never = type; log.error( `conversationJobQueue: Got job with type ${problem}; Cancelling job.` ); } } } catch (error: unknown) { const untrustedConversationIds: Array = []; if (error instanceof OutgoingIdentityKeyError) { const failedConversation = window.ConversationController.getOrCreate( error.identifier, 'private' ); strictAssert(failedConversation, 'Conversation should be created'); untrustedConversationIds.push(conversation.id); } else if (error instanceof SendMessageProtoError) { (error.errors || []).forEach(innerError => { if (innerError instanceof OutgoingIdentityKeyError) { const failedConversation = window.ConversationController.getOrCreate( innerError.identifier, 'private' ); strictAssert(failedConversation, 'Conversation should be created'); untrustedConversationIds.push(conversation.id); } }); } if (untrustedConversationIds.length) { log.error( `Send failed because ${untrustedConversationIds.length} conversation(s) were untrusted. Adding to verification list.` ); window.reduxActions.conversations.conversationStoppedByMissingVerification( { conversationId: conversation.id, untrustedConversationIds, } ); } throw error; } } } export const conversationJobQueue = new ConversationJobQueue({ store: jobQueueDatabaseStore, queueType: 'conversation', maxAttempts: MAX_ATTEMPTS, });