Bug 1528622 - Debounce FxA Send Tab commands. r=markh,rfkelly

Differential Revision: https://phabricator.services.mozilla.com/D21286

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Edouard Oger 2019-03-12 01:05:03 +00:00
parent 104c0bd83d
commit ff5f53617b
7 changed files with 285 additions and 126 deletions

View file

@ -645,7 +645,7 @@ var gSync = {
// We are pretty confident that push helps us pick up all FxA commands,
// but some users might have issues with push, so let's unblock them
// by fetching the missed FxA commands on manual sync.
fxAccounts.commands.fetchMissedRemoteCommands().catch(e => {
fxAccounts.commands.pollDeviceCommands().catch(e => {
console.error("Fetching missed remote commands failed.", e);
});
Weave.Service.sync();

View file

@ -2860,8 +2860,8 @@ BrowserGlue.prototype = {
const firstTab = await openTab(URIs[0]);
await Promise.all(URIs.slice(1).map(URI => openTab(URI)));
const deviceName = URIs[0].sender && URIs[0].sender.name;
let title, body;
const deviceName = URIs[0].sender.name;
const bundle = Services.strings.createBundle("chrome://browser/locale/accounts.properties");
if (URIs.length == 1) {
// Due to bug 1305895, tabs from iOS may not have device information, so
@ -2885,13 +2885,15 @@ BrowserGlue.prototype = {
}
} else {
title = bundle.GetStringFromName("multipleTabsArrivingNotification.title");
const allSameDevice = URIs.every(URI => URI.sender.id == URIs[0].sender.id);
const unknownDevice = allSameDevice && !deviceName;
const allKnownSender = URIs.every(URI => URI.sender != null);
const allSameDevice = allKnownSender && URIs.every(URI => URI.sender.id == URIs[0].sender.id);
let tabArrivingBody;
if (unknownDevice) {
tabArrivingBody = "unnamedTabsArrivingNotificationNoDevice.body";
} else if (allSameDevice) {
tabArrivingBody = "unnamedTabsArrivingNotification2.body";
if (allSameDevice) {
if (deviceName) {
tabArrivingBody = "unnamedTabsArrivingNotification2.body";
} else {
tabArrivingBody = "unnamedTabsArrivingNotificationNoDevice.body";
}
} else {
tabArrivingBody = "unnamedTabsArrivingNotificationMultiple2.body";
}

View file

@ -38,62 +38,52 @@ class FxAccountsCommands {
log.info(`Payload sent to device ${device.id}.`);
}
async consumeRemoteCommand(index) {
/**
* Poll and handle device commands for the current device.
* This method can be called either in response to a Push message,
* or by itself as a "commands recovery" mechanism.
*
* @param {Number} receivedIndex "Command received" push messages include
* the index of the command that triggered the message. We use it as a
* hint when we have no "last command index" stored.
*/
async pollDeviceCommands(receivedIndex = 0) {
// Whether the call to `pollDeviceCommands` was initiated by a Push message from the FxA
// servers in response to a message being received or simply scheduled in order
// to fetch missed messages.
const scheduledFetch = receivedIndex == 0;
if (!Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true)) {
return false;
}
log.info(`Consuming command with index ${index}.`);
const {messages} = await this._fetchRemoteCommands(index, 1);
if (messages.length != 1) {
log.warn(`Should have retrieved 1 and only 1 message, got ${messages.length}.`);
}
return this._fxAccounts._withCurrentAccountState(async (getUserData, updateUserData) => {
const {device} = await getUserData(["device"]);
if (!device) {
throw new Error("No device registration.");
}
const handledCommands = (device.handledCommands || []).concat(messages.map(m => m.index));
await updateUserData({
device: {...device, handledCommands},
});
await this._handleCommands(messages);
// Once the handledCommands array length passes a threshold, check the
// potentially missed remote commands in order to clear it.
if (handledCommands.length > 20) {
await this.fetchMissedRemoteCommands();
}
});
}
async fetchMissedRemoteCommands() {
if (!Services.prefs.getBoolPref("identity.fxaccounts.commands.enabled", true)) {
return false;
}
log.info(`Consuming missed commands.`);
log.info(`Polling device commands.`);
await this._fxAccounts._withCurrentAccountState(async (getUserData, updateUserData) => {
const {device} = await getUserData(["device"]);
if (!device) {
throw new Error("No device registration.");
}
const lastCommandIndex = device.lastCommandIndex || 0;
const handledCommands = device.handledCommands || [];
handledCommands.push(lastCommandIndex); // Because the server also returns this command.
const {index, messages} = await this._fetchRemoteCommands(lastCommandIndex);
const missedMessages = messages.filter(m => !handledCommands.includes(m.index));
await updateUserData({
device: {...device, lastCommandIndex: index, handledCommands: []},
});
if (missedMessages.length) {
log.info(`Handling ${missedMessages.length} missed messages`);
Services.telemetry.scalarAdd("identity.fxaccounts.missed_commands_fetched", missedMessages.length);
await this._handleCommands(missedMessages);
// We increment lastCommandIndex by 1 because the server response includes the current index.
// If we don't have a `lastCommandIndex` stored, we fall back on the index from the push message we just got.
const lastCommandIndex = (device.lastCommandIndex + 1) || receivedIndex;
// We have already received this message before.
if (receivedIndex > 0 && receivedIndex < lastCommandIndex) {
return;
}
const {index, messages} = await this._fetchDeviceCommands(lastCommandIndex);
if (messages.length) {
await updateUserData({
device: {...device, lastCommandIndex: index},
});
log.info(`Handling ${messages.length} messages`);
if (scheduledFetch) {
Services.telemetry.scalarAdd("identity.fxaccounts.missed_commands_fetched", messages.length);
}
await this._handleCommands(messages);
}
});
return true;
}
async _fetchRemoteCommands(index, limit = null) {
async _fetchDeviceCommands(index, limit = null) {
const userData = await this._fxAccounts.getSignedInUser();
if (!userData) {
throw new Error("No user.");
@ -112,15 +102,20 @@ class FxAccountsCommands {
async _handleCommands(messages) {
const fxaDevices = await this._fxAccounts.getDeviceList();
// We debounce multiple incoming tabs so we show a single notification.
const tabsReceived = [];
for (const {data} of messages) {
let {command, payload, sender} = data;
if (sender) {
sender = fxaDevices.find(d => d.id == sender);
const {command, payload, sender: senderId} = data;
const sender = senderId ? fxaDevices.find(d => d.id == senderId) : null;
if (!sender) {
log.warn("Incoming command is from an unknown device (maybe disconnected?)");
}
switch (command) {
case COMMAND_SENDTAB:
try {
await this.sendTab.handle(sender, payload);
const {title, uri} = await this.sendTab.handle(payload);
log.info(`Tab received with FxA commands: ${title} from ${sender ? sender.name : "Unknown device"}.`);
tabsReceived.push({title, uri, sender});
} catch (e) {
log.error(`Error while handling incoming Send Tab payload.`, e);
}
@ -129,6 +124,9 @@ class FxAccountsCommands {
log.info(`Unknown command: ${command}.`);
}
}
if (tabsReceived.length) {
Observers.notify("fxaccounts:commands:open-uri", tabsReceived);
}
}
}
@ -184,22 +182,17 @@ class SendTab {
}
// Handle incoming send tab payload, called by FxAccountsCommands.
async handle(sender, {encrypted}) {
if (!sender) {
log.warn("Incoming tab is from an unknown device (maybe disconnected?)");
}
async handle({encrypted}) {
const bytes = await this._decrypt(encrypted);
const decoder = new TextDecoder("utf8");
const data = JSON.parse(decoder.decode(bytes));
const current = data.hasOwnProperty("current") ? data.current :
data.entries.length - 1;
const tabSender = {
id: sender ? sender.id : "",
name: sender ? sender.name : "",
};
const {title, url: uri} = data.entries[current];
log.info(`Tab received with FxA commands: ${title} from ${tabSender.name}.`);
Observers.notify("fxaccounts:commands:open-uri", [{uri, title, sender: tabSender}]);
return {
title,
uri,
};
}
async _encrypt(bytes, device) {

View file

@ -3,6 +3,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
const {Services} = ChromeUtils.import("resource://gre/modules/Services.jsm");
const {Async} = ChromeUtils.import("resource://services-common/async.js");
const {FXA_PUSH_SCOPE_ACCOUNT_UPDATE, ONLOGOUT_NOTIFICATION, ON_ACCOUNT_DESTROYED_NOTIFICATION, ON_ACCOUNT_STATE_CHANGE_NOTIFICATION, ON_COLLECTION_CHANGED_NOTIFICATION, ON_COMMAND_RECEIVED_NOTIFICATION, ON_DEVICE_CONNECTED_NOTIFICATION, ON_DEVICE_DISCONNECTED_NOTIFICATION, ON_PASSWORD_CHANGED_NOTIFICATION, ON_PASSWORD_RESET_NOTIFICATION, ON_PROFILE_CHANGE_NOTIFICATION, ON_PROFILE_UPDATED_NOTIFICATION, ON_VERIFY_LOGIN_NOTIFICATION, log} = ChromeUtils.import("resource://gre/modules/FxAccountsCommon.js");
/**
@ -71,10 +72,17 @@ FxAccountsPushService.prototype = {
"resource://gre/modules/FxAccounts.jsm");
}
// listen to new push messages, push changes and logout events
Services.obs.addObserver(this, this.pushService.pushTopic);
Services.obs.addObserver(this, this.pushService.subscriptionChangeTopic);
Services.obs.addObserver(this, ONLOGOUT_NOTIFICATION);
this.asyncObserver = Async.asyncObserver(this, this.log);
// We use an async observer because a device waking up can
// observe multiple "Send Tab received" push notifications at the same time.
// The way these notifications are handled is as follows:
// Read index from storage, make network request, update the index.
// You can imagine what happens when multiple calls race: we load
// the same index multiple times and receive the same exact tabs, multiple times.
// The async observer will ensure we make these network requests serially.
Services.obs.addObserver(this.asyncObserver, this.pushService.pushTopic);
Services.obs.addObserver(this.asyncObserver, this.pushService.subscriptionChangeTopic);
Services.obs.addObserver(this.asyncObserver, ONLOGOUT_NOTIFICATION);
this.log.debug("FxAccountsPush initialized");
return true;
@ -103,51 +111,50 @@ FxAccountsPushService.prototype = {
});
},
/**
* Standard observer interface to listen to push messages, changes and logout.
* Async observer interface to listen to push messages, changes and logout.
*
* @param subject
* @param topic
* @param data
* @returns {Promise}
*/
_observe(subject, topic, data) {
this.log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`);
switch (topic) {
case this.pushService.pushTopic:
if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
let message = subject.QueryInterface(Ci.nsIPushMessage);
this._onPushMessage(message);
}
break;
case this.pushService.subscriptionChangeTopic:
if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
this._onPushSubscriptionChange();
}
break;
case ONLOGOUT_NOTIFICATION:
// user signed out, we need to stop polling the Push Server
this.unsubscribe().catch(err => {
this.log.error("Error during unsubscribe", err);
});
default:
break;
async observe(subject, topic, data) {
try {
this.log.trace(`observed topic=${topic}, data=${data}, subject=${subject}`);
switch (topic) {
case this.pushService.pushTopic:
if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
let message = subject.QueryInterface(Ci.nsIPushMessage);
await this._onPushMessage(message);
}
break;
case this.pushService.subscriptionChangeTopic:
if (data === FXA_PUSH_SCOPE_ACCOUNT_UPDATE) {
await this._onPushSubscriptionChange();
}
break;
case ONLOGOUT_NOTIFICATION:
// user signed out, we need to stop polling the Push Server
try {
await this.unsubscribe();
} catch (err) {
this.log.error("Error during unsubscribe", err);
}
default:
break;
}
} catch (err) {
this.log.error(err);
}
},
/**
* Wrapper around _observe that catches errors
*/
observe(subject, topic, data) {
Promise.resolve()
.then(() => this._observe(subject, topic, data))
.catch(err => this.log.error(err));
},
/**
* Fired when the Push server sends a notification.
*
* @private
* @returns {Promise}
*/
_onPushMessage(message) {
async _onPushMessage(message) {
this.log.trace("FxAccountsPushService _onPushMessage");
if (!message.data) {
// Use the empty signal to check the verification state of the account right away
@ -159,7 +166,7 @@ FxAccountsPushService.prototype = {
this.log.debug(`push command: ${payload.command}`);
switch (payload.command) {
case ON_COMMAND_RECEIVED_NOTIFICATION:
this.fxAccounts.commands.consumeRemoteCommand(payload.data.index);
await this.fxAccounts.commands.pollDeviceCommands(payload.data.index);
break;
case ON_DEVICE_CONNECTED_NOTIFICATION:
Services.obs.notifyObservers(null, ON_DEVICE_CONNECTED_NOTIFICATION, payload.data.deviceName);

View file

@ -50,15 +50,30 @@ add_task(async function test_sendtab_send() {
Assert.ok(commands.invoke.calledTwice);
});
add_task(async function test_commands_fetchMissedRemoteCommands() {
add_task(async function test_commands_pollDeviceCommands_push() {
// Server state.
const remoteMessages = [
{
index: 11,
data: {},
},
{
index: 12,
data: {},
},
];
const remoteIndex = 12;
// Local state.
const pushIndexReceived = 11;
const accountState = {
data: {
device: {
handledCommands: [8, 9, 10, 11],
lastCommandIndex: 11,
lastCommandIndex: 10,
},
},
};
const fxAccounts = {
async _withCurrentAccountState(cb) {
const get = () => accountState.data;
@ -67,26 +82,168 @@ add_task(async function test_commands_fetchMissedRemoteCommands() {
},
};
const commands = new FxAccountsCommands(fxAccounts);
commands._fetchRemoteCommands = () => {
return {
index: 12,
messages: [
{
index: 11,
data: {},
},
{
index: 12,
data: {},
},
],
};
};
commands._handleCommands = sinon.spy();
await commands.fetchMissedRemoteCommands();
const mockCommands = sinon.mock(commands);
mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({
index: remoteIndex,
messages: remoteMessages,
});
mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
await commands.pollDeviceCommands(pushIndexReceived);
Assert.equal(accountState.data.device.handledCommands.length, 0);
mockCommands.verify();
Assert.equal(accountState.data.device.lastCommandIndex, 12);
});
add_task(async function test_commands_pollDeviceCommands_push_already_fetched() {
// Local state.
const pushIndexReceived = 12;
const accountState = {
data: {
device: {
lastCommandIndex: 12,
},
},
};
const fxAccounts = {
async _withCurrentAccountState(cb) {
const get = () => accountState.data;
const set = (val) => { accountState.data = val; };
await cb(get, set);
},
};
const commands = new FxAccountsCommands(fxAccounts);
const mockCommands = sinon.mock(commands);
mockCommands.expects("_fetchDeviceCommands").never();
mockCommands.expects("_handleCommands").never();
await commands.pollDeviceCommands(pushIndexReceived);
mockCommands.verify();
Assert.equal(accountState.data.device.lastCommandIndex, 12);
});
add_task(async function test_commands_pollDeviceCommands_push_local_state_empty() {
// Server state.
const remoteMessages = [
{
index: 11,
data: {},
},
{
index: 12,
data: {},
},
];
const remoteIndex = 12;
// Local state.
const pushIndexReceived = 11;
const accountState = {
data: {
device: {},
},
};
const fxAccounts = {
async _withCurrentAccountState(cb) {
const get = () => accountState.data;
const set = (val) => { accountState.data = val; };
await cb(get, set);
},
};
const commands = new FxAccountsCommands(fxAccounts);
const mockCommands = sinon.mock(commands);
mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({
index: remoteIndex,
messages: remoteMessages,
});
mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
await commands.pollDeviceCommands(pushIndexReceived);
mockCommands.verify();
Assert.equal(accountState.data.device.lastCommandIndex, 12);
});
add_task(async function test_commands_pollDeviceCommands_scheduled_local() {
// Server state.
const remoteMessages = [
{
index: 11,
data: {},
},
{
index: 12,
data: {},
},
];
const remoteIndex = 12;
// Local state.
const accountState = {
data: {
device: {
lastCommandIndex: 10,
},
},
};
const fxAccounts = {
async _withCurrentAccountState(cb) {
const get = () => accountState.data;
const set = (val) => { accountState.data = val; };
await cb(get, set);
},
};
const commands = new FxAccountsCommands(fxAccounts);
const mockCommands = sinon.mock(commands);
mockCommands.expects("_fetchDeviceCommands").once().withArgs(11).returns({
index: remoteIndex,
messages: remoteMessages,
});
mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
await commands.pollDeviceCommands();
mockCommands.verify();
Assert.equal(accountState.data.device.lastCommandIndex, 12);
});
add_task(async function test_commands_pollDeviceCommands_scheduled_local_state_empty() {
// Server state.
const remoteMessages = [
{
index: 11,
data: {},
},
{
index: 12,
data: {},
},
];
const remoteIndex = 12;
// Local state.
const accountState = {
data: {
device: {},
},
};
const fxAccounts = {
async _withCurrentAccountState(cb) {
const get = () => accountState.data;
const set = (val) => { accountState.data = val; };
await cb(get, set);
},
};
const commands = new FxAccountsCommands(fxAccounts);
const mockCommands = sinon.mock(commands);
mockCommands.expects("_fetchDeviceCommands").once().withArgs(0).returns({
index: remoteIndex,
messages: remoteMessages,
});
mockCommands.expects("_handleCommands").once().withArgs(remoteMessages);
await commands.pollDeviceCommands();
mockCommands.verify();
Assert.equal(accountState.data.device.lastCommandIndex, 12);
const callArgs = commands._handleCommands.args[0][0];
Assert.equal(callArgs[0].index, 12);
});

View file

@ -426,7 +426,7 @@ add_task(async function commandReceived() {
let fxAccountsMock = {};
const promiseConsumeRemoteMessagesCalled = new Promise(res => {
fxAccountsMock.commands = {
consumeRemoteCommand() {
pollDeviceCommands() {
res();
},
};

View file

@ -550,7 +550,7 @@ SyncScheduler.prototype = {
// Only fetch missed messages in a "scheduled" sync so we don't race against
// the Push service reconnecting on a network link change for example.
if (why == "schedule" && now >= this.missedFxACommandsLastFetch + this.missedFxACommandsFetchInterval) {
fxAccounts.commands.fetchMissedRemoteCommands().then(() => {
fxAccounts.commands.pollDeviceCommands().then(() => {
this.missedFxACommandsLastFetch = now;
}).catch(e => {
this._log.error("Fetching missed remote commands failed.", e);