signal-desktop/ts/jobs/AttachmentDownloadManager.ts

585 lines
17 KiB
TypeScript
Raw Normal View History

// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { omit } from 'lodash';
import * as durations from '../util/durations';
import * as log from '../logging/log';
import {
type AttachmentDownloadJobTypeType,
type AttachmentDownloadJobType,
2024-05-29 23:46:43 +00:00
type CoreAttachmentDownloadJobType,
coreAttachmentDownloadJobSchema,
} from '../types/AttachmentDownload';
import {
AttachmentPermanentlyUndownloadableError,
downloadAttachment as downloadAttachmentUtil,
} from '../util/downloadAttachment';
2024-07-22 18:16:33 +00:00
import { DataWriter } from '../sql/Client';
import { getValue } from '../RemoteConfig';
import { isInCall as isInCallSelector } from '../state/selectors/calling';
import {
AttachmentSizeError,
type AttachmentType,
AttachmentVariant,
mightBeOnBackupTier,
} from '../types/Attachment';
import { __DEPRECATED$getMessageById } from '../messages/getMessageById';
import {
KIBIBYTE,
getMaximumIncomingAttachmentSizeInKb,
getMaximumIncomingTextAttachmentSizeInKb,
} from '../types/AttachmentSize';
import { addAttachmentToMessage } from '../messageModifiers/AttachmentDownloads';
import * as Errors from '../types/errors';
import { redactGenericText } from '../util/privacy';
2024-05-29 23:46:43 +00:00
import {
JobManager,
type JobManagerParamsType,
type JobManagerJobResultType,
} from './JobManager';
import {
isImageTypeSupported,
isVideoTypeSupported,
} from '../util/GoogleChrome';
import type { MIMEType } from '../types/MIME';
import { AttachmentDownloadSource } from '../sql/Interface';
import { drop } from '../util/drop';
import { getAttachmentCiphertextLength } from '../AttachmentCrypto';
2024-10-02 19:03:10 +00:00
import { safeParsePartial } from '../util/schemas';
2024-10-28 22:25:15 +00:00
import { createBatcher } from '../util/batcher';
export enum AttachmentDownloadUrgency {
IMMEDIATE = 'immediate',
STANDARD = 'standard',
}
// Type for adding a new job
export type NewAttachmentDownloadJobType = {
attachment: AttachmentType;
messageId: string;
receivedAt: number;
sentAt: number;
attachmentType: AttachmentDownloadJobTypeType;
source: AttachmentDownloadSource;
urgency?: AttachmentDownloadUrgency;
};
2024-05-29 23:46:43 +00:00
const MAX_CONCURRENT_JOBS = 3;
const DEFAULT_RETRY_CONFIG = {
maxAttempts: 5,
backoffConfig: {
// 30 seconds, 5 minutes, 50 minutes, (max) 6 hrs
multiplier: 10,
firstBackoffs: [30 * durations.SECOND],
maxBackoffTime: 6 * durations.HOUR,
},
};
const BACKUP_RETRY_CONFIG = {
...DEFAULT_RETRY_CONFIG,
maxAttempts: Infinity,
};
2024-05-29 23:46:43 +00:00
type AttachmentDownloadManagerParamsType = Omit<
JobManagerParamsType<CoreAttachmentDownloadJobType>,
'getNextJobs' | 'runJob'
2024-05-29 23:46:43 +00:00
> & {
getNextJobs: (options: {
limit: number;
prioritizeMessageIds?: Array<string>;
sources?: Array<AttachmentDownloadSource>;
timestamp?: number;
}) => Promise<Array<AttachmentDownloadJobType>>;
runDownloadAttachmentJob: (args: {
job: AttachmentDownloadJobType;
isLastAttempt: boolean;
options?: { isForCurrentlyVisibleMessage: boolean };
dependencies?: DependenciesType;
}) => Promise<JobManagerJobResultType<CoreAttachmentDownloadJobType>>;
};
2024-05-29 23:46:43 +00:00
function getJobId(job: CoreAttachmentDownloadJobType): string {
const { messageId, attachmentType, digest } = job;
return `${messageId}.${attachmentType}.${digest}`;
}
function getJobIdForLogging(job: CoreAttachmentDownloadJobType): string {
const { sentAt, attachmentType, digest } = job;
const redactedDigest = redactGenericText(digest);
return `${sentAt}.${attachmentType}.${redactedDigest}`;
}
export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownloadJobType> {
private visibleTimelineMessages: Set<string> = new Set();
2024-10-28 22:25:15 +00:00
private saveJobsBatcher = createBatcher<AttachmentDownloadJobType>({
name: 'saveAttachmentDownloadJobs',
wait: 150,
maxSize: 1000,
processBatch: async jobs => {
await DataWriter.saveAttachmentDownloadJobs(jobs);
drop(this.maybeStartJobs());
},
});
2024-05-29 23:46:43 +00:00
private static _instance: AttachmentDownloadManager | undefined;
override logPrefix = 'AttachmentDownloadManager';
static defaultParams: AttachmentDownloadManagerParamsType = {
2024-07-22 18:16:33 +00:00
markAllJobsInactive: DataWriter.resetAttachmentDownloadActive,
2024-10-28 22:25:15 +00:00
saveJob: async (job, options) => {
if (options?.allowBatching) {
AttachmentDownloadManager._instance?.saveJobsBatcher.add(job);
} else {
await DataWriter.saveAttachmentDownloadJob(job);
}
},
2024-07-22 18:16:33 +00:00
removeJob: DataWriter.removeAttachmentDownloadJob,
getNextJobs: DataWriter.getNextAttachmentDownloadJobs,
runDownloadAttachmentJob,
2024-05-29 23:46:43 +00:00
shouldHoldOffOnStartingQueuedJobs: () => {
const reduxState = window.reduxStore?.getState();
if (reduxState) {
return isInCallSelector(reduxState);
}
return false;
},
2024-05-29 23:46:43 +00:00
getJobId,
getJobIdForLogging,
getRetryConfig: job =>
job.attachment.backupLocator?.mediaName
? BACKUP_RETRY_CONFIG
: DEFAULT_RETRY_CONFIG,
2024-05-29 23:46:43 +00:00
maxConcurrentJobs: MAX_CONCURRENT_JOBS,
};
2024-05-29 23:46:43 +00:00
constructor(params: AttachmentDownloadManagerParamsType) {
super({
...params,
getNextJobs: ({ limit }) => {
return params.getNextJobs({
limit,
prioritizeMessageIds: [...this.visibleTimelineMessages],
sources: window.storage.get('backupMediaDownloadPaused')
? [AttachmentDownloadSource.STANDARD]
: undefined,
2024-05-29 23:46:43 +00:00
timestamp: Date.now(),
});
},
runJob: (job: AttachmentDownloadJobType, isLastAttempt: boolean) => {
const isForCurrentlyVisibleMessage = this.visibleTimelineMessages.has(
job.messageId
);
return params.runDownloadAttachmentJob({
job,
isLastAttempt,
options: {
isForCurrentlyVisibleMessage,
},
});
},
2024-05-29 23:46:43 +00:00
});
}
2024-05-29 23:46:43 +00:00
// @ts-expect-error we are overriding the return type of JobManager's addJob
override async addJob(
newJobData: NewAttachmentDownloadJobType
): Promise<AttachmentType> {
const {
attachment,
messageId,
attachmentType,
receivedAt,
sentAt,
source,
urgency = AttachmentDownloadUrgency.STANDARD,
} = newJobData;
2024-10-02 19:03:10 +00:00
const parseResult = safeParsePartial(coreAttachmentDownloadJobSchema, {
messageId,
receivedAt,
sentAt,
attachmentType,
digest: attachment.digest,
contentType: attachment.contentType,
size: attachment.size,
ciphertextSize: getAttachmentCiphertextLength(attachment.size),
attachment,
// If the attachment does not have a backupLocator, we don't want to store it as a
// "backup import" attachment, since it's really just a normal attachment that we'll
// try to download from the transit tier (or it's an invalid attachment, etc.). We
// may need to extend the attachment_downloads table in the future to better
// differentiate source vs. location.
source: mightBeOnBackupTier(attachment)
? source
: AttachmentDownloadSource.STANDARD,
});
if (!parseResult.success) {
log.error(
`AttachmentDownloadManager/addJob(${sentAt}.${attachmentType}): invalid data`,
parseResult.error
);
return attachment;
}
const newJob = parseResult.data;
await this._addJob(newJob, {
2024-05-29 23:46:43 +00:00
forceStart: urgency === AttachmentDownloadUrgency.IMMEDIATE,
});
return attachment;
}
updateVisibleTimelineMessages(messageIds: Array<string>): void {
this.visibleTimelineMessages = new Set(messageIds);
}
static get instance(): AttachmentDownloadManager {
if (!AttachmentDownloadManager._instance) {
2024-05-29 23:46:43 +00:00
AttachmentDownloadManager._instance = new AttachmentDownloadManager(
AttachmentDownloadManager.defaultParams
);
}
return AttachmentDownloadManager._instance;
}
static async start(): Promise<void> {
2024-10-28 22:25:15 +00:00
await AttachmentDownloadManager.saveBatchedJobs();
await AttachmentDownloadManager.instance.start();
}
2024-10-28 22:25:15 +00:00
static async saveBatchedJobs(): Promise<void> {
await AttachmentDownloadManager.instance.saveJobsBatcher.flushAndWait();
}
static async stop(): Promise<void> {
return AttachmentDownloadManager._instance?.stop();
}
static async addJob(
newJob: NewAttachmentDownloadJobType
): Promise<AttachmentType> {
return AttachmentDownloadManager.instance.addJob(newJob);
}
static updateVisibleTimelineMessages(messageIds: Array<string>): void {
AttachmentDownloadManager.instance.updateVisibleTimelineMessages(
messageIds
);
}
static async waitForIdle(callback?: VoidFunction): Promise<void> {
await AttachmentDownloadManager.instance.waitForIdle();
if (callback) {
callback();
}
}
}
type DependenciesType = {
downloadAttachment: typeof downloadAttachmentUtil;
processNewAttachment: typeof window.Signal.Migrations.processNewAttachment;
};
async function runDownloadAttachmentJob({
job,
isLastAttempt,
options,
dependencies = {
downloadAttachment: downloadAttachmentUtil,
processNewAttachment: window.Signal.Migrations.processNewAttachment,
},
}: {
job: AttachmentDownloadJobType;
isLastAttempt: boolean;
options?: { isForCurrentlyVisibleMessage: boolean };
dependencies?: DependenciesType;
}): Promise<JobManagerJobResultType<CoreAttachmentDownloadJobType>> {
const jobIdForLogging = getJobIdForLogging(job);
const logId = `AttachmentDownloadManager/runDownloadAttachmentJob/${jobIdForLogging}`;
const message = await __DEPRECATED$getMessageById(job.messageId);
if (!message) {
log.error(`${logId} message not found`);
return { status: 'finished' };
}
try {
log.info(`${logId}: Starting job`);
const result = await runDownloadAttachmentJobInner({
job,
isForCurrentlyVisibleMessage:
options?.isForCurrentlyVisibleMessage ?? false,
dependencies,
});
if (result.downloadedVariant === AttachmentVariant.ThumbnailFromBackup) {
return {
status: 'finished',
newJob: { ...job, attachment: result.attachmentWithThumbnail },
};
}
if (mightBeOnBackupTier(job.attachment)) {
const currentDownloadedSize =
window.storage.get('backupMediaDownloadCompletedBytes') ?? 0;
drop(
window.storage.put(
'backupMediaDownloadCompletedBytes',
currentDownloadedSize + job.ciphertextSize
)
);
}
return {
status: 'finished',
};
} catch (error) {
log.error(
`${logId}: Failed to download attachment, attempt ${job.attempts}:`,
Errors.toLogFormat(error)
);
if (error instanceof AttachmentSizeError) {
await addAttachmentToMessage(
message.id,
_markAttachmentAsTooBig(job.attachment),
logId,
{ type: job.attachmentType }
);
return { status: 'finished' };
}
if (error instanceof AttachmentPermanentlyUndownloadableError) {
await addAttachmentToMessage(
message.id,
_markAttachmentAsPermanentlyErrored(job.attachment),
logId,
{ type: job.attachmentType }
);
return { status: 'finished' };
}
if (isLastAttempt) {
await addAttachmentToMessage(
message.id,
_markAttachmentAsTransientlyErrored(job.attachment),
logId,
{ type: job.attachmentType }
);
return { status: 'finished' };
}
// Remove `pending` flag from the attachment and retry later
await addAttachmentToMessage(
message.id,
{
...job.attachment,
pending: false,
},
logId,
{ type: job.attachmentType }
);
return { status: 'retry' };
} finally {
// This will fail if the message has been deleted before the download finished, which
// is good
2024-07-22 18:16:33 +00:00
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
});
}
}
type DownloadAttachmentResultType =
| { downloadedVariant: AttachmentVariant.Default }
| {
downloadedVariant: AttachmentVariant.ThumbnailFromBackup;
attachmentWithThumbnail: AttachmentType;
};
export async function runDownloadAttachmentJobInner({
job,
isForCurrentlyVisibleMessage,
dependencies,
}: {
job: AttachmentDownloadJobType;
isForCurrentlyVisibleMessage: boolean;
dependencies: DependenciesType;
}): Promise<DownloadAttachmentResultType> {
const { messageId, attachment, attachmentType } = job;
const jobIdForLogging = getJobIdForLogging(job);
let logId = `AttachmentDownloadManager/runDownloadJobInner(${jobIdForLogging})`;
if (!job || !attachment || !messageId) {
throw new Error(`${logId}: Key information required for job was missing.`);
}
const maxInKib = getMaximumIncomingAttachmentSizeInKb(getValue);
const maxTextAttachmentSizeInKib =
getMaximumIncomingTextAttachmentSizeInKb(getValue);
const { size } = attachment;
const sizeInKib = size / KIBIBYTE;
if (!Number.isFinite(size) || size < 0 || sizeInKib > maxInKib) {
throw new AttachmentSizeError(
`${logId}: Attachment was ${sizeInKib}kib, max is ${maxInKib}kib`
);
}
if (
attachmentType === 'long-message' &&
sizeInKib > maxTextAttachmentSizeInKib
) {
throw new AttachmentSizeError(
`${logId}: Text attachment was ${sizeInKib}kib, max is ${maxTextAttachmentSizeInKib}kib`
);
}
const preferBackupThumbnail =
isForCurrentlyVisibleMessage &&
mightHaveThumbnailOnBackupTier(job.attachment) &&
// TODO (DESKTOP-7204): check if thumbnail exists on attachment, not on job
!job.attachment.thumbnailFromBackup;
if (preferBackupThumbnail) {
logId += '.preferringBackupThumbnail';
}
if (preferBackupThumbnail) {
try {
const attachmentWithThumbnail = await downloadBackupThumbnail({
attachment,
dependencies,
});
await addAttachmentToMessage(messageId, attachmentWithThumbnail, logId, {
type: attachmentType,
});
return {
downloadedVariant: AttachmentVariant.ThumbnailFromBackup,
attachmentWithThumbnail,
};
} catch (e) {
log.warn(`${logId}: error when trying to download thumbnail`);
}
}
// TODO (DESKTOP-7204): currently we only set pending state when downloading the
// full-size attachment
await addAttachmentToMessage(
messageId,
{ ...attachment, pending: true },
logId,
{ type: attachmentType }
);
try {
const downloaded = await dependencies.downloadAttachment({
attachment,
variant: AttachmentVariant.Default,
});
const upgradedAttachment = await dependencies.processNewAttachment({
...omit(attachment, ['error', 'pending', 'downloadPath']),
...downloaded,
});
await addAttachmentToMessage(messageId, upgradedAttachment, logId, {
type: attachmentType,
});
return { downloadedVariant: AttachmentVariant.Default };
} catch (error) {
if (
!job.attachment.thumbnailFromBackup &&
mightHaveThumbnailOnBackupTier(attachment) &&
!preferBackupThumbnail
) {
log.error(
`${logId}: failed to download fullsize attachment, falling back to thumbnail`,
Errors.toLogFormat(error)
);
try {
const attachmentWithThumbnail = omit(
await downloadBackupThumbnail({
attachment,
dependencies,
}),
'pending'
);
await addAttachmentToMessage(
messageId,
attachmentWithThumbnail,
logId,
{
type: attachmentType,
}
);
return {
downloadedVariant: AttachmentVariant.ThumbnailFromBackup,
attachmentWithThumbnail,
};
} catch (thumbnailError) {
log.error(
`${logId}: fallback attempt to download thumbnail failed`,
Errors.toLogFormat(thumbnailError)
);
}
}
throw error;
}
}
async function downloadBackupThumbnail({
attachment,
dependencies,
}: {
attachment: AttachmentType;
dependencies: { downloadAttachment: typeof downloadAttachmentUtil };
}): Promise<AttachmentType> {
const downloadedThumbnail = await dependencies.downloadAttachment({
attachment,
variant: AttachmentVariant.ThumbnailFromBackup,
});
const attachmentWithThumbnail = {
...attachment,
thumbnailFromBackup: downloadedThumbnail,
};
return attachmentWithThumbnail;
}
function _markAttachmentAsTooBig(attachment: AttachmentType): AttachmentType {
return {
..._markAttachmentAsPermanentlyErrored(attachment),
wasTooBig: true,
};
}
function _markAttachmentAsPermanentlyErrored(
attachment: AttachmentType
): AttachmentType {
return { ...omit(attachment, ['key', 'id']), pending: false, error: true };
}
function _markAttachmentAsTransientlyErrored(
attachment: AttachmentType
): AttachmentType {
return { ...attachment, pending: false, error: true };
}
function mightHaveThumbnailOnBackupTier(
attachment: Pick<AttachmentType, 'backupLocator' | 'contentType'>
): boolean {
if (!attachment.backupLocator?.mediaName) {
return false;
}
return canAttachmentHaveThumbnail(attachment.contentType);
}
export function canAttachmentHaveThumbnail(contentType: MIMEType): boolean {
return isVideoTypeSupported(contentType) || isImageTypeSupported(contentType);
}