diff --git a/ts/messages/copyQuote.ts b/ts/messages/copyQuote.ts index 2309f342b8..ccc7d357d0 100644 --- a/ts/messages/copyQuote.ts +++ b/ts/messages/copyQuote.ts @@ -5,7 +5,10 @@ import { omit } from 'lodash'; import * as log from '../logging/log'; import type { QuotedMessageType } from '../model-types'; -import type { MessageAttributesType } from '../model-types.d'; +import type { + MessageAttributesType, + ReadonlyMessageAttributesType, +} from '../model-types.d'; import { SignalService } from '../protobuf'; import { isGiftBadge, isTapToView } from '../state/selectors/message'; import type { ProcessedQuote } from '../textsecure/Types'; @@ -16,10 +19,27 @@ import { isQuoteAMatch, messageHasPaymentEvent } from './helpers'; import * as Errors from '../types/errors'; import { isDownloadable } from '../types/Attachment'; +export type MinimalMessageCache = Readonly<{ + findBySentAt( + sentAt: number, + predicate: (attributes: ReadonlyMessageAttributesType) => boolean + ): Promise; + upgradeSchema( + attributes: MessageAttributesType, + minSchemaVersion: number + ): Promise; +}>; + +export type CopyQuoteOptionsType = Readonly<{ + messageCache?: MinimalMessageCache; +}>; + export const copyFromQuotedMessage = async ( quote: ProcessedQuote, - conversationId: string + conversationId: string, + options: CopyQuoteOptionsType = {} ): Promise => { + const { messageCache = window.MessageCache } = options; const { id } = quote; strictAssert(id, 'Quote must have an id'); @@ -38,7 +58,7 @@ export const copyFromQuotedMessage = async ( messageId: '', }; - const queryMessage = await window.MessageCache.findBySentAt(id, attributes => + const queryMessage = await messageCache.findBySentAt(id, attributes => isQuoteAMatch(attributes, conversationId, result) ); @@ -48,7 +68,7 @@ export const copyFromQuotedMessage = async ( } if (queryMessage) { - await copyQuoteContentFromOriginal(queryMessage, result); + await copyQuoteContentFromOriginal(queryMessage, result, options); } return result; @@ -56,7 +76,8 @@ export const copyFromQuotedMessage = async ( export const copyQuoteContentFromOriginal = async ( providedOriginalMessage: MessageAttributesType, - quote: QuotedMessageType + quote: QuotedMessageType, + { messageCache = window.MessageCache }: CopyQuoteOptionsType = {} ): Promise => { let originalMessage = providedOriginalMessage; @@ -114,7 +135,7 @@ export const copyQuoteContentFromOriginal = async ( } try { - originalMessage = await window.MessageCache.upgradeSchema( + originalMessage = await messageCache.upgradeSchema( originalMessage, window.Signal.Types.Message.VERSION_NEEDED_FOR_DISPLAY ); diff --git a/ts/messages/helpers.ts b/ts/messages/helpers.ts index 4322507354..44df2ba88b 100644 --- a/ts/messages/helpers.ts +++ b/ts/messages/helpers.ts @@ -124,11 +124,6 @@ export function isQuoteAMatch( } const { authorAci, id } = quote; - const authorConversation = window.ConversationController.lookupOrCreate({ - e164: 'author' in quote ? quote.author : undefined, - serviceId: authorAci, - reason: 'helpers.isQuoteAMatch', - }); const isSameTimestamp = message.sent_at === id || @@ -138,7 +133,7 @@ export function isQuoteAMatch( return ( isSameTimestamp && message.conversationId === conversationId && - getAuthorId(message) === authorConversation?.id + getSourceServiceId(message) === authorAci ); } diff --git a/ts/models/messages.ts b/ts/models/messages.ts index b7e3836539..beb5465496 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -1714,6 +1714,14 @@ export class MessageModel extends window.Backbone.Model { `${storyContext.authorAci})`; } + // Ensure that quote author's conversation exist + if (initialMessage.quote) { + window.ConversationController.lookupOrCreate({ + serviceId: initialMessage.quote.authorAci, + reason: 'handleDataMessage.quote.author', + }); + } + const [quote, storyQuotes] = await Promise.all([ initialMessage.quote ? copyFromQuotedMessage(initialMessage.quote, conversation.id) diff --git a/ts/services/backups/import.ts b/ts/services/backups/import.ts index 11480c9dec..d51c3d50c0 100644 --- a/ts/services/backups/import.ts +++ b/ts/services/backups/import.ts @@ -84,6 +84,7 @@ import { convertBackupMessageAttachmentToAttachment, convertFilePointerToAttachment, } from './util/filePointers'; +import { CircularMessageCache } from './util/CircularMessageCache'; import { filterAndClean } from '../../types/BodyRange'; import { APPLICATION_OCTET_STREAM, stringToMIMEType } from '../../types/MIME'; import { copyFromQuotedMessage } from '../../messages/copyQuote'; @@ -107,6 +108,9 @@ import { getParametersForRedux, loadAll } from '../allLoaders'; const MAX_CONCURRENCY = 10; +// Keep 1000 recent messages in memory to speed up quote lookup. +const RECENT_MESSAGES_CACHE_SIZE = 1000; + type ConversationOpType = Readonly<{ isUpdate: boolean; attributes: ConversationAttributesType; @@ -153,8 +157,6 @@ async function processMessagesBatch( id: ids[index], }; - window.MessageCache.__DEPRECATED$unregister(attributes.id); - const { editHistory } = attributes; if (editHistory?.length) { @@ -286,6 +288,10 @@ export class BackupImportStream extends Writable { private releaseNotesRecipientId: Long | undefined; private releaseNotesChatId: Long | undefined; private pendingGroupAvatars = new Map(); + private recentMessages = new CircularMessageCache({ + size: RECENT_MESSAGES_CACHE_SIZE, + flush: () => this.saveMessageBatcher.flushAndWait(), + }); private constructor() { super({ objectMode: true }); @@ -491,28 +497,15 @@ export class BackupImportStream extends Writable { } private saveConversation(attributes: ConversationAttributesType): void { - // add the conversation into memory without saving it to DB (that will happen in - // batcher); if we didn't do this, when we register messages to MessageCache, it would - // automatically create (and save to DB) a duplicate conversation which would have to - // be later merged - window.ConversationController.dangerouslyCreateAndAdd(attributes); this.conversationOpBatcher.add({ isUpdate: false, attributes }); } private updateConversation(attributes: ConversationAttributesType): void { - const existing = window.ConversationController.get(attributes.id); - if (existing) { - existing.set(attributes); - } this.conversationOpBatcher.add({ isUpdate: true, attributes }); } private saveMessage(attributes: MessageAttributesType): void { - window.MessageCache.__DEPRECATED$register( - attributes.id, - attributes, - 'import.saveMessage' - ); + this.recentMessages.push(attributes); this.saveMessageBatcher.add(attributes); } @@ -1593,7 +1586,10 @@ export class BackupImportStream extends Writable { }) ?? [], type: this.convertQuoteType(quote.type), }, - conversationId + conversationId, + { + messageCache: this.recentMessages, + } ); } diff --git a/ts/services/backups/util/CircularMessageCache.ts b/ts/services/backups/util/CircularMessageCache.ts new file mode 100644 index 0000000000..7bff2f77e4 --- /dev/null +++ b/ts/services/backups/util/CircularMessageCache.ts @@ -0,0 +1,80 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { + ReadonlyMessageAttributesType, + MessageAttributesType, +} from '../../../model-types.d'; +import { find } from '../../../util/iterables'; +import { DataReader } from '../../../sql/Client'; + +export type CircularMessageCacheOptionsType = Readonly<{ + size: number; + flush: () => Promise; +}>; + +export class CircularMessageCache { + private readonly flush: () => Promise; + private readonly buffer: Array; + private readonly sentAtToMessages = new Map< + number, + Set + >(); + private offset = 0; + + constructor({ size, flush }: CircularMessageCacheOptionsType) { + this.flush = flush; + this.buffer = new Array(size); + } + + public push(attributes: MessageAttributesType): void { + const stale = this.buffer[this.offset]; + this.buffer[this.offset] = attributes; + this.offset = (this.offset + 1) % this.buffer.length; + + let addedSet = this.sentAtToMessages.get(attributes.sent_at); + if (addedSet === undefined) { + addedSet = new Set(); + this.sentAtToMessages.set(attributes.sent_at, addedSet); + } + addedSet.add(attributes); + + if (stale === undefined) { + return; + } + + const staleSet = this.sentAtToMessages.get(stale.sent_at); + if (staleSet === undefined) { + return; + } + staleSet.delete(stale); + if (staleSet.size === 0) { + this.sentAtToMessages.delete(stale.sent_at); + } + } + + public async findBySentAt( + sentAt: number, + predicate: (attributes: ReadonlyMessageAttributesType) => boolean + ): Promise { + const set = this.sentAtToMessages.get(sentAt); + if (set !== undefined) { + const cached = find(set.values(), predicate); + if (cached != null) { + return cached; + } + } + + await this.flush(); + + const onDisk = await DataReader.getMessagesBySentAt(sentAt); + return onDisk.find(predicate); + } + + // Just a stub to conform with the interface + public async upgradeSchema( + attributes: MessageAttributesType + ): Promise { + return attributes; + } +} diff --git a/ts/test-electron/backup/CircularMessageCache_test.ts b/ts/test-electron/backup/CircularMessageCache_test.ts new file mode 100644 index 0000000000..c151074741 --- /dev/null +++ b/ts/test-electron/backup/CircularMessageCache_test.ts @@ -0,0 +1,74 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import * as sinon from 'sinon'; + +import { generateAci } from '../../types/ServiceId'; +import { type MessageAttributesType } from '../../model-types.d'; +import { CircularMessageCache } from '../../services/backups/util/CircularMessageCache'; +import { DataWriter } from '../../sql/Client'; + +const OUR_ACI = generateAci(); + +function createMessage(sentAt: number): MessageAttributesType { + return { + sent_at: sentAt, + received_at: sentAt, + timestamp: sentAt, + + id: 'abc', + type: 'incoming' as const, + conversationId: 'cid', + }; +} + +describe('backup/attachments', () => { + let messageCache: CircularMessageCache; + let flush: sinon.SinonStub; + + beforeEach(async () => { + await DataWriter.removeAll(); + flush = sinon.stub(); + messageCache = new CircularMessageCache({ + size: 2, + flush, + }); + }); + + afterEach(async () => { + await DataWriter.removeAll(); + }); + + it('should return a cached message', async () => { + const message = createMessage(123); + messageCache.push(message); + + const found = await messageCache.findBySentAt(123, () => true); + sinon.assert.notCalled(flush); + assert.strictEqual(found, message); + }); + + it('should purge message from cache on overflow', async () => { + messageCache.push(createMessage(123)); + messageCache.push(createMessage(124)); + messageCache.push(createMessage(125)); + + const found = await messageCache.findBySentAt(123, () => true); + sinon.assert.calledOnce(flush); + assert.isUndefined(found); + }); + + it('should find message in the database', async () => { + const message = createMessage(123); + + await DataWriter.saveMessage(message, { + ourAci: OUR_ACI, + forceSave: true, + }); + + const found = await messageCache.findBySentAt(123, () => true); + sinon.assert.calledOnce(flush); + assert.deepStrictEqual(found, message); + }); +});