When a job fails, respect the Retry-After
header if applicable
This commit is contained in:
parent
c7873dd7ea
commit
1f45bce0a2
12 changed files with 228 additions and 30 deletions
|
@ -468,9 +468,7 @@ export class ChallengeHandler {
|
|||
throw error;
|
||||
}
|
||||
|
||||
const retryAfter = parseRetryAfter(
|
||||
error.responseHeaders['retry-after'].toString()
|
||||
);
|
||||
const retryAfter = parseRetryAfter(error.responseHeaders['retry-after']);
|
||||
|
||||
window.log.info(`challenge: retry after ${retryAfter}ms`);
|
||||
this.options.onChallengeFailed(retryAfter);
|
||||
|
|
|
@ -10,17 +10,12 @@ import { isDone as isDeviceLinked } from '../../util/registration';
|
|||
export async function commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime,
|
||||
timestamp,
|
||||
timeRemaining,
|
||||
}: Readonly<{
|
||||
attempt: number;
|
||||
log: LoggerType;
|
||||
maxRetryTime: number;
|
||||
timestamp: number;
|
||||
timeRemaining: number;
|
||||
}>): Promise<boolean> {
|
||||
const maxJobAge = timestamp + maxRetryTime;
|
||||
const timeRemaining = maxJobAge - Date.now();
|
||||
|
||||
if (timeRemaining <= 0) {
|
||||
log.info("giving up because it's been too long");
|
||||
return false;
|
||||
|
|
|
@ -3,11 +3,17 @@
|
|||
|
||||
import type { LoggerType } from '../../logging/log';
|
||||
import { parseIntWithFallback } from '../../util/parseIntWithFallback';
|
||||
import { sleepFor413RetryAfterTimeIfApplicable } from './sleepFor413RetryAfterTimeIfApplicable';
|
||||
|
||||
export function handleCommonJobRequestError(
|
||||
err: unknown,
|
||||
log: LoggerType
|
||||
): void {
|
||||
export async function handleCommonJobRequestError({
|
||||
err,
|
||||
log,
|
||||
timeRemaining,
|
||||
}: Readonly<{
|
||||
err: unknown;
|
||||
log: LoggerType;
|
||||
timeRemaining: number;
|
||||
}>): Promise<void> {
|
||||
if (!(err instanceof Error)) {
|
||||
throw err;
|
||||
}
|
||||
|
@ -18,5 +24,7 @@ export function handleCommonJobRequestError(
|
|||
return;
|
||||
}
|
||||
|
||||
await sleepFor413RetryAfterTimeIfApplicable({ err, log, timeRemaining });
|
||||
|
||||
throw err;
|
||||
}
|
||||
|
|
|
@ -90,11 +90,12 @@ export async function runReadOrViewSyncJob({
|
|||
return;
|
||||
}
|
||||
|
||||
const timeRemaining = timestamp + maxRetryTime - Date.now();
|
||||
|
||||
const shouldContinue = await commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime,
|
||||
timestamp,
|
||||
timeRemaining,
|
||||
});
|
||||
if (!shouldContinue) {
|
||||
return;
|
||||
|
@ -117,6 +118,6 @@ export async function runReadOrViewSyncJob({
|
|||
})
|
||||
);
|
||||
} catch (err: unknown) {
|
||||
handleCommonJobRequestError(err, log);
|
||||
await handleCommonJobRequestError({ err, log, timeRemaining });
|
||||
}
|
||||
}
|
||||
|
|
37
ts/jobs/helpers/sleepFor413RetryAfterTimeIfApplicable.ts
Normal file
37
ts/jobs/helpers/sleepFor413RetryAfterTimeIfApplicable.ts
Normal file
|
@ -0,0 +1,37 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { LoggerType } from '../../logging/log';
|
||||
import { sleep } from '../../util/sleep';
|
||||
import { parseRetryAfter } from '../../util/parseRetryAfter';
|
||||
import { isRecord } from '../../util/isRecord';
|
||||
|
||||
export async function sleepFor413RetryAfterTimeIfApplicable({
|
||||
err,
|
||||
log,
|
||||
timeRemaining,
|
||||
}: Readonly<{
|
||||
err: unknown;
|
||||
log: Pick<LoggerType, 'info'>;
|
||||
timeRemaining: number;
|
||||
}>): Promise<void> {
|
||||
if (
|
||||
timeRemaining <= 0 ||
|
||||
!(err instanceof Error) ||
|
||||
err.code !== 413 ||
|
||||
!isRecord(err.responseHeaders)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const retryAfter = Math.min(
|
||||
parseRetryAfter(err.responseHeaders['retry-after']),
|
||||
timeRemaining
|
||||
);
|
||||
|
||||
log.info(
|
||||
`Got a 413 response code. Sleeping for ${retryAfter} millisecond(s)`
|
||||
);
|
||||
|
||||
await sleep(retryAfter);
|
||||
}
|
|
@ -7,6 +7,7 @@ import PQueue from 'p-queue';
|
|||
import type { LoggerType } from '../logging/log';
|
||||
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||||
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
|
||||
import { sleepFor413RetryAfterTimeIfApplicable } from './helpers/sleepFor413RetryAfterTimeIfApplicable';
|
||||
import type { MessageModel } from '../models/messages';
|
||||
import { getMessageById } from '../messages/getMessageById';
|
||||
import type { ConversationModel } from '../models/conversations';
|
||||
|
@ -123,6 +124,7 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
|
|||
const { messageId, conversationId } = data;
|
||||
|
||||
await this.enqueue(conversationId, async () => {
|
||||
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
|
||||
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
|
||||
|
||||
// We don't immediately use this value because we may want to mark the message
|
||||
|
@ -130,8 +132,7 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
|
|||
const shouldContinue = await commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime: MAX_RETRY_TIME,
|
||||
timestamp,
|
||||
timeRemaining,
|
||||
});
|
||||
|
||||
await window.ConversationController.loadPromise();
|
||||
|
@ -347,6 +348,18 @@ export class NormalMessageSendJobQueue extends JobQueue<NormalMessageSendJobData
|
|||
return;
|
||||
}
|
||||
|
||||
if (!isFinalAttempt) {
|
||||
const maybe413Error: undefined | Error = messageSendErrors.find(
|
||||
(messageSendError: unknown) =>
|
||||
messageSendError instanceof Error && messageSendError.code === 413
|
||||
);
|
||||
await sleepFor413RetryAfterTimeIfApplicable({
|
||||
err: maybe413Error,
|
||||
log,
|
||||
timeRemaining,
|
||||
});
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -39,11 +39,12 @@ export class ViewedReceiptsJobQueue extends JobQueue<ViewedReceiptsJobData> {
|
|||
}: Readonly<{ data: ViewedReceiptsJobData; timestamp: number }>,
|
||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||
): Promise<void> {
|
||||
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
|
||||
|
||||
const shouldContinue = await commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime: MAX_RETRY_TIME,
|
||||
timestamp,
|
||||
timeRemaining,
|
||||
});
|
||||
if (!shouldContinue) {
|
||||
return;
|
||||
|
@ -52,7 +53,7 @@ export class ViewedReceiptsJobQueue extends JobQueue<ViewedReceiptsJobData> {
|
|||
try {
|
||||
await sendViewedReceipt(data.viewedReceipt);
|
||||
} catch (err: unknown) {
|
||||
handleCommonJobRequestError(err, log);
|
||||
await handleCommonJobRequestError({ err, log, timeRemaining });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,16 +6,21 @@ import { assert } from 'chai';
|
|||
import { parseRetryAfter } from '../../util/parseRetryAfter';
|
||||
|
||||
describe('parseRetryAfter', () => {
|
||||
it('should return 0 on invalid input', () => {
|
||||
it('should return 1 second when passed non-strings', () => {
|
||||
assert.equal(parseRetryAfter(undefined), 1000);
|
||||
assert.equal(parseRetryAfter(1234), 1000);
|
||||
});
|
||||
|
||||
it('should return 1 second with invalid strings', () => {
|
||||
assert.equal(parseRetryAfter('nope'), 1000);
|
||||
assert.equal(parseRetryAfter('1ff'), 1000);
|
||||
});
|
||||
|
||||
it('should return milleseconds on valid input', () => {
|
||||
it('should return milliseconds on valid input', () => {
|
||||
assert.equal(parseRetryAfter('100'), 100000);
|
||||
});
|
||||
|
||||
it('should return apply minimum value', () => {
|
||||
it('should return 1 second at minimum', () => {
|
||||
assert.equal(parseRetryAfter('0'), 1000);
|
||||
assert.equal(parseRetryAfter('-1'), 1000);
|
||||
});
|
||||
|
|
|
@ -0,0 +1,133 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { assert } from 'chai';
|
||||
import * as sinon from 'sinon';
|
||||
import { HTTPError } from '../../../textsecure/Errors';
|
||||
import * as durations from '../../../util/durations';
|
||||
|
||||
import { sleepFor413RetryAfterTimeIfApplicable } from '../../../jobs/helpers/sleepFor413RetryAfterTimeIfApplicable';
|
||||
|
||||
describe('sleepFor413RetryAfterTimeIfApplicable', () => {
|
||||
const createLogger = () => ({ info: sinon.spy() });
|
||||
|
||||
let sandbox: sinon.SinonSandbox;
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
|
||||
beforeEach(() => {
|
||||
sandbox = sinon.createSandbox();
|
||||
clock = sandbox.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
sandbox.restore();
|
||||
});
|
||||
|
||||
it('does nothing if not passed a 413 HTTP error', async () => {
|
||||
const log = createLogger();
|
||||
|
||||
const errors = [
|
||||
undefined,
|
||||
new Error('Normal error'),
|
||||
new HTTPError('Uh oh', { code: 422, headers: {}, response: {} }),
|
||||
];
|
||||
await Promise.all(
|
||||
errors.map(async err => {
|
||||
await sleepFor413RetryAfterTimeIfApplicable({
|
||||
err,
|
||||
log,
|
||||
timeRemaining: 1234,
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
sinon.assert.notCalled(log.info);
|
||||
});
|
||||
|
||||
it('waits for 1 second if receiving a 413 HTTP error without a Retry-After header', async () => {
|
||||
const err = new HTTPError('Slow down', {
|
||||
code: 413,
|
||||
headers: {},
|
||||
response: {},
|
||||
});
|
||||
|
||||
let done = false;
|
||||
|
||||
(async () => {
|
||||
await sleepFor413RetryAfterTimeIfApplicable({
|
||||
err,
|
||||
log: createLogger(),
|
||||
timeRemaining: 1234,
|
||||
});
|
||||
done = true;
|
||||
})();
|
||||
|
||||
await clock.tickAsync(999);
|
||||
assert.isFalse(done);
|
||||
|
||||
await clock.tickAsync(2);
|
||||
assert.isTrue(done);
|
||||
});
|
||||
|
||||
it('waits for Retry-After seconds if receiving a 413', async () => {
|
||||
const err = new HTTPError('Slow down', {
|
||||
code: 413,
|
||||
headers: { 'retry-after': '200' },
|
||||
response: {},
|
||||
});
|
||||
|
||||
let done = false;
|
||||
|
||||
(async () => {
|
||||
await sleepFor413RetryAfterTimeIfApplicable({
|
||||
err,
|
||||
log: createLogger(),
|
||||
timeRemaining: 123456789,
|
||||
});
|
||||
done = true;
|
||||
})();
|
||||
|
||||
await clock.tickAsync(199 * durations.SECOND);
|
||||
assert.isFalse(done);
|
||||
|
||||
await clock.tickAsync(2 * durations.SECOND);
|
||||
assert.isTrue(done);
|
||||
});
|
||||
|
||||
it("won't wait longer than the remaining time", async () => {
|
||||
const err = new HTTPError('Slow down', {
|
||||
code: 413,
|
||||
headers: { 'retry-after': '99999' },
|
||||
response: {},
|
||||
});
|
||||
|
||||
let done = false;
|
||||
|
||||
(async () => {
|
||||
await sleepFor413RetryAfterTimeIfApplicable({
|
||||
err,
|
||||
log: createLogger(),
|
||||
timeRemaining: 3 * durations.SECOND,
|
||||
});
|
||||
done = true;
|
||||
})();
|
||||
|
||||
await clock.tickAsync(4 * durations.SECOND);
|
||||
assert.isTrue(done);
|
||||
});
|
||||
|
||||
it('logs how long it will wait', async () => {
|
||||
const log = createLogger();
|
||||
const err = new HTTPError('Slow down', {
|
||||
code: 413,
|
||||
headers: { 'retry-after': '123' },
|
||||
response: {},
|
||||
});
|
||||
|
||||
sleepFor413RetryAfterTimeIfApplicable({ err, log, timeRemaining: 9999999 });
|
||||
await clock.nextAsync();
|
||||
|
||||
sinon.assert.calledOnce(log.info);
|
||||
sinon.assert.calledWith(log.info, sinon.match(/123000 millisecond\(s\)/));
|
||||
});
|
||||
});
|
|
@ -6,7 +6,8 @@
|
|||
|
||||
import { parseRetryAfter } from '../util/parseRetryAfter';
|
||||
|
||||
import { CallbackResultType } from './Types.d';
|
||||
import type { CallbackResultType } from './Types.d';
|
||||
import type { HeaderListType } from './WebAPI';
|
||||
|
||||
function appendStack(newError: Error, originalError: Error) {
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
|
@ -129,6 +130,8 @@ export class OutgoingMessageError extends ReplayableError {
|
|||
export class SendMessageNetworkError extends ReplayableError {
|
||||
identifier: string;
|
||||
|
||||
responseHeaders?: HeaderListType | undefined;
|
||||
|
||||
constructor(identifier: string, _m: unknown, httpError: Error) {
|
||||
super({
|
||||
name: 'SendMessageNetworkError',
|
||||
|
@ -137,6 +140,7 @@ export class SendMessageNetworkError extends ReplayableError {
|
|||
|
||||
[this.identifier] = identifier.split('.');
|
||||
this.code = httpError.code;
|
||||
this.responseHeaders = httpError.responseHeaders;
|
||||
|
||||
appendStack(this, httpError);
|
||||
}
|
||||
|
@ -166,8 +170,7 @@ export class SendMessageChallengeError extends ReplayableError {
|
|||
|
||||
const headers = httpError.responseHeaders || {};
|
||||
|
||||
this.retryAfter =
|
||||
Date.now() + parseRetryAfter((headers['retry-after'] ?? 0).toString());
|
||||
this.retryAfter = Date.now() + parseRetryAfter(headers['retry-after']);
|
||||
|
||||
appendStack(this, httpError);
|
||||
}
|
||||
|
|
|
@ -286,7 +286,7 @@ function getContentType(response: Response) {
|
|||
}
|
||||
|
||||
type FetchHeaderListType = { [name: string]: string };
|
||||
type HeaderListType = { [name: string]: string | ReadonlyArray<string> };
|
||||
export type HeaderListType = { [name: string]: string | ReadonlyArray<string> };
|
||||
type HTTPCodeType = 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH';
|
||||
|
||||
type RedactUrl = (url: string) => string;
|
||||
|
|
|
@ -6,7 +6,11 @@ import { isNormalNumber } from './isNormalNumber';
|
|||
const ONE_SECOND = 1000;
|
||||
const MINIMAL_RETRY_AFTER = ONE_SECOND;
|
||||
|
||||
export function parseRetryAfter(value: string): number {
|
||||
export function parseRetryAfter(value: unknown): number {
|
||||
if (typeof value !== 'string') {
|
||||
return MINIMAL_RETRY_AFTER;
|
||||
}
|
||||
|
||||
let retryAfter = parseInt(value, 10);
|
||||
if (!isNormalNumber(retryAfter) || retryAfter.toString() !== value) {
|
||||
retryAfter = 0;
|
||||
|
|
Loading…
Reference in a new issue