* refactor: enable weak ptr unwrap sequence dcheck * spec: remove WeakPtrDeathTest.* from disabled list
		
			
				
	
	
		
			115 lines
		
	
	
	
		
			3.7 KiB
			
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			115 lines
		
	
	
	
		
			3.7 KiB
			
		
	
	
	
		
			C++
		
	
	
	
	
	
// Copyright (c) 2017 GitHub, Inc.
 | 
						|
// Use of this source code is governed by the MIT license that can be
 | 
						|
// found in the LICENSE file.
 | 
						|
 | 
						|
#include "atom/browser/api/stream_subscriber.h"
 | 
						|
 | 
						|
#include <string>
 | 
						|
 | 
						|
#include "atom/browser/net/url_request_stream_job.h"
 | 
						|
#include "atom/common/api/event_emitter_caller.h"
 | 
						|
#include "atom/common/native_mate_converters/callback.h"
 | 
						|
 | 
						|
#include "atom/common/node_includes.h"
 | 
						|
 | 
						|
namespace mate {
 | 
						|
 | 
						|
StreamSubscriber::StreamSubscriber(
 | 
						|
    v8::Isolate* isolate,
 | 
						|
    v8::Local<v8::Object> emitter,
 | 
						|
    base::WeakPtr<atom::URLRequestStreamJob> url_job)
 | 
						|
    : isolate_(isolate),
 | 
						|
      emitter_(isolate, emitter),
 | 
						|
      url_job_(url_job),
 | 
						|
      weak_factory_(this) {
 | 
						|
  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
 | 
						|
  auto weak_self = weak_factory_.GetWeakPtr();
 | 
						|
  On("data", base::Bind(&StreamSubscriber::OnData, weak_self));
 | 
						|
  On("end", base::Bind(&StreamSubscriber::OnEnd, weak_self));
 | 
						|
  On("error", base::Bind(&StreamSubscriber::OnError, weak_self));
 | 
						|
}
 | 
						|
 | 
						|
StreamSubscriber::~StreamSubscriber() {
 | 
						|
  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
 | 
						|
  RemoveAllListeners();
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::On(const std::string& event,
 | 
						|
                          EventCallback&& callback) {  // NOLINT
 | 
						|
  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
 | 
						|
  DCHECK(js_handlers_.find(event) == js_handlers_.end());
 | 
						|
 | 
						|
  v8::Locker locker(isolate_);
 | 
						|
  v8::Isolate::Scope isolate_scope(isolate_);
 | 
						|
  v8::HandleScope handle_scope(isolate_);
 | 
						|
  // emitter.on(event, EventEmitted)
 | 
						|
  auto fn = CallbackToV8(isolate_, callback);
 | 
						|
  js_handlers_[event] = v8::Global<v8::Value>(isolate_, fn);
 | 
						|
  internal::ValueVector args = {StringToV8(isolate_, event), fn};
 | 
						|
  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_), "on", &args);
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::Off(const std::string& event) {
 | 
						|
  DCHECK_CURRENTLY_ON(content::BrowserThread::UI);
 | 
						|
  DCHECK(js_handlers_.find(event) != js_handlers_.end());
 | 
						|
 | 
						|
  v8::Locker locker(isolate_);
 | 
						|
  v8::Isolate::Scope isolate_scope(isolate_);
 | 
						|
  v8::HandleScope handle_scope(isolate_);
 | 
						|
  auto js_handler = js_handlers_.find(event);
 | 
						|
  DCHECK(js_handler != js_handlers_.end());
 | 
						|
  RemoveListener(js_handler);
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::OnData(mate::Arguments* args) {
 | 
						|
  v8::Local<v8::Value> buf;
 | 
						|
  args->GetNext(&buf);
 | 
						|
  if (!node::Buffer::HasInstance(buf)) {
 | 
						|
    args->ThrowError("data must be Buffer");
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  const char* data = node::Buffer::Data(buf);
 | 
						|
  size_t length = node::Buffer::Length(buf);
 | 
						|
  if (length == 0)
 | 
						|
    return;
 | 
						|
 | 
						|
  // Pass the data to the URLJob in IO thread.
 | 
						|
  std::vector<char> buffer(data, data + length);
 | 
						|
  content::BrowserThread::PostTask(
 | 
						|
      content::BrowserThread::IO, FROM_HERE,
 | 
						|
      base::Bind(&atom::URLRequestStreamJob::OnData, url_job_,
 | 
						|
                 base::Passed(&buffer)));
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::OnEnd(mate::Arguments* args) {
 | 
						|
  content::BrowserThread::PostTask(
 | 
						|
      content::BrowserThread::IO, FROM_HERE,
 | 
						|
      base::Bind(&atom::URLRequestStreamJob::OnEnd, url_job_));
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::OnError(mate::Arguments* args) {
 | 
						|
  content::BrowserThread::PostTask(
 | 
						|
      content::BrowserThread::IO, FROM_HERE,
 | 
						|
      base::Bind(&atom::URLRequestStreamJob::OnError, url_job_,
 | 
						|
                 net::ERR_FAILED));
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::RemoveAllListeners() {
 | 
						|
  v8::Locker locker(isolate_);
 | 
						|
  v8::Isolate::Scope isolate_scope(isolate_);
 | 
						|
  v8::HandleScope handle_scope(isolate_);
 | 
						|
  while (!js_handlers_.empty()) {
 | 
						|
    RemoveListener(js_handlers_.begin());
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void StreamSubscriber::RemoveListener(JSHandlersMap::iterator it) {
 | 
						|
  internal::ValueVector args = {StringToV8(isolate_, it->first),
 | 
						|
                                it->second.Get(isolate_)};
 | 
						|
  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_),
 | 
						|
                               "removeListener", &args);
 | 
						|
  js_handlers_.erase(it);
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace mate
 |