diff --git a/atom/browser/api/atom_api_protocol.cc b/atom/browser/api/atom_api_protocol.cc index 4ff8c5c6bab4..6ea1d8b75775 100644 --- a/atom/browser/api/atom_api_protocol.cc +++ b/atom/browser/api/atom_api_protocol.cc @@ -10,6 +10,7 @@ #include "atom/browser/net/url_request_async_asar_job.h" #include "atom/browser/net/url_request_buffer_job.h" #include "atom/browser/net/url_request_fetch_job.h" +#include "atom/browser/net/url_request_stream_job.h" #include "atom/browser/net/url_request_string_job.h" #include "atom/common/native_mate_converters/callback.h" #include "atom/common/native_mate_converters/value_converter.h" @@ -208,6 +209,8 @@ void Protocol::BuildPrototype( &Protocol::RegisterProtocol) .SetMethod("registerHttpProtocol", &Protocol::RegisterProtocol) + .SetMethod("registerStreamProtocol", + &Protocol::RegisterProtocol) .SetMethod("unregisterProtocol", &Protocol::UnregisterProtocol) .SetMethod("isProtocolHandled", &Protocol::IsProtocolHandled) .SetMethod("interceptStringProtocol", @@ -218,6 +221,8 @@ void Protocol::BuildPrototype( &Protocol::InterceptProtocol) .SetMethod("interceptHttpProtocol", &Protocol::InterceptProtocol) + .SetMethod("interceptStreamProtocol", + &Protocol::InterceptProtocol) .SetMethod("uninterceptProtocol", &Protocol::UninterceptProtocol); } diff --git a/atom/browser/net/url_request_stream_job.cc b/atom/browser/net/url_request_stream_job.cc new file mode 100644 index 000000000000..99e4e74cd776 --- /dev/null +++ b/atom/browser/net/url_request_stream_job.cc @@ -0,0 +1,204 @@ +// Copyright (c) 2017 GitHub, Inc. +// Use of this source code is governed by the MIT license that can be +// found in the LICENSE file. + +#include +#include +#include + +#include "atom/browser/net/url_request_stream_job.h" +#include "atom/common/api/event_emitter_caller.h" +#include "atom/common/atom_constants.h" +#include "atom/common/native_mate_converters/net_converter.h" +#include "atom/common/node_includes.h" +#include "base/strings/string_number_conversions.h" +#include "base/strings/string_util.h" +#include "base/time/time.h" +#include "net/filter/gzip_source_stream.h" + +namespace atom { + +URLRequestStreamJob::URLRequestStreamJob(net::URLRequest* request, + net::NetworkDelegate* network_delegate) + : JsAsker(request, network_delegate), + ended_(false), + errored_(false), + pending_io_buf_(nullptr), + pending_io_buf_size_(0), + response_headers_(nullptr), + weak_factory_(this) {} + +void URLRequestStreamJob::BeforeStartInUI(v8::Isolate* isolate, + v8::Local value) { + if (value->IsNull() || value->IsUndefined() || !value->IsObject()) { + // Invalid opts. + ended_ = true; + errored_ = true; + return; + } + + mate::Dictionary opts(isolate, v8::Local::Cast(value)); + int status_code; + if (!opts.Get("statusCode", &status_code)) { + // assume HTTP OK if statusCode is not passed. + status_code = 200; + } + std::string status("HTTP/1.1 "); + status.append(base::IntToString(status_code)); + status.append(" "); + status.append( + net::GetHttpReasonPhrase(static_cast(status_code))); + status.append("\0\0", 2); + response_headers_ = new net::HttpResponseHeaders(status); + + if (opts.Get("headers", &value)) { + mate::Converter::FromV8(isolate, value, + response_headers_.get()); + } + + if (!opts.Get("data", &value)) { + // Assume the opts is already a stream + value = opts.GetHandle(); + } else if (value->IsNullOrUndefined()) { + // "data" was explicitly passed as null or undefined, assume the user wants + // to send an empty body. + ended_ = true; + return; + } + + mate::Dictionary data(isolate, v8::Local::Cast(value)); + if (!data.Get("on", &value) || !value->IsFunction() || + !data.Get("removeListener", &value) || !value->IsFunction()) { + // If data is passed but it is not a stream, signal an error. + ended_ = true; + errored_ = true; + return; + } + + subscriber_.reset(new mate::EventSubscriber( + this, isolate, data.GetHandle())); + subscriber_->On("data", &URLRequestStreamJob::OnData); + subscriber_->On("end", &URLRequestStreamJob::OnEnd); + subscriber_->On("error", &URLRequestStreamJob::OnError); +} + +void URLRequestStreamJob::StartAsync(std::unique_ptr options) { + NotifyHeadersComplete(); +} + +void URLRequestStreamJob::OnData(mate::Arguments* args) { + v8::Local node_data; + args->GetNext(&node_data); + if (node_data->IsUint8Array()) { + const char* data = node::Buffer::Data(node_data); + size_t data_size = node::Buffer::Length(node_data); + std::copy(data, data + data_size, std::back_inserter(buffer_)); + } else { + NOTREACHED(); + } + if (pending_io_buf_) { + CopyMoreData(pending_io_buf_, pending_io_buf_size_); + } +} + +void URLRequestStreamJob::OnEnd(mate::Arguments* args) { + ended_ = true; + if (pending_io_buf_) { + CopyMoreData(pending_io_buf_, pending_io_buf_size_); + } +} + +void URLRequestStreamJob::OnError(mate::Arguments* args) { + errored_ = true; + if (pending_io_buf_) { + CopyMoreData(pending_io_buf_, pending_io_buf_size_); + } +} + +int URLRequestStreamJob::ReadRawData(net::IOBuffer* dest, int dest_size) { + content::BrowserThread::PostTask( + content::BrowserThread::UI, FROM_HERE, + base::Bind(&URLRequestStreamJob::CopyMoreData, weak_factory_.GetWeakPtr(), + make_scoped_refptr(dest), dest_size)); + return net::ERR_IO_PENDING; +} + +void URLRequestStreamJob::DoneReading() { + subscriber_.reset(); + buffer_.clear(); + ended_ = true; +} + +void URLRequestStreamJob::DoneReadingRedirectResponse() { + DoneReading(); +} + +void URLRequestStreamJob::CopyMoreDataDone(scoped_refptr io_buf, + int status) { + if (status <= 0) { + subscriber_.reset(); + } + ReadRawDataComplete(status); + io_buf = nullptr; +} + +void URLRequestStreamJob::CopyMoreData(scoped_refptr io_buf, + int io_buf_size) { + // reset any instance references to io_buf + pending_io_buf_ = nullptr; + pending_io_buf_size_ = 0; + + int read_count = 0; + if (buffer_.size()) { + size_t count = std::min((size_t)io_buf_size, buffer_.size()); + std::copy(buffer_.begin(), buffer_.begin() + count, io_buf->data()); + buffer_.erase(buffer_.begin(), buffer_.begin() + count); + read_count = count; + } else if (!ended_ && !errored_) { + // No data available yet, save references to the IOBuffer, which will be + // passed back to this function when OnData/OnEnd/OnError are called + pending_io_buf_ = io_buf; + pending_io_buf_size_ = io_buf_size; + } + + if (!pending_io_buf_) { + // Only call CopyMoreDataDone if we have read something. + int status = (errored_ && !read_count) ? net::ERR_FAILED : read_count; + content::BrowserThread::PostTask( + content::BrowserThread::IO, FROM_HERE, + base::Bind(&URLRequestStreamJob::CopyMoreDataDone, + weak_factory_.GetWeakPtr(), io_buf, status)); + } +} + +std::unique_ptr URLRequestStreamJob::SetUpSourceStream() { + std::unique_ptr source = + net::URLRequestJob::SetUpSourceStream(); + size_t i = 0; + std::string type; + while (response_headers_->EnumerateHeader(&i, "Content-Encoding", &type)) { + if (base::LowerCaseEqualsASCII(type, "gzip") || + base::LowerCaseEqualsASCII(type, "x-gzip")) { + return net::GzipSourceStream::Create(std::move(source), + net::SourceStream::TYPE_GZIP); + } else if (base::LowerCaseEqualsASCII(type, "deflate")) { + return net::GzipSourceStream::Create(std::move(source), + net::SourceStream::TYPE_DEFLATE); + } + } + return source; +} + +bool URLRequestStreamJob::GetMimeType(std::string* mime_type) const { + return response_headers_->GetMimeType(mime_type); +} + +int URLRequestStreamJob::GetResponseCode() const { + return response_headers_->response_code(); +} + +void URLRequestStreamJob::GetResponseInfo(net::HttpResponseInfo* info) { + info->headers = response_headers_; +} + +} // namespace atom diff --git a/atom/browser/net/url_request_stream_job.h b/atom/browser/net/url_request_stream_job.h new file mode 100644 index 000000000000..372cad7e2d5f --- /dev/null +++ b/atom/browser/net/url_request_stream_job.h @@ -0,0 +1,66 @@ +// Copyright (c) 2017 GitHub, Inc. +// Use of this source code is governed by the MIT license that can be +// found in the LICENSE file. + +#ifndef ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_ +#define ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_ + +#include +#include + +#include "atom/browser/api/event_subscriber.h" +#include "atom/browser/net/js_asker.h" +#include "base/memory/ref_counted_memory.h" +#include "native_mate/persistent_dictionary.h" +#include "net/base/io_buffer.h" +#include "net/http/http_status_code.h" +#include "net/url_request/url_request_context_getter.h" +#include "v8/include/v8.h" + +namespace atom { + +class URLRequestStreamJob : public JsAsker { + public: + URLRequestStreamJob(net::URLRequest* request, + net::NetworkDelegate* network_delegate); + + void OnData(mate::Arguments* args); + void OnEnd(mate::Arguments* args); + void OnError(mate::Arguments* args); + + // URLRequestJob + void GetResponseInfo(net::HttpResponseInfo* info) override; + + protected: + // URLRequestJob + int ReadRawData(net::IOBuffer* buf, int buf_size) override; + void DoneReading() override; + void DoneReadingRedirectResponse() override; + std::unique_ptr SetUpSourceStream() override; + bool GetMimeType(std::string* mime_type) const override; + int GetResponseCode() const override; + + private: + // JSAsker + void BeforeStartInUI(v8::Isolate*, v8::Local) override; + void StartAsync(std::unique_ptr options) override; + void OnResponse(bool success, std::unique_ptr value); + + // Callback after data is asynchronously read from the file into |buf|. + void CopyMoreData(scoped_refptr io_buf, int io_buf_size); + void CopyMoreDataDone(scoped_refptr io_buf, int read_count); + + std::deque buffer_; + bool ended_; + bool errored_; + scoped_refptr pending_io_buf_; + int pending_io_buf_size_; + scoped_refptr response_headers_; + mate::EventSubscriber::SafePtr subscriber_; + base::WeakPtrFactory weak_factory_; + + DISALLOW_COPY_AND_ASSIGN(URLRequestStreamJob); +}; +} // namespace atom + +#endif // ATOM_BROWSER_NET_URL_REQUEST_STREAM_JOB_H_ diff --git a/filenames.gypi b/filenames.gypi index 2f17ec1086b2..efe5c5f854d8 100644 --- a/filenames.gypi +++ b/filenames.gypi @@ -274,6 +274,8 @@ 'atom/browser/net/url_request_buffer_job.h', 'atom/browser/net/url_request_fetch_job.cc', 'atom/browser/net/url_request_fetch_job.h', + 'atom/browser/net/url_request_stream_job.cc', + 'atom/browser/net/url_request_stream_job.h', 'atom/browser/node_debugger.cc', 'atom/browser/node_debugger.h', 'atom/browser/relauncher_linux.cc',