diff --git a/ts/background.ts b/ts/background.ts index 7db5ebe1ff03..50b6ba4155df 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -128,7 +128,6 @@ import type { ViewOnceOpenSyncAttributesType } from './messageModifiers/ViewOnce import { ReadStatus } from './messages/MessageReadStatus'; import type { SendStateByConversationId } from './messages/MessageSendState'; import { SendStatus } from './messages/MessageSendState'; -import * as AttachmentDownloads from './messageModifiers/AttachmentDownloads'; import * as Stickers from './types/Stickers'; import * as Errors from './types/errors'; import { SignalService as Proto } from './protobuf'; @@ -197,6 +196,7 @@ import { } from './util/callDisposition'; import { deriveStorageServiceKey } from './Crypto'; import { getThemeType } from './util/getThemeType'; +import { AttachmentDownloadManager } from './jobs/AttachmentDownloadManager'; export function isOverHourIntoPast(timestamp: number): boolean { return isNumber(timestamp) && isOlderThan(timestamp, HOUR); @@ -715,8 +715,9 @@ export async function startApp(): Promise { 'background/shutdown: shutdown requested' ); + server?.cancelInflightRequests('shutdown'); + // Stop background processing - void AttachmentDownloads.stop(); idleDetector.stop(); // Stop processing incoming messages @@ -793,6 +794,14 @@ export async function startApp(): Promise { window.waitForAllWaitBatchers(), ]); + log.info( + 'background/shutdown: waiting for all attachment downloads to finish' + ); + + // Since we canceled the inflight requests earlier in shutdown, this should + // resolve quickly + await AttachmentDownloadManager.stop(); + log.info('background/shutdown: closing the database'); // Shut down the data interface cleanly @@ -1541,7 +1550,7 @@ export async function startApp(): Promise { log.info('background: offline'); drop(challengeHandler?.onOffline()); - drop(AttachmentDownloads.stop()); + drop(AttachmentDownloadManager.stop()); drop(messageReceiver?.drain()); if (connectCount === 0) { @@ -1686,11 +1695,7 @@ export async function startApp(): Promise { void window.Signal.Services.initializeGroupCredentialFetcher(); - drop( - AttachmentDownloads.start({ - logger: log, - }) - ); + drop(AttachmentDownloadManager.start()); if (connectCount === 1) { Stickers.downloadQueuedPacks(); diff --git a/ts/components/conversation/Timeline.tsx b/ts/components/conversation/Timeline.tsx index 9366f129fb8b..065aad14a2f6 100644 --- a/ts/components/conversation/Timeline.tsx +++ b/ts/components/conversation/Timeline.tsx @@ -112,6 +112,7 @@ type PropsHousekeepingType = { i18n: LocalizerType; theme: ThemeType; + updateVisibleMessages?: (messageIds: Array) => void; renderCollidingAvatars: (_: { conversationIds: ReadonlyArray; }) => JSX.Element; @@ -371,6 +372,7 @@ export class Timeline extends React.Component< const intersectionRatios = new Map(); + this.props.updateVisibleMessages?.([]); const intersectionObserverCallback: IntersectionObserverCallback = entries => { // The first time this callback is called, we'll get entries in observation order @@ -384,12 +386,16 @@ export class Timeline extends React.Component< let oldestPartiallyVisible: undefined | Element; let newestPartiallyVisible: undefined | Element; let newestFullyVisible: undefined | Element; - + const visibleMessageIds: Array = []; for (const [element, intersectionRatio] of intersectionRatios) { if (intersectionRatio === 0) { continue; } + const messageId = getMessageIdFromElement(element); + if (messageId) { + visibleMessageIds.push(messageId); + } // We use this "at bottom detector" for two reasons, both for performance. It's // usually faster to use an `IntersectionObserver` instead of a scroll event, // and we want to do that here. @@ -409,6 +415,8 @@ export class Timeline extends React.Component< } } + this.props.updateVisibleMessages?.(visibleMessageIds); + // If a message is fully visible, then you can see its bottom. If not, there's a // very tall message around. We assume you can see the bottom of a message if // (1) another message is partly visible right below it, or (2) you're near the @@ -554,6 +562,7 @@ export class Timeline extends React.Component< this.intersectionObserver?.disconnect(); this.cleanupGroupCallPeekTimeouts(); + this.props.updateVisibleMessages?.([]); } public override getSnapshotBeforeUpdate( diff --git a/ts/jobs/AttachmentDownloadManager.ts b/ts/jobs/AttachmentDownloadManager.ts new file mode 100644 index 000000000000..fd5d475cb037 --- /dev/null +++ b/ts/jobs/AttachmentDownloadManager.ts @@ -0,0 +1,629 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { omit } from 'lodash'; + +import { drop } from '../util/drop'; +import * as durations from '../util/durations'; +import { missingCaseError } from '../util/missingCaseError'; +import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; +import * as log from '../logging/log'; +import { + type AttachmentDownloadJobTypeType, + type AttachmentDownloadJobType, + attachmentDownloadJobSchema, +} from '../types/AttachmentDownload'; +import { + AttachmentNotFoundOnCdnError, + downloadAttachment, +} from '../util/downloadAttachment'; +import dataInterface from '../sql/Client'; +import { getValue } from '../RemoteConfig'; + +import { + explodePromise, + type ExplodePromiseResultType, +} from '../util/explodePromise'; +import { isInCall as isInCallSelector } from '../state/selectors/calling'; +import { + type ExponentialBackoffOptionsType, + exponentialBackoffSleepTime, +} from '../util/exponentialBackoff'; +import { AttachmentSizeError, type AttachmentType } from '../types/Attachment'; +import { __DEPRECATED$getMessageById } from '../messages/getMessageById'; +import type { MessageModel } from '../models/messages'; +import { + KIBIBYTE, + getMaximumIncomingAttachmentSizeInKb, + getMaximumIncomingTextAttachmentSizeInKb, +} from '../types/AttachmentSize'; +import { addAttachmentToMessage } from '../messageModifiers/AttachmentDownloads'; +import * as Errors from '../types/errors'; +import { redactGenericText } from '../util/privacy'; + +export enum AttachmentDownloadUrgency { + IMMEDIATE = 'immediate', + STANDARD = 'standard', +} + +const TICK_INTERVAL = durations.MINUTE; +const MAX_CONCURRENT_JOBS = 3; + +type AttachmentDownloadJobIdentifiersType = Pick< + AttachmentDownloadJobType, + 'messageId' | 'attachmentType' | 'digest' +>; + +// Type for adding a new job +export type NewAttachmentDownloadJobType = { + attachment: AttachmentType; + messageId: string; + receivedAt: number; + sentAt: number; + attachmentType: AttachmentDownloadJobTypeType; + urgency?: AttachmentDownloadUrgency; +}; + +const RETRY_CONFIG: Record< + 'default', + { maxRetries: number; backoffConfig: ExponentialBackoffOptionsType } +> = { + default: { + maxRetries: 4, + backoffConfig: { + // 30 seconds, 5 minutes, 50 minutes, (max) 6 hrs + multiplier: 10, + firstBackoffTime: 30 * durations.SECOND, + maxBackoffTime: 6 * durations.HOUR, + }, + }, +}; + +type AttachmentDownloadManagerParamsType = { + getNextJobs: (options: { + limit: number; + prioritizeMessageIds?: Array; + timestamp?: number; + }) => Promise>; + + saveJob: (job: AttachmentDownloadJobType) => Promise; + removeJob: (job: AttachmentDownloadJobType) => Promise; + runJob: ( + job: AttachmentDownloadJobType, + isLastAttempt: boolean + ) => Promise; + isInCall: () => boolean; + beforeStart?: () => Promise; + maxAttempts: number; +}; +export type JobResultType = { status: 'retry' | 'finished' }; +export class AttachmentDownloadManager { + private static _instance: AttachmentDownloadManager | undefined; + private visibleTimelineMessages: Array = []; + private enabled: boolean = false; + private activeJobs: Map< + string, + { + completionPromise: ExplodePromiseResultType; + job: AttachmentDownloadJobType; + } + > = new Map(); + private timeout: NodeJS.Timeout | null = null; + private jobStartPromises: Map> = + new Map(); + private jobCompletePromises: Map> = + new Map(); + + static defaultParams: AttachmentDownloadManagerParamsType = { + beforeStart: dataInterface.resetAttachmentDownloadActive, + getNextJobs: dataInterface.getNextAttachmentDownloadJobs, + saveJob: dataInterface.saveAttachmentDownloadJob, + removeJob: dataInterface.removeAttachmentDownloadJob, + runJob: runDownloadAttachmentJob, + isInCall: () => { + const reduxState = window.reduxStore?.getState(); + if (reduxState) { + return isInCallSelector(reduxState); + } + return false; + }, + maxAttempts: RETRY_CONFIG.default.maxRetries + 1, + }; + + readonly getNextJobs: AttachmentDownloadManagerParamsType['getNextJobs']; + readonly saveJob: AttachmentDownloadManagerParamsType['saveJob']; + readonly removeJob: AttachmentDownloadManagerParamsType['removeJob']; + readonly runJob: AttachmentDownloadManagerParamsType['runJob']; + readonly beforeStart: AttachmentDownloadManagerParamsType['beforeStart']; + readonly isInCall: AttachmentDownloadManagerParamsType['isInCall']; + readonly maxAttempts: number; + + constructor( + params: AttachmentDownloadManagerParamsType = AttachmentDownloadManager.defaultParams + ) { + this.getNextJobs = params.getNextJobs; + this.saveJob = params.saveJob; + this.removeJob = params.removeJob; + this.runJob = params.runJob; + this.beforeStart = params.beforeStart; + this.isInCall = params.isInCall; + this.maxAttempts = params.maxAttempts; + } + + async start(): Promise { + this.enabled = true; + await this.beforeStart?.(); + this.tick(); + } + + async stop(): Promise { + this.enabled = false; + clearTimeoutIfNecessary(this.timeout); + this.timeout = null; + await Promise.all( + [...this.activeJobs.values()].map( + ({ completionPromise }) => completionPromise.promise + ) + ); + } + + tick(): void { + clearTimeoutIfNecessary(this.timeout); + this.timeout = null; + drop(this.maybeStartJobs()); + this.timeout = setTimeout(() => this.tick(), TICK_INTERVAL); + } + + async addJob( + newJobData: NewAttachmentDownloadJobType + ): Promise { + const { + attachment, + messageId, + attachmentType, + receivedAt, + sentAt, + urgency = AttachmentDownloadUrgency.STANDARD, + } = newJobData; + const parseResult = attachmentDownloadJobSchema.safeParse({ + messageId, + receivedAt, + sentAt, + attachmentType, + digest: attachment.digest, + contentType: attachment.contentType, + size: attachment.size, + attachment, + active: false, + attempts: 0, + retryAfter: null, + lastAttemptTimestamp: null, + }); + + if (!parseResult.success) { + log.error( + `AttachmentDownloadManager/addJob(${sentAt}.${attachmentType}): invalid data`, + parseResult.error + ); + return attachment; + } + + const newJob = parseResult.data; + const jobIdForLogging = getJobIdForLogging(newJob); + const logId = `AttachmentDownloadManager/addJob(${jobIdForLogging})`; + + try { + const runningJob = this.getRunningJob(newJob); + if (runningJob) { + log.info(`${logId}: already running; resetting attempts`); + runningJob.attempts = 0; + + await this.saveJob({ + ...runningJob, + attempts: 0, + }); + return attachment; + } + + await this.saveJob(newJob); + } catch (e) { + log.error(`${logId}: error saving job`, Errors.toLogFormat(e)); + } + + switch (urgency) { + case AttachmentDownloadUrgency.IMMEDIATE: + log.info(`${logId}: starting job immediately`); + drop(this.startJob(newJob)); + break; + case AttachmentDownloadUrgency.STANDARD: + drop(this.maybeStartJobs()); + break; + default: + throw missingCaseError(urgency); + } + + return { + ...attachment, + pending: true, + }; + } + + updateVisibleTimelineMessages(messageIds: Array): void { + this.visibleTimelineMessages = messageIds; + } + + // used in testing + public waitForJobToBeStarted(job: AttachmentDownloadJobType): Promise { + const id = this.getJobIdIncludingAttempts(job); + const existingPromise = this.jobStartPromises.get(id)?.promise; + if (existingPromise) { + return existingPromise; + } + const { promise, resolve, reject } = explodePromise(); + this.jobStartPromises.set(id, { promise, resolve, reject }); + return promise; + } + + public waitForJobToBeCompleted( + job: AttachmentDownloadJobType + ): Promise { + const id = this.getJobIdIncludingAttempts(job); + const existingPromise = this.jobCompletePromises.get(id)?.promise; + if (existingPromise) { + return existingPromise; + } + const { promise, resolve, reject } = explodePromise(); + this.jobCompletePromises.set(id, { promise, resolve, reject }); + return promise; + } + + // Private methods + + // maybeStartJobs is called: + // 1. every minute (via tick) + // 2. after a job is added (via addJob) + // 3. after a job finishes (via startJob) + // preventing re-entrancy allow us to simplify some logic and ensure we don't try to + // start too many jobs + private _inMaybeStartJobs = false; + private async maybeStartJobs(): Promise { + if (this._inMaybeStartJobs) { + return; + } + + try { + this._inMaybeStartJobs = true; + + if (!this.enabled) { + log.info( + 'AttachmentDownloadManager/_maybeStartJobs: not enabled, returning' + ); + return; + } + + if (this.isInCall()) { + log.info( + 'AttachmentDownloadManager/_maybeStartJobs: holding off on starting new jobs; in call' + ); + return; + } + + const numJobsToStart = this.getMaximumNumberOfJobsToStart(); + + if (numJobsToStart <= 0) { + return; + } + + const nextJobs = await this.getNextJobs({ + limit: numJobsToStart, + // TODO (DESKTOP-6912): we'll want to prioritize more than just visible timeline + // messages, including: + // - media opened in lightbox + // - media for stories + prioritizeMessageIds: [...this.visibleTimelineMessages], + timestamp: Date.now(), + }); + + // TODO (DESKTOP-6913): if a prioritized job is selected, we will to update the + // in-memory job with that information so we can handle it differently, including + // e.g. downloading a thumbnail before the full-size version + for (const job of nextJobs) { + drop(this.startJob(job)); + } + } finally { + this._inMaybeStartJobs = false; + } + } + + private async startJob(job: AttachmentDownloadJobType): Promise { + const logId = `AttachmentDownloadManager/startJob(${getJobIdForLogging( + job + )})`; + if (this.isJobRunning(job)) { + log.info(`${logId}: job is already running`); + return; + } + const isLastAttempt = job.attempts + 1 >= this.maxAttempts; + + try { + log.info(`${logId}: starting job`); + this.addRunningJob(job); + await this.saveJob({ ...job, active: true }); + this.handleJobStartPromises(job); + + const { status } = await this.runJob(job, isLastAttempt); + log.info(`${logId}: job completed with status: ${status}`); + + switch (status) { + case 'finished': + await this.removeJob(job); + return; + case 'retry': + if (isLastAttempt) { + throw new Error('Cannot retry on last attempt'); + } + await this.retryJobLater(job); + return; + default: + throw missingCaseError(status); + } + } catch (e) { + log.error(`${logId}: error when running job`, e); + if (isLastAttempt) { + await this.removeJob(job); + } else { + await this.retryJobLater(job); + } + } finally { + this.removeRunningJob(job); + drop(this.maybeStartJobs()); + } + } + + private async retryJobLater(job: AttachmentDownloadJobType) { + const now = Date.now(); + await this.saveJob({ + ...job, + active: false, + attempts: job.attempts + 1, + // TODO (DESKTOP-6845): adjust retry based on job type (e.g. backup) + retryAfter: + now + + exponentialBackoffSleepTime( + job.attempts + 1, + RETRY_CONFIG.default.backoffConfig + ), + lastAttemptTimestamp: now, + }); + } + private getActiveJobCount(): number { + return this.activeJobs.size; + } + + private getMaximumNumberOfJobsToStart(): number { + return MAX_CONCURRENT_JOBS - this.getActiveJobCount(); + } + + private getRunningJob( + job: AttachmentDownloadJobIdentifiersType + ): AttachmentDownloadJobType | undefined { + const id = this.getJobId(job); + return this.activeJobs.get(id)?.job; + } + + private isJobRunning(job: AttachmentDownloadJobType): boolean { + return Boolean(this.getRunningJob(job)); + } + + private removeRunningJob(job: AttachmentDownloadJobType) { + const idWithAttempts = this.getJobIdIncludingAttempts(job); + this.jobCompletePromises.get(idWithAttempts)?.resolve(); + this.jobCompletePromises.delete(idWithAttempts); + + const id = this.getJobId(job); + this.activeJobs.get(id)?.completionPromise.resolve(); + this.activeJobs.delete(id); + } + + private addRunningJob(job: AttachmentDownloadJobType) { + if (this.isJobRunning(job)) { + const jobIdForLogging = getJobIdForLogging(job); + log.warn( + `attachmentDownloads/_addRunningJob: job ${jobIdForLogging} is already running` + ); + } + this.activeJobs.set(this.getJobId(job), { + completionPromise: explodePromise(), + job, + }); + } + + private handleJobStartPromises(job: AttachmentDownloadJobType) { + const id = this.getJobIdIncludingAttempts(job); + this.jobStartPromises.get(id)?.resolve(); + this.jobStartPromises.delete(id); + } + + private getJobIdIncludingAttempts(job: AttachmentDownloadJobType) { + return `${this.getJobId(job)}.${job.attempts}`; + } + + private getJobId(job: AttachmentDownloadJobIdentifiersType): string { + const { messageId, attachmentType, digest } = job; + return `${messageId}.${attachmentType}.${digest}`; + } + + // Static methods + static get instance(): AttachmentDownloadManager { + if (!AttachmentDownloadManager._instance) { + AttachmentDownloadManager._instance = new AttachmentDownloadManager(); + } + return AttachmentDownloadManager._instance; + } + + static async start(): Promise { + log.info('AttachmentDownloadManager/starting'); + await AttachmentDownloadManager.instance.start(); + } + + static async stop(): Promise { + log.info('AttachmentDownloadManager/stopping'); + return AttachmentDownloadManager._instance?.stop(); + } + + static async addJob( + newJob: NewAttachmentDownloadJobType + ): Promise { + return AttachmentDownloadManager.instance.addJob(newJob); + } + + static updateVisibleTimelineMessages(messageIds: Array): void { + AttachmentDownloadManager.instance.updateVisibleTimelineMessages( + messageIds + ); + } +} + +async function runDownloadAttachmentJob( + job: AttachmentDownloadJobType, + isLastAttempt: boolean +): Promise { + const jobIdForLogging = getJobIdForLogging(job); + const logId = `attachment_downloads/runDownloadAttachmentJob/${jobIdForLogging}`; + + const message = await __DEPRECATED$getMessageById(job.messageId); + + if (!message) { + log.error(`${logId} message not found`); + return { status: 'finished' }; + } + + try { + log.info(`${logId}: Starting job`); + await runDownloadAttachmentJobInner(job, message); + return { status: 'finished' }; + } catch (error) { + log.error( + `${logId}: Failed to download attachment, attempt ${job.attempts}:`, + Errors.toLogFormat(error) + ); + + if (error instanceof AttachmentSizeError) { + await addAttachmentToMessage( + message, + _markAttachmentAsTooBig(job.attachment), + { type: job.attachmentType } + ); + return { status: 'finished' }; + } + + if (error instanceof AttachmentNotFoundOnCdnError) { + await addAttachmentToMessage( + message, + _markAttachmentAsPermanentlyErrored(job.attachment), + { type: job.attachmentType } + ); + + return { status: 'finished' }; + } + + if (isLastAttempt) { + await addAttachmentToMessage( + message, + _markAttachmentAsTransientlyErrored(job.attachment), + { type: job.attachmentType } + ); + return { status: 'finished' }; + } + + // Remove `pending` flag from the attachment and retry later + await addAttachmentToMessage( + message, + { + ...job.attachment, + pending: false, + }, + { type: job.attachmentType } + ); + return { status: 'retry' }; + } finally { + // This will fail if the message has been deleted before the download finished, which + // is good + await dataInterface.saveMessage(message.attributes, { + ourAci: window.textsecure.storage.user.getCheckedAci(), + }); + } +} + +async function runDownloadAttachmentJobInner( + job: AttachmentDownloadJobType, + message: MessageModel +): Promise { + const { messageId, attachment, attachmentType: type } = job; + + const jobIdForLogging = getJobIdForLogging(job); + const logId = `attachment_downloads/_runDownloadJobInner(${jobIdForLogging})`; + + if (!job || !attachment || !messageId) { + throw new Error(`${logId}: Key information required for job was missing.`); + } + + log.info(`${logId}: starting`); + + const maxInKib = getMaximumIncomingAttachmentSizeInKb(getValue); + const maxTextAttachmentSizeInKib = + getMaximumIncomingTextAttachmentSizeInKb(getValue); + + const { size } = attachment; + const sizeInKib = size / KIBIBYTE; + + if (!Number.isFinite(size) || size < 0 || sizeInKib > maxInKib) { + throw new AttachmentSizeError( + `${logId}: Attachment was ${sizeInKib}kib, max is ${maxInKib}kib` + ); + } + if (type === 'long-message' && sizeInKib > maxTextAttachmentSizeInKib) { + throw new AttachmentSizeError( + `${logId}: Text attachment was ${sizeInKib}kib, max is ${maxTextAttachmentSizeInKib}kib` + ); + } + + await addAttachmentToMessage( + message, + { ...attachment, pending: true }, + { type } + ); + + const downloaded = await downloadAttachment(attachment); + + const upgradedAttachment = + await window.Signal.Migrations.processNewAttachment(downloaded); + + await addAttachmentToMessage(message, omit(upgradedAttachment, 'error'), { + type, + }); +} + +function _markAttachmentAsTooBig(attachment: AttachmentType): AttachmentType { + return { + ..._markAttachmentAsPermanentlyErrored(attachment), + wasTooBig: true, + }; +} + +function _markAttachmentAsPermanentlyErrored( + attachment: AttachmentType +): AttachmentType { + return { ...omit(attachment, ['key', 'id']), pending: false, error: true }; +} + +function _markAttachmentAsTransientlyErrored( + attachment: AttachmentType +): AttachmentType { + return { ...attachment, pending: false, error: true }; +} + +function getJobIdForLogging(job: AttachmentDownloadJobType): string { + const { sentAt, attachmentType, digest } = job; + const redactedDigest = redactGenericText(digest); + return `${sentAt}.${attachmentType}.${redactedDigest}`; +} diff --git a/ts/messageModifiers/AttachmentDownloads.ts b/ts/messageModifiers/AttachmentDownloads.ts index a8bf656bdde3..53c5ba105725 100644 --- a/ts/messageModifiers/AttachmentDownloads.ts +++ b/ts/messageModifiers/AttachmentDownloads.ts @@ -1,530 +1,23 @@ // Copyright 2019 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only - -import { isNumber, omit } from 'lodash'; -import { v4 as getGuid } from 'uuid'; - -import dataInterface from '../sql/Client'; -import * as durations from '../util/durations'; -import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; -import { strictAssert } from '../util/assert'; -import { downloadAttachment } from '../util/downloadAttachment'; +import * as log from '../logging/log'; import * as Bytes from '../Bytes'; -import type { - AttachmentDownloadJobType, - AttachmentDownloadJobTypeType, -} from '../sql/Interface'; +import type { AttachmentDownloadJobTypeType } from '../types/AttachmentDownload'; -import { getValue } from '../RemoteConfig'; import type { MessageModel } from '../models/messages'; import type { AttachmentType } from '../types/Attachment'; -import { - AttachmentSizeError, - getAttachmentSignature, - isDownloaded, -} from '../types/Attachment'; -import * as Errors from '../types/errors'; -import type { LoggerType } from '../types/Logging'; -import * as log from '../logging/log'; -import { - KIBIBYTE, - getMaximumIncomingAttachmentSizeInKb, - getMaximumIncomingTextAttachmentSizeInKb, -} from '../types/AttachmentSize'; -import { redactCdnKey } from '../util/privacy'; +import { getAttachmentSignature, isDownloaded } from '../types/Attachment'; -const { - getMessageById, - getAttachmentDownloadJobById, - getNextAttachmentDownloadJobs, - removeAttachmentDownloadJob, - resetAttachmentDownloadPending, - saveAttachmentDownloadJob, - saveMessage, - setAttachmentDownloadJobPending, -} = dataInterface; - -const MAX_ATTACHMENT_JOB_PARALLELISM = 3; - -const TICK_INTERVAL = durations.MINUTE; - -const RETRY_BACKOFF: Record = { - 1: 30 * durations.SECOND, - 2: 30 * durations.MINUTE, - 3: 6 * durations.HOUR, -}; - -let enabled = false; -let timeout: NodeJS.Timeout | null; -let logger: LoggerType; -const _activeAttachmentDownloadJobs: Record | undefined> = - {}; - -type StartOptionsType = { - logger: LoggerType; -}; - -export async function start(options: StartOptionsType): Promise { - ({ logger } = options); - if (!logger) { - throw new Error('attachment_downloads/start: logger must be provided!'); - } - - logger.info('attachment_downloads/start: enabling'); - enabled = true; - await resetAttachmentDownloadPending(); - - void _tick(); -} - -export async function stop(): Promise { - // If `.start()` wasn't called - the `logger` is `undefined` - if (logger) { - logger.info('attachment_downloads/stop: disabling'); - } - enabled = false; - clearTimeoutIfNecessary(timeout); - timeout = null; -} - -export async function addJob( - attachment: AttachmentType, - // TODO: DESKTOP-5279 - job: { messageId: string; type: AttachmentDownloadJobTypeType; index: number } -): Promise { - if (!attachment) { - throw new Error('attachments_download/addJob: attachment is required'); - } - - const { messageId, type, index } = job; - if (!messageId) { - throw new Error('attachments_download/addJob: job.messageId is required'); - } - if (!type) { - throw new Error('attachments_download/addJob: job.type is required'); - } - if (!isNumber(index)) { - throw new Error('attachments_download/addJob: index must be a number'); - } - - if (attachment.downloadJobId) { - let existingJob = await getAttachmentDownloadJobById( - attachment.downloadJobId - ); - if (existingJob) { - // Reset job attempts through user's explicit action - existingJob = { ...existingJob, attempts: 0 }; - - if (_activeAttachmentDownloadJobs[existingJob.id]) { - logger.info( - `attachment_downloads/addJob: ${existingJob.id} already running` - ); - } else { - logger.info( - `attachment_downloads/addJob: restarting existing job ${existingJob.id}` - ); - _activeAttachmentDownloadJobs[existingJob.id] = _runJob(existingJob); - } - - return { - ...attachment, - pending: true, - }; - } - } - - const id = getGuid(); - const timestamp = Date.now(); - const toSave: AttachmentDownloadJobType = { - ...job, - id, - attachment, - timestamp, - pending: 0, - attempts: 0, - }; - - await saveAttachmentDownloadJob(toSave); - - void _maybeStartJob(); - - return { - ...attachment, - pending: true, - downloadJobId: id, - }; -} - -async function _tick(): Promise { - clearTimeoutIfNecessary(timeout); - timeout = null; - - void _maybeStartJob(); - timeout = setTimeout(_tick, TICK_INTERVAL); -} - -async function _maybeStartJob(): Promise { - if (!enabled) { - logger.info('attachment_downloads/_maybeStartJob: not enabled, returning'); - return; - } - - const jobCount = getActiveJobCount(); - const limit = MAX_ATTACHMENT_JOB_PARALLELISM - jobCount; - if (limit <= 0) { - logger.info( - 'attachment_downloads/_maybeStartJob: reached active job limit, waiting' - ); - return; - } - - const nextJobs = await getNextAttachmentDownloadJobs(limit); - if (nextJobs.length <= 0) { - logger.info( - 'attachment_downloads/_maybeStartJob: no attachment jobs to run' - ); - return; - } - - // To prevent the race condition caused by two parallel database calls, eached kicked - // off because the jobCount wasn't at the max. - const secondJobCount = getActiveJobCount(); - const needed = MAX_ATTACHMENT_JOB_PARALLELISM - secondJobCount; - if (needed <= 0) { - logger.info( - 'attachment_downloads/_maybeStartJob: reached active job limit after ' + - 'db query, waiting' - ); - return; - } - - const jobs = nextJobs.slice(0, Math.min(needed, nextJobs.length)); - - logger.info( - `attachment_downloads/_maybeStartJob: starting ${jobs.length} jobs` - ); - - for (let i = 0, max = jobs.length; i < max; i += 1) { - const job = jobs[i]; - const existing = _activeAttachmentDownloadJobs[job.id]; - if (existing) { - logger.warn( - `attachment_downloads/_maybeStartJob: Job ${job.id} is already running` - ); - } else { - logger.info( - `attachment_downloads/_maybeStartJob: Starting job ${job.id}` - ); - const promise = _runJob(job); - _activeAttachmentDownloadJobs[job.id] = promise; - - const postProcess = async () => { - const logId = `attachment_downloads/_maybeStartJob/postProcess/${job.id}`; - try { - await promise; - if (_activeAttachmentDownloadJobs[job.id]) { - throw new Error( - `${logId}: Active attachments jobs list still has this job!` - ); - } - } catch (error: unknown) { - log.error( - `${logId}: Download job threw an error, deleting.`, - Errors.toLogFormat(error) - ); - - delete _activeAttachmentDownloadJobs[job.id]; - try { - await _markAttachmentAsFailed(job); - } catch (deleteError) { - log.error( - `${logId}: Failed to delete attachment job`, - Errors.toLogFormat(deleteError) - ); - } finally { - void _maybeStartJob(); - } - } - }; - - // Note: intentionally not awaiting - void postProcess(); - } - } -} - -async function _runJob(job?: AttachmentDownloadJobType): Promise { - if (!job) { - log.warn('attachment_downloads/_runJob: Job was missing!'); - return; - } - - const { id, messageId, attachment, type, index, attempts } = job; - let message; - - try { - if (!job || !attachment || !messageId) { - throw new Error( - `_runJob: Key information required for job was missing. Job id: ${id}` - ); - } - - const pending = true; - await setAttachmentDownloadJobPending(id, pending); - - message = await _getMessageById(id, messageId); - logger.info( - 'attachment_downloads/_runJob' + - `(jobId: ${id}, type: ${type}, index: ${index},` + - ` cdnKey: ${ - attachment.cdnKey ? redactCdnKey(attachment.cdnKey) : null - },` + - ` messageTimestamp: ${message?.attributes.timestamp}): starting` - ); - - if (!message) { - return; - } - - let downloaded: AttachmentType | null = null; - - try { - const maxInKib = getMaximumIncomingAttachmentSizeInKb(getValue); - const maxTextAttachmentSizeInKib = - getMaximumIncomingTextAttachmentSizeInKb(getValue); - - const { size } = attachment; - const sizeInKib = size / KIBIBYTE; - - if (!Number.isFinite(size) || size < 0 || sizeInKib > maxInKib) { - throw new AttachmentSizeError( - `Attachment Job ${id}: Attachment was ${sizeInKib}kib, max is ${maxInKib}kib` - ); - } - if (type === 'long-message' && sizeInKib > maxTextAttachmentSizeInKib) { - throw new AttachmentSizeError( - `Attachment Job ${id}: Text attachment was ${sizeInKib}kib, max is ${maxTextAttachmentSizeInKib}kib` - ); - } - - await _addAttachmentToMessage( - message, - { ...attachment, pending: true }, - { type, index } - ); - - // If the download is bigger than expected, we'll stop in the middle - downloaded = await downloadAttachment(attachment); - } catch (error) { - if (error instanceof AttachmentSizeError) { - log.error(Errors.toLogFormat(error)); - await _addAttachmentToMessage( - message, - _markAttachmentAsTooBig(attachment), - { type, index } - ); - await _finishJob(message, id); - return; - } - - throw error; - } - - if (!downloaded) { - logger.warn( - `attachment_downloads/_runJob(${id}): Got 404 from server for CDN ${ - attachment.cdnNumber - }, marking attachment ${ - attachment.cdnId || attachment.cdnKey - } from message ${message.idForLogging()} as permanent error` - ); - - await _addAttachmentToMessage( - message, - _markAttachmentAsPermanentError(attachment), - { type, index } - ); - await _finishJob(message, id); - return; - } - - logger.info( - `attachment_downloads/_runJob(${id}): processing new attachment` + - ` of type: ${type}` - ); - const upgradedAttachment = - await window.Signal.Migrations.processNewAttachment(downloaded); - - await _addAttachmentToMessage(message, omit(upgradedAttachment, 'error'), { - type, - index, - }); - - await _finishJob(message, id); - } catch (error) { - const logId = message ? message.idForLogging() : id || ''; - const currentAttempt = (attempts || 0) + 1; - - if (currentAttempt >= 3) { - logger.error( - `attachment_downloads/runJob(${id}): ${currentAttempt} failed ` + - `attempts, marking attachment from message ${logId} as ` + - 'error:', - Errors.toLogFormat(error) - ); - - try { - await _addAttachmentToMessage( - message, - _markAttachmentAsTransientError(attachment), - { type, index } - ); - } finally { - await _finishJob(message, id); - } - - return; - } - - logger.error( - `attachment_downloads/_runJob(${id}): Failed to download attachment ` + - `type ${type} for message ${logId}, attempt ${currentAttempt}:`, - Errors.toLogFormat(error) - ); - - try { - // Remove `pending` flag from the attachment. - await _addAttachmentToMessage( - message, - { - ...attachment, - downloadJobId: id, - }, - { type, index } - ); - if (message) { - await saveMessage(message.attributes, { - ourAci: window.textsecure.storage.user.getCheckedAci(), - }); - } - - const failedJob = { - ...job, - pending: 0, - attempts: currentAttempt, - timestamp: - Date.now() + (RETRY_BACKOFF[currentAttempt] || RETRY_BACKOFF[3]), - }; - - await saveAttachmentDownloadJob(failedJob); - } finally { - delete _activeAttachmentDownloadJobs[id]; - void _maybeStartJob(); - } - } -} - -async function _markAttachmentAsFailed( - job: AttachmentDownloadJobType -): Promise { - const { id, messageId, attachment, type, index } = job; - const message = await _getMessageById(id, messageId); - - try { - if (!message) { - return; - } - await _addAttachmentToMessage( - message, - _markAttachmentAsPermanentError(attachment), - { type, index } - ); - } finally { - await _finishJob(message, id); - } -} - -async function _getMessageById( - id: string, - messageId: string -): Promise { - const message = window.MessageCache.__DEPRECATED$getById(messageId); - - if (message) { - return message; - } - - const messageAttributes = await getMessageById(messageId); - if (!messageAttributes) { - logger.error( - `attachment_downloads/_runJob(${id}): ` + - 'Source message not found, deleting job' - ); - await _finishJob(null, id); - return; - } - - strictAssert(messageId === messageAttributes.id, 'message id mismatch'); - return window.MessageCache.__DEPRECATED$register( - messageId, - messageAttributes, - 'AttachmentDownloads._getMessageById' - ); -} - -async function _finishJob( - message: MessageModel | null | undefined, - id: string -): Promise { - if (message) { - logger.info(`attachment_downloads/_finishJob for job id: ${id}`); - await saveMessage(message.attributes, { - ourAci: window.textsecure.storage.user.getCheckedAci(), - }); - } - - await removeAttachmentDownloadJob(id); - delete _activeAttachmentDownloadJobs[id]; - void _maybeStartJob(); -} - -function getActiveJobCount(): number { - return Object.keys(_activeAttachmentDownloadJobs).length; -} - -function _markAttachmentAsPermanentError( - attachment: AttachmentType -): AttachmentType { - return { - ...omit(attachment, ['key', 'id']), - error: true, - }; -} - -function _markAttachmentAsTooBig(attachment: AttachmentType): AttachmentType { - return { - ...omit(attachment, ['key', 'id']), - error: true, - wasTooBig: true, - }; -} - -function _markAttachmentAsTransientError( - attachment: AttachmentType -): AttachmentType { - return { ...attachment, error: true }; -} - -async function _addAttachmentToMessage( +export async function addAttachmentToMessage( message: MessageModel | null | undefined, attachment: AttachmentType, - { type, index }: { type: AttachmentDownloadJobTypeType; index: number } + { type }: { type: AttachmentDownloadJobTypeType } ): Promise { if (!message) { return; } - const logPrefix = `${message.idForLogging()} (type: ${type}, index: ${index})`; + const logPrefix = `${message.idForLogging()} (type: ${type})`; const attachmentSignature = getAttachmentSignature(attachment); if (type === 'long-message') { @@ -608,7 +101,7 @@ async function _addAttachmentToMessage( await window.Signal.Migrations.deleteAttachmentData(attachment.path); } if (!handledAnywhere) { - logger.warn( + log.warn( `${logPrefix}: Long message attachment found no matching place to apply` ); } @@ -670,7 +163,7 @@ async function _addAttachmentToMessage( } if (!handledAnywhere) { - logger.warn( + log.warn( `${logPrefix}: 'attachment' type found no matching place to apply` ); } @@ -727,33 +220,37 @@ async function _addAttachmentToMessage( } if (type === 'contact') { - const contact = message.get('contact'); - if (!contact || contact.length <= index) { + const contacts = message.get('contact'); + if (!contacts?.length) { + throw new Error(`${logPrefix}: no contacts, cannot add attachment!`); + } + let handled = false; + + const newContacts = contacts.map(contact => { + if (!contact.avatar?.avatar) { + return contact; + } + + const existingAttachment = contact.avatar.avatar; + + const newAttachment = maybeReplaceAttachment(existingAttachment); + if (existingAttachment !== newAttachment) { + handled = true; + return { + ...contact, + avatar: { ...contact.avatar, avatar: newAttachment }, + }; + } + return contact; + }); + + if (!handled) { throw new Error( - `${logPrefix}: contact didn't exist or ${index} was too large` - ); - } - - const item = contact[index]; - if (item && item.avatar && item.avatar.avatar) { - _checkOldAttachment(item.avatar, 'avatar', logPrefix); - - const newContact = [...contact]; - newContact[index] = { - ...item, - avatar: { - ...item.avatar, - avatar: attachment, - }, - }; - - message.set({ contact: newContact }); - } else { - logger.warn( - `${logPrefix}: Couldn't update contact with avatar attachment for message` + `${logPrefix}: Couldn't find matching contact with avatar attachment for message` ); } + message.set({ contact: newContacts }); return; } @@ -831,20 +328,3 @@ async function _addAttachmentToMessage( throw new Error(`${logPrefix}: Unknown job type ${type}`); } - -function _checkOldAttachment( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - object: any, - key: string, - logPrefix: string -): void { - const oldAttachment = object[key]; - if (oldAttachment && oldAttachment.path) { - logger.error( - `_checkOldAttachment: ${logPrefix} - old attachment already had path, not replacing` - ); - throw new Error( - '_checkOldAttachment: old attachment already had path, not replacing' - ); - } -} diff --git a/ts/messageModifiers/ViewSyncs.ts b/ts/messageModifiers/ViewSyncs.ts index 35c9bf1df2c2..d3216c1cdd08 100644 --- a/ts/messageModifiers/ViewSyncs.ts +++ b/ts/messageModifiers/ViewSyncs.ts @@ -16,6 +16,7 @@ import { notificationService } from '../services/notifications'; import { queueAttachmentDownloads } from '../util/queueAttachmentDownloads'; import { queueUpdateMessage } from '../util/messageBatcher'; import { generateCacheKey } from './generateCacheKey'; +import { AttachmentDownloadUrgency } from '../jobs/AttachmentDownloadManager'; export type ViewSyncAttributesType = { envelopeId: string; @@ -127,7 +128,8 @@ export async function onSync(sync: ViewSyncAttributesType): Promise { const attachments = message.get('attachments'); if (!attachments?.every(isDownloaded)) { const updatedFields = await queueAttachmentDownloads( - message.attributes + message.attributes, + AttachmentDownloadUrgency.STANDARD ); if (updatedFields) { message.set(updatedFields); diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 75ff28cee318..ea56e038569b 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -157,6 +157,7 @@ import { getChangesForPropAtTimestamp, } from '../util/editHelpers'; import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp'; +import type { AttachmentDownloadUrgency } from '../jobs/AttachmentDownloadManager'; /* eslint-disable more/no-then */ @@ -1368,8 +1369,10 @@ export class MessageModel extends window.Backbone.Model { return hasAttachmentDownloads(this.attributes); } - async queueAttachmentDownloads(): Promise { - const value = await queueAttachmentDownloads(this.attributes); + async queueAttachmentDownloads( + urgency?: AttachmentDownloadUrgency + ): Promise { + const value = await queueAttachmentDownloads(this.attributes, urgency); if (!value) { return false; } @@ -2279,8 +2282,6 @@ export class MessageModel extends window.Backbone.Model { window.Signal.Data.updateConversation(conversation.attributes); - const reduxState = window.reduxStore.getState(); - const giftBadge = message.get('giftBadge'); if (giftBadge) { const { level } = giftBadge; @@ -2315,35 +2316,6 @@ export class MessageModel extends window.Backbone.Model { } } - // Only queue attachments for downloads if this is a story or - // outgoing message or we've accepted the conversation - const attachments = this.get('attachments') || []; - - let queueStoryForDownload = false; - if (isStory(message.attributes)) { - queueStoryForDownload = await shouldDownloadStory( - conversation.attributes - ); - } - - const shouldHoldOffDownload = - (isStory(message.attributes) && !queueStoryForDownload) || - (!isStory(message.attributes) && - (isImage(attachments) || isVideo(attachments)) && - isInCall(reduxState)); - - if ( - this.hasAttachmentDownloads() && - (conversation.getAccepted() || isOutgoing(message.attributes)) && - !shouldHoldOffDownload - ) { - if (shouldUseAttachmentDownloadQueue()) { - addToAttachmentDownloadQueue(idLog, message); - } else { - await message.queueAttachmentDownloads(); - } - } - const isFirstRun = true; await this.modifyTargetMessage(conversation, isFirstRun); @@ -2365,6 +2337,9 @@ export class MessageModel extends window.Backbone.Model { log.info('Message saved', this.get('sent_at')); + // Once the message is saved to DB, we queue attachment downloads + await this.handleAttachmentDownloadsForNewMessage(conversation); + conversation.trigger('newmessage', this); const isFirstRun = false; @@ -2389,6 +2364,38 @@ export class MessageModel extends window.Backbone.Model { } } + private async handleAttachmentDownloadsForNewMessage( + conversation: ConversationModel + ) { + const idLog = `handleAttachmentDownloadsForNewMessage/${conversation.idForLogging()} ${this.idForLogging()}`; + + // Only queue attachments for downloads if this is a story (with additional logic), or + // if it's either an outgoing message or we've accepted the conversation + let shouldDownloadNow = false; + const attachments = this.get('attachments') || []; + const reduxState = window.reduxStore.getState(); + + if (isStory(this.attributes)) { + shouldDownloadNow = await shouldDownloadStory(conversation.attributes); + } else { + const isVisualMediaAndUserInCall = + isInCall(reduxState) && (isImage(attachments) || isVideo(attachments)); + + shouldDownloadNow = + this.hasAttachmentDownloads() && + (conversation.getAccepted() || isOutgoing(this.attributes)) && + !isVisualMediaAndUserInCall; + } + + if (shouldDownloadNow) { + if (shouldUseAttachmentDownloadQueue()) { + addToAttachmentDownloadQueue(idLog, this); + } else { + await this.queueAttachmentDownloads(); + } + } + } + // This function is called twice - once from handleDataMessage, and then again from // saveAndNotify, a function called at the end of handleDataMessage as a cleanup for // any missed out-of-order events. diff --git a/ts/services/backups/import.ts b/ts/services/backups/import.ts index c8747e1bb0af..499a3150f418 100644 --- a/ts/services/backups/import.ts +++ b/ts/services/backups/import.ts @@ -96,6 +96,7 @@ export class BackupImportStream extends Writable { forceSave: true, ourAci, }); + // TODO (DESKTOP-6845): after we save messages, queue their attachment downloads }, }); private ourConversation?: ConversationAttributesType; @@ -626,6 +627,7 @@ export class BackupImportStream extends Writable { ): Partial { return { body: data.text?.body ?? '', + // TODO (DESKTOP-6845): add attachments reactions: data.reactions?.map( ({ emoji, authorId, sentTimestamp, receivedTimestamp }) => { strictAssert(emoji != null, 'reaction must have an emoji'); diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index 02e058e202ab..b9085d4ae68b 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -35,7 +35,6 @@ import { ipcInvoke, doShutdown } from './channels'; import type { AdjacentMessagesByConversationOptionsType, AllItemsType, - AttachmentDownloadJobType, ClientInterface, ClientExclusiveInterface, ClientSearchResultMessageType, @@ -66,6 +65,7 @@ import { getMessageIdForLogging } from '../util/idForLogging'; import type { MessageAttributesType } from '../model-types'; import { incrementMessageCounter } from '../util/incrementMessageCounter'; import { generateSnippetAroundMention } from '../util/search'; +import type { AttachmentDownloadJobType } from '../types/AttachmentDownload'; const ERASE_SQL_KEY = 'erase-sql-key'; const ERASE_ATTACHMENTS_KEY = 'erase-attachments'; diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 8df6e17cae1c..0023da5f75f7 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -10,7 +10,6 @@ import type { StoredJob } from '../jobs/types'; import type { ReactionType, ReactionReadStatus } from '../types/Reactions'; import type { ConversationColorType, CustomColorType } from '../types/Colors'; import type { StorageAccessType } from '../types/Storage.d'; -import type { AttachmentType } from '../types/Attachment'; import type { BytesToStrings } from '../types/Util'; import type { QualifiedAddressStringType } from '../types/QualifiedAddress'; import type { StoryDistributionIdString } from '../types/StoryDistributionId'; @@ -31,6 +30,7 @@ import type { CallHistoryPagination, } from '../types/CallDisposition'; import type { CallLinkType, CallLinkRestrictions } from '../types/CallLink'; +import type { AttachmentDownloadJobType } from '../types/AttachmentDownload'; export type AdjacentMessagesByConversationOptionsType = Readonly<{ conversationId: string; @@ -51,24 +51,6 @@ export type GetNearbyMessageFromDeletedSetOptionsType = Readonly<{ includeStoryReplies: boolean; }>; -export type AttachmentDownloadJobTypeType = - | 'long-message' - | 'attachment' - | 'preview' - | 'contact' - | 'quote' - | 'sticker'; - -export type AttachmentDownloadJobType = { - attachment: AttachmentType; - attempts: number; - id: string; - index: number; - messageId: string; - pending: number; - timestamp: number; - type: AttachmentDownloadJobTypeType; -}; export type MessageMetricsType = { id: string; received_at: number; @@ -741,21 +723,22 @@ export type DataInterface = { /** only for testing */ removeAllUnprocessed: () => Promise; - getAttachmentDownloadJobById: ( - id: string - ) => Promise; - getNextAttachmentDownloadJobs: ( - limit?: number, - options?: { timestamp?: number } - ) => Promise>; + getAttachmentDownloadJob( + job: Pick< + AttachmentDownloadJobType, + 'messageId' | 'attachmentType' | 'digest' + > + ): AttachmentDownloadJobType; + getNextAttachmentDownloadJobs: (options: { + limit: number; + prioritizeMessageIds?: Array; + timestamp?: number; + }) => Promise>; saveAttachmentDownloadJob: (job: AttachmentDownloadJobType) => Promise; - resetAttachmentDownloadPending: () => Promise; - setAttachmentDownloadJobPending: ( - id: string, - pending: boolean + resetAttachmentDownloadActive: () => Promise; + removeAttachmentDownloadJob: ( + job: AttachmentDownloadJobType ) => Promise; - removeAttachmentDownloadJob: (id: string) => Promise; - removeAllAttachmentDownloadJobs: () => Promise; createOrUpdateStickerPack: (pack: StickerPackType) => Promise; updateStickerPackStatus: ( diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 050db60d5308..fc2e612e41d9 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -88,7 +88,6 @@ import { updateSchema } from './migrations'; import type { AdjacentMessagesByConversationOptionsType, StoredAllItemsType, - AttachmentDownloadJobType, ConversationMetricsType, ConversationType, DeleteSentProtoRecipientOptionsType, @@ -173,6 +172,10 @@ import { updateCallLinkState, } from './server/callLinks'; import { CallMode } from '../types/Calling'; +import { + attachmentDownloadJobSchema, + type AttachmentDownloadJobType, +} from '../types/AttachmentDownload'; type ConversationRow = Readonly<{ json: string; @@ -353,13 +356,11 @@ const dataInterface: ServerInterface = { removeUnprocessed, removeAllUnprocessed, - getAttachmentDownloadJobById, + getAttachmentDownloadJob, getNextAttachmentDownloadJobs, saveAttachmentDownloadJob, - resetAttachmentDownloadPending, - setAttachmentDownloadJobPending, + resetAttachmentDownloadActive, removeAttachmentDownloadJob, - removeAllAttachmentDownloadJobs, createOrUpdateStickerPack, updateStickerPackStatus, @@ -4403,127 +4404,184 @@ async function removeAllUnprocessed(): Promise { // Attachment Downloads -const ATTACHMENT_DOWNLOADS_TABLE = 'attachment_downloads'; -async function getAttachmentDownloadJobById( - id: string -): Promise { - return getById(getReadonlyInstance(), ATTACHMENT_DOWNLOADS_TABLE, id); +function getAttachmentDownloadJob( + job: Pick< + AttachmentDownloadJobType, + 'messageId' | 'attachmentType' | 'digest' + > +): AttachmentDownloadJobType { + const db = getReadonlyInstance(); + const [query, params] = sql` + SELECT * FROM attachment_downloads + WHERE + messageId = ${job.messageId} + AND + attachmentType = ${job.attachmentType} + AND + digest = ${job.digest}; + `; + + return db.prepare(query).get(params); } -async function getNextAttachmentDownloadJobs( - limit?: number, - options: { timestamp?: number } = {} -): Promise> { + +async function getNextAttachmentDownloadJobs({ + limit = 3, + prioritizeMessageIds, + timestamp = Date.now(), + maxLastAttemptForPrioritizedMessages, +}: { + limit: number; + prioritizeMessageIds?: Array; + timestamp?: number; + maxLastAttemptForPrioritizedMessages?: number; +}): Promise> { const db = await getWritableInstance(); - const timestamp = - options && options.timestamp ? options.timestamp : Date.now(); - const rows: Array<{ json: string; id: string }> = db - .prepare( - ` - SELECT id, json - FROM attachment_downloads - WHERE pending = 0 AND timestamp <= $timestamp - ORDER BY timestamp DESC - LIMIT $limit; - ` - ) - .all({ - limit: limit || 3, - timestamp, - }); + let priorityJobs = []; - const INNER_ERROR = 'jsonToObject error'; + // First, try to get jobs for prioritized messages (e.g. those currently user-visible) + if (prioritizeMessageIds?.length) { + const [priorityQuery, priorityParams] = sql` + SELECT * FROM attachment_downloads + -- very few rows will match messageIds, so in this case we want to optimize + -- the WHERE clause rather than the ORDER BY + INDEXED BY attachment_downloads_active_messageId + WHERE + active = 0 + AND + -- for priority messages, we want to retry based on the last attempt, rather than retryAfter + (lastAttemptTimestamp is NULL OR lastAttemptTimestamp <= ${ + maxLastAttemptForPrioritizedMessages ?? timestamp - durations.HOUR + }) + AND + messageId IN (${sqlJoin(prioritizeMessageIds)}) + -- for priority messages, let's load them oldest first; this helps, e.g. for stories where we + -- want the oldest one first + ORDER BY receivedAt ASC + LIMIT ${limit} + `; + priorityJobs = db.prepare(priorityQuery).all(priorityParams); + } + + // Next, get any other jobs, sorted by receivedAt + const numJobsRemaining = limit - priorityJobs.length; + let standardJobs = []; + if (numJobsRemaining > 0) { + const [query, params] = sql` + SELECT * FROM attachment_downloads + WHERE + active = 0 + AND + (retryAfter is NULL OR retryAfter <= ${timestamp}) + ORDER BY receivedAt DESC + LIMIT ${numJobsRemaining} + `; + + standardJobs = db.prepare(query).all(params); + } + + const allJobs = priorityJobs.concat(standardJobs); + const INNER_ERROR = 'jsonToObject or SchemaParse error'; try { - return rows.map(row => { + return allJobs.map(row => { try { - return jsonToObject(row.json); + return attachmentDownloadJobSchema.parse({ + ...row, + active: Boolean(row.active), + attachment: jsonToObject(row.attachmentJson), + }); } catch (error) { logger.error( - `getNextAttachmentDownloadJobs: Error with job '${row.id}', deleting. ` + - `JSON: '${row.json}' ` + - `Error: ${Errors.toLogFormat(error)}` + `getNextAttachmentDownloadJobs: Error with job for message ${row.messageId}, deleting.` ); - removeAttachmentDownloadJobSync(db, row.id); - throw new Error(INNER_ERROR); + + removeAttachmentDownloadJobSync(db, row); + throw new Error(error); } }); } catch (error) { if ('message' in error && error.message === INNER_ERROR) { - return getNextAttachmentDownloadJobs(limit, { timestamp }); + return getNextAttachmentDownloadJobs({ + limit, + prioritizeMessageIds, + timestamp, + maxLastAttemptForPrioritizedMessages, + }); } throw error; } } + async function saveAttachmentDownloadJob( job: AttachmentDownloadJobType ): Promise { const db = await getWritableInstance(); - const { id, pending, timestamp } = job; - if (!id) { - throw new Error( - 'saveAttachmentDownloadJob: Provided job did not have a truthy id' - ); - } - db.prepare( - ` + const [query, params] = sql` INSERT OR REPLACE INTO attachment_downloads ( - id, - pending, - timestamp, - json - ) values ( - $id, - $pending, - $timestamp, - $json - ) - ` - ).run({ - id, - pending, - timestamp, - json: objectToJSON(job), - }); + messageId, + attachmentType, + digest, + receivedAt, + sentAt, + contentType, + size, + active, + attempts, + retryAfter, + lastAttemptTimestamp, + attachmentJson + ) VALUES ( + ${job.messageId}, + ${job.attachmentType}, + ${job.digest}, + ${job.receivedAt}, + ${job.sentAt}, + ${job.contentType}, + ${job.size}, + ${job.active ? 1 : 0}, + ${job.attempts}, + ${job.retryAfter}, + ${job.lastAttemptTimestamp}, + ${objectToJSON(job.attachment)} + ); + `; + db.prepare(query).run(params); } -async function setAttachmentDownloadJobPending( - id: string, - pending: boolean -): Promise { - const db = await getWritableInstance(); - db.prepare( - ` - UPDATE attachment_downloads - SET pending = $pending - WHERE id = $id; - ` - ).run({ - id, - pending: pending ? 1 : 0, - }); -} -async function resetAttachmentDownloadPending(): Promise { + +async function resetAttachmentDownloadActive(): Promise { const db = await getWritableInstance(); db.prepare( ` UPDATE attachment_downloads - SET pending = 0 - WHERE pending != 0; + SET active = 0 + WHERE active != 0; ` ).run(); } -function removeAttachmentDownloadJobSync(db: Database, id: string): number { - return removeById(db, ATTACHMENT_DOWNLOADS_TABLE, id); + +function removeAttachmentDownloadJobSync( + db: Database, + job: AttachmentDownloadJobType +): void { + const [query, params] = sql` + DELETE FROM attachment_downloads + WHERE + messageId = ${job.messageId} + AND + attachmentType = ${job.attachmentType} + AND + digest = ${job.digest}; + `; + + db.prepare(query).run(params); } -async function removeAttachmentDownloadJob(id: string): Promise { + +async function removeAttachmentDownloadJob( + job: AttachmentDownloadJobType +): Promise { const db = await getWritableInstance(); - return removeAttachmentDownloadJobSync(db, id); -} -async function removeAllAttachmentDownloadJobs(): Promise { - return removeAllFromTable( - await getWritableInstance(), - ATTACHMENT_DOWNLOADS_TABLE - ); + return removeAttachmentDownloadJobSync(db, job); } // Stickers diff --git a/ts/sql/migrations/1040-undownloaded-backed-up-media.ts b/ts/sql/migrations/1040-undownloaded-backed-up-media.ts new file mode 100644 index 000000000000..77687e3cd0a4 --- /dev/null +++ b/ts/sql/migrations/1040-undownloaded-backed-up-media.ts @@ -0,0 +1,208 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { Database } from '@signalapp/better-sqlite3'; + +import type { LoggerType } from '../../types/Logging'; +import { + attachmentDownloadJobSchema, + type AttachmentDownloadJobType, + type AttachmentDownloadJobTypeType, +} from '../../types/AttachmentDownload'; +import type { AttachmentType } from '../../types/Attachment'; +import { jsonToObject, objectToJSON, sql } from '../util'; + +export const version = 1040; + +export type LegacyAttachmentDownloadJobType = { + attachment: AttachmentType; + attempts: number; + id: string; + index: number; + messageId: string; + pending: number; + timestamp: number; + type: AttachmentDownloadJobTypeType; +}; + +export function updateToSchemaVersion1040( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 1040) { + return; + } + + db.transaction(() => { + // 1. Load all existing rows into memory (shouldn't be many) + const existingJobs: Array<{ + id: string | null; + timestamp: number | null; + pending: number | null; + json: string | null; + }> = db + .prepare( + ` + SELECT id, timestamp, pending, json from attachment_downloads + ` + ) + .all(); + logger.info( + `updateToSchemaVersion1040: loaded ${existingJobs.length} existing jobs` + ); + + // 2. Create new temp table, with a couple new columns and stricter typing + db.exec(` + CREATE TABLE tmp_attachment_downloads ( + messageId TEXT NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + attachmentType TEXT NOT NULL, + digest TEXT NOT NULL, + receivedAt INTEGER NOT NULL, + sentAt INTEGER NOT NULL, + contentType TEXT NOT NULL, + size INTEGER NOT NULL, + attachmentJson TEXT NOT NULL, + active INTEGER NOT NULL, + attempts INTEGER NOT NULL, + retryAfter INTEGER, + lastAttemptTimestamp INTEGER, + + PRIMARY KEY (messageId, attachmentType, digest) + ) STRICT; + `); + + // 3. Drop existing table + db.exec('DROP TABLE attachment_downloads;'); + + // 4. Rename temp table + db.exec( + 'ALTER TABLE tmp_attachment_downloads RENAME TO attachment_downloads;' + ); + + // 5. Add new index on active & receivedAt. For most queries when there are lots of + // jobs (like during backup restore), many jobs will match the the WHERE clause, so + // the ORDER BY on receivedAt is probably the most expensive part. + db.exec(` + CREATE INDEX attachment_downloads_active_receivedAt + ON attachment_downloads ( + active, receivedAt + ); + `); + + // 6. Add new index on active & messageId. In order to prioritize visible messages, + // we'll also query for rows with a matching messageId. For these, the messageId + // matching is likely going to be the most expensive part. + db.exec(` + CREATE INDEX attachment_downloads_active_messageId + ON attachment_downloads ( + active, messageId + ); + `); + + // 7. Add new index just on messageId, for the ON DELETE CASCADE foreign key + // constraint + db.exec(` + CREATE INDEX attachment_downloads_messageId + ON attachment_downloads ( + messageId + ); + `); + + // 8. Rewrite old rows to match new schema + const rowsToTransfer: Array = []; + + for (const existingJob of existingJobs) { + try { + // Type this as partial in case there is missing data + const existingJobData: Partial = + jsonToObject(existingJob.json ?? ''); + + const updatedJob: Partial = { + messageId: existingJobData.messageId, + attachmentType: existingJobData.type, + attachment: existingJobData.attachment, + // The existing timestamp column works reasonably well in place of + // actually retrieving the message's receivedAt + receivedAt: existingJobData.timestamp ?? Date.now(), + sentAt: existingJobData.timestamp ?? Date.now(), + digest: existingJobData.attachment?.digest, + contentType: existingJobData.attachment?.contentType, + size: existingJobData.attachment?.size, + active: false, // all jobs are inactive on app start + attempts: existingJobData.attempts ?? 0, + retryAfter: null, + lastAttemptTimestamp: null, + }; + + const parsed = attachmentDownloadJobSchema.parse(updatedJob); + + rowsToTransfer.push(parsed as AttachmentDownloadJobType); + } catch { + logger.warn( + `updateToSchemaVersion1040: unable to transfer job ${existingJob.id} to new table; invalid data` + ); + } + } + + let numTransferred = 0; + if (rowsToTransfer.length) { + logger.info( + `updateToSchemaVersion1040: transferring ${rowsToTransfer.length} rows` + ); + for (const row of rowsToTransfer) { + const [insertQuery, insertParams] = sql` + INSERT INTO attachment_downloads + ( + messageId, + attachmentType, + receivedAt, + sentAt, + digest, + contentType, + size, + attachmentJson, + active, + attempts, + retryAfter, + lastAttemptTimestamp + ) + VALUES + ( + ${row.messageId}, + ${row.attachmentType}, + ${row.receivedAt}, + ${row.sentAt}, + ${row.digest}, + ${row.contentType}, + ${row.size}, + ${objectToJSON(row.attachment)}, + ${row.active ? 1 : 0}, + ${row.attempts}, + ${row.retryAfter}, + ${row.lastAttemptTimestamp} + ); + `; + try { + db.prepare(insertQuery).run(insertParams); + numTransferred += 1; + } catch (error) { + logger.error( + 'updateToSchemaVersion1040: error when transferring row', + error + ); + } + } + } + + logger.info( + `updateToSchemaVersion1040: transferred ${numTransferred} rows, removed ${ + existingJobs.length - numTransferred + }` + ); + })(); + + db.pragma('user_version = 1040'); + + logger.info('updateToSchemaVersion1040: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index e37c5bec503d..06514b14e125 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -78,10 +78,11 @@ import { updateToSchemaVersion990 } from './990-phone-number-sharing'; import { updateToSchemaVersion1000 } from './1000-mark-unread-call-history-messages-as-unseen'; import { updateToSchemaVersion1010 } from './1010-call-links-table'; import { updateToSchemaVersion1020 } from './1020-self-merges'; +import { updateToSchemaVersion1030 } from './1030-unblock-event'; import { + updateToSchemaVersion1040, version as MAX_VERSION, - updateToSchemaVersion1030, -} from './1030-unblock-event'; +} from './1040-undownloaded-backed-up-media'; function updateToSchemaVersion1( currentVersion: number, @@ -2027,6 +2028,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1010, updateToSchemaVersion1020, updateToSchemaVersion1030, + updateToSchemaVersion1040, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/state/ducks/conversations.ts b/ts/state/ducks/conversations.ts index 8a03a221e3a2..47fd0a56b335 100644 --- a/ts/state/ducks/conversations.ts +++ b/ts/state/ducks/conversations.ts @@ -183,6 +183,7 @@ import { getAddedByForOurPendingInvitation } from '../../util/getAddedByForOurPe import { getConversationIdForLogging } from '../../util/idForLogging'; import { singleProtoJobQueue } from '../../jobs/singleProtoJobQueue'; import MessageSender from '../../textsecure/SendMessage'; +import { AttachmentDownloadUrgency } from '../../jobs/AttachmentDownloadManager'; // State @@ -2174,7 +2175,9 @@ function kickOffAttachmentDownload( `kickOffAttachmentDownload: Message ${options.messageId} missing!` ); } - const didUpdateValues = await message.queueAttachmentDownloads(); + const didUpdateValues = await message.queueAttachmentDownloads( + AttachmentDownloadUrgency.IMMEDIATE + ); if (didUpdateValues) { drop( diff --git a/ts/state/ducks/stories.ts b/ts/state/ducks/stories.ts index ecbb9b45074d..f36371051a67 100644 --- a/ts/state/ducks/stories.ts +++ b/ts/state/ducks/stories.ts @@ -512,10 +512,9 @@ function queueStoryDownload( return; } - // isDownloading checks for the downloadJobId which is set by - // queueAttachmentDownloads but we optimistically set story.startedDownload - // in redux to prevent race conditions from queuing up multiple attachment - // downloads before the attachment save takes place. + // isDownloading checks if the download is pending but we optimistically set + // story.startedDownload in redux to prevent race conditions from queuing up multiple + // attachment downloads before the attachment save takes place. if (isDownloading(attachment) || story.startedDownload) { return; } diff --git a/ts/state/smart/Timeline.tsx b/ts/state/smart/Timeline.tsx index 4f0621d2ad0f..b30403a18e54 100644 --- a/ts/state/smart/Timeline.tsx +++ b/ts/state/smart/Timeline.tsx @@ -42,6 +42,7 @@ import { SmartHeroRow } from './HeroRow'; import { SmartMiniPlayer } from './MiniPlayer'; import { SmartTimelineItem, type SmartTimelineItemProps } from './TimelineItem'; import { SmartTypingBubble } from './TypingBubble'; +import { AttachmentDownloadManager } from '../../jobs/AttachmentDownloadManager'; type ExternalProps = { id: string; @@ -266,6 +267,9 @@ export const SmartTimeline = memo(function SmartTimeline({ markMessageRead={markMessageRead} messageChangeCounter={messageChangeCounter} messageLoadingState={messageLoadingState} + updateVisibleMessages={ + AttachmentDownloadManager.updateVisibleTimelineMessages + } oldestUnseenIndex={oldestUnseenIndex} peekGroupCallForTheFirstTime={peekGroupCallForTheFirstTime} peekGroupCallIfItHasMembers={peekGroupCallIfItHasMembers} diff --git a/ts/test-both/util/exponentialBackoff_test.ts b/ts/test-both/util/exponentialBackoff_test.ts index 674605262e2e..778df2567991 100644 --- a/ts/test-both/util/exponentialBackoff_test.ts +++ b/ts/test-both/util/exponentialBackoff_test.ts @@ -25,6 +25,21 @@ describe('exponential backoff utilities', () => { assert.strictEqual(exponentialBackoffSleepTime(attempt), maximum); } }); + + it('respects custom variables', () => { + const options = { + maxBackoffTime: 10000, + multiplier: 2, + firstBackoffTime: 1000, + }; + assert.strictEqual(exponentialBackoffSleepTime(1, options), 0); + assert.strictEqual(exponentialBackoffSleepTime(2, options), 1000); + assert.strictEqual(exponentialBackoffSleepTime(3, options), 2000); + assert.strictEqual(exponentialBackoffSleepTime(4, options), 4000); + assert.strictEqual(exponentialBackoffSleepTime(5, options), 8000); + assert.strictEqual(exponentialBackoffSleepTime(6, options), 10000); + assert.strictEqual(exponentialBackoffSleepTime(7, options), 10000); + }); }); describe('exponentialBackoffMaxAttempts', () => { diff --git a/ts/test-electron/services/AttachmentDownloadManager_test.ts b/ts/test-electron/services/AttachmentDownloadManager_test.ts new file mode 100644 index 000000000000..bd3ba57af59e --- /dev/null +++ b/ts/test-electron/services/AttachmentDownloadManager_test.ts @@ -0,0 +1,367 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable more/no-then */ +/* eslint-disable @typescript-eslint/no-floating-promises */ +import * as sinon from 'sinon'; +import { assert } from 'chai'; +import * as MIME from '../../types/MIME'; + +import { + AttachmentDownloadManager, + AttachmentDownloadUrgency, + type NewAttachmentDownloadJobType, +} from '../../jobs/AttachmentDownloadManager'; +import type { AttachmentDownloadJobType } from '../../types/AttachmentDownload'; +import dataInterface from '../../sql/Client'; +import { HOUR, MINUTE, SECOND } from '../../util/durations'; +import { type AciString } from '../../types/ServiceId'; + +describe('AttachmentDownloadManager', () => { + let downloadManager: AttachmentDownloadManager | undefined; + let runJob: sinon.SinonStub; + let sandbox: sinon.SinonSandbox; + let clock: sinon.SinonFakeTimers; + let isInCall: sinon.SinonStub; + + function composeJob({ + messageId, + receivedAt, + }: Pick< + NewAttachmentDownloadJobType, + 'messageId' | 'receivedAt' + >): AttachmentDownloadJobType { + const digest = `digestFor${messageId}`; + const size = 128; + const contentType = MIME.IMAGE_PNG; + return { + messageId, + receivedAt, + sentAt: receivedAt, + attachmentType: 'attachment', + digest, + size, + contentType, + active: false, + attempts: 0, + retryAfter: null, + lastAttemptTimestamp: null, + attachment: { + contentType, + size, + digest: `digestFor${messageId}`, + }, + }; + } + + beforeEach(async () => { + await dataInterface.removeAll(); + + sandbox = sinon.createSandbox(); + clock = sinon.useFakeTimers(); + + isInCall = sinon.stub().returns(false); + runJob = sinon.stub().callsFake(async () => { + return new Promise<{ status: 'finished' | 'retry' }>(resolve => { + Promise.resolve().then(() => { + resolve({ status: 'finished' }); + }); + }); + }); + + downloadManager = new AttachmentDownloadManager({ + ...AttachmentDownloadManager.defaultParams, + isInCall, + runJob, + }); + }); + + afterEach(async () => { + sandbox.restore(); + clock.restore(); + await downloadManager?.stop(); + }); + + async function addJob( + job: AttachmentDownloadJobType, + urgency: AttachmentDownloadUrgency + ) { + // Save message first to satisfy foreign key constraint + await dataInterface.saveMessage( + { + id: job.messageId, + type: 'incoming', + sent_at: job.sentAt, + timestamp: job.sentAt, + received_at: job.receivedAt + 1, + conversationId: 'convoId', + }, + { + ourAci: 'ourAci' as AciString, + forceSave: true, + } + ); + await downloadManager?.addJob({ + ...job, + urgency, + }); + } + async function addJobs( + num: number + ): Promise> { + const jobs = new Array(num) + .fill(null) + .map((_, idx) => + composeJob({ messageId: `message-${idx}`, receivedAt: idx }) + ); + for (const job of jobs) { + // eslint-disable-next-line no-await-in-loop + await addJob(job, AttachmentDownloadUrgency.STANDARD); + } + return jobs; + } + + function waitForJobToBeStarted(job: AttachmentDownloadJobType) { + return downloadManager?.waitForJobToBeStarted(job); + } + + function waitForJobToBeCompleted(job: AttachmentDownloadJobType) { + return downloadManager?.waitForJobToBeCompleted(job); + } + + function assertRunJobCalledWith(jobs: Array) { + return assert.strictEqual( + JSON.stringify( + runJob + .getCalls() + .map( + call => + `${call.args[0].messageId}${call.args[0].attachmentType}.${call.args[0].digest}` + ) + ), + JSON.stringify( + jobs.map(job => `${job.messageId}${job.attachmentType}.${job.digest}`) + ) + ); + } + + async function advanceTime(ms: number) { + // When advancing the timers, we want to make sure any DB operations are completed + // first. In cases like maybeStartJobs where we prevent re-entrancy, without this, + // prior (unfinished) invocations can prevent subsequent calls after the clock is + // ticked forward and make tests unreliable + await dataInterface.getAllItems(); + await clock.tickAsync(ms); + } + + function getPromisesForAttempts( + job: AttachmentDownloadJobType, + attempts: number + ) { + return new Array(attempts).fill(null).map((_, idx) => { + return { + started: waitForJobToBeStarted({ ...job, attempts: idx }), + completed: waitForJobToBeCompleted({ ...job, attempts: idx }), + }; + }); + } + + it('runs 3 jobs at a time in descending receivedAt order', async () => { + const jobs = await addJobs(5); + // Confirm they are saved to DB + const allJobs = await dataInterface.getNextAttachmentDownloadJobs({ + limit: 100, + }); + + assert.strictEqual(allJobs.length, 5); + assert.strictEqual( + JSON.stringify(allJobs.map(job => job.messageId)), + JSON.stringify([ + 'message-4', + 'message-3', + 'message-2', + 'message-1', + 'message-0', + ]) + ); + + await downloadManager?.start(); + await waitForJobToBeStarted(jobs[2]); + + assert.strictEqual(runJob.callCount, 3); + assertRunJobCalledWith([jobs[4], jobs[3], jobs[2]]); + + await waitForJobToBeStarted(jobs[0]); + assert.strictEqual(runJob.callCount, 5); + assertRunJobCalledWith([jobs[4], jobs[3], jobs[2], jobs[1], jobs[0]]); + }); + + it('runs a job immediately if urgency is IMMEDIATE', async () => { + const jobs = await addJobs(6); + await downloadManager?.start(); + + const urgentJobForOldMessage = composeJob({ + messageId: 'message-urgent', + receivedAt: 0, + }); + + await addJob(urgentJobForOldMessage, AttachmentDownloadUrgency.IMMEDIATE); + + await waitForJobToBeStarted(urgentJobForOldMessage); + + assert.strictEqual(runJob.callCount, 4); + assertRunJobCalledWith([jobs[5], jobs[4], jobs[3], urgentJobForOldMessage]); + + await waitForJobToBeStarted(jobs[0]); + assert.strictEqual(runJob.callCount, 7); + assertRunJobCalledWith([ + jobs[5], + jobs[4], + jobs[3], + urgentJobForOldMessage, + jobs[2], + jobs[1], + jobs[0], + ]); + }); + + it('prefers jobs for visible messages', async () => { + const jobs = await addJobs(5); + + downloadManager?.updateVisibleTimelineMessages(['message-0', 'message-1']); + + await downloadManager?.start(); + + await waitForJobToBeStarted(jobs[4]); + assert.strictEqual(runJob.callCount, 3); + assertRunJobCalledWith([jobs[0], jobs[1], jobs[4]]); + + await waitForJobToBeStarted(jobs[2]); + assert.strictEqual(runJob.callCount, 5); + assertRunJobCalledWith([jobs[0], jobs[1], jobs[4], jobs[3], jobs[2]]); + }); + + it("does not start a job if we're in a call", async () => { + const jobs = await addJobs(5); + + isInCall.callsFake(() => true); + + await downloadManager?.start(); + await advanceTime(2 * MINUTE); + assert.strictEqual(runJob.callCount, 0); + + isInCall.callsFake(() => false); + + await advanceTime(2 * MINUTE); + await waitForJobToBeStarted(jobs[0]); + assert.strictEqual(runJob.callCount, 5); + }); + + it('handles retries for failed', async () => { + const jobs = await addJobs(2); + const job0Attempts = getPromisesForAttempts(jobs[0], 1); + const job1Attempts = getPromisesForAttempts(jobs[1], 5); + + runJob.callsFake(async (job: AttachmentDownloadJobType) => { + return new Promise<{ status: 'finished' | 'retry' }>(resolve => { + Promise.resolve().then(() => { + if (job.messageId === jobs[0].messageId) { + resolve({ status: 'finished' }); + } else { + resolve({ status: 'retry' }); + } + }); + }); + }); + + await downloadManager?.start(); + + await job0Attempts[0].completed; + assert.strictEqual(runJob.callCount, 2); + assertRunJobCalledWith([jobs[1], jobs[0]]); + + const retriedJob = await dataInterface.getAttachmentDownloadJob(jobs[1]); + const finishedJob = await dataInterface.getAttachmentDownloadJob(jobs[0]); + + assert.isUndefined(finishedJob); + assert.strictEqual(retriedJob?.attempts, 1); + assert.isNumber(retriedJob?.retryAfter); + + await advanceTime(30 * SECOND); + await job1Attempts[1].completed; + assert.strictEqual(runJob.callCount, 3); + + await advanceTime(5 * MINUTE); + await job1Attempts[2].completed; + assert.strictEqual(runJob.callCount, 4); + + await advanceTime(50 * MINUTE); + await job1Attempts[3].completed; + assert.strictEqual(runJob.callCount, 5); + + await advanceTime(6 * HOUR); + await job1Attempts[4].completed; + assert.strictEqual(runJob.callCount, 6); + assertRunJobCalledWith([ + jobs[1], + jobs[0], + jobs[1], + jobs[1], + jobs[1], + jobs[1], + ]); + + // Ensure it's been removed after completed + assert.isUndefined(await dataInterface.getAttachmentDownloadJob(jobs[1])); + }); + + it('will reset attempts if addJob is called again', async () => { + const jobs = await addJobs(1); + runJob.callsFake(async () => { + return new Promise<{ status: 'finished' | 'retry' }>(resolve => { + Promise.resolve().then(() => { + resolve({ status: 'retry' }); + }); + }); + }); + + let attempts = getPromisesForAttempts(jobs[0], 4); + await downloadManager?.start(); + + await attempts[0].completed; + assert.strictEqual(runJob.callCount, 1); + + await advanceTime(30 * SECOND); + await attempts[1].completed; + assert.strictEqual(runJob.callCount, 2); + + await advanceTime(5 * MINUTE); + await attempts[2].completed; + assert.strictEqual(runJob.callCount, 3); + + // add the same job again and it should retry ASAP and reset attempts + attempts = getPromisesForAttempts(jobs[0], 4); + await downloadManager?.addJob(jobs[0]); + await attempts[0].completed; + assert.strictEqual(runJob.callCount, 4); + + await advanceTime(30 * SECOND); + await attempts[1].completed; + assert.strictEqual(runJob.callCount, 5); + + await advanceTime(5 * MINUTE); + await attempts[2].completed; + assert.strictEqual(runJob.callCount, 6); + + await advanceTime(50 * MINUTE); + await attempts[3].completed; + assert.strictEqual(runJob.callCount, 7); + + await advanceTime(6 * HOUR); + await attempts[3].completed; + assert.strictEqual(runJob.callCount, 8); + + // Ensure it's been removed + assert.isUndefined(await dataInterface.getAttachmentDownloadJob(jobs[0])); + }); +}); diff --git a/ts/test-node/sql/migration_1040_test.ts b/ts/test-node/sql/migration_1040_test.ts new file mode 100644 index 000000000000..71d2d8aa6a75 --- /dev/null +++ b/ts/test-node/sql/migration_1040_test.ts @@ -0,0 +1,484 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { omit } from 'lodash'; +import { assert } from 'chai'; +import type { Database } from '@signalapp/better-sqlite3'; +import SQL from '@signalapp/better-sqlite3'; + +import { jsonToObject, objectToJSON, sql, sqlJoin } from '../../sql/util'; +import { updateToVersion } from './helpers'; +import type { LegacyAttachmentDownloadJobType } from '../../sql/migrations/1040-undownloaded-backed-up-media'; +import type { AttachmentType } from '../../types/Attachment'; +import type { AttachmentDownloadJobType } from '../../types/AttachmentDownload'; +import { IMAGE_JPEG } from '../../types/MIME'; + +function getAttachmentDownloadJobs(db: Database) { + const [query] = sql` + SELECT * FROM attachment_downloads ORDER BY receivedAt DESC; + `; + + return db + .prepare(query) + .all() + .map(job => ({ + ...omit(job, 'attachmentJson'), + attachment: jsonToObject(job.attachmentJson), + })); +} + +type UnflattenedAttachmentDownloadJobType = Omit< + AttachmentDownloadJobType, + 'digest' | 'contentType' | 'size' +>; +function insertNewJob( + db: Database, + job: UnflattenedAttachmentDownloadJobType, + addMessageFirst: boolean = true +): void { + if (addMessageFirst) { + try { + db.prepare('INSERT INTO messages (id) VALUES ($id)').run({ + id: job.messageId, + }); + } catch (e) { + // pass; message has already been inserted + } + } + const [query, params] = sql` + INSERT INTO attachment_downloads + ( + messageId, + attachmentType, + attachmentJson, + digest, + contentType, + size, + receivedAt, + sentAt, + active, + attempts, + retryAfter, + lastAttemptTimestamp + ) + VALUES + ( + ${job.messageId}, + ${job.attachmentType}, + ${objectToJSON(job.attachment)}, + ${job.attachment.digest}, + ${job.attachment.contentType}, + ${job.attachment.size}, + ${job.receivedAt}, + ${job.sentAt}, + ${job.active ? 1 : 0}, + ${job.attempts}, + ${job.retryAfter}, + ${job.lastAttemptTimestamp} + ); +`; + + db.prepare(query).run(params); +} + +describe('SQL/updateToSchemaVersion1040', () => { + describe('Storing of new attachment jobs', () => { + let db: Database; + + beforeEach(() => { + db = new SQL(':memory:'); + updateToVersion(db, 1040); + }); + + afterEach(() => { + db.close(); + }); + + it('allows storing of new backup attachment jobs', () => { + insertNewJob(db, { + messageId: 'message1', + attachmentType: 'attachment', + attachment: { + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + }, + receivedAt: 1970, + sentAt: 2070, + active: false, + retryAfter: null, + attempts: 0, + lastAttemptTimestamp: null, + }); + + insertNewJob(db, { + messageId: 'message2', + attachmentType: 'attachment', + attachment: { + digest: 'digest2', + contentType: IMAGE_JPEG, + size: 128, + }, + receivedAt: 1971, + sentAt: 2071, + active: false, + retryAfter: 1204, + attempts: 0, + lastAttemptTimestamp: 1004, + }); + + const attachments = getAttachmentDownloadJobs(db); + assert.strictEqual(attachments.length, 2); + assert.deepEqual(attachments, [ + { + messageId: 'message2', + attachmentType: 'attachment', + digest: 'digest2', + contentType: IMAGE_JPEG, + size: 128, + receivedAt: 1971, + sentAt: 2071, + active: 0, + retryAfter: 1204, + attempts: 0, + lastAttemptTimestamp: 1004, + attachment: { + digest: 'digest2', + contentType: IMAGE_JPEG, + size: 128, + }, + }, + { + messageId: 'message1', + attachmentType: 'attachment', + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + receivedAt: 1970, + sentAt: 2070, + active: 0, + retryAfter: null, + attempts: 0, + lastAttemptTimestamp: null, + attachment: { + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + }, + }, + ]); + }); + + it('Respects primary key constraint', () => { + const job: UnflattenedAttachmentDownloadJobType = { + messageId: 'message1', + attachmentType: 'attachment', + attachment: { + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + }, + receivedAt: 1970, + sentAt: 2070, + active: false, + retryAfter: null, + attempts: 0, + lastAttemptTimestamp: null, + }; + insertNewJob(db, job); + assert.throws(() => { + insertNewJob(db, { ...job, attempts: 1 }); + }); + + const attachments = getAttachmentDownloadJobs(db); + assert.strictEqual(attachments.length, 1); + assert.strictEqual(attachments[0].attempts, 0); + }); + + it('uses indices searching for next job', () => { + const now = Date.now(); + + const job: UnflattenedAttachmentDownloadJobType = { + messageId: 'message1', + attachmentType: 'attachment', + attachment: { + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + }, + receivedAt: 101, + sentAt: 101, + attempts: 0, + active: false, + retryAfter: null, + lastAttemptTimestamp: null, + }; + insertNewJob(db, job); + insertNewJob(db, { + ...job, + messageId: 'message2', + receivedAt: 102, + sentAt: 102, + retryAfter: now + 1, + lastAttemptTimestamp: now - 10, + }); + insertNewJob(db, { + ...job, + messageId: 'message3', + active: true, + receivedAt: 103, + sentAt: 103, + }); + insertNewJob(db, { + ...job, + messageId: 'message4', + attachmentType: 'contact', + receivedAt: 104, + sentAt: 104, + retryAfter: now, + lastAttemptTimestamp: now - 1000, + }); + + { + const [query, params] = sql` + SELECT * FROM attachment_downloads + WHERE + active = 0 + AND + (retryAfter is NULL OR retryAfter <= ${now}) + ORDER BY receivedAt DESC + LIMIT 5 + `; + + const result = db.prepare(query).all(params); + assert.strictEqual(result.length, 2); + assert.deepStrictEqual( + result.map(res => res.messageId), + ['message4', 'message1'] + ); + + const details = db + .prepare(`EXPLAIN QUERY PLAN ${query}`) + .all(params) + .map(step => step.detail) + .join(', '); + assert.include( + details, + 'USING INDEX attachment_downloads_active_receivedAt' + ); + assert.notInclude(details, 'TEMP B-TREE'); + assert.notInclude(details, 'SCAN'); + } + { + const messageIds = ['message1', 'message2', 'message4']; + const [query, params] = sql` + SELECT * FROM attachment_downloads + INDEXED BY attachment_downloads_active_messageId + WHERE + active = 0 + AND + (lastAttemptTimestamp is NULL OR lastAttemptTimestamp <= ${now - 100}) + AND + messageId IN (${sqlJoin(messageIds)}) + ORDER BY receivedAt ASC + LIMIT 5 + `; + + const result = db.prepare(query).all(params); + assert.strictEqual(result.length, 2); + assert.deepStrictEqual( + result.map(res => res.messageId), + ['message1', 'message4'] + ); + const details = db + .prepare(`EXPLAIN QUERY PLAN ${query}`) + .all(params) + .map(step => step.detail) + .join(', '); + + // This query _will_ use a temp b-tree for ordering, but the number of rows + // should be quite low. + assert.include( + details, + 'USING INDEX attachment_downloads_active_messageId' + ); + } + }); + + it('respects foreign key constraint on messageId', () => { + const job: AttachmentDownloadJobType = { + messageId: 'message1', + attachmentType: 'attachment', + attachment: { + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + }, + receivedAt: 1970, + digest: 'digest1', + contentType: IMAGE_JPEG, + size: 128, + sentAt: 2070, + active: false, + retryAfter: null, + attempts: 0, + lastAttemptTimestamp: null, + }; + // throws if we don't add the message first + assert.throws(() => insertNewJob(db, job, false)); + insertNewJob(db, job, true); + + assert.strictEqual(getAttachmentDownloadJobs(db).length, 1); + + // Deletes the job when the message is deleted + db.prepare('DELETE FROM messages WHERE id = $id').run({ + id: job.messageId, + }); + assert.strictEqual(getAttachmentDownloadJobs(db).length, 0); + }); + }); + + describe('existing jobs are transferred', () => { + let db: Database; + + beforeEach(() => { + db = new SQL(':memory:'); + updateToVersion(db, 1030); + }); + + afterEach(() => { + db.close(); + }); + + it('existing rows are retained; invalid existing rows are removed', () => { + insertLegacyJob(db, { + id: 'id-1', + messageId: 'message-1', + timestamp: 1000, + attachment: { + size: 100, + contentType: 'image/png', + digest: 'digest1', + cdnKey: 'key1', + } as AttachmentType, + pending: 0, + index: 0, + type: 'attachment', + }); + insertLegacyJob(db, { + id: 'invalid-1', + }); + insertLegacyJob(db, { + id: 'id-2', + messageId: 'message-2', + timestamp: 1001, + attachment: { + size: 100, + contentType: 'image/jpeg', + digest: 'digest2', + cdnKey: 'key2', + } as AttachmentType, + pending: 1, + index: 2, + type: 'attachment', + attempts: 1, + }); + insertLegacyJob(db, { + id: 'invalid-2', + timestamp: 1000, + attachment: { size: 100, contentType: 'image/jpeg' } as AttachmentType, + pending: 0, + index: 0, + type: 'attachment', + }); + insertLegacyJob(db, { + id: 'invalid-3-no-content-type', + timestamp: 1000, + attachment: { size: 100 } as AttachmentType, + pending: 0, + index: 0, + type: 'attachment', + }); + insertLegacyJob(db, { + id: 'duplicate-1', + messageId: 'message-1', + timestamp: 1000, + attachment: { + size: 100, + contentType: 'image/jpeg', + digest: 'digest1', + } as AttachmentType, + pending: 0, + index: 0, + type: 'attachment', + }); + + const legacyJobs = db.prepare('SELECT * FROM attachment_downloads').all(); + assert.strictEqual(legacyJobs.length, 6); + + updateToVersion(db, 1040); + + const newJobs = getAttachmentDownloadJobs(db); + assert.strictEqual(newJobs.length, 2); + assert.deepEqual(newJobs[1], { + messageId: 'message-1', + receivedAt: 1000, + sentAt: 1000, + attachment: { + size: 100, + contentType: 'image/png', + digest: 'digest1', + cdnKey: 'key1', + }, + size: 100, + contentType: 'image/png', + digest: 'digest1', + active: 0, + attempts: 0, + attachmentType: 'attachment', + lastAttemptTimestamp: null, + retryAfter: null, + }); + assert.deepEqual(newJobs[0], { + messageId: 'message-2', + receivedAt: 1001, + sentAt: 1001, + attachment: { + size: 100, + contentType: 'image/jpeg', + digest: 'digest2', + cdnKey: 'key2', + }, + size: 100, + contentType: 'image/jpeg', + digest: 'digest2', + active: 0, + attempts: 1, + attachmentType: 'attachment', + lastAttemptTimestamp: null, + retryAfter: null, + }); + }); + }); +}); + +function insertLegacyJob( + db: Database, + job: Partial +): void { + db.prepare('INSERT OR REPLACE INTO messages (id) VALUES ($id)').run({ + id: job.messageId, + }); + const [query, params] = sql` + INSERT INTO attachment_downloads + (id, timestamp, pending, json) + VALUES + ( + ${job.id}, + ${job.timestamp}, + ${job.pending}, + ${objectToJSON(job)} + ); + `; + + db.prepare(query).run(params); +} diff --git a/ts/textsecure/downloadAttachment.ts b/ts/textsecure/downloadAttachment.ts index 347907ebe470..daf3ad3cd89f 100644 --- a/ts/textsecure/downloadAttachment.ts +++ b/ts/textsecure/downloadAttachment.ts @@ -96,6 +96,8 @@ export async function downloadAttachmentV2( strictAssert(key, `${logId}: missing key`); strictAssert(isNumber(size), `${logId}: missing size`); + // TODO (DESKTOP-6845): download attachments differentially based on their + // media tier (i.e. transit tier or backup tier) const downloadStream = await server.getAttachmentV2( cdn, dropNull(cdnNumber), diff --git a/ts/types/Attachment.ts b/ts/types/Attachment.ts index 656ea5373971..be93fced7c72 100644 --- a/ts/types/Attachment.ts +++ b/ts/types/Attachment.ts @@ -70,7 +70,6 @@ export type AttachmentType = { flags?: number; thumbnail?: ThumbnailType; isCorrupted?: boolean; - downloadJobId?: string; cdnNumber?: number; cdnId?: string; cdnKey?: string; @@ -696,7 +695,7 @@ export function hasNotResolved(attachment?: AttachmentType): boolean { export function isDownloading(attachment?: AttachmentType): boolean { const resolved = resolveNestedAttachment(attachment); - return Boolean(resolved && resolved.downloadJobId && resolved.pending); + return Boolean(resolved && resolved.pending); } export function hasFailed(attachment?: AttachmentType): boolean { diff --git a/ts/types/AttachmentDownload.ts b/ts/types/AttachmentDownload.ts new file mode 100644 index 000000000000..f3b6a41a9300 --- /dev/null +++ b/ts/types/AttachmentDownload.ts @@ -0,0 +1,56 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { z } from 'zod'; +import { MIMETypeSchema, type MIMEType } from './MIME'; +import type { AttachmentType } from './Attachment'; + +export const attachmentDownloadTypeSchema = z.enum([ + 'long-message', + 'attachment', + 'preview', + 'contact', + 'quote', + 'sticker', +]); + +export type AttachmentDownloadJobTypeType = z.infer< + typeof attachmentDownloadTypeSchema +>; + +export type AttachmentDownloadJobType = { + messageId: string; + receivedAt: number; + sentAt: number; + attachmentType: AttachmentDownloadJobTypeType; + attachment: AttachmentType; + attempts: number; + active: boolean; + retryAfter: number | null; + lastAttemptTimestamp: number | null; + digest: string; + contentType: MIMEType; + size: number; +}; + +export const attachmentDownloadJobSchema = z.object({ + messageId: z.string(), + receivedAt: z.number(), + sentAt: z.number(), + attachmentType: attachmentDownloadTypeSchema, + attachment: z + .object({ size: z.number(), contentType: MIMETypeSchema }) + .passthrough(), + attempts: z.number(), + active: z.boolean(), + retryAfter: z.number().nullable(), + lastAttemptTimestamp: z.number().nullable(), + digest: z.string(), + contentType: MIMETypeSchema, + size: z.number(), + messageIdForLogging: z.string().optional(), +}) satisfies z.ZodType< + Omit & { + contentType: string; + attachment: Record; + } +>; diff --git a/ts/types/MIME.ts b/ts/types/MIME.ts index 04f77b2a19fc..b6bb0109e766 100644 --- a/ts/types/MIME.ts +++ b/ts/types/MIME.ts @@ -1,7 +1,9 @@ // Copyright 2018 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import { z } from 'zod'; -export type MIMEType = string & { _mimeTypeBrand: never }; +export const MIMETypeSchema = z.string().brand('mimeType'); +export type MIMEType = z.infer; export const stringToMIMEType = (value: string): MIMEType => { return value as MIMEType; diff --git a/ts/util/downloadAttachment.ts b/ts/util/downloadAttachment.ts index 7ab95e8c2f4a..e47363ee3637 100644 --- a/ts/util/downloadAttachment.ts +++ b/ts/util/downloadAttachment.ts @@ -4,9 +4,10 @@ import type { AttachmentType } from '../types/Attachment'; import { downloadAttachmentV2 as doDownloadAttachment } from '../textsecure/downloadAttachment'; +export class AttachmentNotFoundOnCdnError extends Error {} export async function downloadAttachment( attachmentData: AttachmentType -): Promise { +): Promise { let migratedAttachment: AttachmentType; const { server } = window.textsecure; @@ -30,7 +31,7 @@ export async function downloadAttachment( } catch (error) { // Attachments on the server expire after 30 days, then start returning 404 or 403 if (error && (error.code === 404 || error.code === 403)) { - return null; + throw new AttachmentNotFoundOnCdnError(error.code); } throw error; diff --git a/ts/util/exponentialBackoff.ts b/ts/util/exponentialBackoff.ts index 87f29c3ac31d..618d628117c9 100644 --- a/ts/util/exponentialBackoff.ts +++ b/ts/util/exponentialBackoff.ts @@ -5,6 +5,7 @@ import * as durations from './durations'; const BACKOFF_FACTOR = 1.9; const MAX_BACKOFF = 15 * durations.MINUTE; +const FIRST_BACKOFF = 100 * BACKOFF_FACTOR; /** * For a given attempt, how long should we sleep (in milliseconds)? @@ -16,12 +17,29 @@ const MAX_BACKOFF = 15 * durations.MINUTE; * * [0]: https://github.com/signalapp/Signal-iOS/blob/6069741602421744edfb59923d2fb3a66b1b23c1/SignalServiceKit/src/Util/OWSOperation.swift */ -export function exponentialBackoffSleepTime(attempt: number): number { - const failureCount = attempt - 1; - if (failureCount === 0) { + +export type ExponentialBackoffOptionsType = { + maxBackoffTime: number; + multiplier: number; + firstBackoffTime: number; +}; +export function exponentialBackoffSleepTime( + attempt: number, + options: ExponentialBackoffOptionsType = { + maxBackoffTime: MAX_BACKOFF, + multiplier: BACKOFF_FACTOR, + firstBackoffTime: FIRST_BACKOFF, + } +): number { + if (attempt === 1) { return 0; } - return Math.min(MAX_BACKOFF, 100 * BACKOFF_FACTOR ** failureCount); + + return Math.min( + options.maxBackoffTime, + (options.firstBackoffTime / options.multiplier) * + options.multiplier ** (attempt - 1) + ); } /** @@ -31,7 +49,8 @@ export function exponentialBackoffSleepTime(attempt: number): number { * `desiredDurationMs` should be at least 1. */ export function exponentialBackoffMaxAttempts( - desiredDurationMs: number + desiredDurationMs: number, + options?: ExponentialBackoffOptionsType ): number { let attempts = 0; let total = 0; @@ -39,7 +58,7 @@ export function exponentialBackoffMaxAttempts( // fast even for giant numbers, and is typically called just once at startup. do { attempts += 1; - total += exponentialBackoffSleepTime(attempts); + total += exponentialBackoffSleepTime(attempts, options); } while (total < desiredDurationMs); return attempts; } diff --git a/ts/util/privacy.ts b/ts/util/privacy.ts index e1089cae9aa8..446b1466dcac 100644 --- a/ts/util/privacy.ts +++ b/ts/util/privacy.ts @@ -167,6 +167,10 @@ export const redactCdnKey = (cdnKey: string): string => { return `${REDACTION_PLACEHOLDER}${cdnKey.slice(-3)}`; }; +export const redactGenericText = (text: string): string => { + return `${REDACTION_PLACEHOLDER}${text.slice(-3)}`; +}; + const createRedactSensitivePaths = ( paths: ReadonlyArray ): RedactFunction => { diff --git a/ts/util/queueAttachmentDownloads.ts b/ts/util/queueAttachmentDownloads.ts index aa657ad8ddaa..86df8281ab52 100644 --- a/ts/util/queueAttachmentDownloads.ts +++ b/ts/util/queueAttachmentDownloads.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: AGPL-3.0-only import { partition } from 'lodash'; -import * as AttachmentDownloads from '../messageModifiers/AttachmentDownloads'; import * as log from '../logging/log'; import { isLongMessage } from '../types/MIME'; import { getMessageIdForLogging } from './idForLogging'; @@ -29,6 +28,10 @@ import { import type { StickerType } from '../types/Stickers'; import type { LinkPreviewType } from '../types/message/LinkPreviews'; import { isNotNil } from './isNotNil'; +import { + AttachmentDownloadManager, + AttachmentDownloadUrgency, +} from '../jobs/AttachmentDownloadManager'; export type MessageAttachmentsDownloadedType = { bodyAttachment?: AttachmentType; @@ -58,7 +61,8 @@ function getAttachmentSignatureSafe( // NOTE: If you're changing any logic in this function that deals with the // count then you'll also have to modify ./hasAttachmentsDownloads export async function queueAttachmentDownloads( - message: MessageAttributesType + message: MessageAttributesType, + urgency: AttachmentDownloadUrgency = AttachmentDownloadUrgency.STANDARD ): Promise { const attachmentsToQueue = message.attachments || []; const messageId = message.id; @@ -82,9 +86,11 @@ export async function queueAttachmentDownloads( log.error(`${idLog}: Received more than one long message attachment`); } - log.info( - `${idLog}: Queueing ${longMessageAttachments.length} long message attachment downloads` - ); + if (longMessageAttachments.length > 0) { + log.info( + `${idLog}: Queueing ${longMessageAttachments.length} long message attachment downloads` + ); + } if (longMessageAttachments.length > 0) { count += 1; @@ -96,54 +102,77 @@ export async function queueAttachmentDownloads( } if (bodyAttachment) { - await AttachmentDownloads.addJob(bodyAttachment, { + await AttachmentDownloadManager.addJob({ + attachment: bodyAttachment, messageId, - type: 'long-message', - index: 0, + attachmentType: 'long-message', + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, }); } - log.info( - `${idLog}: Queueing ${normalAttachments.length} normal attachment downloads` - ); + if (normalAttachments.length > 0) { + log.info( + `${idLog}: Queueing ${normalAttachments.length} normal attachment downloads` + ); + } const { attachments, count: attachmentsCount } = await queueNormalAttachments( - idLog, - messageId, - normalAttachments, - message.editHistory?.flatMap(x => x.attachments ?? []) + { + idLog, + messageId, + attachments: normalAttachments, + otherAttachments: message.editHistory?.flatMap(x => x.attachments ?? []), + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, + } ); count += attachmentsCount; const previewsToQueue = message.preview || []; - log.info( - `${idLog}: Queueing ${previewsToQueue.length} preview attachment downloads` - ); - const { preview, count: previewCount } = await queuePreviews( + if (previewsToQueue.length > 0) { + log.info( + `${idLog}: Queueing ${previewsToQueue.length} preview attachment downloads` + ); + } + const { preview, count: previewCount } = await queuePreviews({ idLog, messageId, - previewsToQueue, - message.editHistory?.flatMap(x => x.preview ?? []) - ); + previews: previewsToQueue, + otherPreviews: message.editHistory?.flatMap(x => x.preview ?? []), + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, + }); count += previewCount; - log.info( - `${idLog}: Queueing ${message.quote?.attachments?.length ?? 0} ` + - 'quote attachment downloads' - ); - const { quote, count: thumbnailCount } = await queueQuoteAttachments( + const numQuoteAttachments = message.quote?.attachments?.length ?? 0; + if (numQuoteAttachments > 0) { + log.info( + `${idLog}: Queueing ${numQuoteAttachments} ` + + 'quote attachment downloads' + ); + } + const { quote, count: thumbnailCount } = await queueQuoteAttachments({ idLog, messageId, - message.quote, - message.editHistory?.map(x => x.quote).filter(isNotNil) ?? [] - ); + quote: message.quote, + otherQuotes: message.editHistory?.map(x => x.quote).filter(isNotNil) ?? [], + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, + }); count += thumbnailCount; const contactsToQueue = message.contact || []; - log.info( - `${idLog}: Queueing ${contactsToQueue.length} contact attachment downloads` - ); + if (contactsToQueue.length > 0) { + log.info( + `${idLog}: Queueing ${contactsToQueue.length} contact attachment downloads` + ); + } const contact = await Promise.all( - contactsToQueue.map(async (item, index) => { + contactsToQueue.map(async item => { if (!item.avatar || !item.avatar.avatar) { return item; } @@ -158,10 +187,13 @@ export async function queueAttachmentDownloads( ...item, avatar: { ...item.avatar, - avatar: await AttachmentDownloads.addJob(item.avatar.avatar, { + avatar: await AttachmentDownloadManager.addJob({ + attachment: item.avatar.avatar, messageId, - type: 'contact', - index, + attachmentType: 'contact', + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, }), }, }; @@ -191,10 +223,13 @@ export async function queueAttachmentDownloads( } if (!data) { if (sticker.data) { - data = await AttachmentDownloads.addJob(sticker.data, { + data = await AttachmentDownloadManager.addJob({ + attachment: sticker.data, messageId, - type: 'sticker', - index: 0, + attachmentType: 'sticker', + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, }); } else { log.error(`${idLog}: Sticker data was missing`); @@ -224,12 +259,15 @@ export async function queueAttachmentDownloads( editHistory = await Promise.all( editHistory.map(async edit => { const { attachments: editAttachments, count: editAttachmentsCount } = - await queueNormalAttachments( + await queueNormalAttachments({ idLog, messageId, - edit.attachments, - attachments - ); + attachments: edit.attachments, + otherAttachments: attachments, + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, + }); count += editAttachmentsCount; if (editAttachmentsCount !== 0) { log.info( @@ -239,7 +277,15 @@ export async function queueAttachmentDownloads( } const { preview: editPreview, count: editPreviewCount } = - await queuePreviews(idLog, messageId, edit.preview, preview); + await queuePreviews({ + idLog, + messageId, + previews: edit.preview, + otherPreviews: preview, + receivedAt: message.received_at, + sentAt: message.sent_at, + urgency, + }); count += editPreviewCount; if (editPreviewCount !== 0) { log.info( @@ -274,12 +320,23 @@ export async function queueAttachmentDownloads( }; } -async function queueNormalAttachments( - idLog: string, - messageId: string, - attachments: MessageAttributesType['attachments'] = [], - otherAttachments: MessageAttributesType['attachments'] -): Promise<{ +async function queueNormalAttachments({ + idLog, + messageId, + attachments = [], + otherAttachments, + receivedAt, + sentAt, + urgency, +}: { + idLog: string; + messageId: string; + attachments: MessageAttributesType['attachments']; + otherAttachments: MessageAttributesType['attachments']; + receivedAt: number; + sentAt: number; + urgency: AttachmentDownloadUrgency; +}): Promise<{ attachments: Array; count: number; }> { @@ -299,7 +356,7 @@ async function queueNormalAttachments( let count = 0; const nextAttachments = await Promise.all( - attachments.map((attachment, index) => { + attachments.map(attachment => { if (!attachment) { return attachment; } @@ -329,10 +386,13 @@ async function queueNormalAttachments( count += 1; - return AttachmentDownloads.addJob(attachment, { + return AttachmentDownloadManager.addJob({ + attachment, messageId, - type: 'attachment', - index, + attachmentType: 'attachment', + receivedAt, + sentAt, + urgency, }); }) ); @@ -358,12 +418,23 @@ function getLinkPreviewSignature(preview: LinkPreviewType): string | undefined { return `<${url}>${signature}`; } -async function queuePreviews( - idLog: string, - messageId: string, - previews: MessageAttributesType['preview'] = [], - otherPreviews: MessageAttributesType['preview'] -): Promise<{ preview: Array; count: number }> { +async function queuePreviews({ + idLog, + messageId, + previews = [], + otherPreviews, + receivedAt, + sentAt, + urgency, +}: { + idLog: string; + messageId: string; + previews: MessageAttributesType['preview']; + otherPreviews: MessageAttributesType['preview']; + receivedAt: number; + sentAt: number; + urgency: AttachmentDownloadUrgency; +}): Promise<{ preview: Array; count: number }> { // Similar to queueNormalAttachments' logic for detecting same attachments // except here we also pick by link preview URL. const previewSignatures: Map = new Map(); @@ -378,7 +449,7 @@ async function queuePreviews( let count = 0; const preview = await Promise.all( - previews.map(async (item, index) => { + previews.map(async item => { if (!item.image) { return item; } @@ -407,10 +478,13 @@ async function queuePreviews( count += 1; return { ...item, - image: await AttachmentDownloads.addJob(item.image, { + image: await AttachmentDownloadManager.addJob({ + attachment: item.image, messageId, - type: 'preview', - index, + attachmentType: 'preview', + receivedAt, + sentAt, + urgency, }), }; }) @@ -436,12 +510,23 @@ function getQuoteThumbnailSignature( return `<${quote.id}>${signature}`; } -async function queueQuoteAttachments( - idLog: string, - messageId: string, - quote: QuotedMessageType | undefined, - otherQuotes: ReadonlyArray -): Promise<{ quote?: QuotedMessageType; count: number }> { +async function queueQuoteAttachments({ + idLog, + messageId, + quote, + otherQuotes, + receivedAt, + sentAt, + urgency, +}: { + idLog: string; + messageId: string; + quote: QuotedMessageType | undefined; + otherQuotes: ReadonlyArray; + receivedAt: number; + sentAt: number; + urgency: AttachmentDownloadUrgency; +}): Promise<{ quote?: QuotedMessageType; count: number }> { let count = 0; if (!quote) { return { quote, count }; @@ -473,7 +558,7 @@ async function queueQuoteAttachments( quote: { ...quote, attachments: await Promise.all( - quote.attachments.map(async (item, index) => { + quote.attachments.map(async item => { if (!item.thumbnail) { return item; } @@ -508,10 +593,13 @@ async function queueQuoteAttachments( count += 1; return { ...item, - thumbnail: await AttachmentDownloads.addJob(item.thumbnail, { + thumbnail: await AttachmentDownloadManager.addJob({ + attachment: item.thumbnail, messageId, - type: 'quote', - index, + attachmentType: 'quote', + receivedAt, + sentAt, + urgency, }), }; })