// Copyright 2021-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { assert } from 'chai';
import { Readable } from 'stream';
import * as sinon from 'sinon';
import { noop } from 'lodash';
import { once } from 'events';

import { getStreamWithTimeout } from '../../util/getStreamWithTimeout';

/**
 * Tests for `getStreamWithTimeout`.
 *
 * Every test runs against sinon fake timers, so elapsed time is driven
 * explicitly with `clock.tickAsync(...)` rather than by real waiting.
 */
describe('getStreamWithTimeout', () => {
  let sandbox: sinon.SinonSandbox;
  let clock: sinon.SinonFakeTimers;

  /**
   * Pushes `chunk` into `stream` and returns a promise that settles once the
   * stream has emitted the matching event: `'end'` when `chunk` is `null`,
   * `'data'` otherwise.
   *
   * This helps tests preserve ordering: the listener is registered before the
   * push, and awaiting the result guarantees the chunk was emitted before the
   * fake clock is advanced again.
   */
  const pushAndWait = (
    stream: Readable,
    chunk: string | null
  ): Promise<unknown> => {
    const promise = once(stream, chunk == null ? 'end' : 'data');
    stream.push(chunk);
    return promise;
  };

  beforeEach(() => {
    sandbox = sinon.createSandbox();
    clock = sandbox.useFakeTimers();
  });

  afterEach(() => {
    // Restores the real timers installed over by `useFakeTimers`.
    sandbox.restore();
  });

  // A stream that already has all of its data (and its end marker) queued
  // should resolve with the concatenated bytes and never call `abort`.
  it('resolves on finished stream', async () => {
    const stream = new Readable({
      read: noop,
    });

    stream.push('hello');
    stream.push(' ');
    stream.push('world');
    stream.push(null);

    const abort = sinon.stub();
    const data = await getStreamWithTimeout(stream, {
      name: 'test',
      timeout: 1000,
      abortController: { abort },
    });

    assert.strictEqual(Buffer.from(data).toString(), 'hello world');
    sinon.assert.notCalled(abort);
  });

  // Total elapsed fake time (1500ms) exceeds the 1000ms timeout, but no gap
  // between chunks does; the test expects a successful result and no abort.
  it('does not timeout on slow but steady stream', async () => {
    const stream = new Readable({
      read: noop,
    });

    const abort = sinon.stub();
    const data = getStreamWithTimeout(stream, {
      name: 'test',
      timeout: 1000,
      abortController: { abort },
    });

    await clock.tickAsync(500);
    await pushAndWait(stream, 'hello ');
    await clock.tickAsync(500);
    await pushAndWait(stream, 'world');
    await clock.tickAsync(500);
    await pushAndWait(stream, null);

    await clock.nextAsync();

    assert.strictEqual(Buffer.from(await data).toString(), 'hello world');
    sinon.assert.notCalled(abort);
  });

  // Two chunks arrive on time, then the stream stalls for a full timeout
  // period without ending; the test expects a rejection and an abort call.
  it('does timeout on slow but unsteady stream', async () => {
    const stream = new Readable({
      read: noop,
    });

    const abort = sinon.stub();
    const data = getStreamWithTimeout(stream, {
      name: 'test',
      timeout: 1000,
      abortController: { abort },
    });

    await clock.tickAsync(500);
    await pushAndWait(stream, 'hello ');
    await clock.tickAsync(500);
    await pushAndWait(stream, 'world');

    // Attach the rejection expectation before advancing the clock so the
    // rejection is already handled when it occurs.
    const promise = assert.isRejected(
      data,
      'getStreamWithTimeout(test) timed out'
    );

    await clock.tickAsync(1000);

    await promise;
    sinon.assert.called(abort);
  });

  // A stream that never produces anything should be rejected once the full
  // timeout has elapsed, and `abort` should have been called.
  it('rejects on timeout', async () => {
    const stream = new Readable({
      read: noop,
    });

    const abort = sinon.stub();
    const promise = assert.isRejected(
      getStreamWithTimeout(stream, {
        name: 'test',
        timeout: 1000,
        abortController: { abort },
      }),
      'getStreamWithTimeout(test) timed out'
    );

    await clock.tickAsync(1000);

    await promise;
    sinon.assert.called(abort);
  });

  // An error emitted by the stream itself should surface as the rejection
  // reason; the test expects `abort` not to be called in that case.
  it('rejects on stream error', async () => {
    const stream = new Readable({
      read: noop,
    });

    const abort = sinon.stub();
    const promise = assert.isRejected(
      getStreamWithTimeout(stream, {
        name: 'test',
        timeout: 1000,
        abortController: { abort },
      }),
      'welp'
    );

    stream.emit('error', new Error('welp'));

    await promise;
    sinon.assert.notCalled(abort);
  });
});