// Source: signal-desktop/ts/util/StartupQueue.ts (TypeScript, 81 lines, 1.9 KiB)
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
2023-02-24 19:03:17 +00:00
import PQueue from 'p-queue';
2021-08-17 00:16:00 +00:00
import * as Errors from '../types/errors';
import * as log from '../logging/log';
2021-03-13 01:22:36 +00:00
2021-08-17 00:16:00 +00:00
/**
 * A single pending startup action held by `StartupQueue`.
 *
 * `value` is the number compared in `StartupQueue.add` to decide whether a
 * new registration for the same id replaces the stored one; `callback` is
 * the asynchronous work that runs when the queue is flushed.
 */
type EntryType = Readonly<{
  value: number;
  callback(): Promise<void>;
}>;
2021-03-13 01:22:36 +00:00
2023-01-13 00:24:59 +00:00
// Module-level singleton used by the static StartupQueue helpers: assigned in
// StartupQueue.initialize() and reset to undefined by StartupQueue.flush().
let startupProcessingQueue: StartupQueue | undefined;
2021-08-17 00:16:00 +00:00
/**
 * Buffers keyed actions and runs them later, in bulk, on a
 * bounded-concurrency queue.
 *
 * Actions are registered with `add()` under an id; for each id only the
 * registration with the highest `value` is kept. `flush()` hands every kept
 * callback to an internal `PQueue`. The static methods operate on a
 * module-level instance and are no-ops while that instance is undefined.
 */
export class StartupQueue {
  /** Pending actions keyed by id; emptied by `flush()`. */
  private readonly map = new Map<string, EntryType>();

  /** Queue that executes the flushed callbacks. */
  private readonly running: PQueue = new PQueue({
    // mostly io-bound work that is not very parallelizable
    // small number should be sufficient
    concurrency: 5,
  });

  /**
   * Registers `f` under `id`.
   *
   * If an entry for `id` already exists with a `value` greater than or equal
   * to the given one, the call is ignored; otherwise the entry is created or
   * replaced.
   *
   * @param id - Key used to deduplicate actions.
   * @param value - Number compared against the stored entry's `value`.
   * @param f - Asynchronous work to run when the queue is flushed.
   */
  public add(id: string, value: number, f: () => Promise<void>): void {
    const existing = this.map.get(id);
    if (existing && existing.value >= value) {
      return;
    }

    this.map.set(id, { value, callback: f });
  }

  /**
   * Schedules every pending callback on the internal queue and clears the
   * pending map. Returns immediately; the callbacks run asynchronously.
   */
  public flush(): void {
    log.info('StartupQueue: Processing', this.map.size, 'actions');

    // Snapshot before clearing so the map is already empty while tasks run.
    const values = Array.from(this.map.values());
    this.map.clear();

    for (const { callback } of values) {
      void this.running.add(async () => {
        try {
          // `await` is required: returning the bare promise would leave the
          // `try` block before the promise settles, so an asynchronous
          // rejection would skip the `catch` below and never be logged.
          await callback();
        } catch (error) {
          log.error(
            'StartupQueue: Failed to process item due to error',
            Errors.toLogFormat(error)
          );
          // Re-thrown so the queue task still rejects, as it did before.
          throw error;
        }
      });
    }
  }

  /**
   * Logs the queue's `pending` count and returns a promise that resolves
   * once the internal queue reports idle.
   */
  private shutdown(): Promise<void> {
    log.info(
      `StartupQueue: Waiting for ${this.running.pending} tasks to drain`
    );
    return this.running.onIdle();
  }

  /** Creates a fresh module-level queue, replacing any existing one. */
  static initialize(): void {
    startupProcessingQueue = new StartupQueue();
  }

  /** Returns true while a module-level queue exists. */
  static isAvailable(): boolean {
    return Boolean(startupProcessingQueue);
  }

  /**
   * Registers `f` on the module-level queue; does nothing when no queue
   * exists. See the instance `add()` for the replacement rule.
   */
  static add(id: string, value: number, f: () => Promise<void>): void {
    startupProcessingQueue?.add(id, value, f);
  }

  /**
   * Flushes the module-level queue, if any, and then discards the reference
   * to it so that `isAvailable()` returns false afterwards.
   */
  static flush(): void {
    startupProcessingQueue?.flush();
    startupProcessingQueue = undefined;
  }

  /**
   * Waits for the module-level queue, if any, to become idle.
   *
   * NOTE(review): the static `flush()` clears the module-level reference, so
   * a call made after `flush()` finds no queue and resolves without waiting
   * for the flushed tasks. Confirm whether that is intended.
   */
  static async shutdown(): Promise<void> {
    await startupProcessingQueue?.shutdown();
  }
}