From 75c32e86f0b5aaf02500fd6b522b5c317ea204dc Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Mon, 17 Jun 2024 12:24:39 -0700 Subject: [PATCH] Move receipts and view/read syncs to new syncTasks system --- protos/SignalService.proto | 4 +- ts/background.ts | 463 +++++++++++------- ts/messageModifiers/MessageReceipts.ts | 163 +++--- ts/messageModifiers/ReadSyncs.ts | 101 ++-- ts/messageModifiers/ViewSyncs.ts | 84 ++-- ts/models/conversations.ts | 26 +- ts/models/messages.ts | 6 +- ts/services/expiringMessagesDeletion.ts | 31 +- ts/sql/Client.ts | 85 ++-- ts/sql/Interface.ts | 22 +- ts/sql/Server.ts | 14 + ts/state/ducks/conversations.ts | 12 +- ts/state/ducks/stories.ts | 8 +- ts/state/selectors/items-extra.ts | 38 ++ ts/state/selectors/items.ts | 7 - ts/state/smart/ConversationHeader.tsx | 6 +- ts/state/smart/DeleteMessagesModal.tsx | 6 +- .../util/filterAndSortConversations_test.ts | 4 +- ts/test-electron/MessageReceipts_test.ts | 85 +++- ts/test-electron/sql/sendLog_test.ts | 3 +- .../textsecure/KeyChangeListener_test.ts | 3 + ts/textsecure/MessageReceiver.ts | 256 ++++++---- ts/textsecure/SendMessage.ts | 81 ++- ts/textsecure/WebAPI.ts | 8 +- ts/textsecure/messageReceiverEvents.ts | 50 +- ts/util/callDisposition.ts | 35 +- ts/util/cleanup.ts | 105 +++- ts/util/deleteForMe.ts | 28 +- .../findAndDeleteOnboardingStoryIfExists.ts | 5 +- ts/util/getConversation.ts | 1 + ts/util/markConversationRead.ts | 4 +- ts/util/modifyTargetMessage.ts | 37 +- ts/util/syncTasks.ts | 73 ++- 33 files changed, 1242 insertions(+), 612 deletions(-) create mode 100644 ts/state/selectors/items-extra.ts diff --git a/protos/SignalService.proto b/protos/SignalService.proto index cf8cb3da340..392449671c5 100644 --- a/protos/SignalService.proto +++ b/protos/SignalService.proto @@ -636,7 +636,7 @@ message SyncMessage { message DeleteForMe { message ConversationIdentifier { oneof identifier { - string threadAci = 1; + string threadServiceId = 1; bytes threadGroupId = 2; string threadE164 = 3; } @@ -644,7 +644,7 @@ message SyncMessage { message AddressableMessage { oneof author { - string authorAci = 1; + string authorServiceId = 1; string authorE164 = 2; } optional uint64 sentTimestamp = 3; diff --git a/ts/background.ts b/ts/background.ts index b8b775cd2b8..c085a5d54ac 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -42,7 +42,10 @@ import { isNotNil } from './util/isNotNil'; import { isBackupEnabled } from './util/isBackupEnabled'; import { setAppLoadingScreenMessage } from './setAppLoadingScreenMessage'; import { IdleDetector } from './IdleDetector'; -import { expiringMessagesDeletionService } from './services/expiringMessagesDeletion'; +import { + initialize as initializeExpiringMessageService, + update as updateExpiringMessagesService, +} from './services/expiringMessagesDeletion'; import { tapToViewMessagesDeletionService } from './services/tapToViewMessagesDeletionService'; import { getStoriesForRedux, loadStories } from './services/storyLoader'; import { @@ -116,17 +119,12 @@ import * as Edits from './messageModifiers/Edits'; import * as MessageReceipts from './messageModifiers/MessageReceipts'; import * as MessageRequests from './messageModifiers/MessageRequests'; import * as Reactions from './messageModifiers/Reactions'; -import * as ReadSyncs from './messageModifiers/ReadSyncs'; import * as ViewOnceOpenSyncs from './messageModifiers/ViewOnceOpenSyncs'; -import * as ViewSyncs from './messageModifiers/ViewSyncs'; import type { DeleteAttributesType } from './messageModifiers/Deletes'; import type { EditAttributesType } from './messageModifiers/Edits'; -import type { MessageReceiptAttributesType } from './messageModifiers/MessageReceipts'; import type { MessageRequestAttributesType } from './messageModifiers/MessageRequests'; import type { ReactionAttributesType } from './messageModifiers/Reactions'; -import type { ReadSyncAttributesType } from './messageModifiers/ReadSyncs'; import type { ViewOnceOpenSyncAttributesType } from './messageModifiers/ViewOnceOpenSyncs'; -import type { ViewSyncAttributesType } from './messageModifiers/ViewSyncs'; import { ReadStatus } from './messages/MessageReadStatus'; import type { SendStateByConversationId } from './messages/MessageSendState'; import { SendStatus } from './messages/MessageSendState'; @@ -202,7 +200,11 @@ import { getThemeType } from './util/getThemeType'; import { AttachmentDownloadManager } from './jobs/AttachmentDownloadManager'; import { onCallLinkUpdateSync } from './util/onCallLinkUpdateSync'; import { CallMode } from './types/Calling'; +import type { SyncTaskType } from './util/syncTasks'; import { queueSyncTasks } from './util/syncTasks'; +import type { ViewSyncTaskType } from './messageModifiers/ViewSyncs'; +import type { ReceiptSyncTaskType } from './messageModifiers/MessageReceipts'; +import type { ReadSyncTaskType } from './messageModifiers/ReadSyncs'; import { isEnabled } from './RemoteConfig'; import { AttachmentBackupManager } from './jobs/AttachmentBackupManager'; import { getConversationIdForLogging } from './util/idForLogging'; @@ -1498,10 +1500,12 @@ export async function startApp(): Promise { window.Whisper.events.trigger('timetravel'); }); - void expiringMessagesDeletionService.update(); + initializeExpiringMessageService(singleProtoJobQueue); + + void updateExpiringMessagesService(); void tapToViewMessagesDeletionService.update(); window.Whisper.events.on('timetravel', () => { - void expiringMessagesDeletionService.update(); + void updateExpiringMessagesService(); void tapToViewMessagesDeletionService.update(); }); @@ -1833,7 +1837,9 @@ export async function startApp(): Promise { try { // Note: we always have to register our capabilities all at once, so we do this // after connect on every startup - await server.registerCapabilities({}); + await server.registerCapabilities({ + deleteSync: true, + }); } catch (error) { log.error( 'Error: Unable to register our capabilities.', @@ -3221,200 +3227,329 @@ export async function startApp(): Promise { drop(MessageRequests.onResponse(attributes)); } - function onReadReceipt(event: Readonly): void { - onReadOrViewReceipt({ + async function onReadReceipt(event: Readonly): Promise { + return onReadOrViewReceipt({ logTitle: 'read receipt', event, - type: MessageReceipts.MessageReceiptType.Read, + type: MessageReceipts.messageReceiptTypeSchema.enum.Read, }); } - function onViewReceipt(event: Readonly): void { - onReadOrViewReceipt({ + async function onViewReceipt(event: Readonly): Promise { + return onReadOrViewReceipt({ logTitle: 'view receipt', event, - type: MessageReceipts.MessageReceiptType.View, + type: MessageReceipts.messageReceiptTypeSchema.enum.View, }); } - function onReadOrViewReceipt({ + async function onReadOrViewReceipt({ event, logTitle, type, }: Readonly<{ event: ReadEvent | ViewEvent; logTitle: string; - type: - | MessageReceipts.MessageReceiptType.Read - | MessageReceipts.MessageReceiptType.View; - }>): void { - const { - envelopeTimestamp, - timestamp, - source, - sourceServiceId, - sourceDevice, - wasSentEncrypted, - } = event.receipt; - const sourceConversation = window.ConversationController.lookupOrCreate({ - serviceId: sourceServiceId, - e164: source, - reason: `onReadOrViewReceipt(${envelopeTimestamp})`, - }); - strictAssert(sourceConversation, 'Failed to create conversation'); - log.info( - logTitle, - `${sourceServiceId || source}.${sourceDevice}`, - envelopeTimestamp, - 'for sent message', - timestamp - ); + type: 'Read' | 'View'; + }>): Promise { + const { receipts, envelopeId, envelopeTimestamp, confirm } = event; + const logId = `onReadOrViewReceipt(type=${type}, envelope=${envelopeTimestamp}, envelopeId=${envelopeId})`; - strictAssert( - isServiceIdString(sourceServiceId), - 'onReadOrViewReceipt: Missing sourceServiceId' - ); - strictAssert(sourceDevice, 'onReadOrViewReceipt: Missing sourceDevice'); + const syncTasks = receipts + .map((receipt): SyncTaskType | undefined => { + const { + timestamp, + source, + sourceServiceId, + sourceDevice, + wasSentEncrypted, + } = receipt; + const sourceConversation = window.ConversationController.lookupOrCreate( + { + serviceId: sourceServiceId, + e164: source, + reason: `onReadOrViewReceipt(${envelopeTimestamp})`, + } + ); - const attributes: MessageReceiptAttributesType = { - envelopeId: event.receipt.envelopeId, - removeFromMessageReceiverCache: event.confirm, - messageSentAt: timestamp, - receiptTimestamp: envelopeTimestamp, - sourceConversationId: sourceConversation.id, - sourceServiceId, - sourceDevice, - type, - wasSentEncrypted, - }; - drop(MessageReceipts.onReceipt(attributes)); + log.info( + logTitle, + `${sourceServiceId || source}.${sourceDevice}`, + envelopeTimestamp, + 'for sent message', + timestamp + ); + + if (!sourceConversation) { + log.error(`${logId}: Failed to create conversation`); + return undefined; + } + if (!isServiceIdString(sourceServiceId)) { + log.error(`${logId}: Missing sourceServiceId`); + return undefined; + } + if (!sourceDevice) { + log.error(`${logId}: Missing sourceDevice`); + return undefined; + } + + const data: ReceiptSyncTaskType = { + messageSentAt: timestamp, + receiptTimestamp: envelopeTimestamp, + sourceConversationId: sourceConversation.id, + sourceServiceId, + sourceDevice, + type, + wasSentEncrypted, + }; + return { + id: generateUuid(), + attempts: 1, + createdAt: Date.now(), + data, + envelopeId, + sentAt: envelopeTimestamp, + type, + }; + }) + .filter(isNotNil); + + log.info(`${logId}: Saving ${syncTasks.length} sync tasks`); + + await window.Signal.Data.saveSyncTasks(syncTasks); + + confirm(); + + log.info(`${logId}: Queuing ${syncTasks.length} sync tasks`); + + await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById); + + log.info(`${logId}: Done`); } async function onReadSync(ev: ReadSyncEvent): Promise { - const { envelopeTimestamp, sender, senderAci, timestamp } = ev.read; - const readAt = envelopeTimestamp; - const { conversation: senderConversation } = - window.ConversationController.maybeMergeContacts({ - aci: senderAci, - e164: sender, - reason: 'onReadSync', - }); - const senderId = senderConversation?.id; + const { reads, envelopeTimestamp, envelopeId, confirm } = ev; + const logId = `onReadSync(envelope=${envelopeTimestamp}, envelopeId=${envelopeId})`; - log.info( - 'read sync', - sender, - senderAci, - envelopeTimestamp, - senderId, - 'for message', - timestamp - ); + const syncTasks = reads + .map((read): SyncTaskType | undefined => { + const { sender, senderAci, timestamp } = read; + const readAt = envelopeTimestamp; + const { conversation: senderConversation } = + window.ConversationController.maybeMergeContacts({ + aci: senderAci, + e164: sender, + reason: 'onReadSync', + }); + const senderId = senderConversation?.id; - strictAssert(senderId, 'onReadSync missing senderId'); - strictAssert(senderAci, 'onReadSync missing senderAci'); - strictAssert(timestamp, 'onReadSync missing timestamp'); + log.info( + 'read sync', + sender, + senderAci, + envelopeTimestamp, + senderId, + 'for message', + timestamp + ); - const attributes: ReadSyncAttributesType = { - envelopeId: ev.read.envelopeId, - removeFromMessageReceiverCache: ev.confirm, - senderId, - sender, - senderAci, - timestamp, - readAt, - }; + if (!senderId) { + log.error(`${logId}: missing senderId`); + return undefined; + } + if (!senderAci) { + log.error(`${logId}: missing senderAci`); + return undefined; + } + if (!timestamp) { + log.error(`${logId}: missing timestamp`); + return undefined; + } - await ReadSyncs.onSync(attributes); + const data: ReadSyncTaskType = { + type: 'ReadSync', + senderId, + sender, + senderAci, + timestamp, + readAt, + }; + return { + id: generateUuid(), + attempts: 1, + createdAt: Date.now(), + data, + envelopeId, + sentAt: envelopeTimestamp, + type: 'ReadSync', + }; + }) + .filter(isNotNil); + + log.info(`${logId}: Saving ${syncTasks.length} sync tasks`); + + await window.Signal.Data.saveSyncTasks(syncTasks); + + confirm(); + + log.info(`${logId}: Queuing ${syncTasks.length} sync tasks`); + + await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById); + + log.info(`${logId}: Done`); } async function onViewSync(ev: ViewSyncEvent): Promise { - const { envelopeTimestamp, senderE164, senderAci, timestamp } = ev.view; - const { conversation: senderConversation } = - window.ConversationController.maybeMergeContacts({ - e164: senderE164, - aci: senderAci, - reason: 'onViewSync', - }); - const senderId = senderConversation?.id; + const { envelopeTimestamp, envelopeId, views, confirm } = ev; + const logId = `onViewSync=(envelope=${envelopeTimestamp}, envelopeId=${envelopeId})`; - log.info( - 'view sync', - senderE164, - senderAci, - envelopeTimestamp, - senderId, - 'for message', - timestamp - ); + const syncTasks = views + .map((view): SyncTaskType | undefined => { + const { senderAci, senderE164, timestamp } = view; - strictAssert(senderId, 'onViewSync missing senderId'); - strictAssert(senderAci, 'onViewSync missing senderAci'); - strictAssert(timestamp, 'onViewSync missing timestamp'); + const { conversation: senderConversation } = + window.ConversationController.maybeMergeContacts({ + e164: senderE164, + aci: senderAci, + reason: 'onViewSync', + }); + const senderId = senderConversation?.id; - const attributes: ViewSyncAttributesType = { - envelopeId: ev.view.envelopeId, - removeFromMessageReceiverCache: ev.confirm, - senderId, - senderE164, - senderAci, - timestamp, - viewedAt: envelopeTimestamp, - }; + log.info( + 'view sync', + senderE164, + senderAci, + envelopeTimestamp, + senderId, + 'for message', + timestamp + ); - await ViewSyncs.onSync(attributes); + if (!senderId) { + log.error(`${logId}: missing senderId`); + return undefined; + } + if (!senderAci) { + log.error(`${logId}: missing senderAci`); + return undefined; + } + if (!timestamp) { + log.error(`${logId}: missing timestamp`); + return undefined; + } + + const data: ViewSyncTaskType = { + type: 'ViewSync', + senderId, + senderE164, + senderAci, + timestamp, + viewedAt: envelopeTimestamp, + }; + return { + id: generateUuid(), + attempts: 1, + createdAt: Date.now(), + data, + envelopeId, + sentAt: envelopeTimestamp, + type: 'ViewSync', + }; + }) + .filter(isNotNil); + + log.info(`${logId}: Saving ${syncTasks.length} sync tasks`); + + await window.Signal.Data.saveSyncTasks(syncTasks); + + confirm(); + + log.info(`${logId}: Queuing ${syncTasks.length} sync tasks`); + + await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById); + + log.info(`${logId}: Done`); } - function onDeliveryReceipt(ev: DeliveryEvent): void { - const { deliveryReceipt } = ev; - const { - envelopeTimestamp, - sourceServiceId, - source, - sourceDevice, - timestamp, - wasSentEncrypted, - } = deliveryReceipt; + async function onDeliveryReceipt(ev: DeliveryEvent): Promise { + const { deliveryReceipts, envelopeId, envelopeTimestamp, confirm } = ev; + const logId = `onDeliveryReceipt(envelope=${envelopeTimestamp}, envelopeId=${envelopeId})`; - const sourceConversation = window.ConversationController.lookupOrCreate({ - serviceId: sourceServiceId, - e164: source, - reason: `onDeliveryReceipt(${envelopeTimestamp})`, - }); + strictAssert(envelopeTimestamp, `${logId}: missing envelopeTimestamp`); + strictAssert(envelopeTimestamp, `${logId}: missing envelopeId`); - log.info( - 'delivery receipt from', - `${sourceServiceId || source}.${sourceDevice}`, - envelopeTimestamp, - 'for sent message', - timestamp, - `wasSentEncrypted=${wasSentEncrypted}` - ); + const syncTasks = deliveryReceipts + .map((deliveryReceipt): SyncTaskType | undefined => { + const { + sourceServiceId, + source, + sourceDevice, + timestamp, + wasSentEncrypted, + } = deliveryReceipt; - strictAssert( - envelopeTimestamp, - 'onDeliveryReceipt: missing envelopeTimestamp' - ); - strictAssert( - isServiceIdString(sourceServiceId), - 'onDeliveryReceipt: missing valid sourceServiceId' - ); - strictAssert(sourceDevice, 'onDeliveryReceipt: missing sourceDevice'); - strictAssert(sourceConversation, 'onDeliveryReceipt: missing conversation'); + const sourceConversation = window.ConversationController.lookupOrCreate( + { + serviceId: sourceServiceId, + e164: source, + reason: `onDeliveryReceipt(${envelopeTimestamp})`, + } + ); - const attributes: MessageReceiptAttributesType = { - envelopeId: ev.deliveryReceipt.envelopeId, - removeFromMessageReceiverCache: ev.confirm, - messageSentAt: timestamp, - receiptTimestamp: envelopeTimestamp, - sourceConversationId: sourceConversation.id, - sourceServiceId, - sourceDevice, - type: MessageReceipts.MessageReceiptType.Delivery, - wasSentEncrypted, - }; + log.info( + 'delivery receipt from', + `${sourceServiceId || source}.${sourceDevice}`, + envelopeTimestamp, + 'for sent message', + timestamp, + `wasSentEncrypted=${wasSentEncrypted}` + ); - drop(MessageReceipts.onReceipt(attributes)); + if (!isServiceIdString(sourceServiceId)) { + log.error(`${logId}: missing valid sourceServiceId`); + return undefined; + } + if (!sourceDevice) { + log.error(`${logId}: missing sourceDevice`); + return undefined; + } + if (!sourceConversation) { + log.error(`${logId}: missing conversation`); + return undefined; + } + + const data: ReceiptSyncTaskType = { + messageSentAt: timestamp, + receiptTimestamp: envelopeTimestamp, + sourceConversationId: sourceConversation.id, + sourceServiceId, + sourceDevice, + type: MessageReceipts.messageReceiptTypeSchema.enum.Delivery, + wasSentEncrypted, + }; + return { + id: generateUuid(), + attempts: 1, + createdAt: Date.now(), + data, + envelopeId, + sentAt: envelopeTimestamp, + type: 'Delivery', + }; + }) + .filter(isNotNil); + + log.info(`${logId}: Saving ${syncTasks.length} sync tasks`); + + await window.Signal.Data.saveSyncTasks(syncTasks); + + confirm(); + + log.info(`${logId}: Queuing ${syncTasks.length} sync tasks`); + + await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById); + + log.info(`${logId}: Done`); } async function onDeleteForMeSync(ev: DeleteForMeSyncEvent) { diff --git a/ts/messageModifiers/MessageReceipts.ts b/ts/messageModifiers/MessageReceipts.ts index 40b1618bdad..c083f3eba7c 100644 --- a/ts/messageModifiers/MessageReceipts.ts +++ b/ts/messageModifiers/MessageReceipts.ts @@ -1,6 +1,7 @@ // Copyright 2016 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import { z } from 'zod'; import { groupBy } from 'lodash'; import type { MessageModel } from '../models/messages'; @@ -10,7 +11,7 @@ import { isOutgoing, isStory } from '../state/selectors/message'; import { getOwn } from '../util/getOwn'; import { missingCaseError } from '../util/missingCaseError'; import { createWaitBatcher } from '../util/waitBatcher'; -import type { ServiceIdString } from '../types/ServiceId'; +import { isServiceIdString } from '../types/ServiceId'; import { SendActionType, SendStatus, @@ -23,7 +24,6 @@ import * as log from '../logging/log'; import { getSourceServiceId } from '../messages/helpers'; import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp'; import { getMessageIdForLogging } from '../util/idForLogging'; -import { generateCacheKey } from './generateCacheKey'; import { getPropForTimestamp } from '../util/editHelpers'; import { DELETE_SENT_PROTO_BATCHER_WAIT_MS, @@ -31,34 +31,30 @@ import { } from '../types/Receipt'; import { drop } from '../util/drop'; -const { deleteSentProtoRecipient } = dataInterface; +const { deleteSentProtoRecipient, removeSyncTaskById } = dataInterface; -export enum MessageReceiptType { - Delivery = 'Delivery', - Read = 'Read', - View = 'View', -} +export const messageReceiptTypeSchema = z.enum(['Delivery', 'Read', 'View']); + +export type MessageReceiptType = z.infer; + +export const receiptSyncTaskSchema = z.object({ + messageSentAt: z.number(), + receiptTimestamp: z.number(), + sourceConversationId: z.string(), + sourceDevice: z.number(), + sourceServiceId: z.string().refine(isServiceIdString), + type: messageReceiptTypeSchema, + wasSentEncrypted: z.boolean(), +}); + +export type ReceiptSyncTaskType = z.infer; export type MessageReceiptAttributesType = { envelopeId: string; - messageSentAt: number; - receiptTimestamp: number; - removeFromMessageReceiverCache: () => void; - sourceConversationId: string; - sourceDevice: number; - sourceServiceId: ServiceIdString; - type: MessageReceiptType; - wasSentEncrypted: boolean; + syncTaskId: string; + receiptSync: ReceiptSyncTaskType; }; -function getReceiptCacheKey(receipt: MessageReceiptAttributesType): string { - return generateCacheKey({ - sender: receipt.sourceServiceId, - timestamp: receipt.messageSentAt, - type: receipt.type, - }); -} - const cachedReceipts = new Map(); const processReceiptBatcher = createWaitBatcher({ @@ -69,7 +65,7 @@ const processReceiptBatcher = createWaitBatcher({ // First group by sentAt, so that we can find the target message const receiptsByMessageSentAt = groupBy( receipts, - receipt => receipt.messageSentAt + receipt => receipt.receiptSync.messageSentAt ); // Once we find the message, we'll group them by messageId to process @@ -99,7 +95,7 @@ const processReceiptBatcher = createWaitBatcher({ continue; } // All receipts have the same sentAt, so we can grab it from the first - const sentAt = receiptsForMessageSentAt[0].messageSentAt; + const sentAt = receiptsForMessageSentAt[0].receiptSync.messageSentAt; const messagesMatchingTimestamp = // eslint-disable-next-line no-await-in-loop @@ -114,14 +110,16 @@ const processReceiptBatcher = createWaitBatcher({ if (reaction) { for (const receipt of receiptsForMessageSentAt) { + const { receiptSync } = receipt; log.info( 'MesageReceipts.processReceiptBatcher: Got receipt for reaction', - receipt.messageSentAt, - receipt.type, - receipt.sourceConversationId, - receipt.sourceServiceId + receiptSync.messageSentAt, + receiptSync.type, + receiptSync.sourceConversationId, + receiptSync.sourceServiceId ); - remove(receipt); + // eslint-disable-next-line no-await-in-loop + await remove(receipt); } continue; } @@ -129,7 +127,7 @@ const processReceiptBatcher = createWaitBatcher({ for (const receipt of receiptsForMessageSentAt) { const targetMessage = getTargetMessage({ - sourceConversationId: receipt.sourceConversationId, + sourceConversationId: receipt.receiptSync.sourceConversationId, targetTimestamp: sentAt, messagesMatchingTimestamp, }); @@ -144,7 +142,9 @@ const processReceiptBatcher = createWaitBatcher({ item.sendStateByConversationId && !item.deletedForEveryone && Boolean( - item.sendStateByConversationId[receipt.sourceConversationId] + item.sendStateByConversationId[ + receipt.receiptSync.sourceConversationId + ] ) ); @@ -154,12 +154,13 @@ const processReceiptBatcher = createWaitBatcher({ ); } else { // Nope, no target message was found + const { receiptSync } = receipt; log.info( 'MessageReceipts.processReceiptBatcher: No message for receipt', - receipt.messageSentAt, - receipt.type, - receipt.sourceConversationId, - receipt.sourceServiceId + receiptSync.messageSentAt, + receiptSync.type, + receiptSync.sourceConversationId, + receiptSync.sourceServiceId ); } } @@ -190,7 +191,7 @@ async function processReceiptsForMessage( messageId ); - const { updatedMessage, validReceipts } = updateMessageWithReceipts( + const { updatedMessage, validReceipts } = await updateMessageWithReceipts( message, receipts ); @@ -204,7 +205,8 @@ async function processReceiptsForMessage( // Confirm/remove receipts, and delete sent protos for (const receipt of validReceipts) { - remove(receipt); + // eslint-disable-next-line no-await-in-loop + await remove(receipt); drop(addToDeleteSentProtoBatcher(receipt, updatedMessage)); } @@ -215,25 +217,27 @@ async function processReceiptsForMessage( conversation?.debouncedUpdateLastMessage?.(); } -function updateMessageWithReceipts( +async function updateMessageWithReceipts( message: MessageAttributesType, receipts: Array -): { +): Promise<{ updatedMessage: MessageAttributesType; validReceipts: Array; -} { +}> { const logId = `updateMessageWithReceipts(timestamp=${message.timestamp})`; + const toRemove: Array = []; const receiptsToProcess = receipts.filter(receipt => { if (shouldDropReceipt(receipt, message)) { + const { receiptSync } = receipt; log.info( - `${logId}: Dropping a receipt ${receipt.type} for sentAt=${receipt.messageSentAt}` + `${logId}: Dropping a receipt ${receiptSync.type} for sentAt=${receiptSync.messageSentAt}` ); - remove(receipt); + toRemove.push(receipt); return false; } - if (!cachedReceipts.has(getReceiptCacheKey(receipt))) { + if (!cachedReceipts.has(receipt.syncTaskId)) { // Between the time it was received and now, this receipt has already been handled! return false; } @@ -241,6 +245,8 @@ function updateMessageWithReceipts( return true; }); + await Promise.all(toRemove.map(remove)); + log.info( `${logId}: batch processing ${receipts.length}` + ` receipt${receipts.length === 1 ? '' : 's'}` @@ -287,9 +293,10 @@ const deleteSentProtoBatcher = createWaitBatcher({ }, }); -function remove(receipt: MessageReceiptAttributesType): void { - cachedReceipts.delete(getReceiptCacheKey(receipt)); - receipt.removeFromMessageReceiverCache(); +async function remove(receipt: MessageReceiptAttributesType): Promise { + const { syncTaskId } = receipt; + cachedReceipts.delete(syncTaskId); + await removeSyncTaskById(syncTaskId); } function getTargetMessage({ @@ -372,13 +379,13 @@ const shouldDropReceipt = ( receipt: MessageReceiptAttributesType, message: MessageAttributesType ): boolean => { - const { type } = receipt; + const { type } = receipt.receiptSync; switch (type) { - case MessageReceiptType.Delivery: + case messageReceiptTypeSchema.Enum.Delivery: return false; - case MessageReceiptType.Read: + case messageReceiptTypeSchema.Enum.Read: return !window.storage.get('read-receipt-setting'); - case MessageReceiptType.View: + case messageReceiptTypeSchema.Enum.View: if (isStory(message)) { return !window.Events.getStoryViewReceiptsEnabled(); } @@ -388,9 +395,9 @@ const shouldDropReceipt = ( } }; -export function forMessage( +export async function forMessage( message: MessageModel -): Array { +): Promise> { if (!isOutgoing(message.attributes) && !isStory(message.attributes)) { return []; } @@ -408,20 +415,23 @@ export function forMessage( const receiptValues = Array.from(cachedReceipts.values()); const sentAt = getMessageSentTimestamp(message.attributes, { log }); - const result = receiptValues.filter(item => item.messageSentAt === sentAt); + const result = receiptValues.filter( + item => item.receiptSync.messageSentAt === sentAt + ); if (result.length > 0) { log.info(`${logId}: found early receipts for message ${sentAt}`); - result.forEach(receipt => { - remove(receipt); - }); + await Promise.all( + result.map(async receipt => { + await remove(receipt); + }) + ); } return result.filter(receipt => { if (shouldDropReceipt(receipt, message.attributes)) { log.info( - `${logId}: Dropping an early receipt ${receipt.type} for message ${sentAt}` + `${logId}: Dropping an early receipt ${receipt.receiptSync.type} for message ${sentAt}` ); - remove(receipt); return false; } @@ -433,7 +443,7 @@ function getNewSendStateByConversationId( oldSendStateByConversationId: SendStateByConversationId, receipt: MessageReceiptAttributesType ): SendStateByConversationId { - const { receiptTimestamp, sourceConversationId, type } = receipt; + const { receiptTimestamp, sourceConversationId, type } = receipt.receiptSync; const oldSendState = getOwn( oldSendStateByConversationId, sourceConversationId @@ -441,13 +451,13 @@ function getNewSendStateByConversationId( let sendActionType: SendActionType; switch (type) { - case MessageReceiptType.Delivery: + case messageReceiptTypeSchema.enum.Delivery: sendActionType = SendActionType.GotDeliveryReceipt; break; - case MessageReceiptType.Read: + case messageReceiptTypeSchema.enum.Read: sendActionType = SendActionType.GotReadReceipt; break; - case MessageReceiptType.View: + case messageReceiptTypeSchema.enum.View: sendActionType = SendActionType.GotViewedReceipt; break; default: @@ -467,7 +477,7 @@ function updateMessageSendStateWithReceipt( message: MessageAttributesType, receipt: MessageReceiptAttributesType ): Partial { - const { messageSentAt } = receipt; + const { messageSentAt } = receipt.receiptSync; const newAttributes: Partial = {}; @@ -510,27 +520,34 @@ async function addToDeleteSentProtoBatcher( receipt: MessageReceiptAttributesType, message: MessageAttributesType ) { - const { sourceConversationId, type } = receipt; + const { receiptSync } = receipt; + const { + sourceConversationId, + type, + wasSentEncrypted, + messageSentAt, + sourceDevice, + } = receiptSync; if ( - (type === MessageReceiptType.Delivery && + (type === messageReceiptTypeSchema.enum.Delivery && wasDeliveredWithSealedSender(sourceConversationId, message) && - receipt.wasSentEncrypted) || - type === MessageReceiptType.Read + wasSentEncrypted) || + type === messageReceiptTypeSchema.enum.Read ) { const recipient = window.ConversationController.get(sourceConversationId); const recipientServiceId = recipient?.getServiceId(); - const deviceId = receipt.sourceDevice; + const deviceId = sourceDevice; if (recipientServiceId && deviceId) { await deleteSentProtoBatcher.add({ - timestamp: receipt.messageSentAt, + timestamp: messageSentAt, recipientServiceId, deviceId, }); } else { log.warn( - `MessageReceipts.deleteSentProto(sentAt=${receipt.messageSentAt}): ` + + `MessageReceipts.deleteSentProto(sentAt=${messageSentAt}): ` + `Missing serviceId or deviceId for deliveredTo ${sourceConversationId}` ); } @@ -540,6 +557,6 @@ async function addToDeleteSentProtoBatcher( export async function onReceipt( receipt: MessageReceiptAttributesType ): Promise { - cachedReceipts.set(getReceiptCacheKey(receipt), receipt); + cachedReceipts.set(receipt.syncTaskId, receipt); await processReceiptBatcher.add(receipt); } diff --git a/ts/messageModifiers/ReadSyncs.ts b/ts/messageModifiers/ReadSyncs.ts index 71db8c548cc..bed78ca3e84 100644 --- a/ts/messageModifiers/ReadSyncs.ts +++ b/ts/messageModifiers/ReadSyncs.ts @@ -1,7 +1,8 @@ // Copyright 2017 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import type { AciString } from '../types/ServiceId'; +import { z } from 'zod'; + import type { MessageModel } from '../models/messages'; import * as Errors from '../types/errors'; import * as log from '../logging/log'; @@ -14,58 +15,69 @@ import { isMessageUnread } from '../util/isMessageUnread'; import { notificationService } from '../services/notifications'; import { queueUpdateMessage } from '../util/messageBatcher'; import { strictAssert } from '../util/assert'; -import { generateCacheKey } from './generateCacheKey'; +import { isAciString } from '../util/isAciString'; +import dataInterface from '../sql/Client'; + +const { removeSyncTaskById } = dataInterface; + +export const readSyncTaskSchema = z.object({ + type: z.literal('ReadSync').readonly(), + readAt: z.number(), + sender: z.string().optional(), + senderAci: z.string().refine(isAciString), + senderId: z.string(), + timestamp: z.number(), +}); + +export type ReadSyncTaskType = z.infer; export type ReadSyncAttributesType = { envelopeId: string; - readAt: number; - removeFromMessageReceiverCache: () => unknown; - sender?: string; - senderAci: AciString; - senderId: string; - timestamp: number; + syncTaskId: string; + readSync: ReadSyncTaskType; }; const readSyncs = new Map(); -function remove(sync: ReadSyncAttributesType): void { - readSyncs.delete( - generateCacheKey({ - sender: sync.senderId, - timestamp: sync.timestamp, - type: 'readsync', - }) - ); - sync.removeFromMessageReceiverCache(); +async function remove(sync: ReadSyncAttributesType): Promise { + const { syncTaskId } = sync; + readSyncs.delete(syncTaskId); + await removeSyncTaskById(syncTaskId); } async function maybeItIsAReactionReadSync( sync: ReadSyncAttributesType ): Promise { - const logId = `ReadSyncs.onSync(timestamp=${sync.timestamp})`; + const { readSync } = sync; + const logId = `ReadSyncs.onSync(timestamp=${readSync.timestamp})`; const readReaction = await window.Signal.Data.markReactionAsRead( - sync.senderAci, - Number(sync.timestamp) + readSync.senderAci, + Number(readSync.timestamp) ); if ( !readReaction || readReaction?.targetAuthorAci !== window.storage.user.getCheckedAci() ) { - log.info(`${logId} not found:`, sync.senderId, sync.sender, sync.senderAci); + log.info( + `${logId} not found:`, + readSync.senderId, + readSync.sender, + readSync.senderAci + ); return; } log.info( `${logId} read reaction sync found:`, readReaction.conversationId, - sync.senderId, - sync.sender, - sync.senderAci + readSync.senderId, + readSync.sender, + readSync.senderAci ); - remove(sync); + await remove(sync); notificationService.removeBy({ conversationId: readReaction.conversationId, @@ -75,9 +87,9 @@ async function maybeItIsAReactionReadSync( }); } -export function forMessage( +export async function forMessage( message: MessageModel -): ReadSyncAttributesType | null { +): Promise { const logId = `ReadSyncs.forMessage(${getMessageIdForLogging( message.attributes )})`; @@ -92,13 +104,17 @@ export function forMessage( }); const readSyncValues = Array.from(readSyncs.values()); const foundSync = readSyncValues.find(item => { - return item.senderId === sender?.id && item.timestamp === messageTimestamp; + const { readSync } = item; + return ( + readSync.senderId === sender?.id && + readSync.timestamp === messageTimestamp + ); }); if (foundSync) { log.info( - `${logId}: Found early read sync for message ${foundSync.timestamp}` + `${logId}: Found early read sync for message ${foundSync.readSync.timestamp}` ); - remove(foundSync); + await remove(foundSync); return foundSync; } @@ -106,20 +122,15 @@ export function forMessage( } export async function onSync(sync: ReadSyncAttributesType): Promise { - readSyncs.set( - generateCacheKey({ - sender: sync.senderId, - timestamp: sync.timestamp, - type: 'readsync', - }), - sync - ); + const { readSync, syncTaskId } = sync; - const logId = `ReadSyncs.onSync(timestamp=${sync.timestamp})`; + readSyncs.set(syncTaskId, sync); + + const logId = `ReadSyncs.onSync(timestamp=${readSync.timestamp})`; try { const messages = await window.Signal.Data.getMessagesBySentAt( - sync.timestamp + readSync.timestamp ); const found = messages.find(item => { @@ -129,7 +140,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise { reason: logId, }); - return isIncoming(item) && sender?.id === sync.senderId; + return isIncoming(item) && sender?.id === readSync.senderId; }); if (!found) { @@ -144,8 +155,8 @@ export async function onSync(sync: ReadSyncAttributesType): Promise { found, 'ReadSyncs.onSync' ); - const readAt = Math.min(sync.readAt, Date.now()); - const newestSentAt = sync.timestamp; + const readAt = Math.min(readSync.readAt, Date.now()); + const newestSentAt = readSync.timestamp; // If message is unread, we mark it read. Otherwise, we update the expiration // timer to the time specified by the read sync if it's earlier than @@ -193,9 +204,9 @@ export async function onSync(sync: ReadSyncAttributesType): Promise { queueUpdateMessage(message.attributes); - remove(sync); + await remove(sync); } catch (error) { - remove(sync); log.error(`${logId} error:`, Errors.toLogFormat(error)); + await remove(sync); } } diff --git a/ts/messageModifiers/ViewSyncs.ts b/ts/messageModifiers/ViewSyncs.ts index d3216c1cdd0..24ba6b5c450 100644 --- a/ts/messageModifiers/ViewSyncs.ts +++ b/ts/messageModifiers/ViewSyncs.ts @@ -1,7 +1,8 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import type { AciString } from '../types/ServiceId'; +import { z } from 'zod'; + import type { MessageModel } from '../models/messages'; import * as Errors from '../types/errors'; import * as log from '../logging/log'; @@ -15,35 +16,38 @@ import { markViewed } from '../services/MessageUpdater'; import { notificationService } from '../services/notifications'; import { queueAttachmentDownloads } from '../util/queueAttachmentDownloads'; import { queueUpdateMessage } from '../util/messageBatcher'; -import { generateCacheKey } from './generateCacheKey'; import { AttachmentDownloadUrgency } from '../jobs/AttachmentDownloadManager'; +import { isAciString } from '../util/isAciString'; +import dataInterface from '../sql/Client'; + +const { removeSyncTaskById } = dataInterface; + +export const viewSyncTaskSchema = z.object({ + type: z.literal('ViewSync').readonly(), + senderAci: z.string().refine(isAciString), + senderE164: z.string().optional(), + senderId: z.string(), + timestamp: z.number(), + viewedAt: z.number(), +}); + +export type ViewSyncTaskType = z.infer; export type ViewSyncAttributesType = { envelopeId: string; - removeFromMessageReceiverCache: () => unknown; - senderAci: AciString; - senderE164?: string; - senderId: string; - timestamp: number; - viewedAt: number; + syncTaskId: string; + viewSync: ViewSyncTaskType; }; const viewSyncs = new Map(); -function remove(sync: ViewSyncAttributesType): void { - viewSyncs.delete( - generateCacheKey({ - sender: sync.senderId, - timestamp: sync.timestamp, - type: 'viewsync', - }) - ); - sync.removeFromMessageReceiverCache(); +async function remove(sync: ViewSyncAttributesType): Promise { + await removeSyncTaskById(sync.syncTaskId); } -export function forMessage( +export async function forMessage( message: MessageModel -): Array { +): Promise> { const logId = `ViewSyncs.forMessage(${getMessageIdForLogging( message.attributes )})`; @@ -60,7 +64,11 @@ export function forMessage( const viewSyncValues = Array.from(viewSyncs.values()); const matchingSyncs = viewSyncValues.filter(item => { - return item.senderId === sender?.id && item.timestamp === messageTimestamp; + const { viewSync } = item; + return ( + viewSync.senderId === sender?.id && + viewSync.timestamp === messageTimestamp + ); }); if (matchingSyncs.length > 0) { @@ -68,28 +76,24 @@ export function forMessage( `${logId}: Found ${matchingSyncs.length} early view sync(s) for message ${messageTimestamp}` ); } - matchingSyncs.forEach(sync => { - remove(sync); - }); + await Promise.all( + matchingSyncs.map(async sync => { + await remove(sync); + }) + ); return matchingSyncs; } export async function onSync(sync: ViewSyncAttributesType): Promise { - viewSyncs.set( - generateCacheKey({ - sender: sync.senderId, - timestamp: sync.timestamp, - type: 'viewsync', - }), - sync - ); + viewSyncs.set(sync.syncTaskId, sync); + const { viewSync } = sync; - const logId = `ViewSyncs.onSync(timestamp=${sync.timestamp})`; + const logId = `ViewSyncs.onSync(timestamp=${viewSync.timestamp})`; try { const messages = await window.Signal.Data.getMessagesBySentAt( - sync.timestamp + viewSync.timestamp ); const found = messages.find(item => { @@ -99,15 +103,15 @@ export async function onSync(sync: ViewSyncAttributesType): Promise { reason: logId, }); - return sender?.id === sync.senderId; + return sender?.id === viewSync.senderId; }); if (!found) { log.info( `${logId}: nothing found`, - sync.senderId, - sync.senderE164, - sync.senderAci + viewSync.senderId, + viewSync.senderE164, + viewSync.senderAci ); return; } @@ -123,7 +127,7 @@ export async function onSync(sync: ViewSyncAttributesType): Promise { if (message.get('readStatus') !== ReadStatus.Viewed) { didChangeMessage = true; - message.set(markViewed(message.attributes, sync.viewedAt)); + message.set(markViewed(message.attributes, viewSync.viewedAt)); const attachments = message.get('attachments'); if (!attachments?.every(isDownloaded)) { @@ -154,9 +158,9 @@ export async function onSync(sync: ViewSyncAttributesType): Promise { queueUpdateMessage(message.attributes); } - remove(sync); + await remove(sync); } catch (error) { - remove(sync); log.error(`${logId} error:`, Errors.toLogFormat(error)); + await remove(sync); } } diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index eaeecd59595..251ac03be85 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -2945,6 +2945,13 @@ export class ConversationModel extends window.Backbone senderAci, }); + if (!this.get('active_at')) { + log.warn( + `addDeliveryIssue: ${this.idForLogging()} has no active_at, dropping delivery issue instead of adding` + ); + return; + } + const message = { conversationId: this.id, type: 'delivery-issue', @@ -3363,7 +3370,9 @@ export class ConversationModel extends window.Backbone const message = window.MessageCache.__DEPRECATED$getById(notificationId); if (message) { - await window.Signal.Data.removeMessage(message.id); + await window.Signal.Data.removeMessage(message.id, { + singleProtoJobQueue, + }); } return true; } @@ -3404,7 +3413,9 @@ export class ConversationModel extends window.Backbone const message = window.MessageCache.__DEPRECATED$getById(notificationId); if (message) { - await window.Signal.Data.removeMessage(message.id); + await window.Signal.Data.removeMessage(message.id, { + singleProtoJobQueue, + }); } return true; @@ -5003,7 +5014,14 @@ export class ConversationModel extends window.Backbone }); window.Signal.Data.updateConversation(this.attributes); - if (source === 'local-delete' && isEnabled('desktop.deleteSync.send')) { + const ourConversation = + window.ConversationController.getOurConversationOrThrow(); + const capable = Boolean(ourConversation.get('capabilities')?.deleteSync); + if ( + source === 'local-delete' && + capable && + isEnabled('desktop.deleteSync.send') + ) { log.info(`${logId}: Preparing sync message`); const timestamp = Date.now(); @@ -5044,7 +5062,9 @@ export class ConversationModel extends window.Backbone log.info(`${logId}: Starting delete`); await window.Signal.Data.removeMessagesInConversation(this.id, { + fromSync: source !== 'local-delete-sync', logId: this.idForLogging(), + singleProtoJobQueue, }); log.info(`${logId}: Delete complete`); } diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 662749254c3..5ea7d90bbe4 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -109,7 +109,7 @@ import { } from '../services/notifications'; import type { LinkPreviewType } from '../types/message/LinkPreviews'; import * as log from '../logging/log'; -import { cleanupMessage, deleteMessageData } from '../util/cleanup'; +import { deleteMessageData } from '../util/cleanup'; import { getSource, getSourceServiceId, @@ -315,10 +315,6 @@ export class MessageModel extends window.Backbone.Model { this.set(attributes); } - async cleanup(): Promise { - await cleanupMessage(this.attributes); - } - async deleteData(): Promise { await deleteMessageData(this.attributes); } diff --git a/ts/services/expiringMessagesDeletion.ts b/ts/services/expiringMessagesDeletion.ts index dcb943041a6..237f11e86e3 100644 --- a/ts/services/expiringMessagesDeletion.ts +++ b/ts/services/expiringMessagesDeletion.ts @@ -4,18 +4,21 @@ import { batch } from 'react-redux'; import { debounce } from 'lodash'; -import type { MessageModel } from '../models/messages'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; import { sleep } from '../util/sleep'; import { SECOND } from '../util/durations'; import * as Errors from '../types/errors'; +import * as log from '../logging/log'; + +import type { MessageModel } from '../models/messages'; +import type { SingleProtoJobQueue } from '../jobs/singleProtoJobQueue'; class ExpiringMessagesDeletionService { public update: typeof this.checkExpiringMessages; private timeout?: ReturnType; - constructor() { + constructor(private readonly singleProtoJobQueue: SingleProtoJobQueue) { this.update = debounce(this.checkExpiringMessages, 1000); } @@ -42,7 +45,9 @@ class ExpiringMessagesDeletionService { inMemoryMessages.push(message); }); - await window.Signal.Data.removeMessages(messageIds); + await window.Signal.Data.removeMessages(messageIds, { + singleProtoJobQueue: this.singleProtoJobQueue, + }); batch(() => { inMemoryMessages.forEach(message => { @@ -108,5 +113,21 @@ class ExpiringMessagesDeletionService { } } -export const expiringMessagesDeletionService = - new ExpiringMessagesDeletionService(); +// Because this service is used inside of Client.ts, it can't directly reference +// SingleProtoJobQueue. Instead of direct access, it is provided once on startup. +export function initialize(singleProtoJobQueue: SingleProtoJobQueue): void { + if (instance) { + log.warn('Expiring Messages Deletion service is already initialized!'); + return; + } + instance = new ExpiringMessagesDeletionService(singleProtoJobQueue); +} + +export async function update(): Promise { + if (!instance) { + throw new Error('Expiring Messages Deletion service not yet initialized!'); + } + await instance.update(); +} + +let instance: ExpiringMessagesDeletionService; diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index 7622a61d2dd..d37be3e9e80 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -2,13 +2,11 @@ // SPDX-License-Identifier: AGPL-3.0-only import { ipcRenderer as ipc } from 'electron'; -import PQueue from 'p-queue'; -import { batch } from 'react-redux'; import { has, get, groupBy, isTypedArray, last, map, omit } from 'lodash'; import { deleteExternalFiles } from '../types/Conversation'; -import { expiringMessagesDeletionService } from '../services/expiringMessagesDeletion'; +import { update as updateExpiringMessagesService } from '../services/expiringMessagesDeletion'; import { tapToViewMessagesDeletionService } from '../services/tapToViewMessagesDeletionService'; import * as Bytes from '../Bytes'; import { createBatcher } from '../util/batcher'; @@ -24,12 +22,7 @@ import * as Errors from '../types/errors'; import type { StoredJob } from '../jobs/types'; import { formatJobForInsert } from '../jobs/formatJobForInsert'; -import { - cleanupMessage, - cleanupMessageFromMemory, - deleteMessageData, -} from '../util/cleanup'; -import { drop } from '../util/drop'; +import { cleanupMessages } from '../util/cleanup'; import { ipcInvoke, doShutdown } from './channels'; import type { @@ -60,12 +53,12 @@ import type { KyberPreKeyType, StoredKyberPreKeyType, } from './Interface'; -import { MINUTE } from '../util/durations'; import { getMessageIdForLogging } from '../util/idForLogging'; import type { MessageAttributesType } from '../model-types'; import { incrementMessageCounter } from '../util/incrementMessageCounter'; import { generateSnippetAroundMention } from '../util/search'; import type { AttachmentDownloadJobType } from '../types/AttachmentDownload'; +import type { SingleProtoJobQueue } from '../jobs/singleProtoJobQueue'; const ERASE_SQL_KEY = 'erase-sql-key'; const ERASE_ATTACHMENTS_KEY = 'erase-attachments'; @@ -104,6 +97,8 @@ const exclusiveInterface: ClientExclusiveInterface = { removeConversation, searchMessages, + removeMessage, + removeMessages, getRecentStoryReplies, getOlderMessagesByConversation, @@ -125,8 +120,6 @@ const exclusiveInterface: ClientExclusiveInterface = { type ClientOverridesType = ClientExclusiveInterface & Pick< ServerInterface, - | 'removeMessage' - | 'removeMessages' | 'saveAttachmentDownloadJob' | 'saveMessage' | 'saveMessages' @@ -142,8 +135,6 @@ const channels: ServerInterface = new Proxy({} as ServerInterface, { const clientExclusiveOverrides: ClientOverridesType = { ...exclusiveInterface, - removeMessage, - removeMessages, saveAttachmentDownloadJob, saveMessage, saveMessages, @@ -562,7 +553,7 @@ async function saveMessage( softAssert(isValidUuid(id), 'saveMessage: messageId is not a UUID'); - void expiringMessagesDeletionService.update(); + void updateExpiringMessagesService(); void tapToViewMessagesDeletionService.update(); return id; @@ -577,26 +568,39 @@ async function saveMessages( options ); - void expiringMessagesDeletionService.update(); + void updateExpiringMessagesService(); void tapToViewMessagesDeletionService.update(); return result; } -async function removeMessage(id: string): Promise { +async function removeMessage( + id: string, + options: { + singleProtoJobQueue: SingleProtoJobQueue; + fromSync?: boolean; + } +): Promise { const message = await channels.getMessageById(id); // Note: It's important to have a fully database-hydrated model to delete here because // it needs to delete all associated on-disk files along with the database delete. if (message) { await channels.removeMessage(id); - await cleanupMessage(message); + await cleanupMessages([message], { + ...options, + markCallHistoryDeleted: dataInterface.markCallHistoryDeleted, + }); } } export async function deleteAndCleanup( messages: Array, - logId: string + logId: string, + options: { + fromSync?: boolean; + singleProtoJobQueue: SingleProtoJobQueue; + } ): Promise { const ids = messages.map(message => message.id); @@ -604,37 +608,26 @@ export async function deleteAndCleanup( await channels.removeMessages(ids); log.info(`deleteAndCleanup/${logId}: Cleanup for ${ids.length} messages...`); - await _cleanupMessages(messages); + await cleanupMessages(messages, { + ...options, + markCallHistoryDeleted: dataInterface.markCallHistoryDeleted, + }); log.info(`deleteAndCleanup/${logId}: Complete`); } -async function _cleanupMessages( - messages: ReadonlyArray -): Promise { - // First, remove messages from memory, so we can batch the updates in redux - batch(() => { - messages.forEach(message => cleanupMessageFromMemory(message)); - }); - - // Then, handle any asynchronous actions (e.g. deleting data from disk) - const queue = new PQueue({ concurrency: 3, timeout: MINUTE * 30 }); - drop( - queue.addAll( - messages.map( - (message: MessageAttributesType) => async () => - deleteMessageData(message) - ) - ) - ); - await queue.onIdle(); -} - async function removeMessages( - messageIds: ReadonlyArray + messageIds: ReadonlyArray, + options: { + fromSync?: boolean; + singleProtoJobQueue: SingleProtoJobQueue; + } ): Promise { const messages = await channels.getMessagesById(messageIds); - await _cleanupMessages(messages); + await cleanupMessages(messages, { + ...options, + markCallHistoryDeleted: dataInterface.markCallHistoryDeleted, + }); await channels.removeMessages(messageIds); } @@ -686,9 +679,13 @@ async function removeMessagesInConversation( { logId, receivedAt, + singleProtoJobQueue, + fromSync, }: { + fromSync?: boolean; logId: string; receivedAt?: number; + singleProtoJobQueue: SingleProtoJobQueue; } ): Promise { let messages; @@ -713,7 +710,7 @@ async function removeMessagesInConversation( } // eslint-disable-next-line no-await-in-loop - await deleteAndCleanup(messages, logId); + await deleteAndCleanup(messages, logId, { fromSync, singleProtoJobQueue }); } while (messages.length > 0); } diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 94d35c4a366..b6176f77e7b 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -34,6 +34,7 @@ import type { AttachmentDownloadJobType } from '../types/AttachmentDownload'; import type { GroupSendEndorsementsData } from '../types/GroupSendEndorsements'; import type { SyncTaskType } from '../util/syncTasks'; import type { AttachmentBackupJobType } from '../types/AttachmentBackup'; +import type { SingleProtoJobQueue } from '../jobs/singleProtoJobQueue'; export type AdjacentMessagesByConversationOptionsType = Readonly<{ conversationId: string; @@ -557,8 +558,6 @@ export type DataInterface = { arrayOfMessages: ReadonlyArray, options: { forceSave?: boolean; ourAci: AciString } ) => Promise>; - removeMessage: (id: string) => Promise; - removeMessages: (ids: ReadonlyArray) => Promise; pageMessages: ( cursor?: PageMessagesCursorType ) => Promise; @@ -667,6 +666,7 @@ export type DataInterface = { conversationId: string; }): Promise; getAllCallHistory: () => Promise>; + markCallHistoryDeleted: (callId: string) => Promise; clearCallHistory: (beforeTimestamp: number) => Promise>; cleanupCallHistoryMessages: () => Promise; getCallHistoryUnreadCount(): Promise; @@ -929,6 +929,8 @@ export type ServerInterface = DataInterface & { options?: { limit?: number }; contactServiceIdsMatchingQuery?: Array; }) => Promise>; + removeMessage: (id: string) => Promise; + removeMessages: (ids: ReadonlyArray) => Promise; getRecentStoryReplies( storyId: string, @@ -1022,6 +1024,20 @@ export type ClientExclusiveInterface = { removeConversation: (id: string) => Promise; flushUpdateConversationBatcher: () => Promise; + removeMessage: ( + id: string, + options: { + fromSync?: boolean; + singleProtoJobQueue: SingleProtoJobQueue; + } + ) => Promise; + removeMessages: ( + ids: ReadonlyArray, + options: { + fromSync?: boolean; + singleProtoJobQueue: SingleProtoJobQueue; + } + ) => Promise; searchMessages: ({ query, conversationId, @@ -1084,8 +1100,10 @@ export type ClientExclusiveInterface = { removeMessagesInConversation: ( conversationId: string, options: { + fromSync?: boolean; logId: string; receivedAt?: number; + singleProtoJobQueue: SingleProtoJobQueue; } ) => Promise; removeOtherData: () => Promise; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 7ecd348e35b..6385d894ace 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -347,6 +347,7 @@ const dataInterface: ServerInterface = { getLastConversationMessage, getAllCallHistory, clearCallHistory, + markCallHistoryDeleted, cleanupCallHistoryMessages, getCallHistoryUnreadCount, markCallHistoryRead, @@ -3635,6 +3636,19 @@ async function clearCallHistory( })(); } +async function markCallHistoryDeleted(callId: string): Promise { + const db = await getWritableInstance(); + const [query, params] = sql` + UPDATE callsHistory + SET + status = ${DirectCallStatus.Deleted}, + timestamp = ${Date.now()} + WHERE callId = ${callId}; + `; + + db.prepare(query).run(params); +} + async function cleanupCallHistoryMessages(): Promise { const db = await getWritableInstance(); return db diff --git a/ts/state/ducks/conversations.ts b/ts/state/ducks/conversations.ts index 4521a2119c4..59602a95c13 100644 --- a/ts/state/ducks/conversations.ts +++ b/ts/state/ducks/conversations.ts @@ -195,6 +195,7 @@ import { } from '../../util/deleteForMe'; import { MAX_MESSAGE_COUNT } from '../../util/deleteForMe.types'; import { isEnabled } from '../../RemoteConfig'; +import type { CapabilitiesType } from '../../textsecure/WebAPI'; // State @@ -265,6 +266,7 @@ export type ConversationType = ReadonlyDeep< firstName?: string; profileName?: string; profileLastUpdatedAt?: number; + capabilities?: CapabilitiesType; username?: string; about?: string; aboutText?: string; @@ -1753,7 +1755,9 @@ function deleteMessages({ } } - await window.Signal.Data.removeMessages(messageIds); + await window.Signal.Data.removeMessages(messageIds, { + singleProtoJobQueue, + }); popPanelForConversation()(dispatch, getState, undefined); @@ -1761,7 +1765,11 @@ function deleteMessages({ dispatch(scrollToMessage(conversationId, nearbyMessageId)); } - if (!isEnabled('desktop.deleteSync.send')) { + const ourConversation = + window.ConversationController.getOurConversationOrThrow(); + const capable = Boolean(ourConversation.get('capabilities')?.deleteSync); + + if (!capable || !isEnabled('desktop.deleteSync.send')) { return; } if (messages.length === 0) { diff --git a/ts/state/ducks/stories.ts b/ts/state/ducks/stories.ts index f36371051a6..65a219b4177 100644 --- a/ts/state/ducks/stories.ts +++ b/ts/state/ducks/stories.ts @@ -69,6 +69,7 @@ import { conversationQueueJobEnum, } from '../../jobs/conversationJobQueue'; import { ReceiptType } from '../../types/Receipt'; +import { singleProtoJobQueue } from '../../jobs/singleProtoJobQueue'; export type StoryDataType = ReadonlyDeep< { @@ -284,7 +285,7 @@ function deleteGroupStoryReply( messageId: string ): ThunkAction { return async dispatch => { - await window.Signal.Data.removeMessage(messageId); + await window.Signal.Data.removeMessage(messageId, { singleProtoJobQueue }); dispatch({ type: STORY_REPLY_DELETED, payload: messageId, @@ -1408,10 +1409,7 @@ function removeAllContactStories( log.info(`${logId}: removing ${messages.length} stories`); - await Promise.all([ - messages.map(m => m.cleanup()), - await dataInterface.removeMessages(messageIds), - ]); + await dataInterface.removeMessages(messageIds, { singleProtoJobQueue }); dispatch({ type: 'NOOP', diff --git a/ts/state/selectors/items-extra.ts b/ts/state/selectors/items-extra.ts new file mode 100644 index 00000000000..ea5245210ec --- /dev/null +++ b/ts/state/selectors/items-extra.ts @@ -0,0 +1,38 @@ +// Copyright 2019 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { createSelector } from 'reselect'; + +import { getUserACI } from './user'; +import { getConversationSelector } from './conversations'; +import { getRemoteConfig, isRemoteConfigFlagEnabled } from './items'; + +import type { AciString } from '../../types/ServiceId'; +import type { ConfigMapType } from '../../RemoteConfig'; +import type { GetConversationByIdType } from './conversations'; + +export const getDeleteSyncSendEnabled = createSelector( + getUserACI, + getConversationSelector, + getRemoteConfig, + ( + aci: AciString | undefined, + conversationSelector: GetConversationByIdType, + remoteConfig: ConfigMapType + ): boolean => { + if (!aci) { + return false; + } + const ourConversation = conversationSelector(aci); + if (!ourConversation) { + return false; + } + + const { capabilities } = ourConversation; + if (!capabilities || !capabilities.deleteSync) { + return false; + } + + return isRemoteConfigFlagEnabled(remoteConfig, 'desktop.deleteSync.send'); + } +); diff --git a/ts/state/selectors/items.ts b/ts/state/selectors/items.ts index 88d2367ba26..a3a001a6ea8 100644 --- a/ts/state/selectors/items.ts +++ b/ts/state/selectors/items.ts @@ -127,13 +127,6 @@ export const isInternalUser = createSelector( } ); -export const getDeleteSyncSendEnabled = createSelector( - getRemoteConfig, - (remoteConfig: ConfigMapType): boolean => { - return isRemoteConfigFlagEnabled(remoteConfig, 'desktop.deleteSync.send'); - } -); - // Note: ts/util/stories is the other place this check is done export const getStoriesEnabled = createSelector( getItems, diff --git a/ts/state/smart/ConversationHeader.tsx b/ts/state/smart/ConversationHeader.tsx index ee3157a3880..bb870b2760a 100644 --- a/ts/state/smart/ConversationHeader.tsx +++ b/ts/state/smart/ConversationHeader.tsx @@ -41,10 +41,8 @@ import { import { getHasStoriesSelector } from '../selectors/stories2'; import { getIntl, getTheme, getUserACI } from '../selectors/user'; import { useItemsActions } from '../ducks/items'; -import { - getDeleteSyncSendEnabled, - getLocalDeleteWarningShown, -} from '../selectors/items'; +import { getLocalDeleteWarningShown } from '../selectors/items'; +import { getDeleteSyncSendEnabled } from '../selectors/items-extra'; export type OwnProps = { id: string; diff --git a/ts/state/smart/DeleteMessagesModal.tsx b/ts/state/smart/DeleteMessagesModal.tsx index 3e8dbbf52ee..1a4a19eb97b 100644 --- a/ts/state/smart/DeleteMessagesModal.tsx +++ b/ts/state/smart/DeleteMessagesModal.tsx @@ -17,10 +17,8 @@ import { } from '../selectors/conversations'; import { getDeleteMessagesProps } from '../selectors/globalModals'; import { useItemsActions } from '../ducks/items'; -import { - getLocalDeleteWarningShown, - getDeleteSyncSendEnabled, -} from '../selectors/items'; +import { getLocalDeleteWarningShown } from '../selectors/items'; +import { getDeleteSyncSendEnabled } from '../selectors/items-extra'; import { LocalDeleteWarningModal } from '../../components/LocalDeleteWarningModal'; export const SmartDeleteMessagesModal = memo( diff --git a/ts/test-both/util/filterAndSortConversations_test.ts b/ts/test-both/util/filterAndSortConversations_test.ts index ee92e1381af..094281de5c9 100644 --- a/ts/test-both/util/filterAndSortConversations_test.ts +++ b/ts/test-both/util/filterAndSortConversations_test.ts @@ -59,9 +59,9 @@ describe('filterAndSortConversations', () => { check({ searchTerm: '9876', input: [ - { title: 'no' }, + { title: 'no', e164: undefined }, { title: 'yes', e164: '+16505559876' }, - { title: 'no' }, + { title: 'no', e164: undefined }, ], expected: [{ title: 'yes' }], }); diff --git a/ts/test-electron/MessageReceipts_test.ts b/ts/test-electron/MessageReceipts_test.ts index fff7841926a..458507ae35e 100644 --- a/ts/test-electron/MessageReceipts_test.ts +++ b/ts/test-electron/MessageReceipts_test.ts @@ -7,10 +7,13 @@ import { assert } from 'chai'; import { type AciString, generateAci } from '../types/ServiceId'; import type { MessageAttributesType } from '../model-types'; import { SendStatus } from '../messages/MessageSendState'; -import { - type MessageReceiptAttributesType, +import type { + MessageReceiptAttributesType, MessageReceiptType, +} from '../messageModifiers/MessageReceipts'; +import { onReceipt, + messageReceiptTypeSchema, } from '../messageModifiers/MessageReceipts'; import { ReadStatus } from '../messages/MessageReadStatus'; @@ -31,14 +34,16 @@ describe('MessageReceipts', () => { ): MessageReceiptAttributesType { return { envelopeId: uuid(), - messageSentAt, - receiptTimestamp: 1, - removeFromMessageReceiverCache: () => null, - sourceConversationId, - sourceDevice: 1, - sourceServiceId: generateAci(), - type, - wasSentEncrypted: true, + syncTaskId: uuid(), + receiptSync: { + messageSentAt, + receiptTimestamp: 1, + sourceConversationId, + sourceDevice: 1, + sourceServiceId: generateAci(), + type, + wasSentEncrypted: true, + }, }; } it('processes all receipts in a batch', async () => { @@ -78,10 +83,18 @@ describe('MessageReceipts', () => { }); await Promise.all([ - onReceipt(generateReceipt('aaaa', sentAt, MessageReceiptType.Delivery)), - onReceipt(generateReceipt('bbbb', sentAt, MessageReceiptType.Delivery)), - onReceipt(generateReceipt('cccc', sentAt, MessageReceiptType.Read)), - onReceipt(generateReceipt('aaaa', sentAt, MessageReceiptType.Read)), + onReceipt( + generateReceipt('aaaa', sentAt, messageReceiptTypeSchema.enum.Delivery) + ), + onReceipt( + generateReceipt('bbbb', sentAt, messageReceiptTypeSchema.enum.Delivery) + ), + onReceipt( + generateReceipt('cccc', sentAt, messageReceiptTypeSchema.enum.Read) + ), + onReceipt( + generateReceipt('aaaa', sentAt, messageReceiptTypeSchema.enum.Read) + ), ]); const messageFromDatabase = await window.Signal.Data.getMessageById(id); @@ -154,20 +167,48 @@ describe('MessageReceipts', () => { await Promise.all([ // send receipts for original message - onReceipt(generateReceipt('aaaa', sentAt, MessageReceiptType.Delivery)), - onReceipt(generateReceipt('bbbb', sentAt, MessageReceiptType.Delivery)), - onReceipt(generateReceipt('cccc', sentAt, MessageReceiptType.Read)), - onReceipt(generateReceipt('aaaa', sentAt, MessageReceiptType.Read)), + onReceipt( + generateReceipt('aaaa', sentAt, messageReceiptTypeSchema.enum.Delivery) + ), + onReceipt( + generateReceipt('bbbb', sentAt, messageReceiptTypeSchema.enum.Delivery) + ), + onReceipt( + generateReceipt('cccc', sentAt, messageReceiptTypeSchema.enum.Read) + ), + onReceipt( + generateReceipt('aaaa', sentAt, messageReceiptTypeSchema.enum.Read) + ), // and send receipts for edited message onReceipt( - generateReceipt('aaaa', editedSentAt, MessageReceiptType.Delivery) + generateReceipt( + 'aaaa', + editedSentAt, + messageReceiptTypeSchema.enum.Delivery + ) ), onReceipt( - generateReceipt('bbbb', editedSentAt, MessageReceiptType.Delivery) + generateReceipt( + 'bbbb', + editedSentAt, + messageReceiptTypeSchema.enum.Delivery + ) + ), + onReceipt( + generateReceipt( + 'cccc', + editedSentAt, + messageReceiptTypeSchema.enum.Read + ) + ), + onReceipt( + generateReceipt( + 'bbbb', + editedSentAt, + messageReceiptTypeSchema.enum.Read + ) ), - onReceipt(generateReceipt('cccc', editedSentAt, MessageReceiptType.Read)), - onReceipt(generateReceipt('bbbb', editedSentAt, MessageReceiptType.Read)), ]); const messageFromDatabase = await window.Signal.Data.getMessageById(id); diff --git a/ts/test-electron/sql/sendLog_test.ts b/ts/test-electron/sql/sendLog_test.ts index 9c495e2f2c2..4d87d726b4c 100644 --- a/ts/test-electron/sql/sendLog_test.ts +++ b/ts/test-electron/sql/sendLog_test.ts @@ -7,6 +7,7 @@ import { v4 as generateUuid } from 'uuid'; import dataInterface from '../../sql/Client'; import { generateAci } from '../../types/ServiceId'; import { constantTimeEqual, getRandomBytes } from '../../Crypto'; +import { singleProtoJobQueue } from '../../jobs/singleProtoJobQueue'; const { _getAllSentProtoMessageIds, @@ -148,7 +149,7 @@ describe('sql/sendLog', () => { assert.strictEqual(actual.timestamp, proto.timestamp); - await removeMessage(id); + await removeMessage(id, { singleProtoJobQueue }); assert.lengthOf(await getAllSentProtos(), 0); }); diff --git a/ts/test-electron/textsecure/KeyChangeListener_test.ts b/ts/test-electron/textsecure/KeyChangeListener_test.ts index d96774ffe3d..ea78279b0bd 100644 --- a/ts/test-electron/textsecure/KeyChangeListener_test.ts +++ b/ts/test-electron/textsecure/KeyChangeListener_test.ts @@ -11,6 +11,7 @@ import { SignalProtocolStore } from '../../SignalProtocolStore'; import type { ConversationModel } from '../../models/conversations'; import * as KeyChangeListener from '../../textsecure/KeyChangeListener'; import * as Bytes from '../../Bytes'; +import { singleProtoJobQueue } from '../../jobs/singleProtoJobQueue'; describe('KeyChangeListener', () => { let oldNumberId: string | undefined; @@ -69,6 +70,7 @@ describe('KeyChangeListener', () => { afterEach(async () => { await window.Signal.Data.removeMessagesInConversation(convo.id, { logId: ourServiceIdWithKeyChange, + singleProtoJobQueue, }); await window.Signal.Data.removeConversation(convo.id); @@ -106,6 +108,7 @@ describe('KeyChangeListener', () => { afterEach(async () => { await window.Signal.Data.removeMessagesInConversation(groupConvo.id, { logId: ourServiceIdWithKeyChange, + singleProtoJobQueue, }); await window.Signal.Data.removeConversation(groupConvo.id); }); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 3fd97b6f6bc..b0f09386287 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -137,6 +137,8 @@ import type { DeleteForMeSyncEventData, DeleteForMeSyncTarget, ConversationToDelete, + ViewSyncEventData, + ReadSyncEventData, } from './messageReceiverEvents'; import * as log from '../logging/log'; import * as durations from '../util/durations'; @@ -1728,15 +1730,17 @@ export default class MessageReceiver await this.dispatchAndWait( getEnvelopeId(envelope), new DeliveryEvent( - { - envelopeId: envelope.id, - timestamp: envelope.timestamp, - envelopeTimestamp: envelope.timestamp, - source: envelope.source, - sourceServiceId: envelope.sourceServiceId, - sourceDevice: envelope.sourceDevice, - wasSentEncrypted: false, - }, + [ + { + timestamp: envelope.timestamp, + source: envelope.source, + sourceServiceId: envelope.sourceServiceId, + sourceDevice: envelope.sourceDevice, + wasSentEncrypted: false, + }, + ], + envelope.id, + envelope.timestamp, this.removeFromCache.bind(this, envelope) ) ); @@ -2907,22 +2911,22 @@ export default class MessageReceiver const logId = getEnvelopeId(envelope); - await Promise.all( - receiptMessage.timestamp.map(async rawTimestamp => { - const ev = new EventClass( - { - envelopeId: envelope.id, - timestamp: rawTimestamp?.toNumber(), - envelopeTimestamp: envelope.timestamp, - source: envelope.source, - sourceServiceId: envelope.sourceServiceId, - sourceDevice: envelope.sourceDevice, - wasSentEncrypted: true, - }, - this.removeFromCache.bind(this, envelope) - ); - await this.dispatchAndWait(logId, ev); - }) + const receipts = receiptMessage.timestamp.map(rawTimestamp => ({ + timestamp: rawTimestamp?.toNumber(), + source: envelope.source, + sourceServiceId: envelope.sourceServiceId, + sourceDevice: envelope.sourceDevice, + wasSentEncrypted: true as const, + })); + + await this.dispatchAndWait( + logId, + new EventClass( + receipts, + envelope.id, + envelope.timestamp, + this.removeFromCache.bind(this, envelope) + ) ); } @@ -3469,23 +3473,27 @@ export default class MessageReceiver logUnexpectedUrgentValue(envelope, 'readSync'); - const results = []; - for (const { timestamp, sender, senderAci } of read) { - const ev = new ReadSyncEvent( - { - envelopeId: envelope.id, - envelopeTimestamp: envelope.timestamp, - timestamp: timestamp?.toNumber(), - sender: dropNull(sender), - senderAci: senderAci - ? normalizeAci(senderAci, 'handleRead.senderAci') - : undefined, - }, + const reads = read.map( + ({ timestamp, sender, senderAci }): ReadSyncEventData => ({ + envelopeId: envelope.id, + envelopeTimestamp: envelope.timestamp, + timestamp: timestamp?.toNumber(), + sender: dropNull(sender), + senderAci: senderAci + ? normalizeAci(senderAci, 'handleRead.senderAci') + : undefined, + }) + ); + + await this.dispatchAndWait( + logId, + new ReadSyncEvent( + reads, + envelope.id, + envelope.timestamp, this.removeFromCache.bind(this, envelope) - ); - results.push(this.dispatchAndWait(logId, ev)); - } - await Promise.all(results); + ) + ); } private async handleViewed( @@ -3497,23 +3505,25 @@ export default class MessageReceiver logUnexpectedUrgentValue(envelope, 'viewSync'); - await Promise.all( - viewed.map(async ({ timestamp, senderE164, senderAci }) => { - const ev = new ViewSyncEvent( - { - envelopeId: envelope.id, - envelopeTimestamp: envelope.timestamp, - timestamp: timestamp?.toNumber(), - senderE164: dropNull(senderE164), - senderAci: senderAci - ? normalizeAci(senderAci, 'handleViewed.senderAci') - : undefined, - }, - this.removeFromCache.bind(this, envelope) - ); - await this.dispatchAndWait(logId, ev); + const views = viewed.map( + ({ timestamp, senderE164, senderAci }): ViewSyncEventData => ({ + timestamp: timestamp?.toNumber(), + senderE164: dropNull(senderE164), + senderAci: senderAci + ? normalizeAci(senderAci, 'handleViewed.senderAci') + : undefined, }) ); + + await this.dispatchAndWait( + logId, + new ViewSyncEvent( + views, + envelope.id, + envelope.timestamp, + this.removeFromCache.bind(this, envelope) + ) + ); } private async handleCallEvent( @@ -3663,19 +3673,28 @@ export default class MessageReceiver ? processConversationToDelete(item.conversation, logId) : undefined; - if (messages?.length && conversation) { - // We want each message in its own task - return messages.map(innerItem => { - return { - type: 'delete-message' as const, - message: innerItem, - conversation, - timestamp, - }; - }); + if (!conversation) { + log.warn( + `${logId}/handleDeleteForMeSync/messageDeletes: No target conversation` + ); + return undefined; + } + if (!messages?.length) { + log.warn( + `${logId}/handleDeleteForMeSync/messageDeletes: No target messages` + ); + return undefined; } - return undefined; + // We want each message in its own task + return messages.map(innerItem => { + return { + type: 'delete-message' as const, + message: innerItem, + conversation, + timestamp, + }; + }); }) .filter(isNotNil); @@ -3692,17 +3711,26 @@ export default class MessageReceiver ? processConversationToDelete(item.conversation, logId) : undefined; - if (mostRecentMessages?.length && conversation) { - return { - type: 'delete-conversation' as const, - conversation, - isFullDelete: Boolean(item.isFullDelete), - mostRecentMessages, - timestamp, - }; + if (!conversation) { + log.warn( + `${logId}/handleDeleteForMeSync/conversationDeletes: No target conversation` + ); + return undefined; + } + if (!mostRecentMessages?.length) { + log.warn( + `${logId}/handleDeleteForMeSync/conversationDeletes: No target messages` + ); + return undefined; } - return undefined; + return { + type: 'delete-conversation' as const, + conversation, + isFullDelete: Boolean(item.isFullDelete), + mostRecentMessages, + timestamp, + }; }) .filter(isNotNil); @@ -3716,15 +3744,18 @@ export default class MessageReceiver ? processConversationToDelete(item.conversation, logId) : undefined; - if (conversation) { - return { - type: 'delete-local-conversation' as const, - conversation, - timestamp, - }; + if (!conversation) { + log.warn( + `${logId}/handleDeleteForMeSync/localOnlyConversationDeletes: No target conversation` + ); + return undefined; } - return undefined; + return { + type: 'delete-local-conversation' as const, + conversation, + timestamp, + }; }) .filter(isNotNil); @@ -3969,15 +4000,32 @@ function processMessageToDelete( return undefined; } - if (target.authorAci) { - return { - type: 'aci' as const, - authorAci: normalizeAci( - target.authorAci, - `${logId}/processMessageToDelete` - ), - sentAt, - }; + const { authorServiceId } = target; + if (authorServiceId) { + if (isAciString(authorServiceId)) { + return { + type: 'aci' as const, + authorAci: normalizeAci( + authorServiceId, + `${logId}/processMessageToDelete/aci` + ), + sentAt, + }; + } + if (isPniString(authorServiceId)) { + return { + type: 'pni' as const, + authorPni: normalizePni( + authorServiceId, + `${logId}/processMessageToDelete/pni` + ), + sentAt, + }; + } + log.error( + `${logId}/processMessageToDelete: invalid authorServiceId, Dropping AddressableMessage.` + ); + return undefined; } if (target.authorE164) { return { @@ -3997,13 +4045,25 @@ function processConversationToDelete( target: Proto.SyncMessage.DeleteForMe.IConversationIdentifier, logId: string ): ConversationToDelete | undefined { - const { threadAci, threadGroupId, threadE164 } = target; + const { threadServiceId, threadGroupId, threadE164 } = target; - if (threadAci) { - return { - type: 'aci' as const, - aci: normalizeAci(threadAci, `${logId}/threadAci`), - }; + if (threadServiceId) { + if (isAciString(threadServiceId)) { + return { + type: 'aci' as const, + aci: normalizeAci(threadServiceId, `${logId}/aci`), + }; + } + if (isPniString(threadServiceId)) { + return { + type: 'pni' as const, + pni: normalizePni(threadServiceId, `${logId}/pni`), + }; + } + log.error( + `${logId}/processConversationToDelete: Invalid threadServiceId, dropping ConversationIdentifier.` + ); + return undefined; } if (threadGroupId) { return { diff --git a/ts/textsecure/SendMessage.ts b/ts/textsecure/SendMessage.ts index c914986942f..9e952f60509 100644 --- a/ts/textsecure/SendMessage.ts +++ b/ts/textsecure/SendMessage.ts @@ -89,6 +89,14 @@ import type { MessageToDelete, } from './messageReceiverEvents'; import { getConversationFromTarget } from '../util/deleteForMe'; +import type { CallDetails } from '../types/CallDisposition'; +import { + AdhocCallStatus, + DirectCallStatus, + GroupCallStatus, +} from '../types/CallDisposition'; +import { getProtoForCallHistory } from '../util/callDisposition'; +import { CallMode } from '../types/Calling'; export type SendMetadataType = { [serviceId: ServiceIdString]: { @@ -1567,6 +1575,71 @@ export default class MessageSender { }; } + static getClearCallHistoryMessage(timestamp: number): SingleProtoJobData { + const ourAci = window.textsecure.storage.user.getCheckedAci(); + const callLogEvent = new Proto.SyncMessage.CallLogEvent({ + type: Proto.SyncMessage.CallLogEvent.Type.CLEAR, + timestamp: Long.fromNumber(timestamp), + }); + + const syncMessage = MessageSender.createSyncMessage(); + syncMessage.callLogEvent = callLogEvent; + + const contentMessage = new Proto.Content(); + contentMessage.syncMessage = syncMessage; + + const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; + + return { + contentHint: ContentHint.RESENDABLE, + serviceId: ourAci, + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'callLogEventSync', + urgent: false, + }; + } + + static getDeleteCallEvent(callDetails: CallDetails): SingleProtoJobData { + const ourAci = window.textsecure.storage.user.getCheckedAci(); + const { mode } = callDetails; + let status; + if (mode === CallMode.Adhoc) { + status = AdhocCallStatus.Deleted; + } else if (mode === CallMode.Direct) { + status = DirectCallStatus.Deleted; + } else if (mode === CallMode.Group) { + status = GroupCallStatus.Deleted; + } else { + throw missingCaseError(mode); + } + const callEvent = getProtoForCallHistory({ + ...callDetails, + status, + }); + + const syncMessage = MessageSender.createSyncMessage(); + syncMessage.callEvent = callEvent; + + const contentMessage = new Proto.Content(); + contentMessage.syncMessage = syncMessage; + + const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; + + return { + contentHint: ContentHint.RESENDABLE, + serviceId: ourAci, + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'callLogEventSync', + urgent: false, + }; + } + async syncReadMessages( reads: ReadonlyArray<{ senderAci?: AciString; @@ -2353,9 +2426,11 @@ function toAddressableMessage(message: MessageToDelete) { targetMessage.sentTimestamp = Long.fromNumber(message.sentAt); if (message.type === 'aci') { - targetMessage.authorAci = message.authorAci; + targetMessage.authorServiceId = message.authorAci; } else if (message.type === 'e164') { targetMessage.authorE164 = message.authorE164; + } else if (message.type === 'pni') { + targetMessage.authorServiceId = message.authorPni; } else { throw missingCaseError(message); } @@ -2368,7 +2443,9 @@ function toConversationIdentifier(conversation: ConversationToDelete) { new Proto.SyncMessage.DeleteForMe.ConversationIdentifier(); if (conversation.type === 'aci') { - targetConversation.threadAci = conversation.aci; + targetConversation.threadServiceId = conversation.aci; + } else if (conversation.type === 'pni') { + targetConversation.threadServiceId = conversation.pni; } else if (conversation.type === 'group') { targetConversation.threadGroupId = Bytes.fromBase64(conversation.groupId); } else if (conversation.type === 'e164') { diff --git a/ts/textsecure/WebAPI.ts b/ts/textsecure/WebAPI.ts index 5f19a6eb786..28fc650f6e8 100644 --- a/ts/textsecure/WebAPI.ts +++ b/ts/textsecure/WebAPI.ts @@ -703,8 +703,12 @@ export type WebAPIConnectType = { connect: (options: WebAPIConnectOptionsType) => WebAPIType; }; -export type CapabilitiesType = Record; -export type CapabilitiesUploadType = Record; +export type CapabilitiesType = { + deleteSync: boolean; +}; +export type CapabilitiesUploadType = { + deleteSync: true; +}; type StickerPackManifestType = Uint8Array; diff --git a/ts/textsecure/messageReceiverEvents.ts b/ts/textsecure/messageReceiverEvents.ts index b7b6ff792ac..33e5976b354 100644 --- a/ts/textsecure/messageReceiverEvents.ts +++ b/ts/textsecure/messageReceiverEvents.ts @@ -6,7 +6,11 @@ import type { PublicKey } from '@signalapp/libsignal-client'; import { z } from 'zod'; import type { SignalService as Proto } from '../protobuf'; -import type { ServiceIdString, AciString } from '../types/ServiceId'; +import { + type ServiceIdString, + type AciString, + isPniString, +} from '../types/ServiceId'; import type { StoryDistributionIdString } from '../types/StoryDistributionId'; import type { ProcessedEnvelope, @@ -93,7 +97,6 @@ export class EnvelopeUnsealedEvent extends Event { } } -// Emitted when we queue previously-decrypted events from the cache export class EnvelopeQueuedEvent extends Event { constructor(public readonly envelope: ProcessedEnvelope) { super('envelopeQueued'); @@ -113,9 +116,7 @@ export class ConfirmableEvent extends Event { } export type DeliveryEventData = Readonly<{ - envelopeId: string; timestamp: number; - envelopeTimestamp: number; source?: string; sourceServiceId?: ServiceIdString; sourceDevice?: number; @@ -124,7 +125,9 @@ export type DeliveryEventData = Readonly<{ export class DeliveryEvent extends ConfirmableEvent { constructor( - public readonly deliveryReceipt: DeliveryEventData, + public readonly deliveryReceipts: ReadonlyArray, + public readonly envelopeId: string, + public readonly envelopeTimestamp: number, confirm: ConfirmCallback ) { super('delivery', confirm); @@ -245,9 +248,7 @@ export class MessageEvent extends ConfirmableEvent { } export type ReadOrViewEventData = Readonly<{ - envelopeId: string; timestamp: number; - envelopeTimestamp: number; source?: string; sourceServiceId?: ServiceIdString; sourceDevice?: number; @@ -256,7 +257,9 @@ export type ReadOrViewEventData = Readonly<{ export class ReadEvent extends ConfirmableEvent { constructor( - public readonly receipt: ReadOrViewEventData, + public readonly receipts: ReadonlyArray, + public readonly envelopeId: string, + public readonly envelopeTimestamp: number, confirm: ConfirmCallback ) { super('read', confirm); @@ -265,7 +268,9 @@ export class ReadEvent extends ConfirmableEvent { export class ViewEvent extends ConfirmableEvent { constructor( - public readonly receipt: ReadOrViewEventData, + public readonly receipts: ReadonlyArray, + public readonly envelopeId: string, + public readonly envelopeTimestamp: number, confirm: ConfirmCallback ) { super('view', confirm); @@ -405,7 +410,9 @@ export type ReadSyncEventData = Readonly<{ export class ReadSyncEvent extends ConfirmableEvent { constructor( - public readonly read: ReadSyncEventData, + public readonly reads: ReadonlyArray, + public readonly envelopeId: string, + public readonly envelopeTimestamp: number, confirm: ConfirmCallback ) { super('readSync', confirm); @@ -413,16 +420,16 @@ export class ReadSyncEvent extends ConfirmableEvent { } export type ViewSyncEventData = Readonly<{ - envelopeId: string; timestamp?: number; - envelopeTimestamp: number; senderE164?: string; senderAci?: AciString; }>; export class ViewSyncEvent extends ConfirmableEvent { constructor( - public readonly view: ViewSyncEventData, + public readonly views: ReadonlyArray, + public readonly envelopeId: string, + public readonly envelopeTimestamp: number, confirm: ConfirmCallback ) { super('viewSync', confirm); @@ -470,15 +477,16 @@ const messageToDeleteSchema = z.union([ authorE164: z.string(), sentAt: z.number(), }), + z.object({ + type: z.literal('pni').readonly(), + authorPni: z.string().refine(isPniString), + sentAt: z.number(), + }), ]); export type MessageToDelete = z.infer; const conversationToDeleteSchema = z.union([ - z.object({ - type: z.literal('group').readonly(), - groupId: z.string(), - }), z.object({ type: z.literal('aci').readonly(), aci: z.string().refine(isAciString), @@ -487,6 +495,14 @@ const conversationToDeleteSchema = z.union([ type: z.literal('e164').readonly(), e164: z.string(), }), + z.object({ + type: z.literal('group').readonly(), + groupId: z.string(), + }), + z.object({ + type: z.literal('pni').readonly(), + pni: z.string().refine(isPniString), + }), ]); export type ConversationToDelete = z.infer; diff --git a/ts/util/callDisposition.ts b/ts/util/callDisposition.ts index 75ca52f872e..30ba96394e4 100644 --- a/ts/util/callDisposition.ts +++ b/ts/util/callDisposition.ts @@ -280,7 +280,7 @@ function shouldSyncStatus(callStatus: CallStatus) { return statusToProto[callStatus] != null; } -function getProtoForCallHistory( +export function getProtoForCallHistory( callHistory: CallHistoryDetails ): Proto.SyncMessage.ICallEvent | null { const event = statusToProto[callHistory.status]; @@ -1026,7 +1026,10 @@ async function saveCallHistory({ if (isDeleted) { if (prevMessage != null) { - await window.Signal.Data.removeMessage(prevMessage.id); + await window.Signal.Data.removeMessage(prevMessage.id, { + fromSync: true, + singleProtoJobQueue, + }); } return callHistory; } @@ -1209,32 +1212,10 @@ export async function clearCallHistoryDataAndSync(): Promise { window.MessageCache.__DEPRECATED$unregister(messageId); }); - const ourAci = window.textsecure.storage.user.getCheckedAci(); - - const callLogEvent = new Proto.SyncMessage.CallLogEvent({ - type: Proto.SyncMessage.CallLogEvent.Type.CLEAR, - timestamp: Long.fromNumber(timestamp), - }); - - const syncMessage = MessageSender.createSyncMessage(); - syncMessage.callLogEvent = callLogEvent; - - const contentMessage = new Proto.Content(); - contentMessage.syncMessage = syncMessage; - - const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - log.info('clearCallHistory: Queueing sync message'); - await singleProtoJobQueue.add({ - contentHint: ContentHint.RESENDABLE, - serviceId: ourAci, - isSyncMessage: true, - protoBase64: Bytes.toBase64( - Proto.Content.encode(contentMessage).finish() - ), - type: 'callLogEventSync', - urgent: false, - }); + await singleProtoJobQueue.add( + MessageSender.getClearCallHistoryMessage(timestamp) + ); } catch (error) { log.error('clearCallHistory: Failed to clear call history', error); } diff --git a/ts/util/cleanup.ts b/ts/util/cleanup.ts index 0fb5cf09fab..11dac01c894 100644 --- a/ts/util/cleanup.ts +++ b/ts/util/cleanup.ts @@ -1,17 +1,67 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import PQueue from 'p-queue'; +import { batch } from 'react-redux'; + import type { MessageAttributesType } from '../model-types.d'; import { deletePackReference } from '../types/Stickers'; import { isStory } from '../messages/helpers'; import { isDirectConversation } from './whatTypeOfConversation'; import * as log from '../logging/log'; +import { getCallHistorySelector } from '../state/selectors/callHistory'; +import { + DirectCallStatus, + GroupCallStatus, + AdhocCallStatus, +} from '../types/CallDisposition'; +import { getMessageIdForLogging } from './idForLogging'; +import type { SingleProtoJobQueue } from '../jobs/singleProtoJobQueue'; +import { MINUTE } from './durations'; +import { drop } from './drop'; -export async function cleanupMessage( - message: MessageAttributesType +export async function cleanupMessages( + messages: ReadonlyArray, + { + fromSync, + markCallHistoryDeleted, + singleProtoJobQueue, + }: { + fromSync?: boolean; + markCallHistoryDeleted: (callId: string) => Promise; + singleProtoJobQueue: SingleProtoJobQueue; + } ): Promise { - cleanupMessageFromMemory(message); - await deleteMessageData(message); + // First, handle any calls that need to be deleted + const inMemoryQueue = new PQueue({ concurrency: 3, timeout: MINUTE * 30 }); + drop( + inMemoryQueue.addAll( + messages.map((message: MessageAttributesType) => async () => { + await maybeDeleteCall(message, { + fromSync, + markCallHistoryDeleted, + singleProtoJobQueue, + }); + }) + ) + ); + await inMemoryQueue.onIdle(); + + // Then, remove messages from memory, so we can batch the updates in redux + batch(() => { + messages.forEach(message => cleanupMessageFromMemory(message)); + }); + + // Then, handle any asynchronous actions (e.g. deleting data from disk) + const unloadedQueue = new PQueue({ concurrency: 3, timeout: MINUTE * 30 }); + drop( + unloadedQueue.addAll( + messages.map((message: MessageAttributesType) => async () => { + await deleteMessageData(message); + }) + ) + ); + await unloadedQueue.onIdle(); } /** Removes a message from redux caches & backbone, but does NOT delete files on disk, @@ -122,3 +172,50 @@ export async function deleteMessageData( await deletePackReference(message.id, packId); } } + +export async function maybeDeleteCall( + message: MessageAttributesType, + { + fromSync, + markCallHistoryDeleted, + singleProtoJobQueue, + }: { + fromSync?: boolean; + markCallHistoryDeleted: (callId: string) => Promise; + singleProtoJobQueue: SingleProtoJobQueue; + } +): Promise { + const { callId } = message; + const logId = `maybeDeleteCall(${getMessageIdForLogging(message)})`; + if (!callId) { + return; + } + + const callHistory = getCallHistorySelector(window.reduxStore.getState())( + callId + ); + if (!callHistory) { + return; + } + + if ( + callHistory.status === DirectCallStatus.Pending || + callHistory.status === GroupCallStatus.Joined || + callHistory.status === GroupCallStatus.OutgoingRing || + callHistory.status === GroupCallStatus.Ringing || + callHistory.status === AdhocCallStatus.Pending + ) { + log.warn( + `${logId}: Call status is ${callHistory.status}; not deleting from Call Tab` + ); + return; + } + + if (!fromSync) { + await singleProtoJobQueue.add( + window.textsecure.MessageSender.getDeleteCallEvent(callHistory) + ); + } + await markCallHistoryDeleted(callId); + window.reduxActions.callHistory.removeCallHistory(callId); +} diff --git a/ts/util/deleteForMe.ts b/ts/util/deleteForMe.ts index 65c78d4f3ea..27ba5a315f3 100644 --- a/ts/util/deleteForMe.ts +++ b/ts/util/deleteForMe.ts @@ -24,7 +24,9 @@ import type { ConversationToDelete, MessageToDelete, } from '../textsecure/messageReceiverEvents'; -import type { AciString } from '../types/ServiceId'; +import { isPniString } from '../types/ServiceId'; +import type { AciString, PniString } from '../types/ServiceId'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; const { getMessagesBySentAt, @@ -48,12 +50,16 @@ export function doesMessageMatch({ const conversationMatches = message.conversationId === conversationId; const aciMatches = query.authorAci && author?.attributes.serviceId === query.authorAci; + const pniMatches = + query.authorPni && author?.attributes.serviceId === query.authorPni; const e164Matches = query.authorE164 && author?.attributes.e164 === query.authorE164; const timestampMatches = sentTimestamps.has(query.sentAt); return Boolean( - conversationMatches && timestampMatches && (aciMatches || e164Matches) + conversationMatches && + timestampMatches && + (aciMatches || e164Matches || pniMatches) ); } @@ -91,7 +97,10 @@ export async function deleteMessage( return false; } - await deleteAndCleanup([found], logId); + await deleteAndCleanup([found], logId, { + fromSync: true, + singleProtoJobQueue, + }); return true; } @@ -113,8 +122,10 @@ export async function deleteConversation( const { received_at: receivedAt } = newestMessage; await removeMessagesInConversation(conversation.id, { + fromSync: true, receivedAt, logId: `${logId}(receivedAt=${receivedAt})`, + singleProtoJobQueue, }); } @@ -170,6 +181,9 @@ export function getConversationFromTarget( if (type === 'e164') { return window.ConversationController.get(targetConversation.e164); } + if (type === 'pni') { + return window.ConversationController.get(targetConversation.pni); + } throw missingCaseError(type); } @@ -178,6 +192,7 @@ type MessageQuery = { sentAt: number; authorAci?: AciString; authorE164?: string; + authorPni?: PniString; }; export function getMessageQueryFromTarget( @@ -191,6 +206,13 @@ export function getMessageQueryFromTarget( } return { sentAt, authorAci: targetMessage.authorAci }; } + if (type === 'pni') { + if (!isPniString(targetMessage.authorPni)) { + throw new Error('Provided authorPni was not a PNI!'); + } + return { sentAt, authorPni: targetMessage.authorPni }; + } + if (type === 'e164') { return { sentAt, authorE164: targetMessage.authorE164 }; } diff --git a/ts/util/findAndDeleteOnboardingStoryIfExists.ts b/ts/util/findAndDeleteOnboardingStoryIfExists.ts index 79f621544ce..4a1faacc3fa 100644 --- a/ts/util/findAndDeleteOnboardingStoryIfExists.ts +++ b/ts/util/findAndDeleteOnboardingStoryIfExists.ts @@ -4,6 +4,7 @@ import * as log from '../logging/log'; import { calculateExpirationTimestamp } from './expirationTimer'; import { DAY } from './durations'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; export async function findAndDeleteOnboardingStoryIfExists(): Promise { const existingOnboardingStoryMessageIds = window.storage.get( @@ -43,7 +44,9 @@ export async function findAndDeleteOnboardingStoryIfExists(): Promise { log.info('findAndDeleteOnboardingStoryIfExists: removing onboarding stories'); - await window.Signal.Data.removeMessages(existingOnboardingStoryMessageIds); + await window.Signal.Data.removeMessages(existingOnboardingStoryMessageIds, { + singleProtoJobQueue, + }); await window.storage.put('existingOnboardingStoryMessageIds', undefined); diff --git a/ts/util/getConversation.ts b/ts/util/getConversation.ts index d1e40edab9b..5fafe941868 100644 --- a/ts/util/getConversation.ts +++ b/ts/util/getConversation.ts @@ -217,6 +217,7 @@ export function getConversation(model: ConversationModel): ConversationType { profileName: getProfileName(attributes), profileSharing: attributes.profileSharing, profileLastUpdatedAt: attributes.profileLastUpdatedAt, + capabilities: attributes.capabilities, sharingPhoneNumber: attributes.sharingPhoneNumber, publicParams: attributes.publicParams, secretParams: attributes.secretParams, diff --git a/ts/util/markConversationRead.ts b/ts/util/markConversationRead.ts index 60deb343d6d..1fea7480d20 100644 --- a/ts/util/markConversationRead.ts +++ b/ts/util/markConversationRead.ts @@ -7,7 +7,7 @@ import type { ConversationAttributesType } from '../model-types.d'; import { hasErrors } from '../state/selectors/message'; import { readSyncJobQueue } from '../jobs/readSyncJobQueue'; import { notificationService } from '../services/notifications'; -import { expiringMessagesDeletionService } from '../services/expiringMessagesDeletion'; +import { update as updateExpiringMessagesService } from '../services/expiringMessagesDeletion'; import { tapToViewMessagesDeletionService } from '../services/tapToViewMessagesDeletionService'; import { isGroup, isDirectConversation } from './whatTypeOfConversation'; import * as log from '../logging/log'; @@ -196,7 +196,7 @@ export async function markConversationRead( } } - void expiringMessagesDeletionService.update(); + void updateExpiringMessagesService(); void tapToViewMessagesDeletionService.update(); return true; diff --git a/ts/util/modifyTargetMessage.ts b/ts/util/modifyTargetMessage.ts index f2f63b4fca4..3f276ce6e40 100644 --- a/ts/util/modifyTargetMessage.ts +++ b/ts/util/modifyTargetMessage.ts @@ -29,6 +29,7 @@ import { getSourceServiceId } from '../messages/helpers'; import { missingCaseError } from './missingCaseError'; import { reduce } from './iterables'; import { strictAssert } from './assert'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; export enum ModifyTargetMessageResult { Modified = 'Modified', @@ -55,24 +56,28 @@ export async function modifyTargetMessage( const syncDeletes = await DeletesForMe.forMessage(message.attributes); if (syncDeletes.length) { if (!isFirstRun) { - await window.Signal.Data.removeMessage(message.id); + await window.Signal.Data.removeMessage(message.id, { + fromSync: true, + singleProtoJobQueue, + }); } return ModifyTargetMessageResult.Deleted; } if (type === 'outgoing' || (type === 'story' && ourAci === sourceServiceId)) { - const sendActions = MessageReceipts.forMessage(message).map(receipt => { + const receipts = await MessageReceipts.forMessage(message); + const sendActions = receipts.map(({ receiptSync }) => { let sendActionType: SendActionType; - const receiptType = receipt.type; + const receiptType = receiptSync.type; switch (receiptType) { - case MessageReceipts.MessageReceiptType.Delivery: + case MessageReceipts.messageReceiptTypeSchema.enum.Delivery: sendActionType = SendActionType.GotDeliveryReceipt; break; - case MessageReceipts.MessageReceiptType.Read: + case MessageReceipts.messageReceiptTypeSchema.enum.Read: sendActionType = SendActionType.GotReadReceipt; break; - case MessageReceipts.MessageReceiptType.View: + case MessageReceipts.messageReceiptTypeSchema.enum.View: sendActionType = SendActionType.GotViewedReceipt; break; default: @@ -80,10 +85,10 @@ export async function modifyTargetMessage( } return { - destinationConversationId: receipt.sourceConversationId, + destinationConversationId: receiptSync.sourceConversationId, action: { type: sendActionType, - updatedAt: receipt.receiptTimestamp, + updatedAt: receiptSync.receiptTimestamp, }, }; }); @@ -123,10 +128,10 @@ export async function modifyTargetMessage( if (type === 'incoming') { // In a followup (see DESKTOP-2100), we want to make `ReadSyncs#forMessage` return // an array, not an object. This array wrapping makes that future a bit easier. - const readSync = ReadSyncs.forMessage(message); - const readSyncs = readSync ? [readSync] : []; + const maybeSingleReadSync = await ReadSyncs.forMessage(message); + const readSyncs = maybeSingleReadSync ? [maybeSingleReadSync] : []; - const viewSyncs = ViewSyncs.forMessage(message); + const viewSyncs = await ViewSyncs.forMessage(message); const isGroupStoryReply = isGroup(conversation.attributes) && message.get('storyId'); @@ -134,8 +139,8 @@ export async function modifyTargetMessage( if (readSyncs.length !== 0 || viewSyncs.length !== 0) { const markReadAt = Math.min( Date.now(), - ...readSyncs.map(sync => sync.readAt), - ...viewSyncs.map(sync => sync.viewedAt) + ...readSyncs.map(({ readSync }) => readSync.readAt), + ...viewSyncs.map(({ viewSync }) => viewSync.viewedAt) ); if (message.get('expireTimer')) { @@ -180,7 +185,7 @@ export async function modifyTargetMessage( if (!isFirstRun && message.getPendingMarkRead()) { const markReadAt = message.getPendingMarkRead(); message.setPendingMarkRead(undefined); - const newestSentAt = readSync?.timestamp; + const newestSentAt = maybeSingleReadSync?.readSync.timestamp; // This is primarily to allow the conversation to mark all older // messages as read, as is done when we receive a read sync for @@ -207,7 +212,7 @@ export async function modifyTargetMessage( } if (isStory(message.attributes)) { - const viewSyncs = ViewSyncs.forMessage(message); + const viewSyncs = await ViewSyncs.forMessage(message); if (viewSyncs.length !== 0) { message.set({ @@ -218,7 +223,7 @@ export async function modifyTargetMessage( const markReadAt = Math.min( Date.now(), - ...viewSyncs.map(sync => sync.viewedAt) + ...viewSyncs.map(({ viewSync }) => viewSync.viewedAt) ); message.setPendingMarkRead( Math.min(message.getPendingMarkRead() ?? Date.now(), markReadAt) diff --git a/ts/util/syncTasks.ts b/ts/util/syncTasks.ts index ce883a4e3c6..0a0b16f7290 100644 --- a/ts/util/syncTasks.ts +++ b/ts/util/syncTasks.ts @@ -4,6 +4,7 @@ import { z } from 'zod'; import type { ZodSchema } from 'zod'; +import { drop } from './drop'; import * as log from '../logging/log'; import * as DeletesForMe from '../messageModifiers/DeletesForMe'; import { @@ -11,18 +12,31 @@ import { deleteConversationSchema, deleteLocalConversationSchema, } from '../textsecure/messageReceiverEvents'; - +import { + receiptSyncTaskSchema, + onReceipt, +} from '../messageModifiers/MessageReceipts'; import { deleteConversation, deleteLocalOnlyConversation, getConversationFromTarget, } from './deleteForMe'; -import { drop } from './drop'; +import { + onSync as onReadSync, + readSyncTaskSchema, +} from '../messageModifiers/ReadSyncs'; +import { + onSync as onViewSync, + viewSyncTaskSchema, +} from '../messageModifiers/ViewSyncs'; const syncTaskDataSchema = z.union([ deleteMessageSchema, deleteConversationSchema, deleteLocalConversationSchema, + receiptSyncTaskSchema, + readSyncTaskSchema, + viewSyncTaskSchema, ]); export type SyncTaskData = z.infer; @@ -40,6 +54,11 @@ const SCHEMAS_BY_TYPE: Record = { 'delete-message': deleteMessageSchema, 'delete-conversation': deleteConversationSchema, 'delete-local-conversation': deleteLocalConversationSchema, + Delivery: receiptSyncTaskSchema, + Read: receiptSyncTaskSchema, + View: receiptSyncTaskSchema, + ReadSync: readSyncTaskSchema, + ViewSync: viewSyncTaskSchema, }; function toLogId(task: SyncTaskType) { @@ -77,14 +96,15 @@ export async function queueSyncTasks( const { data: parsed } = parseResult; if (parsed.type === 'delete-message') { - // eslint-disable-next-line no-await-in-loop - await DeletesForMe.onDelete({ - conversation: parsed.conversation, - envelopeId, - message: parsed.message, - syncTaskId: id, - timestamp: sentAt, - }); + drop( + DeletesForMe.onDelete({ + conversation: parsed.conversation, + envelopeId, + message: parsed.message, + syncTaskId: id, + timestamp: sentAt, + }) + ); } else if (parsed.type === 'delete-conversation') { const { conversation: targetConversation, @@ -133,6 +153,39 @@ export async function queueSyncTasks( log.info(`${logId}: Done; result=${result}`); }) ); + } else if ( + parsed.type === 'Delivery' || + parsed.type === 'Read' || + parsed.type === 'View' + ) { + drop( + onReceipt({ + envelopeId, + receiptSync: parsed, + syncTaskId: id, + }) + ); + } else if (parsed.type === 'ReadSync') { + drop( + onReadSync({ + envelopeId, + readSync: parsed, + syncTaskId: id, + }) + ); + } else if (parsed.type === 'ViewSync') { + drop( + onViewSync({ + envelopeId, + viewSync: parsed, + syncTaskId: id, + }) + ); + } else { + const parsedType: never = parsed.type; + log.error(`${logId}: Encountered job of type ${parsedType}, removing`); + // eslint-disable-next-line no-await-in-loop + await removeSyncTaskById(id); } } }