// Copyright (c) 2017 GitHub, Inc.
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.

#include "shell/browser/api/stream_subscriber.h"

#include <string>

#include "base/task/post_task.h"
#include "content/public/browser/browser_task_traits.h"
#include "shell/browser/net/url_request_stream_job.h"
#include "shell/common/api/event_emitter_caller.h"
#include "shell/common/native_mate_converters/callback.h"
#include "shell/common/node_includes.h"

namespace mate {

StreamSubscriber::StreamSubscriber(
    v8::Isolate* isolate,
    v8::Local<v8::Object> emitter,
    base::WeakPtr<electron::URLRequestStreamJob> url_job,
    scoped_refptr<base::SequencedTaskRunner> ui_task_runner)
    : base::RefCountedDeleteOnSequence<StreamSubscriber>(ui_task_runner),
      isolate_(isolate),
      emitter_(isolate, emitter),
      url_job_(url_job),
      weak_factory_(this) {
  DCHECK(ui_task_runner->RunsTasksInCurrentSequence());

  auto weak_self = weak_factory_.GetWeakPtr();
  On("data", base::BindRepeating(&StreamSubscriber::OnData, weak_self));
  On("end", base::BindRepeating(&StreamSubscriber::OnEnd, weak_self));
  On("error", base::BindRepeating(&StreamSubscriber::OnError, weak_self));
}

StreamSubscriber::~StreamSubscriber() {
  RemoveAllListeners();
}

void StreamSubscriber::On(const std::string& event,
                          EventCallback&& callback) {  // NOLINT
  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
  DCHECK(js_handlers_.find(event) == js_handlers_.end());

  v8::Locker locker(isolate_);
  v8::Isolate::Scope isolate_scope(isolate_);
  v8::HandleScope handle_scope(isolate_);
  // emitter.on(event, EventEmitted)
  auto fn = CallbackToV8(isolate_, callback);
  js_handlers_[event] = v8::Global<v8::Value>(isolate_, fn);
  internal::ValueVector args = {StringToV8(isolate_, event), fn};
  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_), "on", &args);
}

void StreamSubscriber::Off(const std::string& event) {
  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
  DCHECK(js_handlers_.find(event) != js_handlers_.end());

  v8::Locker locker(isolate_);
  v8::Isolate::Scope isolate_scope(isolate_);
  v8::HandleScope handle_scope(isolate_);
  auto js_handler = js_handlers_.find(event);
  DCHECK(js_handler != js_handlers_.end());
  RemoveListener(js_handler);
}

void StreamSubscriber::OnData(mate::Arguments* args) {
  v8::Local<v8::Value> buf;
  args->GetNext(&buf);
  if (!node::Buffer::HasInstance(buf)) {
    args->ThrowError("data must be Buffer");
    return;
  }

  const char* data = node::Buffer::Data(buf);
  size_t length = node::Buffer::Length(buf);
  if (length == 0)
    return;

  // Pass the data to the URLJob in IO thread.
  std::vector<char> buffer(data, data + length);
  base::PostTaskWithTraits(
      FROM_HERE, {content::BrowserThread::IO},
      base::BindOnce(&electron::URLRequestStreamJob::OnData, url_job_,
                     base::Passed(&buffer)));
}

void StreamSubscriber::OnEnd(mate::Arguments* args) {
  base::PostTaskWithTraits(
      FROM_HERE, {content::BrowserThread::IO},
      base::BindOnce(&electron::URLRequestStreamJob::OnEnd, url_job_));
}

void StreamSubscriber::OnError(mate::Arguments* args) {
  base::PostTaskWithTraits(
      FROM_HERE, {content::BrowserThread::IO},
      base::BindOnce(&electron::URLRequestStreamJob::OnError, url_job_,
                     net::ERR_FAILED));
}

void StreamSubscriber::RemoveAllListeners() {
  DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
  v8::Locker locker(isolate_);
  v8::Isolate::Scope isolate_scope(isolate_);
  v8::HandleScope handle_scope(isolate_);
  while (!js_handlers_.empty()) {
    RemoveListener(js_handlers_.begin());
  }
}

void StreamSubscriber::RemoveListener(JSHandlersMap::iterator it) {
  internal::ValueVector args = {StringToV8(isolate_, it->first),
                                it->second.Get(isolate_)};
  internal::CallMethodWithArgs(isolate_, emitter_.Get(isolate_),
                               "removeListener", &args);
  js_handlers_.erase(it);
}

}  // namespace mate