Apply out of order operations to edited messages

This commit is contained in:
Josh Perez 2023-07-19 20:17:13 -04:00 committed by GitHub
parent e724b2d9de
commit e2ab1b3444
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 296 additions and 266 deletions

View file

@ -3,7 +3,6 @@
import {
isEmpty,
isEqual,
isNumber,
isObject,
mapValues,
@ -20,21 +19,13 @@ import type {
MessageReactionType,
QuotedMessageType,
} from '../model-types.d';
import {
filter,
find,
map,
reduce,
repeat,
zipObject,
} from '../util/iterables';
import { filter, find, map, repeat, zipObject } from '../util/iterables';
import * as GoogleChrome from '../util/GoogleChrome';
import type { DeleteModel } from '../messageModifiers/Deletes';
import type { SentEventData } from '../textsecure/messageReceiverEvents';
import { isNotNil } from '../util/isNotNil';
import { isNormalNumber } from '../util/isNormalNumber';
import { softAssert, strictAssert } from '../util/assert';
import { missingCaseError } from '../util/missingCaseError';
import { drop } from '../util/drop';
import { dropNull } from '../util/dropNull';
import type { ConversationModel } from './conversations';
@ -86,7 +77,7 @@ import {
import { handleMessageSend } from '../util/handleMessageSend';
import { getSendOptions } from '../util/getSendOptions';
import { findAndFormatContact } from '../util/findAndFormatContact';
import { canConversationBeUnarchived } from '../util/canConversationBeUnarchived';
import { modifyTargetMessage } from '../util/modifyTargetMessage';
import {
getAttachmentsForMessage,
getMessagePropStatus,
@ -119,17 +110,8 @@ import {
getCallSelector,
getActiveCall,
} from '../state/selectors/calling';
import {
MessageReceipts,
MessageReceiptType,
} from '../messageModifiers/MessageReceipts';
import { Deletes } from '../messageModifiers/Deletes';
import type { ReactionModel } from '../messageModifiers/Reactions';
import { Reactions } from '../messageModifiers/Reactions';
import { ReactionSource } from '../reactions/ReactionSource';
import { ReadSyncs } from '../messageModifiers/ReadSyncs';
import { ViewSyncs } from '../messageModifiers/ViewSyncs';
import { ViewOnceOpenSyncs } from '../messageModifiers/ViewOnceOpenSyncs';
import * as LinkPreview from '../types/LinkPreview';
import { SignalService as Proto } from '../protobuf';
import {
@ -176,14 +158,11 @@ import {
} from '../util/attachmentDownloadQueue';
import { getTitleNoDefault, getNumber } from '../util/getTitle';
import dataInterface from '../sql/Client';
import * as Edits from '../messageModifiers/Edits';
import { handleEditMessage } from '../util/handleEditMessage';
import { getQuoteBodyText } from '../util/getQuoteBodyText';
import { shouldReplyNotifyUser } from '../util/shouldReplyNotifyUser';
import { isConversationAccepted } from '../util/isConversationAccepted';
import type { RawBodyRange } from '../types/BodyRange';
import { BodyRange, applyRangesForText } from '../types/BodyRange';
import { deleteForEveryone } from '../util/deleteForEveryone';
import { getStringForProfileChange } from '../util/getStringForProfileChange';
import {
queueUpdateMessage,
@ -2917,248 +2896,10 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
conversation: ConversationModel,
isFirstRun: boolean
): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const message = this;
const type = message.get('type');
let changed = false;
const ourUuid = window.textsecure.storage.user.getCheckedUuid().toString();
const sourceUuid = getSourceUuid(message.attributes);
if (type === 'outgoing' || (type === 'story' && ourUuid === sourceUuid)) {
const sendActions = MessageReceipts.getSingleton()
.forMessage(message)
.map(receipt => {
let sendActionType: SendActionType;
const receiptType = receipt.get('type');
switch (receiptType) {
case MessageReceiptType.Delivery:
sendActionType = SendActionType.GotDeliveryReceipt;
break;
case MessageReceiptType.Read:
sendActionType = SendActionType.GotReadReceipt;
break;
case MessageReceiptType.View:
sendActionType = SendActionType.GotViewedReceipt;
break;
default:
throw missingCaseError(receiptType);
}
return {
destinationConversationId: receipt.get('sourceConversationId'),
action: {
type: sendActionType,
updatedAt: receipt.get('receiptTimestamp'),
},
};
return modifyTargetMessage(this, conversation, {
isFirstRun,
skipEdits: false,
});
const oldSendStateByConversationId =
this.get('sendStateByConversationId') || {};
const newSendStateByConversationId = reduce(
sendActions,
(
result: SendStateByConversationId,
{ destinationConversationId, action }
) => {
const oldSendState = getOwn(result, destinationConversationId);
if (!oldSendState) {
log.warn(
`Got a receipt for a conversation (${destinationConversationId}), but we have no record of sending to them`
);
return result;
}
const newSendState = sendStateReducer(oldSendState, action);
return {
...result,
[destinationConversationId]: newSendState,
};
},
oldSendStateByConversationId
);
if (
!isEqual(oldSendStateByConversationId, newSendStateByConversationId)
) {
message.set('sendStateByConversationId', newSendStateByConversationId);
changed = true;
}
}
if (type === 'incoming') {
// In a followup (see DESKTOP-2100), we want to make `ReadSyncs#forMessage` return
// an array, not an object. This array wrapping makes that future a bit easier.
const readSync = ReadSyncs.getSingleton().forMessage(message);
const readSyncs = readSync ? [readSync] : [];
const viewSyncs = ViewSyncs.getSingleton().forMessage(message);
const isGroupStoryReply =
isGroup(conversation.attributes) && message.get('storyId');
if (readSyncs.length !== 0 || viewSyncs.length !== 0) {
const markReadAt = Math.min(
Date.now(),
...readSyncs.map(sync => sync.get('readAt')),
...viewSyncs.map(sync => sync.get('viewedAt'))
);
if (message.get('expireTimer')) {
const existingExpirationStartTimestamp = message.get(
'expirationStartTimestamp'
);
message.set(
'expirationStartTimestamp',
Math.min(existingExpirationStartTimestamp ?? Date.now(), markReadAt)
);
changed = true;
}
let newReadStatus: ReadStatus.Read | ReadStatus.Viewed;
if (viewSyncs.length) {
newReadStatus = ReadStatus.Viewed;
} else {
strictAssert(
readSyncs.length !== 0,
'Should have either view or read syncs'
);
newReadStatus = ReadStatus.Read;
}
message.set({
readStatus: newReadStatus,
seenStatus: SeenStatus.Seen,
});
changed = true;
this.pendingMarkRead = Math.min(
this.pendingMarkRead ?? Date.now(),
markReadAt
);
} else if (
isFirstRun &&
!isGroupStoryReply &&
canConversationBeUnarchived(conversation.attributes)
) {
conversation.setArchived(false);
}
if (!isFirstRun && this.pendingMarkRead) {
const markReadAt = this.pendingMarkRead;
this.pendingMarkRead = undefined;
// This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for
// a message we already know about.
//
// We run this when `isFirstRun` is false so that it triggers when the
// message and the other ones accompanying it in the batch are fully in
// the database.
void message.getConversation()?.onReadMessage(message, markReadAt);
}
// Check for out-of-order view once open syncs
if (isTapToView(message.attributes)) {
const viewOnceOpenSync =
ViewOnceOpenSyncs.getSingleton().forMessage(message);
if (viewOnceOpenSync) {
await message.markViewOnceMessageViewed({ fromSync: true });
changed = true;
}
}
}
if (isStory(message.attributes)) {
const viewSyncs = ViewSyncs.getSingleton().forMessage(message);
if (viewSyncs.length !== 0) {
message.set({
readStatus: ReadStatus.Viewed,
seenStatus: SeenStatus.Seen,
});
changed = true;
const markReadAt = Math.min(
Date.now(),
...viewSyncs.map(sync => sync.get('viewedAt'))
);
this.pendingMarkRead = Math.min(
this.pendingMarkRead ?? Date.now(),
markReadAt
);
}
if (!message.get('expirationStartTimestamp')) {
log.info(
`modifyTargetMessage/${this.idForLogging()}: setting story expiration`,
{
expirationStartTimestamp: message.get('timestamp'),
expireTimer: message.get('expireTimer'),
}
);
message.set('expirationStartTimestamp', message.get('timestamp'));
changed = true;
}
}
// Does this message have any pending, previously-received associated reactions?
const reactions = Reactions.getSingleton().forMessage(message);
await Promise.all(
reactions.map(async reaction => {
if (isStory(this.attributes)) {
// We don't set changed = true here, because we don't modify the original story
const generatedMessage = reaction.get('storyReactionMessage');
strictAssert(
generatedMessage,
'Story reactions must provide storyReactionMessage'
);
await generatedMessage.handleReaction(reaction, {
storyMessage: this.attributes,
});
} else {
changed = true;
await message.handleReaction(reaction, { shouldPersist: false });
}
})
);
// Does this message have any pending, previously-received associated
// delete for everyone messages?
const deletes = Deletes.getSingleton().forMessage(message);
await Promise.all(
deletes.map(async del => {
await deleteForEveryone(message, del, false);
changed = true;
})
);
// We want to make sure the message is saved first before applying any edits
if (!isFirstRun) {
const edits = Edits.forMessage(message);
log.info(
`modifyTargetMessage/${this.idForLogging()}: ${
edits.length
} edits in second run`
);
await Promise.all(
edits.map(editAttributes =>
conversation.queueJob('modifyTargetMessage/edits', () =>
handleEditMessage(message.attributes, editAttributes)
)
)
);
}
if (changed && !isFirstRun) {
log.info(
`modifyTargetMessage/${this.idForLogging()}: Changes in second run; saving.`
);
await window.Signal.Data.saveMessage(this.attributes, {
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
});
}
}
async handleReaction(
@ -3510,6 +3251,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
messageId: this.id,
});
}
getPendingMarkRead(): number | undefined {
return this.pendingMarkRead;
}
setPendingMarkRead(value: number | undefined): void {
this.pendingMarkRead = value;
}
}
window.Whisper.Message = MessageModel;

View file

@ -21,6 +21,7 @@ import { isIncoming, isOutgoing } from '../messages/helpers';
import { isOlderThan } from './timestamp';
import { isDirectConversation } from './whatTypeOfConversation';
import { queueAttachmentDownloads } from './queueAttachmentDownloads';
import { modifyTargetMessage } from './modifyTargetMessage';
export async function handleEditMessage(
mainMessage: MessageAttributesType,
@ -285,10 +286,18 @@ export async function handleEditMessage(
)
);
drop(mainMessageModel.getConversation()?.updateLastMessage());
if (conversation) {
// Clear typing indicator
const typingToken = `${editAttributes.fromId}.${editAttributes.fromDevice}`;
conversation.clearContactTypingTimer(typingToken);
}
const mainMessageConversation = mainMessageModel.getConversation();
if (mainMessageConversation) {
drop(mainMessageConversation.updateLastMessage());
await modifyTargetMessage(mainMessageModel, mainMessageConversation, {
isFirstRun: true,
skipEdits: true,
});
}
}

View file

@ -0,0 +1,272 @@
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isEqual } from 'lodash';
import type { ConversationModel } from '../models/conversations';
import type { MessageModel } from '../models/messages';
import type { SendStateByConversationId } from '../messages/MessageSendState';
import * as Edits from '../messageModifiers/Edits';
import * as log from '../logging/log';
import { Deletes } from '../messageModifiers/Deletes';
import {
MessageReceipts,
MessageReceiptType,
} from '../messageModifiers/MessageReceipts';
import { Reactions } from '../messageModifiers/Reactions';
import { ReadStatus } from '../messages/MessageReadStatus';
import { ReadSyncs } from '../messageModifiers/ReadSyncs';
import { SeenStatus } from '../MessageSeenStatus';
import { SendActionType, sendStateReducer } from '../messages/MessageSendState';
import { ViewOnceOpenSyncs } from '../messageModifiers/ViewOnceOpenSyncs';
import { ViewSyncs } from '../messageModifiers/ViewSyncs';
import { canConversationBeUnarchived } from './canConversationBeUnarchived';
import { deleteForEveryone } from './deleteForEveryone';
import { handleEditMessage } from './handleEditMessage';
import { isGroup } from './whatTypeOfConversation';
import { isStory, isTapToView } from '../state/selectors/message';
import { getOwn } from './getOwn';
import { getSourceUuid } from '../messages/helpers';
import { missingCaseError } from './missingCaseError';
import { reduce } from './iterables';
import { strictAssert } from './assert';
// This function is called twice - once from handleDataMessage, and then again from
// saveAndNotify, a function called at the end of handleDataMessage as a cleanup for
// any missed out-of-order events.
export async function modifyTargetMessage(
message: MessageModel,
conversation: ConversationModel,
options?: { isFirstRun: boolean; skipEdits: boolean }
): Promise<void> {
const { isFirstRun = false, skipEdits = false } = options ?? {};
const logId = `modifyTargetMessage/${message.idForLogging()}`;
const type = message.get('type');
let changed = false;
const ourUuid = window.textsecure.storage.user.getCheckedUuid().toString();
const sourceUuid = getSourceUuid(message.attributes);
if (type === 'outgoing' || (type === 'story' && ourUuid === sourceUuid)) {
const sendActions = MessageReceipts.getSingleton()
.forMessage(message)
.map(receipt => {
let sendActionType: SendActionType;
const receiptType = receipt.get('type');
switch (receiptType) {
case MessageReceiptType.Delivery:
sendActionType = SendActionType.GotDeliveryReceipt;
break;
case MessageReceiptType.Read:
sendActionType = SendActionType.GotReadReceipt;
break;
case MessageReceiptType.View:
sendActionType = SendActionType.GotViewedReceipt;
break;
default:
throw missingCaseError(receiptType);
}
return {
destinationConversationId: receipt.get('sourceConversationId'),
action: {
type: sendActionType,
updatedAt: receipt.get('receiptTimestamp'),
},
};
});
const oldSendStateByConversationId =
message.get('sendStateByConversationId') || {};
const newSendStateByConversationId = reduce(
sendActions,
(
result: SendStateByConversationId,
{ destinationConversationId, action }
) => {
const oldSendState = getOwn(result, destinationConversationId);
if (!oldSendState) {
log.warn(
`${logId}: Got a receipt for a conversation (${destinationConversationId}), but we have no record of sending to them`
);
return result;
}
const newSendState = sendStateReducer(oldSendState, action);
return {
...result,
[destinationConversationId]: newSendState,
};
},
oldSendStateByConversationId
);
if (!isEqual(oldSendStateByConversationId, newSendStateByConversationId)) {
message.set('sendStateByConversationId', newSendStateByConversationId);
changed = true;
}
}
if (type === 'incoming') {
// In a followup (see DESKTOP-2100), we want to make `ReadSyncs#forMessage` return
// an array, not an object. This array wrapping makes that future a bit easier.
const readSync = ReadSyncs.getSingleton().forMessage(message);
const readSyncs = readSync ? [readSync] : [];
const viewSyncs = ViewSyncs.getSingleton().forMessage(message);
const isGroupStoryReply =
isGroup(conversation.attributes) && message.get('storyId');
if (readSyncs.length !== 0 || viewSyncs.length !== 0) {
const markReadAt = Math.min(
Date.now(),
...readSyncs.map(sync => sync.get('readAt')),
...viewSyncs.map(sync => sync.get('viewedAt'))
);
if (message.get('expireTimer')) {
const existingExpirationStartTimestamp = message.get(
'expirationStartTimestamp'
);
message.set(
'expirationStartTimestamp',
Math.min(existingExpirationStartTimestamp ?? Date.now(), markReadAt)
);
changed = true;
}
let newReadStatus: ReadStatus.Read | ReadStatus.Viewed;
if (viewSyncs.length) {
newReadStatus = ReadStatus.Viewed;
} else {
strictAssert(
readSyncs.length !== 0,
'Should have either view or read syncs'
);
newReadStatus = ReadStatus.Read;
}
message.set({
readStatus: newReadStatus,
seenStatus: SeenStatus.Seen,
});
changed = true;
message.setPendingMarkRead(
Math.min(message.getPendingMarkRead() ?? Date.now(), markReadAt)
);
} else if (
isFirstRun &&
!isGroupStoryReply &&
canConversationBeUnarchived(conversation.attributes)
) {
conversation.setArchived(false);
}
if (!isFirstRun && message.getPendingMarkRead()) {
const markReadAt = message.getPendingMarkRead();
message.setPendingMarkRead(undefined);
// This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for
// a message we already know about.
//
// We run message when `isFirstRun` is false so that it triggers when the
// message and the other ones accompanying it in the batch are fully in
// the database.
void message.getConversation()?.onReadMessage(message, markReadAt);
}
// Check for out-of-order view once open syncs
if (isTapToView(message.attributes)) {
const viewOnceOpenSync =
ViewOnceOpenSyncs.getSingleton().forMessage(message);
if (viewOnceOpenSync) {
await message.markViewOnceMessageViewed({ fromSync: true });
changed = true;
}
}
}
if (isStory(message.attributes)) {
const viewSyncs = ViewSyncs.getSingleton().forMessage(message);
if (viewSyncs.length !== 0) {
message.set({
readStatus: ReadStatus.Viewed,
seenStatus: SeenStatus.Seen,
});
changed = true;
const markReadAt = Math.min(
Date.now(),
...viewSyncs.map(sync => sync.get('viewedAt'))
);
message.setPendingMarkRead(
Math.min(message.getPendingMarkRead() ?? Date.now(), markReadAt)
);
}
if (!message.get('expirationStartTimestamp')) {
log.info(`${logId}: setting story expiration`, {
expirationStartTimestamp: message.get('timestamp'),
expireTimer: message.get('expireTimer'),
});
message.set('expirationStartTimestamp', message.get('timestamp'));
changed = true;
}
}
// Does message message have any pending, previously-received associated reactions?
const reactions = Reactions.getSingleton().forMessage(message);
await Promise.all(
reactions.map(async reaction => {
if (isStory(message.attributes)) {
// We don't set changed = true here, because we don't modify the original story
const generatedMessage = reaction.get('storyReactionMessage');
strictAssert(
generatedMessage,
'Story reactions must provide storyReactionMessage'
);
await generatedMessage.handleReaction(reaction, {
storyMessage: message.attributes,
});
} else {
changed = true;
await message.handleReaction(reaction, { shouldPersist: false });
}
})
);
// Does message message have any pending, previously-received associated
// delete for everyone messages?
const deletes = Deletes.getSingleton().forMessage(message);
await Promise.all(
deletes.map(async del => {
await deleteForEveryone(message, del, false);
changed = true;
})
);
// We want to make sure the message is saved first before applying any edits
if (!isFirstRun && !skipEdits) {
const edits = Edits.forMessage(message);
log.info(`${logId}: ${edits.length} edits in second run`);
await Promise.all(
edits.map(editAttributes =>
conversation.queueJob('modifyTargetMessage/edits', () =>
handleEditMessage(message.attributes, editAttributes)
)
)
);
}
if (changed && !isFirstRun) {
log.info(`${logId}: Changes in second run; saving.`);
await window.Signal.Data.saveMessage(message.attributes, {
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
});
}
}