From ea3a7f70b654f007b04b0554b23468c402beb025 Mon Sep 17 00:00:00 2001 From: trevor-signal <131492920+trevor-signal@users.noreply.github.com> Date: Wed, 16 Jul 2025 13:47:07 -0400 Subject: [PATCH] Avoid race condition when marking messages read --- ts/messageModifiers/ReadSyncs.ts | 5 +--- ts/models/conversations.ts | 12 +++------ ts/sql/Interface.ts | 6 ++--- ts/sql/Server.ts | 24 ++++++++--------- ts/state/ducks/conversations.ts | 3 +-- ts/test-electron/sql/markRead_test.ts | 18 ++++++------- ts/util/markConversationRead.ts | 37 +++++++++++++++++++-------- ts/util/modifyTargetMessage.ts | 3 +-- 8 files changed, 58 insertions(+), 50 deletions(-) diff --git a/ts/messageModifiers/ReadSyncs.ts b/ts/messageModifiers/ReadSyncs.ts index 4af7c176d2..6043864c62 100644 --- a/ts/messageModifiers/ReadSyncs.ts +++ b/ts/messageModifiers/ReadSyncs.ts @@ -152,7 +152,6 @@ export async function onSync(sync: ReadSyncAttributesType): Promise { const message = window.MessageCache.register(new MessageModel(found)); const readAt = Math.min(readSync.readAt, Date.now()); - const newestSentAt = readSync.timestamp; // If message is unread, we mark it read. Otherwise, we update the expiration // timer to the time specified by the read sync if it's earlier than @@ -168,9 +167,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise { // onReadMessage may result in messages older than this one being // marked read. We want those messages to have the same expire timer // start time as this one, so we pass the readAt value through. - drop( - conversation.onReadMessage(message.attributes, readAt, newestSentAt) - ); + drop(conversation.onReadMessage(message.attributes, readAt)); }; // only available during initialization diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 204cbc4723..3cd32b6d69 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -3658,8 +3658,7 @@ export class ConversationModel extends window.Backbone async onReadMessage( message: MessageAttributesType, - readAt?: number, - newestSentAt?: number + readAt?: number ): Promise { // We mark as read everything older than this message - to clean up old stuff // still marked unread in the database. If the user generally doesn't read in @@ -3673,9 +3672,7 @@ export class ConversationModel extends window.Backbone // Lastly, we don't send read syncs for any message marked read due to a read // sync. That's a notification explosion we don't need. return this.queueJob('onReadMessage', () => - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.markRead(message.received_at!, { - newestSentAt: newestSentAt || message.sent_at, + this.markRead(message, { sendReadReceipts: false, readAt, }) @@ -4862,16 +4859,15 @@ export class ConversationModel extends window.Backbone } async markRead( - newestUnreadAt: number, + readMessage: { received_at: number; sent_at: number }, options: { readAt?: number; sendReadReceipts: boolean; - newestSentAt?: number; } = { sendReadReceipts: true, } ): Promise { - await markConversationRead(this.attributes, newestUnreadAt, options); + await markConversationRead(this.attributes, readMessage, options); this.throttledUpdateUnread(); window.reduxActions.callHistory.updateCallHistoryUnreadCount(); } diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 819d23ace1..538200fc01 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -983,18 +983,18 @@ type WritableInterface = { getUnreadByConversationAndMarkRead: (options: { conversationId: string; includeStoryReplies: boolean; - newestUnreadAt: number; + readMessageReceivedAt: number; now?: number; readAt?: number; storyId?: string; }) => GetUnreadByConversationAndMarkReadResultType; getUnreadEditedMessagesAndMarkRead: (options: { conversationId: string; - newestUnreadAt: number; + readMessageReceivedAt: number; }) => GetUnreadByConversationAndMarkReadResultType; getUnreadReactionsAndMarkRead: (options: { conversationId: string; - newestUnreadAt: number; + readMessageReceivedAt: number; storyId?: string; }) => Array; markReactionAsRead: ( diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index fa13e78c24..2919e37acc 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -3113,14 +3113,14 @@ function getUnreadByConversationAndMarkRead( { conversationId, includeStoryReplies, - newestUnreadAt, + readMessageReceivedAt, storyId, readAt, now = Date.now(), }: { conversationId: string; includeStoryReplies: boolean; - newestUnreadAt: number; + readMessageReceivedAt: number; storyId?: string; readAt?: number; now?: number; @@ -3147,7 +3147,7 @@ function getUnreadByConversationAndMarkRead( expirationStartTimestamp > ${expirationStartTimestamp} ) AND expireTimer > 0 AND - received_at <= ${newestUnreadAt}; + received_at <= ${readMessageReceivedAt}; `; db.prepare(updateExpirationQuery).run(updateExpirationParams); @@ -3161,7 +3161,7 @@ function getUnreadByConversationAndMarkRead( seenStatus = ${SeenStatus.Unseen} AND isStory = 0 AND (${_storyIdPredicate(storyId, includeStoryReplies)}) AND - received_at <= ${newestUnreadAt} + received_at <= ${readMessageReceivedAt} ORDER BY received_at DESC, sent_at DESC; `; @@ -3185,7 +3185,7 @@ function getUnreadByConversationAndMarkRead( seenStatus = ${SeenStatus.Unseen} AND isStory = 0 AND (${_storyIdPredicate(storyId, includeStoryReplies)}) AND - received_at <= ${newestUnreadAt}; + received_at <= ${readMessageReceivedAt}; `; db.prepare(updateStatusQuery).run(updateStatusParams); @@ -3211,11 +3211,11 @@ function getUnreadReactionsAndMarkRead( db: WritableDB, { conversationId, - newestUnreadAt, + readMessageReceivedAt, storyId, }: { conversationId: string; - newestUnreadAt: number; + readMessageReceivedAt: number; storyId?: string; } ): Array { @@ -3230,14 +3230,14 @@ function getUnreadReactionsAndMarkRead( WHERE reactions.conversationId IS $conversationId AND reactions.unread > 0 AND - messages.received_at <= $newestUnreadAt AND + messages.received_at <= $readMessageReceivedAt AND messages.storyId IS $storyId ORDER BY messageReceivedAt DESC; ` ) .all({ conversationId, - newestUnreadAt, + readMessageReceivedAt, storyId: storyId || null, }); @@ -8445,10 +8445,10 @@ function getUnreadEditedMessagesAndMarkRead( db: WritableDB, { conversationId, - newestUnreadAt, + readMessageReceivedAt, }: { conversationId: string; - newestUnreadAt: number; + readMessageReceivedAt: number; } ): GetUnreadByConversationAndMarkReadResultType { return db.transaction(() => { @@ -8467,7 +8467,7 @@ function getUnreadEditedMessagesAndMarkRead( WHERE edited_messages.readStatus = ${ReadStatus.Unread} AND edited_messages.conversationId = ${conversationId} AND - received_at <= ${newestUnreadAt} + received_at <= ${readMessageReceivedAt} ORDER BY messages.received_at DESC, messages.sent_at DESC; `; diff --git a/ts/state/ducks/conversations.ts b/ts/state/ducks/conversations.ts index c026eeb221..e3953cad38 100644 --- a/ts/state/ducks/conversations.ts +++ b/ts/state/ducks/conversations.ts @@ -1468,8 +1468,7 @@ function markMessageRead( throw new Error(`markMessageRead: failed to load message ${messageId}`); } - await conversation.markRead(message.get('received_at'), { - newestSentAt: message.get('sent_at'), + await conversation.markRead(message.attributes, { sendReadReceipts: true, }); diff --git a/ts/test-electron/sql/markRead_test.ts b/ts/test-electron/sql/markRead_test.ts index df1abf1b99..0de9c7a8a7 100644 --- a/ts/test-electron/sql/markRead_test.ts +++ b/ts/test-electron/sql/markRead_test.ts @@ -143,7 +143,7 @@ describe('sql/markRead', () => { const markedRead = await getUnreadByConversationAndMarkRead({ conversationId, - newestUnreadAt: unreadStoryReply.received_at, + readMessageReceivedAt: unreadStoryReply.received_at, readAt, includeStoryReplies: false, }); @@ -172,7 +172,7 @@ describe('sql/markRead', () => { const markedRead2 = await getUnreadByConversationAndMarkRead({ conversationId, - newestUnreadAt: newestUnread.received_at, + readMessageReceivedAt: newestUnread.received_at, readAt, includeStoryReplies: true, }); @@ -300,7 +300,7 @@ describe('sql/markRead', () => { const markedRead = await getUnreadByConversationAndMarkRead({ conversationId, - newestUnreadAt: message7.received_at, + readMessageReceivedAt: message7.received_at, readAt, storyId, includeStoryReplies: false, @@ -410,7 +410,7 @@ describe('sql/markRead', () => { const markedRead = await getUnreadByConversationAndMarkRead({ conversationId, - newestUnreadAt: message4.received_at, + readMessageReceivedAt: message4.received_at, readAt, includeStoryReplies: false, now, @@ -587,7 +587,7 @@ describe('sql/markRead', () => { assert.lengthOf(await _getAllReactions(), 5); const markedRead = await getUnreadReactionsAndMarkRead({ conversationId, - newestUnreadAt: reaction4.messageReceivedAt, + readMessageReceivedAt: reaction4.messageReceivedAt, }); assert.lengthOf(markedRead, 2, 'two reactions marked read'); @@ -606,7 +606,7 @@ describe('sql/markRead', () => { const markedRead2 = await getUnreadReactionsAndMarkRead({ conversationId, - newestUnreadAt: reaction5.messageReceivedAt, + readMessageReceivedAt: reaction5.messageReceivedAt, }); assert.lengthOf(markedRead2, 1); @@ -742,7 +742,7 @@ describe('sql/markRead', () => { assert.lengthOf(await _getAllReactions(), 5); const markedRead = await getUnreadReactionsAndMarkRead({ conversationId, - newestUnreadAt: reaction4.messageReceivedAt, + readMessageReceivedAt: reaction4.messageReceivedAt, storyId, }); @@ -762,7 +762,7 @@ describe('sql/markRead', () => { const markedRead2 = await getUnreadReactionsAndMarkRead({ conversationId, - newestUnreadAt: reaction5.messageReceivedAt, + readMessageReceivedAt: reaction5.messageReceivedAt, storyId, }); @@ -837,7 +837,7 @@ describe('sql/markRead', () => { const markedRead = await getUnreadByConversationAndMarkRead({ conversationId, includeStoryReplies: false, - newestUnreadAt: message4.received_at, + readMessageReceivedAt: message4.received_at, readAt, }); diff --git a/ts/util/markConversationRead.ts b/ts/util/markConversationRead.ts index 4a06bf7b44..149097edb6 100644 --- a/ts/util/markConversationRead.ts +++ b/ts/util/markConversationRead.ts @@ -25,16 +25,17 @@ import { import { ReceiptType } from '../types/Receipt'; import type { AciString } from '../types/ServiceId'; import { isAciString } from './isAciString'; +import type { MessageModel } from '../models/messages'; +import { postSaveUpdates } from './cleanup'; const log = createLogger('markConversationRead'); export async function markConversationRead( conversationAttrs: ConversationAttributesType, - newestUnreadAt: number, + readMessage: { received_at: number; sent_at: number }, options: { readAt?: number; sendReadReceipts: boolean; - newestSentAt?: number; } = { sendReadReceipts: true, } @@ -45,26 +46,28 @@ export async function markConversationRead( await Promise.all([ DataWriter.getUnreadByConversationAndMarkRead({ conversationId, - newestUnreadAt, + readMessageReceivedAt: readMessage.received_at, readAt: options.readAt, includeStoryReplies: !isGroup(conversationAttrs), }), DataWriter.getUnreadEditedMessagesAndMarkRead({ conversationId, - newestUnreadAt, + readMessageReceivedAt: readMessage.received_at, }), DataWriter.getUnreadReactionsAndMarkRead({ conversationId, - newestUnreadAt, + readMessageReceivedAt: readMessage.received_at, }), ]); const convoId = getConversationIdForLogging(conversationAttrs); - const logId = `markConversationRead(${convoId})`; + const logId = `(${convoId})`; log.info(logId, { - newestSentAt: options.newestSentAt, - newestUnreadAt, + markingReadBefore: { + sentAt: readMessage.sent_at, + receivedAt: readMessage.received_at, + }, unreadMessages: unreadMessages.length, unreadReactions: unreadReactions.length, }); @@ -103,6 +106,7 @@ export async function markConversationRead( const allUnreadMessages = [...unreadMessages, ...unreadEditedMessages]; + const updatedMessages: Array = []; const allReadMessagesSync = allUnreadMessages .map(messageSyncData => { const message = window.MessageCache.getById(messageSyncData.id); @@ -116,6 +120,7 @@ export async function markConversationRead( 'expirationStartTimestamp' ) ); + updatedMessages.push(message); } const { @@ -159,6 +164,18 @@ export async function markConversationRead( }) .filter(isNotNil); + // We need to save any messages that are in memory, since their read status could have + // been overwritten in the DB by a message save from a stale (unread) in-memory model + if (updatedMessages.length) { + await DataWriter.saveMessages( + updatedMessages.map(msg => msg.attributes), + { + ourAci: window.textsecure.storage.user.getCheckedAci(), + postSaveUpdates, + } + ); + } + // Some messages we're marking read are local notifications with no sender or were just // unseen and not unread. // Also, if a message has errors, we don't want to send anything out about it: @@ -183,11 +200,11 @@ export async function markConversationRead( }> = [...unreadMessagesSyncData, ...unreadReactionSyncData.values()]; if (readSyncs.length && options.sendReadReceipts) { - log.info(`Sending ${readSyncs.length} read syncs`); + log.info(logId, `Sending ${readSyncs.length} read syncs`); // Because syncReadMessages sends to our other devices, and sendReadReceipts goes // to a contact, we need accessKeys for both. if (window.ConversationController.areWePrimaryDevice()) { - log.warn('We are primary device; not sending read syncs'); + log.warn(logId, 'We are primary device; not sending read syncs'); } else { drop(readSyncJobQueue.add({ readSyncs })); } diff --git a/ts/util/modifyTargetMessage.ts b/ts/util/modifyTargetMessage.ts index 25d7e6e328..d11895a215 100644 --- a/ts/util/modifyTargetMessage.ts +++ b/ts/util/modifyTargetMessage.ts @@ -231,7 +231,6 @@ export async function modifyTargetMessage( const markReadAt = message.pendingMarkRead; // eslint-disable-next-line no-param-reassign message.pendingMarkRead = undefined; - const newestSentAt = maybeSingleReadSync?.readSync.timestamp; // This is primarily to allow the conversation to mark all older // messages as read, as is done when we receive a read sync for @@ -243,7 +242,7 @@ export async function modifyTargetMessage( drop( window.ConversationController.get( message.get('conversationId') - )?.onReadMessage(message.attributes, markReadAt, newestSentAt) + )?.onReadMessage(message.attributes, markReadAt) ); }