Respect 429 rate-limiting during attachment backup

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2024-08-01 19:05:30 -05:00 committed by GitHub
parent d336487828
commit 7602c78bd7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 78 additions and 7 deletions

View file

@ -60,6 +60,7 @@ import {
isVideoTypeSupported, isVideoTypeSupported,
} from '../util/GoogleChrome'; } from '../util/GoogleChrome';
import { getLocalAttachmentUrl } from '../util/getLocalAttachmentUrl'; import { getLocalAttachmentUrl } from '../util/getLocalAttachmentUrl';
import { findRetryAfterTimeFromError } from './helpers/findRetryAfterTimeFromError';
const MAX_CONCURRENT_JOBS = 3; const MAX_CONCURRENT_JOBS = 3;
const RETRY_CONFIG = { const RETRY_CONFIG = {
@ -215,6 +216,17 @@ export async function runAttachmentBackupJob(
return { status: 'finished' }; return { status: 'finished' };
} }
if (
error instanceof Error &&
'code' in error &&
(error.code === 413 || error.code === 429)
) {
return {
status: 'rate-limited',
pauseDurationMs: findRetryAfterTimeFromError(error),
};
}
return { status: 'retry' }; return { status: 'retry' };
} }
} }

View file

@ -60,23 +60,24 @@ export type JobManagerJobResultType<CoreJobType> =
| { | {
status: 'retry'; status: 'retry';
} }
| { status: 'finished'; newJob?: CoreJobType }; | { status: 'finished'; newJob?: CoreJobType }
| { status: 'rate-limited'; pauseDurationMs: number };
export abstract class JobManager<CoreJobType> { export abstract class JobManager<CoreJobType> {
protected enabled: boolean = false; private enabled: boolean = false;
protected activeJobs: Map< private activeJobs: Map<
string, string,
{ {
completionPromise: ExplodePromiseResultType<void>; completionPromise: ExplodePromiseResultType<void>;
job: CoreJobType & JobManagerJobType; job: CoreJobType & JobManagerJobType;
} }
> = new Map(); > = new Map();
protected jobStartPromises: Map<string, ExplodePromiseResultType<void>> = private jobStartPromises: Map<string, ExplodePromiseResultType<void>> =
new Map(); new Map();
protected jobCompletePromises: Map<string, ExplodePromiseResultType<void>> = private jobCompletePromises: Map<string, ExplodePromiseResultType<void>> =
new Map(); new Map();
private tickTimeout: NodeJS.Timeout | null = null;
protected tickTimeout: NodeJS.Timeout | null = null;
protected logPrefix = 'JobManager'; protected logPrefix = 'JobManager';
public tickInterval = DEFAULT_TICK_INTERVAL; public tickInterval = DEFAULT_TICK_INTERVAL;
constructor(readonly params: JobManagerParamsType<CoreJobType>) {} constructor(readonly params: JobManagerParamsType<CoreJobType>) {}
@ -98,13 +99,22 @@ export abstract class JobManager<CoreJobType> {
); );
} }
tick(): void { private tick(): void {
clearTimeoutIfNecessary(this.tickTimeout); clearTimeoutIfNecessary(this.tickTimeout);
this.tickTimeout = null; this.tickTimeout = null;
drop(this.maybeStartJobs()); drop(this.maybeStartJobs());
this.tickTimeout = setTimeout(() => this.tick(), this.tickInterval); this.tickTimeout = setTimeout(() => this.tick(), this.tickInterval);
} }
private pauseForDuration(durationMs: number): void {
this.enabled = false;
clearTimeoutIfNecessary(this.tickTimeout);
this.tickTimeout = setTimeout(() => {
this.enabled = true;
this.tick();
}, durationMs);
}
// used in testing // used in testing
waitForJobToBeStarted( waitForJobToBeStarted(
job: CoreJobType & Pick<JobManagerJobType, 'attempts'> job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
@ -270,6 +280,13 @@ export abstract class JobManager<CoreJobType> {
} }
await this.retryJobLater(job); await this.retryJobLater(job);
return; return;
case 'rate-limited':
log.info(
`${logId}: rate-limited; retrying in ${jobRunResult.pauseDurationMs}`
);
this.pauseForDuration(jobRunResult.pauseDurationMs);
await this.retryJobLater(job);
return;
default: default:
throw missingCaseError(status); throw missingCaseError(status);
} }

View file

@ -25,6 +25,7 @@ import { APPLICATION_OCTET_STREAM, VIDEO_MP4 } from '../../types/MIME';
import { createName, getRelativePath } from '../../util/attachmentPath'; import { createName, getRelativePath } from '../../util/attachmentPath';
import { encryptAttachmentV2, generateKeys } from '../../AttachmentCrypto'; import { encryptAttachmentV2, generateKeys } from '../../AttachmentCrypto';
import { SECOND } from '../../util/durations'; import { SECOND } from '../../util/durations';
import { HTTPError } from '../../textsecure/Errors';
const TRANSIT_CDN = 2; const TRANSIT_CDN = 2;
const TRANSIT_CDN_FOR_NEW_UPLOAD = 42; const TRANSIT_CDN_FOR_NEW_UPLOAD = 42;
@ -42,6 +43,7 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager(
let backupsService = {}; let backupsService = {};
let encryptAndUploadAttachment: sinon.SinonStub; let encryptAndUploadAttachment: sinon.SinonStub;
let sandbox: sinon.SinonSandbox; let sandbox: sinon.SinonSandbox;
let clock: sinon.SinonFakeTimers;
let isInCall: sinon.SinonStub; let isInCall: sinon.SinonStub;
function composeJob( function composeJob(
@ -116,6 +118,7 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager(
await window.storage.put('masterKey', Bytes.toBase64(getRandomBytes(32))); await window.storage.put('masterKey', Bytes.toBase64(getRandomBytes(32)));
sandbox = sinon.createSandbox(); sandbox = sinon.createSandbox();
clock = sandbox.useFakeTimers();
isInCall = sandbox.stub().returns(false); isInCall = sandbox.stub().returns(false);
backupMediaBatch = sandbox backupMediaBatch = sandbox
@ -331,6 +334,45 @@ describe('AttachmentBackupManager/JobManager', function attachmentBackupManager(
assert.strictEqual(allRemainingJobs.length, 0); assert.strictEqual(allRemainingJobs.length, 0);
}); });
it('pauses if it receives a retryAfter', async () => {
const jobs = await addJobs(5, { transitCdnInfo: undefined });
encryptAndUploadAttachment.throws(
new HTTPError('Rate limited', {
code: 429,
headers: { 'retry-after': '100' },
})
);
await backupManager?.start();
await waitForJobToBeStarted(jobs[2]);
assert.strictEqual(runJob.callCount, 3);
assertRunJobCalledWith([jobs[4], jobs[3], jobs[2]]);
// no jobs have occurred
await clock.tickAsync(50000);
assert.strictEqual(runJob.callCount, 3);
encryptAndUploadAttachment.returns({
cdnKey: 'newKeyOnTransitTier',
cdnNumber: TRANSIT_CDN_FOR_NEW_UPLOAD,
});
await clock.tickAsync(100000);
await waitForJobToBeStarted(jobs[0]);
assert.strictEqual(runJob.callCount, 8);
assertRunJobCalledWith([
jobs[4],
jobs[3],
jobs[2],
jobs[4],
jobs[3],
jobs[2],
jobs[1],
jobs[0],
]);
});
describe('thumbnail backups', () => { describe('thumbnail backups', () => {
it('addJobAndMaybeThumbnailJob conditionally adds thumbnail job', async () => { it('addJobAndMaybeThumbnailJob conditionally adds thumbnail job', async () => {
const jobForVisualAttachment = composeJob(0); const jobForVisualAttachment = composeJob(0);