ConcurrentCaller updates

This commit is contained in:
Dan Stillman 2013-08-13 17:29:14 -04:00
parent 3a7042e527
commit 51713c8030

View file

@ -50,23 +50,28 @@ ConcurrentCaller = function (numConcurrent) {
} }
this.stopOnError = false; this.stopOnError = false;
this.onError = null;
this.numConcurrent = numConcurrent; this._numConcurrent = numConcurrent;
this.numRunning = 0; this._numRunning = 0;
this.queue = []; this._queue = [];
this.logger = null; this._logger = null;
this.errorLogger = null; this._interval = 0;
};
/**
* Set the interval between the end of one function run and the beginning
* of another, in milliseconds
*/
ConcurrentCaller.prototype.setInterval = function (ms) {
this._interval = ms;
}; };
ConcurrentCaller.prototype.setLogger = function (func) { ConcurrentCaller.prototype.setLogger = function (func) {
this.logger = func; this._logger = func;
} };
ConcurrentCaller.prototype.setErrorLogger = function (func) {
this.errorLogger = func;
}
/** /**
@ -79,47 +84,57 @@ ConcurrentCaller.prototype.fcall = function (func) {
//this._log("Running fcall on function"); //this._log("Running fcall on function");
promises.push(this.fcall(func[i])); promises.push(this.fcall(func[i]));
} }
return this.stopOnError ? Q.all(promises) : Q.allSettled(promises); return Q.allSettled(promises);
} }
// If we're at the maximum number of concurrent functions, // If we're at the maximum number of concurrent functions,
// queue this function for later // queue this function for later
if (this.numRunning == this.numConcurrent) { if (this._numRunning == this._numConcurrent) {
this._log("Already at " + this.numConcurrent + " -- queueing for later"); this._log("Already at " + this._numConcurrent + " -- queueing for later");
var deferred = Q.defer(); var deferred = Q.defer();
this.queue.push({ this._queue.push({
func: Q.fbind(func), func: Q.fbind(func),
deferred: deferred deferred: deferred
}); });
return deferred.promise; return deferred.promise;
} }
this._log("Running function (" + this.numRunning + " current < " + this.numConcurrent + " max)"); this._log("Running function (" + this._numRunning + " current < " + this._numConcurrent + " max)");
// Otherwise run it now // Otherwise run it now
this.numRunning++; this._numRunning++;
return this._onFunctionDone(Q.fcall(func)); return this._onFunctionDone(Q.fcall(func));
} }
ConcurrentCaller.prototype.stop = function () {
self._log("Clearing queue");
self._queue = [];
};
ConcurrentCaller.prototype._onFunctionDone = function (promise) { ConcurrentCaller.prototype._onFunctionDone = function (promise) {
var self = this; var self = this;
return Q.when( return Q.when(
promise, promise,
function (promise) { function (promise) {
self.numRunning--; self._numRunning--;
self._log("Done with function (" self._log("Done with function ("
+ self.numRunning + "/" + self.numConcurrent + " running, " + self._numRunning + "/" + self._numConcurrent + " running, "
+ self.queue.length + " queued)"); + self._queue.length + " queued)");
// If there's a function to call and we're under the concurrent limit, // If there's a function to call and we're under the concurrent limit,
// run it now // run it now
let f = self.queue.shift(); let f = self._queue.shift();
if (f && self.numRunning < self.numConcurrent) { if (f && self._numRunning < self._numConcurrent) {
Q.delay(1) Q.delay(self._interval)
.then(function () { .then(function () {
self.numRunning++; self._log("Running new function ("
+ self._numRunning + "/" + self._numConcurrent + " running, "
+ self._queue.length + " queued)");
self._numRunning++;
var p = self._onFunctionDone(f.func()); var p = self._onFunctionDone(f.func());
f.deferred.resolve(p); f.deferred.resolve(p);
}); });
@ -128,28 +143,28 @@ ConcurrentCaller.prototype._onFunctionDone = function (promise) {
return promise; return promise;
}, },
function (e) { function (e) {
if (self.errorLogger) { self._numRunning--;
self.errorLogger(e);
self._log("Done with function (" + self._numRunning + "/" + self._numConcurrent + ", "
+ self._queue.length + " in queue)");
if (self.onError) {
self.onError(e);
} }
self.numRunning--; if (self.stopOnError && self._queue.length) {
self._log("Done with function (" + self.numRunning + "/" + self.numConcurrent + ", "
+ self.queue.length + " in queue)");
if (self.stopOnError && self.queue.length) {
self._log("Stopping on error: " + e); self._log("Stopping on error: " + e);
self.queue = []; self._queue = [];
} }
throw e; throw e;
} }
); );
} };
ConcurrentCaller.prototype._log = function (msg) { ConcurrentCaller.prototype._log = function (msg) {
if (this.logger) { if (this._logger) {
this.logger(msg); this._logger("[ConcurrentCaller] " + msg);
} }
} };