/*
    ***** BEGIN LICENSE BLOCK *****
    
    Copyright © 2013 Center for History and New Media
                     George Mason University, Fairfax, Virginia, USA
                     http://zotero.org
    
    This file is part of Zotero.
    
    Zotero is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
    
    Zotero is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.
    
    You should have received a copy of the GNU Affero General Public License
    along with Zotero.  If not, see <http://www.gnu.org/licenses/>.
    
    ***** END LICENSE BLOCK *****
*/

var EXPORTED_SYMBOLS = ["ConcurrentCaller"];

// Pick the Promise implementation based on the environment: when not running
// under Node, load require.js as a subscript and require Bluebird from the
// resource:// URL; under Node, require the 'bluebird' package directly.
if (!(typeof process === 'object' && process + '' === '[object process]')) {
	// Components.utils.import('resource://zotero/require.js');
	// Not using Cu.import here since we don't want the require module to be cached
	// for includes within ZoteroPane or other code where we want the window instance available to modules.
	Components.classes["@mozilla.org/moz/jssubscript-loader;1"]
		.getService(Components.interfaces.mozIJSSubScriptLoader)
		.loadSubScript('resource://zotero/require.js');
	var Promise = require('resource://zotero/bluebird.js');
} else {
	// `Promise` is the hoisted `var` declared in the branch above
	Promise = require('bluebird');
}

/**
 * Call a fixed number of functions at once, queueing the rest until slots
 * open and returning a promise for the final completion. The functions do
 * not need to return promises, but they should if they have asynchronous
 * work to perform.
 *
 * Example:
 *
 *   var caller = new ConcurrentCaller({
 *       numConcurrent: 2,
 *       stopOnError: true
 *   });
 *   yield caller.start([foo, bar, baz, qux]);
 *
 * In this example, foo and bar would run immediately, and baz and qux would
 * be queued for later. When foo or bar finished, baz would be run, followed
 * by qux when another slot opened.
 *
 * Additional functions can be added at any time with another call to start(). The promises for
 * all open start() calls will be resolved when all requests are finished.
 *
 * @param {Object} options
 * @param {Integer} options.numConcurrent - The number of concurrent functions to run.
 * @param {String} [options.id] - Identifier to use in debug output
 * @param {Boolean} [options.stopOnError]
 * @param {Function} [options.onError]
 * @param {Integer} [options.interval] - Interval between the end of one function run and the
 *     beginning of another, in milliseconds
 * @param {Function} [options.logger]
 * @param {Object} [options.Promise] The Zotero instance of Promise to allow
 *     stubbing/spying in tests. Note that this replaces the module-level Promise
 *     reference, not a per-instance one.
 * @throws {Error} If numConcurrent is missing or falsy
 */
var ConcurrentCaller = function (options = {}) {
	// Legacy signature: a bare number is treated as numConcurrent
	if (typeof options == 'number') {
		// this._logger hasn't been assigned yet at this point, so _log() only emits
		// this if a logger is available through the prototype
		this._log("ConcurrentCaller now takes an object rather than a number");
		options = {
			numConcurrent: options
		};
	}
	
	if (!options.numConcurrent) throw new Error("numConcurrent must be provided");
	
	if (options.Promise) Promise = options.Promise;
	
	this.stopOnError = options.stopOnError || false;
	this.onError = options.onError || null;
	this.numConcurrent = options.numConcurrent;
	
	this._id = options.id;
	this._numRunning = 0;
	this._queue = [];
	this._logger = options.logger || null;
	this._interval = options.interval || 0;
	this._pausing = false;
	this._pauseUntil = 0;
	// Deferred for the completion of all queued tasks; created lazily by add()
	this._deferred = null;
};

/**
 * Set the interval used between function runs
 *
 * @deprecated Pass .interval to the constructor instead
 * @param {Integer} ms - Interval in milliseconds
 */
ConcurrentCaller.prototype.setInterval = function (ms) {
	this._log("setInterval() is deprecated -- pass .interval to constructor");
	this._interval = ms;
};

/**
 * Set the function that receives debug output
 *
 * @deprecated Pass .logger to the constructor instead
 * @param {Function} func - Called with each log message string
 */
ConcurrentCaller.prototype.setLogger = function (func) {
	// Logged before the assignment below, so the deprecation notice goes to the
	// previously set logger (if any), not to `func`
	this._log("setLogger() is deprecated -- pass .logger to constructor");
	this._logger = func;
};

/**
 * Don't run any new functions for the specified amount of time
 *
 * @param {Integer} ms - Length of the pause, in milliseconds from now
 */
ConcurrentCaller.prototype.pause = function (ms) {
	this._pauseUntil = Date.now() + ms;
};

/**
 * Add a task to the queue without starting it
 *
 * Note that when an array is passed, each function is handed to start(), so processing
 * of the queue begins as part of this call.
 *
 * @param {Function|Function[]} func - One or more functions to run
 * @return {Promise|Promise<PromiseInspection[]>} - If one function is passed, a promise for the return
 *     value of the passed function; if multiple, a promise for an array of PromiseInspection objects
 *     for those functions, resolved once they have all finished, even if other functions are still running
 */
ConcurrentCaller.prototype.add = function (func) {
	if (Array.isArray(func)) {
		// reflect() keeps one failing function from rejecting the combined promise
		let promises = func.map(f => this.start(f).reflect());
		return Promise.all(promises);
	}
	
	// Start a new overall-completion deferred if there isn't one or the previous
	// batch has already settled
	if (!this._deferred || !this._deferred.promise.isPending()) {
		this._deferred = Promise.defer();
	}
	
	var deferred = Promise.defer();
	this._queue.push({
		// Promise.method() wraps the function so that it always returns a promise
		func: Promise.method(func),
		deferred: deferred
	});
	return deferred.promise;
};

/**
 * Add one or more functions to the queue and begin processing
 *
 * @param {Function|Function[]} func - One or more functions to run
 * @return {Promise} - The promise returned by add() for the passed function(s), resolved once
 *     they have all finished (even if other functions are still running)
 */
ConcurrentCaller.prototype.start = function (func) {
	var promise = this.add(func);
	var run = this._processNext();
	if (!run) {
		this._log("Already at " + this.numConcurrent + " -- queueing for later");
	}
	return promise;
};

/**
 * Start processing if not already running and wait for all tasks to complete
 *
 * @return {Promise<Promise[]>} - A promise, fulfilled once everything has finished, for an array
 *     of the promises of the tasks that were queued when this was called (an empty array if
 *     nothing has ever been added)
 */
ConcurrentCaller.prototype.runAll = function () {
	// If nothing queued, return immediately
	if (!this._deferred) {
		return Promise.resolve([]);
	}
	// Snapshot the task promises first, since _processNext() removes tasks from the queue
	var promises = this._queue.map(x => x.deferred.promise);
	// Keep starting tasks until _processNext() reports that it didn't start one
	while (this._processNext()) {
		// All work happens in the loop condition
	}
	return this._deferred.promise.return(promises);
};

/**
 * Wait for all running tasks to complete
 *
 * @return {Promise} - The overall-completion promise, or an already-resolved promise
 *     if no task has been added yet
 */
ConcurrentCaller.prototype.wait = function () {
	return this._deferred ? this._deferred.promise : Promise.resolve();
};

/**
 * Drop all tasks that haven't started yet
 *
 * Only the queue is cleared here: functions that are already running are left alone, and
 * the promises of the dropped tasks are neither resolved nor rejected.
 */
ConcurrentCaller.prototype.stop = function () {
	this._log("Clearing queue");
	this._queue = [];
};

/**
 * Start the next queued task if a slot is available
 *
 * After the task settles, this is called again so that the queue keeps draining. When the
 * queue is empty and nothing is running or pausing, the overall deferred is resolved.
 *
 * @return {Boolean} - True if a task was taken from the queue and scheduled, false if the
 *     concurrency limit has been reached or the queue is empty
 */
ConcurrentCaller.prototype._processNext = function () {
	if (this._numRunning >= this.numConcurrent) {
		return false;
	}
	
	var task = this._queue.shift();
	
	// Nothing left to run
	if (!task) {
		if (this._numRunning == 0 && !this._pausing) {
			this._log("All tasks are done");
			this._deferred.resolve();
		}
		else {
			this._log("Nothing left to run -- waiting for running tasks to complete");
		}
		return false;
	}
	
	var runFunc = async () => {
		this._log("Running function ("
			+ this._numRunning + "/" + this.numConcurrent + " running, "
			+ this._queue.length + " queued)");
		
		try {
			let value = await task.func();
			this._numRunning--;
			
			this._log("Done with function ("
				+ this._numRunning + "/" + this.numConcurrent + " running, "
				+ this._queue.length + " queued)");
			
			task.deferred.resolve(value);
		}
		catch (e) {
			this._numRunning--;
			
			this._log("Error in function (" + this._numRunning + "/" + this.numConcurrent + ", "
				+ this._queue.length + " in queue)"
				+ ((!this.onError && !this.stopOnError) ? ": " + e : ""));
			
			if (this.onError) {
				this.onError(e);
			}
			
			if (this.stopOnError && this._queue.length) {
				this._log("Stopping on error: " + e);
				this._oldQueue = this._queue;
				this._queue = [];
				for (let o of this._oldQueue) {
					//this._log("Rejecting promise");
					o.deferred.reject();
				}
			}
			
			// A task can reject with a non-object (e.g., `Promise.reject()` or a thrown
			// string). Setting a property on that would throw here, leaving the task's
			// promise unsettled and the rest of the queue unprocessed, so only flag
			// values that can carry the property.
			if (e !== null && (typeof e == 'object' || typeof e == 'function')) {
				e.handledRejection = true;
			}
			task.deferred.reject(e);
		}
	};
	
	if (this._getIntervalNeeded() > 0) {
		this._waitForPause()
			.then(runFunc)
			.then(() => this._processNext());
	}
	else {
		runFunc()
			.then(() => this._processNext());
	}
	
	// Counted after scheduling, so the "Running function" message above reports the
	// number of tasks that were running before this one
	this._numRunning++;
	
	return true;
};

/**
 * Get the time to wait before running the next function
 *
 * @return {Integer} - The configured interval or the time remaining in the current pause
 *     (if there is one), whichever is longer, in milliseconds
 */
ConcurrentCaller.prototype._getIntervalNeeded = function () {
	var interval = this._interval;
	var now = Date.now();
	if (this._pauseUntil > now && (this._pauseUntil - now > interval)) {
		interval = this._pauseUntil - now;
	}
	return interval;
};

/**
 * Wait until the specified interval has elapsed or the current pause (if there is one) is over,
 * whichever is longer
 *
 * @return {Promise} - Resolved once the delay is over, or immediately if no delay is needed
 */
ConcurrentCaller.prototype._waitForPause = async function () {
	var interval = this._getIntervalNeeded();
	if (interval == 0) return;
	this._pausing = true;
	try {
		await Promise.delay(interval);
	}
	finally {
		// Clear the flag even if the delay fails, since _processNext() won't report
		// completion while it's set
		this._pausing = false;
	}
};

/**
 * Pass a message to the logger, if one is set, prefixed with "[ConcurrentCaller]" and
 * the instance id (if there is one)
 *
 * @param {String} msg
 */
ConcurrentCaller.prototype._log = function (msg) {
	if (this._logger) {
		this._logger("[ConcurrentCaller] " + (this._id ? `[${this._id}] ` : "") + msg);
	}
};

// Under Node, expose the constructor as the module's export
if (typeof process === 'object' && process + '' === '[object process]'){
	module.exports = ConcurrentCaller;
}