signal-desktop/ts/util/AsyncQueue.ts

47 lines
1.1 KiB
TypeScript
Raw Normal View History

2021-04-29 18:02:27 -05:00
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { once, noop } from 'lodash';
/**
* You can do two things with an async queue:
*
* 1. Put values in.
* 2. Consume values out in the order they were added.
*
* Values are removed from the queue when they're consumed.
*
* There can only be one consumer, though this could be changed.
*
* See the tests to see how this works.
*/
export class AsyncQueue<T> implements AsyncIterable<T> {
#onAdd: () => void = noop;
#queue: Array<T> = [];
#isReading = false;
2021-04-29 18:02:27 -05:00
add(value: Readonly<T>): void {
this.#queue.push(value);
this.#onAdd();
2021-04-29 18:02:27 -05:00
}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
if (this.#isReading) {
2021-04-29 18:02:27 -05:00
throw new Error('Cannot iterate over a queue more than once');
}
this.#isReading = true;
2021-04-29 18:02:27 -05:00
while (true) {
yield* this.#queue;
2021-04-29 18:02:27 -05:00
this.#queue = [];
2021-04-29 18:02:27 -05:00
// We want to iterate over the queue in series.
// eslint-disable-next-line no-await-in-loop
await new Promise<void>(resolve => {
this.#onAdd = once(resolve);
2021-04-29 18:02:27 -05:00
});
}
}
}