diff --git a/ts/test-both/util/getStreamWithTimeout_test.ts b/ts/test-both/util/getStreamWithTimeout_test.ts new file mode 100644 index 000000000000..31d1ee60580b --- /dev/null +++ b/ts/test-both/util/getStreamWithTimeout_test.ts @@ -0,0 +1,139 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import { Readable } from 'stream'; +import * as sinon from 'sinon'; +import { noop } from 'lodash'; + +import { getStreamWithTimeout } from '../../util/getStreamWithTimeout'; + +describe('getStreamWithTimeout', () => { + let sandbox: sinon.SinonSandbox; + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + clock = sandbox.useFakeTimers(); + }); + + afterEach(() => { + sandbox.restore(); + }); + + it('resolves on finished stream', async () => { + const stream = new Readable({ + read: noop, + }); + + stream.push('hello'); + stream.push(' '); + stream.push('world'); + stream.push(null); + + const abort = sinon.stub(); + const data = await getStreamWithTimeout(stream, { + name: 'test', + timeout: 1000, + abortController: { abort }, + }); + + assert.strictEqual(Buffer.from(data).toString(), 'hello world'); + sinon.assert.notCalled(abort); + }); + + it('does not timeout on slow but steady stream', async () => { + const stream = new Readable({ + read: noop, + }); + + const abort = sinon.stub(); + const data = getStreamWithTimeout(stream, { + name: 'test', + timeout: 1000, + abortController: { abort }, + }); + + await clock.tickAsync(500); + stream.push('hello '); + await clock.tickAsync(500); + stream.push('world'); + await clock.tickAsync(500); + stream.push(null); + await clock.nextAsync(); + + assert.strictEqual(Buffer.from(await data).toString(), 'hello world'); + sinon.assert.notCalled(abort); + }); + + it('does timeout on slow but unsteady stream', async () => { + const stream = new Readable({ + read: noop, + }); + + const abort = sinon.stub(); + const data = getStreamWithTimeout(stream, { + name: 'test', + timeout: 1000, + abortController: { abort }, + }); + + await clock.tickAsync(500); + stream.push('hello '); + await clock.tickAsync(500); + stream.push('world'); + + const promise = assert.isRejected( + data, + 'getStreamWithTimeout(test) timed out' + ); + + await clock.tickAsync(1000); + + await promise; + sinon.assert.called(abort); + }); + + it('rejects on timeout', async () => { + const stream = new Readable({ + read: noop, + }); + + const abort = sinon.stub(); + const promise = assert.isRejected( + getStreamWithTimeout(stream, { + name: 'test', + timeout: 1000, + abortController: { abort }, + }), + 'getStreamWithTimeout(test) timed out' + ); + + await clock.tickAsync(1000); + + await promise; + + sinon.assert.called(abort); + }); + + it('rejects on stream error', async () => { + const stream = new Readable({ + read: noop, + }); + + const abort = sinon.stub(); + const promise = assert.isRejected( + getStreamWithTimeout(stream, { + name: 'test', + timeout: 1000, + abortController: { abort }, + }), + 'welp' + ); + + stream.emit('error', new Error('welp')); + + await promise; + sinon.assert.notCalled(abort); + }); +}); diff --git a/ts/textsecure/WebAPI.ts b/ts/textsecure/WebAPI.ts index edba5469d99f..71bebf2fb1d3 100644 --- a/ts/textsecure/WebAPI.ts +++ b/ts/textsecure/WebAPI.ts @@ -8,6 +8,7 @@ /* eslint-disable no-nested-ternary */ /* eslint-disable @typescript-eslint/no-explicit-any */ +import AbortController from 'abort-controller'; import type { Response } from 'node-fetch'; import fetch from 'node-fetch'; import ProxyAgent from 'proxy-agent'; @@ -22,10 +23,12 @@ import PQueue from 'p-queue'; import { v4 as getGuid } from 'uuid'; import { z } from 'zod'; import Long from 'long'; +import type { Readable } from 'stream'; import { assert, strictAssert } from '../util/assert'; import * as durations from '../util/durations'; import { getUserAgent } from '../util/getUserAgent'; +import { getStreamWithTimeout } from '../util/getStreamWithTimeout'; import { formatAcceptLanguageHeader } from '../util/userLanguages'; import { toWebSafeBase64 } from '../util/webSafeBase64'; import type { SocketStatus } from '../types/SocketStatus'; @@ -140,6 +143,7 @@ function _validateResponse(response: any, schema: any) { } const FIVE_MINUTES = 5 * durations.MINUTE; +const GET_ATTACHMENT_CHUNK_TIMEOUT = 10 * durations.SECOND; type AgentCacheType = { [name: string]: { @@ -177,7 +181,12 @@ type PromiseAjaxOptionsType = { proxyUrl?: string; redactUrl?: RedactUrl; redirect?: 'error' | 'follow' | 'manual'; - responseType?: 'json' | 'jsonwithdetails' | 'bytes' | 'byteswithdetails'; + responseType?: + | 'json' + | 'jsonwithdetails' + | 'bytes' + | 'byteswithdetails' + | 'stream'; serverUrl?: string; stack?: string; timeout?: number; @@ -186,6 +195,7 @@ type PromiseAjaxOptionsType = { user?: string; validateResponse?: any; version: string; + abortSignal?: AbortSignal; }; type JSONWithDetailsType = { @@ -295,6 +305,7 @@ async function _promiseAjax( agent, ca: options.certificateAuthority, timeout, + abortSignal: options.abortSignal, }; if (fetchOptions.body instanceof Uint8Array) { @@ -329,7 +340,7 @@ async function _promiseAjax( } let response: Response; - let result: string | Uint8Array | unknown; + let result: string | Uint8Array | Readable | unknown; try { response = socketManager ? await socketManager.fetch(url, fetchOptions) @@ -362,6 +373,8 @@ async function _promiseAjax( options.responseType === 'byteswithdetails' ) { result = await response.buffer(); + } else if (options.responseType === 'stream') { + result = response.body; } else { result = await response.textConverted(); } @@ -466,6 +479,10 @@ function _outerAjax( providedUrl: string | null, options: PromiseAjaxOptionsType & { responseType: 'byteswithdetails' } ): Promise; +function _outerAjax( + providedUrl: string | null, + options: PromiseAjaxOptionsType & { responseType?: 'stream' } +): Promise; function _outerAjax( providedUrl: string | null, options: PromiseAjaxOptionsType @@ -602,7 +619,7 @@ type AjaxOptionsType = { jsonData?: unknown; password?: string; redactUrl?: RedactUrl; - responseType?: 'json' | 'bytes' | 'byteswithdetails'; + responseType?: 'json' | 'bytes' | 'byteswithdetails' | 'stream'; schema?: unknown; timeout?: number; unauthenticated?: boolean; @@ -1123,6 +1140,9 @@ export function initialize({ function _ajax( param: AjaxOptionsType & { responseType: 'byteswithdetails' } ): Promise; + function _ajax( + param: AjaxOptionsType & { responseType: 'stream' } + ): Promise; function _ajax( param: AjaxOptionsType & { responseType: 'json' } ): Promise; @@ -2023,18 +2043,27 @@ export function initialize({ } async function getAttachment(cdnKey: string, cdnNumber?: number) { + const abortController = new AbortController(); + const cdnUrl = isNumber(cdnNumber) ? cdnUrlObject[cdnNumber] || cdnUrlObject['0'] : cdnUrlObject['0']; // This is going to the CDN, not the service, so we use _outerAjax - return _outerAjax(`${cdnUrl}/attachments/${cdnKey}`, { + const stream = await _outerAjax(`${cdnUrl}/attachments/${cdnKey}`, { certificateAuthority, proxyUrl, - responseType: 'bytes', + responseType: 'stream', timeout: 0, type: 'GET', redactUrl: _createRedactor(cdnKey), version, + abortSignal: abortController.signal, + }); + + return getStreamWithTimeout(stream, { + name: `getAttachment(${cdnKey})`, + timeout: GET_ATTACHMENT_CHUNK_TIMEOUT, + abortController, }); } diff --git a/ts/util/getStreamWithTimeout.ts b/ts/util/getStreamWithTimeout.ts new file mode 100644 index 000000000000..a58be8fcb7d6 --- /dev/null +++ b/ts/util/getStreamWithTimeout.ts @@ -0,0 +1,62 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { Readable } from 'stream'; + +import * as Bytes from '../Bytes'; +import { explodePromise } from './explodePromise'; + +export type OptionsType = Readonly<{ + name: string; + timeout: number; + abortController: { abort(): void }; +}>; + +export class StreamTimeoutError extends Error {} + +export function getStreamWithTimeout( + stream: Readable, + { name, timeout, abortController }: OptionsType +): Promise { + const { promise, resolve, reject } = explodePromise(); + + const chunks = new Array(); + + let timer: NodeJS.Timeout | undefined; + + const clearTimer = () => { + if (timer !== undefined) { + clearTimeout(timer); + timer = undefined; + } + }; + + const reset = () => { + clearTimer(); + + timer = setTimeout(() => { + abortController.abort(); + reject(new StreamTimeoutError(`getStreamWithTimeout(${name}) timed out`)); + }, timeout); + }; + + stream.on('data', chunk => { + reset(); + + chunks.push(chunk); + }); + + stream.on('end', () => { + clearTimer(); + resolve(Bytes.concatenate(chunks)); + }); + + stream.on('error', error => { + clearTimer(); + reject(error); + }); + + reset(); + + return promise; +}