// Copyright 2020 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { ipcRenderer as ipc } from 'electron'; import PQueue from 'p-queue'; import { has, get, groupBy, isTypedArray, last, map, omit } from 'lodash'; import { deleteExternalFiles } from '../types/Conversation'; import { expiringMessagesDeletionService } from '../services/expiringMessagesDeletion'; import { tapToViewMessagesDeletionService } from '../services/tapToViewMessagesDeletionService'; import * as Bytes from '../Bytes'; import { createBatcher } from '../util/batcher'; import { assertDev, softAssert } from '../util/assert'; import { mapObjectWithSpec } from '../util/mapObjectWithSpec'; import type { ObjectMappingSpecType } from '../util/mapObjectWithSpec'; import { cleanDataForIpc } from './cleanDataForIpc'; import type { UUIDStringType } from '../types/UUID'; import createTaskWithTimeout from '../textsecure/TaskWithTimeout'; import * as log from '../logging/log'; import { isValidUuid } from '../types/UUID'; import * as Errors from '../types/errors'; import type { StoredJob } from '../jobs/types'; import { formatJobForInsert } from '../jobs/formatJobForInsert'; import { cleanupMessage } from '../util/cleanup'; import { drop } from '../util/drop'; import { ipcInvoke, doShutdown } from './channels'; import type { AdjacentMessagesByConversationOptionsType, AllItemsType, AttachmentDownloadJobType, ClientInterface, ClientExclusiveInterface, ClientSearchResultMessageType, ConversationType, GetConversationRangeCenteredOnMessageResultType, IdentityKeyIdType, IdentityKeyType, StoredIdentityKeyType, ItemKeyType, ItemType, StoredItemType, MessageType, MessageTypeUnhydrated, PreKeyIdType, PreKeyType, StoredPreKeyType, ServerInterface, ServerSearchResultMessageType, SignedPreKeyIdType, SignedPreKeyType, StoredSignedPreKeyType, } from './Interface'; import { MINUTE } from '../util/durations'; import { getMessageIdForLogging } from '../util/idForLogging'; import type { MessageAttributesType } from '../model-types'; import { incrementMessageCounter } from '../util/incrementMessageCounter'; const ERASE_SQL_KEY = 'erase-sql-key'; const ERASE_ATTACHMENTS_KEY = 'erase-attachments'; const ERASE_STICKERS_KEY = 'erase-stickers'; const ERASE_TEMP_KEY = 'erase-temp'; const ERASE_DRAFTS_KEY = 'erase-drafts'; const CLEANUP_ORPHANED_ATTACHMENTS_KEY = 'cleanup-orphaned-attachments'; const ENSURE_FILE_PERMISSIONS = 'ensure-file-permissions'; const exclusiveInterface: ClientExclusiveInterface = { createOrUpdateIdentityKey, getIdentityKeyById, bulkAddIdentityKeys, getAllIdentityKeys, createOrUpdatePreKey, getPreKeyById, bulkAddPreKeys, getAllPreKeys, createOrUpdateSignedPreKey, getSignedPreKeyById, bulkAddSignedPreKeys, getAllSignedPreKeys, createOrUpdateItem, getItemById, getAllItems, updateConversation, removeConversation, searchMessages, searchMessagesInConversation, getOlderMessagesByConversation, getConversationRangeCenteredOnMessage, getNewerMessagesByConversation, // Client-side only shutdown, removeAllMessagesInConversation, removeOtherData, cleanupOrphanedAttachments, ensureFilePermissions, }; type ClientOverridesType = ClientExclusiveInterface & Pick< ServerInterface, | 'removeMessage' | 'removeMessages' | 'saveAttachmentDownloadJob' | 'saveMessage' | 'saveMessages' | 'updateConversations' >; const channels: ServerInterface = new Proxy({} as ServerInterface, { get(_target, name) { return async (...args: ReadonlyArray) => ipcInvoke(String(name), args); }, }); const clientExclusiveOverrides: ClientOverridesType = { ...exclusiveInterface, removeMessage, removeMessages, saveAttachmentDownloadJob, saveMessage, saveMessages, updateConversations, }; const dataInterface: ClientInterface = new Proxy( { ...clientExclusiveOverrides, } as ClientInterface, { get(target, name) { return async (...args: ReadonlyArray) => { if (has(target, name)) { return get(target, name)(...args); } return get(channels, name)(...args); }; }, } ); export default dataInterface; function _cleanData( data: unknown ): ReturnType['cleaned'] { const { cleaned, pathsChanged } = cleanDataForIpc(data); if (pathsChanged.length) { log.info( `_cleanData cleaned the following paths: ${pathsChanged.join(', ')}` ); } return cleaned; } export function _cleanMessageData(data: MessageType): MessageType { const result = { ...data }; // Ensure that all messages have the received_at set properly if (!data.received_at) { assertDev(false, 'received_at was not set on the message'); result.received_at = incrementMessageCounter(); } if (data.attachments) { const logId = getMessageIdForLogging(data); result.attachments = data.attachments.map((attachment, index) => { if (attachment.data && !isTypedArray(attachment.data)) { log.warn( `_cleanMessageData/${logId}: Attachment ${index} had non-array \`data\` field; deleting.` ); return omit(attachment, ['data']); } return attachment; }); } return _cleanData(omit(result, ['dataMessage'])); } function specToBytes( spec: ObjectMappingSpecType, data: Input ): Output { return mapObjectWithSpec(spec, data, x => Bytes.fromBase64(x) ); } function specFromBytes( spec: ObjectMappingSpecType, data: Input ): Output { return mapObjectWithSpec(spec, data, x => Bytes.toBase64(x) ); } // Top-level calls async function shutdown(): Promise { log.info('Client.shutdown'); // Stop accepting new SQL jobs, flush outstanding queue await doShutdown(); // Close database await channels.close(); } // Identity Keys const IDENTITY_KEY_SPEC = ['publicKey']; async function createOrUpdateIdentityKey(data: IdentityKeyType): Promise { const updated: StoredIdentityKeyType = specFromBytes(IDENTITY_KEY_SPEC, data); await channels.createOrUpdateIdentityKey(updated); } async function getIdentityKeyById( id: IdentityKeyIdType ): Promise { const data = await channels.getIdentityKeyById(id); return specToBytes(IDENTITY_KEY_SPEC, data); } async function bulkAddIdentityKeys( array: Array ): Promise { const updated: Array = map(array, data => specFromBytes(IDENTITY_KEY_SPEC, data) ); await channels.bulkAddIdentityKeys(updated); } async function getAllIdentityKeys(): Promise> { const keys = await channels.getAllIdentityKeys(); return keys.map(key => specToBytes(IDENTITY_KEY_SPEC, key)); } // Pre Keys async function createOrUpdatePreKey(data: PreKeyType): Promise { const updated: StoredPreKeyType = specFromBytes(PRE_KEY_SPEC, data); await channels.createOrUpdatePreKey(updated); } async function getPreKeyById( id: PreKeyIdType ): Promise { const data = await channels.getPreKeyById(id); return specToBytes(PRE_KEY_SPEC, data); } async function bulkAddPreKeys(array: Array): Promise { const updated: Array = map(array, data => specFromBytes(PRE_KEY_SPEC, data) ); await channels.bulkAddPreKeys(updated); } async function getAllPreKeys(): Promise> { const keys = await channels.getAllPreKeys(); return keys.map(key => specToBytes(PRE_KEY_SPEC, key)); } // Signed Pre Keys const PRE_KEY_SPEC = ['privateKey', 'publicKey']; async function createOrUpdateSignedPreKey( data: SignedPreKeyType ): Promise { const updated: StoredSignedPreKeyType = specFromBytes(PRE_KEY_SPEC, data); await channels.createOrUpdateSignedPreKey(updated); } async function getSignedPreKeyById( id: SignedPreKeyIdType ): Promise { const data = await channels.getSignedPreKeyById(id); return specToBytes(PRE_KEY_SPEC, data); } async function getAllSignedPreKeys(): Promise> { const keys = await channels.getAllSignedPreKeys(); return keys.map(key => specToBytes(PRE_KEY_SPEC, key)); } async function bulkAddSignedPreKeys( array: Array ): Promise { const updated: Array = map(array, data => specFromBytes(PRE_KEY_SPEC, data) ); await channels.bulkAddSignedPreKeys(updated); } // Items const ITEM_SPECS: Partial> = { identityKeyMap: { key: 'value', valueSpec: { isMap: true, valueSpec: ['privKey', 'pubKey'], }, }, profileKey: ['value'], senderCertificate: ['value.serialized'], senderCertificateNoE164: ['value.serialized'], subscriberId: ['value'], }; async function createOrUpdateItem( data: ItemType ): Promise { const { id } = data; if (!id) { throw new Error( 'createOrUpdateItem: Provided data did not have a truthy id' ); } const spec = ITEM_SPECS[id]; const updated: StoredItemType = spec ? specFromBytes(spec, data) : (data as unknown as StoredItemType); await channels.createOrUpdateItem(updated); } async function getItemById( id: K ): Promise | undefined> { const spec = ITEM_SPECS[id]; const data = await channels.getItemById(id); return spec ? specToBytes(spec, data) : (data as unknown as ItemType); } async function getAllItems(): Promise { const items = await channels.getAllItems(); const result = Object.create(null); for (const id of Object.keys(items)) { const key = id as ItemKeyType; const value = items[key]; const keys = ITEM_SPECS[key]; const deserializedValue = keys ? (specToBytes(keys, { value }) as ItemType).value : value; result[key] = deserializedValue; } return result; } // Conversation const updateConversationBatcher = createBatcher({ name: 'sql.Client.updateConversationBatcher', wait: 500, maxSize: 20, processBatch: async (items: Array) => { // We only care about the most recent update for each conversation const byId = groupBy(items, item => item.id); const ids = Object.keys(byId); const mostRecent = ids.map((id: string): ConversationType => { const maybeLast = last(byId[id]); assertDev(maybeLast !== undefined, 'Empty array in `groupBy` result'); return maybeLast; }); await updateConversations(mostRecent); }, }); function updateConversation(data: ConversationType): void { updateConversationBatcher.add(data); } async function updateConversations( array: Array ): Promise { const { cleaned, pathsChanged } = cleanDataForIpc(array); assertDev( !pathsChanged.length, `Paths were cleaned: ${JSON.stringify(pathsChanged)}` ); await channels.updateConversations(cleaned); } async function removeConversation(id: string): Promise { const existing = await channels.getConversationById(id); // Note: It's important to have a fully database-hydrated model to delete here because // it needs to delete all associated on-disk files along with the database delete. if (existing) { await channels.removeConversation(id); await deleteExternalFiles(existing, { deleteAttachmentData: window.Signal.Migrations.deleteAttachmentData, }); } } function handleSearchMessageJSON( messages: Array ): Array { return messages.map(message => ({ json: message.json, // Empty array is a default value. `message.json` has the real field bodyRanges: [], ...JSON.parse(message.json), snippet: message.snippet, })); } async function searchMessages( query: string, { limit }: { limit?: number } = {} ): Promise> { const messages = await channels.searchMessages(query, { limit }); return handleSearchMessageJSON(messages); } async function searchMessagesInConversation( query: string, conversationId: string, { limit }: { limit?: number } = {} ): Promise> { const messages = await channels.searchMessagesInConversation( query, conversationId, { limit } ); return handleSearchMessageJSON(messages); } // Message async function saveMessage( data: MessageType, options: { jobToInsert?: Readonly; forceSave?: boolean; ourUuid: UUIDStringType; } ): Promise { const id = await channels.saveMessage(_cleanMessageData(data), { ...options, jobToInsert: options.jobToInsert && formatJobForInsert(options.jobToInsert), }); softAssert(isValidUuid(id), 'saveMessage: messageId is not a UUID'); void expiringMessagesDeletionService.update(); void tapToViewMessagesDeletionService.update(); return id; } async function saveMessages( arrayOfMessages: ReadonlyArray, options: { forceSave?: boolean; ourUuid: UUIDStringType } ): Promise { await channels.saveMessages( arrayOfMessages.map(message => _cleanMessageData(message)), options ); void expiringMessagesDeletionService.update(); void tapToViewMessagesDeletionService.update(); } async function removeMessage(id: string): Promise { const message = await channels.getMessageById(id); // Note: It's important to have a fully database-hydrated model to delete here because // it needs to delete all associated on-disk files along with the database delete. if (message) { await channels.removeMessage(id); await cleanupMessage(message); } } async function _cleanupMessages( messages: ReadonlyArray ): Promise { const queue = new PQueue({ concurrency: 3, timeout: MINUTE * 30 }); drop( queue.addAll( messages.map( (message: MessageAttributesType) => async () => cleanupMessage(message) ) ) ); await queue.onIdle(); } async function removeMessages( messageIds: ReadonlyArray ): Promise { const messages = await channels.getMessagesById(messageIds); await _cleanupMessages(messages); await channels.removeMessages(messageIds); } function handleMessageJSON( messages: Array ): Array { return messages.map(message => JSON.parse(message.json)); } async function getNewerMessagesByConversation( options: AdjacentMessagesByConversationOptionsType ): Promise> { const messages = await channels.getNewerMessagesByConversation(options); return handleMessageJSON(messages); } async function getOlderMessagesByConversation( options: AdjacentMessagesByConversationOptionsType ): Promise> { const messages = await channels.getOlderMessagesByConversation(options); return handleMessageJSON(messages); } async function getConversationRangeCenteredOnMessage( options: AdjacentMessagesByConversationOptionsType ): Promise> { const result = await channels.getConversationRangeCenteredOnMessage(options); return { ...result, older: handleMessageJSON(result.older), newer: handleMessageJSON(result.newer), }; } async function removeAllMessagesInConversation( conversationId: string, { logId, }: { logId: string; } ): Promise { let messages; do { const chunkSize = 20; log.info( `removeAllMessagesInConversation/${logId}: Fetching chunk of ${chunkSize} messages` ); // Yes, we really want the await in the loop. We're deleting a chunk at a // time so we don't use too much memory. // eslint-disable-next-line no-await-in-loop messages = await getOlderMessagesByConversation({ conversationId, limit: chunkSize, includeStoryReplies: true, storyId: undefined, }); if (!messages.length) { return; } const ids = messages.map(message => message.id); log.info(`removeAllMessagesInConversation/${logId}: Cleanup...`); // eslint-disable-next-line no-await-in-loop await _cleanupMessages(messages); log.info(`removeAllMessagesInConversation/${logId}: Deleting...`); // eslint-disable-next-line no-await-in-loop await channels.removeMessages(ids); } while (messages.length > 0); } // Attachment downloads async function saveAttachmentDownloadJob( job: AttachmentDownloadJobType ): Promise { await channels.saveAttachmentDownloadJob(_cleanData(job)); } // Other async function cleanupOrphanedAttachments(): Promise { try { await invokeWithTimeout(CLEANUP_ORPHANED_ATTACHMENTS_KEY); } catch (error) { log.warn( 'sql/Client: cleanupOrphanedAttachments failure', Errors.toLogFormat(error) ); } } async function ensureFilePermissions(): Promise { await invokeWithTimeout(ENSURE_FILE_PERMISSIONS); } // Note: will need to restart the app after calling this, to set up afresh async function removeOtherData(): Promise { await Promise.all([ invokeWithTimeout(ERASE_SQL_KEY), invokeWithTimeout(ERASE_ATTACHMENTS_KEY), invokeWithTimeout(ERASE_STICKERS_KEY), invokeWithTimeout(ERASE_TEMP_KEY), invokeWithTimeout(ERASE_DRAFTS_KEY), ]); } async function invokeWithTimeout(name: string): Promise { return createTaskWithTimeout( () => ipc.invoke(name), `callChannel call to ${name}` )(); }