Types, better-sqlite3, and worker_threads for our sqlite

This commit is contained in:
Fedor Indutny 2021-04-05 15:18:19 -07:00 committed by Josh Perez
parent fc3004a183
commit 37c8c1727f
24 changed files with 2823 additions and 3121 deletions

View file

@ -36,6 +36,7 @@ import {
import {
AttachmentDownloadJobType,
ClientInterface,
ClientSearchResultMessageType,
ClientJobType,
ConversationType,
IdentityKeyType,
@ -55,7 +56,6 @@ import {
import Server from './Server';
import { MessageModel } from '../models/messages';
import { ConversationModel } from '../models/conversations';
import { waitForPendingQueries } from './Queueing';
// We listen to a lot of events on ipcRenderer, often on the same channel. This prevents
// any warnings that might be sent to the console in that case.
@ -243,12 +243,14 @@ const dataInterface: ClientInterface = {
export default dataInterface;
async function goBackToMainProcess(): Promise<void> {
window.log.info('data.goBackToMainProcess: waiting for pending queries');
// Let pending queries finish before we'll give write access to main process.
// We don't want to be writing from two processes at the same time!
await waitForPendingQueries();
if (!shouldUseRendererProcess) {
window.log.info(
'data.goBackToMainProcess: already switched to main process'
);
return;
}
// We don't need to wait for pending queries since they are synchronous.
window.log.info('data.goBackToMainProcess: switching to main process');
shouldUseRendererProcess = false;
@ -514,8 +516,6 @@ function keysFromArrayBuffer(keys: Array<string>, data: any) {
// Top-level calls
async function shutdown() {
await waitForPendingQueries();
// Stop accepting new SQL jobs, flush outstanding queue
await _shutdown();
@ -761,7 +761,13 @@ const updateConversationBatcher = createBatcher<ConversationType>({
// We only care about the most recent update for each conversation
const byId = groupBy(items, item => item.id);
const ids = Object.keys(byId);
const mostRecent = ids.map(id => last(byId[id]));
const mostRecent = ids.map(
(id: string): ConversationType => {
const maybeLast = last(byId[id]);
assert(maybeLast !== undefined, 'Empty array in `groupBy` result');
return maybeLast;
}
);
await updateConversations(mostRecent);
},
@ -857,9 +863,13 @@ async function searchConversations(query: string) {
return conversations;
}
function handleSearchMessageJSON(messages: Array<SearchResultMessageType>) {
function handleSearchMessageJSON(
messages: Array<SearchResultMessageType>
): Array<ClientSearchResultMessageType> {
return messages.map(message => ({
json: message.json,
...JSON.parse(message.json),
bodyRanges: [],
snippet: message.snippet,
}));
}
@ -940,7 +950,7 @@ async function getMessageById(
) {
const message = await channels.getMessageById(id);
if (!message) {
return null;
return undefined;
}
return new Message(message);
@ -1262,7 +1272,9 @@ async function updateUnprocessedAttempts(id: string, attempts: number) {
async function updateUnprocessedWithData(id: string, data: UnprocessedType) {
await channels.updateUnprocessedWithData(id, data);
}
async function updateUnprocessedsWithData(array: Array<UnprocessedType>) {
async function updateUnprocessedsWithData(
array: Array<{ id: string; data: UnprocessedType }>
) {
await channels.updateUnprocessedsWithData(array);
}

View file

@ -4,31 +4,129 @@
/* eslint-disable @typescript-eslint/ban-types */
/* eslint-disable camelcase */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { LocaleMessagesType } from '../types/I18N';
import {
ConversationAttributesType,
ConversationModelCollectionType,
MessageAttributesType,
MessageModelCollectionType,
} from '../model-types.d';
import { MessageModel } from '../models/messages';
import { ConversationModel } from '../models/conversations';
export type AttachmentDownloadJobType = any;
export type ConverationMetricsType = any;
export type ConversationType = any;
export type EmojiType = any;
export type IdentityKeyType = any;
export type AttachmentDownloadJobType = {
id: string;
timestamp: number;
pending: number;
attempts: number;
};
export type MessageMetricsType = {
id: string;
// eslint-disable-next-line camelcase
received_at: number;
// eslint-disable-next-line camelcase
sent_at: number;
};
export type ConversationMetricsType = {
oldest?: MessageMetricsType;
newest?: MessageMetricsType;
oldestUnread?: MessageMetricsType;
totalUnread: number;
};
export type ConversationType = ConversationAttributesType;
export type EmojiType = {
shortName: string;
lastUsage: number;
};
export type IdentityKeyType = {
firstUse: boolean;
id: string;
nonblockingApproval: boolean;
publicKey: ArrayBuffer;
timestamp: number;
verified: number;
};
export type ItemType = any;
export type MessageType = any;
export type MessageTypeUnhydrated = any;
export type PreKeyType = any;
export type SearchResultMessageType = any;
export type SessionType = any;
export type SignedPreKeyType = any;
export type StickerPackStatusType = string;
export type StickerPackType = any;
export type StickerType = any;
export type UnprocessedType = any;
export type MessageType = MessageAttributesType;
export type MessageTypeUnhydrated = {
json: string;
};
export type PreKeyType = {
id: number;
privateKey: ArrayBuffer;
publicKey: ArrayBuffer;
};
export type SearchResultMessageType = {
json: string;
snippet: string;
};
export type ClientSearchResultMessageType = MessageType & {
json: string;
bodyRanges: [];
snippet: string;
};
export type SessionType = {
id: string;
conversationId: string;
deviceId: number;
record: string;
};
export type SignedPreKeyType = {
confirmed: boolean;
// eslint-disable-next-line camelcase
created_at: number;
id: number;
privateKey: ArrayBuffer;
publicKey: ArrayBuffer;
};
export type StickerPackStatusType =
| 'known'
| 'ephemeral'
| 'downloaded'
| 'installed'
| 'pending'
| 'error';
export type StickerType = {
id: number;
packId: string;
emoji: string;
isCoverOnly: string;
lastUsed: number;
path: string;
width: number;
height: number;
};
export type StickerPackType = {
id: string;
key: string;
attemptedStatus: 'downloaded' | 'installed' | 'ephemeral';
author: string;
coverStickerId: number;
createdAt: number;
downloadAttempts: number;
installedAt: number | null;
lastUsed: number;
status: StickerPackStatusType;
stickerCount: number;
stickers: ReadonlyArray<string>;
title: string;
};
export type UnprocessedType = {
id: string;
timestamp: number;
version: number;
attempts: number;
envelope: string;
source?: string;
sourceUuid?: string;
sourceDevice?: string;
serverTimestamp?: number;
decrypted?: string;
};
export type DataInterface = {
close: () => Promise<void>;
@ -84,15 +182,6 @@ export type DataInterface = {
query: string,
options?: { limit?: number }
) => Promise<Array<ConversationType>>;
searchMessages: (
query: string,
options?: { limit?: number }
) => Promise<Array<SearchResultMessageType>>;
searchMessagesInConversation: (
query: string,
conversationId: string,
options?: { limit?: number }
) => Promise<Array<SearchResultMessageType>>;
getMessageCount: (conversationId?: string) => Promise<number>;
saveMessages: (
@ -102,7 +191,7 @@ export type DataInterface = {
getAllMessageIds: () => Promise<Array<string>>;
getMessageMetricsForConversation: (
conversationId: string
) => Promise<ConverationMetricsType>;
) => Promise<ConversationMetricsType>;
hasGroupCallHistoryMessage: (
conversationId: string,
eraId: string
@ -117,13 +206,15 @@ export type DataInterface = {
saveUnprocessed: (
data: UnprocessedType,
options?: { forceSave?: boolean }
) => Promise<number>;
) => Promise<string>;
updateUnprocessedAttempts: (id: string, attempts: number) => Promise<void>;
updateUnprocessedWithData: (
id: string,
data: UnprocessedType
) => Promise<void>;
updateUnprocessedsWithData: (array: Array<UnprocessedType>) => Promise<void>;
updateUnprocessedsWithData: (
array: Array<{ id: string; data: UnprocessedType }>
) => Promise<void>;
getUnprocessedById: (id: string) => Promise<UnprocessedType | undefined>;
saveUnprocesseds: (
arrayOfUnprocessed: Array<UnprocessedType>,
@ -203,7 +294,7 @@ export type ServerInterface = DataInterface & {
getAllConversations: () => Promise<Array<ConversationType>>;
getAllGroupsInvolvingId: (id: string) => Promise<Array<ConversationType>>;
getAllPrivateConversations: () => Promise<Array<ConversationType>>;
getConversationById: (id: string) => Promise<ConversationType | null>;
getConversationById: (id: string) => Promise<ConversationType | undefined>;
getExpiredMessages: () => Promise<Array<MessageType>>;
getMessageById: (id: string) => Promise<MessageType | undefined>;
getMessageBySender: (options: {
@ -234,8 +325,8 @@ export type ServerInterface = DataInterface & {
conversationId: string;
ourConversationId: string;
}) => Promise<MessageType | undefined>;
getNextExpiringMessage: () => Promise<MessageType>;
getNextTapToViewMessageToAgeOut: () => Promise<MessageType>;
getNextExpiringMessage: () => Promise<MessageType | undefined>;
getNextTapToViewMessageToAgeOut: () => Promise<MessageType | undefined>;
getOutgoingWithoutExpiresAt: () => Promise<Array<MessageType>>;
getTapToViewMessagesNeedingErase: () => Promise<Array<MessageType>>;
getUnreadByConversation: (
@ -244,6 +335,15 @@ export type ServerInterface = DataInterface & {
removeConversation: (id: Array<string> | string) => Promise<void>;
removeMessage: (id: string) => Promise<void>;
removeMessages: (ids: Array<string>) => Promise<void>;
searchMessages: (
query: string,
options?: { limit?: number }
) => Promise<Array<SearchResultMessageType>>;
searchMessagesInConversation: (
query: string,
conversationId: string,
options?: { limit?: number }
) => Promise<Array<SearchResultMessageType>>;
saveMessage: (
data: MessageType,
options: { forceSave?: boolean }
@ -255,11 +355,7 @@ export type ServerInterface = DataInterface & {
// Server-only
initialize: (options: {
configDir: string;
key: string;
messages: LocaleMessagesType;
}) => Promise<void>;
initialize: (options: { configDir: string; key: string }) => Promise<void>;
initializeRenderer: (options: {
configDir: string;
@ -298,7 +394,7 @@ export type ClientInterface = DataInterface & {
getMessageById: (
id: string,
options: { Message: typeof MessageModel }
) => Promise<MessageType | undefined>;
) => Promise<MessageModel | undefined>;
getMessageBySender: (
data: {
source: string;
@ -373,6 +469,15 @@ export type ClientInterface = DataInterface & {
data: MessageType,
options: { forceSave?: boolean; Message: typeof MessageModel }
) => Promise<string>;
searchMessages: (
query: string,
options?: { limit?: number }
) => Promise<Array<ClientSearchResultMessageType>>;
searchMessagesInConversation: (
query: string,
conversationId: string,
options?: { limit?: number }
) => Promise<Array<ClientSearchResultMessageType>>;
updateConversation: (data: ConversationType, extra?: unknown) => void;
// Test-only

View file

@ -1,141 +0,0 @@
// Copyright 2018-2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import Queue from 'p-queue';
import { ServerInterface } from './Interface';
let allQueriesDone: () => void | undefined;
let sqlQueries = 0;
let singleQueue: Queue | null = null;
let multipleQueue: Queue | null = null;
// Note: we don't want queue timeouts, because delays here are due to in-progress sql
// operations. For example we might try to start a transaction when the previous isn't
// done, causing that database operation to fail.
function makeNewSingleQueue(): Queue {
singleQueue = new Queue({ concurrency: 1 });
return singleQueue;
}
function makeNewMultipleQueue(): Queue {
multipleQueue = new Queue({ concurrency: 10 });
return multipleQueue;
}
const DEBUG = false;
function makeSQLJob(
fn: ServerInterface[keyof ServerInterface],
args: Array<unknown>,
callName: keyof ServerInterface
) {
if (DEBUG) {
// eslint-disable-next-line no-console
console.log(`SQL(${callName}) queued`);
}
return async () => {
sqlQueries += 1;
const start = Date.now();
if (DEBUG) {
// eslint-disable-next-line no-console
console.log(`SQL(${callName}) started`);
}
let result;
try {
// Ignoring this error TS2556: Expected 3 arguments, but got 0 or more.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
result = await fn(...args);
} finally {
sqlQueries -= 1;
if (allQueriesDone && sqlQueries <= 0) {
allQueriesDone();
}
}
const end = Date.now();
const delta = end - start;
if (DEBUG || delta > 10) {
// eslint-disable-next-line no-console
console.log(`SQL(${callName}) succeeded in ${end - start}ms`);
}
return result;
};
}
async function handleCall(
fn: ServerInterface[keyof ServerInterface],
args: Array<unknown>,
callName: keyof ServerInterface
) {
if (!fn) {
throw new Error(`sql channel: ${callName} is not an available function`);
}
let result;
// We queue here to keep multi-query operations atomic. Without it, any multistage
// data operation (even within a BEGIN/COMMIT) can become interleaved, since all
// requests share one database connection.
// A needsSerial method must be run in our single concurrency queue.
if (fn.needsSerial) {
if (singleQueue) {
result = await singleQueue.add(makeSQLJob(fn, args, callName));
} else if (multipleQueue) {
const queue = makeNewSingleQueue();
const multipleQueueLocal = multipleQueue;
queue.add(() => multipleQueueLocal.onIdle());
multipleQueue = null;
result = await queue.add(makeSQLJob(fn, args, callName));
} else {
const queue = makeNewSingleQueue();
result = await queue.add(makeSQLJob(fn, args, callName));
}
} else {
// The request can be parallelized. To keep the same structure as the above block
// we force this section into the 'lonely if' pattern.
// eslint-disable-next-line no-lonely-if
if (multipleQueue) {
result = await multipleQueue.add(makeSQLJob(fn, args, callName));
} else if (singleQueue) {
const queue = makeNewMultipleQueue();
queue.pause();
const singleQueueRef = singleQueue;
singleQueue = null;
const promise = queue.add(makeSQLJob(fn, args, callName));
if (singleQueueRef) {
await singleQueueRef.onIdle();
}
queue.start();
result = await promise;
} else {
const queue = makeNewMultipleQueue();
result = await queue.add(makeSQLJob(fn, args, callName));
}
}
return result;
}
export async function waitForPendingQueries(): Promise<void> {
return new Promise<void>(resolve => {
if (sqlQueries === 0) {
resolve();
} else {
allQueriesDone = () => resolve();
}
});
}
export function applyQueueing(dataInterface: ServerInterface): ServerInterface {
return Object.keys(dataInterface).reduce((acc, callName) => {
const serverInterfaceKey = callName as keyof ServerInterface;
acc[serverInterfaceKey] = async (...args: Array<unknown>) =>
handleCall(dataInterface[serverInterfaceKey], args, serverInterfaceKey);
return acc;
}, {} as ServerInterface);
}

File diff suppressed because it is too large Load diff

113
ts/sql/main.ts Normal file
View file

@ -0,0 +1,113 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { join } from 'path';
import { Worker } from 'worker_threads';
export type InitializeOptions = {
readonly configDir: string;
readonly key: string;
};
export type WorkerRequest =
| {
readonly type: 'init';
readonly options: InitializeOptions;
}
| {
readonly type: 'close';
}
| {
readonly type: 'sqlCall';
readonly method: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
readonly args: ReadonlyArray<any>;
};
export type WrappedWorkerRequest = {
readonly seq: number;
readonly request: WorkerRequest;
};
export type WrappedWorkerResponse = {
readonly seq: number;
readonly error: string | undefined;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
readonly response: any;
};
type PromisePair<T> = {
resolve: (response: T) => void;
reject: (error: Error) => void;
};
export class MainSQL {
private readonly worker: Worker;
private readonly onExit: Promise<void>;
private seq = 0;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private onResponse = new Map<number, PromisePair<any>>();
constructor() {
const appDir = join(__dirname, '..', '..').replace(
/app\.asar$/,
'app.asar.unpacked'
);
this.worker = new Worker(join(appDir, 'ts', 'sql', 'mainWorker.js'));
this.worker.on('message', (wrappedResponse: WrappedWorkerResponse) => {
const { seq, error, response } = wrappedResponse;
const pair = this.onResponse.get(seq);
this.onResponse.delete(seq);
if (!pair) {
throw new Error(`Unexpected worker response with seq: ${seq}`);
}
if (error) {
pair.reject(new Error(error));
} else {
pair.resolve(response);
}
});
this.onExit = new Promise<void>(resolve => {
this.worker.once('exit', resolve);
});
}
public async initialize(options: InitializeOptions): Promise<void> {
return this.send({ type: 'init', options });
}
public async close(): Promise<void> {
await this.send({ type: 'close' });
await this.onExit;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public async sqlCall(method: string, args: ReadonlyArray<any>): Promise<any> {
return this.send({ type: 'sqlCall', method, args });
}
private async send<Response>(request: WorkerRequest): Promise<Response> {
const { seq } = this;
this.seq += 1;
const result = new Promise<Response>((resolve, reject) => {
this.onResponse.set(seq, { resolve, reject });
});
const wrappedRequest: WrappedWorkerRequest = {
seq,
request,
};
this.worker.postMessage(wrappedRequest);
return result;
}
}

56
ts/sql/mainWorker.ts Normal file
View file

@ -0,0 +1,56 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { parentPort } from 'worker_threads';
import { WrappedWorkerRequest, WrappedWorkerResponse } from './main';
import db from './Server';
if (!parentPort) {
throw new Error('Must run as a worker thread');
}
const port = parentPort;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function respond(seq: number, error: Error | undefined, response?: any) {
const wrappedResponse: WrappedWorkerResponse = {
seq,
error: error ? error.stack : undefined,
response,
};
port.postMessage(wrappedResponse);
}
port.on('message', async ({ seq, request }: WrappedWorkerRequest) => {
try {
if (request.type === 'init') {
await db.initialize(request.options);
respond(seq, undefined, undefined);
return;
}
if (request.type === 'close') {
await db.close();
respond(seq, undefined, undefined);
process.exit(0);
return;
}
if (request.type === 'sqlCall') {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const method = (db as any)[request.method];
if (typeof method !== 'function') {
throw new Error(`Invalid sql method: ${method}`);
}
respond(seq, undefined, await method.apply(db, request.args));
} else {
throw new Error('Unexpected request type');
}
} catch (error) {
respond(seq, error, undefined);
}
});