From 86026bd66ad54830e359628ea9edacf968870688 Mon Sep 17 00:00:00 2001 From: trevor-signal <131492920+trevor-signal@users.noreply.github.com> Date: Mon, 28 Oct 2024 18:25:15 -0400 Subject: [PATCH] Batch attachment download jobs --- ts/jobs/AttachmentDownloadManager.ts | 23 ++++++++++++++++++- ts/jobs/JobManager.ts | 8 +++++-- ts/services/backups/import.ts | 16 +++++++++---- ts/sql/Interface.ts | 1 + ts/sql/Server.ts | 12 ++++++++++ .../AttachmentDownloadManager_test.ts | 1 + 6 files changed, 54 insertions(+), 7 deletions(-) diff --git a/ts/jobs/AttachmentDownloadManager.ts b/ts/jobs/AttachmentDownloadManager.ts index 09c0c839028f..23ee4e6b30d3 100644 --- a/ts/jobs/AttachmentDownloadManager.ts +++ b/ts/jobs/AttachmentDownloadManager.ts @@ -47,6 +47,7 @@ import { AttachmentDownloadSource } from '../sql/Interface'; import { drop } from '../util/drop'; import { getAttachmentCiphertextLength } from '../AttachmentCrypto'; import { safeParsePartial } from '../util/schemas'; +import { createBatcher } from '../util/batcher'; export enum AttachmentDownloadUrgency { IMMEDIATE = 'immediate', @@ -110,12 +111,27 @@ function getJobIdForLogging(job: CoreAttachmentDownloadJobType): string { export class AttachmentDownloadManager extends JobManager { private visibleTimelineMessages: Set = new Set(); + private saveJobsBatcher = createBatcher({ + name: 'saveAttachmentDownloadJobs', + wait: 150, + maxSize: 1000, + processBatch: async jobs => { + await DataWriter.saveAttachmentDownloadJobs(jobs); + drop(this.maybeStartJobs()); + }, + }); private static _instance: AttachmentDownloadManager | undefined; override logPrefix = 'AttachmentDownloadManager'; static defaultParams: AttachmentDownloadManagerParamsType = { markAllJobsInactive: DataWriter.resetAttachmentDownloadActive, - saveJob: DataWriter.saveAttachmentDownloadJob, + saveJob: async (job, options) => { + if (options?.allowBatching) { + AttachmentDownloadManager._instance?.saveJobsBatcher.add(job); + } else { + await DataWriter.saveAttachmentDownloadJob(job); + } + }, removeJob: DataWriter.removeAttachmentDownloadJob, getNextJobs: DataWriter.getNextAttachmentDownloadJobs, runDownloadAttachmentJob, @@ -227,9 +243,14 @@ export class AttachmentDownloadManager extends JobManager { + await AttachmentDownloadManager.saveBatchedJobs(); await AttachmentDownloadManager.instance.start(); } + static async saveBatchedJobs(): Promise { + await AttachmentDownloadManager.instance.saveJobsBatcher.flushAndWait(); + } + static async stop(): Promise { return AttachmentDownloadManager._instance?.stop(); } diff --git a/ts/jobs/JobManager.ts b/ts/jobs/JobManager.ts index c57b8113aab4..c6df6017353d 100644 --- a/ts/jobs/JobManager.ts +++ b/ts/jobs/JobManager.ts @@ -39,7 +39,10 @@ export type JobManagerParamsType< limit: number; timestamp: number; }) => Promise>; - saveJob: (job: JobType) => Promise; + saveJob: ( + job: JobType, + options?: { allowBatching?: boolean } + ) => Promise; removeJob: (job: JobType) => Promise; runJob: ( job: JobType, @@ -190,7 +193,8 @@ export abstract class JobManager { return { isAlreadyRunning: true }; } - await this.params.saveJob(job); + // Allow batching of all saves except those that we will start immediately + await this.params.saveJob(job, { allowBatching: !options?.forceStart }); if (options?.forceStart) { if (!this.enabled) { diff --git a/ts/services/backups/import.ts b/ts/services/backups/import.ts index ed00eac97be1..6e8c600ba4fa 100644 --- a/ts/services/backups/import.ts +++ b/ts/services/backups/import.ts @@ -110,6 +110,7 @@ import { loadAllAndReinitializeRedux } from '../allLoaders'; import { resetBackupMediaDownloadProgress } from '../../util/backupMediaDownload'; import { getEnvironment, isTestEnvironment } from '../../environment'; import { drop } from '../../util/drop'; +import { hasAttachmentDownloads } from '../../util/hasAttachmentDownloads'; const MAX_CONCURRENCY = 10; @@ -519,6 +520,8 @@ export class BackupImportStream extends Writable { ourAci, }); + const attachmentDownloadJobPromises: Array> = []; + // TODO (DESKTOP-7402): consider re-saving after updating the pending state for (const attributes of batch) { const { editHistory } = attributes; @@ -539,11 +542,16 @@ export class BackupImportStream extends Writable { ); } - // eslint-disable-next-line no-await-in-loop - await queueAttachmentDownloads(attributes, { - source: AttachmentDownloadSource.BACKUP_IMPORT, - }); + if (hasAttachmentDownloads(attributes)) { + attachmentDownloadJobPromises.push( + queueAttachmentDownloads(attributes, { + source: AttachmentDownloadSource.BACKUP_IMPORT, + }) + ); + } } + await Promise.all(attachmentDownloadJobPromises); + await AttachmentDownloadManager.saveBatchedJobs(); } private async saveCallHistory( diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index d3f3726d0480..2e7e1760d7b2 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -863,6 +863,7 @@ type WritableInterface = { timestamp?: number; }) => Array; saveAttachmentDownloadJob: (job: AttachmentDownloadJobType) => void; + saveAttachmentDownloadJobs: (jobs: Array) => void; resetAttachmentDownloadActive: () => void; removeAttachmentDownloadJob: (job: AttachmentDownloadJobType) => void; removeAllBackupAttachmentDownloadJobs: () => void; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index ad540c1fa747..8e7e4c07ea69 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -486,6 +486,7 @@ export const DataWriter: ServerWritableInterface = { getNextAttachmentDownloadJobs, saveAttachmentDownloadJob, + saveAttachmentDownloadJobs, resetAttachmentDownloadActive, removeAttachmentDownloadJob, removeAllBackupAttachmentDownloadJobs, @@ -5000,6 +5001,17 @@ function getNextAttachmentDownloadJobs( } } +function saveAttachmentDownloadJobs( + db: WritableDB, + jobs: Array +): void { + db.transaction(() => { + for (const job of jobs) { + saveAttachmentDownloadJob(db, job); + } + })(); +} + function saveAttachmentDownloadJob( db: WritableDB, job: AttachmentDownloadJobType diff --git a/ts/test-electron/services/AttachmentDownloadManager_test.ts b/ts/test-electron/services/AttachmentDownloadManager_test.ts index 3d5fd41a1eda..5f3fa41ef4bc 100644 --- a/ts/test-electron/services/AttachmentDownloadManager_test.ts +++ b/ts/test-electron/services/AttachmentDownloadManager_test.ts @@ -83,6 +83,7 @@ describe('AttachmentDownloadManager/JobManager', () => { downloadManager = new AttachmentDownloadManager({ ...AttachmentDownloadManager.defaultParams, + saveJob: DataWriter.saveAttachmentDownloadJob, shouldHoldOffOnStartingQueuedJobs: isInCall, runDownloadAttachmentJob: runJob, getRetryConfig: () => ({