signal-desktop/ts/util/waitBatcher.ts

189 lines
4.5 KiB
TypeScript
Raw Normal View History

// Copyright 2019-2022 Signal Messenger, LLC
2020-10-30 20:34:04 +00:00
// SPDX-License-Identifier: AGPL-3.0-only
2019-09-26 19:56:31 +00:00
import PQueue from 'p-queue';
import { sleep } from './sleep';
import * as log from '../logging/log';
import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary';
declare global {
// We want to extend `window`'s properties, so we need an interface.
// eslint-disable-next-line no-restricted-syntax
interface Window {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
waitBatchers: Array<BatcherType<any>>;
waitForAllWaitBatchers: () => Promise<unknown>;
2021-03-26 00:00:03 +00:00
flushAllWaitBatchers: () => Promise<unknown>;
}
}
2019-09-26 19:56:31 +00:00
window.waitBatchers = [];
2021-03-26 00:00:03 +00:00
window.flushAllWaitBatchers = async () => {
log.info('waitBatcher#flushAllWaitBatchers');
2021-03-26 00:00:03 +00:00
await Promise.all(window.waitBatchers.map(item => item.flushAndWait()));
};
2019-09-26 19:56:31 +00:00
window.waitForAllWaitBatchers = async () => {
log.info('waitBatcher#waitForAllWaitBatchers');
2019-09-26 19:56:31 +00:00
await Promise.all(window.waitBatchers.map(item => item.onIdle()));
};
type ItemHolderType<ItemType> = {
resolve?: (value?: unknown) => void;
reject?: (error: Error) => void;
2019-09-26 19:56:31 +00:00
item: ItemType;
};
type ExplodedPromiseType = {
resolve?: (value?: unknown) => void;
reject?: (error: Error) => void;
promise: Promise<unknown>;
2019-09-26 19:56:31 +00:00
};
type BatcherOptionsType<ItemType> = {
2021-03-26 00:00:03 +00:00
name: string;
2019-09-26 19:56:31 +00:00
wait: number;
maxSize: number;
processBatch: (items: Array<ItemType>) => Promise<void>;
};
type BatcherType<ItemType> = {
add: (item: ItemType) => Promise<void>;
anyPending: () => boolean;
onIdle: () => Promise<void>;
unregister: () => void;
2021-03-26 00:00:03 +00:00
flushAndWait: () => void;
2019-09-26 19:56:31 +00:00
};
export function createWaitBatcher<ItemType>(
options: BatcherOptionsType<ItemType>
): BatcherType<ItemType> {
let waitBatcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null;
2019-09-26 19:56:31 +00:00
let items: Array<ItemHolderType<ItemType>> = [];
2021-11-23 22:01:03 +00:00
const queue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
throwOnTimeout: true,
});
2019-09-26 19:56:31 +00:00
2021-03-26 00:00:03 +00:00
async function _kickBatchOff() {
2019-09-26 19:56:31 +00:00
const itemsRef = items;
items = [];
2021-03-26 00:00:03 +00:00
await queue.add(async () => {
2019-09-26 19:56:31 +00:00
try {
await options.processBatch(itemsRef.map(item => item.item));
itemsRef.forEach(item => {
if (item.resolve) {
item.resolve();
}
2019-09-26 19:56:31 +00:00
});
} catch (error) {
itemsRef.forEach(item => {
if (item.reject) {
item.reject(error);
}
2019-09-26 19:56:31 +00:00
});
}
});
}
function _makeExplodedPromise(): ExplodedPromiseType {
let resolve;
let reject;
const promise = new Promise((resolveParam, rejectParam) => {
resolve = resolveParam;
reject = rejectParam;
});
return { promise, resolve, reject };
}
async function add(item: ItemType) {
const { promise, resolve, reject } = _makeExplodedPromise();
2019-09-26 19:56:31 +00:00
items.push({
resolve,
reject,
item,
});
2021-03-26 00:00:03 +00:00
if (items.length === 1) {
// Set timeout once when we just pushed the first item so that the wait
// time is bounded by `options.wait` and not extended by further pushes.
2019-09-26 19:56:31 +00:00
timeout = setTimeout(() => {
timeout = null;
_kickBatchOff();
}, options.wait);
}
2021-03-26 00:00:03 +00:00
if (items.length >= options.maxSize) {
clearTimeoutIfNecessary(timeout);
timeout = null;
2021-03-26 00:00:03 +00:00
_kickBatchOff();
}
2019-09-26 19:56:31 +00:00
await promise;
}
function anyPending(): boolean {
return queue.size > 0 || queue.pending > 0 || items.length > 0;
}
async function onIdle() {
while (anyPending()) {
if (queue.size > 0 || queue.pending > 0) {
// eslint-disable-next-line no-await-in-loop
2019-09-26 19:56:31 +00:00
await queue.onIdle();
}
if (items.length > 0) {
// eslint-disable-next-line no-await-in-loop
2019-09-26 19:56:31 +00:00
await sleep(options.wait * 2);
}
}
}
function unregister() {
window.waitBatchers = window.waitBatchers.filter(
item => item !== waitBatcher
2019-09-26 19:56:31 +00:00
);
}
2021-03-26 00:00:03 +00:00
async function flushAndWait() {
log.info(
2021-03-26 00:00:03 +00:00
`Flushing start ${options.name} for waitBatcher ` +
`items.length=${items.length}`
);
clearTimeoutIfNecessary(timeout);
timeout = null;
2021-03-26 00:00:03 +00:00
while (anyPending()) {
// eslint-disable-next-line no-await-in-loop
await _kickBatchOff();
if (queue.size > 0 || queue.pending > 0) {
// eslint-disable-next-line no-await-in-loop
await queue.onIdle();
}
}
log.info(`Flushing complete ${options.name} for waitBatcher`);
2021-03-26 00:00:03 +00:00
}
2019-09-26 19:56:31 +00:00
waitBatcher = {
add,
anyPending,
onIdle,
unregister,
2021-03-26 00:00:03 +00:00
flushAndWait,
2019-09-26 19:56:31 +00:00
};
window.waitBatchers.push(waitBatcher);
return waitBatcher;
}