Delay properly on 429 response in syncAPIClient
ConcurrentCaller wasn't waiting properly if start() was called again while it was pausing, so 429 caused an immediate retry, which is pretty much exactly what you don't want a 429 to do.
This commit is contained in:
parent
31c928a3ff
commit
87352822fa
2 changed files with 79 additions and 55 deletions
|
@ -201,9 +201,10 @@ ConcurrentCaller.prototype._processNext = function () {
|
|||
return false;
|
||||
}
|
||||
|
||||
// If there's a function to call and we're under the concurrent limit, run it now
|
||||
var f = this._queue.shift();
|
||||
if (!f) {
|
||||
var task = this._queue.shift();
|
||||
|
||||
// Nothing left to run
|
||||
if (!task) {
|
||||
if (this._numRunning == 0 && !this._pausing) {
|
||||
this._log("All tasks are done");
|
||||
this._deferred.resolve();
|
||||
|
@ -214,72 +215,85 @@ ConcurrentCaller.prototype._processNext = function () {
|
|||
return false;
|
||||
}
|
||||
|
||||
this._log("Running function ("
|
||||
+ this._numRunning + "/" + this.numConcurrent + " running, "
|
||||
+ this._queue.length + " queued)");
|
||||
|
||||
this._numRunning++;
|
||||
f.func().bind(this).then(function (value) {
|
||||
this._numRunning--;
|
||||
|
||||
this._log("Done with function ("
|
||||
var runFunc = async () => {
|
||||
this._log("Running function ("
|
||||
+ this._numRunning + "/" + this.numConcurrent + " running, "
|
||||
+ this._queue.length + " queued)");
|
||||
|
||||
this._waitForPause().bind(this).then(function () {
|
||||
this._processNext();
|
||||
});
|
||||
|
||||
f.deferred.resolve(value);
|
||||
})
|
||||
.catch(function (e) {
|
||||
this._numRunning--;
|
||||
|
||||
this._log("Error in function (" + this._numRunning + "/" + this.numConcurrent + ", "
|
||||
+ this._queue.length + " in queue)"
|
||||
+ ((!this.onError && !this.stopOnError) ? ": " + e : ""));
|
||||
|
||||
if (this.onError) {
|
||||
this.onError(e);
|
||||
try {
|
||||
let value = await task.func();
|
||||
this._numRunning--;
|
||||
|
||||
this._log("Done with function ("
|
||||
+ this._numRunning + "/" + this.numConcurrent + " running, "
|
||||
+ this._queue.length + " queued)");
|
||||
|
||||
task.deferred.resolve(value);
|
||||
}
|
||||
|
||||
if (this.stopOnError && this._queue.length) {
|
||||
this._log("Stopping on error: " + e);
|
||||
this._oldQueue = this._queue;
|
||||
this._queue = [];
|
||||
for (let o of this._oldQueue) {
|
||||
//this._log("Rejecting promise");
|
||||
o.deferred.reject();
|
||||
catch (e) {
|
||||
this._numRunning--;
|
||||
|
||||
this._log("Error in function (" + this._numRunning + "/" + this.numConcurrent + ", "
|
||||
+ this._queue.length + " in queue)"
|
||||
+ ((!this.onError && !this.stopOnError) ? ": " + e : ""));
|
||||
|
||||
if (this.onError) {
|
||||
this.onError(e);
|
||||
}
|
||||
|
||||
if (this.stopOnError && this._queue.length) {
|
||||
this._log("Stopping on error: " + e);
|
||||
this._oldQueue = this._queue;
|
||||
this._queue = [];
|
||||
for (let o of this._oldQueue) {
|
||||
//this._log("Rejecting promise");
|
||||
o.deferred.reject();
|
||||
}
|
||||
}
|
||||
|
||||
e.handledRejection = true;
|
||||
task.deferred.reject(e);
|
||||
}
|
||||
|
||||
this._waitForPause().bind(this).then(function () {
|
||||
this._processNext();
|
||||
});
|
||||
|
||||
e.handledRejection = true;
|
||||
f.deferred.reject(e);
|
||||
});
|
||||
};
|
||||
|
||||
if (this._getIntervalNeeded() > 0) {
|
||||
this._waitForPause()
|
||||
.then(runFunc)
|
||||
.then(() => this._processNext());
|
||||
}
|
||||
else {
|
||||
runFunc()
|
||||
.then(() => this._processNext());
|
||||
}
|
||||
|
||||
this._numRunning++;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
ConcurrentCaller.prototype._getIntervalNeeded = function () {
|
||||
var interval = this._interval;
|
||||
var now = Date.now();
|
||||
if (this._pauseUntil > now && (this._pauseUntil - now > interval)) {
|
||||
interval = this._pauseUntil - now
|
||||
}
|
||||
return interval;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Wait until the specified interval has elapsed or the current pause (if there is one) is over,
|
||||
* whichever is longer
|
||||
*/
|
||||
ConcurrentCaller.prototype._waitForPause = Promise.coroutine(function* () {
|
||||
let interval = this._interval;
|
||||
let now = Date.now();
|
||||
if (this._pauseUntil > now && (this._pauseUntil - now > interval)) {
|
||||
interval = this._pauseUntil - now;
|
||||
}
|
||||
var interval = this._getIntervalNeeded();
|
||||
if (interval == 0) return;
|
||||
this._pausing = true;
|
||||
yield Promise.delay(interval);
|
||||
this._pausing = false;
|
||||
});
|
||||
|
||||
|
||||
ConcurrentCaller.prototype._log = function (msg) {
|
||||
if (this._logger) {
|
||||
this._logger("[ConcurrentCaller] " + (this._id ? `[${this._id}] ` : "") + msg);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue