Implementing error, close, finish, abort events management.

This commit is contained in:
ali.ibrahim 2016-10-04 17:12:17 +02:00
parent bde30b90e8
commit ec1fc5a17b
7 changed files with 391 additions and 117 deletions

View file

@ -29,7 +29,9 @@ void Net::BuildPrototype(v8::Isolate* isolate,
v8::Local<v8::FunctionTemplate> prototype) {
prototype->SetClassName(mate::StringToV8(isolate, "Net"));
mate::ObjectTemplateBuilder(isolate, prototype->PrototypeTemplate())
.SetProperty("URLRequest", &Net::URLRequest);
.SetProperty("URLRequest", &Net::URLRequest)
.SetMethod("RequestGarbageCollectionForTesting",
&Net::RequestGarbageCollectionForTesting);
}
v8::Local<v8::Value> Net::URLRequest(v8::Isolate* isolate) {
@ -37,6 +39,11 @@ v8::Local<v8::Value> Net::URLRequest(v8::Isolate* isolate) {
}
void Net::RequestGarbageCollectionForTesting() {
isolate()->RequestGarbageCollectionForTesting(
v8::Isolate::GarbageCollectionType::kFullGarbageCollection);
}
} // namespace api

View file

@ -24,6 +24,7 @@ class Net : public mate::EventEmitter<Net> {
~Net() override;
private:
void RequestGarbageCollectionForTesting();
DISALLOW_COPY_AND_ASSIGN(Net);
};

View file

@ -83,6 +83,84 @@ struct Converter<scoped_refptr<const net::IOBufferWithSize>> {
namespace atom {
namespace api {
template <typename Flags>
URLRequest::StateBase<Flags>::StateBase(Flags initialState)
: state_(initialState) {
}
template <typename Flags>
void URLRequest::StateBase<Flags>::SetFlag(Flags flag) {
state_ = static_cast<Flags>(static_cast<int>(state_) &
static_cast<int>(flag));
}
template <typename Flags>
bool URLRequest::StateBase<Flags>::operator==(Flags flag) const {
return state_ == flag;
}
template <typename Flags>
bool URLRequest::StateBase<Flags>::IsFlagSet(Flags flag) const {
return static_cast<int>(state_) & static_cast<int>(flag);
}
URLRequest::RequestState::RequestState()
: StateBase(RequestStateFlags::kNotStarted) {
}
bool URLRequest::RequestState::NotStarted() const {
return *this == RequestStateFlags::kNotStarted;
}
bool URLRequest::RequestState::Started() const {
return IsFlagSet(RequestStateFlags::kStarted);
}
bool URLRequest::RequestState::Finished() const {
return IsFlagSet(RequestStateFlags::kFinished);
}
bool URLRequest::RequestState::Canceled() const {
return IsFlagSet(RequestStateFlags::kCanceled);
}
bool URLRequest::RequestState::Failed() const {
return IsFlagSet(RequestStateFlags::kFailed);
}
bool URLRequest::RequestState::Closed() const {
return IsFlagSet(RequestStateFlags::kClosed);
}
URLRequest::ResponseState::ResponseState()
: StateBase(ResponseStateFlags::kNotStarted) {
}
bool URLRequest::ResponseState::NotStarted() const {
return *this == ResponseStateFlags::kNotStarted;
}
bool URLRequest::ResponseState::Started() const {
return IsFlagSet(ResponseStateFlags::kStarted);
}
bool URLRequest::ResponseState::Ended() const {
return IsFlagSet(ResponseStateFlags::kEnded);
}
bool URLRequest::ResponseState::Canceled() const {
return IsFlagSet(ResponseStateFlags::kCanceled);
}
bool URLRequest::ResponseState::Failed() const {
return IsFlagSet(ResponseStateFlags::kFailed);
}
bool URLRequest::ResponseState::Closed() const {
return IsFlagSet(ResponseStateFlags::kClosed);
}
URLRequest::URLRequest(v8::Isolate* isolate, v8::Local<v8::Object> wrapper)
: weak_ptr_factory_(this) {
InitWith(isolate, wrapper);
@ -127,10 +205,12 @@ void URLRequest::BuildPrototype(v8::Isolate* isolate,
// Request API
.MakeDestroyable()
.SetMethod("write", &URLRequest::Write)
.SetMethod("abort", &URLRequest::Abort)
.SetMethod("cancel", &URLRequest::Cancel)
.SetMethod("setExtraHeader", &URLRequest::SetExtraHeader)
.SetMethod("removeExtraHeader", &URLRequest::RemoveExtraHeader)
.SetMethod("setChunkedUpload", &URLRequest::SetChunkedUpload)
.SetProperty("notStarted", &URLRequest::NotStarted)
.SetProperty("finished", &URLRequest::Finished)
// Response APi
.SetProperty("statusCode", &URLRequest::StatusCode)
.SetProperty("statusMessage", &URLRequest::StatusMessage)
@ -139,19 +219,81 @@ void URLRequest::BuildPrototype(v8::Isolate* isolate,
.SetProperty("httpVersionMinor", &URLRequest::ResponseHttpVersionMinor);
}
bool URLRequest::NotStarted() const {
return request_state_.NotStarted();
}
bool URLRequest::Finished() const {
return request_state_.Finished();
}
bool URLRequest::Canceled() const {
return request_state_.Canceled();
}
bool URLRequest::Write(
scoped_refptr<const net::IOBufferWithSize> buffer,
bool is_last) {
if (request_state_.Canceled() ||
request_state_.Failed() ||
request_state_.Finished() ||
request_state_.Closed()) {
return false;
}
if (request_state_.NotStarted()) {
request_state_.SetFlag(RequestStateFlags::kStarted);
// Pin on first write.
pin();
}
if (is_last) {
request_state_.SetFlag(RequestStateFlags::kFinished);
EmitRequestEvent(true, "finish");
}
DCHECK(atom_request_);
if (atom_request_) {
return atom_request_->Write(buffer, is_last);
}
return false;
}
void URLRequest::Abort() {
atom_request_->Abort();
void URLRequest::Cancel() {
if (request_state_.Canceled()) {
// Cancel only once.
return;
}
// Mark as canceled.
request_state_.SetFlag(RequestStateFlags::kCanceled);
if (request_state_.Started()) {
// Really cancel if it was started.
atom_request_->Cancel();
}
if (!request_state_.Closed()) {
EmitRequestEvent(true, "abort");
}
response_state_.SetFlag(ResponseStateFlags::kCanceled);
if (response_state_.Started() && !response_state_.Closed()) {
EmitResponseEvent(true, "aborted");
}
Close();
}
bool URLRequest::SetExtraHeader(const std::string& name,
const std::string& value) {
// State must be equal to not started.
if (!request_state_.NotStarted()) {
// Cannot change headers after send.
return false;
}
if (!net::HttpUtil::IsValidHeaderName(name)) {
return false;
}
@ -165,45 +307,84 @@ bool URLRequest::SetExtraHeader(const std::string& name,
}
void URLRequest::RemoveExtraHeader(const std::string& name) {
// State must be equal to not started.
if (!request_state_.NotStarted()) {
// Cannot change headers after send.
return;
}
atom_request_->RemoveExtraHeader(name);
}
void URLRequest::SetChunkedUpload(bool is_chunked_upload) {
// State must be equal to not started.
if (!request_state_.NotStarted()) {
// Cannot change headers after send.
return;
}
atom_request_->SetChunkedUpload(is_chunked_upload);
}
void URLRequest::OnAuthenticationRequired(
scoped_refptr<const net::AuthChallengeInfo> auth_info) {
EmitRequestEvent(
false,
"login",
auth_info.get(),
base::Bind(&AtomURLRequest::PassLoginInformation, atom_request_));
}
void URLRequest::OnResponseStarted() {
if (request_state_.Canceled() ||
request_state_.Failed() ||
request_state_.Closed()) {
// Don't emit any event after request cancel.
return;
}
response_state_.SetFlag(ResponseStateFlags::kStarted);
Emit("response");
}
void URLRequest::OnResponseData(
scoped_refptr<const net::IOBufferWithSize> buffer) {
if (request_state_.Canceled()) {
// Don't emit any event after request cancel.
return;
}
if (!buffer || !buffer->data() || !buffer->size()) {
return;
}
EmitResponseEvent("data", buffer);
if (!response_state_.Closed()) {
EmitResponseEvent(false, "data", buffer);
}
}
void URLRequest::OnResponseCompleted() {
EmitResponseEvent("end");
unpin();
atom_request_ = nullptr;
response_state_.SetFlag(ResponseStateFlags::kEnded);
if (request_state_.Canceled()) {
// Don't emit any event after request cancel.
return;
}
if (!response_state_.Closed()) {
EmitResponseEvent(false, "end");
}
Close();
}
void URLRequest::OnError(const std::string& error) {
void URLRequest::OnRequestError(const std::string& error) {
request_state_.SetFlag(RequestStateFlags::kFailed);
auto error_object = v8::Exception::Error(mate::StringToV8(isolate(), error));
EmitRequestEvent("error", error_object);
EmitRequestEvent(false, "error", error_object);
Close();
}
void URLRequest::OnResponseError(const std::string& error) {
response_state_.SetFlag(ResponseStateFlags::kFailed);
auto error_object = v8::Exception::Error(mate::StringToV8(isolate(), error));
EmitResponseEvent(false, "error", error_object);
Close();
}
int URLRequest::StatusCode() const {
if (auto response_headers = atom_request_->GetResponseHeaders()) {
return response_headers->response_code();
@ -238,6 +419,22 @@ uint32_t URLRequest::ResponseHttpVersionMinor() const {
return 0;
}
void URLRequest::Close() {
if (!response_state_.Closed()) {
response_state_.SetFlag(ResponseStateFlags::kClosed);
if (response_state_.Started()) {
// Emit a close event if we really have a response object.
EmitResponseEvent(true, "close");
}
}
if (!request_state_.Closed()) {
request_state_.SetFlag(RequestStateFlags::kClosed);
EmitRequestEvent(true, "close");
}
unpin();
atom_request_ = nullptr;
}
void URLRequest::pin() {
if (wrapper_.IsEmpty()) {
wrapper_.Reset(isolate(), GetWrapper());

View file

@ -32,8 +32,8 @@ namespace api {
// classes (ClientRequest and IncomingMessage) in the net module provide the
// final API, including some state management and arguments validation.
//
// URLRequest's methods fall into two main categories: command and event methods.
// They are always executed on the Browser's UI thread.
// URLRequest's methods fall into two main categories: command and event
// methods. They are always executed on the Browser's UI thread.
// Command methods are called directly from JavaScript code via the API defined
// in BuildPrototype. A command method is generally implemented by forwarding
// the call to a corresponding method on AtomURLRequest which does the
@ -61,7 +61,7 @@ namespace api {
// });
// })();
//
// we still want data to be logged even if the response/request objects are no
// we still want data to be logged even if the response/request objects are n
// more referenced in JavaScript.
//
// Binding by simply following the native_mate Wrapper/Wrappable pattern will
@ -84,9 +84,9 @@ namespace api {
// Chromium reference counting. Reasoning about lifetime issues become much
// more complex.
//
// We chose to split the implementation into two classes linked via a strong/weak
// pointers. A URLRequest instance is deleted if it is unpinned and the
// corresponding JS wrapper object is garbage collected. On the other hand,
// We chose to split the implementation into two classes linked via a
// strong/weak pointers. A URLRequest instance is deleted if it is unpinned and
// the corresponding JS wrapper object is garbage collected. On the other hand,
// an AtmURLRequest instance lifetime is totally governed by reference counting.
//
class URLRequest : public mate::EventEmitter<URLRequest> {
@ -103,7 +103,8 @@ class URLRequest : public mate::EventEmitter<URLRequest> {
void OnResponseStarted();
void OnResponseData(scoped_refptr<const net::IOBufferWithSize> data);
void OnResponseCompleted();
void OnError(const std::string& error);
void OnRequestError(const std::string& error);
void OnResponseError(const std::string& error);
protected:
explicit URLRequest(v8::Isolate* isolate,
@ -111,13 +112,70 @@ class URLRequest : public mate::EventEmitter<URLRequest> {
~URLRequest() override;
private:
template <typename Flags>
class StateBase {
public:
void SetFlag(Flags flag);
protected:
explicit StateBase(Flags initialState);
bool operator==(Flags flag) const;
bool IsFlagSet(Flags flag) const;
private:
Flags state_;
};
enum class RequestStateFlags {
kNotStarted = 0x0,
kStarted = 0x1,
kFinished = 0x2,
kCanceled = 0x4,
kFailed = 0x8,
kClosed = 0x10
};
class RequestState : public StateBase<RequestStateFlags> {
public:
RequestState();
bool NotStarted() const;
bool Started() const;
bool Finished() const;
bool Canceled() const;
bool Failed() const;
bool Closed() const;
};
enum class ResponseStateFlags {
kNotStarted = 0x0,
kStarted = 0x1,
kEnded = 0x2,
kCanceled = 0x4,
kFailed = 0x8,
kClosed = 0x10
};
class ResponseState : public StateBase<ResponseStateFlags> {
public:
ResponseState();
bool NotStarted() const;
bool Started() const;
bool Ended() const;
bool Canceled() const;
bool Failed() const;
bool Closed() const;
};
bool NotStarted() const;
bool Finished() const;
bool Canceled() const;
bool Failed() const;
bool Write(scoped_refptr<const net::IOBufferWithSize> buffer,
bool is_last);
void Abort();
void Cancel();
bool SetExtraHeader(const std::string& name, const std::string& value);
void RemoveExtraHeader(const std::string& name);
void SetChunkedUpload(bool is_chunked_upload);
bool CanReadHeaders() const;
int StatusCode() const;
std::string StatusMessage() const;
scoped_refptr<const net::HttpResponseHeaders> RawResponseHeaders() const;
@ -134,16 +192,20 @@ class URLRequest : public mate::EventEmitter<URLRequest> {
template <typename ... ArgTypes>
void EmitResponseEvent(ArgTypes... args);
void Close();
void pin();
void unpin();
scoped_refptr<AtomURLRequest> atom_request_;
RequestState request_state_;
ResponseState response_state_;
// Used to implement pin/unpin.
v8::Global<v8::Object> wrapper_;
base::WeakPtrFactory<URLRequest> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(URLRequest);
};

View file

@ -106,12 +106,12 @@ void AtomURLRequest::SetChunkedUpload(bool is_chunked_upload) {
}
void AtomURLRequest::Abort() const {
void AtomURLRequest::Cancel() const {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
content::BrowserThread::PostTask(
content::BrowserThread::IO,
FROM_HERE,
base::Bind(&AtomURLRequest::DoAbort, this));
base::Bind(&AtomURLRequest::DoCancel, this));
}
void AtomURLRequest::SetExtraHeader(const std::string& name,
@ -202,7 +202,7 @@ void AtomURLRequest::DoWriteBuffer(
}
}
void AtomURLRequest::DoAbort() const {
void AtomURLRequest::DoCancel() const {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
request_->Cancel();
}
@ -235,39 +235,33 @@ void AtomURLRequest::OnResponseStarted(net::URLRequest* request) {
const auto& status = request_->status();
if (status.is_success()) {
// Cache net::HttpResponseHeaders instance, a read-only objects
// so that headers and other http metainformation can be simultaneously
// read from UI thread while request data is simulataneously streaming
// on IO thread.
response_headers_ = request_->response_headers();
// Success or pending trigger a Read.
content::BrowserThread::PostTask(
content::BrowserThread::UI, FROM_HERE,
base::Bind(&AtomURLRequest::InformDelegateResponseStarted, this));
ReadResponse();
} else {
} else if (status.status() == net::URLRequestStatus::Status::FAILED) {
// Report error on Start.
DoCancel();
auto error = net::ErrorToString(status.ToNetError());
content::BrowserThread::PostTask(
content::BrowserThread::UI, FROM_HERE,
base::Bind(&AtomURLRequest::InformDelegateErrorOccured,
base::Bind(&AtomURLRequest::InformDelegateRequestErrorOccured,
this,
std::move(error)));
}
// We don't report an error is the request is canceled.
}
void AtomURLRequest::ReadResponse() {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
// Some servers may treat HEAD requests as GET requests. To free up the
// network connection as soon as possible, signal that the request has
// completed immediately, without trying to read any data back (all we care
// about is the response code and headers, which we already have).
int bytes_read = 0;
if (request_->status().is_success())
if (!request_->Read(response_read_buffer_.get(), kBufferSize, &bytes_read))
bytes_read = -1;
int bytes_read = -1;
if (request_->Read(response_read_buffer_.get(), kBufferSize, &bytes_read)) {
OnReadCompleted(request_.get(), bytes_read);
}
}
void AtomURLRequest::OnReadCompleted(net::URLRequest* request,
@ -277,36 +271,53 @@ void AtomURLRequest::OnReadCompleted(net::URLRequest* request,
DCHECK_EQ(request, request_.get());
const auto status = request_->status();
bool response_error = false;
bool data_ended = false;
bool data_transfer_error = false;
do {
if (!status.is_success() || bytes_read <= 0) {
auto error = net::ErrorToString(status.ToNetError());
content::BrowserThread::PostTask(
content::BrowserThread::UI, FROM_HERE,
base::Bind(&AtomURLRequest::InformDelegateErrorOccured,
this,
std::move(error)));
if (!status.is_success()) {
response_error = true;
break;
}
if (bytes_read == 0) {
data_ended = true;
break;
}
if (bytes_read < 0 || !CopyAndPostBuffer(bytes_read)) {
data_transfer_error = true;
break;
}
const auto result = CopyAndPostBuffer(bytes_read);
if (!result)
// Failed to transfer data to UI thread.
return;
} while (request_->Read(response_read_buffer_.get(),
kBufferSize,
&bytes_read));
if (!status.is_io_pending())
if (response_error) {
DoCancel();
auto error = net::ErrorToString(status.ToNetError());
content::BrowserThread::PostTask(
content::BrowserThread::UI, FROM_HERE,
base::Bind(&AtomURLRequest::InformDelegateResponseErrorOccured,
this,
std::move(error)));
} else if (data_ended) {
content::BrowserThread::PostTask(
content::BrowserThread::UI, FROM_HERE,
base::Bind(&AtomURLRequest::InformDelegateResponseCompleted, this));
} else if (data_transfer_error) {
// We abort the request on corrupted data transfer.
DoCancel();
content::BrowserThread::PostTask(
content::BrowserThread::UI, FROM_HERE,
base::Bind(&AtomURLRequest::InformDelegateResponseErrorOccured,
this,
"Failed to transfer data from IO to UI thread."));
}
}
bool AtomURLRequest::CopyAndPostBuffer(int bytes_read) {
DCHECK_CURRENTLY_ON(content::BrowserThread::IO);
// data is only a wrapper for the async response_read_buffer_.
// data is only a wrapper for the asynchronous response_read_buffer_.
// Make a deep copy of payload and transfer ownership to the UI thread.
auto buffer_copy = new net::IOBufferWithSize(bytes_read);
memcpy(buffer_copy->data(), response_read_buffer_->data(), bytes_read);
@ -349,13 +360,20 @@ void AtomURLRequest::InformDelegateResponseCompleted() const {
delegate_->OnResponseCompleted();
}
void AtomURLRequest::InformDelegateErrorOccured(
void AtomURLRequest::InformDelegateRequestErrorOccured(
const std::string& error) const {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
if (delegate_)
delegate_->OnError(error);
delegate_->OnRequestError(error);
}
void AtomURLRequest::InformDelegateResponseErrorOccured(
const std::string& error) const {
DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
if (delegate_)
delegate_->OnResponseError(error);
}
} // namespace atom

View file

@ -33,7 +33,7 @@ class AtomURLRequest : public base::RefCountedThreadSafe<AtomURLRequest>,
bool Write(scoped_refptr<const net::IOBufferWithSize> buffer,
bool is_last);
void SetChunkedUpload(bool is_chunked_upload);
void Abort() const;
void Cancel() const;
void SetExtraHeader(const std::string& name, const std::string& value) const;
void RemoveExtraHeader(const std::string& name) const;
void PassLoginInformation(const base::string16& username,
@ -56,7 +56,7 @@ class AtomURLRequest : public base::RefCountedThreadSafe<AtomURLRequest>,
void DoWriteBuffer(scoped_refptr<const net::IOBufferWithSize> buffer,
bool is_last);
void DoAbort() const;
void DoCancel() const;
void DoSetAuth(const base::string16& username,
const base::string16& password) const;
void DoCancelAuth() const;
@ -70,7 +70,8 @@ class AtomURLRequest : public base::RefCountedThreadSafe<AtomURLRequest>,
void InformDelegateResponseData(
scoped_refptr<net::IOBufferWithSize> data) const;
void InformDelegateResponseCompleted() const;
void InformDelegateErrorOccured(const std::string& error) const;
void InformDelegateRequestErrorOccured(const std::string& error) const;
void InformDelegateResponseErrorOccured(const std::string& error) const;
base::WeakPtr<api::URLRequest> delegate_;
std::unique_ptr<net::URLRequest> request_;
@ -81,7 +82,6 @@ class AtomURLRequest : public base::RefCountedThreadSafe<AtomURLRequest>,
std::vector<std::unique_ptr<net::UploadElementReader>>
upload_element_readers_;
scoped_refptr<net::IOBuffer> response_read_buffer_;
scoped_refptr<net::HttpResponseHeaders> response_headers_;
DISALLOW_COPY_AND_ASSIGN(AtomURLRequest);
};

View file

@ -50,12 +50,24 @@ class IncomingMessage extends EventEmitter {
}
URLRequest.prototype._emitRequestEvent = function () {
this._request.emit.apply(this._request, arguments)
URLRequest.prototype._emitRequestEvent = function (async, ...rest) {
if (async) {
process.nextTick(() => {
this._request.emit.apply(this._request, rest)
})
} else {
this._request.emit.apply(this._request, rest)
}
}
URLRequest.prototype._emitResponseEvent = function () {
this._response.emit.apply(this._response, arguments)
URLRequest.prototype._emitResponseEvent = function (async, ...rest) {
if (async) {
process.nextTick(() => {
this._request.emit.apply(this._request, rest)
})
} else {
this._response.emit.apply(this._response, rest)
}
}
class ClientRequest extends EventEmitter {
@ -127,16 +139,7 @@ class ClientRequest extends EventEmitter {
}
}
// Flag to prevent request's headers modifications after
// headers flush.
this._started = false
this._aborted = false
// Flag to prevent writings after end.
this._finished = false
// Set when the request uses chuned encoding. Can be switched
// Set when the request uses chunked encoding. Can be switched
// to true only once and never set back to false.
this._chunkedEncoding = false
@ -161,7 +164,7 @@ class ClientRequest extends EventEmitter {
}
set chunkedEncoding (value) {
if (this._started) {
if (!this._url_request.notStarted) {
throw new Error('Can\'t set the transfer encoding, headers have been sent.')
}
this._chunkedEncoding = value
@ -174,7 +177,7 @@ class ClientRequest extends EventEmitter {
if (value === undefined) {
throw new Error('`value` required in setHeader("' + name + '", value).')
}
if (this._started) {
if (!this._url_request.notStarted) {
throw new Error('Can\'t set headers after they are sent.')
}
@ -201,7 +204,7 @@ class ClientRequest extends EventEmitter {
throw new Error('`name` is required for removeHeader(name).')
}
if (this._started) {
if (!this._url_request.notStarted) {
throw new Error('Can\'t remove headers after they are sent.')
}
@ -229,9 +232,8 @@ class ClientRequest extends EventEmitter {
// Since writing to the network is asynchronous, we conservatively
// assume that request headers are written after delivering the first
// buffer to the network IO thread.
if (!this._started) {
if (!this._url_request.notStarted) {
this._url_request.setChunkedUpload(this.chunkedEncoding)
this._started = true
}
// The write callback is fired asynchronously to mimic Node.js.
@ -243,7 +245,7 @@ class ClientRequest extends EventEmitter {
}
write (data, encoding, callback) {
if (this._finished) {
if (this._url_request.finished) {
let error = new Error('Write after end.')
process.nextTick(writeAfterEndNT, this, error, callback)
return true
@ -253,12 +255,10 @@ class ClientRequest extends EventEmitter {
}
end (data, encoding, callback) {
if (this._finished) {
if (this._url_request.finished) {
return false
}
this._finished = true
if (typeof data === 'function') {
callback = data
encoding = null
@ -274,18 +274,7 @@ class ClientRequest extends EventEmitter {
}
abort () {
if (!this._started) {
// Does nothing if stream
return
}
if (!this._aborted) {
this._url_request.abort()
this._aborted = true
process.nextTick(() => {
this.emit('abort')
})
}
this._url_request.cancel()
}
}