forked from mirrors/gecko-dev
In the changes from bug 1879375, the conditions for entries to be added to mPendingMessages was changed, such that it is now possible for the broker to temporarily have empty entries in this table. This means the assertion is no longer correct and should be removed. Differential Revision: https://phabricator.services.mozilla.com/D209861
893 lines
31 KiB
C++
893 lines
31 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
* You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include "mozilla/ipc/NodeController.h"
|
|
#include "MainThreadUtils.h"
|
|
#include "base/process_util.h"
|
|
#include "chrome/common/ipc_message.h"
|
|
#include "mojo/core/ports/name.h"
|
|
#include "mojo/core/ports/node.h"
|
|
#include "mojo/core/ports/port_locker.h"
|
|
#include "mozilla/AlreadyAddRefed.h"
|
|
#include "mozilla/RandomNum.h"
|
|
#include "mozilla/StaticPtr.h"
|
|
#include "mozilla/Assertions.h"
|
|
#include "mozilla/ToString.h"
|
|
#include "mozilla/ipc/BrowserProcessSubThread.h"
|
|
#include "mozilla/ipc/ProtocolUtils.h"
|
|
#include "mozilla/mozalloc.h"
|
|
#include "nsISerialEventTarget.h"
|
|
#include "nsTArray.h"
|
|
#include "nsXULAppAPI.h"
|
|
#include "nsPrintfCString.h"
|
|
|
|
#define PORTS_ALWAYS_OK(expr) MOZ_ALWAYS_TRUE(mojo::core::ports::OK == (expr))
|
|
|
|
namespace mozilla::ipc {
|
|
|
|
static StaticRefPtr<NodeController> gNodeController;
|
|
|
|
static LazyLogModule gNodeControllerLog{"NodeController"};
|
|
|
|
// Helper logger macro which includes the name of the `this` NodeController in
|
|
// the logged messages.
|
|
#define NODECONTROLLER_LOG(level_, fmt_, ...) \
|
|
MOZ_LOG(gNodeControllerLog, level_, \
|
|
("[%s]: " fmt_, ToString(mName).c_str(), ##__VA_ARGS__))
|
|
|
|
// Helper warning macro which both does logger logging and emits NS_WARNING logs
|
|
// under debug mode.
|
|
#ifdef DEBUG
|
|
# define NODECONTROLLER_WARNING(fmt_, ...) \
|
|
do { \
|
|
nsPrintfCString warning("[%s]: " fmt_, ToString(mName).c_str(), \
|
|
##__VA_ARGS__); \
|
|
NS_WARNING(warning.get()); \
|
|
MOZ_LOG(gNodeControllerLog, LogLevel::Debug, ("%s", warning.get())); \
|
|
} while (0)
|
|
#else
|
|
# define NODECONTROLLER_WARNING(fmt_, ...) \
|
|
NODECONTROLLER_LOG(LogLevel::Warning, fmt_, ##__VA_ARGS__)
|
|
#endif
|
|
|
|
NodeController::NodeController(const NodeName& aName)
|
|
: mName(aName), mNode(MakeUnique<Node>(aName, this)) {}
|
|
|
|
NodeController::~NodeController() {
|
|
auto state = mState.Lock();
|
|
MOZ_RELEASE_ASSERT(state->mPeers.IsEmpty(),
|
|
"Destroying NodeController before closing all peers");
|
|
MOZ_RELEASE_ASSERT(state->mInvites.IsEmpty(),
|
|
"Destroying NodeController before closing all invites");
|
|
};
|
|
|
|
// FIXME: Actually provide some way to create the thing.
|
|
/* static */ NodeController* NodeController::GetSingleton() {
|
|
MOZ_ASSERT(gNodeController);
|
|
return gNodeController;
|
|
}
|
|
|
|
std::pair<ScopedPort, ScopedPort> NodeController::CreatePortPair() {
|
|
PortRef port0, port1;
|
|
PORTS_ALWAYS_OK(mNode->CreatePortPair(&port0, &port1));
|
|
return {ScopedPort{std::move(port0), this},
|
|
ScopedPort{std::move(port1), this}};
|
|
}
|
|
|
|
auto NodeController::GetPort(const PortName& aName) -> PortRef {
|
|
PortRef port;
|
|
int rv = mNode->GetPort(aName, &port);
|
|
if (NS_WARN_IF(rv != mojo::core::ports::OK)) {
|
|
NODECONTROLLER_WARNING("Call to GetPort(%s) Failed",
|
|
ToString(aName).c_str());
|
|
return {};
|
|
}
|
|
return port;
|
|
}
|
|
|
|
void NodeController::SetPortObserver(const PortRef& aPort,
|
|
PortObserver* aObserver) {
|
|
PORTS_ALWAYS_OK(mNode->SetUserData(aPort, aObserver));
|
|
}
|
|
|
|
auto NodeController::GetStatus(const PortRef& aPort) -> Maybe<PortStatus> {
|
|
PortStatus status{};
|
|
int rv = mNode->GetStatus(aPort, &status);
|
|
if (rv != mojo::core::ports::OK) {
|
|
return Nothing();
|
|
}
|
|
return Some(status);
|
|
}
|
|
|
|
void NodeController::ClosePort(const PortRef& aPort) {
|
|
PORTS_ALWAYS_OK(mNode->ClosePort(aPort));
|
|
}
|
|
|
|
bool NodeController::GetMessage(const PortRef& aPort,
|
|
UniquePtr<IPC::Message>* aMessage) {
|
|
UniquePtr<UserMessageEvent> messageEvent;
|
|
int rv = mNode->GetMessage(aPort, &messageEvent, nullptr);
|
|
if (rv != mojo::core::ports::OK) {
|
|
if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
|
|
return false;
|
|
}
|
|
MOZ_CRASH("GetMessage on port in invalid state");
|
|
}
|
|
|
|
if (messageEvent) {
|
|
UniquePtr<IPC::Message> message = messageEvent->TakeMessage<IPC::Message>();
|
|
|
|
// If our UserMessageEvent has any ports directly attached to it, fetch them
|
|
// from our node and attach them to the IPC::Message we extracted.
|
|
//
|
|
// It's important to only do this if we have nonempty set of ports on the
|
|
// event, as we may have never serialized our IPC::Message's ports onto the
|
|
// event if it was routed in-process.
|
|
if (messageEvent->num_ports() > 0) {
|
|
nsTArray<ScopedPort> attachedPorts(messageEvent->num_ports());
|
|
for (size_t i = 0; i < messageEvent->num_ports(); ++i) {
|
|
attachedPorts.AppendElement(
|
|
ScopedPort{GetPort(messageEvent->ports()[i]), this});
|
|
}
|
|
message->SetAttachedPorts(std::move(attachedPorts));
|
|
}
|
|
|
|
*aMessage = std::move(message);
|
|
} else {
|
|
*aMessage = nullptr;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool NodeController::SendUserMessage(const PortRef& aPort,
|
|
UniquePtr<IPC::Message> aMessage) {
|
|
auto messageEvent = MakeUnique<UserMessageEvent>(0);
|
|
messageEvent->AttachMessage(std::move(aMessage));
|
|
|
|
int rv = mNode->SendUserMessage(aPort, std::move(messageEvent));
|
|
if (rv == mojo::core::ports::OK) {
|
|
return true;
|
|
}
|
|
if (rv == mojo::core::ports::ERROR_PORT_PEER_CLOSED) {
|
|
NODECONTROLLER_LOG(LogLevel::Debug,
|
|
"Ignoring message to port %s as peer was closed",
|
|
ToString(aPort.name()).c_str());
|
|
return true;
|
|
}
|
|
NODECONTROLLER_WARNING("Failed to send message to port %s",
|
|
ToString(aPort.name()).c_str());
|
|
return false;
|
|
}
|
|
|
|
auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
|
|
const NodeName* aRelayTarget,
|
|
uint32_t aType)
|
|
-> UniquePtr<IPC::Message> {
|
|
UniquePtr<IPC::Message> message;
|
|
if (aEvent->type() == Event::kUserMessage) {
|
|
MOZ_DIAGNOSTIC_ASSERT(
|
|
aType == EVENT_MESSAGE_TYPE,
|
|
"Can only send a UserMessage in an EVENT_MESSAGE_TYPE");
|
|
message = static_cast<UserMessageEvent*>(aEvent.get())
|
|
->TakeMessage<IPC::Message>();
|
|
} else {
|
|
message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
|
|
}
|
|
|
|
message->set_relay(aRelayTarget != nullptr);
|
|
|
|
size_t length = aEvent->GetSerializedSize();
|
|
if (aRelayTarget) {
|
|
length += sizeof(NodeName);
|
|
}
|
|
|
|
// Use an intermediate buffer to serialize to avoid potential issues with the
|
|
// segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
|
|
// majority of events are fairly small.
|
|
Vector<char, 256, InfallibleAllocPolicy> buffer;
|
|
(void)buffer.initLengthUninitialized(length);
|
|
if (aRelayTarget) {
|
|
memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
|
|
aEvent->Serialize(buffer.begin() + sizeof(NodeName));
|
|
} else {
|
|
aEvent->Serialize(buffer.begin());
|
|
}
|
|
|
|
message->WriteFooter(buffer.begin(), buffer.length());
|
|
message->set_event_footer_size(buffer.length());
|
|
|
|
#ifdef DEBUG
|
|
// Debug-assert that we can read the same data back out of the buffer.
|
|
MOZ_ASSERT(message->event_footer_size() == length);
|
|
Vector<char, 256, InfallibleAllocPolicy> buffer2;
|
|
(void)buffer2.initLengthUninitialized(message->event_footer_size());
|
|
MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
|
|
/* truncate */ false));
|
|
MOZ_ASSERT(!memcmp(buffer2.begin(), buffer.begin(), buffer.length()));
|
|
#endif
|
|
|
|
return message;
|
|
}
|
|
|
|
auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
|
|
NodeName* aRelayTarget)
|
|
-> UniquePtr<Event> {
|
|
if (aMessage->is_relay() && !aRelayTarget) {
|
|
NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
|
|
return nullptr;
|
|
}
|
|
|
|
Vector<char, 256, InfallibleAllocPolicy> buffer;
|
|
(void)buffer.initLengthUninitialized(aMessage->event_footer_size());
|
|
// Truncate the message when reading the footer, so that the extra footer data
|
|
// is no longer present in the message. This allows future code to eventually
|
|
// send the same `IPC::Message` to another process.
|
|
if (!aMessage->ReadFooter(buffer.begin(), buffer.length(),
|
|
/* truncate */ true)) {
|
|
NODECONTROLLER_WARNING("Call to ReadFooter for message '%s' Failed",
|
|
aMessage->name());
|
|
return nullptr;
|
|
}
|
|
aMessage->set_event_footer_size(0);
|
|
|
|
UniquePtr<Event> event;
|
|
if (aRelayTarget) {
|
|
MOZ_ASSERT(aMessage->is_relay());
|
|
if (buffer.length() < sizeof(NodeName)) {
|
|
NODECONTROLLER_WARNING(
|
|
"Insufficient space in message footer for message '%s'",
|
|
aMessage->name());
|
|
return nullptr;
|
|
}
|
|
memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
|
|
event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
|
|
buffer.length() - sizeof(NodeName));
|
|
} else {
|
|
event = Event::Deserialize(buffer.begin(), buffer.length());
|
|
}
|
|
|
|
if (!event) {
|
|
NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
|
|
aMessage->name());
|
|
return nullptr;
|
|
}
|
|
|
|
if (event->type() == Event::kUserMessage) {
|
|
static_cast<UserMessageEvent*>(event.get())
|
|
->AttachMessage(std::move(aMessage));
|
|
}
|
|
return event;
|
|
}
|
|
|
|
already_AddRefed<NodeChannel> NodeController::GetNodeChannel(
|
|
const NodeName& aName) {
|
|
auto state = mState.Lock();
|
|
return do_AddRef(state->mPeers.Get(aName));
|
|
}
|
|
|
|
void NodeController::DropPeer(NodeName aNodeName) {
|
|
AssertIOThread();
|
|
|
|
#ifdef FUZZING_SNAPSHOT
|
|
MOZ_FUZZING_IPC_DROP_PEER("NodeController::DropPeer");
|
|
#endif
|
|
|
|
Invite invite;
|
|
RefPtr<NodeChannel> channel;
|
|
nsTArray<PortRef> pendingMerges;
|
|
{
|
|
auto state = mState.Lock();
|
|
state->mPeers.Remove(aNodeName, &channel);
|
|
state->mPendingMessages.Remove(aNodeName);
|
|
state->mInvites.Remove(aNodeName, &invite);
|
|
state->mPendingMerges.Remove(aNodeName, &pendingMerges);
|
|
}
|
|
|
|
NODECONTROLLER_LOG(LogLevel::Info, "Dropping Peer %s (pid: %" PRIPID ")",
|
|
ToString(aNodeName).c_str(),
|
|
channel ? channel->OtherPid() : base::kInvalidProcessId);
|
|
|
|
if (channel) {
|
|
channel->Close();
|
|
}
|
|
if (invite.mChannel) {
|
|
invite.mChannel->Close();
|
|
}
|
|
if (invite.mToMerge.is_valid()) {
|
|
// Ignore any possible errors here.
|
|
(void)mNode->ClosePort(invite.mToMerge);
|
|
}
|
|
for (auto& port : pendingMerges) {
|
|
// Ignore any possible errors here.
|
|
(void)mNode->ClosePort(port);
|
|
}
|
|
mNode->LostConnectionToNode(aNodeName);
|
|
}
|
|
|
|
void NodeController::ContactRemotePeer(const NodeName& aNode,
|
|
UniquePtr<Event> aEvent) {
|
|
// On Windows and macOS, messages holding HANDLEs or mach ports must be
|
|
// relayed via the broker process so it can transfer ownership.
|
|
bool needsRelay = false;
|
|
#if defined(XP_WIN) || defined(XP_DARWIN)
|
|
if (aEvent && !IsBroker() && aNode != kBrokerNodeName &&
|
|
aEvent->type() == Event::kUserMessage) {
|
|
auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
|
|
needsRelay =
|
|
userEvent->HasMessage() &&
|
|
userEvent->GetMessage<IPC::Message>()->num_relayed_attachments() > 0;
|
|
}
|
|
#endif
|
|
|
|
UniquePtr<IPC::Message> message;
|
|
if (aEvent) {
|
|
message =
|
|
SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
|
|
MOZ_ASSERT(message->is_relay() == needsRelay,
|
|
"Message relay status set incorrectly");
|
|
}
|
|
|
|
RefPtr<NodeChannel> peer;
|
|
RefPtr<NodeChannel> broker;
|
|
bool needsIntroduction = false;
|
|
bool needsBroker = needsRelay;
|
|
{
|
|
auto state = mState.Lock();
|
|
|
|
// Check if we know this peer. If we don't, we'll need to request an
|
|
// introduction.
|
|
peer = state->mPeers.Get(aNode);
|
|
if (!peer) {
|
|
// We don't know the peer, check if we've already requested an
|
|
// introduction, or if we need to request a new one.
|
|
auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
|
|
needsIntroduction = true;
|
|
needsBroker = true;
|
|
return Queue<UniquePtr<IPC::Message>, 64>{};
|
|
});
|
|
// If we aren't relaying, queue up the message to be sent.
|
|
if (message && !needsRelay) {
|
|
queue.Push(std::move(message));
|
|
}
|
|
}
|
|
|
|
if (needsBroker && !IsBroker()) {
|
|
broker = state->mPeers.Get(kBrokerNodeName);
|
|
}
|
|
}
|
|
|
|
if (needsBroker && !broker) {
|
|
NODECONTROLLER_WARNING(
|
|
"Dropping message '%s'; no connection to unknown peer %s",
|
|
message ? message->name() : "<null>", ToString(aNode).c_str());
|
|
if (needsIntroduction) {
|
|
// We have no broker and will never be able to be introduced to this node.
|
|
// Queue a task to clean up any ports connected to it.
|
|
XRE_GetIOMessageLoop()->PostTask(NewRunnableMethod<NodeName>(
|
|
"NodeController::DropPeer", this, &NodeController::DropPeer, aNode));
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (needsIntroduction) {
|
|
NODECONTROLLER_LOG(LogLevel::Info, "Requesting introduction to peer %s",
|
|
ToString(aNode).c_str());
|
|
broker->RequestIntroduction(aNode);
|
|
}
|
|
|
|
if (message) {
|
|
if (needsRelay) {
|
|
NODECONTROLLER_LOG(LogLevel::Info,
|
|
"Relaying message '%s' for peer %s due to %" PRIu32
|
|
" attachments",
|
|
message->name(), ToString(aNode).c_str(),
|
|
message->num_relayed_attachments());
|
|
MOZ_ASSERT(message->num_relayed_attachments() > 0 && broker);
|
|
broker->SendEventMessage(std::move(message));
|
|
} else if (peer) {
|
|
peer->SendEventMessage(std::move(message));
|
|
}
|
|
}
|
|
}
|
|
|
|
void NodeController::ForwardEvent(const NodeName& aNode,
|
|
UniquePtr<Event> aEvent) {
|
|
MOZ_ASSERT(aEvent, "cannot forward null event");
|
|
if (aNode == mName) {
|
|
(void)mNode->AcceptEvent(mName, std::move(aEvent));
|
|
} else {
|
|
ContactRemotePeer(aNode, std::move(aEvent));
|
|
}
|
|
}
|
|
|
|
void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
|
|
UniquePtr<IPC::Message> message =
|
|
SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
|
|
|
|
if (IsBroker()) {
|
|
OnBroadcast(mName, std::move(message));
|
|
} else if (RefPtr<NodeChannel> broker = GetNodeChannel(kBrokerNodeName)) {
|
|
broker->Broadcast(std::move(message));
|
|
} else {
|
|
NODECONTROLLER_WARNING(
|
|
"Trying to broadcast event, but no connection to broker");
|
|
}
|
|
}
|
|
|
|
void NodeController::PortStatusChanged(const PortRef& aPortRef) {
|
|
RefPtr<UserData> userData;
|
|
int rv = mNode->GetUserData(aPortRef, &userData);
|
|
if (rv != mojo::core::ports::OK) {
|
|
NODECONTROLLER_WARNING("GetUserData call for port '%s' failed",
|
|
ToString(aPortRef.name()).c_str());
|
|
return;
|
|
}
|
|
if (userData) {
|
|
// All instances of `UserData` attached to ports in this node must be of
|
|
// type `PortObserver`, so we can call `OnPortStatusChanged` directly on
|
|
// them.
|
|
static_cast<PortObserver*>(userData.get())->OnPortStatusChanged();
|
|
}
|
|
}
|
|
|
|
void NodeController::ObserveRemoteNode(const NodeName& aNode) {
|
|
MOZ_ASSERT(aNode != mName);
|
|
ContactRemotePeer(aNode, nullptr);
|
|
}
|
|
|
|
void NodeController::OnEventMessage(const NodeName& aFromNode,
|
|
UniquePtr<IPC::Message> aMessage) {
|
|
AssertIOThread();
|
|
|
|
bool isRelay = aMessage->is_relay();
|
|
if (isRelay && aMessage->num_relayed_attachments() == 0) {
|
|
NODECONTROLLER_WARNING(
|
|
"Invalid relay message without relayed attachments from peer %s!",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
NodeName relayTarget;
|
|
UniquePtr<Event> event = DeserializeEventMessage(
|
|
std::move(aMessage), isRelay ? &relayTarget : nullptr);
|
|
if (!event) {
|
|
NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
NodeName fromNode = aFromNode;
|
|
#if defined(XP_WIN) || defined(XP_DARWIN)
|
|
if (isRelay) {
|
|
if (event->type() != Event::kUserMessage) {
|
|
NODECONTROLLER_WARNING(
|
|
"Unexpected relay of non-UserMessage event from peer %s!",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
// If we're the broker, then we'll need to forward this message on to the
|
|
// true recipient. To do this, we re-serialize the message, passing along
|
|
// the original source node, and send it to the final node.
|
|
if (IsBroker()) {
|
|
UniquePtr<IPC::Message> message =
|
|
SerializeEventMessage(std::move(event), &aFromNode);
|
|
if (!message) {
|
|
NODECONTROLLER_WARNING(
|
|
"Relaying EventMessage from peer %s failed to re-serialize!",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
|
|
MOZ_ASSERT(message->num_relayed_attachments() > 0,
|
|
"Message doesn't have relayed attachments?");
|
|
|
|
NODECONTROLLER_LOG(
|
|
LogLevel::Info,
|
|
"Relaying message '%s' from peer %s to peer %s (%" PRIu32
|
|
" attachments)",
|
|
message->name(), ToString(aFromNode).c_str(),
|
|
ToString(relayTarget).c_str(), message->num_relayed_attachments());
|
|
|
|
RefPtr<NodeChannel> peer;
|
|
{
|
|
auto state = mState.Lock();
|
|
peer = state->mPeers.Get(relayTarget);
|
|
}
|
|
if (!peer) {
|
|
NODECONTROLLER_WARNING(
|
|
"Dropping relayed message from %s to unknown peer %s",
|
|
ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
|
|
return;
|
|
}
|
|
|
|
peer->SendEventMessage(std::move(message));
|
|
return;
|
|
}
|
|
|
|
// Otherwise, we're the final recipient, so we can continue & process the
|
|
// message as usual.
|
|
if (aFromNode != kBrokerNodeName) {
|
|
NODECONTROLLER_WARNING(
|
|
"Unexpected relayed EventMessage from non-broker peer %s!",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
fromNode = relayTarget;
|
|
|
|
NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
|
|
ToString(fromNode).c_str());
|
|
}
|
|
#endif
|
|
|
|
// If we're getting a requested port merge from another process, check to make
|
|
// sure that we're expecting the request, and record that the merge has
|
|
// arrived so we don't try to close the port on error.
|
|
if (event->type() == Event::kMergePort) {
|
|
// Check that the target port for the merge actually exists.
|
|
auto targetPort = GetPort(event->port_name());
|
|
if (!targetPort.is_valid()) {
|
|
NODECONTROLLER_WARNING(
|
|
"Unexpected MergePortEvent from peer %s for unknown port %s",
|
|
ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
|
|
DropPeer(fromNode);
|
|
return;
|
|
}
|
|
|
|
// Check if `targetPort` is in our pending merges entry for the given source
|
|
// node. If this makes the `mPendingMerges` entry empty, remove it.
|
|
bool expectingMerge = [&] {
|
|
auto state = mState.Lock();
|
|
auto pendingMerges = state->mPendingMerges.Lookup(aFromNode);
|
|
if (!pendingMerges) {
|
|
return false;
|
|
}
|
|
size_t removed = pendingMerges->RemoveElementsBy(
|
|
[&](auto& port) { return port.name() == targetPort.name(); });
|
|
if (removed != 0 && pendingMerges->IsEmpty()) {
|
|
pendingMerges.Remove();
|
|
}
|
|
return removed != 0;
|
|
}();
|
|
|
|
if (!expectingMerge) {
|
|
NODECONTROLLER_WARNING(
|
|
"Unexpected MergePortEvent from peer %s for port %s",
|
|
ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
|
|
DropPeer(fromNode);
|
|
return;
|
|
}
|
|
}
|
|
|
|
(void)mNode->AcceptEvent(fromNode, std::move(event));
|
|
}
|
|
|
|
void NodeController::OnBroadcast(const NodeName& aFromNode,
|
|
UniquePtr<IPC::Message> aMessage) {
|
|
MOZ_DIAGNOSTIC_ASSERT(aMessage->type() == BROADCAST_MESSAGE_TYPE);
|
|
|
|
// NOTE: This method may be called off of the IO thread by the
|
|
// `BroadcastEvent` node callback.
|
|
if (!IsBroker()) {
|
|
NODECONTROLLER_WARNING("Broadcast request received by non-broker node");
|
|
return;
|
|
}
|
|
|
|
UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage));
|
|
if (!event) {
|
|
NODECONTROLLER_WARNING("Invalid broadcast message from peer");
|
|
return;
|
|
}
|
|
|
|
nsTArray<RefPtr<NodeChannel>> peers;
|
|
{
|
|
auto state = mState.Lock();
|
|
peers.SetCapacity(state->mPeers.Count());
|
|
for (const auto& peer : state->mPeers.Values()) {
|
|
peers.AppendElement(peer);
|
|
}
|
|
}
|
|
for (const auto& peer : peers) {
|
|
// NOTE: This `clone` operation is only supported for a limited number of
|
|
// message types by the ports API, which provides some extra security by
|
|
// only allowing those specific types of messages to be broadcasted.
|
|
// Messages which don't support `CloneForBroadcast` cannot be broadcast, and
|
|
// the ports library will not attempt to broadcast them.
|
|
auto clone = event->CloneForBroadcast();
|
|
if (!clone) {
|
|
NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
|
|
break;
|
|
}
|
|
|
|
peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
|
|
}
|
|
}
|
|
|
|
void NodeController::OnIntroduce(const NodeName& aFromNode,
|
|
NodeChannel::Introduction aIntroduction) {
|
|
AssertIOThread();
|
|
|
|
if (aFromNode != kBrokerNodeName) {
|
|
NODECONTROLLER_WARNING("Introduction received from non-broker node");
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
MOZ_ASSERT(aIntroduction.mMyPid == base::GetCurrentProcId(),
|
|
"We're the wrong process to receive this?");
|
|
|
|
if (!aIntroduction.mHandle) {
|
|
NODECONTROLLER_WARNING("Could not be introduced to peer %s",
|
|
ToString(aIntroduction.mName).c_str());
|
|
mNode->LostConnectionToNode(aIntroduction.mName);
|
|
|
|
auto state = mState.Lock();
|
|
state->mPendingMessages.Remove(aIntroduction.mName);
|
|
return;
|
|
}
|
|
|
|
auto channel =
|
|
MakeUnique<IPC::Channel>(std::move(aIntroduction.mHandle),
|
|
aIntroduction.mMode, aIntroduction.mOtherPid);
|
|
auto nodeChannel = MakeRefPtr<NodeChannel>(
|
|
aIntroduction.mName, std::move(channel), this, aIntroduction.mOtherPid);
|
|
|
|
{
|
|
auto state = mState.Lock();
|
|
bool isNew = false;
|
|
state->mPeers.LookupOrInsertWith(aIntroduction.mName, [&]() {
|
|
isNew = true;
|
|
return nodeChannel;
|
|
});
|
|
if (!isNew) {
|
|
// We got a duplicate introduction. This can happen during normal
|
|
// execution if both sides request an introduction at the same time. We
|
|
// can just ignore the second one, as they'll arrive in the same order in
|
|
// both processes.
|
|
nodeChannel->Close();
|
|
return;
|
|
}
|
|
|
|
// Deliver any pending messages, then remove the entry from our table. We do
|
|
// this while `mState` is still held to ensure that these messages are
|
|
// all sent before another thread can observe the newly created channel.
|
|
// As the channel hasn't been `Connect()`-ed yet, this will only queue the
|
|
// messages up to be sent, so is OK to do with the mutex held. These
|
|
// messages will be processed to be sent during `Start()` below, which is
|
|
// performed outside of the lock.
|
|
if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
|
|
while (!pending->IsEmpty()) {
|
|
nodeChannel->SendEventMessage(pending->Pop());
|
|
}
|
|
pending.Remove();
|
|
}
|
|
}
|
|
|
|
// NodeChannel::Start must be called with the lock not held, as it may lead to
|
|
// callbacks being made into `OnChannelError` or `OnMessageReceived`, which
|
|
// will attempt to re-acquire our lock.
|
|
nodeChannel->Start();
|
|
}
|
|
|
|
void NodeController::OnRequestIntroduction(const NodeName& aFromNode,
|
|
const NodeName& aName) {
|
|
AssertIOThread();
|
|
if (NS_WARN_IF(!IsBroker())) {
|
|
return;
|
|
}
|
|
|
|
RefPtr<NodeChannel> peerA = GetNodeChannel(aFromNode);
|
|
if (!peerA || aName == mojo::core::ports::kInvalidNodeName) {
|
|
NODECONTROLLER_WARNING("Invalid OnRequestIntroduction message from node %s",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
RefPtr<NodeChannel> peerB = GetNodeChannel(aName);
|
|
IPC::Channel::ChannelHandle handleA, handleB;
|
|
if (!peerB || !IPC::Channel::CreateRawPipe(&handleA, &handleB)) {
|
|
NODECONTROLLER_WARNING(
|
|
"Rejecting introduction request from '%s' for unknown peer '%s'",
|
|
ToString(aFromNode).c_str(), ToString(aName).c_str());
|
|
|
|
// We don't know this peer, or ran into issues creating the descriptor! Send
|
|
// an invalid introduction to content to clean up any pending outbound
|
|
// messages.
|
|
NodeChannel::Introduction intro{aName, nullptr, IPC::Channel::MODE_SERVER,
|
|
peerA->OtherPid(), base::kInvalidProcessId};
|
|
peerA->Introduce(std::move(intro));
|
|
return;
|
|
}
|
|
|
|
NodeChannel::Introduction introA{aName, std::move(handleA),
|
|
IPC::Channel::MODE_SERVER, peerA->OtherPid(),
|
|
peerB->OtherPid()};
|
|
NodeChannel::Introduction introB{aFromNode, std::move(handleB),
|
|
IPC::Channel::MODE_CLIENT, peerB->OtherPid(),
|
|
peerA->OtherPid()};
|
|
peerA->Introduce(std::move(introA));
|
|
peerB->Introduce(std::move(introB));
|
|
}
|
|
|
|
void NodeController::OnAcceptInvite(const NodeName& aFromNode,
|
|
const NodeName& aRealName,
|
|
const PortName& aInitialPort) {
|
|
AssertIOThread();
|
|
if (!IsBroker()) {
|
|
NODECONTROLLER_WARNING("Ignoring AcceptInvite message as non-broker");
|
|
return;
|
|
}
|
|
|
|
if (aRealName == mojo::core::ports::kInvalidNodeName ||
|
|
aInitialPort == mojo::core::ports::kInvalidPortName) {
|
|
NODECONTROLLER_WARNING("Invalid name in AcceptInvite message");
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
bool inserted = false;
|
|
Invite invite;
|
|
{
|
|
auto state = mState.Lock();
|
|
|
|
// Try to remove the source node from our invites list and insert it into
|
|
// our peers map under the new name.
|
|
if (state->mInvites.Remove(aFromNode, &invite)) {
|
|
MOZ_ASSERT(invite.mChannel && invite.mToMerge.is_valid());
|
|
state->mPeers.LookupOrInsertWith(aRealName, [&]() {
|
|
inserted = true;
|
|
return invite.mChannel;
|
|
});
|
|
}
|
|
}
|
|
if (!inserted) {
|
|
NODECONTROLLER_WARNING("Invalid AcceptInvite message from node %s",
|
|
ToString(aFromNode).c_str());
|
|
DropPeer(aFromNode);
|
|
return;
|
|
}
|
|
|
|
// Update the name of the node. This field is only accessed from the IO
|
|
// thread, so it's safe to update it without a lock held.
|
|
invite.mChannel->SetName(aRealName);
|
|
|
|
// Start the port merge to allow our existing initial port to begin
|
|
// communicating with the remote port.
|
|
PORTS_ALWAYS_OK(mNode->MergePorts(invite.mToMerge, aRealName, aInitialPort));
|
|
}
|
|
|
|
void NodeController::OnChannelError(const NodeName& aFromNode) {
|
|
AssertIOThread();
|
|
DropPeer(aFromNode);
|
|
}
|
|
|
|
static mojo::core::ports::NodeName RandomNodeName() {
|
|
return {RandomUint64OrDie(), RandomUint64OrDie()};
|
|
}
|
|
|
|
std::tuple<ScopedPort, RefPtr<NodeChannel>> NodeController::InviteChildProcess(
|
|
UniquePtr<IPC::Channel> aChannel,
|
|
GeckoChildProcessHost* aChildProcessHost) {
|
|
MOZ_ASSERT(IsBroker());
|
|
AssertIOThread();
|
|
|
|
// Create the peer with a randomly generated name, and store it in `mInvites`.
|
|
// This channel and name will be used for communication with the node until it
|
|
// sends us its' real name in an `AcceptInvite` message.
|
|
auto ports = CreatePortPair();
|
|
auto inviteName = RandomNodeName();
|
|
auto nodeChannel =
|
|
MakeRefPtr<NodeChannel>(inviteName, std::move(aChannel), this,
|
|
base::kInvalidProcessId, aChildProcessHost);
|
|
{
|
|
auto state = mState.Lock();
|
|
MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(inviteName),
|
|
"UUID conflict?");
|
|
MOZ_DIAGNOSTIC_ASSERT(!state->mInvites.Contains(inviteName),
|
|
"UUID conflict?");
|
|
state->mInvites.InsertOrUpdate(inviteName,
|
|
Invite{nodeChannel, ports.second.Release()});
|
|
}
|
|
|
|
nodeChannel->Start();
|
|
return std::tuple{std::move(ports.first), std::move(nodeChannel)};
|
|
}
|
|
|
|
void NodeController::InitBrokerProcess() {
|
|
AssertIOThread();
|
|
MOZ_ASSERT(!gNodeController);
|
|
gNodeController = new NodeController(kBrokerNodeName);
|
|
}
|
|
|
|
ScopedPort NodeController::InitChildProcess(UniquePtr<IPC::Channel> aChannel,
|
|
base::ProcessId aParentPid) {
|
|
AssertIOThread();
|
|
MOZ_ASSERT(!gNodeController);
|
|
|
|
auto nodeName = RandomNodeName();
|
|
gNodeController = new NodeController(nodeName);
|
|
|
|
auto ports = gNodeController->CreatePortPair();
|
|
PortRef toMerge = ports.second.Release();
|
|
|
|
// Mark the port as expecting a pending merge. This is a duplicate of the
|
|
// information tracked by `mPendingMerges`, and was added by upstream
|
|
// chromium.
|
|
// See https://chromium-review.googlesource.com/c/chromium/src/+/3289065
|
|
{
|
|
mojo::core::ports::SinglePortLocker locker(&toMerge);
|
|
locker.port()->pending_merge_peer = true;
|
|
}
|
|
|
|
auto nodeChannel = MakeRefPtr<NodeChannel>(
|
|
kBrokerNodeName, std::move(aChannel), gNodeController, aParentPid);
|
|
{
|
|
auto state = gNodeController->mState.Lock();
|
|
MOZ_DIAGNOSTIC_ASSERT(!state->mPeers.Contains(kBrokerNodeName));
|
|
state->mPeers.InsertOrUpdate(kBrokerNodeName, nodeChannel);
|
|
MOZ_DIAGNOSTIC_ASSERT(!state->mPendingMerges.Contains(kBrokerNodeName));
|
|
state->mPendingMerges.LookupOrInsert(kBrokerNodeName)
|
|
.AppendElement(toMerge);
|
|
}
|
|
|
|
nodeChannel->Start();
|
|
nodeChannel->AcceptInvite(nodeName, toMerge.name());
|
|
return std::move(ports.first);
|
|
}
|
|
|
|
void NodeController::CleanUp() {
|
|
AssertIOThread();
|
|
MOZ_ASSERT(gNodeController);
|
|
|
|
RefPtr<NodeController> nodeController = gNodeController;
|
|
gNodeController = nullptr;
|
|
|
|
// Collect all objects from our state which need to be cleaned up.
|
|
nsTArray<NodeName> lostConnections;
|
|
nsTArray<RefPtr<NodeChannel>> channelsToClose;
|
|
nsTArray<PortRef> portsToClose;
|
|
{
|
|
auto state = nodeController->mState.Lock();
|
|
for (const auto& chan : state->mPeers) {
|
|
lostConnections.AppendElement(chan.GetKey());
|
|
channelsToClose.AppendElement(chan.GetData());
|
|
}
|
|
for (const auto& pending : state->mPendingMessages.Keys()) {
|
|
lostConnections.AppendElement(pending);
|
|
}
|
|
for (const auto& invite : state->mInvites.Values()) {
|
|
channelsToClose.AppendElement(invite.mChannel);
|
|
portsToClose.AppendElement(invite.mToMerge);
|
|
}
|
|
for (const auto& pendingPorts : state->mPendingMerges.Values()) {
|
|
portsToClose.AppendElements(pendingPorts);
|
|
}
|
|
state->mPeers.Clear();
|
|
state->mPendingMessages.Clear();
|
|
state->mInvites.Clear();
|
|
state->mPendingMerges.Clear();
|
|
}
|
|
for (auto& nodeChannel : channelsToClose) {
|
|
nodeChannel->Close();
|
|
}
|
|
for (auto& port : portsToClose) {
|
|
nodeController->mNode->ClosePort(port);
|
|
}
|
|
for (auto& name : lostConnections) {
|
|
nodeController->mNode->LostConnectionToNode(name);
|
|
}
|
|
}
|
|
|
|
#undef NODECONTROLLER_LOG
|
|
#undef NODECONTROLLER_WARNING
|
|
|
|
} // namespace mozilla::ipc
|