forked from mirrors/gecko-dev
		
	 e4a5ee36a6
			
		
	
	
		e4a5ee36a6
		
	
	
	
	
		
			
			Theoretically we only need to change this where the strings might be non-ascii but it seems safer in the long run to just avoid the "char" versions entirely. Differential Revision: https://phabricator.services.mozilla.com/D200342
		
			
				
	
	
		
			1781 lines
		
	
	
	
		
			58 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			1781 lines
		
	
	
	
		
			58 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| /* Any copyright is dedicated to the Public Domain.
 | |
|  * http://creativecommons.org/publicdomain/zero/1.0/ */
 | |
| 
 | |
| const { Weave } = ChromeUtils.importESModule(
 | |
|   "resource://services-sync/main.sys.mjs"
 | |
| );
 | |
| const { WBORecord } = ChromeUtils.importESModule(
 | |
|   "resource://services-sync/record.sys.mjs"
 | |
| );
 | |
| const { Service } = ChromeUtils.importESModule(
 | |
|   "resource://services-sync/service.sys.mjs"
 | |
| );
 | |
| const { RotaryEngine } = ChromeUtils.importESModule(
 | |
|   "resource://testing-common/services/sync/rotaryengine.sys.mjs"
 | |
| );
 | |
| 
 | |
| function makeRotaryEngine() {
 | |
|   return new RotaryEngine(Service);
 | |
| }
 | |
| 
 | |
| async function clean(engine) {
 | |
|   for (const pref of Svc.PrefBranch.getChildList("")) {
 | |
|     Svc.PrefBranch.clearUserPref(pref);
 | |
|   }
 | |
|   Svc.PrefBranch.setStringPref("log.logger.engine.rotary", "Trace");
 | |
|   Service.recordManager.clearCache();
 | |
|   await engine._tracker.clearChangedIDs();
 | |
|   await engine.finalize();
 | |
| }
 | |
| 
 | |
| async function cleanAndGo(engine, server) {
 | |
|   await clean(engine);
 | |
|   await promiseStopServer(server);
 | |
| }
 | |
| 
 | |
| async function promiseClean(engine, server) {
 | |
|   await clean(engine);
 | |
|   await promiseStopServer(server);
 | |
| }
 | |
| 
 | |
| async function createServerAndConfigureClient() {
 | |
|   let engine = new RotaryEngine(Service);
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
| 
 | |
|   let contents = {
 | |
|     meta: {
 | |
|       global: { engines: { rotary: { version: engine.version, syncID } } },
 | |
|     },
 | |
|     crypto: {},
 | |
|     rotary: {},
 | |
|   };
 | |
| 
 | |
|   const USER = "foo";
 | |
|   let server = new SyncServer();
 | |
|   server.registerUser(USER, "password");
 | |
|   server.createContents(USER, contents);
 | |
|   server.start();
 | |
| 
 | |
|   await SyncTestingInfrastructure(server, USER);
 | |
|   Service._updateCachedURLs();
 | |
| 
 | |
|   return [engine, server, USER];
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Tests
 | |
|  *
 | |
|  * SyncEngine._sync() is divided into four rather independent steps:
 | |
|  *
 | |
|  * - _syncStartup()
 | |
|  * - _processIncoming()
 | |
|  * - _uploadOutgoing()
 | |
|  * - _syncFinish()
 | |
|  *
 | |
|  * In the spirit of unit testing, these are tested individually for
 | |
|  * different scenarios below.
 | |
|  */
 | |
| 
 | |
| add_task(async function setup() {
 | |
|   await generateNewKeys(Service.collectionKeys);
 | |
|   Svc.PrefBranch.setStringPref("log.logger.engine.rotary", "Trace");
 | |
| });
 | |
| 
 | |
| add_task(async function test_syncStartup_emptyOrOutdatedGlobalsResetsSync() {
 | |
|   _(
 | |
|     "SyncEngine._syncStartup resets sync and wipes server data if there's no or an outdated global record"
 | |
|   );
 | |
| 
 | |
|   // Some server side data that's going to be wiped
 | |
|   let collection = new ServerCollection();
 | |
|   collection.insert(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
|   collection.insert(
 | |
|     "scotsman",
 | |
|     encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
 | |
|   );
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine._store.items = { rekolok: "Rekonstruktionslokomotive" };
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.rekolok, undefined);
 | |
|     let metaGlobal = await Service.recordManager.get(engine.metaURL);
 | |
|     Assert.equal(metaGlobal.payload.engines, undefined);
 | |
|     Assert.ok(!!collection.payload("flying"));
 | |
|     Assert.ok(!!collection.payload("scotsman"));
 | |
| 
 | |
|     await engine.setLastSync(Date.now() / 1000);
 | |
| 
 | |
|     // Trying to prompt a wipe -- we no longer track CryptoMeta per engine,
 | |
|     // so it has nothing to check.
 | |
|     await engine._syncStartup();
 | |
| 
 | |
|     // The meta/global WBO has been filled with data about the engine
 | |
|     let engineData = metaGlobal.payload.engines.rotary;
 | |
|     Assert.equal(engineData.version, engine.version);
 | |
|     Assert.equal(engineData.syncID, await engine.getSyncID());
 | |
| 
 | |
|     // Sync was reset and server data was wiped
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     Assert.equal(collection.payload("scotsman"), undefined);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_syncStartup_serverHasNewerVersion() {
 | |
|   _("SyncEngine._syncStartup ");
 | |
| 
 | |
|   let global = new ServerWBO("global", {
 | |
|     engines: { rotary: { version: 23456 } },
 | |
|   });
 | |
|   let server = httpd_setup({
 | |
|     "/1.1/foo/storage/meta/global": global.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   try {
 | |
|     // The server has a newer version of the data and our engine can
 | |
|     // handle.  That should give us an exception.
 | |
|     let error;
 | |
|     try {
 | |
|       await engine._syncStartup();
 | |
|     } catch (ex) {
 | |
|       error = ex;
 | |
|     }
 | |
|     Assert.equal(error.failureCode, VERSION_OUT_OF_DATE);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_syncStartup_syncIDMismatchResetsClient() {
 | |
|   _("SyncEngine._syncStartup resets sync if syncIDs don't match");
 | |
| 
 | |
|   let server = sync_httpd_setup({});
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   // global record with a different syncID than our engine has
 | |
|   let engine = makeRotaryEngine();
 | |
|   let global = new ServerWBO("global", {
 | |
|     engines: { rotary: { version: engine.version, syncID: "foobar" } },
 | |
|   });
 | |
|   server.registerPathHandler("/1.1/foo/storage/meta/global", global.handler());
 | |
| 
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(await engine.getSyncID(), "");
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.rekolok, undefined);
 | |
| 
 | |
|     await engine.setLastSync(Date.now() / 1000);
 | |
|     await engine._syncStartup();
 | |
| 
 | |
|     // The engine has assumed the server's syncID
 | |
|     Assert.equal(await engine.getSyncID(), "foobar");
 | |
| 
 | |
|     // Sync was reset
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_emptyServer() {
 | |
|   _("SyncEngine._processIncoming working with an empty server backend");
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   try {
 | |
|     // Merely ensure that this code path is run without any errors
 | |
|     await engine._processIncoming();
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_createFromServer() {
 | |
|   _("SyncEngine._processIncoming creates new records from server data");
 | |
| 
 | |
|   // Some server records that will be downloaded
 | |
|   let collection = new ServerCollection();
 | |
|   collection.insert(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
|   collection.insert(
 | |
|     "scotsman",
 | |
|     encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
 | |
|   );
 | |
| 
 | |
|   // Two pathological cases involving relative URIs gone wrong.
 | |
|   let pathologicalPayload = encryptPayload({
 | |
|     id: "../pathological",
 | |
|     denomination: "Pathological Case",
 | |
|   });
 | |
|   collection.insert("../pathological", pathologicalPayload);
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|     "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
 | |
|     "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   await generateNewKeys(Service.collectionKeys);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|     Assert.equal(engine.lastModified, null);
 | |
|     Assert.equal(engine._store.items.flying, undefined);
 | |
|     Assert.equal(engine._store.items.scotsman, undefined);
 | |
|     Assert.equal(engine._store.items["../pathological"], undefined);
 | |
| 
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Timestamps of last sync and last server modification are set.
 | |
|     Assert.ok((await engine.getLastSync()) > 0);
 | |
|     Assert.ok(engine.lastModified > 0);
 | |
| 
 | |
|     // Local records have been created from the server data.
 | |
|     Assert.equal(engine._store.items.flying, "LNER Class A3 4472");
 | |
|     Assert.equal(engine._store.items.scotsman, "Flying Scotsman");
 | |
|     Assert.equal(engine._store.items["../pathological"], "Pathological Case");
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_reconcile() {
 | |
|   _("SyncEngine._processIncoming updates local records");
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
| 
 | |
|   // This server record is newer than the corresponding client one,
 | |
|   // so it'll update its data.
 | |
|   collection.insert(
 | |
|     "newrecord",
 | |
|     encryptPayload({ id: "newrecord", denomination: "New stuff..." })
 | |
|   );
 | |
| 
 | |
|   // This server record is newer than the corresponding client one,
 | |
|   // so it'll update its data.
 | |
|   collection.insert(
 | |
|     "newerserver",
 | |
|     encryptPayload({ id: "newerserver", denomination: "New data!" })
 | |
|   );
 | |
| 
 | |
|   // This server record is 2 mins older than the client counterpart
 | |
|   // but identical to it, so we're expecting the client record's
 | |
|   // changedID to be reset.
 | |
|   collection.insert(
 | |
|     "olderidentical",
 | |
|     encryptPayload({
 | |
|       id: "olderidentical",
 | |
|       denomination: "Older but identical",
 | |
|     })
 | |
|   );
 | |
|   collection._wbos.olderidentical.modified -= 120;
 | |
| 
 | |
|   // This item simply has different data than the corresponding client
 | |
|   // record (which is unmodified), so it will update the client as well
 | |
|   collection.insert(
 | |
|     "updateclient",
 | |
|     encryptPayload({ id: "updateclient", denomination: "Get this!" })
 | |
|   );
 | |
| 
 | |
|   // This is a dupe of 'original'.
 | |
|   collection.insert(
 | |
|     "duplication",
 | |
|     encryptPayload({ id: "duplication", denomination: "Original Entry" })
 | |
|   );
 | |
| 
 | |
|   // This record is marked as deleted, so we're expecting the client
 | |
|   // record to be removed.
 | |
|   collection.insert(
 | |
|     "nukeme",
 | |
|     encryptPayload({ id: "nukeme", denomination: "Nuke me!", deleted: true })
 | |
|   );
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine._store.items = {
 | |
|     newerserver: "New data, but not as new as server!",
 | |
|     olderidentical: "Older but identical",
 | |
|     updateclient: "Got data?",
 | |
|     original: "Original Entry",
 | |
|     long_original: "Long Original Entry",
 | |
|     nukeme: "Nuke me!",
 | |
|   };
 | |
|   // Make this record 1 min old, thus older than the one on the server
 | |
|   await engine._tracker.addChangedID("newerserver", Date.now() / 1000 - 60);
 | |
|   // This record has been changed 2 mins later than the one on the server
 | |
|   await engine._tracker.addChangedID("olderidentical", Date.now() / 1000);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(engine._store.items.newrecord, undefined);
 | |
|     Assert.equal(
 | |
|       engine._store.items.newerserver,
 | |
|       "New data, but not as new as server!"
 | |
|     );
 | |
|     Assert.equal(engine._store.items.olderidentical, "Older but identical");
 | |
|     Assert.equal(engine._store.items.updateclient, "Got data?");
 | |
|     Assert.equal(engine._store.items.nukeme, "Nuke me!");
 | |
|     let changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.ok(changes.olderidentical > 0);
 | |
| 
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Timestamps of last sync and last server modification are set.
 | |
|     Assert.ok((await engine.getLastSync()) > 0);
 | |
|     Assert.ok(engine.lastModified > 0);
 | |
| 
 | |
|     // The new record is created.
 | |
|     Assert.equal(engine._store.items.newrecord, "New stuff...");
 | |
| 
 | |
|     // The 'newerserver' record is updated since the server data is newer.
 | |
|     Assert.equal(engine._store.items.newerserver, "New data!");
 | |
| 
 | |
|     // The data for 'olderidentical' is identical on the server, so
 | |
|     // it's no longer marked as changed anymore.
 | |
|     Assert.equal(engine._store.items.olderidentical, "Older but identical");
 | |
|     changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.olderidentical, undefined);
 | |
| 
 | |
|     // Updated with server data.
 | |
|     Assert.equal(engine._store.items.updateclient, "Get this!");
 | |
| 
 | |
|     // The incoming ID is preferred.
 | |
|     Assert.equal(engine._store.items.original, undefined);
 | |
|     Assert.equal(engine._store.items.duplication, "Original Entry");
 | |
|     Assert.notEqual(engine._delete.ids.indexOf("original"), -1);
 | |
| 
 | |
|     // The 'nukeme' record marked as deleted is removed.
 | |
|     Assert.equal(engine._store.items.nukeme, undefined);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_reconcile_local_deleted() {
 | |
|   _("Ensure local, duplicate ID is deleted on server.");
 | |
| 
 | |
|   // When a duplicate is resolved, the local ID (which is never taken) should
 | |
|   // be deleted on the server.
 | |
|   let [engine, server, user] = await createServerAndConfigureClient();
 | |
| 
 | |
|   let now = Date.now() / 1000 - 10;
 | |
|   await engine.setLastSync(now);
 | |
|   engine.lastModified = now + 1;
 | |
| 
 | |
|   let record = encryptPayload({
 | |
|     id: "DUPE_INCOMING",
 | |
|     denomination: "incoming",
 | |
|   });
 | |
|   let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
 | |
|   server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|   record = encryptPayload({ id: "DUPE_LOCAL", denomination: "local" });
 | |
|   wbo = new ServerWBO("DUPE_LOCAL", record, now - 1);
 | |
|   server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|   await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" });
 | |
|   Assert.ok(await engine._store.itemExists("DUPE_LOCAL"));
 | |
|   Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));
 | |
| 
 | |
|   await engine._sync();
 | |
| 
 | |
|   do_check_attribute_count(engine._store.items, 1);
 | |
|   Assert.ok("DUPE_INCOMING" in engine._store.items);
 | |
| 
 | |
|   let collection = server.getCollection(user, "rotary");
 | |
|   Assert.equal(1, collection.count());
 | |
|   Assert.notEqual(undefined, collection.wbo("DUPE_INCOMING"));
 | |
| 
 | |
|   await cleanAndGo(engine, server);
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_reconcile_equivalent() {
 | |
|   _("Ensure proper handling of incoming records that match local.");
 | |
| 
 | |
|   let [engine, server, user] = await createServerAndConfigureClient();
 | |
| 
 | |
|   let now = Date.now() / 1000 - 10;
 | |
|   await engine.setLastSync(now);
 | |
|   engine.lastModified = now + 1;
 | |
| 
 | |
|   let record = encryptPayload({ id: "entry", denomination: "denomination" });
 | |
|   let wbo = new ServerWBO("entry", record, now + 2);
 | |
|   server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|   engine._store.items = { entry: "denomination" };
 | |
|   Assert.ok(await engine._store.itemExists("entry"));
 | |
| 
 | |
|   await engine._sync();
 | |
| 
 | |
|   do_check_attribute_count(engine._store.items, 1);
 | |
| 
 | |
|   await cleanAndGo(engine, server);
 | |
| });
 | |
| 
 | |
| add_task(
 | |
|   async function test_processIncoming_reconcile_locally_deleted_dupe_new() {
 | |
|     _(
 | |
|       "Ensure locally deleted duplicate record newer than incoming is handled."
 | |
|     );
 | |
| 
 | |
|     // This is a somewhat complicated test. It ensures that if a client receives
 | |
|     // a modified record for an item that is deleted locally but with a different
 | |
|     // ID that the incoming record is ignored. This is a corner case for record
 | |
|     // handling, but it needs to be supported.
 | |
|     let [engine, server, user] = await createServerAndConfigureClient();
 | |
| 
 | |
|     let now = Date.now() / 1000 - 10;
 | |
|     await engine.setLastSync(now);
 | |
|     engine.lastModified = now + 1;
 | |
| 
 | |
|     let record = encryptPayload({
 | |
|       id: "DUPE_INCOMING",
 | |
|       denomination: "incoming",
 | |
|     });
 | |
|     let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
 | |
|     server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|     // Simulate a locally-deleted item.
 | |
|     engine._store.items = {};
 | |
|     await engine._tracker.addChangedID("DUPE_LOCAL", now + 3);
 | |
|     Assert.equal(false, await engine._store.itemExists("DUPE_LOCAL"));
 | |
|     Assert.equal(false, await engine._store.itemExists("DUPE_INCOMING"));
 | |
|     Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));
 | |
| 
 | |
|     engine.lastModified = server.getCollection(user, engine.name).timestamp;
 | |
|     await engine._sync();
 | |
| 
 | |
|     // After the sync, the server's payload for the original ID should be marked
 | |
|     // as deleted.
 | |
|     do_check_empty(engine._store.items);
 | |
|     let collection = server.getCollection(user, "rotary");
 | |
|     Assert.equal(1, collection.count());
 | |
|     wbo = collection.wbo("DUPE_INCOMING");
 | |
|     Assert.notEqual(null, wbo);
 | |
|     let payload = wbo.getCleartext();
 | |
|     Assert.ok(payload.deleted);
 | |
| 
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| );
 | |
| 
 | |
| add_task(
 | |
|   async function test_processIncoming_reconcile_locally_deleted_dupe_old() {
 | |
|     _(
 | |
|       "Ensure locally deleted duplicate record older than incoming is restored."
 | |
|     );
 | |
| 
 | |
|     // This is similar to the above test except it tests the condition where the
 | |
|     // incoming record is newer than the local deletion, therefore overriding it.
 | |
| 
 | |
|     let [engine, server, user] = await createServerAndConfigureClient();
 | |
| 
 | |
|     let now = Date.now() / 1000 - 10;
 | |
|     await engine.setLastSync(now);
 | |
|     engine.lastModified = now + 1;
 | |
| 
 | |
|     let record = encryptPayload({
 | |
|       id: "DUPE_INCOMING",
 | |
|       denomination: "incoming",
 | |
|     });
 | |
|     let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
 | |
|     server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|     // Simulate a locally-deleted item.
 | |
|     engine._store.items = {};
 | |
|     await engine._tracker.addChangedID("DUPE_LOCAL", now + 1);
 | |
|     Assert.equal(false, await engine._store.itemExists("DUPE_LOCAL"));
 | |
|     Assert.equal(false, await engine._store.itemExists("DUPE_INCOMING"));
 | |
|     Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));
 | |
| 
 | |
|     await engine._sync();
 | |
| 
 | |
|     // Since the remote change is newer, the incoming item should exist locally.
 | |
|     do_check_attribute_count(engine._store.items, 1);
 | |
|     Assert.ok("DUPE_INCOMING" in engine._store.items);
 | |
|     Assert.equal("incoming", engine._store.items.DUPE_INCOMING);
 | |
| 
 | |
|     let collection = server.getCollection(user, "rotary");
 | |
|     Assert.equal(1, collection.count());
 | |
|     wbo = collection.wbo("DUPE_INCOMING");
 | |
|     let payload = wbo.getCleartext();
 | |
|     Assert.equal("incoming", payload.denomination);
 | |
| 
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| );
 | |
| 
 | |
| add_task(async function test_processIncoming_reconcile_changed_dupe() {
 | |
|   _("Ensure that locally changed duplicate record is handled properly.");
 | |
| 
 | |
|   let [engine, server, user] = await createServerAndConfigureClient();
 | |
| 
 | |
|   let now = Date.now() / 1000 - 10;
 | |
|   await engine.setLastSync(now);
 | |
|   engine.lastModified = now + 1;
 | |
| 
 | |
|   // The local record is newer than the incoming one, so it should be retained.
 | |
|   let record = encryptPayload({
 | |
|     id: "DUPE_INCOMING",
 | |
|     denomination: "incoming",
 | |
|   });
 | |
|   let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
 | |
|   server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|   await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" });
 | |
|   await engine._tracker.addChangedID("DUPE_LOCAL", now + 3);
 | |
|   Assert.ok(await engine._store.itemExists("DUPE_LOCAL"));
 | |
|   Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));
 | |
| 
 | |
|   engine.lastModified = server.getCollection(user, engine.name).timestamp;
 | |
|   await engine._sync();
 | |
| 
 | |
|   // The ID should have been changed to incoming.
 | |
|   do_check_attribute_count(engine._store.items, 1);
 | |
|   Assert.ok("DUPE_INCOMING" in engine._store.items);
 | |
| 
 | |
|   // On the server, the local ID should be deleted and the incoming ID should
 | |
|   // have its payload set to what was in the local record.
 | |
|   let collection = server.getCollection(user, "rotary");
 | |
|   Assert.equal(1, collection.count());
 | |
|   wbo = collection.wbo("DUPE_INCOMING");
 | |
|   Assert.notEqual(undefined, wbo);
 | |
|   let payload = wbo.getCleartext();
 | |
|   Assert.equal("local", payload.denomination);
 | |
| 
 | |
|   await cleanAndGo(engine, server);
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_reconcile_changed_dupe_new() {
 | |
|   _("Ensure locally changed duplicate record older than incoming is ignored.");
 | |
| 
 | |
|   // This test is similar to the above except the incoming record is younger
 | |
|   // than the local record. The incoming record should be authoritative.
 | |
|   let [engine, server, user] = await createServerAndConfigureClient();
 | |
| 
 | |
|   let now = Date.now() / 1000 - 10;
 | |
|   await engine.setLastSync(now);
 | |
|   engine.lastModified = now + 1;
 | |
| 
 | |
|   let record = encryptPayload({
 | |
|     id: "DUPE_INCOMING",
 | |
|     denomination: "incoming",
 | |
|   });
 | |
|   let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
 | |
|   server.insertWBO(user, "rotary", wbo);
 | |
| 
 | |
|   await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" });
 | |
|   await engine._tracker.addChangedID("DUPE_LOCAL", now + 1);
 | |
|   Assert.ok(await engine._store.itemExists("DUPE_LOCAL"));
 | |
|   Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));
 | |
| 
 | |
|   engine.lastModified = server.getCollection(user, engine.name).timestamp;
 | |
|   await engine._sync();
 | |
| 
 | |
|   // The ID should have been changed to incoming.
 | |
|   do_check_attribute_count(engine._store.items, 1);
 | |
|   Assert.ok("DUPE_INCOMING" in engine._store.items);
 | |
| 
 | |
|   // On the server, the local ID should be deleted and the incoming ID should
 | |
|   // have its payload retained.
 | |
|   let collection = server.getCollection(user, "rotary");
 | |
|   Assert.equal(1, collection.count());
 | |
|   wbo = collection.wbo("DUPE_INCOMING");
 | |
|   Assert.notEqual(undefined, wbo);
 | |
|   let payload = wbo.getCleartext();
 | |
|   Assert.equal("incoming", payload.denomination);
 | |
|   await cleanAndGo(engine, server);
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_resume_toFetch() {
 | |
|   _(
 | |
|     "toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items."
 | |
|   );
 | |
| 
 | |
|   const LASTSYNC = Date.now() / 1000;
 | |
| 
 | |
|   // Server records that will be downloaded
 | |
|   let collection = new ServerCollection();
 | |
|   collection.insert(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
|   collection.insert(
 | |
|     "scotsman",
 | |
|     encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
 | |
|   );
 | |
|   collection.insert(
 | |
|     "rekolok",
 | |
|     encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" })
 | |
|   );
 | |
|   for (let i = 0; i < 3; i++) {
 | |
|     let id = "failed" + i;
 | |
|     let payload = encryptPayload({ id, denomination: "Record No. " + i });
 | |
|     let wbo = new ServerWBO(id, payload);
 | |
|     wbo.modified = LASTSYNC - 10;
 | |
|     collection.insertWBO(wbo);
 | |
|   }
 | |
| 
 | |
|   collection.wbo("flying").modified = collection.wbo("scotsman").modified =
 | |
|     LASTSYNC - 10;
 | |
|   collection._wbos.rekolok.modified = LASTSYNC + 10;
 | |
| 
 | |
|   // Time travel 10 seconds into the future but still download the above WBOs.
 | |
|   let engine = makeRotaryEngine();
 | |
|   await engine.setLastSync(LASTSYNC);
 | |
|   engine.toFetch = new SerializableSet(["flying", "scotsman"]);
 | |
|   engine.previousFailed = new SerializableSet([
 | |
|     "failed0",
 | |
|     "failed1",
 | |
|     "failed2",
 | |
|   ]);
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(engine._store.items.flying, undefined);
 | |
|     Assert.equal(engine._store.items.scotsman, undefined);
 | |
|     Assert.equal(engine._store.items.rekolok, undefined);
 | |
| 
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Local records have been created from the server data.
 | |
|     Assert.equal(engine._store.items.flying, "LNER Class A3 4472");
 | |
|     Assert.equal(engine._store.items.scotsman, "Flying Scotsman");
 | |
|     Assert.equal(engine._store.items.rekolok, "Rekonstruktionslokomotive");
 | |
|     Assert.equal(engine._store.items.failed0, "Record No. 0");
 | |
|     Assert.equal(engine._store.items.failed1, "Record No. 1");
 | |
|     Assert.equal(engine._store.items.failed2, "Record No. 2");
 | |
|     Assert.equal(engine.previousFailed.size, 0);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_notify_count() {
 | |
|   _("Ensure that failed records are reported only once.");
 | |
| 
 | |
|   const NUMBER_OF_RECORDS = 15;
 | |
| 
 | |
|   // Engine that fails every 5 records.
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
 | |
|   engine._store.applyIncomingBatch = async function (records, countTelemetry) {
 | |
|     let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1));
 | |
|     let recordsToApply = [],
 | |
|       recordsToFail = [];
 | |
|     for (let i = 0; i < sortedRecords.length; i++) {
 | |
|       (i % 5 === 0 ? recordsToFail : recordsToApply).push(sortedRecords[i]);
 | |
|     }
 | |
|     recordsToFail.forEach(() => {
 | |
|       countTelemetry.addIncomingFailedReason("failed message");
 | |
|     });
 | |
|     await engine._store._applyIncomingBatch(recordsToApply, countTelemetry);
 | |
| 
 | |
|     return recordsToFail.map(record => record.id);
 | |
|   };
 | |
| 
 | |
|   // Create a batch of server side records.
 | |
|   let collection = new ServerCollection();
 | |
|   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
 | |
|     let id = "record-no-" + i.toString(10).padStart(2, "0");
 | |
|     let payload = encryptPayload({ id, denomination: "Record No. " + id });
 | |
|     collection.insert(id, payload);
 | |
|   }
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
|   try {
 | |
|     // Confirm initial environment.
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|     Assert.equal(engine.toFetch.size, 0);
 | |
|     Assert.equal(engine.previousFailed.size, 0);
 | |
|     do_check_empty(engine._store.items);
 | |
| 
 | |
|     let called = 0;
 | |
|     let counts;
 | |
|     function onApplied(count) {
 | |
|       _("Called with " + JSON.stringify(counts));
 | |
|       counts = count;
 | |
|       called++;
 | |
|     }
 | |
|     Svc.Obs.add("weave:engine:sync:applied", onApplied);
 | |
| 
 | |
|     // Do sync.
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Confirm failures.
 | |
|     do_check_attribute_count(engine._store.items, 12);
 | |
|     Assert.deepEqual(
 | |
|       Array.from(engine.previousFailed).sort(),
 | |
|       ["record-no-00", "record-no-05", "record-no-10"].sort()
 | |
|     );
 | |
| 
 | |
|     // There are newly failed records and they are reported.
 | |
|     Assert.equal(called, 1);
 | |
|     Assert.equal(counts.failed, 3);
 | |
|     Assert.equal(counts.failedReasons[0].count, 3);
 | |
|     Assert.equal(counts.failedReasons[0].name, "failed message");
 | |
|     Assert.equal(counts.applied, 15);
 | |
|     Assert.equal(counts.newFailed, 3);
 | |
|     Assert.equal(counts.succeeded, 12);
 | |
| 
 | |
|     // Sync again, 1 of the failed items are the same, the rest didn't fail.
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Confirming removed failures.
 | |
|     do_check_attribute_count(engine._store.items, 14);
 | |
|     // After failing twice the record that failed again [record-no-00]
 | |
|     // should NOT be stored to try again
 | |
|     Assert.deepEqual(Array.from(engine.previousFailed), []);
 | |
| 
 | |
|     Assert.equal(called, 2);
 | |
|     Assert.equal(counts.failed, 1);
 | |
|     Assert.equal(counts.failedReasons[0].count, 1);
 | |
|     Assert.equal(counts.failedReasons[0].name, "failed message");
 | |
|     Assert.equal(counts.applied, 3);
 | |
|     Assert.equal(counts.newFailed, 0);
 | |
|     Assert.equal(counts.succeeded, 2);
 | |
| 
 | |
|     Svc.Obs.remove("weave:engine:sync:applied", onApplied);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_previousFailed() {
 | |
|   _("Ensure that failed records are retried.");
 | |
| 
 | |
|   const NUMBER_OF_RECORDS = 14;
 | |
| 
 | |
|   // Engine that alternates between failing and applying every 2 records.
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
 | |
|   engine._store.applyIncomingBatch = async function (records, countTelemetry) {
 | |
|     let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1));
 | |
|     let recordsToApply = [],
 | |
|       recordsToFail = [];
 | |
|     let chunks = Array.from(PlacesUtils.chunkArray(sortedRecords, 2));
 | |
|     for (let i = 0; i < chunks.length; i++) {
 | |
|       (i % 2 === 0 ? recordsToFail : recordsToApply).push(...chunks[i]);
 | |
|     }
 | |
|     await engine._store._applyIncomingBatch(recordsToApply, countTelemetry);
 | |
|     return recordsToFail.map(record => record.id);
 | |
|   };
 | |
| 
 | |
|   // Create a batch of server side records.
 | |
|   let collection = new ServerCollection();
 | |
|   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
 | |
|     let id = "record-no-" + i.toString(10).padStart(2, "0");
 | |
|     let payload = encryptPayload({ id, denomination: "Record No. " + i });
 | |
|     collection.insert(id, payload);
 | |
|   }
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
|   try {
 | |
|     // Confirm initial environment.
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|     Assert.equal(engine.toFetch.size, 0);
 | |
|     Assert.equal(engine.previousFailed.size, 0);
 | |
|     do_check_empty(engine._store.items);
 | |
| 
 | |
|     // Initial failed items in previousFailed to be reset.
 | |
|     let previousFailed = new SerializableSet([
 | |
|       Utils.makeGUID(),
 | |
|       Utils.makeGUID(),
 | |
|       Utils.makeGUID(),
 | |
|     ]);
 | |
|     engine.previousFailed = previousFailed;
 | |
|     Assert.equal(engine.previousFailed, previousFailed);
 | |
| 
 | |
|     // Do sync.
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Expected result: 4 sync batches with 2 failures each => 8 failures
 | |
|     do_check_attribute_count(engine._store.items, 6);
 | |
|     Assert.deepEqual(
 | |
|       Array.from(engine.previousFailed).sort(),
 | |
|       [
 | |
|         "record-no-00",
 | |
|         "record-no-01",
 | |
|         "record-no-04",
 | |
|         "record-no-05",
 | |
|         "record-no-08",
 | |
|         "record-no-09",
 | |
|         "record-no-12",
 | |
|         "record-no-13",
 | |
|       ].sort()
 | |
|     );
 | |
| 
 | |
|     // Sync again with the same failed items (records 0, 1, 8, 9).
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     do_check_attribute_count(engine._store.items, 10);
 | |
|     // A second sync with the same failed items should NOT add the same items again.
 | |
|     // Items that did not fail a second time should no longer be in previousFailed.
 | |
|     Assert.deepEqual(Array.from(engine.previousFailed).sort(), []);
 | |
| 
 | |
|     // Refetched items that didn't fail the second time are in engine._store.items.
 | |
|     Assert.equal(engine._store.items["record-no-04"], "Record No. 4");
 | |
|     Assert.equal(engine._store.items["record-no-05"], "Record No. 5");
 | |
|     Assert.equal(engine._store.items["record-no-12"], "Record No. 12");
 | |
|     Assert.equal(engine._store.items["record-no-13"], "Record No. 13");
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_failed_records() {
 | |
|   _(
 | |
|     "Ensure that failed records from _reconcile and applyIncomingBatch are refetched."
 | |
|   );
 | |
| 
 | |
|   // Let's create three and a bit batches worth of server side records.
 | |
|   let APPLY_BATCH_SIZE = 50;
 | |
|   let collection = new ServerCollection();
 | |
|   const NUMBER_OF_RECORDS = APPLY_BATCH_SIZE * 3 + 5;
 | |
|   for (let i = 0; i < NUMBER_OF_RECORDS; i++) {
 | |
|     let id = "record-no-" + i;
 | |
|     let payload = encryptPayload({ id, denomination: "Record No. " + id });
 | |
|     let wbo = new ServerWBO(id, payload);
 | |
|     wbo.modified = Date.now() / 1000 + 60 * (i - APPLY_BATCH_SIZE * 3);
 | |
|     collection.insertWBO(wbo);
 | |
|   }
 | |
| 
 | |
|   // Engine that batches but likes to throw on a couple of records,
 | |
|   // two in each batch: the even ones fail in reconcile, the odd ones
 | |
|   // in applyIncoming.
 | |
|   const BOGUS_RECORDS = [
 | |
|     "record-no-" + 42,
 | |
|     "record-no-" + 23,
 | |
|     "record-no-" + (42 + APPLY_BATCH_SIZE),
 | |
|     "record-no-" + (23 + APPLY_BATCH_SIZE),
 | |
|     "record-no-" + (42 + APPLY_BATCH_SIZE * 2),
 | |
|     "record-no-" + (23 + APPLY_BATCH_SIZE * 2),
 | |
|     "record-no-" + (2 + APPLY_BATCH_SIZE * 3),
 | |
|     "record-no-" + (1 + APPLY_BATCH_SIZE * 3),
 | |
|   ];
 | |
|   let engine = makeRotaryEngine();
 | |
| 
 | |
|   engine.__reconcile = engine._reconcile;
 | |
|   engine._reconcile = async function _reconcile(record) {
 | |
|     if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
 | |
|       throw new Error("I don't like this record! Baaaaaah!");
 | |
|     }
 | |
|     return this.__reconcile.apply(this, arguments);
 | |
|   };
 | |
|   engine._store._applyIncoming = engine._store.applyIncoming;
 | |
|   engine._store.applyIncoming = async function (record) {
 | |
|     if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) {
 | |
|       throw new Error("I don't like this record! Baaaaaah!");
 | |
|     }
 | |
|     return this._applyIncoming.apply(this, arguments);
 | |
|   };
 | |
| 
 | |
|   // Keep track of requests made of a collection.
 | |
|   let count = 0;
 | |
|   let uris = [];
 | |
|   function recording_handler(recordedCollection) {
 | |
|     let h = recordedCollection.handler();
 | |
|     return function (req, res) {
 | |
|       ++count;
 | |
|       uris.push(req.path + "?" + req.queryString);
 | |
|       return h(req, res);
 | |
|     };
 | |
|   }
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": recording_handler(collection),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(await engine.getLastSync(), 0);
 | |
|     Assert.equal(engine.toFetch.size, 0);
 | |
|     Assert.equal(engine.previousFailed.size, 0);
 | |
|     do_check_empty(engine._store.items);
 | |
| 
 | |
|     let observerSubject;
 | |
|     let observerData;
 | |
|     Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
 | |
|       Svc.Obs.remove("weave:engine:sync:applied", onApplied);
 | |
|       observerSubject = subject;
 | |
|       observerData = data;
 | |
|     });
 | |
| 
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     // Ensure that all records but the bogus 4 have been applied.
 | |
|     do_check_attribute_count(
 | |
|       engine._store.items,
 | |
|       NUMBER_OF_RECORDS - BOGUS_RECORDS.length
 | |
|     );
 | |
| 
 | |
|     // Ensure that the bogus records will be fetched again on the next sync.
 | |
|     Assert.equal(engine.previousFailed.size, BOGUS_RECORDS.length);
 | |
|     Assert.deepEqual(
 | |
|       Array.from(engine.previousFailed).sort(),
 | |
|       BOGUS_RECORDS.sort()
 | |
|     );
 | |
| 
 | |
|     // Ensure the observer was notified
 | |
|     Assert.equal(observerData, engine.name);
 | |
|     Assert.equal(observerSubject.failed, BOGUS_RECORDS.length);
 | |
|     Assert.equal(observerSubject.newFailed, BOGUS_RECORDS.length);
 | |
| 
 | |
|     // Testing batching of failed item fetches.
 | |
|     // Try to sync again. Ensure that we split the request into chunks to avoid
 | |
|     // URI length limitations.
 | |
|     async function batchDownload(batchSize) {
 | |
|       count = 0;
 | |
|       uris = [];
 | |
|       engine.guidFetchBatchSize = batchSize;
 | |
|       await engine._processIncoming();
 | |
|       _("Tried again. Requests: " + count + "; URIs: " + JSON.stringify(uris));
 | |
|       return count;
 | |
|     }
 | |
| 
 | |
|     // There are 8 bad records, so this needs 3 fetches.
 | |
|     _("Test batching with ID batch size 3, normal mobile batch size.");
 | |
|     Assert.equal(await batchDownload(3), 3);
 | |
| 
 | |
|     // Since there the previous batch failed again, there should be
 | |
|     // no more records to fetch
 | |
|     _("Test that the second time a record failed to sync, gets ignored");
 | |
|     Assert.equal(await batchDownload(BOGUS_RECORDS.length), 0);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_processIncoming_decrypt_failed() {
 | |
|   _("Ensure that records failing to decrypt are either replaced or refetched.");
 | |
| 
 | |
|   // Some good and some bogus records. One doesn't contain valid JSON,
 | |
|   // the other will throw during decrypt.
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
|   collection._wbos.nojson = new ServerWBO("nojson", "This is invalid JSON");
 | |
|   collection._wbos.nojson2 = new ServerWBO("nojson2", "This is invalid JSON");
 | |
|   collection._wbos.scotsman = new ServerWBO(
 | |
|     "scotsman",
 | |
|     encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
 | |
|   );
 | |
|   collection._wbos.nodecrypt = new ServerWBO("nodecrypt", "Decrypt this!");
 | |
|   collection._wbos.nodecrypt2 = new ServerWBO("nodecrypt2", "Decrypt this!");
 | |
| 
 | |
|   // Patch the fake crypto service to throw on the record above.
 | |
|   Weave.Crypto._decrypt = Weave.Crypto.decrypt;
 | |
|   Weave.Crypto.decrypt = function (ciphertext) {
 | |
|     if (ciphertext == "Decrypt this!") {
 | |
|       throw new Error(
 | |
|         "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz."
 | |
|       );
 | |
|     }
 | |
|     return this._decrypt.apply(this, arguments);
 | |
|   };
 | |
| 
 | |
|   // Some broken records also exist locally.
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine.enabled = true;
 | |
|   engine._store.items = { nojson: "Valid JSON", nodecrypt: "Valid ciphertext" };
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
|   try {
 | |
|     // Confirm initial state
 | |
|     Assert.equal(engine.toFetch.size, 0);
 | |
|     Assert.equal(engine.previousFailed.size, 0);
 | |
| 
 | |
|     let observerSubject;
 | |
|     let observerData;
 | |
|     Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
 | |
|       Svc.Obs.remove("weave:engine:sync:applied", onApplied);
 | |
|       observerSubject = subject;
 | |
|       observerData = data;
 | |
|     });
 | |
| 
 | |
|     await engine.setLastSync(collection.wbo("nojson").modified - 1);
 | |
|     let ping = await sync_engine_and_validate_telem(engine, true);
 | |
|     Assert.equal(ping.engines[0].incoming.applied, 2);
 | |
|     Assert.equal(ping.engines[0].incoming.failed, 4);
 | |
|     console.log("incoming telem: ", ping.engines[0].incoming);
 | |
|     Assert.equal(
 | |
|       ping.engines[0].incoming.failedReasons[0].name,
 | |
|       "No ciphertext: nothing to decrypt?"
 | |
|     );
 | |
|     // There should be 4 of the same error
 | |
|     Assert.equal(ping.engines[0].incoming.failedReasons[0].count, 4);
 | |
| 
 | |
|     Assert.equal(engine.previousFailed.size, 4);
 | |
|     Assert.ok(engine.previousFailed.has("nojson"));
 | |
|     Assert.ok(engine.previousFailed.has("nojson2"));
 | |
|     Assert.ok(engine.previousFailed.has("nodecrypt"));
 | |
|     Assert.ok(engine.previousFailed.has("nodecrypt2"));
 | |
| 
 | |
|     // Ensure the observer was notified
 | |
|     Assert.equal(observerData, engine.name);
 | |
|     Assert.equal(observerSubject.applied, 2);
 | |
|     Assert.equal(observerSubject.failed, 4);
 | |
|     Assert.equal(observerSubject.failedReasons[0].count, 4);
 | |
|   } finally {
 | |
|     await promiseClean(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_uploadOutgoing_toEmptyServer() {
 | |
|   _("SyncEngine._uploadOutgoing uploads new records to server");
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO("flying");
 | |
|   collection._wbos.scotsman = new ServerWBO("scotsman");
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|     "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
 | |
|     "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
|   await generateNewKeys(Service.collectionKeys);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine._store.items = {
 | |
|     flying: "LNER Class A3 4472",
 | |
|     scotsman: "Flying Scotsman",
 | |
|   };
 | |
|   // Mark one of these records as changed
 | |
|   await engine._tracker.addChangedID("scotsman", 0);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
 | |
| 
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     Assert.equal(collection.payload("scotsman"), undefined);
 | |
| 
 | |
|     await engine._syncStartup();
 | |
|     await engine._uploadOutgoing();
 | |
| 
 | |
|     // Ensure the marked record ('scotsman') has been uploaded and is
 | |
|     // no longer marked.
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     Assert.ok(!!collection.payload("scotsman"));
 | |
|     Assert.equal(collection.cleartext("scotsman").id, "scotsman");
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.scotsman, undefined);
 | |
| 
 | |
|     // The 'flying' record wasn't marked so it wasn't uploaded
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| async function test_uploadOutgoing_max_record_payload_bytes(
 | |
|   allowSkippedRecord
 | |
| ) {
 | |
|   _(
 | |
|     "SyncEngine._uploadOutgoing throws when payload is bigger than max_record_payload_bytes"
 | |
|   );
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO("flying");
 | |
|   collection._wbos.scotsman = new ServerWBO("scotsman");
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|     "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
 | |
|     "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
|   await generateNewKeys(Service.collectionKeys);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine.allowSkippedRecord = allowSkippedRecord;
 | |
|   engine._store.items = { flying: "a".repeat(1024 * 1024), scotsman: "abcd" };
 | |
| 
 | |
|   await engine._tracker.addChangedID("flying", 1000);
 | |
|   await engine._tracker.addChangedID("scotsman", 1000);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     await engine.setLastSync(1); // needs to be non-zero so that tracker is queried
 | |
| 
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     Assert.equal(collection.payload("scotsman"), undefined);
 | |
| 
 | |
|     await engine._syncStartup();
 | |
|     await engine._uploadOutgoing();
 | |
| 
 | |
|     if (!allowSkippedRecord) {
 | |
|       do_throw("should not get here");
 | |
|     }
 | |
| 
 | |
|     await engine.trackRemainingChanges();
 | |
| 
 | |
|     // Check we uploaded the other record to the server
 | |
|     Assert.ok(collection.payload("scotsman"));
 | |
|     // And that we won't try to upload the huge record next time.
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.flying, undefined);
 | |
|   } catch (e) {
 | |
|     if (allowSkippedRecord) {
 | |
|       do_throw("should not get here");
 | |
|     }
 | |
| 
 | |
|     await engine.trackRemainingChanges();
 | |
| 
 | |
|     // Check that we will try to upload the huge record next time
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.flying, 1000);
 | |
|   } finally {
 | |
|     // Check we didn't upload the oversized record to the server
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| }
 | |
| 
 | |
| add_task(
 | |
|   async function test_uploadOutgoing_max_record_payload_bytes_disallowSkippedRecords() {
 | |
|     return test_uploadOutgoing_max_record_payload_bytes(false);
 | |
|   }
 | |
| );
 | |
| 
 | |
| add_task(
 | |
|   async function test_uploadOutgoing_max_record_payload_bytes_allowSkippedRecords() {
 | |
|     return test_uploadOutgoing_max_record_payload_bytes(true);
 | |
|   }
 | |
| );
 | |
| 
 | |
| add_task(async function test_uploadOutgoing_failed() {
 | |
|   _(
 | |
|     "SyncEngine._uploadOutgoing doesn't clear the tracker of objects that failed to upload."
 | |
|   );
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   // We only define the "flying" WBO on the server, not the "scotsman"
 | |
|   // and "peppercorn" ones.
 | |
|   collection._wbos.flying = new ServerWBO("flying");
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine._store.items = {
 | |
|     flying: "LNER Class A3 4472",
 | |
|     scotsman: "Flying Scotsman",
 | |
|     peppercorn: "Peppercorn Class",
 | |
|   };
 | |
|   // Mark these records as changed
 | |
|   const FLYING_CHANGED = 12345;
 | |
|   const SCOTSMAN_CHANGED = 23456;
 | |
|   const PEPPERCORN_CHANGED = 34567;
 | |
|   await engine._tracker.addChangedID("flying", FLYING_CHANGED);
 | |
|   await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);
 | |
|   await engine._tracker.addChangedID("peppercorn", PEPPERCORN_CHANGED);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
 | |
| 
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     let changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.flying, FLYING_CHANGED);
 | |
|     Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
 | |
|     Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED);
 | |
| 
 | |
|     engine.enabled = true;
 | |
|     await sync_engine_and_validate_telem(engine, true);
 | |
| 
 | |
|     // Ensure the 'flying' record has been uploaded and is no longer marked.
 | |
|     Assert.ok(!!collection.payload("flying"));
 | |
|     changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.flying, undefined);
 | |
| 
 | |
|     // The 'scotsman' and 'peppercorn' records couldn't be uploaded so
 | |
|     // they weren't cleared from the tracker.
 | |
|     Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
 | |
|     Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED);
 | |
|   } finally {
 | |
|     await promiseClean(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| async function createRecordFailTelemetry(allowSkippedRecord) {
 | |
|   Services.prefs.setStringPref("services.sync.username", "foo");
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO("flying");
 | |
|   collection._wbos.scotsman = new ServerWBO("scotsman");
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine.allowSkippedRecord = allowSkippedRecord;
 | |
|   let oldCreateRecord = engine._store.createRecord;
 | |
|   engine._store.createRecord = async (id, col) => {
 | |
|     if (id != "flying") {
 | |
|       throw new Error("oops");
 | |
|     }
 | |
|     return oldCreateRecord.call(engine._store, id, col);
 | |
|   };
 | |
|   engine._store.items = {
 | |
|     flying: "LNER Class A3 4472",
 | |
|     scotsman: "Flying Scotsman",
 | |
|   };
 | |
|   // Mark these records as changed
 | |
|   const FLYING_CHANGED = 12345;
 | |
|   const SCOTSMAN_CHANGED = 23456;
 | |
|   await engine._tracker.addChangedID("flying", FLYING_CHANGED);
 | |
|   await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   let ping;
 | |
|   try {
 | |
|     await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
 | |
| 
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     let changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.flying, FLYING_CHANGED);
 | |
|     Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
 | |
| 
 | |
|     engine.enabled = true;
 | |
|     ping = await sync_engine_and_validate_telem(engine, true, onErrorPing => {
 | |
|       ping = onErrorPing;
 | |
|     });
 | |
| 
 | |
|     if (!allowSkippedRecord) {
 | |
|       do_throw("should not get here");
 | |
|     }
 | |
| 
 | |
|     // Ensure the 'flying' record has been uploaded and is no longer marked.
 | |
|     Assert.ok(!!collection.payload("flying"));
 | |
|     changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.flying, undefined);
 | |
|   } catch (err) {
 | |
|     if (allowSkippedRecord) {
 | |
|       do_throw("should not get here");
 | |
|     }
 | |
| 
 | |
|     // Ensure the 'flying' record has not been uploaded and is still marked
 | |
|     Assert.ok(!collection.payload("flying"));
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.ok(changes.flying);
 | |
|   } finally {
 | |
|     // We reported in telemetry that we failed a record
 | |
|     Assert.equal(ping.engines[0].outgoing[0].failed, 1);
 | |
|     Assert.equal(ping.engines[0].outgoing[0].failedReasons[0].name, "oops");
 | |
| 
 | |
|     // In any case, the 'scotsman' record couldn't be created so it wasn't
 | |
|     // uploaded nor it was not cleared from the tracker.
 | |
|     Assert.ok(!collection.payload("scotsman"));
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
 | |
| 
 | |
|     engine._store.createRecord = oldCreateRecord;
 | |
|     await promiseClean(engine, server);
 | |
|   }
 | |
| }
 | |
| 
 | |
| add_task(
 | |
|   async function test_uploadOutgoing_createRecord_throws_reported_telemetry() {
 | |
|     _(
 | |
|       "SyncEngine._uploadOutgoing reports a failed record to telemetry if createRecord throws"
 | |
|     );
 | |
|     await createRecordFailTelemetry(true);
 | |
|   }
 | |
| );
 | |
| 
 | |
| add_task(
 | |
|   async function test_uploadOutgoing_createRecord_throws_dontAllowSkipRecord() {
 | |
|     _(
 | |
|       "SyncEngine._uploadOutgoing will throw if createRecord throws and allowSkipRecord is set to false"
 | |
|     );
 | |
|     await createRecordFailTelemetry(false);
 | |
|   }
 | |
| );
 | |
| 
 | |
| add_task(async function test_uploadOutgoing_largeRecords() {
 | |
|   _(
 | |
|     "SyncEngine._uploadOutgoing throws on records larger than the max record payload size"
 | |
|   );
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   engine.allowSkippedRecord = false;
 | |
|   engine._store.items["large-item"] = "Y".repeat(
 | |
|     Service.getMaxRecordPayloadSize() * 2
 | |
|   );
 | |
|   await engine._tracker.addChangedID("large-item", 0);
 | |
|   collection.insert("large-item");
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   try {
 | |
|     await engine._syncStartup();
 | |
|     let error = null;
 | |
|     try {
 | |
|       await engine._uploadOutgoing();
 | |
|     } catch (e) {
 | |
|       error = e;
 | |
|     }
 | |
|     ok(!!error);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_syncFinish_deleteByIds() {
 | |
|   _(
 | |
|     "SyncEngine._syncFinish deletes server records slated for deletion (list of record IDs)."
 | |
|   );
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
|   collection._wbos.scotsman = new ServerWBO(
 | |
|     "scotsman",
 | |
|     encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
 | |
|   );
 | |
|   collection._wbos.rekolok = new ServerWBO(
 | |
|     "rekolok",
 | |
|     encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" })
 | |
|   );
 | |
| 
 | |
|   let server = httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   try {
 | |
|     engine._delete = { ids: ["flying", "rekolok"] };
 | |
|     await engine._syncFinish();
 | |
| 
 | |
|     // The 'flying' and 'rekolok' records were deleted while the
 | |
|     // 'scotsman' one wasn't.
 | |
|     Assert.equal(collection.payload("flying"), undefined);
 | |
|     Assert.ok(!!collection.payload("scotsman"));
 | |
|     Assert.equal(collection.payload("rekolok"), undefined);
 | |
| 
 | |
|     // The deletion todo list has been reset.
 | |
|     Assert.equal(engine._delete.ids, undefined);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_syncFinish_deleteLotsInBatches() {
 | |
|   _(
 | |
|     "SyncEngine._syncFinish deletes server records in batches of 100 (list of record IDs)."
 | |
|   );
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
| 
 | |
|   // Let's count how many times the client does a DELETE request to the server
 | |
|   var noOfUploads = 0;
 | |
|   collection.delete = (function (orig) {
 | |
|     return function () {
 | |
|       noOfUploads++;
 | |
|       return orig.apply(this, arguments);
 | |
|     };
 | |
|   })(collection.delete);
 | |
| 
 | |
|   // Create a bunch of records on the server
 | |
|   let now = Date.now();
 | |
|   for (var i = 0; i < 234; i++) {
 | |
|     let id = "record-no-" + i;
 | |
|     let payload = encryptPayload({ id, denomination: "Record No. " + i });
 | |
|     let wbo = new ServerWBO(id, payload);
 | |
|     wbo.modified = now / 1000 - 60 * (i + 110);
 | |
|     collection.insertWBO(wbo);
 | |
|   }
 | |
| 
 | |
|   let server = httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
|   try {
 | |
|     // Confirm initial environment
 | |
|     Assert.equal(noOfUploads, 0);
 | |
| 
 | |
|     // Declare what we want to have deleted: all records no. 100 and
 | |
|     // up and all records that are less than 200 mins old (which are
 | |
|     // records 0 thru 90).
 | |
|     engine._delete = { ids: [], newer: now / 1000 - 60 * 200.5 };
 | |
|     for (i = 100; i < 234; i++) {
 | |
|       engine._delete.ids.push("record-no-" + i);
 | |
|     }
 | |
| 
 | |
|     await engine._syncFinish();
 | |
| 
 | |
|     // Ensure that the appropriate server data has been wiped while
 | |
|     // preserving records 90 thru 200.
 | |
|     for (i = 0; i < 234; i++) {
 | |
|       let id = "record-no-" + i;
 | |
|       if (i <= 90 || i >= 100) {
 | |
|         Assert.equal(collection.payload(id), undefined);
 | |
|       } else {
 | |
|         Assert.ok(!!collection.payload(id));
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     // The deletion was done in batches
 | |
|     Assert.equal(noOfUploads, 2 + 1);
 | |
| 
 | |
|     // The deletion todo list has been reset.
 | |
|     Assert.equal(engine._delete.ids, undefined);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_sync_partialUpload() {
 | |
|   _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
|   let oldServerConfiguration = Service.serverConfiguration;
 | |
|   Service.serverConfiguration = {
 | |
|     max_post_records: 100,
 | |
|   };
 | |
|   await SyncTestingInfrastructure(server);
 | |
|   await generateNewKeys(Service.collectionKeys);
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
| 
 | |
|   // Let the third upload fail completely
 | |
|   var noOfUploads = 0;
 | |
|   collection.post = (function (orig) {
 | |
|     return function () {
 | |
|       if (noOfUploads == 2) {
 | |
|         throw new Error("FAIL!");
 | |
|       }
 | |
|       noOfUploads++;
 | |
|       return orig.apply(this, arguments);
 | |
|     };
 | |
|   })(collection.post);
 | |
| 
 | |
|   // Create a bunch of records (and server side handlers)
 | |
|   for (let i = 0; i < 234; i++) {
 | |
|     let id = "record-no-" + i;
 | |
|     engine._store.items[id] = "Record No. " + i;
 | |
|     await engine._tracker.addChangedID(id, i);
 | |
|     // Let two items in the first upload batch fail.
 | |
|     if (i != 23 && i != 42) {
 | |
|       collection.insert(id);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   try {
 | |
|     await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
 | |
| 
 | |
|     engine.enabled = true;
 | |
|     let error;
 | |
|     try {
 | |
|       await sync_engine_and_validate_telem(engine, true);
 | |
|     } catch (ex) {
 | |
|       error = ex;
 | |
|     }
 | |
| 
 | |
|     ok(!!error);
 | |
| 
 | |
|     const changes = await engine._tracker.getChangedIDs();
 | |
|     for (let i = 0; i < 234; i++) {
 | |
|       let id = "record-no-" + i;
 | |
|       // Ensure failed records are back in the tracker:
 | |
|       // * records no. 23 and 42 were rejected by the server,
 | |
|       // * records after the third batch and higher couldn't be uploaded because
 | |
|       //   we failed hard on the 3rd upload.
 | |
|       if (i == 23 || i == 42 || i >= 200) {
 | |
|         Assert.equal(changes[id], i);
 | |
|       } else {
 | |
|         Assert.equal(false, id in changes);
 | |
|       }
 | |
|     }
 | |
|   } finally {
 | |
|     Service.serverConfiguration = oldServerConfiguration;
 | |
|     await promiseClean(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_canDecrypt_noCryptoKeys() {
 | |
|   _(
 | |
|     "SyncEngine.canDecrypt returns false if the engine fails to decrypt items on the server, e.g. due to a missing crypto key collection."
 | |
|   );
 | |
| 
 | |
|   // Wipe collection keys so we can test the desired scenario.
 | |
|   Service.collectionKeys.clear();
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
|   let engine = makeRotaryEngine();
 | |
|   try {
 | |
|     Assert.equal(false, await engine.canDecrypt());
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_canDecrypt_true() {
 | |
|   _(
 | |
|     "SyncEngine.canDecrypt returns true if the engine can decrypt the items on the server."
 | |
|   );
 | |
| 
 | |
|   await generateNewKeys(Service.collectionKeys);
 | |
| 
 | |
|   let collection = new ServerCollection();
 | |
|   collection._wbos.flying = new ServerWBO(
 | |
|     "flying",
 | |
|     encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
 | |
|   );
 | |
| 
 | |
|   let server = sync_httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
|   let engine = makeRotaryEngine();
 | |
|   try {
 | |
|     Assert.ok(await engine.canDecrypt());
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|   }
 | |
| });
 | |
| 
 | |
| add_task(async function test_syncapplied_observer() {
 | |
|   const NUMBER_OF_RECORDS = 10;
 | |
| 
 | |
|   let engine = makeRotaryEngine();
 | |
| 
 | |
|   // Create a batch of server side records.
 | |
|   let collection = new ServerCollection();
 | |
|   for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
 | |
|     let id = "record-no-" + i;
 | |
|     let payload = encryptPayload({ id, denomination: "Record No. " + id });
 | |
|     collection.insert(id, payload);
 | |
|   }
 | |
| 
 | |
|   let server = httpd_setup({
 | |
|     "/1.1/foo/storage/rotary": collection.handler(),
 | |
|   });
 | |
| 
 | |
|   await SyncTestingInfrastructure(server);
 | |
| 
 | |
|   let syncID = await engine.resetLocalSyncID();
 | |
|   let meta_global = Service.recordManager.set(
 | |
|     engine.metaURL,
 | |
|     new WBORecord(engine.metaURL)
 | |
|   );
 | |
|   meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
 | |
| 
 | |
|   let numApplyCalls = 0;
 | |
|   let engine_name;
 | |
|   let count;
 | |
|   function onApplied(subject, data) {
 | |
|     numApplyCalls++;
 | |
|     engine_name = data;
 | |
|     count = subject;
 | |
|   }
 | |
| 
 | |
|   Svc.Obs.add("weave:engine:sync:applied", onApplied);
 | |
| 
 | |
|   try {
 | |
|     Service.scheduler.hasIncomingItems = false;
 | |
| 
 | |
|     // Do sync.
 | |
|     await engine._syncStartup();
 | |
|     await engine._processIncoming();
 | |
| 
 | |
|     do_check_attribute_count(engine._store.items, 10);
 | |
| 
 | |
|     Assert.equal(numApplyCalls, 1);
 | |
|     Assert.equal(engine_name, "rotary");
 | |
|     Assert.equal(count.applied, 10);
 | |
| 
 | |
|     Assert.ok(Service.scheduler.hasIncomingItems);
 | |
|   } finally {
 | |
|     await cleanAndGo(engine, server);
 | |
|     Service.scheduler.hasIncomingItems = false;
 | |
|     Svc.Obs.remove("weave:engine:sync:applied", onApplied);
 | |
|   }
 | |
| });
 |