Update Singleton code

This commit is contained in:
Cheng Zhao 2017-04-13 18:03:57 +09:00
parent e43b34b8f8
commit 9f94a0b246

View file

@ -49,25 +49,34 @@
#include <unistd.h> #include <unistd.h>
#include <cstring> #include <cstring>
#include <memory>
#include <set> #include <set>
#include <string> #include <string>
#include <stddef.h>
#include "atom/common/atom_command_line.h" #include "atom/common/atom_command_line.h"
#include "base/base_paths.h" #include "base/base_paths.h"
#include "base/bind.h" #include "base/bind.h"
#include "base/command_line.h" #include "base/command_line.h"
#include "base/files/file_descriptor_watcher_posix.h"
#include "base/files/file_path.h" #include "base/files/file_path.h"
#include "base/files/file_util.h" #include "base/files/file_util.h"
#include "base/location.h" #include "base/location.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/metrics/histogram_macros.h"
#include "base/path_service.h" #include "base/path_service.h"
#include "base/posix/eintr_wrapper.h" #include "base/posix/eintr_wrapper.h"
#include "base/posix/safe_strerror.h" #include "base/posix/safe_strerror.h"
#include "base/rand_util.h" #include "base/rand_util.h"
#include "base/sequenced_task_runner_helpers.h" #include "base/sequenced_task_runner_helpers.h"
#include "base/single_thread_task_runner.h" #include "base/single_thread_task_runner.h"
#include "base/stl_util.h" #include "base/single_thread_task_runner.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "base/strings/string_split.h" #include "base/strings/string_split.h"
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
@ -78,6 +87,7 @@
#include "base/threading/thread_task_runner_handle.h" #include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h" #include "base/time/time.h"
#include "base/timer/timer.h" #include "base/timer/timer.h"
#include "build/build_config.h"
#include "content/public/browser/browser_thread.h" #include "content/public/browser/browser_thread.h"
#include "net/base/network_interfaces.h" #include "net/base/network_interfaces.h"
#include "ui/base/l10n/l10n_util.h" #include "ui/base/l10n/l10n_util.h"
@ -222,9 +232,8 @@ int SetupSocketOnly() {
int sock = socket(PF_UNIX, SOCK_STREAM, 0); int sock = socket(PF_UNIX, SOCK_STREAM, 0);
PCHECK(sock >= 0) << "socket() failed"; PCHECK(sock >= 0) << "socket() failed";
int rv = base::SetNonBlocking(sock); DCHECK(base::SetNonBlocking(sock)) << "Failed to make non-blocking socket.";
DCHECK_EQ(0, rv) << "Failed to make non-blocking socket."; int rv = SetCloseOnExec(sock);
rv = SetCloseOnExec(sock);
DCHECK_EQ(0, rv) << "Failed to set CLOEXEC on socket."; DCHECK_EQ(0, rv) << "Failed to set CLOEXEC on socket.";
return sock; return sock;
@ -305,7 +314,6 @@ bool ParseLockPath(const base::FilePath& path,
bool DisplayProfileInUseError(const base::FilePath& lock_path, bool DisplayProfileInUseError(const base::FilePath& lock_path,
const std::string& hostname, const std::string& hostname,
int pid) { int pid) {
// TODO: yolo
return true; return true;
} }
@ -455,44 +463,38 @@ bool ReplaceOldSingletonLock(const base::FilePath& symlink_content,
// This class sets up a listener on the singleton socket and handles parsing // This class sets up a listener on the singleton socket and handles parsing
// messages that come in on the singleton socket. // messages that come in on the singleton socket.
class ProcessSingleton::LinuxWatcher class ProcessSingleton::LinuxWatcher
: public base::MessageLoopForIO::Watcher, : public base::RefCountedThreadSafe<ProcessSingleton::LinuxWatcher,
public base::MessageLoop::DestructionObserver,
public base::RefCountedThreadSafe<ProcessSingleton::LinuxWatcher,
BrowserThread::DeleteOnIOThread> { BrowserThread::DeleteOnIOThread> {
public: public:
// A helper class to read message from an established socket. // A helper class to read message from an established socket.
class SocketReader : public base::MessageLoopForIO::Watcher { class SocketReader {
public: public:
SocketReader(ProcessSingleton::LinuxWatcher* parent, SocketReader(ProcessSingleton::LinuxWatcher* parent,
base::MessageLoop* ui_message_loop, scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner,
int fd) int fd)
: parent_(parent), : parent_(parent),
ui_message_loop_(ui_message_loop), ui_task_runner_(ui_task_runner),
fd_(fd), fd_(fd),
bytes_read_(0) { bytes_read_(0) {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
// Wait for reads. // Wait for reads.
base::MessageLoopForIO::current()->WatchFileDescriptor( fd_watch_controller_ = base::FileDescriptorWatcher::WatchReadable(
fd, true, base::MessageLoopForIO::WATCH_READ, &fd_reader_, this); fd, base::Bind(&SocketReader::OnSocketCanReadWithoutBlocking,
base::Unretained(this)));
// If we haven't completed in a reasonable amount of time, give up. // If we haven't completed in a reasonable amount of time, give up.
timer_.Start(FROM_HERE, base::TimeDelta::FromSeconds(kTimeoutInSeconds), timer_.Start(FROM_HERE, base::TimeDelta::FromSeconds(kTimeoutInSeconds),
this, &SocketReader::CleanupAndDeleteSelf); this, &SocketReader::CleanupAndDeleteSelf);
} }
~SocketReader() override { CloseSocket(fd_); } ~SocketReader() { CloseSocket(fd_); }
// MessageLoopForIO::Watcher impl.
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override {
// SocketReader only watches for accept (read) events.
NOTREACHED();
}
// Finish handling the incoming message by optionally sending back an ACK // Finish handling the incoming message by optionally sending back an ACK
// message and removing this SocketReader. // message and removing this SocketReader.
void FinishWithACK(const char *message, size_t length); void FinishWithACK(const char *message, size_t length);
private: private:
void OnSocketCanReadWithoutBlocking();
void CleanupAndDeleteSelf() { void CleanupAndDeleteSelf() {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
@ -500,13 +502,15 @@ class ProcessSingleton::LinuxWatcher
// We're deleted beyond this point. // We're deleted beyond this point.
} }
base::MessageLoopForIO::FileDescriptorWatcher fd_reader_; // Controls watching |fd_|.
std::unique_ptr<base::FileDescriptorWatcher::Controller>
fd_watch_controller_;
// The ProcessSingleton::LinuxWatcher that owns us. // The ProcessSingleton::LinuxWatcher that owns us.
ProcessSingleton::LinuxWatcher* const parent_; ProcessSingleton::LinuxWatcher* const parent_;
// A reference to the UI message loop. // A reference to the UI task runner.
base::MessageLoop* const ui_message_loop_; scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner_;
// The file descriptor we're reading. // The file descriptor we're reading.
const int fd_; const int fd_;
@ -525,9 +529,7 @@ class ProcessSingleton::LinuxWatcher
// We expect to only be constructed on the UI thread. // We expect to only be constructed on the UI thread.
explicit LinuxWatcher(ProcessSingleton* parent) explicit LinuxWatcher(ProcessSingleton* parent)
: ui_message_loop_(base::MessageLoop::current()), : ui_task_runner_(base::ThreadTaskRunnerHandle::Get()), parent_(parent) {}
parent_(parent) {
}
// Start listening for connections on the socket. This method should be // Start listening for connections on the socket. This method should be
// called from the IO thread. // called from the IO thread.
@ -540,79 +542,63 @@ class ProcessSingleton::LinuxWatcher
const std::vector<std::string>& argv, const std::vector<std::string>& argv,
SocketReader* reader); SocketReader* reader);
// MessageLoopForIO::Watcher impl. These run on the IO thread.
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override {
// ProcessSingleton only watches for accept (read) events.
NOTREACHED();
}
// MessageLoop::DestructionObserver
void WillDestroyCurrentMessageLoop() override {
fd_watcher_.StopWatchingFileDescriptor();
}
private: private:
friend struct BrowserThread::DeleteOnThread<BrowserThread::IO>; friend struct BrowserThread::DeleteOnThread<BrowserThread::IO>;
friend class base::DeleteHelper<ProcessSingleton::LinuxWatcher>; friend class base::DeleteHelper<ProcessSingleton::LinuxWatcher>;
~LinuxWatcher() override { ~LinuxWatcher() {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
STLDeleteElements(&readers_);
base::MessageLoopForIO* ml = base::MessageLoopForIO::current();
ml->RemoveDestructionObserver(this);
} }
void OnSocketCanReadWithoutBlocking(int socket);
// Removes and deletes the SocketReader. // Removes and deletes the SocketReader.
void RemoveSocketReader(SocketReader* reader); void RemoveSocketReader(SocketReader* reader);
base::MessageLoopForIO::FileDescriptorWatcher fd_watcher_; std::unique_ptr<base::FileDescriptorWatcher::Controller> socket_watcher_;
// A reference to the UI message loop (i.e., the message loop we were // A reference to the UI message loop (i.e., the message loop we were
// constructed on). // constructed on).
base::MessageLoop* ui_message_loop_; scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner_;
// The ProcessSingleton that owns us. // The ProcessSingleton that owns us.
ProcessSingleton* const parent_; ProcessSingleton* const parent_;
std::set<SocketReader*> readers_; std::set<std::unique_ptr<SocketReader>> readers_;
DISALLOW_COPY_AND_ASSIGN(LinuxWatcher); DISALLOW_COPY_AND_ASSIGN(LinuxWatcher);
}; };
void ProcessSingleton::LinuxWatcher::OnFileCanReadWithoutBlocking(int fd) { void ProcessSingleton::LinuxWatcher::OnSocketCanReadWithoutBlocking(
int socket) {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
// Accepting incoming client. // Accepting incoming client.
sockaddr_un from; sockaddr_un from;
socklen_t from_len = sizeof(from); socklen_t from_len = sizeof(from);
int connection_socket = HANDLE_EINTR(accept( int connection_socket = HANDLE_EINTR(
fd, reinterpret_cast<sockaddr*>(&from), &from_len)); accept(socket, reinterpret_cast<sockaddr*>(&from), &from_len));
if (-1 == connection_socket) { if (-1 == connection_socket) {
PLOG(ERROR) << "accept() failed"; PLOG(ERROR) << "accept() failed";
return; return;
} }
int rv = base::SetNonBlocking(connection_socket); DCHECK(base::SetNonBlocking(connection_socket))
DCHECK_EQ(0, rv) << "Failed to make non-blocking socket."; << "Failed to make non-blocking socket.";
SocketReader* reader = new SocketReader(this, readers_.insert(
ui_message_loop_, base::MakeUnique<SocketReader>(this, ui_task_runner_, connection_socket));
connection_socket);
readers_.insert(reader);
} }
void ProcessSingleton::LinuxWatcher::StartListening(int socket) { void ProcessSingleton::LinuxWatcher::StartListening(int socket) {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
// Watch for client connections on this socket. // Watch for client connections on this socket.
base::MessageLoopForIO* ml = base::MessageLoopForIO::current(); socket_watcher_ = base::FileDescriptorWatcher::WatchReadable(
ml->AddDestructionObserver(this); socket, base::Bind(&LinuxWatcher::OnSocketCanReadWithoutBlocking,
ml->WatchFileDescriptor(socket, true, base::MessageLoopForIO::WATCH_READ, base::Unretained(this), socket));
&fd_watcher_, this);
} }
void ProcessSingleton::LinuxWatcher::HandleMessage( void ProcessSingleton::LinuxWatcher::HandleMessage(
const std::string& current_dir, const std::vector<std::string>& argv, const std::string& current_dir, const std::vector<std::string>& argv,
SocketReader* reader) { SocketReader* reader) {
DCHECK(ui_message_loop_ == base::MessageLoop::current()); DCHECK(ui_task_runner_->BelongsToCurrentThread());
DCHECK(reader); DCHECK(reader);
if (parent_->notification_callback_.Run(argv, if (parent_->notification_callback_.Run(argv,
@ -632,25 +618,27 @@ void ProcessSingleton::LinuxWatcher::HandleMessage(
void ProcessSingleton::LinuxWatcher::RemoveSocketReader(SocketReader* reader) { void ProcessSingleton::LinuxWatcher::RemoveSocketReader(SocketReader* reader) {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
DCHECK(reader); DCHECK(reader);
readers_.erase(reader); auto it = std::find_if(readers_.begin(), readers_.end(),
delete reader; [reader](const std::unique_ptr<SocketReader>& ptr) {
return ptr.get() == reader;
});
readers_.erase(it);
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// ProcessSingleton::LinuxWatcher::SocketReader // ProcessSingleton::LinuxWatcher::SocketReader
// //
void ProcessSingleton::LinuxWatcher::SocketReader::OnFileCanReadWithoutBlocking( void ProcessSingleton::LinuxWatcher::SocketReader::
int fd) { OnSocketCanReadWithoutBlocking() {
DCHECK_CURRENTLY_ON(BrowserThread::IO); DCHECK_CURRENTLY_ON(BrowserThread::IO);
DCHECK_EQ(fd, fd_);
while (bytes_read_ < sizeof(buf_)) { while (bytes_read_ < sizeof(buf_)) {
ssize_t rv = HANDLE_EINTR( ssize_t rv =
read(fd, buf_ + bytes_read_, sizeof(buf_) - bytes_read_)); HANDLE_EINTR(read(fd_, buf_ + bytes_read_, sizeof(buf_) - bytes_read_));
if (rv < 0) { if (rv < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { if (errno != EAGAIN && errno != EWOULDBLOCK) {
PLOG(ERROR) << "read() failed"; PLOG(ERROR) << "read() failed";
CloseSocket(fd); CloseSocket(fd_);
return; return;
} else { } else {
// It would block, so we just return and continue to watch for the next // It would block, so we just return and continue to watch for the next
@ -696,10 +684,10 @@ void ProcessSingleton::LinuxWatcher::SocketReader::OnFileCanReadWithoutBlocking(
tokens.erase(tokens.begin()); tokens.erase(tokens.begin());
// Return to the UI thread to handle opening a new browser tab. // Return to the UI thread to handle opening a new browser tab.
ui_message_loop_->task_runner()->PostTask( ui_task_runner_->PostTask(
FROM_HERE, base::Bind(&ProcessSingleton::LinuxWatcher::HandleMessage, FROM_HERE, base::Bind(&ProcessSingleton::LinuxWatcher::HandleMessage,
parent_, current_dir, tokens, this)); parent_, current_dir, tokens, this));
fd_reader_.StopWatchingFileDescriptor(); fd_watch_controller_.reset();
// LinuxWatcher::HandleMessage() is in charge of destroying this SocketReader // LinuxWatcher::HandleMessage() is in charge of destroying this SocketReader
// object by invoking SocketReader::FinishWithACK(). // object by invoking SocketReader::FinishWithACK().
@ -731,7 +719,8 @@ ProcessSingleton::ProcessSingleton(
const base::FilePath& user_data_dir, const base::FilePath& user_data_dir,
const NotificationCallback& notification_callback) const NotificationCallback& notification_callback)
: notification_callback_(notification_callback), : notification_callback_(notification_callback),
current_pid_(base::GetCurrentProcId()) { current_pid_(base::GetCurrentProcId()),
watcher_(new LinuxWatcher(this)) {
// The user_data_dir may have not been created yet. // The user_data_dir may have not been created yet.
base::CreateDirectoryAndGetError(user_data_dir, nullptr); base::CreateDirectoryAndGetError(user_data_dir, nullptr);
@ -897,12 +886,26 @@ ProcessSingleton::NotifyOtherProcessWithTimeoutOrCreate(
const base::CommandLine& command_line, const base::CommandLine& command_line,
int retry_attempts, int retry_attempts,
const base::TimeDelta& timeout) { const base::TimeDelta& timeout) {
const base::TimeTicks begin_ticks = base::TimeTicks::Now();
NotifyResult result = NotifyOtherProcessWithTimeout( NotifyResult result = NotifyOtherProcessWithTimeout(
command_line, retry_attempts, timeout, true); command_line, retry_attempts, timeout, true);
if (result != PROCESS_NONE) if (result != PROCESS_NONE) {
if (result == PROCESS_NOTIFIED) {
UMA_HISTOGRAM_MEDIUM_TIMES("Chrome.ProcessSingleton.TimeToNotify",
base::TimeTicks::Now() - begin_ticks);
} else {
UMA_HISTOGRAM_MEDIUM_TIMES("Chrome.ProcessSingleton.TimeToFailure",
base::TimeTicks::Now() - begin_ticks);
}
return result; return result;
if (Create()) }
if (Create()) {
UMA_HISTOGRAM_MEDIUM_TIMES("Chrome.ProcessSingleton.TimeToCreate",
base::TimeTicks::Now() - begin_ticks);
return PROCESS_NONE; return PROCESS_NONE;
}
// If the Create() failed, try again to notify. (It could be that another // If the Create() failed, try again to notify. (It could be that another
// instance was starting at the same time and managed to grab the lock before // instance was starting at the same time and managed to grab the lock before
// we did.) // we did.)
@ -910,6 +913,15 @@ ProcessSingleton::NotifyOtherProcessWithTimeoutOrCreate(
// aren't going to try to take over the lock ourselves. // aren't going to try to take over the lock ourselves.
result = NotifyOtherProcessWithTimeout( result = NotifyOtherProcessWithTimeout(
command_line, retry_attempts, timeout, false); command_line, retry_attempts, timeout, false);
if (result == PROCESS_NOTIFIED) {
UMA_HISTOGRAM_MEDIUM_TIMES("Chrome.ProcessSingleton.TimeToNotify",
base::TimeTicks::Now() - begin_ticks);
} else {
UMA_HISTOGRAM_MEDIUM_TIMES("Chrome.ProcessSingleton.TimeToFailure",
base::TimeTicks::Now() - begin_ticks);
}
if (result != PROCESS_NONE) if (result != PROCESS_NONE)
return result; return result;
@ -1019,15 +1031,13 @@ bool ProcessSingleton::Create() {
if (listen(sock, 5) < 0) if (listen(sock, 5) < 0)
NOTREACHED() << "listen failed: " << base::safe_strerror(errno); NOTREACHED() << "listen failed: " << base::safe_strerror(errno);
// In Electron the ProcessSingleton is created earlier than the IO DCHECK(BrowserThread::IsMessageLoopValid(BrowserThread::IO));
// thread gets created, so we have to postpone the call until message BrowserThread::PostTask(
// loop is up an running. BrowserThread::IO,
scoped_refptr<base::SingleThreadTaskRunner> task_runner =
base::ThreadTaskRunnerHandle::Get();
task_runner->PostTask(
FROM_HERE, FROM_HERE,
base::Bind(&ProcessSingleton::StartListening, base::Bind(&ProcessSingleton::LinuxWatcher::StartListening,
base::Unretained(this), sock)); watcher_,
sock));
return true; return true;
} }
@ -1038,17 +1048,6 @@ void ProcessSingleton::Cleanup() {
UnlinkPath(lock_path_); UnlinkPath(lock_path_);
} }
void ProcessSingleton::StartListening(int sock) {
watcher_ = new LinuxWatcher(this);
DCHECK(BrowserThread::IsMessageLoopValid(BrowserThread::IO));
BrowserThread::PostTask(
BrowserThread::IO,
FROM_HERE,
base::Bind(&ProcessSingleton::LinuxWatcher::StartListening,
watcher_.get(),
sock));
}
bool ProcessSingleton::IsSameChromeInstance(pid_t pid) { bool ProcessSingleton::IsSameChromeInstance(pid_t pid) {
pid_t cur_pid = current_pid_; pid_t cur_pid = current_pid_;
while (pid != cur_pid) { while (pid != cur_pid) {