diff --git a/protos/ContactDiscovery.proto b/protos/ContactDiscovery.proto index 3614f8e55..22eb1ad5a 100644 --- a/protos/ContactDiscovery.proto +++ b/protos/ContactDiscovery.proto @@ -46,13 +46,6 @@ message CDSClientResponse { // leakage based on the size of the encrypted output. optional bytes e164_pni_aci_triples = 1; - // If the user has run out of quota for lookups, they will receive - // a response with just the following field set, followed by a websocket - // closure of type 4008 (RESOURCE_EXHAUSTED). Should they retry exactly - // the same request after the provided number of seconds has passed, - // we expect it should work. - optional int32 retry_after_secs = 2; - // A token which allows subsequent calls' rate limiting to discount the // e164s sent up in this request, only counting those in the next // request's new_e164s. diff --git a/ts/textsecure/cds/CDSSocketBase.ts b/ts/textsecure/cds/CDSSocketBase.ts index 707318091..b7a2b2762 100644 --- a/ts/textsecure/cds/CDSSocketBase.ts +++ b/ts/textsecure/cds/CDSSocketBase.ts @@ -9,7 +9,6 @@ import Long from 'long'; import type { LoggerType } from '../../types/Logging'; import { strictAssert } from '../../util/assert'; -import { dropNull } from '../../util/dropNull'; import { UUID_BYTE_SIZE } from '../../types/UUID'; import * as Bytes from '../../Bytes'; import { uuidToBytes, bytesToUuid } from '../../Crypto'; @@ -19,17 +18,13 @@ import type { CDSResponseEntryType, CDSResponseType, } from './Types.d'; +import { RateLimitedError } from './RateLimitedError'; export type CDSSocketBaseOptionsType = Readonly<{ logger: LoggerType; socket: WebSocket; }>; -export type CDSSocketResponseType = Readonly<{ - response: CDSResponseType; - retryAfterSecs?: number; -}>; - export enum CDSSocketState { Open = 'Open', Handshake = 'Handshake', @@ -71,7 +66,7 @@ export abstract class CDSSocketBase< acis, accessKeys, returnAcisWithoutUaks = false, - }: CDSRequestOptionsType): Promise { + }: CDSRequestOptionsType): Promise { const log = this.logger; strictAssert( @@ -117,7 +112,6 @@ export abstract class CDSSocketBase< await this.sendRequest(version, Buffer.from(request)); const resultMap: Map = new Map(); - let retryAfterSecs: number | undefined; // eslint-disable-next-line no-constant-condition while (true) { @@ -134,21 +128,12 @@ export abstract class CDSSocketBase< log.info('CDSSocket.request(): processing response message'); const response = Proto.CDSClientResponse.decode(message); - const newRetryAfterSecs = dropNull(response.retryAfterSecs); decodeSingleResponse(resultMap, response); - - if (newRetryAfterSecs) { - retryAfterSecs = Math.max(newRetryAfterSecs, retryAfterSecs ?? 0); - } } log.info('CDSSocket.request(): done'); - - return { - response: resultMap, - retryAfterSecs, - }; + return resultMap; } // Abstract methods @@ -200,6 +185,19 @@ export abstract class CDSSocketBase< this.socket.on('close', (code, reason) => { if (code === 1000) { stream.push(null); + } else if (code === 4008) { + try { + const payload = JSON.parse(reason); + + stream.destroy(new RateLimitedError(payload)); + } catch (error) { + stream.destroy( + new Error( + `Socket closed with code ${code} and reason ${reason}, ` + + 'but rate limiting response cannot be parsed' + ) + ); + } } else { stream.destroy( new Error(`Socket closed with code ${code} and reason ${reason}`) diff --git a/ts/textsecure/cds/CDSSocketManagerBase.ts b/ts/textsecure/cds/CDSSocketManagerBase.ts index 0f92fe7b6..623e95806 100644 --- a/ts/textsecure/cds/CDSSocketManagerBase.ts +++ b/ts/textsecure/cds/CDSSocketManagerBase.ts @@ -17,6 +17,7 @@ import type { CDSResponseType, CDSAuthType, } from './Types.d'; +import { RateLimitedError } from './RateLimitedError'; import { connect as connectWebSocket } from '../WebSocket'; const REQUEST_TIMEOUT = 10 * SECOND; @@ -65,19 +66,19 @@ export abstract class CDSSocketManagerBase< } // Send request - const { response, retryAfterSecs = 0 } = await pTimeout( - socket.request(options), - timeout - ); - - if (retryAfterSecs > 0) { - this.retryAfter = Math.max( - this.retryAfter ?? Date.now(), - Date.now() + retryAfterSecs * durations.SECOND - ); - } + const response = await pTimeout(socket.request(options), timeout); return response; + } catch (error) { + if (error instanceof RateLimitedError) { + if (error.retryAfterSecs > 0) { + this.retryAfter = Math.max( + this.retryAfter ?? Date.now(), + Date.now() + error.retryAfterSecs * durations.SECOND + ); + } + } + throw error; } finally { log.info('CDSSocketManager: closing socket'); socket.close(3000, 'Normal'); diff --git a/ts/textsecure/cds/RateLimitedError.ts b/ts/textsecure/cds/RateLimitedError.ts new file mode 100644 index 000000000..c64355057 --- /dev/null +++ b/ts/textsecure/cds/RateLimitedError.ts @@ -0,0 +1,23 @@ +// Copyright 2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export type RateLimitedErrorPayloadType = Readonly<{ + // eslint-disable-next-line camelcase + retry_after?: number; +}>; + +export class RateLimitedError extends Error { + public readonly retryAfterSecs: number; + + // eslint-disable-next-line camelcase + constructor({ retry_after }: RateLimitedErrorPayloadType) { + super( + 'RateLimitedError: got 4008 close code from CDSI, ' + + // eslint-disable-next-line camelcase + `retry_after=${retry_after}` + ); + + // eslint-disable-next-line camelcase + this.retryAfterSecs = retry_after ?? 0; + } +}