signal-desktop/ts/util/batcher.ts
Scott Nonnenberg b7d56def82 Moves libtextsecure to Typescript
* Starting to work through lint errors

* libsignal-protocol: Update changes for primary repo compatibility

* Step 1: task_with_timeout rename

* Step 2: Apply the changes to TaskWithTimeout.ts

* Step 1: All to-be-converted libtextsecure/*.js files moved

* Step 2: No Typescript errors!

* Get libtextsecure tests passing again

* TSLint errors down to 1

* Compilation succeeds, no lint errors or test failures

* WebSocketResources - update import for case-sensitive filesystems

* Fixes for lint-deps

* Remove unnecessary @ts-ignore

* Fix inability to message your own contact after link

* Add log message for the end of migration 20

* lint fix
2020-04-15 14:45:11 -07:00

111 lines
2.3 KiB
TypeScript

import PQueue from 'p-queue';
// Global registry of batchers. createBatcher() pushes each new batcher onto
// this array, and a batcher's unregister() filters it back out.
// @ts-ignore
window.batchers = [];

// Flushes every registered batcher in parallel and resolves once all of them
// have finished; rejects if any individual flushAndWait() rejects.
// @ts-ignore
window.waitForAllBatchers = async (): Promise<void> => {
  // @ts-ignore
  const flushes = window.batchers.map(entry => entry.flushAndWait());
  await Promise.all(flushes);
};
/**
 * Configuration accepted by `createBatcher`.
 */
export type BatcherOptionsType<ItemType> = {
/**
 * Delay in milliseconds between the most recent `add()` call and the
 * dispatch of the accumulated items (the timer is restarted by every `add()`).
 * `onIdle()` also sleeps for twice this value while undispatched items remain.
 */
wait: number;
/**
 * Once at least this many items have accumulated, `add()` dispatches the
 * batch immediately instead of scheduling a timer.
 */
maxSize: number;
/**
 * Called with the accumulated items of one batch. Batches are submitted to a
 * queue created with `concurrency: 1`, and the returned promise is awaited
 * inside the queued task.
 */
processBatch: (items: Array<ItemType>) => Promise<void>;
};
/**
 * Control surface returned by `createBatcher`. The member descriptions below
 * reflect the `createBatcher` implementation in this file.
 */
export type BatcherType<ItemType> = {
/**
 * Appends an item to the current batch, then either dispatches the batch
 * (size limit reached) or restarts the wait timer.
 */
add: (item: ItemType) => void;
/**
 * True while the queue has waiting or running tasks, or while items have
 * been added but not yet dispatched.
 */
anyPending: () => boolean;
/**
 * Resolves once `anyPending()` is false. Does not dispatch accumulated items
 * itself; it waits for the queue and for the wait timer to do so.
 */
onIdle: () => Promise<void>;
/**
 * Cancels the wait timer, dispatches any accumulated items right away, and
 * then waits via `onIdle()`.
 */
flushAndWait: () => Promise<void>;
/** Removes this batcher from the `window.batchers` registry. */
unregister: () => void;
};
/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of
 * `ms` milliseconds has fired.
 */
async function sleep(ms: number): Promise<void> {
  await new Promise(done => {
    // tslint:disable-next-line:no-string-based-set-timeout
    setTimeout(done, ms);
  });
}
/**
 * Creates a batcher that accumulates items passed to `add()` and hands them
 * to `options.processBatch` in groups.
 *
 * A group is dispatched as soon as `options.maxSize` items have accumulated,
 * or once `options.wait` milliseconds elapse without a further `add()` call
 * (every `add()` restarts the timer). Dispatched groups go through a queue
 * created with `concurrency: 1`.
 *
 * The new batcher is also pushed onto `window.batchers`; `unregister()`
 * removes it again.
 *
 * NOTE: the promise returned by `queue.add` is intentionally not awaited
 * here, so nothing in this function observes a rejection from
 * `processBatch`.
 *
 * @param options Wait time, size limit and processing callback.
 * @returns The batcher's control functions.
 */
export function createBatcher<ItemType>(
  options: BatcherOptionsType<ItemType>
): BatcherType<ItemType> {
  // Handle of the currently scheduled wait timer; falsy when none is pending.
  let timeout: any;
  // Items added since the last dispatch.
  let items: Array<ItemType> = [];
  const queue = new PQueue({ concurrency: 1 });

  // Cancels the scheduled wait timer, if there is one.
  function _cancelTimer() {
    if (!timeout) {
      return;
    }
    clearTimeout(timeout);
    timeout = null;
  }

  // True while the queue still has waiting or running tasks.
  function _queueBusy(): boolean {
    return queue.size > 0 || queue.pending > 0;
  }

  // Moves the accumulated items into a queued task and starts a fresh batch.
  function _kickBatchOff() {
    const dispatched = items;
    items = [];
    // tslint:disable-next-line:no-floating-promises
    queue.add(async () => {
      await options.processBatch(dispatched);
    });
  }

  function add(item: ItemType) {
    items.push(item);
    _cancelTimer();

    // Size limit reached: dispatch right away, no timer needed.
    if (items.length >= options.maxSize) {
      _kickBatchOff();
      return;
    }

    timeout = setTimeout(() => {
      timeout = null;
      _kickBatchOff();
    }, options.wait);
  }

  function anyPending(): boolean {
    return _queueBusy() || items.length > 0;
  }

  async function onIdle() {
    while (anyPending()) {
      if (_queueBusy()) {
        await queue.onIdle();
      }
      // Undispatched items are still waiting on the timer; poll again after
      // giving it time to fire.
      if (items.length > 0) {
        await sleep(options.wait * 2);
      }
    }
  }

  function unregister() {
    // @ts-ignore
    window.batchers = window.batchers.filter((item: any) => item !== batcher);
  }

  async function flushAndWait() {
    _cancelTimer();
    if (items.length) {
      _kickBatchOff();
    }
    return onIdle();
  }

  const batcher: BatcherType<ItemType> = {
    add,
    anyPending,
    onIdle,
    flushAndWait,
    unregister,
  };

  // @ts-ignore
  window.batchers.push(batcher);
  return batcher;
}