From 6322c329bf2b746fe2199247a641a1af05a37630 Mon Sep 17 00:00:00 2001 From: "trop[bot]" <37223003+trop[bot]@users.noreply.github.com> Date: Sat, 27 Jul 2024 14:47:12 -0500 Subject: [PATCH] fix: always terminate active Node Streams (#43071) `.destroy()` is an important method in the lifecycle of a Node.js Readable stream. It is typically called to reclaim the resources (e.g., close file descriptor). The only situations where calling it manually isn't necessary are when the following events are emitted first: - `end`: natural end of a stream - `error`: stream terminated due to a failure Prior to this commit the ended state was incorrectly tracked together with a pending internal error. It led to situations where the request could get aborted during a read and then get marked as ended (having pending error). With this change we disentangle pending "error" and "destroyed" cases to always properly terminate an active Node.js Readable stream. Co-authored-by: trop[bot] <37223003+trop[bot]@users.noreply.github.com> Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> --- shell/browser/net/node_stream_loader.cc | 24 ++++++++++++++++-------- shell/browser/net/node_stream_loader.h | 8 +++++++- spec/api-protocol-spec.ts | 24 ++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/shell/browser/net/node_stream_loader.cc b/shell/browser/net/node_stream_loader.cc index ca7c6f982026..83709adc0965 100644 --- a/shell/browser/net/node_stream_loader.cc +++ b/shell/browser/net/node_stream_loader.cc @@ -43,7 +43,7 @@ NodeStreamLoader::~NodeStreamLoader() { } // Destroy the stream if not already ended - if (!ended_) { + if (!destroyed_) { node::MakeCallback(isolate_, emitter_.Get(isolate_), "destroy", 0, nullptr, {0, 0}); } @@ -63,13 +63,21 @@ void NodeStreamLoader::Start(network::mojom::URLResponseHeadPtr head) { std::nullopt); auto weak = weak_factory_.GetWeakPtr(); - On("end", - base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK)); - On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, - net::ERR_FAILED)); + On("end", base::BindRepeating(&NodeStreamLoader::NotifyEnd, weak)); + On("error", base::BindRepeating(&NodeStreamLoader::NotifyError, weak)); On("readable", base::BindRepeating(&NodeStreamLoader::NotifyReadable, weak)); } +void NodeStreamLoader::NotifyEnd() { + destroyed_ = true; + NotifyComplete(net::OK); +} + +void NodeStreamLoader::NotifyError() { + destroyed_ = true; + NotifyComplete(net::ERR_FAILED); +} + void NodeStreamLoader::NotifyReadable() { if (!readable_) ReadMore(); @@ -81,7 +89,7 @@ void NodeStreamLoader::NotifyReadable() { void NodeStreamLoader::NotifyComplete(int result) { // Wait until write finishes or fails. if (is_reading_ || is_writing_) { - ended_ = true; + pending_result_ = true; result_ = result; return; } @@ -121,7 +129,7 @@ void NodeStreamLoader::ReadMore() { } readable_ = false; - if (ended_) { + if (pending_result_) { NotifyComplete(result_); } return; @@ -146,7 +154,7 @@ void NodeStreamLoader::ReadMore() { void NodeStreamLoader::DidWrite(MojoResult result) { is_writing_ = false; // We were told to end streaming. - if (ended_) { + if (pending_result_) { NotifyComplete(result_); return; } diff --git a/shell/browser/net/node_stream_loader.h b/shell/browser/net/node_stream_loader.h index 5ca1e9b29993..a1e5a713657c 100644 --- a/shell/browser/net/node_stream_loader.h +++ b/shell/browser/net/node_stream_loader.h @@ -47,6 +47,8 @@ class NodeStreamLoader : public network::mojom::URLLoader { using EventCallback = base::RepeatingCallback; void Start(network::mojom::URLResponseHeadPtr head); + void NotifyEnd(); + void NotifyError(); void NotifyReadable(); void NotifyComplete(int result); void ReadMore(); @@ -86,9 +88,13 @@ class NodeStreamLoader : public network::mojom::URLLoader { // When NotifyComplete is called while writing, we will save the result and // quit with it after the write is done. - bool ended_ = false; + bool pending_result_ = false; int result_ = net::OK; + // Set to `true` when we get either `end` or `error` event on the stream. + // If `false` - we call `stream.destroy()` to finalize the stream. + bool destroyed_ = false; + // When the stream emits the readable event, we only want to start reading // data if the stream was not readable before, so we store the state in a // flag. diff --git a/spec/api-protocol-spec.ts b/spec/api-protocol-spec.ts index 3ba77351d839..ab33252c3337 100644 --- a/spec/api-protocol-spec.ts +++ b/spec/api-protocol-spec.ts @@ -1194,6 +1194,30 @@ describe('protocol module', () => { expect(body).to.equal(text); }); + it('calls destroy on aborted body stream', async () => { + const abortController = new AbortController(); + + class TestStream extends stream.Readable { + _read () { + this.push('infinite data'); + + // Abort the request that reads from this stream. + abortController.abort(); + } + }; + const body = new TestStream(); + protocol.handle('test-scheme', () => { + return new Response(stream.Readable.toWeb(body) as ReadableStream); + }); + defer(() => { protocol.unhandle('test-scheme'); }); + + const res = net.fetch('test-scheme://foo', { + signal: abortController.signal + }); + await expect(res).to.be.rejectedWith('This operation was aborted'); + await expect(once(body, 'end')).to.be.rejectedWith('The operation was aborted'); + }); + it('accepts urls with no hostname in non-standard schemes', async () => { protocol.handle('test-scheme', (req) => new Response(req.url)); defer(() => { protocol.unhandle('test-scheme'); });