Enable attachment backup uploading
This commit is contained in:
parent
94a262b799
commit
4254356812
27 changed files with 2054 additions and 534 deletions
|
@ -38,6 +38,8 @@ const DIGEST_LENGTH = 32;
|
|||
const HEX_DIGEST_LENGTH = DIGEST_LENGTH * 2;
|
||||
const ATTACHMENT_MAC_LENGTH = 32;
|
||||
|
||||
export class ReencyptedDigestMismatchError extends Error {}
|
||||
|
||||
/** @private */
|
||||
export const KEY_SET_LENGTH = KEY_LENGTH + ATTACHMENT_MAC_LENGTH;
|
||||
|
||||
|
@ -45,6 +47,10 @@ export function _generateAttachmentIv(): Uint8Array {
|
|||
return randomBytes(IV_LENGTH);
|
||||
}
|
||||
|
||||
export function generateAttachmentKeys(): Uint8Array {
|
||||
return randomBytes(KEY_SET_LENGTH);
|
||||
}
|
||||
|
||||
export type EncryptedAttachmentV2 = {
|
||||
digest: Uint8Array;
|
||||
iv: Uint8Array;
|
||||
|
@ -201,11 +207,12 @@ export async function encryptAttachmentV2({
|
|||
|
||||
if (dangerousIv?.reason === 'reencrypting-for-backup') {
|
||||
if (!constantTimeEqual(ourDigest, dangerousIv.digestToMatch)) {
|
||||
throw new Error(
|
||||
throw new ReencyptedDigestMismatchError(
|
||||
`${logId}: iv was hardcoded for backup re-encryption, but digest does not match`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
digest: ourDigest,
|
||||
iv,
|
||||
|
|
|
@ -204,6 +204,7 @@ import { onCallLinkUpdateSync } from './util/onCallLinkUpdateSync';
|
|||
import { CallMode } from './types/Calling';
|
||||
import { queueSyncTasks } from './util/syncTasks';
|
||||
import { isEnabled } from './RemoteConfig';
|
||||
import { AttachmentBackupManager } from './jobs/AttachmentBackupManager';
|
||||
|
||||
export function isOverHourIntoPast(timestamp: number): boolean {
|
||||
return isNumber(timestamp) && isOlderThan(timestamp, HOUR);
|
||||
|
@ -831,9 +832,10 @@ export async function startApp(): Promise<void> {
|
|||
'background/shutdown: waiting for all attachment downloads to finish'
|
||||
);
|
||||
|
||||
// Since we canceled the inflight requests earlier in shutdown, this should
|
||||
// Since we canceled the inflight requests earlier in shutdown, these should
|
||||
// resolve quickly
|
||||
await AttachmentDownloadManager.stop();
|
||||
await AttachmentBackupManager.stop();
|
||||
|
||||
log.info('background/shutdown: closing the database');
|
||||
|
||||
|
@ -1588,6 +1590,7 @@ export async function startApp(): Promise<void> {
|
|||
|
||||
drop(challengeHandler?.onOffline());
|
||||
drop(AttachmentDownloadManager.stop());
|
||||
drop(AttachmentBackupManager.stop());
|
||||
drop(messageReceiver?.drain());
|
||||
|
||||
if (hasAppEverBeenRegistered) {
|
||||
|
@ -1725,6 +1728,7 @@ export async function startApp(): Promise<void> {
|
|||
|
||||
if (isBackupEnabled()) {
|
||||
backupsService.start();
|
||||
drop(AttachmentBackupManager.start());
|
||||
}
|
||||
|
||||
if (connectCount === 0) {
|
||||
|
|
374
ts/jobs/AttachmentBackupManager.ts
Normal file
374
ts/jobs/AttachmentBackupManager.ts
Normal file
|
@ -0,0 +1,374 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
/* eslint-disable max-classes-per-file */
|
||||
|
||||
import { existsSync } from 'fs';
|
||||
|
||||
import * as durations from '../util/durations';
|
||||
import * as log from '../logging/log';
|
||||
import dataInterface from '../sql/Client';
|
||||
|
||||
import * as Errors from '../types/errors';
|
||||
import { redactGenericText } from '../util/privacy';
|
||||
import {
|
||||
JobManager,
|
||||
type JobManagerParamsType,
|
||||
type JobManagerJobResultType,
|
||||
} from './JobManager';
|
||||
import { deriveBackupMediaKeyMaterial } from '../Crypto';
|
||||
import { strictAssert } from '../util/assert';
|
||||
import { type BackupsService, backupsService } from '../services/backups';
|
||||
import {
|
||||
type EncryptedAttachmentV2,
|
||||
getAttachmentCiphertextLength,
|
||||
getAesCbcCiphertextLength,
|
||||
ReencyptedDigestMismatchError,
|
||||
} from '../AttachmentCrypto';
|
||||
import { getBackupKey } from '../services/backups/crypto';
|
||||
import { isMoreRecentThan } from '../util/timestamp';
|
||||
import type {
|
||||
AttachmentBackupJobType,
|
||||
CoreAttachmentBackupJobType,
|
||||
} from '../types/AttachmentBackup';
|
||||
import { isInCall as isInCallSelector } from '../state/selectors/calling';
|
||||
import { encryptAndUploadAttachment } from '../util/uploadAttachment';
|
||||
import { getMediaIdFromMediaName } from '../services/backups/util/mediaId';
|
||||
import { fromBase64 } from '../Bytes';
|
||||
import type { WebAPIType } from '../textsecure/WebAPI';
|
||||
|
||||
const TIME_ON_TRANSIT_TIER = 30 * durations.DAY;
|
||||
const MAX_CONCURRENT_JOBS = 3;
|
||||
const RETRY_CONFIG = {
|
||||
// As long as we have the file locally, we're always going to keep trying
|
||||
maxAttempts: Infinity,
|
||||
backoffConfig: {
|
||||
// 1 minute, 5 minutes, 25 minutes, every hour
|
||||
multiplier: 5,
|
||||
firstBackoffs: [durations.MINUTE],
|
||||
maxBackoffTime: durations.HOUR,
|
||||
},
|
||||
};
|
||||
|
||||
export class AttachmentBackupManager extends JobManager<CoreAttachmentBackupJobType> {
|
||||
private static _instance: AttachmentBackupManager | undefined;
|
||||
static defaultParams: JobManagerParamsType<CoreAttachmentBackupJobType> = {
|
||||
markAllJobsInactive: dataInterface.markAllAttachmentBackupJobsInactive,
|
||||
saveJob: dataInterface.saveAttachmentBackupJob,
|
||||
removeJob: dataInterface.removeAttachmentBackupJob,
|
||||
getNextJobs: dataInterface.getNextAttachmentBackupJobs,
|
||||
runJob: runAttachmentBackupJob,
|
||||
shouldHoldOffOnStartingQueuedJobs: () => {
|
||||
const reduxState = window.reduxStore?.getState();
|
||||
if (reduxState) {
|
||||
return isInCallSelector(reduxState);
|
||||
}
|
||||
return false;
|
||||
},
|
||||
getJobId,
|
||||
getJobIdForLogging,
|
||||
getRetryConfig: () => RETRY_CONFIG,
|
||||
maxConcurrentJobs: MAX_CONCURRENT_JOBS,
|
||||
};
|
||||
|
||||
override logPrefix = 'AttachmentBackupManager';
|
||||
|
||||
static get instance(): AttachmentBackupManager {
|
||||
if (!AttachmentBackupManager._instance) {
|
||||
AttachmentBackupManager._instance = new AttachmentBackupManager(
|
||||
AttachmentBackupManager.defaultParams
|
||||
);
|
||||
}
|
||||
return AttachmentBackupManager._instance;
|
||||
}
|
||||
|
||||
static async start(): Promise<void> {
|
||||
log.info('AttachmentBackupManager/starting');
|
||||
await AttachmentBackupManager.instance.start();
|
||||
}
|
||||
|
||||
static async stop(): Promise<void> {
|
||||
log.info('AttachmentBackupManager/stopping');
|
||||
return AttachmentBackupManager._instance?.stop();
|
||||
}
|
||||
|
||||
static async addJob(newJob: CoreAttachmentBackupJobType): Promise<void> {
|
||||
return AttachmentBackupManager.instance.addJob(newJob);
|
||||
}
|
||||
}
|
||||
|
||||
function getJobId(job: CoreAttachmentBackupJobType): string {
|
||||
return job.mediaName;
|
||||
}
|
||||
|
||||
function getJobIdForLogging(job: CoreAttachmentBackupJobType): string {
|
||||
return redactGenericText(job.mediaName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Backup-specific methods
|
||||
*/
|
||||
class AttachmentPermanentlyMissingError extends Error {}
|
||||
class FileNotFoundOnTransitTierError extends Error {}
|
||||
|
||||
type RunAttachmentBackupJobDependenciesType = {
|
||||
getAbsoluteAttachmentPath: typeof window.Signal.Migrations.getAbsoluteAttachmentPath;
|
||||
backupMediaBatch?: WebAPIType['backupMediaBatch'];
|
||||
backupsService: BackupsService;
|
||||
encryptAndUploadAttachment: typeof encryptAndUploadAttachment;
|
||||
};
|
||||
|
||||
export async function runAttachmentBackupJob(
|
||||
job: AttachmentBackupJobType,
|
||||
_isLastAttempt: boolean,
|
||||
dependencies: RunAttachmentBackupJobDependenciesType = {
|
||||
getAbsoluteAttachmentPath:
|
||||
window.Signal.Migrations.getAbsoluteAttachmentPath,
|
||||
backupsService,
|
||||
backupMediaBatch: window.textsecure.server?.backupMediaBatch,
|
||||
encryptAndUploadAttachment,
|
||||
}
|
||||
): Promise<JobManagerJobResultType> {
|
||||
const jobIdForLogging = getJobIdForLogging(job);
|
||||
const logId = `AttachmentBackupManager/runAttachmentBackupJob/${jobIdForLogging}`;
|
||||
try {
|
||||
await runAttachmentBackupJobInner(job, dependencies);
|
||||
return { status: 'finished' };
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`${logId}: Failed to backup attachment, attempt ${job.attempts}`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
|
||||
if (error instanceof AttachmentPermanentlyMissingError) {
|
||||
log.error(`${logId}: Attachment unable to be found, giving up on job`);
|
||||
return { status: 'finished' };
|
||||
}
|
||||
|
||||
if (error instanceof ReencyptedDigestMismatchError) {
|
||||
log.error(
|
||||
`${logId}: Unable to reencrypt to match same digest; content must have changed`
|
||||
);
|
||||
return { status: 'finished' };
|
||||
}
|
||||
|
||||
return { status: 'retry' };
|
||||
}
|
||||
}
|
||||
|
||||
async function runAttachmentBackupJobInner(
|
||||
job: AttachmentBackupJobType,
|
||||
dependencies: RunAttachmentBackupJobDependenciesType
|
||||
): Promise<void> {
|
||||
const jobIdForLogging = getJobIdForLogging(job);
|
||||
const logId = `AttachmentBackupManager.UploadOrCopyToBackupTier(mediaName:${jobIdForLogging})`;
|
||||
|
||||
log.info(`${logId}: starting`);
|
||||
|
||||
const { mediaName, type } = job;
|
||||
|
||||
// TODO (DESKTOP-6913): generate & upload thumbnail
|
||||
strictAssert(
|
||||
type === 'standard',
|
||||
'Only standard uploads are currently supported'
|
||||
);
|
||||
|
||||
const { path, transitCdnInfo, iv, digest, keys, size } = job.data;
|
||||
|
||||
const mediaId = getMediaIdFromMediaName(mediaName);
|
||||
const backupKeyMaterial = deriveBackupMediaKeyMaterial(
|
||||
getBackupKey(),
|
||||
mediaId.bytes
|
||||
);
|
||||
|
||||
const { isInBackupTier } = await dependencies.backupsService.getBackupCdnInfo(
|
||||
mediaId.string
|
||||
);
|
||||
|
||||
if (isInBackupTier) {
|
||||
log.info(`${logId}: object already in backup tier, done`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (transitCdnInfo) {
|
||||
const {
|
||||
cdnKey: transitCdnKey,
|
||||
cdnNumber: transitCdnNumber,
|
||||
uploadTimestamp: transitCdnUploadTimestamp,
|
||||
} = transitCdnInfo;
|
||||
if (
|
||||
transitCdnKey &&
|
||||
transitCdnNumber != null &&
|
||||
(!transitCdnUploadTimestamp ||
|
||||
isMoreRecentThan(transitCdnUploadTimestamp, TIME_ON_TRANSIT_TIER))
|
||||
) {
|
||||
// If we have transit CDN info, optimistically try to copy to the backup tier
|
||||
try {
|
||||
await copyToBackupTier({
|
||||
cdnKey: transitCdnKey,
|
||||
cdnNumber: transitCdnNumber,
|
||||
size,
|
||||
mediaId: mediaId.string,
|
||||
...backupKeyMaterial,
|
||||
dependencies,
|
||||
});
|
||||
log.info(`${logId}: copied to backup tier successfully`);
|
||||
return;
|
||||
} catch (e) {
|
||||
if (e instanceof FileNotFoundOnTransitTierError) {
|
||||
log.info(
|
||||
`${logId}: file not found on transit tier, uploadTimestamp: ${transitCdnUploadTimestamp}`
|
||||
);
|
||||
} else {
|
||||
log.error(
|
||||
`${logId}: error copying to backup tier`,
|
||||
Errors.toLogFormat(e)
|
||||
);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!path) {
|
||||
throw new AttachmentPermanentlyMissingError(
|
||||
'File not on transit tier and no path property'
|
||||
);
|
||||
}
|
||||
|
||||
const absolutePath = dependencies.getAbsoluteAttachmentPath(path);
|
||||
if (!existsSync(absolutePath)) {
|
||||
throw new AttachmentPermanentlyMissingError('No file at provided path');
|
||||
}
|
||||
|
||||
log.info(`${logId}: uploading to transit tier`);
|
||||
const uploadResult = await uploadToTransitTier({
|
||||
absolutePath,
|
||||
keys,
|
||||
iv,
|
||||
digest,
|
||||
logPrefix: logId,
|
||||
dependencies,
|
||||
});
|
||||
|
||||
log.info(`${logId}: copying to backup tier`);
|
||||
await copyToBackupTier({
|
||||
cdnKey: uploadResult.cdnKey,
|
||||
cdnNumber: uploadResult.cdnNumber,
|
||||
size,
|
||||
mediaId: mediaId.string,
|
||||
...backupKeyMaterial,
|
||||
dependencies,
|
||||
});
|
||||
}
|
||||
|
||||
type UploadResponseType = {
|
||||
cdnKey: string;
|
||||
cdnNumber: number;
|
||||
encrypted: EncryptedAttachmentV2;
|
||||
};
|
||||
|
||||
async function uploadToTransitTier({
|
||||
absolutePath,
|
||||
keys,
|
||||
iv,
|
||||
digest,
|
||||
logPrefix,
|
||||
dependencies,
|
||||
}: {
|
||||
absolutePath: string;
|
||||
iv: string;
|
||||
digest: string;
|
||||
keys: string;
|
||||
logPrefix: string;
|
||||
dependencies: {
|
||||
encryptAndUploadAttachment: typeof encryptAndUploadAttachment;
|
||||
};
|
||||
}): Promise<UploadResponseType> {
|
||||
try {
|
||||
const uploadResult = await dependencies.encryptAndUploadAttachment({
|
||||
plaintext: { absolutePath },
|
||||
keys: fromBase64(keys),
|
||||
dangerousIv: {
|
||||
reason: 'reencrypting-for-backup',
|
||||
iv: fromBase64(iv),
|
||||
digestToMatch: fromBase64(digest),
|
||||
},
|
||||
uploadType: 'backup',
|
||||
});
|
||||
return uploadResult;
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`${logPrefix}/uploadToTransitTier: Error while encrypting and uploading`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export const FILE_NOT_FOUND_ON_TRANSIT_TIER_STATUS = 410;
|
||||
|
||||
async function copyToBackupTier({
|
||||
cdnNumber,
|
||||
cdnKey,
|
||||
size,
|
||||
mediaId,
|
||||
macKey,
|
||||
aesKey,
|
||||
iv,
|
||||
dependencies,
|
||||
}: {
|
||||
cdnNumber: number;
|
||||
cdnKey: string;
|
||||
size: number;
|
||||
mediaId: string;
|
||||
macKey: Uint8Array;
|
||||
aesKey: Uint8Array;
|
||||
iv: Uint8Array;
|
||||
dependencies: {
|
||||
backupMediaBatch?: WebAPIType['backupMediaBatch'];
|
||||
backupsService: BackupsService;
|
||||
};
|
||||
}): Promise<{ cdnNumberOnBackup: number }> {
|
||||
strictAssert(
|
||||
dependencies.backupMediaBatch,
|
||||
'backupMediaBatch must be intialized'
|
||||
);
|
||||
const ciphertextLength = getAttachmentCiphertextLength(size);
|
||||
|
||||
const { responses } = await dependencies.backupMediaBatch({
|
||||
headers: await dependencies.backupsService.credentials.getHeadersForToday(),
|
||||
items: [
|
||||
{
|
||||
sourceAttachment: {
|
||||
cdn: cdnNumber,
|
||||
key: cdnKey,
|
||||
},
|
||||
objectLength: ciphertextLength,
|
||||
mediaId,
|
||||
hmacKey: macKey,
|
||||
encryptionKey: aesKey,
|
||||
iv,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const response = responses[0];
|
||||
if (!response.isSuccess) {
|
||||
if (response.status === FILE_NOT_FOUND_ON_TRANSIT_TIER_STATUS) {
|
||||
throw new FileNotFoundOnTransitTierError();
|
||||
}
|
||||
throw new Error(
|
||||
`copyToBackupTier failed: ${response.failureReason}, code: ${response.status}`
|
||||
);
|
||||
}
|
||||
|
||||
// Update our local understanding of what's in the backup cdn
|
||||
const sizeOnBackupCdn = getAesCbcCiphertextLength(ciphertextLength);
|
||||
await window.Signal.Data.saveBackupCdnObjectMetadata([
|
||||
{ mediaId, cdnNumber: response.cdn, sizeOnBackupCdn },
|
||||
]);
|
||||
|
||||
return {
|
||||
cdnNumberOnBackup: response.cdn,
|
||||
};
|
||||
}
|
|
@ -2,15 +2,13 @@
|
|||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import { omit } from 'lodash';
|
||||
|
||||
import { drop } from '../util/drop';
|
||||
import * as durations from '../util/durations';
|
||||
import { missingCaseError } from '../util/missingCaseError';
|
||||
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
|
||||
import * as log from '../logging/log';
|
||||
import {
|
||||
type AttachmentDownloadJobTypeType,
|
||||
type AttachmentDownloadJobType,
|
||||
attachmentDownloadJobSchema,
|
||||
type CoreAttachmentDownloadJobType,
|
||||
coreAttachmentDownloadJobSchema,
|
||||
} from '../types/AttachmentDownload';
|
||||
import {
|
||||
AttachmentNotFoundOnCdnError,
|
||||
|
@ -19,15 +17,7 @@ import {
|
|||
import dataInterface from '../sql/Client';
|
||||
import { getValue } from '../RemoteConfig';
|
||||
|
||||
import {
|
||||
explodePromise,
|
||||
type ExplodePromiseResultType,
|
||||
} from '../util/explodePromise';
|
||||
import { isInCall as isInCallSelector } from '../state/selectors/calling';
|
||||
import {
|
||||
type ExponentialBackoffOptionsType,
|
||||
exponentialBackoffSleepTime,
|
||||
} from '../util/exponentialBackoff';
|
||||
import { AttachmentSizeError, type AttachmentType } from '../types/Attachment';
|
||||
import { __DEPRECATED$getMessageById } from '../messages/getMessageById';
|
||||
import type { MessageModel } from '../models/messages';
|
||||
|
@ -39,20 +29,17 @@ import {
|
|||
import { addAttachmentToMessage } from '../messageModifiers/AttachmentDownloads';
|
||||
import * as Errors from '../types/errors';
|
||||
import { redactGenericText } from '../util/privacy';
|
||||
import {
|
||||
JobManager,
|
||||
type JobManagerParamsType,
|
||||
type JobManagerJobResultType,
|
||||
} from './JobManager';
|
||||
|
||||
export enum AttachmentDownloadUrgency {
|
||||
IMMEDIATE = 'immediate',
|
||||
STANDARD = 'standard',
|
||||
}
|
||||
|
||||
const TICK_INTERVAL = durations.MINUTE;
|
||||
const MAX_CONCURRENT_JOBS = 3;
|
||||
|
||||
type AttachmentDownloadJobIdentifiersType = Pick<
|
||||
AttachmentDownloadJobType,
|
||||
'messageId' | 'attachmentType' | 'digest'
|
||||
>;
|
||||
|
||||
// Type for adding a new job
|
||||
export type NewAttachmentDownloadJobType = {
|
||||
attachment: AttachmentType;
|
||||
|
@ -63,117 +50,78 @@ export type NewAttachmentDownloadJobType = {
|
|||
urgency?: AttachmentDownloadUrgency;
|
||||
};
|
||||
|
||||
const RETRY_CONFIG: Record<
|
||||
'default',
|
||||
{ maxRetries: number; backoffConfig: ExponentialBackoffOptionsType }
|
||||
> = {
|
||||
default: {
|
||||
maxRetries: 4,
|
||||
backoffConfig: {
|
||||
// 30 seconds, 5 minutes, 50 minutes, (max) 6 hrs
|
||||
multiplier: 10,
|
||||
firstBackoffTime: 30 * durations.SECOND,
|
||||
maxBackoffTime: 6 * durations.HOUR,
|
||||
},
|
||||
const MAX_CONCURRENT_JOBS = 3;
|
||||
|
||||
const DEFAULT_RETRY_CONFIG = {
|
||||
maxAttempts: 5,
|
||||
backoffConfig: {
|
||||
// 30 seconds, 5 minutes, 50 minutes, (max) 6 hrs
|
||||
multiplier: 10,
|
||||
firstBackoffs: [30 * durations.SECOND],
|
||||
maxBackoffTime: 6 * durations.HOUR,
|
||||
},
|
||||
};
|
||||
|
||||
type AttachmentDownloadManagerParamsType = {
|
||||
type AttachmentDownloadManagerParamsType = Omit<
|
||||
JobManagerParamsType<CoreAttachmentDownloadJobType>,
|
||||
'getNextJobs'
|
||||
> & {
|
||||
getNextJobs: (options: {
|
||||
limit: number;
|
||||
prioritizeMessageIds?: Array<string>;
|
||||
timestamp?: number;
|
||||
}) => Promise<Array<AttachmentDownloadJobType>>;
|
||||
|
||||
saveJob: (job: AttachmentDownloadJobType) => Promise<void>;
|
||||
removeJob: (job: AttachmentDownloadJobType) => Promise<unknown>;
|
||||
runJob: (
|
||||
job: AttachmentDownloadJobType,
|
||||
isLastAttempt: boolean
|
||||
) => Promise<JobResultType>;
|
||||
isInCall: () => boolean;
|
||||
beforeStart?: () => Promise<void>;
|
||||
maxAttempts: number;
|
||||
};
|
||||
export type JobResultType = { status: 'retry' | 'finished' };
|
||||
export class AttachmentDownloadManager {
|
||||
private static _instance: AttachmentDownloadManager | undefined;
|
||||
|
||||
function getJobId(job: CoreAttachmentDownloadJobType): string {
|
||||
const { messageId, attachmentType, digest } = job;
|
||||
return `${messageId}.${attachmentType}.${digest}`;
|
||||
}
|
||||
|
||||
function getJobIdForLogging(job: CoreAttachmentDownloadJobType): string {
|
||||
const { sentAt, attachmentType, digest } = job;
|
||||
const redactedDigest = redactGenericText(digest);
|
||||
return `${sentAt}.${attachmentType}.${redactedDigest}`;
|
||||
}
|
||||
|
||||
export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownloadJobType> {
|
||||
private visibleTimelineMessages: Array<string> = [];
|
||||
private enabled: boolean = false;
|
||||
private activeJobs: Map<
|
||||
string,
|
||||
{
|
||||
completionPromise: ExplodePromiseResultType<void>;
|
||||
job: AttachmentDownloadJobType;
|
||||
}
|
||||
> = new Map();
|
||||
private timeout: NodeJS.Timeout | null = null;
|
||||
private jobStartPromises: Map<string, ExplodePromiseResultType<void>> =
|
||||
new Map();
|
||||
private jobCompletePromises: Map<string, ExplodePromiseResultType<void>> =
|
||||
new Map();
|
||||
private static _instance: AttachmentDownloadManager | undefined;
|
||||
override logPrefix = 'AttachmentDownloadManager';
|
||||
|
||||
static defaultParams: AttachmentDownloadManagerParamsType = {
|
||||
beforeStart: dataInterface.resetAttachmentDownloadActive,
|
||||
getNextJobs: dataInterface.getNextAttachmentDownloadJobs,
|
||||
markAllJobsInactive: dataInterface.resetAttachmentDownloadActive,
|
||||
saveJob: dataInterface.saveAttachmentDownloadJob,
|
||||
removeJob: dataInterface.removeAttachmentDownloadJob,
|
||||
getNextJobs: dataInterface.getNextAttachmentDownloadJobs,
|
||||
runJob: runDownloadAttachmentJob,
|
||||
isInCall: () => {
|
||||
shouldHoldOffOnStartingQueuedJobs: () => {
|
||||
const reduxState = window.reduxStore?.getState();
|
||||
if (reduxState) {
|
||||
return isInCallSelector(reduxState);
|
||||
}
|
||||
return false;
|
||||
},
|
||||
maxAttempts: RETRY_CONFIG.default.maxRetries + 1,
|
||||
getJobId,
|
||||
getJobIdForLogging,
|
||||
getRetryConfig: () => DEFAULT_RETRY_CONFIG,
|
||||
maxConcurrentJobs: MAX_CONCURRENT_JOBS,
|
||||
};
|
||||
|
||||
readonly getNextJobs: AttachmentDownloadManagerParamsType['getNextJobs'];
|
||||
readonly saveJob: AttachmentDownloadManagerParamsType['saveJob'];
|
||||
readonly removeJob: AttachmentDownloadManagerParamsType['removeJob'];
|
||||
readonly runJob: AttachmentDownloadManagerParamsType['runJob'];
|
||||
readonly beforeStart: AttachmentDownloadManagerParamsType['beforeStart'];
|
||||
readonly isInCall: AttachmentDownloadManagerParamsType['isInCall'];
|
||||
readonly maxAttempts: number;
|
||||
|
||||
constructor(
|
||||
params: AttachmentDownloadManagerParamsType = AttachmentDownloadManager.defaultParams
|
||||
) {
|
||||
this.getNextJobs = params.getNextJobs;
|
||||
this.saveJob = params.saveJob;
|
||||
this.removeJob = params.removeJob;
|
||||
this.runJob = params.runJob;
|
||||
this.beforeStart = params.beforeStart;
|
||||
this.isInCall = params.isInCall;
|
||||
this.maxAttempts = params.maxAttempts;
|
||||
constructor(params: AttachmentDownloadManagerParamsType) {
|
||||
super({
|
||||
...params,
|
||||
getNextJobs: ({ limit }) => {
|
||||
return params.getNextJobs({
|
||||
limit,
|
||||
prioritizeMessageIds: this.visibleTimelineMessages,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
this.enabled = true;
|
||||
await this.beforeStart?.();
|
||||
this.tick();
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.enabled = false;
|
||||
clearTimeoutIfNecessary(this.timeout);
|
||||
this.timeout = null;
|
||||
await Promise.all(
|
||||
[...this.activeJobs.values()].map(
|
||||
({ completionPromise }) => completionPromise.promise
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
tick(): void {
|
||||
clearTimeoutIfNecessary(this.timeout);
|
||||
this.timeout = null;
|
||||
drop(this.maybeStartJobs());
|
||||
this.timeout = setTimeout(() => this.tick(), TICK_INTERVAL);
|
||||
}
|
||||
|
||||
async addJob(
|
||||
// @ts-expect-error we are overriding the return type of JobManager's addJob
|
||||
override async addJob(
|
||||
newJobData: NewAttachmentDownloadJobType
|
||||
): Promise<AttachmentType> {
|
||||
const {
|
||||
|
@ -184,7 +132,7 @@ export class AttachmentDownloadManager {
|
|||
sentAt,
|
||||
urgency = AttachmentDownloadUrgency.STANDARD,
|
||||
} = newJobData;
|
||||
const parseResult = attachmentDownloadJobSchema.safeParse({
|
||||
const parseResult = coreAttachmentDownloadJobSchema.safeParse({
|
||||
messageId,
|
||||
receivedAt,
|
||||
sentAt,
|
||||
|
@ -193,10 +141,6 @@ export class AttachmentDownloadManager {
|
|||
contentType: attachment.contentType,
|
||||
size: attachment.size,
|
||||
attachment,
|
||||
active: false,
|
||||
attempts: 0,
|
||||
retryAfter: null,
|
||||
lastAttemptTimestamp: null,
|
||||
});
|
||||
|
||||
if (!parseResult.success) {
|
||||
|
@ -208,42 +152,18 @@ export class AttachmentDownloadManager {
|
|||
}
|
||||
|
||||
const newJob = parseResult.data;
|
||||
const jobIdForLogging = getJobIdForLogging(newJob);
|
||||
const logId = `AttachmentDownloadManager/addJob(${jobIdForLogging})`;
|
||||
|
||||
try {
|
||||
const runningJob = this.getRunningJob(newJob);
|
||||
if (runningJob) {
|
||||
log.info(`${logId}: already running; resetting attempts`);
|
||||
runningJob.attempts = 0;
|
||||
const { isAlreadyRunning } = await this._addJob(newJob, {
|
||||
forceStart: urgency === AttachmentDownloadUrgency.IMMEDIATE,
|
||||
});
|
||||
|
||||
await this.saveJob({
|
||||
...runningJob,
|
||||
attempts: 0,
|
||||
});
|
||||
return attachment;
|
||||
}
|
||||
|
||||
await this.saveJob(newJob);
|
||||
} catch (e) {
|
||||
log.error(`${logId}: error saving job`, Errors.toLogFormat(e));
|
||||
}
|
||||
|
||||
switch (urgency) {
|
||||
case AttachmentDownloadUrgency.IMMEDIATE:
|
||||
log.info(`${logId}: starting job immediately`);
|
||||
drop(this.startJob(newJob));
|
||||
break;
|
||||
case AttachmentDownloadUrgency.STANDARD:
|
||||
drop(this.maybeStartJobs());
|
||||
break;
|
||||
default:
|
||||
throw missingCaseError(urgency);
|
||||
if (isAlreadyRunning) {
|
||||
return attachment;
|
||||
}
|
||||
|
||||
return {
|
||||
...attachment,
|
||||
pending: !this.shouldHoldOffOnStartingQueuedJobs(),
|
||||
pending: !this.params.shouldHoldOffOnStartingQueuedJobs?.(),
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -251,220 +171,11 @@ export class AttachmentDownloadManager {
|
|||
this.visibleTimelineMessages = messageIds;
|
||||
}
|
||||
|
||||
// used in testing
|
||||
public waitForJobToBeStarted(job: AttachmentDownloadJobType): Promise<void> {
|
||||
const id = this.getJobIdIncludingAttempts(job);
|
||||
const existingPromise = this.jobStartPromises.get(id)?.promise;
|
||||
if (existingPromise) {
|
||||
return existingPromise;
|
||||
}
|
||||
const { promise, resolve, reject } = explodePromise<void>();
|
||||
this.jobStartPromises.set(id, { promise, resolve, reject });
|
||||
return promise;
|
||||
}
|
||||
|
||||
public waitForJobToBeCompleted(
|
||||
job: AttachmentDownloadJobType
|
||||
): Promise<void> {
|
||||
const id = this.getJobIdIncludingAttempts(job);
|
||||
const existingPromise = this.jobCompletePromises.get(id)?.promise;
|
||||
if (existingPromise) {
|
||||
return existingPromise;
|
||||
}
|
||||
const { promise, resolve, reject } = explodePromise<void>();
|
||||
this.jobCompletePromises.set(id, { promise, resolve, reject });
|
||||
return promise;
|
||||
}
|
||||
|
||||
// Private methods
|
||||
|
||||
// maybeStartJobs is called:
|
||||
// 1. every minute (via tick)
|
||||
// 2. after a job is added (via addJob)
|
||||
// 3. after a job finishes (via startJob)
|
||||
// preventing re-entrancy allow us to simplify some logic and ensure we don't try to
|
||||
// start too many jobs
|
||||
private _inMaybeStartJobs = false;
|
||||
private async maybeStartJobs(): Promise<void> {
|
||||
if (this._inMaybeStartJobs) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this._inMaybeStartJobs = true;
|
||||
|
||||
if (!this.enabled) {
|
||||
log.info(
|
||||
'AttachmentDownloadManager/_maybeStartJobs: not enabled, returning'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const numJobsToStart = this.getMaximumNumberOfJobsToStart();
|
||||
|
||||
if (numJobsToStart <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nextJobs = await this.getNextJobs({
|
||||
limit: numJobsToStart,
|
||||
// TODO (DESKTOP-6912): we'll want to prioritize more than just visible timeline
|
||||
// messages, including:
|
||||
// - media opened in lightbox
|
||||
// - media for stories
|
||||
prioritizeMessageIds: [...this.visibleTimelineMessages],
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
if (nextJobs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.shouldHoldOffOnStartingQueuedJobs()) {
|
||||
log.info(
|
||||
`AttachmentDownloadManager/_maybeStartJobs: holding off on starting ${nextJobs.length} new job(s)`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO (DESKTOP-6913): if a prioritized job is selected, we will to update the
|
||||
// in-memory job with that information so we can handle it differently, including
|
||||
// e.g. downloading a thumbnail before the full-size version
|
||||
for (const job of nextJobs) {
|
||||
drop(this.startJob(job));
|
||||
}
|
||||
} finally {
|
||||
this._inMaybeStartJobs = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async startJob(job: AttachmentDownloadJobType): Promise<void> {
|
||||
const logId = `AttachmentDownloadManager/startJob(${getJobIdForLogging(
|
||||
job
|
||||
)})`;
|
||||
if (this.isJobRunning(job)) {
|
||||
log.info(`${logId}: job is already running`);
|
||||
return;
|
||||
}
|
||||
const isLastAttempt = job.attempts + 1 >= this.maxAttempts;
|
||||
|
||||
try {
|
||||
log.info(`${logId}: starting job`);
|
||||
this.addRunningJob(job);
|
||||
await this.saveJob({ ...job, active: true });
|
||||
this.handleJobStartPromises(job);
|
||||
|
||||
const { status } = await this.runJob(job, isLastAttempt);
|
||||
log.info(`${logId}: job completed with status: ${status}`);
|
||||
|
||||
switch (status) {
|
||||
case 'finished':
|
||||
await this.removeJob(job);
|
||||
return;
|
||||
case 'retry':
|
||||
if (isLastAttempt) {
|
||||
throw new Error('Cannot retry on last attempt');
|
||||
}
|
||||
await this.retryJobLater(job);
|
||||
return;
|
||||
default:
|
||||
throw missingCaseError(status);
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(`${logId}: error when running job`, e);
|
||||
if (isLastAttempt) {
|
||||
await this.removeJob(job);
|
||||
} else {
|
||||
await this.retryJobLater(job);
|
||||
}
|
||||
} finally {
|
||||
this.removeRunningJob(job);
|
||||
drop(this.maybeStartJobs());
|
||||
}
|
||||
}
|
||||
|
||||
private async retryJobLater(job: AttachmentDownloadJobType) {
|
||||
const now = Date.now();
|
||||
await this.saveJob({
|
||||
...job,
|
||||
active: false,
|
||||
attempts: job.attempts + 1,
|
||||
// TODO (DESKTOP-6845): adjust retry based on job type (e.g. backup)
|
||||
retryAfter:
|
||||
now +
|
||||
exponentialBackoffSleepTime(
|
||||
job.attempts + 1,
|
||||
RETRY_CONFIG.default.backoffConfig
|
||||
),
|
||||
lastAttemptTimestamp: now,
|
||||
});
|
||||
}
|
||||
|
||||
private getActiveJobCount(): number {
|
||||
return this.activeJobs.size;
|
||||
}
|
||||
|
||||
private getMaximumNumberOfJobsToStart(): number {
|
||||
return MAX_CONCURRENT_JOBS - this.getActiveJobCount();
|
||||
}
|
||||
|
||||
private getRunningJob(
|
||||
job: AttachmentDownloadJobIdentifiersType
|
||||
): AttachmentDownloadJobType | undefined {
|
||||
const id = this.getJobId(job);
|
||||
return this.activeJobs.get(id)?.job;
|
||||
}
|
||||
|
||||
private isJobRunning(job: AttachmentDownloadJobType): boolean {
|
||||
return Boolean(this.getRunningJob(job));
|
||||
}
|
||||
|
||||
private removeRunningJob(job: AttachmentDownloadJobType) {
|
||||
const idWithAttempts = this.getJobIdIncludingAttempts(job);
|
||||
this.jobCompletePromises.get(idWithAttempts)?.resolve();
|
||||
this.jobCompletePromises.delete(idWithAttempts);
|
||||
|
||||
const id = this.getJobId(job);
|
||||
this.activeJobs.get(id)?.completionPromise.resolve();
|
||||
this.activeJobs.delete(id);
|
||||
}
|
||||
|
||||
private addRunningJob(job: AttachmentDownloadJobType) {
|
||||
if (this.isJobRunning(job)) {
|
||||
const jobIdForLogging = getJobIdForLogging(job);
|
||||
log.warn(
|
||||
`AttachmentDownloadManager/addRunningJob: job ${jobIdForLogging} is already running`
|
||||
);
|
||||
}
|
||||
this.activeJobs.set(this.getJobId(job), {
|
||||
completionPromise: explodePromise<void>(),
|
||||
job,
|
||||
});
|
||||
}
|
||||
|
||||
private handleJobStartPromises(job: AttachmentDownloadJobType) {
|
||||
const id = this.getJobIdIncludingAttempts(job);
|
||||
this.jobStartPromises.get(id)?.resolve();
|
||||
this.jobStartPromises.delete(id);
|
||||
}
|
||||
|
||||
private getJobIdIncludingAttempts(job: AttachmentDownloadJobType) {
|
||||
return `${this.getJobId(job)}.${job.attempts}`;
|
||||
}
|
||||
|
||||
private getJobId(job: AttachmentDownloadJobIdentifiersType): string {
|
||||
const { messageId, attachmentType, digest } = job;
|
||||
return `${messageId}.${attachmentType}.${digest}`;
|
||||
}
|
||||
|
||||
private shouldHoldOffOnStartingQueuedJobs(): boolean {
|
||||
return this.isInCall();
|
||||
}
|
||||
|
||||
// Static methods
|
||||
static get instance(): AttachmentDownloadManager {
|
||||
if (!AttachmentDownloadManager._instance) {
|
||||
AttachmentDownloadManager._instance = new AttachmentDownloadManager();
|
||||
AttachmentDownloadManager._instance = new AttachmentDownloadManager(
|
||||
AttachmentDownloadManager.defaultParams
|
||||
);
|
||||
}
|
||||
return AttachmentDownloadManager._instance;
|
||||
}
|
||||
|
@ -492,10 +203,13 @@ export class AttachmentDownloadManager {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO (DESKTOP-6913): if a prioritized job is selected, we will to update the
|
||||
// in-memory job with that information so we can handle it differently, including
|
||||
// e.g. downloading a thumbnail before the full-size version
|
||||
async function runDownloadAttachmentJob(
|
||||
job: AttachmentDownloadJobType,
|
||||
isLastAttempt: boolean
|
||||
): Promise<JobResultType> {
|
||||
): Promise<JobManagerJobResultType> {
|
||||
const jobIdForLogging = getJobIdForLogging(job);
|
||||
const logId = `AttachmentDownloadManager/runDownloadAttachmentJob/${jobIdForLogging}`;
|
||||
|
||||
|
@ -640,9 +354,3 @@ function _markAttachmentAsTransientlyErrored(
|
|||
): AttachmentType {
|
||||
return { ...attachment, pending: false, error: true };
|
||||
}
|
||||
|
||||
function getJobIdForLogging(job: AttachmentDownloadJobType): string {
|
||||
const { sentAt, attachmentType, digest } = job;
|
||||
const redactedDigest = redactGenericText(digest);
|
||||
return `${sentAt}.${attachmentType}.${redactedDigest}`;
|
||||
}
|
||||
|
|
358
ts/jobs/JobManager.ts
Normal file
358
ts/jobs/JobManager.ts
Normal file
|
@ -0,0 +1,358 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import * as z from 'zod';
|
||||
import { MINUTE } from '../util/durations';
|
||||
import {
|
||||
explodePromise,
|
||||
type ExplodePromiseResultType,
|
||||
} from '../util/explodePromise';
|
||||
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
|
||||
import { drop } from '../util/drop';
|
||||
import * as log from '../logging/log';
|
||||
import { missingCaseError } from '../util/missingCaseError';
|
||||
import {
|
||||
type ExponentialBackoffOptionsType,
|
||||
exponentialBackoffSleepTime,
|
||||
} from '../util/exponentialBackoff';
|
||||
import * as Errors from '../types/errors';
|
||||
|
||||
export type JobManagerJobType = {
|
||||
active: boolean;
|
||||
attempts: number;
|
||||
retryAfter: number | null;
|
||||
lastAttemptTimestamp: number | null;
|
||||
};
|
||||
|
||||
export const jobManagerJobSchema = z.object({
|
||||
attempts: z.number(),
|
||||
active: z.boolean(),
|
||||
retryAfter: z.number().nullable(),
|
||||
lastAttemptTimestamp: z.number().nullable(),
|
||||
}) satisfies z.ZodType<JobManagerJobType>;
|
||||
|
||||
export type JobManagerParamsType<
|
||||
CoreJobType,
|
||||
JobType = CoreJobType & JobManagerJobType
|
||||
> = {
|
||||
markAllJobsInactive: () => Promise<void>;
|
||||
getNextJobs: (options: {
|
||||
limit: number;
|
||||
timestamp: number;
|
||||
}) => Promise<Array<JobType>>;
|
||||
saveJob: (job: JobType) => Promise<void>;
|
||||
removeJob: (job: JobType) => Promise<void>;
|
||||
runJob: (
|
||||
job: JobType,
|
||||
isLastAttempt: boolean
|
||||
) => Promise<JobManagerJobResultType>;
|
||||
shouldHoldOffOnStartingQueuedJobs?: () => boolean;
|
||||
getJobId: (job: CoreJobType) => string;
|
||||
getJobIdForLogging: (job: JobType) => string;
|
||||
getRetryConfig: (job: JobType) => {
|
||||
maxAttempts: number;
|
||||
backoffConfig: ExponentialBackoffOptionsType;
|
||||
};
|
||||
maxConcurrentJobs: number;
|
||||
};
|
||||
|
||||
export type JobManagerJobResultType = { status: 'retry' | 'finished' };
|
||||
|
||||
const TICK_INTERVAL = MINUTE;
|
||||
|
||||
export abstract class JobManager<CoreJobType> {
|
||||
protected enabled: boolean = false;
|
||||
protected activeJobs: Map<
|
||||
string,
|
||||
{
|
||||
completionPromise: ExplodePromiseResultType<void>;
|
||||
job: CoreJobType & JobManagerJobType;
|
||||
}
|
||||
> = new Map();
|
||||
protected jobStartPromises: Map<string, ExplodePromiseResultType<void>> =
|
||||
new Map();
|
||||
protected jobCompletePromises: Map<string, ExplodePromiseResultType<void>> =
|
||||
new Map();
|
||||
|
||||
protected tickTimeout: NodeJS.Timeout | null = null;
|
||||
protected logPrefix = 'JobManager';
|
||||
|
||||
constructor(readonly params: JobManagerParamsType<CoreJobType>) {}
|
||||
|
||||
async start(): Promise<void> {
|
||||
this.enabled = true;
|
||||
await this.params.markAllJobsInactive();
|
||||
this.tick();
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.enabled = false;
|
||||
clearTimeoutIfNecessary(this.tickTimeout);
|
||||
this.tickTimeout = null;
|
||||
await Promise.all(
|
||||
[...this.activeJobs.values()].map(
|
||||
({ completionPromise }) => completionPromise.promise
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
tick(): void {
|
||||
clearTimeoutIfNecessary(this.tickTimeout);
|
||||
this.tickTimeout = null;
|
||||
drop(this.maybeStartJobs());
|
||||
this.tickTimeout = setTimeout(() => this.tick(), TICK_INTERVAL);
|
||||
}
|
||||
|
||||
// used in testing
|
||||
waitForJobToBeStarted(
|
||||
job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
|
||||
): Promise<void> {
|
||||
const id = this.getJobIdIncludingAttempts(job);
|
||||
const existingPromise = this.jobStartPromises.get(id)?.promise;
|
||||
if (existingPromise) {
|
||||
return existingPromise;
|
||||
}
|
||||
const { promise, resolve, reject } = explodePromise<void>();
|
||||
this.jobStartPromises.set(id, { promise, resolve, reject });
|
||||
return promise;
|
||||
}
|
||||
|
||||
waitForJobToBeCompleted(
|
||||
job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
|
||||
): Promise<void> {
|
||||
const id = this.getJobIdIncludingAttempts(job);
|
||||
const existingPromise = this.jobCompletePromises.get(id)?.promise;
|
||||
if (existingPromise) {
|
||||
return existingPromise;
|
||||
}
|
||||
const { promise, resolve, reject } = explodePromise<void>();
|
||||
this.jobCompletePromises.set(id, { promise, resolve, reject });
|
||||
return promise;
|
||||
}
|
||||
|
||||
async addJob(newJob: CoreJobType): Promise<void> {
|
||||
await this._addJob(newJob);
|
||||
}
|
||||
|
||||
// Protected methods
|
||||
protected async _addJob(
|
||||
newJob: CoreJobType,
|
||||
options?: { forceStart: boolean }
|
||||
): Promise<{ isAlreadyRunning: boolean }> {
|
||||
const job: CoreJobType & JobManagerJobType = {
|
||||
...newJob,
|
||||
attempts: 0,
|
||||
retryAfter: null,
|
||||
lastAttemptTimestamp: null,
|
||||
active: false,
|
||||
};
|
||||
const logId = this.params.getJobIdForLogging(job);
|
||||
try {
|
||||
const runningJob = this.getRunningJob(job);
|
||||
if (runningJob) {
|
||||
log.info(`${logId}: already running; resetting attempts`);
|
||||
runningJob.attempts = 0;
|
||||
|
||||
await this.params.saveJob({
|
||||
...runningJob,
|
||||
attempts: 0,
|
||||
});
|
||||
|
||||
return { isAlreadyRunning: true };
|
||||
}
|
||||
|
||||
await this.params.saveJob(job);
|
||||
|
||||
if (options?.forceStart) {
|
||||
if (!this.enabled) {
|
||||
log.warn(
|
||||
`${logId}: added but jobManager not enabled, can't start immediately`
|
||||
);
|
||||
} else {
|
||||
log.info(`${logId}: starting job immediately`);
|
||||
drop(this.startJob(job));
|
||||
}
|
||||
} else if (this.enabled) {
|
||||
drop(this.maybeStartJobs());
|
||||
}
|
||||
|
||||
return { isAlreadyRunning: false };
|
||||
} catch (e) {
|
||||
log.error(`${logId}: error saving job`, Errors.toLogFormat(e));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// maybeStartJobs is called:
|
||||
// 1. every minute (via tick)
|
||||
// 2. after a job is added (via addJob)
|
||||
// 3. after a job finishes (via startJob)
|
||||
// preventing re-entrancy allow us to simplify some logic and ensure we don't try to
|
||||
// start too many jobs
|
||||
private _inMaybeStartJobs = false;
|
||||
protected async maybeStartJobs(): Promise<void> {
|
||||
if (this._inMaybeStartJobs) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this._inMaybeStartJobs = true;
|
||||
|
||||
if (!this.enabled) {
|
||||
log.info(`${this.logPrefix}/_maybeStartJobs: not enabled, returning`);
|
||||
return;
|
||||
}
|
||||
|
||||
const numJobsToStart = this.getMaximumNumberOfJobsToStart();
|
||||
|
||||
if (numJobsToStart <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nextJobs = await this.params.getNextJobs({
|
||||
limit: numJobsToStart,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
if (nextJobs.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.params.shouldHoldOffOnStartingQueuedJobs?.()) {
|
||||
log.info(
|
||||
`${this.logPrefix}/_maybeStartJobs: holding off on starting ${nextJobs.length} new job(s)`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const job of nextJobs) {
|
||||
drop(this.startJob(job));
|
||||
}
|
||||
} finally {
|
||||
this._inMaybeStartJobs = false;
|
||||
}
|
||||
}
|
||||
|
||||
protected async startJob(
|
||||
job: CoreJobType & JobManagerJobType
|
||||
): Promise<void> {
|
||||
const logId = `${this.logPrefix}/startJob(${this.params.getJobIdForLogging(
|
||||
job
|
||||
)})`;
|
||||
if (this.isJobRunning(job)) {
|
||||
log.info(`${logId}: job is already running`);
|
||||
return;
|
||||
}
|
||||
|
||||
const isLastAttempt =
|
||||
job.attempts + 1 >=
|
||||
(this.params.getRetryConfig(job).maxAttempts ?? Infinity);
|
||||
|
||||
try {
|
||||
log.info(`${logId}: starting job`);
|
||||
this.addRunningJob(job);
|
||||
await this.params.saveJob({ ...job, active: true });
|
||||
this.handleJobStartPromises(job);
|
||||
|
||||
const { status } = await this.params.runJob(job, isLastAttempt);
|
||||
log.info(`${logId}: job completed with status: ${status}`);
|
||||
|
||||
switch (status) {
|
||||
case 'finished':
|
||||
await this.params.removeJob(job);
|
||||
return;
|
||||
case 'retry':
|
||||
if (isLastAttempt) {
|
||||
throw new Error('Cannot retry on last attempt');
|
||||
}
|
||||
await this.retryJobLater(job);
|
||||
return;
|
||||
default:
|
||||
throw missingCaseError(status);
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(`${logId}: error when running job`, e);
|
||||
if (isLastAttempt) {
|
||||
await this.params.removeJob(job);
|
||||
} else {
|
||||
await this.retryJobLater(job);
|
||||
}
|
||||
} finally {
|
||||
this.removeRunningJob(job);
|
||||
drop(this.maybeStartJobs());
|
||||
}
|
||||
}
|
||||
|
||||
private async retryJobLater(job: CoreJobType & JobManagerJobType) {
|
||||
const now = Date.now();
|
||||
|
||||
await this.params.saveJob({
|
||||
...job,
|
||||
active: false,
|
||||
attempts: job.attempts + 1,
|
||||
retryAfter:
|
||||
now +
|
||||
exponentialBackoffSleepTime(
|
||||
job.attempts + 1,
|
||||
this.params.getRetryConfig(job).backoffConfig
|
||||
),
|
||||
lastAttemptTimestamp: now,
|
||||
});
|
||||
}
|
||||
|
||||
private getActiveJobCount(): number {
|
||||
return this.activeJobs.size;
|
||||
}
|
||||
|
||||
private getMaximumNumberOfJobsToStart(): number {
|
||||
return Math.max(
|
||||
0,
|
||||
this.params.maxConcurrentJobs - this.getActiveJobCount()
|
||||
);
|
||||
}
|
||||
|
||||
private getRunningJob(
|
||||
job: CoreJobType & JobManagerJobType
|
||||
): (CoreJobType & JobManagerJobType) | undefined {
|
||||
const id = this.params.getJobId(job);
|
||||
return this.activeJobs.get(id)?.job;
|
||||
}
|
||||
|
||||
private isJobRunning(job: CoreJobType & JobManagerJobType): boolean {
|
||||
return Boolean(this.getRunningJob(job));
|
||||
}
|
||||
|
||||
private removeRunningJob(job: CoreJobType & JobManagerJobType) {
|
||||
const idWithAttempts = this.getJobIdIncludingAttempts(job);
|
||||
this.jobCompletePromises.get(idWithAttempts)?.resolve();
|
||||
this.jobCompletePromises.delete(idWithAttempts);
|
||||
|
||||
const id = this.params.getJobId(job);
|
||||
this.activeJobs.get(id)?.completionPromise.resolve();
|
||||
this.activeJobs.delete(id);
|
||||
}
|
||||
|
||||
private addRunningJob(job: CoreJobType & JobManagerJobType) {
|
||||
if (this.isJobRunning(job)) {
|
||||
const jobIdForLogging = this.params.getJobIdForLogging(job);
|
||||
log.warn(
|
||||
`${this.logPrefix}/addRunningJob: job ${jobIdForLogging} is already running`
|
||||
);
|
||||
}
|
||||
this.activeJobs.set(this.params.getJobId(job), {
|
||||
completionPromise: explodePromise<void>(),
|
||||
job,
|
||||
});
|
||||
}
|
||||
|
||||
private handleJobStartPromises(job: CoreJobType & JobManagerJobType) {
|
||||
const id = this.getJobIdIncludingAttempts(job);
|
||||
this.jobStartPromises.get(id)?.resolve();
|
||||
this.jobStartPromises.delete(id);
|
||||
}
|
||||
|
||||
private getJobIdIncludingAttempts(
|
||||
job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
|
||||
) {
|
||||
return `${this.params.getJobId(job)}.${job.attempts}`;
|
||||
}
|
||||
}
|
|
@ -92,7 +92,13 @@ import {
|
|||
isGIF,
|
||||
isDownloaded,
|
||||
} from '../../types/Attachment';
|
||||
import { convertAttachmentToFilePointer } from './util/filePointers';
|
||||
import {
|
||||
getFilePointerForAttachment,
|
||||
maybeGetBackupJobForAttachmentAndFilePointer,
|
||||
} from './util/filePointers';
|
||||
import type { CoreAttachmentBackupJobType } from '../../types/AttachmentBackup';
|
||||
import { AttachmentBackupManager } from '../../jobs/AttachmentBackupManager';
|
||||
import { getBackupCdnInfo } from './util/mediaId';
|
||||
|
||||
const MAX_CONCURRENCY = 10;
|
||||
|
||||
|
@ -155,6 +161,7 @@ type NonBubbleResultType = Readonly<
|
|||
export class BackupExportStream extends Readable {
|
||||
private readonly backupTimeMs = getSafeLongFromTimestamp(Date.now());
|
||||
private readonly convoIdToRecipientId = new Map<string, number>();
|
||||
private attachmentBackupJobs: Array<CoreAttachmentBackupJobType> = [];
|
||||
private buffers = new Array<Uint8Array>();
|
||||
private nextRecipientId = 0;
|
||||
private flushResolve: (() => void) | undefined;
|
||||
|
@ -163,6 +170,7 @@ export class BackupExportStream extends Readable {
|
|||
drop(
|
||||
(async () => {
|
||||
log.info('BackupExportStream: starting...');
|
||||
|
||||
await Data.pauseWriteAccess();
|
||||
try {
|
||||
await this.unsafeRun(backupLevel);
|
||||
|
@ -170,6 +178,12 @@ export class BackupExportStream extends Readable {
|
|||
this.emit('error', error);
|
||||
} finally {
|
||||
await Data.resumeWriteAccess();
|
||||
await Promise.all(
|
||||
this.attachmentBackupJobs.map(job =>
|
||||
AttachmentBackupManager.addJob(job)
|
||||
)
|
||||
);
|
||||
drop(AttachmentBackupManager.start());
|
||||
log.info('BackupExportStream: finished');
|
||||
}
|
||||
})()
|
||||
|
@ -356,7 +370,10 @@ export class BackupExportStream extends Readable {
|
|||
|
||||
await this.flush();
|
||||
|
||||
log.warn('backups: final stats', stats);
|
||||
log.warn('backups: final stats', {
|
||||
...stats,
|
||||
attachmentBackupJobs: this.attachmentBackupJobs.length,
|
||||
});
|
||||
|
||||
this.push(null);
|
||||
}
|
||||
|
@ -788,6 +805,7 @@ export class BackupExportStream extends Readable {
|
|||
return this.processMessageAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
messageReceivedAt: message.received_at,
|
||||
});
|
||||
})
|
||||
)
|
||||
|
@ -1693,13 +1711,16 @@ export class BackupExportStream extends Readable {
|
|||
private async processMessageAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
messageReceivedAt,
|
||||
}: {
|
||||
attachment: AttachmentType;
|
||||
backupLevel: BackupLevel;
|
||||
messageReceivedAt: number;
|
||||
}): Promise<Backups.MessageAttachment> {
|
||||
const filePointer = await this.processAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
messageReceivedAt,
|
||||
});
|
||||
|
||||
return new Backups.MessageAttachment({
|
||||
|
@ -1712,18 +1733,35 @@ export class BackupExportStream extends Readable {
|
|||
private async processAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
messageReceivedAt,
|
||||
}: {
|
||||
attachment: AttachmentType;
|
||||
backupLevel: BackupLevel;
|
||||
messageReceivedAt: number;
|
||||
}): Promise<Backups.FilePointer> {
|
||||
const filePointer = await convertAttachmentToFilePointer({
|
||||
attachment,
|
||||
backupLevel,
|
||||
// TODO (DESKTOP-6983) -- Retrieve & save backup tier media list
|
||||
getBackupTierInfo: () => ({
|
||||
isInBackupTier: false,
|
||||
}),
|
||||
const { filePointer, updatedAttachment } =
|
||||
await getFilePointerForAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
getBackupCdnInfo,
|
||||
});
|
||||
|
||||
if (updatedAttachment) {
|
||||
// TODO (DESKTOP-6688): ensure that we update the message/attachment in DB with the
|
||||
// new keys so that we don't try to re-upload it again on the next export
|
||||
}
|
||||
|
||||
const backupJob = await maybeGetBackupJobForAttachmentAndFilePointer({
|
||||
attachment: updatedAttachment ?? attachment,
|
||||
filePointer,
|
||||
getBackupCdnInfo,
|
||||
messageReceivedAt,
|
||||
});
|
||||
|
||||
if (backupJob) {
|
||||
this.attachmentBackupJobs.push(backupJob);
|
||||
}
|
||||
|
||||
return filePointer;
|
||||
}
|
||||
|
||||
|
|
|
@ -187,6 +187,51 @@ export class BackupsService {
|
|||
}
|
||||
}
|
||||
|
||||
public async fetchAndSaveBackupCdnObjectMetadata(): Promise<void> {
|
||||
log.info('fetchAndSaveBackupCdnObjectMetadata: clearing existing metadata');
|
||||
await window.Signal.Data.clearAllBackupCdnObjectMetadata();
|
||||
|
||||
let cursor: string | undefined;
|
||||
const PAGE_SIZE = 1000;
|
||||
let numObjects = 0;
|
||||
do {
|
||||
log.info('fetchAndSaveBackupCdnObjectMetadata: fetching next page');
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const listResult = await this.api.listMedia({ cursor, limit: PAGE_SIZE });
|
||||
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await window.Signal.Data.saveBackupCdnObjectMetadata(
|
||||
listResult.storedMediaObjects.map(object => ({
|
||||
mediaId: object.mediaId,
|
||||
cdnNumber: object.cdn,
|
||||
sizeOnBackupCdn: object.objectLength,
|
||||
}))
|
||||
);
|
||||
numObjects += listResult.storedMediaObjects.length;
|
||||
|
||||
cursor = listResult.cursor ?? undefined;
|
||||
} while (cursor);
|
||||
|
||||
log.info(
|
||||
`fetchAndSaveBackupCdnObjectMetadata: finished fetching metadata for ${numObjects} objects`
|
||||
);
|
||||
}
|
||||
|
||||
public async getBackupCdnInfo(
|
||||
mediaId: string
|
||||
): Promise<
|
||||
{ isInBackupTier: true; cdnNumber: number } | { isInBackupTier: false }
|
||||
> {
|
||||
const storedInfo = await window.Signal.Data.getBackupCdnObjectMetadata(
|
||||
mediaId
|
||||
);
|
||||
if (!storedInfo) {
|
||||
return { isInBackupTier: false };
|
||||
}
|
||||
|
||||
return { isInBackupTier: true, cdnNumber: storedInfo.cdnNumber };
|
||||
}
|
||||
|
||||
private async exportBackup(
|
||||
sink: Writable,
|
||||
backupLevel: BackupLevel = BackupLevel.Messages
|
||||
|
@ -197,9 +242,17 @@ export class BackupsService {
|
|||
this.isRunning = true;
|
||||
|
||||
try {
|
||||
const { aesKey, macKey } = getKeyMaterial();
|
||||
// TODO (DESKTOP-7168): Update mock-server to support this endpoint
|
||||
if (!window.SignalCI) {
|
||||
// We first fetch the latest info on what's on the CDN, since this affects the
|
||||
// filePointers we will generate during export
|
||||
log.info('Fetching latest backup CDN metadata');
|
||||
await this.fetchAndSaveBackupCdnObjectMetadata();
|
||||
}
|
||||
|
||||
const { aesKey, macKey } = getKeyMaterial();
|
||||
const recordStream = new BackupExportStream();
|
||||
|
||||
recordStream.run(backupLevel);
|
||||
|
||||
const iv = randomBytes(IV_LENGTH);
|
||||
|
|
|
@ -7,6 +7,7 @@ import {
|
|||
APPLICATION_OCTET_STREAM,
|
||||
stringToMIMEType,
|
||||
} from '../../../types/MIME';
|
||||
import * as log from '../../../logging/log';
|
||||
import {
|
||||
type AttachmentType,
|
||||
isDownloadableFromTransitTier,
|
||||
|
@ -16,13 +17,25 @@ import {
|
|||
type AttachmentDownloadableFromBackupTier,
|
||||
type LocallySavedAttachment,
|
||||
type AttachmentReadyForBackup,
|
||||
isDecryptable,
|
||||
isReencryptableToSameDigest,
|
||||
} from '../../../types/Attachment';
|
||||
import { Backups } from '../../../protobuf';
|
||||
import * as Bytes from '../../../Bytes';
|
||||
import { getTimestampFromLong } from '../../../util/timestampLongUtils';
|
||||
import { getRandomBytes } from '../../../Crypto';
|
||||
import { encryptAttachmentV2 } from '../../../AttachmentCrypto';
|
||||
import {
|
||||
encryptAttachmentV2,
|
||||
generateAttachmentKeys,
|
||||
} from '../../../AttachmentCrypto';
|
||||
import { strictAssert } from '../../../util/assert';
|
||||
import type { CoreAttachmentBackupJobType } from '../../../types/AttachmentBackup';
|
||||
import {
|
||||
type GetBackupCdnInfoType,
|
||||
getMediaIdForAttachment,
|
||||
getMediaIdFromMediaName,
|
||||
getMediaNameForAttachment,
|
||||
} from './mediaId';
|
||||
import { redactGenericText } from '../../../util/privacy';
|
||||
|
||||
export function convertFilePointerToAttachment(
|
||||
filePointer: Backups.FilePointer
|
||||
|
@ -116,54 +129,56 @@ export function convertFilePointerToAttachment(
|
|||
* export-creation time) to calculate the digest which will be saved in the backup proto
|
||||
* along with the new keys.
|
||||
*/
|
||||
async function fixupAttachmentForBackup(
|
||||
attachment: LocallySavedAttachment
|
||||
|
||||
async function generateNewEncryptionInfoForAttachment(
|
||||
attachment: Readonly<LocallySavedAttachment>
|
||||
): Promise<AttachmentReadyForBackup> {
|
||||
const fixedUpAttachment = { ...attachment };
|
||||
const keyToUse = attachment.key ?? Bytes.toBase64(getRandomBytes(64));
|
||||
let digestToUse = attachment.key ? attachment.digest : undefined;
|
||||
|
||||
if (!digestToUse) {
|
||||
// Delete current locators for the attachment; we can no longer use them and will need
|
||||
// to fully re-encrypt and upload
|
||||
delete fixedUpAttachment.cdnId;
|
||||
delete fixedUpAttachment.cdnKey;
|
||||
delete fixedUpAttachment.cdnNumber;
|
||||
// Since we are changing the encryption, we need to delete all encryption & location
|
||||
// related info
|
||||
delete fixedUpAttachment.cdnId;
|
||||
delete fixedUpAttachment.cdnKey;
|
||||
delete fixedUpAttachment.cdnNumber;
|
||||
delete fixedUpAttachment.backupLocator;
|
||||
delete fixedUpAttachment.uploadTimestamp;
|
||||
delete fixedUpAttachment.digest;
|
||||
delete fixedUpAttachment.iv;
|
||||
delete fixedUpAttachment.key;
|
||||
|
||||
// encrypt this file in memory in order to calculate the digest
|
||||
const { digest } = await encryptAttachmentV2({
|
||||
keys: Bytes.fromBase64(keyToUse),
|
||||
plaintext: {
|
||||
absolutePath: window.Signal.Migrations.getAbsoluteAttachmentPath(
|
||||
attachment.path
|
||||
),
|
||||
},
|
||||
});
|
||||
const keys = generateAttachmentKeys();
|
||||
|
||||
digestToUse = Bytes.toBase64(digest);
|
||||
|
||||
// TODO (DESKTOP-6688): ensure that we update the message/attachment in DB with the
|
||||
// new keys so that we don't try to re-upload it again on the next export
|
||||
}
|
||||
// encrypt this file without writing the ciphertext to disk in order to calculate the
|
||||
// digest
|
||||
const { digest, iv } = await encryptAttachmentV2({
|
||||
keys,
|
||||
plaintext: {
|
||||
absolutePath: window.Signal.Migrations.getAbsoluteAttachmentPath(
|
||||
attachment.path
|
||||
),
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
...fixedUpAttachment,
|
||||
key: keyToUse,
|
||||
digest: digestToUse,
|
||||
digest: Bytes.toBase64(digest),
|
||||
iv: Bytes.toBase64(iv),
|
||||
key: Bytes.toBase64(keys),
|
||||
};
|
||||
}
|
||||
|
||||
export async function convertAttachmentToFilePointer({
|
||||
export async function getFilePointerForAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
getBackupTierInfo,
|
||||
getBackupCdnInfo,
|
||||
}: {
|
||||
attachment: AttachmentType;
|
||||
backupLevel: BackupLevel;
|
||||
getBackupTierInfo: (
|
||||
mediaName: string
|
||||
) => { isInBackupTier: true; cdnNumber: number } | { isInBackupTier: false };
|
||||
}): Promise<Backups.FilePointer> {
|
||||
getBackupCdnInfo: GetBackupCdnInfoType;
|
||||
}): Promise<{
|
||||
filePointer: Backups.FilePointer;
|
||||
updatedAttachment?: AttachmentType;
|
||||
}> {
|
||||
const filePointerRootProps = new Backups.FilePointer({
|
||||
contentType: attachment.contentType,
|
||||
incrementalMac: attachment.incrementalMac
|
||||
|
@ -176,6 +191,9 @@ export async function convertAttachmentToFilePointer({
|
|||
caption: attachment.caption,
|
||||
blurHash: attachment.blurHash,
|
||||
});
|
||||
const logId = `getFilePointerForAttachment(${redactGenericText(
|
||||
attachment.digest ?? ''
|
||||
)})`;
|
||||
|
||||
if (!isAttachmentLocallySaved(attachment)) {
|
||||
// 1. If the attachment is undownloaded, we cannot trust its digest / mediaName. Thus,
|
||||
|
@ -186,70 +204,110 @@ export async function convertAttachmentToFilePointer({
|
|||
isDownloadableFromBackupTier(attachment) &&
|
||||
backupLevel === BackupLevel.Media
|
||||
) {
|
||||
return new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
backupLocator: getBackupLocator(attachment),
|
||||
});
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
backupLocator: getBackupLocator(attachment),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
// 2. Otherwise, we only return the transit CDN info via AttachmentLocator
|
||||
if (isDownloadableFromTransitTier(attachment)) {
|
||||
return new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
attachmentLocator: getAttachmentLocator(attachment),
|
||||
});
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
attachmentLocator: getAttachmentLocator(attachment),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
// 3. Otherwise, we don't have the attachment, and we don't have info to download it
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
invalidAttachmentLocator: getInvalidAttachmentLocator(),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
// The attachment is locally saved
|
||||
if (backupLevel !== BackupLevel.Media) {
|
||||
// 1. If we have information to donwnload the file from the transit tier, great, let's
|
||||
// just create an attachmentLocator so the restorer can try to download from the
|
||||
// transit tier
|
||||
if (isDownloadableFromTransitTier(attachment)) {
|
||||
return new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
attachmentLocator: getAttachmentLocator(attachment),
|
||||
});
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
attachmentLocator: getAttachmentLocator(attachment),
|
||||
}),
|
||||
};
|
||||
}
|
||||
return new Backups.FilePointer({
|
||||
|
||||
// 2. Otherwise, we have the attachment locally, but we don't have information to put
|
||||
// in the backup proto to allow the restorer to download it. (This shouldn't
|
||||
// happen!)
|
||||
log.warn(
|
||||
`${logId}: Attachment is downloaded but we lack information to decrypt it`
|
||||
);
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
invalidAttachmentLocator: getInvalidAttachmentLocator(),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
// From here on, this attachment is headed to (or already on) the backup tier!
|
||||
const mediaNameForCurrentVersionOfAttachment =
|
||||
getMediaNameForAttachment(attachment);
|
||||
|
||||
const backupCdnInfo = await getBackupCdnInfo(
|
||||
getMediaIdFromMediaName(mediaNameForCurrentVersionOfAttachment).string
|
||||
);
|
||||
|
||||
// We can generate a backupLocator for this mediaName iff
|
||||
// 1. we have iv, key, and digest so we can re-encrypt to the existing digest when
|
||||
// uploading, or
|
||||
// 2. the mediaId is already in the backup tier and we have the key & digest to decrypt
|
||||
// and verify it
|
||||
if (
|
||||
isReencryptableToSameDigest(attachment) ||
|
||||
(backupCdnInfo.isInBackupTier && isDecryptable(attachment))
|
||||
) {
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
backupLocator: getBackupLocator({
|
||||
...attachment,
|
||||
backupLocator: {
|
||||
mediaName: mediaNameForCurrentVersionOfAttachment,
|
||||
cdnNumber: backupCdnInfo.isInBackupTier
|
||||
? backupCdnInfo.cdnNumber
|
||||
: undefined,
|
||||
},
|
||||
}),
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
log.info(`${logId}: Generating new encryption info for attachment`);
|
||||
const attachmentWithNewEncryptionInfo =
|
||||
await generateNewEncryptionInfoForAttachment(attachment);
|
||||
return {
|
||||
filePointer: new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
invalidAttachmentLocator: getInvalidAttachmentLocator(),
|
||||
});
|
||||
}
|
||||
|
||||
if (!isAttachmentLocallySaved(attachment)) {
|
||||
return new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
invalidAttachmentLocator: getInvalidAttachmentLocator(),
|
||||
});
|
||||
}
|
||||
|
||||
const attachmentForBackup = await fixupAttachmentForBackup(attachment);
|
||||
const mediaName = getMediaNameForAttachment(attachmentForBackup);
|
||||
|
||||
const backupTierInfo = getBackupTierInfo(mediaName);
|
||||
let cdnNumberInBackupTier: number | undefined;
|
||||
if (backupTierInfo.isInBackupTier) {
|
||||
cdnNumberInBackupTier = backupTierInfo.cdnNumber;
|
||||
}
|
||||
|
||||
return new Backups.FilePointer({
|
||||
...filePointerRootProps,
|
||||
backupLocator: getBackupLocator({
|
||||
...attachmentForBackup,
|
||||
backupLocator: {
|
||||
mediaName,
|
||||
cdnNumber: cdnNumberInBackupTier,
|
||||
},
|
||||
backupLocator: getBackupLocator({
|
||||
...attachmentWithNewEncryptionInfo,
|
||||
backupLocator: {
|
||||
mediaName: getMediaNameForAttachment(attachmentWithNewEncryptionInfo),
|
||||
cdnNumber: undefined,
|
||||
},
|
||||
}),
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
export function getMediaNameForAttachment(attachment: AttachmentType): string {
|
||||
strictAssert(attachment.digest, 'Digest must be present');
|
||||
return attachment.digest;
|
||||
}
|
||||
|
||||
// mediaId is special in that it is encoded in base64url
|
||||
export function getBytesFromMediaId(mediaId: string): Uint8Array {
|
||||
return Bytes.fromBase64url(mediaId);
|
||||
updatedAttachment: attachmentWithNewEncryptionInfo,
|
||||
};
|
||||
}
|
||||
|
||||
function getAttachmentLocator(
|
||||
|
@ -282,3 +340,74 @@ function getBackupLocator(attachment: AttachmentDownloadableFromBackupTier) {
|
|||
function getInvalidAttachmentLocator() {
|
||||
return new Backups.FilePointer.InvalidAttachmentLocator();
|
||||
}
|
||||
|
||||
export async function maybeGetBackupJobForAttachmentAndFilePointer({
|
||||
attachment,
|
||||
filePointer,
|
||||
getBackupCdnInfo,
|
||||
messageReceivedAt,
|
||||
}: {
|
||||
attachment: AttachmentType;
|
||||
filePointer: Backups.FilePointer;
|
||||
getBackupCdnInfo: GetBackupCdnInfoType;
|
||||
messageReceivedAt: number;
|
||||
}): Promise<CoreAttachmentBackupJobType | null> {
|
||||
if (!filePointer.backupLocator) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const mediaName = getMediaNameForAttachment(attachment);
|
||||
strictAssert(mediaName, 'mediaName must exist');
|
||||
|
||||
const { isInBackupTier } = await getBackupCdnInfo(
|
||||
getMediaIdForAttachment(attachment).string
|
||||
);
|
||||
|
||||
if (isInBackupTier) {
|
||||
return null;
|
||||
}
|
||||
|
||||
strictAssert(
|
||||
isReencryptableToSameDigest(attachment),
|
||||
'Attachment must now have all required info for re-encryption'
|
||||
);
|
||||
|
||||
strictAssert(
|
||||
isAttachmentLocallySaved(attachment),
|
||||
'Attachment must be saved locally for it to be backed up'
|
||||
);
|
||||
|
||||
const {
|
||||
path,
|
||||
contentType,
|
||||
key: keys,
|
||||
digest,
|
||||
iv,
|
||||
size,
|
||||
cdnKey,
|
||||
cdnNumber,
|
||||
uploadTimestamp,
|
||||
} = attachment;
|
||||
|
||||
return {
|
||||
mediaName,
|
||||
receivedAt: messageReceivedAt,
|
||||
type: 'standard',
|
||||
data: {
|
||||
path,
|
||||
contentType,
|
||||
keys,
|
||||
digest,
|
||||
iv,
|
||||
size,
|
||||
transitCdnInfo:
|
||||
cdnKey && cdnNumber != null
|
||||
? {
|
||||
cdnKey,
|
||||
cdnNumber,
|
||||
uploadTimestamp,
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
58
ts/services/backups/util/mediaId.ts
Normal file
58
ts/services/backups/util/mediaId.ts
Normal file
|
@ -0,0 +1,58 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import * as Bytes from '../../../Bytes';
|
||||
import { getBackupKey } from '../crypto';
|
||||
import type { AttachmentType } from '../../../types/Attachment';
|
||||
import { deriveMediaIdFromMediaName } from '../../../Crypto';
|
||||
import { strictAssert } from '../../../util/assert';
|
||||
|
||||
export function getMediaIdFromMediaName(mediaName: string): {
|
||||
string: string;
|
||||
bytes: Uint8Array;
|
||||
} {
|
||||
const mediaIdBytes = deriveMediaIdFromMediaName(getBackupKey(), mediaName);
|
||||
return {
|
||||
string: Bytes.toBase64url(mediaIdBytes),
|
||||
bytes: mediaIdBytes,
|
||||
};
|
||||
}
|
||||
|
||||
export function getMediaIdForAttachment(attachment: AttachmentType): {
|
||||
string: string;
|
||||
bytes: Uint8Array;
|
||||
} {
|
||||
const mediaName = getMediaNameForAttachment(attachment);
|
||||
return getMediaIdFromMediaName(mediaName);
|
||||
}
|
||||
|
||||
export function getMediaNameForAttachment(attachment: AttachmentType): string {
|
||||
if (attachment.backupLocator) {
|
||||
return attachment.backupLocator.mediaName;
|
||||
}
|
||||
strictAssert(attachment.digest, 'Digest must be present');
|
||||
return attachment.digest;
|
||||
}
|
||||
|
||||
export function getBytesFromMediaIdString(mediaId: string): Uint8Array {
|
||||
return Bytes.fromBase64url(mediaId);
|
||||
}
|
||||
|
||||
export type GetBackupCdnInfoType = (
|
||||
mediaId: string
|
||||
) => Promise<
|
||||
{ isInBackupTier: true; cdnNumber: number } | { isInBackupTier: false }
|
||||
>;
|
||||
|
||||
export const getBackupCdnInfo: GetBackupCdnInfoType = async (
|
||||
mediaId: string
|
||||
) => {
|
||||
const savedInfo = await window.Signal.Data.getBackupCdnObjectMetadata(
|
||||
mediaId
|
||||
);
|
||||
if (!savedInfo) {
|
||||
return { isInBackupTier: false };
|
||||
}
|
||||
|
||||
return { isInBackupTier: true, cdnNumber: savedInfo.cdnNumber };
|
||||
};
|
|
@ -33,6 +33,7 @@ import type { CallLinkStateType, CallLinkType } from '../types/CallLink';
|
|||
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
|
||||
import type { GroupSendEndorsementsData } from '../types/GroupSendEndorsements';
|
||||
import type { SyncTaskType } from '../util/syncTasks';
|
||||
import type { AttachmentBackupJobType } from '../types/AttachmentBackup';
|
||||
|
||||
export type AdjacentMessagesByConversationOptionsType = Readonly<{
|
||||
conversationId: string;
|
||||
|
@ -425,6 +426,12 @@ export type EditedMessageType = Readonly<{
|
|||
readStatus: MessageType['readStatus'];
|
||||
}>;
|
||||
|
||||
export type BackupCdnMediaObjectType = {
|
||||
mediaId: string;
|
||||
cdnNumber: number;
|
||||
sizeOnBackupCdn: number;
|
||||
};
|
||||
|
||||
export type DataInterface = {
|
||||
close: () => Promise<void>;
|
||||
pauseWriteAccess(): Promise<void>;
|
||||
|
@ -761,6 +768,22 @@ export type DataInterface = {
|
|||
job: AttachmentDownloadJobType
|
||||
) => Promise<void>;
|
||||
|
||||
getNextAttachmentBackupJobs: (options: {
|
||||
limit: number;
|
||||
timestamp?: number;
|
||||
}) => Promise<Array<AttachmentBackupJobType>>;
|
||||
saveAttachmentBackupJob: (job: AttachmentBackupJobType) => Promise<void>;
|
||||
markAllAttachmentBackupJobsInactive: () => Promise<void>;
|
||||
removeAttachmentBackupJob: (job: AttachmentBackupJobType) => Promise<void>;
|
||||
|
||||
clearAllBackupCdnObjectMetadata: () => Promise<void>;
|
||||
saveBackupCdnObjectMetadata: (
|
||||
mediaObjects: Array<BackupCdnMediaObjectType>
|
||||
) => Promise<void>;
|
||||
getBackupCdnObjectMetadata: (
|
||||
mediaId: string
|
||||
) => Promise<BackupCdnMediaObjectType | undefined>;
|
||||
|
||||
createOrUpdateStickerPack: (pack: StickerPackType) => Promise<void>;
|
||||
updateStickerPackStatus: (
|
||||
id: string,
|
||||
|
|
170
ts/sql/Server.ts
170
ts/sql/Server.ts
|
@ -144,6 +144,7 @@ import type {
|
|||
UnprocessedUpdateType,
|
||||
GetNearbyMessageFromDeletedSetOptionsType,
|
||||
StoredKyberPreKeyType,
|
||||
BackupCdnMediaObjectType,
|
||||
} from './Interface';
|
||||
import { SeenStatus } from '../MessageSeenStatus';
|
||||
import {
|
||||
|
@ -187,6 +188,11 @@ import {
|
|||
import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types';
|
||||
import type { SyncTaskType } from '../util/syncTasks';
|
||||
import { isMoreRecentThan } from '../util/timestamp';
|
||||
import {
|
||||
type AttachmentBackupJobType,
|
||||
attachmentBackupJobSchema,
|
||||
} from '../types/AttachmentBackup';
|
||||
import { redactGenericText } from '../util/privacy';
|
||||
|
||||
type ConversationRow = Readonly<{
|
||||
json: string;
|
||||
|
@ -384,6 +390,15 @@ const dataInterface: ServerInterface = {
|
|||
resetAttachmentDownloadActive,
|
||||
removeAttachmentDownloadJob,
|
||||
|
||||
getNextAttachmentBackupJobs,
|
||||
saveAttachmentBackupJob,
|
||||
markAllAttachmentBackupJobsInactive,
|
||||
removeAttachmentBackupJob,
|
||||
|
||||
clearAllBackupCdnObjectMetadata,
|
||||
saveBackupCdnObjectMetadata,
|
||||
getBackupCdnObjectMetadata,
|
||||
|
||||
createOrUpdateStickerPack,
|
||||
updateStickerPackStatus,
|
||||
updateStickerPackInfo,
|
||||
|
@ -4843,6 +4858,157 @@ async function removeAttachmentDownloadJob(
|
|||
return removeAttachmentDownloadJobSync(db, job);
|
||||
}
|
||||
|
||||
// Backup Attachments
|
||||
|
||||
async function markAllAttachmentBackupJobsInactive(): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
db.prepare<EmptyQuery>(
|
||||
`
|
||||
UPDATE attachment_backup_jobs
|
||||
SET active = 0;
|
||||
`
|
||||
).run();
|
||||
}
|
||||
|
||||
async function saveAttachmentBackupJob(
|
||||
job: AttachmentBackupJobType
|
||||
): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
|
||||
const [query, params] = sql`
|
||||
INSERT OR REPLACE INTO attachment_backup_jobs (
|
||||
active,
|
||||
attempts,
|
||||
data,
|
||||
lastAttemptTimestamp,
|
||||
mediaName,
|
||||
receivedAt,
|
||||
retryAfter,
|
||||
type
|
||||
) VALUES (
|
||||
${job.active ? 1 : 0},
|
||||
${job.attempts},
|
||||
${objectToJSON(job.data)},
|
||||
${job.lastAttemptTimestamp},
|
||||
${job.mediaName},
|
||||
${job.receivedAt},
|
||||
${job.retryAfter},
|
||||
${job.type}
|
||||
);
|
||||
`;
|
||||
db.prepare(query).run(params);
|
||||
}
|
||||
|
||||
async function getNextAttachmentBackupJobs({
|
||||
limit,
|
||||
timestamp = Date.now(),
|
||||
}: {
|
||||
limit: number;
|
||||
timestamp?: number;
|
||||
}): Promise<Array<AttachmentBackupJobType>> {
|
||||
const db = await getWritableInstance();
|
||||
|
||||
const [query, params] = sql`
|
||||
SELECT * FROM attachment_backup_jobs
|
||||
WHERE
|
||||
active = 0
|
||||
AND
|
||||
(retryAfter is NULL OR retryAfter <= ${timestamp})
|
||||
ORDER BY receivedAt DESC
|
||||
LIMIT ${limit}
|
||||
`;
|
||||
const rows = db.prepare(query).all(params);
|
||||
return rows
|
||||
.map(row => {
|
||||
const parseResult = attachmentBackupJobSchema.safeParse({
|
||||
...row,
|
||||
active: Boolean(row.active),
|
||||
data: jsonToObject(row.data),
|
||||
});
|
||||
if (!parseResult.success) {
|
||||
const redactedMediaName = redactGenericText(row.mediaName);
|
||||
logger.error(
|
||||
`getNextAttachmentBackupJobs: invalid data, removing. mediaName: ${redactedMediaName}`,
|
||||
Errors.toLogFormat(parseResult.error)
|
||||
);
|
||||
removeAttachmentBackupJobSync(db, { mediaName: row.mediaName });
|
||||
return null;
|
||||
}
|
||||
return parseResult.data;
|
||||
})
|
||||
.filter(isNotNil);
|
||||
}
|
||||
|
||||
async function removeAttachmentBackupJob(
|
||||
job: Pick<AttachmentBackupJobType, 'mediaName'>
|
||||
): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
return removeAttachmentBackupJobSync(db, job);
|
||||
}
|
||||
|
||||
function removeAttachmentBackupJobSync(
|
||||
db: Database,
|
||||
job: Pick<AttachmentBackupJobType, 'mediaName'>
|
||||
): void {
|
||||
const [query, params] = sql`
|
||||
DELETE FROM attachment_backup_jobs
|
||||
WHERE
|
||||
mediaName = ${job.mediaName};
|
||||
`;
|
||||
|
||||
db.prepare(query).run(params);
|
||||
}
|
||||
|
||||
// Attachments on backup CDN
|
||||
async function clearAllBackupCdnObjectMetadata(): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
db.prepare('DELETE FROM backup_cdn_object_metadata;').run();
|
||||
}
|
||||
|
||||
function saveBackupCdnObjectMetadataSync(
|
||||
db: Database,
|
||||
storedMediaObject: BackupCdnMediaObjectType
|
||||
) {
|
||||
const { mediaId, cdnNumber, sizeOnBackupCdn } = storedMediaObject;
|
||||
const [query, params] = sql`
|
||||
INSERT OR REPLACE INTO backup_cdn_object_metadata
|
||||
(
|
||||
mediaId,
|
||||
cdnNumber,
|
||||
sizeOnBackupCdn
|
||||
) VALUES (
|
||||
${mediaId},
|
||||
${cdnNumber},
|
||||
${sizeOnBackupCdn}
|
||||
);
|
||||
`;
|
||||
|
||||
db.prepare(query).run(params);
|
||||
}
|
||||
|
||||
async function saveBackupCdnObjectMetadata(
|
||||
storedMediaObjects: Array<BackupCdnMediaObjectType>
|
||||
): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
db.transaction(() => {
|
||||
for (const obj of storedMediaObjects) {
|
||||
saveBackupCdnObjectMetadataSync(db, obj);
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
async function getBackupCdnObjectMetadata(
|
||||
mediaId: string
|
||||
): Promise<BackupCdnMediaObjectType | undefined> {
|
||||
const db = getReadonlyInstance();
|
||||
const [
|
||||
query,
|
||||
params,
|
||||
] = sql`SELECT * from backup_cdn_object_metadata WHERE mediaId = ${mediaId}`;
|
||||
|
||||
return db.prepare(query).get(params);
|
||||
}
|
||||
|
||||
// Stickers
|
||||
|
||||
async function createOrUpdateStickerPack(pack: StickerPackType): Promise<void> {
|
||||
|
@ -6140,6 +6306,8 @@ async function removeAll(): Promise<void> {
|
|||
DROP TRIGGER messages_on_delete;
|
||||
|
||||
DELETE FROM attachment_downloads;
|
||||
DELETE FROM attachment_backup_jobs;
|
||||
DELETE FROM backup_cdn_object_metadata;
|
||||
DELETE FROM badgeImageFiles;
|
||||
DELETE FROM badges;
|
||||
DELETE FROM callLinks;
|
||||
|
@ -6200,6 +6368,8 @@ async function removeAllConfiguration(): Promise<void> {
|
|||
db.transaction(() => {
|
||||
db.exec(
|
||||
`
|
||||
DELETE FROM attachment_backup_jobs;
|
||||
DELETE FROM backup_cdn_object_metadata;
|
||||
DELETE FROM groupSendCombinedEndorsement;
|
||||
DELETE FROM groupSendMemberEndorsement;
|
||||
DELETE FROM identityKeys;
|
||||
|
|
55
ts/sql/migrations/1070-attachment-backup.ts
Normal file
55
ts/sql/migrations/1070-attachment-backup.ts
Normal file
|
@ -0,0 +1,55 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { Database } from '@signalapp/better-sqlite3';
|
||||
|
||||
import type { LoggerType } from '../../types/Logging';
|
||||
|
||||
export const version = 1070;
|
||||
|
||||
export function updateToSchemaVersion1070(
|
||||
currentVersion: number,
|
||||
db: Database,
|
||||
logger: LoggerType
|
||||
): void {
|
||||
if (currentVersion >= 1070) {
|
||||
return;
|
||||
}
|
||||
|
||||
db.transaction(() => {
|
||||
db.exec(`
|
||||
CREATE TABLE attachment_backup_jobs (
|
||||
mediaName TEXT NOT NULL PRIMARY KEY,
|
||||
type TEXT NOT NULL,
|
||||
data TEXT NOT NULL,
|
||||
receivedAt INTEGER NOT NULL,
|
||||
|
||||
-- job manager fields
|
||||
attempts INTEGER NOT NULL,
|
||||
active INTEGER NOT NULL,
|
||||
retryAfter INTEGER,
|
||||
lastAttemptTimestamp INTEGER
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX attachment_backup_jobs_receivedAt
|
||||
ON attachment_backup_jobs (
|
||||
receivedAt
|
||||
);
|
||||
|
||||
CREATE INDEX attachment_backup_jobs_type_receivedAt
|
||||
ON attachment_backup_jobs (
|
||||
type, receivedAt
|
||||
);
|
||||
|
||||
CREATE TABLE backup_cdn_object_metadata (
|
||||
mediaId TEXT NOT NULL PRIMARY KEY,
|
||||
cdnNumber INTEGER NOT NULL,
|
||||
sizeOnBackupCdn INTEGER
|
||||
) STRICT;
|
||||
`);
|
||||
})();
|
||||
|
||||
db.pragma('user_version = 1070');
|
||||
|
||||
logger.info('updateToSchemaVersion1070: success!');
|
||||
}
|
|
@ -81,10 +81,11 @@ import { updateToSchemaVersion1020 } from './1020-self-merges';
|
|||
import { updateToSchemaVersion1030 } from './1030-unblock-event';
|
||||
import { updateToSchemaVersion1040 } from './1040-undownloaded-backed-up-media';
|
||||
import { updateToSchemaVersion1050 } from './1050-group-send-endorsements';
|
||||
import { updateToSchemaVersion1060 } from './1060-addressable-messages-and-sync-tasks';
|
||||
import {
|
||||
updateToSchemaVersion1060,
|
||||
updateToSchemaVersion1070,
|
||||
version as MAX_VERSION,
|
||||
} from './1060-addressable-messages-and-sync-tasks';
|
||||
} from './1070-attachment-backup';
|
||||
|
||||
function updateToSchemaVersion1(
|
||||
currentVersion: number,
|
||||
|
@ -2034,6 +2035,7 @@ export const SCHEMA_VERSIONS = [
|
|||
updateToSchemaVersion1040,
|
||||
updateToSchemaVersion1050,
|
||||
updateToSchemaVersion1060,
|
||||
updateToSchemaVersion1070,
|
||||
];
|
||||
|
||||
export class DBVersionFromFutureError extends Error {
|
||||
|
|
|
@ -30,15 +30,14 @@ describe('exponential backoff utilities', () => {
|
|||
const options = {
|
||||
maxBackoffTime: 10000,
|
||||
multiplier: 2,
|
||||
firstBackoffTime: 1000,
|
||||
firstBackoffs: [1000],
|
||||
};
|
||||
assert.strictEqual(exponentialBackoffSleepTime(1, options), 0);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(2, options), 1000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(3, options), 2000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(4, options), 4000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(5, options), 8000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(1, options), 1000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(2, options), 2000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(3, options), 4000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(4, options), 8000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(5, options), 10000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(6, options), 10000);
|
||||
assert.strictEqual(exponentialBackoffSleepTime(7, options), 10000);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import { assert } from 'chai';
|
|||
import { readFileSync, unlinkSync, writeFileSync } from 'fs';
|
||||
import { join } from 'path';
|
||||
|
||||
import { createCipheriv, randomBytes } from 'crypto';
|
||||
import { createCipheriv } from 'crypto';
|
||||
import * as log from '../logging/log';
|
||||
import * as Bytes from '../Bytes';
|
||||
import * as Curve from '../Curve';
|
||||
|
@ -38,13 +38,13 @@ import {
|
|||
} from '../Crypto';
|
||||
import {
|
||||
type HardcodedIVForEncryptionType,
|
||||
KEY_SET_LENGTH,
|
||||
_generateAttachmentIv,
|
||||
decryptAttachmentV2,
|
||||
encryptAttachmentV2ToDisk,
|
||||
getAesCbcCiphertextLength,
|
||||
getAttachmentCiphertextLength,
|
||||
splitKeys,
|
||||
generateAttachmentKeys,
|
||||
} from '../AttachmentCrypto';
|
||||
import { createTempDir, deleteTempDir } from '../updater/common';
|
||||
import { uuidToBytes, bytesToUuid } from '../util/uuidToBytes';
|
||||
|
@ -536,10 +536,6 @@ describe('Crypto', () => {
|
|||
const FILE_HASH = sha256(FILE_CONTENTS);
|
||||
let tempDir: string;
|
||||
|
||||
function generateAttachmentKeys(): Uint8Array {
|
||||
return randomBytes(KEY_SET_LENGTH);
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
tempDir = await createTempDir();
|
||||
});
|
||||
|
|
|
@ -7,13 +7,15 @@ import * as sinon from 'sinon';
|
|||
import { BackupLevel } from '@signalapp/libsignal-client/zkgroup';
|
||||
import { Backups } from '../../protobuf';
|
||||
import {
|
||||
convertAttachmentToFilePointer,
|
||||
getFilePointerForAttachment,
|
||||
convertFilePointerToAttachment,
|
||||
maybeGetBackupJobForAttachmentAndFilePointer,
|
||||
} from '../../services/backups/util/filePointers';
|
||||
import { APPLICATION_OCTET_STREAM, IMAGE_PNG } from '../../types/MIME';
|
||||
import * as Bytes from '../../Bytes';
|
||||
import type { AttachmentType } from '../../types/Attachment';
|
||||
import { strictAssert } from '../../util/assert';
|
||||
import type { GetBackupCdnInfoType } from '../../services/backups/util/mediaId';
|
||||
|
||||
describe('convertFilePointerToAttachment', () => {
|
||||
it('processes filepointer with attachmentLocator', () => {
|
||||
|
@ -167,6 +169,7 @@ function composeAttachment(
|
|||
path: 'path/to/file.png',
|
||||
key: 'key',
|
||||
digest: 'digest',
|
||||
iv: 'iv',
|
||||
width: 100,
|
||||
height: 100,
|
||||
blurHash: 'blurhash',
|
||||
|
@ -227,21 +230,30 @@ const filePointerWithInvalidLocator = new Backups.FilePointer({
|
|||
async function testAttachmentToFilePointer(
|
||||
attachment: AttachmentType,
|
||||
filePointer: Backups.FilePointer,
|
||||
options?: { backupLevel?: BackupLevel; backupCdnNumber?: number }
|
||||
options?: {
|
||||
backupLevel?: BackupLevel;
|
||||
backupCdnNumber?: number;
|
||||
updatedAttachment?: AttachmentType;
|
||||
}
|
||||
) {
|
||||
async function _doTest(withBackupLevel: BackupLevel) {
|
||||
assert.deepStrictEqual(
|
||||
await convertAttachmentToFilePointer({
|
||||
assert.deepEqual(
|
||||
await getFilePointerForAttachment({
|
||||
attachment,
|
||||
backupLevel: withBackupLevel,
|
||||
getBackupTierInfo: _mediaName => {
|
||||
getBackupCdnInfo: async _mediaId => {
|
||||
if (options?.backupCdnNumber != null) {
|
||||
return { isInBackupTier: true, cdnNumber: options.backupCdnNumber };
|
||||
}
|
||||
return { isInBackupTier: false };
|
||||
},
|
||||
}),
|
||||
filePointer
|
||||
{
|
||||
filePointer,
|
||||
...(options?.updatedAttachment
|
||||
? { updatedAttachment: options?.updatedAttachment }
|
||||
: {}),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -253,7 +265,11 @@ async function testAttachmentToFilePointer(
|
|||
}
|
||||
}
|
||||
|
||||
describe('convertAttachmentToFilePointer', () => {
|
||||
const notInBackupCdn: GetBackupCdnInfoType = async () => {
|
||||
return { isInBackupTier: false };
|
||||
};
|
||||
|
||||
describe('getFilePointerForAttachment', () => {
|
||||
describe('not downloaded locally', () => {
|
||||
const undownloadedAttachment = composeAttachment({ path: undefined });
|
||||
it('returns invalidAttachmentLocator if missing critical decryption info', async () => {
|
||||
|
@ -384,7 +400,7 @@ describe('convertAttachmentToFilePointer', () => {
|
|||
});
|
||||
});
|
||||
describe('BackupLevel.Media', () => {
|
||||
describe('if missing critical decryption info', () => {
|
||||
describe('if missing critical decryption / encryption info', () => {
|
||||
const FILE_PATH = join(__dirname, '../../../fixtures/ghost-kitty.mp4');
|
||||
|
||||
let sandbox: sinon.SinonSandbox;
|
||||
|
@ -405,14 +421,14 @@ describe('convertAttachmentToFilePointer', () => {
|
|||
sandbox.restore();
|
||||
});
|
||||
|
||||
it('generates new key & digest and removes existing CDN info', async () => {
|
||||
const result = await convertAttachmentToFilePointer({
|
||||
it('if missing key, generates new key & digest and removes existing CDN info', async () => {
|
||||
const { filePointer: result } = await getFilePointerForAttachment({
|
||||
attachment: {
|
||||
...downloadedAttachment,
|
||||
key: undefined,
|
||||
},
|
||||
backupLevel: BackupLevel.Media,
|
||||
getBackupTierInfo: () => ({ isInBackupTier: false }),
|
||||
getBackupCdnInfo: notInBackupCdn,
|
||||
});
|
||||
const newKey = result.backupLocator?.key;
|
||||
const newDigest = result.backupLocator?.digest;
|
||||
|
@ -433,6 +449,53 @@ describe('convertAttachmentToFilePointer', () => {
|
|||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('if not on backup tier, and missing iv, regenerates encryption info', async () => {
|
||||
const { filePointer: result } = await getFilePointerForAttachment({
|
||||
attachment: {
|
||||
...downloadedAttachment,
|
||||
iv: undefined,
|
||||
},
|
||||
backupLevel: BackupLevel.Media,
|
||||
getBackupCdnInfo: notInBackupCdn,
|
||||
});
|
||||
|
||||
const newKey = result.backupLocator?.key;
|
||||
const newDigest = result.backupLocator?.digest;
|
||||
|
||||
strictAssert(newDigest, 'must create new digest');
|
||||
assert.deepStrictEqual(
|
||||
result,
|
||||
new Backups.FilePointer({
|
||||
...filePointerWithBackupLocator,
|
||||
backupLocator: new Backups.FilePointer.BackupLocator({
|
||||
...defaultBackupLocator,
|
||||
key: newKey,
|
||||
digest: newDigest,
|
||||
mediaName: Bytes.toBase64(newDigest),
|
||||
transitCdnKey: undefined,
|
||||
transitCdnNumber: undefined,
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('if on backup tier, and not missing iv, does not regenerate encryption info', async () => {
|
||||
await testAttachmentToFilePointer(
|
||||
{
|
||||
...downloadedAttachment,
|
||||
iv: undefined,
|
||||
},
|
||||
new Backups.FilePointer({
|
||||
...filePointerWithBackupLocator,
|
||||
backupLocator: new Backups.FilePointer.BackupLocator({
|
||||
...defaultBackupLocator,
|
||||
cdnNumber: 12,
|
||||
}),
|
||||
}),
|
||||
{ backupLevel: BackupLevel.Media, backupCdnNumber: 12 }
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('returns BackupLocator, with cdnNumber if in backup tier already', async () => {
|
||||
|
@ -459,3 +522,80 @@ describe('convertAttachmentToFilePointer', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('getBackupJobForAttachmentAndFilePointer', async () => {
|
||||
const attachment = composeAttachment();
|
||||
|
||||
it('returns null if filePointer does not have backupLocator', async () => {
|
||||
const { filePointer } = await getFilePointerForAttachment({
|
||||
attachment,
|
||||
backupLevel: BackupLevel.Messages,
|
||||
getBackupCdnInfo: notInBackupCdn,
|
||||
});
|
||||
assert.strictEqual(
|
||||
await maybeGetBackupJobForAttachmentAndFilePointer({
|
||||
attachment,
|
||||
filePointer,
|
||||
messageReceivedAt: 100,
|
||||
getBackupCdnInfo: notInBackupCdn,
|
||||
}),
|
||||
null
|
||||
);
|
||||
});
|
||||
|
||||
it('returns job if filePointer does have backupLocator', async () => {
|
||||
const { filePointer, updatedAttachment } =
|
||||
await getFilePointerForAttachment({
|
||||
attachment,
|
||||
backupLevel: BackupLevel.Media,
|
||||
getBackupCdnInfo: notInBackupCdn,
|
||||
});
|
||||
const attachmentToUse = updatedAttachment ?? attachment;
|
||||
assert.deepStrictEqual(
|
||||
await maybeGetBackupJobForAttachmentAndFilePointer({
|
||||
attachment: attachmentToUse,
|
||||
filePointer,
|
||||
messageReceivedAt: 100,
|
||||
getBackupCdnInfo: notInBackupCdn,
|
||||
}),
|
||||
{
|
||||
mediaName: 'digest',
|
||||
receivedAt: 100,
|
||||
type: 'standard',
|
||||
data: {
|
||||
path: 'path/to/file.png',
|
||||
contentType: IMAGE_PNG,
|
||||
keys: 'key',
|
||||
digest: 'digest',
|
||||
iv: 'iv',
|
||||
size: 100,
|
||||
transitCdnInfo: {
|
||||
cdnKey: 'cdnKey',
|
||||
cdnNumber: 2,
|
||||
uploadTimestamp: 1234,
|
||||
},
|
||||
},
|
||||
}
|
||||
);
|
||||
});
|
||||
it('does not return job if already in backup tier', async () => {
|
||||
const isInBackupTier = async () => ({
|
||||
isInBackupTier: true,
|
||||
cdnNumber: 42,
|
||||
});
|
||||
const { filePointer } = await getFilePointerForAttachment({
|
||||
attachment,
|
||||
backupLevel: BackupLevel.Media,
|
||||
getBackupCdnInfo: isInBackupTier,
|
||||
});
|
||||
assert.deepStrictEqual(
|
||||
await maybeGetBackupJobForAttachmentAndFilePointer({
|
||||
attachment,
|
||||
filePointer,
|
||||
messageReceivedAt: 100,
|
||||
getBackupCdnInfo: isInBackupTier,
|
||||
}),
|
||||
null
|
||||
);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -7,6 +7,7 @@ import { tmpdir } from 'os';
|
|||
import { sortBy } from 'lodash';
|
||||
import { createReadStream } from 'fs';
|
||||
import { mkdtemp, rm } from 'fs/promises';
|
||||
import * as sinon from 'sinon';
|
||||
|
||||
import type { MessageAttributesType } from '../../model-types';
|
||||
import type {
|
||||
|
@ -128,6 +129,10 @@ export async function asymmetricRoundtripHarness(
|
|||
after: Array<MessageAttributesType>
|
||||
): Promise<void> {
|
||||
const outDir = await mkdtemp(path.join(tmpdir(), 'signal-temp-'));
|
||||
const fetchAndSaveBackupCdnObjectMetadata = sinon.stub(
|
||||
backupsService,
|
||||
'fetchAndSaveBackupCdnObjectMetadata'
|
||||
);
|
||||
try {
|
||||
const targetOutputFile = path.join(outDir, 'backup.bin');
|
||||
|
||||
|
@ -149,6 +154,7 @@ export async function asymmetricRoundtripHarness(
|
|||
const actual = sortAndNormalize(messagesFromDatabase);
|
||||
assert.deepEqual(expected, actual);
|
||||
} finally {
|
||||
fetchAndSaveBackupCdnObjectMetadata.restore();
|
||||
await rm(outDir, { recursive: true });
|
||||
}
|
||||
}
|
||||
|
|
264
ts/test-electron/services/AttachmentBackupManager_test.ts
Normal file
264
ts/test-electron/services/AttachmentBackupManager_test.ts
Normal file
|
@ -0,0 +1,264 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import * as sinon from 'sinon';
|
||||
import { assert } from 'chai';
|
||||
import { join } from 'path';
|
||||
|
||||
import * as Bytes from '../../Bytes';
|
||||
import {
|
||||
AttachmentBackupManager,
|
||||
FILE_NOT_FOUND_ON_TRANSIT_TIER_STATUS,
|
||||
runAttachmentBackupJob,
|
||||
} from '../../jobs/AttachmentBackupManager';
|
||||
import type {
|
||||
AttachmentBackupJobType,
|
||||
CoreAttachmentBackupJobType,
|
||||
} from '../../types/AttachmentBackup';
|
||||
import dataInterface from '../../sql/Client';
|
||||
import { getRandomBytes } from '../../Crypto';
|
||||
import { VIDEO_MP4 } from '../../types/MIME';
|
||||
|
||||
const TRANSIT_CDN = 2;
|
||||
const TRANSIT_CDN_FOR_NEW_UPLOAD = 42;
|
||||
const BACKUP_CDN = 3;
|
||||
describe('AttachmentBackupManager/JobManager', () => {
|
||||
let backupManager: AttachmentBackupManager | undefined;
|
||||
let runJob: sinon.SinonSpy;
|
||||
let backupMediaBatch: sinon.SinonStub;
|
||||
let backupsService = {};
|
||||
let encryptAndUploadAttachment: sinon.SinonStub;
|
||||
let getAbsoluteAttachmentPath: sinon.SinonStub;
|
||||
let sandbox: sinon.SinonSandbox;
|
||||
let isInCall: sinon.SinonStub;
|
||||
|
||||
function composeJob(
|
||||
index: number,
|
||||
overrides: Partial<CoreAttachmentBackupJobType['data']> = {}
|
||||
): CoreAttachmentBackupJobType {
|
||||
const mediaName = `mediaName${index}`;
|
||||
|
||||
return {
|
||||
mediaName,
|
||||
type: 'standard',
|
||||
receivedAt: index,
|
||||
data: {
|
||||
path: 'ghost-kitty.mp4',
|
||||
contentType: VIDEO_MP4,
|
||||
keys: 'keys=',
|
||||
iv: 'iv==',
|
||||
digest: 'digest=',
|
||||
transitCdnInfo: {
|
||||
cdnKey: 'transitCdnKey',
|
||||
cdnNumber: TRANSIT_CDN,
|
||||
uploadTimestamp: Date.now(),
|
||||
},
|
||||
size: 128,
|
||||
...overrides,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
await dataInterface.removeAll();
|
||||
await window.storage.put('masterKey', Bytes.toBase64(getRandomBytes(32)));
|
||||
|
||||
sandbox = sinon.createSandbox();
|
||||
isInCall = sandbox.stub().returns(false);
|
||||
|
||||
backupMediaBatch = sandbox
|
||||
.stub()
|
||||
.returns(Promise.resolve({ responses: [{ isSuccess: true, cdn: 3 }] }));
|
||||
|
||||
backupsService = {
|
||||
credentials: {
|
||||
getHeadersForToday: () => Promise.resolve({}),
|
||||
},
|
||||
getBackupCdnInfo: () => ({
|
||||
isInBackupTier: false,
|
||||
}),
|
||||
};
|
||||
|
||||
encryptAndUploadAttachment = sinon.stub().returns({
|
||||
cdnKey: 'newKeyOnTransitTier',
|
||||
cdnNumber: TRANSIT_CDN_FOR_NEW_UPLOAD,
|
||||
});
|
||||
|
||||
getAbsoluteAttachmentPath = sandbox.stub().callsFake(path => {
|
||||
if (path === 'ghost-kitty.mp4') {
|
||||
return join(__dirname, '../../../fixtures/ghost-kitty.mp4');
|
||||
}
|
||||
return getAbsoluteAttachmentPath.wrappedMethod(path);
|
||||
});
|
||||
|
||||
runJob = sandbox.stub().callsFake((job: AttachmentBackupJobType) => {
|
||||
return runAttachmentBackupJob(job, false, {
|
||||
// @ts-expect-error incomplete stubbing
|
||||
backupsService,
|
||||
backupMediaBatch,
|
||||
getAbsoluteAttachmentPath,
|
||||
encryptAndUploadAttachment,
|
||||
});
|
||||
});
|
||||
|
||||
backupManager = new AttachmentBackupManager({
|
||||
...AttachmentBackupManager.defaultParams,
|
||||
shouldHoldOffOnStartingQueuedJobs: isInCall,
|
||||
runJob,
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
sandbox.restore();
|
||||
delete window.textsecure.server;
|
||||
await backupManager?.stop();
|
||||
});
|
||||
|
||||
async function addJobs(
|
||||
num: number,
|
||||
overrides: Partial<CoreAttachmentBackupJobType['data']> = {}
|
||||
): Promise<Array<CoreAttachmentBackupJobType>> {
|
||||
const jobs = new Array(num)
|
||||
.fill(null)
|
||||
.map((_, idx) => composeJob(idx, overrides));
|
||||
for (const job of jobs) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await backupManager?.addJob(job);
|
||||
}
|
||||
return jobs;
|
||||
}
|
||||
|
||||
function waitForJobToBeStarted(
|
||||
job: CoreAttachmentBackupJobType,
|
||||
attempts: number = 0
|
||||
) {
|
||||
return backupManager?.waitForJobToBeStarted({ ...job, attempts });
|
||||
}
|
||||
|
||||
function waitForJobToBeCompleted(
|
||||
job: CoreAttachmentBackupJobType,
|
||||
attempts: number = 0
|
||||
) {
|
||||
return backupManager?.waitForJobToBeCompleted({ ...job, attempts });
|
||||
}
|
||||
|
||||
function assertRunJobCalledWith(jobs: Array<CoreAttachmentBackupJobType>) {
|
||||
return assert.strictEqual(
|
||||
JSON.stringify(runJob.getCalls().map(call => call.args[0].mediaName)),
|
||||
JSON.stringify(jobs.map(job => job.mediaName))
|
||||
);
|
||||
}
|
||||
|
||||
async function getAllSavedJobs(): Promise<Array<AttachmentBackupJobType>> {
|
||||
return dataInterface.getNextAttachmentBackupJobs({
|
||||
limit: 1000,
|
||||
timestamp: Infinity,
|
||||
});
|
||||
}
|
||||
|
||||
it('saves jobs, removes jobs, and runs 3 jobs at a time in descending receivedAt order', async () => {
|
||||
const jobs = await addJobs(5);
|
||||
|
||||
// Confirm they are saved to DB
|
||||
const allJobs = await getAllSavedJobs();
|
||||
assert.strictEqual(allJobs.length, 5);
|
||||
assert.strictEqual(
|
||||
JSON.stringify(allJobs.map(job => job.mediaName)),
|
||||
JSON.stringify([
|
||||
'mediaName4',
|
||||
'mediaName3',
|
||||
'mediaName2',
|
||||
'mediaName1',
|
||||
'mediaName0',
|
||||
])
|
||||
);
|
||||
|
||||
await backupManager?.start();
|
||||
await waitForJobToBeStarted(jobs[2]);
|
||||
|
||||
assert.strictEqual(runJob.callCount, 3);
|
||||
assertRunJobCalledWith([jobs[4], jobs[3], jobs[2]]);
|
||||
|
||||
await waitForJobToBeStarted(jobs[0]);
|
||||
assert.strictEqual(runJob.callCount, 5);
|
||||
assertRunJobCalledWith([jobs[4], jobs[3], jobs[2], jobs[1], jobs[0]]);
|
||||
|
||||
await waitForJobToBeCompleted(jobs[0]);
|
||||
assert.strictEqual((await getAllSavedJobs()).length, 0);
|
||||
});
|
||||
|
||||
it('with transitCdnInfo, will copy to backup tier', async () => {
|
||||
const [job] = await addJobs(1);
|
||||
await backupManager?.start();
|
||||
await waitForJobToBeCompleted(job);
|
||||
assert.strictEqual(backupMediaBatch.callCount, 1);
|
||||
assert.strictEqual(encryptAndUploadAttachment.callCount, 0);
|
||||
|
||||
assert.deepStrictEqual(
|
||||
backupMediaBatch.getCall(0).args[0].items[0].sourceAttachment,
|
||||
{ key: 'transitCdnKey', cdn: TRANSIT_CDN }
|
||||
);
|
||||
});
|
||||
|
||||
it('with transitCdnInfo, will upload to attachment tier if copy operation returns FileNotFoundOnTransitTier', async () => {
|
||||
backupMediaBatch.onFirstCall().returns(
|
||||
Promise.resolve({
|
||||
responses: [
|
||||
{ isSuccess: false, status: FILE_NOT_FOUND_ON_TRANSIT_TIER_STATUS },
|
||||
],
|
||||
})
|
||||
);
|
||||
|
||||
backupMediaBatch.onSecondCall().returns(
|
||||
Promise.resolve({
|
||||
responses: [{ isSuccess: true, cdn: BACKUP_CDN }],
|
||||
})
|
||||
);
|
||||
|
||||
const [job] = await addJobs(1);
|
||||
await backupManager?.start();
|
||||
await waitForJobToBeCompleted(job);
|
||||
assert.strictEqual(encryptAndUploadAttachment.callCount, 1);
|
||||
assert.strictEqual(backupMediaBatch.callCount, 2);
|
||||
|
||||
assert.deepStrictEqual(
|
||||
backupMediaBatch.getCall(0).args[0].items[0].sourceAttachment,
|
||||
{ key: 'transitCdnKey', cdn: TRANSIT_CDN }
|
||||
);
|
||||
assert.deepStrictEqual(
|
||||
backupMediaBatch.getCall(1).args[0].items[0].sourceAttachment,
|
||||
{ key: 'newKeyOnTransitTier', cdn: TRANSIT_CDN_FOR_NEW_UPLOAD }
|
||||
);
|
||||
|
||||
const allRemainingJobs = await getAllSavedJobs();
|
||||
assert.strictEqual(allRemainingJobs.length, 0);
|
||||
});
|
||||
|
||||
it('without transitCdnInfo, will upload then copy', async () => {
|
||||
const [job] = await addJobs(1, { transitCdnInfo: undefined });
|
||||
|
||||
await backupManager?.start();
|
||||
await waitForJobToBeCompleted(job);
|
||||
|
||||
assert.strictEqual(backupMediaBatch.callCount, 1);
|
||||
assert.strictEqual(encryptAndUploadAttachment.callCount, 1);
|
||||
|
||||
// Job removed
|
||||
const allRemainingJobs = await getAllSavedJobs();
|
||||
assert.strictEqual(allRemainingJobs.length, 0);
|
||||
});
|
||||
|
||||
it('without transitCdnInfo, will permanently remove job if file not found at path', async () => {
|
||||
const [job] = await addJobs(1, { transitCdnInfo: undefined });
|
||||
getAbsoluteAttachmentPath.returns('no/file/here');
|
||||
await backupManager?.start();
|
||||
await waitForJobToBeCompleted(job);
|
||||
|
||||
assert.strictEqual(backupMediaBatch.callCount, 0);
|
||||
assert.strictEqual(encryptAndUploadAttachment.callCount, 0);
|
||||
|
||||
// Job removed
|
||||
const allRemainingJobs = await getAllSavedJobs();
|
||||
assert.strictEqual(allRemainingJobs.length, 0);
|
||||
});
|
||||
});
|
|
@ -17,7 +17,7 @@ import dataInterface from '../../sql/Client';
|
|||
import { HOUR, MINUTE, SECOND } from '../../util/durations';
|
||||
import { type AciString } from '../../types/ServiceId';
|
||||
|
||||
describe('AttachmentDownloadManager', () => {
|
||||
describe('AttachmentDownloadManager/JobManager', () => {
|
||||
let downloadManager: AttachmentDownloadManager | undefined;
|
||||
let runJob: sinon.SinonStub;
|
||||
let sandbox: sinon.SinonSandbox;
|
||||
|
@ -58,10 +58,10 @@ describe('AttachmentDownloadManager', () => {
|
|||
await dataInterface.removeAll();
|
||||
|
||||
sandbox = sinon.createSandbox();
|
||||
clock = sinon.useFakeTimers();
|
||||
clock = sandbox.useFakeTimers();
|
||||
|
||||
isInCall = sinon.stub().returns(false);
|
||||
runJob = sinon.stub().callsFake(async () => {
|
||||
isInCall = sandbox.stub().returns(false);
|
||||
runJob = sandbox.stub().callsFake(async () => {
|
||||
return new Promise<{ status: 'finished' | 'retry' }>(resolve => {
|
||||
Promise.resolve().then(() => {
|
||||
resolve({ status: 'finished' });
|
||||
|
@ -71,7 +71,7 @@ describe('AttachmentDownloadManager', () => {
|
|||
|
||||
downloadManager = new AttachmentDownloadManager({
|
||||
...AttachmentDownloadManager.defaultParams,
|
||||
isInCall,
|
||||
shouldHoldOffOnStartingQueuedJobs: isInCall,
|
||||
runJob,
|
||||
});
|
||||
});
|
||||
|
@ -287,7 +287,7 @@ describe('AttachmentDownloadManager', () => {
|
|||
assert.strictEqual(retriedJob?.attempts, 1);
|
||||
assert.isNumber(retriedJob?.retryAfter);
|
||||
|
||||
await advanceTime(30 * SECOND);
|
||||
await advanceTime(60 * SECOND); // one tick
|
||||
await job1Attempts[1].completed;
|
||||
assert.strictEqual(runJob.callCount, 3);
|
||||
|
||||
|
@ -331,7 +331,7 @@ describe('AttachmentDownloadManager', () => {
|
|||
await attempts[0].completed;
|
||||
assert.strictEqual(runJob.callCount, 1);
|
||||
|
||||
await advanceTime(30 * SECOND);
|
||||
await advanceTime(1 * MINUTE);
|
||||
await attempts[1].completed;
|
||||
assert.strictEqual(runJob.callCount, 2);
|
||||
|
||||
|
@ -340,12 +340,12 @@ describe('AttachmentDownloadManager', () => {
|
|||
assert.strictEqual(runJob.callCount, 3);
|
||||
|
||||
// add the same job again and it should retry ASAP and reset attempts
|
||||
attempts = getPromisesForAttempts(jobs[0], 4);
|
||||
attempts = getPromisesForAttempts(jobs[0], 5);
|
||||
await downloadManager?.addJob(jobs[0]);
|
||||
await attempts[0].completed;
|
||||
assert.strictEqual(runJob.callCount, 4);
|
||||
|
||||
await advanceTime(30 * SECOND);
|
||||
await advanceTime(1 * MINUTE);
|
||||
await attempts[1].completed;
|
||||
assert.strictEqual(runJob.callCount, 5);
|
||||
|
||||
|
@ -358,7 +358,7 @@ describe('AttachmentDownloadManager', () => {
|
|||
assert.strictEqual(runJob.callCount, 7);
|
||||
|
||||
await advanceTime(6 * HOUR);
|
||||
await attempts[3].completed;
|
||||
await attempts[4].completed;
|
||||
assert.strictEqual(runJob.callCount, 8);
|
||||
|
||||
// Ensure it's been removed
|
||||
|
|
|
@ -937,7 +937,10 @@ describe('both/state/ducks/stories', () => {
|
|||
},
|
||||
],
|
||||
};
|
||||
|
||||
await window.Signal.Data.saveMessage(messageAttributes, {
|
||||
forceSave: true,
|
||||
ourAci: generateAci(),
|
||||
});
|
||||
const rootState = getEmptyRootState();
|
||||
|
||||
const getState = () => ({
|
||||
|
@ -1000,6 +1003,10 @@ describe('both/state/ducks/stories', () => {
|
|||
preview: [preview],
|
||||
};
|
||||
|
||||
await window.Signal.Data.saveMessage(messageAttributes, {
|
||||
forceSave: true,
|
||||
ourAci: generateAci(),
|
||||
});
|
||||
const rootState = getEmptyRootState();
|
||||
|
||||
const getState = () => ({
|
||||
|
|
|
@ -1062,6 +1062,10 @@ export const backupMediaBatchResponseSchema = z.object({
|
|||
cdn: z.number(),
|
||||
mediaId: z.string(),
|
||||
})
|
||||
.transform(response => ({
|
||||
...response,
|
||||
isSuccess: isSuccess(response.status),
|
||||
}))
|
||||
.array(),
|
||||
});
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import { AttachmentSizeError, type AttachmentType } from '../types/Attachment';
|
|||
import * as MIME from '../types/MIME';
|
||||
import * as Bytes from '../Bytes';
|
||||
import {
|
||||
deriveMediaIdFromMediaName,
|
||||
deriveBackupMediaKeyMaterial,
|
||||
type BackupMediaKeyMaterialType,
|
||||
} from '../Crypto';
|
||||
|
@ -30,6 +29,7 @@ import { createName, getRelativePath } from '../windows/attachments';
|
|||
import { MediaTier } from '../types/AttachmentDownload';
|
||||
import { getBackupKey } from '../services/backups/crypto';
|
||||
import { backupsService } from '../services/backups';
|
||||
import { getMediaIdForAttachment } from '../services/backups/util/mediaId';
|
||||
|
||||
const DEFAULT_BACKUP_CDN_NUMBER = 3;
|
||||
|
||||
|
@ -39,23 +39,12 @@ export function getCdnKey(attachment: ProcessedAttachment): string {
|
|||
return cdnKey;
|
||||
}
|
||||
|
||||
function getMediaIdBytes(attachment: ProcessedAttachment): Uint8Array {
|
||||
const mediaName = attachment.backupLocator?.mediaName;
|
||||
strictAssert(mediaName, 'Attachment was missing mediaName');
|
||||
const backupKey = getBackupKey();
|
||||
return deriveMediaIdFromMediaName(backupKey, mediaName);
|
||||
}
|
||||
|
||||
function getMediaIdForBackupTier(attachment: ProcessedAttachment): string {
|
||||
return Bytes.toBase64url(getMediaIdBytes(attachment));
|
||||
}
|
||||
|
||||
function getBackupMediaKeyMaterial(
|
||||
attachment: ProcessedAttachment
|
||||
attachment: AttachmentType
|
||||
): BackupMediaKeyMaterialType {
|
||||
const mediaId = getMediaIdBytes(attachment);
|
||||
const mediaId = getMediaIdForAttachment(attachment);
|
||||
const backupKey = getBackupKey();
|
||||
return deriveBackupMediaKeyMaterial(backupKey, mediaId);
|
||||
return deriveBackupMediaKeyMaterial(backupKey, mediaId.bytes);
|
||||
}
|
||||
|
||||
async function getCdnNumberForBackupTier(
|
||||
|
@ -106,7 +95,7 @@ export async function downloadAttachment(
|
|||
});
|
||||
downloadedPath = await downloadToDisk({ downloadStream, size });
|
||||
} else {
|
||||
const mediaId = getMediaIdForBackupTier(attachment);
|
||||
const mediaId = getMediaIdForAttachment(attachment);
|
||||
const cdnNumber = await getCdnNumberForBackupTier(attachment);
|
||||
const cdnCredentials =
|
||||
await backupsService.credentials.getCDNReadCredentials(cdnNumber);
|
||||
|
@ -115,7 +104,7 @@ export async function downloadAttachment(
|
|||
const mediaDir = await backupsService.api.getMediaDir();
|
||||
|
||||
const downloadStream = await server.getAttachmentFromBackupTier({
|
||||
mediaId,
|
||||
mediaId: mediaId.string,
|
||||
backupDir,
|
||||
mediaDir,
|
||||
headers: cdnCredentials.headers,
|
||||
|
|
|
@ -1001,12 +1001,18 @@ export function getAttachmentSignature(attachment: AttachmentType): string {
|
|||
}
|
||||
|
||||
type RequiredPropertiesForDecryption = 'key' | 'digest';
|
||||
type RequiredPropertiesForReencryption = 'key' | 'digest' | 'iv';
|
||||
|
||||
type DecryptableAttachment = WithRequiredProperties<
|
||||
AttachmentType,
|
||||
RequiredPropertiesForDecryption
|
||||
>;
|
||||
|
||||
type ReencryptableAttachment = WithRequiredProperties<
|
||||
AttachmentType,
|
||||
RequiredPropertiesForReencryption
|
||||
>;
|
||||
|
||||
export type AttachmentDownloadableFromTransitTier = WithRequiredProperties<
|
||||
DecryptableAttachment,
|
||||
'cdnKey' | 'cdnNumber'
|
||||
|
@ -1024,15 +1030,25 @@ export type LocallySavedAttachment = WithRequiredProperties<
|
|||
|
||||
export type AttachmentReadyForBackup = WithRequiredProperties<
|
||||
LocallySavedAttachment,
|
||||
RequiredPropertiesForDecryption
|
||||
RequiredPropertiesForReencryption
|
||||
>;
|
||||
|
||||
function isDecryptable(
|
||||
export function isDecryptable(
|
||||
attachment: AttachmentType
|
||||
): attachment is DecryptableAttachment {
|
||||
return Boolean(attachment.key) && Boolean(attachment.digest);
|
||||
}
|
||||
|
||||
export function isReencryptableToSameDigest(
|
||||
attachment: AttachmentType
|
||||
): attachment is ReencryptableAttachment {
|
||||
return (
|
||||
Boolean(attachment.key) &&
|
||||
Boolean(attachment.digest) &&
|
||||
Boolean(attachment.iv)
|
||||
);
|
||||
}
|
||||
|
||||
export function isDownloadableFromTransitTier(
|
||||
attachment: AttachmentType
|
||||
): attachment is AttachmentDownloadableFromTransitTier {
|
||||
|
|
102
ts/types/AttachmentBackup.ts
Normal file
102
ts/types/AttachmentBackup.ts
Normal file
|
@ -0,0 +1,102 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import { z } from 'zod';
|
||||
import {
|
||||
type JobManagerJobType,
|
||||
jobManagerJobSchema,
|
||||
} from '../jobs/JobManager';
|
||||
import { type MIMEType, MIMETypeSchema } from './MIME';
|
||||
|
||||
export type CoreAttachmentBackupJobType =
|
||||
| StandardAttachmentBackupJobType
|
||||
| ThumbnailAttachmentBackupJobType;
|
||||
|
||||
type StandardAttachmentBackupJobType = {
|
||||
type: 'standard';
|
||||
mediaName: string;
|
||||
receivedAt: number;
|
||||
data: {
|
||||
path: string | null;
|
||||
contentType: MIMEType;
|
||||
keys: string;
|
||||
digest: string;
|
||||
iv: string;
|
||||
transitCdnInfo?: {
|
||||
cdnKey: string;
|
||||
cdnNumber: number;
|
||||
uploadTimestamp?: number;
|
||||
};
|
||||
size: number;
|
||||
};
|
||||
};
|
||||
|
||||
type ThumbnailAttachmentBackupJobType = {
|
||||
type: 'thumbnail';
|
||||
mediaName: string;
|
||||
receivedAt: number;
|
||||
data: {
|
||||
fullsizePath: string | null;
|
||||
contentType: MIMEType;
|
||||
keys: string;
|
||||
};
|
||||
};
|
||||
|
||||
const standardBackupJobDataSchema = z.object({
|
||||
type: z.literal('standard'),
|
||||
data: z.object({
|
||||
path: z.string(),
|
||||
size: z.number(),
|
||||
contentType: MIMETypeSchema,
|
||||
keys: z.string(),
|
||||
iv: z.string(),
|
||||
digest: z.string(),
|
||||
transitCdnInfo: z
|
||||
.object({
|
||||
cdnKey: z.string(),
|
||||
cdnNumber: z.number(),
|
||||
uploadTimestamp: z.number().optional(),
|
||||
})
|
||||
.optional(),
|
||||
}),
|
||||
});
|
||||
|
||||
const thumbnailBackupJobDataSchema = z.object({
|
||||
type: z.literal('thumbnail'),
|
||||
data: z.object({
|
||||
fullsizePath: z.string(),
|
||||
contentType: MIMETypeSchema,
|
||||
keys: z.string(),
|
||||
}),
|
||||
});
|
||||
|
||||
export const attachmentBackupJobSchema = z
|
||||
.object({
|
||||
mediaName: z.string(),
|
||||
receivedAt: z.number(),
|
||||
})
|
||||
.and(
|
||||
z.discriminatedUnion('type', [
|
||||
standardBackupJobDataSchema,
|
||||
thumbnailBackupJobDataSchema,
|
||||
])
|
||||
)
|
||||
.and(jobManagerJobSchema) satisfies z.ZodType<
|
||||
AttachmentBackupJobType,
|
||||
z.ZodTypeDef,
|
||||
// With branded types, we need to specify that the input type of the schema is just a
|
||||
// string
|
||||
Omit<AttachmentBackupJobType, 'data'> & {
|
||||
data: Omit<AttachmentBackupJobType['data'], 'contentType'> & {
|
||||
contentType: string;
|
||||
};
|
||||
}
|
||||
>;
|
||||
|
||||
export const thumbnailBackupJobRecordSchema = z.object({
|
||||
mediaName: z.string(),
|
||||
type: z.literal('standard'),
|
||||
json: thumbnailBackupJobDataSchema.omit({ type: true }),
|
||||
});
|
||||
|
||||
export type AttachmentBackupJobType = CoreAttachmentBackupJobType &
|
||||
JobManagerJobType;
|
|
@ -3,6 +3,10 @@
|
|||
import { z } from 'zod';
|
||||
import { MIMETypeSchema, type MIMEType } from './MIME';
|
||||
import type { AttachmentType } from './Attachment';
|
||||
import {
|
||||
type JobManagerJobType,
|
||||
jobManagerJobSchema,
|
||||
} from '../jobs/JobManager';
|
||||
|
||||
export enum MediaTier {
|
||||
STANDARD = 'standard',
|
||||
|
@ -22,22 +26,21 @@ export type AttachmentDownloadJobTypeType = z.infer<
|
|||
typeof attachmentDownloadTypeSchema
|
||||
>;
|
||||
|
||||
export type AttachmentDownloadJobType = {
|
||||
export type CoreAttachmentDownloadJobType = {
|
||||
messageId: string;
|
||||
receivedAt: number;
|
||||
sentAt: number;
|
||||
attachmentType: AttachmentDownloadJobTypeType;
|
||||
attachment: AttachmentType;
|
||||
attempts: number;
|
||||
active: boolean;
|
||||
retryAfter: number | null;
|
||||
lastAttemptTimestamp: number | null;
|
||||
digest: string;
|
||||
contentType: MIMEType;
|
||||
size: number;
|
||||
};
|
||||
|
||||
export const attachmentDownloadJobSchema = z.object({
|
||||
export type AttachmentDownloadJobType = CoreAttachmentDownloadJobType &
|
||||
JobManagerJobType;
|
||||
|
||||
export const coreAttachmentDownloadJobSchema = z.object({
|
||||
messageId: z.string(),
|
||||
receivedAt: z.number(),
|
||||
sentAt: z.number(),
|
||||
|
@ -45,15 +48,15 @@ export const attachmentDownloadJobSchema = z.object({
|
|||
attachment: z
|
||||
.object({ size: z.number(), contentType: MIMETypeSchema })
|
||||
.passthrough(),
|
||||
attempts: z.number(),
|
||||
active: z.boolean(),
|
||||
retryAfter: z.number().nullable(),
|
||||
lastAttemptTimestamp: z.number().nullable(),
|
||||
digest: z.string(),
|
||||
contentType: MIMETypeSchema,
|
||||
size: z.number(),
|
||||
messageIdForLogging: z.string().optional(),
|
||||
}) satisfies z.ZodType<
|
||||
});
|
||||
|
||||
export const attachmentDownloadJobSchema = coreAttachmentDownloadJobSchema.and(
|
||||
jobManagerJobSchema
|
||||
) satisfies z.ZodType<
|
||||
Omit<AttachmentDownloadJobType, 'attachment' | 'contentType'> & {
|
||||
contentType: string;
|
||||
attachment: Record<string, unknown>;
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { strictAssert } from './assert';
|
||||
import * as durations from './durations';
|
||||
|
||||
const BACKOFF_FACTOR = 1.9;
|
||||
const MAX_BACKOFF = 15 * durations.MINUTE;
|
||||
const FIRST_BACKOFF = 100 * BACKOFF_FACTOR;
|
||||
|
||||
const FIRST_BACKOFFS = [0, 190];
|
||||
/**
|
||||
* For a given attempt, how long should we sleep (in milliseconds)?
|
||||
*
|
||||
|
@ -21,24 +21,35 @@ const FIRST_BACKOFF = 100 * BACKOFF_FACTOR;
|
|||
export type ExponentialBackoffOptionsType = {
|
||||
maxBackoffTime: number;
|
||||
multiplier: number;
|
||||
firstBackoffTime: number;
|
||||
firstBackoffs: Array<number>;
|
||||
};
|
||||
export function exponentialBackoffSleepTime(
|
||||
attempt: number,
|
||||
options: ExponentialBackoffOptionsType = {
|
||||
maxBackoffTime: MAX_BACKOFF,
|
||||
multiplier: BACKOFF_FACTOR,
|
||||
firstBackoffTime: FIRST_BACKOFF,
|
||||
firstBackoffs: FIRST_BACKOFFS,
|
||||
}
|
||||
): number {
|
||||
if (attempt === 1) {
|
||||
return 0;
|
||||
const numHardcodedBackoffs = options.firstBackoffs.length;
|
||||
strictAssert(
|
||||
numHardcodedBackoffs > 0,
|
||||
'must include explicit first backoffs'
|
||||
);
|
||||
|
||||
if (attempt - 1 < numHardcodedBackoffs) {
|
||||
return options.firstBackoffs[attempt - 1];
|
||||
}
|
||||
|
||||
const lastHardcodedBackoff = options.firstBackoffs.at(-1);
|
||||
strictAssert(
|
||||
lastHardcodedBackoff != null && lastHardcodedBackoff > 0,
|
||||
'lastHardcodedBackoff must be a positive number'
|
||||
);
|
||||
return Math.min(
|
||||
options.maxBackoffTime,
|
||||
(options.firstBackoffTime / options.multiplier) *
|
||||
options.multiplier ** (attempt - 1)
|
||||
(lastHardcodedBackoff / options.multiplier) *
|
||||
options.multiplier ** (attempt - numHardcodedBackoffs + 1)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import {
|
|||
encryptAttachmentV2ToDisk,
|
||||
safeUnlinkSync,
|
||||
type PlaintextSourceType,
|
||||
type HardcodedIVForEncryptionType,
|
||||
} from '../AttachmentCrypto';
|
||||
import { missingCaseError } from './missingCaseError';
|
||||
|
||||
|
@ -58,10 +59,12 @@ export async function uploadAttachment(
|
|||
export async function encryptAndUploadAttachment({
|
||||
plaintext,
|
||||
keys,
|
||||
dangerousIv,
|
||||
uploadType,
|
||||
}: {
|
||||
plaintext: PlaintextSourceType;
|
||||
keys: Uint8Array;
|
||||
dangerousIv?: HardcodedIVForEncryptionType;
|
||||
uploadType: 'standard' | 'backup';
|
||||
}): Promise<{
|
||||
cdnKey: string;
|
||||
|
@ -91,6 +94,7 @@ export async function encryptAndUploadAttachment({
|
|||
const encrypted = await encryptAttachmentV2ToDisk({
|
||||
plaintext,
|
||||
keys,
|
||||
dangerousIv,
|
||||
});
|
||||
|
||||
absoluteCiphertextPath = window.Signal.Migrations.getAbsoluteAttachmentPath(
|
||||
|
|
Loading…
Reference in a new issue