Refactor backup media download progress tracking

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2025-07-18 18:19:02 -05:00 committed by GitHub
parent b5ce45aeed
commit 62cecc0a1c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 668 additions and 102 deletions

View file

@ -46,13 +46,13 @@
height: 24px;
@include mixins.light-theme {
@include mixins.color-svg(
'../images/icons/v3/signal_backups.svg',
'../images/icons/v3/signal_backups/signal_backups.svg',
variables.$color-ultramarine
);
}
@include mixins.dark-theme {
@include mixins.color-svg(
'../images/icons/v3/signal_backups.svg',
'../images/icons/v3/signal_backups/signal_backups.svg',
variables.$color-ultramarine-light
);
}

View file

@ -14,7 +14,7 @@ import {
coreAttachmentDownloadJobSchema,
} from '../types/AttachmentDownload';
import { downloadAttachment as downloadAttachmentUtil } from '../util/downloadAttachment';
import { DataWriter } from '../sql/Client';
import { DataReader, DataWriter } from '../sql/Client';
import { getValue } from '../RemoteConfig';
import { isInCall as isInCallSelector } from '../state/selectors/calling';
@ -23,7 +23,6 @@ import {
type AttachmentType,
AttachmentVariant,
AttachmentPermanentlyUndownloadableError,
hasRequiredInformationForBackup,
wasImportedFromLocalBackup,
canAttachmentHaveThumbnail,
shouldAttachmentEndUpInRemoteBackup,
@ -64,6 +63,7 @@ import {
} from './helpers/attachmentBackfill';
import { formatCountForLogging } from '../logging/formatCountForLogging';
import { strictAssert } from '../util/assert';
import { updateBackupMediaDownloadProgress } from '../util/updateBackupMediaDownloadProgress';
const log = createLogger('AttachmentDownloadManager');
@ -286,15 +286,8 @@ export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownload
receivedAt,
sentAt,
size: attachment.size,
// If the attachment cannot exist on the backup tier, we don't want to store it as a
// "backup import" attachment, since it's really just a normal attachment that we'll
// try to download from the transit tier (or it's an invalid attachment, etc.). We
// may need to extend the attachment_downloads table in the future to better
// differentiate source vs. location
// TODO: DESKTOP-8879
source: hasRequiredInformationForBackup(attachment)
? source
: AttachmentDownloadSource.STANDARD,
source,
originalSource: source,
});
if (!parseResult.success) {
@ -369,12 +362,15 @@ export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownload
static async start(): Promise<void> {
await AttachmentDownloadManager.saveBatchedJobs();
await window.storage.put('attachmentDownloadManagerIdled', false);
await AttachmentDownloadManager.instance.start();
drop(
AttachmentDownloadManager.waitForIdle(async () => {
await updateBackupMediaDownloadProgress(
DataReader.getBackupAttachmentDownloadProgress
);
await window.storage.put('attachmentDownloadManagerIdled', true);
})
);
await AttachmentDownloadManager.instance.start();
}
static async saveBatchedJobs(): Promise<void> {
@ -475,18 +471,6 @@ async function runDownloadAttachmentJob({
};
}
// TODO: DESKTOP-8879
if (job.source === AttachmentDownloadSource.BACKUP_IMPORT) {
const currentDownloadedSize =
window.storage.get('backupMediaDownloadCompletedBytes') ?? 0;
drop(
window.storage.put(
'backupMediaDownloadCompletedBytes',
currentDownloadedSize + job.ciphertextSize
)
);
}
return {
status: 'finished',
};

View file

@ -120,11 +120,11 @@ export abstract class JobManager<CoreJobType> {
}
async waitForIdle(): Promise<void> {
if (this.#activeJobs.size === 0) {
return;
}
await new Promise<void>(resolve => this.#idleCallbacks.push(resolve));
const idledPromise = new Promise<void>(resolve =>
this.#idleCallbacks.push(resolve)
);
this.#tick();
return idledPromise;
}
#tick(): void {

View file

@ -143,6 +143,7 @@ import {
type NotificationProfileType,
} from '../../types/NotificationProfile';
import { normalizeNotificationProfileId } from '../../types/NotificationProfile-node';
import { updateBackupMediaDownloadProgress } from '../../util/updateBackupMediaDownloadProgress';
const log = createLogger('import');
@ -263,7 +264,6 @@ export class BackupImportStream extends Writable {
localBackupSnapshotDir: string | undefined = undefined
): Promise<BackupImportStream> {
await AttachmentDownloadManager.stop();
await DataWriter.removeAllBackupAttachmentDownloadJobs();
await resetBackupMediaDownloadProgress();
return new BackupImportStream(backupType, localBackupSnapshotDir);
@ -418,9 +418,8 @@ export class BackupImportStream extends Writable {
.map(([, id]) => id)
);
await window.storage.put(
'backupMediaDownloadTotalBytes',
await DataReader.getSizeOfPendingBackupAttachmentDownloadJobs()
await updateBackupMediaDownloadProgress(
DataReader.getBackupAttachmentDownloadProgress
);
if (

View file

@ -61,8 +61,13 @@ import type {
ClientOnlyReadableInterface,
ClientOnlyWritableInterface,
} from './Interface';
import { AttachmentDownloadSource } from './Interface';
import type { MessageAttributesType } from '../model-types';
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
import {
throttledUpdateBackupMediaDownloadProgress,
updateBackupMediaDownloadProgress,
} from '../util/updateBackupMediaDownloadProgress';
const log = createLogger('Client');
@ -142,12 +147,21 @@ const clientOnlyWritable: ClientOnlyWritableInterface = {
type ClientOverridesType = ClientOnlyWritableInterface &
Pick<
ClientInterfaceWrap<ServerWritableDirectInterface>,
'saveAttachmentDownloadJob' | 'updateConversations'
| 'saveAttachmentDownloadJob'
| 'saveAttachmentDownloadJobs'
| 'removeAllBackupAttachmentDownloadJobs'
| 'removeAttachmentDownloadJob'
| 'removeAttachmentDownloadJobsForMessage'
| 'updateConversations'
>;
const clientOnlyWritableOverrides: ClientOverridesType = {
...clientOnlyWritable,
saveAttachmentDownloadJob,
saveAttachmentDownloadJobs,
removeAllBackupAttachmentDownloadJobs,
removeAttachmentDownloadJob,
removeAttachmentDownloadJobsForMessage,
updateConversations,
};
@ -818,7 +832,61 @@ async function removeMessagesInConversation(
async function saveAttachmentDownloadJob(
job: AttachmentDownloadJobType
): Promise<void> {
await writableChannel.saveAttachmentDownloadJob(_cleanData(job));
await writableChannel.saveAttachmentDownloadJob(job);
if (job.originalSource === AttachmentDownloadSource.BACKUP_IMPORT) {
drop(
throttledUpdateBackupMediaDownloadProgress(
readableChannel.getBackupAttachmentDownloadProgress
)
);
}
}
/**
 * Persists a batch of attachment download jobs, then (throttled) syncs the
 * backup-media progress counters into storage if any job in the batch
 * originated from a backup import.
 */
async function saveAttachmentDownloadJobs(
  jobs: Array<AttachmentDownloadJobType>
): Promise<void> {
  await writableChannel.saveAttachmentDownloadJobs(jobs);

  const touchesBackupImport = jobs.some(
    ({ originalSource }) =>
      originalSource === AttachmentDownloadSource.BACKUP_IMPORT
  );
  if (!touchesBackupImport) {
    return;
  }

  // Fire-and-forget: progress sync shouldn't block the save path.
  drop(
    throttledUpdateBackupMediaDownloadProgress(
      readableChannel.getBackupAttachmentDownloadProgress
    )
  );
}
/**
 * Removes a single attachment download job, then (throttled) syncs the
 * backup-media progress counters if the job originated from a backup import
 * (removal bumps completedBytes via SQL trigger).
 */
async function removeAttachmentDownloadJob(
  job: AttachmentDownloadJobType
): Promise<void> {
  await writableChannel.removeAttachmentDownloadJob(job);

  if (job.originalSource !== AttachmentDownloadSource.BACKUP_IMPORT) {
    return;
  }

  // Fire-and-forget; UI progress can lag slightly behind the DB.
  drop(
    throttledUpdateBackupMediaDownloadProgress(
      readableChannel.getBackupAttachmentDownloadProgress
    )
  );
}
/**
 * Removes all download jobs attached to a message. We can't tell from here
 * whether any of the removed jobs came from a backup import, so the progress
 * counters are refreshed unconditionally (throttled, so this is cheap).
 */
async function removeAttachmentDownloadJobsForMessage(
  messageId: string
): Promise<void> {
  await writableChannel.removeAttachmentDownloadJobsForMessage(messageId);

  const refresh = throttledUpdateBackupMediaDownloadProgress(
    readableChannel.getBackupAttachmentDownloadProgress
  );
  drop(refresh);
}
/**
 * Clears every backup-import download job. Unlike the per-job handlers, the
 * progress refresh here is awaited and unthrottled, so storage reflects the
 * cleared state when this resolves.
 */
async function removeAllBackupAttachmentDownloadJobs(): Promise<void> {
  await writableChannel.removeAllBackupAttachmentDownloadJobs();

  const sync = updateBackupMediaDownloadProgress(
    readableChannel.getBackupAttachmentDownloadProgress
  );
  await sync;
}
// Other

View file

@ -557,6 +557,11 @@ export type MessageCountBySchemaVersionType = Array<{
count: number;
}>;
// Snapshot of backup-media download progress, read from the single-row
// attachment_downloads_backup_stats table (maintained by SQL triggers).
export type BackupAttachmentDownloadProgress = {
  // Total ciphertext bytes of backup-import download jobs enqueued so far.
  totalBytes: number;
  // Ciphertext bytes of backup-import jobs since removed from the queue
  // (completed or otherwise cleaned up).
  completedBytes: number;
};
export const MESSAGE_ATTACHMENT_COLUMNS = [
'messageId',
'conversationId',
@ -884,7 +889,7 @@ type ReadableInterface = {
getMaxMessageCounter(): number | undefined;
getStatisticsForLogging(): Record<string, string>;
getSizeOfPendingBackupAttachmentDownloadJobs(): number;
getBackupAttachmentDownloadProgress(): BackupAttachmentDownloadProgress;
getAttachmentReferencesForMessages: (
messageIds: Array<string>
) => Array<MessageAttachmentDBType>;
@ -1095,6 +1100,7 @@ type WritableInterface = {
removeAttachmentDownloadJob: (job: AttachmentDownloadJobType) => void;
removeAttachmentDownloadJobsForMessage: (messageId: string) => void;
removeAllBackupAttachmentDownloadJobs: () => void;
resetBackupAttachmentDownloadStats: () => void;
getNextAttachmentBackupJobs: (options: {
limit: number;

View file

@ -52,7 +52,11 @@ import * as Errors from '../types/errors';
import { assertDev, strictAssert } from '../util/assert';
import { combineNames } from '../util/combineNames';
import { consoleLogger } from '../util/consoleLogger';
import { dropNull, shallowConvertUndefinedToNull } from '../util/dropNull';
import {
dropNull,
shallowConvertUndefinedToNull,
type ShallowUndefinedToNull,
} from '../util/dropNull';
import { isNormalNumber } from '../util/isNormalNumber';
import { isNotNil } from '../util/isNotNil';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
@ -187,6 +191,7 @@ import type {
MessageTypeUnhydrated,
ServerMessageSearchResultType,
MessageCountBySchemaVersionType,
BackupAttachmentDownloadProgress,
} from './Interface';
import {
AttachmentDownloadSource,
@ -298,22 +303,37 @@ type StickerPackRow = InstalledStickerPackRow &
stickers: string;
title: string;
}>;
type AttachmentDownloadJobRow = Readonly<{
messageId: string;
attachmentType: string;
attachmentSignature: string;
receivedAt: number;
sentAt: number;
contentType: string;
size: number;
active: number;
attempts: number;
retryAfter: number;
lastAttemptTimestamp: number;
const ATTACHMENT_DOWNLOADS_COLUMNS: ReadonlyArray<
| keyof Omit<AttachmentDownloadJobType, 'attachment' | 'isManualDownload'>
| 'attachmentJson'
> = [
'messageId',
'attachmentType',
'attachmentSignature',
'receivedAt',
'sentAt',
'contentType',
'size',
'active',
'attempts',
'retryAfter',
'lastAttemptTimestamp',
'attachmentJson',
'ciphertextSize',
'originalSource',
'source',
] as const;
type AttachmentDownloadJobRow = Omit<
ShallowUndefinedToNull<AttachmentDownloadJobType>,
// TODO: DESKTOP-8995
'attachment' | 'contentType' | 'active' | 'isManualDownload'
> & {
attachmentJson: string;
ciphertextSize: number;
source: string;
}>;
contentType: string;
active: 0 | 1;
};
// Because we can't force this module to conform to an interface, we narrow our exports
// to this one default export, which does conform to the interface.
@ -445,7 +465,7 @@ export const DataReader: ServerReadableInterface = {
getStatisticsForLogging,
getBackupCdnObjectMetadata,
getSizeOfPendingBackupAttachmentDownloadJobs,
getBackupAttachmentDownloadProgress,
getAttachmentReferencesForMessages,
getMessageCountBySchemaVersion,
getMessageSampleForSchemaVersion,
@ -581,6 +601,7 @@ export const DataWriter: ServerWritableInterface = {
removeAttachmentDownloadJob,
removeAttachmentDownloadJobsForMessage,
removeAllBackupAttachmentDownloadJobs,
resetBackupAttachmentDownloadStats,
getNextAttachmentBackupJobs,
saveAttachmentBackupJob,
@ -5477,16 +5498,28 @@ function removeAllBackupAttachmentDownloadJobs(db: WritableDB): void {
db.prepare(query).run(params);
}
function getSizeOfPendingBackupAttachmentDownloadJobs(db: ReadableDB): number {
function resetBackupAttachmentDownloadStats(db: WritableDB): void {
const [query, params] = sql`
SELECT SUM(ciphertextSize) FROM attachment_downloads
WHERE source = ${AttachmentDownloadSource.BACKUP_IMPORT};`;
INSERT OR REPLACE INTO attachment_downloads_backup_stats
(id, totalBytes, completedBytes)
VALUES
(0,0,0);
`;
db.prepare(query).run(params);
}
function getBackupAttachmentDownloadProgress(
db: ReadableDB
): BackupAttachmentDownloadProgress {
const [query, params] = sql`
SELECT totalBytes, completedBytes FROM attachment_downloads_backup_stats
WHERE id = 0;
`;
return (
db
.prepare(query, {
pluck: true,
})
.get<number>(params) ?? 0
db.prepare(query).get<BackupAttachmentDownloadProgress>(params) ?? {
totalBytes: 0,
completedBytes: 0,
}
);
}
@ -5642,41 +5675,42 @@ function saveAttachmentDownloadJob(
logger.warn('saveAttachmentDownloadJob: message does not exist, bailing');
return;
}
const jobToInsert: AttachmentDownloadJobRow = {
messageId: job.messageId,
attachmentType: job.attachmentType,
attachmentSignature: job.attachmentSignature,
receivedAt: job.receivedAt,
sentAt: job.sentAt,
contentType: job.contentType,
size: job.size,
active: job.active ? 1 : 0,
attempts: job.attempts,
retryAfter: job.retryAfter,
lastAttemptTimestamp: job.lastAttemptTimestamp,
attachmentJson: objectToJSON(job.attachment),
ciphertextSize: job.ciphertextSize,
originalSource: job.originalSource,
source: job.source,
} as const satisfies Record<
(typeof ATTACHMENT_DOWNLOADS_COLUMNS)[number],
unknown
>;
const [insertQuery, insertParams] = sql`
INSERT OR REPLACE INTO attachment_downloads (
messageId,
attachmentType,
attachmentSignature,
receivedAt,
sentAt,
contentType,
size,
active,
attempts,
retryAfter,
lastAttemptTimestamp,
attachmentJson,
ciphertextSize,
source
) VALUES (
${job.messageId},
${job.attachmentType},
${job.attachmentSignature},
${job.receivedAt},
${job.sentAt},
${job.contentType},
${job.size},
${job.active ? 1 : 0},
${job.attempts},
${job.retryAfter},
${job.lastAttemptTimestamp},
${objectToJSON(job.attachment)},
${job.ciphertextSize},
${job.source}
);
`;
db.prepare(insertQuery).run(insertParams);
db.prepare(
`
INSERT INTO attachment_downloads
(${ATTACHMENT_DOWNLOADS_COLUMNS.join(', ')})
VALUES
(${ATTACHMENT_DOWNLOADS_COLUMNS.map(name => `$${name}`).join(', ')})
ON CONFLICT DO UPDATE SET
-- preserve originalSource
${ATTACHMENT_DOWNLOADS_COLUMNS.filter(
name => name !== 'originalSource'
)
.map(name => `${name} = $${name}`)
.join(', ')}
`
).run(jobToInsert);
})();
}
@ -7533,6 +7567,7 @@ function removeAll(db: WritableDB): void {
DELETE FROM attachment_downloads;
DELETE FROM attachment_backup_jobs;
DELETE FROM attachment_downloads_backup_stats;
DELETE FROM backup_cdn_object_metadata;
DELETE FROM badgeImageFiles;
DELETE FROM badges;
@ -7549,11 +7584,13 @@ function removeAll(db: WritableDB): void {
DELETE FROM items;
DELETE FROM jobs;
DELETE FROM kyberPreKeys;
DELETE FROM message_attachments;
DELETE FROM messages_fts;
DELETE FROM messages;
DELETE FROM notificationProfiles;
DELETE FROM preKeys;
DELETE FROM reactions;
DELETE FROM recentGifs;
DELETE FROM senderKeys;
DELETE FROM sendLogMessageIds;
DELETE FROM sendLogPayloads;
@ -7569,10 +7606,10 @@ function removeAll(db: WritableDB): void {
DELETE FROM syncTasks;
DELETE FROM unprocessed;
DELETE FROM uninstalled_sticker_packs;
DELETE FROM message_attachments;
INSERT INTO messages_fts(messages_fts) VALUES('optimize');
--- Re-create the messages delete trigger
--- See migration 45
CREATE TRIGGER messages_on_delete AFTER DELETE ON messages BEGIN
@ -7588,6 +7625,8 @@ function removeAll(db: WritableDB): void {
DELETE FROM storyReads WHERE storyId = old.storyId;
END;
`);
resetBackupAttachmentDownloadStats(db);
})();
}

View file

@ -53,7 +53,7 @@ const attachmentDownloadJobSchemaV1040 = z
.and(jobManagerJobSchema);
export type _AttachmentDownloadJobTypeV1040 = Omit<
AttachmentDownloadJobType,
'attachmentSignature'
'attachmentSignature' | 'originalSource'
> & { digest: string };
export function updateToSchemaVersion1040(

View file

@ -0,0 +1,70 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { LoggerType } from '../../types/Logging';
import { AttachmentDownloadSource, type WritableDB } from '../Interface';
export const version = 1420;
/**
 * Migration 1420: track backup-media download progress in SQL.
 *
 * - Adds an `originalSource` column to `attachment_downloads`, preserving
 *   where a job first came from even if `source` is later rewritten.
 * - Adds a single-row `attachment_downloads_backup_stats` table plus
 *   triggers that keep totalBytes/completedBytes up to date as
 *   backup-import jobs are inserted, resized, and deleted.
 */
export function updateToSchemaVersion1420(
  currentVersion: number,
  db: WritableDB,
  logger: LoggerType
): void {
  if (currentVersion >= 1420) {
    return;
  }

  db.transaction(() => {
    db.exec(`
      ALTER TABLE attachment_downloads
      -- The default must be a quoted SQL string literal; an unquoted
      -- interpolation would not parse as a constant default.
      ADD COLUMN originalSource TEXT NOT NULL DEFAULT '${AttachmentDownloadSource.STANDARD}';

      -- Backfill: an existing job's current source is its original source.
      UPDATE attachment_downloads
      SET originalSource = source;
    `);

    db.exec(`
      -- Single-row stats table: id is constrained to always be 0.
      CREATE TABLE attachment_downloads_backup_stats (
        id INTEGER PRIMARY KEY CHECK (id = 0),
        totalBytes INTEGER NOT NULL,
        completedBytes INTEGER NOT NULL
      ) STRICT;

      INSERT INTO attachment_downloads_backup_stats
        (id, totalBytes, completedBytes)
      VALUES
        (0, 0, 0);

      -- A new backup-import job adds its ciphertext size to the total.
      CREATE TRIGGER attachment_downloads_backup_job_insert
      AFTER INSERT ON attachment_downloads
      WHEN NEW.originalSource = 'backup_import'
      BEGIN
        UPDATE attachment_downloads_backup_stats SET
          totalBytes = totalBytes + NEW.ciphertextSize
        WHERE id = 0;
      END;

      -- Resizing a job adjusts the total by the delta (floored at 0).
      CREATE TRIGGER attachment_downloads_backup_job_update
      AFTER UPDATE OF ciphertextSize ON attachment_downloads
      WHEN NEW.originalSource = 'backup_import'
      BEGIN
        UPDATE attachment_downloads_backup_stats SET
          totalBytes = MAX(0, totalBytes - OLD.ciphertextSize + NEW.ciphertextSize)
        WHERE id = 0;
      END;

      -- Removing a job counts its bytes as completed, whatever the reason
      -- for removal (finished download, message deleted, queue cleared).
      CREATE TRIGGER attachment_downloads_backup_job_delete
      AFTER DELETE ON attachment_downloads
      WHEN OLD.originalSource = 'backup_import'
      BEGIN
        UPDATE attachment_downloads_backup_stats SET
          completedBytes = completedBytes + OLD.ciphertextSize
        WHERE id = 0;
      END;
    `);

    db.pragma('user_version = 1420');
  })();

  logger.info('updateToSchemaVersion1420: success!');
}

View file

@ -116,10 +116,11 @@ import { updateToSchemaVersion1370 } from './1370-message-attachment-indexes';
import { updateToSchemaVersion1380 } from './1380-donation-receipts';
import { updateToSchemaVersion1390 } from './1390-attachment-download-keys';
import { updateToSchemaVersion1400 } from './1400-simplify-receipts';
import { updateToSchemaVersion1410 } from './1410-remove-wallpaper';
import {
updateToSchemaVersion1410,
updateToSchemaVersion1420,
version as MAX_VERSION,
} from './1410-remove-wallpaper';
} from './1420-backup-downloads';
import { DataWriter } from '../Server';
@ -2115,6 +2116,7 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion1400,
updateToSchemaVersion1410,
updateToSchemaVersion1420,
];
export class DBVersionFromFutureError extends Error {

View file

@ -59,6 +59,7 @@ function composeJob({
attempts: 0,
retryAfter: null,
lastAttemptTimestamp: null,
originalSource: jobOverrides?.source ?? AttachmentDownloadSource.STANDARD,
source: AttachmentDownloadSource.STANDARD,
attachment: {
contentType,

View file

@ -0,0 +1,205 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { DataReader, DataWriter } from '../../sql/Client';
import { createAttachmentDownloadJob } from '../../test-helpers/attachmentDownloads';
import type { MessageAttributesType } from '../../model-types';
import { generateAci } from '../../types/ServiceId';
import { AttachmentDownloadSource } from '../../sql/Interface';
import { cleanupMessages } from '../../util/cleanup';
// Exercises the SQL triggers from migration 1420 that keep the single-row
// attachment_downloads_backup_stats table in sync with download jobs.
describe('sql/AttachmentDownloadBackupStats', () => {
  beforeEach(async () => {
    await DataWriter.removeAll();
    window.ConversationController.reset();
    await window.ConversationController.load();
  });
  afterEach(async () => {
    await DataWriter.removeAll();
  });

  it('updates backup stats when adding, updating, and removing a job', async () => {
    // Jobs reference a message row; create one up front.
    await DataWriter.saveMessage(
      {
        id: 'message0',
        received_at: 1,
        conversationId: 'id',
      } as MessageAttributesType,
      {
        forceSave: true,
        ourAci: generateAci(),
        postSaveUpdates: () => Promise.resolve(),
      }
    );

    // A standard (non-backup) job must never affect backup stats.
    const standardJob = createAttachmentDownloadJob(0, {
      originalSource: AttachmentDownloadSource.STANDARD,
      source: AttachmentDownloadSource.STANDARD,
      ciphertextSize: 128,
    });
    await DataWriter.saveAttachmentDownloadJob(standardJob);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 0, completedBytes: 0 }
    );

    // Inserting a backup-import job bumps totalBytes by its ciphertextSize.
    const backupJob1 = createAttachmentDownloadJob(1, {
      messageId: 'message0',
      ciphertextSize: 1000,
      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
      source: AttachmentDownloadSource.BACKUP_IMPORT,
    });
    await DataWriter.saveAttachmentDownloadJob(backupJob1);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 1000, completedBytes: 0 }
    );

    const backupJob2 = createAttachmentDownloadJob(2, {
      messageId: 'message0',
      ciphertextSize: 2000,
      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
      source: AttachmentDownloadSource.BACKUP_IMPORT,
    });
    await DataWriter.saveAttachmentDownloadJob(backupJob2);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 3000, completedBytes: 0 }
    );

    const backupJob2NowStandard = {
      ...backupJob2,
      originalSource: AttachmentDownloadSource.STANDARD,
      source: AttachmentDownloadSource.STANDARD,
    };

    // Updating the job source has no effect
    // (the upsert preserves originalSource, so stats stay keyed to it).
    await DataWriter.saveAttachmentDownloadJob(backupJob2NowStandard);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 3000, completedBytes: 0 }
    );

    // Updating the job size updates stats
    // (update trigger applies the ciphertextSize delta: 3000 - 2000 = +1000).
    await DataWriter.saveAttachmentDownloadJob({
      ...backupJob2NowStandard,
      ciphertextSize: 3000,
    });
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 4000, completedBytes: 0 }
    );

    // Deleting a job updates queued & completed, based on original source
    await DataWriter.removeAttachmentDownloadJob(backupJob2);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 4000, completedBytes: 3000 }
    );

    // Deleting a standard job has no effect on backup stats
    await DataWriter.removeAttachmentDownloadJob(standardJob);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 4000, completedBytes: 3000 }
    );

    // Deleting a job updates queued & completed
    await DataWriter.removeAttachmentDownloadJob(backupJob1);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 4000, completedBytes: 4000 }
    );
  });

  it('updates backup stats when deleting a message', async () => {
    await DataWriter.saveMessage(
      {
        id: 'message0',
        received_at: 1,
        conversationId: 'id',
      } as MessageAttributesType,
      {
        forceSave: true,
        ourAci: generateAci(),
        postSaveUpdates: () => Promise.resolve(),
      }
    );
    const backupJob = createAttachmentDownloadJob(0, {
      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
      source: AttachmentDownloadSource.BACKUP_IMPORT,
      ciphertextSize: 128,
    });
    await DataWriter.saveAttachmentDownloadJob(backupJob);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 128, completedBytes: 0 }
    );

    // Removing the message removes its jobs; stats count those as completed.
    await DataWriter.removeMessage('message0', {
      cleanupMessages,
    });
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 128, completedBytes: 128 }
    );

    // An explicit reset zeroes the stats row.
    await DataWriter.resetBackupAttachmentDownloadStats();
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 0, completedBytes: 0 }
    );
  });

  it('the original source of the job is retained', async () => {
    await DataWriter.saveMessage(
      {
        id: 'message0',
        received_at: 1,
        conversationId: 'id',
      } as MessageAttributesType,
      {
        forceSave: true,
        ourAci: generateAci(),
        postSaveUpdates: () => Promise.resolve(),
      }
    );
    const backupJob = createAttachmentDownloadJob(0, {
      originalSource: AttachmentDownloadSource.BACKUP_IMPORT,
      source: AttachmentDownloadSource.BACKUP_IMPORT,
      ciphertextSize: 128,
    });
    await DataWriter.saveAttachmentDownloadJob(backupJob);
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 128, completedBytes: 0 }
    );

    const backupJobNowStandard = {
      ...backupJob,
      originalSource: AttachmentDownloadSource.STANDARD,
      source: AttachmentDownloadSource.STANDARD,
    };

    // Re-saving with a different source must not clobber originalSource.
    await DataWriter.saveAttachmentDownloadJob(backupJobNowStandard);
    const savedJob =
      await DataReader._getAttachmentDownloadJob(backupJobNowStandard);
    assert.strictEqual(
      savedJob?.originalSource,
      AttachmentDownloadSource.BACKUP_IMPORT
    );
    assert.strictEqual(savedJob?.source, AttachmentDownloadSource.STANDARD);

    await DataWriter.removeMessage('message0', {
      cleanupMessages,
    });
    assert.deepStrictEqual(
      await DataReader.getBackupAttachmentDownloadProgress(),
      { totalBytes: 128, completedBytes: 128 }
    );
  });
});

View file

@ -0,0 +1,34 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { AttachmentDownloadSource } from '../sql/Interface';
import { IMAGE_JPEG, IMAGE_PNG } from '../types/MIME';
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
/**
 * Builds a deterministic attachment-download-job fixture for tests.
 *
 * @param index - varies ids/sizes/timestamps so fixtures are distinguishable
 * @param overrides - field overrides applied on top of the defaults
 */
export function createAttachmentDownloadJob(
  index: number,
  overrides?: Partial<AttachmentDownloadJobType>
): AttachmentDownloadJobType {
  const messageId = `message${index}`;
  const digest = `digest${index}`;
  const timestamp = 100 + index;

  const defaults: AttachmentDownloadJobType = {
    messageId,
    attachmentType: 'attachment',
    attachment: {
      digest,
      contentType: IMAGE_JPEG,
      size: 128,
    },
    attachmentSignature: digest,
    receivedAt: timestamp,
    ciphertextSize: 1000 + index,
    size: 900 + index,
    contentType: IMAGE_PNG,
    sentAt: timestamp,
    attempts: index,
    // Alternate active/inactive jobs across indices.
    active: index % 2 === 0,
    retryAfter: Date.now() + index,
    lastAttemptTimestamp: Date.now() + index,
    originalSource: AttachmentDownloadSource.STANDARD,
    source: AttachmentDownloadSource.STANDARD,
  };

  return { ...defaults, ...overrides };
}

View file

@ -0,0 +1,113 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { AttachmentDownloadSource, type WritableDB } from '../../sql/Interface';
import { objectToJSON, sql } from '../../sql/util';
import { createDB, updateToVersion } from './helpers';
import type { AttachmentDownloadJobType } from '../../types/AttachmentDownload';
import { createAttachmentDownloadJob } from '../../test-helpers/attachmentDownloads';
// Builds a job fixture and makes sure a messages row with its messageId
// exists, swallowing the duplicate-insert error when it already does.
function createJobAndEnsureMessage(
  db: WritableDB,
  index: number,
  overrides?: Partial<AttachmentDownloadJobType>
) {
  const job = createAttachmentDownloadJob(index, overrides);

  try {
    db.prepare('INSERT INTO messages (id) VALUES ($id)').run({
      id: job.messageId,
    });
  } catch {
    // Message row already exists; nothing to do.
  }

  return job;
}
// Inserts a job using the pre-1420 column list (no originalSource column),
// so the migration test can exercise the backfill of originalSource.
function insertLegacyJob(
  db: WritableDB,
  index: number,
  overrides?: Partial<AttachmentDownloadJobType>
): void {
  const job = createJobAndEnsureMessage(db, index, overrides);
  const [query, params] = sql`
    INSERT INTO attachment_downloads
      (
        messageId,
        attachmentType,
        attachmentJson,
        attachmentSignature,
        ciphertextSize,
        contentType,
        size,
        receivedAt,
        sentAt,
        active,
        attempts,
        retryAfter,
        lastAttemptTimestamp,
        source
      )
    VALUES
      (
        ${job.messageId},
        ${job.attachmentType},
        ${objectToJSON(job.attachment)},
        ${job.attachmentSignature},
        ${job.ciphertextSize},
        ${job.attachment.contentType},
        ${job.attachment.size},
        ${job.receivedAt},
        ${job.sentAt},
        ${job.active ? 1 : 0},
        ${job.attempts},
        ${job.retryAfter},
        ${job.lastAttemptTimestamp},
        ${job.source}
      );
  `;
  db.prepare(query).run(params);
}
// This suite verifies migration 1420 (the originalSource backfill); the
// describe name previously said 1410, which was a copy-paste error.
describe('SQL/updateToSchemaVersion1420', () => {
  let db: WritableDB;
  afterEach(() => {
    db.close();
  });

  it('copies source to originalSource', () => {
    db = createDB();
    updateToVersion(db, 1410);

    // Insert rows with the pre-1420 schema (no originalSource column).
    db.transaction(() => {
      for (let i = 0; i < 15; i += 1) {
        insertLegacyJob(db, i, {
          source:
            i < 5
              ? AttachmentDownloadSource.STANDARD
              : AttachmentDownloadSource.BACKUP_IMPORT,
        });
      }
    })();

    updateToVersion(db, 1420);

    // After migrating, every row's originalSource mirrors its source.
    const numOriginalSourceStandardJobs = db
      .prepare(
        "SELECT COUNT(*) FROM attachment_downloads WHERE originalSource = 'standard'",
        { pluck: true }
      )
      .get();
    assert.strictEqual(numOriginalSourceStandardJobs, 5);

    const numOriginalSourceBackupJobs = db
      .prepare(
        "SELECT COUNT(*) FROM attachment_downloads WHERE originalSource = 'backup_import'",
        { pluck: true }
      )
      .get();
    assert.strictEqual(numOriginalSourceBackupJobs, 10);
  });
});

View file

@ -35,6 +35,7 @@ export type CoreAttachmentDownloadJobType = {
attachmentSignature: string;
isManualDownload?: boolean;
messageId: string;
originalSource: AttachmentDownloadSource;
receivedAt: number;
sentAt: number;
size: number;
@ -55,6 +56,7 @@ export const coreAttachmentDownloadJobSchema = z.object({
isManualDownload: z.boolean().optional(),
messageId: z.string(),
messageIdForLogging: z.string().optional(),
originalSource: z.nativeEnum(AttachmentDownloadSource),
receivedAt: z.number(),
sentAt: z.number(),
size: z.number(),

View file

@ -2,8 +2,11 @@
// SPDX-License-Identifier: AGPL-3.0-only
import { AttachmentDownloadManager } from '../jobs/AttachmentDownloadManager';
import { createLogger } from '../logging/log';
import { DataWriter } from '../sql/Client';
const log = createLogger('backupMediaDownload');
export async function startBackupMediaDownload(): Promise<void> {
await window.storage.put('backupMediaDownloadPaused', false);
@ -11,10 +14,12 @@ export async function startBackupMediaDownload(): Promise<void> {
}
// Pauses backup media download by setting the paused flag in storage;
// queued jobs are left untouched here.
export async function pauseBackupMediaDownload(): Promise<void> {
  log.info('Pausing media download');
  await window.storage.put('backupMediaDownloadPaused', true);
}

// Resuming is identical to starting: clear the paused flag and kick off
// download again.
export async function resumeBackupMediaDownload(): Promise<void> {
  log.info('Resuming media download');
  return startBackupMediaDownload();
}
@ -28,11 +33,16 @@ export async function resetBackupMediaDownloadItems(): Promise<void> {
}
export async function cancelBackupMediaDownload(): Promise<void> {
log.info('Canceling media download');
await window.storage.put('backupMediaDownloadBannerDismissed', true);
await DataWriter.removeAllBackupAttachmentDownloadJobs();
await DataWriter.resetBackupAttachmentDownloadStats();
await resetBackupMediaDownloadItems();
}
// Clears all backup download jobs, stats, and storage-tracked progress.
export async function resetBackupMediaDownloadProgress(): Promise<void> {
  // Order matters: job removal can update the SQL-side stats (delete
  // triggers count removed backup jobs as completed), so the stats are
  // reset only afterward, then the storage items last.
  await DataWriter.removeAllBackupAttachmentDownloadJobs();
  await DataWriter.resetBackupAttachmentDownloadStats();
  await resetBackupMediaDownloadItems();
}

View file

@ -27,6 +27,7 @@ import { drop } from './drop';
import { hydrateStoryContext } from './hydrateStoryContext';
import { update as updateExpiringMessagesService } from '../services/expiringMessagesDeletion';
import { tapToViewMessagesDeletionService } from '../services/tapToViewMessagesDeletionService';
import { throttledUpdateBackupMediaDownloadProgress } from './updateBackupMediaDownloadProgress';
const log = createLogger('cleanup');
@ -115,6 +116,12 @@ export async function cleanupMessages(
)
);
await unloadedQueue.onIdle();
drop(
throttledUpdateBackupMediaDownloadProgress(
DataReader.getBackupAttachmentDownloadProgress
)
);
}
/** Removes a message from redux caches & backbone, but does NOT delete files on disk,

View file

@ -7,6 +7,10 @@ export type NullToUndefined<T> =
export type UndefinedToNull<T> =
Extract<T, undefined> extends never ? T : Exclude<T, undefined> | null;
// Maps each property of T so that `undefined` (where permitted) becomes
// `null` — the type-level counterpart of shallowConvertUndefinedToNull.
export type ShallowUndefinedToNull<T extends { [key: string]: unknown }> = {
  [P in keyof T]: UndefinedToNull<T[P]>;
};
export function dropNull<T>(
value: NonNullable<T> | null | undefined
): T | undefined {
@ -49,7 +53,7 @@ export function convertUndefinedToNull<T>(value: T | undefined): T | null {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function shallowConvertUndefinedToNull<T extends { [key: string]: any }>(
obj: T
): { [P in keyof T]: UndefinedToNull<T[P]> } {
): ShallowUndefinedToNull<T> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result: any = {};

View file

@ -0,0 +1,22 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { throttle } from 'lodash';
import type { BackupAttachmentDownloadProgress } from '../sql/Interface';
/**
 * Reads the current backup-media download progress via the provided getter
 * and mirrors both counters into storage for the UI.
 *
 * @param getBackupAttachmentDownloadProgress - async source of the
 *   totalBytes/completedBytes pair (typically the SQL-backed reader)
 */
export async function updateBackupMediaDownloadProgress(
  getBackupAttachmentDownloadProgress: () => Promise<BackupAttachmentDownloadProgress>
): Promise<void> {
  const progress = await getBackupAttachmentDownloadProgress();

  // Write both counters concurrently; resolve once both are stored.
  await Promise.all([
    window.storage.put(
      'backupMediaDownloadCompletedBytes',
      progress.completedBytes
    ),
    window.storage.put('backupMediaDownloadTotalBytes', progress.totalBytes),
  ]);
}
// Throttled variant for high-frequency callers (per-job save/remove paths).
// lodash throttle invokes on both leading and trailing edges by default, so
// the latest progress still lands within ~200ms of the last call.
export const throttledUpdateBackupMediaDownloadProgress = throttle(
  updateBackupMediaDownloadProgress,
  200
);