292dd765b2
`.destroy()` is an important method in the lifecycle of a Node.js Readable stream. It is typically called to reclaim the resources (e.g., close file descriptor). The only situations where calling it manually isn't necessary are when the following events are emitted first: - `end`: natural end of a stream - `error`: stream terminated due to a failure Prior to this commit the ended state was incorrectly tracked together with a pending internal error. It led to situations where the request could get aborted during a read and then get marked as ended (having pending error). With this change we disentangle pending "error" and "destroyed" cases to always properly terminate an active Node.js Readable stream. Co-authored-by: trop[bot] <37223003+trop[bot]@users.noreply.github.com> Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
183 lines
5.6 KiB
C++
183 lines
5.6 KiB
C++
// Copyright (c) 2019 GitHub, Inc.
|
|
// Use of this source code is governed by the MIT license that can be
|
|
// found in the LICENSE file.
|
|
|
|
#include "shell/browser/net/node_stream_loader.h"
|
|
|
|
#include <string_view>
|
|
#include <utility>
|
|
|
|
#include "mojo/public/cpp/system/string_data_source.h"
|
|
#include "shell/common/gin_converters/callback_converter.h"
|
|
#include "shell/common/node_includes.h"
|
|
|
|
namespace electron {
|
|
|
|
NodeStreamLoader::NodeStreamLoader(
|
|
network::mojom::URLResponseHeadPtr head,
|
|
mojo::PendingReceiver<network::mojom::URLLoader> loader,
|
|
mojo::PendingRemote<network::mojom::URLLoaderClient> client,
|
|
v8::Isolate* isolate,
|
|
v8::Local<v8::Object> emitter)
|
|
: url_loader_(this, std::move(loader)),
|
|
client_(std::move(client)),
|
|
isolate_(isolate),
|
|
emitter_(isolate, emitter) {
|
|
url_loader_.set_disconnect_handler(
|
|
base::BindOnce(&NodeStreamLoader::NotifyComplete,
|
|
weak_factory_.GetWeakPtr(), net::ERR_FAILED));
|
|
|
|
Start(std::move(head));
|
|
}
|
|
|
|
NodeStreamLoader::~NodeStreamLoader() {
|
|
v8::Isolate::Scope isolate_scope(isolate_);
|
|
v8::HandleScope handle_scope(isolate_);
|
|
|
|
// Unsubscribe all handlers.
|
|
for (const auto& it : handlers_) {
|
|
v8::Local<v8::Value> args[] = {gin::StringToV8(isolate_, it.first),
|
|
it.second.Get(isolate_)};
|
|
node::MakeCallback(isolate_, emitter_.Get(isolate_), "removeListener",
|
|
node::arraysize(args), args, {0, 0});
|
|
}
|
|
|
|
// Destroy the stream if not already ended
|
|
if (!destroyed_) {
|
|
node::MakeCallback(isolate_, emitter_.Get(isolate_), "destroy", 0, nullptr,
|
|
{0, 0});
|
|
}
|
|
}
|
|
|
|
void NodeStreamLoader::Start(network::mojom::URLResponseHeadPtr head) {
|
|
mojo::ScopedDataPipeProducerHandle producer;
|
|
mojo::ScopedDataPipeConsumerHandle consumer;
|
|
MojoResult rv = mojo::CreateDataPipe(nullptr, producer, consumer);
|
|
if (rv != MOJO_RESULT_OK) {
|
|
NotifyComplete(net::ERR_INSUFFICIENT_RESOURCES);
|
|
return;
|
|
}
|
|
|
|
producer_ = std::make_unique<mojo::DataPipeProducer>(std::move(producer));
|
|
client_->OnReceiveResponse(std::move(head), std::move(consumer),
|
|
std::nullopt);
|
|
|
|
auto weak = weak_factory_.GetWeakPtr();
|
|
On("end", base::BindRepeating(&NodeStreamLoader::NotifyEnd, weak));
|
|
On("error", base::BindRepeating(&NodeStreamLoader::NotifyError, weak));
|
|
On("readable", base::BindRepeating(&NodeStreamLoader::NotifyReadable, weak));
|
|
}
|
|
|
|
void NodeStreamLoader::NotifyEnd() {
|
|
destroyed_ = true;
|
|
NotifyComplete(net::OK);
|
|
}
|
|
|
|
void NodeStreamLoader::NotifyError() {
|
|
destroyed_ = true;
|
|
NotifyComplete(net::ERR_FAILED);
|
|
}
|
|
|
|
void NodeStreamLoader::NotifyReadable() {
|
|
if (!readable_)
|
|
ReadMore();
|
|
else if (is_reading_)
|
|
has_read_waiting_ = true;
|
|
readable_ = true;
|
|
}
|
|
|
|
void NodeStreamLoader::NotifyComplete(int result) {
|
|
// Wait until write finishes or fails.
|
|
if (is_reading_ || is_writing_) {
|
|
pending_result_ = true;
|
|
result_ = result;
|
|
return;
|
|
}
|
|
|
|
network::URLLoaderCompletionStatus status(result);
|
|
status.completion_time = base::TimeTicks::Now();
|
|
status.decoded_body_length = bytes_written_;
|
|
client_->OnComplete(status);
|
|
delete this;
|
|
}
|
|
|
|
void NodeStreamLoader::ReadMore() {
|
|
if (is_reading_) {
|
|
// Calling read() can trigger the "readable" event again, making this
|
|
// function re-entrant. If we're already reading, we don't want to start
|
|
// a nested read, so short-circuit.
|
|
return;
|
|
}
|
|
is_reading_ = true;
|
|
auto weak = weak_factory_.GetWeakPtr();
|
|
v8::HandleScope scope(isolate_);
|
|
// buffer = emitter.read()
|
|
v8::MaybeLocal<v8::Value> ret = node::MakeCallback(
|
|
isolate_, emitter_.Get(isolate_), "read", 0, nullptr, {0, 0});
|
|
DCHECK(weak) << "We shouldn't have been destroyed when calling read()";
|
|
|
|
// If there is no buffer read, wait until |readable| is emitted again.
|
|
v8::Local<v8::Value> buffer;
|
|
if (!ret.ToLocal(&buffer) || !node::Buffer::HasInstance(buffer)) {
|
|
is_reading_ = false;
|
|
|
|
// If 'readable' was called after 'read()', try again
|
|
if (has_read_waiting_) {
|
|
has_read_waiting_ = false;
|
|
ReadMore();
|
|
return;
|
|
}
|
|
|
|
readable_ = false;
|
|
if (pending_result_) {
|
|
NotifyComplete(result_);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Hold the buffer until the write is done.
|
|
buffer_.Reset(isolate_, buffer);
|
|
|
|
bytes_written_ += node::Buffer::Length(buffer);
|
|
|
|
// Write buffer to mojo pipe asynchronously.
|
|
is_reading_ = false;
|
|
is_writing_ = true;
|
|
producer_->Write(std::make_unique<mojo::StringDataSource>(
|
|
std::string_view{node::Buffer::Data(buffer),
|
|
node::Buffer::Length(buffer)},
|
|
mojo::StringDataSource::AsyncWritingMode::
|
|
STRING_STAYS_VALID_UNTIL_COMPLETION),
|
|
base::BindOnce(&NodeStreamLoader::DidWrite, weak));
|
|
}
|
|
|
|
void NodeStreamLoader::DidWrite(MojoResult result) {
|
|
is_writing_ = false;
|
|
// We were told to end streaming.
|
|
if (pending_result_) {
|
|
NotifyComplete(result_);
|
|
return;
|
|
}
|
|
|
|
if (result == MOJO_RESULT_OK && readable_)
|
|
ReadMore();
|
|
else
|
|
NotifyComplete(net::ERR_FAILED);
|
|
}
|
|
|
|
void NodeStreamLoader::On(const char* event, EventCallback callback) {
|
|
v8::Isolate::Scope isolate_scope(isolate_);
|
|
v8::HandleScope handle_scope(isolate_);
|
|
|
|
// emitter.on(event, callback)
|
|
v8::Local<v8::Value> args[] = {
|
|
gin::StringToV8(isolate_, event),
|
|
gin_helper::CallbackToV8Leaked(isolate_, std::move(callback)),
|
|
};
|
|
handlers_[event].Reset(isolate_, args[1]);
|
|
node::MakeCallback(isolate_, emitter_.Get(isolate_), "on",
|
|
node::arraysize(args), args, {0, 0});
|
|
// No more code below, as this class may destruct when subscribing.
|
|
}
|
|
|
|
} // namespace electron
|