Batch receipt processing
This commit is contained in:
parent
a03aab14cb
commit
e7086a83b2
6 changed files with 342 additions and 212 deletions
|
@ -1,7 +1,7 @@
|
||||||
// Copyright 2016 Signal Messenger, LLC
|
// Copyright 2016 Signal Messenger, LLC
|
||||||
// SPDX-License-Identifier: AGPL-3.0-only
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
import { isEqual } from 'lodash';
|
import { groupBy } from 'lodash';
|
||||||
|
|
||||||
import type { MessageModel } from '../models/messages';
|
import type { MessageModel } from '../models/messages';
|
||||||
import type { MessageAttributesType } from '../model-types.d';
|
import type { MessageAttributesType } from '../model-types.d';
|
||||||
|
@ -11,7 +11,6 @@ import { getOwn } from '../util/getOwn';
|
||||||
import { missingCaseError } from '../util/missingCaseError';
|
import { missingCaseError } from '../util/missingCaseError';
|
||||||
import { createWaitBatcher } from '../util/waitBatcher';
|
import { createWaitBatcher } from '../util/waitBatcher';
|
||||||
import type { ServiceIdString } from '../types/ServiceId';
|
import type { ServiceIdString } from '../types/ServiceId';
|
||||||
import * as Errors from '../types/errors';
|
|
||||||
import {
|
import {
|
||||||
SendActionType,
|
SendActionType,
|
||||||
SendStatus,
|
SendStatus,
|
||||||
|
@ -22,11 +21,16 @@ import type { DeleteSentProtoRecipientOptionsType } from '../sql/Interface';
|
||||||
import dataInterface from '../sql/Client';
|
import dataInterface from '../sql/Client';
|
||||||
import * as log from '../logging/log';
|
import * as log from '../logging/log';
|
||||||
import { getSourceServiceId } from '../messages/helpers';
|
import { getSourceServiceId } from '../messages/helpers';
|
||||||
import { queueUpdateMessage } from '../util/messageBatcher';
|
|
||||||
import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp';
|
import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp';
|
||||||
import { getMessageIdForLogging } from '../util/idForLogging';
|
import { getMessageIdForLogging } from '../util/idForLogging';
|
||||||
import { generateCacheKey } from './generateCacheKey';
|
import { generateCacheKey } from './generateCacheKey';
|
||||||
import { getPropForTimestamp } from '../util/editHelpers';
|
import { getPropForTimestamp } from '../util/editHelpers';
|
||||||
|
import {
|
||||||
|
DELETE_SENT_PROTO_BATCHER_WAIT_MS,
|
||||||
|
RECEIPT_BATCHER_WAIT_MS,
|
||||||
|
} from '../types/Receipt';
|
||||||
|
import { drop } from '../util/drop';
|
||||||
|
import { strictAssert } from '../util/assert';
|
||||||
|
|
||||||
const { deleteSentProtoRecipient } = dataInterface;
|
const { deleteSentProtoRecipient } = dataInterface;
|
||||||
|
|
||||||
|
@ -48,11 +52,193 @@ export type MessageReceiptAttributesType = {
|
||||||
wasSentEncrypted: boolean;
|
wasSentEncrypted: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
const receipts = new Map<string, MessageReceiptAttributesType>();
|
function getReceiptCacheKey(receipt: MessageReceiptAttributesType): string {
|
||||||
|
return generateCacheKey({
|
||||||
|
sender: receipt.sourceServiceId,
|
||||||
|
timestamp: receipt.messageSentAt,
|
||||||
|
type: receipt.type,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const cachedReceipts = new Map<string, MessageReceiptAttributesType>();
|
||||||
|
|
||||||
|
const processReceiptBatcher = createWaitBatcher({
|
||||||
|
name: 'processReceiptBatcher',
|
||||||
|
wait: RECEIPT_BATCHER_WAIT_MS,
|
||||||
|
maxSize: 250,
|
||||||
|
async processBatch(receipts: Array<MessageReceiptAttributesType>) {
|
||||||
|
// First group by sentAt, so that we can find the target message
|
||||||
|
const receiptsByMessageSentAt = groupBy(
|
||||||
|
receipts,
|
||||||
|
receipt => receipt.messageSentAt
|
||||||
|
);
|
||||||
|
|
||||||
|
// Once we find the message, we'll group them by messageId to process
|
||||||
|
// all receipts for a given message
|
||||||
|
const receiptsByMessageId: Map<
|
||||||
|
string,
|
||||||
|
Array<MessageReceiptAttributesType>
|
||||||
|
> = new Map();
|
||||||
|
|
||||||
|
function addReceiptAndTargetMessage(
|
||||||
|
message: MessageAttributesType,
|
||||||
|
receipt: MessageReceiptAttributesType
|
||||||
|
): void {
|
||||||
|
const existing = receiptsByMessageId.get(message.id);
|
||||||
|
|
||||||
|
if (!existing) {
|
||||||
|
window.MessageCache.toMessageAttributes(message);
|
||||||
|
receiptsByMessageId.set(message.id, [receipt]);
|
||||||
|
} else {
|
||||||
|
existing.push(receipt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const receiptsForMessageSentAt of Object.values(
|
||||||
|
receiptsByMessageSentAt
|
||||||
|
)) {
|
||||||
|
if (!receiptsForMessageSentAt.length) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// All receipts have the same sentAt, so we can grab it from the first
|
||||||
|
const sentAt = receiptsForMessageSentAt[0].messageSentAt;
|
||||||
|
|
||||||
|
const messagesMatchingTimestamp =
|
||||||
|
// eslint-disable-next-line no-await-in-loop
|
||||||
|
await window.Signal.Data.getMessagesBySentAt(sentAt);
|
||||||
|
|
||||||
|
for (const receipt of receiptsForMessageSentAt) {
|
||||||
|
const targetMessage = getTargetMessage({
|
||||||
|
sourceConversationId: receipt.sourceConversationId,
|
||||||
|
targetTimestamp: sentAt,
|
||||||
|
messagesMatchingTimestamp,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (targetMessage) {
|
||||||
|
addReceiptAndTargetMessage(targetMessage, receipt);
|
||||||
|
} else {
|
||||||
|
// We didn't find any messages but maybe it's a story sent message
|
||||||
|
const targetMessages = messagesMatchingTimestamp.filter(
|
||||||
|
item =>
|
||||||
|
item.storyDistributionListId &&
|
||||||
|
item.sendStateByConversationId &&
|
||||||
|
!item.deletedForEveryone &&
|
||||||
|
Boolean(
|
||||||
|
item.sendStateByConversationId[receipt.sourceConversationId]
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (targetMessages.length) {
|
||||||
|
targetMessages.forEach(msg =>
|
||||||
|
addReceiptAndTargetMessage(msg, receipt)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// Nope, no target message was found
|
||||||
|
log.info(
|
||||||
|
'MessageReceipts.processReceiptBatcher: No message for receipt',
|
||||||
|
receipt.messageSentAt,
|
||||||
|
receipt.type,
|
||||||
|
receipt.sourceConversationId,
|
||||||
|
receipt.sourceServiceId
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [
|
||||||
|
messageId,
|
||||||
|
receiptsForMessage,
|
||||||
|
] of receiptsByMessageId.entries()) {
|
||||||
|
drop(processReceiptsForMessage(messageId, receiptsForMessage));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
async function processReceiptsForMessage(
|
||||||
|
messageId: string,
|
||||||
|
receipts: Array<MessageReceiptAttributesType>
|
||||||
|
) {
|
||||||
|
if (!receipts.length) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get message from cache or DB
|
||||||
|
const message = await window.MessageCache.resolveAttributes(
|
||||||
|
'processReceiptsForMessage',
|
||||||
|
messageId
|
||||||
|
);
|
||||||
|
|
||||||
|
const { updatedMessage, validReceipts } = updateMessageWithReceipts(
|
||||||
|
message,
|
||||||
|
receipts
|
||||||
|
);
|
||||||
|
|
||||||
|
// Save it to cache & to DB
|
||||||
|
await window.MessageCache.setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes: updatedMessage,
|
||||||
|
skipSaveToDatabase: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Confirm/remove receipts, and delete sent protos
|
||||||
|
for (const receipt of validReceipts) {
|
||||||
|
remove(receipt);
|
||||||
|
drop(addToDeleteSentProtoBatcher(receipt, updatedMessage));
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify frontend listeners
|
||||||
|
const conversation = window.ConversationController.get(
|
||||||
|
message.conversationId
|
||||||
|
);
|
||||||
|
conversation?.debouncedUpdateLastMessage?.();
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateMessageWithReceipts(
|
||||||
|
message: MessageAttributesType,
|
||||||
|
receipts: Array<MessageReceiptAttributesType>
|
||||||
|
): {
|
||||||
|
updatedMessage: MessageAttributesType;
|
||||||
|
validReceipts: Array<MessageReceiptAttributesType>;
|
||||||
|
} {
|
||||||
|
const logId = `updateMessageWithReceipts(timestamp=${message.timestamp})`;
|
||||||
|
|
||||||
|
const receiptsToProcess = receipts.filter(receipt => {
|
||||||
|
if (shouldDropReceipt(receipt, message)) {
|
||||||
|
log.info(
|
||||||
|
`${logId}: Dropping a receipt ${receipt.type} for sentAt=${receipt.messageSentAt}`
|
||||||
|
);
|
||||||
|
remove(receipt);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!cachedReceipts.has(getReceiptCacheKey(receipt))) {
|
||||||
|
// Between the time it was received and now, this receipt has already been handled!
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
`${logId}: batch processing ${receipts.length}` +
|
||||||
|
` receipt${receipts.length === 1 ? '' : 's'}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// Generate the updated message synchronously
|
||||||
|
let updatedMessage: MessageAttributesType = { ...message };
|
||||||
|
for (const receipt of receiptsToProcess) {
|
||||||
|
updatedMessage = {
|
||||||
|
...updatedMessage,
|
||||||
|
...updateMessageSendStateWithReceipt(message.id, receipt),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return { updatedMessage, validReceipts: receiptsToProcess };
|
||||||
|
}
|
||||||
|
|
||||||
const deleteSentProtoBatcher = createWaitBatcher({
|
const deleteSentProtoBatcher = createWaitBatcher({
|
||||||
name: 'deleteSentProtoBatcher',
|
name: 'deleteSentProtoBatcher',
|
||||||
wait: 250,
|
wait: DELETE_SENT_PROTO_BATCHER_WAIT_MS,
|
||||||
maxSize: 30,
|
maxSize: 30,
|
||||||
async processBatch(items: Array<DeleteSentProtoRecipientOptionsType>) {
|
async processBatch(items: Array<DeleteSentProtoRecipientOptionsType>) {
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -81,30 +267,24 @@ const deleteSentProtoBatcher = createWaitBatcher({
|
||||||
});
|
});
|
||||||
|
|
||||||
function remove(receipt: MessageReceiptAttributesType): void {
|
function remove(receipt: MessageReceiptAttributesType): void {
|
||||||
receipts.delete(
|
cachedReceipts.delete(getReceiptCacheKey(receipt));
|
||||||
generateCacheKey({
|
|
||||||
sender: receipt.sourceServiceId,
|
|
||||||
timestamp: receipt.messageSentAt,
|
|
||||||
type: receipt.type,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
receipt.removeFromMessageReceiverCache();
|
receipt.removeFromMessageReceiverCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
function getTargetMessage({
|
function getTargetMessage({
|
||||||
sourceConversationId,
|
sourceConversationId,
|
||||||
messages,
|
messagesMatchingTimestamp,
|
||||||
targetTimestamp,
|
targetTimestamp,
|
||||||
}: {
|
}: {
|
||||||
sourceConversationId: string;
|
sourceConversationId: string;
|
||||||
messages: ReadonlyArray<MessageAttributesType>;
|
messagesMatchingTimestamp: ReadonlyArray<MessageAttributesType>;
|
||||||
targetTimestamp: number;
|
targetTimestamp: number;
|
||||||
}): MessageModel | null {
|
}): MessageAttributesType | null {
|
||||||
if (messages.length === 0) {
|
if (messagesMatchingTimestamp.length === 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
const matchingMessages = messages
|
const matchingMessages = messagesMatchingTimestamp
|
||||||
.filter(msg => isOutgoing(msg) || isStory(msg))
|
.filter(msg => isOutgoing(msg) || isStory(msg))
|
||||||
.filter(msg => {
|
.filter(msg => {
|
||||||
const sendStateByConversationId = getPropForTimestamp({
|
const sendStateByConversationId = getPropForTimestamp({
|
||||||
|
@ -155,18 +335,13 @@ function getTargetMessage({
|
||||||
}
|
}
|
||||||
|
|
||||||
const message = matchingMessages[0];
|
const message = matchingMessages[0];
|
||||||
return window.MessageCache.__DEPRECATED$register(
|
return window.MessageCache.toMessageAttributes(message);
|
||||||
message.id,
|
|
||||||
message,
|
|
||||||
'MessageReceipts.getTargetMessage'
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const wasDeliveredWithSealedSender = (
|
const wasDeliveredWithSealedSender = (
|
||||||
conversationId: string,
|
conversationId: string,
|
||||||
message: MessageModel
|
message: MessageAttributesType
|
||||||
): boolean =>
|
): boolean =>
|
||||||
(message.get('unidentifiedDeliveries') || []).some(
|
(message.unidentifiedDeliveries || []).some(
|
||||||
identifier =>
|
identifier =>
|
||||||
window.ConversationController.getConversationId(identifier) ===
|
window.ConversationController.getConversationId(identifier) ===
|
||||||
conversationId
|
conversationId
|
||||||
|
@ -174,7 +349,7 @@ const wasDeliveredWithSealedSender = (
|
||||||
|
|
||||||
const shouldDropReceipt = (
|
const shouldDropReceipt = (
|
||||||
receipt: MessageReceiptAttributesType,
|
receipt: MessageReceiptAttributesType,
|
||||||
message: MessageModel
|
message: MessageAttributesType
|
||||||
): boolean => {
|
): boolean => {
|
||||||
const { type } = receipt;
|
const { type } = receipt;
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -183,7 +358,7 @@ const shouldDropReceipt = (
|
||||||
case MessageReceiptType.Read:
|
case MessageReceiptType.Read:
|
||||||
return !window.storage.get('read-receipt-setting');
|
return !window.storage.get('read-receipt-setting');
|
||||||
case MessageReceiptType.View:
|
case MessageReceiptType.View:
|
||||||
if (isStory(message.attributes)) {
|
if (isStory(message)) {
|
||||||
return !window.Events.getStoryViewReceiptsEnabled();
|
return !window.Events.getStoryViewReceiptsEnabled();
|
||||||
}
|
}
|
||||||
return !window.storage.get('read-receipt-setting');
|
return !window.storage.get('read-receipt-setting');
|
||||||
|
@ -209,7 +384,7 @@ export function forMessage(
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
const receiptValues = Array.from(receipts.values());
|
const receiptValues = Array.from(cachedReceipts.values());
|
||||||
|
|
||||||
const sentAt = getMessageSentTimestamp(message.attributes, { log });
|
const sentAt = getMessageSentTimestamp(message.attributes, { log });
|
||||||
const result = receiptValues.filter(item => item.messageSentAt === sentAt);
|
const result = receiptValues.filter(item => item.messageSentAt === sentAt);
|
||||||
|
@ -221,10 +396,11 @@ export function forMessage(
|
||||||
}
|
}
|
||||||
|
|
||||||
return result.filter(receipt => {
|
return result.filter(receipt => {
|
||||||
if (shouldDropReceipt(receipt, message)) {
|
if (shouldDropReceipt(receipt, message.attributes)) {
|
||||||
log.info(
|
log.info(
|
||||||
`${logId}: Dropping an early receipt ${receipt.type} for message ${sentAt}`
|
`${logId}: Dropping an early receipt ${receipt.type} for message ${sentAt}`
|
||||||
);
|
);
|
||||||
|
remove(receipt);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +413,6 @@ function getNewSendStateByConversationId(
|
||||||
receipt: MessageReceiptAttributesType
|
receipt: MessageReceiptAttributesType
|
||||||
): SendStateByConversationId {
|
): SendStateByConversationId {
|
||||||
const { receiptTimestamp, sourceConversationId, type } = receipt;
|
const { receiptTimestamp, sourceConversationId, type } = receipt;
|
||||||
|
|
||||||
const oldSendState = getOwn(
|
const oldSendState = getOwn(
|
||||||
oldSendStateByConversationId,
|
oldSendStateByConversationId,
|
||||||
sourceConversationId
|
sourceConversationId
|
||||||
|
@ -257,43 +432,35 @@ function getNewSendStateByConversationId(
|
||||||
default:
|
default:
|
||||||
throw missingCaseError(type);
|
throw missingCaseError(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
const newSendState = sendStateReducer(oldSendState, {
|
const newSendState = sendStateReducer(oldSendState, {
|
||||||
type: sendActionType,
|
type: sendActionType,
|
||||||
updatedAt: receiptTimestamp,
|
updatedAt: receiptTimestamp,
|
||||||
});
|
});
|
||||||
|
|
||||||
return {
|
return {
|
||||||
...oldSendStateByConversationId,
|
...oldSendStateByConversationId,
|
||||||
[sourceConversationId]: newSendState,
|
[sourceConversationId]: newSendState,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async function updateMessageSendState(
|
function updateMessageSendStateWithReceipt(
|
||||||
receipt: MessageReceiptAttributesType,
|
messageId: string,
|
||||||
message: MessageModel
|
receipt: MessageReceiptAttributesType
|
||||||
): Promise<void> {
|
): Partial<MessageAttributesType> {
|
||||||
const { messageSentAt } = receipt;
|
const { messageSentAt } = receipt;
|
||||||
const logId = `MessageReceipts.updateMessageSendState(sentAt=${receipt.messageSentAt})`;
|
|
||||||
|
|
||||||
if (shouldDropReceipt(receipt, message)) {
|
// Get message from cache to make sure we have most recent
|
||||||
log.info(
|
const message = window.MessageCache.accessAttributes(messageId);
|
||||||
`${logId}: Dropping a receipt ${receipt.type} for message ${messageSentAt}`
|
strictAssert(message, 'Message should exist in cache');
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let hasChanges = false;
|
const newAttributes: Partial<MessageAttributesType> = {};
|
||||||
|
|
||||||
const editHistory = message.get('editHistory') ?? [];
|
const newEditHistory = (message.editHistory ?? []).map(edit => {
|
||||||
const newEditHistory = editHistory?.map(edit => {
|
|
||||||
if (messageSentAt !== edit.timestamp) {
|
if (messageSentAt !== edit.timestamp) {
|
||||||
return edit;
|
return edit;
|
||||||
}
|
}
|
||||||
|
|
||||||
const oldSendStateByConversationId = edit.sendStateByConversationId ?? {};
|
|
||||||
const newSendStateByConversationId = getNewSendStateByConversationId(
|
const newSendStateByConversationId = getNewSendStateByConversationId(
|
||||||
oldSendStateByConversationId,
|
edit.sendStateByConversationId ?? {},
|
||||||
receipt
|
receipt
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -302,46 +469,30 @@ async function updateMessageSendState(
|
||||||
sendStateByConversationId: newSendStateByConversationId,
|
sendStateByConversationId: newSendStateByConversationId,
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
if (!isEqual(newEditHistory, editHistory)) {
|
|
||||||
message.set('editHistory', newEditHistory);
|
if (message.editHistory?.length) {
|
||||||
hasChanges = true;
|
newAttributes.editHistory = newEditHistory;
|
||||||
}
|
}
|
||||||
|
|
||||||
const editMessageTimestamp = message.get('editMessageTimestamp');
|
const { editMessageTimestamp, timestamp } = message;
|
||||||
if (
|
if (
|
||||||
(!editMessageTimestamp && messageSentAt === message.get('timestamp')) ||
|
(!editMessageTimestamp && messageSentAt === timestamp) ||
|
||||||
messageSentAt === editMessageTimestamp
|
messageSentAt === editMessageTimestamp
|
||||||
) {
|
) {
|
||||||
const oldSendStateByConversationId =
|
|
||||||
message.get('sendStateByConversationId') ?? {};
|
|
||||||
const newSendStateByConversationId = getNewSendStateByConversationId(
|
const newSendStateByConversationId = getNewSendStateByConversationId(
|
||||||
oldSendStateByConversationId,
|
message.sendStateByConversationId ?? {},
|
||||||
receipt
|
receipt
|
||||||
);
|
);
|
||||||
|
newAttributes.sendStateByConversationId = newSendStateByConversationId;
|
||||||
// The send state may not change. For example, this can happen if we get a read
|
|
||||||
// receipt before a delivery receipt.
|
|
||||||
if (!isEqual(oldSendStateByConversationId, newSendStateByConversationId)) {
|
|
||||||
message.set('sendStateByConversationId', newSendStateByConversationId);
|
|
||||||
hasChanges = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasChanges) {
|
return newAttributes;
|
||||||
queueUpdateMessage(message.attributes);
|
}
|
||||||
|
|
||||||
// notify frontend listeners
|
|
||||||
const conversation = window.ConversationController.get(
|
|
||||||
message.get('conversationId')
|
|
||||||
);
|
|
||||||
const updateLeftPane = conversation
|
|
||||||
? conversation.debouncedUpdateLastMessage
|
|
||||||
: undefined;
|
|
||||||
if (updateLeftPane) {
|
|
||||||
updateLeftPane();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
async function addToDeleteSentProtoBatcher(
|
||||||
|
receipt: MessageReceiptAttributesType,
|
||||||
|
message: MessageAttributesType
|
||||||
|
) {
|
||||||
const { sourceConversationId, type } = receipt;
|
const { sourceConversationId, type } = receipt;
|
||||||
|
|
||||||
if (
|
if (
|
||||||
|
@ -355,22 +506,15 @@ async function updateMessageSendState(
|
||||||
const deviceId = receipt.sourceDevice;
|
const deviceId = receipt.sourceDevice;
|
||||||
|
|
||||||
if (recipientServiceId && deviceId) {
|
if (recipientServiceId && deviceId) {
|
||||||
await Promise.all([
|
await deleteSentProtoBatcher.add({
|
||||||
deleteSentProtoBatcher.add({
|
timestamp: receipt.messageSentAt,
|
||||||
timestamp: messageSentAt,
|
recipientServiceId,
|
||||||
recipientServiceId,
|
deviceId,
|
||||||
deviceId,
|
});
|
||||||
}),
|
|
||||||
|
|
||||||
// We want the above call to not be delayed when testing with
|
|
||||||
// CI.
|
|
||||||
window.SignalCI
|
|
||||||
? deleteSentProtoBatcher.flushAndWait()
|
|
||||||
: Promise.resolve(),
|
|
||||||
]);
|
|
||||||
} else {
|
} else {
|
||||||
log.warn(
|
log.warn(
|
||||||
`${logId}: Missing serviceId or deviceId for deliveredTo ${sourceConversationId}`
|
`MessageReceipts.deleteSentProto(sentAt=${receipt.messageSentAt}): ` +
|
||||||
|
`Missing serviceId or deviceId for deliveredTo ${sourceConversationId}`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -379,69 +523,6 @@ async function updateMessageSendState(
|
||||||
export async function onReceipt(
|
export async function onReceipt(
|
||||||
receipt: MessageReceiptAttributesType
|
receipt: MessageReceiptAttributesType
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
receipts.set(
|
cachedReceipts.set(getReceiptCacheKey(receipt), receipt);
|
||||||
generateCacheKey({
|
await processReceiptBatcher.add(receipt);
|
||||||
sender: receipt.sourceServiceId,
|
|
||||||
timestamp: receipt.messageSentAt,
|
|
||||||
type: receipt.type,
|
|
||||||
}),
|
|
||||||
receipt
|
|
||||||
);
|
|
||||||
|
|
||||||
const { messageSentAt, sourceConversationId, sourceServiceId, type } =
|
|
||||||
receipt;
|
|
||||||
|
|
||||||
const logId = `MessageReceipts.onReceipt(sentAt=${receipt.messageSentAt})`;
|
|
||||||
|
|
||||||
try {
|
|
||||||
const messages = await window.Signal.Data.getMessagesBySentAt(
|
|
||||||
messageSentAt
|
|
||||||
);
|
|
||||||
|
|
||||||
const message = getTargetMessage({
|
|
||||||
sourceConversationId,
|
|
||||||
messages,
|
|
||||||
targetTimestamp: receipt.messageSentAt,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (message) {
|
|
||||||
await updateMessageSendState(receipt, message);
|
|
||||||
} else {
|
|
||||||
// We didn't find any messages but maybe it's a story sent message
|
|
||||||
const targetMessages = messages.filter(
|
|
||||||
item =>
|
|
||||||
item.storyDistributionListId &&
|
|
||||||
item.sendStateByConversationId &&
|
|
||||||
!item.deletedForEveryone &&
|
|
||||||
Boolean(item.sendStateByConversationId[sourceConversationId])
|
|
||||||
);
|
|
||||||
|
|
||||||
// Nope, no target message was found
|
|
||||||
if (!targetMessages.length) {
|
|
||||||
log.info(
|
|
||||||
`${logId}: No message for receipt`,
|
|
||||||
type,
|
|
||||||
sourceConversationId,
|
|
||||||
sourceServiceId
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await Promise.all(
|
|
||||||
targetMessages.map(msg => {
|
|
||||||
const model = window.MessageCache.__DEPRECATED$register(
|
|
||||||
msg.id,
|
|
||||||
msg,
|
|
||||||
'MessageReceipts.onReceipt'
|
|
||||||
);
|
|
||||||
return updateMessageSendState(receipt, model);
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
remove(receipt);
|
|
||||||
} catch (error) {
|
|
||||||
remove(receipt);
|
|
||||||
log.error(`${logId} error:`, Errors.toLogFormat(error));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import type { MessageAttributesType } from '../model-types.d';
|
||||||
import type { MessageModel } from '../models/messages';
|
import type { MessageModel } from '../models/messages';
|
||||||
import * as Errors from '../types/errors';
|
import * as Errors from '../types/errors';
|
||||||
import * as log from '../logging/log';
|
import * as log from '../logging/log';
|
||||||
import { drop } from '../util/drop';
|
|
||||||
import { getEnvironment, Environment } from '../environment';
|
import { getEnvironment, Environment } from '../environment';
|
||||||
import { getMessageConversation } from '../util/getMessageConversation';
|
import { getMessageConversation } from '../util/getMessageConversation';
|
||||||
import { getMessageModelLogger } from '../util/MessageModelLogger';
|
import { getMessageModelLogger } from '../util/MessageModelLogger';
|
||||||
|
@ -144,15 +143,39 @@ export class MessageCache {
|
||||||
|
|
||||||
// Updates a message's attributes and saves the message to cache and to the
|
// Updates a message's attributes and saves the message to cache and to the
|
||||||
// database. Option to skip the save to the database.
|
// database. Option to skip the save to the database.
|
||||||
|
|
||||||
|
// Overload #1: if skipSaveToDatabase = true, returns void
|
||||||
|
public setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes,
|
||||||
|
skipSaveToDatabase,
|
||||||
|
}: {
|
||||||
|
messageId: string;
|
||||||
|
messageAttributes: Partial<MessageAttributesType>;
|
||||||
|
skipSaveToDatabase: true;
|
||||||
|
}): void;
|
||||||
|
|
||||||
|
// Overload #2: if skipSaveToDatabase = false, returns DB save promise
|
||||||
|
public setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes,
|
||||||
|
skipSaveToDatabase,
|
||||||
|
}: {
|
||||||
|
messageId: string;
|
||||||
|
messageAttributes: Partial<MessageAttributesType>;
|
||||||
|
skipSaveToDatabase: false;
|
||||||
|
}): Promise<string>;
|
||||||
|
|
||||||
|
// Implementation
|
||||||
public setAttributes({
|
public setAttributes({
|
||||||
messageId,
|
messageId,
|
||||||
messageAttributes: partialMessageAttributes,
|
messageAttributes: partialMessageAttributes,
|
||||||
skipSaveToDatabase = false,
|
skipSaveToDatabase,
|
||||||
}: {
|
}: {
|
||||||
messageId: string;
|
messageId: string;
|
||||||
messageAttributes: Partial<MessageAttributesType>;
|
messageAttributes: Partial<MessageAttributesType>;
|
||||||
skipSaveToDatabase: boolean;
|
skipSaveToDatabase: boolean;
|
||||||
}): void {
|
}): Promise<string> | undefined {
|
||||||
let messageAttributes = this.accessAttributes(messageId);
|
let messageAttributes = this.accessAttributes(messageId);
|
||||||
|
|
||||||
softAssert(messageAttributes, 'could not find message attributes');
|
softAssert(messageAttributes, 'could not find message attributes');
|
||||||
|
@ -206,11 +229,10 @@ export class MessageCache {
|
||||||
if (skipSaveToDatabase) {
|
if (skipSaveToDatabase) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
drop(
|
|
||||||
window.Signal.Data.saveMessage(messageAttributes, {
|
return window.Signal.Data.saveMessage(messageAttributes, {
|
||||||
ourAci: window.textsecure.storage.user.getCheckedAci(),
|
ourAci: window.textsecure.storage.user.getCheckedAci(),
|
||||||
})
|
});
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private throttledReduxUpdaters = new LRU<string, typeof this.updateRedux>({
|
private throttledReduxUpdaters = new LRU<string, typeof this.updateRedux>({
|
||||||
|
|
|
@ -10,13 +10,15 @@ import type { Page } from 'playwright';
|
||||||
import type { App } from '../playwright';
|
import type { App } from '../playwright';
|
||||||
import * as durations from '../../util/durations';
|
import * as durations from '../../util/durations';
|
||||||
import { Bootstrap } from '../bootstrap';
|
import { Bootstrap } from '../bootstrap';
|
||||||
import { ReceiptType } from '../../types/Receipt';
|
import { RECEIPT_BATCHER_WAIT_MS, ReceiptType } from '../../types/Receipt';
|
||||||
import { SendStatus } from '../../messages/MessageSendState';
|
import { SendStatus } from '../../messages/MessageSendState';
|
||||||
import { drop } from '../../util/drop';
|
import { drop } from '../../util/drop';
|
||||||
import { strictAssert } from '../../util/assert';
|
import { strictAssert } from '../../util/assert';
|
||||||
import { generateAci } from '../../types/ServiceId';
|
import { generateAci } from '../../types/ServiceId';
|
||||||
import { IMAGE_GIF } from '../../types/MIME';
|
import { IMAGE_GIF } from '../../types/MIME';
|
||||||
import { type } from '../helpers';
|
import { type } from '../helpers';
|
||||||
|
import type { MessageAttributesType } from '../../model-types';
|
||||||
|
import { sleep } from '../../util/sleep';
|
||||||
|
|
||||||
export const debug = createDebug('mock:test:edit');
|
export const debug = createDebug('mock:test:edit');
|
||||||
|
|
||||||
|
@ -501,6 +503,20 @@ describe('editing', function (this: Mocha.Suite) {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('tracks message send state for edits', async () => {
|
it('tracks message send state for edits', async () => {
|
||||||
|
async function getMessageFromApp(
|
||||||
|
originalMessageTimestamp: number
|
||||||
|
): Promise<MessageAttributesType> {
|
||||||
|
await sleep(RECEIPT_BATCHER_WAIT_MS + 20);
|
||||||
|
const messages = await page.evaluate(
|
||||||
|
timestamp => window.SignalCI?.getMessagesBySentAt(timestamp),
|
||||||
|
originalMessageTimestamp
|
||||||
|
);
|
||||||
|
strictAssert(messages, 'messages does not exist');
|
||||||
|
strictAssert(messages.length === 1, 'message does not exist');
|
||||||
|
|
||||||
|
return messages[0];
|
||||||
|
}
|
||||||
|
|
||||||
async function editMessage(
|
async function editMessage(
|
||||||
page: Page,
|
page: Page,
|
||||||
timestamp: number,
|
timestamp: number,
|
||||||
|
@ -601,14 +617,8 @@ describe('editing', function (this: Mocha.Suite) {
|
||||||
debug("testing message's send state (original)");
|
debug("testing message's send state (original)");
|
||||||
{
|
{
|
||||||
debug('getting message from app (original)');
|
debug('getting message from app (original)');
|
||||||
const messages = await page.evaluate(
|
const message = await getMessageFromApp(originalMessageTimestamp);
|
||||||
timestamp => window.SignalCI?.getMessagesBySentAt(timestamp),
|
|
||||||
originalMessageTimestamp
|
|
||||||
);
|
|
||||||
strictAssert(messages, 'messages does not exist');
|
|
||||||
|
|
||||||
debug('verifying message send state (original)');
|
|
||||||
const [message] = messages;
|
|
||||||
strictAssert(
|
strictAssert(
|
||||||
message.sendStateByConversationId,
|
message.sendStateByConversationId,
|
||||||
'sendStateByConversationId'
|
'sendStateByConversationId'
|
||||||
|
@ -651,15 +661,7 @@ describe('editing', function (this: Mocha.Suite) {
|
||||||
|
|
||||||
debug("testing message's send state (current(v2) and original (v1))");
|
debug("testing message's send state (current(v2) and original (v1))");
|
||||||
{
|
{
|
||||||
debug('getting message from app');
|
const message = await getMessageFromApp(originalMessageTimestamp);
|
||||||
const messages = await page.evaluate(
|
|
||||||
timestamp => window.SignalCI?.getMessagesBySentAt(timestamp),
|
|
||||||
originalMessageTimestamp
|
|
||||||
);
|
|
||||||
strictAssert(messages, 'messages does not exist');
|
|
||||||
|
|
||||||
debug('verifying message send state & edit');
|
|
||||||
const [message] = messages;
|
|
||||||
strictAssert(message.editHistory, 'edit history exists');
|
strictAssert(message.editHistory, 'edit history exists');
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||||
const [_v2, v1] = message.editHistory;
|
const [_v2, v1] = message.editHistory;
|
||||||
|
@ -747,14 +749,7 @@ describe('editing', function (this: Mocha.Suite) {
|
||||||
debug("testing v4's send state");
|
debug("testing v4's send state");
|
||||||
{
|
{
|
||||||
debug('getting edited message from app (v4)');
|
debug('getting edited message from app (v4)');
|
||||||
const messages = await page.evaluate(
|
const message = await getMessageFromApp(originalMessageTimestamp);
|
||||||
timestamp => window.SignalCI?.getMessagesBySentAt(timestamp),
|
|
||||||
originalMessageTimestamp
|
|
||||||
);
|
|
||||||
strictAssert(messages, 'messages does not exist');
|
|
||||||
|
|
||||||
debug('verifying edited message send state (v4)');
|
|
||||||
const [message] = messages;
|
|
||||||
|
|
||||||
strictAssert(
|
strictAssert(
|
||||||
message.sendStateByConversationId,
|
message.sendStateByConversationId,
|
||||||
|
|
|
@ -18,6 +18,11 @@ import { MY_STORY_ID } from '../../types/Stories';
|
||||||
import { isUntaggedPniString, toTaggedPni } from '../../types/ServiceId';
|
import { isUntaggedPniString, toTaggedPni } from '../../types/ServiceId';
|
||||||
import { Bootstrap } from '../bootstrap';
|
import { Bootstrap } from '../bootstrap';
|
||||||
import type { App } from '../bootstrap';
|
import type { App } from '../bootstrap';
|
||||||
|
import {
|
||||||
|
DELETE_SENT_PROTO_BATCHER_WAIT_MS,
|
||||||
|
RECEIPT_BATCHER_WAIT_MS,
|
||||||
|
} from '../../types/Receipt';
|
||||||
|
import { sleep } from '../../util/sleep';
|
||||||
|
|
||||||
export const debug = createDebug('mock:test:pni-signature');
|
export const debug = createDebug('mock:test:pni-signature');
|
||||||
|
|
||||||
|
@ -221,7 +226,12 @@ describe('pnp/PNI Signature', function (this: Mocha.Suite) {
|
||||||
messageTimestamps: [dataMessage.timestamp?.toNumber() ?? 0],
|
messageTimestamps: [dataMessage.timestamp?.toNumber() ?? 0],
|
||||||
timestamp: receiptTimestamp,
|
timestamp: receiptTimestamp,
|
||||||
});
|
});
|
||||||
|
// Wait for receipts to be batched and processed (+ buffer)
|
||||||
|
await sleep(
|
||||||
|
RECEIPT_BATCHER_WAIT_MS + DELETE_SENT_PROTO_BATCHER_WAIT_MS + 20
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug('Enter third message text');
|
debug('Enter third message text');
|
||||||
{
|
{
|
||||||
const compositionInput = await app.waitForEnabledComposer();
|
const compositionInput = await app.waitForEnabledComposer();
|
||||||
|
|
|
@ -21,3 +21,6 @@ export enum ReceiptType {
|
||||||
}
|
}
|
||||||
|
|
||||||
export type Receipt = z.infer<typeof receiptSchema>;
|
export type Receipt = z.infer<typeof receiptSchema>;
|
||||||
|
|
||||||
|
export const RECEIPT_BATCHER_WAIT_MS = 250;
|
||||||
|
export const DELETE_SENT_PROTO_BATCHER_WAIT_MS = 250;
|
||||||
|
|
|
@ -60,20 +60,30 @@ export async function hydrateStoryContext(
|
||||||
conversation && isDirectConversation(conversation.attributes),
|
conversation && isDirectConversation(conversation.attributes),
|
||||||
'hydrateStoryContext: Not a type=direct conversation'
|
'hydrateStoryContext: Not a type=direct conversation'
|
||||||
);
|
);
|
||||||
window.MessageCache.setAttributes({
|
const newMessageAttributes: Partial<MessageAttributesType> = {
|
||||||
messageId,
|
storyReplyContext: {
|
||||||
messageAttributes: {
|
attachment: undefined,
|
||||||
storyReplyContext: {
|
// This is ok to do because story replies only show in 1:1 conversations
|
||||||
attachment: undefined,
|
// so the story that was quoted should be from the same conversation.
|
||||||
// This is ok to do because story replies only show in 1:1 conversations
|
authorAci: conversation?.getAci(),
|
||||||
// so the story that was quoted should be from the same conversation.
|
// No messageId = referenced story not found
|
||||||
authorAci: conversation?.getAci(),
|
messageId: '',
|
||||||
// No messageId = referenced story not found
|
|
||||||
messageId: '',
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
skipSaveToDatabase: !shouldSave,
|
};
|
||||||
});
|
if (shouldSave) {
|
||||||
|
await window.MessageCache.setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes: newMessageAttributes,
|
||||||
|
skipSaveToDatabase: false,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
window.MessageCache.setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes: newMessageAttributes,
|
||||||
|
skipSaveToDatabase: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,15 +95,24 @@ export async function hydrateStoryContext(
|
||||||
|
|
||||||
const { sourceServiceId: authorAci } = storyMessage;
|
const { sourceServiceId: authorAci } = storyMessage;
|
||||||
strictAssert(isAciString(authorAci), 'Story message from pni');
|
strictAssert(isAciString(authorAci), 'Story message from pni');
|
||||||
window.MessageCache.setAttributes({
|
const newMessageAttributes: Partial<MessageAttributesType> = {
|
||||||
messageId,
|
storyReplyContext: {
|
||||||
messageAttributes: {
|
attachment: omit(attachment, 'screenshotData'),
|
||||||
storyReplyContext: {
|
authorAci,
|
||||||
attachment: omit(attachment, 'screenshotData'),
|
messageId: storyMessage.id,
|
||||||
authorAci,
|
|
||||||
messageId: storyMessage.id,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
skipSaveToDatabase: !shouldSave,
|
};
|
||||||
});
|
if (shouldSave) {
|
||||||
|
await window.MessageCache.setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes: newMessageAttributes,
|
||||||
|
skipSaveToDatabase: false,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
window.MessageCache.setAttributes({
|
||||||
|
messageId,
|
||||||
|
messageAttributes: newMessageAttributes,
|
||||||
|
skipSaveToDatabase: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue