Bug 1817904 (part 2) - update golden_gate and webext_storage_bridge for BridgedEngine changes. r=lina
Differential Revision: https://phabricator.services.mozilla.com/D170467
This commit is contained in: parent 333488dcfc, commit 205211acee
10 changed files with 129 additions and 285 deletions
Cargo.lock (generated)

@@ -2418,6 +2418,7 @@ dependencies = [
 name = "golden_gate"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "atomic_refcell",
  "cstr",
  "interrupt-support",
@@ -6254,6 +6255,7 @@ dependencies = [
 name = "webext_storage_bridge"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "atomic_refcell",
  "cstr",
  "golden_gate",
golden_gate/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2018"
 license = "MPL-2.0"
 
 [dependencies]
+anyhow = "1"
 atomic_refcell = "0.1"
 cstr = "0.2"
 interrupt-support = "0.1"
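Both crates gain an `anyhow` dependency because the tasks below now store and return `anyhow::Result` values instead of an engine-specific error type. A minimal sketch of why that works (illustrative names only, not code from this patch; any error implementing `std::error::Error + Send + Sync + 'static` converts with `?`):

```rust
use std::fmt;

use anyhow::Result;

// A hypothetical engine-specific error.
#[derive(Debug)]
struct StoreClosed;

impl fmt::Display for StoreClosed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "the store is closed")
    }
}

impl std::error::Error for StoreClosed {}

fn last_sync_millis() -> std::result::Result<i64, StoreClosed> {
    Ok(0)
}

// A bridged-engine-style method can simply return anyhow::Result, so callers
// never need to name the concrete error type.
fn last_sync() -> Result<i64> {
    Ok(last_sync_millis()?) // StoreClosed -> anyhow::Error via `?`
}

fn main() -> Result<()> {
    println!("last sync: {}", last_sync()?);
    Ok(())
}
```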
golden_gate/src/error.rs

@@ -36,7 +36,7 @@ impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
             Error::Nsresult(result) => write!(f, "Operation failed with {}", result.error_name()),
-            Error::DidNotRun(what) => write!(f, "Failed to run `{}` on background thread", what),
+            Error::DidNotRun(what) => write!(f, "Failed to run `{what}` on background thread"),
             Error::MalformedString(error) => error.fmt(f),
         }
     }
@@ -69,18 +69,3 @@ impl From<Error> for nsresult {
         }
     }
 }
-
-/// A trait that constrains the type of `BridgedEngine::Error`, such that it can
-/// be used as a trait bound for ferries and tasks. `BridgedEngine` doesn't
-/// constrain its associated `Error` type, but we must, so that we can return
-/// Golden Gate errors alongside `BridgedEngine::Error`s, and pass their
-/// result codes and messages to `mozIBridgedSyncEngine*Callback::HandleError`.
-/// Since tasks share error results between the main and background threads,
-/// errors must also be `Send + Sync`.
-///
-/// This would be cleaner to express as a trait alias, but those aren't stable
-/// yet (see rust-lang/rust#41517). Instead, we define a trait with no methods,
-/// and a blanket implementation for its supertraits.
-pub trait BridgedError: From<Error> + Into<nsresult> + fmt::Display + Send + Sync {}
-
-impl<T> BridgedError for T where T: From<Error> + Into<nsresult> + fmt::Display + Send + Sync {}
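The removed `BridgedError` doc comment describes a common workaround for the lack of stable trait aliases: declare a marker trait with no methods and give it a blanket impl for everything satisfying the desired bounds. A minimal, generic sketch of that pattern (names are illustrative, not from this patch):

```rust
use std::fmt;

// "Alias" for the bound set we care about: an empty marker trait plus a
// blanket impl, so every qualifying type opts in automatically.
pub trait SendableError: fmt::Display + Send + Sync + 'static {}

impl<T> SendableError for T where T: fmt::Display + Send + Sync + 'static {}

// Call sites can now spell the whole bound set with one name.
fn report<E: SendableError>(err: E) {
    eprintln!("error: {err}");
}

fn main() {
    report(std::io::Error::new(std::io::ErrorKind::Other, "boom"));
}
```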
golden_gate/src/log.rs

@@ -110,18 +110,15 @@ impl Log for LogSink {
         }
         if let Some(logger) = &self.logger {
             let mut message = nsString::new();
-            match write!(message, "{}", record.args()) {
-                Ok(_) => {
-                    let task = LogTask {
-                        logger: logger.clone(),
-                        level: record.metadata().level(),
-                        message,
-                    };
-                    let _ =
-                        TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
-                            .and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
-                }
-                Err(_) => {}
-            }
+            if write!(message, "{}", record.args()).is_ok() {
+                let task = LogTask {
+                    logger: logger.clone(),
+                    level: record.metadata().level(),
+                    message,
+                };
+                let _ =
+                    TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task))
+                        .and_then(|r| TaskRunnable::dispatch(r, logger.owning_thread()));
+            }
         }
     }
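The change here is purely mechanical (an empty `Err(_) => {}` match arm replaced by `is_ok()`), but the surrounding code shows the pattern worth noting: format the log record into an owned message on the calling thread, then hand it to another thread as a task. A rough, framework-free analogue of what `LogSink::log` does, using a channel instead of `moz_task` (all names here are stand-ins):

```rust
use std::sync::mpsc;
use std::thread;

// Owned payload handed across threads, like golden_gate's LogTask.
struct LogTask {
    level: log::Level, // assumes the `log` crate, as in golden_gate
    message: String,
}

fn main() {
    let (sender, receiver) = mpsc::channel::<LogTask>();

    // The "logger thread" drains tasks until all senders are dropped.
    let logger = thread::spawn(move || {
        for task in receiver {
            eprintln!("[{}] {}", task.level, task.message);
        }
    });

    // On the calling thread: build the owned task and send it; ignoring the
    // send result mirrors the `let _ = ...dispatch(...)` in the real code.
    let _ = sender.send(LogTask {
        level: log::Level::Info,
        message: format!("synced {} records", 3),
    });

    drop(sender);
    logger.join().unwrap();
}
```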
golden_gate/src/task.rs

@@ -2,15 +2,11 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
-use std::{
-    fmt::Write,
-    mem, result,
-    sync::{Arc, Weak},
-};
+use std::{fmt::Write, mem, result};
 
 use atomic_refcell::AtomicRefCell;
 use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder};
-use nserror::nsresult;
+use nserror::{nsresult, NS_ERROR_FAILURE};
 use nsstring::{nsACString, nsCString};
 use sync15::engine::{ApplyResults, BridgedEngine};
 use sync15::Guid;
@@ -22,55 +18,57 @@ use xpcom::{
     RefPtr,
 };
 
-use crate::error::{BridgedError, Error, Result};
+use crate::error::{Error, Result};
 use crate::ferry::{Ferry, FerryResult};
 
 /// A ferry task sends (or ferries) an operation to a bridged engine on a
 /// background thread or task queue, and ferries back an optional result to
 /// a callback.
-pub struct FerryTask<N: ?Sized + BridgedEngine> {
-    /// A ferry task holds a weak reference to the bridged engine, and upgrades
-    /// it to a strong reference when run on a background thread. This avoids
-    /// scheduled ferries blocking finalization: if the main thread holds the
-    /// only strong reference to the engine, it can be unwrapped (using
-    /// `Arc::try_unwrap`) and dropped, either on the main thread, or as part of
-    /// a teardown task.
-    engine: Weak<N>,
+pub struct FerryTask {
+    /// We want to ensure scheduled ferries can't block finalization of the underlying
+    /// store - we want a degree of confidence that closing the database will happen when
+    /// we want even if tasks are queued up to run on another thread.
+    /// We rely on the semantics of our BridgedEngines to help here:
+    /// * A bridged engine is expected to hold a weak reference to its store.
+    /// * Our LazyStore is the only thing holding a reference to the "real" store.
+    /// Thus, when our LazyStore asks our "real" store to close, we can be confident
+    /// a close will happen (ie, we assume that the real store will be able to unwrapp
+    /// the underlying sqlite `Connection` (using `Arc::try_unwrap`) and close it.
+    /// However, note that if an operation on the bridged engine is currently running,
+    /// we will block waiting for that operation to complete, so while this isn't
+    /// guaranteed to happen immediately, it should happen "soon enough".
+    engine: Box<dyn BridgedEngine>,
     ferry: Ferry,
     callback: ThreadPtrHandle<mozIBridgedSyncEngineCallback>,
-    result: AtomicRefCell<result::Result<FerryResult, N::Error>>,
+    result: AtomicRefCell<anyhow::Result<FerryResult>>,
 }
 
-impl<N> FerryTask<N>
-where
-    N: ?Sized + BridgedEngine + Send + Sync + 'static,
-    N::Error: BridgedError,
-{
+impl FerryTask {
     /// Creates a task to fetch the engine's last sync time, in milliseconds.
     #[inline]
     pub fn for_last_sync(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
        callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
        Self::with_ferry(engine, Ferry::LastSync, callback)
    }

    /// Creates a task to set the engine's last sync time, in milliseconds.
    #[inline]
    pub fn for_set_last_sync(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
        last_sync_millis: i64,
        callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
        Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback)
    }

    /// Creates a task to fetch the engine's sync ID.
    #[inline]
    pub fn for_sync_id(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
        callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
        Self::with_ferry(engine, Ferry::SyncId, callback)
    }
 
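The new field documentation spells out the ownership scheme: each task now owns a boxed `BridgedEngine`, the engine is expected to hold only a weak reference to its store, and the store is closed by unwrapping the last strong reference. A toy sketch of that shape (all names here are illustrative stand-ins, not the real `webext_storage` types):

```rust
use std::sync::{Arc, Weak};

// Stand-in for the "real" store that owns the database connection.
struct RealStore;

impl RealStore {
    fn close(self) {
        println!("store closed");
    }
}

// Stand-in for a bridged engine: it only holds a Weak reference, so queued
// tasks that own an engine can't keep the store alive.
struct Engine {
    store: Weak<RealStore>,
}

impl Engine {
    fn last_sync(&self) -> Option<i64> {
        // Upgrade to a strong reference only for the duration of one operation.
        self.store.upgrade().map(|_store| 0)
    }
}

fn main() {
    let store = Arc::new(RealStore);
    let engine = Engine { store: Arc::downgrade(&store) };

    println!("{:?}", engine.last_sync()); // Some(0) while the store is alive

    // The owner drops its only strong reference: try_unwrap succeeds and the
    // store can be closed deterministically, even though `engine` still exists.
    let real = Arc::try_unwrap(store).ok().expect("only one strong reference");
    real.close();

    println!("{:?}", engine.last_sync()); // None: the weak ref no longer upgrades
}
```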
@@ -78,9 +76,9 @@ where
     /// metadata.
     #[inline]
     pub fn for_reset_sync_id(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(engine, Ferry::ResetSyncId, callback)
     }
 
@@ -89,10 +87,10 @@ where
     /// to use.
     #[inline]
     pub fn for_ensure_current_sync_id(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         new_sync_id: &nsACString,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(
             engine,
             Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()),
@@ -103,18 +101,18 @@ where
     /// Creates a task to signal that the engine is about to sync.
     #[inline]
     pub fn for_sync_started(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(engine, Ferry::SyncStarted, callback)
     }
 
     /// Creates a task to store incoming records.
     pub fn for_store_incoming(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         incoming_envelopes_json: &[nsCString],
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(
             engine,
             Ferry::StoreIncoming(incoming_envelopes_json.to_vec()),
@@ -126,14 +124,14 @@ where
     /// may be called multiple times per sync, or not at all if there are no
     /// records to upload.
     pub fn for_set_uploaded(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         server_modified_millis: i64,
         uploaded_ids: &[nsCString],
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         let uploaded_ids = uploaded_ids
             .iter()
-            .map(|id| Guid::from_slice(&*id))
+            .map(|id| Guid::from_slice(id))
             .collect();
         Self::with_ferry(
             engine,
@@ -147,9 +145,9 @@ where
     /// records uploaded.
     #[inline]
     pub fn for_sync_finished(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(engine, Ferry::SyncFinished, callback)
     }
 
@@ -157,18 +155,18 @@ where
     /// erasing user data.
     #[inline]
     pub fn for_reset(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(engine, Ferry::Reset, callback)
     }
 
     /// Creates a task to erase all local user data for the engine.
     #[inline]
     pub fn for_wipe(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         Self::with_ferry(engine, Ferry::Wipe, callback)
     }
 
@@ -176,13 +174,13 @@ where
     /// thread, and will be called once, after the ferry returns from the
     /// background thread.
     fn with_ferry(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         ferry: Ferry,
         callback: &mozIBridgedSyncEngineCallback,
-    ) -> Result<FerryTask<N>> {
+    ) -> Result<FerryTask> {
         let name = ferry.name();
         Ok(FerryTask {
-            engine: Arc::downgrade(engine),
+            engine,
             ferry,
             callback: ThreadPtrHolder::new(
                 cstr!("mozIBridgedSyncEngineCallback"),
@@ -204,20 +202,11 @@ where
         )?;
         Ok(())
     }
-}
 
-impl<N> FerryTask<N>
-where
-    N: ?Sized + BridgedEngine,
-    N::Error: BridgedError,
-{
     /// Runs the task on the background thread. This is split out into its own
     /// method to make error handling easier.
-    fn inner_run(&self) -> result::Result<FerryResult, N::Error> {
-        let engine = match self.engine.upgrade() {
-            Some(outer) => outer,
-            None => return Err(Error::DidNotRun(self.ferry.name()).into()),
-        };
+    fn inner_run(&self) -> anyhow::Result<FerryResult> {
+        let engine = &self.engine;
         Ok(match &self.ferry {
             Ferry::LastSync => FerryResult::LastSync(engine.last_sync()?),
             Ferry::SetLastSync(last_sync_millis) => {
@@ -227,7 +216,7 @@ where
             Ferry::SyncId => FerryResult::SyncId(engine.sync_id()?),
             Ferry::ResetSyncId => FerryResult::AssignedSyncId(engine.reset_sync_id()?),
             Ferry::EnsureCurrentSyncId(new_sync_id) => {
-                FerryResult::AssignedSyncId(engine.ensure_current_sync_id(&*new_sync_id)?)
+                FerryResult::AssignedSyncId(engine.ensure_current_sync_id(new_sync_id)?)
             }
             Ferry::SyncStarted => {
                 engine.sync_started()?;
@@ -236,7 +225,7 @@ where
             Ferry::StoreIncoming(incoming_envelopes_json) => {
                 let incoming_envelopes = incoming_envelopes_json
                     .iter()
-                    .map(|envelope| Ok(serde_json::from_slice(&*envelope)?))
+                    .map(|envelope| Ok(serde_json::from_slice(envelope)?))
                     .collect::<Result<_>>()?;
 
                 engine.store_incoming(incoming_envelopes)?;
@@ -262,11 +251,7 @@ where
     }
 }
 
-impl<N> Task for FerryTask<N>
-where
-    N: ?Sized + BridgedEngine,
-    N::Error: BridgedError,
-{
+impl Task for FerryTask {
     fn run(&self) {
         *self.result.borrow_mut() = self.inner_run();
     }
@@ -280,8 +265,8 @@ where
             Ok(result) => unsafe { callback.HandleSuccess(result.into_variant().coerce()) },
             Err(err) => {
                 let mut message = nsCString::new();
-                write!(message, "{}", err).unwrap();
-                unsafe { callback.HandleError(err.into(), &*message) }
+                write!(message, "{err}").unwrap();
+                unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) }
             }
         }
         .to_result()
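With the engine-specific error type gone, each task stores a plain `anyhow::Result` and, on failure, formats the message itself and reports it with a generic `NS_ERROR_FAILURE` code instead of mapping the error to a specific `nsresult`. A rough sketch of that shape, with the XPCOM callback replaced by a plain trait so it can run outside Gecko (all names and the constant value are stand-ins for illustration):

```rust
use std::cell::RefCell;
use std::fmt::Write;

// Stand-in for mozIBridgedSyncEngineCallback.
trait Callback {
    fn handle_success(&self, result: i64);
    fn handle_error(&self, code: u32, message: &str);
}

const GENERIC_FAILURE: u32 = 0x8000_4005; // illustrative "generic failure" code

struct FerryTask<C: Callback> {
    callback: C,
    // The background thread writes the outcome here; done() reads it later.
    result: RefCell<anyhow::Result<i64>>,
}

impl<C: Callback> FerryTask<C> {
    fn run(&self, outcome: anyhow::Result<i64>) {
        *self.result.borrow_mut() = outcome;
    }

    fn done(&self) {
        match &*self.result.borrow() {
            Ok(value) => self.callback.handle_success(*value),
            Err(err) => {
                // Format the error text ourselves; the code is always generic now.
                let mut message = String::new();
                write!(message, "{err}").unwrap();
                self.callback.handle_error(GENERIC_FAILURE, &message);
            }
        }
    }
}

struct PrintCallback;

impl Callback for PrintCallback {
    fn handle_success(&self, result: i64) {
        println!("ok: {result}");
    }
    fn handle_error(&self, code: u32, message: &str) {
        println!("error {code:#x}: {message}");
    }
}

fn main() {
    let task = FerryTask { callback: PrintCallback, result: RefCell::new(Ok(0)) };
    task.run(Err(anyhow::anyhow!("engine not configured")));
    task.done(); // error 0x80004005: engine not configured
}
```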
@@ -291,53 +276,39 @@ where
 /// An apply task ferries incoming records to an engine on a background
 /// thread, and ferries back records to upload. It's separate from
 /// `FerryTask` because its callback type is different.
-pub struct ApplyTask<N: ?Sized + BridgedEngine> {
-    engine: Weak<N>,
+pub struct ApplyTask {
+    engine: Box<dyn BridgedEngine>,
     callback: ThreadPtrHandle<mozIBridgedSyncEngineApplyCallback>,
-    result: AtomicRefCell<result::Result<Vec<String>, N::Error>>,
+    result: AtomicRefCell<anyhow::Result<Vec<String>>>,
 }
 
-impl<N> ApplyTask<N>
-where
-    N: ?Sized + BridgedEngine,
-    N::Error: BridgedError,
-{
+impl ApplyTask {
     /// Returns the task name for debugging.
     pub fn name() -> &'static str {
         concat!(module_path!(), "apply")
     }
 
     /// Runs the task on the background thread.
-    fn inner_run(&self) -> result::Result<Vec<String>, N::Error> {
-        let engine = match self.engine.upgrade() {
-            Some(outer) => outer,
-            None => return Err(Error::DidNotRun(Self::name()).into()),
-        };
+    fn inner_run(&self) -> anyhow::Result<Vec<String>> {
         let ApplyResults {
             records: outgoing_records,
             ..
-        } = engine.apply()?;
+        } = self.engine.apply()?;
         let outgoing_records_json = outgoing_records
             .iter()
             .map(|record| Ok(serde_json::to_string(record)?))
             .collect::<Result<_>>()?;
         Ok(outgoing_records_json)
     }
-}
 
-impl<N> ApplyTask<N>
-where
-    N: ?Sized + BridgedEngine + Send + Sync + 'static,
-    N::Error: BridgedError,
-{
     /// Creates a task. The `callback` is bound to the current thread, and will
     /// be called once, after the records are applied on the background thread.
     pub fn new(
-        engine: &Arc<N>,
+        engine: Box<dyn BridgedEngine>,
         callback: &mozIBridgedSyncEngineApplyCallback,
-    ) -> Result<ApplyTask<N>> {
+    ) -> Result<ApplyTask> {
         Ok(ApplyTask {
-            engine: Arc::downgrade(engine),
+            engine,
             callback: ThreadPtrHolder::new(
                 cstr!("mozIBridgedSyncEngineApplyCallback"),
                 RefPtr::new(callback),
@@ -358,11 +329,7 @@ where
     }
 }
 
-impl<N> Task for ApplyTask<N>
-where
-    N: ?Sized + BridgedEngine,
-    N::Error: BridgedError,
-{
+impl Task for ApplyTask {
     fn run(&self) {
         *self.result.borrow_mut() = self.inner_run();
     }
@@ -382,8 +349,8 @@ where
             }
             Err(err) => {
                 let mut message = nsCString::new();
-                write!(message, "{}", err).unwrap();
-                unsafe { callback.HandleError(err.into(), &*message) }
+                write!(message, "{err}").unwrap();
+                unsafe { callback.HandleError(NS_ERROR_FAILURE, &*message) }
             }
         }
         .to_result()
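`ApplyTask::inner_run` destructures the engine's `ApplyResults` and serializes each outgoing record into a JSON string for the XPCOM callback. A small stand-alone sketch of that serialize-and-collect step, assuming `serde` (with derive), `serde_json`, and `anyhow` are available and using a placeholder record type instead of the real sync15 BSOs:

```rust
use serde::Serialize;

// Stand-in for an outgoing record.
#[derive(Serialize)]
struct OutgoingRecord {
    id: String,
    payload: String,
}

// Mirror of the collect step: any serialization failure aborts the whole batch.
fn to_json_strings(records: &[OutgoingRecord]) -> anyhow::Result<Vec<String>> {
    records
        .iter()
        .map(|record| Ok(serde_json::to_string(record)?))
        .collect::<anyhow::Result<_>>()
}

fn main() -> anyhow::Result<()> {
    let records = vec![OutgoingRecord {
        id: "bookmarkAAAA".into(),
        payload: "{}".into(),
    }];
    for json in to_json_strings(&records)? {
        println!("{json}");
    }
    Ok(())
}
```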
webext_storage_bridge/Cargo.toml

@@ -7,6 +7,7 @@ edition = "2018"
 license = "MPL-2.0"
 
 [dependencies]
+anyhow = "1.0"
 atomic_refcell = "0.1"
 cstr = "0.2"
 golden_gate = { path = "../../../../../services/sync/golden_gate" }
webext_storage_bridge/src/area.rs

@@ -12,7 +12,7 @@ use std::{
     sync::Arc,
 };
 
-use golden_gate::{ApplyTask, FerryTask};
+use golden_gate::{ApplyTask, BridgedEngine, FerryTask};
 use moz_task::{self, DispatchOptions, TaskRunnable};
 use nserror::{nsresult, NS_OK};
 use nsstring::{nsACString, nsCString, nsString};
@@ -43,12 +43,12 @@ fn path_from_nsifile(file: &nsIFile) -> Result<PathBuf> {
         #[cfg(windows)]
         {
             use std::os::windows::prelude::*;
-            OsString::from_wide(&*raw_path)
+            OsString::from_wide(&raw_path)
         }
         // On other platforms, we must first decode the raw path from
         // UTF-16, and then create our native string.
         #[cfg(not(windows))]
-        OsString::from(String::from_utf16(&*raw_path)?)
+        OsString::from(String::from_utf16(&raw_path)?)
     };
     Ok(native_path.into())
 }
@@ -144,8 +144,8 @@ impl StorageSyncArea {
     ) -> Result<()> {
         self.dispatch(
             Punt::Set {
-                ext_id: str::from_utf8(&*ext_id)?.into(),
-                value: serde_json::from_str(str::from_utf8(&*json)?)?,
+                ext_id: str::from_utf8(ext_id)?.into(),
+                value: serde_json::from_str(str::from_utf8(json)?)?,
             },
             callback,
         )?;
@@ -168,8 +168,8 @@ impl StorageSyncArea {
     ) -> Result<()> {
         self.dispatch(
             Punt::Get {
-                ext_id: str::from_utf8(&*ext_id)?.into(),
-                keys: serde_json::from_str(str::from_utf8(&*json)?)?,
+                ext_id: str::from_utf8(ext_id)?.into(),
+                keys: serde_json::from_str(str::from_utf8(json)?)?,
             },
             callback,
         )
@@ -191,8 +191,8 @@ impl StorageSyncArea {
     ) -> Result<()> {
         self.dispatch(
             Punt::Remove {
-                ext_id: str::from_utf8(&*ext_id)?.into(),
-                keys: serde_json::from_str(str::from_utf8(&*json)?)?,
+                ext_id: str::from_utf8(ext_id)?.into(),
+                keys: serde_json::from_str(str::from_utf8(json)?)?,
             },
             callback,
         )
@@ -208,7 +208,7 @@ impl StorageSyncArea {
     fn clear(&self, ext_id: &nsACString, callback: &mozIExtensionStorageCallback) -> Result<()> {
         self.dispatch(
             Punt::Clear {
-                ext_id: str::from_utf8(&*ext_id)?.into(),
+                ext_id: str::from_utf8(ext_id)?.into(),
             },
             callback,
         )
@@ -230,8 +230,8 @@ impl StorageSyncArea {
     ) -> Result<()> {
         self.dispatch(
             Punt::GetBytesInUse {
-                ext_id: str::from_utf8(&*ext_id)?.into(),
-                keys: serde_json::from_str(str::from_utf8(&*keys)?)?,
+                ext_id: str::from_utf8(ext_id)?.into(),
+                keys: serde_json::from_str(str::from_utf8(keys)?)?,
             },
             callback,
         )
@@ -341,7 +341,7 @@ impl StorageSyncArea {
         )
     );
     fn get_last_sync(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_last_sync(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_last_sync(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -356,7 +356,7 @@ impl StorageSyncArea {
         callback: &mozIBridgedSyncEngineCallback,
     ) -> Result<()> {
         Ok(
-            FerryTask::for_set_last_sync(&*self.store()?, last_sync_millis, callback)?
+            FerryTask::for_set_last_sync(self.new_bridge()?, last_sync_millis, callback)?
                 .dispatch(&self.queue)?,
         )
     }
@@ -367,7 +367,7 @@ impl StorageSyncArea {
         )
     );
     fn get_sync_id(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_sync_id(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_sync_id(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -376,7 +376,7 @@ impl StorageSyncArea {
         )
     );
     fn reset_sync_id(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_reset_sync_id(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_reset_sync_id(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -391,7 +391,7 @@ impl StorageSyncArea {
         callback: &mozIBridgedSyncEngineCallback,
     ) -> Result<()> {
         Ok(
-            FerryTask::for_ensure_current_sync_id(&*self.store()?, new_sync_id, callback)?
+            FerryTask::for_ensure_current_sync_id(self.new_bridge()?, new_sync_id, callback)?
                 .dispatch(&self.queue)?,
        )
    }
@@ -402,7 +402,7 @@ impl StorageSyncArea {
         )
     );
     fn sync_started(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_sync_started(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_sync_started(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -417,7 +417,7 @@ impl StorageSyncArea {
         callback: &mozIBridgedSyncEngineCallback,
     ) -> Result<()> {
         Ok(FerryTask::for_store_incoming(
-            &*self.store()?,
+            self.new_bridge()?,
             incoming_envelopes_json.map(|v| v.as_slice()).unwrap_or(&[]),
             callback,
         )?
@@ -426,7 +426,7 @@ impl StorageSyncArea {
 
     xpcom_method!(apply => Apply(callback: *const mozIBridgedSyncEngineApplyCallback));
     fn apply(&self, callback: &mozIBridgedSyncEngineApplyCallback) -> Result<()> {
-        Ok(ApplyTask::new(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(ApplyTask::new(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -443,7 +443,7 @@ impl StorageSyncArea {
         callback: &mozIBridgedSyncEngineCallback,
     ) -> Result<()> {
         Ok(FerryTask::for_set_uploaded(
-            &*self.store()?,
+            self.new_bridge()?,
             server_modified_millis,
             uploaded_ids.map(|v| v.as_slice()).unwrap_or(&[]),
             callback,
@@ -457,7 +457,7 @@ impl StorageSyncArea {
         )
     );
     fn sync_finished(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_sync_finished(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_sync_finished(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -466,7 +466,7 @@ impl StorageSyncArea {
         )
     );
     fn reset(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_reset(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_reset(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
     }
 
     xpcom_method!(
@@ -475,6 +475,10 @@ impl StorageSyncArea {
         )
     );
     fn wipe(&self, callback: &mozIBridgedSyncEngineCallback) -> Result<()> {
-        Ok(FerryTask::for_wipe(&*self.store()?, callback)?.dispatch(&self.queue)?)
+        Ok(FerryTask::for_wipe(self.new_bridge()?, callback)?.dispatch(&self.queue)?)
+    }
+
+    fn new_bridge(&self) -> Result<Box<dyn BridgedEngine>> {
+        Ok(Box::new(self.store()?.get()?.bridged_engine()))
     }
 }
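Instead of handing tasks an `Arc` to the store (which itself used to implement `BridgedEngine`), the area now builds a fresh boxed engine from the store for every XPCOM entry point via `new_bridge()`. A rough sketch of that per-call pattern with placeholder types (not the real `webext_storage` API):

```rust
// Minimal stand-ins to show the shape of new_bridge(): every sync entry
// point gets its own boxed engine built from the underlying store.
trait BridgedEngine {
    fn last_sync(&self) -> anyhow::Result<i64>;
}

struct StoreEngine; // would borrow/weakly reference the real store

impl BridgedEngine for StoreEngine {
    fn last_sync(&self) -> anyhow::Result<i64> {
        Ok(0)
    }
}

struct Area;

impl Area {
    // Analogue of StorageSyncArea::new_bridge().
    fn new_bridge(&self) -> anyhow::Result<Box<dyn BridgedEngine>> {
        Ok(Box::new(StoreEngine))
    }

    // Analogue of get_last_sync(): build a fresh engine, hand it to the task.
    fn get_last_sync(&self) -> anyhow::Result<i64> {
        let engine = self.new_bridge()?;
        engine.last_sync()
    }
}

fn main() -> anyhow::Result<()> {
    println!("{}", Area.get_last_sync()?);
    Ok(())
}
```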
webext_storage_bridge/src/error.rs

@@ -104,9 +104,9 @@ impl From<Error> for nsresult {
 impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
-            Error::Nsresult(result) => write!(f, "Operation failed with {}", result),
+            Error::Nsresult(result) => write!(f, "Operation failed with {result}"),
             Error::WebextStorage(error) => error.fmt(f),
-            Error::MigrationFailed(error) => write!(f, "Migration failed with {}", error),
+            Error::MigrationFailed(error) => write!(f, "Migration failed with {error}"),
             Error::GoldenGate(error) => error.fmt(f),
             Error::MalformedString(error) => error.fmt(f),
             Error::AlreadyConfigured => write!(f, "The storage area is already configured"),
@@ -114,8 +114,8 @@ impl fmt::Display for Error {
                 f,
                 "The storage area must be configured by calling `configure` first"
             ),
-            Error::AlreadyRan(what) => write!(f, "`{}` already ran on the background thread", what),
-            Error::DidNotRun(what) => write!(f, "`{}` didn't run on the background thread", what),
+            Error::AlreadyRan(what) => write!(f, "`{what}` already ran on the background thread"),
+            Error::DidNotRun(what) => write!(f, "`{what}` didn't run on the background thread"),
             Error::AlreadyTornDown => {
                 write!(f, "Can't use a storage area that's already torn down")
             }
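Several of these hunks (here and in punt.rs below) only switch from positional `write!(f, "{}", err)` arguments to inline format arguments like `{err}`; behavior is unchanged. A tiny illustration:

```rust
use std::fmt::Write;

fn main() {
    let what = "apply";
    let mut old_style = String::new();
    let mut new_style = String::new();

    // Positional argument (the pre-existing style)...
    write!(old_style, "`{}` didn't run on the background thread", what).unwrap();
    // ...and the inline captured-identifier form this patch switches to.
    write!(new_style, "`{what}` didn't run on the background thread").unwrap();

    assert_eq!(old_style, new_style);
}
```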
webext_storage_bridge/src/punt.rs

@@ -232,7 +232,7 @@ impl Task for PuntTask {
             }
             Err(err) => {
                 let mut message = nsCString::new();
-                write!(message, "{}", err).unwrap();
+                write!(message, "{err}").unwrap();
                 unsafe { callback.HandleError(err.into(), &*message) }
             }
         }
@@ -312,7 +312,7 @@ impl Task for TeardownTask {
             Ok(()) => unsafe { callback.HandleSuccess(().into_variant().coerce()) },
             Err(err) => {
                 let mut message = nsCString::new();
-                write!(message, "{}", err).unwrap();
+                write!(message, "{err}").unwrap();
                 unsafe { callback.HandleError(err.into(), &*message) }
             }
         }
webext_storage_bridge/src/store.rs

@@ -2,19 +2,13 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
-use std::{
-    fs::remove_file,
-    mem,
-    path::PathBuf,
-    sync::{Arc, Mutex, MutexGuard},
-};
+use std::{fs::remove_file, path::PathBuf, sync::Arc};
 
-use golden_gate::{ApplyResults, BridgedEngine, Guid, IncomingBso};
 use interrupt_support::SqlInterruptHandle;
 use once_cell::sync::OnceCell;
 use webext_storage::store::Store;
 
-use crate::error::{Error, Result};
+use crate::error::{self, Error};
 
 /// Options for an extension storage area.
 pub struct LazyStoreConfig {
@@ -36,20 +30,16 @@ pub struct LazyStore {
 }
 
 /// An `InterruptStore` wraps an inner extension store, and its interrupt
-/// handle. The inner store is protected by a mutex, which we don't want to
-/// lock on the main thread because it'll block waiting on any storage
-/// operations running on the background thread, which defeats the point of the
-/// interrupt. The interrupt handle is safe to access on the main thread, and
-/// doesn't require locking.
+/// handle.
 struct InterruptStore {
-    inner: Mutex<Store>,
+    inner: Store,
     handle: Arc<SqlInterruptHandle>,
 }
 
 impl LazyStore {
     /// Configures the lazy store. Returns an error if the store has already
     /// been configured. This method should be called from the main thread.
-    pub fn configure(&self, config: LazyStoreConfig) -> Result<()> {
+    pub fn configure(&self, config: LazyStoreConfig) -> error::Result<()> {
         self.config
             .set(config)
             .map_err(|_| Error::AlreadyConfigured)
@@ -69,133 +59,34 @@ impl LazyStore {
     /// Returns the underlying store, initializing it if needed. This method
     /// should only be called from a background thread or task queue, since
     /// opening the database does I/O.
-    pub fn get(&self) -> Result<MutexGuard<'_, Store>> {
-        Ok(self
+    pub fn get(&self) -> error::Result<&Store> {
+        Ok(&self
             .store
             .get_or_try_init(|| match self.config.get() {
                 Some(config) => {
                     let store = init_store(config)?;
                     let handle = store.interrupt_handle();
                     Ok(InterruptStore {
-                        inner: Mutex::new(store),
+                        inner: store,
                         handle,
                     })
                 }
                 None => Err(Error::NotConfigured),
             })?
-            .inner
-            .lock()
-            .unwrap())
+            .inner)
     }
 
     /// Tears down the store. If the store wasn't initialized, this is a no-op.
     /// This should only be called from a background thread or task queue,
     /// because closing the database also does I/O.
-    pub fn teardown(self) -> Result<()> {
-        if let Some(store) = self
-            .store
-            .into_inner()
-            .map(|outer| outer.inner.into_inner().unwrap())
-        {
-            if let Err((store, error)) = store.close() {
-                // Since we're most likely being called during shutdown, leak
-                // the store on error...it'll be cleaned up when the process
-                // quits, anyway. We don't want to drop it, because its
-                // destructor will try to close it again, and panic on error.
-                // That'll become a shutdown crash, which we want to avoid.
-                mem::forget(store);
-                return Err(error.into());
-            }
+    pub fn teardown(self) -> error::Result<()> {
+        if let Some(store) = self.store.into_inner() {
+            store.inner.close()?;
         }
         Ok(())
     }
 }
 
-// `Store::bridged_engine()` returns a `BridgedEngine` implementation, but we
-// provide our own for `LazyStore` that forwards to it. This is for three
-// reasons.
-//
-// 1. We need to override the associated `Error` type, since Golden Gate has
-//    an `Into<nsresult>` bound for errors. We can't satisfy this bound in
-//    `webext_storage` because `nsresult` is a Gecko-only type. We could try
-//    to reduce the boilerplate by declaring an `AsBridgedEngine` trait, with
-//    associated types for `Error` and `Engine`, but then we run into...
-// 2. `Store::bridged_engine()` returns a type with a lifetime parameter,
-//    because the engine can't outlive the store. But we can't represent that
-//    in our `AsBridgedEngine` trait without an associated type constructor,
-//    which Rust doesn't support yet (rust-lang/rfcs#1598).
-// 3. Related to (2), our store is lazily initialized behind a mutex, so
-//    `LazyStore::get()` returns a guard. But now, `Store::bridged_engine()`
-//    must return a type that lives only as long as the guard...which it
-//    can't, because it doesn't know that! This is another case where
-//    higher-kinded types would be helpful, so that our hypothetical
-//    `AsBridgedEngine::bridged_engine()` could return either a `T<'a>` or
-//    `MutexGuard<'_, T<'a>>`, but that's not possible now.
-//
-// There are workarounds for Rust's lack of HKTs, but they all introduce
-// indirection and cognitive overhead. So we do the simple thing and implement
-// `BridgedEngine`, with a bit more boilerplate.
-impl BridgedEngine for LazyStore {
-    type Error = Error;
-
-    fn last_sync(&self) -> Result<i64> {
-        Ok(self.get()?.bridged_engine().last_sync()?)
-    }
-
-    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
-        Ok(self
-            .get()?
-            .bridged_engine()
-            .set_last_sync(last_sync_millis)?)
-    }
-
-    fn sync_id(&self) -> Result<Option<String>> {
-        Ok(self.get()?.bridged_engine().sync_id()?)
-    }
-
-    fn reset_sync_id(&self) -> Result<String> {
-        Ok(self.get()?.bridged_engine().reset_sync_id()?)
-    }
-
-    fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String> {
-        Ok(self
-            .get()?
-            .bridged_engine()
-            .ensure_current_sync_id(new_sync_id)?)
-    }
-
-    fn sync_started(&self) -> Result<()> {
-        Ok(self.get()?.bridged_engine().sync_started()?)
-    }
-
-    fn store_incoming(&self, envelopes: Vec<IncomingBso>) -> Result<()> {
-        Ok(self.get()?.bridged_engine().store_incoming(envelopes)?)
-    }
-
-    fn apply(&self) -> Result<ApplyResults> {
-        Ok(self.get()?.bridged_engine().apply()?)
-    }
-
-    fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()> {
-        Ok(self
-            .get()?
-            .bridged_engine()
-            .set_uploaded(server_modified_millis, ids)?)
-    }
-
-    fn sync_finished(&self) -> Result<()> {
-        Ok(self.get()?.bridged_engine().sync_finished()?)
-    }
-
-    fn reset(&self) -> Result<()> {
-        Ok(self.get()?.bridged_engine().reset()?)
-    }
-
-    fn wipe(&self) -> Result<()> {
-        Ok(self.get()?.bridged_engine().wipe()?)
-    }
-}
-
 // Initialize the store, performing a migration if necessary.
 // The requirements for migration are, roughly:
 // * If kinto_path doesn't exist, we don't try to migrate.
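`LazyStore::get()` still initializes the store on first use with `OnceCell::get_or_try_init`, but it can now hand out a plain `&Store` because the inner store is no longer wrapped in a `Mutex`. A reduced sketch of that lazy, fallible initialization, using `once_cell` directly with a placeholder store type:

```rust
use once_cell::sync::OnceCell;

struct Store {
    path: String,
}

#[derive(Debug)]
struct NotConfigured;

struct LazyStore {
    config: OnceCell<String>,
    store: OnceCell<Store>,
}

impl LazyStore {
    // Analogue of LazyStore::get(): initialize on the first call, then return
    // a shared reference on every later call.
    fn get(&self) -> Result<&Store, NotConfigured> {
        self.store.get_or_try_init(|| match self.config.get() {
            Some(path) => Ok(Store { path: path.clone() }),
            None => Err(NotConfigured),
        })
    }
}

fn main() {
    let lazy = LazyStore {
        config: OnceCell::new(),
        store: OnceCell::new(),
    };
    assert!(lazy.get().is_err()); // not configured yet

    lazy.config.set("storage-sync-v2.sqlite".into()).unwrap();
    let store = lazy.get().expect("initialized on first successful call");
    println!("opened {}", store.path);
}
```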
@@ -207,7 +98,7 @@ impl BridgedEngine for LazyStore {
 // ignore all "read" errors from the source, but propagate "write" errors on our
 // DB - the intention is that things like corrupted source databases never fail,
 // but disk-space failures on our database does.
-fn init_store(config: &LazyStoreConfig) -> Result<Store> {
+fn init_store(config: &LazyStoreConfig) -> error::Result<Store> {
     let should_migrate = config.kinto_path.exists() && !config.path.exists();
     let store = Store::new(&config.path)?;
     if should_migrate {
@@ -222,23 +113,19 @@ fn init_store(config: &LazyStoreConfig) -> Result<Store> {
                 Ok(store)
             }
             Err(e) => {
-                println!("extension-storage: migration failure: {}", e);
-                if let Err((store, e)) = store.close() {
+                println!("extension-storage: migration failure: {e}");
+                if let Err(e) = store.close() {
                     // welp, this probably isn't going to end well...
                     println!(
-                        "extension-storage: failed to close the store after migration failure: {}",
-                        e
+                        "extension-storage: failed to close the store after migration failure: {e}"
                     );
-                    // I don't think we should hit this in this case - I guess we
-                    // could sleep and retry if we thought we were.
-                    mem::drop(store);
                 }
                 if let Err(e) = remove_file(&config.path) {
                     // this is bad - if it happens regularly it will defeat
                     // out entire migration strategy - we'll assume it
                     // worked.
                     // So it's desirable to make noise if this happens.
-                    println!("Failed to remove file after failed migration: {}", e);
+                    println!("Failed to remove file after failed migration: {e}");
                 }
                 Err(Error::MigrationFailed(e))
             }