forked from mirrors/gecko-dev
Bug 1659109 - Add a dispatch queue for FOG. r=chutten
Differential Revision: https://phabricator.services.mozilla.com/D88837
This commit is contained in:
parent
0ba0667036
commit
54cbab74c5
5 changed files with 334 additions and 0 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -1535,11 +1535,13 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"bincode",
|
||||
"chrono",
|
||||
"crossbeam-channel",
|
||||
"glean-core",
|
||||
"log",
|
||||
"once_cell",
|
||||
"serde",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"uuid",
|
||||
"xpcom",
|
||||
]
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ once_cell = "1.2.0"
|
|||
serde = { version = "1.0", features = ["derive"] }
|
||||
uuid = { version = "0.8.1", features = ["v4"] }
|
||||
xpcom = { path = "../../../../xpcom/rust/xpcom", optional = true }
|
||||
thiserror = "1.0.4"
|
||||
crossbeam-channel = "0.4.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.1.0"
|
||||
|
|
|
|||
65
toolkit/components/glean/api/src/dispatcher/global.rs
Normal file
65
toolkit/components/glean/api/src/dispatcher/global.rs
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use super::{DispatchError, DispatchGuard, Dispatcher};
|
||||
|
||||
const GLOBAL_DISPATCHER_LIMIT: usize = 100;
|
||||
static GLOBAL_DISPATCHER: Lazy<RwLock<Option<Dispatcher>>> =
|
||||
Lazy::new(|| RwLock::new(Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT))));
|
||||
|
||||
fn guard() -> &'static DispatchGuard {
|
||||
static GLOBAL_GUARD: OnceCell<DispatchGuard> = OnceCell::new();
|
||||
|
||||
GLOBAL_GUARD.get_or_init(|| {
|
||||
let lock = GLOBAL_DISPATCHER.read().unwrap();
|
||||
lock.as_ref().unwrap().guard()
|
||||
})
|
||||
}
|
||||
|
||||
/// Launches a new task on the global dispatch queue.
|
||||
///
|
||||
/// The new task will be enqueued immediately.
|
||||
/// If the pre-init queue was already flushed,
|
||||
/// the background thread will process tasks in the queue (see [`flush_init`]).
|
||||
///
|
||||
/// This will not block.
|
||||
///
|
||||
/// [`flush_init`]: fn.flush_init.html
|
||||
pub fn launch(task: impl FnOnce() + Send + 'static) {
|
||||
match guard().launch(task) {
|
||||
Ok(_) => {}
|
||||
Err(DispatchError::QueueFull) => {
|
||||
log::info!("Exceeded maximum queue size, discarding task");
|
||||
// TODO: Record this as an error.
|
||||
}
|
||||
Err(_) => {
|
||||
log::info!("Failed to launch a task on the queue. Discarding task.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts processing queued tasks in the global dispatch queue.
|
||||
///
|
||||
/// This function blocks until queued tasks prior to this call are finished.
|
||||
/// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
|
||||
pub fn flush_init() -> Result<(), DispatchError> {
|
||||
GLOBAL_DISPATCHER
|
||||
.write()
|
||||
.unwrap()
|
||||
.as_mut()
|
||||
.map(|dispatcher| dispatcher.flush_init())
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Shuts down the dispatch queue.
|
||||
///
|
||||
/// This will initiate a shutdown of the worker thread
|
||||
/// and no new tasks will be processed after this.
|
||||
/// It will not block on the worker thread.
|
||||
pub fn try_shutdown() -> Result<(), DispatchError> {
|
||||
guard().shutdown()
|
||||
}
|
||||
260
toolkit/components/glean/api/src/dispatcher/mod.rs
Normal file
260
toolkit/components/glean/api/src/dispatcher/mod.rs
Normal file
|
|
@ -0,0 +1,260 @@
|
|||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
//! A global dispatcher queue.
|
||||
//!
|
||||
//! # Example - Global Dispatch queue
|
||||
//!
|
||||
//! The global dispatch queue is pre-configured with a maximum queue size of 100 tasks.
|
||||
//!
|
||||
//! ```rust,ignore
|
||||
//! // Ensure the dispatcher queue is being worked on.
|
||||
//! dispatcher::flush_init();
|
||||
//!
|
||||
//! dispatcher::launch(|| {
|
||||
//! println!("Executing expensive task");
|
||||
//! // Run your expensive task in a separate thread.
|
||||
//! });
|
||||
//!
|
||||
//! dispatcher::launch(|| {
|
||||
//! println!("A second task that's executed sequentially, but off the main thread.");
|
||||
//! });
|
||||
//! ```
|
||||
|
||||
use std::{
|
||||
mem,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
};
|
||||
|
||||
use crossbeam_channel::{bounded, unbounded, SendError, Sender, TrySendError};
|
||||
use thiserror::Error;
|
||||
|
||||
pub use global::*;
|
||||
|
||||
mod global;
|
||||
|
||||
/// The command a worker should execute.
|
||||
enum Command {
|
||||
/// A task is a user-defined function to run.
|
||||
Task(Box<dyn FnOnce() + Send>),
|
||||
|
||||
/// Swap the channel
|
||||
Swap(Sender<()>),
|
||||
|
||||
/// Signal the worker to finish work and shut down.
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
/// The error returned from operations on the dispatcher
|
||||
#[derive(Error, Debug, PartialEq)]
|
||||
pub enum DispatchError {
|
||||
#[error("The worker panicked while running a task")]
|
||||
WorkerPanic,
|
||||
|
||||
#[error("Maximum queue size reached")]
|
||||
QueueFull,
|
||||
|
||||
#[error("Pre-init buffer was already flushed")]
|
||||
AlreadyFlushed,
|
||||
|
||||
#[error("Failed to send command to worker thread")]
|
||||
SendError,
|
||||
|
||||
#[error("Failed to receive from channel")]
|
||||
RecvError(#[from] crossbeam_channel::RecvError),
|
||||
}
|
||||
|
||||
impl From<TrySendError<Command>> for DispatchError {
|
||||
fn from(err: TrySendError<Command>) -> Self {
|
||||
match err {
|
||||
TrySendError::Full(_) => DispatchError::QueueFull,
|
||||
_ => DispatchError::SendError,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<SendError<T>> for DispatchError {
|
||||
fn from(_: SendError<T>) -> Self {
|
||||
DispatchError::SendError
|
||||
}
|
||||
}
|
||||
|
||||
/// A clonable guard for a dispatch queue.
|
||||
#[derive(Clone)]
|
||||
struct DispatchGuard {
|
||||
queue_preinit: Arc<AtomicBool>,
|
||||
preinit: Sender<Command>,
|
||||
queue: Sender<Command>,
|
||||
}
|
||||
|
||||
impl DispatchGuard {
|
||||
pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
|
||||
let task = Command::Task(Box::new(task));
|
||||
self.send(task)
|
||||
}
|
||||
|
||||
pub fn shutdown(&self) -> Result<(), DispatchError> {
|
||||
self.send(Command::Shutdown)
|
||||
}
|
||||
|
||||
fn send(&self, task: Command) -> Result<(), DispatchError> {
|
||||
if self.queue_preinit.load(Ordering::SeqCst) {
|
||||
match self.preinit.try_send(task) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(TrySendError::Full(_)) => Err(DispatchError::QueueFull),
|
||||
Err(TrySendError::Disconnected(_)) => Err(DispatchError::SendError),
|
||||
}
|
||||
} else {
|
||||
self.queue.send(task)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A dispatcher.
|
||||
///
|
||||
/// Run expensive processing tasks sequentially off the main thread.
|
||||
/// Tasks are processed in a single separate thread in the order they are submitted.
|
||||
/// The dispatch queue will enqueue tasks while not flushed, up to the maximum queue size.
|
||||
/// Processing will start after flushing once, processing already enqueued tasks first, then
|
||||
/// waiting for further tasks to be enqueued.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// # use dispatcher::Dispatcher;
|
||||
/// let mut dispatcher = Dispatcher::new(5);
|
||||
/// dispatcher.flush_init();
|
||||
///
|
||||
/// dispatcher.launch(|| {
|
||||
/// println!("A task of the main thread");
|
||||
/// }).unwrap();
|
||||
///
|
||||
/// dispatcher.try_shutdown().unwrap();
|
||||
/// dispatcher.join().unwrap();
|
||||
/// ```
|
||||
pub struct Dispatcher {
|
||||
/// Whether to queue on the preinit buffer or on the unbounded queue
|
||||
queue_preinit: Arc<AtomicBool>,
|
||||
|
||||
/// Used to unblock the worker thread initially.
|
||||
block_sender: Sender<()>,
|
||||
|
||||
/// Sender for the preinit queue.
|
||||
preinit_sender: Sender<Command>,
|
||||
|
||||
/// Sender for the unbounded queue.
|
||||
sender: Sender<Command>,
|
||||
}
|
||||
|
||||
impl Dispatcher {
|
||||
/// Creates a new dispatcher with a maximum queue size.
|
||||
///
|
||||
/// Launched tasks won't run until [`flush_init`] is called.
|
||||
///
|
||||
/// [`flush_init`]: #method.flush_init
|
||||
pub fn new(max_queue_size: usize) -> Self {
|
||||
let (block_sender, block_receiver) = bounded(0);
|
||||
let (preinit_sender, preinit_receiver) = bounded(max_queue_size);
|
||||
let (sender, mut unbounded_receiver) = unbounded();
|
||||
|
||||
let queue_preinit = Arc::new(AtomicBool::new(true));
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Err(_) = block_receiver.recv() {
|
||||
// The other side was disconnected.
|
||||
// There's nothing the worker thread can do.
|
||||
log::error!("The task producer was disconnected. Worker thread will exit.");
|
||||
return;
|
||||
}
|
||||
|
||||
let mut receiver = preinit_receiver;
|
||||
loop {
|
||||
use Command::*;
|
||||
|
||||
match receiver.recv() {
|
||||
Ok(Shutdown) => {
|
||||
break;
|
||||
}
|
||||
|
||||
Ok(Task(f)) => {
|
||||
(f)();
|
||||
}
|
||||
|
||||
Ok(Swap(swap_done)) => {
|
||||
// A swap should only occur exactly once.
|
||||
// This is upheld by `flush_init`, which errors out if the preinit buffer
|
||||
// was already flushed.
|
||||
|
||||
// We swap the channels we listen on for new tasks.
|
||||
// The next iteration will continue with the unbounded queue.
|
||||
mem::swap(&mut receiver, &mut unbounded_receiver);
|
||||
|
||||
// The swap command MUST be the last one received on the preinit buffer,
|
||||
// so by the time we run this we know all preinit tasks were processed.
|
||||
// We can notify the other side.
|
||||
swap_done
|
||||
.send(())
|
||||
.expect("The caller of `flush_init` has gone missing");
|
||||
}
|
||||
|
||||
// Other side was disconnected.
|
||||
Err(_) => {
|
||||
log::error!("The task producer was disconnected. Worker thread will exit.");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Dispatcher {
|
||||
queue_preinit,
|
||||
block_sender,
|
||||
preinit_sender,
|
||||
sender,
|
||||
}
|
||||
}
|
||||
|
||||
fn guard(&self) -> DispatchGuard {
|
||||
DispatchGuard {
|
||||
queue_preinit: Arc::clone(&self.queue_preinit),
|
||||
preinit: self.preinit_sender.clone(),
|
||||
queue: self.sender.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes the pre-init buffer.
|
||||
///
|
||||
/// This function blocks until tasks queued prior to this call are finished.
|
||||
/// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
|
||||
///
|
||||
/// Returns an error if called multiple times.
|
||||
pub fn flush_init(&mut self) -> Result<(), DispatchError> {
|
||||
// We immediately stop queueing in the pre-init buffer.
|
||||
let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
|
||||
if !old_val {
|
||||
return Err(DispatchError::AlreadyFlushed);
|
||||
}
|
||||
|
||||
// Unblock the worker thread exactly once.
|
||||
self.block_sender.send(())?;
|
||||
|
||||
// Single-use channel to communicate with the worker thread.
|
||||
let (swap_sender, swap_receiver) = bounded(0);
|
||||
|
||||
// Send final command and block until it is sent.
|
||||
self.preinit_sender
|
||||
.send(Command::Swap(swap_sender))
|
||||
.map_err(|_| DispatchError::SendError)?;
|
||||
|
||||
// Now wait for the worker thread to do the swap and inform us.
|
||||
// This blocks until all tasks in the preinit buffer have been processed.
|
||||
swap_receiver.recv()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -21,6 +21,7 @@ pub mod pings;
|
|||
pub mod private;
|
||||
|
||||
pub mod ipc;
|
||||
pub(crate) mod dispatcher;
|
||||
|
||||
/// Run a closure with a mutable reference to the locked global Glean object.
|
||||
fn with_glean<F, R>(f: F) -> R
|
||||
|
|
@ -40,3 +41,7 @@ where
|
|||
pub fn is_upload_enabled() -> bool {
|
||||
with_glean(|glean| glean.is_upload_enabled())
|
||||
}
|
||||
|
||||
pub fn flush_init() -> Result<(), dispatcher::DispatchError> {
|
||||
dispatcher::flush_init()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue