Batch attachment download jobs

This commit is contained in:
trevor-signal 2024-10-28 18:25:15 -04:00 committed by GitHub
parent 1b8be6a3d1
commit 86026bd66a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 54 additions and 7 deletions

View file

@ -47,6 +47,7 @@ import { AttachmentDownloadSource } from '../sql/Interface';
import { drop } from '../util/drop';
import { getAttachmentCiphertextLength } from '../AttachmentCrypto';
import { safeParsePartial } from '../util/schemas';
import { createBatcher } from '../util/batcher';
export enum AttachmentDownloadUrgency {
IMMEDIATE = 'immediate',
@ -110,12 +111,27 @@ function getJobIdForLogging(job: CoreAttachmentDownloadJobType): string {
export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownloadJobType> {
private visibleTimelineMessages: Set<string> = new Set();
private saveJobsBatcher = createBatcher<AttachmentDownloadJobType>({
name: 'saveAttachmentDownloadJobs',
wait: 150,
maxSize: 1000,
processBatch: async jobs => {
await DataWriter.saveAttachmentDownloadJobs(jobs);
drop(this.maybeStartJobs());
},
});
private static _instance: AttachmentDownloadManager | undefined;
override logPrefix = 'AttachmentDownloadManager';
static defaultParams: AttachmentDownloadManagerParamsType = {
markAllJobsInactive: DataWriter.resetAttachmentDownloadActive,
saveJob: DataWriter.saveAttachmentDownloadJob,
saveJob: async (job, options) => {
if (options?.allowBatching) {
AttachmentDownloadManager._instance?.saveJobsBatcher.add(job);
} else {
await DataWriter.saveAttachmentDownloadJob(job);
}
},
removeJob: DataWriter.removeAttachmentDownloadJob,
getNextJobs: DataWriter.getNextAttachmentDownloadJobs,
runDownloadAttachmentJob,
@ -227,9 +243,14 @@ export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownload
}
static async start(): Promise<void> {
await AttachmentDownloadManager.saveBatchedJobs();
await AttachmentDownloadManager.instance.start();
}
static async saveBatchedJobs(): Promise<void> {
await AttachmentDownloadManager.instance.saveJobsBatcher.flushAndWait();
}
static async stop(): Promise<void> {
return AttachmentDownloadManager._instance?.stop();
}

View file

@ -39,7 +39,10 @@ export type JobManagerParamsType<
limit: number;
timestamp: number;
}) => Promise<Array<JobType>>;
saveJob: (job: JobType) => Promise<void>;
saveJob: (
job: JobType,
options?: { allowBatching?: boolean }
) => Promise<void>;
removeJob: (job: JobType) => Promise<void>;
runJob: (
job: JobType,
@ -190,7 +193,8 @@ export abstract class JobManager<CoreJobType> {
return { isAlreadyRunning: true };
}
await this.params.saveJob(job);
// Allow batching of all saves except those that we will start immediately
await this.params.saveJob(job, { allowBatching: !options?.forceStart });
if (options?.forceStart) {
if (!this.enabled) {

View file

@ -110,6 +110,7 @@ import { loadAllAndReinitializeRedux } from '../allLoaders';
import { resetBackupMediaDownloadProgress } from '../../util/backupMediaDownload';
import { getEnvironment, isTestEnvironment } from '../../environment';
import { drop } from '../../util/drop';
import { hasAttachmentDownloads } from '../../util/hasAttachmentDownloads';
const MAX_CONCURRENCY = 10;
@ -519,6 +520,8 @@ export class BackupImportStream extends Writable {
ourAci,
});
const attachmentDownloadJobPromises: Array<Promise<unknown>> = [];
// TODO (DESKTOP-7402): consider re-saving after updating the pending state
for (const attributes of batch) {
const { editHistory } = attributes;
@ -539,11 +542,16 @@ export class BackupImportStream extends Writable {
);
}
// eslint-disable-next-line no-await-in-loop
await queueAttachmentDownloads(attributes, {
source: AttachmentDownloadSource.BACKUP_IMPORT,
});
if (hasAttachmentDownloads(attributes)) {
attachmentDownloadJobPromises.push(
queueAttachmentDownloads(attributes, {
source: AttachmentDownloadSource.BACKUP_IMPORT,
})
);
}
}
await Promise.all(attachmentDownloadJobPromises);
await AttachmentDownloadManager.saveBatchedJobs();
}
private async saveCallHistory(

View file

@ -863,6 +863,7 @@ type WritableInterface = {
timestamp?: number;
}) => Array<AttachmentDownloadJobType>;
saveAttachmentDownloadJob: (job: AttachmentDownloadJobType) => void;
saveAttachmentDownloadJobs: (jobs: Array<AttachmentDownloadJobType>) => void;
resetAttachmentDownloadActive: () => void;
removeAttachmentDownloadJob: (job: AttachmentDownloadJobType) => void;
removeAllBackupAttachmentDownloadJobs: () => void;

View file

@ -486,6 +486,7 @@ export const DataWriter: ServerWritableInterface = {
getNextAttachmentDownloadJobs,
saveAttachmentDownloadJob,
saveAttachmentDownloadJobs,
resetAttachmentDownloadActive,
removeAttachmentDownloadJob,
removeAllBackupAttachmentDownloadJobs,
@ -5000,6 +5001,17 @@ function getNextAttachmentDownloadJobs(
}
}
function saveAttachmentDownloadJobs(
db: WritableDB,
jobs: Array<AttachmentDownloadJobType>
): void {
db.transaction(() => {
for (const job of jobs) {
saveAttachmentDownloadJob(db, job);
}
})();
}
function saveAttachmentDownloadJob(
db: WritableDB,
job: AttachmentDownloadJobType

View file

@ -83,6 +83,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
downloadManager = new AttachmentDownloadManager({
...AttachmentDownloadManager.defaultParams,
saveJob: DataWriter.saveAttachmentDownloadJob,
shouldHoldOffOnStartingQueuedJobs: isInCall,
runDownloadAttachmentJob: runJob,
getRetryConfig: () => ({