Move receipts and view/read syncs to new syncTasks system

Co-authored-by: Scott Nonnenberg <scott@signal.org>
This commit is contained in:
automated-signal 2024-06-17 19:36:57 -05:00 committed by GitHub
parent 949104c316
commit b95dd1a70f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 1242 additions and 612 deletions

View file

@ -1,6 +1,7 @@
// Copyright 2016 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { z } from 'zod';
import { groupBy } from 'lodash';
import type { MessageModel } from '../models/messages';
@ -10,7 +11,7 @@ import { isOutgoing, isStory } from '../state/selectors/message';
import { getOwn } from '../util/getOwn';
import { missingCaseError } from '../util/missingCaseError';
import { createWaitBatcher } from '../util/waitBatcher';
import type { ServiceIdString } from '../types/ServiceId';
import { isServiceIdString } from '../types/ServiceId';
import {
SendActionType,
SendStatus,
@ -23,7 +24,6 @@ import * as log from '../logging/log';
import { getSourceServiceId } from '../messages/helpers';
import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp';
import { getMessageIdForLogging } from '../util/idForLogging';
import { generateCacheKey } from './generateCacheKey';
import { getPropForTimestamp } from '../util/editHelpers';
import {
DELETE_SENT_PROTO_BATCHER_WAIT_MS,
@ -31,34 +31,30 @@ import {
} from '../types/Receipt';
import { drop } from '../util/drop';
const { deleteSentProtoRecipient } = dataInterface;
const { deleteSentProtoRecipient, removeSyncTaskById } = dataInterface;
export enum MessageReceiptType {
Delivery = 'Delivery',
Read = 'Read',
View = 'View',
}
export const messageReceiptTypeSchema = z.enum(['Delivery', 'Read', 'View']);
export type MessageReceiptType = z.infer<typeof messageReceiptTypeSchema>;
export const receiptSyncTaskSchema = z.object({
messageSentAt: z.number(),
receiptTimestamp: z.number(),
sourceConversationId: z.string(),
sourceDevice: z.number(),
sourceServiceId: z.string().refine(isServiceIdString),
type: messageReceiptTypeSchema,
wasSentEncrypted: z.boolean(),
});
export type ReceiptSyncTaskType = z.infer<typeof receiptSyncTaskSchema>;
export type MessageReceiptAttributesType = {
envelopeId: string;
messageSentAt: number;
receiptTimestamp: number;
removeFromMessageReceiverCache: () => void;
sourceConversationId: string;
sourceDevice: number;
sourceServiceId: ServiceIdString;
type: MessageReceiptType;
wasSentEncrypted: boolean;
syncTaskId: string;
receiptSync: ReceiptSyncTaskType;
};
function getReceiptCacheKey(receipt: MessageReceiptAttributesType): string {
return generateCacheKey({
sender: receipt.sourceServiceId,
timestamp: receipt.messageSentAt,
type: receipt.type,
});
}
const cachedReceipts = new Map<string, MessageReceiptAttributesType>();
const processReceiptBatcher = createWaitBatcher({
@ -69,7 +65,7 @@ const processReceiptBatcher = createWaitBatcher({
// First group by sentAt, so that we can find the target message
const receiptsByMessageSentAt = groupBy(
receipts,
receipt => receipt.messageSentAt
receipt => receipt.receiptSync.messageSentAt
);
// Once we find the message, we'll group them by messageId to process
@ -99,7 +95,7 @@ const processReceiptBatcher = createWaitBatcher({
continue;
}
// All receipts have the same sentAt, so we can grab it from the first
const sentAt = receiptsForMessageSentAt[0].messageSentAt;
const sentAt = receiptsForMessageSentAt[0].receiptSync.messageSentAt;
const messagesMatchingTimestamp =
// eslint-disable-next-line no-await-in-loop
@ -114,14 +110,16 @@ const processReceiptBatcher = createWaitBatcher({
if (reaction) {
for (const receipt of receiptsForMessageSentAt) {
const { receiptSync } = receipt;
log.info(
'MesageReceipts.processReceiptBatcher: Got receipt for reaction',
receipt.messageSentAt,
receipt.type,
receipt.sourceConversationId,
receipt.sourceServiceId
receiptSync.messageSentAt,
receiptSync.type,
receiptSync.sourceConversationId,
receiptSync.sourceServiceId
);
remove(receipt);
// eslint-disable-next-line no-await-in-loop
await remove(receipt);
}
continue;
}
@ -129,7 +127,7 @@ const processReceiptBatcher = createWaitBatcher({
for (const receipt of receiptsForMessageSentAt) {
const targetMessage = getTargetMessage({
sourceConversationId: receipt.sourceConversationId,
sourceConversationId: receipt.receiptSync.sourceConversationId,
targetTimestamp: sentAt,
messagesMatchingTimestamp,
});
@ -144,7 +142,9 @@ const processReceiptBatcher = createWaitBatcher({
item.sendStateByConversationId &&
!item.deletedForEveryone &&
Boolean(
item.sendStateByConversationId[receipt.sourceConversationId]
item.sendStateByConversationId[
receipt.receiptSync.sourceConversationId
]
)
);
@ -154,12 +154,13 @@ const processReceiptBatcher = createWaitBatcher({
);
} else {
// Nope, no target message was found
const { receiptSync } = receipt;
log.info(
'MessageReceipts.processReceiptBatcher: No message for receipt',
receipt.messageSentAt,
receipt.type,
receipt.sourceConversationId,
receipt.sourceServiceId
receiptSync.messageSentAt,
receiptSync.type,
receiptSync.sourceConversationId,
receiptSync.sourceServiceId
);
}
}
@ -190,7 +191,7 @@ async function processReceiptsForMessage(
messageId
);
const { updatedMessage, validReceipts } = updateMessageWithReceipts(
const { updatedMessage, validReceipts } = await updateMessageWithReceipts(
message,
receipts
);
@ -204,7 +205,8 @@ async function processReceiptsForMessage(
// Confirm/remove receipts, and delete sent protos
for (const receipt of validReceipts) {
remove(receipt);
// eslint-disable-next-line no-await-in-loop
await remove(receipt);
drop(addToDeleteSentProtoBatcher(receipt, updatedMessage));
}
@ -215,25 +217,27 @@ async function processReceiptsForMessage(
conversation?.debouncedUpdateLastMessage?.();
}
function updateMessageWithReceipts(
async function updateMessageWithReceipts(
message: MessageAttributesType,
receipts: Array<MessageReceiptAttributesType>
): {
): Promise<{
updatedMessage: MessageAttributesType;
validReceipts: Array<MessageReceiptAttributesType>;
} {
}> {
const logId = `updateMessageWithReceipts(timestamp=${message.timestamp})`;
const toRemove: Array<MessageReceiptAttributesType> = [];
const receiptsToProcess = receipts.filter(receipt => {
if (shouldDropReceipt(receipt, message)) {
const { receiptSync } = receipt;
log.info(
`${logId}: Dropping a receipt ${receipt.type} for sentAt=${receipt.messageSentAt}`
`${logId}: Dropping a receipt ${receiptSync.type} for sentAt=${receiptSync.messageSentAt}`
);
remove(receipt);
toRemove.push(receipt);
return false;
}
if (!cachedReceipts.has(getReceiptCacheKey(receipt))) {
if (!cachedReceipts.has(receipt.syncTaskId)) {
// Between the time it was received and now, this receipt has already been handled!
return false;
}
@ -241,6 +245,8 @@ function updateMessageWithReceipts(
return true;
});
await Promise.all(toRemove.map(remove));
log.info(
`${logId}: batch processing ${receipts.length}` +
` receipt${receipts.length === 1 ? '' : 's'}`
@ -287,9 +293,10 @@ const deleteSentProtoBatcher = createWaitBatcher({
},
});
function remove(receipt: MessageReceiptAttributesType): void {
cachedReceipts.delete(getReceiptCacheKey(receipt));
receipt.removeFromMessageReceiverCache();
async function remove(receipt: MessageReceiptAttributesType): Promise<void> {
const { syncTaskId } = receipt;
cachedReceipts.delete(syncTaskId);
await removeSyncTaskById(syncTaskId);
}
function getTargetMessage({
@ -372,13 +379,13 @@ const shouldDropReceipt = (
receipt: MessageReceiptAttributesType,
message: MessageAttributesType
): boolean => {
const { type } = receipt;
const { type } = receipt.receiptSync;
switch (type) {
case MessageReceiptType.Delivery:
case messageReceiptTypeSchema.Enum.Delivery:
return false;
case MessageReceiptType.Read:
case messageReceiptTypeSchema.Enum.Read:
return !window.storage.get('read-receipt-setting');
case MessageReceiptType.View:
case messageReceiptTypeSchema.Enum.View:
if (isStory(message)) {
return !window.Events.getStoryViewReceiptsEnabled();
}
@ -388,9 +395,9 @@ const shouldDropReceipt = (
}
};
export function forMessage(
export async function forMessage(
message: MessageModel
): Array<MessageReceiptAttributesType> {
): Promise<Array<MessageReceiptAttributesType>> {
if (!isOutgoing(message.attributes) && !isStory(message.attributes)) {
return [];
}
@ -408,20 +415,23 @@ export function forMessage(
const receiptValues = Array.from(cachedReceipts.values());
const sentAt = getMessageSentTimestamp(message.attributes, { log });
const result = receiptValues.filter(item => item.messageSentAt === sentAt);
const result = receiptValues.filter(
item => item.receiptSync.messageSentAt === sentAt
);
if (result.length > 0) {
log.info(`${logId}: found early receipts for message ${sentAt}`);
result.forEach(receipt => {
remove(receipt);
});
await Promise.all(
result.map(async receipt => {
await remove(receipt);
})
);
}
return result.filter(receipt => {
if (shouldDropReceipt(receipt, message.attributes)) {
log.info(
`${logId}: Dropping an early receipt ${receipt.type} for message ${sentAt}`
`${logId}: Dropping an early receipt ${receipt.receiptSync.type} for message ${sentAt}`
);
remove(receipt);
return false;
}
@ -433,7 +443,7 @@ function getNewSendStateByConversationId(
oldSendStateByConversationId: SendStateByConversationId,
receipt: MessageReceiptAttributesType
): SendStateByConversationId {
const { receiptTimestamp, sourceConversationId, type } = receipt;
const { receiptTimestamp, sourceConversationId, type } = receipt.receiptSync;
const oldSendState = getOwn(
oldSendStateByConversationId,
sourceConversationId
@ -441,13 +451,13 @@ function getNewSendStateByConversationId(
let sendActionType: SendActionType;
switch (type) {
case MessageReceiptType.Delivery:
case messageReceiptTypeSchema.enum.Delivery:
sendActionType = SendActionType.GotDeliveryReceipt;
break;
case MessageReceiptType.Read:
case messageReceiptTypeSchema.enum.Read:
sendActionType = SendActionType.GotReadReceipt;
break;
case MessageReceiptType.View:
case messageReceiptTypeSchema.enum.View:
sendActionType = SendActionType.GotViewedReceipt;
break;
default:
@ -467,7 +477,7 @@ function updateMessageSendStateWithReceipt(
message: MessageAttributesType,
receipt: MessageReceiptAttributesType
): Partial<MessageAttributesType> {
const { messageSentAt } = receipt;
const { messageSentAt } = receipt.receiptSync;
const newAttributes: Partial<MessageAttributesType> = {};
@ -510,27 +520,34 @@ async function addToDeleteSentProtoBatcher(
receipt: MessageReceiptAttributesType,
message: MessageAttributesType
) {
const { sourceConversationId, type } = receipt;
const { receiptSync } = receipt;
const {
sourceConversationId,
type,
wasSentEncrypted,
messageSentAt,
sourceDevice,
} = receiptSync;
if (
(type === MessageReceiptType.Delivery &&
(type === messageReceiptTypeSchema.enum.Delivery &&
wasDeliveredWithSealedSender(sourceConversationId, message) &&
receipt.wasSentEncrypted) ||
type === MessageReceiptType.Read
wasSentEncrypted) ||
type === messageReceiptTypeSchema.enum.Read
) {
const recipient = window.ConversationController.get(sourceConversationId);
const recipientServiceId = recipient?.getServiceId();
const deviceId = receipt.sourceDevice;
const deviceId = sourceDevice;
if (recipientServiceId && deviceId) {
await deleteSentProtoBatcher.add({
timestamp: receipt.messageSentAt,
timestamp: messageSentAt,
recipientServiceId,
deviceId,
});
} else {
log.warn(
`MessageReceipts.deleteSentProto(sentAt=${receipt.messageSentAt}): ` +
`MessageReceipts.deleteSentProto(sentAt=${messageSentAt}): ` +
`Missing serviceId or deviceId for deliveredTo ${sourceConversationId}`
);
}
@ -540,6 +557,6 @@ async function addToDeleteSentProtoBatcher(
export async function onReceipt(
receipt: MessageReceiptAttributesType
): Promise<void> {
cachedReceipts.set(getReceiptCacheKey(receipt), receipt);
cachedReceipts.set(receipt.syncTaskId, receipt);
await processReceiptBatcher.add(receipt);
}

View file

@ -1,7 +1,8 @@
// Copyright 2017 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { AciString } from '../types/ServiceId';
import { z } from 'zod';
import type { MessageModel } from '../models/messages';
import * as Errors from '../types/errors';
import * as log from '../logging/log';
@ -14,58 +15,69 @@ import { isMessageUnread } from '../util/isMessageUnread';
import { notificationService } from '../services/notifications';
import { queueUpdateMessage } from '../util/messageBatcher';
import { strictAssert } from '../util/assert';
import { generateCacheKey } from './generateCacheKey';
import { isAciString } from '../util/isAciString';
import dataInterface from '../sql/Client';
const { removeSyncTaskById } = dataInterface;
export const readSyncTaskSchema = z.object({
type: z.literal('ReadSync').readonly(),
readAt: z.number(),
sender: z.string().optional(),
senderAci: z.string().refine(isAciString),
senderId: z.string(),
timestamp: z.number(),
});
export type ReadSyncTaskType = z.infer<typeof readSyncTaskSchema>;
export type ReadSyncAttributesType = {
envelopeId: string;
readAt: number;
removeFromMessageReceiverCache: () => unknown;
sender?: string;
senderAci: AciString;
senderId: string;
timestamp: number;
syncTaskId: string;
readSync: ReadSyncTaskType;
};
const readSyncs = new Map<string, ReadSyncAttributesType>();
function remove(sync: ReadSyncAttributesType): void {
readSyncs.delete(
generateCacheKey({
sender: sync.senderId,
timestamp: sync.timestamp,
type: 'readsync',
})
);
sync.removeFromMessageReceiverCache();
async function remove(sync: ReadSyncAttributesType): Promise<void> {
const { syncTaskId } = sync;
readSyncs.delete(syncTaskId);
await removeSyncTaskById(syncTaskId);
}
async function maybeItIsAReactionReadSync(
sync: ReadSyncAttributesType
): Promise<void> {
const logId = `ReadSyncs.onSync(timestamp=${sync.timestamp})`;
const { readSync } = sync;
const logId = `ReadSyncs.onSync(timestamp=${readSync.timestamp})`;
const readReaction = await window.Signal.Data.markReactionAsRead(
sync.senderAci,
Number(sync.timestamp)
readSync.senderAci,
Number(readSync.timestamp)
);
if (
!readReaction ||
readReaction?.targetAuthorAci !== window.storage.user.getCheckedAci()
) {
log.info(`${logId} not found:`, sync.senderId, sync.sender, sync.senderAci);
log.info(
`${logId} not found:`,
readSync.senderId,
readSync.sender,
readSync.senderAci
);
return;
}
log.info(
`${logId} read reaction sync found:`,
readReaction.conversationId,
sync.senderId,
sync.sender,
sync.senderAci
readSync.senderId,
readSync.sender,
readSync.senderAci
);
remove(sync);
await remove(sync);
notificationService.removeBy({
conversationId: readReaction.conversationId,
@ -75,9 +87,9 @@ async function maybeItIsAReactionReadSync(
});
}
export function forMessage(
export async function forMessage(
message: MessageModel
): ReadSyncAttributesType | null {
): Promise<ReadSyncAttributesType | null> {
const logId = `ReadSyncs.forMessage(${getMessageIdForLogging(
message.attributes
)})`;
@ -92,13 +104,17 @@ export function forMessage(
});
const readSyncValues = Array.from(readSyncs.values());
const foundSync = readSyncValues.find(item => {
return item.senderId === sender?.id && item.timestamp === messageTimestamp;
const { readSync } = item;
return (
readSync.senderId === sender?.id &&
readSync.timestamp === messageTimestamp
);
});
if (foundSync) {
log.info(
`${logId}: Found early read sync for message ${foundSync.timestamp}`
`${logId}: Found early read sync for message ${foundSync.readSync.timestamp}`
);
remove(foundSync);
await remove(foundSync);
return foundSync;
}
@ -106,20 +122,15 @@ export function forMessage(
}
export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
readSyncs.set(
generateCacheKey({
sender: sync.senderId,
timestamp: sync.timestamp,
type: 'readsync',
}),
sync
);
const { readSync, syncTaskId } = sync;
const logId = `ReadSyncs.onSync(timestamp=${sync.timestamp})`;
readSyncs.set(syncTaskId, sync);
const logId = `ReadSyncs.onSync(timestamp=${readSync.timestamp})`;
try {
const messages = await window.Signal.Data.getMessagesBySentAt(
sync.timestamp
readSync.timestamp
);
const found = messages.find(item => {
@ -129,7 +140,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
reason: logId,
});
return isIncoming(item) && sender?.id === sync.senderId;
return isIncoming(item) && sender?.id === readSync.senderId;
});
if (!found) {
@ -144,8 +155,8 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
found,
'ReadSyncs.onSync'
);
const readAt = Math.min(sync.readAt, Date.now());
const newestSentAt = sync.timestamp;
const readAt = Math.min(readSync.readAt, Date.now());
const newestSentAt = readSync.timestamp;
// If message is unread, we mark it read. Otherwise, we update the expiration
// timer to the time specified by the read sync if it's earlier than
@ -193,9 +204,9 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
queueUpdateMessage(message.attributes);
remove(sync);
await remove(sync);
} catch (error) {
remove(sync);
log.error(`${logId} error:`, Errors.toLogFormat(error));
await remove(sync);
}
}

View file

@ -1,7 +1,8 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { AciString } from '../types/ServiceId';
import { z } from 'zod';
import type { MessageModel } from '../models/messages';
import * as Errors from '../types/errors';
import * as log from '../logging/log';
@ -15,35 +16,38 @@ import { markViewed } from '../services/MessageUpdater';
import { notificationService } from '../services/notifications';
import { queueAttachmentDownloads } from '../util/queueAttachmentDownloads';
import { queueUpdateMessage } from '../util/messageBatcher';
import { generateCacheKey } from './generateCacheKey';
import { AttachmentDownloadUrgency } from '../jobs/AttachmentDownloadManager';
import { isAciString } from '../util/isAciString';
import dataInterface from '../sql/Client';
const { removeSyncTaskById } = dataInterface;
export const viewSyncTaskSchema = z.object({
type: z.literal('ViewSync').readonly(),
senderAci: z.string().refine(isAciString),
senderE164: z.string().optional(),
senderId: z.string(),
timestamp: z.number(),
viewedAt: z.number(),
});
export type ViewSyncTaskType = z.infer<typeof viewSyncTaskSchema>;
export type ViewSyncAttributesType = {
envelopeId: string;
removeFromMessageReceiverCache: () => unknown;
senderAci: AciString;
senderE164?: string;
senderId: string;
timestamp: number;
viewedAt: number;
syncTaskId: string;
viewSync: ViewSyncTaskType;
};
const viewSyncs = new Map<string, ViewSyncAttributesType>();
function remove(sync: ViewSyncAttributesType): void {
viewSyncs.delete(
generateCacheKey({
sender: sync.senderId,
timestamp: sync.timestamp,
type: 'viewsync',
})
);
sync.removeFromMessageReceiverCache();
async function remove(sync: ViewSyncAttributesType): Promise<void> {
await removeSyncTaskById(sync.syncTaskId);
}
export function forMessage(
export async function forMessage(
message: MessageModel
): Array<ViewSyncAttributesType> {
): Promise<Array<ViewSyncAttributesType>> {
const logId = `ViewSyncs.forMessage(${getMessageIdForLogging(
message.attributes
)})`;
@ -60,7 +64,11 @@ export function forMessage(
const viewSyncValues = Array.from(viewSyncs.values());
const matchingSyncs = viewSyncValues.filter(item => {
return item.senderId === sender?.id && item.timestamp === messageTimestamp;
const { viewSync } = item;
return (
viewSync.senderId === sender?.id &&
viewSync.timestamp === messageTimestamp
);
});
if (matchingSyncs.length > 0) {
@ -68,28 +76,24 @@ export function forMessage(
`${logId}: Found ${matchingSyncs.length} early view sync(s) for message ${messageTimestamp}`
);
}
matchingSyncs.forEach(sync => {
remove(sync);
});
await Promise.all(
matchingSyncs.map(async sync => {
await remove(sync);
})
);
return matchingSyncs;
}
export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
viewSyncs.set(
generateCacheKey({
sender: sync.senderId,
timestamp: sync.timestamp,
type: 'viewsync',
}),
sync
);
viewSyncs.set(sync.syncTaskId, sync);
const { viewSync } = sync;
const logId = `ViewSyncs.onSync(timestamp=${sync.timestamp})`;
const logId = `ViewSyncs.onSync(timestamp=${viewSync.timestamp})`;
try {
const messages = await window.Signal.Data.getMessagesBySentAt(
sync.timestamp
viewSync.timestamp
);
const found = messages.find(item => {
@ -99,15 +103,15 @@ export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
reason: logId,
});
return sender?.id === sync.senderId;
return sender?.id === viewSync.senderId;
});
if (!found) {
log.info(
`${logId}: nothing found`,
sync.senderId,
sync.senderE164,
sync.senderAci
viewSync.senderId,
viewSync.senderE164,
viewSync.senderAci
);
return;
}
@ -123,7 +127,7 @@ export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
if (message.get('readStatus') !== ReadStatus.Viewed) {
didChangeMessage = true;
message.set(markViewed(message.attributes, sync.viewedAt));
message.set(markViewed(message.attributes, viewSync.viewedAt));
const attachments = message.get('attachments');
if (!attachments?.every(isDownloaded)) {
@ -154,9 +158,9 @@ export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
queueUpdateMessage(message.attributes);
}
remove(sync);
await remove(sync);
} catch (error) {
remove(sync);
log.error(`${logId} error:`, Errors.toLogFormat(error));
await remove(sync);
}
}