From 08da49a0aa46a7d72379a66e38df131fd9a01e39 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Mon, 24 Jun 2024 10:49:36 -0700 Subject: [PATCH] Delete Sync: Handle and send mostRecentNonExpiringMessages if needed --- protos/SignalService.proto | 1 + ts/background.ts | 28 ++- ts/models/conversations.ts | 25 ++- ts/sql/Interface.ts | 4 + ts/sql/Server.ts | 34 ++++ .../1080-nondisappearing-addressable.ts | 31 ++++ ts/sql/migrations/index.ts | 6 +- ts/test-node/sql/migration_1080_test.ts | 167 ++++++++++++++++++ ts/textsecure/MessageReceiver.ts | 5 + ts/textsecure/SendMessage.ts | 14 +- ts/textsecure/messageReceiverEvents.ts | 1 + ts/util/deleteForMe.ts | 70 ++++++-- ts/util/syncTasks.ts | 16 +- 13 files changed, 356 insertions(+), 46 deletions(-) create mode 100644 ts/sql/migrations/1080-nondisappearing-addressable.ts create mode 100644 ts/test-node/sql/migration_1080_test.ts diff --git a/protos/SignalService.proto b/protos/SignalService.proto index 66d10a29951..dd3837ab214 100644 --- a/protos/SignalService.proto +++ b/protos/SignalService.proto @@ -670,6 +670,7 @@ message SyncMessage { optional ConversationIdentifier conversation = 1; repeated AddressableMessage mostRecentMessages = 2; optional bool isFullDelete = 3; + repeated AddressableMessage mostRecentNonExpiringMessages = 4; } message LocalOnlyConversationDelete { diff --git a/ts/background.ts b/ts/background.ts index c085a5d54ac..a17fd982dc7 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -565,24 +565,6 @@ export async function startApp(): Promise { storage: window.storage, serverTrustRoot: window.getServerTrustRoot(), }); - const onFirstEmpty = async () => { - log.info('onFirstEmpty: Starting'); - - // We want to remove this handler on the next tick so we don't interfere with - // the other handlers being notified of this instance of the 'empty' event. - setTimeout(() => { - messageReceiver?.removeEventListener('empty', onFirstEmpty); - }, 1); - - log.info('onFirstEmpty: Fetching sync tasks'); - const syncTasks = await window.Signal.Data.getAllSyncTasks(); - - log.info(`onFirstEmpty: Queuing ${syncTasks.length} sync tasks`); - await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById); - - log.info('onFirstEmpty: Done'); - }; - messageReceiver.addEventListener('empty', onFirstEmpty); function queuedEventListener( handler: (event: E) => Promise | void, @@ -1459,6 +1441,16 @@ export async function startApp(): Promise { } log.info('Expiration start timestamp cleanup: complete'); + { + log.info('Startup/syncTasks: Fetching tasks'); + const syncTasks = await window.Signal.Data.getAllSyncTasks(); + + log.info(`Startup/syncTasks: Queueing ${syncTasks.length} sync tasks`); + await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById); + + log.info('`Startup/syncTasks: Done'); + } + log.info('listening for registration events'); window.Whisper.events.on('registration_done', () => { log.info('handling registration event'); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 251ac03be85..ac0b11893d8 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -193,6 +193,7 @@ const { getMessageMetricsForConversation, getMessageById, getMostRecentAddressableMessages, + getMostRecentAddressableNondisappearingMessages, getNewerMessagesByConversation, } = window.Signal.Data; @@ -4999,12 +5000,13 @@ export class ConversationModel extends window.Backbone } async destroyMessagesInner({ - logId, + logId: providedLogId, source, }: { logId: string; source: 'message-request' | 'local-delete-sync' | 'local-delete'; }): Promise { + const logId = `${providedLogId}/destroyMessagesInner`; this.set({ lastMessage: null, lastMessageAuthor: null, @@ -5032,6 +5034,26 @@ export class ConversationModel extends window.Backbone .map(getMessageToDelete) .filter(isNotNil) .slice(0, 5); + log.info( + `${logId}: Found ${mostRecentMessages.length} most recent messages` + ); + + const areAnyDisappearing = addressableMessages.some( + item => item.expireTimer + ); + + let mostRecentNonExpiringMessages: Array | undefined; + if (areAnyDisappearing) { + const nondisappearingAddressableMessages = + await getMostRecentAddressableNondisappearingMessages(this.id); + mostRecentNonExpiringMessages = nondisappearingAddressableMessages + .map(getMessageToDelete) + .filter(isNotNil) + .slice(0, 5); + log.info( + `${logId}: Found ${mostRecentNonExpiringMessages.length} most recent nondisappearing messages` + ); + } if (mostRecentMessages.length > 0) { await singleProtoJobQueue.add( @@ -5041,6 +5063,7 @@ export class ConversationModel extends window.Backbone conversation: getConversationToDelete(this.attributes), isFullDelete: true, mostRecentMessages, + mostRecentNonExpiringMessages, timestamp, }, ]) diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index b6176f77e7b..0f7c1974ab4 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -733,6 +733,10 @@ export type DataInterface = { conversationId: string, limit?: number ) => Promise>; + getMostRecentAddressableNondisappearingMessages: ( + conversationId: string, + limit?: number + ) => Promise>; removeSyncTaskById: (id: string) => Promise; saveSyncTasks: (tasks: Array) => Promise; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 6385d894ace..d02ce288c54 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -372,6 +372,7 @@ const dataInterface: ServerInterface = { saveEditedMessage, saveEditedMessages, getMostRecentAddressableMessages, + getMostRecentAddressableNondisappearingMessages, removeSyncTaskById, saveSyncTasks, @@ -2119,6 +2120,39 @@ export function getMostRecentAddressableMessagesSync( return rows.map(row => jsonToObject(row.json)); } +async function getMostRecentAddressableNondisappearingMessages( + conversationId: string, + limit = 5 +): Promise> { + const db = getReadonlyInstance(); + return getMostRecentAddressableNondisappearingMessagesSync( + db, + conversationId, + limit + ); +} + +export function getMostRecentAddressableNondisappearingMessagesSync( + db: Database, + conversationId: string, + limit = 5 +): Array { + const [query, parameters] = sql` + SELECT json FROM messages + INDEXED BY messages_by_date_addressable_nondisappearing + WHERE + expireTimer IS NULL AND + conversationId IS ${conversationId} AND + isAddressableMessage = 1 + ORDER BY received_at DESC, sent_at DESC + LIMIT ${limit}; + `; + + const rows = db.prepare(query).all(parameters); + + return rows.map(row => jsonToObject(row.json)); +} + async function removeSyncTaskById(id: string): Promise { const db = await getWritableInstance(); removeSyncTaskByIdSync(db, id); diff --git a/ts/sql/migrations/1080-nondisappearing-addressable.ts b/ts/sql/migrations/1080-nondisappearing-addressable.ts new file mode 100644 index 00000000000..e8702bb412c --- /dev/null +++ b/ts/sql/migrations/1080-nondisappearing-addressable.ts @@ -0,0 +1,31 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { Database } from '@signalapp/better-sqlite3'; + +import type { LoggerType } from '../../types/Logging'; + +export const version = 1080; + +export function updateToSchemaVersion1080( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 1080) { + return; + } + + db.transaction(() => { + db.exec(` + CREATE INDEX messages_by_date_addressable_nondisappearing + ON messages ( + conversationId, isAddressableMessage, received_at, sent_at + ) WHERE expireTimer IS NULL; + `); + })(); + + db.pragma('user_version = 1080'); + + logger.info('updateToSchemaVersion1080: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index 8cf807f195a..4c4da9c470d 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -82,10 +82,11 @@ import { updateToSchemaVersion1030 } from './1030-unblock-event'; import { updateToSchemaVersion1040 } from './1040-undownloaded-backed-up-media'; import { updateToSchemaVersion1050 } from './1050-group-send-endorsements'; import { updateToSchemaVersion1060 } from './1060-addressable-messages-and-sync-tasks'; +import { updateToSchemaVersion1070 } from './1070-attachment-backup'; import { - updateToSchemaVersion1070, + updateToSchemaVersion1080, version as MAX_VERSION, -} from './1070-attachment-backup'; +} from './1080-nondisappearing-addressable'; function updateToSchemaVersion1( currentVersion: number, @@ -2036,6 +2037,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1050, updateToSchemaVersion1060, updateToSchemaVersion1070, + updateToSchemaVersion1080, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/test-node/sql/migration_1080_test.ts b/ts/test-node/sql/migration_1080_test.ts new file mode 100644 index 00000000000..cca34c428ef --- /dev/null +++ b/ts/test-node/sql/migration_1080_test.ts @@ -0,0 +1,167 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import type { Database } from '@signalapp/better-sqlite3'; +import SQL from '@signalapp/better-sqlite3'; +import { v4 as generateGuid } from 'uuid'; + +import { getMostRecentAddressableNondisappearingMessagesSync } from '../../sql/Server'; +import { insertData, updateToVersion } from './helpers'; + +import type { MessageAttributesType } from '../../model-types'; +import { DurationInSeconds } from '../../util/durations/duration-in-seconds'; + +/* eslint-disable camelcase */ + +function generateMessage(json: MessageAttributesType) { + const { conversationId, expireTimer, received_at, sent_at, type } = json; + + return { + conversationId, + json, + received_at, + sent_at, + expireTimer: Number(expireTimer), + type, + }; +} + +describe('SQL/updateToSchemaVersion1080', () => { + let db: Database; + beforeEach(() => { + db = new SQL(':memory:'); + updateToVersion(db, 1080); + }); + + afterEach(() => { + db.close(); + }); + + describe('Addressable Messages', () => { + describe('Storing of new attachment jobs', () => { + it('returns only incoming/outgoing messages', () => { + const conversationId = generateGuid(); + const otherConversationId = generateGuid(); + + insertData(db, 'messages', [ + generateMessage({ + id: '1', + conversationId, + type: 'incoming', + received_at: 1, + sent_at: 1, + timestamp: 1, + }), + generateMessage({ + id: '2', + conversationId, + type: 'story', + received_at: 2, + sent_at: 2, + timestamp: 2, + }), + generateMessage({ + id: '3', + conversationId, + type: 'outgoing', + received_at: 3, + sent_at: 3, + timestamp: 3, + }), + generateMessage({ + id: '4', + conversationId, + type: 'group-v1-migration', + received_at: 4, + sent_at: 4, + timestamp: 4, + }), + generateMessage({ + id: '5', + conversationId, + type: 'group-v2-change', + received_at: 5, + sent_at: 5, + timestamp: 5, + }), + generateMessage({ + id: '6', + conversationId, + type: 'incoming', + received_at: 6, + sent_at: 6, + timestamp: 6, + expireTimer: DurationInSeconds.fromMinutes(10), + }), + generateMessage({ + id: '7', + conversationId, + type: 'profile-change', + received_at: 7, + sent_at: 7, + timestamp: 7, + }), + generateMessage({ + id: '8', + conversationId: otherConversationId, + type: 'incoming', + received_at: 8, + sent_at: 8, + timestamp: 8, + }), + ]); + + const messages = getMostRecentAddressableNondisappearingMessagesSync( + db, + conversationId + ); + + assert.lengthOf(messages, 2); + assert.deepEqual(messages, [ + { + id: '3', + conversationId, + type: 'outgoing', + received_at: 3, + sent_at: 3, + timestamp: 3, + }, + { + id: '1', + conversationId, + type: 'incoming', + received_at: 1, + sent_at: 1, + timestamp: 1, + }, + ]); + }); + + it('ensures that index is used for getMostRecentAddressableNondisappearingMessagesSync, with storyId', () => { + const { detail } = db + .prepare( + ` + EXPLAIN QUERY PLAN + SELECT json FROM messages + INDEXED BY messages_by_date_addressable_nondisappearing + WHERE + expireTimer IS NULL AND + conversationId IS 'not-important' AND + isAddressableMessage = 1 + ORDER BY received_at DESC, sent_at DESC + LIMIT 5; + ` + ) + .get(); + + assert.notInclude(detail, 'B-TREE'); + assert.notInclude(detail, 'SCAN'); + assert.include( + detail, + 'SEARCH messages USING INDEX messages_by_date_addressable_nondisappearing (conversationId=? AND isAddressableMessage=?)' + ); + }); + }); + }); +}); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 07d1afb7c2c..5d3c0349757 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -3708,6 +3708,10 @@ export default class MessageReceiver const mostRecentMessages = item.mostRecentMessages ?.map(message => processMessageToDelete(message, logId)) .filter(isNotNil); + const mostRecentNonExpiringMessages = + item.mostRecentNonExpiringMessages + ?.map(message => processMessageToDelete(message, logId)) + .filter(isNotNil); const conversation = item.conversation ? processConversationToDelete(item.conversation, logId) : undefined; @@ -3730,6 +3734,7 @@ export default class MessageReceiver conversation, isFullDelete: Boolean(item.isFullDelete), mostRecentMessages, + mostRecentNonExpiringMessages, timestamp, }; }) diff --git a/ts/textsecure/SendMessage.ts b/ts/textsecure/SendMessage.ts index c208d4cd1d6..a3cfb46c240 100644 --- a/ts/textsecure/SendMessage.ts +++ b/ts/textsecure/SendMessage.ts @@ -97,6 +97,7 @@ import { } from '../types/CallDisposition'; import { getProtoForCallHistory } from '../util/callDisposition'; import { CallMode } from '../types/Calling'; +import { MAX_MESSAGE_COUNT } from '../util/deleteForMe.types'; export type SendMetadataType = { [serviceId: ServiceIdString]: { @@ -1518,13 +1519,16 @@ export default class MessageSender { } else if (item.type === 'delete-conversation') { const mostRecentMessages = item.mostRecentMessages.map(toAddressableMessage); + const mostRecentNonExpiringMessages = + item.mostRecentNonExpiringMessages?.map(toAddressableMessage); const conversation = toConversationIdentifier(item.conversation); deleteForMe.conversationDeletes = deleteForMe.conversationDeletes || []; deleteForMe.conversationDeletes.push({ - mostRecentMessages, conversation, isFullDelete: true, + mostRecentMessages, + mostRecentNonExpiringMessages, }); } else if (item.type === 'delete-local-conversation') { const conversation = toConversationIdentifier(item.conversation); @@ -1544,7 +1548,7 @@ export default class MessageSender { }); if (messageDeletes.size > 0) { - for (const items of messageDeletes.values()) { + for (const [conversationId, items] of messageDeletes.entries()) { const first = items[0]; if (!first) { throw new Error('Failed to fetch first from items'); @@ -1552,6 +1556,12 @@ export default class MessageSender { const messages = items.map(item => toAddressableMessage(item.message)); const conversation = toConversationIdentifier(first.conversation); + if (items.length > MAX_MESSAGE_COUNT) { + log.warn( + `getDeleteForMeSyncMessage: Sending ${items.length} message deletes for conversationId ${conversationId}` + ); + } + deleteForMe.messageDeletes = deleteForMe.messageDeletes || []; deleteForMe.messageDeletes.push({ messages, diff --git a/ts/textsecure/messageReceiverEvents.ts b/ts/textsecure/messageReceiverEvents.ts index b3e6cdc7f0c..4f54e4b3425 100644 --- a/ts/textsecure/messageReceiverEvents.ts +++ b/ts/textsecure/messageReceiverEvents.ts @@ -518,6 +518,7 @@ export const deleteConversationSchema = z.object({ type: z.literal('delete-conversation').readonly(), conversation: conversationToDeleteSchema, mostRecentMessages: z.array(messageToDeleteSchema), + mostRecentNonExpiringMessages: z.array(messageToDeleteSchema).optional(), isFullDelete: z.boolean(), timestamp: z.number(), }); diff --git a/ts/util/deleteForMe.ts b/ts/util/deleteForMe.ts index 88539850d3b..c1c27323e50 100644 --- a/ts/util/deleteForMe.ts +++ b/ts/util/deleteForMe.ts @@ -256,20 +256,36 @@ export async function applyDeleteAttachmentFromMessage( return true; } -export async function deleteConversation( - conversation: ConversationModel, - mostRecentMessages: Array, - isFullDelete: boolean, - logId: string -): Promise { - const queries = mostRecentMessages.map(getMessageQueryFromTarget); +async function getMostRecentMatchingMessage( + conversationId: string, + targetMessages: Array +): Promise { + const queries = targetMessages.map(getMessageQueryFromTarget); const found = await Promise.all( - queries.map(query => findMatchingMessage(conversation.id, query)) + queries.map(query => findMatchingMessage(conversationId, query)) ); const sorted = sortBy(found, 'received_at'); - const newestMessage = last(sorted); - if (newestMessage) { + return last(sorted); +} + +export async function deleteConversation( + conversation: ConversationModel, + mostRecentMessages: Array, + mostRecentNonExpiringMessages: Array | undefined, + isFullDelete: boolean, + providedLogId: string +): Promise { + const logId = `${providedLogId}/deleteConversation`; + + const newestMessage = await getMostRecentMatchingMessage( + conversation.id, + mostRecentMessages + ); + if (!newestMessage) { + log.warn(`${logId}: Found no messages from mostRecentMessages set`); + } else { + log.info(`${logId}: Found most recent message from mostRecentMessages set`); const { received_at: receivedAt } = newestMessage; await removeMessagesInConversation(conversation.id, { @@ -280,13 +296,34 @@ export async function deleteConversation( }); } - if (!newestMessage) { - log.warn(`${logId}: Found no target messages for delete`); + if (!newestMessage && mostRecentNonExpiringMessages?.length) { + const newestNondisappearingMessage = await getMostRecentMatchingMessage( + conversation.id, + mostRecentNonExpiringMessages + ); + + if (!newestNondisappearingMessage) { + log.warn( + `${logId}: Found no messages from mostRecentNonExpiringMessages set` + ); + } else { + log.info( + `${logId}: Found most recent message from mostRecentNonExpiringMessages set` + ); + const { received_at: receivedAt } = newestNondisappearingMessage; + + await removeMessagesInConversation(conversation.id, { + fromSync: true, + receivedAt, + logId: `${logId}(receivedAt=${receivedAt})`, + singleProtoJobQueue, + }); + } } if (isFullDelete) { log.info(`${logId}: isFullDelete=true, proceeding to local-only delete`); - return deleteLocalOnlyConversation(conversation, logId); + return deleteLocalOnlyConversation(conversation, providedLogId); } return true; @@ -294,17 +331,16 @@ export async function deleteConversation( export async function deleteLocalOnlyConversation( conversation: ConversationModel, - logId: string + providedLogId: string ): Promise { + const logId = `${providedLogId}/deleteLocalOnlyConversation`; const limit = 1; const messages = await getMostRecentAddressableMessages( conversation.id, limit ); if (messages.length > 0) { - log.warn( - `${logId}: Attempted local-only delete but found an addressable message` - ); + log.warn(`${logId}: Cannot delete; found an addressable message`); return false; } diff --git a/ts/util/syncTasks.ts b/ts/util/syncTasks.ts index 2e1ae638b2e..0c0eb0df3d4 100644 --- a/ts/util/syncTasks.ts +++ b/ts/util/syncTasks.ts @@ -65,7 +65,7 @@ const SCHEMAS_BY_TYPE: Record = { }; function toLogId(task: SyncTaskType) { - return `task=${task.id},timestamp:${task},type=${task.type},envelopeId=${task.envelopeId}`; + return `type=${task.type},envelopeId=${task.envelopeId}`; } export async function queueSyncTasks( @@ -112,6 +112,7 @@ export async function queueSyncTasks( const { conversation: targetConversation, mostRecentMessages, + mostRecentNonExpiringMessages, isFullDelete, } = parsed; const conversation = getConversationFromTarget(targetConversation); @@ -121,17 +122,18 @@ export async function queueSyncTasks( } drop( conversation.queueJob(innerLogId, async () => { - log.info(`${logId}: Starting...`); + log.info(`${innerLogId}: Starting...`); const result = await deleteConversation( conversation, mostRecentMessages, + mostRecentNonExpiringMessages, isFullDelete, innerLogId ); if (result) { await removeSyncTaskById(id); } - log.info(`${logId}: Done, result=${result}`); + log.info(`${innerLogId}: Done, result=${result}`); }) ); } else if (parsed.type === 'delete-local-conversation') { @@ -143,7 +145,7 @@ export async function queueSyncTasks( } drop( conversation.queueJob(innerLogId, async () => { - log.info(`${logId}: Starting...`); + log.info(`${innerLogId}: Starting...`); const result = await deleteLocalOnlyConversation( conversation, innerLogId @@ -153,7 +155,7 @@ export async function queueSyncTasks( // get more messages in this conversation from here! await removeSyncTaskById(id); - log.info(`${logId}: Done; result=${result}`); + log.info(`${innerLogId}: Done; result=${result}`); }) ); } else if (parsed.type === 'delete-single-attachment') { @@ -201,7 +203,9 @@ export async function queueSyncTasks( ); } else { const parsedType: never = parsed.type; - log.error(`${logId}: Encountered job of type ${parsedType}, removing`); + log.error( + `${innerLogId}: Encountered job of type ${parsedType}, removing` + ); // eslint-disable-next-line no-await-in-loop await removeSyncTaskById(id); }