From 7fbfdce00e2ffd1a9a51e2b76eb78a43aefbe4a3 Mon Sep 17 00:00:00 2001 From: Dan Stillman Date: Tue, 26 Mar 2013 02:14:26 -0400 Subject: [PATCH] ConcurrentCaller -- run a fixed number of concurrent async functions If the functions return promises, the caller will wait until they're fulfilled and slots open to start new functions. --- resource/concurrent-caller.js | 155 ++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 resource/concurrent-caller.js diff --git a/resource/concurrent-caller.js b/resource/concurrent-caller.js new file mode 100644 index 0000000000..f4b8fccc25 --- /dev/null +++ b/resource/concurrent-caller.js @@ -0,0 +1,155 @@ +/* + ***** BEGIN LICENSE BLOCK ***** + + Copyright © 2013 Center for History and New Media + George Mason University, Fairfax, Virginia, USA + http://zotero.org + + This file is part of Zotero. + + Zotero is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Zotero is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with Zotero. If not, see . + + ***** END LICENSE BLOCK ***** +*/ + +EXPORTED_SYMBOLS = ["ConcurrentCaller"]; +Components.utils.import("resource://zotero/q.js"); + +/** + * Call a fixed number of functions at once, queueing the rest until slots + * open and returning a promise for the final completion. The functions do + * not need to return promises, but they should if they have asynchronous + * work to perform.. + * + * Example: + * + * var caller = new ConcurrentCaller(2); + * caller.stopOnError = true; + * caller.fcall([foo, bar, baz, qux).done(); + * + * In this example, foo and bar would run immediately, and baz and qux would + * be queued for later. When foo or bar finished, baz would be run, followed + * by qux when another slot opened. + * + * @param {Integer} numConcurrent The number of concurrent functions to run. + */ +ConcurrentCaller = function (numConcurrent) { + if (typeof numConcurrent == 'undefined') { + throw new Error("numConcurrent not provided"); + } + + this.stopOnError = false; + + this.numConcurrent = numConcurrent; + this.numRunning = 0; + this.queue = []; + this.logger = null; + this.errorLogger = null; +}; + + +ConcurrentCaller.prototype.setLogger = function (func) { + this.logger = func; +} + + +ConcurrentCaller.prototype.setErrorLogger = function (func) { + this.errorLogger = func; +} + + +/** + * @param {Function[]|Function} func One or more functions to run + */ +ConcurrentCaller.prototype.fcall = function (func) { + if (Array.isArray(func)) { + var promises = []; + for (var i in func) { + //this._log("Running fcall on function"); + promises.push(this.fcall(func[i])); + } + return this.stopOnError ? Q.all(promises) : Q.allResolved(promises); + } + + // If we're at the maximum number of concurrent functions, + // queue this function for later + if (this.numRunning == this.numConcurrent) { + this._log("Already at " + this.numConcurrent + " -- queueing for later"); + var deferred = Q.defer(); + this.queue.push({ + func: Q.fbind(func), + deferred: deferred + }); + return deferred.promise; + } + + this._log("Running function (" + this.numRunning + " current < " + this.numConcurrent + " max)"); + + // Otherwise run it now + this.numRunning++; + return this._onFunctionDone(Q.fcall(func)); +} + + +ConcurrentCaller.prototype._onFunctionDone = function (promise) { + var self = this; + return Q.when( + promise, + function (promise) { + self.numRunning--; + + self._log("Done with function (" + + self.numRunning + "/" + self.numConcurrent + " running, " + + self.queue.length + " queued)"); + + // If there's a function to call and we're under the concurrent limit, + // run it now + let f = self.queue.shift(); + if (f && self.numRunning < self.numConcurrent) { + Q.delay(1) + .then(function () { + self.numRunning++; + var p = self._onFunctionDone(f.func()); + f.deferred.resolve(p); + }); + } + + return promise; + }, + function (e) { + if (self.errorLogger) { + self.errorLogger(e); + } + + self.numRunning--; + + self._log("Done with function (" + self.numRunning + "/" + self.numConcurrent + ", " + + self.queue.length + " in queue)"); + + if (self.stopOnError && self.queue.length) { + self._log("Stopping on error: " + e); + self.queue = []; + } + + throw e; + } + ); +} + + +ConcurrentCaller.prototype._log = function (msg) { + if (this.logger) { + this.logger(msg); + } +}