Overhaul object downloading/processing during data syncs

Previously, objects were first downloaded and saved to the sync cache,
which was then processed separately to create/update local objects. This
meant that a server bug could result in invalid data in the sync cache
that would never be processed. Now, objects are saved as they're
downloaded and only added to the sync cache after being successfully
saved. The keys of objects that fail are added to a queue, and those
objects are refetched and retried on a backoff schedule or when a new
client version is installed (in case of a client bug or a client with
outdated data model support).

An alternative would be to save to the sync cache first and evict
objects that fail and add them to the queue, but that requires more
complicated logic, and it probably makes more sense just to buffer a few
downloads ahead so that processing is never waiting for downloads to
finish.
This commit is contained in:
Dan Stillman 2016-02-29 04:23:00 -05:00
parent 6ac35c75c1
commit a1ce85decb
17 changed files with 1251 additions and 485 deletions

View file

@ -89,10 +89,10 @@ describe("Zotero.Sync.Data.Local", function() {
})
describe("#processSyncCacheForObjectType()", function () {
describe("#processObjectsFromJSON()", function () {
var types = Zotero.DataObjectUtilities.getTypes();
before(function* () {
beforeEach(function* () {
yield resetDB({
thisArg: this,
skipBundledFiles: true
@ -113,11 +113,8 @@ describe("Zotero.Sync.Data.Local", function() {
version: 10,
data: data
};
yield Zotero.Sync.Data.Local.saveCacheObjects(
type, libraryID, [json]
);
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, [json], { stopOnError: true }
);
let localObj = objectsClass.getByLibraryAndKey(libraryID, obj.key);
assert.equal(localObj.version, 10);
@ -148,12 +145,8 @@ describe("Zotero.Sync.Data.Local", function() {
version: 10,
data: data
};
yield Zotero.Sync.Data.Local.saveCacheObjects(
type, libraryID, [json]
);
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, [json], { stopOnError: true }
);
assert.equal(obj.version, 10);
assert.equal(obj.getField('title'), changedTitle);
@ -164,23 +157,19 @@ describe("Zotero.Sync.Data.Local", function() {
var libraryID = Zotero.Libraries.userLibraryID;
for (let type of types) {
let objectsClass = Zotero.DataObjectUtilities.getObjectsClassForObjectType(type);
// Save original version
let obj = yield createDataObject(type, { version: 5 });
let data = obj.toJSON();
yield Zotero.Sync.Data.Local.saveCacheObjects(
type, libraryID, [data]
);
}
for (let type of types) {
let objectsClass = Zotero.DataObjectUtilities.getObjectsClassForObjectType(type);
let obj = yield createDataObject(type, { version: 10 });
let data = obj.toJSON();
yield Zotero.Sync.Data.Local.saveCacheObjects(
type, libraryID, [data]
);
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
// Save newer version
data.version = 10;
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, [data], { stopOnError: true }
);
let localObj = objectsClass.getByLibraryAndKey(libraryID, obj.key);
@ -195,7 +184,36 @@ describe("Zotero.Sync.Data.Local", function() {
"should have only latest version of " + type + " in cache"
);
}
})
});
it("should delete object from sync queue after processing", function* () {
var objectType = 'item';
var libraryID = Zotero.Libraries.userLibraryID;
var key = Zotero.DataObjectUtilities.generateKey();
yield Zotero.Sync.Data.Local.addObjectsToSyncQueue(objectType, libraryID, [key]);
var versions = yield Zotero.Sync.Data.Local.getObjectsFromSyncQueue(objectType, libraryID);
assert.include(versions, key);
var json = {
key,
version: 10,
data: {
key,
version: 10,
itemType: "book",
title: "Test"
}
};
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
objectType, libraryID, [json], { stopOnError: true }
);
var versions = yield Zotero.Sync.Data.Local.getObjectsFromSyncQueue(objectType, libraryID);
assert.notInclude(versions, key);
});
it("should mark new attachment items for download", function* () {
var libraryID = Zotero.Libraries.userLibraryID;
@ -216,11 +234,8 @@ describe("Zotero.Sync.Data.Local", function() {
}
};
yield Zotero.Sync.Data.Local.saveCacheObjects(
'item', Zotero.Libraries.userLibraryID, [json]
);
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, 'item', { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
'item', libraryID, [json], { stopOnError: true }
);
var item = Zotero.Items.getByLibraryAndKey(libraryID, key);
assert.equal(item.attachmentSyncState, Zotero.Sync.Storage.Local.SYNC_STATE_TO_DOWNLOAD);
@ -247,12 +262,8 @@ describe("Zotero.Sync.Data.Local", function() {
json.data.version = 10;
json.data.md5 = '57f8a4fda823187b91e1191487b87fe6';
json.data.mtime = new Date().getTime() + 10000;
yield Zotero.Sync.Data.Local.saveCacheObjects(
'item', Zotero.Libraries.userLibraryID, [json]
);
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, 'item', { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
'item', libraryID, [json], { stopOnError: true }
);
assert.equal(item.attachmentSyncState, Zotero.Sync.Storage.Local.SYNC_STATE_TO_DOWNLOAD);
@ -284,17 +295,188 @@ describe("Zotero.Sync.Data.Local", function() {
json.data.version = 10;
json.data.md5 = '57f8a4fda823187b91e1191487b87fe6';
json.data.mtime = new Date().getTime() + 10000;
yield Zotero.Sync.Data.Local.saveCacheObjects('item', libraryID, [json]);
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, 'item', { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
'item', libraryID, [json], { stopOnError: true }
);
assert.equal(item.getField('title'), newTitle);
assert.equal(item.attachmentSyncState, Zotero.Sync.Storage.Local.SYNC_STATE_TO_DOWNLOAD);
})
it("should roll back partial object changes on error", function* () {
var libraryID = Zotero.Libraries.publicationsLibraryID;
var key1 = "AAAAAAAA";
var key2 = "BBBBBBBB";
var json = [
{
key: key1,
version: 1,
data: {
key: key1,
version: 1,
itemType: "book",
title: "Test A"
}
},
{
key: key2,
version: 1,
data: {
key: key2,
version: 1,
itemType: "journalArticle",
title: "Test B",
deleted: true // Not allowed in My Publications
}
}
];
yield Zotero.Sync.Data.Local.processObjectsFromJSON('item', libraryID, json);
// Shouldn't roll back the successful item
yield assert.eventually.equal(Zotero.DB.valueQueryAsync(
"SELECT COUNT(*) FROM items WHERE libraryID=? AND key=?", [libraryID, key1]
), 1);
// Should rollback the unsuccessful item
yield assert.eventually.equal(Zotero.DB.valueQueryAsync(
"SELECT COUNT(*) FROM items WHERE libraryID=? AND key=?", [libraryID, key2]
), 0);
});
})
describe("Sync Queue", function () {
var lib1, lib2;
before(function* () {
lib1 = Zotero.Libraries.userLibraryID;
lib2 = Zotero.Libraries.publicationsLibraryID;
});
beforeEach(function* () {
yield Zotero.DB.queryAsync("DELETE FROM syncQueue");
});
after(function* () {
yield Zotero.DB.queryAsync("DELETE FROM syncQueue");
});
describe("#addObjectsToSyncQueue()", function () {
it("should add new objects and update lastCheck and tries for existing objects", function* () {
var objectType = 'item';
var syncObjectTypeID = Zotero.Sync.Data.Utilities.getSyncObjectTypeID(objectType);
var now = Zotero.Date.getUnixTimestamp();
var key1 = Zotero.DataObjectUtilities.generateKey();
var key2 = Zotero.DataObjectUtilities.generateKey();
var key3 = Zotero.DataObjectUtilities.generateKey();
var key4 = Zotero.DataObjectUtilities.generateKey();
yield Zotero.DB.queryAsync(
"INSERT INTO syncQueue (libraryID, key, syncObjectTypeID, lastCheck, tries) "
+ "VALUES (?, ?, ?, ?, ?), (?, ?, ?, ?, ?), (?, ?, ?, ?, ?)",
[
lib1, key1, syncObjectTypeID, now - 3700, 0,
lib1, key2, syncObjectTypeID, now - 7000, 1,
lib2, key3, syncObjectTypeID, now - 86400, 2
]
);
yield Zotero.Sync.Data.Local.addObjectsToSyncQueue(objectType, lib1, [key1, key2]);
yield Zotero.Sync.Data.Local.addObjectsToSyncQueue(objectType, lib2, [key4]);
var sql = "SELECT lastCheck, tries FROM syncQueue WHERE libraryID=? "
+ `AND syncObjectTypeID=${syncObjectTypeID} AND key=?`;
var row;
// key1
row = yield Zotero.DB.rowQueryAsync(sql, [lib1, key1]);
assert.approximately(row.lastCheck, now, 1);
assert.equal(row.tries, 1);
// key2
row = yield Zotero.DB.rowQueryAsync(sql, [lib1, key2]);
assert.approximately(row.lastCheck, now, 1);
assert.equal(row.tries, 2);
// key3
row = yield Zotero.DB.rowQueryAsync(sql, [lib2, key3]);
assert.equal(row.lastCheck, now - 86400);
assert.equal(row.tries, 2);
// key4
row = yield Zotero.DB.rowQueryAsync(sql, [lib2, key4]);
assert.approximately(row.lastCheck, now, 1);
assert.equal(row.tries, 0);
});
});
describe("#getObjectsToTryFromSyncQueue()", function () {
it("should get objects that should be retried", function* () {
var objectType = 'item';
var syncObjectTypeID = Zotero.Sync.Data.Utilities.getSyncObjectTypeID(objectType);
var now = Zotero.Date.getUnixTimestamp();
var key1 = Zotero.DataObjectUtilities.generateKey();
var key2 = Zotero.DataObjectUtilities.generateKey();
var key3 = Zotero.DataObjectUtilities.generateKey();
var key4 = Zotero.DataObjectUtilities.generateKey();
yield Zotero.DB.queryAsync(
"INSERT INTO syncQueue (libraryID, key, syncObjectTypeID, lastCheck, tries) "
+ "VALUES (?, ?, ?, ?, ?), (?, ?, ?, ?, ?), (?, ?, ?, ?, ?)",
[
lib1, key1, syncObjectTypeID, now - (30 * 60) - 10, 0, // more than half an hour, so should be retried
lib1, key2, syncObjectTypeID, now - (16 * 60 * 60) + 10, 4, // less than 16 hours, shouldn't be retried
lib2, key3, syncObjectTypeID, now - 86400 * 7, 20 // more than 64 hours, so should be retried
]
);
var keys = yield Zotero.Sync.Data.Local.getObjectsToTryFromSyncQueue('item', lib1);
assert.sameMembers(keys, [key1]);
var keys = yield Zotero.Sync.Data.Local.getObjectsToTryFromSyncQueue('item', lib2);
assert.sameMembers(keys, [key3]);
});
});
describe("#removeObjectsFromSyncQueue()", function () {
it("should remove objects from the sync queue", function* () {
var objectType = 'item';
var syncObjectTypeID = Zotero.Sync.Data.Utilities.getSyncObjectTypeID(objectType);
var now = Zotero.Date.getUnixTimestamp();
var key1 = Zotero.DataObjectUtilities.generateKey();
var key2 = Zotero.DataObjectUtilities.generateKey();
var key3 = Zotero.DataObjectUtilities.generateKey();
yield Zotero.DB.queryAsync(
"INSERT INTO syncQueue (libraryID, key, syncObjectTypeID, lastCheck, tries) "
+ "VALUES (?, ?, ?, ?, ?), (?, ?, ?, ?, ?), (?, ?, ?, ?, ?)",
[
lib1, key1, syncObjectTypeID, now, 0,
lib1, key2, syncObjectTypeID, now, 4,
lib2, key3, syncObjectTypeID, now, 20
]
);
yield Zotero.Sync.Data.Local.removeObjectsFromSyncQueue('item', lib1, [key1]);
var sql = "SELECT COUNT(*) FROM syncQueue WHERE libraryID=? "
+ `AND syncObjectTypeID=${syncObjectTypeID} AND key=?`;
assert.notOk(yield Zotero.DB.valueQueryAsync(sql, [lib1, key1]));
assert.ok(yield Zotero.DB.valueQueryAsync(sql, [lib1, key2]));
assert.ok(yield Zotero.DB.valueQueryAsync(sql, [lib2, key3]));
})
});
describe("#resetSyncQueueTries", function () {
var spy;
after(function () {
if (spy) {
spy.restore();
}
})
it("should be run on version upgrade", function* () {
var sql = "REPLACE INTO settings (setting, key, value) VALUES ('client', 'lastVersion', ?)";
yield Zotero.DB.queryAsync(sql, "5.0foo");
spy = sinon.spy(Zotero.Sync.Data.Local, "resetSyncQueueTries");
yield Zotero.Schema.updateSchema();
assert.ok(spy.called);
});
});
});
describe("Conflict Resolution", function () {
beforeEach(function* () {
yield Zotero.DB.queryAsync("DELETE FROM syncCache");
@ -311,6 +493,7 @@ describe("Zotero.Sync.Data.Local", function() {
var objects = [];
var values = [];
var dateAdded = Date.now() - 86400000;
var downloadedJSON = [];
for (let i = 0; i < 2; i++) {
values.push({
left: {},
@ -339,14 +522,15 @@ describe("Zotero.Sync.Data.Local", function() {
};
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
// Create new version in cache, simulating a download
json.version = jsonData.version = 15;
// Create updated JSON, simulating a download
values[i].right.title = jsonData.title = Zotero.Utilities.randomString();
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
values[i].right.version = json.version = jsonData.version = 15;
downloadedJSON.push(json);
// Modify object locally
yield modifyDataObject(obj, undefined, { skipDateModifiedUpdate: true });
values[i].left.title = obj.getField('title');
values[i].left.version = obj.getField('version');
}
waitForWindow('chrome://zotero/content/merge.xul', function (dialog) {
@ -373,12 +557,14 @@ describe("Zotero.Sync.Data.Local", function() {
}
wizard.getButton('finish').click();
})
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, downloadedJSON, { stopOnError: true }
);
assert.equal(objects[0].getField('title'), values[0].right.title);
assert.equal(objects[1].getField('title'), values[1].left.title);
assert.equal(objects[0].getField('version'), values[0].right.version);
assert.equal(objects[1].getField('version'), values[1].left.version);
})
it("should resolve all remaining conflicts with one side", function* () {
@ -388,6 +574,7 @@ describe("Zotero.Sync.Data.Local", function() {
var objects = [];
var values = [];
var downloadedJSON = [];
var dateAdded = Date.now() - 86400000;
for (let i = 0; i < 3; i++) {
values.push({
@ -418,13 +605,15 @@ describe("Zotero.Sync.Data.Local", function() {
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
// Create new version in cache, simulating a download
json.version = jsonData.version = 15;
values[i].right.title = jsonData.title = Zotero.Utilities.randomString();
values[i].right.version = json.version = jsonData.version = 15;
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
downloadedJSON.push(json);
// Modify object locally
yield modifyDataObject(obj, undefined, { skipDateModifiedUpdate: true });
values[i].left.title = obj.getField('title');
values[i].left.version = obj.getField('version');
}
waitForWindow('chrome://zotero/content/merge.xul', function (dialog) {
@ -460,13 +649,16 @@ describe("Zotero.Sync.Data.Local", function() {
}
wizard.getButton('finish').click();
})
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, downloadedJSON, { stopOnError: true }
);
assert.equal(objects[0].getField('title'), values[0].right.title);
assert.equal(objects[0].getField('version'), values[0].right.version);
assert.equal(objects[1].getField('title'), values[1].left.title);
assert.equal(objects[1].getField('version'), values[1].left.version);
assert.equal(objects[2].getField('title'), values[2].left.title);
assert.equal(objects[2].getField('version'), values[2].left.version);
})
it("should handle local item deletion, keeping deletion", function* () {
@ -475,6 +667,8 @@ describe("Zotero.Sync.Data.Local", function() {
var type = 'item';
var objectsClass = Zotero.DataObjectUtilities.getObjectsClassForObjectType(type);
var downloadedJSON = [];
// Create object, generate JSON, and delete
var obj = yield createDataObject(type, { version: 10 });
var jsonData = obj.toJSON();
@ -491,7 +685,7 @@ describe("Zotero.Sync.Data.Local", function() {
// Create new version in cache, simulating a download
json.version = jsonData.version = 15;
jsonData.title = Zotero.Utilities.randomString();
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
downloadedJSON.push(json);
var windowOpened = false;
waitForWindow('chrome://zotero/content/merge.xul', function (dialog) {
@ -508,8 +702,8 @@ describe("Zotero.Sync.Data.Local", function() {
mergeGroup.leftpane.pane.click();
wizard.getButton('finish').click();
})
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, downloadedJSON, { stopOnError: true }
);
assert.isTrue(windowOpened);
@ -523,6 +717,8 @@ describe("Zotero.Sync.Data.Local", function() {
var type = 'item';
var objectsClass = Zotero.DataObjectUtilities.getObjectsClassForObjectType(type);
var downloadedJSON = [];
// Create object, generate JSON, and delete
var obj = yield createDataObject(type, { version: 10 });
var jsonData = obj.toJSON();
@ -538,7 +734,7 @@ describe("Zotero.Sync.Data.Local", function() {
// Create new version in cache, simulating a download
json.version = jsonData.version = 15;
jsonData.title = Zotero.Utilities.randomString();
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
downloadedJSON.push(json);
waitForWindow('chrome://zotero/content/merge.xul', function (dialog) {
var doc = dialog.document;
@ -551,8 +747,8 @@ describe("Zotero.Sync.Data.Local", function() {
assert.equal(mergeGroup.rightpane.getAttribute('selected'), 'true');
wizard.getButton('finish').click();
})
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, downloadedJSON, { stopOnError: true }
);
obj = objectsClass.getByLibraryAndKey(libraryID, key);
@ -562,9 +758,9 @@ describe("Zotero.Sync.Data.Local", function() {
it("should handle note conflict", function* () {
var libraryID = Zotero.Libraries.userLibraryID;
var type = 'item';
var objectsClass = Zotero.DataObjectUtilities.getObjectsClassForObjectType(type);
var downloadedJSON = [];
var noteText1 = "<p>A</p>";
var noteText2 = "<p>B</p>";
@ -586,7 +782,7 @@ describe("Zotero.Sync.Data.Local", function() {
// Create new version in cache, simulating a download
json.version = jsonData.version = 15;
json.data.note = noteText2;
yield Zotero.Sync.Data.Local.saveCacheObjects(type, libraryID, [json]);
downloadedJSON.push(json);
// Delete object locally
obj.setNote(noteText1);
@ -600,8 +796,8 @@ describe("Zotero.Sync.Data.Local", function() {
assert.equal(mergeGroup.rightpane.getAttribute('selected'), 'true');
wizard.getButton('finish').click();
})
yield Zotero.Sync.Data.Local.processSyncCacheForObjectType(
libraryID, type, { stopOnError: true }
yield Zotero.Sync.Data.Local.processObjectsFromJSON(
type, libraryID, downloadedJSON, { stopOnError: true }
);
obj = objectsClass.getByLibraryAndKey(libraryID, key);