From f9abd62b340ec661e901b0689fd4987388ccc789 Mon Sep 17 00:00:00 2001 From: Lina Cambridge Date: Thu, 9 Apr 2020 15:45:37 +0000 Subject: [PATCH] Bug 1596322 - Add XPCOM bindings for Rust Sync engines. r=markh,tcsc,LougeniaBailey MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds a new crate for bridging Rust Sync engines to Desktop, and a `mozIBridgedSyncEngine` for accessing the bridge via JS. Naturally, the bridge is called Golden Gate. 😊 For more information on how to use it, please see `golden_gate/src/lib.rs`. Other changes include: * Ensuring the test Sync server uses UTF-8 for requests and responses. * Renaming `mozISyncedBookmarksMirrorLogger` to `mozIServicesLogger`, and moving it into the shared Sync interfaces. The `BridgedEngine` trait lives in its own crate, called `golden_gate_traits`, to make it easier to eventually move into a-s. `Interruptee` and `Interrupted` already exist in a-s, and are duplicated in this crate for now. Differential Revision: https://phabricator.services.mozilla.com/D65268 --HG-- extra : moz-landing-system : lando --- .../static/browser_all_files_referenced.js | 2 + services/common/tests/unit/head_helpers.js | 23 +- services/interfaces/moz.build | 17 + services/interfaces/mozIBridgedSyncEngine.idl | 149 ++++++ services/interfaces/mozIServicesLogger.idl | 24 + services/moz.build | 1 + .../unit/test_remote_settings_signatures.js | 1 - services/sync/golden_gate/Cargo.toml | 21 + services/sync/golden_gate/src/error.rs | 79 +++ services/sync/golden_gate/src/ferry.rs | 77 +++ services/sync/golden_gate/src/lib.rs | 114 ++++ services/sync/golden_gate/src/log.rs | 162 ++++++ services/sync/golden_gate/src/task.rs | 362 +++++++++++++ services/sync/golden_gate_traits/Cargo.toml | 6 + services/sync/golden_gate_traits/src/lib.rs | 204 ++++++++ services/sync/modules/bridged_engine.js | 490 ++++++++++++++++++ services/sync/modules/engines.js | 5 + services/sync/modules/engines/bookmarks.js | 5 - services/sync/moz.build | 1 + services/sync/tests/unit/head_helpers.js | 2 +- services/sync/tests/unit/head_http_server.js | 14 +- .../sync/tests/unit/test_bridged_engine.js | 265 ++++++++++ .../tests/unit/test_errorhandler_filelog.js | 1 - services/sync/tests/unit/xpcshell.ini | 1 + .../places/SyncedBookmarksMirror.jsm | 16 +- .../places/bookmark_sync/src/driver.rs | 10 +- .../places/bookmark_sync/src/merger.rs | 31 +- .../places/mozISyncedBookmarksMirror.idl | 3 +- 28 files changed, 2038 insertions(+), 48 deletions(-) create mode 100644 services/interfaces/moz.build create mode 100644 services/interfaces/mozIBridgedSyncEngine.idl create mode 100644 services/interfaces/mozIServicesLogger.idl create mode 100644 services/sync/golden_gate/Cargo.toml create mode 100644 services/sync/golden_gate/src/error.rs create mode 100644 services/sync/golden_gate/src/ferry.rs create mode 100644 services/sync/golden_gate/src/lib.rs create mode 100644 services/sync/golden_gate/src/log.rs create mode 100644 services/sync/golden_gate/src/task.rs create mode 100644 services/sync/golden_gate_traits/Cargo.toml create mode 100644 services/sync/golden_gate_traits/src/lib.rs create mode 100644 services/sync/modules/bridged_engine.js create mode 100644 services/sync/tests/unit/test_bridged_engine.js diff --git a/browser/base/content/test/static/browser_all_files_referenced.js b/browser/base/content/test/static/browser_all_files_referenced.js index f20e997344f2..d40369e05c4f 100644 --- a/browser/base/content/test/static/browser_all_files_referenced.js +++ b/browser/base/content/test/static/browser_all_files_referenced.js @@ -269,6 +269,8 @@ for (let entry of ignorableWhitelist) { if (!isDevtools) { // services/sync/modules/main.js whitelist.add("resource://services-sync/service.js"); + // services/sync/modules/bridged_engine.js + whitelist.add("resource://services-sync/bridged_engine.js"); // services/sync/modules/service.js for (let module of [ "addons.js", diff --git a/services/common/tests/unit/head_helpers.js b/services/common/tests/unit/head_helpers.js index 9bcd4ce7cd4d..40a922d96717 100644 --- a/services/common/tests/unit/head_helpers.js +++ b/services/common/tests/unit/head_helpers.js @@ -39,6 +39,7 @@ var { getTestLogger, initTestLogging } = ChromeUtils.import( var { MockRegistrar } = ChromeUtils.import( "resource://testing-common/MockRegistrar.jsm" ); +var { NetUtil } = ChromeUtils.import("resource://gre/modules/NetUtil.jsm"); function do_check_empty(obj) { do_check_attribute_count(obj, 0); @@ -137,7 +138,27 @@ function promiseStopServer(server) { * all available input is read. */ function readBytesFromInputStream(inputStream, count) { - return CommonUtils.readBytesFromInputStream(inputStream, count); + if (!count) { + count = inputStream.available(); + } + if (!count) { + return ""; + } + return NetUtil.readInputStreamToString(inputStream, count, { + charset: "UTF-8", + }); +} + +function writeBytesToOutputStream(outputStream, string) { + if (!string) { + return; + } + let converter = Cc[ + "@mozilla.org/intl/converter-output-stream;1" + ].createInstance(Ci.nsIConverterOutputStream); + converter.init(outputStream, "UTF-8"); + converter.writeString(string); + converter.close(); } /* diff --git a/services/interfaces/moz.build b/services/interfaces/moz.build new file mode 100644 index 000000000000..84aa06e4661a --- /dev/null +++ b/services/interfaces/moz.build @@ -0,0 +1,17 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +with Files('**'): + BUG_COMPONENT = ('Firefox', 'Sync') + +# Services interfaces are shared with other components (like Places and +# WebExtension storage), so we keep them in a separate folder and build them for +# all configurations, regardless of whether we build Sync. + +XPIDL_MODULE = 'services' + +XPIDL_SOURCES += [ + 'mozIBridgedSyncEngine.idl', + 'mozIServicesLogger.idl', +] diff --git a/services/interfaces/mozIBridgedSyncEngine.idl b/services/interfaces/mozIBridgedSyncEngine.idl new file mode 100644 index 000000000000..89b784cb6c23 --- /dev/null +++ b/services/interfaces/mozIBridgedSyncEngine.idl @@ -0,0 +1,149 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "mozIServicesLogger.idl" +#include "nsISupports.idl" + +interface nsICancelable; +interface nsIVariant; + +// A generic callback called with a result. Variants are automatically unboxed +// in JavaScript: for example, a `UTF8String` will be passed as a string +// argument; an `Int32` or `Int64` as a number. Methods that don't return a +// value, like `setLastSync` or `setUploaded`, will pass a `null` variant to +// `handleSuccess`. For all callback types in this file, either `handleSuccess` +// or `handleError` is guaranteed to be called once. +[scriptable, uuid(9b7dd2a3-df99-4469-9ea9-61b222098695)] +interface mozIBridgedSyncEngineCallback : nsISupports { + void handleSuccess(in nsIVariant result); + void handleError(in nsresult code, in AUTF8String message); +}; + +// A callback called after the engine applies incoming records. This is separate +// from `mozIBridgedSyncEngineCallback` because variants can't hold an +// `Array` type. +[scriptable, uuid(2776cdd5-799a-4009-b2f3-356d940a5244)] +interface mozIBridgedSyncEngineApplyCallback : nsISupports { + // Notifies Sync that the bridged engine has finished applying incoming + // records, and has outgoing records. Sync encrypts and uploads these + // records, and notifies the engine that the upload succeeded by + // calling `engine.setUploaded(uploadedOutgoingRecordIds, ...)`. + void handleSuccess(in Array outgoingRecordsAsJSON); + + // Notifies Sync that the bridged engine failed to apply the staged records. + void handleError(in nsresult code, in AUTF8String message); +}; + +// A bridged engine is implemented in native (Rust) code. It handles storage +// internally, and exposes a minimal interface for the JS Sync code to +// control it. +[scriptable, uuid(3b2b80be-c30e-4498-8065-01809cfe8d47)] +interface mozIBridgedSyncEngine : nsISupports { + // The storage version for this engine's collection. If the version in the + // server's `meta/global` record is newer than ours, we'll refuse to sync, + // since we might not understand the data; if it's older, we'll wipe the + // collection on the server, and upload our data as if on a first sync. + readonly attribute long storageVersion; + + // Wires up the Sync logging machinery to the bridged engine. This can be + // `null`, in which case any logs from the engine will be discarded. + attribute mozIServicesLogger logger; + + // Initializes the engine. Sync is guaranteed to call this method before + // any of the engine's methods. + void initialize(in mozIBridgedSyncEngineCallback callback); + + // Returns the last sync time, in milliseconds, for this engine's + // collection. This is used to build the collection URL for fetching + // incoming records, and as the initial value of the `X-I-U-S` header on + // upload. If the engine persists incoming records in a permanent (non-temp) + // table, `getLastSync` can return a "high water mark" that's the newer of + // the collection's last sync time, and the most recent record modification + // time. This avoids redownloading incoming records that were previously + // downloaded, but not applied. + void getLastSync(in mozIBridgedSyncEngineCallback callback); + + // Sets the last sync time, in milliseconds. This is used to fast-forward + // the last sync time for the engine's collection after fetching all + // records, and after each `setUploaded` call with the `X-L-M` header from + // the server. It may be called multiple times per sync. + void setLastSync(in long long lastSyncMillis, + in mozIBridgedSyncEngineCallback callback); + + // Returns the sync ID for this engine's collection. Used for testing; + // Sync only calls `ensureCurrentSyncId` and `resetSyncId`. On success, + // calls `callback.handleSuccess(in AUTF8String currentSyncId)`. + void getSyncId(in mozIBridgedSyncEngineCallback callback); + + // Generates a new sync ID for this engine, and resets all local Sync + // metadata, including the last sync time and any change flags, to start + // over as a first sync. On success, calls + // `callback.handleSuccess(newSyncId)`, where `newSyncId` is + // `AUTF8String` variant. Sync will upload the new sync ID in the + // `meta/global` record. + void resetSyncId(in mozIBridgedSyncEngineCallback callback); + + // Ensures that the local sync ID for the engine matches the sync ID for + // the collection on the server. On a mismatch, the engine can: + // 1. Reset all local Sync state, adopt `newSyncId` as the new sync ID, + // and call `callback.handleSuccess(newSyncId)`. Most engines should + // do this. + // 2. Ignore the given `newSyncId`, use its existing local sync ID + // without resetting any state, and call + // `callback.handleSuccess(existingSyncId)`. This is useful if, for + // example, the underlying database has been restored from a backup, + // and the engine would like to force a reset and first sync on all + // other devices. + // 3. Ignore the given `newSyncId`, reset all local Sync state, and + // generate a fresh sync ID, as if `resetSyncId`. This resets the + // engine's state everywhere, locally and on all other devices. + // If the callback is called with a different sync ID than `newSyncId`, + // Sync will reupload `meta/global` with the different ID. Otherwise, it + // will assume that the engine has adopted the `newSyncId`, and do nothing. + void ensureCurrentSyncId(in AUTF8String newSyncId, + in mozIBridgedSyncEngineCallback callback); + + // Stages a batch of incoming records, and calls the `callback` when + // done. This method may be called multiple times per sync, once per + // incoming batch. Flushing incoming records more often incurs more writes + // to disk, but avoids redownloading and reapplying more records if syncing + // is interrupted. Returns a "pending operation" object with a `cancel` + // method that can be used to interrupt staging. Typically, engines will + // stage incoming records in an SQLite temp table, and merge them with the + // local database when `apply` is called. + nsICancelable storeIncoming(in Array incomingRecordsAsJSON, + in mozIBridgedSyncEngineCallback callback); + + // Applies all the staged records, and calls the `callback` with + // outgoing records to upload. This will always be called after + // `storeIncoming`, and only once per sync. Application should be atomic: + // either all incoming records apply successfully, or none. + nsICancelable apply(in mozIBridgedSyncEngineApplyCallback callback); + + // Notifies the engine that Sync successfully uploaded the records with the + // given IDs. This method may be called multiple times per sync, once per + // batch upload. This will always be called after `apply`. + nsICancelable setUploaded(in long long newTimestampMillis, + in Array uploadedIds, + in mozIBridgedSyncEngineCallback callback); + + // Notifies the engine that syncing has finished, and the engine shouldn't + // expect any more `setUploaded` calls. At this point, any outgoing records + // that weren't passed to `setUploaded` should be assumed failed. This is + // guaranteed to be called even if the sync fails. This will only be called + // once per sync. + nsICancelable syncFinished(in mozIBridgedSyncEngineCallback callback); + + // Resets all local Sync metadata, including the sync ID, last sync time, + // and any change flags, but preserves all data. After a reset, the engine will + // sync as if for the first time. + void reset(in mozIBridgedSyncEngineCallback callback); + + // Erases all locally stored data and metadata for this engine. + void wipe(in mozIBridgedSyncEngineCallback callback); + + // Tears down the engine, including closing any connections. Sync calls this + // method when an engine is disabled. + void finalize(in mozIBridgedSyncEngineCallback callback); +}; diff --git a/services/interfaces/mozIServicesLogger.idl b/services/interfaces/mozIServicesLogger.idl new file mode 100644 index 000000000000..54db045830c0 --- /dev/null +++ b/services/interfaces/mozIServicesLogger.idl @@ -0,0 +1,24 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "nsISupports.idl" + +// Adapts a `Log.jsm` logger so that it can be used from native (Rust) code. +// The synced bookmarks mirror and bridged engines implement this interface +// to hook in to the services `LogManager` infrastructure. +[scriptable, uuid(c92bfe0d-50b7-4a7f-9686-fe5335a696b9)] +interface mozIServicesLogger : nsISupports { + const short LEVEL_OFF = 0; + const short LEVEL_ERROR = 1; + const short LEVEL_WARN = 2; + const short LEVEL_DEBUG = 3; + const short LEVEL_TRACE = 4; + + attribute short maxLevel; + + void error(in AString message); + void warn(in AString message); + void debug(in AString message); + void trace(in AString message); +}; diff --git a/services/moz.build b/services/moz.build index e57f8caca051..2bd6177a2cf8 100644 --- a/services/moz.build +++ b/services/moz.build @@ -10,6 +10,7 @@ with Files('moz.build'): DIRS += [ 'common', 'crypto', + 'interfaces', 'settings', ] diff --git a/services/settings/test/unit/test_remote_settings_signatures.js b/services/settings/test/unit/test_remote_settings_signatures.js index 514fc302caef..0f51b27e43b1 100644 --- a/services/settings/test/unit/test_remote_settings_signatures.js +++ b/services/settings/test/unit/test_remote_settings_signatures.js @@ -3,7 +3,6 @@ const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm"); -const { NetUtil } = ChromeUtils.import("resource://gre/modules/NetUtil.jsm"); const { RemoteSettings } = ChromeUtils.import( "resource://services-settings/remote-settings.js" ); diff --git a/services/sync/golden_gate/Cargo.toml b/services/sync/golden_gate/Cargo.toml new file mode 100644 index 000000000000..a948c8e74b86 --- /dev/null +++ b/services/sync/golden_gate/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "golden_gate" +description = "A bridge for wiring up Sync engines implemented in Rust" +version = "0.1.0" +authors = ["The Firefox Sync Developers "] +edition = "2018" + +[dependencies] +atomic_refcell = "0.1" +cstr = "0.1" +log = "0.4" +golden_gate_traits = { path = "../golden_gate_traits" } +moz_task = { path = "../../../xpcom/rust/moz_task" } +nserror = { path = "../../../xpcom/rust/nserror" } +nsstring = { path = "../../../xpcom/rust/nsstring" } +storage_variant = { path = "../../../storage/variant" } +xpcom = { path = "../../../xpcom/rust/xpcom" } + +[dependencies.thin-vec] +version = "0.1.0" +features = ["gecko-ffi"] diff --git a/services/sync/golden_gate/src/error.rs b/services/sync/golden_gate/src/error.rs new file mode 100644 index 000000000000..121fc4be8b58 --- /dev/null +++ b/services/sync/golden_gate/src/error.rs @@ -0,0 +1,79 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::{error, fmt, result, str::Utf8Error}; + +use nserror::{nsresult, NS_ERROR_INVALID_ARG, NS_ERROR_UNEXPECTED}; + +/// A specialized `Result` type for Golden Gate. +pub type Result = result::Result; + +/// The error type for Golden Gate errors. +#[derive(Debug)] +pub enum Error { + /// A wrapped XPCOM error. + Nsresult(nsresult), + + /// A ferry didn't run on the background task queue. + DidNotRun(&'static str), + + /// A Gecko string couldn't be converted to UTF-8. + MalformedString(Box), +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + Error::MalformedString(error) => Some(error.as_ref()), + _ => None, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Nsresult(result) => write!(f, "Operation failed with {}", result.error_name()), + Error::DidNotRun(what) => write!(f, "Failed to run `{}` on background thread", what), + Error::MalformedString(error) => error.fmt(f), + } + } +} + +impl From for Error { + fn from(result: nsresult) -> Error { + Error::Nsresult(result) + } +} + +impl From for Error { + fn from(error: Utf8Error) -> Error { + Error::MalformedString(error.into()) + } +} + +impl From for nsresult { + fn from(error: Error) -> nsresult { + match error { + Error::DidNotRun(_) => NS_ERROR_UNEXPECTED, + Error::Nsresult(result) => result, + Error::MalformedString(_) => NS_ERROR_INVALID_ARG, + } + } +} + +/// A trait that constrains the type of `BridgedEngine::Error`, such that it can +/// be used as a trait bound for ferries and tasks. `BridgedEngine` doesn't +/// constrain its associated `Error` type, but we must, so that we can return +/// Golden Gate errors alongside `BridgedEngine::Error`s, and pass their +/// result codes and messages to `mozIBridgedSyncEngine*Callback::HandleError`. +/// Since tasks share error results between the main and background threads, +/// errors must also be `Send + Sync`. +/// +/// This would be cleaner to express as a trait alias, but those aren't stable +/// yet (see rust-lang/rust#41517). Instead, we define a trait with no methods, +/// and a blanket implementation for its supertraits. +pub trait BridgedError: From + Into + fmt::Display + Send + Sync {} + +impl BridgedError for T where T: From + Into + fmt::Display + Send + Sync {} diff --git a/services/sync/golden_gate/src/ferry.rs b/services/sync/golden_gate/src/ferry.rs new file mode 100644 index 000000000000..07796cb81eb4 --- /dev/null +++ b/services/sync/golden_gate/src/ferry.rs @@ -0,0 +1,77 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::sync::Arc; + +use nsstring::nsCString; +use storage_variant::VariantType; +use xpcom::{interfaces::nsIVariant, RefPtr}; + +/// An operation that runs on the background thread, and optionally passes a +/// result to its callback. +pub enum Ferry { + Initialize, + LastSync, + SetLastSync(i64), + SyncId, + ResetSyncId, + EnsureCurrentSyncId(String), + StoreIncoming(Vec, Arc), + SetUploaded(i64, Vec, Arc), + SyncFinished(Arc), + Reset, + Wipe, + Finalize, +} + +impl Ferry { + /// Returns the operation name for debugging and labeling the task + /// runnable. + pub fn name(&self) -> &'static str { + match self { + Ferry::Initialize => concat!(module_path!(), "initialize"), + Ferry::LastSync => concat!(module_path!(), "getLastSync"), + Ferry::SetLastSync(_) => concat!(module_path!(), "setLastSync"), + Ferry::SyncId => concat!(module_path!(), "getSyncId"), + Ferry::ResetSyncId => concat!(module_path!(), "resetSyncId"), + Ferry::EnsureCurrentSyncId(_) => concat!(module_path!(), "ensureCurrentSyncId"), + Ferry::StoreIncoming { .. } => concat!(module_path!(), "storeIncoming"), + Ferry::SetUploaded { .. } => concat!(module_path!(), "setUploaded"), + Ferry::SyncFinished(_) => concat!(module_path!(), "sync"), + Ferry::Reset => concat!(module_path!(), "reset"), + Ferry::Wipe => concat!(module_path!(), "wipe"), + Ferry::Finalize => concat!(module_path!(), "finalize"), + } + } +} + +/// The result of a ferry task, sent from the background thread back to the +/// main thread. Results are converted to variants, and passed as arguments to +/// `mozIBridgedSyncEngineCallback`s. +pub enum FerryResult { + LastSync(i64), + SyncId(Option), + AssignedSyncId(String), + Null, +} + +impl From<()> for FerryResult { + fn from(_: ()) -> FerryResult { + FerryResult::Null + } +} + +impl FerryResult { + /// Converts the result to an `nsIVariant` that can be passed as an + /// argument to `callback.handleResult()`. + pub fn into_variant(self) -> RefPtr { + match self { + FerryResult::LastSync(v) => v.into_variant(), + FerryResult::SyncId(Some(v)) => nsCString::from(v).into_variant(), + FerryResult::SyncId(None) => ().into_variant(), + FerryResult::AssignedSyncId(v) => nsCString::from(v).into_variant(), + FerryResult::Null => ().into_variant(), + } + } +} diff --git a/services/sync/golden_gate/src/lib.rs b/services/sync/golden_gate/src/lib.rs new file mode 100644 index 000000000000..0920e84d05d0 --- /dev/null +++ b/services/sync/golden_gate/src/lib.rs @@ -0,0 +1,114 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! **Golden Gate** 🌉 is a crate for bridging Desktop Sync to our suite of +//! Rust sync and storage components. It connects Sync's `BridgedEngine` class +//! to the Rust `BridgedEngine` trait via the `mozIBridgedSyncEngine` XPCOM +//! interface. +//! +//! Due to limitations in implementing XPCOM interfaces for generic types, +//! Golden Gate doesn't implement `mozIBridgedSyncEngine` directly. Instead, +//! it provides helpers, called "ferries", for passing Sync records between +//! JavaScript and Rust. The ferries also handle threading and type +//! conversions. +//! +//! Here's a step-by-step guide for adding a new Rust Sync engine to Firefox. +//! +//! ## Step 1: Create your (XPCOM) bridge +//! +//! In your consuming crate, define a type for your `mozIBridgedSyncEngine` +//! implementation. We'll call this type the **brige**. The bridge is +//! responsible for exposing your Sync engine to XPIDL [^1], in a way that lets +//! JavaScript call it. +//! +//! For your bridge type, you'll need to declare an `Init` struct with the +//! `#[derive(xpcom)]` and `#[xpimplements(mozIBridgedSyncEngine)]` attributes, +//! then define `xpcom_method!()` stubs for the `mozIBridgedSyncEngine` methods. +//! For more details about implementing XPCOM methods in Rust, check out the +//! docs in `xpcom/rust/xpcom/src/method.rs`. +//! +//! You'll also need to add an entry for your bridge type to `components.conf`, +//! and define C++ and Rust constructors for it, so that JavaScript code can +//! create instances of it. Check out `NS_NewWebExtStorage` (and, in C++, +//! `mozilla::extensions::storageapi::NewWebExtStorage`) and +//! `NS_NewSyncedBookmarksMerger` (`mozilla::places::NewSyncedBookmarksMerger` +//! in C++) for how to do this. +//! +//! [^1]: You can think of XPIDL as a souped-up C FFI, with richer types and a +//! degree of type safety. +//! +//! ## Step 2: Add a background task queue to your bridge +//! +//! A task queue lets your engine do I/O, merging, and other syncing tasks on a +//! background thread pool. This is important because database reads and writes +//! can take an unpredictable amount of time. Doing these on the main thread can +//! cause jank, and, in the worst case, lock up the browser UI for seconds at a +//! time. +//! +//! The `moz_task` crate provides a `create_background_task_queue` function to +//! do this. Once you have a queue, you can use it to call into your Rust +//! engine. Golden Gate takes care of ferrying arguments back and forth across +//! the thread boundary. +//! +//! Since it's a queue, ferries arrive in the order they're scheduled, so +//! your engine's `store_incoming` method will always be called before `apply`, +//! which is likewise called before `set_uploaded`. The thread manager scales +//! the pool for you; you don't need to create or manage your own threads. +//! +//! ## Step 3: Create your Rust engine +//! +//! Next, you'll need to implement the Rust side of the bridge. This is a type +//! that implements the `BridgedEngine` trait. +//! +//! Bridged engines handle storing incoming Sync records, merging changes, +//! resolving conflicts, and fetching outgoing records for upload. Under the +//! hood, your engine will hold either a database connection directly, or +//! another object that does. +//! +//! Although outside the scope of Golden Gate, your engine will also likely +//! expose a data storage API, for fetching, updating, and deleting items +//! locally. Golden Gate provides the syncing layer on top of this local store. +//! +//! A `BridgedEngine` itself doesn't need to be `Send` or `Sync`, but the +//! ferries require both, since they're calling into your bridge on the +//! background task queue. +//! +//! In practice, this means your bridge will need to hold a thread-safe owned +//! reference to the engine, via `Arc>`. In fact, this +//! pattern is so common that Golden Gate implements `BridgedEngine` for any +//! `Mutex`, which automatically locks the mutex before calling +//! into the engine. +//! +//! ## Step 4: Connect the bridge to the JavaScript and Rust sides +//! +//! On the JavaScript side, you'll need to subclass Sync's `BridgedEngine` +//! class, and give it a handle to your XPCOM bridge. The base class has all the +//! machinery for hooking up any `mozIBridgedSyncEngine` implementation so that +//! Sync can drive it. +//! +//! On the Rust side, each `mozIBridgedSyncEngine` method should create a +//! Golden Gate ferry, and dispatch it to the background task queue. The +//! ferries correspond to the method names. For example, `ensureCurrentSyncId` +//! should create a `Ferry::ensure_current_sync_id(...)`; `storeIncoming`, a +//! `Ferry::store_incoming(...)`; and so on. This is mostly boilerplate. +//! +//! And that's it! Each ferry will, in turn, call into your Rust +//! `BridgedEngine`, and send the results back to JavaScript. +//! +//! For an example of how all this works, including exposing a storage (not +//! just syncing!) API to JS via XPIDL, check out `webext_storage::Bridge` for +//! the `storage.sync` API! + +#[macro_use] +extern crate cstr; + +pub mod error; +mod ferry; +pub mod log; +pub mod task; + +pub use crate::log::LogSink; +pub use error::{Error, Result}; +pub use golden_gate_traits::{BridgedEngine, Interrupted, Interruptee}; +pub use task::{ApplyTask, FerryTask}; diff --git a/services/sync/golden_gate/src/log.rs b/services/sync/golden_gate/src/log.rs new file mode 100644 index 000000000000..e29092900ae2 --- /dev/null +++ b/services/sync/golden_gate/src/log.rs @@ -0,0 +1,162 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::fmt::{self, Write}; + +use log::{Level, LevelFilter, Log, Metadata, Record}; +use moz_task::{Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; +use nserror::nsresult; +use nsstring::nsString; +use xpcom::{interfaces::mozIServicesLogger, RefPtr}; + +pub struct LogSink { + pub max_level: LevelFilter, + logger: Option>, +} + +impl Default for LogSink { + fn default() -> Self { + LogSink { + max_level: LevelFilter::Off, + logger: None, + } + } +} + +impl LogSink { + /// Creates a log sink that adapts the Rust `log` crate to the Sync + /// `Log.jsm` logger. + /// + /// This is copied from `bookmark_sync::Logger`. It would be nice to share + /// these, but, for now, we've just duplicated it to make prototyping + /// easier. + #[inline] + pub fn new(max_level: LevelFilter, logger: ThreadPtrHandle) -> LogSink { + LogSink { + max_level, + logger: Some(logger), + } + } + + /// Creates a log sink using the given Services `logger` as the + /// underlying implementation. The `logger` will always be called + /// asynchronously on its owning thread; it doesn't need to be + /// thread-safe. + pub fn with_logger(logger: Option<&mozIServicesLogger>) -> Result { + Ok(if let Some(logger) = logger { + // Fetch the maximum log level while we're on the main thread, so + // that `LogSink::enabled()` can check it while on the background + // thread. Otherwise, we'd need to dispatch a `LogTask` for every + // log message, only to discard most of them when the task calls + // into the logger on the main thread. + let mut raw_max_level = 0i16; + let rv = unsafe { logger.GetMaxLevel(&mut raw_max_level) }; + let max_level = if rv.succeeded() { + match raw_max_level as i64 { + mozIServicesLogger::LEVEL_ERROR => LevelFilter::Error, + mozIServicesLogger::LEVEL_WARN => LevelFilter::Warn, + mozIServicesLogger::LEVEL_DEBUG => LevelFilter::Debug, + mozIServicesLogger::LEVEL_TRACE => LevelFilter::Trace, + _ => LevelFilter::Off, + } + } else { + LevelFilter::Off + }; + LogSink::new( + max_level, + ThreadPtrHolder::new(cstr!("mozIServicesLogger"), RefPtr::new(logger))?, + ) + } else { + LogSink::default() + }) + } + + /// Returns a reference to the underlying `mozIServicesLogger`. + pub fn logger(&self) -> Option<&mozIServicesLogger> { + self.logger.as_ref().and_then(|l| l.get()) + } + + /// Logs a message to the Sync logger, if one is set. This would be better + /// implemented as a macro, as Dogear does, so that we can pass variadic + /// arguments without manually invoking `fmt_args!()` every time we want + /// to log a message. + /// + /// The `log` crate's macros aren't suitable here, because those log to the + /// global logger. However, we don't want to set the global logger in our + /// crate, because that will log _everything_ that uses the Rust `log` crate + /// to the Sync logs, including WebRender and audio logging. + pub fn debug(&self, args: fmt::Arguments) { + let meta = Metadata::builder() + .level(Level::Debug) + .target(module_path!()) + .build(); + if self.enabled(&meta) { + self.log(&Record::builder().args(args).metadata(meta).build()); + } + } +} + +impl Log for LogSink { + #[inline] + fn enabled(&self, meta: &Metadata) -> bool { + self.logger.is_some() && meta.level() <= self.max_level + } + + fn log(&self, record: &Record) { + if !self.enabled(record.metadata()) { + return; + } + if let Some(logger) = &self.logger { + let mut message = nsString::new(); + match write!(message, "{}", record.args()) { + Ok(_) => { + let task = LogTask { + logger: logger.clone(), + level: record.metadata().level(), + message, + }; + let _ = + TaskRunnable::new("extension_storage_sync::Logger::log", Box::new(task)) + .and_then(|r| r.dispatch(logger.owning_thread())); + } + Err(_) => {} + } + } + } + + fn flush(&self) {} +} + +/// Logs a message to the mirror logger. This task is created on the background +/// thread queue, and dispatched to the main thread. +struct LogTask { + logger: ThreadPtrHandle, + level: Level, + message: nsString, +} + +impl Task for LogTask { + fn run(&self) { + let logger = self.logger.get().unwrap(); + match self.level { + Level::Error => unsafe { + logger.Error(&*self.message); + }, + Level::Warn => unsafe { + logger.Warn(&*self.message); + }, + Level::Debug => unsafe { + logger.Debug(&*self.message); + }, + Level::Trace => unsafe { + logger.Trace(&*self.message); + }, + _ => {} + } + } + + fn done(&self) -> Result<(), nsresult> { + Ok(()) + } +} diff --git a/services/sync/golden_gate/src/task.rs b/services/sync/golden_gate/src/task.rs new file mode 100644 index 000000000000..b055c8997072 --- /dev/null +++ b/services/sync/golden_gate/src/task.rs @@ -0,0 +1,362 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::{fmt::Write, mem, sync::Arc}; + +use atomic_refcell::AtomicRefCell; +use golden_gate_traits::{BridgedEngine, Interruptee}; +use moz_task::{DispatchOptions, Task, TaskRunnable, ThreadPtrHandle, ThreadPtrHolder}; +use nserror::nsresult; +use nsstring::{nsACString, nsCString}; +use thin_vec::ThinVec; +use xpcom::{ + interfaces::{ + mozIBridgedSyncEngineApplyCallback, mozIBridgedSyncEngineCallback, nsIEventTarget, + }, + RefPtr, +}; + +use crate::error::{self, BridgedError, Error}; +use crate::ferry::{Ferry, FerryResult}; + +/// A ferry task sends (or ferries) an operation to a bridged engine on a +/// background thread or task queue, and ferries back an optional result to +/// a callback. +pub struct FerryTask { + engine: Arc, + ferry: Ferry, + callback: ThreadPtrHandle, + result: AtomicRefCell>, +} + +impl FerryTask +where + N: ?Sized + BridgedEngine + Send + Sync + 'static, + S: Interruptee + Send + Sync + 'static, + N::Error: BridgedError, +{ + /// Creates a task to initialize the engine. + #[inline] + pub fn for_initialize( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::Initialize, callback) + } + + /// Creates a task to fetch the engine's last sync time, in milliseconds. + #[inline] + pub fn for_last_sync( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::LastSync, callback) + } + + /// Creates a task to set the engine's last sync time, in milliseconds. + #[inline] + pub fn for_set_last_sync( + engine: &Arc, + last_sync_millis: i64, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::SetLastSync(last_sync_millis), callback) + } + + /// Creates a task to fetch the engine's sync ID. + #[inline] + pub fn for_sync_id( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::SyncId, callback) + } + + /// Creates a task to reset the engine's sync ID and all its local Sync + /// metadata. + #[inline] + pub fn for_reset_sync_id( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::ResetSyncId, callback) + } + + /// Creates a task to compare the bridged engine's local sync ID with + /// the `new_sync_id` from `meta/global`, and ferry back the final sync ID + /// to use. + #[inline] + pub fn for_ensure_current_sync_id( + engine: &Arc, + new_sync_id: &nsACString, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry( + engine, + Ferry::EnsureCurrentSyncId(std::str::from_utf8(new_sync_id)?.into()), + callback, + ) + } + + /// Creates a task to store incoming records. + pub fn for_store_incoming( + engine: &Arc, + incoming_cleartexts: &[nsCString], + signal: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + let incoming_cleartexts = incoming_cleartexts.iter().try_fold( + Vec::with_capacity(incoming_cleartexts.len()), + |mut cleartexts, cleartext| -> error::Result<_> { + // We need to clone the string for the task to take ownership + // of it, anyway; might as well convert to a Rust string while + // we're here. + cleartexts.push(std::str::from_utf8(&*cleartext)?.into()); + Ok(cleartexts) + }, + )?; + Self::with_ferry( + engine, + Ferry::StoreIncoming(incoming_cleartexts, Arc::clone(signal)), + callback, + ) + } + + /// Creates a task to mark a subset of outgoing records as uploaded. This + /// may be called multiple times per sync, or not at all if there are no + /// records to upload. + pub fn for_set_uploaded( + engine: &Arc, + server_modified_millis: i64, + uploaded_ids: &[nsCString], + signal: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + let uploaded_ids = uploaded_ids.iter().try_fold( + Vec::with_capacity(uploaded_ids.len()), + |mut ids, id| -> error::Result<_> { + ids.push(std::str::from_utf8(&*id)?.into()); + Ok(ids) + }, + )?; + Self::with_ferry( + engine, + Ferry::SetUploaded(server_modified_millis, uploaded_ids, Arc::clone(signal)), + callback, + ) + } + + /// Creates a task to signal that all records have been uploaded, and + /// the engine has been synced. This is called even if there were no + /// records uploaded. + #[inline] + pub fn for_sync_finished( + engine: &Arc, + signal: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::SyncFinished(Arc::clone(signal)), callback) + } + + /// Creates a task to reset all local Sync state for the engine, without + /// erasing user data. + #[inline] + pub fn for_reset( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::Reset, callback) + } + + /// Creates a task to erase all local user data for the engine. + #[inline] + pub fn for_wipe( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::Wipe, callback) + } + + /// Creates a task to tear down the engine. + #[inline] + pub fn for_finalize( + engine: &Arc, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + Self::with_ferry(engine, Ferry::Finalize, callback) + } + + /// Creates a task for a ferry. The `callback` is bound to the current + /// thread, and will be called once, after the ferry returns from the + /// background thread. + fn with_ferry( + engine: &Arc, + ferry: Ferry, + callback: &mozIBridgedSyncEngineCallback, + ) -> error::Result> { + let name = ferry.name(); + Ok(FerryTask { + engine: Arc::clone(engine), + ferry, + callback: ThreadPtrHolder::new( + cstr!("mozIBridgedSyncEngineCallback"), + RefPtr::new(callback), + )?, + result: AtomicRefCell::new(Err(Error::DidNotRun(name).into())), + }) + } + + /// Dispatches the task to the given thread `target`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), Error> { + let runnable = TaskRunnable::new(self.ferry.name(), Box::new(self))?; + // `may_block` schedules the task on the I/O thread pool, since we + // expect most operations to wait on I/O. + runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?; + Ok(()) + } +} + +impl Task for FerryTask +where + N: ?Sized + BridgedEngine, + S: Interruptee, + N::Error: BridgedError, +{ + fn run(&self) { + *self.result.borrow_mut() = match &self.ferry { + Ferry::Initialize => self.engine.initialize().map(FerryResult::from), + Ferry::LastSync => self.engine.last_sync().map(FerryResult::LastSync), + Ferry::SetLastSync(last_sync_millis) => self + .engine + .set_last_sync(*last_sync_millis) + .map(FerryResult::from), + Ferry::SyncId => self.engine.sync_id().map(FerryResult::SyncId), + Ferry::ResetSyncId => self.engine.reset_sync_id().map(FerryResult::AssignedSyncId), + Ferry::EnsureCurrentSyncId(new_sync_id) => self + .engine + .ensure_current_sync_id(&*new_sync_id) + .map(FerryResult::AssignedSyncId), + Ferry::StoreIncoming(incoming_cleartexts, signal) => self + .engine + .store_incoming(incoming_cleartexts.as_slice(), signal.as_ref()) + .map(FerryResult::from), + Ferry::SetUploaded(server_modified_millis, uploaded_ids, signal) => self + .engine + .set_uploaded( + *server_modified_millis, + uploaded_ids.as_slice(), + signal.as_ref(), + ) + .map(FerryResult::from), + Ferry::SyncFinished(signal) => self + .engine + .sync_finished(signal.as_ref()) + .map(FerryResult::from), + Ferry::Reset => self.engine.reset().map(FerryResult::from), + Ferry::Wipe => self.engine.wipe().map(FerryResult::from), + Ferry::Finalize => self.engine.finalize().map(FerryResult::from), + }; + } + + fn done(&self) -> Result<(), nsresult> { + let callback = self.callback.get().unwrap(); + match mem::replace( + &mut *self.result.borrow_mut(), + Err(Error::DidNotRun(self.ferry.name()).into()), + ) { + Ok(result) => unsafe { callback.HandleSuccess(result.into_variant().coerce()) }, + Err(err) => { + let mut message = nsCString::new(); + write!(message, "{}", err).unwrap(); + unsafe { callback.HandleError(err.into(), &*message) } + } + } + .to_result() + } +} + +/// An apply task ferries incoming records to an engine on a background +/// thread, and ferries back records to upload. It's separate from +/// `FerryTask` because its callback type is different. +pub struct ApplyTask { + engine: Arc, + signal: Arc, + callback: ThreadPtrHandle, + result: AtomicRefCell, N::Error>>, +} + +impl ApplyTask +where + N: ?Sized + BridgedEngine, +{ + /// Returns the task name for debugging. + pub fn name() -> &'static str { + concat!(module_path!(), "apply") + } +} + +impl ApplyTask +where + N: ?Sized + BridgedEngine + Send + Sync + 'static, + S: Interruptee + Send + Sync + 'static, + N::Error: BridgedError, +{ + /// Creates a task. The `callback` is bound to the current thread, and will + /// be called once, after the records are applied on the background thread. + pub fn new( + engine: &Arc, + signal: &Arc, + callback: &mozIBridgedSyncEngineApplyCallback, + ) -> error::Result> { + Ok(ApplyTask { + engine: Arc::clone(engine), + signal: Arc::clone(signal), + callback: ThreadPtrHolder::new( + cstr!("mozIBridgedSyncEngineApplyCallback"), + RefPtr::new(callback), + )?, + result: AtomicRefCell::new(Err(Error::DidNotRun(Self::name()).into())), + }) + } + + /// Dispatches the task to the given thread `target`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), Error> { + let runnable = TaskRunnable::new(Self::name(), Box::new(self))?; + runnable.dispatch_with_options(target, DispatchOptions::default().may_block(true))?; + Ok(()) + } +} + +impl Task for ApplyTask +where + N: ?Sized + BridgedEngine, + S: Interruptee, + N::Error: BridgedError, +{ + fn run(&self) { + *self.result.borrow_mut() = self.engine.apply(self.signal.as_ref()); + } + + fn done(&self) -> Result<(), nsresult> { + let callback = self.callback.get().unwrap(); + match mem::replace( + &mut *self.result.borrow_mut(), + Err(Error::DidNotRun(Self::name()).into()), + ) { + Ok(outgoing) => { + let result = outgoing + .into_iter() + .map(nsCString::from) + .collect::>(); + unsafe { callback.HandleSuccess(&result) } + } + Err(err) => { + let mut message = nsCString::new(); + write!(message, "{}", err).unwrap(); + unsafe { callback.HandleError(err.into(), &*message) } + } + } + .to_result() + } +} diff --git a/services/sync/golden_gate_traits/Cargo.toml b/services/sync/golden_gate_traits/Cargo.toml new file mode 100644 index 000000000000..a3011e28bb01 --- /dev/null +++ b/services/sync/golden_gate_traits/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "golden_gate_traits" +description = "Traits used in Golden Gate and Application Services" +version = "0.1.0" +authors = ["The Firefox Sync Developers "] +edition = "2018" diff --git a/services/sync/golden_gate_traits/src/lib.rs b/services/sync/golden_gate_traits/src/lib.rs new file mode 100644 index 000000000000..3021f8ebc4c5 --- /dev/null +++ b/services/sync/golden_gate_traits/src/lib.rs @@ -0,0 +1,204 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! These types should eventually move to the `sync15-traits` crate in +//! Application Services. They're defined in a separate crate in m-c now so +//! that Golden Gate doesn't rely on their internals. + +use std::{error::Error, fmt, sync::Mutex, sync::MutexGuard, sync::PoisonError}; + +/// A bridged Sync engine implements all the methods needed to support +/// Desktop Sync. +pub trait BridgedEngine { + /// The type returned for errors. + type Error; + + /// Initializes the engine. This is called once, when the engine is first + /// created, and guaranteed to be called before any of the other methods. + /// The default implementation does nothing. + fn initialize(&self) -> Result<(), Self::Error> { + Ok(()) + } + + /// Returns the last sync time, in milliseconds, for this engine's + /// collection. This is called before each sync, to determine the lower + /// bound for new records to fetch from the server. + fn last_sync(&self) -> Result; + + /// Sets the last sync time, in milliseconds. This is called throughout + /// the sync, to fast-forward the stored last sync time to match the + /// timestamp on the uploaded records. + fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>; + + /// Returns the sync ID for this engine's collection. This is only used in + /// tests. + fn sync_id(&self) -> Result, Self::Error>; + + /// Resets the sync ID for this engine's collection, returning the new ID. + /// As a side effect, implementations should reset all local Sync state, + /// as in `reset`. + fn reset_sync_id(&self) -> Result; + + /// Ensures that the locally stored sync ID for this engine's collection + /// matches the `new_sync_id` from the server. If the two don't match, + /// implementations should reset all local Sync state, as in `reset`. + /// This method returns the assigned sync ID, which can be either the + /// `new_sync_id`, or a different one if the engine wants to force other + /// devices to reset their Sync state for this collection the next time they + /// sync. + fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result; + + /// Stages a batch of incoming Sync records. This is called multiple + /// times per sync, once for each batch. Implementations can use the + /// signal to check if the operation was aborted, and cancel any + /// pending work. + fn store_incoming( + &self, + incoming_cleartexts: &[String], + signal: &dyn Interruptee, + ) -> Result<(), Self::Error>; + + /// Applies all staged records, reconciling changes on both sides and + /// resolving conflicts. Returns a list of records to upload. + fn apply(&self, signal: &dyn Interruptee) -> Result, Self::Error>; + + /// Indicates that the given record IDs were uploaded successfully to the + /// server. This is called multiple times per sync, once for each batch + /// upload. + fn set_uploaded( + &self, + server_modified_millis: i64, + ids: &[String], + signal: &dyn Interruptee, + ) -> Result<(), Self::Error>; + + /// Indicates that all records have been uploaded. At this point, any record + /// IDs marked for upload that haven't been passed to `set_uploaded`, can be + /// assumed to have failed: for example, because the server rejected a record + /// with an invalid TTL or sort index. + fn sync_finished(&self, signal: &dyn Interruptee) -> Result<(), Self::Error>; + + /// Resets all local Sync state, including any change flags, mirrors, and + /// the last sync time, such that the next sync is treated as a first sync + /// with all new local data. Does not erase any local user data. + fn reset(&self) -> Result<(), Self::Error>; + + /// Erases all local user data for this collection, and any Sync metadata. + /// This method is destructive, and unused for most collections. + fn wipe(&self) -> Result<(), Self::Error>; + + /// Tears down the engine. The opposite of `initialize`, `finalize` is + /// called when an engine is disabled, or otherwise no longer needed. The + /// default implementation does nothing. + fn finalize(&self) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// An interruptee is an abort signal used to interrupt a running task. +/// Implementations can store an interrupted flag, usually as an atomic +/// integer or Boolean, set the flag on abort, and have +/// `Interruptee::was_interrupted` return the flag's value. +/// +/// Although it's not required, in practice, an `Interruptee` should be +/// `Send + Sync`, so that a task running on a background task queue can be +/// interrupted from the main thread. +pub trait Interruptee { + /// Indicates if the caller signaled to interrupt. + fn was_interrupted(&self) -> bool; + + /// Returns an error if the caller signaled to abort. This helper makes it + /// easier to use the signal with the `?` operator. + fn err_if_interrupted(&self) -> Result<(), Interrupted> { + if self.was_interrupted() { + Err(Interrupted) + } else { + Ok(()) + } + } +} + +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub struct Interrupted; + +impl Error for Interrupted {} + +impl fmt::Display for Interrupted { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + "The operation was interrupted".fmt(f) + } +} + +/// A blanket implementation of `BridgedEngine` for any `Mutex`. +/// This is provided for convenience, since we expect most bridges to hold +/// their engines in an `Arc>`. +impl BridgedEngine for Mutex +where + E: BridgedEngine, + E::Error: for<'a> From>>, +{ + type Error = E::Error; + + fn initialize(&self) -> Result<(), Self::Error> { + self.lock()?.initialize() + } + + fn last_sync(&self) -> Result { + self.lock()?.last_sync() + } + + fn set_last_sync(&self, millis: i64) -> Result<(), Self::Error> { + self.lock()?.set_last_sync(millis) + } + + fn store_incoming( + &self, + incoming_cleartexts: &[String], + signal: &dyn Interruptee, + ) -> Result<(), Self::Error> { + self.lock()?.store_incoming(incoming_cleartexts, signal) + } + + fn apply(&self, signal: &dyn Interruptee) -> Result, Self::Error> { + self.lock()?.apply(signal) + } + + fn set_uploaded( + &self, + server_modified_millis: i64, + ids: &[String], + signal: &dyn Interruptee, + ) -> Result<(), Self::Error> { + self.lock()? + .set_uploaded(server_modified_millis, ids, signal) + } + + fn sync_finished(&self, signal: &dyn Interruptee) -> Result<(), Self::Error> { + self.lock()?.sync_finished(signal) + } + + fn reset(&self) -> Result<(), Self::Error> { + self.lock()?.reset() + } + + fn wipe(&self) -> Result<(), Self::Error> { + self.lock()?.wipe() + } + + fn finalize(&self) -> Result<(), Self::Error> { + self.lock()?.finalize() + } + + fn sync_id(&self) -> Result, Self::Error> { + self.lock()?.sync_id() + } + + fn reset_sync_id(&self) -> Result { + self.lock()?.reset_sync_id() + } + + fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result { + self.lock()?.ensure_current_sync_id(new_sync_id) + } +} diff --git a/services/sync/modules/bridged_engine.js b/services/sync/modules/bridged_engine.js new file mode 100644 index 000000000000..cfe16734c06c --- /dev/null +++ b/services/sync/modules/bridged_engine.js @@ -0,0 +1,490 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +const { XPCOMUtils } = ChromeUtils.import( + "resource://gre/modules/XPCOMUtils.jsm" +); +const { Changeset, SyncEngine } = ChromeUtils.import( + "resource://services-sync/engines.js" +); +const { CryptoWrapper } = ChromeUtils.import( + "resource://services-sync/record.js" +); + +XPCOMUtils.defineLazyModuleGetters(this, { + Async: "resource://services-common/async.js", + Log: "resource://gre/modules/Log.jsm", + PlacesUtils: "resource://gre/modules/PlacesUtils.jsm", +}); + +var EXPORTED_SYMBOLS = [ + "BridgedEngine", + "BridgedStore", + "BridgedTracker", + "BridgedRecord", +]; + +/** + * A stub store that keeps all decrypted records in memory. Since the interface + * we need is so minimal, this class doesn't inherit from the base `Store` + * implementation...it would take more code to override all those behaviors! + */ +class BridgedStore { + constructor(name, engine) { + if (!engine) { + throw new Error("Store must be associated with an Engine instance."); + } + this.engine = engine; + this._log = Log.repository.getLogger(`Sync.Engine.${name}.Store`); + this._batchChunkSize = 500; + } + + async applyIncomingBatch(records) { + await this.engine.initialize(); + for (let chunk of PlacesUtils.chunkArray(records, this._batchChunkSize)) { + // TODO: We can avoid parsing and re-serializing here... We also need to + // pass attributes like `modified` and `sortindex`, which are not part + // of the cleartext. + let incomingCleartexts = chunk.map(record => record.cleartextToString()); + await promisifyWithSignal( + null, + this.engine._bridge.storeIncoming, + incomingCleartexts + ); + } + // Array of failed records. + return []; + } + + async wipe() { + await this.engine.initialize(); + await promisify(this.engine._bridge.wipe); + } +} + +/** + * A stub tracker that doesn't track anything. + */ +class BridgedTracker { + constructor(name, engine) { + if (!engine) { + throw new Error("Tracker must be associated with an Engine instance."); + } + + this._log = Log.repository.getLogger(`Sync.Engine.${name}.Tracker`); + this.score = 0; + this.asyncObserver = Async.asyncObserver(this, this._log); + } + + get ignoreAll() { + return false; + } + + set ignoreAll(value) {} + + async onEngineEnabledChanged(engineEnabled) { + // ... + } + + resetScore() { + this.score = 0; + } + + start() { + // ... + } + + async stop() { + // ... + } + + async clearChangedIDs() { + // ... + } + + async finalize() { + // ... + } +} + +class BridgedRecord extends CryptoWrapper { + constructor(collection, id, type) { + super(collection, id, type); + } +} + +class BridgeError extends Error { + constructor(code, message) { + super(message); + this.name = "BridgeError"; + // TODO: We may want to use a different name for this, since errors with + // a `result` property are treated specially by telemetry, discarding the + // message...but, unlike other `nserror`s, the message is actually useful, + // and we still want to capture it. + this.result = code; + } +} + +class InterruptedError extends Error { + constructor(message) { + super(message); + this.name = "InterruptedError"; + } +} + +/** + * Adapts a `Log.jsm` logger to a `mozIServicesLogger`. This class is copied + * from `SyncedBookmarksMirror.jsm`. + */ +class LogAdapter { + constructor(log) { + this.log = log; + } + + get maxLevel() { + let level = this.log.level; + if (level <= Log.Level.All) { + return Ci.mozIServicesLogger.LEVEL_TRACE; + } + if (level <= Log.Level.Info) { + return Ci.mozIServicesLogger.LEVEL_DEBUG; + } + if (level <= Log.Level.Warn) { + return Ci.mozIServicesLogger.LEVEL_WARN; + } + if (level <= Log.Level.Error) { + return Ci.mozIServicesLogger.LEVEL_ERROR; + } + return Ci.mozIServicesLogger.LEVEL_OFF; + } + + trace(message) { + this.log.trace(message); + } + + debug(message) { + this.log.debug(message); + } + + warn(message) { + this.log.warn(message); + } + + error(message) { + this.log.error(message); + } +} + +/** + * The JavaScript side of the native bridge. This is a base class that can be + * used to wire up a Sync engine written in Rust to the existing Sync codebase, + * and have it work like any other engine. The Rust side must expose an XPCOM + * component class that implements the `mozIBridgedSyncEngine` interface. + * + * `SyncEngine` has a lot of machinery that we don't need, but makes it fairly + * easy to opt out by overriding those methods. It would be harder to + * reimplement the machinery that we _do_ need here, especially for a first cut. + * However, because of that, this class has lots of methods that do nothing, or + * return empty data. The docs above each method explain what it's overriding, + * and why. + */ +function BridgedEngine(bridge, name, service) { + SyncEngine.call(this, name, service); + + this._bridge = bridge; + this._bridge.logger = new LogAdapter(this._log); + + // The maximum amount of time that we should wait for the bridged engine + // to apply incoming records before aborting. + this._applyTimeoutMillis = 5 * 60 * 60 * 1000; // 5 minutes +} + +BridgedEngine.prototype = { + __proto__: SyncEngine.prototype, + _recordObj: BridgedRecord, + _storeObj: BridgedStore, + _trackerObj: BridgedTracker, + + _initializePromise: null, + + /** Returns the storage version for this engine. */ + get version() { + return this._bridge.storageVersion; + }, + + // Legacy engines allow sync to proceed if some records fail to upload. Since + // we've supported batch uploads on our server for a while, and we want to + // make them stricter (for example, failing the entire batch if a record can't + // be stored, instead of returning its ID in the `failed` response field), we + // require all bridged engines to opt out of partial syncs. + get allowSkippedRecord() { + return false; + }, + + /** + * Initializes the underlying Rust bridge for this engine. Once the bridge is + * ready, subsequent calls to `initialize` are no-ops. If initialization + * fails, the next call to `initialize` will try again. + * + * @throws If initializing the bridge fails. + */ + async initialize() { + if (!this._initializePromise) { + this._initializePromise = promisify(this._bridge.initialize).catch( + err => { + // We may have failed to initialize the bridge temporarily; for example, + // if its database is corrupt. Clear the promise so that subsequent + // calls to `initialize` can try to create the bridge again. + this._initializePromise = null; + throw err; + } + ); + } + return this._initializePromise; + }, + + /** + * Returns the sync ID for this engine. This is exposed for tests, but + * Sync code always calls `resetSyncID()` and `ensureCurrentSyncID()`, + * not this. + * + * @returns {String?} The sync ID, or `null` if one isn't set. + */ + async getSyncID() { + await this.initialize(); + // Note that all methods on an XPCOM class instance are automatically bound, + // so we don't need to write `this._bridge.getSyncId.bind(this._bridge)`. + let syncID = await promisify(this._bridge.getSyncId); + return syncID; + }, + + async resetSyncID() { + await this._deleteServerCollection(); + let newSyncID = await this.resetLocalSyncID(); + return newSyncID; + }, + + async resetLocalSyncID() { + await this.initialize(); + let newSyncID = await promisify(this._bridge.resetSyncId); + return newSyncID; + }, + + async ensureCurrentSyncID(newSyncID) { + await this.initialize(); + let assignedSyncID = await promisify( + this._bridge.ensureCurrentSyncId, + newSyncID + ); + return assignedSyncID; + }, + + async getLastSync() { + await this.initialize(); + let lastSync = await promisify(this._bridge.getLastSync); + return lastSync; + }, + + async setLastSync(lastSyncMillis) { + await this.initialize(); + await promisify(this._bridge.setLastSync, lastSyncMillis); + }, + + /** + * Returns the initial changeset for the sync. Bridged engines handle + * reconciliation internally, so we don't know what changed until after we've + * stored and applied all incoming records. So we return an empty changeset + * here, and replace it with the real one in `_processIncoming`. + */ + async pullChanges() { + return {}; + }, + + async trackRemainingChanges() { + // TODO: Should we call `storeIncoming` here again, to write the records we + // just uploaded (that is, records in the changeset where `synced = true`) + // back to the bridged engine's mirror? Or can we rely on the engine to + // keep the records around (for example, in a temp table), and automatically + // write them back on `syncFinished`? + await this.initialize(); + await promisifyWithSignal(null, this._bridge.syncFinished); + }, + + /** + * Marks a record for a hard-`DELETE` at the end of the sync. The base method + * also removes it from the tracker, but we don't use the tracker for that, + * so we override the method to just mark. + */ + _deleteId(id) { + this._noteDeletedId(id); + }, + + /** + * Always stage incoming records, bypassing the base engine's reconciliation + * machinery. + */ + async _reconcile() { + return true; + }, + + async _processIncoming(newitems) { + await super._processIncoming(newitems); + await this.initialize(); + + // TODO: We could consider having a per-sync watchdog instead; for + // example, time out after 5 minutes total, including any network + // latency. `promisifyWithSignal` makes this flexible. + let watchdog = this._newWatchdog(); + watchdog.start(this._applyTimeoutMillis); + + try { + let outgoingRecords = await promisifyWithSignal( + watchdog.signal, + this._bridge.apply + ); + let changeset = {}; + for (let record of outgoingRecords) { + // TODO: It would be nice if we could pass the cleartext through as-is + // here, too, instead of parsing and re-wrapping for `BridgedRecord`. + let cleartext = JSON.parse(record); + changeset[cleartext.id] = { + synced: false, + cleartext, + }; + } + this._modified.replace(changeset); + } finally { + watchdog.stop(); + if (watchdog.abortReason) { + this._log.warn(`Aborting bookmark merge: ${watchdog.abortReason}`); + } + } + }, + + /** + * Notify the bridged engine that we've successfully uploaded a batch, so + * that it can update its local state. For example, if the engine uses a + * mirror and a temp table for outgoing records, it can write the uploaded + * records from the outgoing table back to the mirror. + */ + async _onRecordsWritten(succeeded, failed, serverModifiedTime) { + await this.initialize(); + await promisifyWithSignal( + null, + this._bridge.setUploaded, + serverModifiedTime, + succeeded + ); + }, + + async _createTombstone() { + throw new Error("Bridged engines don't support weak uploads"); + }, + + async _createRecord(id) { + let change = this._modified.changes[id]; + if (!change) { + throw new TypeError("Can't create record for unchanged item"); + } + let record = new this._recordObj(this.name, id); + record.cleartext = change.cleartext; + return record; + }, + + async _resetClient() { + await super._resetClient(); + await this.initialize(); + await promisify(this._bridge.reset); + }, +}; + +function transformError(code, message) { + switch (code) { + case Cr.NS_ERROR_ABORT: + return new InterruptedError(message); + + default: + return new BridgeError(code, message); + } +} + +// Converts a bridged function that takes a callback into one that returns a +// promise. +function promisify(func, ...params) { + return new Promise((resolve, reject) => { + func(...params, { + // This object implicitly implements all three callback interfaces + // (`mozIBridgedSyncEngine{Apply, Result}Callback`), because they have + // the same methods. The only difference is the type of the argument + // passed to `handleSuccess`, which doesn't matter in JS. + handleSuccess: resolve, + handleError(code, message) { + reject(transformError(code, message)); + }, + }); + }); +} + +// Like `promisify`, but takes an `AbortSignal` for cancelable +// operations. +function promisifyWithSignal(signal, func, ...params) { + if (!signal) { + return promisify(func, ...params); + } + return new Promise((resolve, reject) => { + if (signal.aborted) { + // TODO: Record more specific operation names, so we can see which + // ones get interrupted most in telemetry. + throw new InterruptedError("Interrupted before starting operation"); + } + function onAbort() { + signal.removeEventListener("abort", onAbort); + op.cancel(Cr.NS_ERROR_ABORT); + } + let op = func(...params, { + handleSuccess(result) { + signal.removeEventListener("abort", onAbort); + resolve(result); + }, + handleError(code, message) { + reject(transformError(code, message)); + }, + }); + signal.addEventListener("abort", onAbort); + }); +} + +class BridgedChangeset extends Changeset { + // Only `_reconcile` calls `getModifiedTimestamp` and `has`, and the buffered + // engine does its own reconciliation. + getModifiedTimestamp(id) { + throw new Error("Don't use timestamps to resolve bridged engine conflicts"); + } + + has(id) { + throw new Error( + "Don't use the changeset to resolve bridged engine conflicts" + ); + } + + delete(id) { + let change = this.changes[id]; + if (change) { + // Mark the change as synced without removing it from the set. Depending + // on how we implement `trackRemainingChanges`, this may not be necessary. + // It's copied from the bookmarks changeset now. + change.synced = true; + } + } + + ids() { + let results = []; + for (let id in this.changes) { + if (!this.changes[id].synced) { + results.push(id); + } + } + return results; + } +} diff --git a/services/sync/modules/engines.js b/services/sync/modules/engines.js index 62af4a7cc0f3..8810abe6f498 100644 --- a/services/sync/modules/engines.js +++ b/services/sync/modules/engines.js @@ -2130,6 +2130,11 @@ SyncEngine.prototype = { await this._toFetchStorage.finalize(); await this._previousFailedStorage.finalize(); }, + + // Returns a new watchdog. Exposed for tests. + _newWatchdog() { + return Async.watchdog(); + }, }; /** diff --git a/services/sync/modules/engines/bookmarks.js b/services/sync/modules/engines/bookmarks.js index a70160e86a67..d5eb85867c13 100644 --- a/services/sync/modules/engines/bookmarks.js +++ b/services/sync/modules/engines/bookmarks.js @@ -1016,11 +1016,6 @@ BufferedBookmarksEngine.prototype = { await super.finalize(); await this._store.finalize(); }, - - // Returns a new watchdog. Exposed for tests. - _newWatchdog() { - return Async.watchdog(); - }, }; /** diff --git a/services/sync/moz.build b/services/sync/moz.build index d252a6d4e640..98e9b9e721fd 100644 --- a/services/sync/moz.build +++ b/services/sync/moz.build @@ -20,6 +20,7 @@ EXTRA_JS_MODULES['services-sync'] += [ 'modules/addonutils.js', 'modules/bookmark_repair.js', 'modules/bookmark_validator.js', + 'modules/bridged_engine.js', 'modules/browserid_identity.js', 'modules/collection_repair.js', 'modules/collection_validator.js', diff --git a/services/sync/tests/unit/head_helpers.js b/services/sync/tests/unit/head_helpers.js index ac31379ca33e..4b6392f5a3e9 100644 --- a/services/sync/tests/unit/head_helpers.js +++ b/services/sync/tests/unit/head_helpers.js @@ -278,7 +278,7 @@ function get_sync_test_telemetry() { let ns = {}; ChromeUtils.import("resource://services-sync/telemetry.js", ns); ns.SyncTelemetry.tryRefreshDevices = function() {}; - let testEngines = ["rotary", "steam", "sterling", "catapult"]; + let testEngines = ["rotary", "steam", "sterling", "catapult", "nineties"]; for (let engineName of testEngines) { ns.SyncTelemetry.allowedEngines.add(engineName); } diff --git a/services/sync/tests/unit/head_http_server.js b/services/sync/tests/unit/head_http_server.js index 68332f366ecb..37259420a06d 100644 --- a/services/sync/tests/unit/head_http_server.js +++ b/services/sync/tests/unit/head_http_server.js @@ -52,7 +52,7 @@ function return_timestamp(request, response, timestamp) { let body = "" + timestamp; response.setHeader("X-Weave-Timestamp", body); response.setStatusLine(request.httpVersion, 200, "OK"); - response.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(response.bodyOutputStream, body); return timestamp; } @@ -85,7 +85,7 @@ function httpd_basic_auth_handler(body, metadata, response) { response.setStatusLine(metadata.httpVersion, 401, "Unauthorized"); response.setHeader("WWW-Authenticate", 'Basic realm="secret"', false); } - response.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(response.bodyOutputStream, body); } /* @@ -168,7 +168,7 @@ ServerWBO.prototype = { } response.setHeader("X-Weave-Timestamp", "" + new_timestamp(), false); response.setStatusLine(request.httpVersion, statusCode, status); - response.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(response.bodyOutputStream, body); }; }, @@ -523,7 +523,7 @@ ServerCollection.prototype = { if (!options.ids) { response.setStatusLine(request.httpVersion, "400", "Bad Request"); body = "Bad Request"; - response.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(response.bodyOutputStream, body); return; } options.ids = options.ids.split(","); @@ -587,7 +587,7 @@ ServerCollection.prototype = { response.setHeader("X-Last-Modified", "" + self.timestamp, false); response.setStatusLine(request.httpVersion, statusCode, status); - response.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(response.bodyOutputStream, body); }; }, }; @@ -659,7 +659,7 @@ function track_collections_helper() { response.setHeader("Content-Type", "application/json"); response.setHeader("X-Weave-Timestamp", "" + new_timestamp(), false); response.setStatusLine(request.httpVersion, 200, "OK"); - response.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(response.bodyOutputStream, body); } return { @@ -956,7 +956,7 @@ SyncServer.prototype = { resp.setHeader(header, value); } resp.setHeader("X-Weave-Timestamp", "" + this.timestamp(), false); - resp.bodyOutputStream.write(body, body.length); + writeBytesToOutputStream(resp.bodyOutputStream, body); }, /** diff --git a/services/sync/tests/unit/test_bridged_engine.js b/services/sync/tests/unit/test_bridged_engine.js new file mode 100644 index 000000000000..16ab9e198fa7 --- /dev/null +++ b/services/sync/tests/unit/test_bridged_engine.js @@ -0,0 +1,265 @@ +/* Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ */ + +const { BridgedEngine } = ChromeUtils.import( + "resource://services-sync/bridged_engine.js" +); +const { Service } = ChromeUtils.import("resource://services-sync/service.js"); + +// Wraps an `object` in a proxy so that its methods are bound to it. This +// simulates how XPCOM class instances have all their methods bound. +function withBoundMethods(object) { + return new Proxy(object, { + get(target, key) { + let value = target[key]; + return typeof value == "function" ? value.bind(target) : value; + }, + }); +} + +add_task(async function test_interface() { + class TestBridge { + constructor() { + this.storageVersion = 2; + this.syncID = "syncID111111"; + this.wasInitialized = false; + this.clear(); + } + + clear() { + this.lastSyncMillis = 0; + this.incomingRecords = []; + this.uploadedIDs = []; + this.wasSynced = false; + this.wasReset = false; + this.wasWiped = false; + } + + // `mozIBridgedSyncEngine` methods. + + initialize(callback) { + ok( + !this.wasInitialized, + "Shouldn't initialize a bridged engine more than once" + ); + this.wasInitialized = true; + CommonUtils.nextTick(() => callback.handleSuccess()); + } + + getLastSync(callback) { + ok( + this.wasInitialized, + "Should initialize before getting last sync time" + ); + CommonUtils.nextTick(() => callback.handleSuccess(this.lastSyncMillis)); + } + + setLastSync(millis, callback) { + ok( + this.wasInitialized, + "Should initialize before setting last sync time" + ); + this.lastSyncMillis = millis; + CommonUtils.nextTick(() => callback.handleSuccess()); + } + + resetSyncId(callback) { + ok(this.wasInitialized, "Should initialize before resetting sync ID"); + CommonUtils.nextTick(() => callback.handleSuccess(this.syncID)); + } + + ensureCurrentSyncId(newSyncId, callback) { + ok( + this.wasInitialized, + "Should initialize before ensuring current sync ID" + ); + equal(newSyncId, this.syncID, "Local and new sync IDs should match"); + CommonUtils.nextTick(() => callback.handleSuccess(this.syncID)); + } + + storeIncoming(records, callback) { + ok( + this.wasInitialized, + "Should initialize before storing incoming records" + ); + this.incomingRecords.push(...records.map(r => JSON.parse(r))); + CommonUtils.nextTick(() => callback.handleSuccess()); + } + + apply(callback) { + ok(this.wasInitialized, "Should initialize before applying records"); + let outgoingRecords = [ + { + id: "hanson", + data: { + plants: ["seed", "flower 💐", "rose"], + canYouTell: false, + }, + }, + { + id: "sheryl-crow", + data: { + today: "winding 🛣", + tomorrow: "winding 🛣", + }, + }, + ].map(r => JSON.stringify(r)); + CommonUtils.nextTick(() => callback.handleSuccess(outgoingRecords)); + return { cancel() {} }; + } + + setUploaded(millis, ids, callback) { + ok( + this.wasInitialized, + "Should initialize before setting records as uploaded" + ); + this.uploadedIDs.push(...ids); + CommonUtils.nextTick(() => callback.handleSuccess()); + return { cancel() {} }; + } + + syncFinished(callback) { + ok( + this.wasInitialized, + "Should initialize before flagging sync as finished" + ); + this.wasSynced = true; + CommonUtils.nextTick(() => callback.handleSuccess()); + return { cancel() {} }; + } + + reset(callback) { + ok(this.wasInitialized, "Should initialize before resetting"); + this.clear(); + this.wasReset = true; + CommonUtils.nextTick(() => callback.handleSuccess()); + } + + wipe(callback) { + ok(this.wasInitialized, "Should initialize before wiping"); + this.clear(); + this.wasWiped = true; + CommonUtils.nextTick(() => callback.handleSuccess()); + } + } + + let bridge = new TestBridge(); + let engine = new BridgedEngine(withBoundMethods(bridge), "Nineties", Service); + engine.enabled = true; + + let server = await serverForFoo(engine); + try { + await SyncTestingInfrastructure(server); + + info("Add server records"); + let foo = server.user("foo"); + let collection = foo.collection("nineties"); + let now = new_timestamp(); + collection.insert( + "backstreet", + encryptPayload({ + id: "backstreet", + data: { + say: "I want it that way", + when: "never", + }, + }), + now + ); + collection.insert( + "tlc", + encryptPayload({ + id: "tlc", + data: { + forbidden: ["scrubs 🚫"], + numberAvailable: false, + }, + }), + now + 5 + ); + + info("Sync the engine"); + // Advance the last sync time to skip the Backstreet Boys... + bridge.lastSyncMillis = now + 2; + await sync_engine_and_validate_telem(engine, false); + + let metaGlobal = foo + .collection("meta") + .wbo("global") + .get(); + deepEqual( + JSON.parse(metaGlobal.payload).engines.nineties, + { + version: 2, + syncID: "syncID111111", + }, + "Should write storage version and sync ID to m/g" + ); + + greater(bridge.lastSyncMillis, 0, "Should update last sync time"); + deepEqual( + bridge.incomingRecords.sort((a, b) => a.id.localeCompare(b.id)), + [ + { + id: "tlc", + data: { + forbidden: ["scrubs 🚫"], + numberAvailable: false, + }, + }, + ], + "Should stage incoming records from server" + ); + deepEqual( + bridge.uploadedIDs.sort(), + ["hanson", "sheryl-crow"], + "Should mark new local records as uploaded" + ); + ok(bridge.wasSynced, "Should have finished sync after uploading"); + + deepEqual( + collection.keys().sort(), + ["backstreet", "hanson", "sheryl-crow", "tlc"], + "Should have all records on server" + ); + let expectedRecords = [ + { + id: "sheryl-crow", + data: { + today: "winding 🛣", + tomorrow: "winding 🛣", + }, + }, + { + id: "hanson", + data: { + plants: ["seed", "flower 💐", "rose"], + canYouTell: false, + }, + }, + ]; + for (let expected of expectedRecords) { + let actual = collection.cleartext(expected.id); + deepEqual( + actual, + expected, + `Should upload record ${expected.id} from bridged engine` + ); + } + + await engine.resetClient(); + ok(bridge.wasReset, "Should reset local storage for bridge"); + + await engine.wipeClient(); + ok(bridge.wasWiped, "Should wipe local storage for bridge"); + + await engine.resetSyncID(); + ok( + !foo.collection("nineties"), + "Should delete server collection after resetting sync ID" + ); + } finally { + await promiseStopServer(server); + await engine.finalize(); + } +}); diff --git a/services/sync/tests/unit/test_errorhandler_filelog.js b/services/sync/tests/unit/test_errorhandler_filelog.js index 8135adfd292b..7752e9deec00 100644 --- a/services/sync/tests/unit/test_errorhandler_filelog.js +++ b/services/sync/tests/unit/test_errorhandler_filelog.js @@ -5,7 +5,6 @@ const { Service } = ChromeUtils.import("resource://services-sync/service.js"); const { FileUtils } = ChromeUtils.import( "resource://gre/modules/FileUtils.jsm" ); -const { NetUtil } = ChromeUtils.import("resource://gre/modules/NetUtil.jsm"); const logsdir = FileUtils.getDir("ProfD", ["weave", "logs"], true); diff --git a/services/sync/tests/unit/xpcshell.ini b/services/sync/tests/unit/xpcshell.ini index 01f026d5d30c..3b7056e35713 100644 --- a/services/sync/tests/unit/xpcshell.ini +++ b/services/sync/tests/unit/xpcshell.ini @@ -144,6 +144,7 @@ skip-if = debug skip-if = tsan # Runs unreasonably slow on TSan, bug 1612707 requesttimeoutfactor = 4 [test_bookmark_validator.js] +[test_bridged_engine.js] [test_clients_engine.js] run-sequentially = Frequent timeouts, bug 1395148 [test_clients_escape.js] diff --git a/toolkit/components/places/SyncedBookmarksMirror.jsm b/toolkit/components/places/SyncedBookmarksMirror.jsm index 4d2ff7c50bf5..633253f03472 100644 --- a/toolkit/components/places/SyncedBookmarksMirror.jsm +++ b/toolkit/components/places/SyncedBookmarksMirror.jsm @@ -88,8 +88,8 @@ const DEFAULT_MAX_FRECENCIES_TO_RECALCULATE = 400; // Use a shared jankYielder in these functions XPCOMUtils.defineLazyGetter(this, "yieldState", () => Async.yieldState()); -/** Adapts a `Log.jsm` logger to a `mozISyncedBookmarksMirrorLogger`. */ -class MirrorLoggerAdapter { +/** Adapts a `Log.jsm` logger to a `mozIServicesLogger`. */ +class LogAdapter { constructor(log) { this.log = log; } @@ -97,18 +97,18 @@ class MirrorLoggerAdapter { get maxLevel() { let level = this.log.level; if (level <= Log.Level.All) { - return Ci.mozISyncedBookmarksMirrorLogger.LEVEL_TRACE; + return Ci.mozIServicesLogger.LEVEL_TRACE; } if (level <= Log.Level.Info) { - return Ci.mozISyncedBookmarksMirrorLogger.LEVEL_DEBUG; + return Ci.mozIServicesLogger.LEVEL_DEBUG; } if (level <= Log.Level.Warn) { - return Ci.mozISyncedBookmarksMirrorLogger.LEVEL_WARN; + return Ci.mozIServicesLogger.LEVEL_WARN; } if (level <= Log.Level.Error) { - return Ci.mozISyncedBookmarksMirrorLogger.LEVEL_ERROR; + return Ci.mozIServicesLogger.LEVEL_ERROR; } - return Ci.mozISyncedBookmarksMirrorLogger.LEVEL_OFF; + return Ci.mozIServicesLogger.LEVEL_OFF; } trace(message) { @@ -266,7 +266,7 @@ class SyncedBookmarksMirror { this.merger.db = db.unsafeRawConnection.QueryInterface( Ci.mozIStorageConnection ); - this.merger.logger = new MirrorLoggerAdapter(MirrorLog); + this.merger.logger = new LogAdapter(MirrorLog); // Automatically close the database connection on shutdown. `progress` // tracks state for shutdown hang reporting. diff --git a/toolkit/components/places/bookmark_sync/src/driver.rs b/toolkit/components/places/bookmark_sync/src/driver.rs index 2d0f52c74fac..7f7eb5bfa793 100644 --- a/toolkit/components/places/bookmark_sync/src/driver.rs +++ b/toolkit/components/places/bookmark_sync/src/driver.rs @@ -14,9 +14,7 @@ use moz_task::{Task, TaskRunnable, ThreadPtrHandle}; use nserror::nsresult; use nsstring::{nsACString, nsCString, nsString}; use storage_variant::HashPropertyBag; -use xpcom::interfaces::{ - mozISyncedBookmarksMirrorLogger, mozISyncedBookmarksMirrorProgressListener, -}; +use xpcom::interfaces::{mozIServicesLogger, mozISyncedBookmarksMirrorProgressListener}; extern "C" { fn NS_GeneratePlacesGUID(guid: *mut nsACString) -> nsresult; @@ -109,14 +107,14 @@ impl dogear::Driver for Driver { pub struct Logger { pub max_level: LevelFilter, - logger: Option>, + logger: Option>, } impl Logger { #[inline] pub fn new( max_level: LevelFilter, - logger: Option>, + logger: Option>, ) -> Logger { Logger { max_level, logger } } @@ -155,7 +153,7 @@ impl Log for Logger { /// Logs a message to the mirror logger. This task is created on the async /// thread, and dispatched to the main thread. struct LogTask { - logger: ThreadPtrHandle, + logger: ThreadPtrHandle, level: Level, message: nsString, } diff --git a/toolkit/components/places/bookmark_sync/src/merger.rs b/toolkit/components/places/bookmark_sync/src/merger.rs index 6f0ba46d795c..024df996a924 100644 --- a/toolkit/components/places/bookmark_sync/src/merger.rs +++ b/toolkit/components/places/bookmark_sync/src/merger.rs @@ -14,8 +14,8 @@ use storage::Conn; use thin_vec::ThinVec; use xpcom::{ interfaces::{ - mozIPlacesPendingOperation, mozIStorageConnection, mozISyncedBookmarksMirrorCallback, - mozISyncedBookmarksMirrorLogger, mozISyncedBookmarksMirrorProgressListener, + mozIPlacesPendingOperation, mozIServicesLogger, mozIStorageConnection, + mozISyncedBookmarksMirrorCallback, mozISyncedBookmarksMirrorProgressListener, }, RefPtr, XpCom, }; @@ -29,7 +29,7 @@ use crate::store; #[refcnt = "nonatomic"] pub struct InitSyncedBookmarksMerger { db: RefCell>, - logger: RefCell>>, + logger: RefCell>>, } impl SyncedBookmarksMerger { @@ -56,16 +56,16 @@ impl SyncedBookmarksMerger { Ok(()) } - xpcom_method!(get_logger => GetLogger() -> *const mozISyncedBookmarksMirrorLogger); - fn get_logger(&self) -> Result, nsresult> { + xpcom_method!(get_logger => GetLogger() -> *const mozIServicesLogger); + fn get_logger(&self) -> Result, nsresult> { match *self.logger.borrow() { Some(ref logger) => Ok(logger.clone()), None => Err(NS_OK), } } - xpcom_method!(set_logger => SetLogger(logger: *const mozISyncedBookmarksMirrorLogger)); - fn set_logger(&self, logger: Option<&mozISyncedBookmarksMirrorLogger>) -> Result<(), nsresult> { + xpcom_method!(set_logger => SetLogger(logger: *const mozIServicesLogger)); + fn set_logger(&self, logger: Option<&mozIServicesLogger>) -> Result<(), nsresult> { self.logger.replace(logger.map(RefPtr::new)); Ok(()) } @@ -125,7 +125,7 @@ struct MergeTask { db: Conn, controller: Arc, max_log_level: LevelFilter, - logger: Option>, + logger: Option>, local_time_millis: i64, remote_time_millis: i64, weak_uploads: Vec, @@ -138,7 +138,7 @@ impl MergeTask { fn new( db: &Conn, controller: Arc, - logger: Option>, + logger: Option>, local_time_seconds: i64, remote_time_seconds: i64, weak_uploads: Vec, @@ -152,18 +152,15 @@ impl MergeTask { Some(level) }) .map(|level| match level as i64 { - mozISyncedBookmarksMirrorLogger::LEVEL_ERROR => LevelFilter::Error, - mozISyncedBookmarksMirrorLogger::LEVEL_WARN => LevelFilter::Warn, - mozISyncedBookmarksMirrorLogger::LEVEL_DEBUG => LevelFilter::Debug, - mozISyncedBookmarksMirrorLogger::LEVEL_TRACE => LevelFilter::Trace, + mozIServicesLogger::LEVEL_ERROR => LevelFilter::Error, + mozIServicesLogger::LEVEL_WARN => LevelFilter::Warn, + mozIServicesLogger::LEVEL_DEBUG => LevelFilter::Debug, + mozIServicesLogger::LEVEL_TRACE => LevelFilter::Trace, _ => LevelFilter::Off, }) .unwrap_or(LevelFilter::Off); let logger = match logger { - Some(logger) => Some(ThreadPtrHolder::new( - cstr!("mozISyncedBookmarksMirrorLogger"), - logger, - )?), + Some(logger) => Some(ThreadPtrHolder::new(cstr!("mozIServicesLogger"), logger)?), None => None, }; let progress = callback diff --git a/toolkit/components/places/mozISyncedBookmarksMirror.idl b/toolkit/components/places/mozISyncedBookmarksMirror.idl index 9a2dce900993..8a44f137ba6d 100644 --- a/toolkit/components/places/mozISyncedBookmarksMirror.idl +++ b/toolkit/components/places/mozISyncedBookmarksMirror.idl @@ -2,6 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +#include "mozIServicesLogger.idl" #include "nsISupports.idl" interface mozIPlacesPendingOperation; @@ -78,7 +79,7 @@ interface mozISyncedBookmarksMerger : nsISupports { attribute mozIStorageConnection db; // Optional; used for logging. - attribute mozISyncedBookmarksMirrorLogger logger; + attribute mozIServicesLogger logger; // Merges the local and remote bookmark trees, applies the merged tree to // Places, and stages locally changed and reconciled items for upload. When