feat: add protocol.handle (#36674)

This commit is contained in:
Jeremy Rose 2023-03-27 10:00:55 -07:00 committed by GitHub
parent 6a6908c4c8
commit fda8ea9277
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1254 additions and 89 deletions

View file

@ -15,16 +15,21 @@
#include "base/values.h"
#include "gin/converter.h"
#include "gin/dictionary.h"
#include "gin/object_template_builder.h"
#include "net/cert/x509_certificate.h"
#include "net/cert/x509_util.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_version.h"
#include "net/url_request/redirect_info.h"
#include "services/network/public/cpp/data_element.h"
#include "services/network/public/cpp/resource_request.h"
#include "services/network/public/cpp/resource_request_body.h"
#include "services/network/public/mojom/chunked_data_pipe_getter.mojom.h"
#include "shell/browser/api/electron_api_data_pipe_holder.h"
#include "shell/common/gin_converters/gurl_converter.h"
#include "shell/common/gin_converters/std_converter.h"
#include "shell/common/gin_converters/value_converter.h"
#include "shell/common/gin_helper/promise.h"
#include "shell/common/node_includes.h"
namespace gin {
@ -246,6 +251,246 @@ bool Converter<net::HttpRequestHeaders>::FromV8(v8::Isolate* isolate,
return true;
}
class ChunkedDataPipeReadableStream
: public gin::Wrappable<ChunkedDataPipeReadableStream> {
public:
static gin::Handle<ChunkedDataPipeReadableStream> Create(
v8::Isolate* isolate,
network::ResourceRequestBody* request,
network::DataElementChunkedDataPipe* data_element) {
return gin::CreateHandle(isolate, new ChunkedDataPipeReadableStream(
isolate, request, data_element));
}
// gin::Wrappable
gin::ObjectTemplateBuilder GetObjectTemplateBuilder(
v8::Isolate* isolate) override {
return gin::Wrappable<
ChunkedDataPipeReadableStream>::GetObjectTemplateBuilder(isolate)
.SetMethod("read", &ChunkedDataPipeReadableStream::Read);
}
static gin::WrapperInfo kWrapperInfo;
private:
ChunkedDataPipeReadableStream(
v8::Isolate* isolate,
network::ResourceRequestBody* request,
network::DataElementChunkedDataPipe* data_element)
: isolate_(isolate),
resource_request_body_(request),
data_element_(data_element),
handle_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
base::SequencedTaskRunner::GetCurrentDefault()) {}
~ChunkedDataPipeReadableStream() override = default;
int Init() {
chunked_data_pipe_getter_.Bind(
data_element_->ReleaseChunkedDataPipeGetter());
for (auto& element : *resource_request_body_->elements_mutable()) {
if (element.type() ==
network::mojom::DataElement::Tag::kChunkedDataPipe &&
data_element_ == &element.As<network::DataElementChunkedDataPipe>()) {
element = network::DataElement(
network::DataElementBytes(std::vector<uint8_t>()));
break;
}
}
chunked_data_pipe_getter_.set_disconnect_handler(
base::BindOnce(&ChunkedDataPipeReadableStream::OnDataPipeGetterClosed,
base::Unretained(this)));
chunked_data_pipe_getter_->GetSize(
base::BindOnce(&ChunkedDataPipeReadableStream::OnSizeReceived,
base::Unretained(this)));
mojo::ScopedDataPipeProducerHandle data_pipe_producer;
mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
MojoResult result =
mojo::CreateDataPipe(nullptr, data_pipe_producer, data_pipe_consumer);
if (result != MOJO_RESULT_OK)
return net::ERR_INSUFFICIENT_RESOURCES;
chunked_data_pipe_getter_->StartReading(std::move(data_pipe_producer));
data_pipe_ = std::move(data_pipe_consumer);
return net::OK;
}
v8::Local<v8::Promise> Read(v8::Local<v8::ArrayBufferView> buf) {
gin_helper::Promise<int> promise(isolate_);
v8::Local<v8::Promise> handle = promise.GetHandle();
int status = ReadInternal(buf);
if (status == net::ERR_IO_PENDING) {
promise_ = std::move(promise);
} else {
if (status < 0)
std::move(promise).RejectWithErrorMessage(net::ErrorToString(status));
else
std::move(promise).Resolve(status);
}
return handle;
}
int ReadInternal(v8::Local<v8::ArrayBufferView> buf) {
if (!data_pipe_)
status_ = Init();
// If there was an error either passed to the ReadCallback or as a result of
// closing the DataPipeGetter pipe, fail the read.
if (status_ != net::OK)
return status_;
// Nothing else to do, if the entire body was read.
if (size_ && bytes_read_ == *size_) {
// This shouldn't be called if the stream was already completed.
DCHECK(!is_eof_);
is_eof_ = true;
return net::OK;
}
if (!handle_watcher_.IsWatching()) {
handle_watcher_.Watch(
data_pipe_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
base::BindRepeating(&ChunkedDataPipeReadableStream::OnHandleReadable,
base::Unretained(this)));
}
uint32_t num_bytes = buf->ByteLength();
if (size_ && num_bytes > *size_ - bytes_read_)
num_bytes = *size_ - bytes_read_;
MojoResult rv = data_pipe_->ReadData(
static_cast<void*>(static_cast<char*>(buf->Buffer()->Data()) +
buf->ByteOffset()),
&num_bytes, MOJO_READ_DATA_FLAG_NONE);
if (rv == MOJO_RESULT_OK) {
bytes_read_ += num_bytes;
// Not needed for correctness, but this allows the consumer to send the
// final chunk and the end of stream message together, for protocols that
// allow it.
if (size_ && *size_ == bytes_read_)
is_eof_ = true;
return num_bytes;
}
if (rv == MOJO_RESULT_SHOULD_WAIT) {
handle_watcher_.ArmOrNotify();
buf_.Reset(isolate_, buf);
return net::ERR_IO_PENDING;
}
// The pipe was closed. If the size isn't known yet, could be a success or a
// failure.
if (!size_) {
// Need to keep the buffer around because its presence is used to indicate
// that there's a pending UploadDataStream read.
buf_.Reset(isolate_, buf);
handle_watcher_.Cancel();
data_pipe_.reset();
return net::ERR_IO_PENDING;
}
// |size_| was checked earlier, so if this point is reached, the pipe was
// closed before receiving all bytes.
DCHECK_LT(bytes_read_, *size_);
return net::ERR_FAILED;
}
void OnSizeReceived(int32_t status, uint64_t size) {
DCHECK(!size_);
DCHECK_EQ(net::OK, status_);
status_ = status;
if (status == net::OK) {
size_ = size;
if (size == bytes_read_) {
// Only set this as a final chunk if there's a read in progress. Setting
// it asynchronously could result in confusing consumers.
if (!buf_.IsEmpty())
is_eof_ = true;
} else if (size < bytes_read_ ||
(!buf_.IsEmpty() && !data_pipe_.is_valid())) {
// If more data was received than was expected, or there's a pending
// read and data pipe was closed without passing in as many bytes as
// expected, the upload can't continue. If there's no pending read but
// the pipe was closed, the closure and size difference will be noticed
// on the next read attempt.
status_ = net::ERR_FAILED;
}
}
// If this is done, and there's a pending read, complete the pending read.
// If there's not a pending read, either |status_| will be reported on the
// next read, the file will be marked as done, so ReadInternal() won't be
// called again.
if (!buf_.IsEmpty() && (is_eof_ || status_ != net::OK)) {
// |data_pipe_| isn't needed any more, and if it's still open, a close
// pipe message would cause issues, since this class normally only watches
// the pipe when there's a pending read.
handle_watcher_.Cancel();
data_pipe_.reset();
// Clear |buf_| as well, so it's only non-null while there's a pending
// read.
buf_.Reset();
chunked_data_pipe_getter_.reset();
OnReadCompleted(status_);
// |this| may have been deleted at this point.
}
}
void OnHandleReadable(MojoResult result) {
DCHECK(!buf_.IsEmpty());
v8::HandleScope handle_scope(isolate_);
v8::Local<v8::ArrayBufferView> buf = buf_.Get(isolate_);
buf_.Reset();
int rv = ReadInternal(buf);
if (rv != net::ERR_IO_PENDING)
OnReadCompleted(rv);
// |this| may have been deleted at this point.
}
void OnReadCompleted(int result) {
if (result < 0)
std::move(promise_).RejectWithErrorMessage(net::ErrorToString(result));
else
std::move(promise_).Resolve(result);
}
void OnDataPipeGetterClosed() {
// If the size hasn't been received yet, treat this as receiving an error.
// Otherwise, this will only be a problem if/when InitInternal() tries to
// start reading again, so do nothing.
if (status_ == net::OK && !size_)
OnSizeReceived(net::ERR_FAILED, 0);
}
v8::Isolate* isolate_;
int status_ = net::OK;
scoped_refptr<network::ResourceRequestBody> resource_request_body_;
network::DataElementChunkedDataPipe* data_element_;
mojo::Remote<network::mojom::ChunkedDataPipeGetter> chunked_data_pipe_getter_;
mojo::ScopedDataPipeConsumerHandle data_pipe_;
mojo::SimpleWatcher handle_watcher_;
absl::optional<uint64_t> size_;
uint64_t bytes_read_ = 0;
bool is_eof_ = false;
v8::Global<v8::ArrayBufferView> buf_;
gin_helper::Promise<int> promise_;
};
gin::WrapperInfo ChunkedDataPipeReadableStream::kWrapperInfo = {
gin::kEmbedderNativeGin};
// static
v8::Local<v8::Value> Converter<network::ResourceRequestBody>::ToV8(
v8::Isolate* isolate,
@ -288,6 +533,21 @@ v8::Local<v8::Value> Converter<network::ResourceRequestBody>::ToV8(
upload_data.Set("dataPipe", holder);
break;
}
case network::mojom::DataElement::Tag::kChunkedDataPipe: {
upload_data.Set("type", "stream");
// ReleaseChunkedDataPipeGetter mutates the element, but unfortunately
// gin converters are only allowed const references, so we need to cast
// off the const here.
auto& mutable_element =
const_cast<network::DataElementChunkedDataPipe&>(
element.As<network::DataElementChunkedDataPipe>());
upload_data.Set(
"body",
ChunkedDataPipeReadableStream::Create(
isolate, const_cast<network::ResourceRequestBody*>(&val),
&mutable_element));
break;
}
default:
NOTREACHED() << "Found unsupported data element";
}

View file

@ -17,6 +17,8 @@ PromiseBase::PromiseBase(v8::Isolate* isolate,
context_(isolate, isolate->GetCurrentContext()),
resolver_(isolate, handle) {}
PromiseBase::PromiseBase() : isolate_(nullptr) {}
PromiseBase::PromiseBase(PromiseBase&&) = default;
PromiseBase::~PromiseBase() = default;

View file

@ -30,6 +30,7 @@ class PromiseBase {
public:
explicit PromiseBase(v8::Isolate* isolate);
PromiseBase(v8::Isolate* isolate, v8::Local<v8::Promise::Resolver> handle);
PromiseBase();
~PromiseBase();
// disable copy