Temporary fix for a Node.js regression

This commit is contained in:
Fedor Indutny 2024-08-05 10:21:50 -07:00 committed by GitHub
parent d42e428ec3
commit 7d37ec5f80
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 143 additions and 3 deletions

View file

@ -4,7 +4,7 @@
import { ipcMain, protocol } from 'electron'; import { ipcMain, protocol } from 'electron';
import { createReadStream } from 'node:fs'; import { createReadStream } from 'node:fs';
import { join, normalize } from 'node:path'; import { join, normalize } from 'node:path';
import { Readable, PassThrough, type Writable } from 'node:stream'; import { PassThrough, type Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises'; import { pipeline } from 'node:stream/promises';
import { randomBytes } from 'node:crypto'; import { randomBytes } from 'node:crypto';
import { once } from 'node:events'; import { once } from 'node:events';
@ -41,6 +41,7 @@ import { SECOND } from '../ts/util/durations';
import { drop } from '../ts/util/drop'; import { drop } from '../ts/util/drop';
import { strictAssert } from '../ts/util/assert'; import { strictAssert } from '../ts/util/assert';
import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough'; import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough';
import { toWebStream } from '../ts/util/toWebStream';
import { decryptAttachmentV2ToSink } from '../ts/AttachmentCrypto'; import { decryptAttachmentV2ToSink } from '../ts/AttachmentCrypto';
let initialized = false; let initialized = false;
@ -514,7 +515,7 @@ function handleRangeRequest({
const create200Response = (): Response => { const create200Response = (): Response => {
const plaintext = rangeFinder.get(0, context); const plaintext = rangeFinder.get(0, context);
return new Response(Readable.toWeb(plaintext) as ReadableStream<Buffer>, { return new Response(toWebStream(plaintext), {
status: 200, status: 200,
headers, headers,
}); });
@ -547,7 +548,7 @@ function handleRangeRequest({
} }
const stream = rangeFinder.get(start, context); const stream = rangeFinder.get(start, context);
return new Response(Readable.toWeb(stream) as ReadableStream<Buffer>, { return new Response(toWebStream(stream), {
status: 206, status: 206,
headers, headers,
}); });

View file

@ -0,0 +1,95 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { Readable } from 'node:stream';
import { once } from 'node:events';
import { toWebStream } from '../../util/toWebStream';
describe('toWebStream', () => {
it('only reads what it needs', async () => {
const CHUNK_SIZE = 16 * 1024;
let pushed = 0;
const readable = new Readable({
read() {
pushed += 1;
this.push(Buffer.alloc(CHUNK_SIZE));
},
});
const reader = toWebStream(readable).getReader();
const { value, done } = await reader.read();
// One to be read, one buffered
assert.strictEqual(pushed, 2);
assert.isFalse(done);
assert.strictEqual(value?.byteLength, 2 * CHUNK_SIZE);
});
it('closes controller on end', async () => {
const readable = Readable.from([
Buffer.from('hello '),
Buffer.from('world'),
]);
const reader = toWebStream(readable).getReader();
{
const { value, done } = await reader.read();
assert.strictEqual(value?.toString(), 'hello ');
assert.isFalse(done);
}
{
const { value, done } = await reader.read();
assert.strictEqual(value?.toString(), 'world');
assert.isFalse(done);
}
{
const { value, done } = await reader.read();
assert.isUndefined(value);
assert.isTrue(done);
}
});
it('handles premature close', async () => {
const readable = new Readable({
read() {
// no-op
},
});
const reader = toWebStream(readable).getReader();
readable.destroy();
await assert.isRejected(reader.read(), 'Premature close');
});
it('handles error close', async () => {
const readable = new Readable({
read() {
// no-op
},
});
const reader = toWebStream(readable).getReader();
readable.destroy(new Error('error msg'));
await assert.isRejected(reader.read(), 'error msg');
});
it('can be wrapped and destroyed during data read', async () => {
const readable = new Readable({
read() {
this.push(Buffer.from('hello'));
},
});
const web = toWebStream(readable);
// Some sort of mismatch between Node's expectation for ReadStream and
// what TS says ReadStream is in WebAPIs.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const node = Readable.fromWeb(web as any);
node.on('data', () => {
node.destroy();
});
await once(node, 'close');
});
});

44
ts/util/toWebStream.ts Normal file
View file

@ -0,0 +1,44 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { type Readable, finished } from 'node:stream';
import { once } from 'node:events';
// Note: can be removed once https://github.com/nodejs/node/issues/54205 is
// resolved and ported to Electron.
export function toWebStream(readable: Readable): ReadableStream<Buffer> {
let controller: ReadableStreamDefaultController<Buffer>;
const cleanup = finished(readable, err => {
if (err != null) {
return controller.error(err);
}
controller.close();
});
return new ReadableStream({
start(newController) {
controller = newController;
},
async pull() {
try {
await once(readable, 'readable');
const chunk = readable.read();
if (chunk != null) {
controller.enqueue(chunk);
}
} catch (error) {
cleanup();
controller.error(error);
}
},
cancel(reason) {
// If we got canceled - don't call controller.close/.error since it will
// throw.
cleanup();
readable.destroy(reason ? new Error(reason) : undefined);
},
});
}