Handle new rate limiting info from CDSI
This commit is contained in:
parent
b26f60d2fc
commit
f0a3735ca2
4 changed files with 51 additions and 36 deletions
|
@ -46,13 +46,6 @@ message CDSClientResponse {
|
|||
// leakage based on the size of the encrypted output.
|
||||
optional bytes e164_pni_aci_triples = 1;
|
||||
|
||||
// If the user has run out of quota for lookups, they will receive
|
||||
// a response with just the following field set, followed by a websocket
|
||||
// closure of type 4008 (RESOURCE_EXHAUSTED). Should they retry exactly
|
||||
// the same request after the provided number of seconds has passed,
|
||||
// we expect it should work.
|
||||
optional int32 retry_after_secs = 2;
|
||||
|
||||
// A token which allows subsequent calls' rate limiting to discount the
|
||||
// e164s sent up in this request, only counting those in the next
|
||||
// request's new_e164s.
|
||||
|
|
|
@ -9,7 +9,6 @@ import Long from 'long';
|
|||
|
||||
import type { LoggerType } from '../../types/Logging';
|
||||
import { strictAssert } from '../../util/assert';
|
||||
import { dropNull } from '../../util/dropNull';
|
||||
import { UUID_BYTE_SIZE } from '../../types/UUID';
|
||||
import * as Bytes from '../../Bytes';
|
||||
import { uuidToBytes, bytesToUuid } from '../../Crypto';
|
||||
|
@ -19,17 +18,13 @@ import type {
|
|||
CDSResponseEntryType,
|
||||
CDSResponseType,
|
||||
} from './Types.d';
|
||||
import { RateLimitedError } from './RateLimitedError';
|
||||
|
||||
export type CDSSocketBaseOptionsType = Readonly<{
|
||||
logger: LoggerType;
|
||||
socket: WebSocket;
|
||||
}>;
|
||||
|
||||
export type CDSSocketResponseType = Readonly<{
|
||||
response: CDSResponseType;
|
||||
retryAfterSecs?: number;
|
||||
}>;
|
||||
|
||||
export enum CDSSocketState {
|
||||
Open = 'Open',
|
||||
Handshake = 'Handshake',
|
||||
|
@ -71,7 +66,7 @@ export abstract class CDSSocketBase<
|
|||
acis,
|
||||
accessKeys,
|
||||
returnAcisWithoutUaks = false,
|
||||
}: CDSRequestOptionsType): Promise<CDSSocketResponseType> {
|
||||
}: CDSRequestOptionsType): Promise<CDSResponseType> {
|
||||
const log = this.logger;
|
||||
|
||||
strictAssert(
|
||||
|
@ -117,7 +112,6 @@ export abstract class CDSSocketBase<
|
|||
await this.sendRequest(version, Buffer.from(request));
|
||||
|
||||
const resultMap: Map<string, CDSResponseEntryType> = new Map();
|
||||
let retryAfterSecs: number | undefined;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
|
@ -134,21 +128,12 @@ export abstract class CDSSocketBase<
|
|||
log.info('CDSSocket.request(): processing response message');
|
||||
|
||||
const response = Proto.CDSClientResponse.decode(message);
|
||||
const newRetryAfterSecs = dropNull(response.retryAfterSecs);
|
||||
|
||||
decodeSingleResponse(resultMap, response);
|
||||
|
||||
if (newRetryAfterSecs) {
|
||||
retryAfterSecs = Math.max(newRetryAfterSecs, retryAfterSecs ?? 0);
|
||||
}
|
||||
}
|
||||
|
||||
log.info('CDSSocket.request(): done');
|
||||
|
||||
return {
|
||||
response: resultMap,
|
||||
retryAfterSecs,
|
||||
};
|
||||
return resultMap;
|
||||
}
|
||||
|
||||
// Abstract methods
|
||||
|
@ -200,6 +185,19 @@ export abstract class CDSSocketBase<
|
|||
this.socket.on('close', (code, reason) => {
|
||||
if (code === 1000) {
|
||||
stream.push(null);
|
||||
} else if (code === 4008) {
|
||||
try {
|
||||
const payload = JSON.parse(reason);
|
||||
|
||||
stream.destroy(new RateLimitedError(payload));
|
||||
} catch (error) {
|
||||
stream.destroy(
|
||||
new Error(
|
||||
`Socket closed with code ${code} and reason ${reason}, ` +
|
||||
'but rate limiting response cannot be parsed'
|
||||
)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
stream.destroy(
|
||||
new Error(`Socket closed with code ${code} and reason ${reason}`)
|
||||
|
|
|
@ -17,6 +17,7 @@ import type {
|
|||
CDSResponseType,
|
||||
CDSAuthType,
|
||||
} from './Types.d';
|
||||
import { RateLimitedError } from './RateLimitedError';
|
||||
import { connect as connectWebSocket } from '../WebSocket';
|
||||
|
||||
const REQUEST_TIMEOUT = 10 * SECOND;
|
||||
|
@ -65,19 +66,19 @@ export abstract class CDSSocketManagerBase<
|
|||
}
|
||||
|
||||
// Send request
|
||||
const { response, retryAfterSecs = 0 } = await pTimeout(
|
||||
socket.request(options),
|
||||
timeout
|
||||
);
|
||||
|
||||
if (retryAfterSecs > 0) {
|
||||
this.retryAfter = Math.max(
|
||||
this.retryAfter ?? Date.now(),
|
||||
Date.now() + retryAfterSecs * durations.SECOND
|
||||
);
|
||||
}
|
||||
const response = await pTimeout(socket.request(options), timeout);
|
||||
|
||||
return response;
|
||||
} catch (error) {
|
||||
if (error instanceof RateLimitedError) {
|
||||
if (error.retryAfterSecs > 0) {
|
||||
this.retryAfter = Math.max(
|
||||
this.retryAfter ?? Date.now(),
|
||||
Date.now() + error.retryAfterSecs * durations.SECOND
|
||||
);
|
||||
}
|
||||
}
|
||||
throw error;
|
||||
} finally {
|
||||
log.info('CDSSocketManager: closing socket');
|
||||
socket.close(3000, 'Normal');
|
||||
|
|
23
ts/textsecure/cds/RateLimitedError.ts
Normal file
23
ts/textsecure/cds/RateLimitedError.ts
Normal file
|
@ -0,0 +1,23 @@
|
|||
// Copyright 2022 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
export type RateLimitedErrorPayloadType = Readonly<{
|
||||
// eslint-disable-next-line camelcase
|
||||
retry_after?: number;
|
||||
}>;
|
||||
|
||||
export class RateLimitedError extends Error {
|
||||
public readonly retryAfterSecs: number;
|
||||
|
||||
// eslint-disable-next-line camelcase
|
||||
constructor({ retry_after }: RateLimitedErrorPayloadType) {
|
||||
super(
|
||||
'RateLimitedError: got 4008 close code from CDSI, ' +
|
||||
// eslint-disable-next-line camelcase
|
||||
`retry_after=${retry_after}`
|
||||
);
|
||||
|
||||
// eslint-disable-next-line camelcase
|
||||
this.retryAfterSecs = retry_after ?? 0;
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue