New getRecentStoryReplies function to clean up replies in multiple convos

This commit is contained in:
Scott Nonnenberg 2023-07-21 15:10:32 -07:00 committed by GitHub
parent ca84d637ae
commit 716f852970
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 356 additions and 63 deletions

View file

@ -312,13 +312,13 @@ export async function sendReaction(
if (!ephemeralMessageForReactionSend.doNotSave) {
const reactionMessage = ephemeralMessageForReactionSend;
await Promise.all([
await window.Signal.Data.saveMessage(reactionMessage.attributes, {
ourUuid,
forceSave: true,
}),
reactionMessage.hydrateStoryContext(message.attributes),
]);
await reactionMessage.hydrateStoryContext(message.attributes, {
shouldSave: false,
});
await window.Signal.Data.saveMessage(reactionMessage.attributes, {
ourUuid,
forceSave: true,
});
void conversation.addSingleMessage(
window.MessageController.register(reactionMessage.id, reactionMessage)

View file

@ -1395,7 +1395,7 @@ export class ConversationModel extends window.Backbone
}
private async beforeAddSingleMessage(message: MessageModel): Promise<void> {
await message.hydrateStoryContext();
await message.hydrateStoryContext(undefined, { shouldSave: true });
if (!this.newMessageQueue) {
this.newMessageQueue = new PQueue({
@ -1778,7 +1778,11 @@ export class ConversationModel extends window.Backbone
log.warn(`cleanModels: Upgraded schema of ${upgraded} messages`);
}
await Promise.all(result.map(model => model.hydrateStoryContext()));
await Promise.all(
result.map(model =>
model.hydrateStoryContext(undefined, { shouldSave: true })
)
);
return result;
}

View file

@ -332,8 +332,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
}
async hydrateStoryContext(
inMemoryMessage?: MessageAttributesType
inMemoryMessage?: MessageAttributesType,
{
shouldSave,
}: {
shouldSave?: boolean;
} = {}
): Promise<void> {
const ourUuid = window.textsecure.storage.user.getCheckedUuid().toString();
const storyId = this.get('storyId');
if (!storyId) {
return;
@ -366,6 +372,9 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
messageId: '',
},
});
if (shouldSave) {
await window.Signal.Data.saveMessage(this.attributes, { ourUuid });
}
return;
}
@ -382,6 +391,9 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
messageId: message.id,
},
});
if (shouldSave) {
await window.Signal.Data.saveMessage(this.attributes, { ourUuid });
}
}
// Dependencies of prop-generation functions
@ -1028,7 +1040,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
if (this.get('storyReplyContext')) {
this.unset('storyReplyContext');
}
await this.hydrateStoryContext(message.attributes);
await this.hydrateStoryContext(message.attributes, { shouldSave: true });
return;
}
@ -2610,7 +2622,9 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
});
if (storyQuote) {
await this.hydrateStoryContext(storyQuote.attributes);
await this.hydrateStoryContext(storyQuote.attributes, {
shouldSave: true,
});
}
const isSupported = !isUnsupportedMessage(message.attributes);
@ -3003,14 +3017,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
},
});
await generatedMessage.hydrateStoryContext(storyMessage, {
shouldSave: false,
});
// Note: generatedMessage comes with an id, so we have to force this save
await Promise.all([
window.Signal.Data.saveMessage(generatedMessage.attributes, {
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
forceSave: true,
}),
generatedMessage.hydrateStoryContext(storyMessage),
]);
await window.Signal.Data.saveMessage(generatedMessage.attributes, {
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
forceSave: true,
});
log.info('Reactions.onReaction adding reaction to story', {
reactionMessageId: getMessageIdForLogging(
@ -3159,13 +3173,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
generatedMessage,
'Story reactions must provide storyReactionmessage'
);
await Promise.all([
await window.Signal.Data.saveMessage(generatedMessage.attributes, {
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
forceSave: true,
}),
generatedMessage.hydrateStoryContext(this.attributes),
]);
await generatedMessage.hydrateStoryContext(this.attributes, {
shouldSave: false,
});
await window.Signal.Data.saveMessage(generatedMessage.attributes, {
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
forceSave: true,
});
void conversation.addSingleMessage(
window.MessageController.register(

View file

@ -36,6 +36,7 @@ import type {
ClientSearchResultMessageType,
ConversationType,
GetConversationRangeCenteredOnMessageResultType,
GetRecentStoryRepliesOptionsType,
IdentityKeyIdType,
IdentityKeyType,
StoredIdentityKeyType,
@ -99,6 +100,7 @@ const exclusiveInterface: ClientExclusiveInterface = {
searchMessages,
getRecentStoryReplies,
getOlderMessagesByConversation,
getConversationRangeCenteredOnMessage,
getNewerMessagesByConversation,
@ -613,6 +615,15 @@ async function getNewerMessagesByConversation(
return handleMessageJSON(messages);
}
async function getRecentStoryReplies(
storyId: string,
options?: GetRecentStoryRepliesOptionsType
): Promise<Array<MessageType>> {
const messages = await channels.getRecentStoryReplies(storyId, options);
return handleMessageJSON(messages);
}
async function getOlderMessagesByConversation(
options: AdjacentMessagesByConversationOptionsType
): Promise<Array<MessageType>> {

View file

@ -825,6 +825,11 @@ export type ServerInterface = DataInterface & {
options?: { limit?: number };
contactUuidsMatchingQuery?: Array<string>;
}) => Promise<Array<ServerSearchResultMessageType>>;
getRecentStoryReplies(
storyId: string,
options?: GetRecentStoryRepliesOptionsType
): Promise<Array<MessageTypeUnhydrated>>;
getOlderMessagesByConversation: (
options: AdjacentMessagesByConversationOptionsType
) => Promise<Array<MessageTypeUnhydrated>>;
@ -895,6 +900,13 @@ export type ServerInterface = DataInterface & {
getAllBadgeImageFileLocalPaths: () => Promise<Set<string>>;
};
export type GetRecentStoryRepliesOptionsType = {
limit?: number;
messageId?: string;
receivedAt?: number;
sentAt?: number;
};
// Differing signature on client/server
export type ClientExclusiveInterface = {
// Differing signature on client/server
@ -913,6 +925,11 @@ export type ClientExclusiveInterface = {
options?: { limit?: number };
contactUuidsMatchingQuery?: Array<string>;
}) => Promise<Array<ClientSearchResultMessageType>>;
getRecentStoryReplies(
storyId: string,
options?: GetRecentStoryRepliesOptionsType
): Promise<Array<MessageAttributesType>>;
getOlderMessagesByConversation: (
options: AdjacentMessagesByConversationOptionsType
) => Promise<Array<MessageAttributesType>>;

View file

@ -93,6 +93,7 @@ import type {
GetAllStoriesResultType,
GetConversationRangeCenteredOnMessageResultType,
GetKnownMessageAttachmentsResultType,
GetRecentStoryRepliesOptionsType,
GetUnreadByConversationAndMarkReadResultType,
IdentityKeyIdType,
StoredIdentityKeyType,
@ -251,6 +252,7 @@ const dataInterface: ServerInterface = {
getMessageCount,
getStoryCount,
getRecentStoryReplies,
saveMessage,
saveMessages,
removeMessage,
@ -2530,6 +2532,53 @@ enum AdjacentDirection {
Newer = 'Newer',
}
async function getRecentStoryReplies(
storyId: string,
options?: GetRecentStoryRepliesOptionsType
): Promise<Array<MessageTypeUnhydrated>> {
return getRecentStoryRepliesSync(storyId, options);
}
// This function needs to pull story replies from all conversations, because when we send
// a story to one or more distribution lists, each reply to it will be in the sender's
// 1:1 conversation with us.
function getRecentStoryRepliesSync(
storyId: string,
{
limit = 100,
messageId,
receivedAt = Number.MAX_VALUE,
sentAt = Number.MAX_VALUE,
}: GetRecentStoryRepliesOptionsType = {}
): Array<MessageTypeUnhydrated> {
const db = getInstance();
const timeFilters = {
first: sqlFragment`received_at = ${receivedAt} AND sent_at < ${sentAt}`,
second: sqlFragment`received_at < ${receivedAt}`,
};
const createQuery = (timeFilter: QueryFragment): QueryFragment => sqlFragment`
SELECT json FROM messages WHERE
(${messageId} IS NULL OR id IS NOT ${messageId}) AND
isStory IS 0 AND
storyId IS ${storyId} AND
(
${timeFilter}
)
ORDER BY received_at DESC, sent_at DESC
`;
const template = sqlFragment`
SELECT first.json FROM (${createQuery(timeFilters.first)}) as first
UNION ALL
SELECT second.json FROM (${createQuery(timeFilters.second)}) as second
`;
const [query, params] = sql`${template} LIMIT ${limit}`;
return db.prepare(query).all(params);
}
function getAdjacentMessagesByConversationSync(
direction: AdjacentDirection,
{

View file

@ -0,0 +1,32 @@
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Database } from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
export default function updateToSchemaVersion86(
currentVersion: number,
db: Database,
logger: LoggerType
): void {
if (currentVersion >= 86) {
return;
}
db.transaction(() => {
// The key reason for this new schema is that all of our previous schemas start with
// conversationId. This query is meant to find all replies to a given story, no
// matter the conversation.
db.exec(
`CREATE INDEX messages_story_replies
ON messages (storyId, received_at, sent_at)
WHERE isStory IS 0;
`
);
db.pragma('user_version = 86');
})();
logger.info('updateToSchemaVersion86: success!');
}

View file

@ -61,6 +61,7 @@ import updateToSchemaVersion82 from './82-edited-messages-read-index';
import updateToSchemaVersion83 from './83-mentions';
import updateToSchemaVersion84 from './84-all-mentions';
import updateToSchemaVersion85 from './85-add-kyber-keys';
import updateToSchemaVersion86 from './86-story-replies-index';
function updateToSchemaVersion1(
currentVersion: number,
@ -1992,6 +1993,7 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion83,
updateToSchemaVersion84,
updateToSchemaVersion85,
updateToSchemaVersion86,
];
export function updateSchema(db: Database, logger: LoggerType): void {

View file

@ -0,0 +1,118 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import dataInterface from '../../sql/Client';
import { UUID } from '../../types/UUID';
import type { UUIDStringType } from '../../types/UUID';
import type { MessageAttributesType } from '../../model-types.d';
const { _getAllMessages, getRecentStoryReplies, removeAll, saveMessages } =
dataInterface;
function getUuid(): UUIDStringType {
return UUID.generate().toString();
}
describe('sql/getRecentStoryReplies', () => {
beforeEach(async () => {
await removeAll();
});
it('returns message matching storyId in all converssations ', async () => {
assert.lengthOf(await _getAllMessages(), 0);
const now = Date.now();
const conversationId1 = getUuid();
const conversationId2 = getUuid();
const conversationId3 = getUuid();
const ourUuid = getUuid();
const storyId = getUuid();
const message1: MessageAttributesType = {
id: getUuid(),
body: 'message 1 - reply #1',
type: 'incoming',
conversationId: conversationId1,
sent_at: now - 20,
received_at: now - 20,
timestamp: now - 20,
storyId,
};
const message2: MessageAttributesType = {
id: getUuid(),
body: 'message 2 - reply #2',
type: 'incoming',
conversationId: conversationId2,
sent_at: now - 10,
received_at: now - 10,
timestamp: now - 10,
storyId,
};
const message3: MessageAttributesType = {
id: getUuid(),
body: 'message 3 - reply #3',
type: 'incoming',
conversationId: conversationId3,
sent_at: now,
received_at: now,
timestamp: now,
storyId,
};
const message4: MessageAttributesType = {
id: getUuid(),
body: 'message 4 - the story itself',
type: 'story',
conversationId: conversationId3,
sent_at: now,
received_at: now,
timestamp: now,
storyId,
};
const message5: MessageAttributesType = {
id: getUuid(),
body: 'message 5 - different story reply',
type: 'incoming',
conversationId: conversationId1,
sent_at: now,
received_at: now,
timestamp: now,
storyId: getUuid(),
};
const message6: MessageAttributesType = {
id: getUuid(),
body: 'message 6 - no story fields',
type: 'incoming',
conversationId: conversationId1,
sent_at: now,
received_at: now,
timestamp: now,
};
await saveMessages(
[message1, message2, message3, message4, message5, message6],
{
forceSave: true,
ourUuid,
}
);
assert.lengthOf(await _getAllMessages(), 6);
const searchResultsPage1 = await getRecentStoryReplies(storyId, {
limit: 2,
});
assert.lengthOf(searchResultsPage1, 2, 'page 1');
assert.strictEqual(searchResultsPage1[0].body, message3.body);
assert.strictEqual(searchResultsPage1[1].body, message2.body);
const searchResultsPage2 = await getRecentStoryReplies(storyId, {
messageId: message2.id,
receivedAt: message2.received_at,
limit: 2,
});
assert.lengthOf(searchResultsPage2, 1, 'page 2');
assert.strictEqual(searchResultsPage2[0].body, message1.body);
});
});

View file

@ -3528,4 +3528,50 @@ describe('SQL migrations test', () => {
assert.isAtLeast(object.createdAt, startingTime);
});
});
describe('updateToSchemaVersion86', () => {
it('supports the right index for first query used in getRecentStoryRepliesSync', () => {
updateToVersion(86);
const [query, params] = sql`
EXPLAIN QUERY PLAN
SELECT json FROM messages WHERE
('messageId' IS NULL OR id IS NOT 'messageId') AND
isStory IS 0 AND
storyId IS 'storyId' AND
received_at = 100000 AND sent_at < 100000
ORDER BY received_at DESC, sent_at DESC
LIMIT 100
`;
const { detail } = db.prepare(query).get(params);
assert.notInclude(detail, 'B-TREE');
assert.notInclude(detail, 'SCAN');
assert.include(
detail,
'SEARCH messages USING INDEX messages_story_replies (storyId=? AND received_at=? AND sent_at<?)'
);
});
it('supports the right index for second query used in getRecentStoryRepliesSync', () => {
updateToVersion(86);
const [query, params] = sql`
EXPLAIN QUERY PLAN
SELECT json FROM messages WHERE
('messageId' IS NULL OR id IS NOT 'messageId') AND
isStory IS 0 AND
storyId IS 'storyId' AND
received_at < 100000
ORDER BY received_at DESC, sent_at DESC
LIMIT 100
`;
const { detail } = db.prepare(query).get(params);
assert.notInclude(detail, 'B-TREE');
assert.notInclude(detail, 'SCAN');
assert.include(
detail,
'SEARCH messages USING INDEX messages_story_replies (storyId=? AND received_at<?)'
);
});
});
});

View file

@ -5,7 +5,7 @@ import type { MessageAttributesType } from '../model-types.d';
import { deletePackReference } from '../types/Stickers';
import { isStory } from '../messages/helpers';
import { isDirectConversation } from './whatTypeOfConversation';
import { drop } from './drop';
import * as log from '../logging/log';
export async function cleanupMessage(
message: MessageAttributesType
@ -20,44 +20,45 @@ export async function cleanupMessage(
window.MessageController.unregister(id);
await deleteMessageData(message);
const isGroupConversation = Boolean(
parentConversation && !isDirectConversation(parentConversation.attributes)
);
if (isStory(message)) {
await cleanupStoryReplies(conversationId, id, isGroupConversation);
}
}
async function cleanupStoryReplies(
conversationId: string,
storyId: string,
isGroupConversation: boolean,
story: MessageAttributesType,
pagination?: {
messageId: string;
receivedAt: number;
}
): Promise<void> {
const { messageId, receivedAt } = pagination || {};
const storyId = story.id;
const parentConversation = window.ConversationController.get(
story.conversationId
);
const isGroupConversation = Boolean(
parentConversation && !isDirectConversation(parentConversation.attributes)
);
const replies = await window.Signal.Data.getOlderMessagesByConversation({
conversationId,
includeStoryReplies: false,
messageId,
receivedAt,
const replies = await window.Signal.Data.getRecentStoryReplies(
storyId,
});
pagination
);
const logId = `cleanupStoryReplies(${storyId}/isGroup=${isGroupConversation})`;
const lastMessage = replies[replies.length - 1];
const lastMessageId = lastMessage?.id;
const lastReceivedAt = lastMessage?.received_at;
log.info(
`${logId}: Cleaning ${replies.length} replies, ending with message ${lastMessageId}`
);
if (!replies.length) {
return;
}
const lastMessage = replies[replies.length - 1];
const lastMessageId = lastMessage.id;
const lastReceivedAt = lastMessage.received_at;
if (messageId === lastMessageId) {
if (pagination?.messageId === lastMessageId) {
log.info(
`${logId}: Returning early; last message id is pagination starting id`
);
return;
}
@ -74,14 +75,16 @@ async function cleanupStoryReplies(
);
} else {
// Refresh the storyReplyContext data for 1:1 conversations
replies.forEach(reply => {
const model = window.MessageController.register(reply.id, reply);
model.unset('storyReplyContext');
drop(model.hydrateStoryContext());
});
await Promise.all(
replies.map(async reply => {
const model = window.MessageController.register(reply.id, reply);
model.unset('storyReplyContext');
await model.hydrateStoryContext(story, { shouldSave: true });
})
);
}
return cleanupStoryReplies(conversationId, storyId, isGroupConversation, {
return cleanupStoryReplies(story, {
messageId: lastMessageId,
receivedAt: lastReceivedAt,
});
@ -93,13 +96,9 @@ export async function deleteMessageData(
await window.Signal.Migrations.deleteExternalMessageFiles(message);
if (isStory(message)) {
const { id, conversationId } = message;
const parentConversation =
window.ConversationController.get(conversationId);
const isGroupConversation = Boolean(
parentConversation && !isDirectConversation(parentConversation.attributes)
);
await cleanupStoryReplies(conversationId, id, isGroupConversation);
// Attachments have been deleted from disk; remove from memory before replies update
const storyWithoutAttachments = { ...message, attachments: undefined };
await cleanupStoryReplies(storyWithoutAttachments);
}
const { sticker } = message;