2023-01-03 19:55:46 +00:00
|
|
|
// Copyright 2019 Signal Messenger, LLC
|
2020-10-30 20:34:04 +00:00
|
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
|
2019-09-26 19:56:31 +00:00
|
|
|
import PQueue from 'p-queue';
|
|
|
|
|
2020-10-13 22:21:42 +00:00
|
|
|
import { sleep } from './sleep';
|
2021-09-17 18:27:53 +00:00
|
|
|
import * as log from '../logging/log';
|
2022-04-11 17:53:57 +00:00
|
|
|
import * as Errors from '../types/errors';
|
2022-02-25 18:37:15 +00:00
|
|
|
import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary';
|
2022-06-27 16:46:43 +00:00
|
|
|
import { MINUTE } from './durations';
|
2020-10-13 22:21:42 +00:00
|
|
|
|
2020-09-14 21:56:35 +00:00
|
|
|
declare global {
|
2021-01-14 18:07:05 +00:00
|
|
|
// We want to extend `window`'s properties, so we need an interface.
|
|
|
|
// eslint-disable-next-line no-restricted-syntax
|
2020-09-14 21:56:35 +00:00
|
|
|
interface Window {
|
|
|
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
|
|
waitBatchers: Array<BatcherType<any>>;
|
|
|
|
waitForAllWaitBatchers: () => Promise<unknown>;
|
2021-03-26 00:00:03 +00:00
|
|
|
flushAllWaitBatchers: () => Promise<unknown>;
|
2020-09-14 21:56:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-26 19:56:31 +00:00
|
|
|
window.waitBatchers = [];
|
|
|
|
|
2021-03-26 00:00:03 +00:00
|
|
|
window.flushAllWaitBatchers = async () => {
|
2021-09-17 18:27:53 +00:00
|
|
|
log.info('waitBatcher#flushAllWaitBatchers');
|
2022-04-11 17:53:57 +00:00
|
|
|
try {
|
|
|
|
await Promise.all(window.waitBatchers.map(item => item.flushAndWait()));
|
|
|
|
} catch (error) {
|
|
|
|
log.error(
|
|
|
|
'flushAllWaitBatchers: Error flushing all',
|
|
|
|
Errors.toLogFormat(error)
|
|
|
|
);
|
|
|
|
}
|
2021-03-26 00:00:03 +00:00
|
|
|
};
|
|
|
|
|
2019-09-26 19:56:31 +00:00
|
|
|
window.waitForAllWaitBatchers = async () => {
|
2021-09-17 18:27:53 +00:00
|
|
|
log.info('waitBatcher#waitForAllWaitBatchers');
|
2022-04-11 17:53:57 +00:00
|
|
|
try {
|
|
|
|
await Promise.all(window.waitBatchers.map(item => item.onIdle()));
|
|
|
|
} catch (error) {
|
|
|
|
log.error(
|
|
|
|
'waitForAllWaitBatchers: Error waiting for all',
|
|
|
|
Errors.toLogFormat(error)
|
|
|
|
);
|
|
|
|
}
|
2019-09-26 19:56:31 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
type ItemHolderType<ItemType> = {
|
2020-09-14 21:56:35 +00:00
|
|
|
resolve?: (value?: unknown) => void;
|
|
|
|
reject?: (error: Error) => void;
|
2019-09-26 19:56:31 +00:00
|
|
|
item: ItemType;
|
|
|
|
};
|
|
|
|
|
|
|
|
type ExplodedPromiseType = {
|
2020-09-14 21:56:35 +00:00
|
|
|
resolve?: (value?: unknown) => void;
|
|
|
|
reject?: (error: Error) => void;
|
|
|
|
promise: Promise<unknown>;
|
2019-09-26 19:56:31 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
type BatcherOptionsType<ItemType> = {
|
2021-03-26 00:00:03 +00:00
|
|
|
name: string;
|
2019-09-26 19:56:31 +00:00
|
|
|
wait: number;
|
|
|
|
maxSize: number;
|
|
|
|
processBatch: (items: Array<ItemType>) => Promise<void>;
|
|
|
|
};
|
|
|
|
|
|
|
|
type BatcherType<ItemType> = {
|
|
|
|
add: (item: ItemType) => Promise<void>;
|
|
|
|
anyPending: () => boolean;
|
|
|
|
onIdle: () => Promise<void>;
|
|
|
|
unregister: () => void;
|
2021-03-26 00:00:03 +00:00
|
|
|
flushAndWait: () => void;
|
2019-09-26 19:56:31 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
export function createWaitBatcher<ItemType>(
|
|
|
|
options: BatcherOptionsType<ItemType>
|
|
|
|
): BatcherType<ItemType> {
|
|
|
|
let waitBatcher: BatcherType<ItemType>;
|
2020-09-14 21:56:35 +00:00
|
|
|
let timeout: NodeJS.Timeout | null;
|
2019-09-26 19:56:31 +00:00
|
|
|
let items: Array<ItemHolderType<ItemType>> = [];
|
2021-11-23 22:01:03 +00:00
|
|
|
const queue = new PQueue({
|
|
|
|
concurrency: 1,
|
2022-06-27 16:46:43 +00:00
|
|
|
timeout: MINUTE * 30,
|
2021-11-23 22:01:03 +00:00
|
|
|
throwOnTimeout: true,
|
|
|
|
});
|
2019-09-26 19:56:31 +00:00
|
|
|
|
2021-03-26 00:00:03 +00:00
|
|
|
async function _kickBatchOff() {
|
2019-09-26 19:56:31 +00:00
|
|
|
const itemsRef = items;
|
|
|
|
items = [];
|
2021-03-26 00:00:03 +00:00
|
|
|
await queue.add(async () => {
|
2019-09-26 19:56:31 +00:00
|
|
|
try {
|
|
|
|
await options.processBatch(itemsRef.map(item => item.item));
|
|
|
|
itemsRef.forEach(item => {
|
2020-09-14 21:56:35 +00:00
|
|
|
if (item.resolve) {
|
|
|
|
item.resolve();
|
|
|
|
}
|
2019-09-26 19:56:31 +00:00
|
|
|
});
|
|
|
|
} catch (error) {
|
|
|
|
itemsRef.forEach(item => {
|
2020-09-14 21:56:35 +00:00
|
|
|
if (item.reject) {
|
|
|
|
item.reject(error);
|
|
|
|
}
|
2019-09-26 19:56:31 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
function _makeExplodedPromise(): ExplodedPromiseType {
|
|
|
|
let resolve;
|
|
|
|
let reject;
|
|
|
|
|
|
|
|
const promise = new Promise((resolveParam, rejectParam) => {
|
|
|
|
resolve = resolveParam;
|
|
|
|
reject = rejectParam;
|
|
|
|
});
|
|
|
|
|
|
|
|
return { promise, resolve, reject };
|
|
|
|
}
|
|
|
|
|
|
|
|
async function add(item: ItemType) {
|
|
|
|
const { promise, resolve, reject } = _makeExplodedPromise();
|
2020-09-14 21:56:35 +00:00
|
|
|
|
2019-09-26 19:56:31 +00:00
|
|
|
items.push({
|
|
|
|
resolve,
|
|
|
|
reject,
|
|
|
|
item,
|
|
|
|
});
|
|
|
|
|
2021-03-26 00:00:03 +00:00
|
|
|
if (items.length === 1) {
|
|
|
|
// Set timeout once when we just pushed the first item so that the wait
|
|
|
|
// time is bounded by `options.wait` and not extended by further pushes.
|
2019-09-26 19:56:31 +00:00
|
|
|
timeout = setTimeout(() => {
|
|
|
|
timeout = null;
|
2022-12-21 18:41:48 +00:00
|
|
|
void _kickBatchOff();
|
2019-09-26 19:56:31 +00:00
|
|
|
}, options.wait);
|
|
|
|
}
|
2021-03-26 00:00:03 +00:00
|
|
|
if (items.length >= options.maxSize) {
|
2022-02-25 18:37:15 +00:00
|
|
|
clearTimeoutIfNecessary(timeout);
|
|
|
|
timeout = null;
|
2021-03-26 00:00:03 +00:00
|
|
|
|
2022-12-21 18:41:48 +00:00
|
|
|
void _kickBatchOff();
|
2021-03-26 00:00:03 +00:00
|
|
|
}
|
2019-09-26 19:56:31 +00:00
|
|
|
|
|
|
|
await promise;
|
|
|
|
}
|
|
|
|
|
|
|
|
function anyPending(): boolean {
|
|
|
|
return queue.size > 0 || queue.pending > 0 || items.length > 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
async function onIdle() {
|
|
|
|
while (anyPending()) {
|
|
|
|
if (queue.size > 0 || queue.pending > 0) {
|
2020-09-14 21:56:35 +00:00
|
|
|
// eslint-disable-next-line no-await-in-loop
|
2019-09-26 19:56:31 +00:00
|
|
|
await queue.onIdle();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (items.length > 0) {
|
2020-09-14 21:56:35 +00:00
|
|
|
// eslint-disable-next-line no-await-in-loop
|
2019-09-26 19:56:31 +00:00
|
|
|
await sleep(options.wait * 2);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function unregister() {
|
|
|
|
window.waitBatchers = window.waitBatchers.filter(
|
2020-09-14 21:56:35 +00:00
|
|
|
item => item !== waitBatcher
|
2019-09-26 19:56:31 +00:00
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2021-03-26 00:00:03 +00:00
|
|
|
async function flushAndWait() {
|
2021-09-17 18:27:53 +00:00
|
|
|
log.info(
|
2021-03-26 00:00:03 +00:00
|
|
|
`Flushing start ${options.name} for waitBatcher ` +
|
|
|
|
`items.length=${items.length}`
|
|
|
|
);
|
2022-02-25 18:37:15 +00:00
|
|
|
clearTimeoutIfNecessary(timeout);
|
|
|
|
timeout = null;
|
2021-03-26 00:00:03 +00:00
|
|
|
|
|
|
|
while (anyPending()) {
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
await _kickBatchOff();
|
|
|
|
|
|
|
|
if (queue.size > 0 || queue.pending > 0) {
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
|
|
await queue.onIdle();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-17 18:27:53 +00:00
|
|
|
log.info(`Flushing complete ${options.name} for waitBatcher`);
|
2021-03-26 00:00:03 +00:00
|
|
|
}
|
|
|
|
|
2019-09-26 19:56:31 +00:00
|
|
|
waitBatcher = {
|
|
|
|
add,
|
|
|
|
anyPending,
|
|
|
|
onIdle,
|
|
|
|
unregister,
|
2021-03-26 00:00:03 +00:00
|
|
|
flushAndWait,
|
2019-09-26 19:56:31 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
window.waitBatchers.push(waitBatcher);
|
|
|
|
|
|
|
|
return waitBatcher;
|
|
|
|
}
|