Moves SQL to full IPC
This commit is contained in:
parent
34baa0fa2f
commit
be60b3d225
11 changed files with 110 additions and 334 deletions
259
ts/sql/Client.ts
259
ts/sql/Client.ts
|
@ -2,29 +2,16 @@
|
|||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { ipcRenderer as ipc } from 'electron';
|
||||
import fs from 'fs-extra';
|
||||
import pify from 'pify';
|
||||
import PQueue from 'p-queue';
|
||||
|
||||
import {
|
||||
compact,
|
||||
fromPairs,
|
||||
groupBy,
|
||||
isFunction,
|
||||
isTypedArray,
|
||||
last,
|
||||
map,
|
||||
omit,
|
||||
toPairs,
|
||||
} from 'lodash';
|
||||
import { has, get, groupBy, isTypedArray, last, map, omit } from 'lodash';
|
||||
|
||||
import { deleteExternalFiles } from '../types/Conversation';
|
||||
import { expiringMessagesDeletionService } from '../services/expiringMessagesDeletion';
|
||||
import { tapToViewMessagesDeletionService } from '../services/tapToViewMessagesDeletionService';
|
||||
import * as Bytes from '../Bytes';
|
||||
import { createBatcher } from '../util/batcher';
|
||||
import { explodePromise } from '../util/explodePromise';
|
||||
import { assertDev, softAssert, strictAssert } from '../util/assert';
|
||||
import { assertDev, softAssert } from '../util/assert';
|
||||
import { mapObjectWithSpec } from '../util/mapObjectWithSpec';
|
||||
import type { ObjectMappingSpecType } from '../util/mapObjectWithSpec';
|
||||
import { cleanDataForIpc } from './cleanDataForIpc';
|
||||
|
@ -38,6 +25,7 @@ import type { StoredJob } from '../jobs/types';
|
|||
import { formatJobForInsert } from '../jobs/formatJobForInsert';
|
||||
import { cleanupMessage } from '../util/cleanup';
|
||||
import { drop } from '../util/drop';
|
||||
import { ipcInvoke, doShutdown } from './channels';
|
||||
|
||||
import type {
|
||||
AdjacentMessagesByConversationOptionsType,
|
||||
|
@ -65,18 +53,11 @@ import type {
|
|||
SignedPreKeyType,
|
||||
StoredSignedPreKeyType,
|
||||
} from './Interface';
|
||||
import Server from './Server';
|
||||
import { parseSqliteError, SqliteErrorKind } from './errors';
|
||||
import { MINUTE } from '../util/durations';
|
||||
import { getMessageIdForLogging } from '../util/idForLogging';
|
||||
import type { MessageAttributesType } from '../model-types';
|
||||
import { incrementMessageCounter } from '../util/incrementMessageCounter';
|
||||
|
||||
const getRealPath = pify(fs.realpath);
|
||||
|
||||
const MIN_TRACE_DURATION = 10;
|
||||
|
||||
const SQL_CHANNEL_KEY = 'sql-channel';
|
||||
const ERASE_SQL_KEY = 'erase-sql-key';
|
||||
const ERASE_ATTACHMENTS_KEY = 'erase-attachments';
|
||||
const ERASE_STICKERS_KEY = 'erase-stickers';
|
||||
|
@ -85,92 +66,6 @@ const ERASE_DRAFTS_KEY = 'erase-drafts';
|
|||
const CLEANUP_ORPHANED_ATTACHMENTS_KEY = 'cleanup-orphaned-attachments';
|
||||
const ENSURE_FILE_PERMISSIONS = 'ensure-file-permissions';
|
||||
|
||||
enum RendererState {
|
||||
InMain = 'InMain',
|
||||
Opening = 'Opening',
|
||||
InRenderer = 'InRenderer',
|
||||
Closing = 'Closing',
|
||||
}
|
||||
|
||||
let activeJobCount = 0;
|
||||
let resolveShutdown: (() => void) | undefined;
|
||||
let shutdownPromise: Promise<void> | null = null;
|
||||
|
||||
let state = RendererState.InMain;
|
||||
const startupQueries = new Map<string, number>();
|
||||
|
||||
async function startInRendererProcess(isTesting = false): Promise<void> {
|
||||
strictAssert(
|
||||
state === RendererState.InMain,
|
||||
`startInRendererProcess: expected ${state} to be ${RendererState.InMain}`
|
||||
);
|
||||
|
||||
log.info('data.startInRendererProcess: switching to renderer process');
|
||||
state = RendererState.Opening;
|
||||
|
||||
if (!isTesting) {
|
||||
await ipc.invoke('database-ready');
|
||||
}
|
||||
|
||||
const configDir = await getRealPath(ipc.sendSync('get-user-data-path'));
|
||||
const key = ipc.sendSync('user-config-key');
|
||||
|
||||
await Server.initializeRenderer({ configDir, key });
|
||||
|
||||
log.info('data.startInRendererProcess: switched to renderer process');
|
||||
|
||||
state = RendererState.InRenderer;
|
||||
}
|
||||
|
||||
async function goBackToMainProcess(): Promise<void> {
|
||||
if (state === RendererState.InMain) {
|
||||
log.info('goBackToMainProcess: Already in the main process');
|
||||
return;
|
||||
}
|
||||
|
||||
strictAssert(
|
||||
state === RendererState.InRenderer,
|
||||
`goBackToMainProcess: expected ${state} to be ${RendererState.InRenderer}`
|
||||
);
|
||||
|
||||
// We don't need to wait for pending queries since they are synchronous.
|
||||
log.info('data.goBackToMainProcess: switching to main process');
|
||||
const closePromise = channels.close();
|
||||
|
||||
// It should be the last query we run in renderer process
|
||||
state = RendererState.Closing;
|
||||
await closePromise;
|
||||
state = RendererState.InMain;
|
||||
|
||||
// Print query statistics for whole startup
|
||||
const entries = Array.from(startupQueries.entries());
|
||||
startupQueries.clear();
|
||||
|
||||
// Sort by decreasing duration
|
||||
entries
|
||||
.sort((a, b) => b[1] - a[1])
|
||||
.filter(([_, duration]) => duration > MIN_TRACE_DURATION)
|
||||
.forEach(([query, duration]) => {
|
||||
log.info(`startup query: ${query} ${duration}ms`);
|
||||
});
|
||||
|
||||
log.info('data.goBackToMainProcess: switched to main process');
|
||||
}
|
||||
|
||||
const channelsAsUnknown = fromPairs(
|
||||
compact(
|
||||
map(toPairs(Server), ([name, value]: [string, unknown]) => {
|
||||
if (isFunction(value)) {
|
||||
return [name, makeChannel(name)];
|
||||
}
|
||||
|
||||
return null;
|
||||
})
|
||||
)
|
||||
) as unknown;
|
||||
|
||||
const channels: ServerInterface = channelsAsUnknown as ServerInterface;
|
||||
|
||||
const exclusiveInterface: ClientExclusiveInterface = {
|
||||
createOrUpdateIdentityKey,
|
||||
getIdentityKeyById,
|
||||
|
@ -209,30 +104,53 @@ const exclusiveInterface: ClientExclusiveInterface = {
|
|||
removeOtherData,
|
||||
cleanupOrphanedAttachments,
|
||||
ensureFilePermissions,
|
||||
|
||||
// Client-side only, and test-only
|
||||
|
||||
startInRendererProcess,
|
||||
goBackToMainProcess,
|
||||
};
|
||||
|
||||
// Because we can't force this module to conform to an interface, we narrow our exports
|
||||
// to this one default export, which does conform to the interface.
|
||||
// Note: In Javascript, you need to access the .default property when requiring it
|
||||
// https://github.com/microsoft/TypeScript/issues/420
|
||||
const dataInterface: ClientInterface = {
|
||||
...channels,
|
||||
...exclusiveInterface,
|
||||
type ClientOverridesType = ClientExclusiveInterface &
|
||||
Pick<
|
||||
ServerInterface,
|
||||
| 'removeMessage'
|
||||
| 'removeMessages'
|
||||
| 'saveAttachmentDownloadJob'
|
||||
| 'saveMessage'
|
||||
| 'saveMessages'
|
||||
| 'updateConversations'
|
||||
>;
|
||||
|
||||
// Overrides
|
||||
updateConversations,
|
||||
saveMessage,
|
||||
saveMessages,
|
||||
const channels: ServerInterface = new Proxy({} as ServerInterface, {
|
||||
get(_target, name) {
|
||||
return async (...args: ReadonlyArray<unknown>) =>
|
||||
ipcInvoke(String(name), args);
|
||||
},
|
||||
});
|
||||
|
||||
const clientExclusiveOverrides: ClientOverridesType = {
|
||||
...exclusiveInterface,
|
||||
removeMessage,
|
||||
removeMessages,
|
||||
saveAttachmentDownloadJob,
|
||||
saveMessage,
|
||||
saveMessages,
|
||||
updateConversations,
|
||||
};
|
||||
|
||||
const dataInterface: ClientInterface = new Proxy(
|
||||
{
|
||||
...clientExclusiveOverrides,
|
||||
} as ClientInterface,
|
||||
{
|
||||
get(target, name) {
|
||||
return async (...args: ReadonlyArray<unknown>) => {
|
||||
if (has(target, name)) {
|
||||
return get(target, name)(...args);
|
||||
}
|
||||
|
||||
return get(channels, name)(...args);
|
||||
};
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
export default dataInterface;
|
||||
|
||||
function _cleanData(
|
||||
|
@ -272,99 +190,6 @@ export function _cleanMessageData(data: MessageType): MessageType {
|
|||
return _cleanData(omit(result, ['dataMessage']));
|
||||
}
|
||||
|
||||
async function doShutdown() {
|
||||
log.info(
|
||||
`data.shutdown: shutdown requested. ${activeJobCount} jobs outstanding`
|
||||
);
|
||||
|
||||
if (shutdownPromise) {
|
||||
return shutdownPromise;
|
||||
}
|
||||
|
||||
// No outstanding jobs, return immediately
|
||||
if (activeJobCount === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
({ promise: shutdownPromise, resolve: resolveShutdown } =
|
||||
explodePromise<void>());
|
||||
|
||||
try {
|
||||
await shutdownPromise;
|
||||
} finally {
|
||||
log.info('data.shutdown: process complete');
|
||||
}
|
||||
}
|
||||
|
||||
function makeChannel(fnName: string) {
|
||||
return async (...args: ReadonlyArray<unknown>) => {
|
||||
// During startup we want to avoid the high overhead of IPC so we utilize
|
||||
// the db that exists in the renderer process to be able to boot up quickly
|
||||
// once the app is running we switch back to the main process to avoid the
|
||||
// UI from locking up whenever we do costly db operations.
|
||||
if (state === RendererState.InRenderer) {
|
||||
const serverFnName = fnName as keyof ServerInterface;
|
||||
const serverFn = Server[serverFnName] as (
|
||||
...fnArgs: ReadonlyArray<unknown>
|
||||
) => unknown;
|
||||
const start = Date.now();
|
||||
|
||||
try {
|
||||
// Ignoring this error TS2556: Expected 3 arguments, but got 0 or more.
|
||||
return await serverFn(...args);
|
||||
} catch (error) {
|
||||
const sqliteErrorKind = parseSqliteError(error);
|
||||
if (sqliteErrorKind === SqliteErrorKind.Corrupted) {
|
||||
log.error(
|
||||
'Detected sql corruption in renderer process. ' +
|
||||
`Restarting the application immediately. Error: ${error.message}`
|
||||
);
|
||||
ipc?.send('database-error', error.stack);
|
||||
} else if (sqliteErrorKind === SqliteErrorKind.Readonly) {
|
||||
log.error(`Detected readonly sql database: ${error.message}`);
|
||||
ipc?.send('database-readonly');
|
||||
}
|
||||
|
||||
log.error(
|
||||
`Renderer SQL channel job (${fnName}) error ${error.message}`
|
||||
);
|
||||
throw error;
|
||||
} finally {
|
||||
const duration = Date.now() - start;
|
||||
|
||||
startupQueries.set(
|
||||
serverFnName,
|
||||
(startupQueries.get(serverFnName) || 0) + duration
|
||||
);
|
||||
|
||||
if (duration > MIN_TRACE_DURATION) {
|
||||
log.info(
|
||||
`Renderer SQL channel job (${fnName}) completed in ${duration}ms`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (shutdownPromise && fnName !== 'close') {
|
||||
throw new Error(
|
||||
`Rejecting SQL channel job (${fnName}); application is shutting down`
|
||||
);
|
||||
}
|
||||
|
||||
activeJobCount += 1;
|
||||
return createTaskWithTimeout(async () => {
|
||||
try {
|
||||
return await ipc.invoke(SQL_CHANNEL_KEY, fnName, ...args);
|
||||
} finally {
|
||||
activeJobCount -= 1;
|
||||
if (activeJobCount === 0) {
|
||||
resolveShutdown?.();
|
||||
}
|
||||
}
|
||||
}, `SQL channel call (${fnName})`)();
|
||||
};
|
||||
}
|
||||
|
||||
function specToBytes<Input, Output>(
|
||||
spec: ObjectMappingSpecType,
|
||||
data: Input
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue