Avoid race condition when marking messages read

This commit is contained in:
trevor-signal 2025-07-16 13:47:07 -04:00 committed by GitHub
parent eb9476c291
commit ea3a7f70b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 58 additions and 50 deletions

View file

@ -152,7 +152,6 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
const message = window.MessageCache.register(new MessageModel(found)); const message = window.MessageCache.register(new MessageModel(found));
const readAt = Math.min(readSync.readAt, Date.now()); const readAt = Math.min(readSync.readAt, Date.now());
const newestSentAt = readSync.timestamp;
// If message is unread, we mark it read. Otherwise, we update the expiration // If message is unread, we mark it read. Otherwise, we update the expiration
// timer to the time specified by the read sync if it's earlier than // timer to the time specified by the read sync if it's earlier than
@ -168,9 +167,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
// onReadMessage may result in messages older than this one being // onReadMessage may result in messages older than this one being
// marked read. We want those messages to have the same expire timer // marked read. We want those messages to have the same expire timer
// start time as this one, so we pass the readAt value through. // start time as this one, so we pass the readAt value through.
drop( drop(conversation.onReadMessage(message.attributes, readAt));
conversation.onReadMessage(message.attributes, readAt, newestSentAt)
);
}; };
// only available during initialization // only available during initialization

View file

@ -3658,8 +3658,7 @@ export class ConversationModel extends window.Backbone
async onReadMessage( async onReadMessage(
message: MessageAttributesType, message: MessageAttributesType,
readAt?: number, readAt?: number
newestSentAt?: number
): Promise<void> { ): Promise<void> {
// We mark as read everything older than this message - to clean up old stuff // We mark as read everything older than this message - to clean up old stuff
// still marked unread in the database. If the user generally doesn't read in // still marked unread in the database. If the user generally doesn't read in
@ -3673,9 +3672,7 @@ export class ConversationModel extends window.Backbone
// Lastly, we don't send read syncs for any message marked read due to a read // Lastly, we don't send read syncs for any message marked read due to a read
// sync. That's a notification explosion we don't need. // sync. That's a notification explosion we don't need.
return this.queueJob('onReadMessage', () => return this.queueJob('onReadMessage', () =>
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.markRead(message, {
this.markRead(message.received_at!, {
newestSentAt: newestSentAt || message.sent_at,
sendReadReceipts: false, sendReadReceipts: false,
readAt, readAt,
}) })
@ -4862,16 +4859,15 @@ export class ConversationModel extends window.Backbone
} }
async markRead( async markRead(
newestUnreadAt: number, readMessage: { received_at: number; sent_at: number },
options: { options: {
readAt?: number; readAt?: number;
sendReadReceipts: boolean; sendReadReceipts: boolean;
newestSentAt?: number;
} = { } = {
sendReadReceipts: true, sendReadReceipts: true,
} }
): Promise<void> { ): Promise<void> {
await markConversationRead(this.attributes, newestUnreadAt, options); await markConversationRead(this.attributes, readMessage, options);
this.throttledUpdateUnread(); this.throttledUpdateUnread();
window.reduxActions.callHistory.updateCallHistoryUnreadCount(); window.reduxActions.callHistory.updateCallHistoryUnreadCount();
} }

View file

@ -983,18 +983,18 @@ type WritableInterface = {
getUnreadByConversationAndMarkRead: (options: { getUnreadByConversationAndMarkRead: (options: {
conversationId: string; conversationId: string;
includeStoryReplies: boolean; includeStoryReplies: boolean;
newestUnreadAt: number; readMessageReceivedAt: number;
now?: number; now?: number;
readAt?: number; readAt?: number;
storyId?: string; storyId?: string;
}) => GetUnreadByConversationAndMarkReadResultType; }) => GetUnreadByConversationAndMarkReadResultType;
getUnreadEditedMessagesAndMarkRead: (options: { getUnreadEditedMessagesAndMarkRead: (options: {
conversationId: string; conversationId: string;
newestUnreadAt: number; readMessageReceivedAt: number;
}) => GetUnreadByConversationAndMarkReadResultType; }) => GetUnreadByConversationAndMarkReadResultType;
getUnreadReactionsAndMarkRead: (options: { getUnreadReactionsAndMarkRead: (options: {
conversationId: string; conversationId: string;
newestUnreadAt: number; readMessageReceivedAt: number;
storyId?: string; storyId?: string;
}) => Array<ReactionResultType>; }) => Array<ReactionResultType>;
markReactionAsRead: ( markReactionAsRead: (

View file

@ -3113,14 +3113,14 @@ function getUnreadByConversationAndMarkRead(
{ {
conversationId, conversationId,
includeStoryReplies, includeStoryReplies,
newestUnreadAt, readMessageReceivedAt,
storyId, storyId,
readAt, readAt,
now = Date.now(), now = Date.now(),
}: { }: {
conversationId: string; conversationId: string;
includeStoryReplies: boolean; includeStoryReplies: boolean;
newestUnreadAt: number; readMessageReceivedAt: number;
storyId?: string; storyId?: string;
readAt?: number; readAt?: number;
now?: number; now?: number;
@ -3147,7 +3147,7 @@ function getUnreadByConversationAndMarkRead(
expirationStartTimestamp > ${expirationStartTimestamp} expirationStartTimestamp > ${expirationStartTimestamp}
) AND ) AND
expireTimer > 0 AND expireTimer > 0 AND
received_at <= ${newestUnreadAt}; received_at <= ${readMessageReceivedAt};
`; `;
db.prepare(updateExpirationQuery).run(updateExpirationParams); db.prepare(updateExpirationQuery).run(updateExpirationParams);
@ -3161,7 +3161,7 @@ function getUnreadByConversationAndMarkRead(
seenStatus = ${SeenStatus.Unseen} AND seenStatus = ${SeenStatus.Unseen} AND
isStory = 0 AND isStory = 0 AND
(${_storyIdPredicate(storyId, includeStoryReplies)}) AND (${_storyIdPredicate(storyId, includeStoryReplies)}) AND
received_at <= ${newestUnreadAt} received_at <= ${readMessageReceivedAt}
ORDER BY received_at DESC, sent_at DESC; ORDER BY received_at DESC, sent_at DESC;
`; `;
@ -3185,7 +3185,7 @@ function getUnreadByConversationAndMarkRead(
seenStatus = ${SeenStatus.Unseen} AND seenStatus = ${SeenStatus.Unseen} AND
isStory = 0 AND isStory = 0 AND
(${_storyIdPredicate(storyId, includeStoryReplies)}) AND (${_storyIdPredicate(storyId, includeStoryReplies)}) AND
received_at <= ${newestUnreadAt}; received_at <= ${readMessageReceivedAt};
`; `;
db.prepare(updateStatusQuery).run(updateStatusParams); db.prepare(updateStatusQuery).run(updateStatusParams);
@ -3211,11 +3211,11 @@ function getUnreadReactionsAndMarkRead(
db: WritableDB, db: WritableDB,
{ {
conversationId, conversationId,
newestUnreadAt, readMessageReceivedAt,
storyId, storyId,
}: { }: {
conversationId: string; conversationId: string;
newestUnreadAt: number; readMessageReceivedAt: number;
storyId?: string; storyId?: string;
} }
): Array<ReactionResultType> { ): Array<ReactionResultType> {
@ -3230,14 +3230,14 @@ function getUnreadReactionsAndMarkRead(
WHERE WHERE
reactions.conversationId IS $conversationId AND reactions.conversationId IS $conversationId AND
reactions.unread > 0 AND reactions.unread > 0 AND
messages.received_at <= $newestUnreadAt AND messages.received_at <= $readMessageReceivedAt AND
messages.storyId IS $storyId messages.storyId IS $storyId
ORDER BY messageReceivedAt DESC; ORDER BY messageReceivedAt DESC;
` `
) )
.all({ .all({
conversationId, conversationId,
newestUnreadAt, readMessageReceivedAt,
storyId: storyId || null, storyId: storyId || null,
}); });
@ -8445,10 +8445,10 @@ function getUnreadEditedMessagesAndMarkRead(
db: WritableDB, db: WritableDB,
{ {
conversationId, conversationId,
newestUnreadAt, readMessageReceivedAt,
}: { }: {
conversationId: string; conversationId: string;
newestUnreadAt: number; readMessageReceivedAt: number;
} }
): GetUnreadByConversationAndMarkReadResultType { ): GetUnreadByConversationAndMarkReadResultType {
return db.transaction(() => { return db.transaction(() => {
@ -8467,7 +8467,7 @@ function getUnreadEditedMessagesAndMarkRead(
WHERE WHERE
edited_messages.readStatus = ${ReadStatus.Unread} AND edited_messages.readStatus = ${ReadStatus.Unread} AND
edited_messages.conversationId = ${conversationId} AND edited_messages.conversationId = ${conversationId} AND
received_at <= ${newestUnreadAt} received_at <= ${readMessageReceivedAt}
ORDER BY messages.received_at DESC, messages.sent_at DESC; ORDER BY messages.received_at DESC, messages.sent_at DESC;
`; `;

View file

@ -1468,8 +1468,7 @@ function markMessageRead(
throw new Error(`markMessageRead: failed to load message ${messageId}`); throw new Error(`markMessageRead: failed to load message ${messageId}`);
} }
await conversation.markRead(message.get('received_at'), { await conversation.markRead(message.attributes, {
newestSentAt: message.get('sent_at'),
sendReadReceipts: true, sendReadReceipts: true,
}); });

View file

@ -143,7 +143,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({ const markedRead = await getUnreadByConversationAndMarkRead({
conversationId, conversationId,
newestUnreadAt: unreadStoryReply.received_at, readMessageReceivedAt: unreadStoryReply.received_at,
readAt, readAt,
includeStoryReplies: false, includeStoryReplies: false,
}); });
@ -172,7 +172,7 @@ describe('sql/markRead', () => {
const markedRead2 = await getUnreadByConversationAndMarkRead({ const markedRead2 = await getUnreadByConversationAndMarkRead({
conversationId, conversationId,
newestUnreadAt: newestUnread.received_at, readMessageReceivedAt: newestUnread.received_at,
readAt, readAt,
includeStoryReplies: true, includeStoryReplies: true,
}); });
@ -300,7 +300,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({ const markedRead = await getUnreadByConversationAndMarkRead({
conversationId, conversationId,
newestUnreadAt: message7.received_at, readMessageReceivedAt: message7.received_at,
readAt, readAt,
storyId, storyId,
includeStoryReplies: false, includeStoryReplies: false,
@ -410,7 +410,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({ const markedRead = await getUnreadByConversationAndMarkRead({
conversationId, conversationId,
newestUnreadAt: message4.received_at, readMessageReceivedAt: message4.received_at,
readAt, readAt,
includeStoryReplies: false, includeStoryReplies: false,
now, now,
@ -587,7 +587,7 @@ describe('sql/markRead', () => {
assert.lengthOf(await _getAllReactions(), 5); assert.lengthOf(await _getAllReactions(), 5);
const markedRead = await getUnreadReactionsAndMarkRead({ const markedRead = await getUnreadReactionsAndMarkRead({
conversationId, conversationId,
newestUnreadAt: reaction4.messageReceivedAt, readMessageReceivedAt: reaction4.messageReceivedAt,
}); });
assert.lengthOf(markedRead, 2, 'two reactions marked read'); assert.lengthOf(markedRead, 2, 'two reactions marked read');
@ -606,7 +606,7 @@ describe('sql/markRead', () => {
const markedRead2 = await getUnreadReactionsAndMarkRead({ const markedRead2 = await getUnreadReactionsAndMarkRead({
conversationId, conversationId,
newestUnreadAt: reaction5.messageReceivedAt, readMessageReceivedAt: reaction5.messageReceivedAt,
}); });
assert.lengthOf(markedRead2, 1); assert.lengthOf(markedRead2, 1);
@ -742,7 +742,7 @@ describe('sql/markRead', () => {
assert.lengthOf(await _getAllReactions(), 5); assert.lengthOf(await _getAllReactions(), 5);
const markedRead = await getUnreadReactionsAndMarkRead({ const markedRead = await getUnreadReactionsAndMarkRead({
conversationId, conversationId,
newestUnreadAt: reaction4.messageReceivedAt, readMessageReceivedAt: reaction4.messageReceivedAt,
storyId, storyId,
}); });
@ -762,7 +762,7 @@ describe('sql/markRead', () => {
const markedRead2 = await getUnreadReactionsAndMarkRead({ const markedRead2 = await getUnreadReactionsAndMarkRead({
conversationId, conversationId,
newestUnreadAt: reaction5.messageReceivedAt, readMessageReceivedAt: reaction5.messageReceivedAt,
storyId, storyId,
}); });
@ -837,7 +837,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({ const markedRead = await getUnreadByConversationAndMarkRead({
conversationId, conversationId,
includeStoryReplies: false, includeStoryReplies: false,
newestUnreadAt: message4.received_at, readMessageReceivedAt: message4.received_at,
readAt, readAt,
}); });

View file

@ -25,16 +25,17 @@ import {
import { ReceiptType } from '../types/Receipt'; import { ReceiptType } from '../types/Receipt';
import type { AciString } from '../types/ServiceId'; import type { AciString } from '../types/ServiceId';
import { isAciString } from './isAciString'; import { isAciString } from './isAciString';
import type { MessageModel } from '../models/messages';
import { postSaveUpdates } from './cleanup';
const log = createLogger('markConversationRead'); const log = createLogger('markConversationRead');
export async function markConversationRead( export async function markConversationRead(
conversationAttrs: ConversationAttributesType, conversationAttrs: ConversationAttributesType,
newestUnreadAt: number, readMessage: { received_at: number; sent_at: number },
options: { options: {
readAt?: number; readAt?: number;
sendReadReceipts: boolean; sendReadReceipts: boolean;
newestSentAt?: number;
} = { } = {
sendReadReceipts: true, sendReadReceipts: true,
} }
@ -45,26 +46,28 @@ export async function markConversationRead(
await Promise.all([ await Promise.all([
DataWriter.getUnreadByConversationAndMarkRead({ DataWriter.getUnreadByConversationAndMarkRead({
conversationId, conversationId,
newestUnreadAt, readMessageReceivedAt: readMessage.received_at,
readAt: options.readAt, readAt: options.readAt,
includeStoryReplies: !isGroup(conversationAttrs), includeStoryReplies: !isGroup(conversationAttrs),
}), }),
DataWriter.getUnreadEditedMessagesAndMarkRead({ DataWriter.getUnreadEditedMessagesAndMarkRead({
conversationId, conversationId,
newestUnreadAt, readMessageReceivedAt: readMessage.received_at,
}), }),
DataWriter.getUnreadReactionsAndMarkRead({ DataWriter.getUnreadReactionsAndMarkRead({
conversationId, conversationId,
newestUnreadAt, readMessageReceivedAt: readMessage.received_at,
}), }),
]); ]);
const convoId = getConversationIdForLogging(conversationAttrs); const convoId = getConversationIdForLogging(conversationAttrs);
const logId = `markConversationRead(${convoId})`; const logId = `(${convoId})`;
log.info(logId, { log.info(logId, {
newestSentAt: options.newestSentAt, markingReadBefore: {
newestUnreadAt, sentAt: readMessage.sent_at,
receivedAt: readMessage.received_at,
},
unreadMessages: unreadMessages.length, unreadMessages: unreadMessages.length,
unreadReactions: unreadReactions.length, unreadReactions: unreadReactions.length,
}); });
@ -103,6 +106,7 @@ export async function markConversationRead(
const allUnreadMessages = [...unreadMessages, ...unreadEditedMessages]; const allUnreadMessages = [...unreadMessages, ...unreadEditedMessages];
const updatedMessages: Array<MessageModel> = [];
const allReadMessagesSync = allUnreadMessages const allReadMessagesSync = allUnreadMessages
.map(messageSyncData => { .map(messageSyncData => {
const message = window.MessageCache.getById(messageSyncData.id); const message = window.MessageCache.getById(messageSyncData.id);
@ -116,6 +120,7 @@ export async function markConversationRead(
'expirationStartTimestamp' 'expirationStartTimestamp'
) )
); );
updatedMessages.push(message);
} }
const { const {
@ -159,6 +164,18 @@ export async function markConversationRead(
}) })
.filter(isNotNil); .filter(isNotNil);
// We need to save any messages that are in memory, since their read status could have
// been overwritten in the DB by a message save from a stale (unread) in-memory model
if (updatedMessages.length) {
await DataWriter.saveMessages(
updatedMessages.map(msg => msg.attributes),
{
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
}
);
}
// Some messages we're marking read are local notifications with no sender or were just // Some messages we're marking read are local notifications with no sender or were just
// unseen and not unread. // unseen and not unread.
// Also, if a message has errors, we don't want to send anything out about it: // Also, if a message has errors, we don't want to send anything out about it:
@ -183,11 +200,11 @@ export async function markConversationRead(
}> = [...unreadMessagesSyncData, ...unreadReactionSyncData.values()]; }> = [...unreadMessagesSyncData, ...unreadReactionSyncData.values()];
if (readSyncs.length && options.sendReadReceipts) { if (readSyncs.length && options.sendReadReceipts) {
log.info(`Sending ${readSyncs.length} read syncs`); log.info(logId, `Sending ${readSyncs.length} read syncs`);
// Because syncReadMessages sends to our other devices, and sendReadReceipts goes // Because syncReadMessages sends to our other devices, and sendReadReceipts goes
// to a contact, we need accessKeys for both. // to a contact, we need accessKeys for both.
if (window.ConversationController.areWePrimaryDevice()) { if (window.ConversationController.areWePrimaryDevice()) {
log.warn('We are primary device; not sending read syncs'); log.warn(logId, 'We are primary device; not sending read syncs');
} else { } else {
drop(readSyncJobQueue.add({ readSyncs })); drop(readSyncJobQueue.add({ readSyncs }));
} }

View file

@ -231,7 +231,6 @@ export async function modifyTargetMessage(
const markReadAt = message.pendingMarkRead; const markReadAt = message.pendingMarkRead;
// eslint-disable-next-line no-param-reassign // eslint-disable-next-line no-param-reassign
message.pendingMarkRead = undefined; message.pendingMarkRead = undefined;
const newestSentAt = maybeSingleReadSync?.readSync.timestamp;
// This is primarily to allow the conversation to mark all older // This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for // messages as read, as is done when we receive a read sync for
@ -243,7 +242,7 @@ export async function modifyTargetMessage(
drop( drop(
window.ConversationController.get( window.ConversationController.get(
message.get('conversationId') message.get('conversationId')
)?.onReadMessage(message.attributes, markReadAt, newestSentAt) )?.onReadMessage(message.attributes, markReadAt)
); );
} }