From 7d37ec5f80b1ffd770e70c80f4ddf3b7486d2eef Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 5 Aug 2024 10:21:50 -0700 Subject: [PATCH] Temporary fix for a Node.js regression --- app/attachment_channel.ts | 7 +- ts/test-node/util/toWebStream_test.ts | 95 +++++++++++++++++++++++++++ ts/util/toWebStream.ts | 44 +++++++++++++ 3 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 ts/test-node/util/toWebStream_test.ts create mode 100644 ts/util/toWebStream.ts diff --git a/app/attachment_channel.ts b/app/attachment_channel.ts index 104dacfd8312..5be5093e6560 100644 --- a/app/attachment_channel.ts +++ b/app/attachment_channel.ts @@ -4,7 +4,7 @@ import { ipcMain, protocol } from 'electron'; import { createReadStream } from 'node:fs'; import { join, normalize } from 'node:path'; -import { Readable, PassThrough, type Writable } from 'node:stream'; +import { PassThrough, type Writable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { randomBytes } from 'node:crypto'; import { once } from 'node:events'; @@ -41,6 +41,7 @@ import { SECOND } from '../ts/util/durations'; import { drop } from '../ts/util/drop'; import { strictAssert } from '../ts/util/assert'; import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough'; +import { toWebStream } from '../ts/util/toWebStream'; import { decryptAttachmentV2ToSink } from '../ts/AttachmentCrypto'; let initialized = false; @@ -514,7 +515,7 @@ function handleRangeRequest({ const create200Response = (): Response => { const plaintext = rangeFinder.get(0, context); - return new Response(Readable.toWeb(plaintext) as ReadableStream, { + return new Response(toWebStream(plaintext), { status: 200, headers, }); @@ -547,7 +548,7 @@ function handleRangeRequest({ } const stream = rangeFinder.get(start, context); - return new Response(Readable.toWeb(stream) as ReadableStream, { + return new Response(toWebStream(stream), { status: 206, headers, }); diff --git a/ts/test-node/util/toWebStream_test.ts b/ts/test-node/util/toWebStream_test.ts new file mode 100644 index 000000000000..032b2c1c8a6e --- /dev/null +++ b/ts/test-node/util/toWebStream_test.ts @@ -0,0 +1,95 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import { Readable } from 'node:stream'; +import { once } from 'node:events'; +import { toWebStream } from '../../util/toWebStream'; + +describe('toWebStream', () => { + it('only reads what it needs', async () => { + const CHUNK_SIZE = 16 * 1024; + let pushed = 0; + const readable = new Readable({ + read() { + pushed += 1; + this.push(Buffer.alloc(CHUNK_SIZE)); + }, + }); + + const reader = toWebStream(readable).getReader(); + const { value, done } = await reader.read(); + + // One to be read, one buffered + assert.strictEqual(pushed, 2); + assert.isFalse(done); + assert.strictEqual(value?.byteLength, 2 * CHUNK_SIZE); + }); + + it('closes controller on end', async () => { + const readable = Readable.from([ + Buffer.from('hello '), + Buffer.from('world'), + ]); + + const reader = toWebStream(readable).getReader(); + { + const { value, done } = await reader.read(); + assert.strictEqual(value?.toString(), 'hello '); + assert.isFalse(done); + } + { + const { value, done } = await reader.read(); + assert.strictEqual(value?.toString(), 'world'); + assert.isFalse(done); + } + { + const { value, done } = await reader.read(); + assert.isUndefined(value); + assert.isTrue(done); + } + }); + + it('handles premature close', async () => { + const readable = new Readable({ + read() { + // no-op + }, + }); + + const reader = toWebStream(readable).getReader(); + readable.destroy(); + await assert.isRejected(reader.read(), 'Premature close'); + }); + + it('handles error close', async () => { + const readable = new Readable({ + read() { + // no-op + }, + }); + + const reader = toWebStream(readable).getReader(); + readable.destroy(new Error('error msg')); + await assert.isRejected(reader.read(), 'error msg'); + }); + + it('can be wrapped and destroyed during data read', async () => { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + }, + }); + + const web = toWebStream(readable); + + // Some sort of mismatch between Node's expectation for ReadStream and + // what TS says ReadStream is in WebAPIs. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const node = Readable.fromWeb(web as any); + node.on('data', () => { + node.destroy(); + }); + await once(node, 'close'); + }); +}); diff --git a/ts/util/toWebStream.ts b/ts/util/toWebStream.ts new file mode 100644 index 000000000000..1b0d8a974698 --- /dev/null +++ b/ts/util/toWebStream.ts @@ -0,0 +1,44 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { type Readable, finished } from 'node:stream'; +import { once } from 'node:events'; + +// Note: can be removed once https://github.com/nodejs/node/issues/54205 is +// resolved and ported to Electron. +export function toWebStream(readable: Readable): ReadableStream { + let controller: ReadableStreamDefaultController; + + const cleanup = finished(readable, err => { + if (err != null) { + return controller.error(err); + } + + controller.close(); + }); + + return new ReadableStream({ + start(newController) { + controller = newController; + }, + async pull() { + try { + await once(readable, 'readable'); + const chunk = readable.read(); + if (chunk != null) { + controller.enqueue(chunk); + } + } catch (error) { + cleanup(); + controller.error(error); + } + }, + cancel(reason) { + // If we got canceled - don't call controller.close/.error since it will + // throw. + cleanup(); + + readable.destroy(reason ? new Error(reason) : undefined); + }, + }); +}