TaskWithTimeout: After suspend, don't start timers for new tasks

This commit is contained in:
Scott Nonnenberg 2022-01-26 12:39:24 -08:00 committed by GitHub
parent 0cf28344a6
commit 5f34ece87c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 31 deletions

View file

@ -204,24 +204,20 @@ export default class MessageReceiver
this.incomingQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
this.appQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
// All envelopes start in encryptedQueue and progress to decryptedQueue
this.encryptedQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
this.decryptedQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
@ -255,9 +251,11 @@ export default class MessageReceiver
request.respond(200, 'OK');
if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') {
this.incomingQueue.add(() => {
this.incomingQueue.add(
createTaskWithTimeout(async () => {
this.onEmpty();
});
}, 'incomingQueue/onEmpty')
);
}
return;
}
@ -326,12 +324,19 @@ export default class MessageReceiver
}
};
this.incomingQueue.add(job);
this.incomingQueue.add(
createTaskWithTimeout(job, 'incomingQueue/websocket')
);
}
public reset(): void {
// We always process our cache before processing a new websocket message
this.incomingQueue.add(async () => this.queueAllCached());
this.incomingQueue.add(
createTaskWithTimeout(
async () => this.queueAllCached(),
'incomingQueue/queueAllCached'
)
);
this.count = 0;
this.isEmptied = false;
@ -348,14 +353,24 @@ export default class MessageReceiver
public async drain(): Promise<void> {
const waitForEncryptedQueue = async () =>
this.addToQueue(async () => {
this.addToQueue(
async () => {
log.info('drained');
}, TaskType.Decrypted);
},
'drain/waitForDecrypted',
TaskType.Decrypted
);
const waitForIncomingQueue = async () =>
this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted);
this.addToQueue(
waitForEncryptedQueue,
'drain/waitForEncrypted',
TaskType.Encrypted
);
return this.incomingQueue.add(waitForIncomingQueue);
return this.incomingQueue.add(
createTaskWithTimeout(waitForIncomingQueue, 'drain/waitForIncoming')
);
}
//
@ -508,7 +523,12 @@ export default class MessageReceiver
//
private async dispatchAndWait(event: Event): Promise<void> {
this.appQueue.add(async () => Promise.all(this.dispatchEvent(event)));
this.appQueue.add(
createTaskWithTimeout(
async () => Promise.all(this.dispatchEvent(event)),
'dispatchEvent'
)
);
}
private calculateMessageAge(
@ -542,6 +562,7 @@ export default class MessageReceiver
private async addToQueue<T>(
task: () => Promise<T>,
id: string,
taskType: TaskType
): Promise<T> {
if (taskType === TaskType.Encrypted) {
@ -554,7 +575,7 @@ export default class MessageReceiver
: this.decryptedQueue;
try {
return await queue.add(task);
return await queue.add(createTaskWithTimeout(task, id));
} finally {
this.updateProgress(this.count);
}
@ -580,24 +601,34 @@ export default class MessageReceiver
);
// We don't await here because we don't want this to gate future message processing
this.appQueue.add(emitEmpty);
this.appQueue.add(createTaskWithTimeout(emitEmpty, 'emitEmpty'));
};
const waitForEncryptedQueue = async () => {
this.addToQueue(waitForDecryptedQueue, TaskType.Decrypted);
this.addToQueue(
waitForDecryptedQueue,
'onEmpty/waitForDecrypted',
TaskType.Decrypted
);
};
const waitForIncomingQueue = () => {
this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted);
const waitForIncomingQueue = async () => {
// Note: this.count is used in addToQueue
// Resetting count so everything from the websocket after this starts at zero
this.count = 0;
this.addToQueue(
waitForEncryptedQueue,
'onEmpty/waitForEncrypted',
TaskType.Encrypted
);
};
const waitForCacheAddBatcher = async () => {
await this.decryptAndCacheBatcher.onIdle();
this.incomingQueue.add(waitForIncomingQueue);
this.incomingQueue.add(
createTaskWithTimeout(waitForIncomingQueue, 'onEmpty/waitForIncoming')
);
};
waitForCacheAddBatcher();
@ -675,9 +706,13 @@ export default class MessageReceiver
}
// Maintain invariant: encrypted queue => decrypted queue
this.addToQueue(async () => {
this.addToQueue(
async () => {
this.queueDecryptedEnvelope(envelope, payloadPlaintext);
}, TaskType.Encrypted);
},
'queueDecryptedEnvelope',
TaskType.Encrypted
);
} else {
this.queueCachedEnvelope(item, envelope);
}
@ -729,7 +764,12 @@ export default class MessageReceiver
if (this.isEmptied) {
this.clearRetryTimeout();
this.retryCachedTimeout = setTimeout(() => {
this.incomingQueue.add(async () => this.queueAllCached());
this.incomingQueue.add(
createTaskWithTimeout(
async () => this.queueAllCached(),
'queueAllCached'
)
);
}, RETRY_TIMEOUT);
}
}
@ -967,7 +1007,11 @@ export default class MessageReceiver
);
try {
await this.addToQueue(taskWithTimeout, TaskType.Decrypted);
await this.addToQueue(
taskWithTimeout,
'dispatchEvent',
TaskType.Decrypted
);
} catch (error) {
log.error(
`queueDecryptedEnvelope error handling envelope ${id}:`,
@ -984,7 +1028,7 @@ export default class MessageReceiver
let logId = this.getEnvelopeId(envelope);
log.info(`queueing ${uuidKind} envelope`, logId);
const task = createTaskWithTimeout(async (): Promise<DecryptResult> => {
const task = async (): Promise<DecryptResult> => {
const unsealedEnvelope = await this.unsealEnvelope(
stores,
envelope,
@ -1000,14 +1044,19 @@ export default class MessageReceiver
this.addToQueue(
async () => this.dispatchEvent(new EnvelopeEvent(unsealedEnvelope)),
'dispatchEvent',
TaskType.Decrypted
);
return this.decryptEnvelope(stores, unsealedEnvelope, uuidKind);
}, `MessageReceiver: unseal and decrypt ${logId}`);
};
try {
return await this.addToQueue(task, TaskType.Encrypted);
return await this.addToQueue(
task,
`MessageReceiver: unseal and decrypt ${logId}`,
TaskType.Encrypted
);
} catch (error) {
const args = [
'queueEncryptedEnvelope error handling envelope',
@ -1630,6 +1679,7 @@ export default class MessageReceiver
// Avoid deadlocks by scheduling processing on decrypted queue
this.addToQueue(
async () => this.dispatchEvent(event),
'decrypted/dispatchEvent',
TaskType.Decrypted
);
} else {

View file

@ -12,9 +12,11 @@ type TaskType = {
};
const tasks = new Set<TaskType>();
let shouldStartTimers = true;
export function suspendTasksWithTimeout(): void {
log.info(`TaskWithTimeout: suspending ${tasks.size} tasks`);
shouldStartTimers = false;
for (const task of tasks) {
task.suspend();
}
@ -22,6 +24,7 @@ export function suspendTasksWithTimeout(): void {
export function resumeTasksWithTimeout(): void {
log.info(`TaskWithTimeout: resuming ${tasks.size} tasks`);
shouldStartTimers = true;
for (const task of tasks) {
task.resume();
}
@ -75,7 +78,9 @@ export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
};
tasks.add(entry);
if (shouldStartTimers) {
startTimer();
}
let result: unknown;