signal-desktop/ts/sql/Queueing.ts
2021-03-19 16:57:35 -04:00

141 lines
4.2 KiB
TypeScript

// Copyright 2018-2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import Queue from 'p-queue';
import { ServerInterface } from './Interface';
let allQueriesDone: () => void | undefined;
let sqlQueries = 0;
let singleQueue: Queue | null = null;
let multipleQueue: Queue | null = null;
// Note: we don't want queue timeouts, because delays here are due to in-progress sql
// operations. For example we might try to start a transaction when the prevous isn't
// done, causing that database operation to fail.
function makeNewSingleQueue(): Queue {
singleQueue = new Queue({ concurrency: 1 });
return singleQueue;
}
function makeNewMultipleQueue(): Queue {
multipleQueue = new Queue({ concurrency: 10 });
return multipleQueue;
}
const DEBUG = false;
function makeSQLJob(
fn: ServerInterface[keyof ServerInterface],
args: Array<unknown>,
callName: keyof ServerInterface
) {
if (DEBUG) {
// eslint-disable-next-line no-console
console.log(`SQL(${callName}) queued`);
}
return async () => {
sqlQueries += 1;
const start = Date.now();
if (DEBUG) {
// eslint-disable-next-line no-console
console.log(`SQL(${callName}) started`);
}
let result;
try {
// Ignoring this error TS2556: Expected 3 arguments, but got 0 or more.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
result = await fn(...args);
} finally {
sqlQueries -= 1;
if (allQueriesDone && sqlQueries <= 0) {
allQueriesDone();
}
}
const end = Date.now();
const delta = end - start;
if (DEBUG || delta > 10) {
// eslint-disable-next-line no-console
console.log(`SQL(${callName}) succeeded in ${end - start}ms`);
}
return result;
};
}
async function handleCall(
fn: ServerInterface[keyof ServerInterface],
args: Array<unknown>,
callName: keyof ServerInterface
) {
if (!fn) {
throw new Error(`sql channel: ${callName} is not an available function`);
}
let result;
// We queue here to keep multi-query operations atomic. Without it, any multistage
// data operation (even within a BEGIN/COMMIT) can become interleaved, since all
// requests share one database connection.
// A needsSerial method must be run in our single concurrency queue.
if (fn.needsSerial) {
if (singleQueue) {
result = await singleQueue.add(makeSQLJob(fn, args, callName));
} else if (multipleQueue) {
const queue = makeNewSingleQueue();
const multipleQueueLocal = multipleQueue;
queue.add(() => multipleQueueLocal.onIdle());
multipleQueue = null;
result = await queue.add(makeSQLJob(fn, args, callName));
} else {
const queue = makeNewSingleQueue();
result = await queue.add(makeSQLJob(fn, args, callName));
}
} else {
// The request can be parallelized. To keep the same structure as the above block
// we force this section into the 'lonely if' pattern.
// eslint-disable-next-line no-lonely-if
if (multipleQueue) {
result = await multipleQueue.add(makeSQLJob(fn, args, callName));
} else if (singleQueue) {
const queue = makeNewMultipleQueue();
queue.pause();
const singleQueueRef = singleQueue;
singleQueue = null;
const promise = queue.add(makeSQLJob(fn, args, callName));
if (singleQueueRef) {
await singleQueueRef.onIdle();
}
queue.start();
result = await promise;
} else {
const queue = makeNewMultipleQueue();
result = await queue.add(makeSQLJob(fn, args, callName));
}
}
return result;
}
export async function waitForPendingQueries(): Promise<void> {
return new Promise<void>(resolve => {
if (sqlQueries === 0) {
resolve();
} else {
allQueriesDone = () => resolve();
}
});
}
export function applyQueueing(dataInterface: ServerInterface): ServerInterface {
return Object.keys(dataInterface).reduce((acc, callName) => {
const serverInterfaceKey = callName as keyof ServerInterface;
acc[serverInterfaceKey] = async (...args: Array<unknown>) =>
handleCall(dataInterface[serverInterfaceKey], args, serverInterfaceKey);
return acc;
}, {} as ServerInterface);
}