Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix socket watching read and write conflict. #4430

Draft
wants to merge 5 commits into
base: 25.lts.1+
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 80 additions & 28 deletions base/message_loop/message_pump_io_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,28 @@ MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
}
}

bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
if (!SbSocketIsValid(socket_)) {
pump_ = nullptr;
// If this watcher is not watching anything, no-op and return success.
return true;
}

SbSocket socket = Release();
bool MessagePumpIOStarboard::SocketWatcher::UnregisterInterest(int interests) {
bool result = true;
if (SbSocketIsValid(socket)) {
DCHECK(pump_);
#if defined(STARBOARD)
if (SbSocketIsValid(socket_)) {
// This may get called multiple times from TCPSocketStarboard.
if (pump_) {
result = pump_->StopWatching(socket);
result = pump_->UnregisterInterest(socket_, interests, this);
}
#else
result = pump_->StopWatching(socket);
#endif
} else {
interests_ = 0;
}
if (!interests_) {
pump_ = nullptr;
watcher_ = nullptr;
}
pump_ = nullptr;
return result;
}

bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
return UnregisterInterest(kSbSocketWaiterInterestRead ||
kSbSocketWaiterInterestWrite);
}

void MessagePumpIOStarboard::SocketWatcher::Init(SbSocket socket,
bool persistent) {
DCHECK(socket);
Expand Down Expand Up @@ -109,27 +105,76 @@ MessagePumpIOStarboard::~MessagePumpIOStarboard() {
SbSocketWaiterDestroy(waiter_);
}

bool MessagePumpIOStarboard::UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller) {
DCHECK(SbSocketIsValid(socket));
DCHECK(controller);
DCHECK(dropped_interests == kSbSocketWaiterInterestRead ||
dropped_interests == kSbSocketWaiterInterestWrite ||
dropped_interests ==
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite));
DCHECK_CALLED_ON_VALID_THREAD(watch_socket_caller_checker_);

// Make sure we don't pick up any funky internal masks.
int old_interest_mask =
controller->interests() &
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite);
int interests = old_interest_mask & (~dropped_interests);
if (interests == old_interest_mask) {
// Interests didn't change, return.
return true;
}

SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
// It's illegal to use this function to listen on 2 separate fds with the
// same |controller|.
if (old_socket != socket) {
NOTREACHED() << "Sockets don't match" << old_socket << "!=" << socket;
return false;
}

// Must disarm the event before we can reuse it.
SbSocketWaiterRemove(waiter_, old_socket);
} else {
interests = kSbSocketWaiterInterestNone;
}
controller->set_interests(interests);

if (!SbSocketIsValid(socket)) {
NOTREACHED() << "Invalid socket" << socket;
return false;
}

if (interests) {
// Set current interest mask and waiter for this event.
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests,
controller->persistent())) {
return false;
}
controller->Init(socket, controller->persistent());
}
return true;
}

bool MessagePumpIOStarboard::Watch(SbSocket socket,
bool persistent,
int mode,
int interests,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(SbSocketIsValid(socket));
DCHECK(controller);
DCHECK(delegate);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
DCHECK(interests == kSbSocketWaiterInterestRead ||
interests == kSbSocketWaiterInterestWrite ||
interests ==
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite));
// Watch should be called on the pump thread. It is not threadsafe, and your
// watcher may never be registered.
DCHECK_CALLED_ON_VALID_THREAD(watch_socket_caller_checker_);

int interests = kSbSocketWaiterInterestNone;
if (mode & WATCH_READ) {
interests |= kSbSocketWaiterInterestRead;
}
if (mode & WATCH_WRITE) {
interests |= kSbSocketWaiterInterestWrite;
}

SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
// It's illegal to use this function to listen on 2 separate fds with the
Expand All @@ -151,6 +196,11 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
SbSocketWaiterRemove(waiter_, old_socket);
}

if (!SbSocketIsValid(socket)) {
NOTREACHED() << "Invalid socket" << socket;
return false;
}

// Set current interest mask and waiter for this event.
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent)) {
Expand Down Expand Up @@ -181,6 +231,8 @@ void MessagePumpIOStarboard::Run(Delegate* delegate) {
AutoReset<bool> auto_reset_keep_running(&keep_running_, true);

for (;;) {
if (should_quit())
break;
Delegate::NextWorkInfo next_work_info = delegate->DoWork();
bool immediate_work_available = next_work_info.is_immediate();

Expand Down
18 changes: 9 additions & 9 deletions base/message_loop/message_pump_io_starboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
SocketWatcher(const SocketWatcher&) = delete;
SocketWatcher& operator=(const SocketWatcher&) = delete;

// NOTE: These methods aren't called StartWatching()/StopWatching() to avoid
// confusion with the win32 ObjectWatcher class.
// Unregisters the interests for watching the socket, always safe to call.
// No-op if there's nothing to do.
bool UnregisterInterest(int interests);

// Stops watching the socket, always safe to call. No-op if there's nothing
// to do.
Expand Down Expand Up @@ -102,12 +103,6 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
base::WeakPtrFactory<SocketWatcher> weak_factory_;
};

enum Mode {
WATCH_READ = 1 << 0,
WATCH_WRITE = 1 << 1,
WATCH_READ_WRITE = WATCH_READ | WATCH_WRITE
};

MessagePumpIOStarboard();
virtual ~MessagePumpIOStarboard();

Expand All @@ -125,10 +120,15 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
// success. Must be called on the same thread the message_pump is running on.
bool Watch(SbSocket socket,
bool persistent,
int mode,
int interests,
SocketWatcher* controller,
Watcher* delegate);

// Removes an interest from a socket, and stops watching the socket if needed.
bool UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller);

// Stops watching the socket.
bool StopWatching(SbSocket socket);

Expand Down
70 changes: 39 additions & 31 deletions base/message_loop/message_pump_io_starboard_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class MessagePumpIOStarboardTest : public testing::Test {
MessagePumpIOStarboardTest()
: task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
test::SingleThreadTaskEnvironment::MainThreadType::DEFAULT)),
event_(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED),
io_thread_("MessagePumpIOStarboardTestIOThread") {}
~MessagePumpIOStarboardTest() override = default;

Expand Down Expand Up @@ -68,14 +70,27 @@ class MessagePumpIOStarboardTest : public testing::Test {
}

void SimulateIOEvent(MessagePumpIOStarboard::SocketWatcher* controller) {
MessagePumpIOStarboard::OnSocketWaiterNotification(nullptr,
nullptr,
controller,
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite));
MessagePumpIOStarboard::OnSocketWaiterNotification(
nullptr, nullptr, controller,
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite));
}

void SimulateIOReadEvent(MessagePumpIOStarboard::SocketWatcher* controller) {
MessagePumpIOStarboard::OnSocketWaiterNotification(
nullptr, nullptr, controller,
kSbSocketWaiterInterestRead);
}

void SimulateIOWriteEvent(MessagePumpIOStarboard::SocketWatcher* controller) {
MessagePumpIOStarboard::OnSocketWaiterNotification(
nullptr, nullptr, controller,
kSbSocketWaiterInterestWrite);
}

std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;

WaitableEvent event_;

private:
Thread io_thread_;
SbSocket socket_;
Expand All @@ -95,7 +110,7 @@ class StupidWatcher : public MessagePumpIOStarboard::Watcher {
};

// Death tests not supported.
TEST_F(MessagePumpIOStarboardTest, DISABLED_QuitOutsideOfRun) {
TEST_F(MessagePumpIOStarboardTest, QuitOutsideOfRun) {
std::unique_ptr<MessagePumpIOStarboard> pump = CreateMessagePump();
ASSERT_DCHECK_DEATH(pump->Quit());
}
Expand Down Expand Up @@ -132,15 +147,14 @@ class DeleteWatcher : public BaseWatcher {
};

// Fails on some platforms.
TEST_F(MessagePumpIOStarboardTest, DISABLED_DeleteWatcher) {
TEST_F(MessagePumpIOStarboardTest, DeleteWatcher) {
DeleteWatcher delegate(
std::make_unique<MessagePumpIOStarboard::SocketWatcher>(FROM_HERE));
std::unique_ptr<MessagePumpIOStarboard> pump = CreateMessagePump();
pump->Watch(socket(),
/*persistent=*/false,
MessagePumpIOStarboard::WATCH_READ_WRITE,
delegate.controller(),
&delegate);
/*persistent=*/false,
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite),
delegate.controller(), &delegate);
SimulateIOEvent(delegate.controller());
}

Expand All @@ -160,15 +174,14 @@ class StopWatcher : public BaseWatcher {
};

// Fails on some platforms.
TEST_F(MessagePumpIOStarboardTest, DISABLED_StopWatcher) {
TEST_F(MessagePumpIOStarboardTest, StopWatcher) {
std::unique_ptr<MessagePumpIOStarboard> pump = CreateMessagePump();
MessagePumpIOStarboard::SocketWatcher controller(FROM_HERE);
StopWatcher delegate(&controller);
pump->Watch(socket(),
/*persistent=*/false,
MessagePumpIOStarboard::WATCH_READ_WRITE,
&controller,
&delegate);
/*persistent=*/false,
(kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite),
&controller, &delegate);
SimulateIOEvent(&controller);
}

Expand Down Expand Up @@ -197,19 +210,18 @@ class NestedPumpWatcher : public MessagePumpIOStarboard::Watcher {
};

// Fails on some platforms.
TEST_F(MessagePumpIOStarboardTest, DISABLED_NestedPumpWatcher) {
TEST_F(MessagePumpIOStarboardTest, NestedPumpWatcher) {
NestedPumpWatcher delegate;
std::unique_ptr<MessagePumpIOStarboard> pump = CreateMessagePump();
MessagePumpIOStarboard::SocketWatcher controller(FROM_HERE);
pump->Watch(socket(),
/*persistent=*/false,
MessagePumpIOStarboard::WATCH_READ,
&controller,
&delegate);
/*persistent=*/false, kSbSocketWaiterInterestRead, &controller,
&delegate);
SimulateIOEvent(&controller);
}

void FatalClosure() {
LOG(INFO) << __FUNCTION__ << " Fatal?";
FAIL() << "Reached fatal closure.";
}

Expand Down Expand Up @@ -237,7 +249,7 @@ void WriteSocketWrapper(MessagePumpIOStarboard* pump,
}

// Fails on some platforms.
TEST_F(MessagePumpIOStarboardTest, DISABLED_QuitWatcher) {
TEST_F(MessagePumpIOStarboardTest, QuitWatcher) {
// Delete the old TaskEnvironment so that we can manage our own one here.
task_environment_.reset();

Expand All @@ -247,29 +259,25 @@ TEST_F(MessagePumpIOStarboardTest, DISABLED_QuitWatcher) {
RunLoop run_loop;
QuitWatcher delegate(run_loop.QuitClosure());
MessagePumpIOStarboard::SocketWatcher controller(FROM_HERE);
WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED);
std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);

// Tell the pump to watch the pipe.
pump->Watch(socket(),
/*persistent=*/false,
MessagePumpIOStarboard::WATCH_READ,
&controller,
&delegate);
/*persistent=*/false, kSbSocketWaiterInterestRead, &controller,
&delegate);

// Make the IO thread wait for |event| before writing to pipefds[1].
// Make the IO thread wait for |event_| before writing to pipefds[1].
const char buf = 0;
WaitableEventWatcher::EventCallback write_socket_task =
BindOnce(&WriteSocketWrapper, base::Unretained(pump));
io_runner()->PostTask(
FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
Unretained(watcher.get()), &event,
Unretained(watcher.get()), &event_,
std::move(write_socket_task), io_runner()));

// Queue |event| to signal on |sequence_manager|.
// Queue task to signal |event_|.
SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event_)));

// Now run the MessageLoop.
run_loop.Run();
Expand Down
10 changes: 8 additions & 2 deletions base/task/current_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,17 @@ MessagePumpForIO* CurrentIOThread::GetMessagePumpForIO() const {
#if defined(STARBOARD)
bool CurrentIOThread::Watch(SbSocket socket,
bool persistent,
int mode,
SbSocketWaiterInterest interests,
SocketWatcher* controller,
Watcher* delegate) {
return static_cast<MessagePumpIOStarboard*>(GetMessagePumpForIO())
->Watch(socket, persistent, mode, controller, delegate);
->Watch(socket, persistent, interests, controller, delegate);
}
bool CurrentIOThread::UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller) {
return static_cast<MessagePumpIOStarboard*>(GetMessagePumpForIO())
->UnregisterInterest(socket, dropped_interests, controller);
}
#elif BUILDFLAG(IS_WIN)
HRESULT CurrentIOThread::RegisterIOHandler(
Expand Down
9 changes: 4 additions & 5 deletions base/task/current_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,14 @@ class BASE_EXPORT CurrentIOThread : public CurrentThread {
typedef base::MessagePumpIOStarboard::SocketWatcher SocketWatcher;
typedef base::MessagePumpIOStarboard::IOObserver IOObserver;

enum Mode{WATCH_READ = base::MessagePumpIOStarboard::WATCH_READ,
WATCH_WRITE = base::MessagePumpIOStarboard::WATCH_WRITE,
WATCH_READ_WRITE = base::MessagePumpIOStarboard::WATCH_READ_WRITE};

bool Watch(SbSocket socket,
bool persistent,
int mode,
SbSocketWaiterInterest interests,
SocketWatcher* controller,
Watcher* delegate);
bool UnregisterInterest(SbSocket socket,
int dropped_interests,
SocketWatcher* controller);
#elif BUILDFLAG(IS_WIN)
// Please see MessagePumpWin for definitions of these methods.
HRESULT RegisterIOHandler(HANDLE file, MessagePumpForIO::IOHandler* handler);
Expand Down
Loading
Loading