Don't create models in backups/import
This commit is contained in:
parent
bdbc63ccf0
commit
026e9ef853
6 changed files with 203 additions and 29 deletions
|
@ -5,7 +5,10 @@ import { omit } from 'lodash';
|
||||||
|
|
||||||
import * as log from '../logging/log';
|
import * as log from '../logging/log';
|
||||||
import type { QuotedMessageType } from '../model-types';
|
import type { QuotedMessageType } from '../model-types';
|
||||||
import type { MessageAttributesType } from '../model-types.d';
|
import type {
|
||||||
|
MessageAttributesType,
|
||||||
|
ReadonlyMessageAttributesType,
|
||||||
|
} from '../model-types.d';
|
||||||
import { SignalService } from '../protobuf';
|
import { SignalService } from '../protobuf';
|
||||||
import { isGiftBadge, isTapToView } from '../state/selectors/message';
|
import { isGiftBadge, isTapToView } from '../state/selectors/message';
|
||||||
import type { ProcessedQuote } from '../textsecure/Types';
|
import type { ProcessedQuote } from '../textsecure/Types';
|
||||||
|
@ -16,10 +19,27 @@ import { isQuoteAMatch, messageHasPaymentEvent } from './helpers';
|
||||||
import * as Errors from '../types/errors';
|
import * as Errors from '../types/errors';
|
||||||
import { isDownloadable } from '../types/Attachment';
|
import { isDownloadable } from '../types/Attachment';
|
||||||
|
|
||||||
|
export type MinimalMessageCache = Readonly<{
|
||||||
|
findBySentAt(
|
||||||
|
sentAt: number,
|
||||||
|
predicate: (attributes: ReadonlyMessageAttributesType) => boolean
|
||||||
|
): Promise<MessageAttributesType | undefined>;
|
||||||
|
upgradeSchema(
|
||||||
|
attributes: MessageAttributesType,
|
||||||
|
minSchemaVersion: number
|
||||||
|
): Promise<MessageAttributesType>;
|
||||||
|
}>;
|
||||||
|
|
||||||
|
export type CopyQuoteOptionsType = Readonly<{
|
||||||
|
messageCache?: MinimalMessageCache;
|
||||||
|
}>;
|
||||||
|
|
||||||
export const copyFromQuotedMessage = async (
|
export const copyFromQuotedMessage = async (
|
||||||
quote: ProcessedQuote,
|
quote: ProcessedQuote,
|
||||||
conversationId: string
|
conversationId: string,
|
||||||
|
options: CopyQuoteOptionsType = {}
|
||||||
): Promise<QuotedMessageType> => {
|
): Promise<QuotedMessageType> => {
|
||||||
|
const { messageCache = window.MessageCache } = options;
|
||||||
const { id } = quote;
|
const { id } = quote;
|
||||||
strictAssert(id, 'Quote must have an id');
|
strictAssert(id, 'Quote must have an id');
|
||||||
|
|
||||||
|
@ -38,7 +58,7 @@ export const copyFromQuotedMessage = async (
|
||||||
messageId: '',
|
messageId: '',
|
||||||
};
|
};
|
||||||
|
|
||||||
const queryMessage = await window.MessageCache.findBySentAt(id, attributes =>
|
const queryMessage = await messageCache.findBySentAt(id, attributes =>
|
||||||
isQuoteAMatch(attributes, conversationId, result)
|
isQuoteAMatch(attributes, conversationId, result)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -48,7 +68,7 @@ export const copyFromQuotedMessage = async (
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queryMessage) {
|
if (queryMessage) {
|
||||||
await copyQuoteContentFromOriginal(queryMessage, result);
|
await copyQuoteContentFromOriginal(queryMessage, result, options);
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
@ -56,7 +76,8 @@ export const copyFromQuotedMessage = async (
|
||||||
|
|
||||||
export const copyQuoteContentFromOriginal = async (
|
export const copyQuoteContentFromOriginal = async (
|
||||||
providedOriginalMessage: MessageAttributesType,
|
providedOriginalMessage: MessageAttributesType,
|
||||||
quote: QuotedMessageType
|
quote: QuotedMessageType,
|
||||||
|
{ messageCache = window.MessageCache }: CopyQuoteOptionsType = {}
|
||||||
): Promise<void> => {
|
): Promise<void> => {
|
||||||
let originalMessage = providedOriginalMessage;
|
let originalMessage = providedOriginalMessage;
|
||||||
|
|
||||||
|
@ -114,7 +135,7 @@ export const copyQuoteContentFromOriginal = async (
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
originalMessage = await window.MessageCache.upgradeSchema(
|
originalMessage = await messageCache.upgradeSchema(
|
||||||
originalMessage,
|
originalMessage,
|
||||||
window.Signal.Types.Message.VERSION_NEEDED_FOR_DISPLAY
|
window.Signal.Types.Message.VERSION_NEEDED_FOR_DISPLAY
|
||||||
);
|
);
|
||||||
|
|
|
@ -124,11 +124,6 @@ export function isQuoteAMatch(
|
||||||
}
|
}
|
||||||
|
|
||||||
const { authorAci, id } = quote;
|
const { authorAci, id } = quote;
|
||||||
const authorConversation = window.ConversationController.lookupOrCreate({
|
|
||||||
e164: 'author' in quote ? quote.author : undefined,
|
|
||||||
serviceId: authorAci,
|
|
||||||
reason: 'helpers.isQuoteAMatch',
|
|
||||||
});
|
|
||||||
|
|
||||||
const isSameTimestamp =
|
const isSameTimestamp =
|
||||||
message.sent_at === id ||
|
message.sent_at === id ||
|
||||||
|
@ -138,7 +133,7 @@ export function isQuoteAMatch(
|
||||||
return (
|
return (
|
||||||
isSameTimestamp &&
|
isSameTimestamp &&
|
||||||
message.conversationId === conversationId &&
|
message.conversationId === conversationId &&
|
||||||
getAuthorId(message) === authorConversation?.id
|
getSourceServiceId(message) === authorAci
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1714,6 +1714,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
||||||
`${storyContext.authorAci})`;
|
`${storyContext.authorAci})`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that quote author's conversation exist
|
||||||
|
if (initialMessage.quote) {
|
||||||
|
window.ConversationController.lookupOrCreate({
|
||||||
|
serviceId: initialMessage.quote.authorAci,
|
||||||
|
reason: 'handleDataMessage.quote.author',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
const [quote, storyQuotes] = await Promise.all([
|
const [quote, storyQuotes] = await Promise.all([
|
||||||
initialMessage.quote
|
initialMessage.quote
|
||||||
? copyFromQuotedMessage(initialMessage.quote, conversation.id)
|
? copyFromQuotedMessage(initialMessage.quote, conversation.id)
|
||||||
|
|
|
@ -84,6 +84,7 @@ import {
|
||||||
convertBackupMessageAttachmentToAttachment,
|
convertBackupMessageAttachmentToAttachment,
|
||||||
convertFilePointerToAttachment,
|
convertFilePointerToAttachment,
|
||||||
} from './util/filePointers';
|
} from './util/filePointers';
|
||||||
|
import { CircularMessageCache } from './util/CircularMessageCache';
|
||||||
import { filterAndClean } from '../../types/BodyRange';
|
import { filterAndClean } from '../../types/BodyRange';
|
||||||
import { APPLICATION_OCTET_STREAM, stringToMIMEType } from '../../types/MIME';
|
import { APPLICATION_OCTET_STREAM, stringToMIMEType } from '../../types/MIME';
|
||||||
import { copyFromQuotedMessage } from '../../messages/copyQuote';
|
import { copyFromQuotedMessage } from '../../messages/copyQuote';
|
||||||
|
@ -107,6 +108,9 @@ import { getParametersForRedux, loadAll } from '../allLoaders';
|
||||||
|
|
||||||
const MAX_CONCURRENCY = 10;
|
const MAX_CONCURRENCY = 10;
|
||||||
|
|
||||||
|
// Keep 1000 recent messages in memory to speed up quote lookup.
|
||||||
|
const RECENT_MESSAGES_CACHE_SIZE = 1000;
|
||||||
|
|
||||||
type ConversationOpType = Readonly<{
|
type ConversationOpType = Readonly<{
|
||||||
isUpdate: boolean;
|
isUpdate: boolean;
|
||||||
attributes: ConversationAttributesType;
|
attributes: ConversationAttributesType;
|
||||||
|
@ -153,8 +157,6 @@ async function processMessagesBatch(
|
||||||
id: ids[index],
|
id: ids[index],
|
||||||
};
|
};
|
||||||
|
|
||||||
window.MessageCache.__DEPRECATED$unregister(attributes.id);
|
|
||||||
|
|
||||||
const { editHistory } = attributes;
|
const { editHistory } = attributes;
|
||||||
|
|
||||||
if (editHistory?.length) {
|
if (editHistory?.length) {
|
||||||
|
@ -286,6 +288,10 @@ export class BackupImportStream extends Writable {
|
||||||
private releaseNotesRecipientId: Long | undefined;
|
private releaseNotesRecipientId: Long | undefined;
|
||||||
private releaseNotesChatId: Long | undefined;
|
private releaseNotesChatId: Long | undefined;
|
||||||
private pendingGroupAvatars = new Map<string, string>();
|
private pendingGroupAvatars = new Map<string, string>();
|
||||||
|
private recentMessages = new CircularMessageCache({
|
||||||
|
size: RECENT_MESSAGES_CACHE_SIZE,
|
||||||
|
flush: () => this.saveMessageBatcher.flushAndWait(),
|
||||||
|
});
|
||||||
|
|
||||||
private constructor() {
|
private constructor() {
|
||||||
super({ objectMode: true });
|
super({ objectMode: true });
|
||||||
|
@ -491,28 +497,15 @@ export class BackupImportStream extends Writable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private saveConversation(attributes: ConversationAttributesType): void {
|
private saveConversation(attributes: ConversationAttributesType): void {
|
||||||
// add the conversation into memory without saving it to DB (that will happen in
|
|
||||||
// batcher); if we didn't do this, when we register messages to MessageCache, it would
|
|
||||||
// automatically create (and save to DB) a duplicate conversation which would have to
|
|
||||||
// be later merged
|
|
||||||
window.ConversationController.dangerouslyCreateAndAdd(attributes);
|
|
||||||
this.conversationOpBatcher.add({ isUpdate: false, attributes });
|
this.conversationOpBatcher.add({ isUpdate: false, attributes });
|
||||||
}
|
}
|
||||||
|
|
||||||
private updateConversation(attributes: ConversationAttributesType): void {
|
private updateConversation(attributes: ConversationAttributesType): void {
|
||||||
const existing = window.ConversationController.get(attributes.id);
|
|
||||||
if (existing) {
|
|
||||||
existing.set(attributes);
|
|
||||||
}
|
|
||||||
this.conversationOpBatcher.add({ isUpdate: true, attributes });
|
this.conversationOpBatcher.add({ isUpdate: true, attributes });
|
||||||
}
|
}
|
||||||
|
|
||||||
private saveMessage(attributes: MessageAttributesType): void {
|
private saveMessage(attributes: MessageAttributesType): void {
|
||||||
window.MessageCache.__DEPRECATED$register(
|
this.recentMessages.push(attributes);
|
||||||
attributes.id,
|
|
||||||
attributes,
|
|
||||||
'import.saveMessage'
|
|
||||||
);
|
|
||||||
this.saveMessageBatcher.add(attributes);
|
this.saveMessageBatcher.add(attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1593,7 +1586,10 @@ export class BackupImportStream extends Writable {
|
||||||
}) ?? [],
|
}) ?? [],
|
||||||
type: this.convertQuoteType(quote.type),
|
type: this.convertQuoteType(quote.type),
|
||||||
},
|
},
|
||||||
conversationId
|
conversationId,
|
||||||
|
{
|
||||||
|
messageCache: this.recentMessages,
|
||||||
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
80
ts/services/backups/util/CircularMessageCache.ts
Normal file
80
ts/services/backups/util/CircularMessageCache.ts
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
// Copyright 2024 Signal Messenger, LLC
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
import type {
|
||||||
|
ReadonlyMessageAttributesType,
|
||||||
|
MessageAttributesType,
|
||||||
|
} from '../../../model-types.d';
|
||||||
|
import { find } from '../../../util/iterables';
|
||||||
|
import { DataReader } from '../../../sql/Client';
|
||||||
|
|
||||||
|
export type CircularMessageCacheOptionsType = Readonly<{
|
||||||
|
size: number;
|
||||||
|
flush: () => Promise<void>;
|
||||||
|
}>;
|
||||||
|
|
||||||
|
export class CircularMessageCache {
|
||||||
|
private readonly flush: () => Promise<void>;
|
||||||
|
private readonly buffer: Array<MessageAttributesType | undefined>;
|
||||||
|
private readonly sentAtToMessages = new Map<
|
||||||
|
number,
|
||||||
|
Set<MessageAttributesType>
|
||||||
|
>();
|
||||||
|
private offset = 0;
|
||||||
|
|
||||||
|
constructor({ size, flush }: CircularMessageCacheOptionsType) {
|
||||||
|
this.flush = flush;
|
||||||
|
this.buffer = new Array(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
public push(attributes: MessageAttributesType): void {
|
||||||
|
const stale = this.buffer[this.offset];
|
||||||
|
this.buffer[this.offset] = attributes;
|
||||||
|
this.offset = (this.offset + 1) % this.buffer.length;
|
||||||
|
|
||||||
|
let addedSet = this.sentAtToMessages.get(attributes.sent_at);
|
||||||
|
if (addedSet === undefined) {
|
||||||
|
addedSet = new Set();
|
||||||
|
this.sentAtToMessages.set(attributes.sent_at, addedSet);
|
||||||
|
}
|
||||||
|
addedSet.add(attributes);
|
||||||
|
|
||||||
|
if (stale === undefined) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const staleSet = this.sentAtToMessages.get(stale.sent_at);
|
||||||
|
if (staleSet === undefined) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
staleSet.delete(stale);
|
||||||
|
if (staleSet.size === 0) {
|
||||||
|
this.sentAtToMessages.delete(stale.sent_at);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async findBySentAt(
|
||||||
|
sentAt: number,
|
||||||
|
predicate: (attributes: ReadonlyMessageAttributesType) => boolean
|
||||||
|
): Promise<MessageAttributesType | undefined> {
|
||||||
|
const set = this.sentAtToMessages.get(sentAt);
|
||||||
|
if (set !== undefined) {
|
||||||
|
const cached = find(set.values(), predicate);
|
||||||
|
if (cached != null) {
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.flush();
|
||||||
|
|
||||||
|
const onDisk = await DataReader.getMessagesBySentAt(sentAt);
|
||||||
|
return onDisk.find(predicate);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just a stub to conform with the interface
|
||||||
|
public async upgradeSchema(
|
||||||
|
attributes: MessageAttributesType
|
||||||
|
): Promise<MessageAttributesType> {
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
}
|
74
ts/test-electron/backup/CircularMessageCache_test.ts
Normal file
74
ts/test-electron/backup/CircularMessageCache_test.ts
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
// Copyright 2024 Signal Messenger, LLC
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
import { assert } from 'chai';
|
||||||
|
import * as sinon from 'sinon';
|
||||||
|
|
||||||
|
import { generateAci } from '../../types/ServiceId';
|
||||||
|
import { type MessageAttributesType } from '../../model-types.d';
|
||||||
|
import { CircularMessageCache } from '../../services/backups/util/CircularMessageCache';
|
||||||
|
import { DataWriter } from '../../sql/Client';
|
||||||
|
|
||||||
|
const OUR_ACI = generateAci();
|
||||||
|
|
||||||
|
function createMessage(sentAt: number): MessageAttributesType {
|
||||||
|
return {
|
||||||
|
sent_at: sentAt,
|
||||||
|
received_at: sentAt,
|
||||||
|
timestamp: sentAt,
|
||||||
|
|
||||||
|
id: 'abc',
|
||||||
|
type: 'incoming' as const,
|
||||||
|
conversationId: 'cid',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('backup/attachments', () => {
|
||||||
|
let messageCache: CircularMessageCache;
|
||||||
|
let flush: sinon.SinonStub;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await DataWriter.removeAll();
|
||||||
|
flush = sinon.stub();
|
||||||
|
messageCache = new CircularMessageCache({
|
||||||
|
size: 2,
|
||||||
|
flush,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await DataWriter.removeAll();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return a cached message', async () => {
|
||||||
|
const message = createMessage(123);
|
||||||
|
messageCache.push(message);
|
||||||
|
|
||||||
|
const found = await messageCache.findBySentAt(123, () => true);
|
||||||
|
sinon.assert.notCalled(flush);
|
||||||
|
assert.strictEqual(found, message);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should purge message from cache on overflow', async () => {
|
||||||
|
messageCache.push(createMessage(123));
|
||||||
|
messageCache.push(createMessage(124));
|
||||||
|
messageCache.push(createMessage(125));
|
||||||
|
|
||||||
|
const found = await messageCache.findBySentAt(123, () => true);
|
||||||
|
sinon.assert.calledOnce(flush);
|
||||||
|
assert.isUndefined(found);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should find message in the database', async () => {
|
||||||
|
const message = createMessage(123);
|
||||||
|
|
||||||
|
await DataWriter.saveMessage(message, {
|
||||||
|
ourAci: OUR_ACI,
|
||||||
|
forceSave: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const found = await messageCache.findBySentAt(123, () => true);
|
||||||
|
sinon.assert.calledOnce(flush);
|
||||||
|
assert.deepStrictEqual(found, message);
|
||||||
|
});
|
||||||
|
});
|
Loading…
Add table
Reference in a new issue