// Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only /* eslint-disable no-console */ import { join } from 'path'; import { Worker } from 'worker_threads'; import { explodePromise } from '../util/explodePromise'; import { isCorruptionError } from './errors'; const ASAR_PATTERN = /app\.asar$/; const MIN_TRACE_DURATION = 40; export type InitializeOptions = { readonly configDir: string; readonly key: string; }; export type WorkerRequest = | { readonly type: 'init'; readonly options: InitializeOptions; } | { readonly type: 'close'; } | { readonly type: 'removeDB'; } | { readonly type: 'sqlCall'; readonly method: string; // eslint-disable-next-line @typescript-eslint/no-explicit-any readonly args: ReadonlyArray; }; export type WrappedWorkerRequest = { readonly seq: number; readonly request: WorkerRequest; }; export type WrappedWorkerResponse = { readonly seq: number; readonly error: string | undefined; // eslint-disable-next-line @typescript-eslint/no-explicit-any readonly response: any; }; type PromisePair = { resolve: (response: T) => void; reject: (error: Error) => void; }; export class MainSQL { private readonly worker: Worker; private isReady = false; private onReady: Promise | undefined; private readonly onExit: Promise; // This promise is resolved when any of the queries that we run against the // database reject with a corruption error (see `isCorruptionError`) private readonly onCorruption: Promise; private seq = 0; // eslint-disable-next-line @typescript-eslint/no-explicit-any private onResponse = new Map>(); constructor() { let appDir = join(__dirname, '..', '..'); let isBundled = false; if (ASAR_PATTERN.test(appDir)) { appDir = appDir.replace(ASAR_PATTERN, 'app.asar.unpacked'); isBundled = true; } const scriptDir = join(appDir, 'ts', 'sql'); this.worker = new Worker( join(scriptDir, isBundled ? 'mainWorker.bundle.js' : 'mainWorker.js') ); const { promise: onCorruption, resolve: resolveCorruption, } = explodePromise(); this.onCorruption = onCorruption; this.worker.on('message', (wrappedResponse: WrappedWorkerResponse) => { const { seq, error, response } = wrappedResponse; const pair = this.onResponse.get(seq); this.onResponse.delete(seq); if (!pair) { throw new Error(`Unexpected worker response with seq: ${seq}`); } if (error) { const errorObj = new Error(error); if (isCorruptionError(errorObj)) { resolveCorruption(errorObj); } pair.reject(errorObj); } else { pair.resolve(response); } }); this.onExit = new Promise(resolve => { this.worker.once('exit', resolve); }); } public async initialize(options: InitializeOptions): Promise { if (this.isReady || this.onReady) { throw new Error('Already initialized'); } this.onReady = this.send({ type: 'init', options }); await this.onReady; this.onReady = undefined; this.isReady = true; } public whenCorrupted(): Promise { return this.onCorruption; } public async close(): Promise { if (!this.isReady) { throw new Error('Not initialized'); } await this.send({ type: 'close' }); await this.onExit; } public async removeDB(): Promise { await this.send({ type: 'removeDB' }); } // eslint-disable-next-line @typescript-eslint/no-explicit-any public async sqlCall(method: string, args: ReadonlyArray): Promise { if (this.onReady) { await this.onReady; } if (!this.isReady) { throw new Error('Not initialized'); } const { result, duration } = await this.send({ type: 'sqlCall', method, args, }); if (duration > MIN_TRACE_DURATION) { console.log(`ts/sql/main: slow query ${method} duration=${duration}ms`); } return result; } private async send(request: WorkerRequest): Promise { const { seq } = this; this.seq += 1; const result = new Promise((resolve, reject) => { this.onResponse.set(seq, { resolve, reject }); }); const wrappedRequest: WrappedWorkerRequest = { seq, request, }; this.worker.postMessage(wrappedRequest); return result; } }