Add media granularity to backup attachment download source
commit 11e612f57b (parent 2432631fb9)
12 changed files with 74 additions and 49 deletions
@@ -253,7 +253,7 @@ export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownload
       job.messageId
     );

-    if (job.source === AttachmentDownloadSource.BACKUP_IMPORT) {
+    if (job.source === AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA) {
       const { outOfSpace } =
         await this.#checkFreeDiskSpaceForBackupImport();
       if (outOfSpace) {
@@ -297,26 +297,22 @@ export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownload

     const logId = `AttachmentDownloadManager/addJob(${sentAt}.${attachmentType})`;

-    if (source === AttachmentDownloadSource.BACKUP_IMPORT) {
+    // For non-media-enabled backups, we will skip queueing download for old attachments
+    // that cannot still be on the transit tier
+    if (source === AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA) {
       if (attachment.error) {
         return attachment;
       }

-      // For non-media-enabled backups, we will skip queueing download for old attachments
-      // that cannot still be on the transit tier
-      if (!this.#hasMediaBackups() && !wasImportedFromLocalBackup(attachment)) {
       const attachmentUploadedAt = attachment.uploadTimestamp || sentAt;

       // Skip queueing download if attachment is older than twice the message queue time
       // (a generous buffer that ensures we download anything that could still exist on
       // the transit tier)
-      if (
-        isOlderThan(attachmentUploadedAt, this.#getMessageQueueTime() * 2)
-      ) {
+      if (isOlderThan(attachmentUploadedAt, this.#getMessageQueueTime() * 2)) {
         return attachment;
       }
-      }
     }

     const parseResult = safeParsePartial(coreAttachmentDownloadJobSchema, {
       attachment,
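To make the cutoff above concrete: the test further down calls this "older than 90 days (2 * message queue time)", i.e. a 45-day message queue time doubled. A minimal sketch of that check, with the constant and helpers written out here as assumptions (the real code uses isOlderThan and this.#getMessageQueueTime()):

const DAY = 24 * 60 * 60 * 1000;
// Assumed default; the real value comes from #getMessageQueueTime()
const MESSAGE_QUEUE_TIME = 45 * DAY;

function isOlderThan(timestamp: number, delta: number): boolean {
  return timestamp <= Date.now() - delta;
}

// Returns true when a non-media backup attachment is still worth queueing:
// anything uploaded within 2x the message queue time could still be on the transit tier
function shouldQueueForNoMediaBackup(
  uploadTimestamp: number | undefined,
  sentAt: number
): boolean {
  const attachmentUploadedAt = uploadTimestamp || sentAt;
  return !isOlderThan(attachmentUploadedAt, MESSAGE_QUEUE_TIME * 2);
}
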
@@ -2,7 +2,10 @@
 // SPDX-License-Identifier: AGPL-3.0-only

 import { Aci, Pni, ServiceId } from '@signalapp/libsignal-client';
-import { ReceiptCredentialPresentation } from '@signalapp/libsignal-client/zkgroup';
+import {
+  BackupLevel,
+  ReceiptCredentialPresentation,
+} from '@signalapp/libsignal-client/zkgroup';
 import { v7 as generateUuid } from 'uuid';
 import pMap from 'p-map';
 import { Writable } from 'stream';

@@ -124,8 +127,8 @@ import {
 } from '../../util/callLinksRingrtc';
 import { loadAllAndReinitializeRedux } from '../allLoaders';
 import {
-  resetBackupMediaDownloadProgress,
   startBackupMediaDownload,
+  resetBackupMediaDownloadStats,
 } from '../../util/backupMediaDownload';
 import {
   getEnvironment,

@@ -254,6 +257,7 @@ export class BackupImportStream extends Writable {
   #releaseNotesChatId: Long | undefined;
   #pendingGroupAvatars = new Map<string, string>();
   #frameErrorCount: number = 0;
+  #backupTier: BackupLevel | undefined;

   private constructor(
     private readonly backupType: BackupType,

@@ -267,7 +271,8 @@ export class BackupImportStream extends Writable {
     localBackupSnapshotDir: string | undefined = undefined
   ): Promise<BackupImportStream> {
     await AttachmentDownloadManager.stop();
-    await resetBackupMediaDownloadProgress();
+    await DataWriter.removeAllBackupAttachmentDownloadJobs();
+    await resetBackupMediaDownloadStats();

     return new BackupImportStream(backupType, localBackupSnapshotDir);
   }

@@ -673,7 +678,9 @@ export class BackupImportStream extends Writable {
       const model = new MessageModel(attributes);
       attachmentDownloadJobPromises.push(
         queueAttachmentDownloads(model, {
-          source: AttachmentDownloadSource.BACKUP_IMPORT,
+          source: this.#isMediaEnabledBackup()
+            ? AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA
+            : AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA,
           isManualDownload: false,
         })
       );

@@ -835,6 +842,7 @@ export class BackupImportStream extends Writable {
       );
     }

+    this.#backupTier = accountSettings?.backupTier?.toNumber();
     await storage.put('backupTier', accountSettings?.backupTier?.toNumber());

     const { PhoneNumberSharingMode: BackupMode } = Backups.AccountData;

@@ -3774,6 +3782,14 @@ export class BackupImportStream extends Writable {

     return {};
   }
+
+  #isLocalBackup() {
+    return this.localBackupSnapshotDir != null;
+  }
+
+  #isMediaEnabledBackup() {
+    return this.#isLocalBackup() || this.#backupTier === BackupLevel.Paid;
+  }
 }

 function rgbIntToDesktopHSL(intValue: number): {
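Taken together, the import changes above derive the download source from the backup itself: a local backup, or a paid (media-enabled) backup tier read from AccountSettings, selects BACKUP_IMPORT_WITH_MEDIA; everything else gets BACKUP_IMPORT_NO_MEDIA. A condensed sketch of that decision (the enum members are copied locally so the snippet stands alone; the real code uses the private #isLocalBackup / #isMediaEnabledBackup helpers shown above):

import { BackupLevel } from '@signalapp/libsignal-client/zkgroup';

// Local copy of the two new AttachmentDownloadSource members (see the enum hunk below)
enum BackupImportSource {
  BACKUP_IMPORT_WITH_MEDIA = 'backup_import',
  BACKUP_IMPORT_NO_MEDIA = 'backup_import_no_media',
}

function chooseImportSource(
  localBackupSnapshotDir: string | undefined,
  backupTier: number | undefined // AccountSettings.backupTier, as cached in #backupTier
): BackupImportSource {
  const isLocalBackup = localBackupSnapshotDir != null;
  const isMediaEnabled = isLocalBackup || backupTier === BackupLevel.Paid;
  return isMediaEnabled
    ? BackupImportSource.BACKUP_IMPORT_WITH_MEDIA
    : BackupImportSource.BACKUP_IMPORT_NO_MEDIA;
}
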
@@ -833,7 +833,9 @@ async function saveAttachmentDownloadJob(
   job: AttachmentDownloadJobType
 ): Promise<void> {
   await writableChannel.saveAttachmentDownloadJob(job);
-  if (job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT) {
+  if (
+    job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA
+  ) {
     drop(
       throttledUpdateBackupMediaDownloadProgress(
         readableChannel.getBackupAttachmentDownloadProgress

@@ -847,7 +849,8 @@ async function saveAttachmentDownloadJobs(
   await writableChannel.saveAttachmentDownloadJobs(jobs);
   if (
     jobs.some(
-      job => job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT
+      job =>
+        job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA
     )
   ) {
     drop(

@@ -862,7 +865,9 @@ async function removeAttachmentDownloadJob(
   job: AttachmentDownloadJobType
 ): Promise<void> {
   await writableChannel.removeAttachmentDownloadJob(job);
-  if (job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT) {
+  if (
+    job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA
+  ) {
     drop(
       throttledUpdateBackupMediaDownloadProgress(
         readableChannel.getBackupAttachmentDownloadProgress
@@ -548,7 +548,10 @@ export type GetRecentStoryRepliesOptionsType = {
 };

 export enum AttachmentDownloadSource {
-  BACKUP_IMPORT = 'backup_import',
+  // Imported when paid (media) backups were enabled, or from a local backup
+  BACKUP_IMPORT_WITH_MEDIA = 'backup_import',
+  // Imported when paid (media) backups were not enabled
+  BACKUP_IMPORT_NO_MEDIA = 'backup_import_no_media',
   STANDARD = 'standard',
   BACKFILL = 'backfill',
 }
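Note that BACKUP_IMPORT_WITH_MEDIA keeps the old 'backup_import' string value, so rows already persisted with that value map onto the media-enabled variant, while BACKUP_IMPORT_NO_MEDIA is a genuinely new value. Code that wants to treat both variants as "imported from a backup" has to check both, as the getLogger change at the end of this diff does; a hypothetical helper for that check could look like this (assuming the enum above is in scope):

// Hypothetical convenience helper; the changes in this commit check the two values inline
function isBackupImportSource(source: AttachmentDownloadSource): boolean {
  return (
    source === AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA ||
    source === AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA
  );
}
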
@@ -5577,7 +5577,10 @@ function _getAttachmentDownloadJob(
 function removeAllBackupAttachmentDownloadJobs(db: WritableDB): void {
   const [query, params] = sql`
     DELETE FROM attachment_downloads
-    WHERE source = ${AttachmentDownloadSource.BACKUP_IMPORT};`;
+    WHERE
+      source = ${AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA}
+    OR
+      source = ${AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA};`;
   db.prepare(query).run(params);
 }

@@ -7739,6 +7742,7 @@ function removeAllConfiguration(db: WritableDB): void {
   db.exec(
     `
     DELETE FROM attachment_backup_jobs;
+    DELETE FROM attachment_downloads;
     DELETE FROM backup_cdn_object_metadata;
     DELETE FROM groupSendCombinedEndorsement;
     DELETE FROM groupSendMemberEndorsement;
@@ -336,7 +336,7 @@ describe('AttachmentDownloadManager/JobManager', () => {

   it('triggers onLowDiskSpace for backup import jobs', async () => {
     const jobs = await addJobs(1, _idx => ({
-      source: AttachmentDownloadSource.BACKUP_IMPORT,
+      source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
     }));

     const jobAttempts = getPromisesForAttempts(jobs[0], 2);

@@ -479,7 +479,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
     const jobs = await addJobs(6, idx => ({
       source:
         idx % 2 === 0
-          ? AttachmentDownloadSource.BACKUP_IMPORT
+          ? AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA
           : AttachmentDownloadSource.STANDARD,
     }));
     // make one of the backup job messages visible to test that code path as well

@@ -506,7 +506,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
   it('retries backup job immediately if retryAfters are reset', async () => {
     strictAssert(downloadManager, 'must exist');
     const jobs = await addJobs(1, {
-      source: AttachmentDownloadSource.BACKUP_IMPORT,
+      source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
     });
     const jobAttempts = getPromisesForAttempts(jobs[0], 2);

@@ -532,7 +532,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
     strictAssert(downloadManager, 'must exist');
     const job = (
       await addJobs(1, {
-        source: AttachmentDownloadSource.BACKUP_IMPORT,
+        source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
       })
     )[0];
     const jobAttempts = getPromisesForAttempts(job, 3);

@@ -567,11 +567,10 @@ describe('AttachmentDownloadManager/JobManager', () => {

   describe('will drop jobs from non-media backup imports that are old', () => {
     it('will not queue attachments older than 90 days (2 * message queue time)', async () => {
-      hasMediaBackups.returns(false);
       await addJobs(
         1,
         {
-          source: AttachmentDownloadSource.BACKUP_IMPORT,
+          source: AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA,
         },
         { uploadTimestamp: Date.now() - 4 * MONTH }
       );

@@ -586,7 +585,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
       await addJobs(
         1,
         {
-          source: AttachmentDownloadSource.BACKUP_IMPORT,
+          source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
         },
         { uploadTimestamp: Date.now() - 4 * MONTH }
       );

@@ -601,7 +600,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
       await addJobs(
         1,
         {
-          source: AttachmentDownloadSource.BACKUP_IMPORT,
+          source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
         },
         {
           uploadTimestamp: Date.now() - 4 * MONTH,

@@ -620,7 +619,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
       await addJobs(
         1,
         {
-          source: AttachmentDownloadSource.BACKUP_IMPORT,
+          source: AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA,
           sentAt: Date.now() - 4 * MONTH,
         },
         { uploadTimestamp: 0 }
@@ -47,8 +47,8 @@ describe('sql/AttachmentDownloadBackupStats', () => {
     const backupJob1 = createAttachmentDownloadJob(1, {
       messageId: 'message0',
       ciphertextSize: 1000,
-      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
-      source: AttachmentDownloadSource.BACKUP_IMPORT,
+      originalSource: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
+      source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
     });
     await DataWriter.saveAttachmentDownloadJob(backupJob1);
     assert.deepStrictEqual(

@@ -59,8 +59,8 @@ describe('sql/AttachmentDownloadBackupStats', () => {
     const backupJob2 = createAttachmentDownloadJob(2, {
       messageId: 'message0',
       ciphertextSize: 2000,
-      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
-      source: AttachmentDownloadSource.BACKUP_IMPORT,
+      originalSource: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
+      source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
     });
     await DataWriter.saveAttachmentDownloadJob(backupJob2);
     assert.deepStrictEqual(

@@ -128,8 +128,8 @@ describe('sql/AttachmentDownloadBackupStats', () => {
       }
     );
     const backupJob = createAttachmentDownloadJob(0, {
-      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
-      source: AttachmentDownloadSource.BACKUP_IMPORT,
+      originalSource: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
+      source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
       ciphertextSize: 128,
     });
     await DataWriter.saveAttachmentDownloadJob(backupJob);

@@ -167,8 +167,8 @@ describe('sql/AttachmentDownloadBackupStats', () => {
       }
     );
     const backupJob = createAttachmentDownloadJob(0, {
-      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
-      source: AttachmentDownloadSource.BACKUP_IMPORT,
+      originalSource: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
+      source: AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
       ciphertextSize: 128,
     });
     await DataWriter.saveAttachmentDownloadJob(backupJob);

@@ -190,7 +190,7 @@ describe('sql/AttachmentDownloadBackupStats', () => {

     assert.strictEqual(
       savedJob?.originalSource,
-      AttachmentDownloadSource.BACKUP_IMPORT
+      AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA
     );
     assert.strictEqual(savedJob?.source, AttachmentDownloadSource.STANDARD);

@@ -105,7 +105,7 @@ describe('SQL/updateToSchemaVersion1200', () => {
           source:
             i < NUM_STANDARD_JOBS
               ? AttachmentDownloadSource.STANDARD
-              : AttachmentDownloadSource.BACKUP_IMPORT,
+              : AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
         });
       }
     })();
@@ -87,7 +87,7 @@ describe('SQL/updateToSchemaVersion1410', () => {
           source:
             i < 5
               ? AttachmentDownloadSource.STANDARD
-              : AttachmentDownloadSource.BACKUP_IMPORT,
+              : AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA,
         });
       }
     })();
@@ -36,14 +36,12 @@ export async function resetBackupMediaDownloadItems(): Promise<void> {

 export async function cancelBackupMediaDownload(): Promise<void> {
   log.info('Canceling media download');
-  await window.storage.put('backupMediaDownloadBannerDismissed', true);
+  await dismissBackupMediaDownloadBanner();
   await DataWriter.removeAllBackupAttachmentDownloadJobs();
-  await DataWriter.resetBackupAttachmentDownloadStats();
-  await resetBackupMediaDownloadItems();
+  await resetBackupMediaDownloadStats();
 }

-export async function resetBackupMediaDownloadProgress(): Promise<void> {
-  await DataWriter.removeAllBackupAttachmentDownloadJobs();
+export async function resetBackupMediaDownloadStats(): Promise<void> {
   await DataWriter.resetBackupAttachmentDownloadStats();
   await resetBackupMediaDownloadItems();
 }
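The net effect of this rename: resetBackupMediaDownloadStats only resets the download counters and storage items, while clearing pending download jobs (removeAllBackupAttachmentDownloadJobs) is no longer bundled in and is called explicitly where it is wanted, as the BackupImportStream.create hunk above does. A sketch of the two call patterns, using the functions from this diff but with hypothetical wrapper names for illustration:

// On backup import start: clear leftover jobs first, then reset the counters
async function prepareForBackupImport(): Promise<void> {
  await DataWriter.removeAllBackupAttachmentDownloadJobs();
  await resetBackupMediaDownloadStats();
}

// On backup tier change (see saveBackupTier below): only the counters are reset
async function onBackupTierChanged(): Promise<void> {
  await resetBackupMediaDownloadStats();
}
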
@@ -6,6 +6,7 @@ import type { Backups, SignalService } from '../protobuf';
 import * as Bytes from '../Bytes';
 import { drop } from './drop';
 import { createLogger } from '../logging/log';
+import { resetBackupMediaDownloadStats } from './backupMediaDownload';

 const log = createLogger('BackupSubscriptionData');

@@ -64,6 +65,7 @@ export async function saveBackupTier(
   await window.storage.put('backupTier', backupTier);
   if (backupTier !== previousBackupTier) {
     log.info('backup tier has changed', { previousBackupTier, backupTier });
+    await resetBackupMediaDownloadStats();
     drop(window.Signal.Services.backups.resetCachedData());
   }
 }
@@ -61,7 +61,9 @@ export type MessageAttachmentsDownloadedType = {
 };

 function getLogger(source: AttachmentDownloadSource) {
-  const verbose = source !== AttachmentDownloadSource.BACKUP_IMPORT;
+  const verbose =
+    source !== AttachmentDownloadSource.BACKUP_IMPORT_NO_MEDIA &&
+    source !== AttachmentDownloadSource.BACKUP_IMPORT_WITH_MEDIA;
   const log = verbose ? defaultLogger : { ...defaultLogger, info: () => null };
   return log;
 }
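For context on the final hunk: both backup-import variants keep the muted logger that the single BACKUP_IMPORT source used to get, presumably to avoid flooding the log when an import queues a large number of jobs. A minimal sketch of that pattern (the Logger shape and defaultLogger here are stand-ins, not the project's logging types):

type Logger = { info: (...args: Array<unknown>) => void };

const defaultLogger: Logger = { info: (...args) => console.log(...args) };

// Non-verbose callers get a copy of the logger whose info() is a no-op
function getQuietLogger(verbose: boolean): Logger {
  return verbose ? defaultLogger : { ...defaultLogger, info: () => null };
}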