Bug 1785208 - Update AudioIPC macOS branch. r=chunmin

Differential Revision: https://phabricator.services.mozilla.com/D154875
This commit is contained in:
Matthew Gregan 2022-08-24 06:55:04 +00:00
parent ccea767237
commit 540be5aee2
12 changed files with 164 additions and 91 deletions

View file

@ -65,7 +65,7 @@ rev = "21c26326f5f45f415c49eac4ba5bc41a2f961321"
[source."https://github.com/kinetiknz/audioipc-2"] [source."https://github.com/kinetiknz/audioipc-2"]
git = "https://github.com/kinetiknz/audioipc-2" git = "https://github.com/kinetiknz/audioipc-2"
replace-with = "vendored-sources" replace-with = "vendored-sources"
rev = "499b95580c8b276e52bd9757d735249504202e5c" rev = "ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb"
[source."https://github.com/jfkthame/mapped_hyph.git"] [source."https://github.com/jfkthame/mapped_hyph.git"]
git = "https://github.com/jfkthame/mapped_hyph.git" git = "https://github.com/jfkthame/mapped_hyph.git"

7
Cargo.lock generated
View file

@ -318,7 +318,7 @@ dependencies = [
[[package]] [[package]]
name = "audioipc2" name = "audioipc2"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c" source = "git+https://github.com/kinetiknz/audioipc-2?rev=ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb#ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb"
dependencies = [ dependencies = [
"arrayvec", "arrayvec",
"ashmem", "ashmem",
@ -327,6 +327,7 @@ dependencies = [
"byteorder", "byteorder",
"bytes 1.2.1", "bytes 1.2.1",
"cc", "cc",
"crossbeam-channel",
"cubeb", "cubeb",
"error-chain", "error-chain",
"iovec", "iovec",
@ -345,7 +346,7 @@ dependencies = [
[[package]] [[package]]
name = "audioipc2-client" name = "audioipc2-client"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c" source = "git+https://github.com/kinetiknz/audioipc-2?rev=ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb#ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb"
dependencies = [ dependencies = [
"audio_thread_priority", "audio_thread_priority",
"audioipc2", "audioipc2",
@ -356,7 +357,7 @@ dependencies = [
[[package]] [[package]]
name = "audioipc2-server" name = "audioipc2-server"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c" source = "git+https://github.com/kinetiknz/audioipc-2?rev=ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb#ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb"
dependencies = [ dependencies = [
"audio_thread_priority", "audio_thread_priority",
"audioipc2", "audioipc2",

View file

@ -1 +1 @@
{"files":{"Cargo.toml":"31dc34fae9951183eaed3511cffe3d830d52ba3c046257454f09a06156d0716b","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"17bdf1dfc8d910b745f94d5bc74121850174c716f8a2eb6d1c4b075d42fa5df5","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"064a657c845762be1dbcbbfc18b3f8a51582eb540def8d2ceecf200184ad4f7a","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null} {"files":{"Cargo.toml":"31dc34fae9951183eaed3511cffe3d830d52ba3c046257454f09a06156d0716b","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"4593aa41ee97b72622b572ad80f0e4c939d8be2ea45fed1f5587f5a109a67735","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"859abe75b521eb4297c84b30423814b5b87f3c7741ad16fe72189212e123e1ac","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null}

View file

@ -71,8 +71,7 @@ fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
match get_current_thread_info() { match get_current_thread_info() {
Ok(info) => { Ok(info) => {
let bytes = info.serialize(); let bytes = info.serialize();
// Don't wait for the response, this is on the callback thread, which must not block. let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes));
rpc.call(ServerMessage::PromoteThreadToRealTime(bytes));
} }
Err(_) => { Err(_) => {
warn!("Could not remotely promote thread to RT."); warn!("Could not remotely promote thread to RT.");

View file

@ -43,7 +43,7 @@ macro_rules! send_recv {
$rpc.call(ServerMessage::$smsg($($a),*)) $rpc.call(ServerMessage::$smsg($($a),*))
}); });
(__recv $resp:expr, $rmsg:ident) => ({ (__recv $resp:expr, $rmsg:ident) => ({
match $resp.wait() { match $resp {
Ok(ClientMessage::$rmsg) => Ok(()), Ok(ClientMessage::$rmsg) => Ok(()),
Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
Ok(m) => { Ok(m) => {
@ -57,7 +57,7 @@ macro_rules! send_recv {
} }
}); });
(__recv $resp:expr, $rmsg:ident __result) => ({ (__recv $resp:expr, $rmsg:ident __result) => ({
match $resp.wait() { match $resp {
Ok(ClientMessage::$rmsg(v)) => Ok(v), Ok(ClientMessage::$rmsg(v)) => Ok(v),
Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
Ok(m) => { Ok(m) => {

View file

@ -1 +1 @@
{"files":{"Cargo.toml":"7feb495b23148ecc83ec7f480aefe19c9804a8900cdb4ceb005c049cdce82428","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"d9cc7ca311cceb70acbc63b2190d6205094152e582faaad1b4a6061019f5803f","src/server.rs":"00740854f3e4f64cbeabfdb04d1337a2fdb89122464ea64d23fe1272045aee7d"},"package":null} {"files":{"Cargo.toml":"7feb495b23148ecc83ec7f480aefe19c9804a8900cdb4ceb005c049cdce82428","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"d9cc7ca311cceb70acbc63b2190d6205094152e582faaad1b4a6061019f5803f","src/server.rs":"362ec34c541e43befb95204795622b5a2da036f8e417d524c64eb6c6550d094b"},"package":null}

View file

@ -252,14 +252,11 @@ impl ServerStreamCallbacks {
return cubeb::ffi::CUBEB_ERROR.try_into().unwrap(); return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
} }
let r = self let r = self.data_callback_rpc.call(CallbackReq::Data {
.data_callback_rpc nframes,
.call(CallbackReq::Data { input_frame_size: self.input_frame_size as usize,
nframes, output_frame_size: self.output_frame_size as usize,
input_frame_size: self.input_frame_size as usize, });
output_frame_size: self.output_frame_size as usize,
})
.wait();
match r { match r {
Ok(CallbackResp::Data(frames)) => { Ok(CallbackResp::Data(frames)) => {
@ -282,8 +279,7 @@ impl ServerStreamCallbacks {
trace!("Stream state callback: {:?}", state); trace!("Stream state callback: {:?}", state);
let r = self let r = self
.state_callback_rpc .state_callback_rpc
.call(CallbackReq::State(state.into())) .call(CallbackReq::State(state.into()));
.wait();
match r { match r {
Ok(CallbackResp::State) => {} Ok(CallbackResp::State) => {}
_ => { _ => {
@ -296,8 +292,7 @@ impl ServerStreamCallbacks {
trace!("Stream device change callback"); trace!("Stream device change callback");
let r = self let r = self
.device_change_callback_rpc .device_change_callback_rpc
.call(CallbackReq::DeviceChange) .call(CallbackReq::DeviceChange);
.wait();
match r { match r {
Ok(CallbackResp::DeviceChange) => {} Ok(CallbackResp::DeviceChange) => {}
_ => { _ => {
@ -347,8 +342,7 @@ impl DeviceCollectionChangeCallback {
); );
let _ = self let _ = self
.rpc .rpc
.call(DeviceCollectionReq::DeviceChange(device_type)) .call(DeviceCollectionReq::DeviceChange(device_type));
.wait();
} }
} }

View file

@ -1 +1 @@
{"files":{"Cargo.toml":"4a6aaffaf15fc11d3c41fc399a6e36a1ac9016f0edd592d4ff4059a2092818af","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"65df9a97c6cdaa3faf72581f04ac289197b0b1797d69d22c1796e957ff1089e2","src/codec.rs":"4e029396765db803201249e90bcf724eb56deed3b2e455822d6673f40550a3e1","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"6d33898f5bc61963797d21b44d36a7ee52d3d0cf13a4f19fa2d59373720eead8","src/lib.rs":"9b107cb52081eeea3fa742d30361db70f7138baa423dfe21d37dcf5087afc338","src/messages.rs":"452362da2cace9a0f2e3134c190ecb6a9997f8be4036cde06643e17c6c238240","src/rpccore.rs":"9fa24cb6d487b436382e35f82d0809ad2b315ce049ebaa767b4f88d3d5637f2e","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"e6fa1d260abf093e1f7b50185195e2d3aee0eb8c9774c6f253953b5896d838f3","src/sys/unix/cmsg.rs":"8a27a20383c333c5d033e58a546a530e26b964942a4615793d1ca078c65efb75","src/sys/unix/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/sys/unix/mod.rs":"59835f0d5509940078b1820a54f49fc5514adeb3e45e7d21e3ab917431da2e74","src/sys/unix/msg.rs":"c0103cc058aeb890ab7aa023fcd6d3b9a0135d6b28fdecdec446650957210508","src/sys/windows/mod.rs":"7b1288e42b3ce34c7004b9fe3eeb6d9822c55e2688d3c2a40e55db46a2ca5d76"},"package":null} {"files":{"Cargo.toml":"8b2d3abbe023360a24d37b306dbec9e8bd0162025d38ca106ebcc8d7abab4039","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"65df9a97c6cdaa3faf72581f04ac289197b0b1797d69d22c1796e957ff1089e2","src/codec.rs":"4e029396765db803201249e90bcf724eb56deed3b2e455822d6673f40550a3e1","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"eda3629e363124c84d5b826dea03f5551a8adad6c8efbc61b98f7d6572fdfa18","src/lib.rs":"9b107cb52081eeea3fa742d30361db70f7138baa423dfe21d37dcf5087afc338","src/messages.rs":"452362da2cace9a0f2e3134c190ecb6a9997f8be4036cde06643e17c6c238240","src/rpccore.rs":"21568946ca59e0cf1cb0dc6254ebda577a014343438a4fde2556a22e44eea2bf","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"e6fa1d260abf093e1f7b50185195e2d3aee0eb8c9774c6f253953b5896d838f3","src/sys/unix/cmsg.rs":"8a27a20383c333c5d033e58a546a530e26b964942a4615793d1ca078c65efb75","src/sys/unix/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/sys/unix/mod.rs":"59835f0d5509940078b1820a54f49fc5514adeb3e45e7d21e3ab917431da2e74","src/sys/unix/msg.rs":"c0103cc058aeb890ab7aa023fcd6d3b9a0135d6b28fdecdec446650957210508","src/sys/windows/mod.rs":"7b1288e42b3ce34c7004b9fe3eeb6d9822c55e2688d3c2a40e55db46a2ca5d76"},"package":null}

View file

@ -21,6 +21,7 @@ serde_bytes = "0.11"
mio = { version = "0.8", features = ["os-poll", "net", "os-ext"] } mio = { version = "0.8", features = ["os-poll", "net", "os-ext"] }
slab = "0.4" slab = "0.4"
scopeguard = "1.1.0" scopeguard = "1.1.0"
crossbeam-channel = "0.5"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
iovec = "0.1" iovec = "0.1"

View file

@ -7,6 +7,7 @@ use std::io::{self, Result};
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
use std::thread; use std::thread;
use crossbeam_channel::{self, Receiver, Sender};
use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker}; use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker};
use slab::Slab; use slab::Slab;
@ -53,7 +54,7 @@ enum Request {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
waker: Arc<Waker>, waker: Arc<Waker>,
requests_tx: mpsc::Sender<Request>, requests_tx: Sender<Request>,
} }
impl EventLoopHandle { impl EventLoopHandle {
@ -140,8 +141,8 @@ struct EventLoop {
waker: Arc<Waker>, waker: Arc<Waker>,
name: String, name: String,
connections: Slab<Connection>, connections: Slab<Connection>,
requests_rx: mpsc::Receiver<Request>, requests_rx: Receiver<Request>,
requests_tx: mpsc::Sender<Request>, requests_tx: Sender<Request>,
} }
const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow. const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow.
@ -151,7 +152,7 @@ impl EventLoop {
fn new(name: String) -> Result<EventLoop> { fn new(name: String) -> Result<EventLoop> {
let poll = Poll::new()?; let poll = Poll::new()?;
let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?); let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?);
let (tx, rx) = mpsc::channel(); let (tx, rx) = crossbeam_channel::bounded(EVENT_LOOP_INITIAL_CLIENTS);
let eventloop = EventLoop { let eventloop = EventLoop {
poll, poll,
events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION), events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION),
@ -824,7 +825,7 @@ mod test {
// RPC message from client to server. // RPC message from client to server.
let response = client_proxy.call(TestServerMessage::TestRequest); let response = client_proxy.call(TestServerMessage::TestRequest);
let response = response.wait().expect("client response"); let response = response.expect("client response");
assert_eq!(response, TestClientMessage::TestResponse); assert_eq!(response, TestClientMessage::TestResponse);
// Explicit shutdown. // Explicit shutdown.
@ -840,7 +841,7 @@ mod test {
// RPC message from client to server. // RPC message from client to server.
let response = client_proxy.call(TestServerMessage::TestRequest); let response = client_proxy.call(TestServerMessage::TestRequest);
let response = response.wait().expect("client response"); let response = response.expect("client response");
assert_eq!(response, TestClientMessage::TestResponse); assert_eq!(response, TestClientMessage::TestResponse);
// Explicit shutdown. // Explicit shutdown.
@ -855,7 +856,7 @@ mod test {
drop(server); drop(server);
let response = client_proxy.call(TestServerMessage::TestRequest); let response = client_proxy.call(TestServerMessage::TestRequest);
response.wait().expect_err("sending on closed channel"); response.expect_err("sending on closed channel");
} }
#[test] #[test]
@ -865,7 +866,7 @@ mod test {
drop(client); drop(client);
let response = client_proxy.call(TestServerMessage::TestRequest); let response = client_proxy.call(TestServerMessage::TestRequest);
response.wait().expect_err("sending on a closed channel"); response.expect_err("sending on a closed channel");
} }
#[test] #[test]

View file

@ -3,11 +3,14 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use std::collections::VecDeque;
use std::io::{self, Result}; use std::io::{self, Result};
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
use std::{collections::VecDeque, sync::mpsc}; use std::sync::{Arc, Mutex, Weak};
use crossbeam_channel::{self, Receiver, Sender};
use mio::Token; use mio::Token;
use slab::Slab;
use crate::ipccore::EventLoopHandle; use crate::ipccore::EventLoopHandle;
@ -41,21 +44,57 @@ pub trait Server {
fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage; fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage;
} }
// RPC Client Proxy implementation. ProxyRequest's Sender is connected to ProxyReceiver's Receiver, // RPC Client Proxy implementation.
// allowing the ProxyReceiver to wait on a response from the proxy. type ProxyKey = usize;
type ProxyRequest<Request, Response> = (Request, mpsc::Sender<Response>); type ProxyRequest<Request> = (ProxyKey, Request);
type ProxyReceiver<Request, Response> = mpsc::Receiver<ProxyRequest<Request, Response>>;
// Each RPC Proxy `call` returns a blocking waitable ProxyResponse. // RPC Proxy that may be `clone`d for use by multiple owners/threads.
// `wait` produces the response received over RPC from the associated // A Proxy `call` arranges for the supplied request to be transmitted
// Proxy `call`. // to the associated Server via RPC and blocks awaiting the response
pub struct ProxyResponse<Response> { // via `response_rx`.
inner: mpsc::Receiver<Response>, // A Proxy is associated with the ClientHandler via `handler_tx` to send requests,
// `response_rx` to receive responses, and uses `key` to identify the Proxy with
// the sending side of `response_rx` ClientHandler.
// Each Proxy is registered with the ClientHandler on initialization via the
// ProxyManager and unregistered when dropped.
// A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler
// encounters an internal error, `response_tx` will be closed and `proxy_mgr` can
// no longer be upgraded to register new Proxies.
#[derive(Debug)]
pub struct Proxy<Request, Response> {
handle: Option<(EventLoopHandle, Token)>,
key: ProxyKey,
response_rx: Receiver<Response>,
handler_tx: ManuallyDrop<Sender<ProxyRequest<Request>>>,
proxy_mgr: Weak<ProxyManager<Response>>,
} }
impl<Response> ProxyResponse<Response> { impl<Request, Response> Proxy<Request, Response> {
pub fn wait(&self) -> Result<Response> { fn new(
match self.inner.recv() { handler_tx: Sender<ProxyRequest<Request>>,
proxy_mgr: Weak<ProxyManager<Response>>,
) -> Self {
let (tx, rx) = crossbeam_channel::bounded(1);
Self {
handle: None,
key: proxy_mgr.upgrade().unwrap().register_proxy(tx),
response_rx: rx,
handler_tx: ManuallyDrop::new(handler_tx),
proxy_mgr,
}
}
pub fn call(&self, request: Request) -> Result<Response> {
match self.handler_tx.send((self.key, request)) {
Ok(_) => self.wake_connection(),
Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"proxy send error",
))
}
}
match self.response_rx.recv() {
Ok(resp) => Ok(resp), Ok(resp) => Ok(resp),
Err(_) => Err(std::io::Error::new( Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
@ -63,27 +102,6 @@ impl<Response> ProxyResponse<Response> {
)), )),
} }
} }
}
// RPC Proxy that may be `clone`d for use by multiple owners/threads.
// A Proxy `call` arranges for the supplied request to be transmitted
// to the associated Server via RPC. The response can be retrieved by
// `wait`ing on the returned ProxyResponse.
#[derive(Debug)]
pub struct Proxy<Request, Response> {
handle: Option<(EventLoopHandle, Token)>,
tx: ManuallyDrop<mpsc::Sender<ProxyRequest<Request, Response>>>,
}
impl<Request, Response> Proxy<Request, Response> {
pub fn call(&self, request: Request) -> ProxyResponse<Response> {
let (tx, rx) = mpsc::channel();
match self.tx.send((request, tx)) {
Ok(_) => self.wake_connection(),
Err(e) => debug!("Proxy::call error={:?}", e),
}
ProxyResponse { inner: rx }
}
pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) { pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) {
self.handle = Some((handle, token)); self.handle = Some((handle, token));
@ -100,9 +118,13 @@ impl<Request, Response> Proxy<Request, Response> {
impl<Request, Response> Clone for Proxy<Request, Response> { impl<Request, Response> Clone for Proxy<Request, Response> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Proxy { let (tx, rx) = crossbeam_channel::bounded(1);
Self {
handle: self.handle.clone(), handle: self.handle.clone(),
tx: self.tx.clone(), key: self.proxy_mgr.upgrade().unwrap().register_proxy(tx),
response_rx: rx,
handler_tx: self.handler_tx.clone(),
proxy_mgr: self.proxy_mgr.clone(),
} }
} }
} }
@ -110,15 +132,56 @@ impl<Request, Response> Clone for Proxy<Request, Response> {
impl<Request, Response> Drop for Proxy<Request, Response> { impl<Request, Response> Drop for Proxy<Request, Response> {
fn drop(&mut self) { fn drop(&mut self) {
trace!("Proxy drop, waking EventLoop"); trace!("Proxy drop, waking EventLoop");
if let Some(mgr) = self.proxy_mgr.upgrade() {
mgr.unregister_proxy(self.key)
}
// Must drop Sender before waking the connection, otherwise // Must drop Sender before waking the connection, otherwise
// the wake may be processed before Sender is closed. // the wake may be processed before Sender is closed.
unsafe { ManuallyDrop::drop(&mut self.tx) } unsafe {
ManuallyDrop::drop(&mut self.handler_tx);
}
if self.handle.is_some() { if self.handle.is_some() {
self.wake_connection() self.wake_connection()
} }
} }
} }
const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client.
// Manage the Sender side of a ClientHandler's Proxies. Each Proxy registers itself with
// the manager on initialization.
#[derive(Debug)]
struct ProxyManager<Response> {
proxies: Mutex<Slab<Sender<Response>>>,
}
impl<Response> ProxyManager<Response> {
fn new() -> Self {
Self {
proxies: Mutex::new(Slab::with_capacity(RPC_CLIENT_INITIAL_PROXIES)),
}
}
// Register a Proxy's response Sender, returning a unique ID identifying
// the Proxy to the ClientHandler.
fn register_proxy(&self, tx: Sender<Response>) -> ProxyKey {
let mut proxies = self.proxies.lock().unwrap();
let entry = proxies.vacant_entry();
let key = entry.key();
entry.insert(tx);
key
}
fn unregister_proxy(&self, key: ProxyKey) {
let _ = self.proxies.lock().unwrap().remove(key);
}
// Deliver ClientHandler's Response to the Proxy associated with `key`.
fn deliver(&self, key: ProxyKey, resp: Response) {
let _ = self.proxies.lock().unwrap()[key].send(resp);
}
}
// Client-specific Handler implementation. // Client-specific Handler implementation.
// The IPC EventLoop Driver calls this to execute client-specific // The IPC EventLoop Driver calls this to execute client-specific
// RPC handling. Serialized messages sent via a Proxy are queued // RPC handling. Serialized messages sent via a Proxy are queued
@ -127,8 +190,26 @@ impl<Request, Response> Drop for Proxy<Request, Response> {
// trigger response completion by sending the response via a channel // trigger response completion by sending the response via a channel
// connected to a ProxyResponse. // connected to a ProxyResponse.
pub(crate) struct ClientHandler<C: Client> { pub(crate) struct ClientHandler<C: Client> {
messages: ProxyReceiver<C::ServerMessage, C::ClientMessage>, messages: Receiver<ProxyRequest<C::ServerMessage>>,
in_flight: VecDeque<mpsc::Sender<C::ClientMessage>>, // Proxies hold a Weak<ProxyManager> to register on initialization.
// When ClientHandler drops, any Proxies blocked on a response will
// error due to the Sender closing.
proxies: Arc<ProxyManager<C::ClientMessage>>,
in_flight: VecDeque<ProxyKey>,
}
impl<C: Client> ClientHandler<C> {
fn new(rx: Receiver<ProxyRequest<C::ServerMessage>>) -> ClientHandler<C> {
ClientHandler::<C> {
messages: rx,
proxies: Arc::new(ProxyManager::new()),
in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES),
}
}
fn proxy_manager(&self) -> Weak<ProxyManager<<C as Client>::ClientMessage>> {
Arc::downgrade(&self.proxies)
}
} }
impl<C: Client> Handler for ClientHandler<C> { impl<C: Client> Handler for ClientHandler<C> {
@ -137,8 +218,9 @@ impl<C: Client> Handler for ClientHandler<C> {
fn consume(&mut self, response: Self::In) -> Result<()> { fn consume(&mut self, response: Self::In) -> Result<()> {
trace!("ClientHandler::consume"); trace!("ClientHandler::consume");
if let Some(complete) = self.in_flight.pop_front() { // `proxy` identifies the waiting Proxy expecting `response`.
drop(complete.send(response)); if let Some(proxy) = self.in_flight.pop_front() {
self.proxies.deliver(proxy, response);
} else { } else {
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
@ -154,12 +236,12 @@ impl<C: Client> Handler for ClientHandler<C> {
// Try to get a new message // Try to get a new message
match self.messages.try_recv() { match self.messages.try_recv() {
Ok((request, response_tx)) => { Ok((proxy, request)) => {
trace!(" --> received request"); trace!(" --> received request");
self.in_flight.push_back(response_tx); self.in_flight.push_back(proxy);
Ok(Some(request)) Ok(Some(request))
} }
Err(mpsc::TryRecvError::Empty) => { Err(crossbeam_channel::TryRecvError::Empty) => {
trace!(" --> no request"); trace!(" --> no request");
Ok(None) Ok(None)
} }
@ -173,19 +255,12 @@ impl<C: Client> Handler for ClientHandler<C> {
pub(crate) fn make_client<C: Client>( pub(crate) fn make_client<C: Client>(
) -> (ClientHandler<C>, Proxy<C::ServerMessage, C::ClientMessage>) { ) -> (ClientHandler<C>, Proxy<C::ServerMessage, C::ClientMessage>) {
let (tx, rx) = mpsc::channel(); let (tx, rx) = crossbeam_channel::bounded(RPC_CLIENT_INITIAL_PROXIES);
let handler = ClientHandler::<C> { let handler = ClientHandler::new(rx);
messages: rx, let proxy_mgr = handler.proxy_manager();
in_flight: VecDeque::with_capacity(32),
};
let proxy = Proxy { (handler, Proxy::new(tx, proxy_mgr))
handle: None,
tx: ManuallyDrop::new(tx),
};
(handler, proxy)
} }
// Server-specific Handler implementation. // Server-specific Handler implementation.
@ -226,9 +301,11 @@ impl<S: Server> Handler for ServerHandler<S> {
} }
} }
const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server.
pub(crate) fn make_server<S: Server>(server: S) -> ServerHandler<S> { pub(crate) fn make_server<S: Server>(server: S) -> ServerHandler<S> {
ServerHandler::<S> { ServerHandler::<S> {
server, server,
in_flight: VecDeque::with_capacity(32), in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS),
} }
} }

View file

@ -25,8 +25,8 @@ webrender_bindings = { path = "../../../../gfx/webrender_bindings" }
cubeb-coreaudio = { git = "https://github.com/mozilla/cubeb-coreaudio-rs", rev = "44eca95823bb57e964cf7b6d9791ed2ccb4b2108", optional = true } cubeb-coreaudio = { git = "https://github.com/mozilla/cubeb-coreaudio-rs", rev = "44eca95823bb57e964cf7b6d9791ed2ccb4b2108", optional = true }
cubeb-pulse = { git = "https://github.com/mozilla/cubeb-pulse-rs", rev="1f1fe1e08e01a9a534ec7f079702a583a0899ce7", optional = true, features=["pulse-dlopen"] } cubeb-pulse = { git = "https://github.com/mozilla/cubeb-pulse-rs", rev="1f1fe1e08e01a9a534ec7f079702a583a0899ce7", optional = true, features=["pulse-dlopen"] }
cubeb-sys = { version = "0.10", optional = true, features=["gecko-in-tree"] } cubeb-sys = { version = "0.10", optional = true, features=["gecko-in-tree"] }
audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "499b95580c8b276e52bd9757d735249504202e5c", optional = true } # macos (v2) branch audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb", optional = true } # macos (v2) branch
audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "499b95580c8b276e52bd9757d735249504202e5c", optional = true } # macos (v2) branch audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb", optional = true } # macos (v2) branch
audioipc-client = { git = "https://github.com/mozilla/audioipc", rev = "fb7a2b12ced3b43e6a268621989c6191d1ed7e39", optional = true } audioipc-client = { git = "https://github.com/mozilla/audioipc", rev = "fb7a2b12ced3b43e6a268621989c6191d1ed7e39", optional = true }
audioipc-server = { git = "https://github.com/mozilla/audioipc", rev = "fb7a2b12ced3b43e6a268621989c6191d1ed7e39", optional = true } audioipc-server = { git = "https://github.com/mozilla/audioipc", rev = "fb7a2b12ced3b43e6a268621989c6191d1ed7e39", optional = true }
# Force tokio-reactor on an old version to avoid new dependencies of newer versions. # Force tokio-reactor on an old version to avoid new dependencies of newer versions.