Add stream timeout for attachment downloads

This commit is contained in:
Fedor Indutny 2021-11-15 23:54:59 +01:00 committed by GitHub
parent 7d17158add
commit c6ee6a038e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 235 additions and 5 deletions

View file

@ -0,0 +1,139 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { Readable } from 'stream';
import * as sinon from 'sinon';
import { noop } from 'lodash';
import { getStreamWithTimeout } from '../../util/getStreamWithTimeout';
describe('getStreamWithTimeout', () => {
let sandbox: sinon.SinonSandbox;
let clock: sinon.SinonFakeTimers;
beforeEach(() => {
sandbox = sinon.createSandbox();
clock = sandbox.useFakeTimers();
});
afterEach(() => {
sandbox.restore();
});
it('resolves on finished stream', async () => {
const stream = new Readable({
read: noop,
});
stream.push('hello');
stream.push(' ');
stream.push('world');
stream.push(null);
const abort = sinon.stub();
const data = await getStreamWithTimeout(stream, {
name: 'test',
timeout: 1000,
abortController: { abort },
});
assert.strictEqual(Buffer.from(data).toString(), 'hello world');
sinon.assert.notCalled(abort);
});
it('does not timeout on slow but steady stream', async () => {
const stream = new Readable({
read: noop,
});
const abort = sinon.stub();
const data = getStreamWithTimeout(stream, {
name: 'test',
timeout: 1000,
abortController: { abort },
});
await clock.tickAsync(500);
stream.push('hello ');
await clock.tickAsync(500);
stream.push('world');
await clock.tickAsync(500);
stream.push(null);
await clock.nextAsync();
assert.strictEqual(Buffer.from(await data).toString(), 'hello world');
sinon.assert.notCalled(abort);
});
it('does timeout on slow but unsteady stream', async () => {
const stream = new Readable({
read: noop,
});
const abort = sinon.stub();
const data = getStreamWithTimeout(stream, {
name: 'test',
timeout: 1000,
abortController: { abort },
});
await clock.tickAsync(500);
stream.push('hello ');
await clock.tickAsync(500);
stream.push('world');
const promise = assert.isRejected(
data,
'getStreamWithTimeout(test) timed out'
);
await clock.tickAsync(1000);
await promise;
sinon.assert.called(abort);
});
it('rejects on timeout', async () => {
const stream = new Readable({
read: noop,
});
const abort = sinon.stub();
const promise = assert.isRejected(
getStreamWithTimeout(stream, {
name: 'test',
timeout: 1000,
abortController: { abort },
}),
'getStreamWithTimeout(test) timed out'
);
await clock.tickAsync(1000);
await promise;
sinon.assert.called(abort);
});
it('rejects on stream error', async () => {
const stream = new Readable({
read: noop,
});
const abort = sinon.stub();
const promise = assert.isRejected(
getStreamWithTimeout(stream, {
name: 'test',
timeout: 1000,
abortController: { abort },
}),
'welp'
);
stream.emit('error', new Error('welp'));
await promise;
sinon.assert.notCalled(abort);
});
});

View file

@ -8,6 +8,7 @@
/* eslint-disable no-nested-ternary */
/* eslint-disable @typescript-eslint/no-explicit-any */
import AbortController from 'abort-controller';
import type { Response } from 'node-fetch';
import fetch from 'node-fetch';
import ProxyAgent from 'proxy-agent';
@ -22,10 +23,12 @@ import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { z } from 'zod';
import Long from 'long';
import type { Readable } from 'stream';
import { assert, strictAssert } from '../util/assert';
import * as durations from '../util/durations';
import { getUserAgent } from '../util/getUserAgent';
import { getStreamWithTimeout } from '../util/getStreamWithTimeout';
import { formatAcceptLanguageHeader } from '../util/userLanguages';
import { toWebSafeBase64 } from '../util/webSafeBase64';
import type { SocketStatus } from '../types/SocketStatus';
@ -140,6 +143,7 @@ function _validateResponse(response: any, schema: any) {
}
const FIVE_MINUTES = 5 * durations.MINUTE;
const GET_ATTACHMENT_CHUNK_TIMEOUT = 10 * durations.SECOND;
type AgentCacheType = {
[name: string]: {
@ -177,7 +181,12 @@ type PromiseAjaxOptionsType = {
proxyUrl?: string;
redactUrl?: RedactUrl;
redirect?: 'error' | 'follow' | 'manual';
responseType?: 'json' | 'jsonwithdetails' | 'bytes' | 'byteswithdetails';
responseType?:
| 'json'
| 'jsonwithdetails'
| 'bytes'
| 'byteswithdetails'
| 'stream';
serverUrl?: string;
stack?: string;
timeout?: number;
@ -186,6 +195,7 @@ type PromiseAjaxOptionsType = {
user?: string;
validateResponse?: any;
version: string;
abortSignal?: AbortSignal;
};
type JSONWithDetailsType = {
@ -295,6 +305,7 @@ async function _promiseAjax(
agent,
ca: options.certificateAuthority,
timeout,
abortSignal: options.abortSignal,
};
if (fetchOptions.body instanceof Uint8Array) {
@ -329,7 +340,7 @@ async function _promiseAjax(
}
let response: Response;
let result: string | Uint8Array | unknown;
let result: string | Uint8Array | Readable | unknown;
try {
response = socketManager
? await socketManager.fetch(url, fetchOptions)
@ -362,6 +373,8 @@ async function _promiseAjax(
options.responseType === 'byteswithdetails'
) {
result = await response.buffer();
} else if (options.responseType === 'stream') {
result = response.body;
} else {
result = await response.textConverted();
}
@ -466,6 +479,10 @@ function _outerAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType & { responseType: 'byteswithdetails' }
): Promise<BytesWithDetailsType>;
function _outerAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType & { responseType?: 'stream' }
): Promise<Readable>;
function _outerAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType
@ -602,7 +619,7 @@ type AjaxOptionsType = {
jsonData?: unknown;
password?: string;
redactUrl?: RedactUrl;
responseType?: 'json' | 'bytes' | 'byteswithdetails';
responseType?: 'json' | 'bytes' | 'byteswithdetails' | 'stream';
schema?: unknown;
timeout?: number;
unauthenticated?: boolean;
@ -1123,6 +1140,9 @@ export function initialize({
function _ajax(
param: AjaxOptionsType & { responseType: 'byteswithdetails' }
): Promise<BytesWithDetailsType>;
function _ajax(
param: AjaxOptionsType & { responseType: 'stream' }
): Promise<Readable>;
function _ajax(
param: AjaxOptionsType & { responseType: 'json' }
): Promise<unknown>;
@ -2023,18 +2043,27 @@ export function initialize({
}
async function getAttachment(cdnKey: string, cdnNumber?: number) {
const abortController = new AbortController();
const cdnUrl = isNumber(cdnNumber)
? cdnUrlObject[cdnNumber] || cdnUrlObject['0']
: cdnUrlObject['0'];
// This is going to the CDN, not the service, so we use _outerAjax
return _outerAjax(`${cdnUrl}/attachments/${cdnKey}`, {
const stream = await _outerAjax(`${cdnUrl}/attachments/${cdnKey}`, {
certificateAuthority,
proxyUrl,
responseType: 'bytes',
responseType: 'stream',
timeout: 0,
type: 'GET',
redactUrl: _createRedactor(cdnKey),
version,
abortSignal: abortController.signal,
});
return getStreamWithTimeout(stream, {
name: `getAttachment(${cdnKey})`,
timeout: GET_ATTACHMENT_CHUNK_TIMEOUT,
abortController,
});
}

View file

@ -0,0 +1,62 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Readable } from 'stream';
import * as Bytes from '../Bytes';
import { explodePromise } from './explodePromise';
export type OptionsType = Readonly<{
name: string;
timeout: number;
abortController: { abort(): void };
}>;
export class StreamTimeoutError extends Error {}
export function getStreamWithTimeout(
stream: Readable,
{ name, timeout, abortController }: OptionsType
): Promise<Uint8Array> {
const { promise, resolve, reject } = explodePromise<Uint8Array>();
const chunks = new Array<Uint8Array>();
let timer: NodeJS.Timeout | undefined;
const clearTimer = () => {
if (timer !== undefined) {
clearTimeout(timer);
timer = undefined;
}
};
const reset = () => {
clearTimer();
timer = setTimeout(() => {
abortController.abort();
reject(new StreamTimeoutError(`getStreamWithTimeout(${name}) timed out`));
}, timeout);
};
stream.on('data', chunk => {
reset();
chunks.push(chunk);
});
stream.on('end', () => {
clearTimer();
resolve(Bytes.concatenate(chunks));
});
stream.on('error', error => {
clearTimer();
reject(error);
});
reset();
return promise;
}