From 7602c78bd745ae7aeaa15f062a1d10a77f71d0fb Mon Sep 17 00:00:00 2001 From: automated-signal <37887102+automated-signal@users.noreply.github.com> Date: Thu, 1 Aug 2024 19:05:30 -0500 Subject: [PATCH] Respect 429 rate-limiting during attachment backup Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com> --- ts/jobs/AttachmentBackupManager.ts | 12 ++++++ ts/jobs/JobManager.ts | 31 ++++++++++---- .../services/AttachmentBackupManager_test.ts | 42 +++++++++++++++++++ 3 files changed, 78 insertions(+), 7 deletions(-) diff --git a/ts/jobs/AttachmentBackupManager.ts b/ts/jobs/AttachmentBackupManager.ts index 8774c6814c..2473598b5a 100644 --- a/ts/jobs/AttachmentBackupManager.ts +++ b/ts/jobs/AttachmentBackupManager.ts @@ -60,6 +60,7 @@ import { isVideoTypeSupported, } from '../util/GoogleChrome'; import { getLocalAttachmentUrl } from '../util/getLocalAttachmentUrl'; +import { findRetryAfterTimeFromError } from './helpers/findRetryAfterTimeFromError'; const MAX_CONCURRENT_JOBS = 3; const RETRY_CONFIG = { @@ -215,6 +216,17 @@ export async function runAttachmentBackupJob( return { status: 'finished' }; } + if ( + error instanceof Error && + 'code' in error && + (error.code === 413 || error.code === 429) + ) { + return { + status: 'rate-limited', + pauseDurationMs: findRetryAfterTimeFromError(error), + }; + } + return { status: 'retry' }; } } diff --git a/ts/jobs/JobManager.ts b/ts/jobs/JobManager.ts index cb417cd22f..32364d5b9f 100644 --- a/ts/jobs/JobManager.ts +++ b/ts/jobs/JobManager.ts @@ -60,23 +60,24 @@ export type JobManagerJobResultType = | { status: 'retry'; } - | { status: 'finished'; newJob?: CoreJobType }; + | { status: 'finished'; newJob?: CoreJobType } + | { status: 'rate-limited'; pauseDurationMs: number }; export abstract class JobManager { - protected enabled: boolean = false; - protected activeJobs: Map< + private enabled: boolean = false; + private activeJobs: Map< string, { completionPromise: ExplodePromiseResultType; job: CoreJobType & JobManagerJobType; } > = new Map(); - protected jobStartPromises: Map> = + private jobStartPromises: Map> = new Map(); - protected jobCompletePromises: Map> = + private jobCompletePromises: Map> = new Map(); + private tickTimeout: NodeJS.Timeout | null = null; - protected tickTimeout: NodeJS.Timeout | null = null; protected logPrefix = 'JobManager'; public tickInterval = DEFAULT_TICK_INTERVAL; constructor(readonly params: JobManagerParamsType) {} @@ -98,13 +99,22 @@ export abstract class JobManager { ); } - tick(): void { + private tick(): void { clearTimeoutIfNecessary(this.tickTimeout); this.tickTimeout = null; drop(this.maybeStartJobs()); this.tickTimeout = setTimeout(() => this.tick(), this.tickInterval); } + private pauseForDuration(durationMs: number): void { + this.enabled = false; + clearTimeoutIfNecessary(this.tickTimeout); + this.tickTimeout = setTimeout(() => { + this.enabled = true; + this.tick(); + }, durationMs); + } + // used in testing waitForJobToBeStarted( job: CoreJobType & Pick @@ -270,6 +280,13 @@ export abstract class JobManager { } await this.retryJobLater(job); return; + case 'rate-limited': + log.info( + `${logId}: rate-limited; retrying in ${jobRunResult.pauseDurationMs}` + ); + this.pauseForDuration(jobRunResult.pauseDurationMs); + await this.retryJobLater(job); + return; default: throw missingCaseError(status); } diff --git a/ts/test-electron/services/AttachmentBackupManager_test.ts b/ts/test-electron/services/AttachmentBackupManager_test.ts index c1924430fd..21040334f7 100644 --- a/ts/test-electron/services/AttachmentBackupManager_test.ts +++ b/ts/test-electron/services/AttachmentBackupManager_test.ts @@ -25,6 +25,7 @@ import { APPLICATION_OCTET_STREAM, VIDEO_MP4 } from '../../types/MIME'; import { createName, getRelativePath } from '../../util/attachmentPath'; import { encryptAttachmentV2, generateKeys } from '../../AttachmentCrypto'; import { SECOND } from '../../util/durations'; +import { HTTPError } from '../../textsecure/Errors'; const TRANSIT_CDN = 2; const TRANSIT_CDN_FOR_NEW_UPLOAD = 42; @@ -42,6 +43,7 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager( let backupsService = {}; let encryptAndUploadAttachment: sinon.SinonStub; let sandbox: sinon.SinonSandbox; + let clock: sinon.SinonFakeTimers; let isInCall: sinon.SinonStub; function composeJob( @@ -116,6 +118,7 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager( await window.storage.put('masterKey', Bytes.toBase64(getRandomBytes(32))); sandbox = sinon.createSandbox(); + clock = sandbox.useFakeTimers(); isInCall = sandbox.stub().returns(false); backupMediaBatch = sandbox @@ -331,6 +334,45 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager( assert.strictEqual(allRemainingJobs.length, 0); }); + it('pauses if it receives a retryAfter', async () => { + const jobs = await addJobs(5, { transitCdnInfo: undefined }); + + encryptAndUploadAttachment.throws( + new HTTPError('Rate limited', { + code: 429, + headers: { 'retry-after': '100' }, + }) + ); + await backupManager?.start(); + await waitForJobToBeStarted(jobs[2]); + + assert.strictEqual(runJob.callCount, 3); + assertRunJobCalledWith([jobs[4], jobs[3], jobs[2]]); + + // no jobs have occurred + await clock.tickAsync(50000); + assert.strictEqual(runJob.callCount, 3); + + encryptAndUploadAttachment.returns({ + cdnKey: 'newKeyOnTransitTier', + cdnNumber: TRANSIT_CDN_FOR_NEW_UPLOAD, + }); + + await clock.tickAsync(100000); + await waitForJobToBeStarted(jobs[0]); + assert.strictEqual(runJob.callCount, 8); + assertRunJobCalledWith([ + jobs[4], + jobs[3], + jobs[2], + jobs[4], + jobs[3], + jobs[2], + jobs[1], + jobs[0], + ]); + }); + describe('thumbnail backups', () => { it('addJobAndMaybeThumbnailJob conditionally adds thumbnail job', async () => { const jobForVisualAttachment = composeJob(0);