diff --git a/browser/base/content/browser-sync.js b/browser/base/content/browser-sync.js index 686c0c9b5ed2..82b0566e9329 100644 --- a/browser/base/content/browser-sync.js +++ b/browser/base/content/browser-sync.js @@ -645,7 +645,7 @@ var gSync = { // We are pretty confident that push helps us pick up all FxA commands, // but some users might have issues with push, so let's unblock them // by fetching the missed FxA commands on manual sync. - fxAccounts.commands.fetchMissedRemoteCommands().catch(e => { + fxAccounts.commands.pollDeviceCommands().catch(e => { console.error("Fetching missed remote commands failed.", e); }); Weave.Service.sync(); diff --git a/browser/components/BrowserGlue.jsm b/browser/components/BrowserGlue.jsm index 00723363512f..8017faf81902 100644 --- a/browser/components/BrowserGlue.jsm +++ b/browser/components/BrowserGlue.jsm @@ -2860,8 +2860,8 @@ BrowserGlue.prototype = { const firstTab = await openTab(URIs[0]); await Promise.all(URIs.slice(1).map(URI => openTab(URI))); + const deviceName = URIs[0].sender && URIs[0].sender.name; let title, body; - const deviceName = URIs[0].sender.name; const bundle = Services.strings.createBundle("chrome://browser/locale/accounts.properties"); if (URIs.length == 1) { // Due to bug 1305895, tabs from iOS may not have device information, so @@ -2885,13 +2885,15 @@ BrowserGlue.prototype = { } } else { title = bundle.GetStringFromName("multipleTabsArrivingNotification.title"); - const allSameDevice = URIs.every(URI => URI.sender.id == URIs[0].sender.id); - const unknownDevice = allSameDevice && !deviceName; + const allKnownSender = URIs.every(URI => URI.sender != null); + const allSameDevice = allKnownSender && URIs.every(URI => URI.sender.id == URIs[0].sender.id); let tabArrivingBody; - if (unknownDevice) { - tabArrivingBody = "unnamedTabsArrivingNotificationNoDevice.body"; - } else if (allSameDevice) { - tabArrivingBody = "unnamedTabsArrivingNotification2.body"; + if (allSameDevice) { + if (deviceName) { + tabArrivingBody = "unnamedTabsArrivingNotification2.body"; + } else { + tabArrivingBody = "unnamedTabsArrivingNotificationNoDevice.body"; + } } else { tabArrivingBody = "unnamedTabsArrivingNotificationMultiple2.body"; } diff --git a/services/fxaccounts/FxAccountsCommands.js b/services/fxaccounts/FxAccountsCommands.js index 05fff646a547..263aec8fd2b8 100644 --- a/services/fxaccounts/FxAccountsCommands.js +++ b/services/fxaccounts/FxAccountsCommands.js @@ -38,62 +38,52 @@ class FxAccountsCommands { log.info(`Payload sent to device ${device.id}.`); } - async consumeRemoteCommand(index) { + /** + * Poll and handle device commands for the current device. + * This method can be called either in response to a Push message, + * or by itself as a "commands recovery" mechanism. + * + * @param {Number} receivedIndex "Command received" push messages include + * the index of the command that triggered the message. We use it as a + * hint when we have no "last command index" stored. + */ + async pollDeviceCommands(receivedIndex = 0) { + // Whether the call to `pollDeviceCommands` was initiated by a Push message from the FxA + // servers in response to a message being received or simply scheduled in order + // to fetch missed messages. + const scheduledFetch = receivedIndex == 0; if (!Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true)) { return false; } - log.info(`Consuming command with index ${index}.`); - const {messages} = await this._fetchRemoteCommands(index, 1); - if (messages.length != 1) { - log.warn(`Should have retrieved 1 and only 1 message, got ${messages.length}.`); - } - return this._fxAccounts._withCurrentAccountState(async (getUserData, updateUserData) => { - const {device} = await getUserData(["device"]); - if (!device) { - throw new Error("No device registration."); - } - const handledCommands = (device.handledCommands || []).concat(messages.map(m => m.index)); - await updateUserData({ - device: {...device, handledCommands}, - }); - await this._handleCommands(messages); - - // Once the handledCommands array length passes a threshold, check the - // potentially missed remote commands in order to clear it. - if (handledCommands.length > 20) { - await this.fetchMissedRemoteCommands(); - } - }); - } - - async fetchMissedRemoteCommands() { - if (!Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true)) { - return false; - } - log.info(`Consuming missed commands.`); + log.info(`Polling device commands.`); await this._fxAccounts._withCurrentAccountState(async (getUserData, updateUserData) => { const {device} = await getUserData(["device"]); if (!device) { throw new Error("No device registration."); } - const lastCommandIndex = device.lastCommandIndex || 0; - const handledCommands = device.handledCommands || []; - handledCommands.push(lastCommandIndex); // Because the server also returns this command. - const {index, messages} = await this._fetchRemoteCommands(lastCommandIndex); - const missedMessages = messages.filter(m => !handledCommands.includes(m.index)); - await updateUserData({ - device: {...device, lastCommandIndex: index, handledCommands: []}, - }); - if (missedMessages.length) { - log.info(`Handling ${missedMessages.length} missed messages`); - Services.telemetry.scalarAdd("identity.fxaccounts.missed_commands_fetched", missedMessages.length); - await this._handleCommands(missedMessages); + // We increment lastCommandIndex by 1 because the server response includes the current index. + // If we don't have a `lastCommandIndex` stored, we fall back on the index from the push message we just got. + const lastCommandIndex = (device.lastCommandIndex + 1) || receivedIndex; + // We have already received this message before. + if (receivedIndex > 0 && receivedIndex < lastCommandIndex) { + return; + } + const {index, messages} = await this._fetchDeviceCommands(lastCommandIndex); + if (messages.length) { + await updateUserData({ + device: {...device, lastCommandIndex: index}, + }); + log.info(`Handling ${messages.length} messages`); + if (scheduledFetch) { + Services.telemetry.scalarAdd("identity.fxaccounts.missed_commands_fetched", messages.length); + } + await this._handleCommands(messages); } }); return true; } - async _fetchRemoteCommands(index, limit = null) { + async _fetchDeviceCommands(index, limit = null) { const userData = await this._fxAccounts.getSignedInUser(); if (!userData) { throw new Error("No user."); @@ -112,15 +102,20 @@ class FxAccountsCommands { async _handleCommands(messages) { const fxaDevices = await this._fxAccounts.getDeviceList(); + // We debounce multiple incoming tabs so we show a single notification. + const tabsReceived = []; for (const {data} of messages) { - let {command, payload, sender} = data; - if (sender) { - sender = fxaDevices.find(d => d.id == sender); + const {command, payload, sender: senderId} = data; + const sender = senderId ? fxaDevices.find(d => d.id == senderId) : null; + if (!sender) { + log.warn("Incoming command is from an unknown device (maybe disconnected?)"); } switch (command) { case COMMAND_SENDTAB: try { - await this.sendTab.handle(sender, payload); + const {title, uri} = await this.sendTab.handle(payload); + log.info(`Tab received with FxA commands: ${title} from ${sender ? sender.name : "Unknown device"}.`); + tabsReceived.push({title, uri, sender}); } catch (e) { log.error(`Error while handling incoming Send Tab payload.`, e); } @@ -129,6 +124,9 @@ class FxAccountsCommands { log.info(`Unknown command: ${command}.`); } } + if (tabsReceived.length) { + Observers.notify("fxaccounts:commands:open-uri", tabsReceived); + } } } @@ -184,22 +182,17 @@ class SendTab { } // Handle incoming send tab payload, called by FxAccountsCommands. - async handle(sender, {encrypted}) { - if (!sender) { - log.warn("Incoming tab is from an unknown device (maybe disconnected?)"); - } + async handle({encrypted}) { const bytes = await this._decrypt(encrypted); const decoder = new TextDecoder("utf8"); const data = JSON.parse(decoder.decode(bytes)); const current = data.hasOwnProperty("current") ? data.current : data.entries.length - 1; - const tabSender = { - id: sender ? sender.id : "", - name: sender ? sender.name : "", - }; const {title, url: uri} = data.entries[current]; - log.info(`Tab received with FxA commands: ${title} from ${tabSender.name}.`); - Observers.notify("fxaccounts:commands:open-uri", [{uri, title, sender: tabSender}]); + return { + title, + uri, + }; } async _encrypt(bytes, device) { diff --git a/services/fxaccounts/FxAccountsPush.jsm b/services/fxaccounts/FxAccountsPush.jsm index 3a4e3e0b5e64..456a6f7c944a 100644 --- a/services/fxaccounts/FxAccountsPush.jsm +++ b/services/fxaccounts/FxAccountsPush.jsm @@ -3,6 +3,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ const {Services} = ChromeUtils.import("resource://gre/modules/Services.jsm"); +const {Async} = ChromeUtils.import("resource://services-common/async.js"); const {FXA_PUSH_SCOPE_ACCOUNT_UPDATE, ONLOGOUT_NOTIFICATION, ON_ACCOUNT_DESTROYED_NOTIFICATION, ON_ACCOUNT_STATE_CHANGE_NOTIFICATION, ON_COLLECTION_CHANGED_NOTIFICATION, ON_COMMAND_RECEIVED_NOTIFICATION, ON_DEVICE_CONNECTED_NOTIFICATION, ON_DEVICE_DISCONNECTED_NOTIFICATION, ON_PASSWORD_CHANGED_NOTIFICATION, ON_PASSWORD_RESET_NOTIFICATION, ON_PROFILE_CHANGE_NOTIFICATION, ON_PROFILE_UPDATED_NOTIFICATION, ON_VERIFY_LOGIN_NOTIFICATION, log} = ChromeUtils.import("resource://gre/modules/FxAccountsCommon.js"); /** @@ -71,10 +72,17 @@ FxAccountsPushService.prototype = { "resource://gre/modules/FxAccounts.jsm"); } - // listen to new push messages, push changes and logout events - Services.obs.addObserver(this, this.pushService.pushTopic); - Services.obs.addObserver(this, this.pushService.subscriptionChangeTopic); - Services.obs.addObserver(this, ONLOGOUT_NOTIFICATION); + this.asyncObserver = Async.asyncObserver(this, this.log); + // We use an async observer because a device waking up can + // observe multiple "Send Tab received" push notifications at the same time. + // The way these notifications are handled is as follows: + // Read index from storage, make network request, update the index. + // You can imagine what happens when multiple calls race: we load + // the same index multiple times and receive the same exact tabs, multiple times. + // The async observer will ensure we make these network requests serially. + Services.obs.addObserver(this.asyncObserver, this.pushService.pushTopic); + Services.obs.addObserver(this.asyncObserver, this.pushService.subscriptionChangeTopic); + Services.obs.addObserver(this.asyncObserver, ONLOGOUT_NOTIFICATION); this.log.debug("FxAccountsPush initialized"); return true; @@ -103,51 +111,50 @@ FxAccountsPushService.prototype = { }); }, /** - * Standard observer interface to listen to push messages, changes and logout. + * Async observer interface to listen to push messages, changes and logout. * * @param subject * @param topic * @param data * @returns {Promise} */ - _observe(subject, topic, data) { - this.log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`); - switch (topic) { - case this.pushService.pushTopic: - if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) { - let message = subject.QueryInterface(Ci.nsIPushMessage); - this._onPushMessage(message); - } - break; - case this.pushService.subscriptionChangeTopic: - if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) { - this._onPushSubscriptionChange(); - } - break; - case ONLOGOUT_NOTIFICATION: - // user signed out, we need to stop polling the Push Server - this.unsubscribe().catch(err => { - this.log.error("Error during unsubscribe", err); - }); - default: - break; + async observe(subject, topic, data) { + try { + this.log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`); + switch (topic) { + case this.pushService.pushTopic: + if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) { + let message = subject.QueryInterface(Ci.nsIPushMessage); + await this._onPushMessage(message); + } + break; + case this.pushService.subscriptionChangeTopic: + if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) { + await this._onPushSubscriptionChange(); + } + break; + case ONLOGOUT_NOTIFICATION: + // user signed out, we need to stop polling the Push Server + try { + await this.unsubscribe(); + } catch (err) { + this.log.error("Error during unsubscribe", err); + } + default: + break; + } + } catch (err) { + this.log.error(err); } }, - /** - * Wrapper around _observe that catches errors - */ - observe(subject, topic, data) { - Promise.resolve() - .then(() => this._observe(subject, topic, data)) - .catch(err => this.log.error(err)); - }, + /** * Fired when the Push server sends a notification. * * @private * @returns {Promise} */ - _onPushMessage(message) { + async _onPushMessage(message) { this.log.trace("FxAccountsPushService _onPushMessage"); if (!message.data) { // Use the empty signal to check the verification state of the account right away @@ -159,7 +166,7 @@ FxAccountsPushService.prototype = { this.log.debug(`push command: ${payload.command}`); switch (payload.command) { case ON_COMMAND_RECEIVED_NOTIFICATION: - this.fxAccounts.commands.consumeRemoteCommand(payload.data.index); + await this.fxAccounts.commands.pollDeviceCommands(payload.data.index); break; case ON_DEVICE_CONNECTED_NOTIFICATION: Services.obs.notifyObservers(null, ON_DEVICE_CONNECTED_NOTIFICATION, payload.data.deviceName); diff --git a/services/fxaccounts/tests/xpcshell/test_commands.js b/services/fxaccounts/tests/xpcshell/test_commands.js index ba8b000a1c2f..3426d48b269c 100644 --- a/services/fxaccounts/tests/xpcshell/test_commands.js +++ b/services/fxaccounts/tests/xpcshell/test_commands.js @@ -50,15 +50,30 @@ add_task(async function test_sendtab_send() { Assert.ok(commands.invoke.calledTwice); }); -add_task(async function test_commands_fetchMissedRemoteCommands() { +add_task(async function test_commands_pollDeviceCommands_push() { + // Server state. + const remoteMessages = [ + { + index: 11, + data: {}, + }, + { + index: 12, + data: {}, + }, + ]; + const remoteIndex = 12; + + // Local state. + const pushIndexReceived = 11; const accountState = { data: { device: { - handledCommands: [8, 9, 10, 11], - lastCommandIndex: 11, + lastCommandIndex: 10, }, }, }; + const fxAccounts = { async _withCurrentAccountState(cb) { const get = () => accountState.data; @@ -67,26 +82,168 @@ add_task(async function test_commands_fetchMissedRemoteCommands() { }, }; const commands = new FxAccountsCommands(fxAccounts); - commands._fetchRemoteCommands = () => { - return { - index: 12, - messages: [ - { - index: 11, - data: {}, - }, - { - index: 12, - data: {}, - }, - ], - }; - }; - commands._handleCommands = sinon.spy(); - await commands.fetchMissedRemoteCommands(); + const mockCommands = sinon.mock(commands); + mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({ + index: remoteIndex, + messages: remoteMessages, + }); + mockCommands.expects("_handleCommands").once().withArgs(remoteMessages); + await commands.pollDeviceCommands(pushIndexReceived); - Assert.equal(accountState.data.device.handledCommands.length, 0); + mockCommands.verify(); + Assert.equal(accountState.data.device.lastCommandIndex, 12); +}); + +add_task(async function test_commands_pollDeviceCommands_push_already_fetched() { + // Local state. + const pushIndexReceived = 12; + const accountState = { + data: { + device: { + lastCommandIndex: 12, + }, + }, + }; + + const fxAccounts = { + async _withCurrentAccountState(cb) { + const get = () => accountState.data; + const set = (val) => { accountState.data = val; }; + await cb(get, set); + }, + }; + const commands = new FxAccountsCommands(fxAccounts); + const mockCommands = sinon.mock(commands); + mockCommands.expects("_fetchDeviceCommands").never(); + mockCommands.expects("_handleCommands").never(); + await commands.pollDeviceCommands(pushIndexReceived); + + mockCommands.verify(); + Assert.equal(accountState.data.device.lastCommandIndex, 12); +}); + +add_task(async function test_commands_pollDeviceCommands_push_local_state_empty() { + // Server state. + const remoteMessages = [ + { + index: 11, + data: {}, + }, + { + index: 12, + data: {}, + }, + ]; + const remoteIndex = 12; + + // Local state. + const pushIndexReceived = 11; + const accountState = { + data: { + device: {}, + }, + }; + + const fxAccounts = { + async _withCurrentAccountState(cb) { + const get = () => accountState.data; + const set = (val) => { accountState.data = val; }; + await cb(get, set); + }, + }; + const commands = new FxAccountsCommands(fxAccounts); + const mockCommands = sinon.mock(commands); + mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({ + index: remoteIndex, + messages: remoteMessages, + }); + mockCommands.expects("_handleCommands").once().withArgs(remoteMessages); + await commands.pollDeviceCommands(pushIndexReceived); + + mockCommands.verify(); + Assert.equal(accountState.data.device.lastCommandIndex, 12); +}); + +add_task(async function test_commands_pollDeviceCommands_scheduled_local() { + // Server state. + const remoteMessages = [ + { + index: 11, + data: {}, + }, + { + index: 12, + data: {}, + }, + ]; + const remoteIndex = 12; + + // Local state. + const accountState = { + data: { + device: { + lastCommandIndex: 10, + }, + }, + }; + + const fxAccounts = { + async _withCurrentAccountState(cb) { + const get = () => accountState.data; + const set = (val) => { accountState.data = val; }; + await cb(get, set); + }, + }; + const commands = new FxAccountsCommands(fxAccounts); + const mockCommands = sinon.mock(commands); + mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({ + index: remoteIndex, + messages: remoteMessages, + }); + mockCommands.expects("_handleCommands").once().withArgs(remoteMessages); + await commands.pollDeviceCommands(); + + mockCommands.verify(); + Assert.equal(accountState.data.device.lastCommandIndex, 12); +}); + +add_task(async function test_commands_pollDeviceCommands_scheduled_local_state_empty() { + // Server state. + const remoteMessages = [ + { + index: 11, + data: {}, + }, + { + index: 12, + data: {}, + }, + ]; + const remoteIndex = 12; + + // Local state. + const accountState = { + data: { + device: {}, + }, + }; + + const fxAccounts = { + async _withCurrentAccountState(cb) { + const get = () => accountState.data; + const set = (val) => { accountState.data = val; }; + await cb(get, set); + }, + }; + const commands = new FxAccountsCommands(fxAccounts); + const mockCommands = sinon.mock(commands); + mockCommands.expects("_fetchDeviceCommands").once().withArgs(0).returns({ + index: remoteIndex, + messages: remoteMessages, + }); + mockCommands.expects("_handleCommands").once().withArgs(remoteMessages); + await commands.pollDeviceCommands(); + + mockCommands.verify(); Assert.equal(accountState.data.device.lastCommandIndex, 12); - const callArgs = commands._handleCommands.args[0][0]; - Assert.equal(callArgs[0].index, 12); }); diff --git a/services/fxaccounts/tests/xpcshell/test_push_service.js b/services/fxaccounts/tests/xpcshell/test_push_service.js index 046d0d5bc89b..e04b6023f61e 100644 --- a/services/fxaccounts/tests/xpcshell/test_push_service.js +++ b/services/fxaccounts/tests/xpcshell/test_push_service.js @@ -426,7 +426,7 @@ add_task(async function commandReceived() { let fxAccountsMock = {}; const promiseConsumeRemoteMessagesCalled = new Promise(res => { fxAccountsMock.commands = { - consumeRemoteCommand() { + pollDeviceCommands() { res(); }, }; diff --git a/services/sync/modules/policies.js b/services/sync/modules/policies.js index 154b6bb03c60..79c6a6d95683 100644 --- a/services/sync/modules/policies.js +++ b/services/sync/modules/policies.js @@ -550,7 +550,7 @@ SyncScheduler.prototype = { // Only fetch missed messages in a "scheduled" sync so we don't race against // the Push service reconnecting on a network link change for example. if (why == "schedule" && now >= this.missedFxACommandsLastFetch + this.missedFxACommandsFetchInterval) { - fxAccounts.commands.fetchMissedRemoteCommands().then(() => { + fxAccounts.commands.pollDeviceCommands().then(() => { this.missedFxACommandsLastFetch = now; }).catch(e => { this._log.error("Fetching missed remote commands failed.", e);