Parallelize SQL queries

Fedor Indutny, 2024-07-22 11:16:33 -07:00 (committed by GitHub)
parent 86b4da1ec2
commit c64762858e
178 changed files with 3377 additions and 3618 deletions
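
At a glance, the change splits the renderer's single IPC-backed `channels` proxy into a read channel and a write channel, each tagged with an `AccessType`, and replaces the default `dataInterface` export with separate `DataReader` and `DataWriter` proxies, presumably so read queries can be serviced in parallel with writes. Below is a minimal, self-contained TypeScript sketch of that pattern, illustrative only: `makeChannel`, `Readable`, `Writable`, the enum values, and the stub `ipcInvoke` are not from this commit.

```ts
// Illustrative sketch, not Signal's actual code. The AccessType tag and the
// ipcInvoke(access, name, args) shape mirror the diff below; everything else
// here is invented for the example.
enum AccessType {
  Read = 'read',
  Write = 'write',
}

async function ipcInvoke(
  access: AccessType,
  name: string,
  args: ReadonlyArray<unknown>
): Promise<unknown> {
  // Stand-in for the real IPC bridge: the main process would presumably route
  // Read calls and Write calls to separate database connections/workers.
  console.log(access, name, args);
  return undefined;
}

// Every property access on the returned object becomes an async IPC call
// tagged with its access type.
function makeChannel<T extends object>(access: AccessType): T {
  return new Proxy({} as T, {
    get(_target, name) {
      return async (...args: ReadonlyArray<unknown>) =>
        ipcInvoke(access, String(name), args);
    },
  });
}

type Readable = { getAllItems(): Promise<unknown> };
type Writable = { removeMessage(id: string): Promise<void> };

const reader = makeChannel<Readable>(AccessType.Read);
const writer = makeChannel<Writable>(AccessType.Write);

// Reads no longer queue behind writes; they can be issued concurrently.
void Promise.all([reader.getAllItems(), writer.removeMessage('abc')]);
```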


@@ -3,7 +3,7 @@
 import { ipcRenderer as ipc } from 'electron';
-import { has, get, groupBy, isTypedArray, last, map, omit } from 'lodash';
+import { groupBy, isTypedArray, last, map, omit } from 'lodash';
 import { deleteExternalFiles } from '../types/Conversation';
 import { update as updateExpiringMessagesService } from '../services/expiringMessagesDeletion';
@@ -23,13 +23,16 @@ import * as Errors from '../types/errors';
 import type { StoredJob } from '../jobs/types';
 import { formatJobForInsert } from '../jobs/formatJobForInsert';
 import { cleanupMessages } from '../util/cleanup';
-import { ipcInvoke, doShutdown } from './channels';
+import { AccessType, ipcInvoke, doShutdown } from './channels';
 import type {
+  ClientInterfaceWrap,
   AdjacentMessagesByConversationOptionsType,
   AllItemsType,
-  ClientInterface,
-  ClientExclusiveInterface,
+  ServerReadableDirectInterface,
+  ServerWritableDirectInterface,
+  ClientReadableInterface,
+  ClientWritableInterface,
   ClientSearchResultMessageType,
   ConversationType,
   GetConversationRangeCenteredOnMessageResultType,
@@ -45,13 +48,14 @@ import type {
   PreKeyIdType,
   PreKeyType,
   StoredPreKeyType,
-  ServerInterface,
   ServerSearchResultMessageType,
   SignedPreKeyIdType,
   SignedPreKeyType,
   StoredSignedPreKeyType,
   KyberPreKeyType,
   StoredKyberPreKeyType,
+  ClientOnlyReadableInterface,
+  ClientOnlyWritableInterface,
 } from './Interface';
 import { getMessageIdForLogging } from '../util/idForLogging';
 import type { MessageAttributesType } from '../model-types';
@@ -67,44 +71,54 @@ const ERASE_TEMP_KEY = 'erase-temp';
 const ERASE_DRAFTS_KEY = 'erase-drafts';
 const CLEANUP_ORPHANED_ATTACHMENTS_KEY = 'cleanup-orphaned-attachments';
 const ENSURE_FILE_PERMISSIONS = 'ensure-file-permissions';
+const PAUSE_WRITE_ACCESS = 'pause-sql-writes';
+const RESUME_WRITE_ACCESS = 'resume-sql-writes';
 
-const exclusiveInterface: ClientExclusiveInterface = {
-  createOrUpdateIdentityKey,
+const clientOnlyReadable: ClientOnlyReadableInterface = {
   getIdentityKeyById,
-  bulkAddIdentityKeys,
   getAllIdentityKeys,
-  createOrUpdateKyberPreKey,
   getKyberPreKeyById,
-  bulkAddKyberPreKeys,
   getAllKyberPreKeys,
-  createOrUpdatePreKey,
   getPreKeyById,
-  bulkAddPreKeys,
   getAllPreKeys,
-  createOrUpdateSignedPreKey,
   getSignedPreKeyById,
-  bulkAddSignedPreKeys,
   getAllSignedPreKeys,
-  createOrUpdateItem,
   getItemById,
   getAllItems,
+  searchMessages,
+  getRecentStoryReplies,
+  getOlderMessagesByConversation,
+  getNewerMessagesByConversation,
+  getConversationRangeCenteredOnMessage,
+};
+const clientOnlyWritable: ClientOnlyWritableInterface = {
+  createOrUpdateIdentityKey,
+  bulkAddIdentityKeys,
+  createOrUpdateKyberPreKey,
+  bulkAddKyberPreKeys,
+  createOrUpdatePreKey,
+  bulkAddPreKeys,
+  createOrUpdateSignedPreKey,
+  bulkAddSignedPreKeys,
+  createOrUpdateItem,
   updateConversation,
   removeConversation,
-  searchMessages,
   removeMessage,
   removeMessages,
-  getRecentStoryReplies,
-  getOlderMessagesByConversation,
-  getConversationRangeCenteredOnMessage,
-  getNewerMessagesByConversation,
   // Client-side only
   flushUpdateConversationBatcher,
@@ -117,48 +131,82 @@ const exclusiveInterface: ClientExclusiveInterface = {
   ensureFilePermissions,
 };
-type ClientOverridesType = ClientExclusiveInterface &
+type ClientOverridesType = ClientOnlyWritableInterface &
   Pick<
-    ServerInterface,
+    ClientInterfaceWrap<ServerWritableDirectInterface>,
     | 'saveAttachmentDownloadJob'
     | 'saveMessage'
     | 'saveMessages'
     | 'updateConversations'
   >;
-const channels: ServerInterface = new Proxy({} as ServerInterface, {
-  get(_target, name) {
-    return async (...args: ReadonlyArray<unknown>) =>
-      ipcInvoke(String(name), args);
-  },
-});
-const clientExclusiveOverrides: ClientOverridesType = {
-  ...exclusiveInterface,
+const clientOnlyWritableOverrides: ClientOverridesType = {
+  ...clientOnlyWritable,
   saveAttachmentDownloadJob,
   saveMessage,
   saveMessages,
   updateConversations,
 };
-const dataInterface: ClientInterface = new Proxy(
+type ReadableChannelInterface =
+  ClientInterfaceWrap<ServerReadableDirectInterface>;
+const readableChannel: ReadableChannelInterface = new Proxy(
+  {} as ReadableChannelInterface,
   {
-    ...clientExclusiveOverrides,
-  } as ClientInterface,
+    get(_target, name) {
+      return async (...args: ReadonlyArray<unknown>) =>
+        ipcInvoke(AccessType.Read, String(name), args);
+    },
+  }
+);
+type WritableChannelInterface =
+  ClientInterfaceWrap<ServerWritableDirectInterface>;
+const writableChannel: WritableChannelInterface = new Proxy(
+  {} as WritableChannelInterface,
+  {
+    get(_target, name) {
+      return async (...args: ReadonlyArray<unknown>) =>
+        ipcInvoke(AccessType.Write, String(name), args);
+    },
+  }
+);
+export const DataReader: ClientReadableInterface = new Proxy(
+  {
+    ...clientOnlyReadable,
+  } as ClientReadableInterface,
   {
     get(target, name) {
       return async (...args: ReadonlyArray<unknown>) => {
-        if (has(target, name)) {
-          return get(target, name)(...args);
+        if (Reflect.has(target, name)) {
+          return Reflect.get(target, name)(...args);
         }
-        return get(channels, name)(...args);
+        return Reflect.get(readableChannel, name)(...args);
       };
     },
   }
 );
-export default dataInterface;
+export const DataWriter: ClientWritableInterface = new Proxy(
+  {
+    ...clientOnlyWritableOverrides,
+  } as ClientWritableInterface,
+  {
+    get(target, name) {
+      return async (...args: ReadonlyArray<unknown>) => {
+        if (Reflect.has(target, name)) {
+          return Reflect.get(target, name)(...args);
+        }
+        return Reflect.get(writableChannel, name)(...args);
+      };
+    },
+  }
+);
 function _cleanData(
   data: unknown
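
For orientation, a hypothetical caller-side view of the two new exports (the module path `./Client` and the wrapper function are placeholders; the method names come from the interfaces in this diff):

```ts
// Hypothetical usage sketch, not part of the commit. './Client' stands in for
// wherever DataReader/DataWriter are exported from.
import { DataReader, DataWriter } from './Client';

async function touchConversation(conversationId: string): Promise<void> {
  // Read path: handled by the DataReader proxy, either via a client-side
  // override (getAllItems above calls readableChannel internally) or by
  // falling through to readableChannel directly.
  const items = await DataReader.getAllItems();
  void items;

  // Write path: handled by the DataWriter proxy; removeConversation is a
  // client-side override that ends up calling writableChannel.removeConversation.
  await DataWriter.removeConversation(conversationId);
}
```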
@@ -246,9 +294,6 @@ async function shutdown(): Promise<void> {
   // Stop accepting new SQL jobs, flush outstanding queue
   await doShutdown();
-  // Close database
-  await channels.close();
 }
 // Identity Keys
@@ -256,12 +301,12 @@ async function shutdown(): Promise<void> {
 const IDENTITY_KEY_SPEC = ['publicKey'];
 async function createOrUpdateIdentityKey(data: IdentityKeyType): Promise<void> {
   const updated: StoredIdentityKeyType = specFromBytes(IDENTITY_KEY_SPEC, data);
-  await channels.createOrUpdateIdentityKey(updated);
+  await writableChannel.createOrUpdateIdentityKey(updated);
 }
 async function getIdentityKeyById(
   id: IdentityKeyIdType
 ): Promise<IdentityKeyType | undefined> {
-  const data = await channels.getIdentityKeyById(id);
+  const data = await readableChannel.getIdentityKeyById(id);
   return specToBytes(IDENTITY_KEY_SPEC, data);
 }
@@ -271,10 +316,10 @@ async function bulkAddIdentityKeys(
   const updated: Array<StoredIdentityKeyType> = map(array, data =>
     specFromBytes(IDENTITY_KEY_SPEC, data)
   );
-  await channels.bulkAddIdentityKeys(updated);
+  await writableChannel.bulkAddIdentityKeys(updated);
 }
 async function getAllIdentityKeys(): Promise<Array<IdentityKeyType>> {
-  const keys = await channels.getAllIdentityKeys();
+  const keys = await readableChannel.getAllIdentityKeys();
   return keys.map(key => specToBytes(IDENTITY_KEY_SPEC, key));
 }
@@ -287,12 +332,12 @@ async function createOrUpdateKyberPreKey(data: KyberPreKeyType): Promise<void> {
     KYBER_PRE_KEY_SPEC,
     data
   );
-  await channels.createOrUpdateKyberPreKey(updated);
+  await writableChannel.createOrUpdateKyberPreKey(updated);
 }
 async function getKyberPreKeyById(
   id: PreKeyIdType
 ): Promise<KyberPreKeyType | undefined> {
-  const data = await channels.getPreKeyById(id);
+  const data = await readableChannel.getPreKeyById(id);
   return specToBytes(KYBER_PRE_KEY_SPEC, data);
 }
@@ -302,10 +347,10 @@ async function bulkAddKyberPreKeys(
   const updated: Array<StoredKyberPreKeyType> = map(array, data =>
     specFromBytes(KYBER_PRE_KEY_SPEC, data)
   );
-  await channels.bulkAddKyberPreKeys(updated);
+  await writableChannel.bulkAddKyberPreKeys(updated);
 }
 async function getAllKyberPreKeys(): Promise<Array<KyberPreKeyType>> {
-  const keys = await channels.getAllKyberPreKeys();
+  const keys = await readableChannel.getAllKyberPreKeys();
   return keys.map(key => specToBytes(KYBER_PRE_KEY_SPEC, key));
 }
@@ -314,12 +359,12 @@ async function getAllKyberPreKeys(): Promise<Array<KyberPreKeyType>> {
 async function createOrUpdatePreKey(data: PreKeyType): Promise<void> {
   const updated: StoredPreKeyType = specFromBytes(PRE_KEY_SPEC, data);
-  await channels.createOrUpdatePreKey(updated);
+  await writableChannel.createOrUpdatePreKey(updated);
 }
 async function getPreKeyById(
   id: PreKeyIdType
 ): Promise<PreKeyType | undefined> {
-  const data = await channels.getPreKeyById(id);
+  const data = await readableChannel.getPreKeyById(id);
   return specToBytes(PRE_KEY_SPEC, data);
 }
@@ -327,10 +372,10 @@ async function bulkAddPreKeys(array: Array<PreKeyType>): Promise<void> {
   const updated: Array<StoredPreKeyType> = map(array, data =>
     specFromBytes(PRE_KEY_SPEC, data)
   );
-  await channels.bulkAddPreKeys(updated);
+  await writableChannel.bulkAddPreKeys(updated);
 }
 async function getAllPreKeys(): Promise<Array<PreKeyType>> {
-  const keys = await channels.getAllPreKeys();
+  const keys = await readableChannel.getAllPreKeys();
   return keys.map(key => specToBytes(PRE_KEY_SPEC, key));
 }
@@ -342,17 +387,17 @@ async function createOrUpdateSignedPreKey(
   data: SignedPreKeyType
 ): Promise<void> {
   const updated: StoredSignedPreKeyType = specFromBytes(PRE_KEY_SPEC, data);
-  await channels.createOrUpdateSignedPreKey(updated);
+  await writableChannel.createOrUpdateSignedPreKey(updated);
 }
 async function getSignedPreKeyById(
   id: SignedPreKeyIdType
 ): Promise<SignedPreKeyType | undefined> {
-  const data = await channels.getSignedPreKeyById(id);
+  const data = await readableChannel.getSignedPreKeyById(id);
   return specToBytes(PRE_KEY_SPEC, data);
 }
 async function getAllSignedPreKeys(): Promise<Array<SignedPreKeyType>> {
-  const keys = await channels.getAllSignedPreKeys();
+  const keys = await readableChannel.getAllSignedPreKeys();
   return keys.map(key => specToBytes(PRE_KEY_SPEC, key));
 }
@@ -362,7 +407,7 @@ async function bulkAddSignedPreKeys(
   const updated: Array<StoredSignedPreKeyType> = map(array, data =>
     specFromBytes(PRE_KEY_SPEC, data)
   );
-  await channels.bulkAddSignedPreKeys(updated);
+  await writableChannel.bulkAddSignedPreKeys(updated);
 }
 // Items
@@ -398,13 +443,13 @@ async function createOrUpdateItem<K extends ItemKeyType>(
     ? specFromBytes(spec, data)
     : (data as unknown as StoredItemType<K>);
-  await channels.createOrUpdateItem(updated);
+  await writableChannel.createOrUpdateItem(updated);
 }
 async function getItemById<K extends ItemKeyType>(
   id: K
 ): Promise<ItemType<K> | undefined> {
   const spec = ITEM_SPECS[id];
-  const data = await channels.getItemById(id);
+  const data = await readableChannel.getItemById(id);
   try {
     return spec ? specToBytes(spec, data) : (data as unknown as ItemType<K>);
@@ -414,7 +459,7 @@ async function getItemById<K extends ItemKeyType>(
   }
 }
 async function getAllItems(): Promise<AllItemsType> {
-  const items = await channels.getAllItems();
+  const items = await readableChannel.getAllItems();
   const result = Object.create(null);
@@ -458,7 +503,7 @@ const updateConversationBatcher = createBatcher<ConversationType>({
   },
 });
-function updateConversation(data: ConversationType): void {
+async function updateConversation(data: ConversationType): Promise<void> {
   updateConversationBatcher.add(data);
 }
 async function flushUpdateConversationBatcher(): Promise<void> {
@@ -473,16 +518,16 @@ async function updateConversations(
     !pathsChanged.length,
     `Paths were cleaned: ${JSON.stringify(pathsChanged)}`
   );
-  await channels.updateConversations(cleaned);
+  await writableChannel.updateConversations(cleaned);
 }
 async function removeConversation(id: string): Promise<void> {
-  const existing = await channels.getConversationById(id);
+  const existing = await readableChannel.getConversationById(id);
   // Note: It's important to have a fully database-hydrated model to delete here because
   // it needs to delete all associated on-disk files along with the database delete.
   if (existing) {
-    await channels.removeConversation(id);
+    await writableChannel.removeConversation(id);
     await deleteExternalFiles(existing, {
       deleteAttachmentData: window.Signal.Migrations.deleteAttachmentData,
     });
@@ -528,7 +573,7 @@ async function searchMessages({
   contactServiceIdsMatchingQuery?: Array<ServiceIdString>;
   conversationId?: string;
 }): Promise<Array<ClientSearchResultMessageType>> {
-  const messages = await channels.searchMessages({
+  const messages = await readableChannel.searchMessages({
     query,
     conversationId,
     options,
@@ -548,7 +593,7 @@ async function saveMessage(
     ourAci: AciString;
   }
 ): Promise<string> {
-  const id = await channels.saveMessage(_cleanMessageData(data), {
+  const id = await writableChannel.saveMessage(_cleanMessageData(data), {
     ...options,
     jobToInsert: options.jobToInsert && formatJobForInsert(options.jobToInsert),
   });
@@ -565,7 +610,7 @@ async function saveMessages(
   arrayOfMessages: ReadonlyArray<MessageType>,
   options: { forceSave?: boolean; ourAci: AciString }
 ): Promise<Array<string>> {
-  const result = await channels.saveMessages(
+  const result = await writableChannel.saveMessages(
     arrayOfMessages.map(message => _cleanMessageData(message)),
     options
   );
@@ -583,15 +628,15 @@ async function removeMessage(
     fromSync?: boolean;
   }
 ): Promise<void> {
-  const message = await channels.getMessageById(id);
+  const message = await readableChannel.getMessageById(id);
   // Note: It's important to have a fully database-hydrated model to delete here because
   // it needs to delete all associated on-disk files along with the database delete.
   if (message) {
-    await channels.removeMessage(id);
+    await writableChannel.removeMessage(id);
     await cleanupMessages([message], {
       ...options,
-      markCallHistoryDeleted: dataInterface.markCallHistoryDeleted,
+      markCallHistoryDeleted: DataWriter.markCallHistoryDeleted,
     });
   }
 }
@@ -607,12 +652,12 @@ export async function deleteAndCleanup(
   const ids = messages.map(message => message.id);
   log.info(`deleteAndCleanup/${logId}: Deleting ${ids.length} messages...`);
-  await channels.removeMessages(ids);
+  await writableChannel.removeMessages(ids);
   log.info(`deleteAndCleanup/${logId}: Cleanup for ${ids.length} messages...`);
   await cleanupMessages(messages, {
     ...options,
-    markCallHistoryDeleted: dataInterface.markCallHistoryDeleted,
+    markCallHistoryDeleted: DataWriter.markCallHistoryDeleted,
   });
   log.info(`deleteAndCleanup/${logId}: Complete`);
@@ -625,12 +670,12 @@ async function removeMessages(
     singleProtoJobQueue: SingleProtoJobQueue;
   }
 ): Promise<void> {
-  const messages = await channels.getMessagesById(messageIds);
+  const messages = await readableChannel.getMessagesById(messageIds);
   await cleanupMessages(messages, {
     ...options,
-    markCallHistoryDeleted: dataInterface.markCallHistoryDeleted,
+    markCallHistoryDeleted: DataWriter.markCallHistoryDeleted,
   });
-  await channels.removeMessages(messageIds);
+  await writableChannel.removeMessages(messageIds);
 }
 function handleMessageJSON(
@@ -642,7 +687,9 @@ function handleMessageJSON(
 async function getNewerMessagesByConversation(
   options: AdjacentMessagesByConversationOptionsType
 ): Promise<Array<MessageType>> {
-  const messages = await channels.getNewerMessagesByConversation(options);
+  const messages = await readableChannel.getNewerMessagesByConversation(
+    options
+  );
   return handleMessageJSON(messages);
 }
@@ -651,7 +698,10 @@ async function getRecentStoryReplies(
   storyId: string,
   options?: GetRecentStoryRepliesOptionsType
 ): Promise<Array<MessageType>> {
-  const messages = await channels.getRecentStoryReplies(storyId, options);
+  const messages = await readableChannel.getRecentStoryReplies(
+    storyId,
+    options
+  );
   return handleMessageJSON(messages);
 }
@@ -659,7 +709,9 @@ async function getRecentStoryReplies(
 async function getOlderMessagesByConversation(
   options: AdjacentMessagesByConversationOptionsType
 ): Promise<Array<MessageType>> {
-  const messages = await channels.getOlderMessagesByConversation(options);
+  const messages = await readableChannel.getOlderMessagesByConversation(
+    options
+  );
   return handleMessageJSON(messages);
 }
@@ -667,7 +719,9 @@ async function getOlderMessagesByConversation(
 async function getConversationRangeCenteredOnMessage(
   options: AdjacentMessagesByConversationOptionsType
 ): Promise<GetConversationRangeCenteredOnMessageResultType<MessageType>> {
-  const result = await channels.getConversationRangeCenteredOnMessage(options);
+  const result = await readableChannel.getConversationRangeCenteredOnMessage(
+    options
+  );
   return {
     ...result,
@@ -721,7 +775,7 @@ async function removeMessagesInConversation(
 async function saveAttachmentDownloadJob(
   job: AttachmentDownloadJobType
 ): Promise<void> {
-  await channels.saveAttachmentDownloadJob(_cleanData(job));
+  await writableChannel.saveAttachmentDownloadJob(_cleanData(job));
 }
 // Other
@@ -758,3 +812,11 @@ async function invokeWithTimeout(name: string): Promise<void> {
     `callChannel call to ${name}`
   )();
 }
+export function pauseWriteAccess(): Promise<void> {
+  return invokeWithTimeout(PAUSE_WRITE_ACCESS);
+}
+export function resumeWriteAccess(): Promise<void> {
+  return invokeWithTimeout(RESUME_WRITE_ACCESS);
+}
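
The new `pauseWriteAccess`/`resumeWriteAccess` exports invoke the `PAUSE_WRITE_ACCESS`/`RESUME_WRITE_ACCESS` IPC channels added at the top of the file. A plausible (hypothetical, not from this commit) call pattern pairs them in `try`/`finally` so writes are never left paused:

```ts
// Hypothetical helper: run an operation with SQL writes paused, e.g. while the
// main process needs the database quiescent. Not part of this commit.
async function withWritesPaused<T>(operation: () => Promise<T>): Promise<T> {
  await pauseWriteAccess();
  try {
    return await operation();
  } finally {
    // Resume even if the operation throws, so DataWriter calls don't stall.
    await resumeWriteAccess();
  }
}
```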