Move receipts and view/read syncs to new syncTasks system
This commit is contained in:
parent
1a263e63da
commit
75c32e86f0
33 changed files with 1242 additions and 612 deletions
|
@ -1,6 +1,7 @@
|
|||
// Copyright 2016 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { z } from 'zod';
|
||||
import { groupBy } from 'lodash';
|
||||
|
||||
import type { MessageModel } from '../models/messages';
|
||||
|
@ -10,7 +11,7 @@ import { isOutgoing, isStory } from '../state/selectors/message';
|
|||
import { getOwn } from '../util/getOwn';
|
||||
import { missingCaseError } from '../util/missingCaseError';
|
||||
import { createWaitBatcher } from '../util/waitBatcher';
|
||||
import type { ServiceIdString } from '../types/ServiceId';
|
||||
import { isServiceIdString } from '../types/ServiceId';
|
||||
import {
|
||||
SendActionType,
|
||||
SendStatus,
|
||||
|
@ -23,7 +24,6 @@ import * as log from '../logging/log';
|
|||
import { getSourceServiceId } from '../messages/helpers';
|
||||
import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp';
|
||||
import { getMessageIdForLogging } from '../util/idForLogging';
|
||||
import { generateCacheKey } from './generateCacheKey';
|
||||
import { getPropForTimestamp } from '../util/editHelpers';
|
||||
import {
|
||||
DELETE_SENT_PROTO_BATCHER_WAIT_MS,
|
||||
|
@ -31,34 +31,30 @@ import {
|
|||
} from '../types/Receipt';
|
||||
import { drop } from '../util/drop';
|
||||
|
||||
const { deleteSentProtoRecipient } = dataInterface;
|
||||
const { deleteSentProtoRecipient, removeSyncTaskById } = dataInterface;
|
||||
|
||||
export enum MessageReceiptType {
|
||||
Delivery = 'Delivery',
|
||||
Read = 'Read',
|
||||
View = 'View',
|
||||
}
|
||||
export const messageReceiptTypeSchema = z.enum(['Delivery', 'Read', 'View']);
|
||||
|
||||
export type MessageReceiptType = z.infer<typeof messageReceiptTypeSchema>;
|
||||
|
||||
export const receiptSyncTaskSchema = z.object({
|
||||
messageSentAt: z.number(),
|
||||
receiptTimestamp: z.number(),
|
||||
sourceConversationId: z.string(),
|
||||
sourceDevice: z.number(),
|
||||
sourceServiceId: z.string().refine(isServiceIdString),
|
||||
type: messageReceiptTypeSchema,
|
||||
wasSentEncrypted: z.boolean(),
|
||||
});
|
||||
|
||||
export type ReceiptSyncTaskType = z.infer<typeof receiptSyncTaskSchema>;
|
||||
|
||||
export type MessageReceiptAttributesType = {
|
||||
envelopeId: string;
|
||||
messageSentAt: number;
|
||||
receiptTimestamp: number;
|
||||
removeFromMessageReceiverCache: () => void;
|
||||
sourceConversationId: string;
|
||||
sourceDevice: number;
|
||||
sourceServiceId: ServiceIdString;
|
||||
type: MessageReceiptType;
|
||||
wasSentEncrypted: boolean;
|
||||
syncTaskId: string;
|
||||
receiptSync: ReceiptSyncTaskType;
|
||||
};
|
||||
|
||||
function getReceiptCacheKey(receipt: MessageReceiptAttributesType): string {
|
||||
return generateCacheKey({
|
||||
sender: receipt.sourceServiceId,
|
||||
timestamp: receipt.messageSentAt,
|
||||
type: receipt.type,
|
||||
});
|
||||
}
|
||||
|
||||
const cachedReceipts = new Map<string, MessageReceiptAttributesType>();
|
||||
|
||||
const processReceiptBatcher = createWaitBatcher({
|
||||
|
@ -69,7 +65,7 @@ const processReceiptBatcher = createWaitBatcher({
|
|||
// First group by sentAt, so that we can find the target message
|
||||
const receiptsByMessageSentAt = groupBy(
|
||||
receipts,
|
||||
receipt => receipt.messageSentAt
|
||||
receipt => receipt.receiptSync.messageSentAt
|
||||
);
|
||||
|
||||
// Once we find the message, we'll group them by messageId to process
|
||||
|
@ -99,7 +95,7 @@ const processReceiptBatcher = createWaitBatcher({
|
|||
continue;
|
||||
}
|
||||
// All receipts have the same sentAt, so we can grab it from the first
|
||||
const sentAt = receiptsForMessageSentAt[0].messageSentAt;
|
||||
const sentAt = receiptsForMessageSentAt[0].receiptSync.messageSentAt;
|
||||
|
||||
const messagesMatchingTimestamp =
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
|
@ -114,14 +110,16 @@ const processReceiptBatcher = createWaitBatcher({
|
|||
|
||||
if (reaction) {
|
||||
for (const receipt of receiptsForMessageSentAt) {
|
||||
const { receiptSync } = receipt;
|
||||
log.info(
|
||||
'MesageReceipts.processReceiptBatcher: Got receipt for reaction',
|
||||
receipt.messageSentAt,
|
||||
receipt.type,
|
||||
receipt.sourceConversationId,
|
||||
receipt.sourceServiceId
|
||||
receiptSync.messageSentAt,
|
||||
receiptSync.type,
|
||||
receiptSync.sourceConversationId,
|
||||
receiptSync.sourceServiceId
|
||||
);
|
||||
remove(receipt);
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await remove(receipt);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -129,7 +127,7 @@ const processReceiptBatcher = createWaitBatcher({
|
|||
|
||||
for (const receipt of receiptsForMessageSentAt) {
|
||||
const targetMessage = getTargetMessage({
|
||||
sourceConversationId: receipt.sourceConversationId,
|
||||
sourceConversationId: receipt.receiptSync.sourceConversationId,
|
||||
targetTimestamp: sentAt,
|
||||
messagesMatchingTimestamp,
|
||||
});
|
||||
|
@ -144,7 +142,9 @@ const processReceiptBatcher = createWaitBatcher({
|
|||
item.sendStateByConversationId &&
|
||||
!item.deletedForEveryone &&
|
||||
Boolean(
|
||||
item.sendStateByConversationId[receipt.sourceConversationId]
|
||||
item.sendStateByConversationId[
|
||||
receipt.receiptSync.sourceConversationId
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -154,12 +154,13 @@ const processReceiptBatcher = createWaitBatcher({
|
|||
);
|
||||
} else {
|
||||
// Nope, no target message was found
|
||||
const { receiptSync } = receipt;
|
||||
log.info(
|
||||
'MessageReceipts.processReceiptBatcher: No message for receipt',
|
||||
receipt.messageSentAt,
|
||||
receipt.type,
|
||||
receipt.sourceConversationId,
|
||||
receipt.sourceServiceId
|
||||
receiptSync.messageSentAt,
|
||||
receiptSync.type,
|
||||
receiptSync.sourceConversationId,
|
||||
receiptSync.sourceServiceId
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -190,7 +191,7 @@ async function processReceiptsForMessage(
|
|||
messageId
|
||||
);
|
||||
|
||||
const { updatedMessage, validReceipts } = updateMessageWithReceipts(
|
||||
const { updatedMessage, validReceipts } = await updateMessageWithReceipts(
|
||||
message,
|
||||
receipts
|
||||
);
|
||||
|
@ -204,7 +205,8 @@ async function processReceiptsForMessage(
|
|||
|
||||
// Confirm/remove receipts, and delete sent protos
|
||||
for (const receipt of validReceipts) {
|
||||
remove(receipt);
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await remove(receipt);
|
||||
drop(addToDeleteSentProtoBatcher(receipt, updatedMessage));
|
||||
}
|
||||
|
||||
|
@ -215,25 +217,27 @@ async function processReceiptsForMessage(
|
|||
conversation?.debouncedUpdateLastMessage?.();
|
||||
}
|
||||
|
||||
function updateMessageWithReceipts(
|
||||
async function updateMessageWithReceipts(
|
||||
message: MessageAttributesType,
|
||||
receipts: Array<MessageReceiptAttributesType>
|
||||
): {
|
||||
): Promise<{
|
||||
updatedMessage: MessageAttributesType;
|
||||
validReceipts: Array<MessageReceiptAttributesType>;
|
||||
} {
|
||||
}> {
|
||||
const logId = `updateMessageWithReceipts(timestamp=${message.timestamp})`;
|
||||
|
||||
const toRemove: Array<MessageReceiptAttributesType> = [];
|
||||
const receiptsToProcess = receipts.filter(receipt => {
|
||||
if (shouldDropReceipt(receipt, message)) {
|
||||
const { receiptSync } = receipt;
|
||||
log.info(
|
||||
`${logId}: Dropping a receipt ${receipt.type} for sentAt=${receipt.messageSentAt}`
|
||||
`${logId}: Dropping a receipt ${receiptSync.type} for sentAt=${receiptSync.messageSentAt}`
|
||||
);
|
||||
remove(receipt);
|
||||
toRemove.push(receipt);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!cachedReceipts.has(getReceiptCacheKey(receipt))) {
|
||||
if (!cachedReceipts.has(receipt.syncTaskId)) {
|
||||
// Between the time it was received and now, this receipt has already been handled!
|
||||
return false;
|
||||
}
|
||||
|
@ -241,6 +245,8 @@ function updateMessageWithReceipts(
|
|||
return true;
|
||||
});
|
||||
|
||||
await Promise.all(toRemove.map(remove));
|
||||
|
||||
log.info(
|
||||
`${logId}: batch processing ${receipts.length}` +
|
||||
` receipt${receipts.length === 1 ? '' : 's'}`
|
||||
|
@ -287,9 +293,10 @@ const deleteSentProtoBatcher = createWaitBatcher({
|
|||
},
|
||||
});
|
||||
|
||||
function remove(receipt: MessageReceiptAttributesType): void {
|
||||
cachedReceipts.delete(getReceiptCacheKey(receipt));
|
||||
receipt.removeFromMessageReceiverCache();
|
||||
async function remove(receipt: MessageReceiptAttributesType): Promise<void> {
|
||||
const { syncTaskId } = receipt;
|
||||
cachedReceipts.delete(syncTaskId);
|
||||
await removeSyncTaskById(syncTaskId);
|
||||
}
|
||||
|
||||
function getTargetMessage({
|
||||
|
@ -372,13 +379,13 @@ const shouldDropReceipt = (
|
|||
receipt: MessageReceiptAttributesType,
|
||||
message: MessageAttributesType
|
||||
): boolean => {
|
||||
const { type } = receipt;
|
||||
const { type } = receipt.receiptSync;
|
||||
switch (type) {
|
||||
case MessageReceiptType.Delivery:
|
||||
case messageReceiptTypeSchema.Enum.Delivery:
|
||||
return false;
|
||||
case MessageReceiptType.Read:
|
||||
case messageReceiptTypeSchema.Enum.Read:
|
||||
return !window.storage.get('read-receipt-setting');
|
||||
case MessageReceiptType.View:
|
||||
case messageReceiptTypeSchema.Enum.View:
|
||||
if (isStory(message)) {
|
||||
return !window.Events.getStoryViewReceiptsEnabled();
|
||||
}
|
||||
|
@ -388,9 +395,9 @@ const shouldDropReceipt = (
|
|||
}
|
||||
};
|
||||
|
||||
export function forMessage(
|
||||
export async function forMessage(
|
||||
message: MessageModel
|
||||
): Array<MessageReceiptAttributesType> {
|
||||
): Promise<Array<MessageReceiptAttributesType>> {
|
||||
if (!isOutgoing(message.attributes) && !isStory(message.attributes)) {
|
||||
return [];
|
||||
}
|
||||
|
@ -408,20 +415,23 @@ export function forMessage(
|
|||
const receiptValues = Array.from(cachedReceipts.values());
|
||||
|
||||
const sentAt = getMessageSentTimestamp(message.attributes, { log });
|
||||
const result = receiptValues.filter(item => item.messageSentAt === sentAt);
|
||||
const result = receiptValues.filter(
|
||||
item => item.receiptSync.messageSentAt === sentAt
|
||||
);
|
||||
if (result.length > 0) {
|
||||
log.info(`${logId}: found early receipts for message ${sentAt}`);
|
||||
result.forEach(receipt => {
|
||||
remove(receipt);
|
||||
});
|
||||
await Promise.all(
|
||||
result.map(async receipt => {
|
||||
await remove(receipt);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
return result.filter(receipt => {
|
||||
if (shouldDropReceipt(receipt, message.attributes)) {
|
||||
log.info(
|
||||
`${logId}: Dropping an early receipt ${receipt.type} for message ${sentAt}`
|
||||
`${logId}: Dropping an early receipt ${receipt.receiptSync.type} for message ${sentAt}`
|
||||
);
|
||||
remove(receipt);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -433,7 +443,7 @@ function getNewSendStateByConversationId(
|
|||
oldSendStateByConversationId: SendStateByConversationId,
|
||||
receipt: MessageReceiptAttributesType
|
||||
): SendStateByConversationId {
|
||||
const { receiptTimestamp, sourceConversationId, type } = receipt;
|
||||
const { receiptTimestamp, sourceConversationId, type } = receipt.receiptSync;
|
||||
const oldSendState = getOwn(
|
||||
oldSendStateByConversationId,
|
||||
sourceConversationId
|
||||
|
@ -441,13 +451,13 @@ function getNewSendStateByConversationId(
|
|||
|
||||
let sendActionType: SendActionType;
|
||||
switch (type) {
|
||||
case MessageReceiptType.Delivery:
|
||||
case messageReceiptTypeSchema.enum.Delivery:
|
||||
sendActionType = SendActionType.GotDeliveryReceipt;
|
||||
break;
|
||||
case MessageReceiptType.Read:
|
||||
case messageReceiptTypeSchema.enum.Read:
|
||||
sendActionType = SendActionType.GotReadReceipt;
|
||||
break;
|
||||
case MessageReceiptType.View:
|
||||
case messageReceiptTypeSchema.enum.View:
|
||||
sendActionType = SendActionType.GotViewedReceipt;
|
||||
break;
|
||||
default:
|
||||
|
@ -467,7 +477,7 @@ function updateMessageSendStateWithReceipt(
|
|||
message: MessageAttributesType,
|
||||
receipt: MessageReceiptAttributesType
|
||||
): Partial<MessageAttributesType> {
|
||||
const { messageSentAt } = receipt;
|
||||
const { messageSentAt } = receipt.receiptSync;
|
||||
|
||||
const newAttributes: Partial<MessageAttributesType> = {};
|
||||
|
||||
|
@ -510,27 +520,34 @@ async function addToDeleteSentProtoBatcher(
|
|||
receipt: MessageReceiptAttributesType,
|
||||
message: MessageAttributesType
|
||||
) {
|
||||
const { sourceConversationId, type } = receipt;
|
||||
const { receiptSync } = receipt;
|
||||
const {
|
||||
sourceConversationId,
|
||||
type,
|
||||
wasSentEncrypted,
|
||||
messageSentAt,
|
||||
sourceDevice,
|
||||
} = receiptSync;
|
||||
|
||||
if (
|
||||
(type === MessageReceiptType.Delivery &&
|
||||
(type === messageReceiptTypeSchema.enum.Delivery &&
|
||||
wasDeliveredWithSealedSender(sourceConversationId, message) &&
|
||||
receipt.wasSentEncrypted) ||
|
||||
type === MessageReceiptType.Read
|
||||
wasSentEncrypted) ||
|
||||
type === messageReceiptTypeSchema.enum.Read
|
||||
) {
|
||||
const recipient = window.ConversationController.get(sourceConversationId);
|
||||
const recipientServiceId = recipient?.getServiceId();
|
||||
const deviceId = receipt.sourceDevice;
|
||||
const deviceId = sourceDevice;
|
||||
|
||||
if (recipientServiceId && deviceId) {
|
||||
await deleteSentProtoBatcher.add({
|
||||
timestamp: receipt.messageSentAt,
|
||||
timestamp: messageSentAt,
|
||||
recipientServiceId,
|
||||
deviceId,
|
||||
});
|
||||
} else {
|
||||
log.warn(
|
||||
`MessageReceipts.deleteSentProto(sentAt=${receipt.messageSentAt}): ` +
|
||||
`MessageReceipts.deleteSentProto(sentAt=${messageSentAt}): ` +
|
||||
`Missing serviceId or deviceId for deliveredTo ${sourceConversationId}`
|
||||
);
|
||||
}
|
||||
|
@ -540,6 +557,6 @@ async function addToDeleteSentProtoBatcher(
|
|||
export async function onReceipt(
|
||||
receipt: MessageReceiptAttributesType
|
||||
): Promise<void> {
|
||||
cachedReceipts.set(getReceiptCacheKey(receipt), receipt);
|
||||
cachedReceipts.set(receipt.syncTaskId, receipt);
|
||||
await processReceiptBatcher.add(receipt);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue